From ffa76e59c940a1114a638b372693333ed89326c2 Mon Sep 17 00:00:00 2001 From: Marin Tomic Date: Mon, 22 Jul 2019 17:05:00 +0200 Subject: [PATCH] Implement deferred vertex/edge deletion Summary: this will make GC for indices easier Reviewers: teon.banek, mferencevic Reviewed By: teon.banek, mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2223 --- src/storage/v2/storage.cpp | 166 +++++++++++++++++++++---------------- src/storage/v2/storage.hpp | 23 +++-- 2 files changed, 108 insertions(+), 81 deletions(-) diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index e4d31844d..2d37297b8 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -318,18 +318,19 @@ void Storage::Accessor::Commit() { // Take committed_transactions lock while holding the engine lock to // make sure that committed transactions are sorted by the commit // timestamp in the list. - std::lock_guard committed_transactions_guard( - storage_->committed_transactions_lock_); - // TODO: release lock, and update all deltas to have a local copy - // of the commit timestamp - CHECK(transaction_.commit_timestamp != nullptr) - << "Invalid database state!"; - transaction_.commit_timestamp->store(commit_timestamp, - std::memory_order_release); - // Release engine lock because we don't have to hold it anymore and - // emplace back could take a long time. - engine_guard.unlock(); - storage_->committed_transactions_.emplace_back(std::move(transaction_)); + storage_->committed_transactions_.WithLock( + [&](auto &committed_transactions) { + // TODO: release lock, and update all deltas to have a local copy + // of the commit timestamp + CHECK(transaction_.commit_timestamp != nullptr) + << "Invalid database state!"; + transaction_.commit_timestamp->store(commit_timestamp, + std::memory_order_release); + // Release engine lock because we don't have to hold it anymore and + // emplace back could take a long time. + engine_guard.unlock(); + committed_transactions.emplace_back(std::move(transaction_)); + }); } storage_->commit_log_.MarkFinished(start_timestamp); @@ -344,12 +345,11 @@ void Storage::Accessor::Commit() { void Storage::Accessor::Abort() { CHECK(is_transaction_active_) << "The transaction is already terminated!"; - // We acquire accessors to the storage `SkipList`s so that we can safely - // delete objects in the lists. While the accessor is alive, all objects (even - // deleted ones) are guaranteed to still exist and pointers to them are still - // valid. - auto vertex_acc = storage_->vertices_.access(); - auto edge_acc = storage_->edges_.access(); + // We collect vertices and edges we've created here and then splice them into + // `deleted_vertices_` and `deleted_edges_` lists, instead of adding them one + // by one and acquiring lock every time. + std::list my_deleted_vertices; + std::list my_deleted_edges; for (const auto &delta : transaction_.deltas) { auto prev = delta.prev.Get(); @@ -436,8 +436,8 @@ void Storage::Accessor::Abort() { break; } case Delta::Action::DELETE_OBJECT: { - CHECK(vertex_acc.remove(vertex->gid)) - << "Invalid database state!"; + vertex->deleted = true; + my_deleted_vertices.push_back(vertex->gid); break; } case Delta::Action::RECREATE_OBJECT: { @@ -479,7 +479,8 @@ void Storage::Accessor::Abort() { break; } case Delta::Action::DELETE_OBJECT: { - CHECK(edge_acc.remove(edge->gid)) << "Invalid database state!"; + edge->deleted = true; + my_deleted_edges.push_back(edge->gid); break; } case Delta::Action::RECREATE_OBJECT: { @@ -513,15 +514,21 @@ void Storage::Accessor::Abort() { { std::unique_lock engine_guard(storage_->engine_lock_); uint64_t mark_timestamp = storage_->timestamp_; - // Take aborted_undo_buffers lock while holding the engine lock to make + // Take garbage_undo_buffers lock while holding the engine lock to make // sure that entries are sorted by mark timestamp in the list. - std::lock_guard aborted_undo_buffers_guard( - storage_->aborted_undo_buffers_lock_); - // Release engine lock because we don't have to hold it anymore and - // emplace back could take a long time. - engine_guard.unlock(); - storage_->aborted_undo_buffers_.emplace_back( - mark_timestamp, std::move(transaction_.deltas)); + storage_->garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) { + // Release engine lock because we don't have to hold it anymore and + // emplace back could take a long time. + engine_guard.unlock(); + garbage_undo_buffers.emplace_back(mark_timestamp, + std::move(transaction_.deltas)); + }); + storage_->deleted_vertices_.WithLock([&](auto &deleted_vertices) { + deleted_vertices.splice(deleted_vertices.begin(), my_deleted_vertices); + }); + storage_->deleted_edges_.WithLock([&](auto &deleted_edges) { + deleted_edges.splice(deleted_edges.begin(), my_deleted_edges); + }); } storage_->commit_log_.MarkFinished(transaction_.start_timestamp); @@ -559,23 +566,29 @@ void Storage::CollectGarbage() { } uint64_t oldest_active_start_timestamp = commit_log_.OldestActive(); - // We don't move undo buffers of unlinked transactions to - // marked_undo_buffers list immediately, because we would have to repeatedly - // take transaction engine lock. - std::list unlinked; + // We don't move undo buffers of unlinked transactions to garbage_undo_buffers + // list immediately, because we would have to repeatedly take + // garbage_undo_buffers lock. + std::list>> unlinked_undo_buffers; + + // We will free only vertices deleted up until now in this GC cycle. + // Otherwise, GC cycle might be prolonged by aborted transactions adding new + // edges and vertices to the lists over and over again. + std::list current_deleted_edges; + std::list current_deleted_vertices; + deleted_vertices_->swap(current_deleted_vertices); + deleted_edges_->swap(current_deleted_edges); while (true) { // We don't want to hold the lock on commited transactions for too long, // because that prevents other transactions from committing. Transaction *transaction; - { - std::lock_guard guard(committed_transactions_lock_); - if (committed_transactions_.empty()) { + auto committed_transactions_ptr = committed_transactions_.Lock(); + if (committed_transactions_ptr->empty()) { break; } - - transaction = &committed_transactions_.front(); + transaction = &committed_transactions_ptr->front(); } if (transaction->commit_timestamp->load(std::memory_order_acquire) >= @@ -583,7 +596,7 @@ void Storage::CollectGarbage() { break; } - // When unlinking a delta which is a first delta in its version chain, + // When unlinking a delta which is the first delta in its version chain, // special care has to be taken to avoid the following race condition: // // [Vertex] --> [Delta A] @@ -616,11 +629,7 @@ void Storage::CollectGarbage() { } vertex->delta = nullptr; if (vertex->deleted) { - // We must unlock the guard now because the lock it is holding - // might be destroyed when vertex is deleted. - vertex_guard.unlock(); - auto acc = vertices_.access(); - acc.remove(vertex->gid); + current_deleted_vertices.push_back(vertex->gid); } break; } @@ -633,12 +642,7 @@ void Storage::CollectGarbage() { continue; } if (edge->deleted) { - // We must unlock the guard now because the lock it is holding - // might be destroyed when vertex is deleted. - edge_guard.unlock(); - auto acc = edges_.access(); - - acc.remove(edge->gid); + current_deleted_edges.push_back(edge->gid); } break; } @@ -652,38 +656,54 @@ void Storage::CollectGarbage() { } } - { - std::lock_guard guard(committed_transactions_lock_); - // We use splice here to avoid allocating storage for new list nodes. - unlinked.splice(unlinked.begin(), committed_transactions_, - committed_transactions_.begin()); - } + committed_transactions_.WithLock([&](auto &committed_transactions) { + unlinked_undo_buffers.emplace_back(0, std::move(transaction->deltas)); + committed_transactions.pop_front(); + }); } - uint64_t mark_timestamp; { - std::lock_guard guard(engine_lock_); + uint64_t mark_timestamp; + std::unique_lock guard(engine_lock_); mark_timestamp = timestamp_; - } - - for (auto &transaction : unlinked) { - marked_undo_buffers_.emplace_back(mark_timestamp, - std::move(transaction.deltas)); - } - - while (!marked_undo_buffers_.empty() && - marked_undo_buffers_.front().first < oldest_active_start_timestamp) { - marked_undo_buffers_.pop_front(); + // Take garbage_undo_buffers lock while holding the engine lock to make + // sure that entries are sorted by mark timestamp in the list. + garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) { + // Release engine lock because we don't have to hold it anymore and + // this could take a long time. + guard.unlock(); + // TODO(mtomic): holding garbage_undo_buffers_ lock here prevents + // transactions from aborting until we're done marking, maybe we should + // add them one-by-one or something + for (auto &[timestamp, undo_buffer] : unlinked_undo_buffers) { + timestamp = mark_timestamp; + } + garbage_undo_buffers.splice(garbage_undo_buffers.end(), + unlinked_undo_buffers); + }); } while (true) { - std::lock_guard aborted_undo_buffers_guard( - aborted_undo_buffers_lock_); - if (aborted_undo_buffers_.empty() || - aborted_undo_buffers_.front().first >= oldest_active_start_timestamp) { + auto garbage_undo_buffers_ptr = garbage_undo_buffers_.Lock(); + if (garbage_undo_buffers_ptr->empty() || + garbage_undo_buffers_ptr->front().first >= + oldest_active_start_timestamp) { break; } - aborted_undo_buffers_.pop_front(); + garbage_undo_buffers_ptr->pop_front(); + } + + { + auto vertex_acc = vertices_.access(); + for (auto vertex : current_deleted_vertices) { + CHECK(vertex_acc.remove(vertex)) << "Invalid database state!"; + } + } + { + auto edge_acc = edges_.access(); + for (auto edge : current_deleted_edges) { + CHECK(edge_acc.remove(edge)) << "Invalid database state!"; + } } } diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 5d2114ac4..a33422130 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -15,6 +15,7 @@ #include "utils/rw_lock.hpp" #include "utils/scheduler.hpp" #include "utils/skip_list.hpp" +#include "utils/synchronized.hpp" namespace storage { @@ -124,6 +125,8 @@ class Storage final { std::atomic vertex_id_{0}; std::atomic edge_id_{0}; + NameIdMapper name_id_mapper_; + // Transaction engine utils::SpinLock engine_lock_; uint64_t timestamp_{kTimestampInitialId}; @@ -134,18 +137,22 @@ class Storage final { // whatever. CommitLog commit_log_; - NameIdMapper name_id_mapper_; - - utils::SpinLock committed_transactions_lock_; - std::list committed_transactions_; - - utils::SpinLock aborted_undo_buffers_lock_; - std::list>> aborted_undo_buffers_; + utils::Synchronized, utils::SpinLock> + committed_transactions_; StorageGcConfig gc_config_; utils::Scheduler gc_runner_; std::mutex gc_lock_; - std::list>> marked_undo_buffers_; + + // Undo buffers that were unlinked and now are waiting to be freed. + utils::Synchronized>>, + utils::SpinLock> + garbage_undo_buffers_; + + // Vertices that are logically deleted and now are waiting to be removed from + // the main storage. + utils::Synchronized, utils::SpinLock> deleted_vertices_; + utils::Synchronized, utils::SpinLock> deleted_edges_; }; } // namespace storage