Move RPC server to Coordination

Reviewers: teon.banek, msantl

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1658
Author: Matej Ferencevic
Date: 2018-10-16 09:12:19 +02:00
Parent: c553a309d2
Commit: baae40fcc6
44 changed files with 361 additions and 376 deletions
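
Every hunk below follows the same pattern: RPC handler classes stop holding a raw communication::rpc::Server (as `Server *`, `Server &`, or a locally constructed member) and instead take a `distributed::Coordination *`. Coordination now constructs the RPC server itself, forwards handler registration to it, exposes its endpoint, and shuts it down during `AwaitShutdown`. A condensed sketch of the new surface, assembled from the coordination.hpp/coordination.cpp hunks below (simplified, not verbatim; the worker map, client pools, and locking are omitted):

#include <functional>

#include "communication/rpc/server.hpp"
#include "io/network/endpoint.hpp"

namespace distributed {

class Coordination {
 public:
  // Components register their RPC handlers through Coordination instead of
  // through a server they hold themselves.
  template <class TRequestResponse>
  void Register(std::function<
                void(const typename TRequestResponse::Request::Capnp::Reader &,
                     typename TRequestResponse::Response::Capnp::Builder *)>
                    callback) {
    server_.Register<TRequestResponse>(callback);
  }

  // Callers that previously asked the rpc::Server for its endpoint now ask
  // Coordination.
  io::network::Endpoint GetServerEndpoint() { return server_.endpoint(); }

 protected:
  Coordination(const io::network::Endpoint &worker_endpoint, int worker_id,
               const io::network::Endpoint &master_endpoint,
               int server_workers_count, int client_workers_count)
      : server_(worker_endpoint, server_workers_count) {
    // Worker-map bookkeeping omitted; see the coordination.cpp hunk below.
  }

  // Constructed by Coordination; shut down by the Master/Worker coordination
  // AwaitShutdown implementations.
  communication::rpc::Server server_;
};

}  // namespace distributed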


@ -1,19 +1,16 @@
#include "database/distributed/distributed_counters.hpp"
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/server.hpp"
#include "database/distributed/counters_rpc_messages.hpp"
namespace database {
MasterCounters::MasterCounters(communication::rpc::Server *server)
: rpc_server_(server) {
rpc_server_->Register<CountersGetRpc>(
MasterCounters::MasterCounters(distributed::Coordination *coordination) {
coordination->Register<CountersGetRpc>(
[this](const auto &req_reader, auto *res_builder) {
CountersGetRes res(Get(req_reader.getName()));
Save(res, res_builder);
});
rpc_server_->Register<CountersSetRpc>(
coordination->Register<CountersSetRpc>(
[this](const auto &req_reader, auto *res_builder) {
Set(req_reader.getName(), req_reader.getValue());
return std::make_unique<CountersSetRes>();


@ -7,24 +7,19 @@
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/distributed/counters.hpp"
namespace communication::rpc {
class Server;
class ClientPool;
} // namespace communication::rpc
#include "distributed/coordination.hpp"
namespace database {
/// Implementation for distributed master
class MasterCounters : public Counters {
public:
explicit MasterCounters(communication::rpc::Server *server);
explicit MasterCounters(distributed::Coordination *coordination);
int64_t Get(const std::string &name) override;
void Set(const std::string &name, int64_t value) override;
private:
communication::rpc::Server *rpc_server_;
ConcurrentMap<std::string, std::atomic<int64_t>> counters_;
};


@ -580,7 +580,7 @@ namespace impl {
template <template <typename TId> class TMapper>
struct TypemapPack {
template <typename... TMapperArgs>
explicit TypemapPack(TMapperArgs &... args)
explicit TypemapPack(TMapperArgs ... args)
: label(args...), edge_type(args...), property(args...) {}
// TODO this should also be garbage collected
TMapper<storage::Label> label;
@ -614,35 +614,33 @@ class Master {
// clean the mess. Also, be careful of virtual calls to `self_` in
// constructors of members.
database::Master *self_{nullptr};
communication::rpc::Server server_{
config_.master_endpoint,
static_cast<size_t>(config_.rpc_num_server_workers)};
tx::EngineMaster tx_engine_{&server_, &coordination_, &wal_};
distributed::MasterCoordination coordination_{server_.endpoint(),
distributed::MasterCoordination coordination_{config_.master_endpoint,
config_.rpc_num_server_workers,
config_.rpc_num_client_workers};
tx::EngineMaster tx_engine_{&coordination_, &wal_};
std::unique_ptr<StorageGcMaster> storage_gc_ =
std::make_unique<StorageGcMaster>(
*storage_, tx_engine_, config_.gc_cycle_sec, server_, coordination_);
TypemapPack<storage::MasterConcurrentIdMapper> typemap_pack_{server_};
database::MasterCounters counters_{&server_};
*storage_, tx_engine_, config_.gc_cycle_sec, &coordination_);
TypemapPack<storage::MasterConcurrentIdMapper> typemap_pack_{&coordination_};
database::MasterCounters counters_{&coordination_};
distributed::BfsSubcursorStorage subcursor_storage_{&bfs_subcursor_clients_};
distributed::BfsRpcServer bfs_subcursor_server_{self_, &server_,
distributed::BfsRpcServer bfs_subcursor_server_{self_, &coordination_,
&subcursor_storage_};
distributed::BfsRpcClients bfs_subcursor_clients_{
self_, &subcursor_storage_, &coordination_, &data_manager_};
distributed::DurabilityRpcMaster durability_rpc_{&coordination_};
distributed::DataRpcServer data_server_{self_, &server_};
distributed::DataRpcServer data_server_{self_, &coordination_};
distributed::DataRpcClients data_clients_{&coordination_};
distributed::PlanDispatcher plan_dispatcher_{&coordination_};
distributed::PullRpcClients pull_clients_{&coordination_, &data_manager_};
distributed::UpdatesRpcServer updates_server_{self_, &server_};
distributed::UpdatesRpcServer updates_server_{self_, &coordination_};
distributed::UpdatesRpcClients updates_clients_{&coordination_};
distributed::DataManager data_manager_{*self_, data_clients_};
distributed::ClusterDiscoveryMaster cluster_discovery_{
&server_, &coordination_, config_.durability_directory};
&coordination_, config_.durability_directory};
distributed::TokenSharingRpcServer token_sharing_server_{
self_, config_.worker_id, &coordination_, &server_};
distributed::DynamicWorkerAddition dynamic_worker_addition_{self_, &server_};
self_, config_.worker_id, &coordination_};
distributed::DynamicWorkerAddition dynamic_worker_addition_{self_, &coordination_};
};
} // namespace impl
@ -830,11 +828,11 @@ void Master::ReinitializeStorage() {
impl_->config_.worker_id, impl_->config_.properties_on_disk);
impl_->storage_gc_ = std::make_unique<StorageGcMaster>(
*impl_->storage_, impl_->tx_engine_, impl_->config_.gc_cycle_sec,
impl_->server_, impl_->coordination_);
&impl_->coordination_);
}
io::network::Endpoint Master::endpoint() const {
return impl_->server_.endpoint();
return impl_->coordination_.GetServerEndpoint();
}
io::network::Endpoint Master::GetEndpoint(int worker_id) {
@ -895,11 +893,6 @@ bool Master::AwaitShutdown(std::function<void(void)> call_before_shutdown) {
return true;
});
// We stop the RPC server to disable further requests.
// TODO (mferencevic): Move the RPC into coordination.
impl_->server_.Shutdown();
impl_->server_.AwaitShutdown();
// Return the shutdown success status.
return ret;
}
@ -991,41 +984,36 @@ class Worker {
// clean the mess. Also, be careful of virtual calls to `self_` in
// constructors of members.
database::Worker *self_{nullptr};
communication::rpc::Server server_{
config_.worker_endpoint,
static_cast<size_t>(config_.rpc_num_server_workers)};
distributed::WorkerCoordination coordination_{
&server_, config_.master_endpoint, config_.worker_id,
config_.rpc_num_client_workers};
// TODO (mferencevic): Pass the coordination object directly wherever there is
// a `GetClientPool(xyz)` call.
tx::EngineWorker tx_engine_{&server_, coordination_.GetClientPool(0), &wal_};
config_.worker_endpoint, config_.worker_id, config_.master_endpoint,
config_.rpc_num_server_workers, config_.rpc_num_client_workers};
tx::EngineWorker tx_engine_{&coordination_, &wal_};
std::unique_ptr<StorageGcWorker> storage_gc_ =
std::make_unique<StorageGcWorker>(
*storage_, tx_engine_, config_.gc_cycle_sec,
*coordination_.GetClientPool(0), config_.worker_id);
TypemapPack<storage::WorkerConcurrentIdMapper> typemap_pack_{
*coordination_.GetClientPool(0)};
coordination_.GetClientPool(0)};
database::WorkerCounters counters_{coordination_.GetClientPool(0)};
distributed::BfsSubcursorStorage subcursor_storage_{&bfs_subcursor_clients_};
distributed::BfsRpcServer bfs_subcursor_server_{self_, &server_,
distributed::BfsRpcServer bfs_subcursor_server_{self_, &coordination_,
&subcursor_storage_};
distributed::BfsRpcClients bfs_subcursor_clients_{
self_, &subcursor_storage_, &coordination_, &data_manager_};
distributed::DataRpcServer data_server_{self_, &server_};
distributed::DataRpcServer data_server_{self_, &coordination_};
distributed::DataRpcClients data_clients_{&coordination_};
distributed::PlanConsumer plan_consumer_{server_};
distributed::ProduceRpcServer produce_server_{self_, &tx_engine_, server_,
distributed::PlanConsumer plan_consumer_{&coordination_};
distributed::ProduceRpcServer produce_server_{self_, &tx_engine_, &coordination_,
plan_consumer_, &data_manager_};
distributed::IndexRpcServer index_rpc_server_{*self_, server_};
distributed::UpdatesRpcServer updates_server_{self_, &server_};
distributed::IndexRpcServer index_rpc_server_{self_, &coordination_};
distributed::UpdatesRpcServer updates_server_{self_, &coordination_};
distributed::UpdatesRpcClients updates_clients_{&coordination_};
distributed::DataManager data_manager_{*self_, data_clients_};
distributed::DurabilityRpcWorker durability_rpc_{self_, &server_};
distributed::DurabilityRpcWorker durability_rpc_{self_, &coordination_};
distributed::ClusterDiscoveryWorker cluster_discovery_{
server_, coordination_, *coordination_.GetClientPool(0)};
&coordination_};
distributed::TokenSharingRpcServer token_sharing_server_{
self_, config_.worker_id, &coordination_, &server_};
self_, config_.worker_id, &coordination_};
distributed::DynamicWorkerRegistration dynamic_worker_registration_{
coordination_.GetClientPool(0)};
};
@ -1197,7 +1185,7 @@ void Worker::RecoverWalAndIndexes(durability::RecoveryData *recovery_data) {
}
io::network::Endpoint Worker::endpoint() const {
return impl_->server_.endpoint();
return impl_->coordination_.GetServerEndpoint();
}
io::network::Endpoint Worker::GetEndpoint(int worker_id) {
@ -1231,10 +1219,6 @@ bool Worker::AwaitShutdown(std::function<void(void)> call_before_shutdown) {
return true;
});
// Stop the RPC server
impl_->server_.Shutdown();
impl_->server_.AwaitShutdown();
// Return the shutdown success status.
return ret;
}


@ -4,10 +4,9 @@
#include <map>
#include "communication/rpc/server.hpp"
#include "distributed/bfs_rpc_messages.hpp"
#include "distributed/bfs_subcursor.hpp"
#include "distributed/coordination.hpp"
namespace distributed {
@ -18,10 +17,10 @@ namespace distributed {
class BfsRpcServer {
public:
BfsRpcServer(database::DistributedGraphDb *db,
communication::rpc::Server *server,
distributed::Coordination *coordination,
BfsSubcursorStorage *subcursor_storage)
: db_(db), server_(server), subcursor_storage_(subcursor_storage) {
server_->Register<CreateBfsSubcursorRpc>(
: db_(db), subcursor_storage_(subcursor_storage) {
coordination->Register<CreateBfsSubcursorRpc>(
[this](const auto &req_reader, auto *res_builder) {
CreateBfsSubcursorReq req;
auto ast_storage = std::make_unique<query::AstStorage>();
@ -36,7 +35,7 @@ class BfsRpcServer {
Save(res, res_builder);
});
server_->Register<RegisterSubcursorsRpc>(
coordination->Register<RegisterSubcursorsRpc>(
[this](const auto &req_reader, auto *res_builder) {
RegisterSubcursorsReq req;
Load(&req, req_reader);
@ -46,7 +45,7 @@ class BfsRpcServer {
Save(res, res_builder);
});
server_->Register<ResetSubcursorRpc>(
coordination->Register<ResetSubcursorRpc>(
[this](const auto &req_reader, auto *res_builder) {
ResetSubcursorReq req;
Load(&req, req_reader);
@ -55,7 +54,7 @@ class BfsRpcServer {
Save(res, res_builder);
});
server_->Register<RemoveBfsSubcursorRpc>(
coordination->Register<RemoveBfsSubcursorRpc>(
[this](const auto &req_reader, auto *res_builder) {
RemoveBfsSubcursorReq req;
Load(&req, req_reader);
@ -65,7 +64,7 @@ class BfsRpcServer {
Save(res, res_builder);
});
server_->Register<SetSourceRpc>(
coordination->Register<SetSourceRpc>(
[this](const auto &req_reader, auto *res_builder) {
SetSourceReq req;
Load(&req, req_reader);
@ -74,7 +73,7 @@ class BfsRpcServer {
Save(res, res_builder);
});
server_->Register<ExpandLevelRpc>(
coordination->Register<ExpandLevelRpc>(
[this](const auto &req_reader, auto *res_builder) {
ExpandLevelReq req;
Load(&req, req_reader);
@ -90,7 +89,7 @@ class BfsRpcServer {
Save(res, res_builder);
});
server_->Register<SubcursorPullRpc>(
coordination->Register<SubcursorPullRpc>(
[this](const auto &req_reader, auto *res_builder) {
SubcursorPullReq req;
Load(&req, req_reader);
@ -99,7 +98,7 @@ class BfsRpcServer {
Save(res, res_builder, db_->WorkerId());
});
server_->Register<ExpandToRemoteVertexRpc>(
coordination->Register<ExpandToRemoteVertexRpc>(
[this](const auto &req_reader, auto *res_builder) {
ExpandToRemoteVertexReq req;
Load(&req, req_reader);
@ -109,8 +108,8 @@ class BfsRpcServer {
Save(res, res_builder);
});
server_->Register<ReconstructPathRpc>([this](const auto &req_reader,
auto *res_builder) {
coordination->Register<ReconstructPathRpc>([this](const auto &req_reader,
auto *res_builder) {
ReconstructPathReq req;
Load(&req, req_reader);
auto subcursor = subcursor_storage_->Get(req.subcursor_id);
@ -127,8 +126,8 @@ class BfsRpcServer {
Save(res, res_builder, db_->WorkerId());
});
server_->Register<PrepareForExpandRpc>([this](const auto &req_reader,
auto *res_builder) {
coordination->Register<PrepareForExpandRpc>([this](const auto &req_reader,
auto *res_builder) {
PrepareForExpandReq req;
auto subcursor_id = req_reader.getSubcursorId();
auto *subcursor = subcursor_storage_->Get(subcursor_id);
@ -142,7 +141,6 @@ class BfsRpcServer {
private:
database::DistributedGraphDb *db_;
communication::rpc::Server *server_;
std::map<int64_t, std::unique_ptr<database::GraphDbAccessor>> db_accessors_;
BfsSubcursorStorage *subcursor_storage_;
};


@ -2,24 +2,19 @@
#include <experimental/filesystem>
#include "communication/rpc/client_pool.hpp"
#include "distributed/coordination_rpc_messages.hpp"
#include "io/network/endpoint.hpp"
#include "utils/file.hpp"
#include "utils/string.hpp"
namespace distributed {
using Server = communication::rpc::Server;
ClusterDiscoveryMaster::ClusterDiscoveryMaster(
Server *server, MasterCoordination *coordination,
const std::string &durability_directory)
: server_(server),
coordination_(coordination),
durability_directory_(durability_directory) {
server_->Register<RegisterWorkerRpc>([this](const auto &endpoint,
const auto &req_reader,
auto *res_builder) {
MasterCoordination *coordination, const std::string &durability_directory)
: coordination_(coordination), durability_directory_(durability_directory) {
coordination_->Register<RegisterWorkerRpc>([this](const auto &endpoint,
const auto &req_reader,
auto *res_builder) {
bool registration_successful = false;
bool durability_error = false;
@ -80,12 +75,13 @@ ClusterDiscoveryMaster::ClusterDiscoveryMaster(
Save(res, res_builder);
});
server_->Register<NotifyWorkerRecoveredRpc>([this](const auto &req_reader,
auto *res_builder) {
NotifyWorkerRecoveredReq req;
Load(&req, req_reader);
coordination_->WorkerRecoveredSnapshot(req.worker_id, req.recovery_info);
});
coordination_->Register<NotifyWorkerRecoveredRpc>(
[this](const auto &req_reader, auto *res_builder) {
NotifyWorkerRecoveredReq req;
Load(&req, req_reader);
coordination_->WorkerRecoveredSnapshot(req.worker_id,
req.recovery_info);
});
}
} // namespace distributed


@ -1,6 +1,5 @@
#pragma once
#include "communication/rpc/server.hpp"
#include "distributed/coordination_master.hpp"
namespace distributed {
@ -14,11 +13,10 @@ using Server = communication::rpc::Server;
*/
class ClusterDiscoveryMaster final {
public:
ClusterDiscoveryMaster(Server *server, MasterCoordination *coordination,
ClusterDiscoveryMaster(MasterCoordination *coordination,
const std::string &durability_directory);
private:
Server *server_;
MasterCoordination *coordination_;
std::string durability_directory_;
};


@ -8,15 +8,14 @@
namespace distributed {
using Server = communication::rpc::Server;
ClusterDiscoveryWorker::ClusterDiscoveryWorker(
Server &server, WorkerCoordination &coordination,
communication::rpc::ClientPool &client_pool)
: server_(server), coordination_(coordination), client_pool_(client_pool) {
server_.Register<ClusterDiscoveryRpc>(
ClusterDiscoveryWorker::ClusterDiscoveryWorker(WorkerCoordination *coordination)
: coordination_(coordination),
client_pool_(coordination->GetClientPool(0)) {
coordination->Register<ClusterDiscoveryRpc>(
[this](const auto &req_reader, auto *res_builder) {
ClusterDiscoveryReq req;
Load(&req, req_reader);
this->coordination_.RegisterWorker(req.worker_id, req.endpoint);
coordination_->RegisterWorker(req.worker_id, req.endpoint);
});
}
@ -31,8 +30,9 @@ void ClusterDiscoveryWorker::RegisterWorker(
// Register to the master.
try {
auto result = client_pool_.Call<RegisterWorkerRpc>(
worker_id, server_.endpoint().port(), full_durability_directory);
auto result = client_pool_->Call<RegisterWorkerRpc>(
worker_id, coordination_->GetServerEndpoint().port(),
full_durability_directory);
CHECK(!result.durability_error)
<< "This worker was started on the same machine and with the same "
"durability directory as the master! Please change the durability "
@ -42,7 +42,7 @@ void ClusterDiscoveryWorker::RegisterWorker(
worker_id_ = worker_id;
for (auto &kv : result.workers) {
coordination_.RegisterWorker(kv.first, kv.second);
coordination_->RegisterWorker(kv.first, kv.second);
}
snapshot_to_recover_ = result.snapshot_to_recover;
} catch (const communication::rpc::RpcFailedException &e) {
@ -57,7 +57,7 @@ void ClusterDiscoveryWorker::NotifyWorkerRecovered(
<< "Workers id is not yet assigned, preform registration before "
"notifying that the recovery finished";
try {
client_pool_.Call<NotifyWorkerRecoveredRpc>(worker_id_, recovery_info);
client_pool_->Call<NotifyWorkerRecoveredRpc>(worker_id_, recovery_info);
} catch (const communication::rpc::RpcFailedException &e) {
LOG(FATAL) << "Couldn't notify the master that we finished recovering!";
}


@ -8,8 +8,6 @@
#include "durability/distributed/recovery.hpp"
namespace distributed {
using Server = communication::rpc::Server;
using ClientPool = communication::rpc::ClientPool;
/** Handle cluster discovery on worker.
*
@ -19,8 +17,7 @@ using ClientPool = communication::rpc::ClientPool;
*/
class ClusterDiscoveryWorker final {
public:
ClusterDiscoveryWorker(Server &server, WorkerCoordination &coordination,
ClientPool &client_pool);
ClusterDiscoveryWorker(WorkerCoordination *coordination);
/**
* Registers a worker with the master.
@ -46,9 +43,8 @@ class ClusterDiscoveryWorker final {
private:
int worker_id_{-1};
Server &server_;
WorkerCoordination &coordination_;
communication::rpc::ClientPool &client_pool_;
distributed::WorkerCoordination *coordination_;
communication::rpc::ClientPool *client_pool_;
std::experimental::optional<std::pair<int64_t, tx::TransactionId>> snapshot_to_recover_;
};


@ -6,11 +6,26 @@
namespace distributed {
Coordination::Coordination(const io::network::Endpoint &master_endpoint,
int worker_id, int client_workers_count)
: worker_id_(worker_id), thread_pool_(client_workers_count, "RPC client") {
// The master is always worker 0.
workers_.emplace(0, master_endpoint);
Coordination::Coordination(const io::network::Endpoint &worker_endpoint,
int worker_id,
const io::network::Endpoint &master_endpoint,
int server_workers_count, int client_workers_count)
: server_(worker_endpoint, server_workers_count),
worker_id_(worker_id),
thread_pool_(client_workers_count, "RPC client") {
// The master endpoint should be added to workers and not to the master.
if (worker_id != 0) {
// The master is always worker 0.
// TODO (mferencevic): Strictly speaking, this isn't correct when creating a
// `MasterCoordination` because the supplied `master_endpoint` is given
// before the server starts listening on the address. E.g. if `0.0.0.0:0` is
// supplied, it should first be resolved by the server when it binds to
// that address, and `server_.endpoint()` should be used. Currently,
// `workers_[0]` isn't used on the master so all is well.
workers_.emplace(0, master_endpoint);
} else {
workers_.emplace(0, server_.endpoint());
}
}
Coordination::~Coordination() {}
@ -24,6 +39,10 @@ io::network::Endpoint Coordination::GetEndpoint(int worker_id) {
return found->second;
}
io::network::Endpoint Coordination::GetServerEndpoint() {
return server_.endpoint();
}
std::vector<int> Coordination::GetWorkerIds() {
std::lock_guard<std::mutex> guard(lock_);
std::vector<int> worker_ids;


@ -8,6 +8,7 @@
#include <vector>
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/server.hpp"
#include "io/network/endpoint.hpp"
#include "utils/future.hpp"
#include "utils/thread.hpp"
@ -17,7 +18,9 @@ namespace distributed {
/// Coordination base class. This class is thread safe.
class Coordination {
protected:
Coordination(const io::network::Endpoint &master_endpoint, int worker_id,
Coordination(const io::network::Endpoint &worker_endpoint, int worker_id,
const io::network::Endpoint &master_endpoint,
int server_workers_count = std::thread::hardware_concurrency(),
int client_workers_count = std::thread::hardware_concurrency());
~Coordination();
@ -25,6 +28,9 @@ class Coordination {
/// Gets the endpoint for the given worker ID from the master.
io::network::Endpoint GetEndpoint(int worker_id);
/// Gets the endpoint for this RPC server.
io::network::Endpoint GetServerEndpoint();
/// Returns all worker IDs; this includes the master (ID 0).
std::vector<int> GetWorkerIds();
@ -66,6 +72,23 @@ class Coordination {
return futures;
}
template <class TRequestResponse>
void Register(std::function<
void(const typename TRequestResponse::Request::Capnp::Reader &,
typename TRequestResponse::Response::Capnp::Builder *)>
callback) {
server_.Register<TRequestResponse>(callback);
}
template <class TRequestResponse>
void Register(std::function<
void(const io::network::Endpoint &,
const typename TRequestResponse::Request::Capnp::Reader &,
typename TRequestResponse::Response::Capnp::Builder *)>
callback) {
server_.Register<TRequestResponse>(callback);
}
protected:
/// Adds a worker to the coordination. This function can be called multiple
/// times to replace an existing worker.
@ -74,6 +97,8 @@ class Coordination {
/// Gets a worker name for the given endpoint.
std::string GetWorkerName(const io::network::Endpoint &endpoint);
communication::rpc::Server server_;
private:
std::unordered_map<int, io::network::Endpoint> workers_;
mutable std::mutex lock_;
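
Anything that used to take a communication::rpc::Server now takes this class (or one of its Master/Worker subclasses) and registers its handlers in its constructor, as the MasterCounters, BfsRpcServer, and other hunks in this commit show. A minimal usage sketch of that pattern; PingRpc/PingRes and the handler class are illustrative placeholders (not part of this commit), while the Register call shape and Save(res, res_builder) mirror the surrounding hunks:

// Hypothetical handler written against the new interface: it keeps no
// pointer to the RPC server and only registers callbacks on construction.
class PingServer {
 public:
  explicit PingServer(distributed::Coordination *coordination) {
    coordination->Register<PingRpc>(
        [](const auto &req_reader, auto *res_builder) {
          PingRes res(req_reader.getMember());
          Save(res, res_builder);
        });
    // Handlers that also need the remote endpoint (e.g. RegisterWorkerRpc in
    // ClusterDiscoveryMaster) use the second Register overload, whose callback
    // additionally receives the caller's io::network::Endpoint.
  }
};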


@ -18,8 +18,10 @@ namespace distributed {
const int kHeartbeatIntervalSeconds = 1;
MasterCoordination::MasterCoordination(const Endpoint &master_endpoint,
int server_workers_count,
int client_workers_count)
: Coordination(master_endpoint, 0, client_workers_count) {
: Coordination(master_endpoint, 0, {}, server_workers_count,
client_workers_count) {
// TODO (mferencevic): Move this to an explicit `Start` method.
scheduler_.Run("Heartbeat", std::chrono::seconds(kHeartbeatIntervalSeconds),
[this] { IssueHeartbeats(); });
@ -175,6 +177,10 @@ bool MasterCoordination::AwaitShutdown(
}
LOG(INFO) << "Shutdown of all workers is complete.";
// Shutdown our RPC server.
server_.Shutdown();
server_.AwaitShutdown();
// Return `true` if the cluster is alive and the `call_before_shutdown`
// succeeded.
return ret && is_cluster_alive;
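
The shutdown responsibility moves together with the server: the server_.Shutdown(); server_.AwaitShutdown(); pair deleted from Master::AwaitShutdown and Worker::AwaitShutdown earlier in this diff reappears here at the end of MasterCoordination::AwaitShutdown (and likewise in WorkerCoordination::AwaitShutdown below). A condensed sketch of the new tail of that function; the callback signature and the elided bookkeeping are assumptions inferred from the surrounding hunks, not verbatim code:

// Condensed sketch, assuming the callback takes the cluster-alive flag and
// returns whether it succeeded (as the worker-side hunk suggests).
bool MasterCoordination::AwaitShutdown(
    std::function<bool(bool)> call_before_shutdown) {
  bool is_cluster_alive = true;  // elided: heartbeat / worker shutdown tracking
  bool ret = call_before_shutdown(is_cluster_alive);
  // Shutdown our RPC server; previously database::Master issued these calls
  // itself after coordination shutdown completed.
  server_.Shutdown();
  server_.AwaitShutdown();
  // Return `true` if the cluster is alive and `call_before_shutdown` succeeded.
  return ret && is_cluster_alive;
}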


@ -21,10 +21,16 @@ class MasterCoordination final : public Coordination {
public:
explicit MasterCoordination(
const Endpoint &master_endpoint,
int server_workers_count = std::thread::hardware_concurrency(),
int client_workers_count = std::thread::hardware_concurrency());
~MasterCoordination();
MasterCoordination(const MasterCoordination &) = delete;
MasterCoordination(MasterCoordination &&) = delete;
MasterCoordination &operator=(const MasterCoordination &) = delete;
MasterCoordination &operator=(MasterCoordination &&) = delete;
/** Registers a new worker with this master coordination.
*
* @param desired_worker_id - The ID the worker would like to have.


@ -22,18 +22,19 @@ const int kHeartbeatCheckSeconds = 2;
using namespace std::chrono_literals;
WorkerCoordination::WorkerCoordination(communication::rpc::Server *server,
const Endpoint &master_endpoint,
int worker_id, int client_workers_count)
: Coordination(master_endpoint, worker_id, client_workers_count),
server_(server) {
server_->Register<StopWorkerRpc>(
WorkerCoordination::WorkerCoordination(
const io::network::Endpoint &worker_endpoint, int worker_id,
const io::network::Endpoint &master_endpoint, int server_workers_count,
int client_workers_count)
: Coordination(worker_endpoint, worker_id, master_endpoint,
server_workers_count, client_workers_count) {
server_.Register<StopWorkerRpc>(
[&](const auto &req_reader, auto *res_builder) {
LOG(INFO) << "The master initiated shutdown of this worker.";
Shutdown();
});
server_->Register<HeartbeatRpc>([&](const auto &req_reader,
server_.Register<HeartbeatRpc>([&](const auto &req_reader,
auto *res_builder) {
std::lock_guard<std::mutex> guard(heartbeat_lock_);
last_heartbeat_time_ = std::chrono::steady_clock::now();
@ -64,7 +65,8 @@ WorkerCoordination::~WorkerCoordination() {
"distributed::WorkerCoordination!";
}
void WorkerCoordination::RegisterWorker(int worker_id, Endpoint endpoint) {
void WorkerCoordination::RegisterWorker(int worker_id,
io::network::Endpoint endpoint) {
AddWorker(worker_id, endpoint);
}
@ -87,6 +89,10 @@ bool WorkerCoordination::AwaitShutdown(
// Call the before shutdown callback.
bool ret = call_before_shutdown(is_cluster_alive);
// Shutdown our RPC server.
server_.Shutdown();
server_.AwaitShutdown();
// All other cleanup must be done here.
// Return `true` if the cluster is alive and the `call_before_shutdown`


@ -13,18 +13,22 @@ namespace distributed {
/// Handles worker registration, getting of other workers' endpoints and
/// coordinated shutdown in a distributed memgraph. Worker side.
class WorkerCoordination final : public Coordination {
using Endpoint = io::network::Endpoint;
public:
WorkerCoordination(
communication::rpc::Server *server, const Endpoint &master_endpoint,
int worker_id,
const io::network::Endpoint &worker_endpoint, int worker_id,
const io::network::Endpoint &master_endpoint,
int server_workers_count = std::thread::hardware_concurrency(),
int client_workers_count = std::thread::hardware_concurrency());
~WorkerCoordination();
WorkerCoordination(const WorkerCoordination &) = delete;
WorkerCoordination(WorkerCoordination &&) = delete;
WorkerCoordination &operator=(const WorkerCoordination &) = delete;
WorkerCoordination &operator=(WorkerCoordination &&) = delete;
/// Registers the worker with the given endpoint.
void RegisterWorker(int worker_id, Endpoint endpoint);
void RegisterWorker(int worker_id, io::network::Endpoint endpoint);
/// Starts listening for a remote shutdown command (issued by the master) or
/// for the `Shutdown` method to be called (suitable for use with signal
@ -45,8 +49,6 @@ class WorkerCoordination final : public Coordination {
bool IsClusterAlive();
private:
communication::rpc::Server *server_;
// Heartbeat variables
std::mutex heartbeat_lock_;
std::chrono::time_point<std::chrono::steady_clock> last_heartbeat_time_;


@ -9,9 +9,9 @@
namespace distributed {
DataRpcServer::DataRpcServer(database::DistributedGraphDb *db,
communication::rpc::Server *server)
: db_(db), rpc_server_(server) {
rpc_server_->Register<VertexRpc>(
distributed::Coordination *coordination)
: db_(db) {
coordination->Register<VertexRpc>(
[this](const auto &req_reader, auto *res_builder) {
auto dba = db_->Access(req_reader.getMember().getTxId());
auto vertex = dba->FindVertex(req_reader.getMember().getGid(), false);
@ -21,8 +21,8 @@ DataRpcServer::DataRpcServer(database::DistributedGraphDb *db,
Save(response, res_builder);
});
rpc_server_->Register<EdgeRpc>([this](const auto &req_reader,
auto *res_builder) {
coordination->Register<EdgeRpc>([this](const auto &req_reader,
auto *res_builder) {
auto dba = db_->Access(req_reader.getMember().getTxId());
auto edge = dba->FindEdge(req_reader.getMember().getGid(), false);
CHECK(edge.GetOld()) << "Old record must exist when sending edge by RPC";
@ -30,7 +30,7 @@ DataRpcServer::DataRpcServer(database::DistributedGraphDb *db,
Save(response, res_builder);
});
rpc_server_->Register<VertexCountRpc>(
coordination->Register<VertexCountRpc>(
[this](const auto &req_reader, auto *res_builder) {
VertexCountReq req;
Load(&req, req_reader);


@ -1,7 +1,7 @@
#pragma once
#include "communication/rpc/server.hpp"
#include "database/distributed/graph_db.hpp"
#include "distributed/coordination.hpp"
namespace database {
class DistributedGraphDb;
@ -13,11 +13,10 @@ namespace distributed {
class DataRpcServer {
public:
DataRpcServer(database::DistributedGraphDb *db,
communication::rpc::Server *server);
distributed::Coordination *coordination);
private:
database::DistributedGraphDb *db_;
communication::rpc::Server *rpc_server_;
};
} // namespace distributed


@ -6,17 +6,17 @@
namespace distributed {
DurabilityRpcWorker::DurabilityRpcWorker(database::Worker *db,
communication::rpc::Server *server)
: db_(db), rpc_server_(server) {
rpc_server_->Register<MakeSnapshotRpc>(
DurabilityRpcWorker::DurabilityRpcWorker(
database::Worker *db, distributed::Coordination *coordination)
: db_(db) {
coordination->Register<MakeSnapshotRpc>(
[this](const auto &req_reader, auto *res_builder) {
auto dba = db_->Access(req_reader.getMember());
MakeSnapshotRes res(db_->MakeSnapshot(*dba));
Save(res, res_builder);
});
rpc_server_->Register<RecoverWalAndIndexesRpc>(
coordination->Register<RecoverWalAndIndexesRpc>(
[this](const auto &req_reader, auto *res_builder) {
durability::RecoveryData recovery_data;
durability::Load(&recovery_data, req_reader.getMember());


@ -1,6 +1,6 @@
#pragma once
#include "communication/rpc/server.hpp"
#include "distributed/coordination.hpp"
namespace database {
class Worker;
@ -10,11 +10,10 @@ namespace distributed {
class DurabilityRpcWorker {
public:
DurabilityRpcWorker(database::Worker *db, communication::rpc::Server *server);
DurabilityRpcWorker(database::Worker *db, distributed::Coordination *coordination);
private:
database::Worker *db_;
communication::rpc::Server *rpc_server_;
};
} // namespace distributed


@ -4,13 +4,11 @@
#include "distributed/dynamic_worker_rpc_messages.hpp"
namespace distributed {
using Server = communication::rpc::Server;
using ClientPool = communication::rpc::ClientPool;
DynamicWorkerAddition::DynamicWorkerAddition(database::DistributedGraphDb *db,
Server *server)
: db_(db), server_(server) {
server_->Register<DynamicWorkerRpc>(
distributed::Coordination *coordination)
: db_(db), coordination_(coordination) {
coordination_->Register<DynamicWorkerRpc>(
[this](const auto &req_reader, auto *res_builder) {
DynamicWorkerReq req;
Load(&req, req_reader);
@ -33,7 +31,7 @@ DynamicWorkerAddition::GetIndicesToCreate() {
void DynamicWorkerAddition::Enable() { enabled_.store(true); }
DynamicWorkerRegistration::DynamicWorkerRegistration(ClientPool *client_pool)
DynamicWorkerRegistration::DynamicWorkerRegistration(communication::rpc::ClientPool *client_pool)
: client_pool_(client_pool) {}
std::vector<std::pair<std::string, std::string>>


@ -6,26 +6,24 @@
#include <vector>
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/server.hpp"
#include "distributed/coordination.hpp"
namespace database {
class DistributedGraphDb;
} // namespace database
namespace distributed {
using Server = communication::rpc::Server;
using ClientPool = communication::rpc::ClientPool;
class DynamicWorkerAddition final {
public:
DynamicWorkerAddition(database::DistributedGraphDb *db, Server *server);
DynamicWorkerAddition(database::DistributedGraphDb *db,
distributed::Coordination *coordination);
/// Enable dynamic worker addition.
void Enable();
private:
database::DistributedGraphDb *db_{nullptr};
Server *server_;
distributed::Coordination *coordination_;
std::atomic<bool> enabled_{false};
@ -35,13 +33,14 @@ class DynamicWorkerAddition final {
class DynamicWorkerRegistration final {
public:
explicit DynamicWorkerRegistration(ClientPool *client_pool);
explicit DynamicWorkerRegistration(
communication::rpc::ClientPool *client_pool);
/// Make a RPC call to master to get indices to create.
std::vector<std::pair<std::string, std::string>> GetIndicesToCreate();
private:
ClientPool *client_pool_;
communication::rpc::ClientPool *client_pool_;
};
} // namespace distributed


@ -1,29 +1,28 @@
#include "distributed/index_rpc_server.hpp"
#include "communication/rpc/server.hpp"
#include "database/distributed/graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/index_rpc_messages.hpp"
namespace distributed {
IndexRpcServer::IndexRpcServer(database::GraphDb &db,
communication::rpc::Server &server)
: db_(db), rpc_server_(server) {
rpc_server_.Register<CreateIndexRpc>(
IndexRpcServer::IndexRpcServer(database::GraphDb *db,
distributed::Coordination *coordination)
: db_(db) {
coordination->Register<CreateIndexRpc>(
[this](const auto &req_reader, auto *res_builder) {
CreateIndexReq req;
Load(&req, req_reader);
database::LabelPropertyIndex::Key key{req.label, req.property};
db_.storage().label_property_index_.CreateIndex(key);
db_->storage().label_property_index_.CreateIndex(key);
});
rpc_server_.Register<PopulateIndexRpc>(
coordination->Register<PopulateIndexRpc>(
[this](const auto &req_reader, auto *res_builder) {
PopulateIndexReq req;
Load(&req, req_reader);
database::LabelPropertyIndex::Key key{req.label, req.property};
auto dba = db_.Access(req.tx_id);
auto dba = db_->Access(req.tx_id);
dba->PopulateIndex(key);
dba->EnableIndex(key);
});


@ -1,8 +1,6 @@
#pragma once
namespace communication::rpc {
class Server;
}
#include "distributed/coordination.hpp"
namespace database {
class GraphDb;
@ -12,11 +10,10 @@ namespace distributed {
class IndexRpcServer {
public:
IndexRpcServer(database::GraphDb &db, communication::rpc::Server &server);
IndexRpcServer(database::GraphDb *db, distributed::Coordination *coordination);
private:
database::GraphDb &db_;
communication::rpc::Server &rpc_server_;
database::GraphDb *db_;
};
} // namespace distributed


@ -2,9 +2,8 @@
namespace distributed {
PlanConsumer::PlanConsumer(communication::rpc::Server &server)
: server_(server) {
server_.Register<DispatchPlanRpc>(
PlanConsumer::PlanConsumer(distributed::Coordination *coordination) {
coordination->Register<DispatchPlanRpc>(
[this](const auto &req_reader, auto *res_builder) {
DispatchPlanReq req;
Load(&req, req_reader);
@ -15,7 +14,7 @@ PlanConsumer::PlanConsumer(communication::rpc::Server &server)
Save(res, res_builder);
});
server_.Register<RemovePlanRpc>(
coordination->Register<RemovePlanRpc>(
[this](const auto &req_reader, auto *res_builder) {
plan_cache_.access().remove(req_reader.getMember());
});


@ -2,7 +2,7 @@
#include <vector>
#include "communication/rpc/server.hpp"
#include "distributed/coordination.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "distributed/plan_rpc_messages.hpp"
#include "query/frontend/semantic/symbol_table.hpp"
@ -26,7 +26,7 @@ class PlanConsumer {
const query::AstStorage storage;
};
explicit PlanConsumer(communication::rpc::Server &server);
explicit PlanConsumer(distributed::Coordination *coordination);
/** Return cached plan and symbol table for a given plan id. */
PlanPack &PlanForId(int64_t plan_id) const;
@ -35,7 +35,6 @@ class PlanConsumer {
std::vector<int64_t> CachedPlanIds() const;
private:
communication::rpc::Server &server_;
// TODO remove unique_ptr. This is to get it to work, emplacing into a
// ConcurrentMap is tricky.
mutable ConcurrentMap<int64_t, std::unique_ptr<PlanPack>> plan_cache_;


@ -100,14 +100,13 @@ ProduceRpcServer::OngoingProduce::PullOneFromCursor() {
ProduceRpcServer::ProduceRpcServer(database::Worker *db,
tx::EngineWorker *tx_engine,
communication::rpc::Server &server,
distributed::Coordination *coordination,
const PlanConsumer &plan_consumer,
DataManager *data_manager)
: db_(db),
produce_rpc_server_(server),
plan_consumer_(plan_consumer),
tx_engine_(tx_engine) {
produce_rpc_server_.Register<PullRpc>(
coordination->Register<PullRpc>(
[this](const auto &req_reader, auto *res_builder) {
PullReq req;
Load(&req, req_reader);
@ -115,7 +114,7 @@ ProduceRpcServer::ProduceRpcServer(database::Worker *db,
Save(res, res_builder);
});
produce_rpc_server_.Register<ResetCursorRpc>(
coordination->Register<ResetCursorRpc>(
[this](const auto &req_reader, auto *res_builder) {
ResetCursorReq req;
Load(&req, req_reader);
@ -126,7 +125,7 @@ ProduceRpcServer::ProduceRpcServer(database::Worker *db,
CHECK(data_manager);
produce_rpc_server_.Register<TransactionCommandAdvancedRpc>(
coordination->Register<TransactionCommandAdvancedRpc>(
[this, data_manager](const auto &req_reader, auto *res_builder) {
TransactionCommandAdvancedReq req;
Load(&req, req_reader);


@ -7,9 +7,9 @@
#include <utility>
#include <vector>
#include "communication/rpc/server.hpp"
#include "database/distributed/graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/coordination.hpp"
#include "distributed/plan_consumer.hpp"
#include "distributed/pull_produce_rpc_messages.hpp"
#include "query/context.hpp"
@ -74,7 +74,7 @@ class ProduceRpcServer {
public:
ProduceRpcServer(database::Worker *db, tx::EngineWorker *tx_engine,
communication::rpc::Server &server,
distributed::Coordination *coordination,
const PlanConsumer &plan_consumer,
DataManager *data_manager);
@ -92,7 +92,6 @@ class ProduceRpcServer {
OngoingProduce>
ongoing_produces_;
database::Worker *db_;
communication::rpc::Server &produce_rpc_server_;
const distributed::PlanConsumer &plan_consumer_;
tx::EngineWorker *tx_engine_;


@ -5,10 +5,6 @@
#include "distributed/coordination.hpp"
#include "distributed/dgp/partitioner.hpp"
namespace communication::rpc {
class Server;
}
namespace database {
class DistributedGraphDb;
};
@ -28,13 +24,9 @@ namespace distributed {
class TokenSharingRpcServer {
public:
TokenSharingRpcServer(database::DistributedGraphDb *db, int worker_id,
distributed::Coordination *coordination,
communication::rpc::Server *server)
: worker_id_(worker_id),
coordination_(coordination),
server_(server),
dgp_(db) {
server_->Register<distributed::TokenTransferRpc>(
distributed::Coordination *coordination)
: worker_id_(worker_id), coordination_(coordination), dgp_(db) {
coordination_->Register<distributed::TokenTransferRpc>(
[this](const auto &req_reader, auto *res_builder) { token_ = true; });
// TODO (buda): It's not trivial to move this part in the Start method
// because worker then doesn't run the step. Will resolve that with
@ -111,7 +103,6 @@ class TokenSharingRpcServer {
private:
int worker_id_;
distributed::Coordination *coordination_;
communication::rpc::Server *server_;
std::atomic<bool> started_{false};
std::atomic<bool> token_{false};


@ -177,10 +177,10 @@ UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Apply() {
}
UpdatesRpcServer::UpdatesRpcServer(database::DistributedGraphDb *db,
communication::rpc::Server *server)
distributed::Coordination *coordination)
: db_(db) {
server->Register<UpdateRpc>([this](const auto &req_reader,
auto *res_builder) {
coordination->Register<UpdateRpc>([this](const auto &req_reader,
auto *res_builder) {
UpdateReq req;
Load(&req, req_reader);
using DeltaType = database::StateDelta::Type;
@ -208,7 +208,7 @@ UpdatesRpcServer::UpdatesRpcServer(database::DistributedGraphDb *db,
}
});
server->Register<UpdateApplyRpc>(
coordination->Register<UpdateApplyRpc>(
[this](const auto &req_reader, auto *res_builder) {
UpdateApplyReq req;
Load(&req, req_reader);
@ -216,8 +216,8 @@ UpdatesRpcServer::UpdatesRpcServer(database::DistributedGraphDb *db,
Save(res, res_builder);
});
server->Register<CreateVertexRpc>([this](const auto &req_reader,
auto *res_builder) {
coordination->Register<CreateVertexRpc>([this](const auto &req_reader,
auto *res_builder) {
CreateVertexReq req;
Load(&req, req_reader);
auto result = GetUpdates(vertex_updates_, req.member.tx_id)
@ -228,7 +228,7 @@ UpdatesRpcServer::UpdatesRpcServer(database::DistributedGraphDb *db,
Save(res, res_builder);
});
server->Register<CreateEdgeRpc>(
coordination->Register<CreateEdgeRpc>(
[this](const auto &req_reader, auto *res_builder) {
CreateEdgeReq req;
Load(&req, req_reader);
@ -250,7 +250,7 @@ UpdatesRpcServer::UpdatesRpcServer(database::DistributedGraphDb *db,
Save(res, res_builder);
});
server->Register<AddInEdgeRpc>(
coordination->Register<AddInEdgeRpc>(
[this](const auto &req_reader, auto *res_builder) {
AddInEdgeReq req;
Load(&req, req_reader);
@ -263,7 +263,7 @@ UpdatesRpcServer::UpdatesRpcServer(database::DistributedGraphDb *db,
Save(res, res_builder);
});
server->Register<RemoveVertexRpc>(
coordination->Register<RemoveVertexRpc>(
[this](const auto &req_reader, auto *res_builder) {
RemoveVertexReq req;
Load(&req, req_reader);
@ -275,7 +275,7 @@ UpdatesRpcServer::UpdatesRpcServer(database::DistributedGraphDb *db,
Save(res, res_builder);
});
server->Register<RemoveEdgeRpc>(
coordination->Register<RemoveEdgeRpc>(
[this](const auto &req_reader, auto *res_builder) {
RemoveEdgeReq req;
Load(&req, req_reader);
@ -283,8 +283,8 @@ UpdatesRpcServer::UpdatesRpcServer(database::DistributedGraphDb *db,
Save(res, res_builder);
});
server->Register<RemoveInEdgeRpc>([this](const auto &req_reader,
auto *res_builder) {
coordination->Register<RemoveInEdgeRpc>([this](const auto &req_reader,
auto *res_builder) {
RemoveInEdgeReq req;
Load(&req, req_reader);
auto data = req.member;


@ -7,10 +7,10 @@
#include "glog/logging.h"
#include "communication/rpc/server.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/distributed/distributed_graph_db.hpp"
#include "database/distributed/graph_db_accessor.hpp"
#include "distributed/coordination.hpp"
#include "distributed/updates_rpc_messages.hpp"
#include "durability/distributed/state_delta.hpp"
#include "query/typed_value.hpp"
@ -76,7 +76,7 @@ class UpdatesRpcServer {
public:
UpdatesRpcServer(database::DistributedGraphDb *db,
communication::rpc::Server *server);
distributed::Coordination *coordination);
/// Applies all existing updates for the given transaction ID. If there are
/// no updates for that transaction, nothing happens. Clears the updates


@ -9,26 +9,26 @@ namespace storage {
namespace {
template <typename TId>
void RegisterRpc(MasterConcurrentIdMapper<TId> &mapper,
communication::rpc::Server &rpc_server);
#define ID_VALUE_RPC_CALLS(type) \
template <> \
void RegisterRpc<type>(MasterConcurrentIdMapper<type> & mapper, \
communication::rpc::Server & rpc_server) { \
rpc_server.Register<type##IdRpc>( \
[&mapper](const auto &req_reader, auto *res_builder) { \
type##IdReq req; \
Load(&req, req_reader); \
type##IdRes res(mapper.value_to_id(req.member)); \
Save(res, res_builder); \
}); \
rpc_server.Register<Id##type##Rpc>( \
[&mapper](const auto &req_reader, auto *res_builder) { \
Id##type##Req req; \
Load(&req, req_reader); \
Id##type##Res res(mapper.id_to_value(req.member)); \
Save(res, res_builder); \
}); \
void RegisterRpc(MasterConcurrentIdMapper<TId> *mapper,
distributed::Coordination *coordination);
#define ID_VALUE_RPC_CALLS(type) \
template <> \
void RegisterRpc<type>(MasterConcurrentIdMapper<type> * mapper, \
distributed::Coordination * coordination) { \
coordination->Register<type##IdRpc>( \
[mapper](const auto &req_reader, auto *res_builder) { \
type##IdReq req; \
Load(&req, req_reader); \
type##IdRes res(mapper->value_to_id(req.member)); \
Save(res, res_builder); \
}); \
coordination->Register<Id##type##Rpc>( \
[mapper](const auto &req_reader, auto *res_builder) { \
Id##type##Req req; \
Load(&req, req_reader); \
Id##type##Res res(mapper->id_to_value(req.member)); \
Save(res, res_builder); \
}); \
}
using namespace storage;
@ -40,12 +40,12 @@ ID_VALUE_RPC_CALLS(Property)
template <typename TId>
MasterConcurrentIdMapper<TId>::MasterConcurrentIdMapper(
communication::rpc::Server &server)
distributed::Coordination *coordination)
// We have to make sure our rpc server name is unique with regards to type.
// Otherwise we will try to reuse the same rpc server name for different
// types (Label/EdgeType/Property)
: rpc_server_(server) {
RegisterRpc(*this, rpc_server_);
: coordination_(coordination) {
RegisterRpc(this, coordination_);
}
template class MasterConcurrentIdMapper<Label>;


@ -2,8 +2,7 @@
#include <experimental/optional>
#include "communication/rpc/server.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "distributed/coordination.hpp"
#include "storage/distributed/concurrent_id_mapper_single_node.hpp"
namespace storage {
@ -12,9 +11,9 @@ namespace storage {
template <typename TId>
class MasterConcurrentIdMapper : public SingleNodeConcurrentIdMapper<TId> {
public:
explicit MasterConcurrentIdMapper(communication::rpc::Server &server);
explicit MasterConcurrentIdMapper(distributed::Coordination *coordination);
private:
communication::rpc::Server &rpc_server_;
distributed::Coordination *coordination_;
};
} // namespace storage


@ -11,12 +11,12 @@ namespace storage {
template <> \
type WorkerConcurrentIdMapper<type>::RpcValueToId( \
const std::string &value) { \
return master_client_pool_.Call<type##IdRpc>(value).member; \
return master_client_pool_->Call<type##IdRpc>(value).member; \
} \
\
template <> \
std::string WorkerConcurrentIdMapper<type>::RpcIdToValue(type id) { \
return master_client_pool_.Call<Id##type##Rpc>(id).member; \
return master_client_pool_->Call<Id##type##Rpc>(id).member; \
}
using namespace storage;
@ -28,7 +28,7 @@ ID_VALUE_RPC_CALLS(Property)
template <typename TId>
WorkerConcurrentIdMapper<TId>::WorkerConcurrentIdMapper(
communication::rpc::ClientPool &master_client_pool)
communication::rpc::ClientPool *master_client_pool)
: master_client_pool_(master_client_pool) {}
template <typename TId>


@ -17,7 +17,7 @@ class WorkerConcurrentIdMapper : public ConcurrentIdMapper<TId> {
std::string RpcIdToValue(TId id);
public:
WorkerConcurrentIdMapper(communication::rpc::ClientPool &master_client_pool);
WorkerConcurrentIdMapper(communication::rpc::ClientPool *master_client_pool);
TId value_to_id(const std::string &value) override;
const std::string &id_to_value(const TId &id) override;
@ -29,6 +29,6 @@ class WorkerConcurrentIdMapper : public ConcurrentIdMapper<TId> {
ConcurrentMap<TId, std::string> id_to_value_cache_;
// Communication to the concurrent ID master.
communication::rpc::ClientPool &master_client_pool_;
communication::rpc::ClientPool *master_client_pool_;
};
} // namespace storage


@ -3,7 +3,6 @@
#include <mutex>
#include "communication/rpc/server.hpp"
#include "distributed/coordination_master.hpp"
#include "distributed/storage_gc_rpc_messages.hpp"
#include "storage/distributed/storage_gc.hpp"
@ -13,12 +12,10 @@ class StorageGcMaster : public StorageGc {
public:
using StorageGc::StorageGc;
StorageGcMaster(Storage &storage, tx::Engine &tx_engine, int pause_sec,
communication::rpc::Server &rpc_server,
distributed::MasterCoordination &coordination)
distributed::MasterCoordination *coordination)
: StorageGc(storage, tx_engine, pause_sec),
rpc_server_(rpc_server),
coordination_(coordination) {
rpc_server_.Register<distributed::RanLocalGcRpc>(
coordination_->Register<distributed::RanLocalGcRpc>(
[this](const auto &req_reader, auto *res_builder) {
distributed::RanLocalGcReq req;
Load(&req, req_reader);
@ -37,7 +34,6 @@ class StorageGcMaster : public StorageGc {
void Stop() {
scheduler_.Stop();
rpc_server_.UnRegister<distributed::RanLocalGcRpc>();
}
void CollectCommitLogGarbage(tx::TransactionId oldest_active) final {
@ -51,7 +47,7 @@ class StorageGcMaster : public StorageGc {
tx::TransactionId min_safe = *safe_transaction;
{
std::unique_lock<std::mutex> lock(worker_safe_transaction_mutex_);
for (auto worker_id : coordination_.GetWorkerIds()) {
for (auto worker_id : coordination_->GetWorkerIds()) {
// Skip itself
if (worker_id == 0) continue;
min_safe = std::min(min_safe, worker_safe_transaction_[worker_id]);
@ -65,8 +61,7 @@ class StorageGcMaster : public StorageGc {
}
}
communication::rpc::Server &rpc_server_;
distributed::MasterCoordination &coordination_;
distributed::MasterCoordination *coordination_;
// Mapping of worker ids and oldest active transaction which is safe for
// deletion from worker perspective
std::unordered_map<int, tx::TransactionId> worker_safe_transaction_;


@ -2,7 +2,6 @@
#pragma once
#include "communication/rpc/server.hpp"
#include "distributed/coordination.hpp"
#include "transactions/distributed/engine.hpp"
#include "utils/exceptions.hpp"


@ -9,36 +9,33 @@
namespace tx {
EngineMaster::EngineMaster(communication::rpc::Server *server,
distributed::Coordination *coordination,
EngineMaster::EngineMaster(distributed::Coordination *coordination,
durability::WriteAheadLog *wal)
: engine_single_node_(wal),
server_(server),
coordination_(coordination) {
server_->Register<BeginRpc>(
: engine_single_node_(wal), coordination_(coordination) {
coordination_->Register<BeginRpc>(
[this](const auto &req_reader, auto *res_builder) {
auto tx = this->Begin();
BeginRes res(TxAndSnapshot{tx->id_, tx->snapshot()});
Save(res, res_builder);
});
server_->Register<AdvanceRpc>(
coordination_->Register<AdvanceRpc>(
[this](const auto &req_reader, auto *res_builder) {
AdvanceRes res(this->Advance(req_reader.getMember()));
Save(res, res_builder);
});
server_->Register<CommitRpc>(
coordination_->Register<CommitRpc>(
[this](const auto &req_reader, auto *res_builder) {
this->Commit(*this->RunningTransaction(req_reader.getMember()));
});
server_->Register<AbortRpc>(
coordination_->Register<AbortRpc>(
[this](const auto &req_reader, auto *res_builder) {
this->Abort(*this->RunningTransaction(req_reader.getMember()));
});
server_->Register<SnapshotRpc>(
coordination_->Register<SnapshotRpc>(
[this](const auto &req_reader, auto *res_builder) {
// It is guaranteed that the Worker will not be requesting this for a
// transaction that's done, and that there are no race conditions here.
@ -47,7 +44,7 @@ EngineMaster::EngineMaster(communication::rpc::Server *server,
Save(res, res_builder);
});
server_->Register<CommandRpc>(
coordination_->Register<CommandRpc>(
[this](const auto &req_reader, auto *res_builder) {
// It is guaranteed that the Worker will not be requesting this for a
// transaction that's done, and that there are no race conditions here.
@ -55,30 +52,30 @@ EngineMaster::EngineMaster(communication::rpc::Server *server,
Save(res, res_builder);
});
server_->Register<GcSnapshotRpc>(
coordination_->Register<GcSnapshotRpc>(
[this](const auto &req_reader, auto *res_builder) {
GcSnapshotRes res(this->GlobalGcSnapshot());
Save(res, res_builder);
});
server_->Register<ClogInfoRpc>(
coordination_->Register<ClogInfoRpc>(
[this](const auto &req_reader, auto *res_builder) {
ClogInfoRes res(this->Info(req_reader.getMember()));
Save(res, res_builder);
});
server_->Register<ActiveTransactionsRpc>(
coordination_->Register<ActiveTransactionsRpc>(
[this](const auto &req_reader, auto *res_builder) {
ActiveTransactionsRes res(this->GlobalActiveTransactions());
Save(res, res_builder);
});
server_->Register<EnsureNextIdGreaterRpc>(
coordination_->Register<EnsureNextIdGreaterRpc>(
[this](const auto &req_reader, auto *res_builder) {
this->EnsureNextIdGreater(req_reader.getMember());
});
server_->Register<GlobalLastRpc>(
coordination_->Register<GlobalLastRpc>(
[this](const auto &req_reader, auto *res_builder) {
GlobalLastRes res(this->GlobalLast());
Save(res, res_builder);


@ -2,7 +2,6 @@
#pragma once
#include "communication/rpc/server.hpp"
#include "distributed/coordination.hpp"
#include "transactions/distributed/engine_distributed.hpp"
#include "transactions/distributed/engine_single_node.hpp"
@ -17,8 +16,7 @@ class EngineMaster final : public EngineDistributed {
/// @param coordination - Required. Used for communication with the workers.
/// @param wal - Optional. If present, the Engine will write tx
/// Begin/Commit/Abort atomically (while under lock).
EngineMaster(communication::rpc::Server *server,
distributed::Coordination *coordination,
EngineMaster(distributed::Coordination *coordination,
durability::WriteAheadLog *wal = nullptr);
EngineMaster(const EngineMaster &) = delete;
@ -46,7 +44,6 @@ class EngineMaster final : public EngineDistributed {
private:
EngineSingleNode engine_single_node_;
communication::rpc::Server *server_;
distributed::Coordination *coordination_;
};
} // namespace tx


@ -8,10 +8,11 @@
namespace tx {
EngineWorker::EngineWorker(communication::rpc::Server *server,
communication::rpc::ClientPool *master_client_pool,
EngineWorker::EngineWorker(distributed::Coordination *coordination,
durability::WriteAheadLog *wal)
: server_(server), master_client_pool_(master_client_pool), wal_(wal) {
: coordination_(coordination),
master_client_pool_(coordination->GetClientPool(0)),
wal_(wal) {
// Register our `NotifyCommittedRpc` server. This RPC should only write the
// `TxCommit` operation into the WAL. It is only used to indicate that the
// transaction has succeeded on all workers and that it will be committed on
@ -27,7 +28,7 @@ EngineWorker::EngineWorker(communication::rpc::Server *server,
// RPC call could fail on other workers which will cause the transaction to be
// aborted. This mismatch in committed/aborted across workers is resolved by
// using the master as a single source of truth when doing recovery.
server_->Register<NotifyCommittedRpc>(
coordination_->Register<NotifyCommittedRpc>(
[this](const auto &req_reader, auto *res_builder) {
auto tid = req_reader.getMember();
if (wal_) {


@ -3,7 +3,6 @@
#include <atomic>
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/server.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "durability/distributed/wal.hpp"
#include "io/network/endpoint.hpp"
@ -18,8 +17,7 @@ namespace tx {
* begin/advance/end transactions on the master. */
class EngineWorker final : public EngineDistributed {
public:
EngineWorker(communication::rpc::Server *server,
communication::rpc::ClientPool *master_client_pool,
EngineWorker(distributed::Coordination *coordination,
durability::WriteAheadLog *wal = nullptr);
~EngineWorker();
@ -59,8 +57,8 @@ class EngineWorker final : public EngineDistributed {
// Mutable because just getting info can cause a cache fill.
mutable CommitLog clog_;
// Our local RPC server.
communication::rpc::Server *server_;
// Our local coordination.
distributed::Coordination *coordination_;
// Communication to the transactional master.
communication::rpc::ClientPool *master_client_pool_;


@ -7,12 +7,14 @@
#include "storage/distributed/concurrent_id_mapper_master.hpp"
#include "storage/distributed/concurrent_id_mapper_worker.hpp"
#include "test_coordination.hpp"
template <typename TId>
class DistributedConcurrentIdMapperTest : public ::testing::Test {
const std::string kLocal{"127.0.0.1"};
protected:
communication::rpc::Server master_server_{{kLocal, 0}};
TestMasterCoordination coordination_;
std::experimental::optional<communication::rpc::ClientPool>
master_client_pool_;
std::experimental::optional<storage::MasterConcurrentIdMapper<TId>>
@ -21,16 +23,15 @@ class DistributedConcurrentIdMapperTest : public ::testing::Test {
worker_mapper_;
void SetUp() override {
master_client_pool_.emplace(master_server_.endpoint());
master_mapper_.emplace(master_server_);
worker_mapper_.emplace(master_client_pool_.value());
master_client_pool_.emplace(coordination_.GetServerEndpoint());
master_mapper_.emplace(&coordination_);
worker_mapper_.emplace(&master_client_pool_.value());
}
void TearDown() override {
master_server_.Shutdown();
master_server_.AwaitShutdown();
worker_mapper_ = std::experimental::nullopt;
master_mapper_ = std::experimental::nullopt;
master_client_pool_ = std::experimental::nullopt;
coordination_.Stop();
}
};


@ -4,12 +4,13 @@
#include "communication/rpc/server.hpp"
#include "database/distributed/distributed_counters.hpp"
const std::string kLocal = "127.0.0.1";
#include "test_coordination.hpp"
TEST(CountersDistributed, All) {
communication::rpc::Server master_server({kLocal, 0});
database::MasterCounters master(&master_server);
communication::rpc::ClientPool master_client_pool(master_server.endpoint());
TestMasterCoordination coordination;
database::MasterCounters master(&coordination);
communication::rpc::ClientPool master_client_pool(
coordination.GetServerEndpoint());
database::WorkerCounters w1(&master_client_pool);
database::WorkerCounters w2(&master_client_pool);
@ -25,6 +26,5 @@ TEST(CountersDistributed, All) {
w1.Set("b", 42);
EXPECT_EQ(w2.Get("b"), 42);
master_server.Shutdown();
master_server.AwaitShutdown();
coordination.Stop();
}
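
The unit tests in this commit now include test_coordination.hpp and drive the master side through a TestMasterCoordination object with GetServerEndpoint() and Stop(), instead of constructing a raw communication::rpc::Server. That helper's definition is not part of the hunks shown here; a plausible minimal version, purely as an illustration of what the tests rely on:

#include "distributed/coordination.hpp"

// Hypothetical sketch of test_coordination.hpp: a master-side Coordination
// bound to an ephemeral local port, with an explicit Stop() for TearDown.
class TestMasterCoordination : public distributed::Coordination {
 public:
  TestMasterCoordination()
      : distributed::Coordination({"127.0.0.1", 0}, 0, {}) {}

  void Stop() {
    server_.Shutdown();
    server_.AwaitShutdown();
  }
};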


@ -8,8 +8,6 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/server.hpp"
#include "distributed/cluster_discovery_master.hpp"
#include "distributed/cluster_discovery_worker.hpp"
#include "distributed/coordination_master.hpp"
@ -17,8 +15,6 @@
#include "io/network/endpoint.hpp"
#include "utils/file.hpp"
using communication::rpc::ClientPool;
using communication::rpc::Server;
using namespace distributed;
using namespace std::literals::chrono_literals;
@ -28,14 +24,11 @@ const std::string kLocal = "127.0.0.1";
class WorkerCoordinationInThread {
struct Worker {
Worker(Endpoint master_endpoint, int worker_id)
: master_endpoint(master_endpoint),
coord(&server, master_endpoint, worker_id),
: coord({kLocal, 0}, worker_id, master_endpoint),
discovery(&coord),
worker_id(worker_id) {}
Endpoint master_endpoint;
Server server{{kLocal, 0}};
WorkerCoordination coord;
ClientPool client_pool{master_endpoint};
ClusterDiscoveryWorker discovery{server, coord, client_pool};
ClusterDiscoveryWorker discovery;
std::atomic<int> worker_id;
};
@ -53,17 +46,14 @@ class WorkerCoordinationInThread {
// shutdown by the master. We only wait for the shutdown to be
// finished.
EXPECT_TRUE(worker->coord.AwaitShutdown());
// Shutdown the RPC server.
worker->server.Shutdown();
worker->server.AwaitShutdown();
worker = std::experimental::nullopt;
});
while (!init_done) std::this_thread::sleep_for(10ms);
}
int worker_id() const { return worker->worker_id; }
auto endpoint() const { return worker->server.endpoint(); }
int worker_id() { return worker->worker_id; }
auto endpoint() { return worker->coord.GetServerEndpoint(); }
auto worker_endpoint(int worker_id) {
return worker->coord.GetEndpoint(worker_id);
}
@ -95,17 +85,16 @@ class Distributed : public ::testing::Test {
};
TEST_F(Distributed, Coordination) {
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
MasterCoordination master_coord(master_server.endpoint());
MasterCoordination master_coord({kLocal, 0});
master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
ClusterDiscoveryMaster master_discovery_(&master_server, &master_coord,
tmp_dir("master"));
ClusterDiscoveryMaster master_discovery_(&master_coord, tmp_dir("master"));
for (int i = 1; i <= kWorkerCount; ++i)
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), tmp_dir(fmt::format("worker{}", i)), i));
master_coord.GetServerEndpoint(), tmp_dir(fmt::format("worker{}", i)),
i));
// Expect that all workers have a different ID.
std::unordered_set<int> worker_ids;
@ -123,49 +112,41 @@ TEST_F(Distributed, Coordination) {
master_coord.Shutdown();
EXPECT_TRUE(master_coord.AwaitShutdown());
for (auto &worker : workers) worker->join();
master_server.Shutdown();
master_server.AwaitShutdown();
}
TEST_F(Distributed, DesiredAndUniqueId) {
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
MasterCoordination master_coord(master_server.endpoint());
MasterCoordination master_coord({kLocal, 0});
master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
ClusterDiscoveryMaster master_discovery_(&master_server, &master_coord,
tmp_dir("master"));
ClusterDiscoveryMaster master_discovery_(&master_coord, tmp_dir("master"));
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), tmp_dir("worker42"), 42));
master_coord.GetServerEndpoint(), tmp_dir("worker42"), 42));
EXPECT_EQ(workers[0]->worker_id(), 42);
EXPECT_DEATH(
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), tmp_dir("worker42"), 42)),
master_coord.GetServerEndpoint(), tmp_dir("worker42"), 42)),
"");
// Coordinated shutdown.
master_coord.Shutdown();
EXPECT_TRUE(master_coord.AwaitShutdown());
for (auto &worker : workers) worker->join();
master_server.Shutdown();
master_server.AwaitShutdown();
}
TEST_F(Distributed, CoordinationWorkersId) {
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
MasterCoordination master_coord(master_server.endpoint());
MasterCoordination master_coord({kLocal, 0});
master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
ClusterDiscoveryMaster master_discovery_(&master_server, &master_coord,
tmp_dir("master"));
ClusterDiscoveryMaster master_discovery_(&master_coord, tmp_dir("master"));
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), tmp_dir("worker42"), 42));
master_coord.GetServerEndpoint(), tmp_dir("worker42"), 42));
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), tmp_dir("worker43"), 43));
master_coord.GetServerEndpoint(), tmp_dir("worker43"), 43));
std::vector<int> ids;
ids.push_back(0);
@ -178,25 +159,22 @@ TEST_F(Distributed, CoordinationWorkersId) {
master_coord.Shutdown();
EXPECT_TRUE(master_coord.AwaitShutdown());
for (auto &worker : workers) worker->join();
master_server.Shutdown();
master_server.AwaitShutdown();
}
TEST_F(Distributed, ClusterDiscovery) {
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
MasterCoordination master_coord(master_server.endpoint());
MasterCoordination master_coord({kLocal, 0});
master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
ClusterDiscoveryMaster master_discovery_(&master_server, &master_coord,
tmp_dir("master"));
ClusterDiscoveryMaster master_discovery_(&master_coord, tmp_dir("master"));
std::vector<int> ids;
int worker_count = 10;
ids.push_back(0);
for (int i = 1; i <= worker_count; ++i) {
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), tmp_dir(fmt::format("worker", i)), i));
master_coord.GetServerEndpoint(), tmp_dir(fmt::format("worker", i)),
i));
ids.push_back(i);
}
@ -211,22 +189,19 @@ TEST_F(Distributed, ClusterDiscovery) {
master_coord.Shutdown();
EXPECT_TRUE(master_coord.AwaitShutdown());
for (auto &worker : workers) worker->join();
master_server.Shutdown();
master_server.AwaitShutdown();
}
TEST_F(Distributed, KeepsTrackOfRecovered) {
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
MasterCoordination master_coord(master_server.endpoint());
MasterCoordination master_coord({kLocal, 0});
master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
ClusterDiscoveryMaster master_discovery_(&master_server, &master_coord,
tmp_dir("master"));
ClusterDiscoveryMaster master_discovery_(&master_coord, tmp_dir("master"));
int worker_count = 10;
for (int i = 1; i <= worker_count; ++i) {
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), tmp_dir(fmt::format("worker{}", i)), i));
master_coord.GetServerEndpoint(), tmp_dir(fmt::format("worker{}", i)),
i));
workers.back()->NotifyWorkerRecovered();
EXPECT_THAT(master_coord.CountRecoveredWorkers(), i);
}
@ -235,8 +210,6 @@ TEST_F(Distributed, KeepsTrackOfRecovered) {
master_coord.Shutdown();
EXPECT_TRUE(master_coord.AwaitShutdown());
for (auto &worker : workers) worker->join();
master_server.Shutdown();
master_server.AwaitShutdown();
}
int main(int argc, char **argv) {

View File

@ -0,0 +1,31 @@
#pragma once
#include "distributed/coordination.hpp"
const std::string kLocal = "127.0.0.1";
// TODO (mferencevic): These test classes should be replaced with the real
// coordination once `ClusterDiscoveryXYZ` is merged with `CoordinationXYZ`.
class TestMasterCoordination : public distributed::Coordination {
public:
TestMasterCoordination()
: distributed::Coordination({kLocal, 0}, 0, {kLocal, 0}) {}
void Stop() {
server_.Shutdown();
server_.AwaitShutdown();
}
};
class TestWorkerCoordination : public distributed::Coordination {
public:
TestWorkerCoordination(const io::network::Endpoint &master_endpoint,
int worker_id)
: distributed::Coordination({kLocal, 0}, worker_id, master_endpoint) {}
void Stop() {
server_.Shutdown();
server_.AwaitShutdown();
}
};
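
A short usage sketch (not part of this commit) for the helpers above: a test constructs the coordination, reaches its embedded RPC server through a client pool, and tears everything down with Stop(), which replaces the explicit Server::Shutdown()/AwaitShutdown() pairs removed throughout these tests. The extra include and the function name are assumptions.

#include "communication/rpc/client_pool.hpp"

#include "test_coordination.hpp"

void ExampleCoordinationUsage() {
  TestMasterCoordination coordination;
  // Handlers registered on the coordination are reachable via its endpoint.
  communication::rpc::ClientPool client_pool(coordination.GetServerEndpoint());
  // ... exercise RPC-backed components (counters, id mappers, ...) here ...
  coordination.Stop();  // shuts down and awaits the embedded server
}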

View File

@ -5,41 +5,29 @@
#include "gtest/gtest.h"
#include "communication/rpc/server.hpp"
#include "distributed/cluster_discovery_master.hpp"
#include "distributed/coordination_master.hpp"
#include "io/network/endpoint.hpp"
#include "transactions/distributed/engine_master.hpp"
#include "transactions/distributed/engine_worker.hpp"
#include "test_coordination.hpp"
using namespace tx;
using namespace communication::rpc;
using namespace distributed;
class WorkerEngineTest : public testing::Test {
protected:
const std::string local{"127.0.0.1"};
void TearDown() override {
// First we shutdown the master.
master_coordination_.Shutdown();
EXPECT_TRUE(master_coordination_.AwaitShutdown());
// Shutdown the RPC servers.
master_server_.Shutdown();
master_server_.AwaitShutdown();
worker_server_.Shutdown();
worker_server_.AwaitShutdown();
std::thread thread([this] { worker_coordination_.Stop(); });
master_coordination_.Stop();
if (thread.joinable()) thread.join();
}
Server master_server_{{local, 0}};
Server worker_server_{{local, 0}};
TestMasterCoordination master_coordination_;
EngineMaster master_{&master_coordination_};
MasterCoordination master_coordination_{master_server_.endpoint()};
EngineMaster master_{&master_server_, &master_coordination_};
ClientPool master_client_pool{master_server_.endpoint()};
EngineWorker worker_{&worker_server_, &master_client_pool};
TestWorkerCoordination worker_coordination_{
master_coordination_.GetServerEndpoint(), 1};
EngineWorker worker_{&worker_coordination_};
};
TEST_F(WorkerEngineTest, BeginOnWorker) {