From 33c400fcc1d285e7d883e29e4828a047fff1c9e2 Mon Sep 17 00:00:00 2001 From: Gareth Andrew Lloyd <gareth.lloyd@memgraph.io> Date: Fri, 16 Feb 2024 15:35:08 +0000 Subject: [PATCH] Fixup memory e2e tests (#1715) - Remove the e2e that did concurrent mgp_* calls on the same transaction (ATM this is unsupported) - Fix up the concurrent mgp_global_alloc test to be testing it more precisely - Reduce the memory limit on detach delete test due to recent memory optimizations around deltas. - No longer throw from hook, through jemalloc C, to our C++ on other side. This cause mutex unlocks to not happen. - No longer allocate error messages while inside the hook. This caused recursive entry back inside jamalloc which would try to relock a non-recursive mutex. --- src/memory/global_memory_control.cpp | 14 +- src/memory/new_delete.cpp | 20 ++- src/memory/query_memory_control.cpp | 8 +- src/memory/query_memory_control.hpp | 2 +- src/query/procedure/mg_procedure_impl.cpp | 9 +- src/utils/exceptions.hpp | 18 ++- src/utils/memory_tracker.cpp | 32 ++++- src/utils/memory_tracker.hpp | 26 +++- src/utils/query_memory_tracker.cpp | 11 +- src/utils/query_memory_tracker.hpp | 4 +- tests/e2e/CMakeLists.txt | 2 +- tests/e2e/memory/CMakeLists.txt | 3 - tests/e2e/memory/memory_control.cpp | 4 +- ..._limit_global_multi_thread_proc_create.cpp | 67 --------- tests/e2e/memory/procedures/CMakeLists.txt | 6 +- ..._memory_limit_multi_thread_create_proc.cpp | 95 ------------- .../query_memory_limit_proc_multi_thread.cpp | 127 ++++++++++++------ .../query_memory_limit_proc_multi_thread.cpp | 14 +- tests/e2e/memory/workloads.yaml | 65 ++++++--- tests/unit/utils_memory_tracker.cpp | 10 +- 20 files changed, 264 insertions(+), 273 deletions(-) delete mode 100644 tests/e2e/memory/memory_limit_global_multi_thread_proc_create.cpp delete mode 100644 tests/e2e/memory/procedures/global_memory_limit_multi_thread_create_proc.cpp diff --git a/src/memory/global_memory_control.cpp b/src/memory/global_memory_control.cpp index 9c3f5db32..bcf12bd2c 100644 --- a/src/memory/global_memory_control.cpp +++ b/src/memory/global_memory_control.cpp @@ -61,10 +61,12 @@ void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t // This needs to be before, to throw exception in case of too big alloc if (*commit) [[likely]] { if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] { - GetQueriesMemoryControl().TrackAllocOnCurrentThread(size); + bool ok = GetQueriesMemoryControl().TrackAllocOnCurrentThread(size); + if (!ok) return nullptr; } // This needs to be here so it doesn't get incremented in case the first TrackAlloc throws an exception - memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size)); + bool ok = memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size)); + if (!ok) return nullptr; } auto *ptr = old_hooks->alloc(extent_hooks, new_addr, size, alignment, zero, commit, arena_ind); @@ -118,10 +120,14 @@ static bool my_commit(extent_hooks_t *extent_hooks, void *addr, size_t size, siz return err; } + [[maybe_unused]] auto blocker = memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker{}; if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] { - GetQueriesMemoryControl().TrackAllocOnCurrentThread(length); + bool ok = GetQueriesMemoryControl().TrackAllocOnCurrentThread(length); + DMG_ASSERT(ok); } - memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(length)); + + auto ok = memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(length)); + DMG_ASSERT(ok); return false; } diff --git a/src/memory/new_delete.cpp b/src/memory/new_delete.cpp index 2f982ec67..32ed4d4be 100644 --- a/src/memory/new_delete.cpp +++ b/src/memory/new_delete.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -28,6 +28,12 @@ void *newImpl(const std::size_t size) { return ptr; } + [[maybe_unused]] auto blocker = memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker{}; + auto maybe_msg = memgraph::utils::MemoryErrorStatus().msg(); + if (maybe_msg) { + throw memgraph::utils::OutOfMemoryException{std::move(*maybe_msg)}; + } + throw std::bad_alloc{}; } @@ -37,11 +43,21 @@ void *newImpl(const std::size_t size, const std::align_val_t align) { return ptr; } + [[maybe_unused]] auto blocker = memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker{}; + auto maybe_msg = memgraph::utils::MemoryErrorStatus().msg(); + if (maybe_msg) { + throw memgraph::utils::OutOfMemoryException{std::move(*maybe_msg)}; + } + throw std::bad_alloc{}; } -void *newNoExcept(const std::size_t size) noexcept { return malloc(size); } +void *newNoExcept(const std::size_t size) noexcept { + [[maybe_unused]] auto blocker = memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker{}; + return malloc(size); +} void *newNoExcept(const std::size_t size, const std::align_val_t align) noexcept { + [[maybe_unused]] auto blocker = memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker{}; return aligned_alloc(size, static_cast<std::size_t>(align)); } diff --git a/src/memory/query_memory_control.cpp b/src/memory/query_memory_control.cpp index 1a3ede032..119d936ec 100644 --- a/src/memory/query_memory_control.cpp +++ b/src/memory/query_memory_control.cpp @@ -54,14 +54,14 @@ void QueriesMemoryControl::EraseThreadToTransactionId(const std::thread::id &thr } } -void QueriesMemoryControl::TrackAllocOnCurrentThread(size_t size) { +bool QueriesMemoryControl::TrackAllocOnCurrentThread(size_t size) { auto thread_id_to_transaction_id_accessor = thread_id_to_transaction_id.access(); // we might be just constructing mapping between thread id and transaction id // so we miss this allocation auto thread_id_to_transaction_id_elem = thread_id_to_transaction_id_accessor.find(std::this_thread::get_id()); if (thread_id_to_transaction_id_elem == thread_id_to_transaction_id_accessor.end()) { - return; + return true; } auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access(); @@ -71,10 +71,10 @@ void QueriesMemoryControl::TrackAllocOnCurrentThread(size_t size) { // It can happen that some allocation happens between mapping thread to // transaction id, so we miss this allocation if (transaction_id_to_tracker == transaction_id_to_tracker_accessor.end()) [[unlikely]] { - return; + return true; } auto &query_tracker = transaction_id_to_tracker->tracker; - query_tracker.TrackAlloc(size); + return query_tracker.TrackAlloc(size); } void QueriesMemoryControl::TrackFreeOnCurrentThread(size_t size) { diff --git a/src/memory/query_memory_control.hpp b/src/memory/query_memory_control.hpp index 3852027a5..b598a8c73 100644 --- a/src/memory/query_memory_control.hpp +++ b/src/memory/query_memory_control.hpp @@ -62,7 +62,7 @@ class QueriesMemoryControl { // Find tracker for current thread if exists, track // query allocation and procedure allocation if // necessary - void TrackAllocOnCurrentThread(size_t size); + bool TrackAllocOnCurrentThread(size_t size); // Find tracker for current thread if exists, track // query allocation and procedure allocation if diff --git a/src/query/procedure/mg_procedure_impl.cpp b/src/query/procedure/mg_procedure_impl.cpp index 2eea1cecb..c7faf15f7 100644 --- a/src/query/procedure/mg_procedure_impl.cpp +++ b/src/query/procedure/mg_procedure_impl.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -187,6 +187,7 @@ template <typename TFunc, typename... Args> spdlog::error("Memory allocation error during mg API call: {}", bae.what()); return mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE; } catch (const memgraph::utils::OutOfMemoryException &oome) { + [[maybe_unused]] auto blocker = memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker{}; spdlog::error("Memory limit exceeded during mg API call: {}", oome.what()); return mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE; } catch (const std::out_of_range &oore) { @@ -198,12 +199,12 @@ template <typename TFunc, typename... Args> } catch (const std::logic_error &lee) { spdlog::error("Logic error during mg API call: {}", lee.what()); return mgp_error::MGP_ERROR_LOGIC_ERROR; - } catch (const std::exception &e) { - spdlog::error("Unexpected error during mg API call: {}", e.what()); - return mgp_error::MGP_ERROR_UNKNOWN_ERROR; } catch (const memgraph::utils::temporal::InvalidArgumentException &e) { spdlog::error("Invalid argument was sent to an mg API call for temporal types: {}", e.what()); return mgp_error::MGP_ERROR_INVALID_ARGUMENT; + } catch (const std::exception &e) { + spdlog::error("Unexpected error during mg API call: {}", e.what()); + return mgp_error::MGP_ERROR_UNKNOWN_ERROR; } catch (...) { spdlog::error("Unexpected error during mg API call"); return mgp_error::MGP_ERROR_UNKNOWN_ERROR; diff --git a/src/utils/exceptions.hpp b/src/utils/exceptions.hpp index fa49f770e..929be1d58 100644 --- a/src/utils/exceptions.hpp +++ b/src/utils/exceptions.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -42,12 +42,26 @@ namespace memgraph::utils { class BasicException : public std::exception { public: /** - * @brief Constructor (C++ STL strings). + * @brief Constructor (C++ STL strings_view). * * @param message The error message. */ explicit BasicException(std::string_view message) noexcept : msg_(message) {} + /** + * @brief Constructor (string literal). + * + * @param message The error message. + */ + explicit BasicException(const char *message) noexcept : msg_(message) {} + + /** + * @brief Constructor (C++ STL strings). + * + * @param message The error message. + */ + explicit BasicException(std::string message) noexcept : msg_(std::move(message)) {} + /** * @brief Constructor with format string (C++ STL strings). * diff --git a/src/utils/memory_tracker.cpp b/src/utils/memory_tracker.cpp index 7dfd88416..3a2ec4dec 100644 --- a/src/utils/memory_tracker.cpp +++ b/src/utils/memory_tracker.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -104,7 +104,7 @@ void MemoryTracker::SetMaximumHardLimit(const int64_t limit) { maximum_hard_limit_ = limit; } -void MemoryTracker::Alloc(const int64_t size) { +bool MemoryTracker::Alloc(int64_t const size) { MG_ASSERT(size >= 0, "Negative size passed to the MemoryTracker."); const int64_t will_be = size + amount_.fetch_add(size, std::memory_order_relaxed); @@ -116,12 +116,13 @@ void MemoryTracker::Alloc(const int64_t size) { amount_.fetch_sub(size, std::memory_order_relaxed); - throw OutOfMemoryException( - fmt::format("Memory limit exceeded! Attempting to allocate a chunk of {} which would put the current " - "use to {}, while the maximum allowed size for allocation is set to {}.", - GetReadableSize(size), GetReadableSize(will_be), GetReadableSize(current_hard_limit))); + // register our error data, we will pick this up on the other side of jemalloc + MemoryErrorStatus().set({size, will_be, current_hard_limit}); + + return false; } UpdatePeak(will_be); + return true; } void MemoryTracker::DoCheck() { @@ -139,4 +140,23 @@ void MemoryTracker::DoCheck() { void MemoryTracker::Free(const int64_t size) { amount_.fetch_sub(size, std::memory_order_relaxed); } +// DEVNOTE: important that this is allocated at thread construction time +// otherwise subtle bug where jemalloc will try to lock an non-recursive mutex +// that it already owns +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +thread_local MemoryTrackerStatus status; +auto MemoryErrorStatus() -> MemoryTrackerStatus & { return status; } + +auto MemoryTrackerStatus::msg() -> std::optional<std::string> { + if (!data_) return std::nullopt; + + auto [size, will_be, hard_limit] = *data_; + data_.reset(); + return fmt::format( + "Memory limit exceeded! Attempting to allocate a chunk of {} which would put the current " + "use to {}, while the maximum allowed size for allocation is set to {}.", + // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions) + GetReadableSize(size), GetReadableSize(will_be), GetReadableSize(hard_limit)); +} + } // namespace memgraph::utils diff --git a/src/utils/memory_tracker.hpp b/src/utils/memory_tracker.hpp index a6d7221ff..5d82c6f2a 100644 --- a/src/utils/memory_tracker.hpp +++ b/src/utils/memory_tracker.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -12,15 +12,35 @@ #pragma once #include <atomic> +#include <optional> +#include <string> #include <type_traits> #include "utils/exceptions.hpp" namespace memgraph::utils { +struct MemoryTrackerStatus { + struct data { + int64_t size; + int64_t will_be; + int64_t hard_limit; + }; + + // DEVNOTE: Do not call from within allocator, will cause another allocation + auto msg() -> std::optional<std::string>; + + void set(data d) { data_ = d; } + + private: + std::optional<data> data_; +}; + +auto MemoryErrorStatus() -> MemoryTrackerStatus &; + class OutOfMemoryException : public utils::BasicException { public: - explicit OutOfMemoryException(const std::string &msg) : utils::BasicException(msg) {} + explicit OutOfMemoryException(std::string msg) : utils::BasicException(std::move(msg)) {} SPECIALIZE_GET_EXCEPTION_NAME(OutOfMemoryException) }; @@ -47,7 +67,7 @@ class MemoryTracker final { MemoryTracker &operator=(MemoryTracker &&) = delete; - void Alloc(int64_t size); + bool Alloc(int64_t size); void Free(int64_t size); void DoCheck(); diff --git a/src/utils/query_memory_tracker.cpp b/src/utils/query_memory_tracker.cpp index a9b61cdf3..46cb6d871 100644 --- a/src/utils/query_memory_tracker.cpp +++ b/src/utils/query_memory_tracker.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -17,18 +17,19 @@ namespace memgraph::utils { -void QueryMemoryTracker::TrackAlloc(size_t size) { +bool QueryMemoryTracker::TrackAlloc(size_t size) { if (query_tracker_.has_value()) [[likely]] { - query_tracker_->Alloc(static_cast<int64_t>(size)); + bool ok = query_tracker_->Alloc(static_cast<int64_t>(size)); + if (!ok) return false; } auto *proc_tracker = GetActiveProc(); if (proc_tracker == nullptr) { - return; + return true; } - proc_tracker->Alloc(static_cast<int64_t>(size)); + return proc_tracker->Alloc(static_cast<int64_t>(size)); } void QueryMemoryTracker::TrackFree(size_t size) { if (query_tracker_.has_value()) [[likely]] { diff --git a/src/utils/query_memory_tracker.hpp b/src/utils/query_memory_tracker.hpp index 87975adf8..acfdca07f 100644 --- a/src/utils/query_memory_tracker.hpp +++ b/src/utils/query_memory_tracker.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -44,7 +44,7 @@ class QueryMemoryTracker { ~QueryMemoryTracker() = default; // Track allocation on query and procedure if active - void TrackAlloc(size_t); + bool TrackAlloc(size_t size); // Track Free on query and procedure if active void TrackFree(size_t); diff --git a/tests/e2e/CMakeLists.txt b/tests/e2e/CMakeLists.txt index 9a4406d2b..7e555398e 100644 --- a/tests/e2e/CMakeLists.txt +++ b/tests/e2e/CMakeLists.txt @@ -40,7 +40,7 @@ endfunction() add_subdirectory(fine_grained_access) add_subdirectory(server) add_subdirectory(replication) -#add_subdirectory(memory) +add_subdirectory(memory) add_subdirectory(triggers) add_subdirectory(isolation_levels) add_subdirectory(streams) diff --git a/tests/e2e/memory/CMakeLists.txt b/tests/e2e/memory/CMakeLists.txt index 256107724..97fd8f9dc 100644 --- a/tests/e2e/memory/CMakeLists.txt +++ b/tests/e2e/memory/CMakeLists.txt @@ -22,9 +22,6 @@ target_link_libraries(memgraph__e2e__memory__limit_accumulation gflags mgclient add_executable(memgraph__e2e__memory__limit_edge_create memory_limit_edge_create.cpp) target_link_libraries(memgraph__e2e__memory__limit_edge_create gflags mgclient mg-utils mg-io) -add_executable(memgraph__e2e__memory_limit_global_multi_thread_proc_create memory_limit_global_multi_thread_proc_create.cpp) -target_link_libraries(memgraph__e2e__memory_limit_global_multi_thread_proc_create gflags mgclient mg-utils mg-io) - add_executable(memgraph__e2e__memory_limit_global_thread_alloc_proc memory_limit_global_thread_alloc_proc.cpp) target_link_libraries(memgraph__e2e__memory_limit_global_thread_alloc_proc gflags mgclient mg-utils mg-io) diff --git a/tests/e2e/memory/memory_control.cpp b/tests/e2e/memory/memory_control.cpp index 0d969220b..d4c4e431c 100644 --- a/tests/e2e/memory/memory_control.cpp +++ b/tests/e2e/memory/memory_control.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -44,7 +44,7 @@ int main(int argc, char **argv) { client->DiscardAll(); } - const auto *create_query = "UNWIND range(1, 50) as u CREATE (n {string: \"Some longer string\"}) RETURN n;"; + const auto *create_query = "UNWIND range(1, 100) as u CREATE (n {string: \"Some longer string\"}) RETURN n;"; memgraph::utils::Timer timer; while (true) { diff --git a/tests/e2e/memory/memory_limit_global_multi_thread_proc_create.cpp b/tests/e2e/memory/memory_limit_global_multi_thread_proc_create.cpp deleted file mode 100644 index e44c91ea7..000000000 --- a/tests/e2e/memory/memory_limit_global_multi_thread_proc_create.cpp +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2023 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -#include <gflags/gflags.h> -#include <algorithm> -#include <exception> -#include <ios> -#include <iostream> -#include <mgclient.hpp> - -#include "utils/logging.hpp" -#include "utils/timer.hpp" - -DEFINE_uint64(bolt_port, 7687, "Bolt port"); -DEFINE_uint64(timeout, 120, "Timeout seconds"); -DEFINE_bool(multi_db, false, "Run test in multi db environment"); - -int main(int argc, char **argv) { - google::SetUsageMessage("Memgraph E2E Global Memory Limit In Multi-Thread Create For Local Allocators"); - gflags::ParseCommandLineFlags(&argc, &argv, true); - memgraph::logging::RedirectToStderr(); - - mg::Client::Init(); - - auto client = - mg::Client::Connect({.host = "127.0.0.1", .port = static_cast<uint16_t>(FLAGS_bolt_port), .use_ssl = false}); - if (!client) { - LOG_FATAL("Failed to connect!"); - } - - if (FLAGS_multi_db) { - client->Execute("CREATE DATABASE clean;"); - client->DiscardAll(); - client->Execute("USE DATABASE clean;"); - client->DiscardAll(); - client->Execute("MATCH (n) DETACH DELETE n;"); - client->DiscardAll(); - } - - bool error{false}; - try { - client->Execute( - "CALL libglobal_memory_limit_multi_thread_create_proc.multi_create() PROCEDURE MEMORY UNLIMITED YIELD " - "allocated_all RETURN allocated_all " - "QUERY MEMORY LIMIT 50MB;"); - auto result_rows = client->FetchAll(); - if (result_rows) { - auto row = *result_rows->begin(); - error = !row[0].ValueBool(); - } - - } catch (const std::exception &e) { - error = true; - } - - MG_ASSERT(error, "Error should have happend"); - - return 0; -} diff --git a/tests/e2e/memory/procedures/CMakeLists.txt b/tests/e2e/memory/procedures/CMakeLists.txt index df7acee31..8f9d625f3 100644 --- a/tests/e2e/memory/procedures/CMakeLists.txt +++ b/tests/e2e/memory/procedures/CMakeLists.txt @@ -6,7 +6,7 @@ target_include_directories(global_memory_limit_proc PRIVATE ${CMAKE_SOURCE_DIR}/ add_library(query_memory_limit_proc_multi_thread SHARED query_memory_limit_proc_multi_thread.cpp) target_include_directories(query_memory_limit_proc_multi_thread PRIVATE ${CMAKE_SOURCE_DIR}/include) -target_link_libraries(query_memory_limit_proc_multi_thread mg-utils) +target_link_libraries(query_memory_limit_proc_multi_thread mg-utils ) add_library(query_memory_limit_proc SHARED query_memory_limit_proc.cpp) target_include_directories(query_memory_limit_proc PRIVATE ${CMAKE_SOURCE_DIR}/include) @@ -16,10 +16,6 @@ add_library(global_memory_limit_thread_proc SHARED global_memory_limit_thread_pr target_include_directories(global_memory_limit_thread_proc PRIVATE ${CMAKE_SOURCE_DIR}/include) target_link_libraries(global_memory_limit_thread_proc mg-utils) -add_library(global_memory_limit_multi_thread_create_proc SHARED global_memory_limit_multi_thread_create_proc.cpp) -target_include_directories(global_memory_limit_multi_thread_create_proc PRIVATE ${CMAKE_SOURCE_DIR}/include) -target_link_libraries(global_memory_limit_multi_thread_create_proc mg-utils) - add_library(proc_memory_limit SHARED proc_memory_limit.cpp) target_include_directories(proc_memory_limit PRIVATE ${CMAKE_SOURCE_DIR}/include) target_link_libraries(proc_memory_limit mg-utils) diff --git a/tests/e2e/memory/procedures/global_memory_limit_multi_thread_create_proc.cpp b/tests/e2e/memory/procedures/global_memory_limit_multi_thread_create_proc.cpp deleted file mode 100644 index 2ccaac631..000000000 --- a/tests/e2e/memory/procedures/global_memory_limit_multi_thread_create_proc.cpp +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright 2023 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -#include <atomic> -#include <cassert> -#include <exception> -#include <functional> - -#include <mutex> -#include <sstream> -#include <string> -#include <thread> -#include <utility> -#include <vector> - -#include "mg_procedure.h" -#include "mgp.hpp" -#include "utils/on_scope_exit.hpp" - -// change communication between threads with feature and promise -std::atomic<int> created_vertices{0}; -constexpr int num_vertices_per_thread{100'000}; -constexpr int num_threads{2}; - -void CallCreate(mgp_graph *graph, mgp_memory *memory) { - [[maybe_unused]] const enum mgp_error tracking_error = mgp_track_current_thread_allocations(graph); - for (int i = 0; i < num_vertices_per_thread; i++) { - struct mgp_vertex *vertex{nullptr}; - auto enum_error = mgp_graph_create_vertex(graph, memory, &vertex); - if (enum_error != mgp_error::MGP_ERROR_NO_ERROR) { - break; - } - created_vertices.fetch_add(1, std::memory_order_acq_rel); - } - [[maybe_unused]] const enum mgp_error untracking_error = mgp_untrack_current_thread_allocations(graph); -} - -void AllocFunc(mgp_graph *graph, mgp_memory *memory) { - try { - CallCreate(graph, memory); - } catch (const std::exception &e) { - return; - } -} - -void MultiCreate(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) { - mgp::MemoryDispatcherGuard guard{memory}; - const auto arguments = mgp::List(args); - const auto record_factory = mgp::RecordFactory(result); - try { - std::vector<std::thread> threads; - - for (int i = 0; i < 2; i++) { - threads.emplace_back(AllocFunc, memgraph_graph, memory); - } - - for (int i = 0; i < num_threads; i++) { - threads[i].join(); - } - if (created_vertices.load(std::memory_order_acquire) != num_vertices_per_thread * num_threads) { - record_factory.SetErrorMessage("Unable to allocate"); - return; - } - - auto new_record = record_factory.NewRecord(); - new_record.Insert("allocated_all", - created_vertices.load(std::memory_order_acquire) == num_vertices_per_thread * num_threads); - } catch (std::exception &e) { - record_factory.SetErrorMessage(e.what()); - } -} - -extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) { - try { - mgp::MemoryDispatcherGuard guard{memory}; - - AddProcedure(MultiCreate, std::string("multi_create").c_str(), mgp::ProcedureType::Write, {}, - {mgp::Return(std::string("allocated_all").c_str(), mgp::Type::Bool)}, module, memory); - - } catch (const std::exception &e) { - return 1; - } - - return 0; -} - -extern "C" int mgp_shutdown_module() { return 0; } diff --git a/tests/e2e/memory/procedures/query_memory_limit_proc_multi_thread.cpp b/tests/e2e/memory/procedures/query_memory_limit_proc_multi_thread.cpp index 0a1d8f125..4a1a96845 100644 --- a/tests/e2e/memory/procedures/query_memory_limit_proc_multi_thread.cpp +++ b/tests/e2e/memory/procedures/query_memory_limit_proc_multi_thread.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -13,73 +13,122 @@ #include <cassert> #include <exception> #include <functional> +#include <future> +#include <exception> +#include <latch> +#include <list> +#include <memory> #include <mutex> -#include <sstream> #include <string> #include <thread> -#include <utility> #include <vector> #include "mg_procedure.h" #include "mgp.hpp" +#include "utils/memory_tracker.hpp" #include "utils/on_scope_exit.hpp" -enum mgp_error Alloc(void *ptr) { - const size_t mb_size_268 = 1 << 28; +using safe_ptr = std::unique_ptr<void, decltype([](void *p) { mgp_global_free(p); })>; +enum class AllocFuncRes { NoIssues, UnableToAlloc, Unexpected }; +using result_t = std::pair<AllocFuncRes, std::list<safe_ptr>>; - return mgp_global_alloc(mb_size_268, (void **)(&ptr)); -} +constexpr auto N_THREADS = 2; +static_assert(N_THREADS > 0 && (N_THREADS & (N_THREADS - 1)) == 0); -// change communication between threads with feature and promise -std::atomic<int> num_allocations{0}; -std::vector<void *> ptrs_; +constexpr auto mb_size_512 = 1 << 29; +constexpr auto mb_size_16 = 1 << 24; -void AllocFunc(mgp_graph *graph) { +static_assert(mb_size_512 % N_THREADS == 0); +static_assert(mb_size_16 % N_THREADS == 0); +static_assert(mb_size_512 % mb_size_16 == 0); + +void AllocFunc(std::latch &start_latch, std::promise<result_t> promise, mgp_graph *graph) { [[maybe_unused]] const enum mgp_error tracking_error = mgp_track_current_thread_allocations(graph); - void *ptr = nullptr; + auto on_exit = memgraph::utils::OnScopeExit{[&]() { + [[maybe_unused]] const enum mgp_error untracking_error = mgp_untrack_current_thread_allocations(graph); + }}; + + std::list<safe_ptr> ptrs; + + // Ensure test would concurrently run these allocations, wait until both are ready + start_latch.arrive_and_wait(); - ptrs_.emplace_back(ptr); try { - enum mgp_error alloc_err { mgp_error::MGP_ERROR_NO_ERROR }; - alloc_err = Alloc(ptr); - if (alloc_err != mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) { - num_allocations.fetch_add(1, std::memory_order_relaxed); - } - if (alloc_err != mgp_error::MGP_ERROR_NO_ERROR) { - assert(false); + constexpr auto allocation_limit = mb_size_512 / N_THREADS; + // many allocation to increase chance of seeing any concurent issues + for (auto total = 0; total < allocation_limit; total += mb_size_16) { + void *ptr = nullptr; + auto alloc_err = mgp_global_alloc(mb_size_16, &ptr); + if (alloc_err != mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE && ptr != nullptr) { + ptrs.emplace_back(ptr); + } else if (alloc_err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) { + // this is expected, the test checks N threads allocating to a limit of 512MB + promise.set_value({AllocFuncRes::UnableToAlloc, std::move(ptrs)}); + return; + } else { + promise.set_value({AllocFuncRes::Unexpected, std::move(ptrs)}); + return; + } } } catch (const std::exception &e) { - assert(false); + promise.set_value({AllocFuncRes::Unexpected, std::move(ptrs)}); + return; } - - [[maybe_unused]] const enum mgp_error untracking_error = mgp_untrack_current_thread_allocations(graph); + promise.set_value({AllocFuncRes::NoIssues, std::move(ptrs)}); + return; } void DualThread(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) { mgp::MemoryDispatcherGuard guard{memory}; const auto arguments = mgp::List(args); const auto record_factory = mgp::RecordFactory(result); - num_allocations.store(0, std::memory_order_relaxed); + + // 1 byte allocation to + auto ptr = std::invoke([&] { + void *ptr; + [[maybe_unused]] auto alloc_err = mgp_global_alloc(1, &ptr); + return safe_ptr{ptr}; + }); + try { - std::vector<std::thread> threads; - - for (int i = 0; i < 2; i++) { - threads.emplace_back(AllocFunc, memgraph_graph); - } - - for (int i = 0; i < 2; i++) { - threads[i].join(); - } - for (void *ptr : ptrs_) { - if (ptr != nullptr) { - mgp_global_free(ptr); + auto futures = std::vector<std::future<result_t>>{}; + futures.reserve(N_THREADS); + std::latch start_latch{N_THREADS}; + { + auto threads = std::vector<std::jthread>{}; + threads.reserve(N_THREADS); + for (int i = 0; i < N_THREADS; i++) { + auto promise = std::promise<result_t>{}; + futures.emplace_back(promise.get_future()); + threads.emplace_back([&, promise = std::move(promise)]() mutable { + AllocFunc(start_latch, std::move(promise), memgraph_graph); + }); } + } // ~jthread will join + + int alloc_errors = 0; + int unexpected_errors = 0; + for (auto &x : futures) { + auto [res, ptrs] = x.get(); + alloc_errors += (res == AllocFuncRes::UnableToAlloc); + unexpected_errors += (res == AllocFuncRes::Unexpected); + // regardless of outcome, we want this thread to do the deallocation + ptrs.clear(); + } + + if (unexpected_errors != 0) { + record_factory.SetErrorMessage("Unanticipated error happened"); + return; + } + + if (alloc_errors < 1) { + record_factory.SetErrorMessage("Didn't hit the QUERY MEMORY LIMIT we expected"); + return; } auto new_record = record_factory.NewRecord(); - - new_record.Insert("allocated_all", num_allocations.load(std::memory_order_relaxed) == 2); + new_record.Insert("test_passed", true); } catch (std::exception &e) { record_factory.SetErrorMessage(e.what()); } @@ -90,7 +139,7 @@ extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *mem mgp::memory = memory; AddProcedure(DualThread, std::string("dual_thread").c_str(), mgp::ProcedureType::Read, {}, - {mgp::Return(std::string("allocated_all").c_str(), mgp::Type::Bool)}, module, memory); + {mgp::Return(std::string("test_passed").c_str(), mgp::Type::Bool)}, module, memory); } catch (const std::exception &e) { return 1; diff --git a/tests/e2e/memory/query_memory_limit_proc_multi_thread.cpp b/tests/e2e/memory/query_memory_limit_proc_multi_thread.cpp index 5acac5404..7d1d06b33 100644 --- a/tests/e2e/memory/query_memory_limit_proc_multi_thread.cpp +++ b/tests/e2e/memory/query_memory_limit_proc_multi_thread.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -46,21 +46,21 @@ int main(int argc, char **argv) { } MG_ASSERT( - client->Execute("CALL libquery_memory_limit_proc_multi_thread.dual_thread() YIELD allocated_all RETURN " - "allocated_all QUERY MEMORY LIMIT 500MB")); + client->Execute("CALL libquery_memory_limit_proc_multi_thread.dual_thread() YIELD test_passed RETURN " + "test_passed QUERY MEMORY LIMIT 500MB")); bool error{false}; try { auto result_rows = client->FetchAll(); if (result_rows) { auto row = *result_rows->begin(); - error = !row[0].ValueBool(); + MG_ASSERT(row[0].ValueBool(), "Execpected the procedure to pass"); + } else { + MG_ASSERT(false, "Expected at least one row"); } } catch (const std::exception &e) { - error = true; + MG_ASSERT(error, "This error should not have happend {}", e.what()); } - MG_ASSERT(error, "Error should have happend"); - return 0; } diff --git a/tests/e2e/memory/workloads.yaml b/tests/e2e/memory/workloads.yaml index 21924c880..bf29e484c 100644 --- a/tests/e2e/memory/workloads.yaml +++ b/tests/e2e/memory/workloads.yaml @@ -6,7 +6,22 @@ args: &args - "--storage-gc-cycle-sec=180" - "--log-level=TRACE" -in_memory_cluster: &in_memory_cluster +args_150_MiB_limit: &args_150_MiB_limit + - "--bolt-port" + - *bolt_port + - "--memory-limit=150" + - "--storage-gc-cycle-sec=180" + - "--log-level=TRACE" + +in_memory_150_MiB_limit_cluster: &in_memory_150_MiB_limit_cluster + cluster: + main: + args: *args_150_MiB_limit + log_file: "memory-e2e.log" + setup_queries: [] + validation_queries: [] + +in_memory_1024_MiB_limit_cluster: &in_memory_1024_MiB_limit_cluster cluster: main: args: *args @@ -61,6 +76,30 @@ disk_450_MiB_limit_cluster: &disk_450_MiB_limit_cluster setup_queries: [] validation_queries: [] +args_300_MiB_limit: &args_300_MiB_limit + - "--bolt-port" + - *bolt_port + - "--memory-limit=300" + - "--storage-gc-cycle-sec=180" + - "--log-level=INFO" + +in_memory_300_MiB_limit_cluster: &in_memory_300_MiB_limit_cluster + cluster: + main: + args: *args_300_MiB_limit + log_file: "memory-e2e.log" + setup_queries: [] + validation_queries: [] + + +disk_300_MiB_limit_cluster: &disk_300_MiB_limit_cluster + cluster: + main: + args: *args_300_MiB_limit + log_file: "memory-e2e.log" + setup_queries: [] + validation_queries: [] + args_global_limit_1024_MiB: &args_global_limit_1024_MiB - "--bolt-port" @@ -80,36 +119,36 @@ workloads: - name: "Memory control" binary: "tests/e2e/memory/memgraph__e2e__memory__control" args: ["--bolt-port", *bolt_port, "--timeout", "180"] - <<: *in_memory_cluster + <<: *in_memory_150_MiB_limit_cluster - name: "Memory control multi database" binary: "tests/e2e/memory/memgraph__e2e__memory__control" args: ["--bolt-port", *bolt_port, "--timeout", "180", "--multi-db", "true"] - <<: *in_memory_cluster + <<: *in_memory_150_MiB_limit_cluster - name: "Memory limit for modules upon loading" binary: "tests/e2e/memory/memgraph__e2e__memory__limit_global_alloc" args: ["--bolt-port", *bolt_port, "--timeout", "180"] proc: "tests/e2e/memory/procedures/" - <<: *in_memory_cluster + <<: *in_memory_1024_MiB_limit_cluster - name: "Memory limit for modules upon loading multi database" binary: "tests/e2e/memory/memgraph__e2e__memory__limit_global_alloc" args: ["--bolt-port", *bolt_port, "--timeout", "180", "--multi-db", "true"] proc: "tests/e2e/memory/procedures/" - <<: *in_memory_cluster + <<: *in_memory_1024_MiB_limit_cluster - name: "Memory limit for modules inside a procedure" binary: "tests/e2e/memory/memgraph__e2e__memory__limit_global_alloc_proc" args: ["--bolt-port", *bolt_port, "--timeout", "180"] proc: "tests/e2e/memory/procedures/" - <<: *in_memory_cluster + <<: *in_memory_1024_MiB_limit_cluster - name: "Memory limit for modules inside a procedure multi database" binary: "tests/e2e/memory/memgraph__e2e__memory__limit_global_alloc_proc" args: ["--bolt-port", *bolt_port, "--timeout", "180", "--multi-db", "true"] proc: "tests/e2e/memory/procedures/" - <<: *in_memory_cluster + <<: *in_memory_1024_MiB_limit_cluster - name: "Memory limit for modules upon loading for on-disk storage" binary: "tests/e2e/memory/memgraph__e2e__memory__limit_global_alloc" @@ -143,12 +182,12 @@ workloads: - name: "Memory control for detach delete" binary: "tests/e2e/memory/memgraph__e2e__memory__limit_delete" args: ["--bolt-port", *bolt_port] - <<: *in_memory_450_MiB_limit_cluster + <<: *in_memory_300_MiB_limit_cluster - name: "Memory control for detach delete on disk storage" binary: "tests/e2e/memory/memgraph__e2e__memory__limit_delete" args: ["--bolt-port", *bolt_port] - <<: *disk_450_MiB_limit_cluster + <<: *disk_300_MiB_limit_cluster - name: "Memory control for accumulation" binary: "tests/e2e/memory/memgraph__e2e__memory__limit_accumulation" @@ -170,17 +209,11 @@ workloads: args: ["--bolt-port", *bolt_port] <<: *disk_450_MiB_limit_cluster - - name: "Memory control for create from multi thread proc create" - binary: "tests/e2e/memory/memgraph__e2e__memory_limit_global_multi_thread_proc_create" - proc: "tests/e2e/memory/procedures/" - args: ["--bolt-port", *bolt_port] - <<: *in_memory_cluster - - name: "Memory control for memory limit global thread alloc" binary: "tests/e2e/memory/memgraph__e2e__memory_limit_global_thread_alloc_proc" proc: "tests/e2e/memory/procedures/" args: ["--bolt-port", *bolt_port] - <<: *in_memory_cluster + <<: *in_memory_1024_MiB_limit_cluster - name: "Procedure memory control for single procedure" binary: "tests/e2e/memory/memgraph__e2e__procedure_memory_limit" diff --git a/tests/unit/utils_memory_tracker.cpp b/tests/unit/utils_memory_tracker.cpp index cfc9e32b1..5f92b493b 100644 --- a/tests/unit/utils_memory_tracker.cpp +++ b/tests/unit/utils_memory_tracker.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -36,13 +36,13 @@ TEST(MemoryTrackerTest, ExceptionEnabler) { can_continue = true; }}; - ASSERT_NO_THROW(memory_tracker.Alloc(hard_limit + 1)); + ASSERT_TRUE(memory_tracker.Alloc(hard_limit + 1)); }}; std::thread t2{[&] { memgraph::utils::MemoryTracker::OutOfMemoryExceptionEnabler exception_enabler; enabler_created = true; - ASSERT_THROW(memory_tracker.Alloc(hard_limit + 1), memgraph::utils::OutOfMemoryException); + ASSERT_FALSE(memory_tracker.Alloc(hard_limit + 1)); // hold the enabler until the first thread finishes while (!can_continue) @@ -63,8 +63,8 @@ TEST(MemoryTrackerTest, ExceptionBlocker) { { memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker exception_blocker; - ASSERT_NO_THROW(memory_tracker.Alloc(hard_limit + 1)); + ASSERT_TRUE(memory_tracker.Alloc(hard_limit + 1)); ASSERT_EQ(memory_tracker.Amount(), hard_limit + 1); } - ASSERT_THROW(memory_tracker.Alloc(hard_limit + 1), memgraph::utils::OutOfMemoryException); + ASSERT_FALSE(memory_tracker.Alloc(hard_limit + 1)); }