Implement GraphDbAccessor creation for running transaction

Reviewers: dgleich

Reviewed By: dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1119
This commit is contained in:
florijan 2018-01-19 11:18:13 +01:00
parent 07d262cd1e
commit 9361d79c6d
10 changed files with 51 additions and 31 deletions

View File

@ -14,6 +14,9 @@ namespace database {
GraphDbAccessor::GraphDbAccessor(GraphDb &db) GraphDbAccessor::GraphDbAccessor(GraphDb &db)
: db_(db), transaction_(*SingleNodeEngine().Begin()) {} : db_(db), transaction_(*SingleNodeEngine().Begin()) {}
GraphDbAccessor::GraphDbAccessor(GraphDb &db, tx::transaction_id_t tx_id)
: db_(db), transaction_(*db.tx_engine().RunningTransaction(tx_id)) {}
GraphDbAccessor::~GraphDbAccessor() { GraphDbAccessor::~GraphDbAccessor() {
if (!commited_ && !aborted_) { if (!commited_ && !aborted_) {
this->Abort(); this->Abort();

View File

@ -13,6 +13,7 @@
#include "storage/vertex_accessor.hpp" #include "storage/vertex_accessor.hpp"
#include "transactions/engine_single_node.hpp" #include "transactions/engine_single_node.hpp"
#include "transactions/transaction.hpp" #include "transactions/transaction.hpp"
#include "transactions/type.hpp"
#include "utils/bound.hpp" #include "utils/bound.hpp"
namespace database { namespace database {
@ -92,7 +93,13 @@ class GraphDbAccessor {
}; };
public: public:
/** Creates a new accessor by starting a new transaction. Only applicable to
* the single-node or distributed master. */
explicit GraphDbAccessor(GraphDb &db); explicit GraphDbAccessor(GraphDb &db);
/** Creates an accessor for a running transaction. Applicable to all types of
* memgraph. */
GraphDbAccessor(GraphDb &db, tx::transaction_id_t tx_id);
~GraphDbAccessor(); ~GraphDbAccessor();
GraphDbAccessor(const GraphDbAccessor &other) = delete; GraphDbAccessor(const GraphDbAccessor &other) = delete;

View File

@ -56,6 +56,9 @@ class Engine {
virtual void LocalForEachActiveTransaction( virtual void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) = 0; std::function<void(Transaction &)> f) = 0;
/** Gets a transaction object for a running transaction. */
virtual tx::Transaction *RunningTransaction(tx::transaction_id_t tx_id) = 0;
auto &local_lock_graph() { return local_lock_graph_; } auto &local_lock_graph() { return local_lock_graph_; }
const auto &local_lock_graph() const { return local_lock_graph_; } const auto &local_lock_graph() const { return local_lock_graph_; }

View File

@ -15,7 +15,8 @@ MasterEngine::MasterEngine(communication::messaging::System &system,
rpc_server_.Register<SnapshotRpc>([this](const SnapshotReq &req) { rpc_server_.Register<SnapshotRpc>([this](const SnapshotReq &req) {
// It is guaranteed that the Worker will not be requesting this for a // It is guaranteed that the Worker will not be requesting this for a
// transaction that's done, and that there are no race conditions here. // transaction that's done, and that there are no race conditions here.
return std::make_unique<SnapshotRes>(GetSnapshot(req.member)); return std::make_unique<SnapshotRes>(
RunningTransaction(req.member)->snapshot());
}); });
rpc_server_.Register<GcSnapshotRpc>( rpc_server_.Register<GcSnapshotRpc>(

View File

@ -103,11 +103,12 @@ void SingleNodeEngine::LocalForEachActiveTransaction(
} }
} }
Snapshot SingleNodeEngine::GetSnapshot(tx::transaction_id_t tx_id) { tx::Transaction *SingleNodeEngine::RunningTransaction(
tx::transaction_id_t tx_id) {
std::lock_guard<SpinLock> guard(lock_); std::lock_guard<SpinLock> guard(lock_);
auto found = store_.find(tx_id); auto found = store_.find(tx_id);
DCHECK(found != store_.end()) CHECK(found != store_.end())
<< "Can't return snapshot for an inactive transaction"; << "Can't return snapshot for an inactive transaction";
return found->second->snapshot(); return found->second.get();
} }
} // namespace tx } // namespace tx

View File

@ -65,10 +65,7 @@ class SingleNodeEngine : public Engine {
tx::transaction_id_t LocalLast() const override; tx::transaction_id_t LocalLast() const override;
void LocalForEachActiveTransaction( void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) override; std::function<void(Transaction &)> f) override;
tx::Transaction *RunningTransaction(tx::transaction_id_t tx_id) override;
protected:
// Exposed for MasterEngine. Transaction for tx_id must be alive.
Snapshot GetSnapshot(tx::transaction_id_t tx_id);
private: private:
std::atomic<transaction_id_t> counter_{0}; std::atomic<transaction_id_t> counter_{0};

View File

@ -13,20 +13,6 @@ WorkerEngine::WorkerEngine(communication::messaging::System &system,
const io::network::Endpoint &endpoint) const io::network::Endpoint &endpoint)
: rpc_client_(system, endpoint, kTransactionEngineRpc) {} : rpc_client_(system, endpoint, kTransactionEngineRpc) {}
Transaction *WorkerEngine::LocalBegin(transaction_id_t tx_id) {
auto accessor = active_.access();
auto found = accessor.find(tx_id);
if (found != accessor.end()) return found->second;
Snapshot snapshot(
std::move(rpc_client_.Call<SnapshotRpc>(kRpcTimeout, tx_id)->member));
auto insertion =
accessor.insert(tx_id, new Transaction(tx_id, snapshot, *this));
CHECK(insertion.second) << "Transaction already inserted";
utils::EnsureAtomicGe(local_last_, tx_id);
return insertion.first->second;
}
CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
auto info = clog_.fetch_info(tid); auto info = clog_.fetch_info(tid);
// If we don't know the transaction to be commited nor aborted, ask the // If we don't know the transaction to be commited nor aborted, ask the
@ -66,4 +52,18 @@ void WorkerEngine::LocalForEachActiveTransaction(
for (auto pair : active_.access()) f(*pair.second); for (auto pair : active_.access()) f(*pair.second);
} }
tx::Transaction *WorkerEngine::RunningTransaction(tx::transaction_id_t tx_id) {
auto accessor = active_.access();
auto found = accessor.find(tx_id);
if (found != accessor.end()) return found->second;
Snapshot snapshot(
std::move(rpc_client_.Call<SnapshotRpc>(kRpcTimeout, tx_id)->member));
auto insertion =
accessor.insert(tx_id, new Transaction(tx_id, snapshot, *this));
CHECK(insertion.second) << "Transaction already inserted";
utils::EnsureAtomicGe(local_last_, tx_id);
return insertion.first->second;
}
} // namespace tx } // namespace tx

View File

@ -20,8 +20,6 @@ class WorkerEngine : public Engine {
WorkerEngine(communication::messaging::System &system, WorkerEngine(communication::messaging::System &system,
const io::network::Endpoint &endpoint); const io::network::Endpoint &endpoint);
Transaction *LocalBegin(transaction_id_t tx_id);
CommitLog::Info Info(transaction_id_t tid) const override; CommitLog::Info Info(transaction_id_t tid) const override;
Snapshot GlobalGcSnapshot() override; Snapshot GlobalGcSnapshot() override;
Snapshot GlobalActiveTransactions() override; Snapshot GlobalActiveTransactions() override;
@ -29,6 +27,7 @@ class WorkerEngine : public Engine {
tx::transaction_id_t LocalLast() const override; tx::transaction_id_t LocalLast() const override;
void LocalForEachActiveTransaction( void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) override; std::function<void(Transaction &)> f) override;
tx::Transaction *RunningTransaction(tx::transaction_id_t tx_id) override;
private: private:
// Local caches. // Local caches.

View File

@ -22,11 +22,11 @@ class WorkerEngineTest : public testing::Test {
WorkerEngine worker_{worker_system_, master_system_.endpoint()}; WorkerEngine worker_{worker_system_, master_system_.endpoint()};
}; };
TEST_F(WorkerEngineTest, LocalBegin) { TEST_F(WorkerEngineTest, RunningTransaction) {
master_.Begin(); master_.Begin();
master_.Begin(); master_.Begin();
worker_.LocalBegin(1); worker_.RunningTransaction(1);
worker_.LocalBegin(2); worker_.RunningTransaction(2);
int count = 0; int count = 0;
worker_.LocalForEachActiveTransaction([&count](Transaction &t) { worker_.LocalForEachActiveTransaction([&count](Transaction &t) {
++count; ++count;
@ -87,24 +87,24 @@ TEST_F(WorkerEngineTest, GlobalIsActive) {
TEST_F(WorkerEngineTest, LocalLast) { TEST_F(WorkerEngineTest, LocalLast) {
master_.Begin(); master_.Begin();
EXPECT_EQ(worker_.LocalLast(), 0); EXPECT_EQ(worker_.LocalLast(), 0);
worker_.LocalBegin(1); worker_.RunningTransaction(1);
EXPECT_EQ(worker_.LocalLast(), 1); EXPECT_EQ(worker_.LocalLast(), 1);
master_.Begin(); master_.Begin();
EXPECT_EQ(worker_.LocalLast(), 1); EXPECT_EQ(worker_.LocalLast(), 1);
master_.Begin(); master_.Begin();
EXPECT_EQ(worker_.LocalLast(), 1); EXPECT_EQ(worker_.LocalLast(), 1);
master_.Begin(); master_.Begin();
worker_.LocalBegin(4); worker_.RunningTransaction(4);
EXPECT_EQ(worker_.LocalLast(), 4); EXPECT_EQ(worker_.LocalLast(), 4);
} }
TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) { TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) {
master_.Begin(); master_.Begin();
worker_.LocalBegin(1); worker_.RunningTransaction(1);
master_.Begin(); master_.Begin();
master_.Begin(); master_.Begin();
master_.Begin(); master_.Begin();
worker_.LocalBegin(4); worker_.RunningTransaction(4);
std::unordered_set<tx::transaction_id_t> local; std::unordered_set<tx::transaction_id_t> local;
worker_.LocalForEachActiveTransaction( worker_.LocalForEachActiveTransaction(
[&local](Transaction &t) { local.insert(t.id_); }); [&local](Transaction &t) { local.insert(t.id_); });

View File

@ -63,3 +63,12 @@ TEST(Engine, ConcurrentBegin) {
for (auto &t : threads) t.join(); for (auto &t : threads) t.join();
EXPECT_EQ(tx_ids.access().size(), 1000); EXPECT_EQ(tx_ids.access().size(), 1000);
} }
TEST(Engine, RunningTransaction) {
tx::SingleNodeEngine engine;
auto t0 = engine.Begin();
auto t1 = engine.Begin();
EXPECT_EQ(t0, engine.RunningTransaction(t0->id_));
EXPECT_NE(t1, engine.RunningTransaction(t0->id_));
EXPECT_EQ(t1, engine.RunningTransaction(t1->id_));
}