Extract RpcWorkerClients

Summary: Extracting the RPC client-per-worker functionality we'll commonly need.

Reviewers: teon.banek, msantl, dgleich

Reviewed By: dgleich

Differential Revision: https://phabricator.memgraph.io/D1127
This commit is contained in:
florijan 2018-01-22 15:45:32 +01:00
parent 1d6ac3d23d
commit 912d178391
2 changed files with 51 additions and 24 deletions

View File

@ -4,11 +4,9 @@
#include <utility>
#include "communication/messaging/distributed.hpp"
#include "communication/rpc/rpc.hpp"
#include "database/state_delta.hpp"
#include "distributed/coordination.hpp"
#include "distributed/remote_data_rpc_messages.hpp"
#include "distributed/remote_data_rpc_messages.hpp"
#include "distributed/rpc_worker_clients.hpp"
#include "storage/gid.hpp"
#include "transactions/type.hpp"
@ -21,7 +19,7 @@ class RemoteDataRpcClients {
public:
RemoteDataRpcClients(communication::messaging::System &system,
Coordination &coordination)
: system_(system), coordination_(coordination) {}
: clients_(system, coordination, kRemoteDataRpcName) {}
/// Returns a remote worker's data for the given params. That worker must own
/// the vertex for the given id, and that vertex must be visible in given
@ -29,7 +27,7 @@ class RemoteDataRpcClients {
std::unique_ptr<Vertex> RemoteVertex(int worker_id,
tx::transaction_id_t tx_id,
gid::Gid gid) {
auto response = RemoteDataClient(worker_id).Call<RemoteVertexRpc>(
auto response = clients_.GetClient(worker_id).Call<RemoteVertexRpc>(
kRemoteDataRpcTimeout, TxGidPair{tx_id, gid});
return std::move(response->name_output_);
}
@ -39,7 +37,7 @@ class RemoteDataRpcClients {
/// transaction.
std::unique_ptr<Edge> RemoteEdge(int worker_id, tx::transaction_id_t tx_id,
gid::Gid gid) {
auto response = RemoteDataClient(worker_id).Call<RemoteEdgeRpc>(
auto response = clients_.GetClient(worker_id).Call<RemoteEdgeRpc>(
kRemoteDataRpcTimeout, TxGidPair{tx_id, gid});
return std::move(response->name_output_);
}
@ -50,24 +48,7 @@ class RemoteDataRpcClients {
gid::Gid gid);
private:
communication::messaging::System &system_;
// TODO make Coordination const, it's member GetEndpoint must be const too.
Coordination &coordination_;
std::unordered_map<int, Client> clients_;
std::mutex lock_;
Client &RemoteDataClient(int worker_id) {
std::lock_guard<std::mutex> guard{lock_};
auto found = clients_.find(worker_id);
if (found != clients_.end()) return found->second;
return clients_
.emplace(
std::piecewise_construct, std::forward_as_tuple(worker_id),
std::forward_as_tuple(system_, coordination_.GetEndpoint(worker_id),
kRemoteDataRpcName))
.first->second;
}
RpcWorkerClients clients_;
};
template <>

View File

@ -0,0 +1,46 @@
#pragma once
#include "communication/messaging/distributed.hpp"
#include "communication/rpc/rpc.hpp"
#include "distributed/coordination.hpp"
namespace distributed {
/** A cache of RPC clients (of the given name/kind) per MG distributed worker.
* Thread safe. */
class RpcWorkerClients {
public:
RpcWorkerClients(communication::messaging::System &system,
Coordination &coordination,
const std::string &rpc_client_name)
: system_(system),
coordination_(coordination),
rpc_client_name_(rpc_client_name) {}
RpcWorkerClients(const RpcWorkerClients &) = delete;
RpcWorkerClients(RpcWorkerClients &&) = delete;
RpcWorkerClients &operator=(const RpcWorkerClients &) = delete;
RpcWorkerClients &operator=(RpcWorkerClients &&) = delete;
auto &GetClient(int worker_id) {
std::lock_guard<std::mutex> guard{lock_};
auto found = clients_.find(worker_id);
if (found != clients_.end()) return found->second;
return clients_
.emplace(
std::piecewise_construct, std::forward_as_tuple(worker_id),
std::forward_as_tuple(system_, coordination_.GetEndpoint(worker_id),
rpc_client_name_))
.first->second;
}
private:
communication::messaging::System &system_;
// TODO make Coordination const, it's member GetEndpoint must be const too.
Coordination &coordination_;
const std::string rpc_client_name_;
std::unordered_map<int, communication::rpc::Client> clients_;
std::mutex lock_;
};
} // namespace distributed