diff --git a/src/communication/rpc/rpc.cpp b/src/communication/rpc/rpc.cpp index 44bc94317..2dc2e7465 100644 --- a/src/communication/rpc/rpc.cpp +++ b/src/communication/rpc/rpc.cpp @@ -2,26 +2,12 @@ #include #include "communication/rpc/rpc.hpp" +#include "utils/string.hpp" namespace communication::rpc { const char kProtocolStreamPrefix[] = "rpc-"; -std::string UniqueId() { - static thread_local std::mt19937 pseudo_rand_gen{std::random_device{}()}; - const char kCharset[] = - "0123456789" - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "abcdefghijklmnopqrstuvwxyz"; - const auto kMaxIndex = (sizeof(kCharset) - 1); - static thread_local std::uniform_int_distribution<> rand_dist{0, kMaxIndex}; - - std::string id; - std::generate_n(std::back_inserter(id), 20, - [&] { return kCharset[rand_dist(pseudo_rand_gen)]; }); - return id; -} - class Request : public messaging::Message { public: Request(const std::string &address, uint16_t port, const std::string &stream, @@ -29,7 +15,7 @@ class Request : public messaging::Message { : address_(address), port_(port), stream_(stream), - message_id_(UniqueId()), + message_id_(utils::RandomString(20)), message_(std::move(message)) {} const std::string &address() const { return address_; } @@ -81,7 +67,7 @@ Client::Client(messaging::System &system, const std::string &address, uint16_t port, const std::string &name) : system_(system), writer_(system, address, port, kProtocolStreamPrefix + name), - stream_(system.Open(UniqueId())) {} + stream_(system.Open(utils::RandomString(20))) {} // Because of the way Call is implemented it can fail without reporting (it will // just block indefinately). This is why you always need to provide reasonable diff --git a/src/communication/rpc/rpc.hpp b/src/communication/rpc/rpc.hpp index 52afd0bee..526c9f1ff 100644 --- a/src/communication/rpc/rpc.hpp +++ b/src/communication/rpc/rpc.hpp @@ -1,3 +1,5 @@ +#pragma once + #include #include "communication/messaging/distributed.hpp" diff --git a/src/transactions/commit_log.hpp b/src/transactions/commit_log.hpp index da96b8c40..5ff2edd2b 100644 --- a/src/transactions/commit_log.hpp +++ b/src/transactions/commit_log.hpp @@ -39,6 +39,7 @@ class CommitLog { ABORTED = 2, // 10 }; + Info() = default; // Required for cereal serialization explicit Info(uint8_t flags) : flags_(flags) {} bool is_active() const { return flags_ == ACTIVE; } @@ -49,8 +50,14 @@ class CommitLog { operator uint8_t() const { return flags_; } + /** Required for cereal serialization. */ + template + void serialize(Archive &archive) { + archive(flags_); + } + private: - uint8_t flags_; + uint8_t flags_{0}; }; Info fetch_info(transaction_id_t id) const { return Info{log.at(2 * id, 2)}; } diff --git a/src/transactions/engine_master.cpp b/src/transactions/engine_master.cpp index 19f064a8c..c5af8afaa 100644 --- a/src/transactions/engine_master.cpp +++ b/src/transactions/engine_master.cpp @@ -4,8 +4,14 @@ #include "glog/logging.h" #include "transactions/engine_master.hpp" +#include "transactions/engine_rpc_messages.hpp" namespace tx { + +MasterEngine::~MasterEngine() { + if (rpc_server_) StopServer(); +} + Transaction *MasterEngine::Begin() { std::lock_guard guard(lock_); @@ -87,4 +93,47 @@ void MasterEngine::LocalForEachActiveTransaction( f(*store_.find(transaction)->second); } } + +void MasterEngine::StartServer(communication::messaging::System &system) { + CHECK(!rpc_server_) << "Can't start a running server"; + rpc_server_.emplace(system, "tx_engine"); + + rpc_server_->Register([this](const SnapshotReq &req) { + // It is guaranteed that the Worker will not be requesting this for a + // transaction that's done, and that there are no race conditions here. + auto found = store_.find(req.member); + DCHECK(found != store_.end()) + << "Can't return snapshot for an inactive transaction"; + return std::make_unique(found->second->snapshot()); + }); + + rpc_server_->Register( + [this](const communication::messaging::Message &) { + return std::make_unique(GlobalGcSnapshot()); + }); + + rpc_server_->Register([this](const ClogInfoReq &req) { + return std::make_unique(Info(req.member)); + }); + + rpc_server_->Register( + [this](const communication::messaging::Message &) { + return std::make_unique(GlobalActiveTransactions()); + }); + + rpc_server_->Register([this](const IsActiveReq &req) { + return std::make_unique(GlobalIsActive(req.member)); + }); + + rpc_server_thread_ = std::thread([this] { rpc_server_->Start(); }); +} + +void MasterEngine::StopServer() { + CHECK(rpc_server_) << "Can't stop a server that's not running"; + rpc_server_->Shutdown(); + if (rpc_server_thread_.joinable()) { + rpc_server_thread_.join(); + } + rpc_server_ = std::experimental::nullopt; +} } // namespace tx diff --git a/src/transactions/engine_master.hpp b/src/transactions/engine_master.hpp index e20e53ed7..3ff9a5c1e 100644 --- a/src/transactions/engine_master.hpp +++ b/src/transactions/engine_master.hpp @@ -1,9 +1,11 @@ #pragma once #include -#include +#include #include +#include "communication/messaging/distributed.hpp" +#include "communication/rpc/rpc.hpp" #include "threading/sync/spinlock.hpp" #include "transactions/commit_log.hpp" #include "transactions/engine.hpp" @@ -26,6 +28,9 @@ class TransactionError : public utils::BasicException { */ class MasterEngine : public Engine { public: + /** Stops the tx server if it's running. */ + ~MasterEngine(); + /** * Begins a transaction and returns a pointer to * it's object. @@ -61,11 +66,21 @@ class MasterEngine : public Engine { void LocalForEachActiveTransaction( std::function f) override; + /** Starts the RPC server of the master transactional engine. */ + void StartServer(communication::messaging::System &system); + + /** Stops the RPC server of the master transactional engine. */ + void StopServer(); + private: std::atomic counter_{0}; CommitLog clog_; std::unordered_map> store_; Snapshot active_; SpinLock lock_; + + // Optional RPC server, only used in distributed, not in single_node. + std::experimental::optional rpc_server_; + std::thread rpc_server_thread_; }; } // namespace tx diff --git a/src/transactions/engine_rpc_messages.hpp b/src/transactions/engine_rpc_messages.hpp new file mode 100644 index 000000000..b8b749a7e --- /dev/null +++ b/src/transactions/engine_rpc_messages.hpp @@ -0,0 +1,69 @@ +#pragma once + +#include "cereal/archives/binary.hpp" +#include "cereal/types/base_class.hpp" +#include "cereal/types/memory.hpp" +#include "cereal/types/polymorphic.hpp" +#include "cereal/types/string.hpp" +#include "cereal/types/utility.hpp" +#include "cereal/types/vector.hpp" + +#include "communication/rpc/rpc.hpp" +#include "transactions/commit_log.hpp" +#include "transactions/snapshot.hpp" +#include "transactions/type.hpp" + +#define NO_MEMBER_MESSAGE(name) \ + namespace tx { \ + using communication::messaging::Message; \ + struct name : public Message { \ + name() {} \ + template \ + void serialize(Archive &ar) { \ + ar(cereal::virtual_base_class(this)); \ + } \ + }; \ + } \ + CEREAL_REGISTER_TYPE(tx::name); + +#define SINGLE_MEMBER_MESSAGE(name, type) \ + namespace tx { \ + using communication::messaging::Message; \ + struct name : public Message { \ + name() {} \ + name(const type &member) : member(member) {} \ + type member; \ + template \ + void serialize(Archive &ar) { \ + ar(cereal::virtual_base_class(this), member); \ + } \ + }; \ + } \ + CEREAL_REGISTER_TYPE(tx::name); + +SINGLE_MEMBER_MESSAGE(SnapshotReq, transaction_id_t) +SINGLE_MEMBER_MESSAGE(SnapshotRes, Snapshot) +NO_MEMBER_MESSAGE(GcSnapshotReq) +SINGLE_MEMBER_MESSAGE(ClogInfoReq, transaction_id_t) +SINGLE_MEMBER_MESSAGE(ClogInfoRes, CommitLog::Info) +SINGLE_MEMBER_MESSAGE(ActiveTransactionsReq, transaction_id_t) +SINGLE_MEMBER_MESSAGE(IsActiveReq, transaction_id_t) +SINGLE_MEMBER_MESSAGE(IsActiveRes, bool) + +#undef SINGLE_MEMBER_MESSAGE +#undef NO_MEMBER_MESSAGE + +namespace tx { +using SnapshotRpc = + communication::rpc::RequestResponse; +using GcSnapshotRpc = + communication::rpc::RequestResponse; +using GcSnapshotRpc = + communication::rpc::RequestResponse; +using ClogInfoRpc = + communication::rpc::RequestResponse; +using ActiveTransactionsRpc = + communication::rpc::RequestResponse; +using IsActiveRpc = + communication::rpc::RequestResponse; +} diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp index 43484a497..8c2c1f180 100644 --- a/src/transactions/engine_worker.cpp +++ b/src/transactions/engine_worker.cpp @@ -1,33 +1,43 @@ #include "glog/logging.h" +#include "transactions/engine_rpc_messages.hpp" #include "transactions/engine_worker.hpp" #include "utils/atomic.hpp" namespace tx { +namespace { +static const auto kRpcTimeout = 100ms; +} + +WorkerEngine::WorkerEngine(communication::messaging::System &system, + const std::string &tx_server_host, + uint16_t tx_server_port) + : rpc_client_(system, tx_server_host, tx_server_port, "tx_engine") {} + Transaction *WorkerEngine::LocalBegin(transaction_id_t tx_id) { + std::lock_guard guard(rpc_client_lock_); auto accessor = active_.access(); auto found = accessor.find(tx_id); if (found != accessor.end()) return found->second; - Snapshot snapshot = rpc_.Call("Snapshot", tx_id); - auto *tx = new Transaction(tx_id, snapshot, *this); - auto insertion = accessor.insert(tx_id, tx); - if (!insertion.second) { - delete tx; - tx = insertion.first->second; - } + Snapshot snapshot( + std::move(rpc_client_.Call(kRpcTimeout, tx_id)->member)); + auto insertion = + accessor.insert(tx_id, new Transaction(tx_id, snapshot, *this)); + CHECK(insertion.second) << "Transaction already inserted"; utils::EnsureAtomicGe(local_last_, tx_id); - return tx; + return insertion.first->second; } CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { + std::lock_guard guard(rpc_client_lock_); auto info = clog_.fetch_info(tid); // If we don't know the transaction to be commited nor aborted, ask the // master about it and update the local commit log. if (!(info.is_aborted() || info.is_committed())) { // @review: this version of Call is just used because Info has no default // constructor. - info = rpc_.Call("Info", info, tid); + info = rpc_client_.Call(kRpcTimeout, tid)->member; DCHECK(info.is_committed() || info.is_aborted()) << "It is expected that the transaction is not running anymore. This " "function should be used only after the snapshot of the current " @@ -40,15 +50,19 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { } Snapshot WorkerEngine::GlobalGcSnapshot() { - return rpc_.Call("Snapshot"); + std::lock_guard guard(rpc_client_lock_); + return std::move(rpc_client_.Call(kRpcTimeout)->member); } Snapshot WorkerEngine::GlobalActiveTransactions() { - return rpc_.Call("Active"); + std::lock_guard guard(rpc_client_lock_); + return std::move( + rpc_client_.Call(kRpcTimeout)->member); } bool WorkerEngine::GlobalIsActive(transaction_id_t tid) const { - return rpc_.Call("GlobalIsActive", tid); + std::lock_guard guard(rpc_client_lock_); + return rpc_client_.Call(kRpcTimeout, tid)->member; } tx::transaction_id_t WorkerEngine::LocalLast() const { return local_last_; } diff --git a/src/transactions/engine_worker.hpp b/src/transactions/engine_worker.hpp index d1baf59be..17c903c12 100644 --- a/src/transactions/engine_worker.hpp +++ b/src/transactions/engine_worker.hpp @@ -1,8 +1,10 @@ #pragma once +#include #include -#include +#include "communication/messaging/distributed.hpp" +#include "communication/rpc/rpc.hpp" #include "data_structures/concurrent/concurrent_map.hpp" #include "transactions/commit_log.hpp" #include "transactions/engine.hpp" @@ -11,23 +13,9 @@ namespace tx { /** A transactional engine for the worker in a distributed system. */ class WorkerEngine : public Engine { - // Mock class for RPC. - // TODO Replace with the real thing, once available. - class Rpc { - public: - template - TReturn Call(const std::string &, TArgs &&...) { - return TReturn{}; - } - - template - TReturn Call(const std::string &, TReturn default_return, TArgs &&...) { - return default_return; - } - }; - public: - WorkerEngine(Rpc &rpc) : rpc_(rpc) {} + WorkerEngine(communication::messaging::System &system, + const std::string &tx_server_host, uint16_t tx_server_port); Transaction *LocalBegin(transaction_id_t tx_id); @@ -40,13 +28,14 @@ class WorkerEngine : public Engine { std::function f) override; private: - // Communication with the transactional engine on the master. - Rpc &rpc_; - // Local caches. ConcurrentMap active_; - std::atomic local_last_; + std::atomic local_last_{0}; // Mutable because just getting info can cause a cache fill. mutable CommitLog clog_; + + // Communication to the transactional master. + mutable communication::rpc::Client rpc_client_; + mutable std::mutex rpc_client_lock_; }; } // namespace tx diff --git a/src/transactions/snapshot.hpp b/src/transactions/snapshot.hpp index e888ec621..2b9877396 100644 --- a/src/transactions/snapshot.hpp +++ b/src/transactions/snapshot.hpp @@ -84,6 +84,12 @@ class Snapshot { return stream; } + /** Required for cereal serialization. */ + template + void serialize(Archive &archive) { + archive(transaction_ids_); + } + private: std::vector transaction_ids_; }; diff --git a/tests/unit/bolt_encoder.cpp b/tests/unit/bolt_encoder.cpp index 43f3d9ecc..6518cd19b 100644 --- a/tests/unit/bolt_encoder.cpp +++ b/tests/unit/bolt_encoder.cpp @@ -45,10 +45,10 @@ void CheckTypeSize(std::vector &v, int typ, uint64_t size) { } void CheckInt(std::vector &output, int64_t value) { - TestSocket socket(20); - TestBuffer encoder_buffer(socket); + TestSocket test_socket(20); + TestBuffer encoder_buffer(test_socket); communication::bolt::BaseEncoder bolt_encoder(encoder_buffer); - std::vector &encoded = socket.output; + std::vector &encoded = test_socket.output; bolt_encoder.WriteInt(value); CheckOutput(output, encoded.data(), encoded.size(), false); } @@ -58,10 +58,10 @@ void CheckRecordHeader(std::vector &v, uint64_t size) { CheckTypeSize(v, LIST, size); } -TestSocket socket(10); -TestBuffer encoder_buffer(socket); +TestSocket test_socket(10); +TestBuffer encoder_buffer(test_socket); communication::bolt::Encoder bolt_encoder(encoder_buffer); -std::vector &output = socket.output; +std::vector &output = test_socket.output; TEST(BoltEncoder, NullAndBool) { output.clear(); diff --git a/tests/unit/transaction_local_engine.cpp b/tests/unit/transaction_engine_master.cpp similarity index 100% rename from tests/unit/transaction_local_engine.cpp rename to tests/unit/transaction_engine_master.cpp diff --git a/tests/unit/transaction_engine_worker.cpp b/tests/unit/transaction_engine_worker.cpp new file mode 100644 index 000000000..3470b7643 --- /dev/null +++ b/tests/unit/transaction_engine_worker.cpp @@ -0,0 +1,118 @@ +#include + +#include "gtest/gtest.h" + +#include "communication/messaging/distributed.hpp" +#include "transactions/engine_master.hpp" +#include "transactions/engine_worker.hpp" + +using namespace tx; +using namespace communication::messaging; + +class WorkerEngineTest : public testing::Test { + protected: + const static uint16_t master_port_{22345}; + const std::string local{"127.0.0.1"}; + + System master_system_{local, master_port_}; + MasterEngine master_; + + System worker_system_{local, master_port_ + 1}; + WorkerEngine worker_{worker_system_, local, master_port_}; + + void SetUp() override { master_.StartServer(master_system_); } + void TearDown() override { + worker_system_.Shutdown(); + master_.StopServer(); + master_system_.Shutdown(); + } +}; + +TEST_F(WorkerEngineTest, LocalBegin) { + master_.Begin(); + master_.Begin(); + worker_.LocalBegin(1); + worker_.LocalBegin(2); + int count = 0; + worker_.LocalForEachActiveTransaction([&count](Transaction &t) { + ++count; + if (t.id_ == 1) { + EXPECT_EQ(t.snapshot(), + tx::Snapshot(std::vector{})); + } else { + EXPECT_EQ(t.snapshot(), tx::Snapshot({1})); + } + }); + EXPECT_EQ(count, 2); +} + +TEST_F(WorkerEngineTest, Info) { + auto *tx_1 = master_.Begin(); + auto *tx_2 = master_.Begin(); + // We can't check active transactions in the worker (see comments there for + // info). + master_.Commit(*tx_1); + EXPECT_TRUE(master_.Info(1).is_committed()); + EXPECT_TRUE(worker_.Info(1).is_committed()); + master_.Abort(*tx_2); + EXPECT_TRUE(master_.Info(2).is_aborted()); + EXPECT_TRUE(worker_.Info(2).is_aborted()); +} + +TEST_F(WorkerEngineTest, GlobalGcSnapshot) { + auto *tx_1 = master_.Begin(); + master_.Begin(); + master_.Commit(*tx_1); + EXPECT_EQ(master_.GlobalGcSnapshot(), tx::Snapshot({1, 2})); + EXPECT_EQ(worker_.GlobalGcSnapshot(), master_.GlobalGcSnapshot()); +} + +TEST_F(WorkerEngineTest, GlobalActiveTransactions) { + auto *tx_1 = master_.Begin(); + master_.Begin(); + auto *tx_3 = master_.Begin(); + master_.Begin(); + master_.Commit(*tx_1); + master_.Abort(*tx_3); + EXPECT_EQ(worker_.GlobalActiveTransactions(), tx::Snapshot({2, 4})); +} + +TEST_F(WorkerEngineTest, GlobalIsActive) { + auto *tx_1 = master_.Begin(); + master_.Begin(); + auto *tx_3 = master_.Begin(); + master_.Begin(); + master_.Commit(*tx_1); + master_.Abort(*tx_3); + EXPECT_FALSE(worker_.GlobalIsActive(1)); + EXPECT_TRUE(worker_.GlobalIsActive(2)); + EXPECT_FALSE(worker_.GlobalIsActive(3)); + EXPECT_TRUE(worker_.GlobalIsActive(4)); +} + +TEST_F(WorkerEngineTest, LocalLast) { + master_.Begin(); + EXPECT_EQ(worker_.LocalLast(), 0); + worker_.LocalBegin(1); + EXPECT_EQ(worker_.LocalLast(), 1); + master_.Begin(); + EXPECT_EQ(worker_.LocalLast(), 1); + master_.Begin(); + EXPECT_EQ(worker_.LocalLast(), 1); + master_.Begin(); + worker_.LocalBegin(4); + EXPECT_EQ(worker_.LocalLast(), 4); +} + +TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) { + master_.Begin(); + worker_.LocalBegin(1); + master_.Begin(); + master_.Begin(); + master_.Begin(); + worker_.LocalBegin(4); + std::unordered_set local; + worker_.LocalForEachActiveTransaction( + [&local](Transaction &t) { local.insert(t.id_); }); + EXPECT_EQ(local, std::unordered_set({1, 4})); +}