Use RPC ClientPool instead of Client

Summary: Use RPC `ClientPool` instead of `Client`

Reviewers: florijan, teon.banek

Reviewed By: florijan

Subscribers: pullbot, mtomic

Differential Revision: https://phabricator.memgraph.io/D1153
This commit is contained in:
Matija Santl 2018-01-25 14:28:14 +01:00
parent e5a55a39e3
commit 78afaa07a3
14 changed files with 50 additions and 50 deletions

View File

@ -44,17 +44,17 @@ MasterCounters::MasterCounters(communication::rpc::System &system)
}
WorkerCounters::WorkerCounters(const io::network::Endpoint &master_endpoint)
: rpc_client_(master_endpoint, kCountersRpc) {}
: rpc_client_pool_(master_endpoint, kCountersRpc) {}
int64_t WorkerCounters::Get(const std::string &name) {
auto response = rpc_client_.Call<CountersGetRpc>(name);
auto response = rpc_client_pool_.Call<CountersGetRpc>(name);
CHECK(response) << "CountersGetRpc - failed to get response from master";
return response->member;
}
void WorkerCounters::Set(const std::string &name, int64_t value) {
auto response =
rpc_client_.Call<CountersSetRpc>(CountersSetReqData{name, value});
rpc_client_pool_.Call<CountersSetRpc>(CountersSetReqData{name, value});
CHECK(response) << "CountersSetRpc - failed to get response from master";
}

View File

@ -4,8 +4,8 @@
#include <cstdint>
#include <string>
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/messages.hpp"
#include "communication/rpc/client.hpp"
#include "communication/rpc/server.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
@ -60,7 +60,7 @@ class WorkerCounters : public Counters {
void Set(const std::string &name, int64_t value) override;
private:
communication::rpc::Client rpc_client_;
communication::rpc::ClientPool rpc_client_pool_;
};
} // namespace database

View File

@ -172,8 +172,9 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
index_rpc_completions.emplace(
db_.index_rpc_clients().ExecuteOnWorkers<bool>(
this->db_.WorkerId(),
[label, property, this](communication::rpc::Client &client) {
return client.Call<distributed::BuildIndexRpc>(
[label, property,
this](communication::rpc::ClientPool &client_pool) {
return client_pool.Call<distributed::BuildIndexRpc>(
distributed::IndexLabelPropertyTx{
label, property, transaction_id()}) != nullptr;
}));

View File

@ -52,8 +52,8 @@ MasterCoordination::~MasterCoordination() {
Endpoint MasterCoordination::GetEndpoint(int worker_id) {
std::lock_guard<std::mutex> guard(lock_);
auto found = workers_.find(worker_id);
CHECK(found != workers_.end())
<< "No endpoint registered for worker id: " << worker_id;
CHECK(found != workers_.end()) << "No endpoint registered for worker id: "
<< worker_id;
return found->second;
}

View File

@ -12,12 +12,12 @@ using namespace std::literals::chrono_literals;
WorkerCoordination::WorkerCoordination(communication::rpc::System &system,
const Endpoint &master_endpoint)
: system_(system),
client_(master_endpoint, kCoordinationServerName),
client_pool_(master_endpoint, kCoordinationServerName),
server_(system_, kCoordinationServerName) {}
int WorkerCoordination::RegisterWorker(int desired_worker_id) {
auto result =
client_.Call<RegisterWorkerRpc>(desired_worker_id, system_.endpoint());
auto result = client_pool_.Call<RegisterWorkerRpc>(desired_worker_id,
system_.endpoint());
CHECK(result) << "Failed to RegisterWorker with the master";
return result->member;
}
@ -26,7 +26,7 @@ Endpoint WorkerCoordination::GetEndpoint(int worker_id) {
auto accessor = endpoint_cache_.access();
auto found = accessor.find(worker_id);
if (found != accessor.end()) return found->second;
auto result = client_.Call<GetEndpointRpc>(worker_id);
auto result = client_pool_.Call<GetEndpointRpc>(worker_id);
CHECK(result) << "Failed to GetEndpoint from the master";
accessor.insert(worker_id, result->member);
return result->member;

View File

@ -1,6 +1,6 @@
#pragma once
#include "communication/rpc/client.hpp"
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/server.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "distributed/coordination.hpp"
@ -38,7 +38,7 @@ class WorkerCoordination : public Coordination {
private:
communication::rpc::System &system_;
communication::rpc::Client client_;
communication::rpc::ClientPool client_pool_;
communication::rpc::Server server_;
mutable ConcurrentMap<int, Endpoint> endpoint_cache_;
};

View File

@ -9,9 +9,10 @@ void PlanDispatcher::DispatchPlan(
int64_t plan_id, std::shared_ptr<query::plan::LogicalOperator> plan,
const SymbolTable &symbol_table) {
auto futures = clients_.ExecuteOnWorkers<void>(
0, [plan_id, plan, symbol_table](communication::rpc::Client &client) {
0, [plan_id, plan,
symbol_table](communication::rpc::ClientPool &client_pool) {
auto result =
client.Call<DistributedPlanRpc>(plan_id, plan, symbol_table);
client_pool.Call<DistributedPlanRpc>(plan_id, plan, symbol_table);
CHECK(result) << "Failed to dispatch plan to worker";
});

View File

@ -3,7 +3,6 @@
#include <mutex>
#include <utility>
#include "communication/rpc/client.hpp"
#include "distributed/coordination.hpp"
#include "distributed/remote_data_rpc_messages.hpp"
#include "distributed/rpc_worker_clients.hpp"
@ -14,8 +13,6 @@ namespace distributed {
/** Provides access to other worker's data. */
class RemoteDataRpcClients {
using Client = communication::rpc::Client;
public:
RemoteDataRpcClients(Coordination &coordination)
: clients_(coordination, kRemoteDataRpcName) {}
@ -26,7 +23,7 @@ class RemoteDataRpcClients {
std::unique_ptr<Vertex> RemoteVertex(int worker_id,
tx::transaction_id_t tx_id,
gid::Gid gid) {
auto response = clients_.GetClient(worker_id).Call<RemoteVertexRpc>(
auto response = clients_.GetClientPool(worker_id).Call<RemoteVertexRpc>(
TxGidPair{tx_id, gid});
return std::move(response->name_output_);
}
@ -36,7 +33,7 @@ class RemoteDataRpcClients {
/// transaction.
std::unique_ptr<Edge> RemoteEdge(int worker_id, tx::transaction_id_t tx_id,
gid::Gid gid) {
auto response = clients_.GetClient(worker_id).Call<RemoteEdgeRpc>(
auto response = clients_.GetClientPool(worker_id).Call<RemoteEdgeRpc>(
TxGidPair{tx_id, gid});
return std::move(response->name_output_);
}

View File

@ -16,7 +16,7 @@ namespace distributed {
* batches and are therefore accompanied with an enum indicator of the state of
* remote execution. */
class RemotePullRpcClients {
using Client = communication::rpc::Client;
using ClientPool = communication::rpc::ClientPool;
public:
RemotePullRpcClients(Coordination &coordination)
@ -30,9 +30,9 @@ class RemotePullRpcClients {
const Parameters &params, const std::vector<query::Symbol> &symbols,
int batch_size = kDefaultBatchSize) {
return clients_.ExecuteOnWorker<RemotePullResData>(
worker_id,
[tx_id, plan_id, &params, &symbols, batch_size](Client &client) {
return client
worker_id, [tx_id, plan_id, &params, &symbols,
batch_size](ClientPool &client_pool) {
return client_pool
.Call<RemotePullRpc>(RemotePullReqData{tx_id, plan_id, params,
symbols, batch_size})
->member;
@ -50,8 +50,8 @@ class RemotePullRpcClients {
std::future<void> EndRemotePull(int worker_id, tx::transaction_id_t tx_id,
int64_t plan_id) {
return clients_.ExecuteOnWorker<void>(
worker_id, [tx_id, plan_id](Client &client) {
return client.Call<EndRemotePullRpc>(
worker_id, [tx_id, plan_id](ClientPool &client_pool) {
return client_pool.Call<EndRemotePullRpc>(
EndRemotePullReqData{tx_id, plan_id});
});
}

View File

@ -5,7 +5,7 @@
#include <type_traits>
#include <unordered_map>
#include "communication/rpc/client.hpp"
#include "communication/rpc/client_pool.hpp"
#include "distributed/coordination.hpp"
namespace distributed {
@ -23,11 +23,11 @@ class RpcWorkerClients {
RpcWorkerClients &operator=(const RpcWorkerClients &) = delete;
RpcWorkerClients &operator=(RpcWorkerClients &&) = delete;
auto &GetClient(int worker_id) {
auto &GetClientPool(int worker_id) {
std::lock_guard<std::mutex> guard{lock_};
auto found = clients_.find(worker_id);
if (found != clients_.end()) return found->second;
return clients_
auto found = client_pools_.find(worker_id);
if (found != client_pools_.end()) return found->second;
return client_pools_
.emplace(std::piecewise_construct, std::forward_as_tuple(worker_id),
std::forward_as_tuple(coordination_.GetEndpoint(worker_id),
rpc_client_name_))
@ -42,8 +42,8 @@ class RpcWorkerClients {
template <typename TResult>
auto ExecuteOnWorker(
int worker_id,
std::function<TResult(communication::rpc::Client &)> execute) {
auto &client = GetClient(worker_id);
std::function<TResult(communication::rpc::ClientPool &)> execute) {
auto &client = GetClientPool(worker_id);
return std::async(std::launch::async,
[execute, &client]() { return execute(client); });
}
@ -54,7 +54,7 @@ class RpcWorkerClients {
template <typename TResult>
auto ExecuteOnWorkers(
int skip_worker_id,
std::function<TResult(communication::rpc::Client &)> execute) {
std::function<TResult(communication::rpc::ClientPool &)> execute) {
std::vector<std::future<TResult>> futures;
for (auto &worker_id : coordination_.GetWorkerIds()) {
if (worker_id == skip_worker_id) continue;
@ -67,7 +67,7 @@ class RpcWorkerClients {
// TODO make Coordination const, it's member GetEndpoint must be const too.
Coordination &coordination_;
const std::string rpc_client_name_;
std::unordered_map<int, communication::rpc::Client> clients_;
std::unordered_map<int, communication::rpc::ClientPool> client_pools_;
std::mutex lock_;
};

View File

@ -10,14 +10,14 @@ namespace storage {
template <> \
type WorkerConcurrentIdMapper<type>::RpcValueToId( \
const std::string &value) { \
auto response = rpc_client_.Call<type##IdRpc>(value); \
auto response = rpc_client_pool_.Call<type##IdRpc>(value); \
CHECK(response) << ("Failed to obtain " #type " from master"); \
return response->member; \
} \
\
template <> \
std::string WorkerConcurrentIdMapper<type>::RpcIdToValue(type id) { \
auto response = rpc_client_.Call<Id##type##Rpc>(id); \
auto response = rpc_client_pool_.Call<Id##type##Rpc>(id); \
CHECK(response) << ("Failed to obtain " #type " value from master"); \
return response->member; \
}
@ -32,7 +32,7 @@ ID_VALUE_RPC_CALLS(Property)
template <typename TId>
WorkerConcurrentIdMapper<TId>::WorkerConcurrentIdMapper(
const io::network::Endpoint &master_endpoint)
: rpc_client_(master_endpoint, impl::RpcServerNameFromType<TId>()) {}
: rpc_client_pool_(master_endpoint, impl::RpcServerNameFromType<TId>()) {}
template <typename TId>
TId WorkerConcurrentIdMapper<TId>::value_to_id(const std::string &value) {

View File

@ -1,6 +1,6 @@
#pragma once
#include "communication/rpc/client.hpp"
#include "communication/rpc/client_pool.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "io/network/endpoint.hpp"
#include "storage/concurrent_id_mapper.hpp"
@ -29,6 +29,6 @@ class WorkerConcurrentIdMapper : public ConcurrentIdMapper<TId> {
ConcurrentMap<TId, std::string> id_to_value_cache_;
// Communication to the concurrent ID master.
mutable communication::rpc::Client rpc_client_;
mutable communication::rpc::ClientPool rpc_client_pool_;
};
} // namespace storage

View File

@ -7,7 +7,7 @@
namespace tx {
WorkerEngine::WorkerEngine(const io::network::Endpoint &endpoint)
: rpc_client_(endpoint, kTransactionEngineRpc) {}
: rpc_client_pool_(endpoint, kTransactionEngineRpc) {}
CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
auto info = clog_.fetch_info(tid);
@ -16,7 +16,7 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
if (!(info.is_aborted() || info.is_committed())) {
// @review: this version of Call is just used because Info has no
// default constructor.
info = rpc_client_.Call<ClogInfoRpc>(tid)->member;
info = rpc_client_pool_.Call<ClogInfoRpc>(tid)->member;
DCHECK(info.is_committed() || info.is_aborted())
<< "It is expected that the transaction is not running anymore. This "
"function should be used only after the snapshot of the current "
@ -29,15 +29,15 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
}
Snapshot WorkerEngine::GlobalGcSnapshot() {
return std::move(rpc_client_.Call<GcSnapshotRpc>()->member);
return std::move(rpc_client_pool_.Call<GcSnapshotRpc>()->member);
}
Snapshot WorkerEngine::GlobalActiveTransactions() {
return std::move(rpc_client_.Call<ActiveTransactionsRpc>()->member);
return std::move(rpc_client_pool_.Call<ActiveTransactionsRpc>()->member);
}
bool WorkerEngine::GlobalIsActive(transaction_id_t tid) const {
return rpc_client_.Call<IsActiveRpc>(tid)->member;
return rpc_client_pool_.Call<IsActiveRpc>(tid)->member;
}
tx::transaction_id_t WorkerEngine::LocalLast() const { return local_last_; }
@ -52,7 +52,8 @@ tx::Transaction *WorkerEngine::RunningTransaction(tx::transaction_id_t tx_id) {
auto found = accessor.find(tx_id);
if (found != accessor.end()) return found->second;
Snapshot snapshot(std::move(rpc_client_.Call<SnapshotRpc>(tx_id)->member));
Snapshot snapshot(
std::move(rpc_client_pool_.Call<SnapshotRpc>(tx_id)->member));
auto insertion =
accessor.insert(tx_id, new Transaction(tx_id, snapshot, *this));
utils::EnsureAtomicGe(local_last_, tx_id);

View File

@ -3,7 +3,7 @@
#include <atomic>
#include <mutex>
#include "communication/rpc/client.hpp"
#include "communication/rpc/client_pool.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "io/network/endpoint.hpp"
#include "transactions/commit_log.hpp"
@ -35,7 +35,7 @@ class WorkerEngine : public Engine {
mutable CommitLog clog_;
// Communication to the transactional master.
mutable communication::rpc::Client rpc_client_;
mutable communication::rpc::ClientPool rpc_client_pool_;
};
} // namespace tx