Fix IN_MEMORY_ANALYTICAL storage GC (#1025)

This commit is contained in:
Gareth Andrew Lloyd 2023-06-23 11:50:03 +01:00 committed by GitHub
parent b25e9968ee
commit 5b1ba10183
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 214 additions and 26 deletions

View File

@ -585,6 +585,12 @@ Result<std::optional<VertexAccessor>> Storage::Accessor::DeleteVertex(VertexAcce
CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag());
vertex_ptr->deleted = true;
// Need to inform the next CollectGarbage call that there are some
// non-transactional deletions that need to be collected
if (transaction_.storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
storage_->gc_full_scan_vertices_delete_ = true;
}
return std::make_optional<VertexAccessor>(vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_,
config_, true);
}
@ -655,6 +661,12 @@ Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> Stor
CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag());
vertex_ptr->deleted = true;
// Need to inform the next CollectGarbage call that there are some
// non-transactional deletions that need to be collected
if (transaction_.storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
storage_->gc_full_scan_vertices_delete_ = true;
}
return std::make_optional<ReturnType>(
VertexAccessor{vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_, config_, true},
std::move(deleted_edges));
@ -865,6 +877,12 @@ Result<std::optional<EdgeAccessor>> Storage::Accessor::DeleteEdge(EdgeAccessor *
auto *edge_ptr = edge_ref.ptr;
CreateAndLinkDelta(&transaction_, edge_ptr, Delta::RecreateObjectTag());
edge_ptr->deleted = true;
// Need to inform the next CollectGarbage call that there are some
// non-transactional deletions that need to be collected
if (transaction_.storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
storage_->gc_full_scan_edges_delete_ = true;
}
}
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref);
@ -1466,9 +1484,18 @@ Transaction Storage::CreateTransaction(IsolationLevel isolation_level, StorageMo
}
template <bool force>
void Storage::CollectGarbage() {
void Storage::CollectGarbage(std::unique_lock<utils::RWLock> main_guard) {
// NOTE: You do not need to consider cleanup of deleted object that occurred in
// different storage modes within the same CollectGarbage call. This is because
// SetStorageMode will ensure CollectGarbage is called before any new transactions
// with the new storage mode can start.
// SetStorageMode will pass its unique_lock of main_lock_. We will use that lock,
// as reacquiring the lock would cause deadlock. Otherwise, we need to get our own
// lock.
if (!main_guard.owns_lock()) {
if constexpr (force) {
// We take the unique lock on the main storage lock so we can forcefully clean
// We take the unique lock on the main storage lock, so we can forcefully clean
// everything we can
if (!main_lock_.try_lock()) {
CollectGarbage<false>();
@ -1480,13 +1507,20 @@ void Storage::CollectGarbage() {
// the indices and constraints aren't concurrently being modified.
main_lock_.lock_shared();
}
} else {
MG_ASSERT(main_guard.mutex() == std::addressof(main_lock_), "main_guard should be only for the main_lock_");
}
utils::OnScopeExit lock_releaser{[&] {
if (!main_guard.owns_lock()) {
if constexpr (force) {
main_lock_.unlock();
} else {
main_lock_.unlock_shared();
}
} else {
main_guard.unlock();
}
}};
// Garbage collection must be performed in two phases. In the first phase,
@ -1515,14 +1549,18 @@ void Storage::CollectGarbage() {
deleted_vertices_->swap(current_deleted_vertices);
deleted_edges_->swap(current_deleted_edges);
auto const need_full_scan_vertices = gc_full_scan_vertices_delete_.exchange(false);
auto const need_full_scan_edges = gc_full_scan_edges_delete_.exchange(false);
// Flag that will be used to determine whether the Index GC should be run. It
// should be run when there were any items that were cleaned up (there were
// updates between this run of the GC and the previous run of the GC). This
// eliminates high CPU usage when the GC doesn't have to clean up anything.
bool run_index_cleanup = !committed_transactions_->empty() || !garbage_undo_buffers_->empty();
bool run_index_cleanup = !committed_transactions_->empty() || !garbage_undo_buffers_->empty() ||
need_full_scan_vertices || need_full_scan_edges;
while (true) {
// We don't want to hold the lock on commited transactions for too long,
// We don't want to hold the lock on committed transactions for too long,
// because that prevents other transactions from committing.
Transaction *transaction;
{
@ -1718,11 +1756,37 @@ void Storage::CollectGarbage() {
MG_ASSERT(edge_acc.remove(edge), "Invalid database state!");
}
}
// EXPENSIVE full scan, is only run if an IN_MEMORY_ANALYTICAL transaction involved any deletions
// TODO: implement a fast internal iteration inside the skip_list (to avoid unnecessary find_node calls),
// accessor.remove_if([](auto const & item){ return item.delta == nullptr && item.deleted;});
// alternatively, an auxiliary data structure within skip_list to track these, hence a full scan wouldn't be needed
// we will wait for evidence that this is needed before doing so.
if (need_full_scan_vertices) {
auto vertex_acc = vertices_.access();
for (auto &vertex : vertex_acc) {
// a deleted vertex which as no deltas must have come from IN_MEMORY_ANALYTICAL deletion
if (vertex.delta == nullptr && vertex.deleted) {
vertex_acc.remove(vertex);
}
}
}
// EXPENSIVE full scan, is only run if an IN_MEMORY_ANALYTICAL transaction involved any deletions
if (need_full_scan_edges) {
auto edge_acc = edges_.access();
for (auto &edge : edge_acc) {
// a deleted edge which as no deltas must have come from IN_MEMORY_ANALYTICAL deletion
if (edge.delta == nullptr && edge.deleted) {
edge_acc.remove(edge);
}
}
}
}
// tell the linker he can find the CollectGarbage definitions here
template void Storage::CollectGarbage<true>();
template void Storage::CollectGarbage<false>();
// tell the linker it can find the CollectGarbage definitions here
template void Storage::CollectGarbage<true>(std::unique_lock<utils::RWLock>);
template void Storage::CollectGarbage<false>(std::unique_lock<utils::RWLock>);
bool Storage::InitializeWalFile() {
if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL)
@ -2042,8 +2106,8 @@ utils::FileRetainer::FileLockerAccessor::ret_type Storage::UnlockPath() {
return true;
}
void Storage::FreeMemory() {
CollectGarbage<true>();
void Storage::FreeMemory(std::unique_lock<utils::RWLock> main_guard) {
CollectGarbage<true>(std::move(main_guard));
// SkipList is already threadsafe
vertices_.run_gc();
@ -2299,7 +2363,11 @@ IsolationLevel Storage::GetIsolationLevel() const noexcept { return isolation_le
void Storage::SetStorageMode(StorageMode storage_mode) {
std::unique_lock main_guard{main_lock_};
// Only if we change storage_mode do we want to force storage cleanup
if (storage_mode_ != storage_mode) {
storage_mode_ = storage_mode;
FreeMemory(std::move(main_guard));
}
}
StorageMode Storage::GetStorageMode() { return storage_mode_; }

View File

@ -509,7 +509,7 @@ class Storage final {
std::vector<ReplicaInfo> ReplicasInfo();
void FreeMemory();
void FreeMemory(std::unique_lock<utils::RWLock> main_guard = {});
enum class SetIsolationLevelError : uint8_t { DisabledForAnalyticalMode };
@ -543,7 +543,7 @@ class Storage final {
/// @throw std::system_error
/// @throw std::bad_alloc
template <bool force>
void CollectGarbage();
void CollectGarbage(std::unique_lock<utils::RWLock> main_guard = {});
bool InitializeWalFile();
void FinalizeWalFile();
@ -618,6 +618,10 @@ class Storage final {
// storage.
utils::Synchronized<std::list<Gid>, utils::SpinLock> deleted_edges_;
// Flags to inform CollectGarbage that it needs to do the more expensive full scans
std::atomic<bool> gc_full_scan_vertices_delete_ = false;
std::atomic<bool> gc_full_scan_edges_delete_ = false;
// Durability
std::filesystem::path snapshot_directory_;
std::filesystem::path wal_directory_;

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -391,7 +391,7 @@ class SkipListGc final {
///
/// The implementation is based on the work described in the paper
/// "A Provably Correct Scalable Concurrent Skip List"
/// https://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
/// http://people.csail.mit.edu/shanir/publications/OPODIS2006-BA.pdf
///
/// The proposed implementation is in Java so the authors don't worry about
/// garbage collection. This implementation uses the garbage collector that is
@ -640,6 +640,7 @@ class SkipList final {
skiplist_ = other.skiplist_;
id_ = other.id_;
other.skiplist_ = nullptr;
return *this;
}
/// Functions that return an Iterator (or ConstIterator) to the beginning of
@ -775,6 +776,7 @@ class SkipList final {
skiplist_ = other.skiplist_;
id_ = other.id_;
other.skiplist_ = nullptr;
return *this;
}
ConstIterator begin() const { return ConstIterator{skiplist_->head_->nexts[0].load(std::memory_order_acquire)}; }

View File

@ -57,6 +57,7 @@ add_subdirectory(transaction_queue)
add_subdirectory(mock_api)
add_subdirectory(load_csv)
add_subdirectory(init_file_flags)
add_subdirectory(analytical_mode)
copy_e2e_python_files(pytest_runner pytest_runner.sh "")
file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/memgraph-selfsigned.crt DESTINATION ${CMAKE_CURRENT_BINARY_DIR})

View File

@ -0,0 +1,6 @@
function(copy_analytical_mode_e2e_python_files FILE_NAME)
copy_e2e_python_files(analytical_mode ${FILE_NAME})
endfunction()
copy_analytical_mode_e2e_python_files(common.py)
copy_analytical_mode_e2e_python_files(free_memory.py)

View File

@ -0,0 +1,30 @@
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import typing
import mgclient
import pytest
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
cursor.execute(query, params)
return cursor.fetchall()
@pytest.fixture
def connect(**kwargs) -> mgclient.Connection:
connection = mgclient.connect(host="localhost", port=7687, **kwargs)
connection.autocommit = True
yield connection
cursor = connection.cursor()
execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")
execute_and_fetch_all(cursor, "STORAGE MODE IN_MEMORY_TRANSACTIONAL;")

View File

@ -0,0 +1,63 @@
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import sys
import pytest
from common import connect, execute_and_fetch_all
def check_storage_info(cursor, expected_values):
cursor.execute("SHOW STORAGE INFO")
config = cursor.fetchall()
for conf in config:
conf_name = conf[0]
if conf_name in expected_values:
assert expected_values[conf_name] == conf[1]
def test_analytical_mode_objects_are_actually_deleted_when_asked(connect):
"""Tests objects are actually freed when deleted in analytical mode."""
expected_values = {
"vertex_count": 0,
}
cursor = connect.cursor()
check_storage_info(cursor, expected_values)
cursor.execute("STORAGE MODE IN_MEMORY_ANALYTICAL;")
cursor.execute("MERGE (n) DELETE n;")
cursor.execute("FREE MEMORY;")
check_storage_info(cursor, expected_values)
def test_analytical_mode_objects_are_actually_deleted_when_storage_mode_changes(connect):
"""Tests objects are actually freed when deleted in analytical mode."""
expected_values = {
"vertex_count": 0,
}
cursor = connect.cursor()
check_storage_info(cursor, expected_values)
cursor.execute("STORAGE MODE IN_MEMORY_ANALYTICAL;")
cursor.execute("MERGE (n) DELETE n;")
cursor.execute("STORAGE MODE IN_MEMORY_TRANSACTIONAL;")
check_storage_info(cursor, expected_values)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -0,0 +1,14 @@
analytical_mode_cluster: &analytical_mode_cluster
cluster:
main:
args: ["--bolt-port", "7687", "--log-level=TRACE"]
log_file: "analytical_mode.log"
setup_queries: []
validation_queries: []
workloads:
- name: "Analytical mode checks"
binary: "tests/e2e/pytest_runner.sh"
args: ["analytical_mode/free_memory.py"]
<<: *analytical_mode_cluster