diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 238638737..52e15e928 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -57,6 +57,7 @@ #include "utils/likely.hpp" #include "utils/logging.hpp" #include "utils/memory.hpp" +#include "utils/memory_tracker.hpp" #include "utils/message.hpp" #include "utils/on_scope_exit.hpp" #include "utils/pmr/deque.hpp" @@ -4859,6 +4860,7 @@ class CallProcedureCursor : public Cursor { result_signature_size_ = result_->signature->size(); result_->signature = nullptr; if (result_->error_msg) { + memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker blocker; throw QueryRuntimeException("{}: {}", self_->procedure_name_, *result_->error_msg); } result_row_it_ = result_->rows.begin(); diff --git a/src/query/plan/rule_based_planner.hpp b/src/query/plan/rule_based_planner.hpp index afa060b9d..bdac76a93 100644 --- a/src/query/plan/rule_based_planner.hpp +++ b/src/query/plan/rule_based_planner.hpp @@ -176,7 +176,10 @@ class RuleBasedPlanner { PlanResult Plan(const QueryParts &query_parts) { auto &context = *context_; std::unique_ptr final_plan; - + // procedures need to start from 1 + // due to swapping mechanism of procedure + // tracking + uint64_t procedure_id = 1; for (const auto &query_part : query_parts.query_parts) { std::unique_ptr input_op; @@ -186,10 +189,6 @@ class RuleBasedPlanner { uint64_t merge_id = 0; uint64_t subquery_id = 0; - // procedures need to start from 1 - // due to swapping mechanism of procedure - // tracking - uint64_t procedure_id = 1; for (const auto &clause : single_query_part.remaining_clauses) { MG_ASSERT(!utils::IsSubtype(*clause, Match::kType), "Unexpected Match in remaining clauses"); diff --git a/src/query/procedure/mg_procedure_impl.cpp b/src/query/procedure/mg_procedure_impl.cpp index 2a657aeb3..2a176f2ed 100644 --- a/src/query/procedure/mg_procedure_impl.cpp +++ b/src/query/procedure/mg_procedure_impl.cpp @@ -38,6 +38,7 @@ #include "utils/logging.hpp" #include "utils/math.hpp" #include "utils/memory.hpp" +#include "utils/memory_tracker.hpp" #include "utils/string.hpp" #include "utils/temporal.hpp" #include "utils/variant_helpers.hpp" @@ -158,6 +159,7 @@ template [[nodiscard]] mgp_error WrapExceptions(TFunc &&func, Args &&...args) noexcept { static_assert(sizeof...(args) <= 1, "WrapExceptions should have only one or zero parameter!"); try { + memgraph::utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_enabler; WrapExceptionsHelper(std::forward(func), std::forward(args)...); } catch (const DeletedObjectException &neoe) { spdlog::error("Deleted object error during mg API call: {}", neoe.what()); @@ -1544,6 +1546,7 @@ mgp_error mgp_duration_sub(mgp_duration *first, mgp_duration *second, mgp_memory mgp_error mgp_result_set_error_msg(mgp_result *res, const char *msg) { return WrapExceptions([=] { + memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker blocker{}; auto *memory = res->rows.get_allocator().GetMemoryResource(); res->error_msg.emplace(msg, memory); }); diff --git a/tests/e2e/memory/CMakeLists.txt b/tests/e2e/memory/CMakeLists.txt index 327f09106..3c4cdc279 100644 --- a/tests/e2e/memory/CMakeLists.txt +++ b/tests/e2e/memory/CMakeLists.txt @@ -2,6 +2,8 @@ add_subdirectory(procedures) find_package(gflags REQUIRED) +# Global memory limit + add_executable(memgraph__e2e__memory__control memory_control.cpp) target_link_libraries(memgraph__e2e__memory__control gflags mgclient mg-utils mg-io Threads::Threads) @@ -20,6 +22,12 @@ target_link_libraries(memgraph__e2e__memory__limit_accumulation gflags mgclient add_executable(memgraph__e2e__memory__limit_edge_create memory_limit_edge_create.cpp) target_link_libraries(memgraph__e2e__memory__limit_edge_create gflags mgclient mg-utils mg-io) +add_executable(memgraph__e2e__memory_limit_global_multi_thread_proc_create memory_limit_global_multi_thread_proc_create.cpp) +target_link_libraries(memgraph__e2e__memory_limit_global_multi_thread_proc_create gflags mgclient mg-utils mg-io) + +add_executable(memgraph__e2e__memory_limit_global_thread_alloc_proc memory_limit_global_thread_alloc_proc.cpp) +target_link_libraries(memgraph__e2e__memory_limit_global_thread_alloc_proc gflags mgclient mg-utils mg-io) + # Query memory limit tests add_executable(memgraph__e2e__memory__limit_query_alloc_proc_multi_thread query_memory_limit_proc_multi_thread.cpp) @@ -34,7 +42,6 @@ target_link_libraries(memgraph__e2e__memory__limit_query_alloc_proc gflags mgcli add_executable(memgraph__e2e__memory__limit_query_alloc_create_multi_thread query_memory_limit_multi_thread.cpp) target_link_libraries(memgraph__e2e__memory__limit_query_alloc_create_multi_thread gflags mgclient mg-utils mg-io Threads::Threads) - # Procedure memory limit tests add_executable(memgraph__e2e__procedure_memory_limit procedure_memory_limit.cpp) diff --git a/tests/e2e/memory/memory_limit_global_multi_thread_proc_create.cpp b/tests/e2e/memory/memory_limit_global_multi_thread_proc_create.cpp new file mode 100644 index 000000000..2132fbb16 --- /dev/null +++ b/tests/e2e/memory/memory_limit_global_multi_thread_proc_create.cpp @@ -0,0 +1,67 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include +#include +#include +#include +#include +#include + +#include "utils/logging.hpp" +#include "utils/timer.hpp" + +DEFINE_uint64(bolt_port, 7687, "Bolt port"); +DEFINE_uint64(timeout, 120, "Timeout seconds"); +DEFINE_bool(multi_db, false, "Run test in multi db environment"); + +int main(int argc, char **argv) { + google::SetUsageMessage("Memgraph E2E Global Memory Limit In Multi-Thread Create For Local Allocators"); + gflags::ParseCommandLineFlags(&argc, &argv, true); + memgraph::logging::RedirectToStderr(); + + mg::Client::Init(); + + auto client = + mg::Client::Connect({.host = "127.0.0.1", .port = static_cast(FLAGS_bolt_port), .use_ssl = false}); + if (!client) { + LOG_FATAL("Failed to connect!"); + } + + if (FLAGS_multi_db) { + client->Execute("CREATE DATABASE clean;"); + client->DiscardAll(); + client->Execute("USE DATABASE clean;"); + client->DiscardAll(); + client->Execute("MATCH (n) DETACH DELETE n;"); + client->DiscardAll(); + } + + bool error{false}; + try { + client->Execute( + "CALL libglobal_memory_limit_multi_thread_create_proc.multi_create() PROCEDURE MEMORY UNLIMITED YIELD " + "allocated_all RETURN allocated_all " + "QUERY MEMORY LIMIT 50MB;"); + auto result_rows = client->FetchAll(); + if (result_rows) { + auto row = *result_rows->begin(); + error = row[0].ValueBool() == false; + } + + } catch (const std::exception &e) { + error = true; + } + + MG_ASSERT(error, "Error should have happend"); + + return 0; +} diff --git a/tests/e2e/memory/memory_limit_global_thread_alloc_proc.cpp b/tests/e2e/memory/memory_limit_global_thread_alloc_proc.cpp new file mode 100644 index 000000000..0c2eb1ee6 --- /dev/null +++ b/tests/e2e/memory/memory_limit_global_thread_alloc_proc.cpp @@ -0,0 +1,68 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include +#include +#include +#include +#include +#include + +#include "utils/logging.hpp" +#include "utils/timer.hpp" + +DEFINE_uint64(bolt_port, 7687, "Bolt port"); +DEFINE_uint64(timeout, 120, "Timeout seconds"); +DEFINE_bool(multi_db, false, "Run test in multi db environment"); + +// Test checks path of throwing error from different thread +// than main thread which started test +int main(int argc, char **argv) { + google::SetUsageMessage("Memgraph E2E Global Memory Limit In Multi-Thread For Procedures For Local Allocators"); + gflags::ParseCommandLineFlags(&argc, &argv, true); + memgraph::logging::RedirectToStderr(); + + mg::Client::Init(); + + auto client = + mg::Client::Connect({.host = "127.0.0.1", .port = static_cast(FLAGS_bolt_port), .use_ssl = false}); + if (!client) { + LOG_FATAL("Failed to connect!"); + } + + if (FLAGS_multi_db) { + client->Execute("CREATE DATABASE clean;"); + client->DiscardAll(); + client->Execute("USE DATABASE clean;"); + client->DiscardAll(); + client->Execute("MATCH (n) DETACH DELETE n;"); + client->DiscardAll(); + } + + bool error{false}; + try { + client->Execute( + "CALL libglobal_memory_limit_thread_proc.thread() YIELD allocated_all RETURN allocated_all QUERY MEMORY LIMIT " + "100MB;"); + auto result_rows = client->FetchAll(); + if (result_rows) { + auto row = *result_rows->begin(); + error = row[0].ValueBool() == false; + } + + } catch (const std::exception &e) { + error = true; + } + + MG_ASSERT(error, "Error should have happend"); + + return 0; +} diff --git a/tests/e2e/memory/procedure_memory_limit_multi_proc.cpp b/tests/e2e/memory/procedure_memory_limit_multi_proc.cpp index 118ff41ce..f850ec6c2 100644 --- a/tests/e2e/memory/procedure_memory_limit_multi_proc.cpp +++ b/tests/e2e/memory/procedure_memory_limit_multi_proc.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include "utils/logging.hpp" @@ -53,11 +54,9 @@ int main(int argc, char **argv) { "CALL libproc_memory_limit.alloc_32_mib() PROCEDURE MEMORY LIMIT 10MB YIELD allocated AS allocated_2 RETURN " "allocated_1, allocated_2"); auto result_rows = client->FetchAll(); - if (result_rows) { - auto row = *result_rows->begin(); - test_passed = row[0].ValueBool() == true && row[0].ValueBool() == false; + if (result_rows && result_rows->empty()) { + test_passed = true; } - } catch (const std::exception &e) { test_passed = true; } diff --git a/tests/e2e/memory/procedures/CMakeLists.txt b/tests/e2e/memory/procedures/CMakeLists.txt index 84c56f414..df7acee31 100644 --- a/tests/e2e/memory/procedures/CMakeLists.txt +++ b/tests/e2e/memory/procedures/CMakeLists.txt @@ -4,16 +4,21 @@ target_include_directories(global_memory_limit PRIVATE ${CMAKE_SOURCE_DIR}/inclu add_library(global_memory_limit_proc SHARED global_memory_limit_proc.c) target_include_directories(global_memory_limit_proc PRIVATE ${CMAKE_SOURCE_DIR}/include) - add_library(query_memory_limit_proc_multi_thread SHARED query_memory_limit_proc_multi_thread.cpp) target_include_directories(query_memory_limit_proc_multi_thread PRIVATE ${CMAKE_SOURCE_DIR}/include) target_link_libraries(query_memory_limit_proc_multi_thread mg-utils) - add_library(query_memory_limit_proc SHARED query_memory_limit_proc.cpp) target_include_directories(query_memory_limit_proc PRIVATE ${CMAKE_SOURCE_DIR}/include) target_link_libraries(query_memory_limit_proc mg-utils) +add_library(global_memory_limit_thread_proc SHARED global_memory_limit_thread_proc.cpp) +target_include_directories(global_memory_limit_thread_proc PRIVATE ${CMAKE_SOURCE_DIR}/include) +target_link_libraries(global_memory_limit_thread_proc mg-utils) + +add_library(global_memory_limit_multi_thread_create_proc SHARED global_memory_limit_multi_thread_create_proc.cpp) +target_include_directories(global_memory_limit_multi_thread_create_proc PRIVATE ${CMAKE_SOURCE_DIR}/include) +target_link_libraries(global_memory_limit_multi_thread_create_proc mg-utils) add_library(proc_memory_limit SHARED proc_memory_limit.cpp) target_include_directories(proc_memory_limit PRIVATE ${CMAKE_SOURCE_DIR}/include) diff --git a/tests/e2e/memory/procedures/global_memory_limit_multi_thread_create_proc.cpp b/tests/e2e/memory/procedures/global_memory_limit_multi_thread_create_proc.cpp new file mode 100644 index 000000000..2ccaac631 --- /dev/null +++ b/tests/e2e/memory/procedures/global_memory_limit_multi_thread_create_proc.cpp @@ -0,0 +1,95 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "mg_procedure.h" +#include "mgp.hpp" +#include "utils/on_scope_exit.hpp" + +// change communication between threads with feature and promise +std::atomic created_vertices{0}; +constexpr int num_vertices_per_thread{100'000}; +constexpr int num_threads{2}; + +void CallCreate(mgp_graph *graph, mgp_memory *memory) { + [[maybe_unused]] const enum mgp_error tracking_error = mgp_track_current_thread_allocations(graph); + for (int i = 0; i < num_vertices_per_thread; i++) { + struct mgp_vertex *vertex{nullptr}; + auto enum_error = mgp_graph_create_vertex(graph, memory, &vertex); + if (enum_error != mgp_error::MGP_ERROR_NO_ERROR) { + break; + } + created_vertices.fetch_add(1, std::memory_order_acq_rel); + } + [[maybe_unused]] const enum mgp_error untracking_error = mgp_untrack_current_thread_allocations(graph); +} + +void AllocFunc(mgp_graph *graph, mgp_memory *memory) { + try { + CallCreate(graph, memory); + } catch (const std::exception &e) { + return; + } +} + +void MultiCreate(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) { + mgp::MemoryDispatcherGuard guard{memory}; + const auto arguments = mgp::List(args); + const auto record_factory = mgp::RecordFactory(result); + try { + std::vector threads; + + for (int i = 0; i < 2; i++) { + threads.emplace_back(AllocFunc, memgraph_graph, memory); + } + + for (int i = 0; i < num_threads; i++) { + threads[i].join(); + } + if (created_vertices.load(std::memory_order_acquire) != num_vertices_per_thread * num_threads) { + record_factory.SetErrorMessage("Unable to allocate"); + return; + } + + auto new_record = record_factory.NewRecord(); + new_record.Insert("allocated_all", + created_vertices.load(std::memory_order_acquire) == num_vertices_per_thread * num_threads); + } catch (std::exception &e) { + record_factory.SetErrorMessage(e.what()); + } +} + +extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) { + try { + mgp::MemoryDispatcherGuard guard{memory}; + + AddProcedure(MultiCreate, std::string("multi_create").c_str(), mgp::ProcedureType::Write, {}, + {mgp::Return(std::string("allocated_all").c_str(), mgp::Type::Bool)}, module, memory); + + } catch (const std::exception &e) { + return 1; + } + + return 0; +} + +extern "C" int mgp_shutdown_module() { return 0; } diff --git a/tests/e2e/memory/procedures/global_memory_limit_thread_proc.cpp b/tests/e2e/memory/procedures/global_memory_limit_thread_proc.cpp new file mode 100644 index 000000000..95bf19f9d --- /dev/null +++ b/tests/e2e/memory/procedures/global_memory_limit_thread_proc.cpp @@ -0,0 +1,92 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "mg_procedure.h" +#include "mgp.hpp" +#include "utils/on_scope_exit.hpp" + +enum mgp_error Alloc(mgp_memory *memory, void *ptr) { + const size_t mb_size_512 = 1 << 29; + + return mgp_alloc(memory, mb_size_512, (void **)(&ptr)); +} + +// change communication between threads with feature and promise +std::atomic num_allocations{0}; +void *ptr_; + +void AllocFunc(mgp_memory *memory, mgp_graph *graph) { + try { + [[maybe_unused]] const enum mgp_error tracking_error = mgp_track_current_thread_allocations(graph); + enum mgp_error alloc_err { mgp_error::MGP_ERROR_NO_ERROR }; + alloc_err = Alloc(memory, ptr_); + if (alloc_err != mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) { + num_allocations.fetch_add(1, std::memory_order_relaxed); + } + if (alloc_err != mgp_error::MGP_ERROR_NO_ERROR) { + assert(false); + } + } catch (const std::exception &e) { + [[maybe_unused]] const enum mgp_error untracking_error = mgp_untrack_current_thread_allocations(graph); + assert(false); + } + [[maybe_unused]] const enum mgp_error untracking_error = mgp_untrack_current_thread_allocations(graph); +} + +void Thread(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) { + mgp::MemoryDispatcherGuard guard{memory}; + const auto arguments = mgp::List(args); + const auto record_factory = mgp::RecordFactory(result); + num_allocations.store(0, std::memory_order_relaxed); + try { + std::thread thread{AllocFunc, memory, memgraph_graph}; + + thread.join(); + + if (ptr_ != nullptr) { + mgp_free(memory, ptr_); + } + + auto new_record = record_factory.NewRecord(); + + new_record.Insert("allocated_all", num_allocations.load(std::memory_order_relaxed) == 1); + } catch (std::exception &e) { + record_factory.SetErrorMessage(e.what()); + } +} + +extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) { + try { + mgp::memory = memory; + + AddProcedure(Thread, std::string("thread").c_str(), mgp::ProcedureType::Read, {}, + {mgp::Return(std::string("allocated_all").c_str(), mgp::Type::Bool)}, module, memory); + + } catch (const std::exception &e) { + return 1; + } + + return 0; +} + +extern "C" int mgp_shutdown_module() { return 0; } diff --git a/tests/e2e/memory/procedures/proc_memory_limit.cpp b/tests/e2e/memory/procedures/proc_memory_limit.cpp index 9f36dfaa4..d78407222 100644 --- a/tests/e2e/memory/procedures/proc_memory_limit.cpp +++ b/tests/e2e/memory/procedures/proc_memory_limit.cpp @@ -75,6 +75,9 @@ void Alloc_32_MiB(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, }}; const enum mgp_error alloc_err = Alloc_32(memory, ptr); + if (alloc_err != mgp_error::MGP_ERROR_NO_ERROR) { + record_factory.SetErrorMessage("Unable to allocate"); + } auto new_record = record_factory.NewRecord(); new_record.Insert("allocated", alloc_err != mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE); } catch (std::exception &e) { diff --git a/tests/e2e/memory/procedures/query_memory_limit_proc_multi_thread.cpp b/tests/e2e/memory/procedures/query_memory_limit_proc_multi_thread.cpp index ffc509ff3..0a1d8f125 100644 --- a/tests/e2e/memory/procedures/query_memory_limit_proc_multi_thread.cpp +++ b/tests/e2e/memory/procedures/query_memory_limit_proc_multi_thread.cpp @@ -13,7 +13,7 @@ #include #include #include -#include + #include #include #include @@ -22,6 +22,7 @@ #include #include "mg_procedure.h" +#include "mgp.hpp" #include "utils/on_scope_exit.hpp" enum mgp_error Alloc(void *ptr) { diff --git a/tests/e2e/memory/workloads.yaml b/tests/e2e/memory/workloads.yaml index e84faccf0..d826175ed 100644 --- a/tests/e2e/memory/workloads.yaml +++ b/tests/e2e/memory/workloads.yaml @@ -144,6 +144,7 @@ workloads: binary: "tests/e2e/memory/memgraph__e2e__memory__limit_query_alloc_create_multi_thread" args: ["--bolt-port", *bolt_port] <<: *in_memory_query_limit_cluster + - name: "Memory control for detach delete" binary: "tests/e2e/memory/memgraph__e2e__memory__limit_delete" args: ["--bolt-port", *bolt_port] @@ -174,6 +175,18 @@ workloads: args: ["--bolt-port", *bolt_port] <<: *disk_450_MiB_limit_cluster + - name: "Memory control for create from multi thread proc create" + binary: "tests/e2e/memory/memgraph__e2e__memory_limit_global_multi_thread_proc_create" + proc: "tests/e2e/memory/procedures/" + args: ["--bolt-port", *bolt_port] + <<: *in_memory_cluster + + - name: "Memory control for memory limit global thread alloc" + binary: "tests/e2e/memory/memgraph__e2e__memory_limit_global_thread_alloc_proc" + proc: "tests/e2e/memory/procedures/" + args: ["--bolt-port", *bolt_port] + <<: *in_memory_cluster + - name: "Procedure memory control for single procedure" binary: "tests/e2e/memory/memgraph__e2e__procedure_memory_limit" proc: "tests/e2e/memory/procedures/" @@ -181,7 +194,7 @@ workloads: <<: *in_memory_limited_global_limit_cluster - name: "Procedure memory control for multiple procedures" - binary: "tests/e2e/memory/memgraph__e2e__procedure_memory_limit" + binary: "tests/e2e/memory/memgraph__e2e__procedure_memory_limit_multi_proc" proc: "tests/e2e/memory/procedures/" args: ["--bolt-port", *bolt_port] <<: *in_memory_limited_global_limit_cluster