Add RPC to distributed tx::Engine

Reviewers: mislav.bradac, dgleich

Reviewed By: dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1035
florijan 2017-12-06 14:12:26 +01:00
parent eb1adccf40
commit 4982d45e27
12 changed files with 313 additions and 58 deletions

View File

@@ -2,26 +2,12 @@
#include <random>
#include "communication/rpc/rpc.hpp"
#include "utils/string.hpp"
namespace communication::rpc {
const char kProtocolStreamPrefix[] = "rpc-";
std::string UniqueId() {
static thread_local std::mt19937 pseudo_rand_gen{std::random_device{}()};
const char kCharset[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
const auto kMaxIndex = (sizeof(kCharset) - 1);
static thread_local std::uniform_int_distribution<> rand_dist{0, kMaxIndex};
std::string id;
std::generate_n(std::back_inserter(id), 20,
[&] { return kCharset[rand_dist(pseudo_rand_gen)]; });
return id;
}
class Request : public messaging::Message {
public:
Request(const std::string &address, uint16_t port, const std::string &stream,
@@ -29,7 +15,7 @@ class Request : public messaging::Message {
: address_(address),
port_(port),
stream_(stream),
message_id_(UniqueId()),
message_id_(utils::RandomString(20)),
message_(std::move(message)) {}
const std::string &address() const { return address_; }
@@ -81,7 +67,7 @@ Client::Client(messaging::System &system, const std::string &address,
uint16_t port, const std::string &name)
: system_(system),
writer_(system, address, port, kProtocolStreamPrefix + name),
stream_(system.Open(UniqueId())) {}
stream_(system.Open(utils::RandomString(20))) {}
// Because of the way Call is implemented it can fail without reporting (it will
// just block indefinitely). This is why you always need to provide reasonable
// timeouts.
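As an illustration only (not part of this diff), a minimal sketch of a timeout-based call, assuming the Client constructor shown above and the Call<TRpc>(timeout, args...) signature that WorkerEngine uses later in this commit; the host, port, transaction id and the null check on timeout are assumptions:

using namespace std::chrono_literals;
communication::messaging::System system{"127.0.0.1", 10000};
communication::rpc::Client client{system, "127.0.0.1", 10010, "tx_engine"};
tx::transaction_id_t tx_id = 1;  // placeholder id
auto res = client.Call<tx::SnapshotRpc>(300ms, tx_id);  // blocks up to the given timeout
if (res) {
  const tx::Snapshot &snapshot = res->member;  // the master's snapshot for tx_id
  // ... use snapshot ...
}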

View File

@@ -1,3 +1,5 @@
#pragma once
#include <type_traits>
#include "communication/messaging/distributed.hpp"

View File

@@ -39,6 +39,7 @@ class CommitLog {
ABORTED = 2, // 10
};
Info() = default; // Required for cereal serialization
explicit Info(uint8_t flags) : flags_(flags) {}
bool is_active() const { return flags_ == ACTIVE; }
@@ -49,8 +50,14 @@
operator uint8_t() const { return flags_; }
/** Required for cereal serialization. */
template <class Archive>
void serialize(Archive &archive) {
archive(flags_);
}
private:
uint8_t flags_;
uint8_t flags_{0};
};
Info fetch_info(transaction_id_t id) const { return Info{log.at(2 * id, 2)}; }
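A sketch of how the serialize() hook added above gets used, assuming only what this diff shows (that CommitLog lives in namespace tx; stream, function and variable names are illustrative):

#include <sstream>
#include "cereal/archives/binary.hpp"

void RoundTripInfo(const tx::CommitLog::Info &info) {
  std::stringstream stream;
  {
    cereal::BinaryOutputArchive out{stream};
    out(info);  // invokes Info::serialize with the output archive
  }
  tx::CommitLog::Info restored;  // default-constructible, as cereal requires
  {
    cereal::BinaryInputArchive in{stream};
    in(restored);  // invokes Info::serialize with the input archive
  }
}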

View File

@@ -4,8 +4,14 @@
#include "glog/logging.h"
#include "transactions/engine_master.hpp"
#include "transactions/engine_rpc_messages.hpp"
namespace tx {
MasterEngine::~MasterEngine() {
if (rpc_server_) StopServer();
}
Transaction *MasterEngine::Begin() {
std::lock_guard<SpinLock> guard(lock_);
@@ -87,4 +93,47 @@ void MasterEngine::LocalForEachActiveTransaction(
f(*store_.find(transaction)->second);
}
}
void MasterEngine::StartServer(communication::messaging::System &system) {
CHECK(!rpc_server_) << "Can't start a running server";
rpc_server_.emplace(system, "tx_engine");
rpc_server_->Register<SnapshotRpc>([this](const SnapshotReq &req) {
// It is guaranteed that the Worker will not be requesting this for a
// transaction that's done, and that there are no race conditions here.
auto found = store_.find(req.member);
DCHECK(found != store_.end())
<< "Can't return snapshot for an inactive transaction";
return std::make_unique<SnapshotRes>(found->second->snapshot());
});
rpc_server_->Register<GcSnapshotRpc>(
[this](const communication::messaging::Message &) {
return std::make_unique<SnapshotRes>(GlobalGcSnapshot());
});
rpc_server_->Register<ClogInfoRpc>([this](const ClogInfoReq &req) {
return std::make_unique<ClogInfoRes>(Info(req.member));
});
rpc_server_->Register<ActiveTransactionsRpc>(
[this](const communication::messaging::Message &) {
return std::make_unique<SnapshotRes>(GlobalActiveTransactions());
});
rpc_server_->Register<IsActiveRpc>([this](const IsActiveReq &req) {
return std::make_unique<IsActiveRes>(GlobalIsActive(req.member));
});
rpc_server_thread_ = std::thread([this] { rpc_server_->Start(); });
}
void MasterEngine::StopServer() {
CHECK(rpc_server_) << "Can't stop a server that's not running";
rpc_server_->Shutdown();
if (rpc_server_thread_.joinable()) {
rpc_server_thread_.join();
}
rpc_server_ = std::experimental::nullopt;
}
} // namespace tx
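A hypothetical wiring sketch for the master side, mirroring the test fixture added at the end of this commit (the address and port are placeholders):

communication::messaging::System master_system{"127.0.0.1", 10000};
tx::MasterEngine master;
master.StartServer(master_system);  // registers the "tx_engine" RPC handlers and starts serving
// ... normal operation: Begin/Commit/Abort transactions ...
master.StopServer();  // optional; ~MasterEngine() also stops a running server
master_system.Shutdown();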

View File

@@ -1,9 +1,11 @@
#pragma once
#include <atomic>
#include <memory>
#include <experimental/optional>
#include <unordered_map>
#include "communication/messaging/distributed.hpp"
#include "communication/rpc/rpc.hpp"
#include "threading/sync/spinlock.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/engine.hpp"
@@ -26,6 +28,9 @@ class TransactionError : public utils::BasicException {
*/
class MasterEngine : public Engine {
public:
/** Stops the tx server if it's running. */
~MasterEngine();
/**
* Begins a transaction and returns a pointer to
* its object.
@@ -61,11 +66,21 @@ class MasterEngine : public Engine {
void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) override;
/** Starts the RPC server of the master transactional engine. */
void StartServer(communication::messaging::System &system);
/** Stops the RPC server of the master transactional engine. */
void StopServer();
private:
std::atomic<transaction_id_t> counter_{0};
CommitLog clog_;
std::unordered_map<transaction_id_t, std::unique_ptr<Transaction>> store_;
Snapshot active_;
SpinLock lock_;
// Optional RPC server, only used in distributed, not in single_node.
std::experimental::optional<communication::rpc::Server> rpc_server_;
std::thread rpc_server_thread_;
};
} // namespace tx

View File

@@ -0,0 +1,69 @@
#pragma once
#include "cereal/archives/binary.hpp"
#include "cereal/types/base_class.hpp"
#include "cereal/types/memory.hpp"
#include "cereal/types/polymorphic.hpp"
#include "cereal/types/string.hpp"
#include "cereal/types/utility.hpp"
#include "cereal/types/vector.hpp"
#include "communication/rpc/rpc.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/snapshot.hpp"
#include "transactions/type.hpp"
#define NO_MEMBER_MESSAGE(name) \
namespace tx { \
using communication::messaging::Message; \
struct name : public Message { \
name() {} \
template <class Archive> \
void serialize(Archive &ar) { \
ar(cereal::virtual_base_class<Message>(this)); \
} \
}; \
} \
CEREAL_REGISTER_TYPE(tx::name);
#define SINGLE_MEMBER_MESSAGE(name, type) \
namespace tx { \
using communication::messaging::Message; \
struct name : public Message { \
name() {} \
name(const type &member) : member(member) {} \
type member; \
template <class Archive> \
void serialize(Archive &ar) { \
ar(cereal::virtual_base_class<Message>(this), member); \
} \
}; \
} \
CEREAL_REGISTER_TYPE(tx::name);
SINGLE_MEMBER_MESSAGE(SnapshotReq, transaction_id_t)
SINGLE_MEMBER_MESSAGE(SnapshotRes, Snapshot)
NO_MEMBER_MESSAGE(GcSnapshotReq)
SINGLE_MEMBER_MESSAGE(ClogInfoReq, transaction_id_t)
SINGLE_MEMBER_MESSAGE(ClogInfoRes, CommitLog::Info)
SINGLE_MEMBER_MESSAGE(ActiveTransactionsReq, transaction_id_t)
SINGLE_MEMBER_MESSAGE(IsActiveReq, transaction_id_t)
SINGLE_MEMBER_MESSAGE(IsActiveRes, bool)
#undef SINGLE_MEMBER_MESSAGE
#undef NO_MEMBER_MESSAGE
namespace tx {
using SnapshotRpc =
communication::rpc::RequestResponse<SnapshotReq, SnapshotRes>;
using GcSnapshotRpc =
communication::rpc::RequestResponse<GcSnapshotReq, SnapshotRes>;
using ClogInfoRpc =
communication::rpc::RequestResponse<ClogInfoReq, ClogInfoRes>;
using ActiveTransactionsRpc =
communication::rpc::RequestResponse<ActiveTransactionsReq, SnapshotRes>;
using IsActiveRpc =
communication::rpc::RequestResponse<IsActiveReq, IsActiveRes>;
}
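For reference, SINGLE_MEMBER_MESSAGE(SnapshotReq, transaction_id_t) above expands to roughly the following (a sketch of the macro expansion for readability, not code to add):

namespace tx {
using communication::messaging::Message;
struct SnapshotReq : public Message {
  SnapshotReq() {}
  SnapshotReq(const transaction_id_t &member) : member(member) {}
  transaction_id_t member;
  template <class Archive>
  void serialize(Archive &ar) {
    ar(cereal::virtual_base_class<Message>(this), member);
  }
};
}
CEREAL_REGISTER_TYPE(tx::SnapshotReq);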

View File

@@ -1,33 +1,43 @@
#include "glog/logging.h"
#include "transactions/engine_rpc_messages.hpp"
#include "transactions/engine_worker.hpp"
#include "utils/atomic.hpp"
namespace tx {
namespace {
static const auto kRpcTimeout = 100ms;
}
WorkerEngine::WorkerEngine(communication::messaging::System &system,
const std::string &tx_server_host,
uint16_t tx_server_port)
: rpc_client_(system, tx_server_host, tx_server_port, "tx_engine") {}
Transaction *WorkerEngine::LocalBegin(transaction_id_t tx_id) {
std::lock_guard<std::mutex> guard(rpc_client_lock_);
auto accessor = active_.access();
auto found = accessor.find(tx_id);
if (found != accessor.end()) return found->second;
Snapshot snapshot = rpc_.Call<Snapshot>("Snapshot", tx_id);
auto *tx = new Transaction(tx_id, snapshot, *this);
auto insertion = accessor.insert(tx_id, tx);
if (!insertion.second) {
delete tx;
tx = insertion.first->second;
}
Snapshot snapshot(
std::move(rpc_client_.Call<SnapshotRpc>(kRpcTimeout, tx_id)->member));
auto insertion =
accessor.insert(tx_id, new Transaction(tx_id, snapshot, *this));
CHECK(insertion.second) << "Transaction already inserted";
utils::EnsureAtomicGe(local_last_, tx_id);
return tx;
return insertion.first->second;
}
CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
std::lock_guard<std::mutex> guard(rpc_client_lock_);
auto info = clog_.fetch_info(tid);
// If we don't know whether the transaction is committed or aborted, ask the
// master about it and update the local commit log.
if (!(info.is_aborted() || info.is_committed())) {
// @review: this version of Call is just used because Info has no default
// constructor.
info = rpc_.Call<CommitLog::Info>("Info", info, tid);
info = rpc_client_.Call<ClogInfoRpc>(kRpcTimeout, tid)->member;
DCHECK(info.is_committed() || info.is_aborted())
<< "It is expected that the transaction is not running anymore. This "
"function should be used only after the snapshot of the current "
@@ -40,15 +50,19 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
}
Snapshot WorkerEngine::GlobalGcSnapshot() {
return rpc_.Call<Snapshot>("Snapshot");
std::lock_guard<std::mutex> guard(rpc_client_lock_);
return std::move(rpc_client_.Call<GcSnapshotRpc>(kRpcTimeout)->member);
}
Snapshot WorkerEngine::GlobalActiveTransactions() {
return rpc_.Call<Snapshot>("Active");
std::lock_guard<std::mutex> guard(rpc_client_lock_);
return std::move(
rpc_client_.Call<ActiveTransactionsRpc>(kRpcTimeout)->member);
}
bool WorkerEngine::GlobalIsActive(transaction_id_t tid) const {
return rpc_.Call<bool>("GlobalIsActive", tid);
std::lock_guard<std::mutex> guard(rpc_client_lock_);
return rpc_client_.Call<IsActiveRpc>(kRpcTimeout, tid)->member;
}
tx::transaction_id_t WorkerEngine::LocalLast() const { return local_last_; }
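And a matching worker-side sketch, based on the constructor above and the test fixture below (addresses, ports and the transaction id are placeholders; ids are assigned by the master engine):

communication::messaging::System worker_system{"127.0.0.1", 10001};
tx::WorkerEngine worker{worker_system, "127.0.0.1", 10000};
tx::Transaction *t = worker.LocalBegin(tx_id_from_master);  // fetches the snapshot over RPC
bool active = worker.GlobalIsActive(t->id_);  // asks the master over RPC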

View File

@@ -1,8 +1,10 @@
#pragma once
#include <mutex>
#include <atomic>
#include <memory>
#include "communication/messaging/distributed.hpp"
#include "communication/rpc/rpc.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/engine.hpp"
@@ -11,23 +13,9 @@
namespace tx {
/** A transactional engine for the worker in a distributed system. */
class WorkerEngine : public Engine {
// Mock class for RPC.
// TODO Replace with the real thing, once available.
class Rpc {
public:
template <typename TReturn, typename... TArgs>
TReturn Call(const std::string &, TArgs &&...) {
return TReturn{};
}
template <typename TReturn, typename... TArgs>
TReturn Call(const std::string &, TReturn default_return, TArgs &&...) {
return default_return;
}
};
public:
WorkerEngine(Rpc &rpc) : rpc_(rpc) {}
WorkerEngine(communication::messaging::System &system,
const std::string &tx_server_host, uint16_t tx_server_port);
Transaction *LocalBegin(transaction_id_t tx_id);
@@ -40,13 +28,14 @@ class WorkerEngine : public Engine {
std::function<void(Transaction &)> f) override;
private:
// Communication with the transactional engine on the master.
Rpc &rpc_;
// Local caches.
ConcurrentMap<transaction_id_t, Transaction *> active_;
std::atomic<transaction_id_t> local_last_;
std::atomic<transaction_id_t> local_last_{0};
// Mutable because just getting info can cause a cache fill.
mutable CommitLog clog_;
// Communication to the transactional master.
mutable communication::rpc::Client rpc_client_;
mutable std::mutex rpc_client_lock_;
};
} // namespace tx

View File

@@ -84,6 +84,12 @@ class Snapshot {
return stream;
}
/** Required for cereal serialization. */
template <class Archive>
void serialize(Archive &archive) {
archive(transaction_ids_);
}
private:
std::vector<transaction_id_t> transaction_ids_;
};

View File

@@ -45,10 +45,10 @@ void CheckTypeSize(std::vector<uint8_t> &v, int typ, uint64_t size) {
}
void CheckInt(std::vector<uint8_t> &output, int64_t value) {
TestSocket socket(20);
TestBuffer encoder_buffer(socket);
TestSocket test_socket(20);
TestBuffer encoder_buffer(test_socket);
communication::bolt::BaseEncoder<TestBuffer> bolt_encoder(encoder_buffer);
std::vector<uint8_t> &encoded = socket.output;
std::vector<uint8_t> &encoded = test_socket.output;
bolt_encoder.WriteInt(value);
CheckOutput(output, encoded.data(), encoded.size(), false);
}
@@ -58,10 +58,10 @@ void CheckRecordHeader(std::vector<uint8_t> &v, uint64_t size) {
CheckTypeSize(v, LIST, size);
}
TestSocket socket(10);
TestBuffer encoder_buffer(socket);
TestSocket test_socket(10);
TestBuffer encoder_buffer(test_socket);
communication::bolt::Encoder<TestBuffer> bolt_encoder(encoder_buffer);
std::vector<uint8_t> &output = socket.output;
std::vector<uint8_t> &output = test_socket.output;
TEST(BoltEncoder, NullAndBool) {
output.clear();

View File

@@ -0,0 +1,118 @@
#include <unordered_set>
#include "gtest/gtest.h"
#include "communication/messaging/distributed.hpp"
#include "transactions/engine_master.hpp"
#include "transactions/engine_worker.hpp"
using namespace tx;
using namespace communication::messaging;
class WorkerEngineTest : public testing::Test {
protected:
const static uint16_t master_port_{22345};
const std::string local{"127.0.0.1"};
System master_system_{local, master_port_};
MasterEngine master_;
System worker_system_{local, master_port_ + 1};
WorkerEngine worker_{worker_system_, local, master_port_};
void SetUp() override { master_.StartServer(master_system_); }
void TearDown() override {
worker_system_.Shutdown();
master_.StopServer();
master_system_.Shutdown();
}
};
TEST_F(WorkerEngineTest, LocalBegin) {
master_.Begin();
master_.Begin();
worker_.LocalBegin(1);
worker_.LocalBegin(2);
int count = 0;
worker_.LocalForEachActiveTransaction([&count](Transaction &t) {
++count;
if (t.id_ == 1) {
EXPECT_EQ(t.snapshot(),
tx::Snapshot(std::vector<tx::transaction_id_t>{}));
} else {
EXPECT_EQ(t.snapshot(), tx::Snapshot({1}));
}
});
EXPECT_EQ(count, 2);
}
TEST_F(WorkerEngineTest, Info) {
auto *tx_1 = master_.Begin();
auto *tx_2 = master_.Begin();
// We can't check active transactions in the worker (see comments there for
// info).
master_.Commit(*tx_1);
EXPECT_TRUE(master_.Info(1).is_committed());
EXPECT_TRUE(worker_.Info(1).is_committed());
master_.Abort(*tx_2);
EXPECT_TRUE(master_.Info(2).is_aborted());
EXPECT_TRUE(worker_.Info(2).is_aborted());
}
TEST_F(WorkerEngineTest, GlobalGcSnapshot) {
auto *tx_1 = master_.Begin();
master_.Begin();
master_.Commit(*tx_1);
EXPECT_EQ(master_.GlobalGcSnapshot(), tx::Snapshot({1, 2}));
EXPECT_EQ(worker_.GlobalGcSnapshot(), master_.GlobalGcSnapshot());
}
TEST_F(WorkerEngineTest, GlobalActiveTransactions) {
auto *tx_1 = master_.Begin();
master_.Begin();
auto *tx_3 = master_.Begin();
master_.Begin();
master_.Commit(*tx_1);
master_.Abort(*tx_3);
EXPECT_EQ(worker_.GlobalActiveTransactions(), tx::Snapshot({2, 4}));
}
TEST_F(WorkerEngineTest, GlobalIsActive) {
auto *tx_1 = master_.Begin();
master_.Begin();
auto *tx_3 = master_.Begin();
master_.Begin();
master_.Commit(*tx_1);
master_.Abort(*tx_3);
EXPECT_FALSE(worker_.GlobalIsActive(1));
EXPECT_TRUE(worker_.GlobalIsActive(2));
EXPECT_FALSE(worker_.GlobalIsActive(3));
EXPECT_TRUE(worker_.GlobalIsActive(4));
}
TEST_F(WorkerEngineTest, LocalLast) {
master_.Begin();
EXPECT_EQ(worker_.LocalLast(), 0);
worker_.LocalBegin(1);
EXPECT_EQ(worker_.LocalLast(), 1);
master_.Begin();
EXPECT_EQ(worker_.LocalLast(), 1);
master_.Begin();
EXPECT_EQ(worker_.LocalLast(), 1);
master_.Begin();
worker_.LocalBegin(4);
EXPECT_EQ(worker_.LocalLast(), 4);
}
TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) {
master_.Begin();
worker_.LocalBegin(1);
master_.Begin();
master_.Begin();
master_.Begin();
worker_.LocalBegin(4);
std::unordered_set<tx::transaction_id_t> local;
worker_.LocalForEachActiveTransaction(
[&local](Transaction &t) { local.insert(t.id_); });
EXPECT_EQ(local, std::unordered_set<tx::transaction_id_t>({1, 4}));
}