diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index 11bbd75a9..daa337f9a 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -14,6 +14,9 @@ namespace database { GraphDbAccessor::GraphDbAccessor(GraphDb &db) : db_(db), transaction_(*SingleNodeEngine().Begin()) {} +GraphDbAccessor::GraphDbAccessor(GraphDb &db, tx::transaction_id_t tx_id) + : db_(db), transaction_(*db.tx_engine().RunningTransaction(tx_id)) {} + GraphDbAccessor::~GraphDbAccessor() { if (!commited_ && !aborted_) { this->Abort(); diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp index 52ac4fdda..e0412d3e8 100644 --- a/src/database/graph_db_accessor.hpp +++ b/src/database/graph_db_accessor.hpp @@ -13,6 +13,7 @@ #include "storage/vertex_accessor.hpp" #include "transactions/engine_single_node.hpp" #include "transactions/transaction.hpp" +#include "transactions/type.hpp" #include "utils/bound.hpp" namespace database { @@ -92,7 +93,13 @@ class GraphDbAccessor { }; public: + /** Creates a new accessor by starting a new transaction. Only applicable to + * the single-node or distributed master. */ explicit GraphDbAccessor(GraphDb &db); + + /** Creates an accessor for a running transaction. Applicable to all types of + * memgraph. */ + GraphDbAccessor(GraphDb &db, tx::transaction_id_t tx_id); ~GraphDbAccessor(); GraphDbAccessor(const GraphDbAccessor &other) = delete; diff --git a/src/transactions/engine.hpp b/src/transactions/engine.hpp index b68cc985b..6bb48980c 100644 --- a/src/transactions/engine.hpp +++ b/src/transactions/engine.hpp @@ -56,6 +56,9 @@ class Engine { virtual void LocalForEachActiveTransaction( std::function f) = 0; + /** Gets a transaction object for a running transaction. */ + virtual tx::Transaction *RunningTransaction(tx::transaction_id_t tx_id) = 0; + auto &local_lock_graph() { return local_lock_graph_; } const auto &local_lock_graph() const { return local_lock_graph_; } diff --git a/src/transactions/engine_master.cpp b/src/transactions/engine_master.cpp index 7d0a9eaef..c06cc0bce 100644 --- a/src/transactions/engine_master.cpp +++ b/src/transactions/engine_master.cpp @@ -15,7 +15,8 @@ MasterEngine::MasterEngine(communication::messaging::System &system, rpc_server_.Register([this](const SnapshotReq &req) { // It is guaranteed that the Worker will not be requesting this for a // transaction that's done, and that there are no race conditions here. - return std::make_unique(GetSnapshot(req.member)); + return std::make_unique( + RunningTransaction(req.member)->snapshot()); }); rpc_server_.Register( diff --git a/src/transactions/engine_single_node.cpp b/src/transactions/engine_single_node.cpp index 36643033f..6d6606e9d 100644 --- a/src/transactions/engine_single_node.cpp +++ b/src/transactions/engine_single_node.cpp @@ -103,11 +103,12 @@ void SingleNodeEngine::LocalForEachActiveTransaction( } } -Snapshot SingleNodeEngine::GetSnapshot(tx::transaction_id_t tx_id) { +tx::Transaction *SingleNodeEngine::RunningTransaction( + tx::transaction_id_t tx_id) { std::lock_guard guard(lock_); auto found = store_.find(tx_id); - DCHECK(found != store_.end()) + CHECK(found != store_.end()) << "Can't return snapshot for an inactive transaction"; - return found->second->snapshot(); + return found->second.get(); } } // namespace tx diff --git a/src/transactions/engine_single_node.hpp b/src/transactions/engine_single_node.hpp index 73b8c9812..bb2c351a4 100644 --- a/src/transactions/engine_single_node.hpp +++ b/src/transactions/engine_single_node.hpp @@ -65,10 +65,7 @@ class SingleNodeEngine : public Engine { tx::transaction_id_t LocalLast() const override; void LocalForEachActiveTransaction( std::function f) override; - - protected: - // Exposed for MasterEngine. Transaction for tx_id must be alive. - Snapshot GetSnapshot(tx::transaction_id_t tx_id); + tx::Transaction *RunningTransaction(tx::transaction_id_t tx_id) override; private: std::atomic counter_{0}; diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp index 5b871d86c..93cb66fdd 100644 --- a/src/transactions/engine_worker.cpp +++ b/src/transactions/engine_worker.cpp @@ -13,20 +13,6 @@ WorkerEngine::WorkerEngine(communication::messaging::System &system, const io::network::Endpoint &endpoint) : rpc_client_(system, endpoint, kTransactionEngineRpc) {} -Transaction *WorkerEngine::LocalBegin(transaction_id_t tx_id) { - auto accessor = active_.access(); - auto found = accessor.find(tx_id); - if (found != accessor.end()) return found->second; - - Snapshot snapshot( - std::move(rpc_client_.Call(kRpcTimeout, tx_id)->member)); - auto insertion = - accessor.insert(tx_id, new Transaction(tx_id, snapshot, *this)); - CHECK(insertion.second) << "Transaction already inserted"; - utils::EnsureAtomicGe(local_last_, tx_id); - return insertion.first->second; -} - CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { auto info = clog_.fetch_info(tid); // If we don't know the transaction to be commited nor aborted, ask the @@ -66,4 +52,18 @@ void WorkerEngine::LocalForEachActiveTransaction( for (auto pair : active_.access()) f(*pair.second); } +tx::Transaction *WorkerEngine::RunningTransaction(tx::transaction_id_t tx_id) { + auto accessor = active_.access(); + auto found = accessor.find(tx_id); + if (found != accessor.end()) return found->second; + + Snapshot snapshot( + std::move(rpc_client_.Call(kRpcTimeout, tx_id)->member)); + auto insertion = + accessor.insert(tx_id, new Transaction(tx_id, snapshot, *this)); + CHECK(insertion.second) << "Transaction already inserted"; + utils::EnsureAtomicGe(local_last_, tx_id); + return insertion.first->second; +} + } // namespace tx diff --git a/src/transactions/engine_worker.hpp b/src/transactions/engine_worker.hpp index d74d3cbfa..e7d7c85f5 100644 --- a/src/transactions/engine_worker.hpp +++ b/src/transactions/engine_worker.hpp @@ -20,8 +20,6 @@ class WorkerEngine : public Engine { WorkerEngine(communication::messaging::System &system, const io::network::Endpoint &endpoint); - Transaction *LocalBegin(transaction_id_t tx_id); - CommitLog::Info Info(transaction_id_t tid) const override; Snapshot GlobalGcSnapshot() override; Snapshot GlobalActiveTransactions() override; @@ -29,6 +27,7 @@ class WorkerEngine : public Engine { tx::transaction_id_t LocalLast() const override; void LocalForEachActiveTransaction( std::function f) override; + tx::Transaction *RunningTransaction(tx::transaction_id_t tx_id) override; private: // Local caches. diff --git a/tests/unit/transaction_engine_distributed.cpp b/tests/unit/transaction_engine_distributed.cpp index b83df7927..d116f7421 100644 --- a/tests/unit/transaction_engine_distributed.cpp +++ b/tests/unit/transaction_engine_distributed.cpp @@ -22,11 +22,11 @@ class WorkerEngineTest : public testing::Test { WorkerEngine worker_{worker_system_, master_system_.endpoint()}; }; -TEST_F(WorkerEngineTest, LocalBegin) { +TEST_F(WorkerEngineTest, RunningTransaction) { master_.Begin(); master_.Begin(); - worker_.LocalBegin(1); - worker_.LocalBegin(2); + worker_.RunningTransaction(1); + worker_.RunningTransaction(2); int count = 0; worker_.LocalForEachActiveTransaction([&count](Transaction &t) { ++count; @@ -87,24 +87,24 @@ TEST_F(WorkerEngineTest, GlobalIsActive) { TEST_F(WorkerEngineTest, LocalLast) { master_.Begin(); EXPECT_EQ(worker_.LocalLast(), 0); - worker_.LocalBegin(1); + worker_.RunningTransaction(1); EXPECT_EQ(worker_.LocalLast(), 1); master_.Begin(); EXPECT_EQ(worker_.LocalLast(), 1); master_.Begin(); EXPECT_EQ(worker_.LocalLast(), 1); master_.Begin(); - worker_.LocalBegin(4); + worker_.RunningTransaction(4); EXPECT_EQ(worker_.LocalLast(), 4); } TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) { master_.Begin(); - worker_.LocalBegin(1); + worker_.RunningTransaction(1); master_.Begin(); master_.Begin(); master_.Begin(); - worker_.LocalBegin(4); + worker_.RunningTransaction(4); std::unordered_set local; worker_.LocalForEachActiveTransaction( [&local](Transaction &t) { local.insert(t.id_); }); diff --git a/tests/unit/transaction_engine_single_node.cpp b/tests/unit/transaction_engine_single_node.cpp index da9b1c9ee..ee7ccbdcb 100644 --- a/tests/unit/transaction_engine_single_node.cpp +++ b/tests/unit/transaction_engine_single_node.cpp @@ -63,3 +63,12 @@ TEST(Engine, ConcurrentBegin) { for (auto &t : threads) t.join(); EXPECT_EQ(tx_ids.access().size(), 1000); } + +TEST(Engine, RunningTransaction) { + tx::SingleNodeEngine engine; + auto t0 = engine.Begin(); + auto t1 = engine.Begin(); + EXPECT_EQ(t0, engine.RunningTransaction(t0->id_)); + EXPECT_NE(t1, engine.RunningTransaction(t0->id_)); + EXPECT_EQ(t1, engine.RunningTransaction(t1->id_)); +}