Move transaction to stack in storage v2
Reviewers: mtomic, teon.banek Reviewed By: mtomic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2196
This commit is contained in:
parent
c4c6febbc4
commit
d9a6775656
@ -70,8 +70,9 @@ inline bool PrepareForWrite(Transaction *transaction, TObj *object) {
|
|||||||
/// and is primarily used to create the first delta for an object (that must be
|
/// and is primarily used to create the first delta for an object (that must be
|
||||||
/// a `DELETE_OBJECT` delta).
|
/// a `DELETE_OBJECT` delta).
|
||||||
inline Delta *CreateDeleteObjectDelta(Transaction *transaction) {
|
inline Delta *CreateDeleteObjectDelta(Transaction *transaction) {
|
||||||
|
transaction->EnsureCommitTimestampExists();
|
||||||
return &transaction->deltas.emplace_back(Delta::DeleteObjectTag(),
|
return &transaction->deltas.emplace_back(Delta::DeleteObjectTag(),
|
||||||
&transaction->commit_timestamp,
|
transaction->commit_timestamp.get(),
|
||||||
transaction->command_id);
|
transaction->command_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -80,8 +81,9 @@ inline Delta *CreateDeleteObjectDelta(Transaction *transaction) {
|
|||||||
template <typename TObj, class... Args>
|
template <typename TObj, class... Args>
|
||||||
inline void CreateAndLinkDelta(Transaction *transaction, TObj *object,
|
inline void CreateAndLinkDelta(Transaction *transaction, TObj *object,
|
||||||
Args &&... args) {
|
Args &&... args) {
|
||||||
auto delta = &transaction->deltas.emplace_back(std::forward<Args>(args)...,
|
transaction->EnsureCommitTimestampExists();
|
||||||
&transaction->commit_timestamp,
|
auto delta = &transaction->deltas.emplace_back(
|
||||||
|
std::forward<Args>(args)..., transaction->commit_timestamp.get(),
|
||||||
transaction->command_id);
|
transaction->command_id);
|
||||||
|
|
||||||
// The operations are written in such order so that both `next` and `prev`
|
// The operations are written in such order so that both `next` and `prev`
|
||||||
|
@ -21,52 +21,25 @@ Storage::~Storage() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Storage::Accessor::Accessor(Storage *storage)
|
Storage::Accessor::Accessor(Storage *storage, uint64_t transaction_id,
|
||||||
: storage_(storage), is_transaction_starter_(true) {
|
uint64_t start_timestamp)
|
||||||
// We acquire the transaction engine lock here because we access (and
|
: storage_(storage),
|
||||||
// modify) the transaction engine variables (`transaction_id` and
|
transaction_(transaction_id, start_timestamp),
|
||||||
// `timestamp`) below.
|
is_transaction_starter_(true),
|
||||||
uint64_t transaction_id;
|
is_transaction_active_(true) {}
|
||||||
uint64_t start_timestamp;
|
|
||||||
{
|
|
||||||
std::lock_guard<utils::SpinLock> guard(storage_->engine_lock_);
|
|
||||||
transaction_id = storage_->transaction_id_++;
|
|
||||||
start_timestamp = storage_->timestamp_++;
|
|
||||||
}
|
|
||||||
transaction_ = std::make_unique<Transaction>(transaction_id, start_timestamp);
|
|
||||||
}
|
|
||||||
|
|
||||||
Storage::Accessor::Accessor(Accessor &&other) noexcept
|
Storage::Accessor::Accessor(Accessor &&other) noexcept
|
||||||
: storage_(other.storage_),
|
: storage_(other.storage_),
|
||||||
transaction_(std::move(other.transaction_)),
|
transaction_(std::move(other.transaction_)),
|
||||||
is_transaction_starter_(true) {
|
is_transaction_starter_(true),
|
||||||
|
is_transaction_active_(other.is_transaction_active_) {
|
||||||
CHECK(other.is_transaction_starter_) << "The original accessor isn't valid!";
|
CHECK(other.is_transaction_starter_) << "The original accessor isn't valid!";
|
||||||
// Don't allow the other accessor to abort our transaction.
|
// Don't allow the other accessor to abort our transaction.
|
||||||
other.is_transaction_starter_ = false;
|
other.is_transaction_starter_ = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// This operator isn't `noexcept` because the `Abort` function isn't
|
|
||||||
// `noexcept`.
|
|
||||||
Storage::Accessor &Storage::Accessor::operator=(Accessor &&other) {
|
|
||||||
if (this == &other) return *this;
|
|
||||||
|
|
||||||
if (is_transaction_starter_ && transaction_) {
|
|
||||||
Abort();
|
|
||||||
}
|
|
||||||
|
|
||||||
storage_ = other.storage_;
|
|
||||||
transaction_ = std::move(other.transaction_);
|
|
||||||
is_transaction_starter_ = true;
|
|
||||||
|
|
||||||
CHECK(other.is_transaction_starter_) << "The original accessor isn't valid!";
|
|
||||||
// Don't allow the other accessor to abort our transaction.
|
|
||||||
other.is_transaction_starter_ = false;
|
|
||||||
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
Storage::Accessor::~Accessor() {
|
Storage::Accessor::~Accessor() {
|
||||||
if (is_transaction_starter_ && transaction_) {
|
if (is_transaction_starter_ && is_transaction_active_) {
|
||||||
Abort();
|
Abort();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -74,12 +47,12 @@ Storage::Accessor::~Accessor() {
|
|||||||
VertexAccessor Storage::Accessor::CreateVertex() {
|
VertexAccessor Storage::Accessor::CreateVertex() {
|
||||||
auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel);
|
auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel);
|
||||||
auto acc = storage_->vertices_.access();
|
auto acc = storage_->vertices_.access();
|
||||||
auto delta = CreateDeleteObjectDelta(transaction_.get());
|
auto delta = CreateDeleteObjectDelta(&transaction_);
|
||||||
auto [it, inserted] = acc.insert(Vertex{storage::Gid::FromUint(gid), delta});
|
auto [it, inserted] = acc.insert(Vertex{storage::Gid::FromUint(gid), delta});
|
||||||
CHECK(inserted) << "The vertex must be inserted here!";
|
CHECK(inserted) << "The vertex must be inserted here!";
|
||||||
CHECK(it != acc.end()) << "Invalid Vertex accessor!";
|
CHECK(it != acc.end()) << "Invalid Vertex accessor!";
|
||||||
delta->prev.Set(&*it);
|
delta->prev.Set(&*it);
|
||||||
return VertexAccessor{&*it, transaction_.get()};
|
return VertexAccessor{&*it, &transaction_};
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<VertexAccessor> Storage::Accessor::FindVertex(Gid gid,
|
std::optional<VertexAccessor> Storage::Accessor::FindVertex(Gid gid,
|
||||||
@ -87,18 +60,18 @@ std::optional<VertexAccessor> Storage::Accessor::FindVertex(Gid gid,
|
|||||||
auto acc = storage_->vertices_.access();
|
auto acc = storage_->vertices_.access();
|
||||||
auto it = acc.find(gid);
|
auto it = acc.find(gid);
|
||||||
if (it == acc.end()) return std::nullopt;
|
if (it == acc.end()) return std::nullopt;
|
||||||
return VertexAccessor::Create(&*it, transaction_.get(), view);
|
return VertexAccessor::Create(&*it, &transaction_, view);
|
||||||
}
|
}
|
||||||
|
|
||||||
Result<bool> Storage::Accessor::DeleteVertex(VertexAccessor *vertex) {
|
Result<bool> Storage::Accessor::DeleteVertex(VertexAccessor *vertex) {
|
||||||
CHECK(vertex->transaction_ == transaction_.get())
|
CHECK(vertex->transaction_ == &transaction_)
|
||||||
<< "VertexAccessor must be from the same transaction as the storage "
|
<< "VertexAccessor must be from the same transaction as the storage "
|
||||||
"accessor when deleting a vertex!";
|
"accessor when deleting a vertex!";
|
||||||
auto vertex_ptr = vertex->vertex_;
|
auto vertex_ptr = vertex->vertex_;
|
||||||
|
|
||||||
std::lock_guard<utils::SpinLock> guard(vertex_ptr->lock);
|
std::lock_guard<utils::SpinLock> guard(vertex_ptr->lock);
|
||||||
|
|
||||||
if (!PrepareForWrite(transaction_.get(), vertex_ptr))
|
if (!PrepareForWrite(&transaction_, vertex_ptr))
|
||||||
return Result<bool>{Error::SERIALIZATION_ERROR};
|
return Result<bool>{Error::SERIALIZATION_ERROR};
|
||||||
|
|
||||||
if (vertex_ptr->deleted) return Result<bool>{false};
|
if (vertex_ptr->deleted) return Result<bool>{false};
|
||||||
@ -106,15 +79,14 @@ Result<bool> Storage::Accessor::DeleteVertex(VertexAccessor *vertex) {
|
|||||||
if (!vertex_ptr->in_edges.empty() || !vertex_ptr->out_edges.empty())
|
if (!vertex_ptr->in_edges.empty() || !vertex_ptr->out_edges.empty())
|
||||||
return Result<bool>{Error::VERTEX_HAS_EDGES};
|
return Result<bool>{Error::VERTEX_HAS_EDGES};
|
||||||
|
|
||||||
CreateAndLinkDelta(transaction_.get(), vertex_ptr,
|
CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag());
|
||||||
Delta::RecreateObjectTag());
|
|
||||||
vertex_ptr->deleted = true;
|
vertex_ptr->deleted = true;
|
||||||
|
|
||||||
return Result<bool>{true};
|
return Result<bool>{true};
|
||||||
}
|
}
|
||||||
|
|
||||||
Result<bool> Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) {
|
Result<bool> Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) {
|
||||||
CHECK(vertex->transaction_ == transaction_.get())
|
CHECK(vertex->transaction_ == &transaction_)
|
||||||
<< "VertexAccessor must be from the same transaction as the storage "
|
<< "VertexAccessor must be from the same transaction as the storage "
|
||||||
"accessor when deleting a vertex!";
|
"accessor when deleting a vertex!";
|
||||||
auto vertex_ptr = vertex->vertex_;
|
auto vertex_ptr = vertex->vertex_;
|
||||||
@ -125,7 +97,7 @@ Result<bool> Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) {
|
|||||||
{
|
{
|
||||||
std::lock_guard<utils::SpinLock> guard(vertex_ptr->lock);
|
std::lock_guard<utils::SpinLock> guard(vertex_ptr->lock);
|
||||||
|
|
||||||
if (!PrepareForWrite(transaction_.get(), vertex_ptr))
|
if (!PrepareForWrite(&transaction_, vertex_ptr))
|
||||||
return Result<bool>{Error::SERIALIZATION_ERROR};
|
return Result<bool>{Error::SERIALIZATION_ERROR};
|
||||||
|
|
||||||
if (vertex_ptr->deleted) return Result<bool>{false};
|
if (vertex_ptr->deleted) return Result<bool>{false};
|
||||||
@ -136,8 +108,7 @@ Result<bool> Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) {
|
|||||||
|
|
||||||
for (const auto &item : in_edges) {
|
for (const auto &item : in_edges) {
|
||||||
auto [edge_type, from_vertex, edge] = item;
|
auto [edge_type, from_vertex, edge] = item;
|
||||||
EdgeAccessor e{edge, edge_type, from_vertex, vertex_ptr,
|
EdgeAccessor e{edge, edge_type, from_vertex, vertex_ptr, &transaction_};
|
||||||
transaction_.get()};
|
|
||||||
auto ret = DeleteEdge(&e);
|
auto ret = DeleteEdge(&e);
|
||||||
if (ret.IsError()) {
|
if (ret.IsError()) {
|
||||||
CHECK(ret.GetError() == Error::SERIALIZATION_ERROR)
|
CHECK(ret.GetError() == Error::SERIALIZATION_ERROR)
|
||||||
@ -147,7 +118,7 @@ Result<bool> Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) {
|
|||||||
}
|
}
|
||||||
for (const auto &item : out_edges) {
|
for (const auto &item : out_edges) {
|
||||||
auto [edge_type, to_vertex, edge] = item;
|
auto [edge_type, to_vertex, edge] = item;
|
||||||
EdgeAccessor e{edge, edge_type, vertex_ptr, to_vertex, transaction_.get()};
|
EdgeAccessor e{edge, edge_type, vertex_ptr, to_vertex, &transaction_};
|
||||||
auto ret = DeleteEdge(&e);
|
auto ret = DeleteEdge(&e);
|
||||||
if (ret.IsError()) {
|
if (ret.IsError()) {
|
||||||
CHECK(ret.GetError() == Error::SERIALIZATION_ERROR)
|
CHECK(ret.GetError() == Error::SERIALIZATION_ERROR)
|
||||||
@ -162,13 +133,12 @@ Result<bool> Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) {
|
|||||||
// vertex. Some other transaction could have modified the vertex in the
|
// vertex. Some other transaction could have modified the vertex in the
|
||||||
// meantime if we didn't have any edges to delete.
|
// meantime if we didn't have any edges to delete.
|
||||||
|
|
||||||
if (!PrepareForWrite(transaction_.get(), vertex_ptr))
|
if (!PrepareForWrite(&transaction_, vertex_ptr))
|
||||||
return Result<bool>{Error::SERIALIZATION_ERROR};
|
return Result<bool>{Error::SERIALIZATION_ERROR};
|
||||||
|
|
||||||
CHECK(!vertex_ptr->deleted) << "Invalid database state!";
|
CHECK(!vertex_ptr->deleted) << "Invalid database state!";
|
||||||
|
|
||||||
CreateAndLinkDelta(transaction_.get(), vertex_ptr,
|
CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag());
|
||||||
Delta::RecreateObjectTag());
|
|
||||||
vertex_ptr->deleted = true;
|
vertex_ptr->deleted = true;
|
||||||
|
|
||||||
return Result<bool>{true};
|
return Result<bool>{true};
|
||||||
@ -180,7 +150,7 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from,
|
|||||||
CHECK(from->transaction_ == to->transaction_)
|
CHECK(from->transaction_ == to->transaction_)
|
||||||
<< "VertexAccessors must be from the same transaction when creating "
|
<< "VertexAccessors must be from the same transaction when creating "
|
||||||
"an edge!";
|
"an edge!";
|
||||||
CHECK(from->transaction_ == transaction_.get())
|
CHECK(from->transaction_ == &transaction_)
|
||||||
<< "VertexAccessors must be from the same transaction in when "
|
<< "VertexAccessors must be from the same transaction in when "
|
||||||
"creating an edge!";
|
"creating an edge!";
|
||||||
|
|
||||||
@ -202,39 +172,39 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from,
|
|||||||
guard_from.lock();
|
guard_from.lock();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!PrepareForWrite(transaction_.get(), from_vertex))
|
if (!PrepareForWrite(&transaction_, from_vertex))
|
||||||
return Result<EdgeAccessor>{Error::SERIALIZATION_ERROR};
|
return Result<EdgeAccessor>{Error::SERIALIZATION_ERROR};
|
||||||
CHECK(!from_vertex->deleted) << "Invalid database state!";
|
CHECK(!from_vertex->deleted) << "Invalid database state!";
|
||||||
|
|
||||||
if (to_vertex != from_vertex) {
|
if (to_vertex != from_vertex) {
|
||||||
if (!PrepareForWrite(transaction_.get(), to_vertex))
|
if (!PrepareForWrite(&transaction_, to_vertex))
|
||||||
return Result<EdgeAccessor>{Error::SERIALIZATION_ERROR};
|
return Result<EdgeAccessor>{Error::SERIALIZATION_ERROR};
|
||||||
CHECK(!to_vertex->deleted) << "Invalid database state!";
|
CHECK(!to_vertex->deleted) << "Invalid database state!";
|
||||||
}
|
}
|
||||||
|
|
||||||
auto gid = storage_->edge_id_.fetch_add(1, std::memory_order_acq_rel);
|
auto gid = storage_->edge_id_.fetch_add(1, std::memory_order_acq_rel);
|
||||||
auto acc = storage_->edges_.access();
|
auto acc = storage_->edges_.access();
|
||||||
auto delta = CreateDeleteObjectDelta(transaction_.get());
|
auto delta = CreateDeleteObjectDelta(&transaction_);
|
||||||
auto [it, inserted] = acc.insert(Edge{storage::Gid::FromUint(gid), delta});
|
auto [it, inserted] = acc.insert(Edge{storage::Gid::FromUint(gid), delta});
|
||||||
CHECK(inserted) << "The edge must be inserted here!";
|
CHECK(inserted) << "The edge must be inserted here!";
|
||||||
CHECK(it != acc.end()) << "Invalid Edge accessor!";
|
CHECK(it != acc.end()) << "Invalid Edge accessor!";
|
||||||
auto edge = &*it;
|
auto edge = &*it;
|
||||||
delta->prev.Set(&*it);
|
delta->prev.Set(&*it);
|
||||||
|
|
||||||
CreateAndLinkDelta(transaction_.get(), from_vertex, Delta::RemoveOutEdgeTag(),
|
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(),
|
||||||
edge_type, to_vertex, edge);
|
edge_type, to_vertex, edge);
|
||||||
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge);
|
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge);
|
||||||
|
|
||||||
CreateAndLinkDelta(transaction_.get(), to_vertex, Delta::RemoveInEdgeTag(),
|
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(),
|
||||||
edge_type, from_vertex, edge);
|
edge_type, from_vertex, edge);
|
||||||
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
|
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
|
||||||
|
|
||||||
return Result<EdgeAccessor>{EdgeAccessor{edge, edge_type, from_vertex,
|
return Result<EdgeAccessor>{
|
||||||
to_vertex, transaction_.get()}};
|
EdgeAccessor{edge, edge_type, from_vertex, to_vertex, &transaction_}};
|
||||||
}
|
}
|
||||||
|
|
||||||
Result<bool> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) {
|
Result<bool> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) {
|
||||||
CHECK(edge->transaction_ == transaction_.get())
|
CHECK(edge->transaction_ == &transaction_)
|
||||||
<< "EdgeAccessor must be from the same transaction as the storage "
|
<< "EdgeAccessor must be from the same transaction as the storage "
|
||||||
"accessor when deleting an edge!";
|
"accessor when deleting an edge!";
|
||||||
auto edge_ptr = edge->edge_;
|
auto edge_ptr = edge->edge_;
|
||||||
@ -242,7 +212,7 @@ Result<bool> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) {
|
|||||||
|
|
||||||
std::lock_guard<utils::SpinLock> guard(edge_ptr->lock);
|
std::lock_guard<utils::SpinLock> guard(edge_ptr->lock);
|
||||||
|
|
||||||
if (!PrepareForWrite(transaction_.get(), edge_ptr))
|
if (!PrepareForWrite(&transaction_, edge_ptr))
|
||||||
return Result<bool>{Error::SERIALIZATION_ERROR};
|
return Result<bool>{Error::SERIALIZATION_ERROR};
|
||||||
|
|
||||||
if (edge_ptr->deleted) return Result<bool>{false};
|
if (edge_ptr->deleted) return Result<bool>{false};
|
||||||
@ -265,20 +235,20 @@ Result<bool> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) {
|
|||||||
guard_from.lock();
|
guard_from.lock();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!PrepareForWrite(transaction_.get(), from_vertex))
|
if (!PrepareForWrite(&transaction_, from_vertex))
|
||||||
return Result<bool>{Error::SERIALIZATION_ERROR};
|
return Result<bool>{Error::SERIALIZATION_ERROR};
|
||||||
CHECK(!from_vertex->deleted) << "Invalid database state!";
|
CHECK(!from_vertex->deleted) << "Invalid database state!";
|
||||||
|
|
||||||
if (to_vertex != from_vertex) {
|
if (to_vertex != from_vertex) {
|
||||||
if (!PrepareForWrite(transaction_.get(), to_vertex))
|
if (!PrepareForWrite(&transaction_, to_vertex))
|
||||||
return Result<bool>{Error::SERIALIZATION_ERROR};
|
return Result<bool>{Error::SERIALIZATION_ERROR};
|
||||||
CHECK(!to_vertex->deleted) << "Invalid database state!";
|
CHECK(!to_vertex->deleted) << "Invalid database state!";
|
||||||
}
|
}
|
||||||
|
|
||||||
CreateAndLinkDelta(transaction_.get(), edge_ptr, Delta::RecreateObjectTag());
|
CreateAndLinkDelta(&transaction_, edge_ptr, Delta::RecreateObjectTag());
|
||||||
edge_ptr->deleted = true;
|
edge_ptr->deleted = true;
|
||||||
|
|
||||||
CreateAndLinkDelta(transaction_.get(), from_vertex, Delta::AddOutEdgeTag(),
|
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(),
|
||||||
edge_type, to_vertex, edge_ptr);
|
edge_type, to_vertex, edge_ptr);
|
||||||
{
|
{
|
||||||
std::tuple<uint64_t, Vertex *, Edge *> link{edge_type, to_vertex, edge_ptr};
|
std::tuple<uint64_t, Vertex *, Edge *> link{edge_type, to_vertex, edge_ptr};
|
||||||
@ -289,8 +259,8 @@ Result<bool> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) {
|
|||||||
from_vertex->out_edges.pop_back();
|
from_vertex->out_edges.pop_back();
|
||||||
}
|
}
|
||||||
|
|
||||||
CreateAndLinkDelta(transaction_.get(), to_vertex, Delta::AddInEdgeTag(),
|
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type,
|
||||||
edge_type, from_vertex, edge_ptr);
|
from_vertex, edge_ptr);
|
||||||
{
|
{
|
||||||
std::tuple<uint64_t, Vertex *, Edge *> link{edge_type, from_vertex,
|
std::tuple<uint64_t, Vertex *, Edge *> link{edge_type, from_vertex,
|
||||||
edge_ptr};
|
edge_ptr};
|
||||||
@ -304,20 +274,19 @@ Result<bool> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) {
|
|||||||
return Result<bool>{true};
|
return Result<bool>{true};
|
||||||
}
|
}
|
||||||
|
|
||||||
void Storage::Accessor::AdvanceCommand() { ++transaction_->command_id; }
|
void Storage::Accessor::AdvanceCommand() { ++transaction_.command_id; }
|
||||||
|
|
||||||
void Storage::Accessor::Commit() {
|
void Storage::Accessor::Commit() {
|
||||||
CHECK(transaction_) << "The transaction is already terminated!";
|
CHECK(is_transaction_active_) << "The transaction is already terminated!";
|
||||||
CHECK(!transaction_->must_abort) << "The transaction can't be committed!";
|
CHECK(!transaction_.must_abort) << "The transaction can't be committed!";
|
||||||
|
|
||||||
if (transaction_->deltas.empty()) {
|
if (transaction_.deltas.empty()) {
|
||||||
// We don't have to update the commit timestamp here because no one reads
|
// We don't have to update the commit timestamp here because no one reads
|
||||||
// it.
|
// it.
|
||||||
storage_->commit_log_.MarkFinished(transaction_->start_timestamp);
|
storage_->commit_log_.MarkFinished(transaction_.start_timestamp);
|
||||||
transaction_ = nullptr;
|
|
||||||
} else {
|
} else {
|
||||||
// Save these so we can mark them used in the commit log.
|
// Save these so we can mark them used in the commit log.
|
||||||
uint64_t start_timestamp = transaction_->start_timestamp;
|
uint64_t start_timestamp = transaction_.start_timestamp;
|
||||||
uint64_t commit_timestamp;
|
uint64_t commit_timestamp;
|
||||||
|
|
||||||
{
|
{
|
||||||
@ -331,7 +300,9 @@ void Storage::Accessor::Commit() {
|
|||||||
storage_->committed_transactions_lock_);
|
storage_->committed_transactions_lock_);
|
||||||
// TODO: release lock, and update all deltas to have a local copy
|
// TODO: release lock, and update all deltas to have a local copy
|
||||||
// of the commit timestamp
|
// of the commit timestamp
|
||||||
transaction_->commit_timestamp.store(commit_timestamp,
|
CHECK(transaction_.commit_timestamp != nullptr)
|
||||||
|
<< "Invalid database state!";
|
||||||
|
transaction_.commit_timestamp->store(commit_timestamp,
|
||||||
std::memory_order_release);
|
std::memory_order_release);
|
||||||
// Release engine lock because we don't have to hold it anymore and
|
// Release engine lock because we don't have to hold it anymore and
|
||||||
// emplace back could take a long time.
|
// emplace back could take a long time.
|
||||||
@ -342,14 +313,15 @@ void Storage::Accessor::Commit() {
|
|||||||
storage_->commit_log_.MarkFinished(start_timestamp);
|
storage_->commit_log_.MarkFinished(start_timestamp);
|
||||||
storage_->commit_log_.MarkFinished(commit_timestamp);
|
storage_->commit_log_.MarkFinished(commit_timestamp);
|
||||||
}
|
}
|
||||||
|
is_transaction_active_ = false;
|
||||||
if (storage_->gc_config_.type == StorageGcConfig::Type::ON_FINISH) {
|
if (storage_->gc_config_.type == StorageGcConfig::Type::ON_FINISH) {
|
||||||
storage_->CollectGarbage();
|
storage_->CollectGarbage();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Storage::Accessor::Abort() {
|
void Storage::Accessor::Abort() {
|
||||||
CHECK(transaction_) << "The transaction is already terminated!";
|
CHECK(is_transaction_active_) << "The transaction is already terminated!";
|
||||||
for (const auto &delta : transaction_->deltas) {
|
for (const auto &delta : transaction_.deltas) {
|
||||||
auto prev = delta.prev.Get();
|
auto prev = delta.prev.Get();
|
||||||
switch (prev.type) {
|
switch (prev.type) {
|
||||||
case PreviousPtr::Type::VERTEX: {
|
case PreviousPtr::Type::VERTEX: {
|
||||||
@ -358,7 +330,7 @@ void Storage::Accessor::Abort() {
|
|||||||
Delta *current = vertex->delta;
|
Delta *current = vertex->delta;
|
||||||
while (current != nullptr &&
|
while (current != nullptr &&
|
||||||
current->timestamp->load(std::memory_order_acquire) ==
|
current->timestamp->load(std::memory_order_acquire) ==
|
||||||
transaction_->transaction_id) {
|
transaction_.transaction_id) {
|
||||||
switch (current->action) {
|
switch (current->action) {
|
||||||
case Delta::Action::REMOVE_LABEL: {
|
case Delta::Action::REMOVE_LABEL: {
|
||||||
auto it = std::find(vertex->labels.begin(), vertex->labels.end(),
|
auto it = std::find(vertex->labels.begin(), vertex->labels.end(),
|
||||||
@ -462,7 +434,7 @@ void Storage::Accessor::Abort() {
|
|||||||
Delta *current = edge->delta;
|
Delta *current = edge->delta;
|
||||||
while (current != nullptr &&
|
while (current != nullptr &&
|
||||||
current->timestamp->load(std::memory_order_acquire) ==
|
current->timestamp->load(std::memory_order_acquire) ==
|
||||||
transaction_->transaction_id) {
|
transaction_.transaction_id) {
|
||||||
switch (current->action) {
|
switch (current->action) {
|
||||||
case Delta::Action::SET_PROPERTY: {
|
case Delta::Action::SET_PROPERTY: {
|
||||||
auto it = edge->properties.find(current->property.key);
|
auto it = edge->properties.find(current->property.key);
|
||||||
@ -528,16 +500,30 @@ void Storage::Accessor::Abort() {
|
|||||||
// emplace back could take a long time.
|
// emplace back could take a long time.
|
||||||
engine_guard.unlock();
|
engine_guard.unlock();
|
||||||
storage_->aborted_undo_buffers_.emplace_back(
|
storage_->aborted_undo_buffers_.emplace_back(
|
||||||
mark_timestamp, std::move(transaction_->deltas));
|
mark_timestamp, std::move(transaction_.deltas));
|
||||||
}
|
}
|
||||||
|
|
||||||
storage_->commit_log_.MarkFinished(transaction_->start_timestamp);
|
storage_->commit_log_.MarkFinished(transaction_.start_timestamp);
|
||||||
transaction_ = nullptr;
|
is_transaction_active_ = false;
|
||||||
if (storage_->gc_config_.type == StorageGcConfig::Type::ON_FINISH) {
|
if (storage_->gc_config_.type == StorageGcConfig::Type::ON_FINISH) {
|
||||||
storage_->CollectGarbage();
|
storage_->CollectGarbage();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Storage::Accessor Storage::Access() {
|
||||||
|
// We acquire the transaction engine lock here because we access (and
|
||||||
|
// modify) the transaction engine variables (`transaction_id` and
|
||||||
|
// `timestamp`) below.
|
||||||
|
uint64_t transaction_id;
|
||||||
|
uint64_t start_timestamp;
|
||||||
|
{
|
||||||
|
std::lock_guard<utils::SpinLock> guard(engine_lock_);
|
||||||
|
transaction_id = transaction_id_++;
|
||||||
|
start_timestamp = timestamp_++;
|
||||||
|
}
|
||||||
|
return Accessor{this, transaction_id, start_timestamp};
|
||||||
|
}
|
||||||
|
|
||||||
void Storage::CollectGarbage() {
|
void Storage::CollectGarbage() {
|
||||||
// Garbage collection must be performed in two phases. In the first phase,
|
// Garbage collection must be performed in two phases. In the first phase,
|
||||||
// deltas that won't be applied by any transaction anymore are unlinked from
|
// deltas that won't be applied by any transaction anymore are unlinked from
|
||||||
@ -555,7 +541,7 @@ void Storage::CollectGarbage() {
|
|||||||
// We don't move undo buffers of unlinked transactions to
|
// We don't move undo buffers of unlinked transactions to
|
||||||
// marked_undo_buffers list immediately, because we would have to repeatedly
|
// marked_undo_buffers list immediately, because we would have to repeatedly
|
||||||
// take transaction engine lock.
|
// take transaction engine lock.
|
||||||
std::list<std::unique_ptr<Transaction>> unlinked;
|
std::list<Transaction> unlinked;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
// We don't want to hold the lock on commited transactions for too long,
|
// We don't want to hold the lock on commited transactions for too long,
|
||||||
@ -568,10 +554,11 @@ void Storage::CollectGarbage() {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
transaction = committed_transactions_.front().get();
|
transaction = &committed_transactions_.front();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (transaction->commit_timestamp >= oldest_active_start_timestamp) {
|
if (transaction->commit_timestamp->load(std::memory_order_acquire) >=
|
||||||
|
oldest_active_start_timestamp) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -660,7 +647,7 @@ void Storage::CollectGarbage() {
|
|||||||
|
|
||||||
for (auto &transaction : unlinked) {
|
for (auto &transaction : unlinked) {
|
||||||
marked_undo_buffers_.emplace_back(mark_timestamp,
|
marked_undo_buffers_.emplace_back(mark_timestamp,
|
||||||
std::move(transaction->deltas));
|
std::move(transaction.deltas));
|
||||||
}
|
}
|
||||||
|
|
||||||
while (!marked_undo_buffers_.empty() &&
|
while (!marked_undo_buffers_.empty() &&
|
||||||
|
@ -49,18 +49,21 @@ class Storage final {
|
|||||||
~Storage();
|
~Storage();
|
||||||
|
|
||||||
class Accessor final {
|
class Accessor final {
|
||||||
public:
|
private:
|
||||||
explicit Accessor(Storage *storage);
|
friend class Storage;
|
||||||
|
|
||||||
|
Accessor(Storage *storage, uint64_t transaction_id,
|
||||||
|
uint64_t start_timestamp);
|
||||||
|
|
||||||
|
public:
|
||||||
Accessor(const Accessor &) = delete;
|
Accessor(const Accessor &) = delete;
|
||||||
Accessor &operator=(const Accessor &) = delete;
|
Accessor &operator=(const Accessor &) = delete;
|
||||||
|
Accessor &operator=(Accessor &&other) = delete;
|
||||||
|
|
||||||
|
// NOTE: After the accessor is moved, all objects derived from it (accessors
|
||||||
|
// and iterators) are *invalid*. You have to get all derived objects again.
|
||||||
Accessor(Accessor &&other) noexcept;
|
Accessor(Accessor &&other) noexcept;
|
||||||
|
|
||||||
// This operator isn't `noexcept` because the `Abort` function isn't
|
|
||||||
// `noexcept`.
|
|
||||||
Accessor &operator=(Accessor &&other);
|
|
||||||
|
|
||||||
~Accessor();
|
~Accessor();
|
||||||
|
|
||||||
VertexAccessor CreateVertex();
|
VertexAccessor CreateVertex();
|
||||||
@ -84,13 +87,12 @@ class Storage final {
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
Storage *storage_;
|
Storage *storage_;
|
||||||
// TODO: when we are able to move Transaction objects without breaking the
|
Transaction transaction_;
|
||||||
// pointers in Delta, we can get rid of the unique pointer here
|
|
||||||
std::unique_ptr<Transaction> transaction_;
|
|
||||||
bool is_transaction_starter_;
|
bool is_transaction_starter_;
|
||||||
|
bool is_transaction_active_;
|
||||||
};
|
};
|
||||||
|
|
||||||
Accessor Access() { return Accessor{this}; }
|
Accessor Access();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void CollectGarbage();
|
void CollectGarbage();
|
||||||
@ -112,9 +114,7 @@ class Storage final {
|
|||||||
CommitLog commit_log_;
|
CommitLog commit_log_;
|
||||||
|
|
||||||
utils::SpinLock committed_transactions_lock_;
|
utils::SpinLock committed_transactions_lock_;
|
||||||
// TODO: when we are able to move Transaction objects without breaking the
|
std::list<Transaction> committed_transactions_;
|
||||||
// pointers in Delta, we can get rid of the unique pointer here
|
|
||||||
std::list<std::unique_ptr<Transaction>> committed_transactions_;
|
|
||||||
|
|
||||||
utils::SpinLock aborted_undo_buffers_lock_;
|
utils::SpinLock aborted_undo_buffers_lock_;
|
||||||
std::list<std::pair<uint64_t, std::list<Delta>>> aborted_undo_buffers_;
|
std::list<std::pair<uint64_t, std::list<Delta>>> aborted_undo_buffers_;
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <limits>
|
#include <limits>
|
||||||
#include <list>
|
#include <list>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
#include "utils/skip_list.hpp"
|
#include "utils/skip_list.hpp"
|
||||||
|
|
||||||
@ -18,14 +19,13 @@ struct Transaction {
|
|||||||
Transaction(uint64_t transaction_id, uint64_t start_timestamp)
|
Transaction(uint64_t transaction_id, uint64_t start_timestamp)
|
||||||
: transaction_id(transaction_id),
|
: transaction_id(transaction_id),
|
||||||
start_timestamp(start_timestamp),
|
start_timestamp(start_timestamp),
|
||||||
commit_timestamp(transaction_id),
|
|
||||||
command_id(0),
|
command_id(0),
|
||||||
must_abort(false) {}
|
must_abort(false) {}
|
||||||
|
|
||||||
Transaction(Transaction &&other) noexcept
|
Transaction(Transaction &&other) noexcept
|
||||||
: transaction_id(other.transaction_id),
|
: transaction_id(other.transaction_id),
|
||||||
start_timestamp(other.start_timestamp),
|
start_timestamp(other.start_timestamp),
|
||||||
commit_timestamp(other.commit_timestamp.load()),
|
commit_timestamp(std::move(other.commit_timestamp)),
|
||||||
command_id(other.command_id),
|
command_id(other.command_id),
|
||||||
deltas(std::move(other.deltas)),
|
deltas(std::move(other.deltas)),
|
||||||
must_abort(other.must_abort) {}
|
must_abort(other.must_abort) {}
|
||||||
@ -36,9 +36,18 @@ struct Transaction {
|
|||||||
|
|
||||||
~Transaction() {}
|
~Transaction() {}
|
||||||
|
|
||||||
|
void EnsureCommitTimestampExists() {
|
||||||
|
if (commit_timestamp != nullptr) return;
|
||||||
|
commit_timestamp = std::make_unique<std::atomic<uint64_t>>(transaction_id);
|
||||||
|
}
|
||||||
|
|
||||||
uint64_t transaction_id;
|
uint64_t transaction_id;
|
||||||
uint64_t start_timestamp;
|
uint64_t start_timestamp;
|
||||||
std::atomic<uint64_t> commit_timestamp;
|
// The `Transaction` object is stack allocated, but the `commit_timestamp`
|
||||||
|
// must be heap allocated because `Delta`s have a pointer to it, and that
|
||||||
|
// pointer must stay valid after the `Transaction` is moved into
|
||||||
|
// `commited_transactions_` list for GC.
|
||||||
|
std::unique_ptr<std::atomic<uint64_t>> commit_timestamp;
|
||||||
uint64_t command_id;
|
uint64_t command_id;
|
||||||
std::list<Delta> deltas;
|
std::list<Delta> deltas;
|
||||||
bool must_abort;
|
bool must_abort;
|
||||||
|
Loading…
Reference in New Issue
Block a user