diff --git a/cmake/FindJemalloc.cmake b/cmake/FindJemalloc.cmake
deleted file mode 100644
index 1f336fd10..000000000
--- a/cmake/FindJemalloc.cmake
+++ /dev/null
@@ -1,55 +0,0 @@
-# Try to find jemalloc library
-#
-# Use this module as:
-#   find_package(Jemalloc)
-#
-# or:
-#   find_package(Jemalloc REQUIRED)
-#
-# This will define the following variables:
-#
-#   Jemalloc_FOUND        True if the system has the jemalloc library.
-#   Jemalloc_INCLUDE_DIRS Include directories needed to use jemalloc.
-#   Jemalloc_LIBRARIES    Libraries needed to link to jemalloc.
-#
-# The following cache variables may also be set:
-#
-#   Jemalloc_INCLUDE_DIR  The directory containing jemalloc/jemalloc.h.
-#   Jemalloc_LIBRARY      The path to the jemalloc static library.
-
-find_path(Jemalloc_INCLUDE_DIR NAMES jemalloc/jemalloc.h PATH_SUFFIXES include)
-
-find_library(Jemalloc_LIBRARY NAMES libjemalloc.a PATH_SUFFIXES lib)
-
-include(FindPackageHandleStandardArgs)
-find_package_handle_standard_args(Jemalloc
-  FOUND_VAR Jemalloc_FOUND
-  REQUIRED_VARS
-    Jemalloc_LIBRARY
-    Jemalloc_INCLUDE_DIR
-)
-
-if(Jemalloc_FOUND)
-  set(Jemalloc_LIBRARIES ${Jemalloc_LIBRARY})
-  set(Jemalloc_INCLUDE_DIRS ${Jemalloc_INCLUDE_DIR})
-else()
-  if(Jemalloc_FIND_REQUIRED)
-    message(FATAL_ERROR "Cannot find jemalloc!")
-  else()
-    message(WARNING "jemalloc is not found!")
-  endif()
-endif()
-
-if(Jemalloc_FOUND AND NOT TARGET Jemalloc::Jemalloc)
-  add_library(Jemalloc::Jemalloc UNKNOWN IMPORTED)
-  set_target_properties(Jemalloc::Jemalloc
-    PROPERTIES
-      IMPORTED_LOCATION "${Jemalloc_LIBRARY}"
-      INTERFACE_INCLUDE_DIRECTORIES "${Jemalloc_INCLUDE_DIR}"
-  )
-endif()
-
-mark_as_advanced(
-  Jemalloc_INCLUDE_DIR
-  Jemalloc_LIBRARY
-)
\ No newline at end of file
diff --git a/cmake/Findjemalloc.cmake b/cmake/Findjemalloc.cmake
new file mode 100644
index 000000000..1e24f0f30
--- /dev/null
+++ b/cmake/Findjemalloc.cmake
@@ -0,0 +1,67 @@
+# Try to find the jemalloc library.
+#
+# Use this module as:
+#   find_package(jemalloc)
+#
+# or:
+#   find_package(jemalloc REQUIRED)
+#
+# This will define the following variables:
+#
+#   JEMALLOC_FOUND        True if the system has the jemalloc library.
+#   Jemalloc_INCLUDE_DIRS Include directories needed to use jemalloc.
+#   Jemalloc_LIBRARIES    Libraries needed to link to jemalloc.
+#
+# This module does not search the system; it expects the following cache
+# variables to be set in advance (see set_path_external_library in
+# libs/CMakeLists.txt):
+#
+#   JEMALLOC_INCLUDE_DIR  The directory containing jemalloc/jemalloc.h.
+#   JEMALLOC_LIBRARY      The path to the jemalloc static library.
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(jemalloc
+  FOUND_VAR JEMALLOC_FOUND
+  REQUIRED_VARS
+    JEMALLOC_LIBRARY
+    JEMALLOC_INCLUDE_DIR
+)
+
+if(JEMALLOC_INCLUDE_DIR)
+  message(STATUS "Found jemalloc include dir: ${JEMALLOC_INCLUDE_DIR}")
+else()
+  message(WARNING "jemalloc include dir not found!")
+endif()
+
+if(JEMALLOC_LIBRARY)
+  message(STATUS "Found jemalloc library: ${JEMALLOC_LIBRARY}")
+else()
+  message(WARNING "jemalloc library not found!")
+endif()
+
+if(JEMALLOC_FOUND)
+  set(Jemalloc_LIBRARIES ${JEMALLOC_LIBRARY})
+  set(Jemalloc_INCLUDE_DIRS ${JEMALLOC_INCLUDE_DIR})
+else()
+  if(jemalloc_FIND_REQUIRED)
+    message(FATAL_ERROR "Cannot find jemalloc!")
+  else()
+    message(WARNING "jemalloc is not found!")
+  endif()
+endif()
+
+if(JEMALLOC_FOUND AND NOT TARGET Jemalloc::Jemalloc)
+  add_library(Jemalloc::Jemalloc UNKNOWN IMPORTED)
+  set_target_properties(Jemalloc::Jemalloc
+    PROPERTIES
+      IMPORTED_LOCATION "${JEMALLOC_LIBRARY}"
+      INTERFACE_INCLUDE_DIRECTORIES "${JEMALLOC_INCLUDE_DIR}"
+  )
+endif()
+
+mark_as_advanced(
+  JEMALLOC_INCLUDE_DIR
+  JEMALLOC_LIBRARY
+)
diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt
index 17f013eff..411b180ab 100644
--- a/libs/CMakeLists.txt
+++ b/libs/CMakeLists.txt
@@ -15,7 +15,6 @@ set(GFLAGS_NOTHREADS OFF)
 # NOTE: config/generate.py depends on the gflags help XML format.
 find_package(gflags REQUIRED)
 find_package(fmt 8.0.1)
-find_package(Jemalloc REQUIRED)
 find_package(ZLIB 1.2.11 REQUIRED)
 
 set(LIB_DIR ${CMAKE_CURRENT_SOURCE_DIR})
@@ -99,6 +98,17 @@ macro(import_external_library name type library_location include_dir)
   import_library(${name} ${type} ${${_upper_name}_LIBRARY} ${${_upper_name}_INCLUDE_DIR})
 endmacro(import_external_library)
 
+# Sets the <NAME>_LIBRARY and <NAME>_INCLUDE_DIR cache variables that the
+# matching find module expects, without creating an imported target.
+macro(set_path_external_library name type library_location include_dir)
+  string(TOUPPER ${name} _upper_name)
+  set(${_upper_name}_LIBRARY ${library_location} CACHE FILEPATH
+      "Path to ${name} library" FORCE)
+  set(${_upper_name}_INCLUDE_DIR ${include_dir} CACHE FILEPATH
+      "Path to ${name} include directory" FORCE)
+  mark_as_advanced(${_upper_name}_LIBRARY ${_upper_name}_INCLUDE_DIR)
+endmacro(set_path_external_library)
+
 # setup antlr
 import_external_library(antlr4 STATIC
   ${CMAKE_CURRENT_SOURCE_DIR}/antlr4/runtime/Cpp/lib/libantlr4-runtime.a
@@ -265,3 +275,8 @@ import_header_library(ctre ${CMAKE_CURRENT_SOURCE_DIR})
 # setup absl (cmake sub_directory tolerant)
 set(ABSL_PROPAGATE_CXX_STD ON)
 add_subdirectory(absl EXCLUDE_FROM_ALL)
+
+# setup jemalloc
+set_path_external_library(jemalloc STATIC
+  ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc/lib/libjemalloc.a
+  ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc/include/)
diff --git a/libs/setup.sh b/libs/setup.sh
index d0fb41b0f..638c1b9f2 100755
--- a/libs/setup.sh
+++ b/libs/setup.sh
@@ -124,6 +124,7 @@ declare -A primary_urls=(
   ["librdtsc"]="http://$local_cache_host/git/librdtsc.git"
   ["ctre"]="http://$local_cache_host/file/hanickadot/compile-time-regular-expressions/v3.7.2/single-header/ctre.hpp"
   ["absl"]="https://$local_cache_host/git/abseil-cpp.git"
+  ["jemalloc"]="https://$local_cache_host/git/jemalloc.git"
 )
 
 # The goal of secondary urls is to have links to the "source of truth" of
@@ -151,6 +152,7 @@ declare -A secondary_urls=(
   ["librdtsc"]="https://github.com/gabrieleara/librdtsc.git"
   ["ctre"]="https://raw.githubusercontent.com/hanickadot/compile-time-regular-expressions/v3.7.2/single-header/ctre.hpp"
   ["absl"]="https://github.com/abseil/abseil-cpp.git"
+  ["jemalloc"]="https://github.com/jemalloc/jemalloc.git"
 )
 
 # antlr
@@ -252,3 +254,21 @@ cd ..
 # abseil 20230125.3
 absl_ref="20230125.3"
 repo_clone_try_double "${primary_urls[absl]}" "${secondary_urls[absl]}" "absl" "$absl_ref"
+
+# jemalloc 5.2.1
+JEMALLOC_COMMIT_VERSION="5.2.1"
+repo_clone_try_double "${primary_urls[jemalloc]}" "${secondary_urls[jemalloc]}" "jemalloc" "$JEMALLOC_COMMIT_VERSION"
+
+# Build jemalloc in place; libs/CMakeLists.txt points the JEMALLOC_LIBRARY and
+# JEMALLOC_INCLUDE_DIR cache variables (consumed by Findjemalloc.cmake) at the
+# artifacts produced here.
+pushd jemalloc
+
+./autogen.sh
+MALLOC_CONF="retain:false,percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000" \
+./configure \
+  --disable-cxx \
+  --enable-shared=no --prefix=$working_dir \
+  --with-malloc-conf="retain:false,percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000"
+
+make -j$CPUS install
+popd
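Note: the --with-malloc-conf flag bakes these options into the library as compile-time defaults. As a quick sanity check, the baked-in values can be read back at runtime through jemalloc's mallctl interface; a minimal sketch (not part of this change), assuming jemalloc 5.x and linking against the libjemalloc.a built above:

    // Sketch: read back options baked in via --with-malloc-conf.
    #include <jemalloc/jemalloc.h>
    #include <sys/types.h>

    #include <cstdio>

    int main() {
      bool retain = false;
      ssize_t dirty_decay_ms = 0;
      const char *percpu_arena = nullptr;
      size_t sz = sizeof(retain);
      mallctl("opt.retain", &retain, &sz, nullptr, 0);
      sz = sizeof(dirty_decay_ms);
      mallctl("opt.dirty_decay_ms", &dirty_decay_ms, &sz, nullptr, 0);
      sz = sizeof(percpu_arena);
      mallctl("opt.percpu_arena", &percpu_arena, &sz, nullptr, 0);
      std::printf("retain:%d dirty_decay_ms:%zd percpu_arena:%s\n",
                  static_cast<int>(retain), dirty_decay_ms, percpu_arena);
      return 0;
    }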
#include "memory_control.hpp" +#include "utils/logging.hpp" +#include "utils/memory_tracker.hpp" #if USE_JEMALLOC #include @@ -22,6 +24,179 @@ namespace memgraph::memory { // NOLINTNEXTLINE(cppcoreguidelines-macro-usage) #define STRINGIFY(x) STRINGIFY_HELPER(x) +#if USE_JEMALLOC + +static void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t alignment, bool *zero, + bool *commit, unsigned arena_ind); +static bool my_dalloc(extent_hooks_t *extent_hooks, void *addr, size_t size, bool committed, unsigned arena_ind); +static void my_destroy(extent_hooks_t *extent_hooks, void *addr, size_t size, bool committed, unsigned arena_ind); +static bool my_commit(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length, + unsigned arena_ind); +static bool my_decommit(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length, + unsigned arena_ind); +static bool my_purge_forced(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length, + unsigned arena_ind); +extent_hooks_t *old_hooks = nullptr; + +static extent_hooks_t custom_hooks = { + .alloc = &my_alloc, + .dalloc = &my_dalloc, + .destroy = &my_destroy, + .commit = &my_commit, + .decommit = &my_decommit, + .purge_lazy = nullptr, + .purge_forced = &my_purge_forced, + .split = nullptr, + .merge = nullptr, +}; + +static const extent_hooks_t *new_hooks = &custom_hooks; + +void *my_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size, size_t alignment, bool *zero, bool *commit, + unsigned arena_ind) { + // This needs to be before, to throw exception in case of too big alloc + if (*commit) [[likely]] { + memgraph::utils::total_memory_tracker.Alloc(static_cast(size)); + } + + auto *ptr = old_hooks->alloc(extent_hooks, new_addr, size, alignment, zero, commit, arena_ind); + if (ptr == nullptr) [[unlikely]] { + if (*commit) { + memgraph::utils::total_memory_tracker.Free(static_cast(size)); + } + return ptr; + } + + return ptr; +} + +static bool my_dalloc(extent_hooks_t *extent_hooks, void *addr, size_t size, bool committed, unsigned arena_ind) { + auto err = old_hooks->dalloc(extent_hooks, addr, size, committed, arena_ind); + + if (err) [[unlikely]] { + return err; + } + + if (committed) [[likely]] { + memgraph::utils::total_memory_tracker.Free(static_cast(size)); + } + + return false; +} + +static void my_destroy(extent_hooks_t *extent_hooks, void *addr, size_t size, bool committed, unsigned arena_ind) { + if (committed) [[likely]] { + memgraph::utils::total_memory_tracker.Free(static_cast(size)); + } + + old_hooks->destroy(extent_hooks, addr, size, committed, arena_ind); +} + +static bool my_commit(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length, + unsigned arena_ind) { + auto err = old_hooks->commit(extent_hooks, addr, size, offset, length, arena_ind); + + if (err) { + return err; + } + + memgraph::utils::total_memory_tracker.Alloc(static_cast(length)); + + return false; +} + +static bool my_decommit(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length, + unsigned arena_ind) { + MG_ASSERT(old_hooks && old_hooks->decommit); + auto err = old_hooks->decommit(extent_hooks, addr, size, offset, length, arena_ind); + + if (err) { + return err; + } + + memgraph::utils::total_memory_tracker.Free(static_cast(length)); + + return false; +} + +static bool my_purge_forced(extent_hooks_t *extent_hooks, void *addr, size_t size, size_t offset, size_t length, + unsigned arena_ind) { + 
+  MG_ASSERT(old_hooks && old_hooks->purge_forced);
+  auto err = old_hooks->purge_forced(extent_hooks, addr, size, offset, length, arena_ind);
+
+  if (err) [[unlikely]] {
+    return err;
+  }
+
+  memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(length));
+
+  return false;
+}
+
+#endif
+
+void SetHooks() {
+#if USE_JEMALLOC
+  if (old_hooks != nullptr) {
+    return;
+  }
+
+  unsigned n_arenas{0};
+  size_t sz = sizeof(n_arenas);
+  int err = mallctl("opt.narenas", &n_arenas, &sz, nullptr, 0);
+
+  if (err) {
+    return;
+  }
+
+  for (unsigned i = 0; i < n_arenas; i++) {
+    std::string func_name = "arena." + std::to_string(i) + ".extent_hooks";
+
+    size_t hooks_len = sizeof(old_hooks);
+    err = mallctl(func_name.c_str(), &old_hooks, &hooks_len, nullptr, 0);
+
+    if (err) {
+      LOG_FATAL("Error getting hooks for jemalloc arena {}", i);
+    }
+
+    // Due to the way jemalloc works, we first need to set its own default
+    // hooks, which triggers creation of the arena; only then can we install
+    // our custom hook wrappers that forward to those defaults.
+    err = mallctl(func_name.c_str(), nullptr, nullptr, &old_hooks, sizeof(old_hooks));
+
+    MG_ASSERT(old_hooks);
+    MG_ASSERT(old_hooks->alloc);
+    MG_ASSERT(old_hooks->dalloc);
+    MG_ASSERT(old_hooks->destroy);
+    MG_ASSERT(old_hooks->commit);
+    MG_ASSERT(old_hooks->decommit);
+    MG_ASSERT(old_hooks->purge_forced);
+    MG_ASSERT(old_hooks->purge_lazy);
+    MG_ASSERT(old_hooks->split);
+    MG_ASSERT(old_hooks->merge);
+
+    // Hooks we don't wrap are forwarded directly to the defaults.
+    custom_hooks.purge_lazy = old_hooks->purge_lazy;
+    custom_hooks.split = old_hooks->split;
+    custom_hooks.merge = old_hooks->merge;
+
+    if (err) {
+      LOG_FATAL("Error setting jemalloc hooks for jemalloc arena {}", i);
+    }
+
+    err = mallctl(func_name.c_str(), nullptr, nullptr, &new_hooks, sizeof(new_hooks));
+
+    if (err) {
+      LOG_FATAL("Error setting custom hooks for jemalloc arena {}", i);
+    }
+  }
+#endif
+}
+
 void PurgeUnusedMemory() {
 #if USE_JEMALLOC
   mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
@@ -30,4 +205,5 @@ void PurgeUnusedMemory() {
 
 #undef STRINGIFY
 #undef STRINGIFY_HELPER
+
 }  // namespace memgraph::memory
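Note: with the wrappers installed, every extent jemalloc commits or decommits flows through total_memory_tracker, so the tracker can be cross-checked against jemalloc's own counters. A minimal sketch of such a check (not part of this change), assuming jemalloc 5.x built with statistics enabled (the default); jemalloc only refreshes its stats.* values when the "epoch" counter is bumped:

    // Sketch: refresh and read jemalloc's global counters, which should move
    // in step with what the extent hooks report to the tracker.
    #include <jemalloc/jemalloc.h>

    #include <cstdint>
    #include <cstdio>

    int main() {
      uint64_t epoch = 1;
      size_t sz = sizeof(epoch);
      mallctl("epoch", &epoch, &sz, &epoch, sizeof(epoch));  // flush cached stats

      size_t allocated = 0;
      size_t resident = 0;
      sz = sizeof(allocated);
      mallctl("stats.allocated", &allocated, &sz, nullptr, 0);
      sz = sizeof(resident);
      mallctl("stats.resident", &resident, &sz, nullptr, 0);
      std::printf("allocated: %zu B, resident: %zu B\n", allocated, resident);
      return 0;
    }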
diff --git a/src/memory/memory_control.hpp b/src/memory/memory_control.hpp
index c1258becd..471acf774 100644
--- a/src/memory/memory_control.hpp
+++ b/src/memory/memory_control.hpp
@@ -1,4 +1,4 @@
-// Copyright 2022 Memgraph Ltd.
+// Copyright 2023 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -11,6 +11,11 @@
 
 #pragma once
 
+#include
+#include "utils/logging.hpp"
+
 namespace memgraph::memory {
 
 void PurgeUnusedMemory();
+void SetHooks();
 
 }  // namespace memgraph::memory
diff --git a/src/memory/new_delete.cpp b/src/memory/new_delete.cpp
index b656790c1..2f982ec67 100644
--- a/src/memory/new_delete.cpp
+++ b/src/memory/new_delete.cpp
@@ -1,4 +1,4 @@
-// Copyright 2022 Memgraph Ltd.
+// Copyright 2023 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -87,21 +87,15 @@ void deleteSized(void *ptr, const std::size_t /*unused*/, const std::align_val_t
 #endif
 
 void TrackMemory(std::size_t size) {
-#if USE_JEMALLOC
-  if (size != 0) [[likely]] {
-    size = nallocx(size, 0);
-  }
-#endif
+#if !USE_JEMALLOC
   memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size));
+#endif
 }
 
 void TrackMemory(std::size_t size, const std::align_val_t align) {
-#if USE_JEMALLOC
-  if (size != 0) [[likely]] {
-    size = nallocx(size, MALLOCX_ALIGN(align));  // NOLINT(hicpp-signed-bitwise)
-  }
-#endif
+#if !USE_JEMALLOC
   memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(size));
+#endif
 }
 
 bool TrackMemoryNoExcept(const std::size_t size) {
@@ -126,11 +120,7 @@ bool TrackMemoryNoExcept(const std::size_t size, const std::align_val_t align) {
 
 void UntrackMemory([[maybe_unused]] void *ptr, [[maybe_unused]] std::size_t size = 0) noexcept {
   try {
-#if USE_JEMALLOC
-    if (ptr != nullptr) [[likely]] {
-      memgraph::utils::total_memory_tracker.Free(sallocx(ptr, 0));
-    }
-#else
+#if !USE_JEMALLOC
     if (size) {
       memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
     } else {
@@ -144,11 +134,7 @@ void UntrackMemory([[maybe_unused]] void *ptr, [[maybe_unused]] std::size_t size
 
 void UntrackMemory(void *ptr, const std::align_val_t align, [[maybe_unused]] std::size_t size = 0) noexcept {
   try {
-#if USE_JEMALLOC
-    if (ptr != nullptr) [[likely]] {
-      memgraph::utils::total_memory_tracker.Free(sallocx(ptr, MALLOCX_ALIGN(align)));  // NOLINT(hicpp-signed-bitwise)
-    }
-#else
+#if !USE_JEMALLOC
     if (size) {
       memgraph::utils::total_memory_tracker.Free(static_cast<int64_t>(size));
    } else {
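Note: the removed branches asked jemalloc for the real, size-class-rounded size of each allocation (nallocx before the fact, sallocx for a live pointer); with the extent hooks in memory_control.cpp now accounting for memory at the extent level, per-allocation tracking under jemalloc would double count. For reference, a small sketch of what nallocx/sallocx report, assuming jemalloc 5.x (exact size classes vary with configuration):

    // Sketch: jemalloc rounds a request up to its size class, which is what
    // the removed tracking code measured.
    #include <jemalloc/jemalloc.h>

    #include <cstdio>
    #include <cstdlib>

    int main() {
      const size_t requested = 100;
      const size_t rounded = nallocx(requested, 0);  // size class before allocating
      void *ptr = std::malloc(requested);            // served by jemalloc when linked in
      std::printf("requested %zu, nallocx %zu, sallocx %zu\n",
                  requested, rounded, sallocx(ptr, 0));
      std::free(ptr);
      return 0;
    }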
diff --git a/tests/e2e/disk_storage/workloads.yaml b/tests/e2e/disk_storage/workloads.yaml
index 32c558935..e2ee51f25 100644
--- a/tests/e2e/disk_storage/workloads.yaml
+++ b/tests/e2e/disk_storage/workloads.yaml
@@ -1,7 +1,7 @@
 disk_storage: &disk_storage
   cluster:
     main:
-      args: ["--bolt-port", "7687", "--log-level", "TRACE", "--memory-limit", "50"]
+      args: ["--bolt-port", "7687", "--log-level", "TRACE", "--memory-limit", "125"]
       log_file: "disk_storage.log"
       setup_queries: []
       validation_queries: []
diff --git a/tests/e2e/memgraph.py b/tests/e2e/memgraph.py
index dff0f1d49..2b80c2f62 100755
--- a/tests/e2e/memgraph.py
+++ b/tests/e2e/memgraph.py
@@ -130,9 +130,14 @@ class MemgraphInstanceRunner:
     def stop(self):
         if not self.is_running():
             return
-        self.proc_mg.terminate()
-        code = self.proc_mg.wait()
-        assert code == 0, "The Memgraph process exited with non-zero!"
+
+        pid = self.proc_mg.pid
+        try:
+            os.kill(pid, 15)  # 15 is the signal number for SIGTERM
+        except OSError:
+            assert False, "Failed to send SIGTERM to Memgraph"
+
+        time.sleep(1)
 
     def kill(self):
         if not self.is_running():
diff --git a/tests/e2e/memory/workloads.yaml b/tests/e2e/memory/workloads.yaml
index 667613114..460d6bc3f 100644
--- a/tests/e2e/memory/workloads.yaml
+++ b/tests/e2e/memory/workloads.yaml
@@ -2,7 +2,7 @@ bolt_port: &bolt_port "7687"
 args: &args
   - "--bolt-port"
   - *bolt_port
-  - "--memory-limit=1000"
+  - "--memory-limit=1024"
   - "--storage-gc-cycle-sec=180"
   - "--log-level=TRACE"
diff --git a/tests/stress/common.py b/tests/stress/common.py
index 6ab1be65a..d5235132e 100644
--- a/tests/stress/common.py
+++ b/tests/stress/common.py
@@ -100,6 +100,25 @@ def execute_till_success(session, query, max_retries=1000):
     raise Exception("Query '%s' failed %d times, aborting" % (query, max_retries))
 
 
+def try_execute(session, query: str):
+    """
+    Executes a query once within a Bolt session, without retrying on failure.
+
+    :param session: active Bolt session
+    :param query: query to execute
+
+    :return: (data, summary) pair produced by the query
+    """
+    result = session.run(query)
+    data = result.data()
+    summary = result.consume()
+    return data, summary
+
+
 def batch(input, batch_size):
     """Batches the given input (must be iterable).
     Supports input generators. Returns a generator.
diff --git a/tests/stress/continuous_integration b/tests/stress/continuous_integration
index 8744eecae..7d2dbc76d 100755
--- a/tests/stress/continuous_integration
+++ b/tests/stress/continuous_integration
@@ -8,7 +8,7 @@ import subprocess
 import time
 from argparse import Namespace as Args
 from subprocess import Popen
-from typing import Dict, List
+from typing import Dict, List, Optional
 
 from test_config import LARGE_DATASET, SMALL_DATASET, DatabaseMode, DatasetConstants
 
@@ -79,7 +79,7 @@ def wait_for_server(port, delay=0.1) -> None:
     time.sleep(delay)
 
 
-def start_memgraph(args: Args) -> Popen:
+def start_memgraph(args: Args, memgraph_options: List[str]) -> Popen:
    """Starts Memgraph and return the process"""
    cwd = os.path.dirname(args.memgraph)
    cmd = [
@@ -93,7 +93,7 @@ def start_memgraph(args: Args) -> Popen:
         "--storage-recover-on-startup=false",
         "--query-execution-timeout-sec=1200",
         "--bolt-server-name-for-init=Neo4j/",
-    ]
+    ] + memgraph_options
     if not args.verbose:
         cmd += ["--log-level", "WARNING"]
     if args.log_file:
@@ -168,7 +168,7 @@ def _find_test_binary(args: Args, test: str) -> List[str]:
     raise Exception("Test '{}' binary not supported!".format(test))
 
 
-def run_stress_test_suite(args: Args) -> Dict[str, float]:
+def run_stress_test_suite(args: Args) -> Optional[Dict[str, float]]:
     def cleanup(memgraph_proc):
         if memgraph_proc.poll() != None:
             return
@@ -192,16 +192,30 @@ def run_stress_test_suite(args: Args) -> Dict[str, float]:
         for mode in test[DatasetConstants.MODE]:
             test_run = copy.deepcopy(test)
 
-            # Run for every specified combination of storage mode, serialization mode, etc (if extended)
             test_run[DatasetConstants.MODE] = mode
-
-            memgraph_proc = start_memgraph(args)
-            runtime = run_test(args, **test_run)
-            runtimes[os.path.splitext(test[DatasetConstants.TEST])[0]] = runtime
-
-            stop_memgraph(memgraph_proc)
-            cleanup(memgraph_proc)
+            try:
+                memgraph_options = test_run.pop(DatasetConstants.MEMGRAPH_OPTIONS, [])
+                memgraph_proc = start_memgraph(args, memgraph_options)
+            except Exception as ex:
+                print("Exception occurred while starting Memgraph:", ex)
+                return None
+            err = False
+            try:
+                runtime = run_test(args, **test_run)
+                runtimes[os.path.splitext(test[DatasetConstants.TEST])[0]] = runtime
+            except Exception as ex:
+                print(f"Failed to execute {test_run} with the following exception:", ex)
+                err = True
+            finally:
+                stop_memgraph(memgraph_proc)
+                cleanup(memgraph_proc)
+            if err:
+                return None
 
     return runtimes
@@ -225,10 +239,12 @@ if __name__ == "__main__":
         generate_temporary_ssl_certs()
 
     runtimes = run_stress_test_suite(args)
-
+    if runtimes is None:
+        print("Some stress tests have failed")
+        exit(1)
+
     if args.use_ssl:
         remove_certificates()
-
     write_stats(runtimes)
     print("Successfully ran stress tests!")
diff --git a/tests/stress/memory_limit.py b/tests/stress/memory_limit.py
new file mode 100644
index 000000000..8c6b699a9
--- /dev/null
+++ b/tests/stress/memory_limit.py
@@ -0,0 +1,257 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+"""
+Stress test that checks how the memory tracker enforces the configured
+memory limit when there is a lot of concurrent node creation.
+"""
+
+import atexit
+import logging
+import multiprocessing
+import time
+from argparse import Namespace as Args
+from dataclasses import dataclass
+from functools import wraps
+from typing import Any, Callable, Dict, List, Optional
+
+from common import (
+    OutputData,
+    SessionCache,
+    connection_argument_parser,
+    execute_till_success,
+    try_execute,
+)
+
+log = logging.getLogger(__name__)
+output_data = OutputData()
+
+
+class Constants:
+    CREATE_FUNCTION = "CREATE"
+
+
+atexit.register(SessionCache.cleanup)
+
+MEMORY_LIMIT = 2048
+
+
+def parse_args() -> Args:
+    """
+    Parses user arguments
+
+    :return: parsed arguments
+    """
+    parser = connection_argument_parser()
+    parser.add_argument("--worker-count", type=int, default=5, help="Number of concurrent workers.")
+    parser.add_argument(
+        "--logging", default="INFO", choices=["INFO", "DEBUG", "WARNING", "ERROR"], help="Logging level"
+    )
+    parser.add_argument("--repetition-count", type=int, default=1000, help="Number of times to perform the action")
+    parser.add_argument("--isolation-level", type=str, required=True, help="Database isolation level.")
+    parser.add_argument("--storage-mode", type=str, required=True, help="Database storage mode.")
+
+    return parser.parse_args()
+
+
+# Global variables
+
+
+args = parse_args()
+
+
+@dataclass
+class Worker:
+    """
+    Class that performs a function defined in the `type` argument.
+
+    Args:
+        type - the worker function to perform (only `CREATE` is used in this test)
+        id - worker id
+        total_worker_cnt - total number of workers for reference
+        repetition_count - number of times to perform the worker action
+        sleep_sec - float for subsecond sleeping between two subsequent actions
+    """
+
+    type: str
+    id: int
+    total_worker_cnt: int
+    repetition_count: int
+    sleep_sec: float
+
+
+def timed_function(name) -> Callable:
+    """
+    Times the decorated function
+    """
+
+    def actual_decorator(func) -> Callable:
+        @wraps(func)
+        def timed_wrapper(*args, **kwargs) -> Any:
+            start_time = time.time()
+            result = func(*args, **kwargs)
+            end_time = time.time()
+            output_data.add_measurement(name, end_time - start_time)
+            return result
+
+        return timed_wrapper
+
+    return actual_decorator
+
+
+@timed_function("cleanup_time")
+def clean_database() -> None:
+    session = SessionCache.argument_session(args)
+    execute_till_success(session, "MATCH (n) DETACH DELETE n")
+
+
+def create_indices() -> None:
+    session = SessionCache.argument_session(args)
+    execute_till_success(session, "CREATE INDEX ON :Node")
+
+
+def setup_database_mode() -> None:
+    session = SessionCache.argument_session(args)
+    execute_till_success(session, f"STORAGE MODE {args.storage_mode}")
+    execute_till_success(session, f"SET GLOBAL TRANSACTION ISOLATION LEVEL {args.isolation_level}")
+
+
+def get_tracker_data(session) -> Optional[float]:
+    def parse_data(allocated: str) -> float:
+        """Converts a value like '512.23MiB' into a float amount of MiB."""
+        if "KiB" in allocated or "MiB" in allocated or "GiB" in allocated or "TiB" in allocated:
+            num = float(allocated[:-3])
+        else:
+            # Plain bytes, e.g. '512B'.
+            return float(allocated[:-1]) / (1024 * 1024)
+
+        if "KiB" in allocated:
+            return num / 1024
+        if "MiB" in allocated:
+            return num
+        if "GiB" in allocated:
+            return num * 1024
+        return num * 1024 * 1024
+
+    def isolate_value(data: List[Dict[str, Any]], key: str) -> Optional[str]:
+        for row in data:
+            if row["storage info"] == key:
+                return row["value"]
+        return None
+
+    try:
+        data, _ = try_execute(session, "SHOW STORAGE INFO")
+        memory_tracker_data = isolate_value(data, "memory_allocated")
+
+        return parse_data(memory_tracker_data)
+    except Exception as ex:
+        log.info(f"Get storage info failed with error: {ex}")
+        return None
+
+
+def run_writer(repetition_count: int, sleep_sec: float, worker_id: int) -> bool:
+    """
+    This writer creates a lot of nodes on each write and checks that the
+    query fails once it would push memory usage over the memory limit.
+    """
+
+    session = SessionCache.argument_session(args)
+
+    def create() -> bool:
+        """
+        Returns True if the writer should continue, False once the memory
+        limit has been reached.
+        """
+        memory_tracker_data_before_start = get_tracker_data(session)
+        should_fail = memory_tracker_data_before_start is not None and memory_tracker_data_before_start >= MEMORY_LIMIT
+        failed = False
+        try:
+            try_execute(
+                session,
+                "FOREACH (i in range(1,10000) | CREATE (:Node {prop:'big string or something like that'}))",
+            )
+        except Exception as ex:
+            failed = True
+            output = str(ex)
+            log.info(f"Exception in create: {output}")
+            assert "Memory limit exceeded!" in output
+
+        if should_fail:
+            assert failed, "Query should have failed"
+            return False
+        return True
+
+    curr_repetition = 0
+
+    while curr_repetition < repetition_count:
+        log.info(f"Worker {worker_id} started iteration {curr_repetition}")
+
+        should_continue = create()
+
+        if not should_continue:
+            return True
+
+        time.sleep(sleep_sec)
+        log.info(f"Worker {worker_id} created chain in iteration {curr_repetition}")
+
+        curr_repetition += 1
+
+    return False
+
+
+def execute_function(worker: Worker) -> Worker:
+    """
+    Executes the function based on the worker type
+    """
+    if worker.type == Constants.CREATE_FUNCTION:
+        run_writer(worker.repetition_count, worker.sleep_sec, worker.id)
+        log.info(f"Worker {worker.type} finished!")
+        return worker
+
+    raise Exception("Worker function not recognized, raising exception!")
+
+
+@timed_function("total_execution_time")
+def execution_handler() -> None:
+    clean_database()
+    log.info("Database is clean.")
+
+    setup_database_mode()
+
+    create_indices()
+
+    worker_count = args.worker_count
+    rep_count = args.repetition_count
+
+    workers = []
+    for i in range(worker_count):
+        workers.append(Worker(Constants.CREATE_FUNCTION, i, worker_count, rep_count, 0.1))
+
+    with multiprocessing.Pool(processes=worker_count) as p:
+        for worker in p.map(execute_function, workers):
+            log.info(f"Worker {worker.type} finished!")
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=args.logging)
+
+    execution_handler()
+    if args.logging in ["DEBUG", "INFO"]:
+        output_data.dump()
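Note: the test above leans on the fact that the tracker rejects an allocation up front once it would cross the limit set by --memory-limit, which is what surfaces as the "Memory limit exceeded!" error. As a rough illustration of that mechanism (a hypothetical SimpleTracker sketch, not Memgraph's actual utils::MemoryTracker):

    // Hypothetical sketch: a shared atomic counter that rejects an allocation
    // before it happens, the general shape of a limit-enforcing tracker.
    #include <atomic>
    #include <cstdint>
    #include <new>

    class SimpleTracker {
     public:
      explicit SimpleTracker(int64_t limit) : limit_(limit) {}

      void Alloc(int64_t size) {
        if (amount_.fetch_add(size, std::memory_order_relaxed) + size > limit_) {
          amount_.fetch_sub(size, std::memory_order_relaxed);
          throw std::bad_alloc();  // Memgraph raises "Memory limit exceeded!" instead
        }
      }

      void Free(int64_t size) { amount_.fetch_sub(size, std::memory_order_relaxed); }

     private:
      std::atomic<int64_t> amount_{0};
      const int64_t limit_;
    };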
+""" + +import atexit +import logging +import multiprocessing +import time +from argparse import Namespace as Args +from dataclasses import dataclass +from functools import wraps +from typing import Any, Callable, Dict, List, Optional, Tuple + +from common import ( + OutputData, + SessionCache, + connection_argument_parser, + execute_till_success, + try_execute, +) + +log = logging.getLogger(__name__) +output_data = OutputData() + + +class Constants: + CREATE_FUNCTION = "CREATE" + DELETE_FUNCTION = "DELETE" + MONITOR_CLEANUP_FUNCTION = "MONITOR_CLEANUP" + + +atexit.register(SessionCache.cleanup) + + +def parse_args() -> Args: + """ + Parses user arguments + + :return: parsed arguments + """ + parser = connection_argument_parser() + parser.add_argument("--worker-count", type=int, default=5, help="Number of concurrent workers.") + parser.add_argument( + "--logging", default="INFO", choices=["INFO", "DEBUG", "WARNING", "ERROR"], help="Logging level" + ) + parser.add_argument("--repetition-count", type=int, default=1000, help="Number of times to perform the action") + parser.add_argument("--isolation-level", type=str, required=True, help="Database isolation level.") + parser.add_argument("--storage-mode", type=str, required=True, help="Database storage mode.") + + return parser.parse_args() + + +# Global variables + + +args = parse_args() + +# Difference between memory RES and memory tracker on +# Memgraph start. +# Due to various other things which are included in RES +# there is difference of ~30MBs initially. +initial_diff = 0 + + +@dataclass +class Worker: + """ + Class that performs a function defined in the `type` argument. + + Args: + type - either `CREATE` or `DELETE`, signifying the function that's going to be performed + by the worker + id - worker id + total_worker_cnt - total number of workers for reference + repetition_count - number of times to perform the worker action + sleep_sec - float for subsecond sleeping between two subsequent actions + """ + + type: str + id: int + total_worker_cnt: int + repetition_count: int + sleep_sec: float + + +def timed_function(name) -> Callable: + """ + Times performed function + """ + + def actual_decorator(func) -> Callable: + @wraps(func) + def timed_wrapper(*args, **kwargs) -> Any: + start_time = time.time() + result = func(*args, **kwargs) + end_time = time.time() + output_data.add_measurement(name, end_time - start_time) + return result + + return timed_wrapper + + return actual_decorator + + +@timed_function("cleanup_time") +def clean_database() -> None: + session = SessionCache.argument_session(args) + execute_till_success(session, "MATCH (n) DETACH DELETE n") + + +def create_indices() -> None: + session = SessionCache.argument_session(args) + execute_till_success(session, "CREATE INDEX ON :Node") + + +def setup_database_mode() -> None: + session = SessionCache.argument_session(args) + execute_till_success(session, f"STORAGE MODE {args.storage_mode}") + execute_till_success(session, f"SET GLOBAL TRANSACTION ISOLATION LEVEL {args.isolation_level}") + + +def run_writer(repetition_count: int, sleep_sec: float, worker_id: int) -> int: + """ + This writer creates lot of nodes on each write. 
+ """ + session = SessionCache.argument_session(args) + + def create() -> bool: + exception_occured = False + + memory_allocated_before, _ = get_storage_data(session) + count_before = execute_till_success(session, f"MATCH (n) RETURN COUNT(n) AS cnt")[0][0]["cnt"] + try: + try_execute( + session, + f"FOREACH (i in range(1,10000) | CREATE (:Node {{prop:'big string or something like that'}}))", + ) + except Exception as ex: + log.info(f"Exception occured during create: {ex}") + exception_occured = True + + memory_allocated_after, _ = get_storage_data(session) + count_after = execute_till_success(session, f"MATCH (n) RETURN COUNT(n) AS cnt")[0][0]["cnt"] + if exception_occured: + log.info( + f"Exception occured, stopping exection of run {Constants.CREATE_FUNCTION} worker." + f"Memory stats: before query: {memory_allocated_before}, after query: {memory_allocated_after}." + f"Node stats: before query {count_before}, after query {count_after}" + ) + return False + + return True + + curr_repetition = 0 + + while curr_repetition < repetition_count: + log.info(f"Worker {worker_id} started iteration {curr_repetition}") + + success = create() + + assert success, "Create was not successfull" + + time.sleep(sleep_sec) + log.info(f"Worker {worker_id} created chain in iteration {curr_repetition}") + + curr_repetition += 1 + + +def run_deleter(repetition_count: int, sleep_sec: float) -> None: + """ + Deleting of whole graph + """ + session = SessionCache.argument_session(args) + + def delete_graph() -> None: + try: + execute_till_success(session, f"MATCH (n) DETACH DELETE n") + log.info(f"Worker deleted all nodes") + except Exception as ex: + log.info(f"Worker failed to delete") + pass + + curr_repetition = 0 + while curr_repetition < repetition_count: + delete_graph() + time.sleep(sleep_sec) + curr_repetition += 1 + + +def get_storage_data(session) -> Tuple[float, float]: + def parse_data(allocated: str) -> float: + num = 0 + if "KiB" in allocated or "MiB" in allocated or "GiB" in allocated or "TiB" in allocated: + num = float(allocated[:-3]) + else: + num = float(allocated[-1]) + + if "KiB" in allocated: + return num / 1024 + if "MiB" in allocated: + return num + if "GiB" in allocated: + return num * 1024 + else: + return num * 1024 * 1024 + + def isolate_value(data: List[Dict[str, Any]], key: str) -> Optional[str]: + for dict in data: + if dict["storage info"] == key: + return dict["value"] + return None + + try: + data = execute_till_success(session, f"SHOW STORAGE INFO")[0] + res_data = isolate_value(data, "memory_usage") + memory_tracker_data = isolate_value(data, "memory_allocated") + log.info( + f"Worker {Constants.MONITOR_CLEANUP_FUNCTION} logged memory: memory tracker {memory_tracker_data} vs res data {res_data}" + ) + + return parse_data(memory_tracker_data), parse_data(res_data) + + except Exception as ex: + log.info(f"Get storage info failed with error", ex) + raise Exception(f"Worker {Constants.MONITOR_CLEANUP_FUNCTION} can't continue working") + + +def run_monitor_cleanup(repetition_count: int, sleep_sec: float) -> None: + """ + Monitoring of graph and periodic cleanup. 
+    The cleanup lives in this function so that the thread can sleep for a
+    while afterwards, giving RES and the memory tracker time to stabilize,
+    which reduces the flakiness of the test.
+    """
+    session = SessionCache.argument_session(args)
+
+    curr_repetition = 0
+    while curr_repetition < repetition_count:
+        memory_tracker, res_data = get_storage_data(session)
+
+        if memory_tracker < res_data and ((memory_tracker + initial_diff) < res_data):
+            # Maybe RES got measured wrongly. The problem with a test that
+            # uses DETACH DELETE and the memory tracker is that the tracker
+            # gets updated immediately, whereas RES takes some time to drop.
+            cnt_again = 3
+            skip_failure = False
+            # 10% is the maximum allowed increment; beyond that the test fails.
+            multiplier = 1
+            while cnt_again:
+                new_memory_tracker, new_res_data = get_storage_data(session)
+
+                if new_memory_tracker > new_res_data or (
+                    (new_memory_tracker + initial_diff) * multiplier > new_res_data
+                ):
+                    skip_failure = True
+                    log.info(
+                        f"Skipping failure on new data: "
+                        f"memory tracker: {new_memory_tracker}, initial diff: {initial_diff}, "
+                        f"RES data: {new_res_data}, multiplier: {multiplier}"
+                    )
+                    break
+                multiplier += 0.05
+                cnt_again -= 1
+            if not skip_failure:
+                log.info(f"Memory tracker: {memory_tracker}, initial diff: {initial_diff}, RES data: {res_data}")
+                assert False, "Memory tracker is off by more than 10%, check logs for details"
+
+        def run_cleanup():
+            try:
+                execute_till_success(session, "FREE MEMORY")
+                log.info("Worker ran FREE MEMORY")
+            except Exception:
+                log.info("Worker failed to run FREE MEMORY")
+
+        # The idea is to run the cleanup from this thread and then let the
+        # thread sleep for a while so RES gets stabilized.
+        if curr_repetition % 10 == 0:
+            run_cleanup()
+
+        time.sleep(sleep_sec)
+        curr_repetition += 1
+
+
+def execute_function(worker: Worker) -> Worker:
+    """
+    Executes the function based on the worker type
+    """
+    if worker.type == Constants.CREATE_FUNCTION:
+        run_writer(worker.repetition_count, worker.sleep_sec, worker.id)
+        log.info(f"Worker {worker.type} finished!")
+        return worker
+
+    elif worker.type == Constants.DELETE_FUNCTION:
+        run_deleter(worker.repetition_count, worker.sleep_sec)
+        log.info(f"Worker {worker.type} finished!")
+        return worker
+
+    elif worker.type == Constants.MONITOR_CLEANUP_FUNCTION:
+        run_monitor_cleanup(worker.repetition_count, worker.sleep_sec)
+        log.info(f"Worker {worker.type} finished!")
+        return worker
+
+    raise Exception("Worker function not recognized, raising exception!")
+
+
+@timed_function("total_execution_time")
+def execution_handler() -> None:
+    clean_database()
+    log.info("Database is clean.")
+
+    setup_database_mode()
+
+    create_indices()
+
+    worker_count = args.worker_count
+    rep_count = args.repetition_count
+
+    workers = []
+    for i in range(worker_count - 2):
+        workers.append(Worker(Constants.CREATE_FUNCTION, i, worker_count - 2, rep_count, 0.1))
+    workers.append(Worker(Constants.DELETE_FUNCTION, -1, 1, int(rep_count * 1.5), 0.1))
+    workers.append(Worker(Constants.MONITOR_CLEANUP_FUNCTION, -1, 1, int(rep_count * 1.5), 0.1))
+
+    with multiprocessing.Pool(processes=worker_count) as p:
+        for worker in p.map(execute_function, workers):
+            log.info(f"Worker {worker.type} finished!")
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=args.logging)
+
+    session = SessionCache.argument_session(args)
+    memory_tracker_size, memory_res = get_storage_data(session)
+
+    # Global variable used by the monitor worker.
+    initial_diff = memory_res - memory_tracker_size
+
+    execution_handler()
+    if args.logging in ["DEBUG", "INFO"]:
+        output_data.dump()
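Note: the monitor worker tolerates some drift because RES and the tracker measure different things: RES includes code pages, thread stacks, and dirty pages the OS has not reclaimed yet, while the tracker only sees committed extents. The same comparison can be done in-process; a sketch (not part of this change) assuming Linux (/proc/self/statm) and jemalloc 5.x:

    // Sketch: compare OS-level RES with jemalloc's resident counter; the two
    // should track each other within some slack.
    #include <jemalloc/jemalloc.h>
    #include <unistd.h>

    #include <cstdint>
    #include <cstdio>
    #include <fstream>

    int main() {
      long total_pages = 0;
      long resident_pages = 0;
      std::ifstream statm("/proc/self/statm");
      statm >> total_pages >> resident_pages;  // values are in pages
      const long rss_bytes = resident_pages * sysconf(_SC_PAGESIZE);

      uint64_t epoch = 1;
      size_t sz = sizeof(epoch);
      mallctl("epoch", &epoch, &sz, &epoch, sizeof(epoch));  // refresh stats

      size_t je_resident = 0;
      sz = sizeof(je_resident);
      mallctl("stats.resident", &je_resident, &sz, nullptr, 0);

      std::printf("RES: %ld B, jemalloc resident: %zu B\n", rss_bytes, je_resident);
      return 0;
    }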
diff --git a/tests/stress/test_config.py b/tests/stress/test_config.py
index f740a150e..2228be249 100644
--- a/tests/stress/test_config.py
+++ b/tests/stress/test_config.py
@@ -12,6 +12,7 @@ class DatasetConstants:
     OPTIONS = "options"
     TIMEOUT = "timeout"
     MODE = "mode"
+    MEMGRAPH_OPTIONS = "memgraph_options"
 
 
 @dataclass
@@ -68,6 +69,20 @@ SMALL_DATASET = [
         DatasetConstants.TIMEOUT: 5,
         DatasetConstants.MODE: [get_default_database_mode()],
     },
+    {
+        DatasetConstants.TEST: "memory_tracker.py",
+        DatasetConstants.OPTIONS: ["--worker-count", "5", "--repetition-count", "100"],
+        DatasetConstants.TIMEOUT: 5,
+        DatasetConstants.MODE: [get_default_database_mode()],
+        DatasetConstants.MEMGRAPH_OPTIONS: ["--memory-limit=2048"],
+    },
+    {
+        DatasetConstants.TEST: "memory_limit.py",
+        DatasetConstants.OPTIONS: ["--worker-count", "5", "--repetition-count", "100"],
+        DatasetConstants.TIMEOUT: 5,
+        DatasetConstants.MODE: [get_default_database_mode()],
+        DatasetConstants.MEMGRAPH_OPTIONS: ["--memory-limit=2048"],
+    },
     {
         DatasetConstants.TEST: "create_match.py",
         DatasetConstants.OPTIONS: ["--vertex-count", "40000", "--create-pack-size", "100"],