From 78afaa07a39671187169808d6a9e7828402f673e Mon Sep 17 00:00:00 2001 From: Matija Santl Date: Thu, 25 Jan 2018 14:28:14 +0100 Subject: [PATCH] Use RPC `ClientPool` instead of `Client` Summary: Use RPC `ClientPool` instead of `Client` Reviewers: florijan, teon.banek Reviewed By: florijan Subscribers: pullbot, mtomic Differential Revision: https://phabricator.memgraph.io/D1153 --- src/database/counters.cpp | 6 +++--- src/database/counters.hpp | 4 ++-- src/database/graph_db_accessor.cpp | 5 +++-- src/distributed/coordination_master.cpp | 4 ++-- src/distributed/coordination_worker.cpp | 8 ++++---- src/distributed/coordination_worker.hpp | 4 ++-- src/distributed/plan_dispatcher.cpp | 5 +++-- src/distributed/remote_data_rpc_clients.hpp | 7 ++----- src/distributed/remote_pull_rpc_clients.hpp | 12 ++++++------ src/distributed/rpc_worker_clients.hpp | 18 +++++++++--------- src/storage/concurrent_id_mapper_worker.cpp | 6 +++--- src/storage/concurrent_id_mapper_worker.hpp | 4 ++-- src/transactions/engine_worker.cpp | 13 +++++++------ src/transactions/engine_worker.hpp | 4 ++-- 14 files changed, 50 insertions(+), 50 deletions(-) diff --git a/src/database/counters.cpp b/src/database/counters.cpp index bd5112fc3..06c6f57f3 100644 --- a/src/database/counters.cpp +++ b/src/database/counters.cpp @@ -44,17 +44,17 @@ MasterCounters::MasterCounters(communication::rpc::System &system) } WorkerCounters::WorkerCounters(const io::network::Endpoint &master_endpoint) - : rpc_client_(master_endpoint, kCountersRpc) {} + : rpc_client_pool_(master_endpoint, kCountersRpc) {} int64_t WorkerCounters::Get(const std::string &name) { - auto response = rpc_client_.Call(name); + auto response = rpc_client_pool_.Call(name); CHECK(response) << "CountersGetRpc - failed to get response from master"; return response->member; } void WorkerCounters::Set(const std::string &name, int64_t value) { auto response = - rpc_client_.Call(CountersSetReqData{name, value}); + rpc_client_pool_.Call(CountersSetReqData{name, value}); CHECK(response) << "CountersSetRpc - failed to get response from master"; } diff --git a/src/database/counters.hpp b/src/database/counters.hpp index 1c51e8fbc..2c6d29ac8 100644 --- a/src/database/counters.hpp +++ b/src/database/counters.hpp @@ -4,8 +4,8 @@ #include #include +#include "communication/rpc/client_pool.hpp" #include "communication/rpc/messages.hpp" -#include "communication/rpc/client.hpp" #include "communication/rpc/server.hpp" #include "data_structures/concurrent/concurrent_map.hpp" @@ -60,7 +60,7 @@ class WorkerCounters : public Counters { void Set(const std::string &name, int64_t value) override; private: - communication::rpc::Client rpc_client_; + communication::rpc::ClientPool rpc_client_pool_; }; } // namespace database diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index d4270ffb0..a5aa48ba8 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -172,8 +172,9 @@ void GraphDbAccessor::BuildIndex(storage::Label label, index_rpc_completions.emplace( db_.index_rpc_clients().ExecuteOnWorkers( this->db_.WorkerId(), - [label, property, this](communication::rpc::Client &client) { - return client.Call( + [label, property, + this](communication::rpc::ClientPool &client_pool) { + return client_pool.Call( distributed::IndexLabelPropertyTx{ label, property, transaction_id()}) != nullptr; })); diff --git a/src/distributed/coordination_master.cpp b/src/distributed/coordination_master.cpp index e4ac594ee..d74dac068 100644 --- a/src/distributed/coordination_master.cpp +++ b/src/distributed/coordination_master.cpp @@ -52,8 +52,8 @@ MasterCoordination::~MasterCoordination() { Endpoint MasterCoordination::GetEndpoint(int worker_id) { std::lock_guard guard(lock_); auto found = workers_.find(worker_id); - CHECK(found != workers_.end()) - << "No endpoint registered for worker id: " << worker_id; + CHECK(found != workers_.end()) << "No endpoint registered for worker id: " + << worker_id; return found->second; } diff --git a/src/distributed/coordination_worker.cpp b/src/distributed/coordination_worker.cpp index de8d1cfae..d9a66c077 100644 --- a/src/distributed/coordination_worker.cpp +++ b/src/distributed/coordination_worker.cpp @@ -12,12 +12,12 @@ using namespace std::literals::chrono_literals; WorkerCoordination::WorkerCoordination(communication::rpc::System &system, const Endpoint &master_endpoint) : system_(system), - client_(master_endpoint, kCoordinationServerName), + client_pool_(master_endpoint, kCoordinationServerName), server_(system_, kCoordinationServerName) {} int WorkerCoordination::RegisterWorker(int desired_worker_id) { - auto result = - client_.Call(desired_worker_id, system_.endpoint()); + auto result = client_pool_.Call(desired_worker_id, + system_.endpoint()); CHECK(result) << "Failed to RegisterWorker with the master"; return result->member; } @@ -26,7 +26,7 @@ Endpoint WorkerCoordination::GetEndpoint(int worker_id) { auto accessor = endpoint_cache_.access(); auto found = accessor.find(worker_id); if (found != accessor.end()) return found->second; - auto result = client_.Call(worker_id); + auto result = client_pool_.Call(worker_id); CHECK(result) << "Failed to GetEndpoint from the master"; accessor.insert(worker_id, result->member); return result->member; diff --git a/src/distributed/coordination_worker.hpp b/src/distributed/coordination_worker.hpp index efef8270d..c6fc832e6 100644 --- a/src/distributed/coordination_worker.hpp +++ b/src/distributed/coordination_worker.hpp @@ -1,6 +1,6 @@ #pragma once -#include "communication/rpc/client.hpp" +#include "communication/rpc/client_pool.hpp" #include "communication/rpc/server.hpp" #include "data_structures/concurrent/concurrent_map.hpp" #include "distributed/coordination.hpp" @@ -38,7 +38,7 @@ class WorkerCoordination : public Coordination { private: communication::rpc::System &system_; - communication::rpc::Client client_; + communication::rpc::ClientPool client_pool_; communication::rpc::Server server_; mutable ConcurrentMap endpoint_cache_; }; diff --git a/src/distributed/plan_dispatcher.cpp b/src/distributed/plan_dispatcher.cpp index 7dc92f1ff..3eaa36655 100644 --- a/src/distributed/plan_dispatcher.cpp +++ b/src/distributed/plan_dispatcher.cpp @@ -9,9 +9,10 @@ void PlanDispatcher::DispatchPlan( int64_t plan_id, std::shared_ptr plan, const SymbolTable &symbol_table) { auto futures = clients_.ExecuteOnWorkers( - 0, [plan_id, plan, symbol_table](communication::rpc::Client &client) { + 0, [plan_id, plan, + symbol_table](communication::rpc::ClientPool &client_pool) { auto result = - client.Call(plan_id, plan, symbol_table); + client_pool.Call(plan_id, plan, symbol_table); CHECK(result) << "Failed to dispatch plan to worker"; }); diff --git a/src/distributed/remote_data_rpc_clients.hpp b/src/distributed/remote_data_rpc_clients.hpp index 98bead2d0..4701d3099 100644 --- a/src/distributed/remote_data_rpc_clients.hpp +++ b/src/distributed/remote_data_rpc_clients.hpp @@ -3,7 +3,6 @@ #include #include -#include "communication/rpc/client.hpp" #include "distributed/coordination.hpp" #include "distributed/remote_data_rpc_messages.hpp" #include "distributed/rpc_worker_clients.hpp" @@ -14,8 +13,6 @@ namespace distributed { /** Provides access to other worker's data. */ class RemoteDataRpcClients { - using Client = communication::rpc::Client; - public: RemoteDataRpcClients(Coordination &coordination) : clients_(coordination, kRemoteDataRpcName) {} @@ -26,7 +23,7 @@ class RemoteDataRpcClients { std::unique_ptr RemoteVertex(int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) { - auto response = clients_.GetClient(worker_id).Call( + auto response = clients_.GetClientPool(worker_id).Call( TxGidPair{tx_id, gid}); return std::move(response->name_output_); } @@ -36,7 +33,7 @@ class RemoteDataRpcClients { /// transaction. std::unique_ptr RemoteEdge(int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) { - auto response = clients_.GetClient(worker_id).Call( + auto response = clients_.GetClientPool(worker_id).Call( TxGidPair{tx_id, gid}); return std::move(response->name_output_); } diff --git a/src/distributed/remote_pull_rpc_clients.hpp b/src/distributed/remote_pull_rpc_clients.hpp index 1ef29ecfe..7168eb704 100644 --- a/src/distributed/remote_pull_rpc_clients.hpp +++ b/src/distributed/remote_pull_rpc_clients.hpp @@ -16,7 +16,7 @@ namespace distributed { * batches and are therefore accompanied with an enum indicator of the state of * remote execution. */ class RemotePullRpcClients { - using Client = communication::rpc::Client; + using ClientPool = communication::rpc::ClientPool; public: RemotePullRpcClients(Coordination &coordination) @@ -30,9 +30,9 @@ class RemotePullRpcClients { const Parameters ¶ms, const std::vector &symbols, int batch_size = kDefaultBatchSize) { return clients_.ExecuteOnWorker( - worker_id, - [tx_id, plan_id, ¶ms, &symbols, batch_size](Client &client) { - return client + worker_id, [tx_id, plan_id, ¶ms, &symbols, + batch_size](ClientPool &client_pool) { + return client_pool .Call(RemotePullReqData{tx_id, plan_id, params, symbols, batch_size}) ->member; @@ -50,8 +50,8 @@ class RemotePullRpcClients { std::future EndRemotePull(int worker_id, tx::transaction_id_t tx_id, int64_t plan_id) { return clients_.ExecuteOnWorker( - worker_id, [tx_id, plan_id](Client &client) { - return client.Call( + worker_id, [tx_id, plan_id](ClientPool &client_pool) { + return client_pool.Call( EndRemotePullReqData{tx_id, plan_id}); }); } diff --git a/src/distributed/rpc_worker_clients.hpp b/src/distributed/rpc_worker_clients.hpp index f54b5c062..6ef613db0 100644 --- a/src/distributed/rpc_worker_clients.hpp +++ b/src/distributed/rpc_worker_clients.hpp @@ -5,7 +5,7 @@ #include #include -#include "communication/rpc/client.hpp" +#include "communication/rpc/client_pool.hpp" #include "distributed/coordination.hpp" namespace distributed { @@ -23,11 +23,11 @@ class RpcWorkerClients { RpcWorkerClients &operator=(const RpcWorkerClients &) = delete; RpcWorkerClients &operator=(RpcWorkerClients &&) = delete; - auto &GetClient(int worker_id) { + auto &GetClientPool(int worker_id) { std::lock_guard guard{lock_}; - auto found = clients_.find(worker_id); - if (found != clients_.end()) return found->second; - return clients_ + auto found = client_pools_.find(worker_id); + if (found != client_pools_.end()) return found->second; + return client_pools_ .emplace(std::piecewise_construct, std::forward_as_tuple(worker_id), std::forward_as_tuple(coordination_.GetEndpoint(worker_id), rpc_client_name_)) @@ -42,8 +42,8 @@ class RpcWorkerClients { template auto ExecuteOnWorker( int worker_id, - std::function execute) { - auto &client = GetClient(worker_id); + std::function execute) { + auto &client = GetClientPool(worker_id); return std::async(std::launch::async, [execute, &client]() { return execute(client); }); } @@ -54,7 +54,7 @@ class RpcWorkerClients { template auto ExecuteOnWorkers( int skip_worker_id, - std::function execute) { + std::function execute) { std::vector> futures; for (auto &worker_id : coordination_.GetWorkerIds()) { if (worker_id == skip_worker_id) continue; @@ -67,7 +67,7 @@ class RpcWorkerClients { // TODO make Coordination const, it's member GetEndpoint must be const too. Coordination &coordination_; const std::string rpc_client_name_; - std::unordered_map clients_; + std::unordered_map client_pools_; std::mutex lock_; }; diff --git a/src/storage/concurrent_id_mapper_worker.cpp b/src/storage/concurrent_id_mapper_worker.cpp index ced7c60ff..ce111e263 100644 --- a/src/storage/concurrent_id_mapper_worker.cpp +++ b/src/storage/concurrent_id_mapper_worker.cpp @@ -10,14 +10,14 @@ namespace storage { template <> \ type WorkerConcurrentIdMapper::RpcValueToId( \ const std::string &value) { \ - auto response = rpc_client_.Call(value); \ + auto response = rpc_client_pool_.Call(value); \ CHECK(response) << ("Failed to obtain " #type " from master"); \ return response->member; \ } \ \ template <> \ std::string WorkerConcurrentIdMapper::RpcIdToValue(type id) { \ - auto response = rpc_client_.Call(id); \ + auto response = rpc_client_pool_.Call(id); \ CHECK(response) << ("Failed to obtain " #type " value from master"); \ return response->member; \ } @@ -32,7 +32,7 @@ ID_VALUE_RPC_CALLS(Property) template WorkerConcurrentIdMapper::WorkerConcurrentIdMapper( const io::network::Endpoint &master_endpoint) - : rpc_client_(master_endpoint, impl::RpcServerNameFromType()) {} + : rpc_client_pool_(master_endpoint, impl::RpcServerNameFromType()) {} template TId WorkerConcurrentIdMapper::value_to_id(const std::string &value) { diff --git a/src/storage/concurrent_id_mapper_worker.hpp b/src/storage/concurrent_id_mapper_worker.hpp index c8005b7cf..89da2ee41 100644 --- a/src/storage/concurrent_id_mapper_worker.hpp +++ b/src/storage/concurrent_id_mapper_worker.hpp @@ -1,6 +1,6 @@ #pragma once -#include "communication/rpc/client.hpp" +#include "communication/rpc/client_pool.hpp" #include "data_structures/concurrent/concurrent_map.hpp" #include "io/network/endpoint.hpp" #include "storage/concurrent_id_mapper.hpp" @@ -29,6 +29,6 @@ class WorkerConcurrentIdMapper : public ConcurrentIdMapper { ConcurrentMap id_to_value_cache_; // Communication to the concurrent ID master. - mutable communication::rpc::Client rpc_client_; + mutable communication::rpc::ClientPool rpc_client_pool_; }; } // namespace storage diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp index 8cd282033..22482e973 100644 --- a/src/transactions/engine_worker.cpp +++ b/src/transactions/engine_worker.cpp @@ -7,7 +7,7 @@ namespace tx { WorkerEngine::WorkerEngine(const io::network::Endpoint &endpoint) - : rpc_client_(endpoint, kTransactionEngineRpc) {} + : rpc_client_pool_(endpoint, kTransactionEngineRpc) {} CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { auto info = clog_.fetch_info(tid); @@ -16,7 +16,7 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { if (!(info.is_aborted() || info.is_committed())) { // @review: this version of Call is just used because Info has no // default constructor. - info = rpc_client_.Call(tid)->member; + info = rpc_client_pool_.Call(tid)->member; DCHECK(info.is_committed() || info.is_aborted()) << "It is expected that the transaction is not running anymore. This " "function should be used only after the snapshot of the current " @@ -29,15 +29,15 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { } Snapshot WorkerEngine::GlobalGcSnapshot() { - return std::move(rpc_client_.Call()->member); + return std::move(rpc_client_pool_.Call()->member); } Snapshot WorkerEngine::GlobalActiveTransactions() { - return std::move(rpc_client_.Call()->member); + return std::move(rpc_client_pool_.Call()->member); } bool WorkerEngine::GlobalIsActive(transaction_id_t tid) const { - return rpc_client_.Call(tid)->member; + return rpc_client_pool_.Call(tid)->member; } tx::transaction_id_t WorkerEngine::LocalLast() const { return local_last_; } @@ -52,7 +52,8 @@ tx::Transaction *WorkerEngine::RunningTransaction(tx::transaction_id_t tx_id) { auto found = accessor.find(tx_id); if (found != accessor.end()) return found->second; - Snapshot snapshot(std::move(rpc_client_.Call(tx_id)->member)); + Snapshot snapshot( + std::move(rpc_client_pool_.Call(tx_id)->member)); auto insertion = accessor.insert(tx_id, new Transaction(tx_id, snapshot, *this)); utils::EnsureAtomicGe(local_last_, tx_id); diff --git a/src/transactions/engine_worker.hpp b/src/transactions/engine_worker.hpp index d78da33bc..ecdf67eb0 100644 --- a/src/transactions/engine_worker.hpp +++ b/src/transactions/engine_worker.hpp @@ -3,7 +3,7 @@ #include #include -#include "communication/rpc/client.hpp" +#include "communication/rpc/client_pool.hpp" #include "data_structures/concurrent/concurrent_map.hpp" #include "io/network/endpoint.hpp" #include "transactions/commit_log.hpp" @@ -35,7 +35,7 @@ class WorkerEngine : public Engine { mutable CommitLog clog_; // Communication to the transactional master. - mutable communication::rpc::Client rpc_client_; + mutable communication::rpc::ClientPool rpc_client_pool_; }; } // namespace tx