From 912d1783917d649e810028075a0ef11a341ee212 Mon Sep 17 00:00:00 2001 From: florijan Date: Mon, 22 Jan 2018 15:45:32 +0100 Subject: [PATCH] Extract RpcWorkerClients Summary: Extracting the RPC client-per-worker functionality we'll commonly need. Reviewers: teon.banek, msantl, dgleich Reviewed By: dgleich Differential Revision: https://phabricator.memgraph.io/D1127 --- src/distributed/remote_data_rpc_clients.hpp | 29 +++---------- src/distributed/rpc_worker_clients.hpp | 46 +++++++++++++++++++++ 2 files changed, 51 insertions(+), 24 deletions(-) create mode 100644 src/distributed/rpc_worker_clients.hpp diff --git a/src/distributed/remote_data_rpc_clients.hpp b/src/distributed/remote_data_rpc_clients.hpp index 0f34114a6..a877aa5ce 100644 --- a/src/distributed/remote_data_rpc_clients.hpp +++ b/src/distributed/remote_data_rpc_clients.hpp @@ -4,11 +4,9 @@ #include #include "communication/messaging/distributed.hpp" -#include "communication/rpc/rpc.hpp" -#include "database/state_delta.hpp" #include "distributed/coordination.hpp" #include "distributed/remote_data_rpc_messages.hpp" -#include "distributed/remote_data_rpc_messages.hpp" +#include "distributed/rpc_worker_clients.hpp" #include "storage/gid.hpp" #include "transactions/type.hpp" @@ -21,7 +19,7 @@ class RemoteDataRpcClients { public: RemoteDataRpcClients(communication::messaging::System &system, Coordination &coordination) - : system_(system), coordination_(coordination) {} + : clients_(system, coordination, kRemoteDataRpcName) {} /// Returns a remote worker's data for the given params. That worker must own /// the vertex for the given id, and that vertex must be visible in given @@ -29,7 +27,7 @@ class RemoteDataRpcClients { std::unique_ptr RemoteVertex(int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) { - auto response = RemoteDataClient(worker_id).Call( + auto response = clients_.GetClient(worker_id).Call( kRemoteDataRpcTimeout, TxGidPair{tx_id, gid}); return std::move(response->name_output_); } @@ -39,7 +37,7 @@ class RemoteDataRpcClients { /// transaction. std::unique_ptr RemoteEdge(int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) { - auto response = RemoteDataClient(worker_id).Call( + auto response = clients_.GetClient(worker_id).Call( kRemoteDataRpcTimeout, TxGidPair{tx_id, gid}); return std::move(response->name_output_); } @@ -50,24 +48,7 @@ class RemoteDataRpcClients { gid::Gid gid); private: - communication::messaging::System &system_; - // TODO make Coordination const, it's member GetEndpoint must be const too. - Coordination &coordination_; - - std::unordered_map clients_; - std::mutex lock_; - - Client &RemoteDataClient(int worker_id) { - std::lock_guard guard{lock_}; - auto found = clients_.find(worker_id); - if (found != clients_.end()) return found->second; - return clients_ - .emplace( - std::piecewise_construct, std::forward_as_tuple(worker_id), - std::forward_as_tuple(system_, coordination_.GetEndpoint(worker_id), - kRemoteDataRpcName)) - .first->second; - } + RpcWorkerClients clients_; }; template <> diff --git a/src/distributed/rpc_worker_clients.hpp b/src/distributed/rpc_worker_clients.hpp new file mode 100644 index 000000000..c8aed68d9 --- /dev/null +++ b/src/distributed/rpc_worker_clients.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include "communication/messaging/distributed.hpp" +#include "communication/rpc/rpc.hpp" +#include "distributed/coordination.hpp" + +namespace distributed { + +/** A cache of RPC clients (of the given name/kind) per MG distributed worker. + * Thread safe. */ +class RpcWorkerClients { + public: + RpcWorkerClients(communication::messaging::System &system, + Coordination &coordination, + const std::string &rpc_client_name) + : system_(system), + coordination_(coordination), + rpc_client_name_(rpc_client_name) {} + + RpcWorkerClients(const RpcWorkerClients &) = delete; + RpcWorkerClients(RpcWorkerClients &&) = delete; + RpcWorkerClients &operator=(const RpcWorkerClients &) = delete; + RpcWorkerClients &operator=(RpcWorkerClients &&) = delete; + + auto &GetClient(int worker_id) { + std::lock_guard guard{lock_}; + auto found = clients_.find(worker_id); + if (found != clients_.end()) return found->second; + return clients_ + .emplace( + std::piecewise_construct, std::forward_as_tuple(worker_id), + std::forward_as_tuple(system_, coordination_.GetEndpoint(worker_id), + rpc_client_name_)) + .first->second; + } + + private: + communication::messaging::System &system_; + // TODO make Coordination const, it's member GetEndpoint must be const too. + Coordination &coordination_; + const std::string rpc_client_name_; + std::unordered_map clients_; + std::mutex lock_; +}; + +} // namespace distributed