From baae40fcc6740878bf795a53feb065e68a15e090 Mon Sep 17 00:00:00 2001 From: Matej Ferencevic Date: Tue, 16 Oct 2018 09:12:19 +0200 Subject: [PATCH] Move RPC server to Coordination Reviewers: teon.banek, msantl Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1658 --- .../distributed/distributed_counters.cpp | 9 +-- .../distributed/distributed_counters.hpp | 9 +-- .../distributed/distributed_graph_db.cpp | 74 +++++++----------- src/distributed/bfs_rpc_server.hpp | 32 ++++---- src/distributed/cluster_discovery_master.cpp | 28 +++---- src/distributed/cluster_discovery_master.hpp | 4 +- src/distributed/cluster_discovery_worker.cpp | 20 ++--- src/distributed/cluster_discovery_worker.hpp | 10 +-- src/distributed/coordination.cpp | 29 +++++-- src/distributed/coordination.hpp | 27 ++++++- src/distributed/coordination_master.cpp | 8 +- src/distributed/coordination_master.hpp | 6 ++ src/distributed/coordination_worker.cpp | 22 ++++-- src/distributed/coordination_worker.hpp | 16 ++-- src/distributed/data_rpc_server.cpp | 12 +-- src/distributed/data_rpc_server.hpp | 5 +- src/distributed/durability_rpc_worker.cpp | 10 +-- src/distributed/durability_rpc_worker.hpp | 5 +- src/distributed/dynamic_worker.cpp | 10 +-- src/distributed/dynamic_worker.hpp | 15 ++-- src/distributed/index_rpc_server.cpp | 15 ++-- src/distributed/index_rpc_server.hpp | 9 +-- src/distributed/plan_consumer.cpp | 7 +- src/distributed/plan_consumer.hpp | 5 +- src/distributed/produce_rpc_server.cpp | 9 +-- src/distributed/produce_rpc_server.hpp | 5 +- src/distributed/token_sharing_rpc_server.hpp | 15 +--- src/distributed/updates_rpc_server.cpp | 24 +++--- src/distributed/updates_rpc_server.hpp | 4 +- .../concurrent_id_mapper_master.cpp | 46 +++++------ .../concurrent_id_mapper_master.hpp | 7 +- .../concurrent_id_mapper_worker.cpp | 6 +- .../concurrent_id_mapper_worker.hpp | 4 +- src/storage/distributed/storage_gc_master.hpp | 13 +--- .../distributed/engine_distributed.hpp | 1 - .../distributed/engine_master.cpp | 29 ++++--- .../distributed/engine_master.hpp | 5 +- .../distributed/engine_worker.cpp | 9 ++- .../distributed/engine_worker.hpp | 8 +- .../unit/concurrent_id_mapper_distributed.cpp | 13 ++-- tests/unit/counters.cpp | 12 +-- tests/unit/distributed_coordination.cpp | 77 ++++++------------- tests/unit/test_coordination.hpp | 31 ++++++++ tests/unit/transaction_engine_distributed.cpp | 32 +++----- 44 files changed, 361 insertions(+), 376 deletions(-) create mode 100644 tests/unit/test_coordination.hpp diff --git a/src/database/distributed/distributed_counters.cpp b/src/database/distributed/distributed_counters.cpp index 338c8533d..70eb5df11 100644 --- a/src/database/distributed/distributed_counters.cpp +++ b/src/database/distributed/distributed_counters.cpp @@ -1,19 +1,16 @@ #include "database/distributed/distributed_counters.hpp" -#include "communication/rpc/client_pool.hpp" -#include "communication/rpc/server.hpp" #include "database/distributed/counters_rpc_messages.hpp" namespace database { -MasterCounters::MasterCounters(communication::rpc::Server *server) - : rpc_server_(server) { - rpc_server_->Register( +MasterCounters::MasterCounters(distributed::Coordination *coordination) { + coordination->Register( [this](const auto &req_reader, auto *res_builder) { CountersGetRes res(Get(req_reader.getName())); Save(res, res_builder); }); - rpc_server_->Register( + coordination->Register( [this](const auto &req_reader, auto *res_builder) { Set(req_reader.getName(), req_reader.getValue()); return std::make_unique(); diff --git a/src/database/distributed/distributed_counters.hpp b/src/database/distributed/distributed_counters.hpp index 7ac12a6be..3c926c598 100644 --- a/src/database/distributed/distributed_counters.hpp +++ b/src/database/distributed/distributed_counters.hpp @@ -7,24 +7,19 @@ #include "data_structures/concurrent/concurrent_map.hpp" #include "database/distributed/counters.hpp" - -namespace communication::rpc { -class Server; -class ClientPool; -} // namespace communication::rpc +#include "distributed/coordination.hpp" namespace database { /// Implementation for distributed master class MasterCounters : public Counters { public: - explicit MasterCounters(communication::rpc::Server *server); + explicit MasterCounters(distributed::Coordination *coordination); int64_t Get(const std::string &name) override; void Set(const std::string &name, int64_t value) override; private: - communication::rpc::Server *rpc_server_; ConcurrentMap> counters_; }; diff --git a/src/database/distributed/distributed_graph_db.cpp b/src/database/distributed/distributed_graph_db.cpp index b33ba2d2a..5b12bf33b 100644 --- a/src/database/distributed/distributed_graph_db.cpp +++ b/src/database/distributed/distributed_graph_db.cpp @@ -580,7 +580,7 @@ namespace impl { template