diff --git a/src/memory/global_memory_control.cpp b/src/memory/global_memory_control.cpp index 128d5b046..f905be7c8 100644 --- a/src/memory/global_memory_control.cpp +++ b/src/memory/global_memory_control.cpp @@ -62,10 +62,7 @@ void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t if (*commit) [[likely]] { memgraph::utils::total_memory_tracker.Alloc(static_cast(size)); if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] { - auto *memory_tracker = GetQueriesMemoryControl().GetTrackerCurrentThread(); - if (memory_tracker != nullptr) [[likely]] { - memory_tracker->Alloc(static_cast(size)); - } + GetQueriesMemoryControl().TrackAllocOnCurrentThread(size); } } @@ -74,10 +71,7 @@ void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t if (*commit) { memgraph::utils::total_memory_tracker.Free(static_cast(size)); if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] { - auto *memory_tracker = GetQueriesMemoryControl().GetTrackerCurrentThread(); - if (memory_tracker != nullptr) [[likely]] { - memory_tracker->Free(static_cast(size)); - } + GetQueriesMemoryControl().TrackFreeOnCurrentThread(size); } } return ptr; @@ -97,10 +91,7 @@ static bool my_dalloc(extent_hooks_t *extent_hooks, void *addr, size_t size, boo memgraph::utils::total_memory_tracker.Free(static_cast(size)); if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] { - auto *memory_tracker = GetQueriesMemoryControl().GetTrackerCurrentThread(); - if (memory_tracker != nullptr) [[likely]] { - memory_tracker->Free(static_cast(size)); - } + GetQueriesMemoryControl().TrackFreeOnCurrentThread(size); } } @@ -111,10 +102,7 @@ static void my_destroy(extent_hooks_t *extent_hooks, void *addr, size_t size, bo if (committed) [[likely]] { memgraph::utils::total_memory_tracker.Free(static_cast(size)); if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] { - auto *memory_tracker = GetQueriesMemoryControl().GetTrackerCurrentThread(); - if (memory_tracker != nullptr) [[likely]] { - memory_tracker->Free(static_cast(size)); - } + GetQueriesMemoryControl().TrackFreeOnCurrentThread(size); } } @@ -131,10 +119,7 @@ static bool my_commit(extent_hooks_t *extent_hooks, void *addr, size_t size, siz memgraph::utils::total_memory_tracker.Alloc(static_cast(length)); if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] { - auto *memory_tracker = GetQueriesMemoryControl().GetTrackerCurrentThread(); - if (memory_tracker != nullptr) [[likely]] { - memory_tracker->Alloc(static_cast(size)); - } + GetQueriesMemoryControl().TrackFreeOnCurrentThread(size); } return false; @@ -151,10 +136,7 @@ static bool my_decommit(extent_hooks_t *extent_hooks, void *addr, size_t size, s memgraph::utils::total_memory_tracker.Free(static_cast(length)); if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] { - auto *memory_tracker = GetQueriesMemoryControl().GetTrackerCurrentThread(); - if (memory_tracker != nullptr) [[likely]] { - memory_tracker->Free(static_cast(size)); - } + GetQueriesMemoryControl().TrackFreeOnCurrentThread(size); } return false; @@ -171,10 +153,7 @@ static bool my_purge_forced(extent_hooks_t *extent_hooks, void *addr, size_t siz memgraph::utils::total_memory_tracker.Free(static_cast(length)); if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] { - auto *memory_tracker = GetQueriesMemoryControl().GetTrackerCurrentThread(); - if (memory_tracker != nullptr) [[likely]] { - memory_tracker->Alloc(static_cast(size)); - } + GetQueriesMemoryControl().TrackFreeOnCurrentThread(size); } return false; diff --git a/src/memory/query_memory_control.cpp b/src/memory/query_memory_control.cpp index d44ad1a83..d742cc8a4 100644 --- a/src/memory/query_memory_control.cpp +++ b/src/memory/query_memory_control.cpp @@ -10,6 +10,7 @@ // licenses/APL.txt. #include +#include #include #include #include @@ -58,34 +59,58 @@ void QueriesMemoryControl::EraseThreadToTransactionId(const std::thread::id &thr accessor.remove(thread_id); } -utils::MemoryTracker *QueriesMemoryControl::GetTrackerCurrentThread() { +void QueriesMemoryControl::TrackAllocOnCurrentThread(size_t size) { auto thread_id_to_transaction_id_accessor = thread_id_to_transaction_id.access(); // we might be just constructing mapping between thread id and transaction id // so we miss this allocation auto thread_id_to_transaction_id_elem = thread_id_to_transaction_id_accessor.find(std::this_thread::get_id()); if (thread_id_to_transaction_id_elem == thread_id_to_transaction_id_accessor.end()) { - return nullptr; + return; } auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access(); auto transaction_id_to_tracker = transaction_id_to_tracker_accessor.find(thread_id_to_transaction_id_elem->transaction_id); - return &transaction_id_to_tracker->tracker; + + // It can happen that some allocation happens between mapping thread to + // transaction id, so we miss this allocation + if (transaction_id_to_tracker == transaction_id_to_tracker_accessor.end()) [[unlikely]] { + return; + } + auto &query_tracker = transaction_id_to_tracker->tracker; + query_tracker.TrackAlloc(size); +} + +void QueriesMemoryControl::TrackFreeOnCurrentThread(size_t size) { + auto thread_id_to_transaction_id_accessor = thread_id_to_transaction_id.access(); + + // we might be just constructing mapping between thread id and transaction id + // so we miss this allocation + auto thread_id_to_transaction_id_elem = thread_id_to_transaction_id_accessor.find(std::this_thread::get_id()); + if (thread_id_to_transaction_id_elem == thread_id_to_transaction_id_accessor.end()) { + return; + } + + auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access(); + auto transaction_id_to_tracker = + transaction_id_to_tracker_accessor.find(thread_id_to_transaction_id_elem->transaction_id); + + // It can happen that some allocation happens between mapping thread to + // transaction id, so we miss this allocation + if (transaction_id_to_tracker == transaction_id_to_tracker_accessor.end()) [[unlikely]] { + return; + } + auto &query_tracker = transaction_id_to_tracker->tracker; + query_tracker.TrackFree(size); } void QueriesMemoryControl::CreateTransactionIdTracker(uint64_t transaction_id, size_t inital_limit) { auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access(); - auto [elem, result] = transaction_id_to_tracker_accessor.insert({transaction_id, utils::MemoryTracker{}}); + auto [elem, result] = transaction_id_to_tracker_accessor.insert({transaction_id, utils::QueryMemoryTracker{}}); - elem->tracker.SetMaximumHardLimit(inital_limit); - elem->tracker.SetHardLimit(inital_limit); -} - -bool QueriesMemoryControl::CheckTransactionIdTrackerExists(uint64_t transaction_id) { - auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access(); - return transaction_id_to_tracker_accessor.contains(transaction_id); + elem->tracker.SetQueryLimit(inital_limit); } bool QueriesMemoryControl::EraseTransactionIdTracker(uint64_t transaction_id) { @@ -102,6 +127,45 @@ void QueriesMemoryControl::InitializeArenaCounter(unsigned arena_ind) { arena_tracking[arena_ind].store(0, std::memory_order_relaxed); } +bool QueriesMemoryControl::CheckTransactionIdTrackerExists(uint64_t transaction_id) { + auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access(); + return transaction_id_to_tracker_accessor.contains(transaction_id); +} + +void QueriesMemoryControl::TryCreateTransactionProcTracker(uint64_t transaction_id, int64_t procedure_id, + size_t limit) { + auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access(); + auto query_tracker = transaction_id_to_tracker_accessor.find(transaction_id); + + if (query_tracker == transaction_id_to_tracker_accessor.end()) { + return; + } + + query_tracker->tracker.TryCreateProcTracker(procedure_id, limit); +} + +void QueriesMemoryControl::SetActiveProcIdTracker(uint64_t transaction_id, int64_t procedure_id) { + auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access(); + auto query_tracker = transaction_id_to_tracker_accessor.find(transaction_id); + + if (query_tracker == transaction_id_to_tracker_accessor.end()) { + return; + } + + query_tracker->tracker.SetActiveProc(procedure_id); +} + +void QueriesMemoryControl::PauseProcedureTracking(uint64_t transaction_id) { + auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access(); + auto query_tracker = transaction_id_to_tracker_accessor.find(transaction_id); + + if (query_tracker == transaction_id_to_tracker_accessor.end()) { + return; + } + + query_tracker->tracker.StopProcTracking(); +} + #endif void StartTrackingCurrentThreadTransaction(uint64_t transaction_id) { @@ -137,4 +201,29 @@ void TryStopTrackingOnTransaction(uint64_t transaction_id) { #endif } +#if USE_JEMALLOC +bool IsTransactionTracked(uint64_t transaction_id) { + return GetQueriesMemoryControl().CheckTransactionIdTrackerExists(transaction_id); +} +#else +bool IsTransactionTracked(uint64_t /*transaction_id*/) { return false; } +#endif + +void CreateOrContinueProcedureTracking(uint64_t transaction_id, int64_t procedure_id, size_t limit) { +#if USE_JEMALLOC + if (!GetQueriesMemoryControl().CheckTransactionIdTrackerExists(transaction_id)) { + LOG_FATAL("Memory tracker for transaction was not set"); + } + + GetQueriesMemoryControl().TryCreateTransactionProcTracker(transaction_id, procedure_id, limit); + GetQueriesMemoryControl().SetActiveProcIdTracker(transaction_id, procedure_id); +#endif +} + +void PauseProcedureTracking(uint64_t transaction_id) { +#if USE_JEMALLOC + GetQueriesMemoryControl().PauseProcedureTracking(transaction_id); +#endif +} + } // namespace memgraph::memory diff --git a/src/memory/query_memory_control.hpp b/src/memory/query_memory_control.hpp index 491dd8c36..9a7d6d06c 100644 --- a/src/memory/query_memory_control.hpp +++ b/src/memory/query_memory_control.hpp @@ -16,10 +16,13 @@ #include #include "utils/memory_tracker.hpp" +#include "utils/query_memory_tracker.hpp" #include "utils/skip_list.hpp" namespace memgraph::memory { +static constexpr int64_t UNLIMITED_MEMORY{0}; + #if USE_JEMALLOC // Track memory allocations per query. @@ -75,16 +78,21 @@ class QueriesMemoryControl { // Important to reset if one thread gets reused for different transaction void EraseThreadToTransactionId(const std::thread::id &, uint64_t); - // C-API functionality for thread to transaction mapping - void UpdateThreadToTransactionId(const char *, uint64_t); + // Find tracker for current thread if exists, track + // query allocation and procedure allocation if + // necessary + void TrackAllocOnCurrentThread(size_t size); - // C-API functionality for thread to transaction unmapping - void EraseThreadToTransactionId(const char *, uint64_t); + // Find tracker for current thread if exists, track + // query allocation and procedure allocation if + // necessary + void TrackFreeOnCurrentThread(size_t size); - // Get tracker to current thread if exists, otherwise return - // nullptr. This can happen only if tracker is still - // being constructed. - utils::MemoryTracker *GetTrackerCurrentThread(); + void TryCreateTransactionProcTracker(uint64_t, int64_t, size_t); + + void SetActiveProcIdTracker(uint64_t, int64_t); + + void PauseProcedureTracking(uint64_t); private: std::unordered_map> arena_tracking; @@ -102,7 +110,7 @@ class QueriesMemoryControl { struct TransactionIdToTracker { uint64_t transaction_id; - utils::MemoryTracker tracker; + utils::QueryMemoryTracker tracker; bool operator<(const TransactionIdToTracker &other) const { return transaction_id < other.transaction_id; } bool operator==(const TransactionIdToTracker &other) const { return transaction_id == other.transaction_id; } @@ -138,4 +146,15 @@ void TryStartTrackingOnTransaction(uint64_t transaction_id, size_t limit); // Does nothing if jemalloc is not enabled. Does nothing if tracker doesn't exist void TryStopTrackingOnTransaction(uint64_t transaction_id); +// Is transaction with given id tracked in memory tracker +bool IsTransactionTracked(uint64_t transaction_id); + +// Creates tracker on procedure if doesn't exist. Sets query tracker +// to track procedure with id. +void CreateOrContinueProcedureTracking(uint64_t transaction_id, int64_t procedure_id, size_t limit); + +// Pauses procedure tracking. This enables to continue +// tracking on procedure once procedure execution resumes. +void PauseProcedureTracking(uint64_t transaction_id); + } // namespace memgraph::memory diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index e4f96b684..69748014e 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -27,6 +27,7 @@ #include #include +#include "memory/query_memory_control.hpp" #include "query/common.hpp" #include "spdlog/spdlog.h" @@ -57,6 +58,7 @@ #include "utils/logging.hpp" #include "utils/memory.hpp" #include "utils/message.hpp" +#include "utils/on_scope_exit.hpp" #include "utils/pmr/deque.hpp" #include "utils/pmr/list.hpp" #include "utils/pmr/unordered_map.hpp" @@ -4617,7 +4619,7 @@ UniqueCursorPtr OutputTableStream::MakeCursor(utils::MemoryResource *mem) const CallProcedure::CallProcedure(std::shared_ptr input, std::string name, std::vector args, std::vector fields, std::vector symbols, Expression *memory_limit, - size_t memory_scale, bool is_write, bool void_procedure) + size_t memory_scale, bool is_write, int64_t procedure_id, bool void_procedure) : input_(input ? input : std::make_shared()), procedure_name_(name), arguments_(args), @@ -4626,6 +4628,7 @@ CallProcedure::CallProcedure(std::shared_ptr input, std::string memory_limit_(memory_limit), memory_scale_(memory_scale), is_write_(is_write), + procedure_id_(procedure_id), void_procedure_(void_procedure) {} ACCEPT_WITH_INPUT(CallProcedure); @@ -4654,7 +4657,7 @@ namespace { void CallCustomProcedure(const std::string_view fully_qualified_procedure_name, const mgp_proc &proc, const std::vector &args, mgp_graph &graph, ExpressionEvaluator *evaluator, utils::MemoryResource *memory, std::optional memory_limit, mgp_result *result, - const bool call_initializer = false) { + int64_t procedure_id, uint64_t transaction_id, const bool call_initializer = false) { static_assert(std::uses_allocator_v>, "Expected mgp_value to use custom allocator and makes STL " "containers aware of that"); @@ -4686,12 +4689,46 @@ void CallCustomProcedure(const std::string_view fully_qualified_procedure_name, if (memory_limit) { SPDLOG_INFO("Running '{}' with memory limit of {}", fully_qualified_procedure_name, utils::GetReadableSize(*memory_limit)); - utils::LimitedMemoryResource limited_mem(memory, *memory_limit); - mgp_memory proc_memory{&limited_mem}; + // Only allocations which can leak memory are + // our own mgp object allocations. Jemalloc can track + // memory correctly, but some memory may not be released + // immediately, so we want to give user info on leak still + // considering our allocations + utils::MemoryTrackingResource memory_tracking_resource{memory, *memory_limit}; + // if we are already tracking, no harm no faul + // if we are not tracking, we need to start now, with unlimited memory + // for query, but limited for procedure + + // check if transaction is tracked currently, so we + // can disable tracking on that arena if it is not + // once we are done with procedure tracking + + bool is_transaction_tracked = memgraph::memory::IsTransactionTracked(transaction_id); + + if (!is_transaction_tracked) { + // start tracking with unlimited limit on query + // which is same as not being tracked at all + memgraph::memory::TryStartTrackingOnTransaction(transaction_id, memgraph::memory::UNLIMITED_MEMORY); + } + memgraph::memory::StartTrackingCurrentThreadTransaction(transaction_id); + + // due to mgp_batch_read_proc and mgp_batch_write_proc + // we can return to execution without exhausting whole + // memory. Here we need to update tracking + memgraph::memory::CreateOrContinueProcedureTracking(transaction_id, procedure_id, *memory_limit); + + mgp_memory proc_memory{&memory_tracking_resource}; MG_ASSERT(result->signature == &proc.results); + + utils::OnScopeExit on_scope_exit{[transaction_id = transaction_id]() { + memgraph::memory::StopTrackingCurrentThreadTransaction(transaction_id); + memgraph::memory::PauseProcedureTracking(transaction_id); + }}; + // TODO: What about cross library boundary exceptions? OMG C++?! proc.cb(&proc_args, &graph, result, &proc_memory); - size_t leaked_bytes = limited_mem.GetAllocatedBytes(); + + auto leaked_bytes = memory_tracking_resource.GetAllocatedBytes(); if (leaked_bytes > 0U) { spdlog::warn("Query procedure '{}' leaked {} *tracked* bytes", fully_qualified_procedure_name, leaked_bytes); } @@ -4799,8 +4836,10 @@ class CallProcedureCursor : public Cursor { auto *memory = self_->memory_resource; auto memory_limit = EvaluateMemoryLimit(evaluator, self_->memory_limit_, self_->memory_scale_); auto graph = mgp_graph::WritableGraph(*context.db_accessor, graph_view, context); + const auto transaction_id = context.db_accessor->GetTransactionId(); + MG_ASSERT(transaction_id.has_value()); CallCustomProcedure(self_->procedure_name_, *proc, self_->arguments_, graph, &evaluator, memory, memory_limit, - result_, call_initializer); + result_, self_->procedure_id_, transaction_id.value(), call_initializer); if (call_initializer) call_initializer = false; diff --git a/src/query/plan/operator.hpp b/src/query/plan/operator.hpp index 402d44cca..4951b5137 100644 --- a/src/query/plan/operator.hpp +++ b/src/query/plan/operator.hpp @@ -2361,7 +2361,7 @@ class CallProcedure : public memgraph::query::plan::LogicalOperator { CallProcedure() = default; CallProcedure(std::shared_ptr input, std::string name, std::vector arguments, std::vector fields, std::vector symbols, Expression *memory_limit, - size_t memory_scale, bool is_write, bool void_procedure = false); + size_t memory_scale, bool is_write, int64_t procedure_id, bool void_procedure = false); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; UniqueCursorPtr MakeCursor(utils::MemoryResource *) const override; @@ -2383,6 +2383,7 @@ class CallProcedure : public memgraph::query::plan::LogicalOperator { Expression *memory_limit_{nullptr}; size_t memory_scale_{1024U}; bool is_write_; + int64_t procedure_id_; bool void_procedure_; mutable utils::MonotonicBufferResource monotonic_memory{1024UL * 1024UL}; utils::MemoryResource *memory_resource = &monotonic_memory; @@ -2405,6 +2406,7 @@ class CallProcedure : public memgraph::query::plan::LogicalOperator { object->memory_limit_ = memory_limit_ ? memory_limit_->Clone(storage) : nullptr; object->memory_scale_ = memory_scale_; object->is_write_ = is_write_; + object->procedure_id_ = procedure_id_; object->void_procedure_ = void_procedure_; return object; } diff --git a/src/query/plan/rule_based_planner.hpp b/src/query/plan/rule_based_planner.hpp index d19d4bfa3..afa060b9d 100644 --- a/src/query/plan/rule_based_planner.hpp +++ b/src/query/plan/rule_based_planner.hpp @@ -12,6 +12,7 @@ /// @file #pragma once +#include #include #include @@ -185,6 +186,10 @@ class RuleBasedPlanner { uint64_t merge_id = 0; uint64_t subquery_id = 0; + // procedures need to start from 1 + // due to swapping mechanism of procedure + // tracking + uint64_t procedure_id = 1; for (const auto &clause : single_query_part.remaining_clauses) { MG_ASSERT(!utils::IsSubtype(*clause, Match::kType), "Unexpected Match in remaining clauses"); @@ -224,7 +229,7 @@ class RuleBasedPlanner { input_op = std::make_unique( std::move(input_op), call_proc->procedure_name_, call_proc->arguments_, call_proc->result_fields_, result_symbols, call_proc->memory_limit_, call_proc->memory_scale_, call_proc->is_write_, - call_proc->void_procedure_); + procedure_id++, call_proc->void_procedure_); } else if (auto *load_csv = utils::Downcast(clause)) { const auto &row_sym = context.symbol_table->at(*load_csv->row_var_); context.bound_symbols.insert(row_sym); diff --git a/src/utils/CMakeLists.txt b/src/utils/CMakeLists.txt index d1aa28a70..276927725 100644 --- a/src/utils/CMakeLists.txt +++ b/src/utils/CMakeLists.txt @@ -14,7 +14,8 @@ set(utils_src_files tsc.cpp system_info.cpp uuid.cpp - build_info.cpp) + build_info.cpp + query_memory_tracker.cpp) find_package(Boost REQUIRED) find_package(fmt REQUIRED) diff --git a/src/utils/memory.hpp b/src/utils/memory.hpp index b764c83af..3bec47ec0 100644 --- a/src/utils/memory.hpp +++ b/src/utils/memory.hpp @@ -539,9 +539,9 @@ class SynchronizedPoolResource final : public MemoryResource { bool DoIsEqual(const MemoryResource &other) const noexcept override { return this == &other; } }; -class LimitedMemoryResource final : public utils::MemoryResource { +class MemoryTrackingResource final : public utils::MemoryResource { public: - explicit LimitedMemoryResource(utils::MemoryResource *memory, size_t max_allocated_bytes) + explicit MemoryTrackingResource(utils::MemoryResource *memory, size_t max_allocated_bytes) : memory_(memory), max_allocated_bytes_(max_allocated_bytes) {} size_t GetAllocatedBytes() const noexcept { return max_allocated_bytes_ - available_bytes_; } @@ -552,13 +552,11 @@ class LimitedMemoryResource final : public utils::MemoryResource { size_t available_bytes_{max_allocated_bytes_}; void *DoAllocate(size_t bytes, size_t alignment) override { - if (bytes > available_bytes_) throw utils::BadAlloc("Memory allocation limit exceeded!"); available_bytes_ -= bytes; return memory_->Allocate(bytes, alignment); } void DoDeallocate(void *p, size_t bytes, size_t alignment) override { - MG_ASSERT(available_bytes_ + bytes > available_bytes_, "Failed deallocation"); available_bytes_ += bytes; return memory_->Deallocate(p, bytes, alignment); } diff --git a/src/utils/memory_tracker.cpp b/src/utils/memory_tracker.cpp index 774faf72c..029223b71 100644 --- a/src/utils/memory_tracker.cpp +++ b/src/utils/memory_tracker.cpp @@ -111,7 +111,7 @@ void MemoryTracker::Alloc(const int64_t size) { const auto current_hard_limit = hard_limit_.load(std::memory_order_relaxed); - if (UNLIKELY(current_hard_limit && will_be > current_hard_limit && MemoryTrackerCanThrow())) { + if (current_hard_limit && will_be > current_hard_limit && MemoryTrackerCanThrow()) [[unlikely]] { MemoryTracker::OutOfMemoryExceptionBlocker exception_blocker; amount_.fetch_sub(size, std::memory_order_relaxed); @@ -121,7 +121,6 @@ void MemoryTracker::Alloc(const int64_t size) { "use to {}, while the maximum allowed size for allocation is set to {}.", GetReadableSize(size), GetReadableSize(will_be), GetReadableSize(current_hard_limit))); } - UpdatePeak(will_be); } diff --git a/src/utils/memory_tracker.hpp b/src/utils/memory_tracker.hpp index 0385d4517..fd99a07ae 100644 --- a/src/utils/memory_tracker.hpp +++ b/src/utils/memory_tracker.hpp @@ -25,17 +25,6 @@ class OutOfMemoryException : public utils::BasicException { }; class MemoryTracker final { - private: - std::atomic amount_{0}; - std::atomic peak_{0}; - std::atomic hard_limit_{0}; - // Maximum possible value of a hard limit. If it's set to 0, no upper bound on the hard limit is set. - int64_t maximum_hard_limit_{0}; - - void UpdatePeak(int64_t will_be); - - static void LogMemoryUsage(int64_t current); - public: void LogPeakMemoryUsage() const; @@ -73,6 +62,13 @@ class MemoryTracker final { void ResetTrackings(); + bool IsProcedureTracked(); + + void SetProcTrackingLimit(size_t limit); + + void StartProcTracking(); + void StopProcTracking(); + // By creating an object of this class, every allocation in its scope that goes over // the set hard limit produces an OutOfMemoryException. class OutOfMemoryExceptionEnabler final { @@ -109,6 +105,20 @@ class MemoryTracker final { private: static thread_local uint64_t counter_; }; + + private: + enum class TrackingMode { DEFAULT, ADDITIONAL_PROC }; + + TrackingMode tracking_mode_{TrackingMode::DEFAULT}; + std::atomic amount_{0}; + std::atomic peak_{0}; + std::atomic hard_limit_{0}; + // Maximum possible value of a hard limit. If it's set to 0, no upper bound on the hard limit is set. + int64_t maximum_hard_limit_{0}; + + void UpdatePeak(int64_t will_be); + + static void LogMemoryUsage(int64_t current); }; // Global memory tracker which tracks every allocation in the application. diff --git a/src/utils/query_memory_tracker.cpp b/src/utils/query_memory_tracker.cpp new file mode 100644 index 000000000..a9b61cdf3 --- /dev/null +++ b/src/utils/query_memory_tracker.cpp @@ -0,0 +1,78 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "utils/query_memory_tracker.hpp" +#include +#include +#include "memory/query_memory_control.hpp" +#include "utils/memory_tracker.hpp" + +namespace memgraph::utils { + +void QueryMemoryTracker::TrackAlloc(size_t size) { + if (query_tracker_.has_value()) [[likely]] { + query_tracker_->Alloc(static_cast(size)); + } + + auto *proc_tracker = GetActiveProc(); + + if (proc_tracker == nullptr) { + return; + } + + proc_tracker->Alloc(static_cast(size)); +} +void QueryMemoryTracker::TrackFree(size_t size) { + if (query_tracker_.has_value()) [[likely]] { + query_tracker_->Free(static_cast(size)); + } + + auto *proc_tracker = GetActiveProc(); + + if (proc_tracker == nullptr) { + return; + } + + proc_tracker->Free(static_cast(size)); +} + +void QueryMemoryTracker::SetQueryLimit(size_t size) { + if (size == memgraph::memory::UNLIMITED_MEMORY) { + return; + } + InitializeQueryTracker(); + query_tracker_->SetMaximumHardLimit(static_cast(size)); + query_tracker_->SetHardLimit(static_cast(size)); +} + +memgraph::utils::MemoryTracker *QueryMemoryTracker::GetActiveProc() { + if (active_proc_id == NO_PROCEDURE) [[likely]] { + return nullptr; + } + return &proc_memory_trackers_[active_proc_id]; +} + +void QueryMemoryTracker::SetActiveProc(int64_t new_active_proc) { active_proc_id = new_active_proc; } + +void QueryMemoryTracker::StopProcTracking() { active_proc_id = QueryMemoryTracker::NO_PROCEDURE; } + +void QueryMemoryTracker::TryCreateProcTracker(int64_t procedure_id, size_t limit) { + if (proc_memory_trackers_.contains(procedure_id)) { + return; + } + auto [it, inserted] = proc_memory_trackers_.emplace(procedure_id, utils::MemoryTracker{}); + it->second.SetMaximumHardLimit(static_cast(limit)); + it->second.SetHardLimit(static_cast(limit)); +} + +void QueryMemoryTracker::InitializeQueryTracker() { query_tracker_.emplace(MemoryTracker{}); } + +} // namespace memgraph::utils diff --git a/src/utils/query_memory_tracker.hpp b/src/utils/query_memory_tracker.hpp new file mode 100644 index 000000000..87975adf8 --- /dev/null +++ b/src/utils/query_memory_tracker.hpp @@ -0,0 +1,78 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#pragma once + +#include +#include +#include +#include +#include "utils/memory_tracker.hpp" + +namespace memgraph::utils { + +// Class which handles tracking of query memory +// together with procedure memory. Default +// active procedure id is set to -1, as +// procedures start from id 1. +// This class is meant to be used in environment +// where we don't execute multiple procedures in parallel +// but procedure can have multiple threads which are doing allocations +class QueryMemoryTracker { + public: + QueryMemoryTracker() = default; + + QueryMemoryTracker(QueryMemoryTracker &&other) noexcept + : query_tracker_(std::move(other.query_tracker_)), + proc_memory_trackers_(std::move(other.proc_memory_trackers_)), + active_proc_id(other.active_proc_id) { + other.active_proc_id = NO_PROCEDURE; + } + + QueryMemoryTracker(const QueryMemoryTracker &other) = delete; + + QueryMemoryTracker &operator=(QueryMemoryTracker &&other) = delete; + QueryMemoryTracker &operator=(const QueryMemoryTracker &other) = delete; + + ~QueryMemoryTracker() = default; + + // Track allocation on query and procedure if active + void TrackAlloc(size_t); + + // Track Free on query and procedure if active + void TrackFree(size_t); + + // Set query limit + void SetQueryLimit(size_t); + + // Create proc tracker if doesn't exist + void TryCreateProcTracker(int64_t, size_t); + + // Set currently active procedure + void SetActiveProc(int64_t); + + // Stop procedure tracking + void StopProcTracking(); + + private: + static constexpr int64_t NO_PROCEDURE{-1}; + void InitializeQueryTracker(); + + std::optional query_tracker_{std::nullopt}; + std::unordered_map proc_memory_trackers_; + + // Procedure ids start from 1. Procedure id -1 means there is no procedure + // to track. + int64_t active_proc_id{NO_PROCEDURE}; + + memgraph::utils::MemoryTracker *GetActiveProc(); +}; + +} // namespace memgraph::utils diff --git a/tests/e2e/memory/CMakeLists.txt b/tests/e2e/memory/CMakeLists.txt index d06885483..327f09106 100644 --- a/tests/e2e/memory/CMakeLists.txt +++ b/tests/e2e/memory/CMakeLists.txt @@ -11,6 +11,17 @@ target_link_libraries(memgraph__e2e__memory__limit_global_alloc gflags mgclient add_executable(memgraph__e2e__memory__limit_global_alloc_proc memory_limit_global_alloc_proc.cpp) target_link_libraries(memgraph__e2e__memory__limit_global_alloc_proc gflags mgclient mg-utils mg-io Threads::Threads) +add_executable(memgraph__e2e__memory__limit_delete memory_limit_delete.cpp) +target_link_libraries(memgraph__e2e__memory__limit_delete gflags mgclient mg-utils mg-io) + +add_executable(memgraph__e2e__memory__limit_accumulation memory_limit_accumulation.cpp) +target_link_libraries(memgraph__e2e__memory__limit_accumulation gflags mgclient mg-utils mg-io) + +add_executable(memgraph__e2e__memory__limit_edge_create memory_limit_edge_create.cpp) +target_link_libraries(memgraph__e2e__memory__limit_edge_create gflags mgclient mg-utils mg-io) + +# Query memory limit tests + add_executable(memgraph__e2e__memory__limit_query_alloc_proc_multi_thread query_memory_limit_proc_multi_thread.cpp) target_link_libraries(memgraph__e2e__memory__limit_query_alloc_proc_multi_thread gflags mgclient mg-utils mg-io Threads::Threads) @@ -23,11 +34,11 @@ target_link_libraries(memgraph__e2e__memory__limit_query_alloc_proc gflags mgcli add_executable(memgraph__e2e__memory__limit_query_alloc_create_multi_thread query_memory_limit_multi_thread.cpp) target_link_libraries(memgraph__e2e__memory__limit_query_alloc_create_multi_thread gflags mgclient mg-utils mg-io Threads::Threads) -add_executable(memgraph__e2e__memory__limit_delete memory_limit_delete.cpp) -target_link_libraries(memgraph__e2e__memory__limit_delete gflags mgclient mg-utils mg-io) -add_executable(memgraph__e2e__memory__limit_accumulation memory_limit_accumulation.cpp) -target_link_libraries(memgraph__e2e__memory__limit_accumulation gflags mgclient mg-utils mg-io) +# Procedure memory limit tests -add_executable(memgraph__e2e__memory__limit_edge_create memory_limit_edge_create.cpp) -target_link_libraries(memgraph__e2e__memory__limit_edge_create gflags mgclient mg-utils mg-io) +add_executable(memgraph__e2e__procedure_memory_limit procedure_memory_limit.cpp) +target_link_libraries(memgraph__e2e__procedure_memory_limit gflags mgclient mg-utils mg-io) + +add_executable(memgraph__e2e__procedure_memory_limit_multi_proc procedure_memory_limit_multi_proc.cpp) +target_link_libraries(memgraph__e2e__procedure_memory_limit_multi_proc gflags mgclient mg-utils mg-io) diff --git a/tests/e2e/memory/procedure_memory_limit.cpp b/tests/e2e/memory/procedure_memory_limit.cpp new file mode 100644 index 000000000..89a91a9ce --- /dev/null +++ b/tests/e2e/memory/procedure_memory_limit.cpp @@ -0,0 +1,66 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include +#include +#include +#include +#include +#include + +#include "utils/logging.hpp" +#include "utils/timer.hpp" + +DEFINE_uint64(bolt_port, 7687, "Bolt port"); +DEFINE_uint64(timeout, 120, "Timeout seconds"); +DEFINE_bool(multi_db, false, "Run test in multi db environment"); + +int main(int argc, char **argv) { + google::SetUsageMessage("Memgraph E2E Query Memory Limit In Multi-Thread For Global Allocators"); + gflags::ParseCommandLineFlags(&argc, &argv, true); + memgraph::logging::RedirectToStderr(); + + mg::Client::Init(); + + auto client = + mg::Client::Connect({.host = "127.0.0.1", .port = static_cast(FLAGS_bolt_port), .use_ssl = false}); + if (!client) { + LOG_FATAL("Failed to connect!"); + } + + if (FLAGS_multi_db) { + client->Execute("CREATE DATABASE clean;"); + client->DiscardAll(); + client->Execute("USE DATABASE clean;"); + client->DiscardAll(); + client->Execute("MATCH (n) DETACH DELETE n;"); + client->DiscardAll(); + } + + bool error{false}; + try { + client->Execute( + "CALL libproc_memory_limit.alloc_256_mib() PROCEDURE MEMORY LIMIT 200MB YIELD allocated RETURN " + "allocated"); + auto result_rows = client->FetchAll(); + if (result_rows) { + auto row = *result_rows->begin(); + error = row[0].ValueBool() == false; + } + + } catch (const std::exception &e) { + error = true; + } + + MG_ASSERT(error, "Error should have happend"); + + return 0; +} diff --git a/tests/e2e/memory/procedure_memory_limit_multi_proc.cpp b/tests/e2e/memory/procedure_memory_limit_multi_proc.cpp new file mode 100644 index 000000000..118ff41ce --- /dev/null +++ b/tests/e2e/memory/procedure_memory_limit_multi_proc.cpp @@ -0,0 +1,68 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include +#include +#include +#include +#include +#include + +#include "utils/logging.hpp" +#include "utils/timer.hpp" + +DEFINE_uint64(bolt_port, 7687, "Bolt port"); +DEFINE_uint64(timeout, 120, "Timeout seconds"); +DEFINE_bool(multi_db, false, "Run test in multi db environment"); + +int main(int argc, char **argv) { + google::SetUsageMessage("Memgraph E2E Query Memory Limit In Multi-Thread For Global Allocators"); + gflags::ParseCommandLineFlags(&argc, &argv, true); + memgraph::logging::RedirectToStderr(); + + mg::Client::Init(); + + auto client = + mg::Client::Connect({.host = "127.0.0.1", .port = static_cast(FLAGS_bolt_port), .use_ssl = false}); + if (!client) { + LOG_FATAL("Failed to connect!"); + } + + if (FLAGS_multi_db) { + client->Execute("CREATE DATABASE clean;"); + client->DiscardAll(); + client->Execute("USE DATABASE clean;"); + client->DiscardAll(); + client->Execute("MATCH (n) DETACH DELETE n;"); + client->DiscardAll(); + } + + bool test_passed{false}; + try { + client->Execute( + "CALL libproc_memory_limit.alloc_256_mib() PROCEDURE MEMORY LIMIT 400MB YIELD allocated WITH allocated AS " + "allocated_1" + "CALL libproc_memory_limit.alloc_32_mib() PROCEDURE MEMORY LIMIT 10MB YIELD allocated AS allocated_2 RETURN " + "allocated_1, allocated_2"); + auto result_rows = client->FetchAll(); + if (result_rows) { + auto row = *result_rows->begin(); + test_passed = row[0].ValueBool() == true && row[0].ValueBool() == false; + } + + } catch (const std::exception &e) { + test_passed = true; + } + + MG_ASSERT(test_passed, "Error should have happend"); + + return 0; +} diff --git a/tests/e2e/memory/procedures/CMakeLists.txt b/tests/e2e/memory/procedures/CMakeLists.txt index 4ea9db247..84c56f414 100644 --- a/tests/e2e/memory/procedures/CMakeLists.txt +++ b/tests/e2e/memory/procedures/CMakeLists.txt @@ -13,3 +13,8 @@ target_link_libraries(query_memory_limit_proc_multi_thread mg-utils) add_library(query_memory_limit_proc SHARED query_memory_limit_proc.cpp) target_include_directories(query_memory_limit_proc PRIVATE ${CMAKE_SOURCE_DIR}/include) target_link_libraries(query_memory_limit_proc mg-utils) + + +add_library(proc_memory_limit SHARED proc_memory_limit.cpp) +target_include_directories(proc_memory_limit PRIVATE ${CMAKE_SOURCE_DIR}/include) +target_link_libraries(proc_memory_limit mg-utils) diff --git a/tests/e2e/memory/procedures/proc_memory_limit.cpp b/tests/e2e/memory/procedures/proc_memory_limit.cpp new file mode 100644 index 000000000..9f36dfaa4 --- /dev/null +++ b/tests/e2e/memory/procedures/proc_memory_limit.cpp @@ -0,0 +1,102 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "mg_procedure.h" +#include "utils/on_scope_exit.hpp" + +enum mgp_error Alloc_256(mgp_memory *memory, void *ptr) { + const size_t mib_size_256 = 1 << 28; + + return mgp_alloc(memory, mib_size_256, (void **)(&ptr)); +} + +void Alloc_256_MiB(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) { + mgp::MemoryDispatcherGuard guard{memory}; + const auto arguments = mgp::List(args); + const auto record_factory = mgp::RecordFactory(result); + + try { + void *ptr{nullptr}; + + memgraph::utils::OnScopeExit> cleanup{[&memory, &ptr]() { + if (nullptr == ptr) { + return; + } + mgp_free(memory, ptr); + }}; + + const enum mgp_error alloc_err = Alloc_256(memory, ptr); + auto new_record = record_factory.NewRecord(); + new_record.Insert("allocated", alloc_err != mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE); + } catch (std::exception &e) { + record_factory.SetErrorMessage(e.what()); + } +} + +enum mgp_error Alloc_32(mgp_memory *memory, void *ptr) { + const size_t mib_size_32 = 1 << 25; + + return mgp_alloc(memory, mib_size_32, (void **)(&ptr)); +} + +void Alloc_32_MiB(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) { + mgp::MemoryDispatcherGuard guard{memory}; + const auto arguments = mgp::List(args); + const auto record_factory = mgp::RecordFactory(result); + + try { + void *ptr{nullptr}; + + memgraph::utils::OnScopeExit> cleanup{[&ptr]() { + if (nullptr == ptr) { + return; + } + mgp_global_free(ptr); + }}; + + const enum mgp_error alloc_err = Alloc_32(memory, ptr); + auto new_record = record_factory.NewRecord(); + new_record.Insert("allocated", alloc_err != mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE); + } catch (std::exception &e) { + record_factory.SetErrorMessage(e.what()); + } +} + +extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) { + try { + mgp::MemoryDispatcherGuard mdg{memory}; + + AddProcedure(Alloc_256_MiB, std::string("alloc_256_mib").c_str(), mgp::ProcedureType::Read, {}, + {mgp::Return(std::string("allocated").c_str(), mgp::Type::Bool)}, module, memory); + + AddProcedure(Alloc_32_MiB, std::string("alloc_32_mib").c_str(), mgp::ProcedureType::Read, {}, + {mgp::Return(std::string("allocated").c_str(), mgp::Type::Bool)}, module, memory); + + } catch (const std::exception &e) { + return 1; + } + + return 0; +} + +extern "C" int mgp_shutdown_module() { return 0; } diff --git a/tests/e2e/memory/workloads.yaml b/tests/e2e/memory/workloads.yaml index a23527c85..e84faccf0 100644 --- a/tests/e2e/memory/workloads.yaml +++ b/tests/e2e/memory/workloads.yaml @@ -62,6 +62,20 @@ disk_450_MiB_limit_cluster: &disk_450_MiB_limit_cluster validation_queries: [] +args_global_limit_1024_MiB: &args_global_limit_1024_MiB + - "--bolt-port" + - *bolt_port + - "--storage-gc-cycle-sec=180" + - "--log-level=INFO" + +in_memory_limited_global_limit_cluster: &in_memory_limited_global_limit_cluster + cluster: + main: + args: *args_global_limit_1024_MiB + log_file: "memory-e2e.log" + setup_queries: [] + validation_queries: [] + workloads: - name: "Memory control" binary: "tests/e2e/memory/memgraph__e2e__memory__control" @@ -159,3 +173,15 @@ workloads: binary: "tests/e2e/memory/memgraph__e2e__memory__limit_edge_create" args: ["--bolt-port", *bolt_port] <<: *disk_450_MiB_limit_cluster + + - name: "Procedure memory control for single procedure" + binary: "tests/e2e/memory/memgraph__e2e__procedure_memory_limit" + proc: "tests/e2e/memory/procedures/" + args: ["--bolt-port", *bolt_port] + <<: *in_memory_limited_global_limit_cluster + + - name: "Procedure memory control for multiple procedures" + binary: "tests/e2e/memory/memgraph__e2e__procedure_memory_limit" + proc: "tests/e2e/memory/procedures/" + args: ["--bolt-port", *bolt_port] + <<: *in_memory_limited_global_limit_cluster diff --git a/tests/unit/query_plan_read_write_typecheck.cpp b/tests/unit/query_plan_read_write_typecheck.cpp index dba2d5f57..a369f44b9 100644 --- a/tests/unit/query_plan_read_write_typecheck.cpp +++ b/tests/unit/query_plan_read_write_typecheck.cpp @@ -248,7 +248,7 @@ TYPED_TEST(ReadWriteTypeCheckTest, CallReadProcedureBeforeUpdate) { std::vector result_symbols{this->GetSymbol("name_alias"), this->GetSymbol("signature_alias")}; last_op = std::make_shared(last_op, procedure_name, arguments, result_fields, result_symbols, - nullptr, 0, false); + nullptr, 0, false, 1); this->CheckPlanType(last_op.get(), RWType::RW); }