Fixup memory e2e tests (#1715)

- Remove the e2e that did concurrent mgp_* calls on the same transaction
  (ATM this is unsupported)
- Fix up the concurrent mgp_global_alloc test to be testing it more precisely
- Reduce the memory limit on detach delete test due to recent memory
  optimizations around deltas.
- No longer throw from hook, through jemalloc C, to our C++ on other
  side. This cause mutex unlocks to not happen.
- No longer allocate error messages while inside the hook. This caused
  recursive entry back inside jamalloc which would try to relock a
  non-recursive mutex.
This commit is contained in:
Gareth Andrew Lloyd 2024-02-16 15:35:08 +00:00 committed by GitHub
parent 5ac938a6c9
commit 33c400fcc1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 264 additions and 273 deletions

View File

@ -61,10 +61,12 @@ void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t
// This needs to be before, to throw exception in case of too big alloc
if (*commit) [[likely]] {
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
GetQueriesMemoryControl().TrackAllocOnCurrentThread(size);
bool ok = GetQueriesMemoryControl().TrackAllocOnCurrentThread(size);
if (!ok) return nullptr;
}
// This needs to be here so it doesn't get incremented in case the first TrackAlloc throws an exception
memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size));
bool ok = memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size));
if (!ok) return nullptr;
}
auto *ptr = old_hooks->alloc(extent_hooks, new_addr, size, alignment, zero, commit, arena_ind);
@ -118,10 +120,14 @@ static bool my_commit(extent_hooks_t *extent_hooks, void *addr, size_t size, siz
return err;
}
[[maybe_unused]] auto blocker = memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker{};
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
GetQueriesMemoryControl().TrackAllocOnCurrentThread(length);
bool ok = GetQueriesMemoryControl().TrackAllocOnCurrentThread(length);
DMG_ASSERT(ok);
}
memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(length));
auto ok = memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(length));
DMG_ASSERT(ok);
return false;
}

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -28,6 +28,12 @@ void *newImpl(const std::size_t size) {
return ptr;
}
[[maybe_unused]] auto blocker = memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker{};
auto maybe_msg = memgraph::utils::MemoryErrorStatus().msg();
if (maybe_msg) {
throw memgraph::utils::OutOfMemoryException{std::move(*maybe_msg)};
}
throw std::bad_alloc{};
}
@ -37,11 +43,21 @@ void *newImpl(const std::size_t size, const std::align_val_t align) {
return ptr;
}
[[maybe_unused]] auto blocker = memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker{};
auto maybe_msg = memgraph::utils::MemoryErrorStatus().msg();
if (maybe_msg) {
throw memgraph::utils::OutOfMemoryException{std::move(*maybe_msg)};
}
throw std::bad_alloc{};
}
void *newNoExcept(const std::size_t size) noexcept { return malloc(size); }
void *newNoExcept(const std::size_t size) noexcept {
[[maybe_unused]] auto blocker = memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker{};
return malloc(size);
}
void *newNoExcept(const std::size_t size, const std::align_val_t align) noexcept {
[[maybe_unused]] auto blocker = memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker{};
return aligned_alloc(size, static_cast<std::size_t>(align));
}

View File

@ -54,14 +54,14 @@ void QueriesMemoryControl::EraseThreadToTransactionId(const std::thread::id &thr
}
}
void QueriesMemoryControl::TrackAllocOnCurrentThread(size_t size) {
bool QueriesMemoryControl::TrackAllocOnCurrentThread(size_t size) {
auto thread_id_to_transaction_id_accessor = thread_id_to_transaction_id.access();
// we might be just constructing mapping between thread id and transaction id
// so we miss this allocation
auto thread_id_to_transaction_id_elem = thread_id_to_transaction_id_accessor.find(std::this_thread::get_id());
if (thread_id_to_transaction_id_elem == thread_id_to_transaction_id_accessor.end()) {
return;
return true;
}
auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access();
@ -71,10 +71,10 @@ void QueriesMemoryControl::TrackAllocOnCurrentThread(size_t size) {
// It can happen that some allocation happens between mapping thread to
// transaction id, so we miss this allocation
if (transaction_id_to_tracker == transaction_id_to_tracker_accessor.end()) [[unlikely]] {
return;
return true;
}
auto &query_tracker = transaction_id_to_tracker->tracker;
query_tracker.TrackAlloc(size);
return query_tracker.TrackAlloc(size);
}
void QueriesMemoryControl::TrackFreeOnCurrentThread(size_t size) {

View File

@ -62,7 +62,7 @@ class QueriesMemoryControl {
// Find tracker for current thread if exists, track
// query allocation and procedure allocation if
// necessary
void TrackAllocOnCurrentThread(size_t size);
bool TrackAllocOnCurrentThread(size_t size);
// Find tracker for current thread if exists, track
// query allocation and procedure allocation if

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -187,6 +187,7 @@ template <typename TFunc, typename... Args>
spdlog::error("Memory allocation error during mg API call: {}", bae.what());
return mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE;
} catch (const memgraph::utils::OutOfMemoryException &oome) {
[[maybe_unused]] auto blocker = memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker{};
spdlog::error("Memory limit exceeded during mg API call: {}", oome.what());
return mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE;
} catch (const std::out_of_range &oore) {
@ -198,12 +199,12 @@ template <typename TFunc, typename... Args>
} catch (const std::logic_error &lee) {
spdlog::error("Logic error during mg API call: {}", lee.what());
return mgp_error::MGP_ERROR_LOGIC_ERROR;
} catch (const std::exception &e) {
spdlog::error("Unexpected error during mg API call: {}", e.what());
return mgp_error::MGP_ERROR_UNKNOWN_ERROR;
} catch (const memgraph::utils::temporal::InvalidArgumentException &e) {
spdlog::error("Invalid argument was sent to an mg API call for temporal types: {}", e.what());
return mgp_error::MGP_ERROR_INVALID_ARGUMENT;
} catch (const std::exception &e) {
spdlog::error("Unexpected error during mg API call: {}", e.what());
return mgp_error::MGP_ERROR_UNKNOWN_ERROR;
} catch (...) {
spdlog::error("Unexpected error during mg API call");
return mgp_error::MGP_ERROR_UNKNOWN_ERROR;

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -42,12 +42,26 @@ namespace memgraph::utils {
class BasicException : public std::exception {
public:
/**
* @brief Constructor (C++ STL strings).
* @brief Constructor (C++ STL strings_view).
*
* @param message The error message.
*/
explicit BasicException(std::string_view message) noexcept : msg_(message) {}
/**
* @brief Constructor (string literal).
*
* @param message The error message.
*/
explicit BasicException(const char *message) noexcept : msg_(message) {}
/**
* @brief Constructor (C++ STL strings).
*
* @param message The error message.
*/
explicit BasicException(std::string message) noexcept : msg_(std::move(message)) {}
/**
* @brief Constructor with format string (C++ STL strings).
*

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -104,7 +104,7 @@ void MemoryTracker::SetMaximumHardLimit(const int64_t limit) {
maximum_hard_limit_ = limit;
}
void MemoryTracker::Alloc(const int64_t size) {
bool MemoryTracker::Alloc(int64_t const size) {
MG_ASSERT(size >= 0, "Negative size passed to the MemoryTracker.");
const int64_t will_be = size + amount_.fetch_add(size, std::memory_order_relaxed);
@ -116,12 +116,13 @@ void MemoryTracker::Alloc(const int64_t size) {
amount_.fetch_sub(size, std::memory_order_relaxed);
throw OutOfMemoryException(
fmt::format("Memory limit exceeded! Attempting to allocate a chunk of {} which would put the current "
"use to {}, while the maximum allowed size for allocation is set to {}.",
GetReadableSize(size), GetReadableSize(will_be), GetReadableSize(current_hard_limit)));
// register our error data, we will pick this up on the other side of jemalloc
MemoryErrorStatus().set({size, will_be, current_hard_limit});
return false;
}
UpdatePeak(will_be);
return true;
}
void MemoryTracker::DoCheck() {
@ -139,4 +140,23 @@ void MemoryTracker::DoCheck() {
void MemoryTracker::Free(const int64_t size) { amount_.fetch_sub(size, std::memory_order_relaxed); }
// DEVNOTE: important that this is allocated at thread construction time
// otherwise subtle bug where jemalloc will try to lock an non-recursive mutex
// that it already owns
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
thread_local MemoryTrackerStatus status;
auto MemoryErrorStatus() -> MemoryTrackerStatus & { return status; }
auto MemoryTrackerStatus::msg() -> std::optional<std::string> {
if (!data_) return std::nullopt;
auto [size, will_be, hard_limit] = *data_;
data_.reset();
return fmt::format(
"Memory limit exceeded! Attempting to allocate a chunk of {} which would put the current "
"use to {}, while the maximum allowed size for allocation is set to {}.",
// NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions)
GetReadableSize(size), GetReadableSize(will_be), GetReadableSize(hard_limit));
}
} // namespace memgraph::utils

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -12,15 +12,35 @@
#pragma once
#include <atomic>
#include <optional>
#include <string>
#include <type_traits>
#include "utils/exceptions.hpp"
namespace memgraph::utils {
struct MemoryTrackerStatus {
struct data {
int64_t size;
int64_t will_be;
int64_t hard_limit;
};
// DEVNOTE: Do not call from within allocator, will cause another allocation
auto msg() -> std::optional<std::string>;
void set(data d) { data_ = d; }
private:
std::optional<data> data_;
};
auto MemoryErrorStatus() -> MemoryTrackerStatus &;
class OutOfMemoryException : public utils::BasicException {
public:
explicit OutOfMemoryException(const std::string &msg) : utils::BasicException(msg) {}
explicit OutOfMemoryException(std::string msg) : utils::BasicException(std::move(msg)) {}
SPECIALIZE_GET_EXCEPTION_NAME(OutOfMemoryException)
};
@ -47,7 +67,7 @@ class MemoryTracker final {
MemoryTracker &operator=(MemoryTracker &&) = delete;
void Alloc(int64_t size);
bool Alloc(int64_t size);
void Free(int64_t size);
void DoCheck();

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -17,18 +17,19 @@
namespace memgraph::utils {
void QueryMemoryTracker::TrackAlloc(size_t size) {
bool QueryMemoryTracker::TrackAlloc(size_t size) {
if (query_tracker_.has_value()) [[likely]] {
query_tracker_->Alloc(static_cast<int64_t>(size));
bool ok = query_tracker_->Alloc(static_cast<int64_t>(size));
if (!ok) return false;
}
auto *proc_tracker = GetActiveProc();
if (proc_tracker == nullptr) {
return;
return true;
}
proc_tracker->Alloc(static_cast<int64_t>(size));
return proc_tracker->Alloc(static_cast<int64_t>(size));
}
void QueryMemoryTracker::TrackFree(size_t size) {
if (query_tracker_.has_value()) [[likely]] {

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -44,7 +44,7 @@ class QueryMemoryTracker {
~QueryMemoryTracker() = default;
// Track allocation on query and procedure if active
void TrackAlloc(size_t);
bool TrackAlloc(size_t size);
// Track Free on query and procedure if active
void TrackFree(size_t);

View File

@ -40,7 +40,7 @@ endfunction()
add_subdirectory(fine_grained_access)
add_subdirectory(server)
add_subdirectory(replication)
#add_subdirectory(memory)
add_subdirectory(memory)
add_subdirectory(triggers)
add_subdirectory(isolation_levels)
add_subdirectory(streams)

View File

@ -22,9 +22,6 @@ target_link_libraries(memgraph__e2e__memory__limit_accumulation gflags mgclient
add_executable(memgraph__e2e__memory__limit_edge_create memory_limit_edge_create.cpp)
target_link_libraries(memgraph__e2e__memory__limit_edge_create gflags mgclient mg-utils mg-io)
add_executable(memgraph__e2e__memory_limit_global_multi_thread_proc_create memory_limit_global_multi_thread_proc_create.cpp)
target_link_libraries(memgraph__e2e__memory_limit_global_multi_thread_proc_create gflags mgclient mg-utils mg-io)
add_executable(memgraph__e2e__memory_limit_global_thread_alloc_proc memory_limit_global_thread_alloc_proc.cpp)
target_link_libraries(memgraph__e2e__memory_limit_global_thread_alloc_proc gflags mgclient mg-utils mg-io)

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -44,7 +44,7 @@ int main(int argc, char **argv) {
client->DiscardAll();
}
const auto *create_query = "UNWIND range(1, 50) as u CREATE (n {string: \"Some longer string\"}) RETURN n;";
const auto *create_query = "UNWIND range(1, 100) as u CREATE (n {string: \"Some longer string\"}) RETURN n;";
memgraph::utils::Timer timer;
while (true) {

View File

@ -1,67 +0,0 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <gflags/gflags.h>
#include <algorithm>
#include <exception>
#include <ios>
#include <iostream>
#include <mgclient.hpp>
#include "utils/logging.hpp"
#include "utils/timer.hpp"
DEFINE_uint64(bolt_port, 7687, "Bolt port");
DEFINE_uint64(timeout, 120, "Timeout seconds");
DEFINE_bool(multi_db, false, "Run test in multi db environment");
int main(int argc, char **argv) {
google::SetUsageMessage("Memgraph E2E Global Memory Limit In Multi-Thread Create For Local Allocators");
gflags::ParseCommandLineFlags(&argc, &argv, true);
memgraph::logging::RedirectToStderr();
mg::Client::Init();
auto client =
mg::Client::Connect({.host = "127.0.0.1", .port = static_cast<uint16_t>(FLAGS_bolt_port), .use_ssl = false});
if (!client) {
LOG_FATAL("Failed to connect!");
}
if (FLAGS_multi_db) {
client->Execute("CREATE DATABASE clean;");
client->DiscardAll();
client->Execute("USE DATABASE clean;");
client->DiscardAll();
client->Execute("MATCH (n) DETACH DELETE n;");
client->DiscardAll();
}
bool error{false};
try {
client->Execute(
"CALL libglobal_memory_limit_multi_thread_create_proc.multi_create() PROCEDURE MEMORY UNLIMITED YIELD "
"allocated_all RETURN allocated_all "
"QUERY MEMORY LIMIT 50MB;");
auto result_rows = client->FetchAll();
if (result_rows) {
auto row = *result_rows->begin();
error = !row[0].ValueBool();
}
} catch (const std::exception &e) {
error = true;
}
MG_ASSERT(error, "Error should have happend");
return 0;
}

View File

@ -6,7 +6,7 @@ target_include_directories(global_memory_limit_proc PRIVATE ${CMAKE_SOURCE_DIR}/
add_library(query_memory_limit_proc_multi_thread SHARED query_memory_limit_proc_multi_thread.cpp)
target_include_directories(query_memory_limit_proc_multi_thread PRIVATE ${CMAKE_SOURCE_DIR}/include)
target_link_libraries(query_memory_limit_proc_multi_thread mg-utils)
target_link_libraries(query_memory_limit_proc_multi_thread mg-utils )
add_library(query_memory_limit_proc SHARED query_memory_limit_proc.cpp)
target_include_directories(query_memory_limit_proc PRIVATE ${CMAKE_SOURCE_DIR}/include)
@ -16,10 +16,6 @@ add_library(global_memory_limit_thread_proc SHARED global_memory_limit_thread_pr
target_include_directories(global_memory_limit_thread_proc PRIVATE ${CMAKE_SOURCE_DIR}/include)
target_link_libraries(global_memory_limit_thread_proc mg-utils)
add_library(global_memory_limit_multi_thread_create_proc SHARED global_memory_limit_multi_thread_create_proc.cpp)
target_include_directories(global_memory_limit_multi_thread_create_proc PRIVATE ${CMAKE_SOURCE_DIR}/include)
target_link_libraries(global_memory_limit_multi_thread_create_proc mg-utils)
add_library(proc_memory_limit SHARED proc_memory_limit.cpp)
target_include_directories(proc_memory_limit PRIVATE ${CMAKE_SOURCE_DIR}/include)
target_link_libraries(proc_memory_limit mg-utils)

View File

@ -1,95 +0,0 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <atomic>
#include <cassert>
#include <exception>
#include <functional>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "mg_procedure.h"
#include "mgp.hpp"
#include "utils/on_scope_exit.hpp"
// change communication between threads with feature and promise
std::atomic<int> created_vertices{0};
constexpr int num_vertices_per_thread{100'000};
constexpr int num_threads{2};
void CallCreate(mgp_graph *graph, mgp_memory *memory) {
[[maybe_unused]] const enum mgp_error tracking_error = mgp_track_current_thread_allocations(graph);
for (int i = 0; i < num_vertices_per_thread; i++) {
struct mgp_vertex *vertex{nullptr};
auto enum_error = mgp_graph_create_vertex(graph, memory, &vertex);
if (enum_error != mgp_error::MGP_ERROR_NO_ERROR) {
break;
}
created_vertices.fetch_add(1, std::memory_order_acq_rel);
}
[[maybe_unused]] const enum mgp_error untracking_error = mgp_untrack_current_thread_allocations(graph);
}
void AllocFunc(mgp_graph *graph, mgp_memory *memory) {
try {
CallCreate(graph, memory);
} catch (const std::exception &e) {
return;
}
}
void MultiCreate(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
const auto record_factory = mgp::RecordFactory(result);
try {
std::vector<std::thread> threads;
for (int i = 0; i < 2; i++) {
threads.emplace_back(AllocFunc, memgraph_graph, memory);
}
for (int i = 0; i < num_threads; i++) {
threads[i].join();
}
if (created_vertices.load(std::memory_order_acquire) != num_vertices_per_thread * num_threads) {
record_factory.SetErrorMessage("Unable to allocate");
return;
}
auto new_record = record_factory.NewRecord();
new_record.Insert("allocated_all",
created_vertices.load(std::memory_order_acquire) == num_vertices_per_thread * num_threads);
} catch (std::exception &e) {
record_factory.SetErrorMessage(e.what());
}
}
extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *memory) {
try {
mgp::MemoryDispatcherGuard guard{memory};
AddProcedure(MultiCreate, std::string("multi_create").c_str(), mgp::ProcedureType::Write, {},
{mgp::Return(std::string("allocated_all").c_str(), mgp::Type::Bool)}, module, memory);
} catch (const std::exception &e) {
return 1;
}
return 0;
}
extern "C" int mgp_shutdown_module() { return 0; }

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -13,73 +13,122 @@
#include <cassert>
#include <exception>
#include <functional>
#include <future>
#include <exception>
#include <latch>
#include <list>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "mg_procedure.h"
#include "mgp.hpp"
#include "utils/memory_tracker.hpp"
#include "utils/on_scope_exit.hpp"
enum mgp_error Alloc(void *ptr) {
const size_t mb_size_268 = 1 << 28;
using safe_ptr = std::unique_ptr<void, decltype([](void *p) { mgp_global_free(p); })>;
enum class AllocFuncRes { NoIssues, UnableToAlloc, Unexpected };
using result_t = std::pair<AllocFuncRes, std::list<safe_ptr>>;
return mgp_global_alloc(mb_size_268, (void **)(&ptr));
}
constexpr auto N_THREADS = 2;
static_assert(N_THREADS > 0 && (N_THREADS & (N_THREADS - 1)) == 0);
// change communication between threads with feature and promise
std::atomic<int> num_allocations{0};
std::vector<void *> ptrs_;
constexpr auto mb_size_512 = 1 << 29;
constexpr auto mb_size_16 = 1 << 24;
void AllocFunc(mgp_graph *graph) {
static_assert(mb_size_512 % N_THREADS == 0);
static_assert(mb_size_16 % N_THREADS == 0);
static_assert(mb_size_512 % mb_size_16 == 0);
void AllocFunc(std::latch &start_latch, std::promise<result_t> promise, mgp_graph *graph) {
[[maybe_unused]] const enum mgp_error tracking_error = mgp_track_current_thread_allocations(graph);
void *ptr = nullptr;
auto on_exit = memgraph::utils::OnScopeExit{[&]() {
[[maybe_unused]] const enum mgp_error untracking_error = mgp_untrack_current_thread_allocations(graph);
}};
std::list<safe_ptr> ptrs;
// Ensure test would concurrently run these allocations, wait until both are ready
start_latch.arrive_and_wait();
ptrs_.emplace_back(ptr);
try {
enum mgp_error alloc_err { mgp_error::MGP_ERROR_NO_ERROR };
alloc_err = Alloc(ptr);
if (alloc_err != mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
num_allocations.fetch_add(1, std::memory_order_relaxed);
}
if (alloc_err != mgp_error::MGP_ERROR_NO_ERROR) {
assert(false);
constexpr auto allocation_limit = mb_size_512 / N_THREADS;
// many allocation to increase chance of seeing any concurent issues
for (auto total = 0; total < allocation_limit; total += mb_size_16) {
void *ptr = nullptr;
auto alloc_err = mgp_global_alloc(mb_size_16, &ptr);
if (alloc_err != mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE && ptr != nullptr) {
ptrs.emplace_back(ptr);
} else if (alloc_err == mgp_error::MGP_ERROR_UNABLE_TO_ALLOCATE) {
// this is expected, the test checks N threads allocating to a limit of 512MB
promise.set_value({AllocFuncRes::UnableToAlloc, std::move(ptrs)});
return;
} else {
promise.set_value({AllocFuncRes::Unexpected, std::move(ptrs)});
return;
}
}
} catch (const std::exception &e) {
assert(false);
promise.set_value({AllocFuncRes::Unexpected, std::move(ptrs)});
return;
}
[[maybe_unused]] const enum mgp_error untracking_error = mgp_untrack_current_thread_allocations(graph);
promise.set_value({AllocFuncRes::NoIssues, std::move(ptrs)});
return;
}
void DualThread(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};
const auto arguments = mgp::List(args);
const auto record_factory = mgp::RecordFactory(result);
num_allocations.store(0, std::memory_order_relaxed);
// 1 byte allocation to
auto ptr = std::invoke([&] {
void *ptr;
[[maybe_unused]] auto alloc_err = mgp_global_alloc(1, &ptr);
return safe_ptr{ptr};
});
try {
std::vector<std::thread> threads;
for (int i = 0; i < 2; i++) {
threads.emplace_back(AllocFunc, memgraph_graph);
}
for (int i = 0; i < 2; i++) {
threads[i].join();
}
for (void *ptr : ptrs_) {
if (ptr != nullptr) {
mgp_global_free(ptr);
auto futures = std::vector<std::future<result_t>>{};
futures.reserve(N_THREADS);
std::latch start_latch{N_THREADS};
{
auto threads = std::vector<std::jthread>{};
threads.reserve(N_THREADS);
for (int i = 0; i < N_THREADS; i++) {
auto promise = std::promise<result_t>{};
futures.emplace_back(promise.get_future());
threads.emplace_back([&, promise = std::move(promise)]() mutable {
AllocFunc(start_latch, std::move(promise), memgraph_graph);
});
}
} // ~jthread will join
int alloc_errors = 0;
int unexpected_errors = 0;
for (auto &x : futures) {
auto [res, ptrs] = x.get();
alloc_errors += (res == AllocFuncRes::UnableToAlloc);
unexpected_errors += (res == AllocFuncRes::Unexpected);
// regardless of outcome, we want this thread to do the deallocation
ptrs.clear();
}
if (unexpected_errors != 0) {
record_factory.SetErrorMessage("Unanticipated error happened");
return;
}
if (alloc_errors < 1) {
record_factory.SetErrorMessage("Didn't hit the QUERY MEMORY LIMIT we expected");
return;
}
auto new_record = record_factory.NewRecord();
new_record.Insert("allocated_all", num_allocations.load(std::memory_order_relaxed) == 2);
new_record.Insert("test_passed", true);
} catch (std::exception &e) {
record_factory.SetErrorMessage(e.what());
}
@ -90,7 +139,7 @@ extern "C" int mgp_init_module(struct mgp_module *module, struct mgp_memory *mem
mgp::memory = memory;
AddProcedure(DualThread, std::string("dual_thread").c_str(), mgp::ProcedureType::Read, {},
{mgp::Return(std::string("allocated_all").c_str(), mgp::Type::Bool)}, module, memory);
{mgp::Return(std::string("test_passed").c_str(), mgp::Type::Bool)}, module, memory);
} catch (const std::exception &e) {
return 1;

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -46,21 +46,21 @@ int main(int argc, char **argv) {
}
MG_ASSERT(
client->Execute("CALL libquery_memory_limit_proc_multi_thread.dual_thread() YIELD allocated_all RETURN "
"allocated_all QUERY MEMORY LIMIT 500MB"));
client->Execute("CALL libquery_memory_limit_proc_multi_thread.dual_thread() YIELD test_passed RETURN "
"test_passed QUERY MEMORY LIMIT 500MB"));
bool error{false};
try {
auto result_rows = client->FetchAll();
if (result_rows) {
auto row = *result_rows->begin();
error = !row[0].ValueBool();
MG_ASSERT(row[0].ValueBool(), "Execpected the procedure to pass");
} else {
MG_ASSERT(false, "Expected at least one row");
}
} catch (const std::exception &e) {
error = true;
MG_ASSERT(error, "This error should not have happend {}", e.what());
}
MG_ASSERT(error, "Error should have happend");
return 0;
}

View File

@ -6,7 +6,22 @@ args: &args
- "--storage-gc-cycle-sec=180"
- "--log-level=TRACE"
in_memory_cluster: &in_memory_cluster
args_150_MiB_limit: &args_150_MiB_limit
- "--bolt-port"
- *bolt_port
- "--memory-limit=150"
- "--storage-gc-cycle-sec=180"
- "--log-level=TRACE"
in_memory_150_MiB_limit_cluster: &in_memory_150_MiB_limit_cluster
cluster:
main:
args: *args_150_MiB_limit
log_file: "memory-e2e.log"
setup_queries: []
validation_queries: []
in_memory_1024_MiB_limit_cluster: &in_memory_1024_MiB_limit_cluster
cluster:
main:
args: *args
@ -61,6 +76,30 @@ disk_450_MiB_limit_cluster: &disk_450_MiB_limit_cluster
setup_queries: []
validation_queries: []
args_300_MiB_limit: &args_300_MiB_limit
- "--bolt-port"
- *bolt_port
- "--memory-limit=300"
- "--storage-gc-cycle-sec=180"
- "--log-level=INFO"
in_memory_300_MiB_limit_cluster: &in_memory_300_MiB_limit_cluster
cluster:
main:
args: *args_300_MiB_limit
log_file: "memory-e2e.log"
setup_queries: []
validation_queries: []
disk_300_MiB_limit_cluster: &disk_300_MiB_limit_cluster
cluster:
main:
args: *args_300_MiB_limit
log_file: "memory-e2e.log"
setup_queries: []
validation_queries: []
args_global_limit_1024_MiB: &args_global_limit_1024_MiB
- "--bolt-port"
@ -80,36 +119,36 @@ workloads:
- name: "Memory control"
binary: "tests/e2e/memory/memgraph__e2e__memory__control"
args: ["--bolt-port", *bolt_port, "--timeout", "180"]
<<: *in_memory_cluster
<<: *in_memory_150_MiB_limit_cluster
- name: "Memory control multi database"
binary: "tests/e2e/memory/memgraph__e2e__memory__control"
args: ["--bolt-port", *bolt_port, "--timeout", "180", "--multi-db", "true"]
<<: *in_memory_cluster
<<: *in_memory_150_MiB_limit_cluster
- name: "Memory limit for modules upon loading"
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_global_alloc"
args: ["--bolt-port", *bolt_port, "--timeout", "180"]
proc: "tests/e2e/memory/procedures/"
<<: *in_memory_cluster
<<: *in_memory_1024_MiB_limit_cluster
- name: "Memory limit for modules upon loading multi database"
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_global_alloc"
args: ["--bolt-port", *bolt_port, "--timeout", "180", "--multi-db", "true"]
proc: "tests/e2e/memory/procedures/"
<<: *in_memory_cluster
<<: *in_memory_1024_MiB_limit_cluster
- name: "Memory limit for modules inside a procedure"
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_global_alloc_proc"
args: ["--bolt-port", *bolt_port, "--timeout", "180"]
proc: "tests/e2e/memory/procedures/"
<<: *in_memory_cluster
<<: *in_memory_1024_MiB_limit_cluster
- name: "Memory limit for modules inside a procedure multi database"
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_global_alloc_proc"
args: ["--bolt-port", *bolt_port, "--timeout", "180", "--multi-db", "true"]
proc: "tests/e2e/memory/procedures/"
<<: *in_memory_cluster
<<: *in_memory_1024_MiB_limit_cluster
- name: "Memory limit for modules upon loading for on-disk storage"
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_global_alloc"
@ -143,12 +182,12 @@ workloads:
- name: "Memory control for detach delete"
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_delete"
args: ["--bolt-port", *bolt_port]
<<: *in_memory_450_MiB_limit_cluster
<<: *in_memory_300_MiB_limit_cluster
- name: "Memory control for detach delete on disk storage"
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_delete"
args: ["--bolt-port", *bolt_port]
<<: *disk_450_MiB_limit_cluster
<<: *disk_300_MiB_limit_cluster
- name: "Memory control for accumulation"
binary: "tests/e2e/memory/memgraph__e2e__memory__limit_accumulation"
@ -170,17 +209,11 @@ workloads:
args: ["--bolt-port", *bolt_port]
<<: *disk_450_MiB_limit_cluster
- name: "Memory control for create from multi thread proc create"
binary: "tests/e2e/memory/memgraph__e2e__memory_limit_global_multi_thread_proc_create"
proc: "tests/e2e/memory/procedures/"
args: ["--bolt-port", *bolt_port]
<<: *in_memory_cluster
- name: "Memory control for memory limit global thread alloc"
binary: "tests/e2e/memory/memgraph__e2e__memory_limit_global_thread_alloc_proc"
proc: "tests/e2e/memory/procedures/"
args: ["--bolt-port", *bolt_port]
<<: *in_memory_cluster
<<: *in_memory_1024_MiB_limit_cluster
- name: "Procedure memory control for single procedure"
binary: "tests/e2e/memory/memgraph__e2e__procedure_memory_limit"

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -36,13 +36,13 @@ TEST(MemoryTrackerTest, ExceptionEnabler) {
can_continue = true;
}};
ASSERT_NO_THROW(memory_tracker.Alloc(hard_limit + 1));
ASSERT_TRUE(memory_tracker.Alloc(hard_limit + 1));
}};
std::thread t2{[&] {
memgraph::utils::MemoryTracker::OutOfMemoryExceptionEnabler exception_enabler;
enabler_created = true;
ASSERT_THROW(memory_tracker.Alloc(hard_limit + 1), memgraph::utils::OutOfMemoryException);
ASSERT_FALSE(memory_tracker.Alloc(hard_limit + 1));
// hold the enabler until the first thread finishes
while (!can_continue)
@ -63,8 +63,8 @@ TEST(MemoryTrackerTest, ExceptionBlocker) {
{
memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker exception_blocker;
ASSERT_NO_THROW(memory_tracker.Alloc(hard_limit + 1));
ASSERT_TRUE(memory_tracker.Alloc(hard_limit + 1));
ASSERT_EQ(memory_tracker.Amount(), hard_limit + 1);
}
ASSERT_THROW(memory_tracker.Alloc(hard_limit + 1), memgraph::utils::OutOfMemoryException);
ASSERT_FALSE(memory_tracker.Alloc(hard_limit + 1));
}