Implement deferred vertex/edge deletion
Summary: this will make GC for indices easier Reviewers: teon.banek, mferencevic Reviewed By: teon.banek, mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2223
This commit is contained in:
parent
08bf993a47
commit
ffa76e59c9
@ -318,18 +318,19 @@ void Storage::Accessor::Commit() {
|
||||
// Take committed_transactions lock while holding the engine lock to
|
||||
// make sure that committed transactions are sorted by the commit
|
||||
// timestamp in the list.
|
||||
std::lock_guard<utils::SpinLock> committed_transactions_guard(
|
||||
storage_->committed_transactions_lock_);
|
||||
// TODO: release lock, and update all deltas to have a local copy
|
||||
// of the commit timestamp
|
||||
CHECK(transaction_.commit_timestamp != nullptr)
|
||||
<< "Invalid database state!";
|
||||
transaction_.commit_timestamp->store(commit_timestamp,
|
||||
std::memory_order_release);
|
||||
// Release engine lock because we don't have to hold it anymore and
|
||||
// emplace back could take a long time.
|
||||
engine_guard.unlock();
|
||||
storage_->committed_transactions_.emplace_back(std::move(transaction_));
|
||||
storage_->committed_transactions_.WithLock(
|
||||
[&](auto &committed_transactions) {
|
||||
// TODO: release lock, and update all deltas to have a local copy
|
||||
// of the commit timestamp
|
||||
CHECK(transaction_.commit_timestamp != nullptr)
|
||||
<< "Invalid database state!";
|
||||
transaction_.commit_timestamp->store(commit_timestamp,
|
||||
std::memory_order_release);
|
||||
// Release engine lock because we don't have to hold it anymore and
|
||||
// emplace back could take a long time.
|
||||
engine_guard.unlock();
|
||||
committed_transactions.emplace_back(std::move(transaction_));
|
||||
});
|
||||
}
|
||||
|
||||
storage_->commit_log_.MarkFinished(start_timestamp);
|
||||
@ -344,12 +345,11 @@ void Storage::Accessor::Commit() {
|
||||
void Storage::Accessor::Abort() {
|
||||
CHECK(is_transaction_active_) << "The transaction is already terminated!";
|
||||
|
||||
// We acquire accessors to the storage `SkipList`s so that we can safely
|
||||
// delete objects in the lists. While the accessor is alive, all objects (even
|
||||
// deleted ones) are guaranteed to still exist and pointers to them are still
|
||||
// valid.
|
||||
auto vertex_acc = storage_->vertices_.access();
|
||||
auto edge_acc = storage_->edges_.access();
|
||||
// We collect vertices and edges we've created here and then splice them into
|
||||
// `deleted_vertices_` and `deleted_edges_` lists, instead of adding them one
|
||||
// by one and acquiring lock every time.
|
||||
std::list<Gid> my_deleted_vertices;
|
||||
std::list<Gid> my_deleted_edges;
|
||||
|
||||
for (const auto &delta : transaction_.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
@ -436,8 +436,8 @@ void Storage::Accessor::Abort() {
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_OBJECT: {
|
||||
CHECK(vertex_acc.remove(vertex->gid))
|
||||
<< "Invalid database state!";
|
||||
vertex->deleted = true;
|
||||
my_deleted_vertices.push_back(vertex->gid);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
@ -479,7 +479,8 @@ void Storage::Accessor::Abort() {
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_OBJECT: {
|
||||
CHECK(edge_acc.remove(edge->gid)) << "Invalid database state!";
|
||||
edge->deleted = true;
|
||||
my_deleted_edges.push_back(edge->gid);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
@ -513,15 +514,21 @@ void Storage::Accessor::Abort() {
|
||||
{
|
||||
std::unique_lock<utils::SpinLock> engine_guard(storage_->engine_lock_);
|
||||
uint64_t mark_timestamp = storage_->timestamp_;
|
||||
// Take aborted_undo_buffers lock while holding the engine lock to make
|
||||
// Take garbage_undo_buffers lock while holding the engine lock to make
|
||||
// sure that entries are sorted by mark timestamp in the list.
|
||||
std::lock_guard<utils::SpinLock> aborted_undo_buffers_guard(
|
||||
storage_->aborted_undo_buffers_lock_);
|
||||
// Release engine lock because we don't have to hold it anymore and
|
||||
// emplace back could take a long time.
|
||||
engine_guard.unlock();
|
||||
storage_->aborted_undo_buffers_.emplace_back(
|
||||
mark_timestamp, std::move(transaction_.deltas));
|
||||
storage_->garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
|
||||
// Release engine lock because we don't have to hold it anymore and
|
||||
// emplace back could take a long time.
|
||||
engine_guard.unlock();
|
||||
garbage_undo_buffers.emplace_back(mark_timestamp,
|
||||
std::move(transaction_.deltas));
|
||||
});
|
||||
storage_->deleted_vertices_.WithLock([&](auto &deleted_vertices) {
|
||||
deleted_vertices.splice(deleted_vertices.begin(), my_deleted_vertices);
|
||||
});
|
||||
storage_->deleted_edges_.WithLock([&](auto &deleted_edges) {
|
||||
deleted_edges.splice(deleted_edges.begin(), my_deleted_edges);
|
||||
});
|
||||
}
|
||||
|
||||
storage_->commit_log_.MarkFinished(transaction_.start_timestamp);
|
||||
@ -559,23 +566,29 @@ void Storage::CollectGarbage() {
|
||||
}
|
||||
|
||||
uint64_t oldest_active_start_timestamp = commit_log_.OldestActive();
|
||||
// We don't move undo buffers of unlinked transactions to
|
||||
// marked_undo_buffers list immediately, because we would have to repeatedly
|
||||
// take transaction engine lock.
|
||||
std::list<Transaction> unlinked;
|
||||
// We don't move undo buffers of unlinked transactions to garbage_undo_buffers
|
||||
// list immediately, because we would have to repeatedly take
|
||||
// garbage_undo_buffers lock.
|
||||
std::list<std::pair<uint64_t, std::list<Delta>>> unlinked_undo_buffers;
|
||||
|
||||
// We will free only vertices deleted up until now in this GC cycle.
|
||||
// Otherwise, GC cycle might be prolonged by aborted transactions adding new
|
||||
// edges and vertices to the lists over and over again.
|
||||
std::list<Gid> current_deleted_edges;
|
||||
std::list<Gid> current_deleted_vertices;
|
||||
deleted_vertices_->swap(current_deleted_vertices);
|
||||
deleted_edges_->swap(current_deleted_edges);
|
||||
|
||||
while (true) {
|
||||
// We don't want to hold the lock on commited transactions for too long,
|
||||
// because that prevents other transactions from committing.
|
||||
Transaction *transaction;
|
||||
|
||||
{
|
||||
std::lock_guard<utils::SpinLock> guard(committed_transactions_lock_);
|
||||
if (committed_transactions_.empty()) {
|
||||
auto committed_transactions_ptr = committed_transactions_.Lock();
|
||||
if (committed_transactions_ptr->empty()) {
|
||||
break;
|
||||
}
|
||||
|
||||
transaction = &committed_transactions_.front();
|
||||
transaction = &committed_transactions_ptr->front();
|
||||
}
|
||||
|
||||
if (transaction->commit_timestamp->load(std::memory_order_acquire) >=
|
||||
@ -583,7 +596,7 @@ void Storage::CollectGarbage() {
|
||||
break;
|
||||
}
|
||||
|
||||
// When unlinking a delta which is a first delta in its version chain,
|
||||
// When unlinking a delta which is the first delta in its version chain,
|
||||
// special care has to be taken to avoid the following race condition:
|
||||
//
|
||||
// [Vertex] --> [Delta A]
|
||||
@ -616,11 +629,7 @@ void Storage::CollectGarbage() {
|
||||
}
|
||||
vertex->delta = nullptr;
|
||||
if (vertex->deleted) {
|
||||
// We must unlock the guard now because the lock it is holding
|
||||
// might be destroyed when vertex is deleted.
|
||||
vertex_guard.unlock();
|
||||
auto acc = vertices_.access();
|
||||
acc.remove(vertex->gid);
|
||||
current_deleted_vertices.push_back(vertex->gid);
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -633,12 +642,7 @@ void Storage::CollectGarbage() {
|
||||
continue;
|
||||
}
|
||||
if (edge->deleted) {
|
||||
// We must unlock the guard now because the lock it is holding
|
||||
// might be destroyed when vertex is deleted.
|
||||
edge_guard.unlock();
|
||||
auto acc = edges_.access();
|
||||
|
||||
acc.remove(edge->gid);
|
||||
current_deleted_edges.push_back(edge->gid);
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -652,38 +656,54 @@ void Storage::CollectGarbage() {
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard<utils::SpinLock> guard(committed_transactions_lock_);
|
||||
// We use splice here to avoid allocating storage for new list nodes.
|
||||
unlinked.splice(unlinked.begin(), committed_transactions_,
|
||||
committed_transactions_.begin());
|
||||
}
|
||||
committed_transactions_.WithLock([&](auto &committed_transactions) {
|
||||
unlinked_undo_buffers.emplace_back(0, std::move(transaction->deltas));
|
||||
committed_transactions.pop_front();
|
||||
});
|
||||
}
|
||||
|
||||
uint64_t mark_timestamp;
|
||||
{
|
||||
std::lock_guard<utils::SpinLock> guard(engine_lock_);
|
||||
uint64_t mark_timestamp;
|
||||
std::unique_lock<utils::SpinLock> guard(engine_lock_);
|
||||
mark_timestamp = timestamp_;
|
||||
}
|
||||
|
||||
for (auto &transaction : unlinked) {
|
||||
marked_undo_buffers_.emplace_back(mark_timestamp,
|
||||
std::move(transaction.deltas));
|
||||
}
|
||||
|
||||
while (!marked_undo_buffers_.empty() &&
|
||||
marked_undo_buffers_.front().first < oldest_active_start_timestamp) {
|
||||
marked_undo_buffers_.pop_front();
|
||||
// Take garbage_undo_buffers lock while holding the engine lock to make
|
||||
// sure that entries are sorted by mark timestamp in the list.
|
||||
garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
|
||||
// Release engine lock because we don't have to hold it anymore and
|
||||
// this could take a long time.
|
||||
guard.unlock();
|
||||
// TODO(mtomic): holding garbage_undo_buffers_ lock here prevents
|
||||
// transactions from aborting until we're done marking, maybe we should
|
||||
// add them one-by-one or something
|
||||
for (auto &[timestamp, undo_buffer] : unlinked_undo_buffers) {
|
||||
timestamp = mark_timestamp;
|
||||
}
|
||||
garbage_undo_buffers.splice(garbage_undo_buffers.end(),
|
||||
unlinked_undo_buffers);
|
||||
});
|
||||
}
|
||||
|
||||
while (true) {
|
||||
std::lock_guard<utils::SpinLock> aborted_undo_buffers_guard(
|
||||
aborted_undo_buffers_lock_);
|
||||
if (aborted_undo_buffers_.empty() ||
|
||||
aborted_undo_buffers_.front().first >= oldest_active_start_timestamp) {
|
||||
auto garbage_undo_buffers_ptr = garbage_undo_buffers_.Lock();
|
||||
if (garbage_undo_buffers_ptr->empty() ||
|
||||
garbage_undo_buffers_ptr->front().first >=
|
||||
oldest_active_start_timestamp) {
|
||||
break;
|
||||
}
|
||||
aborted_undo_buffers_.pop_front();
|
||||
garbage_undo_buffers_ptr->pop_front();
|
||||
}
|
||||
|
||||
{
|
||||
auto vertex_acc = vertices_.access();
|
||||
for (auto vertex : current_deleted_vertices) {
|
||||
CHECK(vertex_acc.remove(vertex)) << "Invalid database state!";
|
||||
}
|
||||
}
|
||||
{
|
||||
auto edge_acc = edges_.access();
|
||||
for (auto edge : current_deleted_edges) {
|
||||
CHECK(edge_acc.remove(edge)) << "Invalid database state!";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include "utils/rw_lock.hpp"
|
||||
#include "utils/scheduler.hpp"
|
||||
#include "utils/skip_list.hpp"
|
||||
#include "utils/synchronized.hpp"
|
||||
|
||||
namespace storage {
|
||||
|
||||
@ -124,6 +125,8 @@ class Storage final {
|
||||
std::atomic<uint64_t> vertex_id_{0};
|
||||
std::atomic<uint64_t> edge_id_{0};
|
||||
|
||||
NameIdMapper name_id_mapper_;
|
||||
|
||||
// Transaction engine
|
||||
utils::SpinLock engine_lock_;
|
||||
uint64_t timestamp_{kTimestampInitialId};
|
||||
@ -134,18 +137,22 @@ class Storage final {
|
||||
// whatever.
|
||||
CommitLog commit_log_;
|
||||
|
||||
NameIdMapper name_id_mapper_;
|
||||
|
||||
utils::SpinLock committed_transactions_lock_;
|
||||
std::list<Transaction> committed_transactions_;
|
||||
|
||||
utils::SpinLock aborted_undo_buffers_lock_;
|
||||
std::list<std::pair<uint64_t, std::list<Delta>>> aborted_undo_buffers_;
|
||||
utils::Synchronized<std::list<Transaction>, utils::SpinLock>
|
||||
committed_transactions_;
|
||||
|
||||
StorageGcConfig gc_config_;
|
||||
utils::Scheduler gc_runner_;
|
||||
std::mutex gc_lock_;
|
||||
std::list<std::pair<uint64_t, std::list<Delta>>> marked_undo_buffers_;
|
||||
|
||||
// Undo buffers that were unlinked and now are waiting to be freed.
|
||||
utils::Synchronized<std::list<std::pair<uint64_t, std::list<Delta>>>,
|
||||
utils::SpinLock>
|
||||
garbage_undo_buffers_;
|
||||
|
||||
// Vertices that are logically deleted and now are waiting to be removed from
|
||||
// the main storage.
|
||||
utils::Synchronized<std::list<Gid>, utils::SpinLock> deleted_vertices_;
|
||||
utils::Synchronized<std::list<Gid>, utils::SpinLock> deleted_edges_;
|
||||
};
|
||||
|
||||
} // namespace storage
|
||||
|
Loading…
Reference in New Issue
Block a user