From c4327b26f465c9616333dc46248b6b2c58a88454 Mon Sep 17 00:00:00 2001 From: florijan Date: Wed, 10 Jan 2018 15:10:22 +0100 Subject: [PATCH] Extract tx::SingleNodeEngine from tx::MasterEngine Summary: No logic changes, just split `tx::MasterEngine` into `tx::SingleNodeEngine` and `tx::MasterEngine`. This gives better responsibility separation and is more appropriate now there is no Start/Shutdown. Reviewers: dgleich, teon.banek, buda Reviewed By: dgleich, teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1099 --- src/CMakeLists.txt | 1 + src/database/graph_db.cpp | 7 +- src/database/graph_db.hpp | 2 +- src/database/graph_db_accessor.cpp | 8 +- src/database/graph_db_accessor.hpp | 16 +-- src/transactions/engine_master.cpp | 114 ++---------------- src/transactions/engine_master.hpp | 80 ++---------- src/transactions/engine_rpc_messages.hpp | 2 + src/transactions/engine_single_node.cpp | 113 +++++++++++++++++ src/transactions/engine_single_node.hpp | 83 +++++++++++++ src/transactions/engine_worker.cpp | 2 +- src/transactions/engine_worker.hpp | 3 +- src/transactions/transaction.hpp | 1 + tests/benchmark/mvcc.cpp | 4 +- tests/unit/database_key_index.cpp | 8 +- tests/unit/database_label_property_index.cpp | 4 +- tests/unit/distributed_serialization.cpp | 4 +- tests/unit/mvcc.cpp | 8 +- tests/unit/mvcc_find_update_common.hpp | 6 +- tests/unit/mvcc_gc.cpp | 6 +- tests/unit/mvcc_gc_common.hpp | 6 +- ...cpp => transaction_engine_distributed.cpp} | 4 +- ...cpp => transaction_engine_single_node.cpp} | 8 +- 23 files changed, 264 insertions(+), 226 deletions(-) create mode 100644 src/transactions/engine_single_node.cpp create mode 100644 src/transactions/engine_single_node.hpp rename tests/unit/{transaction_engine_worker.cpp => transaction_engine_distributed.cpp} (97%) rename tests/unit/{transaction_engine_master.cpp => transaction_engine_single_node.cpp} (92%) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e6f0a718a..682ce1da6 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -45,6 +45,7 @@ set(memgraph_src_files storage/vertex_accessor.cpp threading/thread.cpp transactions/engine_master.cpp + transactions/engine_single_node.cpp transactions/engine_worker.cpp utils/watchdog.cpp ) diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index fef508a34..631ef10d0 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -11,6 +11,7 @@ #include "storage/concurrent_id_mapper_master.hpp" #include "storage/concurrent_id_mapper_worker.hpp" #include "transactions/engine_master.hpp" +#include "transactions/engine_single_node.hpp" #include "transactions/engine_worker.hpp" #include "utils/timer.hpp" @@ -22,7 +23,7 @@ namespace fs = std::experimental::filesystem; properties_ = std::make_unique>(__VA_ARGS__); GraphDb::GraphDb(Config config) : GraphDb(config, 0) { - tx_engine_ = std::make_unique(&wal_); + tx_engine_ = std::make_unique(&wal_); counters_ = std::make_unique(); INIT_MAPPERS(storage::SingleNodeConcurrentIdMapper); Start(); @@ -31,9 +32,7 @@ GraphDb::GraphDb(Config config) : GraphDb(config, 0) { GraphDb::GraphDb(communication::messaging::System &system, distributed::MasterCoordination &master, Config config) : GraphDb(config, 0) { - auto tx_engine = std::make_unique(&wal_); - tx_engine->StartServer(system); - tx_engine_ = std::move(tx_engine); + tx_engine_ = std::make_unique(system, &wal_); auto counters = std::make_unique(system); counters_ = std::move(counters); INIT_MAPPERS(storage::MasterConcurrentIdMapper, system); diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 473017942..e38e7a6f5 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -75,7 +75,7 @@ class GraphDb { }; /** Single-node GraphDb ctor. */ - GraphDb(Config config = Config{}); + explicit GraphDb(Config config = Config{}); /** Distributed master GraphDb ctor. */ GraphDb(communication::messaging::System &system, diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index 3db3a8571..64fd7d5a4 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -10,7 +10,7 @@ #include "utils/on_scope_exit.hpp" GraphDbAccessor::GraphDbAccessor(GraphDb &db) - : db_(db), transaction_(MasterEngine().Begin()) {} + : db_(db), transaction_(SingleNodeEngine().Begin()) {} GraphDbAccessor::~GraphDbAccessor() { if (!commited_ && !aborted_) { @@ -24,18 +24,18 @@ tx::transaction_id_t GraphDbAccessor::transaction_id() const { void GraphDbAccessor::AdvanceCommand() { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - MasterEngine().Advance(transaction_->id_); + SingleNodeEngine().Advance(transaction_->id_); } void GraphDbAccessor::Commit() { DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction."; - MasterEngine().Commit(*transaction_); + SingleNodeEngine().Commit(*transaction_); commited_ = true; } void GraphDbAccessor::Abort() { DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction."; - MasterEngine().Abort(*transaction_); + SingleNodeEngine().Abort(*transaction_); aborted_ = true; } diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp index fc4a95f01..4bb01b849 100644 --- a/src/database/graph_db_accessor.hpp +++ b/src/database/graph_db_accessor.hpp @@ -610,13 +610,15 @@ class GraphDbAccessor { const RecordAccessor &vertex_accessor, const Vertex *const vertex); - /** Casts the DB's engine to MasterEngine and returns it. If the DB's engine - * is RemoteEngine, this function will crash MG. */ - tx::MasterEngine &MasterEngine() { - auto *master_engine = - dynamic_cast(db_.tx_engine_.get()); - DCHECK(master_engine) << "Asked for MasterEngine on distributed worker"; - return *master_engine; + /** Casts the DB's engine to SingleNodeEngine and returns it. If the DB's + * engine is RemoteEngine, this function will crash MG. It must be either + * SingleNodeEngine, or MasterEngine (which inherits it). */ + tx::SingleNodeEngine &SingleNodeEngine() { + auto *single_node_engine = + dynamic_cast(db_.tx_engine_.get()); + DCHECK(single_node_engine) + << "Asked for SingleNodeEngine on distributed worker"; + return *single_node_engine; } GraphDb &db_; diff --git a/src/transactions/engine_master.cpp b/src/transactions/engine_master.cpp index 651b489ff..7d0a9eaef 100644 --- a/src/transactions/engine_master.cpp +++ b/src/transactions/engine_master.cpp @@ -9,127 +9,31 @@ namespace tx { -MasterEngine::MasterEngine(durability::WriteAheadLog *wal) : wal_(wal) {} - -Transaction *MasterEngine::Begin() { - std::lock_guard guard(lock_); - - transaction_id_t id{++counter_}; - auto t = new Transaction(id, active_, *this); - active_.insert(id); - store_.emplace(id, t); - if (wal_) { - wal_->Emplace(database::StateDelta::TxBegin(id)); - } - return t; -} - -void MasterEngine::Advance(transaction_id_t id) { - std::lock_guard guard(lock_); - - auto it = store_.find(id); - DCHECK(it != store_.end()) - << "Transaction::advance on non-existing transaction"; - - Transaction *t = it->second.get(); - if (t->cid_ == std::numeric_limits::max()) - throw TransactionError( - "Reached maximum number of commands in this " - "transaction."); - - t->cid_++; -} - -void MasterEngine::Commit(const Transaction &t) { - std::lock_guard guard(lock_); - clog_.set_committed(t.id_); - active_.remove(t.id_); - if (wal_) { - wal_->Emplace(database::StateDelta::TxCommit(t.id_)); - } - store_.erase(store_.find(t.id_)); -} - -void MasterEngine::Abort(const Transaction &t) { - std::lock_guard guard(lock_); - clog_.set_aborted(t.id_); - active_.remove(t.id_); - if (wal_) { - wal_->Emplace(database::StateDelta::TxAbort(t.id_)); - } - store_.erase(store_.find(t.id_)); -} - -CommitLog::Info MasterEngine::Info(transaction_id_t tx) const { - return clog_.fetch_info(tx); -} - -Snapshot MasterEngine::GlobalGcSnapshot() { - std::lock_guard guard(lock_); - - // No active transactions. - if (active_.size() == 0) { - auto snapshot_copy = active_; - snapshot_copy.insert(counter_ + 1); - return snapshot_copy; - } - - // There are active transactions. - auto snapshot_copy = store_.find(active_.front())->second->snapshot(); - snapshot_copy.insert(active_.front()); - return snapshot_copy; -} - -Snapshot MasterEngine::GlobalActiveTransactions() { - std::lock_guard guard(lock_); - Snapshot active_transactions = active_; - return active_transactions; -} - -bool MasterEngine::GlobalIsActive(transaction_id_t tx) const { - return clog_.is_active(tx); -} - -tx::transaction_id_t MasterEngine::LocalLast() const { return counter_.load(); } - -void MasterEngine::LocalForEachActiveTransaction( - std::function f) { - std::lock_guard guard(lock_); - for (auto transaction : active_) { - f(*store_.find(transaction)->second); - } -} - -void MasterEngine::StartServer(communication::messaging::System &system) { - CHECK(!rpc_server_) << "Can't start a running server"; - rpc_server_.emplace(system, "tx_engine"); - - rpc_server_->Register([this](const SnapshotReq &req) { +MasterEngine::MasterEngine(communication::messaging::System &system, + durability::WriteAheadLog *wal) + : SingleNodeEngine(wal), rpc_server_(system, kTransactionEngineRpc) { + rpc_server_.Register([this](const SnapshotReq &req) { // It is guaranteed that the Worker will not be requesting this for a // transaction that's done, and that there are no race conditions here. - auto found = store_.find(req.member); - DCHECK(found != store_.end()) - << "Can't return snapshot for an inactive transaction"; - return std::make_unique(found->second->snapshot()); + return std::make_unique(GetSnapshot(req.member)); }); - rpc_server_->Register( + rpc_server_.Register( [this](const communication::messaging::Message &) { return std::make_unique(GlobalGcSnapshot()); }); - rpc_server_->Register([this](const ClogInfoReq &req) { + rpc_server_.Register([this](const ClogInfoReq &req) { return std::make_unique(Info(req.member)); }); - rpc_server_->Register( + rpc_server_.Register( [this](const communication::messaging::Message &) { return std::make_unique(GlobalActiveTransactions()); }); - rpc_server_->Register([this](const IsActiveReq &req) { + rpc_server_.Register([this](const IsActiveReq &req) { return std::make_unique(GlobalIsActive(req.member)); }); } - } // namespace tx diff --git a/src/transactions/engine_master.hpp b/src/transactions/engine_master.hpp index 53e66d565..65fbc42a1 100644 --- a/src/transactions/engine_master.hpp +++ b/src/transactions/engine_master.hpp @@ -1,89 +1,23 @@ #pragma once -#include -#include -#include - #include "communication/messaging/distributed.hpp" #include "communication/rpc/rpc.hpp" -#include "durability/wal.hpp" -#include "threading/sync/spinlock.hpp" -#include "transactions/commit_log.hpp" -#include "transactions/engine.hpp" -#include "transactions/transaction.hpp" -#include "utils/exceptions.hpp" +#include "transactions/engine_single_node.hpp" namespace tx { -/** Indicates an error in transaction handling (currently - * only command id overflow). */ -class TransactionError : public utils::BasicException { - public: - using utils::BasicException::BasicException; -}; - -/** - * A transaction engine that contains everything necessary for transactional - * handling. Used for single-node Memgraph deployments and for the master in a - * distributed system. - */ -class MasterEngine : public Engine { +/** Distributed master transaction engine. Has complete engine functionality and + * exposes an RPC server to be used by distributed Workers. */ +class MasterEngine : public SingleNodeEngine { public: /** * @param wal - Optional. If present, the Engine will write tx * Begin/Commit/Abort atomically (while under lock). */ - MasterEngine(durability::WriteAheadLog *wal = nullptr); - - /** - * Begins a transaction and returns a pointer to - * it's object. - * - * The transaction object is owned by this engine. - * It will be released when the transaction gets - * committted or aborted. - */ - Transaction *Begin(); - - /** - * Advances the command on the transaction with the - * given id. - * - * @param id - Transation id. That transaction must - * be currently active. - */ - void Advance(transaction_id_t id); - - /** Comits the given transaction. Deletes the transaction object, it's not - * valid after this function executes. */ - void Commit(const Transaction &t); - - /** Aborts the given transaction. Deletes the transaction object, it's not - * valid after this function executes. */ - void Abort(const Transaction &t); - - CommitLog::Info Info(transaction_id_t tx) const override; - Snapshot GlobalGcSnapshot() override; - Snapshot GlobalActiveTransactions() override; - bool GlobalIsActive(transaction_id_t tx) const override; - tx::transaction_id_t LocalLast() const override; - void LocalForEachActiveTransaction( - std::function f) override; - - /** Starts the RPC server of the master transactional engine. */ - void StartServer(communication::messaging::System &system); + MasterEngine(communication::messaging::System &system, + durability::WriteAheadLog *wal = nullptr); private: - std::atomic counter_{0}; - CommitLog clog_; - std::unordered_map> store_; - Snapshot active_; - SpinLock lock_; - // Optional. If present, the Engine will write tx Begin/Commit/Abort - // atomically (while under lock). - durability::WriteAheadLog *wal_{nullptr}; - - // Optional RPC server, only used in distributed, not in single_node. - std::experimental::optional rpc_server_; + communication::rpc::Server rpc_server_; }; } // namespace tx diff --git a/src/transactions/engine_rpc_messages.hpp b/src/transactions/engine_rpc_messages.hpp index d52911686..e597e9c85 100644 --- a/src/transactions/engine_rpc_messages.hpp +++ b/src/transactions/engine_rpc_messages.hpp @@ -8,6 +8,8 @@ namespace tx { +const std::string kTransactionEngineRpc = "transaction_engine_rpc"; + RPC_SINGLE_MEMBER_MESSAGE(SnapshotReq, transaction_id_t) RPC_SINGLE_MEMBER_MESSAGE(SnapshotRes, Snapshot) RPC_NO_MEMBER_MESSAGE(GcSnapshotReq) diff --git a/src/transactions/engine_single_node.cpp b/src/transactions/engine_single_node.cpp new file mode 100644 index 000000000..36643033f --- /dev/null +++ b/src/transactions/engine_single_node.cpp @@ -0,0 +1,113 @@ +#include +#include + +#include "glog/logging.h" + +#include "database/state_delta.hpp" +#include "transactions/engine_rpc_messages.hpp" +#include "transactions/engine_single_node.hpp" + +namespace tx { + +SingleNodeEngine::SingleNodeEngine(durability::WriteAheadLog *wal) + : wal_(wal) {} + +Transaction *SingleNodeEngine::Begin() { + std::lock_guard guard(lock_); + + transaction_id_t id{++counter_}; + auto t = new Transaction(id, active_, *this); + active_.insert(id); + store_.emplace(id, t); + if (wal_) { + wal_->Emplace(database::StateDelta::TxBegin(id)); + } + return t; +} + +void SingleNodeEngine::Advance(transaction_id_t id) { + std::lock_guard guard(lock_); + + auto it = store_.find(id); + DCHECK(it != store_.end()) + << "Transaction::advance on non-existing transaction"; + + Transaction *t = it->second.get(); + if (t->cid_ == std::numeric_limits::max()) + throw TransactionError( + "Reached maximum number of commands in this " + "transaction."); + + t->cid_++; +} + +void SingleNodeEngine::Commit(const Transaction &t) { + std::lock_guard guard(lock_); + clog_.set_committed(t.id_); + active_.remove(t.id_); + if (wal_) { + wal_->Emplace(database::StateDelta::TxCommit(t.id_)); + } + store_.erase(store_.find(t.id_)); +} + +void SingleNodeEngine::Abort(const Transaction &t) { + std::lock_guard guard(lock_); + clog_.set_aborted(t.id_); + active_.remove(t.id_); + if (wal_) { + wal_->Emplace(database::StateDelta::TxAbort(t.id_)); + } + store_.erase(store_.find(t.id_)); +} + +CommitLog::Info SingleNodeEngine::Info(transaction_id_t tx) const { + return clog_.fetch_info(tx); +} + +Snapshot SingleNodeEngine::GlobalGcSnapshot() { + std::lock_guard guard(lock_); + + // No active transactions. + if (active_.size() == 0) { + auto snapshot_copy = active_; + snapshot_copy.insert(counter_ + 1); + return snapshot_copy; + } + + // There are active transactions. + auto snapshot_copy = store_.find(active_.front())->second->snapshot(); + snapshot_copy.insert(active_.front()); + return snapshot_copy; +} + +Snapshot SingleNodeEngine::GlobalActiveTransactions() { + std::lock_guard guard(lock_); + Snapshot active_transactions = active_; + return active_transactions; +} + +bool SingleNodeEngine::GlobalIsActive(transaction_id_t tx) const { + return clog_.is_active(tx); +} + +tx::transaction_id_t SingleNodeEngine::LocalLast() const { + return counter_.load(); +} + +void SingleNodeEngine::LocalForEachActiveTransaction( + std::function f) { + std::lock_guard guard(lock_); + for (auto transaction : active_) { + f(*store_.find(transaction)->second); + } +} + +Snapshot SingleNodeEngine::GetSnapshot(tx::transaction_id_t tx_id) { + std::lock_guard guard(lock_); + auto found = store_.find(tx_id); + DCHECK(found != store_.end()) + << "Can't return snapshot for an inactive transaction"; + return found->second->snapshot(); +} +} // namespace tx diff --git a/src/transactions/engine_single_node.hpp b/src/transactions/engine_single_node.hpp new file mode 100644 index 000000000..73b8c9812 --- /dev/null +++ b/src/transactions/engine_single_node.hpp @@ -0,0 +1,83 @@ +#pragma once + +#include +#include +#include + +#include "communication/messaging/distributed.hpp" +#include "communication/rpc/rpc.hpp" +#include "durability/wal.hpp" +#include "threading/sync/spinlock.hpp" +#include "transactions/commit_log.hpp" +#include "transactions/engine.hpp" +#include "transactions/transaction.hpp" +#include "utils/exceptions.hpp" + +namespace tx { + +/** Indicates an error in transaction handling (currently + * only command id overflow). */ +class TransactionError : public utils::BasicException { + public: + using utils::BasicException::BasicException; +}; + +/** Single-node deployment transaction engine. Has complete functionality. */ +class SingleNodeEngine : public Engine { + public: + /** + * @param wal - Optional. If present, the Engine will write tx + * Begin/Commit/Abort atomically (while under lock). + */ + explicit SingleNodeEngine(durability::WriteAheadLog *wal = nullptr); + + /** + * Begins a transaction and returns a pointer to + * it's object. + * + * The transaction object is owned by this engine. + * It will be released when the transaction gets + * committted or aborted. + */ + Transaction *Begin(); + + /** + * Advances the command on the transaction with the + * given id. + * + * @param id - Transation id. That transaction must + * be currently active. + */ + void Advance(transaction_id_t id); + + /** Comits the given transaction. Deletes the transaction object, it's not + * valid after this function executes. */ + void Commit(const Transaction &t); + + /** Aborts the given transaction. Deletes the transaction object, it's not + * valid after this function executes. */ + void Abort(const Transaction &t); + + CommitLog::Info Info(transaction_id_t tx) const override; + Snapshot GlobalGcSnapshot() override; + Snapshot GlobalActiveTransactions() override; + bool GlobalIsActive(transaction_id_t tx) const override; + tx::transaction_id_t LocalLast() const override; + void LocalForEachActiveTransaction( + std::function f) override; + + protected: + // Exposed for MasterEngine. Transaction for tx_id must be alive. + Snapshot GetSnapshot(tx::transaction_id_t tx_id); + + private: + std::atomic counter_{0}; + CommitLog clog_; + std::unordered_map> store_; + Snapshot active_; + SpinLock lock_; + // Optional. If present, the Engine will write tx Begin/Commit/Abort + // atomically (while under lock). + durability::WriteAheadLog *wal_{nullptr}; +}; +} // namespace tx diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp index 3449e12d4..f4815def7 100644 --- a/src/transactions/engine_worker.cpp +++ b/src/transactions/engine_worker.cpp @@ -11,7 +11,7 @@ static const auto kRpcTimeout = 100ms; WorkerEngine::WorkerEngine(communication::messaging::System &system, const io::network::NetworkEndpoint &endpoint) - : rpc_client_(system, endpoint, "tx_engine") {} + : rpc_client_(system, endpoint, kTransactionEngineRpc) {} Transaction *WorkerEngine::LocalBegin(transaction_id_t tx_id) { auto accessor = active_.access(); diff --git a/src/transactions/engine_worker.hpp b/src/transactions/engine_worker.hpp index 4eac0cdf5..84d05dbd5 100644 --- a/src/transactions/engine_worker.hpp +++ b/src/transactions/engine_worker.hpp @@ -13,7 +13,8 @@ namespace tx { -/** A transactional engine for the worker in a distributed system. */ +/** Distributed worker transaction engine. Connects to a MasterEngine (single + * source of truth) to obtain transactional info. Caches most info locally. */ class WorkerEngine : public Engine { public: WorkerEngine(communication::messaging::System &system, diff --git a/src/transactions/transaction.hpp b/src/transactions/transaction.hpp index c7fa4c6ba..4db792459 100644 --- a/src/transactions/transaction.hpp +++ b/src/transactions/transaction.hpp @@ -26,6 +26,7 @@ class Transaction { } private: + friend class SingleNodeEngine; friend class MasterEngine; friend class WorkerEngine; diff --git a/tests/benchmark/mvcc.cpp b/tests/benchmark/mvcc.cpp index 61dd2ce63..03d0de517 100644 --- a/tests/benchmark/mvcc.cpp +++ b/tests/benchmark/mvcc.cpp @@ -4,7 +4,7 @@ #include "mvcc/record.hpp" #include "mvcc/version_list.hpp" -#include "transactions/engine_master.hpp" +#include "transactions/engine_single_node.hpp" class Prop : public mvcc::Record { public: @@ -19,7 +19,7 @@ class Prop : public mvcc::Record { void MvccMix(benchmark::State &state) { while (state.KeepRunning()) { state.PauseTiming(); - tx::MasterEngine engine; + tx::SingleNodeEngine engine; auto t1 = engine.Begin(); mvcc::VersionList version_list(*t1, 0); diff --git a/tests/unit/database_key_index.cpp b/tests/unit/database_key_index.cpp index e413520d0..6a5616e2f 100644 --- a/tests/unit/database_key_index.cpp +++ b/tests/unit/database_key_index.cpp @@ -4,7 +4,7 @@ #include "database/graph_db_accessor.hpp" #include "database/graph_db_datatypes.hpp" #include "storage/vertex.hpp" -#include "transactions/engine_master.hpp" +#include "transactions/engine_single_node.hpp" #include "mvcc_gc_common.hpp" @@ -15,7 +15,7 @@ TEST(LabelsIndex, UniqueInsert) { KeyIndex index; GraphDb db; GraphDbAccessor dba(db); - tx::MasterEngine engine; + tx::SingleNodeEngine engine; auto t1 = engine.Begin(); mvcc::VersionList vlist(*t1, 0); engine.Commit(*t1); @@ -43,7 +43,7 @@ TEST(LabelsIndex, UniqueFilter) { GraphDb db; KeyIndex index; GraphDbAccessor dba(db); - tx::MasterEngine engine; + tx::SingleNodeEngine engine; auto t1 = engine.Begin(); mvcc::VersionList vlist1(*t1, 0); @@ -83,7 +83,7 @@ TEST(LabelsIndex, Refresh) { KeyIndex index; GraphDb db; GraphDbAccessor access(db); - tx::MasterEngine engine; + tx::SingleNodeEngine engine; // add two vertices to database auto t1 = engine.Begin(); diff --git a/tests/unit/database_label_property_index.cpp b/tests/unit/database_label_property_index.cpp index 408cca246..545b0cf72 100644 --- a/tests/unit/database_label_property_index.cpp +++ b/tests/unit/database_label_property_index.cpp @@ -4,7 +4,7 @@ #include "database/graph_db_accessor.hpp" #include "database/graph_db_datatypes.hpp" #include "database/indexes/label_property_index.hpp" -#include "transactions/engine_master.hpp" +#include "transactions/engine_single_node.hpp" #include "mvcc_gc_common.hpp" @@ -44,7 +44,7 @@ class LabelPropertyIndexComplexTest : public ::testing::Test { LabelPropertyIndex index; LabelPropertyIndex::Key *key; - tx::MasterEngine engine; + tx::SingleNodeEngine engine; tx::Transaction *t{nullptr}; mvcc::VersionList *vlist; diff --git a/tests/unit/distributed_serialization.cpp b/tests/unit/distributed_serialization.cpp index f300d2537..d76a09f34 100644 --- a/tests/unit/distributed_serialization.cpp +++ b/tests/unit/distributed_serialization.cpp @@ -10,7 +10,7 @@ #include "storage/edge.hpp" #include "storage/property_value_store.hpp" #include "storage/vertex.hpp" -#include "transactions/engine_master.hpp" +#include "transactions/engine_single_node.hpp" using namespace GraphDbTypes; @@ -118,7 +118,7 @@ TEST(DistributedSerialization, VertexProperties) { class DistributedSerializationMvcc : public ::testing::Test { protected: - tx::MasterEngine engine; + tx::SingleNodeEngine engine; tx::Transaction *tx = engine.Begin(); mvcc::VersionList v1_vlist{*tx, 0}; Vertex &v1 = *v1_vlist.Oldest(); diff --git a/tests/unit/mvcc.cpp b/tests/unit/mvcc.cpp index 146e9434d..c9502d84a 100644 --- a/tests/unit/mvcc.cpp +++ b/tests/unit/mvcc.cpp @@ -5,13 +5,13 @@ #include "mvcc/version.hpp" #include "mvcc/version_list.hpp" #include "threading/sync/lock_timeout_exception.hpp" -#include "transactions/engine_master.hpp" +#include "transactions/engine_single_node.hpp" #include "transactions/transaction.hpp" #include "mvcc_gc_common.hpp" TEST(MVCC, Deadlock) { - tx::MasterEngine engine; + tx::SingleNodeEngine engine; auto t0 = engine.Begin(); mvcc::VersionList version_list1(*t0, 0); @@ -31,7 +31,7 @@ TEST(MVCC, Deadlock) { TEST(MVCC, UpdateDontDelete) { std::atomic count{0}; { - tx::MasterEngine engine; + tx::SingleNodeEngine engine; auto t1 = engine.Begin(); mvcc::VersionList version_list(*t1, 0, count); engine.Commit(*t1); @@ -55,7 +55,7 @@ TEST(MVCC, UpdateDontDelete) { // Check that we get the oldest record. TEST(MVCC, Oldest) { - tx::MasterEngine engine; + tx::SingleNodeEngine engine; auto t1 = engine.Begin(); mvcc::VersionList version_list(*t1, 0); auto first = version_list.Oldest(); diff --git a/tests/unit/mvcc_find_update_common.hpp b/tests/unit/mvcc_find_update_common.hpp index 733b58916..f1981c077 100644 --- a/tests/unit/mvcc_find_update_common.hpp +++ b/tests/unit/mvcc_find_update_common.hpp @@ -5,13 +5,13 @@ #include "mvcc/version.hpp" #include "mvcc/version_list.hpp" #include "threading/sync/lock_timeout_exception.hpp" -#include "transactions/engine_master.hpp" +#include "transactions/engine_single_node.hpp" #include "transactions/transaction.hpp" class TestClass : public mvcc::Record { public: // constructs first version, size should be 0 - TestClass(int &version_list_size) : version_list_size_(version_list_size) { + explicit TestClass(int &version_list_size) : version_list_size_(version_list_size) { ++version_list_size_; } TestClass *CloneData() { return new TestClass(version_list_size_); } @@ -58,7 +58,7 @@ class Mvcc : public ::testing::Test { } // variable where number of versions is stored int version_list_size = 0; - tx::MasterEngine engine; + tx::SingleNodeEngine engine; tx::Transaction *t1 = engine.Begin(); mvcc::VersionList version_list{*t1, 0, version_list_size}; TestClass *v1 = nullptr; diff --git a/tests/unit/mvcc_gc.cpp b/tests/unit/mvcc_gc.cpp index c3575fae6..d1e2f6226 100644 --- a/tests/unit/mvcc_gc.cpp +++ b/tests/unit/mvcc_gc.cpp @@ -12,13 +12,13 @@ #include "mvcc/version_list.hpp" #include "storage/garbage_collector.hpp" #include "storage/vertex.hpp" -#include "transactions/engine_master.hpp" +#include "transactions/engine_single_node.hpp" #include "mvcc_gc_common.hpp" class MvccGcTest : public ::testing::Test { protected: - tx::MasterEngine engine; + tx::SingleNodeEngine engine; private: tx::Transaction *t0 = engine.Begin(); @@ -116,7 +116,7 @@ TEST_F(MvccGcTest, OldestTransactionSnapshot) { */ TEST(GarbageCollector, GcClean) { ConcurrentMap *> collection; - tx::MasterEngine engine; + tx::SingleNodeEngine engine; DeferredDeleter deleter; DeferredDeleter> vlist_deleter; GarbageCollector gc(collection, deleter, diff --git a/tests/unit/mvcc_gc_common.hpp b/tests/unit/mvcc_gc_common.hpp index 8f6445c25..270dbc36c 100644 --- a/tests/unit/mvcc_gc_common.hpp +++ b/tests/unit/mvcc_gc_common.hpp @@ -1,7 +1,7 @@ #pragma once #include "mvcc/record.hpp" -#include "transactions/engine_master.hpp" +#include "transactions/engine_single_node.hpp" /** * @brief - Empty class which inherits from mvcc:Record. @@ -18,7 +18,7 @@ class Prop : public mvcc::Record { */ class DestrCountRec : public mvcc::Record { public: - DestrCountRec(std::atomic &count) : count_(count) {} + explicit DestrCountRec(std::atomic &count) : count_(count) {} DestrCountRec *CloneData() { return new DestrCountRec(count_); } ~DestrCountRec() { ++count_; } @@ -29,7 +29,7 @@ class DestrCountRec : public mvcc::Record { // helper function for creating a GC snapshot // if given a nullptr it makes a GC snapshot like there // are no active transactions -auto GcSnapshot(tx::MasterEngine &engine, tx::Transaction *t) { +auto GcSnapshot(tx::SingleNodeEngine &engine, tx::Transaction *t) { if (t != nullptr) { tx::Snapshot gc_snap = t->snapshot(); gc_snap.insert(t->id_); diff --git a/tests/unit/transaction_engine_worker.cpp b/tests/unit/transaction_engine_distributed.cpp similarity index 97% rename from tests/unit/transaction_engine_worker.cpp rename to tests/unit/transaction_engine_distributed.cpp index f0d580c3b..17d6e73f3 100644 --- a/tests/unit/transaction_engine_worker.cpp +++ b/tests/unit/transaction_engine_distributed.cpp @@ -16,12 +16,10 @@ class WorkerEngineTest : public testing::Test { const std::string local{"127.0.0.1"}; System master_system_{local, 0}; - MasterEngine master_; + MasterEngine master_{master_system_}; System worker_system_{local, 0}; WorkerEngine worker_{worker_system_, master_system_.endpoint()}; - - void SetUp() override { master_.StartServer(master_system_); } }; TEST_F(WorkerEngineTest, LocalBegin) { diff --git a/tests/unit/transaction_engine_master.cpp b/tests/unit/transaction_engine_single_node.cpp similarity index 92% rename from tests/unit/transaction_engine_master.cpp rename to tests/unit/transaction_engine_single_node.cpp index 1b2f723bd..da9b1c9ee 100644 --- a/tests/unit/transaction_engine_master.cpp +++ b/tests/unit/transaction_engine_single_node.cpp @@ -4,11 +4,11 @@ #include #include "data_structures/concurrent/concurrent_set.hpp" -#include "transactions/engine_master.hpp" +#include "transactions/engine_single_node.hpp" #include "transactions/transaction.hpp" TEST(Engine, GcSnapshot) { - tx::MasterEngine engine; + tx::SingleNodeEngine engine; ASSERT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1})); std::vector transactions; @@ -36,7 +36,7 @@ TEST(Engine, GcSnapshot) { } TEST(Engine, Advance) { - tx::MasterEngine engine; + tx::SingleNodeEngine engine; auto t0 = engine.Begin(); auto t1 = engine.Begin(); @@ -49,7 +49,7 @@ TEST(Engine, Advance) { } TEST(Engine, ConcurrentBegin) { - tx::MasterEngine engine; + tx::SingleNodeEngine engine; std::vector threads; ConcurrentSet tx_ids; for (int i = 0; i < 10; ++i) {