diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c09fd947d..0de998382 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -153,7 +153,7 @@ set(mg_distributed_sources storage/distributed/vertex_accessor.cpp storage/locking/record_lock.cpp memgraph_init.cpp - transactions/single_node/engine_single_node.cpp + transactions/distributed/engine_single_node.cpp ) # ----------------------------------------------------------------------------- diff --git a/src/transactions/distributed/engine_master.hpp b/src/transactions/distributed/engine_master.hpp index b99027d67..e29b88513 100644 --- a/src/transactions/distributed/engine_master.hpp +++ b/src/transactions/distributed/engine_master.hpp @@ -5,7 +5,7 @@ #include "communication/rpc/server.hpp" #include "distributed/coordination.hpp" #include "transactions/distributed/engine_distributed.hpp" -#include "transactions/single_node/engine_single_node.hpp" +#include "transactions/distributed/engine_single_node.hpp" namespace tx { diff --git a/src/transactions/distributed/engine_single_node.cpp b/src/transactions/distributed/engine_single_node.cpp new file mode 100644 index 000000000..f8421eb1b --- /dev/null +++ b/src/transactions/distributed/engine_single_node.cpp @@ -0,0 +1,133 @@ +#include "transactions/distributed/engine_single_node.hpp" + +#include +#include + +#include "glog/logging.h" + +#include "durability/distributed/state_delta.hpp" + +namespace tx { + +EngineSingleNode::EngineSingleNode(durability::WriteAheadLog *wal) + : wal_(wal) {} + +Transaction *EngineSingleNode::Begin() { + VLOG(11) << "[Tx] Starting transaction " << counter_ + 1; + std::lock_guard guard(lock_); + + TransactionId id{++counter_}; + auto t = CreateTransaction(id, active_); + active_.insert(id); + store_.emplace(id, t); + if (wal_) { + wal_->Emplace(database::StateDelta::TxBegin(id)); + } + return t; +} + +CommandId EngineSingleNode::Advance(TransactionId id) { + std::lock_guard guard(lock_); + + auto it = store_.find(id); + DCHECK(it != store_.end()) + << "Transaction::advance on non-existing transaction"; + + Transaction *t = it->second.get(); + return AdvanceCommand(t); +} + +CommandId EngineSingleNode::UpdateCommand(TransactionId id) { + std::lock_guard guard(lock_); + auto it = store_.find(id); + DCHECK(it != store_.end()) + << "Transaction::advance on non-existing transaction"; + return it->second->cid(); +} + +void EngineSingleNode::Commit(const Transaction &t) { + VLOG(11) << "[Tx] Commiting transaction " << t.id_; + std::lock_guard guard(lock_); + clog_.set_committed(t.id_); + active_.remove(t.id_); + if (wal_) { + wal_->Emplace(database::StateDelta::TxCommit(t.id_)); + } + store_.erase(store_.find(t.id_)); +} + +void EngineSingleNode::Abort(const Transaction &t) { + VLOG(11) << "[Tx] Aborting transaction " << t.id_; + std::lock_guard guard(lock_); + clog_.set_aborted(t.id_); + active_.remove(t.id_); + if (wal_) { + wal_->Emplace(database::StateDelta::TxAbort(t.id_)); + } + store_.erase(store_.find(t.id_)); +} + +CommitLog::Info EngineSingleNode::Info(TransactionId tx) const { + return clog_.fetch_info(tx); +} + +Snapshot EngineSingleNode::GlobalGcSnapshot() { + std::lock_guard guard(lock_); + + // No active transactions. + if (active_.size() == 0) { + auto snapshot_copy = active_; + snapshot_copy.insert(counter_ + 1); + return snapshot_copy; + } + + // There are active transactions. + auto snapshot_copy = store_.find(active_.front())->second->snapshot(); + snapshot_copy.insert(active_.front()); + return snapshot_copy; +} + +Snapshot EngineSingleNode::GlobalActiveTransactions() { + std::lock_guard guard(lock_); + Snapshot active_transactions = active_; + return active_transactions; +} + +TransactionId EngineSingleNode::LocalLast() const { + std::lock_guard guard(lock_); + return counter_; +} + +TransactionId EngineSingleNode::GlobalLast() const { return LocalLast(); } + +TransactionId EngineSingleNode::LocalOldestActive() const { + std::lock_guard guard(lock_); + return active_.empty() ? counter_ + 1 : active_.front(); +} + +void EngineSingleNode::GarbageCollectCommitLog(TransactionId tx_id) { + clog_.garbage_collect_older(tx_id); +} + +void EngineSingleNode::LocalForEachActiveTransaction( + std::function f) { + std::lock_guard guard(lock_); + for (auto transaction : active_) { + f(*store_.find(transaction)->second); + } +} + +Transaction *EngineSingleNode::RunningTransaction(TransactionId tx_id) { + std::lock_guard guard(lock_); + auto found = store_.find(tx_id); + CHECK(found != store_.end()) + << "Can't return snapshot for an inactive transaction"; + return found->second.get(); +} + +void EngineSingleNode::EnsureNextIdGreater(TransactionId tx_id) { + std::lock_guard guard(lock_); + counter_ = std::max(tx_id, counter_); +} + +} // namespace tx diff --git a/src/transactions/distributed/engine_single_node.hpp b/src/transactions/distributed/engine_single_node.hpp new file mode 100644 index 000000000..d93651211 --- /dev/null +++ b/src/transactions/distributed/engine_single_node.hpp @@ -0,0 +1,56 @@ +/// @file + +#pragma once + +#include +#include +#include + +#include "durability/distributed/wal.hpp" +#include "transactions/commit_log.hpp" +#include "transactions/engine.hpp" +#include "transactions/transaction.hpp" +#include "utils/thread/sync.hpp" + +namespace tx { + +/// Single-node deployment transaction engine. Has complete functionality. +class EngineSingleNode final : public Engine { + public: + /// @param wal - Optional. If present, the Engine will write tx + /// Begin/Commit/Abort atomically (while under lock). + explicit EngineSingleNode(durability::WriteAheadLog *wal = nullptr); + + EngineSingleNode(const EngineSingleNode &) = delete; + EngineSingleNode(EngineSingleNode &&) = delete; + EngineSingleNode &operator=(const EngineSingleNode &) = delete; + EngineSingleNode &operator=(EngineSingleNode &&) = delete; + + Transaction *Begin() override; + CommandId Advance(TransactionId id) override; + CommandId UpdateCommand(TransactionId id) override; + void Commit(const Transaction &t) override; + void Abort(const Transaction &t) override; + CommitLog::Info Info(TransactionId tx) const override; + Snapshot GlobalGcSnapshot() override; + Snapshot GlobalActiveTransactions() override; + TransactionId GlobalLast() const override; + TransactionId LocalLast() const override; + TransactionId LocalOldestActive() const override; + void LocalForEachActiveTransaction( + std::function f) override; + Transaction *RunningTransaction(TransactionId tx_id) override; + void EnsureNextIdGreater(TransactionId tx_id) override; + void GarbageCollectCommitLog(TransactionId tx_id) override; + + private: + TransactionId counter_{0}; + CommitLog clog_; + std::unordered_map> store_; + Snapshot active_; + mutable utils::SpinLock lock_; + // Optional. If present, the Engine will write tx Begin/Commit/Abort + // atomically (while under lock). + durability::WriteAheadLog *wal_{nullptr}; +}; +} // namespace tx diff --git a/src/transactions/single_node/engine_single_node.cpp b/src/transactions/single_node/engine_single_node.cpp index 3053cc1ca..7037e2fb1 100644 --- a/src/transactions/single_node/engine_single_node.cpp +++ b/src/transactions/single_node/engine_single_node.cpp @@ -5,13 +5,7 @@ #include "glog/logging.h" -// TODO: THIS IS A HACK! -#ifdef MG_SINGLE_NODE #include "durability/single_node/state_delta.hpp" -#endif -#ifdef MG_DISTRIBUTED -#include "durability/distributed/state_delta.hpp" -#endif namespace tx { diff --git a/src/transactions/single_node/engine_single_node.hpp b/src/transactions/single_node/engine_single_node.hpp index 8bc0a74b4..950369463 100644 --- a/src/transactions/single_node/engine_single_node.hpp +++ b/src/transactions/single_node/engine_single_node.hpp @@ -6,14 +6,7 @@ #include #include -// TODO: THIS IS A HACK! -#ifdef MG_SINGLE_NODE #include "durability/single_node/wal.hpp" -#endif -#ifdef MG_DISTRIBUTED -#include "durability/distributed/wal.hpp" -#endif - #include "transactions/commit_log.hpp" #include "transactions/engine.hpp" #include "transactions/transaction.hpp" diff --git a/tests/unit/distributed_serialization.cpp b/tests/unit/distributed_serialization.cpp index f7befbc0b..39ef6dc1e 100644 --- a/tests/unit/distributed_serialization.cpp +++ b/tests/unit/distributed_serialization.cpp @@ -10,8 +10,7 @@ #include "storage/distributed/edge.hpp" #include "storage/distributed/serialization.hpp" #include "storage/distributed/vertex.hpp" -// TODO: THIS SHOULD BE CHANGED! -#include "transactions/single_node/engine_single_node.hpp" +#include "transactions/distributed/engine_single_node.hpp" using namespace storage;