diff --git a/CMakeLists.txt b/CMakeLists.txt index db39a4547..a5ad2612a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -292,7 +292,7 @@ if (MG_ENTERPRISE) add_definitions(-DMG_ENTERPRISE) endif() -set(ENABLE_JEMALLOC ON) +option(ENABLE_JEMALLOC "Use jemalloc" ON) if (ASAN) message(WARNING "Disabling jemalloc as it doesn't work well with ASAN") diff --git a/src/flags/general.cpp b/src/flags/general.cpp index eb8fae589..be060c52d 100644 --- a/src/flags/general.cpp +++ b/src/flags/general.cpp @@ -65,7 +65,7 @@ DEFINE_bool(allow_load_csv, true, "Controls whether LOAD CSV clause is allowed i // Storage flags. // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30, "Storage garbage collector interval (in seconds).", - FLAG_IN_RANGE(1, 24 * 3600)); + FLAG_IN_RANGE(1, 24UL * 3600)); // NOTE: The `storage_properties_on_edges` flag must be the same here and in // `mg_import_csv`. If you change it, make sure to change it there as well. 
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) diff --git a/src/memory/CMakeLists.txt b/src/memory/CMakeLists.txt index aadbbe23c..e975c1d5c 100644 --- a/src/memory/CMakeLists.txt +++ b/src/memory/CMakeLists.txt @@ -3,14 +3,14 @@ set(memory_src_files global_memory_control.cpp query_memory_control.cpp) - - -find_package(jemalloc REQUIRED) - add_library(mg-memory STATIC ${memory_src_files}) target_link_libraries(mg-memory mg-utils fmt) +message(STATUS "ENABLE_JEMALLOC: ${ENABLE_JEMALLOC}") if (ENABLE_JEMALLOC) + find_package(jemalloc REQUIRED) target_link_libraries(mg-memory Jemalloc::Jemalloc ${CMAKE_DL_LIBS}) target_compile_definitions(mg-memory PRIVATE USE_JEMALLOC=1) +else() + target_compile_definitions(mg-memory PRIVATE USE_JEMALLOC=0) endif() diff --git a/src/storage/v2/CMakeLists.txt b/src/storage/v2/CMakeLists.txt index 147684c54..9f6d8d4d7 100644 --- a/src/storage/v2/CMakeLists.txt +++ b/src/storage/v2/CMakeLists.txt @@ -41,4 +41,4 @@ add_library(mg-storage-v2 STATIC replication/replication_storage_state.cpp inmemory/replication/replication_client.cpp ) -target_link_libraries(mg-storage-v2 mg::replication Threads::Threads mg-utils gflags absl::flat_hash_map mg-rpc mg-slk mg-events) +target_link_libraries(mg-storage-v2 mg::replication Threads::Threads mg-utils gflags absl::flat_hash_map mg-rpc mg-slk mg-events mg-memory) diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index 9d2bc7a65..d0d1dd071 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -11,6 +11,7 @@ #include "storage/v2/inmemory/storage.hpp" #include "dbms/constants.hpp" +#include "memory/global_memory_control.hpp" #include "storage/v2/durability/durability.hpp" #include "storage/v2/durability/snapshot.hpp" #include "storage/v2/metadata_delta.hpp" @@ -101,8 +102,13 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode) "those files into a .backup directory inside the storage 
directory."); } } + if (config_.gc.type == Config::Gc::Type::PERIODIC) { - gc_runner_.Run("Storage GC", config_.gc.interval, [this] { this->CollectGarbage(); }); + // TODO: move this out of storage and have one global gc_runner_ + gc_runner_.Run("Storage GC", config_.gc.interval, [this] { + this->FreeMemory(std::unique_lock{main_lock_, std::defer_lock}); + }); + gc_jemalloc_runner_.Run("Jemalloc GC", config_.gc.interval, [] { memory::PurgeUnusedMemory(); }); } if (timestamp_ == kTimestampInitialId) { commit_log_.emplace(); @@ -116,6 +122,7 @@ InMemoryStorage::InMemoryStorage(Config config) : InMemoryStorage(config, Storag InMemoryStorage::~InMemoryStorage() { if (config_.gc.type == Config::Gc::Type::PERIODIC) { gc_runner_.Stop(); + gc_jemalloc_runner_.Stop(); } { // Stop replication (Stop all clients or stop the REPLICA server) @@ -1210,7 +1217,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock main_ main_lock_.lock_shared(); } } else { - MG_ASSERT(main_guard.mutex() == std::addressof(main_lock_), "main_guard should be only for the main_lock_"); + DMG_ASSERT(main_guard.mutex() == std::addressof(main_lock_), "main_guard should be only for the main_lock_"); } utils::OnScopeExit lock_releaser{[&] { diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp index 1d16eadf1..bfb445332 100644 --- a/src/storage/v2/inmemory/storage.hpp +++ b/src/storage/v2/inmemory/storage.hpp @@ -418,6 +418,7 @@ class InMemoryStorage final : public Storage { std::optional commit_log_; utils::Scheduler gc_runner_; + utils::Scheduler gc_jemalloc_runner_; std::mutex gc_lock_; using BondPmrLd = Bond>; diff --git a/tests/e2e/CMakeLists.txt b/tests/e2e/CMakeLists.txt index 71d80b7ed..28fe94559 100644 --- a/tests/e2e/CMakeLists.txt +++ b/tests/e2e/CMakeLists.txt @@ -71,6 +71,7 @@ add_subdirectory(query_modules) add_subdirectory(constraints) add_subdirectory(inspect_query) add_subdirectory(queries) +add_subdirectory(garbage_collection) 
copy_e2e_python_files(pytest_runner pytest_runner.sh "") copy_e2e_python_files(x x.sh "")
diff --git a/tests/e2e/garbage_collection/CMakeLists.txt b/tests/e2e/garbage_collection/CMakeLists.txt
new file mode 100644
index 000000000..690edf344
--- /dev/null
+++ b/tests/e2e/garbage_collection/CMakeLists.txt
@@ -0,0 +1,7 @@
+function(garbage_collection_e2e_python_files FILE_NAME)
+    copy_e2e_python_files(garbage_collection ${FILE_NAME})
+endfunction()
+
+garbage_collection_e2e_python_files(common.py)
+garbage_collection_e2e_python_files(conftest.py)
+garbage_collection_e2e_python_files(gc_periodic.py)
diff --git a/tests/e2e/garbage_collection/common.py b/tests/e2e/garbage_collection/common.py
new file mode 100644
index 000000000..dedf18dc3
--- /dev/null
+++ b/tests/e2e/garbage_collection/common.py
@@ -0,0 +1,30 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+import typing
+
+import mgclient
+
+
+def execute_and_fetch_all(
+    cursor: mgclient.Cursor, query: str, params: typing.Optional[dict] = None
+) -> typing.List[tuple]:
+    """Execute `query` on `cursor` with `params` (an empty dict when omitted) and return all fetched rows."""
+    # A fresh dict is built per call instead of sharing one mutable `{}` default between calls.
+    cursor.execute(query, {} if params is None else params)
+    return cursor.fetchall()
+
+
+def connect(**kwargs) -> mgclient.Connection:
+    """Connect to localhost:7687, forwarding `kwargs` to mgclient.connect, and turn autocommit on."""
+    connection = mgclient.connect(host="localhost", port=7687, **kwargs)
+    connection.autocommit = True
+    return connection
diff --git a/tests/e2e/garbage_collection/conftest.py b/tests/e2e/garbage_collection/conftest.py
new file mode 100644
index 000000000..a4ec62c9f
--- /dev/null
+++ b/tests/e2e/garbage_collection/conftest.py
@@ -0,0 +1,21 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+import pytest
+from common import connect, execute_and_fetch_all
+
+
+@pytest.fixture(autouse=True)
+def connection():
+    connection = connect()
+    yield connection
+    cursor = connection.cursor()
+    execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")
diff --git a/tests/e2e/garbage_collection/gc_periodic.py b/tests/e2e/garbage_collection/gc_periodic.py
new file mode 100644
index 000000000..6903960be
--- /dev/null
+++ b/tests/e2e/garbage_collection/gc_periodic.py
@@ -0,0 +1,68 @@
+# Copyright 2023 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+import re
+import sys
+import time
+
+import pytest
+from common import execute_and_fetch_all
+
+
+def remove_non_numeric_suffix(text):
+    """Return `text` without its trailing run of non-digit characters, e.g. "12.5MiB" -> "12.5"."""
+    # `\D+`, not `\D*`: `\D*$` also matches the empty string at the end of an all-digit text,
+    # and the slice `text[:-0]` would then return "" instead of the unchanged text.
+    match = re.search(r"\D+$", text)
+    if match:
+        return text[: match.start()]
+    return text
+
+
+def get_memory_from_list(list):
+    """
+    Return the value of the first "memory_tracked" row as a float, or None when no row has that name.
+    Rows are read as (name, value); the value's non-numeric suffix is dropped before the conversion.
+    NOTE(review): the dropped suffix is presumably a unit, so two readings compare correctly only if they
+    share it - confirm against the SHOW STORAGE INFO output.
+    """
+    for list_item in list:
+        if list_item[0] == "memory_tracked":
+            return float(remove_non_numeric_suffix(list_item[1]))
+    return None
+
+
+def get_memory(cursor):
+    """Run SHOW STORAGE INFO on `cursor` and return what get_memory_from_list extracts from the rows."""
+    return get_memory_from_list(execute_and_fetch_all(cursor, "SHOW STORAGE INFO"))
+
+
+def test_gc_periodic(connection):
+    """
+    This test checks that periodic gc works.
+    It does so by checking that the allocated memory is lowered by at least 1/4 of the memory allocated by creating nodes.
+    If we choose a higher number the test will become flaky because the memory only gets fully cleared after a while
+    due to jemalloc holding some memory for a while. If we'd wait for jemalloc to fully release the memory the test would take too long.
+    """
+    cursor = connection.cursor()
+
+    memory_pre_creation = get_memory(cursor)
+    execute_and_fetch_all(cursor, "UNWIND range(1, 1000) AS index CREATE (:Node);")
+    memory_after_creation = get_memory(cursor)
+    # Same length as --storage-gc-cycle-sec=2 in workloads.yaml. NOTE(review): presumably spans one GC pass - confirm.
+    time.sleep(2)
+    memory_after_gc = get_memory(cursor)
+
+    assert memory_after_gc < memory_pre_creation + (memory_after_creation - memory_pre_creation) / 4 * 3
+
+
+if __name__ == "__main__":
+    sys.exit(pytest.main([__file__, "-rA"]))
diff --git a/tests/e2e/garbage_collection/workloads.yaml b/tests/e2e/garbage_collection/workloads.yaml
new file mode 100644
index 000000000..395ba83d9
--- /dev/null
+++ b/tests/e2e/garbage_collection/workloads.yaml
@@ -0,0 +1,19 @@
+args: &args
+  - "--bolt-port"
+  - "7687"
+  - "--log-level=TRACE"
+  - "--storage-gc-cycle-sec=2"
+
+in_memory_cluster: &in_memory_cluster
+  cluster:
+    main:
+      args: *args
+      log_file: "garbage_collection-e2e.log"
+      setup_queries: []
+      validation_queries: []
+
+workloads:
+  - name: "Garbage collection"
+    binary: "tests/e2e/pytest_runner.sh"
+    args: ["garbage_collection/gc_periodic.py"]
+    <<: *in_memory_cluster