From 8414479abed7014b577499b67af0d27f438480a2 Mon Sep 17 00:00:00 2001 From: Marin Tomic <marin.tomic@memgraph.io> Date: Tue, 9 Jul 2019 16:34:23 +0200 Subject: [PATCH] [StorageV2] Implement GC Summary: Here are some numbers from the benchmark: ``` (TOOLCHAIN) mtomic@poso:~/memgraph/build_release$ tests/benchmark/storage_v2_gc --num-threads 8 Config: NoGc, Time: 25.9836 Config: OnFinishGc, Time: 49.012 Config: 100msPeriodicGc, Time: 45.9856 Config: 1000msPeriodicGc, Time: 40.3094 ``` ``` (TOOLCHAIN) mtomic@poso:~/memgraph/build_release$ tests/benchmark/storage_v2_gc --num-threads 7 Config: NoGc, Time: 20.4256 Config: OnFinishGc, Time: 39.6669 Config: 100msPeriodicGc, Time: 30.7956 Config: 1000msPeriodicGc, Time: 35.128 ``` It is not that bad if there is a core dedicated to doing garbage collection. Reviewers: mferencevic, teon.banek Reviewed By: mferencevic, teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2168 --- src/storage/v2/storage.cpp | 307 +++++++++++++++++++++++++----- src/storage/v2/storage.hpp | 55 +++++- src/storage/v2/transaction.hpp | 3 - tests/benchmark/CMakeLists.txt | 3 + tests/benchmark/storage_v2_gc.cpp | 86 +++++++++ tests/unit/CMakeLists.txt | 3 + tests/unit/storage_v2_gc.cpp | 146 ++++++++++++++ 7 files changed, 543 insertions(+), 60 deletions(-) create mode 100644 tests/benchmark/storage_v2_gc.cpp create mode 100644 tests/unit/storage_v2_gc.cpp diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index 75be4bea8..dc2aa07d4 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -8,22 +8,37 @@ namespace storage { +Storage::Storage(StorageGcConfig gc_config) : gc_config_(gc_config) { + if (gc_config.type == StorageGcConfig::Type::PERIODIC) { + gc_runner_.Run("Storage GC", gc_config.interval, + [this] { this->CollectGarbage(); }); + } +} + +Storage::~Storage() { + if (gc_config_.type == StorageGcConfig::Type::PERIODIC) { + gc_runner_.Stop(); + } +} + 
Storage::Accessor::Accessor(Storage *storage) : storage_(storage), is_transaction_starter_(true) { - // We acquire the storage lock here because we access (and modify) the - // transaction engine variables (`transaction_id` and `timestamp`) below. - std::lock_guard<utils::SpinLock> guard(storage_->lock_); - auto acc = storage_->transactions_.access(); - auto [it, inserted] = acc.insert( - Transaction{storage_->transaction_id_++, storage_->timestamp_++}); - CHECK(inserted) << "The Transaction must be inserted here!"; - CHECK(it != acc.end()) << "Invalid Transaction iterator!"; - transaction_ = &*it; + // We acquire the transaction engine lock here because we access (and + // modify) the transaction engine variables (`transaction_id` and + // `timestamp`) below. + uint64_t transaction_id; + uint64_t start_timestamp; + { + std::lock_guard<utils::SpinLock> guard(storage_->engine_lock_); + transaction_id = storage_->transaction_id_++; + start_timestamp = storage_->timestamp_++; + } + transaction_ = std::make_unique<Transaction>(transaction_id, start_timestamp); } Storage::Accessor::Accessor(Accessor &&other) noexcept : storage_(other.storage_), - transaction_(other.transaction_), + transaction_(std::move(other.transaction_)), is_transaction_starter_(true) { CHECK(other.is_transaction_starter_) << "The original accessor isn't valid!"; // Don't allow the other accessor to abort our transaction. 
@@ -35,12 +50,12 @@ Storage::Accessor::Accessor(Accessor &&other) noexcept Storage::Accessor &Storage::Accessor::operator=(Accessor &&other) { if (this == &other) return *this; - if (is_transaction_starter_ && transaction_->is_active) { + if (is_transaction_starter_ && transaction_) { Abort(); } storage_ = other.storage_; - transaction_ = other.transaction_; + transaction_ = std::move(other.transaction_); is_transaction_starter_ = true; CHECK(other.is_transaction_starter_) << "The original accessor isn't valid!"; @@ -51,7 +66,7 @@ Storage::Accessor &Storage::Accessor::operator=(Accessor &&other) { } Storage::Accessor::~Accessor() { - if (is_transaction_starter_ && transaction_->is_active) { + if (is_transaction_starter_ && transaction_) { Abort(); } } @@ -59,12 +74,12 @@ Storage::Accessor::~Accessor() { VertexAccessor Storage::Accessor::CreateVertex() { auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel); auto acc = storage_->vertices_.access(); - auto delta = CreateDeleteObjectDelta(transaction_); + auto delta = CreateDeleteObjectDelta(transaction_.get()); auto [it, inserted] = acc.insert(Vertex{storage::Gid::FromUint(gid), delta}); CHECK(inserted) << "The vertex must be inserted here!"; CHECK(it != acc.end()) << "Invalid Vertex accessor!"; delta->prev.Set(&*it); - return VertexAccessor{&*it, transaction_}; + return VertexAccessor{&*it, transaction_.get()}; } std::optional<VertexAccessor> Storage::Accessor::FindVertex(Gid gid, @@ -72,18 +87,18 @@ std::optional<VertexAccessor> Storage::Accessor::FindVertex(Gid gid, auto acc = storage_->vertices_.access(); auto it = acc.find(gid); if (it == acc.end()) return std::nullopt; - return VertexAccessor::Create(&*it, transaction_, view); + return VertexAccessor::Create(&*it, transaction_.get(), view); } Result<bool> Storage::Accessor::DeleteVertex(VertexAccessor *vertex) { - CHECK(vertex->transaction_ == transaction_) + CHECK(vertex->transaction_ == transaction_.get()) << "VertexAccessor must be from the 
same transaction as the storage " "accessor when deleting a vertex!"; auto vertex_ptr = vertex->vertex_; std::lock_guard<utils::SpinLock> guard(vertex_ptr->lock); - if (!PrepareForWrite(transaction_, vertex_ptr)) + if (!PrepareForWrite(transaction_.get(), vertex_ptr)) return Result<bool>{Error::SERIALIZATION_ERROR}; if (vertex_ptr->deleted) return Result<bool>{false}; @@ -91,14 +106,15 @@ Result<bool> Storage::Accessor::DeleteVertex(VertexAccessor *vertex) { if (!vertex_ptr->in_edges.empty() || !vertex_ptr->out_edges.empty()) return Result<bool>{Error::VERTEX_HAS_EDGES}; - CreateAndLinkDelta(transaction_, vertex_ptr, Delta::RecreateObjectTag()); + CreateAndLinkDelta(transaction_.get(), vertex_ptr, + Delta::RecreateObjectTag()); vertex_ptr->deleted = true; return Result<bool>{true}; } Result<bool> Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) { - CHECK(vertex->transaction_ == transaction_) + CHECK(vertex->transaction_ == transaction_.get()) << "VertexAccessor must be from the same transaction as the storage " "accessor when deleting a vertex!"; auto vertex_ptr = vertex->vertex_; @@ -109,7 +125,7 @@ Result<bool> Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) { { std::lock_guard<utils::SpinLock> guard(vertex_ptr->lock); - if (!PrepareForWrite(transaction_, vertex_ptr)) + if (!PrepareForWrite(transaction_.get(), vertex_ptr)) return Result<bool>{Error::SERIALIZATION_ERROR}; if (vertex_ptr->deleted) return Result<bool>{false}; @@ -120,7 +136,8 @@ Result<bool> Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) { for (const auto &item : in_edges) { auto [edge_type, from_vertex, edge] = item; - EdgeAccessor e{edge, edge_type, from_vertex, vertex_ptr, transaction_}; + EdgeAccessor e{edge, edge_type, from_vertex, vertex_ptr, + transaction_.get()}; auto ret = DeleteEdge(&e); if (ret.IsError()) { CHECK(ret.GetError() == Error::SERIALIZATION_ERROR) @@ -130,7 +147,7 @@ Result<bool> Storage::Accessor::DetachDeleteVertex(VertexAccessor 
*vertex) { } for (const auto &item : out_edges) { auto [edge_type, to_vertex, edge] = item; - EdgeAccessor e{edge, edge_type, vertex_ptr, to_vertex, transaction_}; + EdgeAccessor e{edge, edge_type, vertex_ptr, to_vertex, transaction_.get()}; auto ret = DeleteEdge(&e); if (ret.IsError()) { CHECK(ret.GetError() == Error::SERIALIZATION_ERROR) @@ -145,12 +162,13 @@ Result<bool> Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) { // vertex. Some other transaction could have modified the vertex in the // meantime if we didn't have any edges to delete. - if (!PrepareForWrite(transaction_, vertex_ptr)) + if (!PrepareForWrite(transaction_.get(), vertex_ptr)) return Result<bool>{Error::SERIALIZATION_ERROR}; CHECK(!vertex_ptr->deleted) << "Invalid database state!"; - CreateAndLinkDelta(transaction_, vertex_ptr, Delta::RecreateObjectTag()); + CreateAndLinkDelta(transaction_.get(), vertex_ptr, + Delta::RecreateObjectTag()); vertex_ptr->deleted = true; return Result<bool>{true}; @@ -162,7 +180,7 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, CHECK(from->transaction_ == to->transaction_) << "VertexAccessors must be from the same transaction when creating " "an edge!"; - CHECK(from->transaction_ == transaction_) + CHECK(from->transaction_ == transaction_.get()) << "VertexAccessors must be from the same transaction in when " "creating an edge!"; @@ -184,39 +202,39 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, guard_from.lock(); } - if (!PrepareForWrite(transaction_, from_vertex)) + if (!PrepareForWrite(transaction_.get(), from_vertex)) return Result<EdgeAccessor>{Error::SERIALIZATION_ERROR}; CHECK(!from_vertex->deleted) << "Invalid database state!"; if (to_vertex != from_vertex) { - if (!PrepareForWrite(transaction_, to_vertex)) + if (!PrepareForWrite(transaction_.get(), to_vertex)) return Result<EdgeAccessor>{Error::SERIALIZATION_ERROR}; CHECK(!to_vertex->deleted) << "Invalid database state!"; } auto gid = 
storage_->edge_id_.fetch_add(1, std::memory_order_acq_rel); auto acc = storage_->edges_.access(); - auto delta = CreateDeleteObjectDelta(transaction_); + auto delta = CreateDeleteObjectDelta(transaction_.get()); auto [it, inserted] = acc.insert(Edge{storage::Gid::FromUint(gid), delta}); CHECK(inserted) << "The edge must be inserted here!"; CHECK(it != acc.end()) << "Invalid Edge accessor!"; auto edge = &*it; delta->prev.Set(&*it); - CreateAndLinkDelta(transaction_, from_vertex, Delta::RemoveOutEdgeTag(), + CreateAndLinkDelta(transaction_.get(), from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); - CreateAndLinkDelta(transaction_, to_vertex, Delta::RemoveInEdgeTag(), + CreateAndLinkDelta(transaction_.get(), to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge); to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); - return Result<EdgeAccessor>{ - EdgeAccessor{edge, edge_type, from_vertex, to_vertex, transaction_}}; + return Result<EdgeAccessor>{EdgeAccessor{edge, edge_type, from_vertex, + to_vertex, transaction_.get()}}; } Result<bool> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) { - CHECK(edge->transaction_ == transaction_) + CHECK(edge->transaction_ == transaction_.get()) << "EdgeAccessor must be from the same transaction as the storage " "accessor when deleting an edge!"; auto edge_ptr = edge->edge_; @@ -224,7 +242,7 @@ Result<bool> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) { std::lock_guard<utils::SpinLock> guard(edge_ptr->lock); - if (!PrepareForWrite(transaction_, edge_ptr)) + if (!PrepareForWrite(transaction_.get(), edge_ptr)) return Result<bool>{Error::SERIALIZATION_ERROR}; if (edge_ptr->deleted) return Result<bool>{false}; @@ -247,20 +265,20 @@ Result<bool> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) { guard_from.lock(); } - if (!PrepareForWrite(transaction_, from_vertex)) + if (!PrepareForWrite(transaction_.get(), from_vertex)) return 
Result<bool>{Error::SERIALIZATION_ERROR}; CHECK(!from_vertex->deleted) << "Invalid database state!"; if (to_vertex != from_vertex) { - if (!PrepareForWrite(transaction_, to_vertex)) + if (!PrepareForWrite(transaction_.get(), to_vertex)) return Result<bool>{Error::SERIALIZATION_ERROR}; CHECK(!to_vertex->deleted) << "Invalid database state!"; } - CreateAndLinkDelta(transaction_, edge_ptr, Delta::RecreateObjectTag()); + CreateAndLinkDelta(transaction_.get(), edge_ptr, Delta::RecreateObjectTag()); edge_ptr->deleted = true; - CreateAndLinkDelta(transaction_, from_vertex, Delta::AddOutEdgeTag(), + CreateAndLinkDelta(transaction_.get(), from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ptr); { std::tuple<uint64_t, Vertex *, Edge *> link{edge_type, to_vertex, edge_ptr}; @@ -271,8 +289,8 @@ Result<bool> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) { from_vertex->out_edges.pop_back(); } - CreateAndLinkDelta(transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, - from_vertex, edge_ptr); + CreateAndLinkDelta(transaction_.get(), to_vertex, Delta::AddInEdgeTag(), + edge_type, from_vertex, edge_ptr); { std::tuple<uint64_t, Vertex *, Edge *> link{edge_type, from_vertex, edge_ptr}; @@ -289,23 +307,48 @@ Result<bool> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) { void Storage::Accessor::AdvanceCommand() { ++transaction_->command_id; } void Storage::Accessor::Commit() { + CHECK(transaction_) << "The transaction is already terminated!"; CHECK(!transaction_->must_abort) << "The transaction can't be committed!"; - CHECK(transaction_->is_active) << "The transaction is already terminated!"; + if (transaction_->deltas.empty()) { - transaction_->commit_timestamp.store(transaction_->start_timestamp, - std::memory_order_release); + // We don't have to update the commit timestamp here because no one reads + // it. 
+ storage_->commit_log_.MarkFinished(transaction_->start_timestamp); + transaction_ = nullptr; } else { - std::lock_guard<utils::SpinLock> guard(storage_->lock_); - transaction_->commit_timestamp.store(storage_->timestamp_++, - std::memory_order_release); - // TODO: release lock, and update all deltas to have an in-memory copy - // of the commit id + // Save these so we can mark them used in the commit log. + uint64_t start_timestamp = transaction_->start_timestamp; + uint64_t commit_timestamp; + + { + std::unique_lock<utils::SpinLock> engine_guard(storage_->engine_lock_); + commit_timestamp = storage_->timestamp_++; + + // Take committed_transactions lock while holding the engine lock to + // make sure that committed transactions are sorted by the commit + // timestamp in the list. + std::lock_guard<utils::SpinLock> committed_transactions_guard( + storage_->committed_transactions_lock_); + // TODO: release lock, and update all deltas to have a local copy + // of the commit timestamp + transaction_->commit_timestamp.store(commit_timestamp, + std::memory_order_release); + // Release engine lock because we don't have to hold it anymore and + // emplace back could take a long time. 
+ engine_guard.unlock(); + storage_->committed_transactions_.emplace_back(std::move(transaction_)); + } + + storage_->commit_log_.MarkFinished(start_timestamp); + storage_->commit_log_.MarkFinished(commit_timestamp); + } + if (storage_->gc_config_.type == StorageGcConfig::Type::ON_FINISH) { + storage_->CollectGarbage(); } - transaction_->is_active = false; } void Storage::Accessor::Abort() { - CHECK(transaction_->is_active) << "The transaction is already terminated!"; + CHECK(transaction_) << "The transaction is already terminated!"; for (const auto &delta : transaction_->deltas) { auto prev = delta.prev.Get(); switch (prev.type) { @@ -473,7 +516,167 @@ void Storage::Accessor::Abort() { break; } } - transaction_->is_active = false; + + { + std::unique_lock<utils::SpinLock> engine_guard(storage_->engine_lock_); + uint64_t mark_timestamp = storage_->timestamp_; + // Take aborted_undo_buffers lock while holding the engine lock to make + // sure that entries are sorted by mark timestamp in the list. + std::lock_guard<utils::SpinLock> aborted_undo_buffers_guard( + storage_->aborted_undo_buffers_lock_); + // Release engine lock because we don't have to hold it anymore and + // emplace back could take a long time. + engine_guard.unlock(); + storage_->aborted_undo_buffers_.emplace_back( + mark_timestamp, std::move(transaction_->deltas)); + } + + storage_->commit_log_.MarkFinished(transaction_->start_timestamp); + transaction_ = nullptr; + if (storage_->gc_config_.type == StorageGcConfig::Type::ON_FINISH) { + storage_->CollectGarbage(); + } +} + +void Storage::CollectGarbage() { + // Garbage collection must be performed in two phases. In the first phase, + // deltas that won't be applied by any transaction anymore are unlinked from + // the version chains. They cannot be deleted immediately, because there + // might be a transaction that still needs them to terminate the version + // chain traversal. 
They are instead marked for deletion and will be deleted + // in the second GC phase in this GC iteration or some of the following + // ones. + std::unique_lock<std::mutex> gc_guard(gc_lock_, std::try_to_lock); + if (!gc_guard.owns_lock()) { + return; + } + + uint64_t oldest_active_start_timestamp = commit_log_.OldestActive(); + // We don't move undo buffers of unlinked transactions to + // marked_undo_buffers list immediately, because we would have to repeatedly + // take transaction engine lock. + std::list<std::unique_ptr<Transaction>> unlinked; + + while (true) { + // We don't want to hold the lock on committed transactions for too long, + // because that prevents other transactions from committing. + Transaction *transaction; + + { + std::lock_guard<utils::SpinLock> guard(committed_transactions_lock_); + if (committed_transactions_.empty()) { + break; + } + + transaction = committed_transactions_.front().get(); + } + + if (transaction->commit_timestamp >= oldest_active_start_timestamp) { + break; + } + + // When unlinking a delta which is a first delta in its version chain, + // special care has to be taken to avoid the following race condition: + // + // [Vertex] --> [Delta A] + // + // GC thread: Delta A is the first in its chain, it must be unlinked from + // vertex and marked for deletion + // TX thread: Update vertex and add Delta B with Delta A as next + // + // [Vertex] --> [Delta B] <--> [Delta A] + // + // GC thread: Unlink delta from Vertex + // + // [Vertex] --> (nullptr) + // + // When processing a delta that is the first one in its chain, we + // obtain the corresponding vertex or edge lock, and then verify that this + // delta still is the first in its chain. 
+ + for (Delta &delta : transaction->deltas) { + while (true) { + auto prev = delta.prev.Get(); + switch (prev.type) { + case PreviousPtr::Type::VERTEX: { + Vertex *vertex = prev.vertex; + std::unique_lock<utils::SpinLock> vertex_guard(vertex->lock); + if (vertex->delta != &delta) { + // Something changed, we're not the first delta in the chain + // anymore. + continue; + } + vertex->delta = nullptr; + if (vertex->deleted) { + // We must unlock the guard now because the lock it is holding + // might be destroyed when vertex is deleted. + vertex_guard.unlock(); + auto acc = vertices_.access(); + acc.remove(vertex->gid); + } + break; + } + case PreviousPtr::Type::EDGE: { + Edge *edge = prev.edge; + std::unique_lock<utils::SpinLock> edge_guard(edge->lock); + if (edge->delta != &delta) { + // Something changed, we're not the first delta in the chain + // anymore. + continue; + } + edge->delta = nullptr; + if (edge->deleted) { + // We must unlock the guard now because the lock it is holding + // might be destroyed when edge is deleted. + edge_guard.unlock(); + auto acc = edges_.access(); + acc.remove(edge->gid); + } + break; + } + case PreviousPtr::Type::DELTA: { + Delta *prev_delta = prev.delta; + prev_delta->next.store(nullptr, std::memory_order_release); + break; + } + } + break; + } + } + + { + std::lock_guard<utils::SpinLock> guard(committed_transactions_lock_); + // We use splice here to avoid allocating storage for new list nodes. 
+ unlinked.splice(unlinked.begin(), committed_transactions_, + committed_transactions_.begin()); + } + } + + uint64_t mark_timestamp; + { + std::lock_guard<utils::SpinLock> guard(engine_lock_); + mark_timestamp = timestamp_; + } + + for (auto &transaction : unlinked) { + marked_undo_buffers_.emplace_back(mark_timestamp, + std::move(transaction->deltas)); + } + + while (!marked_undo_buffers_.empty() && + marked_undo_buffers_.front().first < oldest_active_start_timestamp) { + marked_undo_buffers_.pop_front(); + } + + while (true) { + std::lock_guard<utils::SpinLock> aborted_undo_buffers_guard( + aborted_undo_buffers_lock_); + if (aborted_undo_buffers_.empty() || + aborted_undo_buffers_.front().first >= oldest_active_start_timestamp) { + break; + } + aborted_undo_buffers_.pop_front(); + } } } // namespace storage diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 34f3710de..fdb76c37e 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -2,8 +2,7 @@ #include <optional> -#include "utils/skip_list.hpp" - +#include "storage/v2/commit_log.hpp" #include "storage/v2/edge.hpp" #include "storage/v2/edge_accessor.hpp" #include "storage/v2/mvcc.hpp" @@ -11,6 +10,8 @@ #include "storage/v2/transaction.hpp" #include "storage/v2/vertex.hpp" #include "storage/v2/vertex_accessor.hpp" +#include "utils/scheduler.hpp" +#include "utils/skip_list.hpp" namespace storage { @@ -22,8 +23,31 @@ namespace storage { const uint64_t kTimestampInitialId = 0; const uint64_t kTransactionInitialId = 1ULL << 63U; +/// Pass this class to the \ref Storage constructor to set the behavior of the +/// garbage collection. +/// +/// There are three options: +// 1. NONE - No GC at all, only useful for benchmarking. +// 2. PERIODIC - A separate thread performs GC periodically with given +// interval (this is the default, with 1 second interval). +// 3. ON_FINISH - Whenever a transaction commits or aborts, GC is performed +// on the same thread. 
+struct StorageGcConfig { + enum class Type { NONE, PERIODIC, ON_FINISH }; + Type type; + std::chrono::milliseconds interval; +}; + +inline static constexpr StorageGcConfig DefaultGcConfig = { + .type = StorageGcConfig::Type::PERIODIC, + .interval = std::chrono::milliseconds(1000)}; + class Storage final { public: + explicit Storage(StorageGcConfig gc_config = DefaultGcConfig); + + ~Storage(); + class Accessor final { public: explicit Accessor(Storage *storage); @@ -60,13 +84,17 @@ class Storage final { private: Storage *storage_; - Transaction *transaction_; + // TODO: when we are able to move Transaction objects without breaking the + // pointers in Delta, we can get rid of the unique pointer here + std::unique_ptr<Transaction> transaction_; bool is_transaction_starter_; }; Accessor Access() { return Accessor{this}; } private: + void CollectGarbage(); + // Main object storage utils::SkipList<storage::Vertex> vertices_; utils::SkipList<storage::Edge> edges_; @@ -74,10 +102,27 @@ class Storage final { std::atomic<uint64_t> edge_id_{0}; // Transaction engine - utils::SpinLock lock_; + utils::SpinLock engine_lock_; uint64_t timestamp_{kTimestampInitialId}; uint64_t transaction_id_{kTransactionInitialId}; - utils::SkipList<Transaction> transactions_; + // TODO: This isn't really a commit log, it doesn't even care if a + // transaction committed or aborted. We could probably combine this with + // `timestamp_` in a sensible unit, something like TransactionClock or + // whatever. 
+ CommitLog commit_log_; + + utils::SpinLock committed_transactions_lock_; + // TODO: when we are able to move Transaction objects without breaking the + // pointers in Delta, we can get rid of the unique pointer here + std::list<std::unique_ptr<Transaction>> committed_transactions_; + + utils::SpinLock aborted_undo_buffers_lock_; + std::list<std::pair<uint64_t, std::list<Delta>>> aborted_undo_buffers_; + + StorageGcConfig gc_config_; + utils::Scheduler gc_runner_; + std::mutex gc_lock_; + std::list<std::pair<uint64_t, std::list<Delta>>> marked_undo_buffers_; }; } // namespace storage diff --git a/src/storage/v2/transaction.hpp b/src/storage/v2/transaction.hpp index c7089e5cb..a1c2fabb8 100644 --- a/src/storage/v2/transaction.hpp +++ b/src/storage/v2/transaction.hpp @@ -20,7 +20,6 @@ struct Transaction { start_timestamp(start_timestamp), commit_timestamp(transaction_id), command_id(0), - is_active(true), must_abort(false) {} Transaction(Transaction &&other) noexcept @@ -29,7 +28,6 @@ struct Transaction { commit_timestamp(other.commit_timestamp.load()), command_id(other.command_id), deltas(std::move(other.deltas)), - is_active(other.is_active), must_abort(other.must_abort) {} Transaction(const Transaction &) = delete; @@ -43,7 +41,6 @@ struct Transaction { std::atomic<uint64_t> commit_timestamp; uint64_t command_id; std::list<Delta> deltas; - bool is_active; bool must_abort; }; diff --git a/tests/benchmark/CMakeLists.txt b/tests/benchmark/CMakeLists.txt index d5ec5d8ca..7edc94b6f 100644 --- a/tests/benchmark/CMakeLists.txt +++ b/tests/benchmark/CMakeLists.txt @@ -77,3 +77,6 @@ target_link_libraries(${test_prefix}tx_engine mg-single-node kvstore_dummy_lib) add_benchmark(expansion.cpp) target_link_libraries(${test_prefix}expansion mg-single-node kvstore_dummy_lib) + +add_benchmark(storage_v2_gc.cpp) +target_link_libraries(${test_prefix}storage_v2_gc mg-storage-v2) diff --git a/tests/benchmark/storage_v2_gc.cpp b/tests/benchmark/storage_v2_gc.cpp new file mode 100644 
index 000000000..e942a4706 --- /dev/null +++ b/tests/benchmark/storage_v2_gc.cpp @@ -0,0 +1,86 @@ +#include <iostream> + +#include <gflags/gflags.h> + +#include "storage/v2/storage.hpp" +#include "utils/timer.hpp" + +// This benchmark should be run for a fixed amount of time that is +// large compared to GC interval to make the output relevant. + +const int kNumIterations = 5000000; +const int kNumVertices = 1000000; + +DEFINE_int32(num_threads, 4, "number of threads"); +DEFINE_int32(num_vertices, kNumVertices, "number of vertices"); +DEFINE_int32(num_iterations, kNumIterations, "number of iterations"); + +std::pair<std::string, storage::StorageGcConfig> TestConfigurations[] = { + {"NoGc", + storage::StorageGcConfig{.type = storage::StorageGcConfig::Type::NONE}}, + {"OnFinishGc", + storage::StorageGcConfig{.type = + storage::StorageGcConfig::Type::ON_FINISH}}, + {"100msPeriodicGc", + storage::StorageGcConfig{.type = storage::StorageGcConfig::Type::PERIODIC, + .interval = std::chrono::milliseconds(100)}}, + {"1000msPeriodicGc", + + storage::StorageGcConfig{.type = storage::StorageGcConfig::Type::PERIODIC, + .interval = std::chrono::milliseconds(1000)}}}; + +void UpdateLabelFunc(int thread_id, storage::Storage *storage, + const std::vector<storage::Gid> &vertices, + int num_iterations) { + std::mt19937 gen(thread_id); + std::uniform_int_distribution<uint64_t> vertex_dist(0, vertices.size() - 1); + std::uniform_int_distribution<uint64_t> label_dist(0, 100); + + utils::Timer timer; + for (int iter = 0; iter < num_iterations; ++iter) { + auto acc = storage->Access(); + storage::Gid gid = vertices.at(vertex_dist(gen)); + std::optional<storage::VertexAccessor> vertex = + acc.FindVertex(gid, storage::View::OLD); + CHECK(vertex.has_value()) + << "Vertex with GID " << gid.AsUint() << " doesn't exist"; + if (vertex->AddLabel(label_dist(gen)).IsReturn()) { + acc.Commit(); + } else { + acc.Abort(); + } + } +} + +int main(int argc, char *argv[]) { + 
gflags::ParseCommandLineFlags(&argc, &argv, true); + + for (const auto &config : TestConfigurations) { + storage::Storage storage(config.second); + std::vector<storage::Gid> vertices; + { + auto acc = storage.Access(); + for (int i = 0; i < FLAGS_num_vertices; ++i) { + vertices.push_back(acc.CreateVertex().Gid()); + } + acc.Commit(); + } + + utils::Timer timer; + std::vector<std::thread> threads; + threads.reserve(FLAGS_num_threads); + for (int i = 0; i < FLAGS_num_threads; ++i) { + threads.emplace_back(UpdateLabelFunc, i, &storage, vertices, + FLAGS_num_iterations); + } + + for (int i = 0; i < FLAGS_num_threads; ++i) { + threads[i].join(); + } + + std::cout << "Config: " << config.first + << ", Time: " << timer.Elapsed().count() << std::endl; + } + + return 0; +} diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 208d2feaa..f5423b077 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -388,6 +388,9 @@ target_link_libraries(${test_prefix}storage_v2_edge mg-storage-v2) add_unit_test(storage_v2.cpp) target_link_libraries(${test_prefix}storage_v2 mg-storage-v2) +add_unit_test(storage_v2_gc.cpp) +target_link_libraries(${test_prefix}storage_v2_gc mg-storage-v2) + # Test LCP add_custom_command( diff --git a/tests/unit/storage_v2_gc.cpp b/tests/unit/storage_v2_gc.cpp new file mode 100644 index 000000000..c4be35f74 --- /dev/null +++ b/tests/unit/storage_v2_gc.cpp @@ -0,0 +1,146 @@ +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include "storage/v2/storage.hpp" + +using testing::UnorderedElementsAre; + +// TODO: We should implement a more sophisticated stress test to verify that GC +// is working properly in a multithreaded environment. + +// A simple test trying to get GC to run while a transaction is still alive and +// then verify that GC didn't delete anything it shouldn't have. 
+// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST(StorageV2Gc, Sanity) { + storage::Storage storage( + storage::StorageGcConfig{.type = storage::StorageGcConfig::Type::PERIODIC, + .interval = std::chrono::milliseconds(100)}); + + std::vector<storage::Gid> vertices; + + { + auto acc = storage.Access(); + // Create some vertices, but delete some of them immediately. + for (uint64_t i = 0; i < 1000; ++i) { + auto vertex = acc.CreateVertex(); + vertices.push_back(vertex.Gid()); + } + + acc.AdvanceCommand(); + + for (uint64_t i = 0; i < 1000; ++i) { + auto vertex = acc.FindVertex(vertices[i], storage::View::OLD); + ASSERT_TRUE(vertex.has_value()); + if (i % 5 == 0) { + acc.DeleteVertex(&vertex.value()); + } + } + + // Wait for GC. + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + for (uint64_t i = 0; i < 1000; ++i) { + auto vertex_old = acc.FindVertex(vertices[i], storage::View::OLD); + auto vertex_new = acc.FindVertex(vertices[i], storage::View::NEW); + EXPECT_TRUE(vertex_old.has_value()); + EXPECT_EQ(vertex_new.has_value(), i % 5 != 0); + } + + acc.Commit(); + } + + // Verify existing vertices and add labels to some of them. + { + auto acc = storage.Access(); + for (uint64_t i = 0; i < 1000; ++i) { + auto vertex = acc.FindVertex(vertices[i], storage::View::OLD); + EXPECT_EQ(vertex.has_value(), i % 5 != 0); + + if (vertex.has_value()) { + vertex->AddLabel(3 * i); + vertex->AddLabel(3 * i + 1); + vertex->AddLabel(3 * i + 2); + } + } + + // Wait for GC. + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + // Verify labels. 
+ for (uint64_t i = 0; i < 1000; ++i) { + auto vertex = acc.FindVertex(vertices[i], storage::View::NEW); + EXPECT_EQ(vertex.has_value(), i % 5 != 0); + + if (vertex.has_value()) { + auto labels_old = vertex->Labels(storage::View::OLD); + EXPECT_TRUE(labels_old.IsReturn()); + EXPECT_TRUE(labels_old.GetReturn().empty()); + + auto labels_new = vertex->Labels(storage::View::NEW); + EXPECT_TRUE(labels_new.IsReturn()); + EXPECT_THAT(labels_new.GetReturn(), + UnorderedElementsAre(3 * i, 3 * i + 1, 3 * i + 2)); + } + } + + acc.Commit(); + } + + // Add and remove some edges. + { + auto acc = storage.Access(); + for (uint64_t i = 0; i < 1000; ++i) { + auto from_vertex = acc.FindVertex(vertices[i], storage::View::OLD); + auto to_vertex = + acc.FindVertex(vertices[(i + 1) % 1000], storage::View::OLD); + EXPECT_EQ(from_vertex.has_value(), i % 5 != 0); + EXPECT_EQ(to_vertex.has_value(), (i + 1) % 5 != 0); + + if (from_vertex.has_value() && to_vertex.has_value()) { + acc.CreateEdge(&from_vertex.value(), &to_vertex.value(), i); + } + } + + // Detach delete some vertices. + for (uint64_t i = 0; i < 1000; ++i) { + auto vertex = acc.FindVertex(vertices[i], storage::View::NEW); + EXPECT_EQ(vertex.has_value(), i % 5 != 0); + if (vertex.has_value()) { + if (i % 3 == 0) { + acc.DetachDeleteVertex(&vertex.value()); + } + } + } + + // Wait for GC. + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + // Verify edges. 
+ for (uint64_t i = 0; i < 1000; ++i) { + auto vertex = acc.FindVertex(vertices[i], storage::View::NEW); + EXPECT_EQ(vertex.has_value(), i % 5 != 0 && i % 3 != 0); + if (vertex.has_value()) { + auto out_edges = + vertex->OutEdges(std::vector<uint64_t>{}, storage::View::NEW); + if (i % 5 != 4 && i % 3 != 2) { + EXPECT_EQ(out_edges.GetReturn().size(), 1); + EXPECT_EQ(std::get<2>(out_edges.GetReturn().at(0)).EdgeType(), i); + } else { + EXPECT_TRUE(out_edges.GetReturn().empty()); + } + + auto in_edges = + vertex->InEdges(std::vector<uint64_t>{}, storage::View::NEW); + if (i % 5 != 1 && i % 3 != 1) { + EXPECT_EQ(in_edges.GetReturn().size(), 1); + EXPECT_EQ(std::get<2>(in_edges.GetReturn().at(0)).EdgeType(), + (i + 999) % 1000); + } else { + EXPECT_TRUE(in_edges.GetReturn().empty()); + } + } + } + + acc.Commit(); + } +}