Add OOM enabler for MG procedure (#1401)

This commit is contained in:
Antonio Filipovic 2023-11-15 12:42:04 +01:00 committed by GitHub
parent c037cddb0e
commit d3f4c35362
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 368 additions and 14 deletions

View File

@ -57,6 +57,7 @@
#include "utils/likely.hpp"
#include "utils/logging.hpp"
#include "utils/memory.hpp"
#include "utils/memory_tracker.hpp"
#include "utils/message.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/pmr/deque.hpp"
@ -4859,6 +4860,7 @@ class CallProcedureCursor : public Cursor {
result_signature_size_ = result_->signature->size();
result_->signature = nullptr;
if (result_->error_msg) {
memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker blocker;
throw QueryRuntimeException("{}: {}", self_->procedure_name_, *result_->error_msg);
}
result_row_it_ = result_->rows.begin();

View File

@ -176,7 +176,10 @@ class RuleBasedPlanner {
PlanResult Plan(const QueryParts &query_parts) {
auto &context = *context_;
std::unique_ptr<LogicalOperator> final_plan;
// procedures need to start from 1
// due to swapping mechanism of procedure
// tracking
uint64_t procedure_id = 1;
for (const auto &query_part : query_parts.query_parts) {
std::unique_ptr<LogicalOperator> input_op;
@ -186,10 +189,6 @@ class RuleBasedPlanner {
uint64_t merge_id = 0;
uint64_t subquery_id = 0;
// procedures need to start from 1
// due to swapping mechanism of procedure
// tracking
uint64_t procedure_id = 1;
for (const auto &clause : single_query_part.remaining_clauses) {
MG_ASSERT(!utils::IsSubtype(*clause, Match::kType), "Unexpected Match in remaining clauses");

View File

@ -38,6 +38,7 @@
#include "utils/logging.hpp"
#include "utils/math.hpp"
#include "utils/memory.hpp"
#include "utils/memory_tracker.hpp"
#include "utils/string.hpp"
#include "utils/temporal.hpp"
#include "utils/variant_helpers.hpp"
@ -158,6 +159,7 @@ template <typename TFunc, typename... Args>
[[nodiscard]] mgp_error WrapExceptions(TFunc &&func, Args &&...args) noexcept {
static_assert(sizeof...(args) <= 1, "WrapExceptions should have only one or zero parameter!");
try {
memgraph::utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_enabler;
WrapExceptionsHelper(std::forward<TFunc>(func), std::forward<Args>(args)...);
} catch (const DeletedObjectException &neoe) {
spdlog::error("Deleted object error during mg API call: {}", neoe.what());
@ -1544,6 +1546,7 @@ mgp_error mgp_duration_sub(mgp_duration *first, mgp_duration *second, mgp_memory
mgp_error mgp_result_set_error_msg(mgp_result *res, const char *msg) {
return WrapExceptions([=] {
memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker blocker{};
auto *memory = res->rows.get_allocator().GetMemoryResource();
res->error_msg.emplace(msg, memory);
});

View File

@ -2,6 +2,8 @@ add_subdirectory(procedures)
find_package(gflags REQUIRED)
# Global memory limit
add_executable(memgraph__e2e__memory__control memory_control.cpp)
target_link_libraries(memgraph__e2e__memory__control gflags mgclient mg-utils mg-io Threads::Threads)
@ -20,6 +22,12 @@ target_link_libraries(memgraph__e2e__memory__limit_accumulation gflags mgclient
add_executable(memgraph__e2e__memory__limit_edge_create memory_limit_edge_create.cpp)
target_link_libraries(memgraph__e2e__memory__limit_edge_create gflags mgclient mg-utils mg-io)
add_executable(memgraph__e2e__memory_limit_global_multi_thread_proc_create memory_limit_global_multi_thread_proc_create.cpp)
target_link_libraries(memgraph__e2e__memory_limit_global_multi_thread_proc_create gflags mgclient mg-utils mg-io)
add_executable(memgraph__e2e__memory_limit_global_thread_alloc_proc memory_limit_global_thread_alloc_proc.cpp)
target_link_libraries(memgraph__e2e__memory_limit_global_thread_alloc_proc gflags mgclient mg-utils mg-io)
# Query memory limit tests
add_executable(memgraph__e2e__memory__limit_query_alloc_proc_multi_thread query_memory_limit_proc_multi_thread.cpp)
@ -34,7 +42,6 @@ target_link_libraries(memgraph__e2e__memory__limit_query_alloc_proc gflags mgcli
add_executable(memgraph__e2e__memory__limit_query_alloc_create_multi_thread query_memory_limit_multi_thread.cpp)
target_link_libraries(memgraph__e2e__memory__limit_query_alloc_create_multi_thread gflags mgclient mg-utils mg-io Threads::Threads)
# Procedure memory limit tests
add_executable(memgraph__e2e__procedure_memory_limit procedure_memory_limit.cpp)

View File

@ -0,0 +1,67 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <gflags/gflags.h>
#include <algorithm>
#include <exception>
#include <ios>
#include <iostream>
#include <mgclient.hpp>
#include "utils/logging.hpp"
#include "utils/timer.hpp"
DEFINE_uint64(bolt_port, 7687, "Bolt port");
DEFINE_uint64(timeout, 120, "Timeout seconds");
DEFINE_bool(multi_db, false, "Run test in multi db environment");
int main(int argc, char **argv) {
google::SetUsageMessage("Memgraph E2E Global Memory Limit In Multi-Thread Create For Local Allocators");
gflags::ParseCommandLineFlags(&argc, &argv, true);
memgraph::logging::RedirectToStderr();
mg::Client::Init();
auto client =
mg::Client::Connect({.host = "127.0.0.1", .port = static_cast<uint16_t>(FLAGS_bolt_port), .use_ssl = false});
if (!client) {
LOG_FATAL("Failed to connect!");
}
if (FLAGS_multi_db) {
client->Execute("CREATE DATABASE clean;");
client->DiscardAll();
client->Execute("USE DATABASE clean;");
client->DiscardAll();
client->Execute("MATCH (n) DETACH DELETE n;");
client->DiscardAll();
}
bool error{false};
try {
client->Execute(
"CALL libglobal_memory_limit_multi_thread_create_proc.multi_create() PROCEDURE MEMORY UNLIMITED YIELD "
"allocated_all RETURN allocated_all "
"QUERY MEMORY LIMIT 50MB;");
auto result_rows = client->FetchAll();
if (result_rows) {
auto row = *result_rows->begin();
error = row[0].ValueBool() == false;
}
} catch (const std::exception &e) {
error = true;
}
MG_ASSERT(error, "Error should have happend");
return 0;
}

View File

@ -0,0 +1,68 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <gflags/gflags.h>
#include <algorithm>
#include <exception>
#include <ios>
#include <iostream>
#include <mgclient.hpp>
#include "utils/logging.hpp"
#include "utils/timer.hpp"
DEFINE_uint64(bolt_port, 7687, "Bolt port");
DEFINE_uint64(timeout, 120, "Timeout seconds");
DEFINE_bool(multi_db, false, "Run test in multi db environment");
// Test checks path of throwing error from different thread
// than main thread which started test
int main(int argc, char **argv) {
google::SetUsageMessage("Memgraph E2E Global Memory Limit In Multi-Thread For Procedures For Local Allocators");
gflags::ParseCommandLineFlags(&argc, &argv, true);
memgraph::logging::RedirectToStderr();
mg::Client::Init();
auto client =
mg::Client::Connect({.host = "127.0.0.1", .port = static_cast<uint16_t>(FLAGS_bolt_port), .use_ssl = false});
if (!client) {
LOG_FATAL("Failed to connect!");
}
if (FLAGS_multi_db) {
client->Execute("CREATE DATABASE clean;");
client->DiscardAll();
client->Execute("USE DATABASE clean;");
client->DiscardAll();
client->Execute("MATCH (n) DETACH DELETE n;");
client->DiscardAll();
}
bool error{false};
try {
client->Execute(
"CALL libglobal_memory_limit_thread_proc.thread() YIELD allocated_all RETURN allocated_all QUERY MEMORY LIMIT "
"100MB;");
auto result_rows = client->FetchAll();
if (result_rows) {
auto row = *result_rows->begin();
error = row[0].ValueBool() == false;
}
} catch (const std::exception &e) {
error = true;
}
MG_ASSERT(error, "Error should have happend");
return 0;
}

View File

@ -14,6 +14,7 @@
#include <exception>
#include <ios>
#include <iostream>
#include <mgclient-value.hpp>
#include <mgclient.hpp>
#include "utils/logging.hpp"
@ -53,11 +54,9 @@ int main(int argc, char **argv) {
"CALL libproc_memory_limit.alloc_32_mib() PROCEDURE MEMORY LIMIT 10MB YIELD allocated AS allocated_2 RETURN "
"allocated_1, allocated_2");
auto result_rows = client->FetchAll();
if (result_rows) {
auto row = *result_rows->begin();
test_passed = row[0].ValueBool() == true && row[0].ValueBool() == false;
if (result_rows && result_rows->empty()) {
test_passed = true;
}
} catch (const std::exception &e) {
test_passed = true;
}

View File

@ -4,16 +4,21 @@ target_include_directories(global_memory_limit PRIVATE ${CMAKE_SOURCE_DIR}/inclu
add_library(global_memory_limit_proc SHARED global_memory_limit_proc.c)
target_include_directories(global_memory_limit_proc PRIVATE ${CMAKE_SOURCE_DIR}/include)
add_library(query_memory_limit_proc_multi_thread SHARED query_memory_limit_proc_multi_thread.cpp)
target_include_directories(query_memory_limit_proc_multi_thread PRIVATE ${CMAKE_SOURCE_DIR}/include)
target_link_libraries(query_memory_limit_proc_multi_thread mg-utils)
add_library(query_memory_limit_proc SHARED query_memory_limit_proc.cpp)
target_include_directories(query_memory_limit_proc PRIVATE ${CMAKE_SOURCE_DIR}/include)
target_link_libraries(query_memory_limit_proc mg-utils)
add_library(global_memory_limit_thread_proc SHARED global_memory_limit_thread_proc.cpp)
target_include_directories(global_memory_limit_thread_proc PRIVATE ${CMAKE_SOURCE_DIR}/include)
target_link_libraries(global_memory_limit_thread_proc mg-utils)
add_library(global_memory_limit_multi_thread_create_proc SHARED global_memory_limit_multi_thread_create_proc.cpp)
target_include_directories(global_memory_limit_multi_thread_create_proc PRIVATE ${CMAKE_SOURCE_DIR}/include)
target_link_libraries(global_memory_limit_multi_thread_create_proc mg-utils)
add_library(proc_memory_limit SHARED proc_memory_limit.cpp)
target_include_directories(proc_memory_limit PRIVATE ${CMAKE_SOURCE_DIR}/include)

View File

@ -0,0 +1,95 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <atomic>
#include <cassert>
#include <exception>
#include <functional>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "mg_procedure.h"
#include "mgp.hpp"
#include "utils/on_scope_exit.hpp"
// change communication between threads with feature and promise
std::atomic<int> created_vertices{0};
constexpr int num_vertices_per_thread{100'000};
constexpr int num_threads{2};
void CallCreate(mgp_graph *graph, mgp_memory *memory) {
[[maybe_unused]] const enum mgp_error tracking_error = mgp_track_current_thread_allocations(graph);
for (int i = 0; i < num_vertices_per_thread; i++) {
struct mgp_vertex *vertex{nullptr};
auto enum_error = mgp_graph_create_vertex(graph, memory, &vertex);
if (enum_error != mgp_error::MGP_ERROR_NO_ERROR) {
break;
}
created_vertices.fetch_add(1, std::memory_order_acq_rel);
}
[[maybe_unused]] const enum mgp_error untracking_error = mgp_untrack_current_thread_allocations(graph);
}
void AllocFunc(mgp_graph *graph, mgp_memory *memory) {
try {
CallCreate(graph, memory);
} catch (const std::exception &e) {
return;
}
}
void MultiCreate(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
const auto record_factory = mgp::RecordFactory(result);
try {
std::vector<std::thread> threads;
for (int i = 0; i < 2; i++) {
threads.emplace_back(AllocFunc, memgraph_graph, memory);
}
for (int i = 0; i < num_threads; i++) {
threads[i].join();
}
if (created_vertices.load(std::memory_order_acquire) != num_vertices_per_thread * num_threads) {
record_factory.SetErrorMessage("Unable to allocate");
return;
}
auto new_record = record_factory.NewRecord();
new_record.Insert("allocated_all",
created_vertices.load(std::memory_order_acquire) == num_vertices_per_thread * num_threads);
} catch (std::exception &e) {
record_factory.SetErrorMessage(e.what());
}
}
extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) {
try {
mgp::MemoryDispatcherGuard guard{memory};
AddProcedure(MultiCreate, std::string("multi_create").c_str(), mgp::ProcedureType::Write, {},
{mgp::Return(std::string("allocated_all").c_str(), mgp::Type::Bool)}, module, memory);
} catch (const std::exception &e) {
return 1;
}
return 0;
}
extern "C" int mgp_shutdown_module() { return 0; }

View File

@ -0,0 +1,92 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <atomic>
#include <cassert>
#include <exception>
#include <functional>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "mg_procedure.h"
#include "mgp.hpp"
#include "utils/on_scope_exit.hpp"
enum mgp_error Alloc(mgp_memory *memory, void *ptr) {
const size_t mb_size_512 = 1 << 29;
return mgp_alloc(memory, mb_size_512, (void **)(&ptr));
}
// change communication between threads with feature and promise
std::atomic<int> num_allocations{0};
void *ptr_;
void AllocFunc(mgp_memory *memory, mgp_graph *graph) {
try {
[[maybe_unused]] const enum mgp_error tracking_error = mgp_track_current_thread_allocations(graph);
enum mgp_error alloc_err { mgp_error::MGP_ERROR_NO_ERROR };
alloc_err = Alloc(memory, ptr_);
if (alloc_err != mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
num_allocations.fetch_add(1, std::memory_order_relaxed);
}
if (alloc_err != mgp_error::MGP_ERROR_NO_ERROR) {
assert(false);
}
} catch (const std::exception &e) {
[[maybe_unused]] const enum mgp_error untracking_error = mgp_untrack_current_thread_allocations(graph);
assert(false);
}
[[maybe_unused]] const enum mgp_error untracking_error = mgp_untrack_current_thread_allocations(graph);
}
void Thread(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
const auto record_factory = mgp::RecordFactory(result);
num_allocations.store(0, std::memory_order_relaxed);
try {
std::thread thread{AllocFunc, memory, memgraph_graph};
thread.join();
if (ptr_ != nullptr) {
mgp_free(memory, ptr_);
}
auto new_record = record_factory.NewRecord();
new_record.Insert("allocated_all", num_allocations.load(std::memory_order_relaxed) == 1);
} catch (std::exception &e) {
record_factory.SetErrorMessage(e.what());
}
}
extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) {
try {
mgp::memory = memory;
AddProcedure(Thread, std::string("thread").c_str(), mgp::ProcedureType::Read, {},
{mgp::Return(std::string("allocated_all").c_str(), mgp::Type::Bool)}, module, memory);
} catch (const std::exception &e) {
return 1;
}
return 0;
}
extern "C" int mgp_shutdown_module() { return 0; }

View File

@ -75,6 +75,9 @@ void Alloc_32_MiB(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result,
}};
const enum mgp_error alloc_err = Alloc_32(memory, ptr);
if (alloc_err != mgp_error::MGP_ERROR_NO_ERROR) {
record_factory.SetErrorMessage("Unable to allocate");
}
auto new_record = record_factory.NewRecord();
new_record.Insert("allocated", alloc_err != mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE);
} catch (std::exception &e) {

View File

@ -13,7 +13,7 @@
#include <cassert>
#include <exception>
#include <functional>
#include <mgp.hpp>
#include <mutex>
#include <sstream>
#include <string>
@ -22,6 +22,7 @@
#include <vector>
#include "mg_procedure.h"
#include "mgp.hpp"
#include "utils/on_scope_exit.hpp"
enum mgp_error Alloc(void *ptr) {

View File

@ -144,6 +144,7 @@ workloads:
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_query_alloc_create_multi_thread"
args: ["--bolt-port", *bolt_port]
<<: *in_memory_query_limit_cluster
- name: "Memory control for detach delete"
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_delete"
args: ["--bolt-port", *bolt_port]
@ -174,6 +175,18 @@ workloads:
args: ["--bolt-port", *bolt_port]
<<: *disk_450_MiB_limit_cluster
- name: "Memory control for create from multi thread proc create"
binary: "tests/e2e/memory/memgraph__e2e__memory_limit_global_multi_thread_proc_create"
proc: "tests/e2e/memory/procedures/"
args: ["--bolt-port", *bolt_port]
<<: *in_memory_cluster
- name: "Memory control for memory limit global thread alloc"
binary: "tests/e2e/memory/memgraph__e2e__memory_limit_global_thread_alloc_proc"
proc: "tests/e2e/memory/procedures/"
args: ["--bolt-port", *bolt_port]
<<: *in_memory_cluster
- name: "Procedure memory control for single procedure"
binary: "tests/e2e/memory/memgraph__e2e__procedure_memory_limit"
proc: "tests/e2e/memory/procedures/"
@ -181,7 +194,7 @@ workloads:
<<: *in_memory_limited_global_limit_cluster
- name: "Procedure memory control for multiple procedures"
binary: "tests/e2e/memory/memgraph__e2e__procedure_memory_limit"
binary: "tests/e2e/memory/memgraph__e2e__procedure_memory_limit_multi_proc"
proc: "tests/e2e/memory/procedures/"
args: ["--bolt-port", *bolt_port]
<<: *in_memory_limited_global_limit_cluster