From 7f7f3adfcb7fd6a00701e8a0ed8c1e06c2ee2f47 Mon Sep 17 00:00:00 2001
From: Antonio Filipovic <61245998+antoniofilipovic@users.noreply.github.com>
Date: Mon, 23 Oct 2023 12:48:26 +0200
Subject: [PATCH] Implement jemalloc extent hooks memory tracker (#1250)

Should improve/fix memory usage exceeds --memory-limit issues
---
 cmake/FindJemalloc.cmake              |  55 ----
 cmake/Findjemalloc.cmake              |  67 +++++
 libs/CMakeLists.txt                   |  17 +-
 libs/setup.sh                         |  20 ++
 src/memgraph.cpp                      |   3 +-
 src/memory/CMakeLists.txt             |   4 +-
 src/memory/memory_control.cpp         | 178 ++++++++++++-
 src/memory/memory_control.hpp         |   7 +-
 src/memory/new_delete.cpp             |  28 +-
 tests/e2e/disk_storage/workloads.yaml |   2 +-
 tests/e2e/memgraph.py                 |  11 +-
 tests/e2e/memory/workloads.yaml       |   2 +-
 tests/stress/common.py                |  19 ++
 tests/stress/continuous_integration   |  44 +++-
 tests/stress/memory_limit.py          | 257 ++++++++++++++++++
 tests/stress/memory_tracker.py        | 359 ++++++++++++++++++++++++++
 tests/stress/test_config.py           |  15 ++
 17 files changed, 988 insertions(+), 100 deletions(-)
 delete mode 100644 cmake/FindJemalloc.cmake
 create mode 100644 cmake/Findjemalloc.cmake
 create mode 100644 tests/stress/memory_limit.py
 create mode 100644 tests/stress/memory_tracker.py

diff --git a/cmake/FindJemalloc.cmake b/cmake/FindJemalloc.cmake
deleted file mode 100644
index 1f336fd10..000000000
--- a/cmake/FindJemalloc.cmake
+++ /dev/null
@@ -1,55 +0,0 @@
-# Try to find jemalloc library
-#
-# Use this module as:
-#   find_package(Jemalloc)
-#
-# or:
-#   find_package(Jemalloc REQUIRED)
-#
-# This will define the following variables:
-#
-#   Jemalloc_FOUND           True if the system has the jemalloc library.
-#   Jemalloc_INCLUDE_DIRS    Include directories needed to use jemalloc.
-#   Jemalloc_LIBRARIES       Libraries needed to link to jemalloc.
-#
-# The following cache variables may also be set:
-#
-# Jemalloc_INCLUDE_DIR     The directory containing jemalloc/jemalloc.h.
-# Jemalloc_LIBRARY         The path to the jemalloc static library.
-
-find_path(Jemalloc_INCLUDE_DIR NAMES jemalloc/jemalloc.h PATH_SUFFIXES include)
-
-find_library(Jemalloc_LIBRARY NAMES libjemalloc.a PATH_SUFFIXES lib)
-
-include(FindPackageHandleStandardArgs)
-find_package_handle_standard_args(Jemalloc
-  FOUND_VAR Jemalloc_FOUND
-  REQUIRED_VARS
-    Jemalloc_LIBRARY
-    Jemalloc_INCLUDE_DIR
-)
-
-if(Jemalloc_FOUND)
-  set(Jemalloc_LIBRARIES ${Jemalloc_LIBRARY})
-  set(Jemalloc_INCLUDE_DIRS ${Jemalloc_INCLUDE_DIR})
-else()
-  if(Jemalloc_FIND_REQUIRED)
-    message(FATAL_ERROR "Cannot find jemalloc!")
-  else()
-    message(WARNING "jemalloc is not found!")
-  endif()
-endif()
-
-if(Jemalloc_FOUND AND NOT TARGET Jemalloc::Jemalloc)
-  add_library(Jemalloc::Jemalloc UNKNOWN IMPORTED)
-  set_target_properties(Jemalloc::Jemalloc
-    PROPERTIES
-      IMPORTED_LOCATION "${Jemalloc_LIBRARY}"
-      INTERFACE_INCLUDE_DIRECTORIES "${Jemalloc_INCLUDE_DIR}"
-  )
-endif()
-
-mark_as_advanced(
-  Jemalloc_INCLUDE_DIR
-  Jemalloc_LIBRARY
-)
\ No newline at end of file
diff --git a/cmake/Findjemalloc.cmake b/cmake/Findjemalloc.cmake
new file mode 100644
index 000000000..1e24f0f30
--- /dev/null
+++ b/cmake/Findjemalloc.cmake
@@ -0,0 +1,67 @@
+# Try to find jemalloc library
+#
+# Use this module as:
+#   find_package(jemalloc)
+#
+# or:
+#   find_package(jemalloc REQUIRED)
+#
+# This will define the following variables:
+#
+#   JEMALLOC_FOUND           True if the system has the jemalloc library.
+#   Jemalloc_INCLUDE_DIRS    Include directories needed to use jemalloc.
+#   Jemalloc_LIBRARIES       Libraries needed to link to jemalloc.
+#
+# The following cache variables may also be set:
+#
+# Jemalloc_INCLUDE_DIR     The directory containing jemalloc/jemalloc.h.
+# Jemalloc_LIBRARY         The path to the jemalloc static library.
+
+
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(jemalloc
+  FOUND_VAR JEMALLOC_FOUND
+  REQUIRED_VARS
+    JEMALLOC_LIBRARY
+    JEMALLOC_INCLUDE_DIR
+)
+
+if(JEMALLOC_INCLUDE_DIR)
+  message(STATUS "Found jemalloc include dir: ${JEMALLOC_INCLUDE_DIR}")
+else()
+  message(WARNING "jemalloc not found!")
+endif()
+
+if(JEMALLOC_LIBRARY)
+  message(STATUS "Found jemalloc library: ${JEMALLOC_LIBRARY}")
+else()
+  message(WARNING "jemalloc library not found!")
+endif()
+
+if(JEMALLOC_FOUND)
+  set(Jemalloc_LIBRARIES ${JEMALLOC_LIBRARY})
+  set(Jemalloc_INCLUDE_DIRS ${JEMALLOC_INCLUDE_DIR})
+else()
+  if(Jemalloc_FIND_REQUIRED)
+    message(FATAL_ERROR "Cannot find jemalloc!")
+  else()
+    message(WARNING "jemalloc is not found!")
+  endif()
+endif()
+
+if(JEMALLOC_FOUND AND NOT TARGET Jemalloc::Jemalloc)
+  message(STATUS "JEMALLOC NOT TARGET")
+
+  add_library(Jemalloc::Jemalloc UNKNOWN IMPORTED)
+  set_target_properties(Jemalloc::Jemalloc
+    PROPERTIES
+      IMPORTED_LOCATION "${JEMALLOC_LIBRARY}"
+      INTERFACE_INCLUDE_DIRECTORIES "${JEMALLOC_INCLUDE_DIR}"
+  )
+endif()
+
+mark_as_advanced(
+  JEMALLOC_INCLUDE_DIR
+  JEMALLOC_LIBRARY
+)
diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt
index 17f013eff..411b180ab 100644
--- a/libs/CMakeLists.txt
+++ b/libs/CMakeLists.txt
@@ -15,7 +15,6 @@ set(GFLAGS_NOTHREADS OFF)
 # NOTE: config/generate.py depends on the gflags help XML format.
 find_package(gflags REQUIRED)
 find_package(fmt 8.0.1)
-find_package(Jemalloc REQUIRED)
 find_package(ZLIB 1.2.11 REQUIRED)
 
 set(LIB_DIR ${CMAKE_CURRENT_SOURCE_DIR})
@@ -99,6 +98,17 @@ macro(import_external_library name type library_location include_dir)
   import_library(${name} ${type} ${${_upper_name}_LIBRARY} ${${_upper_name}_INCLUDE_DIR})
 endmacro(import_external_library)
 
+
+macro(set_path_external_library name type library_location include_dir)
+  string(TOUPPER ${name} _upper_name)
+  set(${_upper_name}_LIBRARY ${library_location} CACHE FILEPATH
+      "Path to ${name} library" FORCE)
+  set(${_upper_name}_INCLUDE_DIR ${include_dir} CACHE FILEPATH
+      "Path to ${name} include directory" FORCE)
+  mark_as_advanced(${name}_LIBRARY ${name}_INCLUDE_DIR)
+endmacro(set_path_external_library)
+
+
 # setup antlr
 import_external_library(antlr4 STATIC
   ${CMAKE_CURRENT_SOURCE_DIR}/antlr4/runtime/Cpp/lib/libantlr4-runtime.a
@@ -265,3 +275,8 @@ import_header_library(ctre ${CMAKE_CURRENT_SOURCE_DIR})
 # setup absl (cmake sub_directory tolerant)
 set(ABSL_PROPAGATE_CXX_STD ON)
 add_subdirectory(absl EXCLUDE_FROM_ALL)
+
+# set Jemalloc
+set_path_external_library(jemalloc STATIC
+  ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc/lib/libjemalloc.a
+  ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc/include/)
diff --git a/libs/setup.sh b/libs/setup.sh
index d0fb41b0f..638c1b9f2 100755
--- a/libs/setup.sh
+++ b/libs/setup.sh
@@ -124,6 +124,7 @@ declare -A primary_urls=(
   ["librdtsc"]="http://$local_cache_host/git/librdtsc.git"
   ["ctre"]="http://$local_cache_host/file/hanickadot/compile-time-regular-expressions/v3.7.2/single-header/ctre.hpp"
   ["absl"]="https://$local_cache_host/git/abseil-cpp.git"
+  ["jemalloc"]="https://$local_cache_host/git/jemalloc.git"
 )
 
 # The goal of secondary urls is to have links to the "source of truth" of
@@ -151,6 +152,7 @@ declare -A secondary_urls=(
   ["librdtsc"]="https://github.com/gabrieleara/librdtsc.git"
   ["ctre"]="https://raw.githubusercontent.com/hanickadot/compile-time-regular-expressions/v3.7.2/single-header/ctre.hpp"
   ["absl"]="https://github.com/abseil/abseil-cpp.git"
+  ["jemalloc"]="https://github.com/jemalloc/jemalloc.git"
 )
 
 # antlr
@@ -252,3 +254,21 @@ cd ..
 # abseil 20230125.3
 absl_ref="20230125.3"
 repo_clone_try_double "${primary_urls[absl]}" "${secondary_urls[absl]}" "absl" "$absl_ref"
+
+
+# jemalloc ea6b3e973b477b8061e0076bb257dbd7f3faa756
+JEMALLOC_COMMIT_VERSION="5.2.1"
+repo_clone_try_double "${secondary_urls[jemalloc]}" "${secondary_urls[jemalloc]}" "jemalloc" "$JEMALLOC_COMMIT_VERSION"
+
+# this is hack for cmake in libs to set path, and for FindJemalloc to use Jemalloc_INCLUDE_DIR
+pushd jemalloc
+
+./autogen.sh
+MALLOC_CONF="retain:false,percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000" \
+./configure \
+  --disable-cxx \
+  --enable-shared=no --prefix=$working_dir \
+  --with-malloc-conf="retain:false,percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000"
+
+make -j$CPUS install
+popd
diff --git a/src/memgraph.cpp b/src/memgraph.cpp
index dcbef2e88..5fe343ffb 100644
--- a/src/memgraph.cpp
+++ b/src/memgraph.cpp
@@ -23,6 +23,7 @@
 #include "glue/run_id.hpp"
 #include "helpers.hpp"
 #include "license/license_sender.hpp"
+#include "memory/memory_control.hpp"
 #include "query/config.hpp"
 #include "query/discard_value_stream.hpp"
 #include "query/interpreter.hpp"
@@ -108,6 +109,7 @@ void InitSignalHandlers(const std::function<void()> &shutdown_fun) {
 }
 
 int main(int argc, char **argv) {
+  memgraph::memory::SetHooks();
   google::SetUsageMessage("Memgraph database server");
   gflags::SetVersionString(version_string);
 
@@ -199,7 +201,6 @@ int main(int argc, char **argv) {
           "won't be available.");
     }
   }
-
   std::cout << "You are running Memgraph v" << gflags::VersionString() << std::endl;
   std::cout << "To get started with Memgraph, visit https://memgr.ph/start" << std::endl;
 
diff --git a/src/memory/CMakeLists.txt b/src/memory/CMakeLists.txt
index 0aaf47702..bf3cbb23b 100644
--- a/src/memory/CMakeLists.txt
+++ b/src/memory/CMakeLists.txt
@@ -2,7 +2,9 @@ set(memory_src_files
     new_delete.cpp
     memory_control.cpp)
 
-find_package(Jemalloc REQUIRED)
+
+
+find_package(jemalloc REQUIRED)
 
 add_library(mg-memory STATIC ${memory_src_files})
 target_link_libraries(mg-memory mg-utils fmt)
diff --git a/src/memory/memory_control.cpp b/src/memory/memory_control.cpp
index 2af313d09..b3eeb6c26 100644
--- a/src/memory/memory_control.cpp
+++ b/src/memory/memory_control.cpp
@@ -1,4 +1,4 @@
-// Copyright 2022 Memgraph Ltd.
+// Copyright 2023 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -10,6 +10,8 @@
 // licenses/APL.txt.
 
 #include "memory_control.hpp"
+#include "utils/logging.hpp"
+#include "utils/memory_tracker.hpp"
 
 #if USE_JEMALLOC
 #include <jemalloc/jemalloc.h>
@@ -22,6 +24,179 @@ namespace memgraph::memory {
 // NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
 #define STRINGIFY(x) STRINGIFY_HELPER(x)
 
+#if USE_JEMALLOC
+
+static void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t alignment, bool *zero,
+                      bool *commit, unsigned arena_ind);
+static bool my_dalloc(extent_hooks_t *extent_hooks, void *addr, size_t size, bool committed, unsigned arena_ind);
+static void my_destroy(extent_hooks_t *extent_hooks, void *addr, size_t size, bool committed, unsigned arena_ind);
+static bool my_commit(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length,
+                      unsigned arena_ind);
+static bool my_decommit(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length,
+                        unsigned arena_ind);
+static bool my_purge_forced(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length,
+                            unsigned arena_ind);
+extent_hooks_t *old_hooks = nullptr;
+
+static extent_hooks_t custom_hooks = {
+    .alloc = &my_alloc,
+    .dalloc = &my_dalloc,
+    .destroy = &my_destroy,
+    .commit = &my_commit,
+    .decommit = &my_decommit,
+    .purge_lazy = nullptr,
+    .purge_forced = &my_purge_forced,
+    .split = nullptr,
+    .merge = nullptr,
+};
+
+static const extent_hooks_t *new_hooks = &custom_hooks;
+
+void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t alignment, bool *zero, bool *commit,
+               unsigned arena_ind) {
+  // This needs to be before, to throw exception in case of too big alloc
+  if (*commit) [[likely]] {
+    memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size));
+  }
+
+  auto *ptr = old_hooks->alloc(extent_hooks, new_addr, size, alignment, zero, commit, arena_ind);
+  if (ptr == nullptr) [[unlikely]] {
+    if (*commit) {
+      memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
+    }
+    return ptr;
+  }
+
+  return ptr;
+}
+
+static bool my_dalloc(extent_hooks_t *extent_hooks, void *addr, size_t size, bool committed, unsigned arena_ind) {
+  auto err = old_hooks->dalloc(extent_hooks, addr, size, committed, arena_ind);
+
+  if (err) [[unlikely]] {
+    return err;
+  }
+
+  if (committed) [[likely]] {
+    memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
+  }
+
+  return false;
+}
+
+static void my_destroy(extent_hooks_t *extent_hooks, void *addr, size_t size, bool committed, unsigned arena_ind) {
+  if (committed) [[likely]] {
+    memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
+  }
+
+  old_hooks->destroy(extent_hooks, addr, size, committed, arena_ind);
+}
+
+static bool my_commit(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length,
+                      unsigned arena_ind) {
+  auto err = old_hooks->commit(extent_hooks, addr, size, offset, length, arena_ind);
+
+  if (err) {
+    return err;
+  }
+
+  memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(length));
+
+  return false;
+}
+
+static bool my_decommit(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length,
+                        unsigned arena_ind) {
+  MG_ASSERT(old_hooks && old_hooks->decommit);
+  auto err = old_hooks->decommit(extent_hooks, addr, size, offset, length, arena_ind);
+
+  if (err) {
+    return err;
+  }
+
+  memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(length));
+
+  return false;
+}
+
+static bool my_purge_forced(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length,
+                            unsigned arena_ind) {
+  MG_ASSERT(old_hooks && old_hooks->purge_forced);
+  auto err = old_hooks->purge_forced(extent_hooks, addr, size, offset, length, arena_ind);
+
+  if (err) [[unlikely]] {
+    return err;
+  }
+  memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(length));
+
+  return false;
+}
+
+#endif
+
+void SetHooks() {
+#if USE_JEMALLOC
+
+  uint64_t allocated{0};
+  uint64_t sz{sizeof(allocated)};
+
+  sz = sizeof(unsigned);
+  unsigned n_arenas{0};
+  int err = mallctl("opt.narenas", (void *)&n_arenas, &sz, nullptr, 0);
+
+  if (err) {
+    return;
+  }
+
+  if (nullptr != old_hooks) {
+    return;
+  }
+
+  for (int i = 0; i < n_arenas; i++) {
+    std::string func_name = "arena." + std::to_string(i) + ".extent_hooks";
+
+    size_t hooks_len = sizeof(old_hooks);
+
+    int err = mallctl(func_name.c_str(), &old_hooks, &hooks_len, nullptr, 0);
+
+    if (err) {
+      LOG_FATAL("Error getting hooks for jemalloc arena {}", i);
+    }
+
+    // Due to the way jemalloc works, we need first to set their hooks
+    // which will trigger creating arena, then we can set our custom hook wrappers
+
+    err = mallctl(func_name.c_str(), nullptr, nullptr, &old_hooks, sizeof(old_hooks));
+
+    MG_ASSERT(old_hooks);
+    MG_ASSERT(old_hooks->alloc);
+    MG_ASSERT(old_hooks->dalloc);
+    MG_ASSERT(old_hooks->destroy);
+    MG_ASSERT(old_hooks->commit);
+    MG_ASSERT(old_hooks->decommit);
+    MG_ASSERT(old_hooks->purge_forced);
+    MG_ASSERT(old_hooks->purge_lazy);
+    MG_ASSERT(old_hooks->split);
+    MG_ASSERT(old_hooks->merge);
+
+    custom_hooks.purge_lazy = old_hooks->purge_lazy;
+    custom_hooks.split = old_hooks->split;
+    custom_hooks.merge = old_hooks->merge;
+
+    if (err) {
+      LOG_FATAL("Error setting jemalloc hooks for jemalloc arena {}", i);
+    }
+
+    err = mallctl(func_name.c_str(), nullptr, nullptr, &new_hooks, sizeof(new_hooks));
+
+    if (err) {
+      LOG_FATAL("Error setting custom hooks for jemalloc arena {}", i);
+    }
+  }
+
+#endif
+}
+
 void PurgeUnusedMemory() {
 #if USE_JEMALLOC
   mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
@@ -30,4 +205,5 @@ void PurgeUnusedMemory() {
 
 #undef STRINGIFY
 #undef STRINGIFY_HELPER
+
 }  // namespace memgraph::memory
diff --git a/src/memory/memory_control.hpp b/src/memory/memory_control.hpp
index c1258becd..471acf774 100644
--- a/src/memory/memory_control.hpp
+++ b/src/memory/memory_control.hpp
@@ -1,4 +1,4 @@
-// Copyright 2022 Memgraph Ltd.
+// Copyright 2023 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -11,6 +11,11 @@
 
 #pragma once
 
+#include <cstddef>
+#include "utils/logging.hpp"
 namespace memgraph::memory {
+
 void PurgeUnusedMemory();
+void SetHooks();
+
 }  // namespace memgraph::memory
diff --git a/src/memory/new_delete.cpp b/src/memory/new_delete.cpp
index b656790c1..2f982ec67 100644
--- a/src/memory/new_delete.cpp
+++ b/src/memory/new_delete.cpp
@@ -1,4 +1,4 @@
-// Copyright 2022 Memgraph Ltd.
+// Copyright 2023 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -87,21 +87,15 @@ void deleteSized(void *ptr, const std::size_t /*unused*/, const std::align_val_t
 #endif
 
 void TrackMemory(std::size_t size) {
-#if USE_JEMALLOC
-  if (size != 0) [[likely]] {
-    size = nallocx(size, 0);
-  }
-#endif
+#if !USE_JEMALLOC
   memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size));
+#endif
 }
 
 void TrackMemory(std::size_t size, const std::align_val_t align) {
-#if USE_JEMALLOC
-  if (size != 0) [[likely]] {
-    size = nallocx(size, MALLOCX_ALIGN(align));  // NOLINT(hicpp-signed-bitwise)
-  }
-#endif
+#if !USE_JEMALLOC
   memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size));
+#endif
 }
 
 bool TrackMemoryNoExcept(const std::size_t size) {
@@ -126,11 +120,7 @@ bool TrackMemoryNoExcept(const std::size_t size, const std::align_val_t align) {
 
 void UntrackMemory([[maybe_unused]] void *ptr, [[maybe_unused]] std::size_t size = 0) noexcept {
   try {
-#if USE_JEMALLOC
-    if (ptr != nullptr) [[likely]] {
-      memgraph::utils::total_memory_tracker.Free(sallocx(ptr, 0));
-    }
-#else
+#if !USE_JEMALLOC
     if (size) {
       memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
     } else {
@@ -144,11 +134,7 @@ void UntrackMemory([[maybe_unused]] void *ptr, [[maybe_unused]] std::size_t size
 
 void UntrackMemory(void *ptr, const std::align_val_t align, [[maybe_unused]] std::size_t size = 0) noexcept {
   try {
-#if USE_JEMALLOC
-    if (ptr != nullptr) [[likely]] {
-      memgraph::utils::total_memory_tracker.Free(sallocx(ptr, MALLOCX_ALIGN(align)));  // NOLINT(hicpp-signed-bitwise)
-    }
-#else
+#if !USE_JEMALLOC
     if (size) {
       memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
     } else {
diff --git a/tests/e2e/disk_storage/workloads.yaml b/tests/e2e/disk_storage/workloads.yaml
index 32c558935..e2ee51f25 100644
--- a/tests/e2e/disk_storage/workloads.yaml
+++ b/tests/e2e/disk_storage/workloads.yaml
@@ -1,7 +1,7 @@
 disk_storage: &disk_storage
   cluster:
     main:
-      args: ["--bolt-port", "7687", "--log-level", "TRACE", "--memory-limit", "50"]
+      args: ["--bolt-port", "7687", "--log-level", "TRACE", "--memory-limit", "125"]
       log_file: "disk_storage.log"
       setup_queries: []
       validation_queries: []
diff --git a/tests/e2e/memgraph.py b/tests/e2e/memgraph.py
index dff0f1d49..2b80c2f62 100755
--- a/tests/e2e/memgraph.py
+++ b/tests/e2e/memgraph.py
@@ -130,9 +130,14 @@ class MemgraphInstanceRunner:
     def stop(self):
         if not self.is_running():
             return
-        self.proc_mg.terminate()
-        code = self.proc_mg.wait()
-        assert code == 0, "The Memgraph process exited with non-zero!"
+
+        pid = self.proc_mg.pid
+        try:
+            os.kill(pid, 15)  # 15 is the signal number for SIGTERM
+        except os.OSError:
+            assert False
+
+        time.sleep(1)
 
     def kill(self):
         if not self.is_running():
diff --git a/tests/e2e/memory/workloads.yaml b/tests/e2e/memory/workloads.yaml
index 667613114..460d6bc3f 100644
--- a/tests/e2e/memory/workloads.yaml
+++ b/tests/e2e/memory/workloads.yaml
@@ -2,7 +2,7 @@ bolt_port: &bolt_port "7687"
 args: &args
   - "--bolt-port"
   - *bolt_port
-  - "--memory-limit=1000"
+  - "--memory-limit=1024"
   - "--storage-gc-cycle-sec=180"
   - "--log-level=TRACE"
 
diff --git a/tests/stress/common.py b/tests/stress/common.py
index 6ab1be65a..d5235132e 100644
--- a/tests/stress/common.py
+++ b/tests/stress/common.py
@@ -100,6 +100,25 @@ def execute_till_success(session, query, max_retries=1000):
                 raise Exception("Query '%s' failed %d times, aborting" % (query, max_retries))
 
 
+def try_execute(session, query: str):
+    """
+    Executes a query within Bolt session
+
+    Args:
+        session - the bolt session to execute the query with
+        query - str, the query to execute
+
+    :param session: active Bolt session
+    :param query: query to execute
+
+    :return: None
+    """
+    result = session.run(query)
+    data = result.data()
+    summary = result.consume()
+    return data, summary
+
+
 def batch(input, batch_size):
     """Batches the given input (must be iterable).
     Supports input generators. Returns a generator.
diff --git a/tests/stress/continuous_integration b/tests/stress/continuous_integration
index 8744eecae..7d2dbc76d 100755
--- a/tests/stress/continuous_integration
+++ b/tests/stress/continuous_integration
@@ -8,7 +8,7 @@ import subprocess
 import time
 from argparse import Namespace as Args
 from subprocess import Popen
-from typing import Dict, List
+from typing import Dict, List, Optional
 
 from test_config import LARGE_DATASET, SMALL_DATASET, DatabaseMode, DatasetConstants
 
@@ -79,7 +79,7 @@ def wait_for_server(port, delay=0.1) -> None:
     time.sleep(delay)
 
 
-def start_memgraph(args: Args) -> Popen:
+def start_memgraph(args: Args, memgraph_options: List[str]) -> Popen:
     """Starts Memgraph and return the process"""
     cwd = os.path.dirname(args.memgraph)
     cmd = [
@@ -93,7 +93,7 @@ def start_memgraph(args: Args) -> Popen:
         "--storage-recover-on-startup=false",
         "--query-execution-timeout-sec=1200",
         "--bolt-server-name-for-init=Neo4j/",
-    ]
+    ] + memgraph_options
     if not args.verbose:
         cmd += ["--log-level", "WARNING"]
     if args.log_file:
@@ -168,7 +168,7 @@ def _find_test_binary(args: Args, test: str) -> List[str]:
     raise Exception("Test '{}' binary not supported!".format(test))
 
 
-def run_stress_test_suite(args: Args) -> Dict[str, float]:
+def run_stress_test_suite(args: Args) -> Optional[Dict[str, float]]:
     def cleanup(memgraph_proc):
         if memgraph_proc.poll() != None:
             return
@@ -192,16 +192,30 @@ def run_stress_test_suite(args: Args) -> Dict[str, float]:
 
         for mode in test[DatasetConstants.MODE]:
             test_run = copy.deepcopy(test)
-
             # Run for every specified combination of storage mode, serialization mode, etc (if extended)
             test_run[DatasetConstants.MODE] = mode
-
-            memgraph_proc = start_memgraph(args)
-            runtime = run_test(args, **test_run)
-            runtimes[os.path.splitext(test[DatasetConstants.TEST])[0]] = runtime
-
-            stop_memgraph(memgraph_proc)
-            cleanup(memgraph_proc)
+            try:
+                memgraph_options = (
+                    test_run[DatasetConstants.MEMGRAPH_OPTIONS] if DatasetConstants.MEMGRAPH_OPTIONS in test_run else []
+                )
+                if DatasetConstants.MEMGRAPH_OPTIONS in test_run:
+                    del test_run[DatasetConstants.MEMGRAPH_OPTIONS]
+                memgraph_proc = start_memgraph(args, memgraph_options)
+            except Exception as ex:
+                print("Exception occured while starting memgraph", ex)
+                return None
+            err = False
+            try:
+                runtime = run_test(args, **test_run)
+                runtimes[os.path.splitext(test[DatasetConstants.TEST])[0]] = runtime
+            except Exception as ex:
+                print(f"Failed to execute {test_run} with following exception:", ex)
+                err = True
+            finally:
+                stop_memgraph(memgraph_proc)
+                cleanup(memgraph_proc)
+            if err:
+                return None
 
     return runtimes
 
@@ -225,10 +239,12 @@ if __name__ == "__main__":
         generate_temporary_ssl_certs()
 
     runtimes = run_stress_test_suite(args)
-
+    if runtimes is None:
+        print("Some stress tests have failed")
+        exit(1)
+    assert runtimes is not None
     if args.use_ssl:
         remove_certificates()
-
     write_stats(runtimes)
 
     print("Successfully ran stress tests!")
diff --git a/tests/stress/memory_limit.py b/tests/stress/memory_limit.py
new file mode 100644
index 000000000..8c6b699a9
--- /dev/null
+++ b/tests/stress/memory_limit.py
@@ -0,0 +1,257 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+"""
+Stress test for monitoring how memory tracker behaves when
+there is lot of node creation and deletions compared
+to RES memory usage.
+"""
+
+import atexit
+import logging
+import multiprocessing
+import time
+from argparse import Namespace as Args
+from dataclasses import dataclass
+from functools import wraps
+from typing import Any, Callable, Dict, List, Optional, Tuple
+
+from common import (
+    OutputData,
+    SessionCache,
+    connection_argument_parser,
+    execute_till_success,
+    try_execute,
+)
+
+log = logging.getLogger(__name__)
+output_data = OutputData()
+
+
+class Constants:
+    CREATE_FUNCTION = "CREATE"
+
+
+atexit.register(SessionCache.cleanup)
+
+MEMORY_LIMIT = 2048
+
+
+def parse_args() -> Args:
+    """
+    Parses user arguments
+
+    :return: parsed arguments
+    """
+    parser = connection_argument_parser()
+    parser.add_argument("--worker-count", type=int, default=5, help="Number of concurrent workers.")
+    parser.add_argument(
+        "--logging", default="INFO", choices=["INFO", "DEBUG", "WARNING", "ERROR"], help="Logging level"
+    )
+    parser.add_argument("--repetition-count", type=int, default=1000, help="Number of times to perform the action")
+    parser.add_argument("--isolation-level", type=str, required=True, help="Database isolation level.")
+    parser.add_argument("--storage-mode", type=str, required=True, help="Database storage mode.")
+
+    return parser.parse_args()
+
+
+# Global variables
+
+
+args = parse_args()
+
+# Difference between memory RES and memory tracker on
+# Memgraph start.
+# Due to various other things which are included in RES
+# there is difference of ~30MBs initially.
+initial_diff = 0
+
+
+@dataclass
+class Worker:
+    """
+    Class that performs a function defined in the `type` argument.
+
+    Args:
+    type - either `CREATE` or `DELETE`, signifying the function that's going to be performed
+        by the worker
+    id - worker id
+    total_worker_cnt - total number of workers for reference
+    repetition_count - number of times to perform the worker action
+    sleep_sec - float for subsecond sleeping between two subsequent actions
+    """
+
+    type: str
+    id: int
+    total_worker_cnt: int
+    repetition_count: int
+    sleep_sec: float
+
+
+def timed_function(name) -> Callable:
+    """
+    Times performed function
+    """
+
+    def actual_decorator(func) -> Callable:
+        @wraps(func)
+        def timed_wrapper(*args, **kwargs) -> Any:
+            start_time = time.time()
+            result = func(*args, **kwargs)
+            end_time = time.time()
+            output_data.add_measurement(name, end_time - start_time)
+            return result
+
+        return timed_wrapper
+
+    return actual_decorator
+
+
+@timed_function("cleanup_time")
+def clean_database() -> None:
+    session = SessionCache.argument_session(args)
+    execute_till_success(session, "MATCH (n) DETACH DELETE n")
+
+
+def create_indices() -> None:
+    session = SessionCache.argument_session(args)
+    execute_till_success(session, "CREATE INDEX ON :Node")
+
+
+def setup_database_mode() -> None:
+    session = SessionCache.argument_session(args)
+    execute_till_success(session, f"STORAGE MODE {args.storage_mode}")
+    execute_till_success(session, f"SET GLOBAL TRANSACTION ISOLATION LEVEL {args.isolation_level}")
+
+
+def get_tracker_data(session) -> Optional[float]:
+    def parse_data(allocated: str) -> float:
+        num = 0
+        if "KiB" in allocated or "MiB" in allocated or "GiB" in allocated or "TiB" in allocated:
+            num = float(allocated[:-3])
+        else:
+            num = float(allocated[-1])
+
+        if "KiB" in allocated:
+            return num / 1024
+        if "MiB" in allocated:
+            return num
+        if "GiB" in allocated:
+            return num * 1024
+        else:
+            return num * 1024 * 1024
+
+    def isolate_value(data: List[Dict[str, Any]], key: str) -> Optional[str]:
+        for dict in data:
+            if dict["storage info"] == key:
+                return dict["value"]
+        return None
+
+    try:
+        data, _ = try_execute(session, f"SHOW STORAGE INFO")
+        memory_tracker_data = isolate_value(data, "memory_allocated")
+
+        return parse_data(memory_tracker_data)
+
+    except Exception as ex:
+        log.info(f"Get storage info failed with error", ex)
+        return None
+
+
+def run_writer(repetition_count: int, sleep_sec: float, worker_id: int) -> int:
+    """
+    This writer creates lot of nodes on each write.
+    Also it checks that query failed if memory limit is tried to be broken
+    """
+
+    session = SessionCache.argument_session(args)
+
+    def create() -> bool:
+        """
+        Returns True if done, False if needs to continue
+        """
+        memory_tracker_data_before_start = get_tracker_data(session)
+        should_fail = memory_tracker_data_before_start >= 2048
+        failed = False
+        try:
+            try_execute(
+                session,
+                f"FOREACH (i in range(1,10000) | CREATE (:Node {{prop:'big string or something like that'}}))",
+            )
+        except Exception as ex:
+            failed = True
+            output = str(ex)
+            log.info("Exception in create", output)
+            assert "Memory limit exceeded!" in output
+
+        if should_fail:
+            assert failed, "Query should have failed"
+            return False
+        return True
+
+    curr_repetition = 0
+
+    while curr_repetition < repetition_count:
+        log.info(f"Worker {worker_id} started iteration {curr_repetition}")
+
+        should_continue = create()
+
+        if not should_continue:
+            return True
+
+        time.sleep(sleep_sec)
+        log.info(f"Worker {worker_id} created chain in iteration {curr_repetition}")
+
+        curr_repetition += 1
+
+
+def execute_function(worker: Worker) -> Worker:
+    """
+    Executes the function based on the worker type
+    """
+    if worker.type == Constants.CREATE_FUNCTION:
+        run_writer(worker.repetition_count, worker.sleep_sec, worker.id)
+        log.info(f"Worker {worker.type} finished!")
+        return worker
+
+    raise Exception("Worker function not recognized, raising exception!")
+
+
+@timed_function("total_execution_time")
+def execution_handler() -> None:
+    clean_database()
+    log.info("Database is clean.")
+
+    setup_database_mode()
+
+    create_indices()
+
+    worker_count = args.worker_count
+    rep_count = args.repetition_count
+
+    workers = []
+    for i in range(worker_count):
+        workers.append(Worker(Constants.CREATE_FUNCTION, i, worker_count, rep_count, 0.1))
+
+    with multiprocessing.Pool(processes=worker_count) as p:
+        for worker in p.map(execute_function, workers):
+            log.info(f"Worker {worker.type} finished!")
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=args.logging)
+
+    execution_handler()
+    if args.logging in ["DEBUG", "INFO"]:
+        output_data.dump()
diff --git a/tests/stress/memory_tracker.py b/tests/stress/memory_tracker.py
new file mode 100644
index 000000000..a9ca26b06
--- /dev/null
+++ b/tests/stress/memory_tracker.py
@@ -0,0 +1,359 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+"""
+Stress test for monitoring how memory tracker behaves when
+there is lot of node creation and deletions compared
+to RES memory usage.
+"""
+
+import atexit
+import logging
+import multiprocessing
+import time
+from argparse import Namespace as Args
+from dataclasses import dataclass
+from functools import wraps
+from typing import Any, Callable, Dict, List, Optional, Tuple
+
+from common import (
+    OutputData,
+    SessionCache,
+    connection_argument_parser,
+    execute_till_success,
+    try_execute,
+)
+
+log = logging.getLogger(__name__)
+output_data = OutputData()
+
+
+class Constants:
+    CREATE_FUNCTION = "CREATE"
+    DELETE_FUNCTION = "DELETE"
+    MONITOR_CLEANUP_FUNCTION = "MONITOR_CLEANUP"
+
+
+atexit.register(SessionCache.cleanup)
+
+
+def parse_args() -> Args:
+    """
+    Parses user arguments
+
+    :return: parsed arguments
+    """
+    parser = connection_argument_parser()
+    parser.add_argument("--worker-count", type=int, default=5, help="Number of concurrent workers.")
+    parser.add_argument(
+        "--logging", default="INFO", choices=["INFO", "DEBUG", "WARNING", "ERROR"], help="Logging level"
+    )
+    parser.add_argument("--repetition-count", type=int, default=1000, help="Number of times to perform the action")
+    parser.add_argument("--isolation-level", type=str, required=True, help="Database isolation level.")
+    parser.add_argument("--storage-mode", type=str, required=True, help="Database storage mode.")
+
+    return parser.parse_args()
+
+
+# Global variables
+
+
+args = parse_args()
+
+# Difference between memory RES and memory tracker on
+# Memgraph start.
+# Due to various other things which are included in RES
+# there is difference of ~30MBs initially.
+initial_diff = 0
+
+
+@dataclass
+class Worker:
+    """
+    Class that performs a function defined in the `type` argument.
+
+    Args:
+    type - either `CREATE` or `DELETE`, signifying the function that's going to be performed
+        by the worker
+    id - worker id
+    total_worker_cnt - total number of workers for reference
+    repetition_count - number of times to perform the worker action
+    sleep_sec - float for subsecond sleeping between two subsequent actions
+    """
+
+    type: str
+    id: int
+    total_worker_cnt: int
+    repetition_count: int
+    sleep_sec: float
+
+
+def timed_function(name) -> Callable:
+    """
+    Times performed function
+    """
+
+    def actual_decorator(func) -> Callable:
+        @wraps(func)
+        def timed_wrapper(*args, **kwargs) -> Any:
+            start_time = time.time()
+            result = func(*args, **kwargs)
+            end_time = time.time()
+            output_data.add_measurement(name, end_time - start_time)
+            return result
+
+        return timed_wrapper
+
+    return actual_decorator
+
+
+@timed_function("cleanup_time")
+def clean_database() -> None:
+    session = SessionCache.argument_session(args)
+    execute_till_success(session, "MATCH (n) DETACH DELETE n")
+
+
+def create_indices() -> None:
+    session = SessionCache.argument_session(args)
+    execute_till_success(session, "CREATE INDEX ON :Node")
+
+
+def setup_database_mode() -> None:
+    session = SessionCache.argument_session(args)
+    execute_till_success(session, f"STORAGE MODE {args.storage_mode}")
+    execute_till_success(session, f"SET GLOBAL TRANSACTION ISOLATION LEVEL {args.isolation_level}")
+
+
+def run_writer(repetition_count: int, sleep_sec: float, worker_id: int) -> int:
+    """
+    This writer creates lot of nodes on each write.
+    """
+    session = SessionCache.argument_session(args)
+
+    def create() -> bool:
+        exception_occured = False
+
+        memory_allocated_before, _ = get_storage_data(session)
+        count_before = execute_till_success(session, f"MATCH (n) RETURN COUNT(n) AS cnt")[0][0]["cnt"]
+        try:
+            try_execute(
+                session,
+                f"FOREACH (i in range(1,10000) | CREATE (:Node {{prop:'big string or something like that'}}))",
+            )
+        except Exception as ex:
+            log.info(f"Exception occured during create: {ex}")
+            exception_occured = True
+
+        memory_allocated_after, _ = get_storage_data(session)
+        count_after = execute_till_success(session, f"MATCH (n) RETURN COUNT(n) AS cnt")[0][0]["cnt"]
+        if exception_occured:
+            log.info(
+                f"Exception occured, stopping exection of run {Constants.CREATE_FUNCTION} worker."
+                f"Memory stats: before query: {memory_allocated_before}, after query: {memory_allocated_after}."
+                f"Node stats: before query {count_before}, after query {count_after}"
+            )
+            return False
+
+        return True
+
+    curr_repetition = 0
+
+    while curr_repetition < repetition_count:
+        log.info(f"Worker {worker_id} started iteration {curr_repetition}")
+
+        success = create()
+
+        assert success, "Create was not successfull"
+
+        time.sleep(sleep_sec)
+        log.info(f"Worker {worker_id} created chain in iteration {curr_repetition}")
+
+        curr_repetition += 1
+
+
+def run_deleter(repetition_count: int, sleep_sec: float) -> None:
+    """
+    Deleting of whole graph
+    """
+    session = SessionCache.argument_session(args)
+
+    def delete_graph() -> None:
+        try:
+            execute_till_success(session, f"MATCH (n) DETACH DELETE n")
+            log.info(f"Worker deleted all nodes")
+        except Exception as ex:
+            log.info(f"Worker failed to delete")
+            pass
+
+    curr_repetition = 0
+    while curr_repetition < repetition_count:
+        delete_graph()
+        time.sleep(sleep_sec)
+        curr_repetition += 1
+
+
+def get_storage_data(session) -> Tuple[float, float]:
+    def parse_data(allocated: str) -> float:
+        num = 0
+        if "KiB" in allocated or "MiB" in allocated or "GiB" in allocated or "TiB" in allocated:
+            num = float(allocated[:-3])
+        else:
+            num = float(allocated[-1])
+
+        if "KiB" in allocated:
+            return num / 1024
+        if "MiB" in allocated:
+            return num
+        if "GiB" in allocated:
+            return num * 1024
+        else:
+            return num * 1024 * 1024
+
+    def isolate_value(data: List[Dict[str, Any]], key: str) -> Optional[str]:
+        for dict in data:
+            if dict["storage info"] == key:
+                return dict["value"]
+        return None
+
+    try:
+        data = execute_till_success(session, f"SHOW STORAGE INFO")[0]
+        res_data = isolate_value(data, "memory_usage")
+        memory_tracker_data = isolate_value(data, "memory_allocated")
+        log.info(
+            f"Worker {Constants.MONITOR_CLEANUP_FUNCTION} logged memory: memory tracker {memory_tracker_data} vs res data {res_data}"
+        )
+
+        return parse_data(memory_tracker_data), parse_data(res_data)
+
+    except Exception as ex:
+        log.info(f"Get storage info failed with error", ex)
+        raise Exception(f"Worker {Constants.MONITOR_CLEANUP_FUNCTION} can't continue working")
+
+
+def run_monitor_cleanup(repetition_count: int, sleep_sec: float) -> None:
+    """
+    Monitoring of graph and periodic cleanup.
+    Idea is that cleanup is in this function
+    so that we can make thread sleep for a while
+    and give RES vs memory tracker time to stabilize
+    to reduce flakeyness of test
+    """
+    session = SessionCache.argument_session(args)
+
+    curr_repetition = 0
+    while curr_repetition < repetition_count:
+        memory_tracker, res_data = get_storage_data(session)
+
+        if memory_tracker < res_data and ((memory_tracker + initial_diff) < res_data):
+            # maybe RES got measured wrongly
+            # Problem with test using detach delete and memory tracker
+            # is that memory tracker gets updated immediately
+            # whereas RES takes some time
+            cnt_again = 3
+            skip_failure = False
+            # 10% is maximum increment, afterwards is fail
+            multiplier = 1
+            while cnt_again:
+                new_memory_tracker, new_res_data = get_storage_data(session)
+
+                if new_memory_tracker > new_res_data or (
+                    (new_memory_tracker + initial_diff) * multiplier > new_res_data
+                ):
+                    skip_failure = True
+                    log.info(
+                        f"Skipping failure on new data:"
+                        f"memory tracker: {new_memory_tracker}, initial diff: {initial_diff},"
+                        f"RES data: {new_res_data}, multiplier: {multiplier}"
+                    )
+                    break
+                multiplier += 0.05
+                cnt_again -= 1
+            if not skip_failure:
+                log.info(memory_tracker, initial_diff, res_data)
+                assert False, "Memory tracker is off by more than 10%, check logs for details"
+
+        def run_cleanup():
+            try:
+                execute_till_success(session, f"FREE MEMORY")
+                log.info(f"Worker deleted all nodes")
+            except Exception as ex:
+                log.info(f"Worker failed to delete")
+            pass
+
+        # idea is to run cleanup from this thread and let thread sleep for a while so RES
+        # gets stabilized
+        if curr_repetition % 10 == 0:
+            run_cleanup()
+
+        time.sleep(sleep_sec)
+        curr_repetition += 1
+
+
+def execute_function(worker: Worker) -> Worker:
+    """
+    Executes the function based on the worker type
+    """
+    if worker.type == Constants.CREATE_FUNCTION:
+        run_writer(worker.repetition_count, worker.sleep_sec, worker.id)
+        log.info(f"Worker {worker.type} finished!")
+        return worker
+
+    elif worker.type == Constants.DELETE_FUNCTION:
+        run_deleter(worker.repetition_count, worker.sleep_sec)
+        log.info(f"Worker {worker.type} finished!")
+        return worker
+
+    elif worker.type == Constants.MONITOR_CLEANUP_FUNCTION:
+        run_monitor_cleanup(worker.repetition_count, worker.sleep_sec)
+        log.info(f"Worker {worker.type} finished!")
+        return worker
+
+    raise Exception("Worker function not recognized, raising exception!")
+
+
+@timed_function("total_execution_time")
+def execution_handler() -> None:
+    clean_database()
+    log.info("Database is clean.")
+
+    setup_database_mode()
+
+    create_indices()
+
+    worker_count = args.worker_count
+    rep_count = args.repetition_count
+
+    workers = []
+    for i in range(worker_count - 2):
+        workers.append(Worker(Constants.CREATE_FUNCTION, i, worker_count - 2, rep_count, 0.1))
+    workers.append(Worker(Constants.DELETE_FUNCTION, -1, 1, rep_count * 1.5, 0.1))
+    workers.append(Worker(Constants.MONITOR_CLEANUP_FUNCTION, -1, 1, rep_count * 1.5, 0.1))
+
+    with multiprocessing.Pool(processes=worker_count) as p:
+        for worker in p.map(execute_function, workers):
+            log.info(f"Worker {worker.type} finished!")
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=args.logging)
+
+    session = SessionCache.argument_session(args)
+    memory_tracker_size, memory_res = get_storage_data(session)
+
+    # global var used in monitoring
+    initial_diff = memory_res - memory_tracker_size
+
+    execution_handler()
+    if args.logging in ["DEBUG", "INFO"]:
+        output_data.dump()
diff --git a/tests/stress/test_config.py b/tests/stress/test_config.py
index f740a150e..2228be249 100644
--- a/tests/stress/test_config.py
+++ b/tests/stress/test_config.py
@@ -12,6 +12,7 @@ class DatasetConstants:
     OPTIONS = "options"
     TIMEOUT = "timeout"
     MODE = "mode"
+    MEMGRAPH_OPTIONS = "memgraph_options"
 
 
 @dataclass
@@ -68,6 +69,20 @@ SMALL_DATASET = [
         DatasetConstants.TIMEOUT: 5,
         DatasetConstants.MODE: [get_default_database_mode()],
     },
+    {
+        DatasetConstants.TEST: "memory_tracker.py",
+        DatasetConstants.OPTIONS: ["--worker-count", "5", "--repetition-count", "100"],
+        DatasetConstants.TIMEOUT: 5,
+        DatasetConstants.MODE: [get_default_database_mode()],
+        DatasetConstants.MEMGRAPH_OPTIONS: ["--memory-limit=2048"],
+    },
+    {
+        DatasetConstants.TEST: "memory_limit.py",
+        DatasetConstants.OPTIONS: ["--worker-count", "5", "--repetition-count", "100"],
+        DatasetConstants.TIMEOUT: 5,
+        DatasetConstants.MODE: [get_default_database_mode()],
+        DatasetConstants.MEMGRAPH_OPTIONS: ["--memory-limit=2048"],
+    },
     {
         DatasetConstants.TEST: "create_match.py",
         DatasetConstants.OPTIONS: ["--vertex-count", "40000", "--create-pack-size", "100"],