Fix race condition and arena tracking bug (#1468)
This commit is contained in:
parent
4e9a036881
commit
17915578f8
@ -61,7 +61,7 @@ void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t
|
||||
// This needs to be before, to throw exception in case of too big alloc
|
||||
if (*commit) [[likely]] {
|
||||
memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size));
|
||||
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
|
||||
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
|
||||
GetQueriesMemoryControl().TrackAllocOnCurrentThread(size);
|
||||
}
|
||||
}
|
||||
@ -70,7 +70,7 @@ void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t
|
||||
if (ptr == nullptr) [[unlikely]] {
|
||||
if (*commit) {
|
||||
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
|
||||
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
|
||||
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
|
||||
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
|
||||
}
|
||||
}
|
||||
@ -90,7 +90,7 @@ static bool my_dalloc(extent_hooks_t *extent_hooks, void *addr, size_t size, boo
|
||||
if (committed) [[likely]] {
|
||||
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
|
||||
|
||||
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
|
||||
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
|
||||
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
|
||||
}
|
||||
}
|
||||
@ -101,7 +101,7 @@ static bool my_dalloc(extent_hooks_t *extent_hooks, void *addr, size_t size, boo
|
||||
static void my_destroy(extent_hooks_t *extent_hooks, void *addr, size_t size, bool committed, unsigned arena_ind) {
|
||||
if (committed) [[likely]] {
|
||||
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
|
||||
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
|
||||
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
|
||||
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
|
||||
}
|
||||
}
|
||||
@ -118,7 +118,7 @@ static bool my_commit(extent_hooks_t *extent_hooks, void *addr, size_t size, siz
|
||||
}
|
||||
|
||||
memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(length));
|
||||
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
|
||||
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
|
||||
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
|
||||
}
|
||||
|
||||
@ -135,7 +135,7 @@ static bool my_decommit(extent_hooks_t *extent_hooks, void *addr, size_t size, s
|
||||
}
|
||||
|
||||
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(length));
|
||||
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
|
||||
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
|
||||
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
|
||||
}
|
||||
|
||||
@ -152,7 +152,7 @@ static bool my_purge_forced(extent_hooks_t *extent_hooks, void *addr, size_t siz
|
||||
}
|
||||
memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(length));
|
||||
|
||||
if (GetQueriesMemoryControl().IsArenaTracked(arena_ind)) [[unlikely]] {
|
||||
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
|
||||
GetQueriesMemoryControl().TrackFreeOnCurrentThread(size);
|
||||
}
|
||||
|
||||
@ -179,8 +179,11 @@ void SetHooks() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Needs init due to error we might encounter otherwise
|
||||
// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=13684
|
||||
[[maybe_unused]] const auto &queries_memory_control = GetQueriesMemoryControl();
|
||||
|
||||
for (int i = 0; i < n_arenas; i++) {
|
||||
GetQueriesMemoryControl().InitializeArenaCounter(i);
|
||||
std::string func_name = "arena." + std::to_string(i) + ".extent_hooks";
|
||||
|
||||
size_t hooks_len = sizeof(old_hooks);
|
||||
@ -240,7 +243,6 @@ void UnsetHooks() {
|
||||
}
|
||||
|
||||
for (int i = 0; i < n_arenas; i++) {
|
||||
GetQueriesMemoryControl().InitializeArenaCounter(i);
|
||||
std::string func_name = "arena." + std::to_string(i) + ".extent_hooks";
|
||||
|
||||
MG_ASSERT(old_hooks);
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include "query_memory_control.hpp"
|
||||
#include "utils/exceptions.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/memory.hpp"
|
||||
#include "utils/memory_tracker.hpp"
|
||||
#include "utils/rw_spin_lock.hpp"
|
||||
|
||||
@ -33,20 +34,6 @@ namespace memgraph::memory {
|
||||
|
||||
#if USE_JEMALLOC
|
||||
|
||||
unsigned QueriesMemoryControl::GetArenaForThread() {
|
||||
unsigned thread_arena{0};
|
||||
size_t size_thread_arena = sizeof(thread_arena);
|
||||
int err = mallctl("thread.arena", &thread_arena, &size_thread_arena, nullptr, 0);
|
||||
if (err) {
|
||||
LOG_FATAL("Can't get arena for thread.");
|
||||
}
|
||||
return thread_arena;
|
||||
}
|
||||
|
||||
void QueriesMemoryControl::AddTrackingOnArena(unsigned arena_id) { arena_tracking[arena_id].fetch_add(1); }
|
||||
|
||||
void QueriesMemoryControl::RemoveTrackingOnArena(unsigned arena_id) { arena_tracking[arena_id].fetch_sub(1); }
|
||||
|
||||
void QueriesMemoryControl::UpdateThreadToTransactionId(const std::thread::id &thread_id, uint64_t transaction_id) {
|
||||
auto accessor = thread_id_to_transaction_id.access();
|
||||
accessor.insert({thread_id, transaction_id});
|
||||
@ -119,14 +106,6 @@ bool QueriesMemoryControl::EraseTransactionIdTracker(uint64_t transaction_id) {
|
||||
return removed;
|
||||
}
|
||||
|
||||
bool QueriesMemoryControl::IsArenaTracked(unsigned arena_ind) {
|
||||
return arena_tracking[arena_ind].load(std::memory_order_acquire) != 0;
|
||||
}
|
||||
|
||||
void QueriesMemoryControl::InitializeArenaCounter(unsigned arena_ind) {
|
||||
arena_tracking[arena_ind].store(0, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
bool QueriesMemoryControl::CheckTransactionIdTrackerExists(uint64_t transaction_id) {
|
||||
auto transaction_id_to_tracker_accessor = transaction_id_to_tracker.access();
|
||||
return transaction_id_to_tracker_accessor.contains(transaction_id);
|
||||
@ -166,19 +145,29 @@ void QueriesMemoryControl::PauseProcedureTracking(uint64_t transaction_id) {
|
||||
query_tracker->tracker.StopProcTracking();
|
||||
}
|
||||
|
||||
inline int &Get_Thread_Tracker() {
|
||||
// store variable in bss segment for each thread
|
||||
// https://cs-fundamentals.com/c-programming/memory-layout-of-c-program-code-data-segments#size-of-code-data-bss-segments
|
||||
static thread_local int is_thread_tracked{0};
|
||||
return is_thread_tracked;
|
||||
}
|
||||
|
||||
bool QueriesMemoryControl::IsThreadTracked() { return Get_Thread_Tracker() == 1; }
|
||||
|
||||
#endif
|
||||
|
||||
void StartTrackingCurrentThreadTransaction(uint64_t transaction_id) {
|
||||
#if USE_JEMALLOC
|
||||
Get_Thread_Tracker() = 0;
|
||||
GetQueriesMemoryControl().UpdateThreadToTransactionId(std::this_thread::get_id(), transaction_id);
|
||||
GetQueriesMemoryControl().AddTrackingOnArena(QueriesMemoryControl::GetArenaForThread());
|
||||
Get_Thread_Tracker() = 1;
|
||||
#endif
|
||||
}
|
||||
|
||||
void StopTrackingCurrentThreadTransaction(uint64_t transaction_id) {
|
||||
#if USE_JEMALLOC
|
||||
Get_Thread_Tracker() = 0;
|
||||
GetQueriesMemoryControl().EraseThreadToTransactionId(std::this_thread::get_id(), transaction_id);
|
||||
GetQueriesMemoryControl().RemoveTrackingOnArena(QueriesMemoryControl::GetArenaForThread());
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -33,25 +33,6 @@ static constexpr int64_t UNLIMITED_MEMORY{0};
|
||||
// it is necessary to restart tracking at the beginning of new query for that transaction.
|
||||
class QueriesMemoryControl {
|
||||
public:
|
||||
/*
|
||||
Arena stats
|
||||
*/
|
||||
|
||||
static unsigned GetArenaForThread();
|
||||
|
||||
// Add counter on threads allocating inside arena
|
||||
void AddTrackingOnArena(unsigned);
|
||||
|
||||
// Remove counter on threads allocating in arena
|
||||
void RemoveTrackingOnArena(unsigned);
|
||||
|
||||
// Are any threads using current arena for allocations
|
||||
// Multiple threads can allocate inside one arena
|
||||
bool IsArenaTracked(unsigned);
|
||||
|
||||
// Initialize arena counter
|
||||
void InitializeArenaCounter(unsigned);
|
||||
|
||||
/*
|
||||
Transaction id <-> tracker
|
||||
*/
|
||||
@ -94,9 +75,9 @@ class QueriesMemoryControl {
|
||||
|
||||
void PauseProcedureTracking(uint64_t);
|
||||
|
||||
private:
|
||||
std::unordered_map<unsigned, std::atomic<int>> arena_tracking;
|
||||
bool IsThreadTracked();
|
||||
|
||||
private:
|
||||
struct ThreadIdToTransactionId {
|
||||
std::thread::id thread_id;
|
||||
uint64_t transaction_id;
|
||||
|
@ -107,9 +107,6 @@ class MemoryTracker final {
|
||||
};
|
||||
|
||||
private:
|
||||
enum class TrackingMode { DEFAULT, ADDITIONAL_PROC };
|
||||
|
||||
TrackingMode tracking_mode_{TrackingMode::DEFAULT};
|
||||
std::atomic<int64_t> amount_{0};
|
||||
std::atomic<int64_t> peak_{0};
|
||||
std::atomic<int64_t> hard_limit_{0};
|
||||
|
Loading…
Reference in New Issue
Block a user