Support worker transaction begin/advance/commit/abort

Reviewers: dgleich, buda, teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1161
This commit is contained in:
florijan 2018-02-01 10:58:56 +01:00
parent e5035cf477
commit b97b48b365
12 changed files with 187 additions and 67 deletions

View File

@ -10,15 +10,6 @@
#include "storage/concurrent_id_mapper_rpc_messages.hpp"
#include "transactions/engine_rpc_messages.hpp"
BOOST_CLASS_EXPORT(tx::SnapshotReq);
BOOST_CLASS_EXPORT(tx::SnapshotRes);
BOOST_CLASS_EXPORT(tx::GcSnapshotReq);
BOOST_CLASS_EXPORT(tx::ClogInfoReq);
BOOST_CLASS_EXPORT(tx::ClogInfoRes);
BOOST_CLASS_EXPORT(tx::ActiveTransactionsReq);
BOOST_CLASS_EXPORT(tx::IsActiveReq);
BOOST_CLASS_EXPORT(tx::IsActiveRes);
#define ID_VALUE_EXPORT_BOOST_TYPE(type) \
BOOST_CLASS_EXPORT(storage::type##IdReq); \
BOOST_CLASS_EXPORT(storage::type##IdRes); \
@ -31,6 +22,26 @@ ID_VALUE_EXPORT_BOOST_TYPE(Property)
#undef ID_VALUE_EXPORT_BOOST_TYPE
// Distributed transaction engine.
BOOST_CLASS_EXPORT(tx::TxAndSnapshot);
BOOST_CLASS_EXPORT(tx::BeginReq);
BOOST_CLASS_EXPORT(tx::BeginRes);
BOOST_CLASS_EXPORT(tx::AdvanceReq);
BOOST_CLASS_EXPORT(tx::AdvanceRes);
BOOST_CLASS_EXPORT(tx::CommitReq);
BOOST_CLASS_EXPORT(tx::CommitRes);
BOOST_CLASS_EXPORT(tx::AbortReq);
BOOST_CLASS_EXPORT(tx::AbortRes);
BOOST_CLASS_EXPORT(tx::SnapshotReq);
BOOST_CLASS_EXPORT(tx::SnapshotRes);
BOOST_CLASS_EXPORT(tx::GcSnapshotReq);
BOOST_CLASS_EXPORT(tx::ClogInfoReq);
BOOST_CLASS_EXPORT(tx::ClogInfoRes);
BOOST_CLASS_EXPORT(tx::ActiveTransactionsReq);
BOOST_CLASS_EXPORT(tx::IsActiveReq);
BOOST_CLASS_EXPORT(tx::IsActiveRes);
// Distributed coordination.
BOOST_CLASS_EXPORT(distributed::RegisterWorkerReq);
BOOST_CLASS_EXPORT(distributed::RegisterWorkerRes);

View File

@ -17,7 +17,7 @@ namespace database {
GraphDbAccessor::GraphDbAccessor(GraphDb &db)
: db_(db),
transaction_(*SingleNodeEngine().Begin()),
transaction_(*db.tx_engine().Begin()),
transaction_starter_{true} {
if (db_.type() != GraphDb::Type::SINGLE_NODE) {
remote_vertices_.emplace(db_.remote_data_clients());
@ -47,18 +47,18 @@ tx::transaction_id_t GraphDbAccessor::transaction_id() const {
void GraphDbAccessor::AdvanceCommand() {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
SingleNodeEngine().Advance(transaction_.id_);
db_.tx_engine().Advance(transaction_.id_);
}
void GraphDbAccessor::Commit() {
DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction.";
SingleNodeEngine().Commit(transaction_);
db_.tx_engine().Commit(transaction_);
commited_ = true;
}
void GraphDbAccessor::Abort() {
DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction.";
SingleNodeEngine().Abort(transaction_);
db_.tx_engine().Abort(transaction_);
aborted_ = true;
}

View File

@ -45,12 +45,10 @@ class GraphDbAccessor {
friend class ::EdgeAccessor;
public:
/** Creates a new accessor by starting a new transaction. Only applicable to
* the single-node or distributed master. */
/// Creates a new accessor by starting a new transaction.
explicit GraphDbAccessor(GraphDb &db);
/** Creates an accessor for a running transaction. Applicable to all types of
* memgraph. */
/// Creates an accessor for a running transaction.
GraphDbAccessor(GraphDb &db, tx::transaction_id_t tx_id);
~GraphDbAccessor();
@ -587,17 +585,6 @@ class GraphDbAccessor {
remote_vertices_;
std::experimental::optional<distributed::RemoteCache<Edge>> remote_edges_;
/** Casts the transaction engine to SingleNodeEngine and returns it. If the
* engine is a WorkerEngine (and not SingleNode nor Master), a call to this
* method will crash MG. */
tx::SingleNodeEngine &SingleNodeEngine() {
auto *single_node_engine =
dynamic_cast<tx::SingleNodeEngine *>(&db_.tx_engine());
DCHECK(single_node_engine)
<< "Asked for SingleNodeEngine on distributed worker";
return *single_node_engine;
}
/**
* Insert this vertex into corresponding label and label+property (if it
* exists) index.

View File

@ -22,6 +22,20 @@ class Engine {
public:
virtual ~Engine() = default;
/// Begins a transaction and returns a pointer to it's object.
virtual Transaction *Begin() = 0;
/// Advances the command on the transaction with the given id.
virtual command_id_t Advance(transaction_id_t id) = 0;
/// Comits the given transaction. Deletes the transaction object, it's not
/// valid after this function executes.
virtual void Commit(const Transaction &t) = 0;
/// Aborts the given transaction. Deletes the transaction object, it's not
/// valid after this function executes.
virtual void Abort(const Transaction &t) = 0;
/** Returns the commit log Info about the given transaction. */
virtual CommitLog::Info Info(transaction_id_t tx) const = 0;

View File

@ -12,6 +12,25 @@ namespace tx {
MasterEngine::MasterEngine(communication::rpc::System &system,
durability::WriteAheadLog *wal)
: SingleNodeEngine(wal), rpc_server_(system, kTransactionEngineRpc) {
rpc_server_.Register<BeginRpc>([this](const BeginReq &) {
auto tx = Begin();
return std::make_unique<BeginRes>(TxAndSnapshot{tx->id_, tx->snapshot()});
});
rpc_server_.Register<AdvanceRpc>([this](const AdvanceReq &req) {
return std::make_unique<AdvanceRes>(Advance(req.member));
});
rpc_server_.Register<CommitRpc>([this](const CommitReq &req) {
Commit(*RunningTransaction(req.member));
return std::make_unique<CommitRes>();
});
rpc_server_.Register<AbortRpc>([this](const AbortReq &req) {
Abort(*RunningTransaction(req.member));
return std::make_unique<AbortRes>();
});
rpc_server_.Register<SnapshotRpc>([this](const SnapshotReq &req) {
// It is guaranteed that the Worker will not be requesting this for a
// transaction that's done, and that there are no race conditions here.

View File

@ -9,25 +9,55 @@ namespace tx {
const std::string kTransactionEngineRpc = "transaction_engine_rpc";
RPC_NO_MEMBER_MESSAGE(BeginReq)
struct TxAndSnapshot {
transaction_id_t tx_id;
Snapshot snapshot;
private:
friend class boost::serialization::access;
template <class TArchive>
void serialize(TArchive &ar, unsigned int) {
ar &tx_id;
ar &snapshot;
}
};
RPC_SINGLE_MEMBER_MESSAGE(BeginRes, TxAndSnapshot);
using BeginRpc =
communication::rpc::RequestResponse<BeginReq, BeginRes>;
RPC_SINGLE_MEMBER_MESSAGE(AdvanceReq, transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(AdvanceRes, command_id_t);
using AdvanceRpc = communication::rpc::RequestResponse<AdvanceReq, AdvanceRes>;
RPC_SINGLE_MEMBER_MESSAGE(CommitReq, transaction_id_t);
RPC_NO_MEMBER_MESSAGE(CommitRes);
using CommitRpc = communication::rpc::RequestResponse<CommitReq, CommitRes>;
RPC_SINGLE_MEMBER_MESSAGE(AbortReq, transaction_id_t);
RPC_NO_MEMBER_MESSAGE(AbortRes);
using AbortRpc = communication::rpc::RequestResponse<AbortReq, AbortRes>;
RPC_SINGLE_MEMBER_MESSAGE(SnapshotReq, transaction_id_t)
RPC_SINGLE_MEMBER_MESSAGE(SnapshotRes, Snapshot)
RPC_NO_MEMBER_MESSAGE(GcSnapshotReq)
RPC_SINGLE_MEMBER_MESSAGE(ClogInfoReq, transaction_id_t)
RPC_SINGLE_MEMBER_MESSAGE(ClogInfoRes, CommitLog::Info)
RPC_SINGLE_MEMBER_MESSAGE(ActiveTransactionsReq, transaction_id_t)
RPC_SINGLE_MEMBER_MESSAGE(IsActiveReq, transaction_id_t)
RPC_SINGLE_MEMBER_MESSAGE(IsActiveRes, bool)
using SnapshotRpc =
communication::rpc::RequestResponse<SnapshotReq, SnapshotRes>;
RPC_NO_MEMBER_MESSAGE(GcSnapshotReq)
using GcSnapshotRpc =
communication::rpc::RequestResponse<GcSnapshotReq, SnapshotRes>;
using GcSnapshotRpc =
communication::rpc::RequestResponse<GcSnapshotReq, SnapshotRes>;
RPC_SINGLE_MEMBER_MESSAGE(ClogInfoReq, transaction_id_t)
RPC_SINGLE_MEMBER_MESSAGE(ClogInfoRes, CommitLog::Info)
using ClogInfoRpc =
communication::rpc::RequestResponse<ClogInfoReq, ClogInfoRes>;
RPC_SINGLE_MEMBER_MESSAGE(ActiveTransactionsReq, transaction_id_t)
using ActiveTransactionsRpc =
communication::rpc::RequestResponse<ActiveTransactionsReq, SnapshotRes>;
RPC_SINGLE_MEMBER_MESSAGE(IsActiveReq, transaction_id_t)
RPC_SINGLE_MEMBER_MESSAGE(IsActiveRes, bool)
using IsActiveRpc =
communication::rpc::RequestResponse<IsActiveReq, IsActiveRes>;

View File

@ -25,7 +25,7 @@ Transaction *SingleNodeEngine::Begin() {
return t;
}
void SingleNodeEngine::Advance(transaction_id_t id) {
command_id_t SingleNodeEngine::Advance(transaction_id_t id) {
std::lock_guard<SpinLock> guard(lock_);
auto it = store_.find(id);
@ -38,7 +38,7 @@ void SingleNodeEngine::Advance(transaction_id_t id) {
"Reached maximum number of commands in this "
"transaction.");
t->cid_++;
return ++(t->cid_);
}
void SingleNodeEngine::Commit(const Transaction &t) {

View File

@ -29,33 +29,10 @@ class SingleNodeEngine : public Engine {
*/
explicit SingleNodeEngine(durability::WriteAheadLog *wal = nullptr);
/**
* Begins a transaction and returns a pointer to
* it's object.
*
* The transaction object is owned by this engine.
* It will be released when the transaction gets
* committted or aborted.
*/
Transaction *Begin();
/**
* Advances the command on the transaction with the
* given id.
*
* @param id - Transation id. That transaction must
* be currently active.
*/
void Advance(transaction_id_t id);
/** Comits the given transaction. Deletes the transaction object, it's not
* valid after this function executes. */
void Commit(const Transaction &t);
/** Aborts the given transaction. Deletes the transaction object, it's not
* valid after this function executes. */
void Abort(const Transaction &t);
Transaction *Begin() override;
command_id_t Advance(transaction_id_t id) override;
void Commit(const Transaction &t) override;
void Abort(const Transaction &t) override;
CommitLog::Info Info(transaction_id_t tx) const override;
Snapshot GlobalGcSnapshot() override;
Snapshot GlobalActiveTransactions() override;

View File

@ -9,6 +9,43 @@ namespace tx {
WorkerEngine::WorkerEngine(const io::network::Endpoint &endpoint)
: rpc_client_pool_(endpoint, kTransactionEngineRpc) {}
WorkerEngine::~WorkerEngine() {
for (auto &kv : active_.access()) {
delete kv.second;
}
}
Transaction *WorkerEngine::Begin() {
auto res = rpc_client_pool_.Call<BeginRpc>();
Transaction *tx =
new Transaction(res->member.tx_id, res->member.snapshot, *this);
auto insertion = active_.access().insert(res->member.tx_id, tx);
CHECK(insertion.second) << "Failed to start creation from worker";
return tx;
}
command_id_t WorkerEngine::Advance(transaction_id_t tx_id) {
auto res = rpc_client_pool_.Call<AdvanceRpc>(tx_id);
auto access = active_.access();
auto found = access.find(tx_id);
CHECK(found != access.end())
<< "Can't advance a transaction not in local cache";
found->second->cid_ = res->member;
return res->member;
}
void WorkerEngine::Commit(const Transaction &t) {
auto res = rpc_client_pool_.Call<CommitRpc>(t.id_);
auto removal = active_.access().remove(t.id_);
CHECK(removal) << "Can't commit a transaction not in local cache";
}
void WorkerEngine::Abort(const Transaction &t) {
auto res = rpc_client_pool_.Call<AbortRpc>(t.id_);
auto removal = active_.access().remove(t.id_);
CHECK(removal) << "Can't abort a transaction not in local cache";
}
CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
auto info = clog_.fetch_info(tid);
// If we don't know the transaction to be commited nor aborted, ask the

View File

@ -13,11 +13,17 @@
namespace tx {
/** Distributed worker transaction engine. Connects to a MasterEngine (single
* source of truth) to obtain transactional info. Caches most info locally. */
* source of truth) to obtain transactional info. Caches most info locally. Can
* begin/advance/end transactions on the master. */
class WorkerEngine : public Engine {
public:
WorkerEngine(const io::network::Endpoint &endpoint);
~WorkerEngine();
Transaction *Begin() override;
command_id_t Advance(transaction_id_t id) override;
void Commit(const Transaction &t) override;
void Abort(const Transaction &t) override;
CommitLog::Info Info(transaction_id_t tid) const override;
Snapshot GlobalGcSnapshot() override;
Snapshot GlobalActiveTransactions() override;

View File

@ -446,3 +446,16 @@ TEST_F(DistributedGraphDbTest, BuildIndexDistributed) {
EXPECT_EQ(CountIterable(dba.Vertices(label, property, false)), 300);
}
}
TEST_F(DistributedGraphDbTest, WorkerOwnedDbAccessors) {
database::GraphDbAccessor dba_w1(worker1());
auto v = dba_w1.InsertVertex();
auto prop = dba_w1.Property("p");
v.PropsSet(prop, 42);
auto v_ga = v.GlobalAddress();
dba_w1.Commit();
database::GraphDbAccessor dba_w2(worker2());
VertexAccessor v_in_w2{v_ga, dba_w2};
EXPECT_EQ(v_in_w2.PropsAt(prop).Value<int64_t>(), 42);
}

View File

@ -21,6 +21,32 @@ class WorkerEngineTest : public testing::Test {
WorkerEngine worker_{master_system_.endpoint()};
};
TEST_F(WorkerEngineTest, BeginOnWorker) {
worker_.Begin();
auto second = worker_.Begin();
EXPECT_EQ(master_.RunningTransaction(second->id_)->snapshot().size(), 1);
}
TEST_F(WorkerEngineTest, AdvanceOnWorker) {
auto tx = worker_.Begin();
auto cid = tx->cid();
EXPECT_EQ(worker_.Advance(tx->id_), cid + 1);
}
TEST_F(WorkerEngineTest, CommitOnWorker) {
auto tx = worker_.Begin();
auto tx_id = tx->id_;
worker_.Commit(*tx);
EXPECT_TRUE(master_.Info(tx_id).is_committed());
}
TEST_F(WorkerEngineTest, AbortOnWorker) {
auto tx = worker_.Begin();
auto tx_id = tx->id_;
worker_.Abort(*tx);
EXPECT_TRUE(master_.Info(tx_id).is_aborted());
}
TEST_F(WorkerEngineTest, RunningTransaction) {
master_.Begin();
master_.Begin();