From d9a67756569b697c846c224fd6b8fda5b97fc8aa Mon Sep 17 00:00:00 2001 From: Matej Ferencevic Date: Thu, 11 Jul 2019 13:27:50 +0200 Subject: [PATCH] Move transaction to stack in storage v2 Reviewers: mtomic, teon.banek Reviewed By: mtomic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2196 --- src/storage/v2/mvcc.hpp | 10 ++- src/storage/v2/storage.cpp | 159 +++++++++++++++------------------ src/storage/v2/storage.hpp | 26 +++--- src/storage/v2/transaction.hpp | 15 +++- 4 files changed, 104 insertions(+), 106 deletions(-) diff --git a/src/storage/v2/mvcc.hpp b/src/storage/v2/mvcc.hpp index 14d7ebf41..34ec03761 100644 --- a/src/storage/v2/mvcc.hpp +++ b/src/storage/v2/mvcc.hpp @@ -70,8 +70,9 @@ inline bool PrepareForWrite(Transaction *transaction, TObj *object) { /// and is primarily used to create the first delta for an object (that must be /// a `DELETE_OBJECT` delta). inline Delta *CreateDeleteObjectDelta(Transaction *transaction) { + transaction->EnsureCommitTimestampExists(); return &transaction->deltas.emplace_back(Delta::DeleteObjectTag(), - &transaction->commit_timestamp, + transaction->commit_timestamp.get(), transaction->command_id); } @@ -80,9 +81,10 @@ inline Delta *CreateDeleteObjectDelta(Transaction *transaction) { template inline void CreateAndLinkDelta(Transaction *transaction, TObj *object, Args &&... args) { - auto delta = &transaction->deltas.emplace_back(std::forward(args)..., - &transaction->commit_timestamp, - transaction->command_id); + transaction->EnsureCommitTimestampExists(); + auto delta = &transaction->deltas.emplace_back( + std::forward(args)..., transaction->commit_timestamp.get(), + transaction->command_id); // The operations are written in such order so that both `next` and `prev` // chains are valid at all times. The chains must be valid at all times diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index dc2aa07d4..9a298da83 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -21,52 +21,25 @@ Storage::~Storage() { } } -Storage::Accessor::Accessor(Storage *storage) - : storage_(storage), is_transaction_starter_(true) { - // We acquire the transaction engine lock here because we access (and - // modify) the transaction engine variables (`transaction_id` and - // `timestamp`) below. - uint64_t transaction_id; - uint64_t start_timestamp; - { - std::lock_guard guard(storage_->engine_lock_); - transaction_id = storage_->transaction_id_++; - start_timestamp = storage_->timestamp_++; - } - transaction_ = std::make_unique(transaction_id, start_timestamp); -} +Storage::Accessor::Accessor(Storage *storage, uint64_t transaction_id, + uint64_t start_timestamp) + : storage_(storage), + transaction_(transaction_id, start_timestamp), + is_transaction_starter_(true), + is_transaction_active_(true) {} Storage::Accessor::Accessor(Accessor &&other) noexcept : storage_(other.storage_), transaction_(std::move(other.transaction_)), - is_transaction_starter_(true) { + is_transaction_starter_(true), + is_transaction_active_(other.is_transaction_active_) { CHECK(other.is_transaction_starter_) << "The original accessor isn't valid!"; // Don't allow the other accessor to abort our transaction. other.is_transaction_starter_ = false; } -// This operator isn't `noexcept` because the `Abort` function isn't -// `noexcept`. -Storage::Accessor &Storage::Accessor::operator=(Accessor &&other) { - if (this == &other) return *this; - - if (is_transaction_starter_ && transaction_) { - Abort(); - } - - storage_ = other.storage_; - transaction_ = std::move(other.transaction_); - is_transaction_starter_ = true; - - CHECK(other.is_transaction_starter_) << "The original accessor isn't valid!"; - // Don't allow the other accessor to abort our transaction. - other.is_transaction_starter_ = false; - - return *this; -} - Storage::Accessor::~Accessor() { - if (is_transaction_starter_ && transaction_) { + if (is_transaction_starter_ && is_transaction_active_) { Abort(); } } @@ -74,12 +47,12 @@ Storage::Accessor::~Accessor() { VertexAccessor Storage::Accessor::CreateVertex() { auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel); auto acc = storage_->vertices_.access(); - auto delta = CreateDeleteObjectDelta(transaction_.get()); + auto delta = CreateDeleteObjectDelta(&transaction_); auto [it, inserted] = acc.insert(Vertex{storage::Gid::FromUint(gid), delta}); CHECK(inserted) << "The vertex must be inserted here!"; CHECK(it != acc.end()) << "Invalid Vertex accessor!"; delta->prev.Set(&*it); - return VertexAccessor{&*it, transaction_.get()}; + return VertexAccessor{&*it, &transaction_}; } std::optional Storage::Accessor::FindVertex(Gid gid, @@ -87,18 +60,18 @@ std::optional Storage::Accessor::FindVertex(Gid gid, auto acc = storage_->vertices_.access(); auto it = acc.find(gid); if (it == acc.end()) return std::nullopt; - return VertexAccessor::Create(&*it, transaction_.get(), view); + return VertexAccessor::Create(&*it, &transaction_, view); } Result Storage::Accessor::DeleteVertex(VertexAccessor *vertex) { - CHECK(vertex->transaction_ == transaction_.get()) + CHECK(vertex->transaction_ == &transaction_) << "VertexAccessor must be from the same transaction as the storage " "accessor when deleting a vertex!"; auto vertex_ptr = vertex->vertex_; std::lock_guard guard(vertex_ptr->lock); - if (!PrepareForWrite(transaction_.get(), vertex_ptr)) + if (!PrepareForWrite(&transaction_, vertex_ptr)) return Result{Error::SERIALIZATION_ERROR}; if (vertex_ptr->deleted) return Result{false}; @@ -106,15 +79,14 @@ Result Storage::Accessor::DeleteVertex(VertexAccessor *vertex) { if (!vertex_ptr->in_edges.empty() || !vertex_ptr->out_edges.empty()) return Result{Error::VERTEX_HAS_EDGES}; - CreateAndLinkDelta(transaction_.get(), vertex_ptr, - Delta::RecreateObjectTag()); + CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag()); vertex_ptr->deleted = true; return Result{true}; } Result Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) { - CHECK(vertex->transaction_ == transaction_.get()) + CHECK(vertex->transaction_ == &transaction_) << "VertexAccessor must be from the same transaction as the storage " "accessor when deleting a vertex!"; auto vertex_ptr = vertex->vertex_; @@ -125,7 +97,7 @@ Result Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) { { std::lock_guard guard(vertex_ptr->lock); - if (!PrepareForWrite(transaction_.get(), vertex_ptr)) + if (!PrepareForWrite(&transaction_, vertex_ptr)) return Result{Error::SERIALIZATION_ERROR}; if (vertex_ptr->deleted) return Result{false}; @@ -136,8 +108,7 @@ Result Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) { for (const auto &item : in_edges) { auto [edge_type, from_vertex, edge] = item; - EdgeAccessor e{edge, edge_type, from_vertex, vertex_ptr, - transaction_.get()}; + EdgeAccessor e{edge, edge_type, from_vertex, vertex_ptr, &transaction_}; auto ret = DeleteEdge(&e); if (ret.IsError()) { CHECK(ret.GetError() == Error::SERIALIZATION_ERROR) @@ -147,7 +118,7 @@ Result Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) { } for (const auto &item : out_edges) { auto [edge_type, to_vertex, edge] = item; - EdgeAccessor e{edge, edge_type, vertex_ptr, to_vertex, transaction_.get()}; + EdgeAccessor e{edge, edge_type, vertex_ptr, to_vertex, &transaction_}; auto ret = DeleteEdge(&e); if (ret.IsError()) { CHECK(ret.GetError() == Error::SERIALIZATION_ERROR) @@ -162,13 +133,12 @@ Result Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) { // vertex. Some other transaction could have modified the vertex in the // meantime if we didn't have any edges to delete. - if (!PrepareForWrite(transaction_.get(), vertex_ptr)) + if (!PrepareForWrite(&transaction_, vertex_ptr)) return Result{Error::SERIALIZATION_ERROR}; CHECK(!vertex_ptr->deleted) << "Invalid database state!"; - CreateAndLinkDelta(transaction_.get(), vertex_ptr, - Delta::RecreateObjectTag()); + CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag()); vertex_ptr->deleted = true; return Result{true}; @@ -180,7 +150,7 @@ Result Storage::Accessor::CreateEdge(VertexAccessor *from, CHECK(from->transaction_ == to->transaction_) << "VertexAccessors must be from the same transaction when creating " "an edge!"; - CHECK(from->transaction_ == transaction_.get()) + CHECK(from->transaction_ == &transaction_) << "VertexAccessors must be from the same transaction in when " "creating an edge!"; @@ -202,39 +172,39 @@ Result Storage::Accessor::CreateEdge(VertexAccessor *from, guard_from.lock(); } - if (!PrepareForWrite(transaction_.get(), from_vertex)) + if (!PrepareForWrite(&transaction_, from_vertex)) return Result{Error::SERIALIZATION_ERROR}; CHECK(!from_vertex->deleted) << "Invalid database state!"; if (to_vertex != from_vertex) { - if (!PrepareForWrite(transaction_.get(), to_vertex)) + if (!PrepareForWrite(&transaction_, to_vertex)) return Result{Error::SERIALIZATION_ERROR}; CHECK(!to_vertex->deleted) << "Invalid database state!"; } auto gid = storage_->edge_id_.fetch_add(1, std::memory_order_acq_rel); auto acc = storage_->edges_.access(); - auto delta = CreateDeleteObjectDelta(transaction_.get()); + auto delta = CreateDeleteObjectDelta(&transaction_); auto [it, inserted] = acc.insert(Edge{storage::Gid::FromUint(gid), delta}); CHECK(inserted) << "The edge must be inserted here!"; CHECK(it != acc.end()) << "Invalid Edge accessor!"; auto edge = &*it; delta->prev.Set(&*it); - CreateAndLinkDelta(transaction_.get(), from_vertex, Delta::RemoveOutEdgeTag(), + CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); - CreateAndLinkDelta(transaction_.get(), to_vertex, Delta::RemoveInEdgeTag(), + CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge); to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); - return Result{EdgeAccessor{edge, edge_type, from_vertex, - to_vertex, transaction_.get()}}; + return Result{ + EdgeAccessor{edge, edge_type, from_vertex, to_vertex, &transaction_}}; } Result Storage::Accessor::DeleteEdge(EdgeAccessor *edge) { - CHECK(edge->transaction_ == transaction_.get()) + CHECK(edge->transaction_ == &transaction_) << "EdgeAccessor must be from the same transaction as the storage " "accessor when deleting an edge!"; auto edge_ptr = edge->edge_; @@ -242,7 +212,7 @@ Result Storage::Accessor::DeleteEdge(EdgeAccessor *edge) { std::lock_guard guard(edge_ptr->lock); - if (!PrepareForWrite(transaction_.get(), edge_ptr)) + if (!PrepareForWrite(&transaction_, edge_ptr)) return Result{Error::SERIALIZATION_ERROR}; if (edge_ptr->deleted) return Result{false}; @@ -265,20 +235,20 @@ Result Storage::Accessor::DeleteEdge(EdgeAccessor *edge) { guard_from.lock(); } - if (!PrepareForWrite(transaction_.get(), from_vertex)) + if (!PrepareForWrite(&transaction_, from_vertex)) return Result{Error::SERIALIZATION_ERROR}; CHECK(!from_vertex->deleted) << "Invalid database state!"; if (to_vertex != from_vertex) { - if (!PrepareForWrite(transaction_.get(), to_vertex)) + if (!PrepareForWrite(&transaction_, to_vertex)) return Result{Error::SERIALIZATION_ERROR}; CHECK(!to_vertex->deleted) << "Invalid database state!"; } - CreateAndLinkDelta(transaction_.get(), edge_ptr, Delta::RecreateObjectTag()); + CreateAndLinkDelta(&transaction_, edge_ptr, Delta::RecreateObjectTag()); edge_ptr->deleted = true; - CreateAndLinkDelta(transaction_.get(), from_vertex, Delta::AddOutEdgeTag(), + CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ptr); { std::tuple link{edge_type, to_vertex, edge_ptr}; @@ -289,8 +259,8 @@ Result Storage::Accessor::DeleteEdge(EdgeAccessor *edge) { from_vertex->out_edges.pop_back(); } - CreateAndLinkDelta(transaction_.get(), to_vertex, Delta::AddInEdgeTag(), - edge_type, from_vertex, edge_ptr); + CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, + from_vertex, edge_ptr); { std::tuple link{edge_type, from_vertex, edge_ptr}; @@ -304,20 +274,19 @@ Result Storage::Accessor::DeleteEdge(EdgeAccessor *edge) { return Result{true}; } -void Storage::Accessor::AdvanceCommand() { ++transaction_->command_id; } +void Storage::Accessor::AdvanceCommand() { ++transaction_.command_id; } void Storage::Accessor::Commit() { - CHECK(transaction_) << "The transaction is already terminated!"; - CHECK(!transaction_->must_abort) << "The transaction can't be committed!"; + CHECK(is_transaction_active_) << "The transaction is already terminated!"; + CHECK(!transaction_.must_abort) << "The transaction can't be committed!"; - if (transaction_->deltas.empty()) { + if (transaction_.deltas.empty()) { // We don't have to update the commit timestamp here because no one reads // it. - storage_->commit_log_.MarkFinished(transaction_->start_timestamp); - transaction_ = nullptr; + storage_->commit_log_.MarkFinished(transaction_.start_timestamp); } else { // Save these so we can mark them used in the commit log. - uint64_t start_timestamp = transaction_->start_timestamp; + uint64_t start_timestamp = transaction_.start_timestamp; uint64_t commit_timestamp; { @@ -331,7 +300,9 @@ void Storage::Accessor::Commit() { storage_->committed_transactions_lock_); // TODO: release lock, and update all deltas to have a local copy // of the commit timestamp - transaction_->commit_timestamp.store(commit_timestamp, + CHECK(transaction_.commit_timestamp != nullptr) + << "Invalid database state!"; + transaction_.commit_timestamp->store(commit_timestamp, std::memory_order_release); // Release engine lock because we don't have to hold it anymore and // emplace back could take a long time. @@ -342,14 +313,15 @@ void Storage::Accessor::Commit() { storage_->commit_log_.MarkFinished(start_timestamp); storage_->commit_log_.MarkFinished(commit_timestamp); } + is_transaction_active_ = false; if (storage_->gc_config_.type == StorageGcConfig::Type::ON_FINISH) { storage_->CollectGarbage(); } } void Storage::Accessor::Abort() { - CHECK(transaction_) << "The transaction is already terminated!"; - for (const auto &delta : transaction_->deltas) { + CHECK(is_transaction_active_) << "The transaction is already terminated!"; + for (const auto &delta : transaction_.deltas) { auto prev = delta.prev.Get(); switch (prev.type) { case PreviousPtr::Type::VERTEX: { @@ -358,7 +330,7 @@ void Storage::Accessor::Abort() { Delta *current = vertex->delta; while (current != nullptr && current->timestamp->load(std::memory_order_acquire) == - transaction_->transaction_id) { + transaction_.transaction_id) { switch (current->action) { case Delta::Action::REMOVE_LABEL: { auto it = std::find(vertex->labels.begin(), vertex->labels.end(), @@ -462,7 +434,7 @@ void Storage::Accessor::Abort() { Delta *current = edge->delta; while (current != nullptr && current->timestamp->load(std::memory_order_acquire) == - transaction_->transaction_id) { + transaction_.transaction_id) { switch (current->action) { case Delta::Action::SET_PROPERTY: { auto it = edge->properties.find(current->property.key); @@ -528,16 +500,30 @@ void Storage::Accessor::Abort() { // emplace back could take a long time. engine_guard.unlock(); storage_->aborted_undo_buffers_.emplace_back( - mark_timestamp, std::move(transaction_->deltas)); + mark_timestamp, std::move(transaction_.deltas)); } - storage_->commit_log_.MarkFinished(transaction_->start_timestamp); - transaction_ = nullptr; + storage_->commit_log_.MarkFinished(transaction_.start_timestamp); + is_transaction_active_ = false; if (storage_->gc_config_.type == StorageGcConfig::Type::ON_FINISH) { storage_->CollectGarbage(); } } +Storage::Accessor Storage::Access() { + // We acquire the transaction engine lock here because we access (and + // modify) the transaction engine variables (`transaction_id` and + // `timestamp`) below. + uint64_t transaction_id; + uint64_t start_timestamp; + { + std::lock_guard guard(engine_lock_); + transaction_id = transaction_id_++; + start_timestamp = timestamp_++; + } + return Accessor{this, transaction_id, start_timestamp}; +} + void Storage::CollectGarbage() { // Garbage collection must be performed in two phases. In the first phase, // deltas that won't be applied by any transaction anymore are unlinked from @@ -555,7 +541,7 @@ void Storage::CollectGarbage() { // We don't move undo buffers of unlinked transactions to // marked_undo_buffers list immediately, because we would have to repeatedly // take transaction engine lock. - std::list> unlinked; + std::list unlinked; while (true) { // We don't want to hold the lock on commited transactions for too long, @@ -568,10 +554,11 @@ void Storage::CollectGarbage() { break; } - transaction = committed_transactions_.front().get(); + transaction = &committed_transactions_.front(); } - if (transaction->commit_timestamp >= oldest_active_start_timestamp) { + if (transaction->commit_timestamp->load(std::memory_order_acquire) >= + oldest_active_start_timestamp) { break; } @@ -660,7 +647,7 @@ void Storage::CollectGarbage() { for (auto &transaction : unlinked) { marked_undo_buffers_.emplace_back(mark_timestamp, - std::move(transaction->deltas)); + std::move(transaction.deltas)); } while (!marked_undo_buffers_.empty() && diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index fdb76c37e..fe5207018 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -49,18 +49,21 @@ class Storage final { ~Storage(); class Accessor final { - public: - explicit Accessor(Storage *storage); + private: + friend class Storage; + Accessor(Storage *storage, uint64_t transaction_id, + uint64_t start_timestamp); + + public: Accessor(const Accessor &) = delete; Accessor &operator=(const Accessor &) = delete; + Accessor &operator=(Accessor &&other) = delete; + // NOTE: After the accessor is moved, all objects derived from it (accessors + // and iterators) are *invalid*. You have to get all derived objects again. Accessor(Accessor &&other) noexcept; - // This operator isn't `noexcept` because the `Abort` function isn't - // `noexcept`. - Accessor &operator=(Accessor &&other); - ~Accessor(); VertexAccessor CreateVertex(); @@ -84,13 +87,12 @@ class Storage final { private: Storage *storage_; - // TODO: when we are able to move Transaction objects without breaking the - // pointers in Delta, we can get rid of the unique pointer here - std::unique_ptr transaction_; + Transaction transaction_; bool is_transaction_starter_; + bool is_transaction_active_; }; - Accessor Access() { return Accessor{this}; } + Accessor Access(); private: void CollectGarbage(); @@ -112,9 +114,7 @@ class Storage final { CommitLog commit_log_; utils::SpinLock committed_transactions_lock_; - // TODO: when we are able to move Transaction objects without breaking the - // pointers in Delta, we can get rid of the unique pointer here - std::list> committed_transactions_; + std::list committed_transactions_; utils::SpinLock aborted_undo_buffers_lock_; std::list>> aborted_undo_buffers_; diff --git a/src/storage/v2/transaction.hpp b/src/storage/v2/transaction.hpp index a1c2fabb8..4854b4da4 100644 --- a/src/storage/v2/transaction.hpp +++ b/src/storage/v2/transaction.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "utils/skip_list.hpp" @@ -18,14 +19,13 @@ struct Transaction { Transaction(uint64_t transaction_id, uint64_t start_timestamp) : transaction_id(transaction_id), start_timestamp(start_timestamp), - commit_timestamp(transaction_id), command_id(0), must_abort(false) {} Transaction(Transaction &&other) noexcept : transaction_id(other.transaction_id), start_timestamp(other.start_timestamp), - commit_timestamp(other.commit_timestamp.load()), + commit_timestamp(std::move(other.commit_timestamp)), command_id(other.command_id), deltas(std::move(other.deltas)), must_abort(other.must_abort) {} @@ -36,9 +36,18 @@ struct Transaction { ~Transaction() {} + void EnsureCommitTimestampExists() { + if (commit_timestamp != nullptr) return; + commit_timestamp = std::make_unique>(transaction_id); + } + uint64_t transaction_id; uint64_t start_timestamp; - std::atomic commit_timestamp; + // The `Transaction` object is stack allocated, but the `commit_timestamp` + // must be heap allocated because `Delta`s have a pointer to it, and that + // pointer must stay valid after the `Transaction` is moved into + // `commited_transactions_` list for GC. + std::unique_ptr> commit_timestamp; uint64_t command_id; std::list deltas; bool must_abort;