diff --git a/src/communication/rpc/messages-inl.hpp b/src/communication/rpc/messages-inl.hpp index 00615d03a..067a83682 100644 --- a/src/communication/rpc/messages-inl.hpp +++ b/src/communication/rpc/messages-inl.hpp @@ -10,15 +10,6 @@ #include "storage/concurrent_id_mapper_rpc_messages.hpp" #include "transactions/engine_rpc_messages.hpp" -BOOST_CLASS_EXPORT(tx::SnapshotReq); -BOOST_CLASS_EXPORT(tx::SnapshotRes); -BOOST_CLASS_EXPORT(tx::GcSnapshotReq); -BOOST_CLASS_EXPORT(tx::ClogInfoReq); -BOOST_CLASS_EXPORT(tx::ClogInfoRes); -BOOST_CLASS_EXPORT(tx::ActiveTransactionsReq); -BOOST_CLASS_EXPORT(tx::IsActiveReq); -BOOST_CLASS_EXPORT(tx::IsActiveRes); - #define ID_VALUE_EXPORT_BOOST_TYPE(type) \ BOOST_CLASS_EXPORT(storage::type##IdReq); \ BOOST_CLASS_EXPORT(storage::type##IdRes); \ @@ -31,6 +22,26 @@ ID_VALUE_EXPORT_BOOST_TYPE(Property) #undef ID_VALUE_EXPORT_BOOST_TYPE + +// Distributed transaction engine. +BOOST_CLASS_EXPORT(tx::TxAndSnapshot); +BOOST_CLASS_EXPORT(tx::BeginReq); +BOOST_CLASS_EXPORT(tx::BeginRes); +BOOST_CLASS_EXPORT(tx::AdvanceReq); +BOOST_CLASS_EXPORT(tx::AdvanceRes); +BOOST_CLASS_EXPORT(tx::CommitReq); +BOOST_CLASS_EXPORT(tx::CommitRes); +BOOST_CLASS_EXPORT(tx::AbortReq); +BOOST_CLASS_EXPORT(tx::AbortRes); +BOOST_CLASS_EXPORT(tx::SnapshotReq); +BOOST_CLASS_EXPORT(tx::SnapshotRes); +BOOST_CLASS_EXPORT(tx::GcSnapshotReq); +BOOST_CLASS_EXPORT(tx::ClogInfoReq); +BOOST_CLASS_EXPORT(tx::ClogInfoRes); +BOOST_CLASS_EXPORT(tx::ActiveTransactionsReq); +BOOST_CLASS_EXPORT(tx::IsActiveReq); +BOOST_CLASS_EXPORT(tx::IsActiveRes); + // Distributed coordination. BOOST_CLASS_EXPORT(distributed::RegisterWorkerReq); BOOST_CLASS_EXPORT(distributed::RegisterWorkerRes); diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index 9aefdab9a..7f837466f 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -17,7 +17,7 @@ namespace database { GraphDbAccessor::GraphDbAccessor(GraphDb &db) : db_(db), - transaction_(*SingleNodeEngine().Begin()), + transaction_(*db.tx_engine().Begin()), transaction_starter_{true} { if (db_.type() != GraphDb::Type::SINGLE_NODE) { remote_vertices_.emplace(db_.remote_data_clients()); @@ -47,18 +47,18 @@ tx::transaction_id_t GraphDbAccessor::transaction_id() const { void GraphDbAccessor::AdvanceCommand() { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - SingleNodeEngine().Advance(transaction_.id_); + db_.tx_engine().Advance(transaction_.id_); } void GraphDbAccessor::Commit() { DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction."; - SingleNodeEngine().Commit(transaction_); + db_.tx_engine().Commit(transaction_); commited_ = true; } void GraphDbAccessor::Abort() { DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction."; - SingleNodeEngine().Abort(transaction_); + db_.tx_engine().Abort(transaction_); aborted_ = true; } diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp index fc75aab07..9d6bc5e29 100644 --- a/src/database/graph_db_accessor.hpp +++ b/src/database/graph_db_accessor.hpp @@ -45,12 +45,10 @@ class GraphDbAccessor { friend class ::EdgeAccessor; public: - /** Creates a new accessor by starting a new transaction. Only applicable to - * the single-node or distributed master. */ + /// Creates a new accessor by starting a new transaction. explicit GraphDbAccessor(GraphDb &db); - /** Creates an accessor for a running transaction. Applicable to all types of - * memgraph. */ + /// Creates an accessor for a running transaction. GraphDbAccessor(GraphDb &db, tx::transaction_id_t tx_id); ~GraphDbAccessor(); @@ -587,17 +585,6 @@ class GraphDbAccessor { remote_vertices_; std::experimental::optional> remote_edges_; - /** Casts the transaction engine to SingleNodeEngine and returns it. If the - * engine is a WorkerEngine (and not SingleNode nor Master), a call to this - * method will crash MG. */ - tx::SingleNodeEngine &SingleNodeEngine() { - auto *single_node_engine = - dynamic_cast(&db_.tx_engine()); - DCHECK(single_node_engine) - << "Asked for SingleNodeEngine on distributed worker"; - return *single_node_engine; - } - /** * Insert this vertex into corresponding label and label+property (if it * exists) index. diff --git a/src/transactions/engine.hpp b/src/transactions/engine.hpp index 6bb48980c..fd955a4d8 100644 --- a/src/transactions/engine.hpp +++ b/src/transactions/engine.hpp @@ -22,6 +22,20 @@ class Engine { public: virtual ~Engine() = default; + /// Begins a transaction and returns a pointer to it's object. + virtual Transaction *Begin() = 0; + + /// Advances the command on the transaction with the given id. + virtual command_id_t Advance(transaction_id_t id) = 0; + + /// Comits the given transaction. Deletes the transaction object, it's not + /// valid after this function executes. + virtual void Commit(const Transaction &t) = 0; + + /// Aborts the given transaction. Deletes the transaction object, it's not + /// valid after this function executes. + virtual void Abort(const Transaction &t) = 0; + /** Returns the commit log Info about the given transaction. */ virtual CommitLog::Info Info(transaction_id_t tx) const = 0; diff --git a/src/transactions/engine_master.cpp b/src/transactions/engine_master.cpp index 69df40ca8..59b74b76e 100644 --- a/src/transactions/engine_master.cpp +++ b/src/transactions/engine_master.cpp @@ -12,6 +12,25 @@ namespace tx { MasterEngine::MasterEngine(communication::rpc::System &system, durability::WriteAheadLog *wal) : SingleNodeEngine(wal), rpc_server_(system, kTransactionEngineRpc) { + rpc_server_.Register([this](const BeginReq &) { + auto tx = Begin(); + return std::make_unique(TxAndSnapshot{tx->id_, tx->snapshot()}); + }); + + rpc_server_.Register([this](const AdvanceReq &req) { + return std::make_unique(Advance(req.member)); + }); + + rpc_server_.Register([this](const CommitReq &req) { + Commit(*RunningTransaction(req.member)); + return std::make_unique(); + }); + + rpc_server_.Register([this](const AbortReq &req) { + Abort(*RunningTransaction(req.member)); + return std::make_unique(); + }); + rpc_server_.Register([this](const SnapshotReq &req) { // It is guaranteed that the Worker will not be requesting this for a // transaction that's done, and that there are no race conditions here. diff --git a/src/transactions/engine_rpc_messages.hpp b/src/transactions/engine_rpc_messages.hpp index 070b5e676..431fa7278 100644 --- a/src/transactions/engine_rpc_messages.hpp +++ b/src/transactions/engine_rpc_messages.hpp @@ -9,25 +9,55 @@ namespace tx { const std::string kTransactionEngineRpc = "transaction_engine_rpc"; +RPC_NO_MEMBER_MESSAGE(BeginReq) +struct TxAndSnapshot { + transaction_id_t tx_id; + Snapshot snapshot; + + private: + friend class boost::serialization::access; + template + void serialize(TArchive &ar, unsigned int) { + ar &tx_id; + ar &snapshot; + } +}; +RPC_SINGLE_MEMBER_MESSAGE(BeginRes, TxAndSnapshot); +using BeginRpc = + communication::rpc::RequestResponse; + +RPC_SINGLE_MEMBER_MESSAGE(AdvanceReq, transaction_id_t); +RPC_SINGLE_MEMBER_MESSAGE(AdvanceRes, command_id_t); +using AdvanceRpc = communication::rpc::RequestResponse; + +RPC_SINGLE_MEMBER_MESSAGE(CommitReq, transaction_id_t); +RPC_NO_MEMBER_MESSAGE(CommitRes); +using CommitRpc = communication::rpc::RequestResponse; + +RPC_SINGLE_MEMBER_MESSAGE(AbortReq, transaction_id_t); +RPC_NO_MEMBER_MESSAGE(AbortRes); +using AbortRpc = communication::rpc::RequestResponse; + RPC_SINGLE_MEMBER_MESSAGE(SnapshotReq, transaction_id_t) RPC_SINGLE_MEMBER_MESSAGE(SnapshotRes, Snapshot) -RPC_NO_MEMBER_MESSAGE(GcSnapshotReq) -RPC_SINGLE_MEMBER_MESSAGE(ClogInfoReq, transaction_id_t) -RPC_SINGLE_MEMBER_MESSAGE(ClogInfoRes, CommitLog::Info) -RPC_SINGLE_MEMBER_MESSAGE(ActiveTransactionsReq, transaction_id_t) -RPC_SINGLE_MEMBER_MESSAGE(IsActiveReq, transaction_id_t) -RPC_SINGLE_MEMBER_MESSAGE(IsActiveRes, bool) - using SnapshotRpc = communication::rpc::RequestResponse; + +RPC_NO_MEMBER_MESSAGE(GcSnapshotReq) using GcSnapshotRpc = communication::rpc::RequestResponse; -using GcSnapshotRpc = - communication::rpc::RequestResponse; + +RPC_SINGLE_MEMBER_MESSAGE(ClogInfoReq, transaction_id_t) +RPC_SINGLE_MEMBER_MESSAGE(ClogInfoRes, CommitLog::Info) using ClogInfoRpc = communication::rpc::RequestResponse; + +RPC_SINGLE_MEMBER_MESSAGE(ActiveTransactionsReq, transaction_id_t) using ActiveTransactionsRpc = communication::rpc::RequestResponse; + +RPC_SINGLE_MEMBER_MESSAGE(IsActiveReq, transaction_id_t) +RPC_SINGLE_MEMBER_MESSAGE(IsActiveRes, bool) using IsActiveRpc = communication::rpc::RequestResponse; diff --git a/src/transactions/engine_single_node.cpp b/src/transactions/engine_single_node.cpp index 6d6606e9d..459a50a16 100644 --- a/src/transactions/engine_single_node.cpp +++ b/src/transactions/engine_single_node.cpp @@ -25,7 +25,7 @@ Transaction *SingleNodeEngine::Begin() { return t; } -void SingleNodeEngine::Advance(transaction_id_t id) { +command_id_t SingleNodeEngine::Advance(transaction_id_t id) { std::lock_guard guard(lock_); auto it = store_.find(id); @@ -38,7 +38,7 @@ void SingleNodeEngine::Advance(transaction_id_t id) { "Reached maximum number of commands in this " "transaction."); - t->cid_++; + return ++(t->cid_); } void SingleNodeEngine::Commit(const Transaction &t) { diff --git a/src/transactions/engine_single_node.hpp b/src/transactions/engine_single_node.hpp index 7dba50c7e..63b0999e7 100644 --- a/src/transactions/engine_single_node.hpp +++ b/src/transactions/engine_single_node.hpp @@ -29,33 +29,10 @@ class SingleNodeEngine : public Engine { */ explicit SingleNodeEngine(durability::WriteAheadLog *wal = nullptr); - /** - * Begins a transaction and returns a pointer to - * it's object. - * - * The transaction object is owned by this engine. - * It will be released when the transaction gets - * committted or aborted. - */ - Transaction *Begin(); - - /** - * Advances the command on the transaction with the - * given id. - * - * @param id - Transation id. That transaction must - * be currently active. - */ - void Advance(transaction_id_t id); - - /** Comits the given transaction. Deletes the transaction object, it's not - * valid after this function executes. */ - void Commit(const Transaction &t); - - /** Aborts the given transaction. Deletes the transaction object, it's not - * valid after this function executes. */ - void Abort(const Transaction &t); - + Transaction *Begin() override; + command_id_t Advance(transaction_id_t id) override; + void Commit(const Transaction &t) override; + void Abort(const Transaction &t) override; CommitLog::Info Info(transaction_id_t tx) const override; Snapshot GlobalGcSnapshot() override; Snapshot GlobalActiveTransactions() override; diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp index 22482e973..9e470e1a1 100644 --- a/src/transactions/engine_worker.cpp +++ b/src/transactions/engine_worker.cpp @@ -9,6 +9,43 @@ namespace tx { WorkerEngine::WorkerEngine(const io::network::Endpoint &endpoint) : rpc_client_pool_(endpoint, kTransactionEngineRpc) {} +WorkerEngine::~WorkerEngine() { + for (auto &kv : active_.access()) { + delete kv.second; + } +} + +Transaction *WorkerEngine::Begin() { + auto res = rpc_client_pool_.Call(); + Transaction *tx = + new Transaction(res->member.tx_id, res->member.snapshot, *this); + auto insertion = active_.access().insert(res->member.tx_id, tx); + CHECK(insertion.second) << "Failed to start creation from worker"; + return tx; +} + +command_id_t WorkerEngine::Advance(transaction_id_t tx_id) { + auto res = rpc_client_pool_.Call(tx_id); + auto access = active_.access(); + auto found = access.find(tx_id); + CHECK(found != access.end()) + << "Can't advance a transaction not in local cache"; + found->second->cid_ = res->member; + return res->member; +} + +void WorkerEngine::Commit(const Transaction &t) { + auto res = rpc_client_pool_.Call(t.id_); + auto removal = active_.access().remove(t.id_); + CHECK(removal) << "Can't commit a transaction not in local cache"; +} + +void WorkerEngine::Abort(const Transaction &t) { + auto res = rpc_client_pool_.Call(t.id_); + auto removal = active_.access().remove(t.id_); + CHECK(removal) << "Can't abort a transaction not in local cache"; +} + CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { auto info = clog_.fetch_info(tid); // If we don't know the transaction to be commited nor aborted, ask the diff --git a/src/transactions/engine_worker.hpp b/src/transactions/engine_worker.hpp index ecdf67eb0..8a406b2e9 100644 --- a/src/transactions/engine_worker.hpp +++ b/src/transactions/engine_worker.hpp @@ -13,11 +13,17 @@ namespace tx { /** Distributed worker transaction engine. Connects to a MasterEngine (single - * source of truth) to obtain transactional info. Caches most info locally. */ + * source of truth) to obtain transactional info. Caches most info locally. Can + * begin/advance/end transactions on the master. */ class WorkerEngine : public Engine { public: WorkerEngine(const io::network::Endpoint &endpoint); + ~WorkerEngine(); + Transaction *Begin() override; + command_id_t Advance(transaction_id_t id) override; + void Commit(const Transaction &t) override; + void Abort(const Transaction &t) override; CommitLog::Info Info(transaction_id_t tid) const override; Snapshot GlobalGcSnapshot() override; Snapshot GlobalActiveTransactions() override; diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp index cfb2e30a8..f8777675e 100644 --- a/tests/unit/distributed_graph_db.cpp +++ b/tests/unit/distributed_graph_db.cpp @@ -446,3 +446,16 @@ TEST_F(DistributedGraphDbTest, BuildIndexDistributed) { EXPECT_EQ(CountIterable(dba.Vertices(label, property, false)), 300); } } + +TEST_F(DistributedGraphDbTest, WorkerOwnedDbAccessors) { + database::GraphDbAccessor dba_w1(worker1()); + auto v = dba_w1.InsertVertex(); + auto prop = dba_w1.Property("p"); + v.PropsSet(prop, 42); + auto v_ga = v.GlobalAddress(); + dba_w1.Commit(); + + database::GraphDbAccessor dba_w2(worker2()); + VertexAccessor v_in_w2{v_ga, dba_w2}; + EXPECT_EQ(v_in_w2.PropsAt(prop).Value(), 42); +} diff --git a/tests/unit/transaction_engine_distributed.cpp b/tests/unit/transaction_engine_distributed.cpp index 713904c7f..73558416d 100644 --- a/tests/unit/transaction_engine_distributed.cpp +++ b/tests/unit/transaction_engine_distributed.cpp @@ -21,6 +21,32 @@ class WorkerEngineTest : public testing::Test { WorkerEngine worker_{master_system_.endpoint()}; }; +TEST_F(WorkerEngineTest, BeginOnWorker) { + worker_.Begin(); + auto second = worker_.Begin(); + EXPECT_EQ(master_.RunningTransaction(second->id_)->snapshot().size(), 1); +} + +TEST_F(WorkerEngineTest, AdvanceOnWorker) { + auto tx = worker_.Begin(); + auto cid = tx->cid(); + EXPECT_EQ(worker_.Advance(tx->id_), cid + 1); +} + +TEST_F(WorkerEngineTest, CommitOnWorker) { + auto tx = worker_.Begin(); + auto tx_id = tx->id_; + worker_.Commit(*tx); + EXPECT_TRUE(master_.Info(tx_id).is_committed()); +} + +TEST_F(WorkerEngineTest, AbortOnWorker) { + auto tx = worker_.Begin(); + auto tx_id = tx->id_; + worker_.Abort(*tx); + EXPECT_TRUE(master_.Info(tx_id).is_aborted()); +} + TEST_F(WorkerEngineTest, RunningTransaction) { master_.Begin(); master_.Begin();