Fix GC by adding periodic jemalloc purge (#1471)

This commit is contained in:
imilinovic 2023-11-14 21:06:21 +01:00 committed by GitHub
parent 9cc060c4b0
commit ced08fd7bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 149 additions and 9 deletions

View File

@ -292,7 +292,7 @@ if (MG_ENTERPRISE)
add_definitions(-DMG_ENTERPRISE)
endif()
set(ENABLE_JEMALLOC ON)
option(ENABLE_JEMALLOC "Use jemalloc" ON)
if (ASAN)
message(WARNING "Disabling jemalloc as it doesn't work well with ASAN")

View File

@ -65,7 +65,7 @@ DEFINE_bool(allow_load_csv, true, "Controls whether LOAD CSV clause is allowed i
// Storage flags.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30, "Storage garbage collector interval (in seconds).",
FLAG_IN_RANGE(1, 24 * 3600));
FLAG_IN_RANGE(1, 24UL * 3600));
// NOTE: The `storage_properties_on_edges` flag must be the same here and in
// `mg_import_csv`. If you change it, make sure to change it there as well.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

View File

@ -3,14 +3,14 @@ set(memory_src_files
global_memory_control.cpp
query_memory_control.cpp)
find_package(jemalloc REQUIRED)
add_library(mg-memory STATIC ${memory_src_files})
target_link_libraries(mg-memory mg-utils fmt)
message(STATUS "ENABLE_JEMALLOC: ${ENABLE_JEMALLOC}")
if (ENABLE_JEMALLOC)
find_package(jemalloc REQUIRED)
target_link_libraries(mg-memory Jemalloc::Jemalloc ${CMAKE_DL_LIBS})
target_compile_definitions(mg-memory PRIVATE USE_JEMALLOC=1)
else()
target_compile_definitions(mg-memory PRIVATE USE_JEMALLOC=0)
endif()

View File

@ -41,4 +41,4 @@ add_library(mg-storage-v2 STATIC
replication/replication_storage_state.cpp
inmemory/replication/replication_client.cpp
)
target_link_libraries(mg-storage-v2 mg::replication Threads::Threads mg-utils gflags absl::flat_hash_map mg-rpc mg-slk mg-events)
target_link_libraries(mg-storage-v2 mg::replication Threads::Threads mg-utils gflags absl::flat_hash_map mg-rpc mg-slk mg-events mg-memory)

View File

@ -11,6 +11,7 @@
#include "storage/v2/inmemory/storage.hpp"
#include "dbms/constants.hpp"
#include "memory/global_memory_control.hpp"
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/metadata_delta.hpp"
@ -101,8 +102,13 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode)
"those files into a .backup directory inside the storage directory.");
}
}
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
gc_runner_.Run("Storage GC", config_.gc.interval, [this] { this->CollectGarbage<false>(); });
// TODO: move this out of storage and have one global gc_runner_
gc_runner_.Run("Storage GC", config_.gc.interval, [this] {
this->FreeMemory(std::unique_lock<utils::ResourceLock>{main_lock_, std::defer_lock});
});
gc_jemalloc_runner_.Run("Jemalloc GC", config_.gc.interval, [] { memory::PurgeUnusedMemory(); });
}
if (timestamp_ == kTimestampInitialId) {
commit_log_.emplace();
@ -116,6 +122,7 @@ InMemoryStorage::InMemoryStorage(Config config) : InMemoryStorage(config, Storag
InMemoryStorage::~InMemoryStorage() {
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
gc_runner_.Stop();
gc_jemalloc_runner_.Stop();
}
{
// Stop replication (Stop all clients or stop the REPLICA server)
@ -1210,7 +1217,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
main_lock_.lock_shared();
}
} else {
MG_ASSERT(main_guard.mutex() == std::addressof(main_lock_), "main_guard should be only for the main_lock_");
DMG_ASSERT(main_guard.mutex() == std::addressof(main_lock_), "main_guard should be only for the main_lock_");
}
utils::OnScopeExit lock_releaser{[&] {

View File

@ -418,6 +418,7 @@ class InMemoryStorage final : public Storage {
std::optional<CommitLog> commit_log_;
utils::Scheduler gc_runner_;
utils::Scheduler gc_jemalloc_runner_;
std::mutex gc_lock_;
using BondPmrLd = Bond<utils::pmr::list<Delta>>;

View File

@ -71,6 +71,7 @@ add_subdirectory(query_modules)
add_subdirectory(constraints)
add_subdirectory(inspect_query)
add_subdirectory(queries)
add_subdirectory(garbage_collection)
copy_e2e_python_files(pytest_runner pytest_runner.sh "")
copy_e2e_python_files(x x.sh "")

View File

@ -0,0 +1,7 @@
# Thin wrapper around copy_e2e_python_files for this directory: forwards
# FILE_NAME with `garbage_collection` as the first argument.
# NOTE(review): the first argument is presumably the target/workload prefix --
# confirm against the definition of copy_e2e_python_files.
function(garbage_collection_e2e_python_files FILE_NAME)
copy_e2e_python_files(garbage_collection ${FILE_NAME})
endfunction()
garbage_collection_e2e_python_files(common.py)
garbage_collection_e2e_python_files(conftest.py)
garbage_collection_e2e_python_files(gc_periodic.py)

View File

@ -0,0 +1,25 @@
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import typing
import mgclient
def execute_and_fetch_all(
    cursor: mgclient.Cursor, query: str, params: typing.Optional[dict] = None
) -> typing.List[tuple]:
    """Execute ``query`` on ``cursor`` and return all resulting rows.

    Args:
        cursor: Cursor the query is executed on.
        query: Query text handed to ``cursor.execute``.
        params: Optional query parameters; an empty dict is used when omitted.

    Returns:
        The result of ``cursor.fetchall()`` for the executed query.
    """
    # Build a fresh dict per call so no mutable default is shared between calls.
    cursor.execute(query, {} if params is None else params)
    return cursor.fetchall()
def connect(**kwargs) -> mgclient.Connection:
    """Open a connection to localhost:7687 with autocommit switched on.

    Any keyword arguments are forwarded to ``mgclient.connect``.
    """
    conn = mgclient.connect(host="localhost", port=7687, **kwargs)
    conn.autocommit = True
    return conn

View File

@ -0,0 +1,21 @@
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import pytest
from common import connect, execute_and_fetch_all
@pytest.fixture(autouse=True)
def connection():
    """Yield a connection to the test, then delete every node once the test is done."""
    conn = connect()
    yield conn
    # Teardown: detach-delete all nodes using a cursor from the same connection.
    execute_and_fetch_all(conn.cursor(), "MATCH (n) DETACH DELETE n")

View File

@ -0,0 +1,59 @@
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import re
import sys
import time
import pytest
from common import execute_and_fetch_all
def remove_non_numeric_suffix(text):
    """Return ``text`` with its trailing run of non-digit characters removed.

    Text that already ends in a digit is returned unchanged. Text made up only
    of non-digit characters becomes the empty string.

    Args:
        text: String such as ``"12.5MiB"``.

    Returns:
        ``text`` up to and including its last digit (``"12.5"`` for the example).
    """
    # `\D+` (not `\D*`) so that an empty suffix never matches: slicing with
    # `text[:-0]` would otherwise wipe out strings that end in a digit.
    return re.sub(r"\D+$", "", text)
def get_memory_from_list(list):
    """Extract the ``memory_tracked`` value from a sequence of rows.

    Each row is indexed as ``row[0]`` (key) and ``row[1]`` (value). The value
    of the first row keyed ``"memory_tracked"`` is stripped of its non-numeric
    suffix and returned as a float; ``None`` is returned when no row matches.
    """
    tracked_rows = (row for row in list if row[0] == "memory_tracked")
    first_match = next(tracked_rows, None)
    if first_match is None:
        return None
    return float(remove_non_numeric_suffix(first_match[1]))
def get_memory(cursor):
    """Run ``SHOW STORAGE INFO`` on ``cursor`` and return its ``memory_tracked`` value as a float."""
    storage_info = execute_and_fetch_all(cursor, "SHOW STORAGE INFO")
    return get_memory_from_list(storage_info)
def test_gc_periodic(connection):
    """
    Check that periodic garbage collection gives memory back.

    The test measures tracked memory, creates nodes, measures again, waits, and then requires that
    tracked memory has dropped by at least 1/4 of what the node creation added.
    Choosing a higher fraction would make the test flaky, because jemalloc holds on to some memory
    for a while; waiting for jemalloc to release everything would make the test take too long.
    """
    cursor = connection.cursor()

    baseline = get_memory(cursor)
    execute_and_fetch_all(cursor, "UNWIND range(1, 1000) AS index CREATE (:Node);")
    peak = get_memory(cursor)

    # Give the periodic GC time to run before measuring again.
    time.sleep(2)
    after_gc = get_memory(cursor)

    allocated = peak - baseline
    assert after_gc < baseline + allocated / 4 * 3
# When run directly, hand this file to pytest (with the "-rA" report option)
# and exit with pytest's return value.
if __name__ == "__main__":
    sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -0,0 +1,19 @@
# Command-line arguments for the instance, shared through the `args` anchor.
args: &args
- "--bolt-port"
- "7687"
- "--log-level=TRACE"
# GC interval in seconds; gc_periodic.py sleeps 2 seconds before re-measuring memory.
- "--storage-gc-cycle-sec=2"
# Cluster template reused by the workload below through the `in_memory_cluster` anchor.
in_memory_cluster: &in_memory_cluster
cluster:
main:
args: *args
log_file: "garbage_collection-e2e.log"
setup_queries: []
validation_queries: []
# Workload: runs gc_periodic.py through the pytest runner script.
workloads:
- name: "Garbage collection"
binary: "tests/e2e/pytest_runner.sh"
args: ["garbage_collection/gc_periodic.py"]
<<: *in_memory_cluster