Use the same ClientPools in distributed

Summary:
Instead of passing `coordination`, pass `rpc_worker_clients`, which
holds a map of worker_id -> ClientPool. By having only one instance of
`RpcWorkerClients`, owned by `GraphDb` and passed by reference, we
share the same client pools across all RPC clients.
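
In outline, the pattern looks like this (a minimal sketch with stand-in
types, not the actual sources; the real classes live under distributed/
and communication/rpc/):

// Rough sketch of the sharing pattern introduced by this commit.
#include <map>
#include <mutex>

struct Endpoint {};  // stand-in for io::network::Endpoint

class ClientPool {  // stand-in for communication::rpc::ClientPool
 public:
  explicit ClientPool(const Endpoint &endpoint) : endpoint_(endpoint) {}

 private:
  Endpoint endpoint_;
};

class Coordination {  // stand-in: resolves a worker id to its endpoint
 public:
  Endpoint GetEndpoint(int /* worker_id */) const { return {}; }
};

// Owned exactly once (by GraphDb). Lazily creates one pool per worker and
// hands out references, so every RPC client wrapper reuses the same pools.
class RpcWorkerClients {
 public:
  explicit RpcWorkerClients(Coordination &coordination)
      : coordination_(coordination) {}

  ClientPool &GetClientPool(int worker_id) {
    std::lock_guard<std::mutex> guard(lock_);
    auto found = client_pools_.find(worker_id);
    if (found != client_pools_.end()) return found->second;
    return client_pools_
        .emplace(worker_id, ClientPool(coordination_.GetEndpoint(worker_id)))
        .first->second;
  }

 private:
  Coordination &coordination_;
  std::map<int, ClientPool> client_pools_;
  std::mutex lock_;
};

// Client wrappers now borrow the shared instance by reference instead of
// constructing their own pools from Coordination.
class RemoteDataRpcClients {
 public:
  explicit RemoteDataRpcClients(RpcWorkerClients &clients)
      : clients_(clients) {}

 private:
  RpcWorkerClients &clients_;
};

Because every wrapper holds a reference, connections to a given worker are
opened once and reused, rather than once per RPC client.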

Reviewers: teon.banek, florijan, dgleich, mferencevic

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1261
Author: Matija Santl
Date: 2018-02-28 16:02:45 +01:00
Commit: 1ca98826af (parent f963c54f77)
18 changed files with 97 additions and 68 deletions


@@ -41,18 +41,19 @@ MasterCounters::MasterCounters(communication::rpc::Server &server)
});
}
WorkerCounters::WorkerCounters(const io::network::Endpoint &master_endpoint)
: rpc_client_pool_(master_endpoint) {}
WorkerCounters::WorkerCounters(
communication::rpc::ClientPool &master_client_pool)
: master_client_pool_(master_client_pool) {}
int64_t WorkerCounters::Get(const std::string &name) {
auto response = rpc_client_pool_.Call<CountersGetRpc>(name);
auto response = master_client_pool_.Call<CountersGetRpc>(name);
CHECK(response) << "CountersGetRpc - failed to get response from master";
return response->member;
}
void WorkerCounters::Set(const std::string &name, int64_t value) {
auto response =
rpc_client_pool_.Call<CountersSetRpc>(CountersSetReqData{name, value});
master_client_pool_.Call<CountersSetRpc>(CountersSetReqData{name, value});
CHECK(response) << "CountersSetRpc - failed to get response from master";
}


@@ -54,13 +54,13 @@ class MasterCounters : public SingleNodeCounters {
/** Implementation for distributed worker. */
class WorkerCounters : public Counters {
public:
WorkerCounters(const io::network::Endpoint &master_endpoint);
WorkerCounters(communication::rpc::ClientPool &master_client_pool);
int64_t Get(const std::string &name) override;
void Set(const std::string &name, int64_t value) override;
private:
communication::rpc::ClientPool rpc_client_pool_;
communication::rpc::ClientPool &master_client_pool_;
};
} // namespace database


@@ -54,7 +54,7 @@ class PrivateBase : public GraphDb {
distributed::PlanDispatcher &plan_dispatcher() override {
LOG(FATAL) << "Plan dispatcher only available in distributed master.";
}
distributed::RpcWorkerClients &index_rpc_clients() override {
distributed::IndexRpcClients &index_rpc_clients() override {
LOG(FATAL) << "Index RPC clients only available in distributed master.";
}
@@ -162,7 +162,7 @@ class Master : public PrivateBase {
distributed::RemotePullRpcClients &remote_pull_clients() override {
return remote_pull_clients_;
}
distributed::RpcWorkerClients &index_rpc_clients() override {
distributed::IndexRpcClients &index_rpc_clients() override {
return index_rpc_clients_;
}
@@ -178,15 +178,17 @@ class Master : public PrivateBase {
tx::MasterEngine tx_engine_{server_, &wal_};
StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
distributed::MasterCoordination coordination_{server_};
distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
TypemapPack<MasterConcurrentIdMapper> typemap_pack_{server_};
database::MasterCounters counters_{server_};
distributed::RemoteDataRpcServer remote_data_server_{*this, server_};
distributed::RemoteDataRpcClients remote_data_clients_{coordination_};
distributed::PlanDispatcher plan_dispatcher_{coordination_};
distributed::RemotePullRpcClients remote_pull_clients_{coordination_};
distributed::RpcWorkerClients index_rpc_clients_{coordination_};
distributed::RemoteDataRpcClients remote_data_clients_{rpc_worker_clients_};
distributed::PlanDispatcher plan_dispatcher_{rpc_worker_clients_};
distributed::RemotePullRpcClients remote_pull_clients_{rpc_worker_clients_};
distributed::IndexRpcClients index_rpc_clients_{rpc_worker_clients_};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, server_};
distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_};
distributed::RemoteUpdatesRpcClients remote_updates_clients_{
rpc_worker_clients_};
distributed::RemoteDataManager remote_data_manager_{storage_,
remote_data_clients_};
distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_};
@@ -223,18 +225,21 @@ class Worker : public PrivateBase {
config_.worker_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
distributed::WorkerCoordination coordination_{server_,
config_.master_endpoint};
tx::WorkerEngine tx_engine_{config_.master_endpoint};
distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
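// Worker id 0 denotes the master, so GetClientPool(0) below returns the
// client pool that talks to the master.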
tx::WorkerEngine tx_engine_{rpc_worker_clients_.GetClientPool(0)};
StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
TypemapPack<WorkerConcurrentIdMapper> typemap_pack_{config_.master_endpoint};
database::WorkerCounters counters_{config_.master_endpoint};
TypemapPack<WorkerConcurrentIdMapper> typemap_pack_{
rpc_worker_clients_.GetClientPool(0)};
database::WorkerCounters counters_{rpc_worker_clients_.GetClientPool(0)};
distributed::RemoteDataRpcServer remote_data_server_{*this, server_};
distributed::RemoteDataRpcClients remote_data_clients_{coordination_};
distributed::RemoteDataRpcClients remote_data_clients_{rpc_worker_clients_};
distributed::PlanConsumer plan_consumer_{server_};
distributed::RemoteProduceRpcServer remote_produce_server_{
*this, tx_engine_, server_, plan_consumer_};
distributed::IndexRpcServer index_rpc_server_{*this, server_};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, server_};
distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_};
distributed::RemoteUpdatesRpcClients remote_updates_clients_{
rpc_worker_clients_};
distributed::RemoteDataManager remote_data_manager_{storage_,
remote_data_clients_};
distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_};
@@ -291,7 +296,7 @@ distributed::RemoteDataRpcClients &PublicBase::remote_data_clients() {
distributed::PlanDispatcher &PublicBase::plan_dispatcher() {
return impl_->plan_dispatcher();
}
distributed::RpcWorkerClients &PublicBase::index_rpc_clients() {
distributed::IndexRpcClients &PublicBase::index_rpc_clients() {
return impl_->index_rpc_clients();
}
distributed::PlanConsumer &PublicBase::plan_consumer() {


@@ -7,7 +7,6 @@
#include "database/counters.hpp"
#include "database/storage.hpp"
#include "database/storage_gc.hpp"
#include "distributed/rpc_worker_clients.hpp"
#include "durability/wal.hpp"
#include "io/network/endpoint.hpp"
#include "storage/concurrent_id_mapper.hpp"
@@ -25,6 +24,7 @@ class RemoteProduceRpcServer;
class RemoteUpdatesRpcServer;
class RemoteUpdatesRpcClients;
class RemoteDataManager;
class IndexRpcClients;
}
namespace database {
@@ -104,7 +104,7 @@ class GraphDb {
// Supported only in distributed master.
virtual distributed::RemotePullRpcClients &remote_pull_clients() = 0;
virtual distributed::PlanDispatcher &plan_dispatcher() = 0;
virtual distributed::RpcWorkerClients &index_rpc_clients() = 0;
virtual distributed::IndexRpcClients &index_rpc_clients() = 0;
// Supported only in distributed worker.
// TODO remove once end2end testing is possible.
@@ -141,7 +141,7 @@ class PublicBase : public GraphDb {
distributed::RemoteDataRpcServer &remote_data_server() override;
distributed::RemoteDataRpcClients &remote_data_clients() override;
distributed::PlanDispatcher &plan_dispatcher() override;
distributed::RpcWorkerClients &index_rpc_clients() override;
distributed::IndexRpcClients &index_rpc_clients() override;
distributed::PlanConsumer &plan_consumer() override;
distributed::RemotePullRpcClients &remote_pull_clients() override;
distributed::RemoteProduceRpcServer &remote_produce_server() override;


@@ -5,9 +5,9 @@
#include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp"
#include "distributed/index_rpc_messages.hpp"
#include "distributed/remote_data_manager.hpp"
#include "distributed/remote_updates_rpc_clients.hpp"
#include "distributed/rpc_worker_clients.hpp"
#include "storage/address_types.hpp"
#include "storage/edge.hpp"
#include "storage/edge_accessor.hpp"
@@ -183,15 +183,8 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
// Notify all workers to start building an index if we are the master since
// they don't have to wait anymore
if (db_.type() == GraphDb::Type::DISTRIBUTED_MASTER) {
index_rpc_completions.emplace(
db_.index_rpc_clients().ExecuteOnWorkers<bool>(
this->db_.WorkerId(),
[label, property,
this](communication::rpc::ClientPool &client_pool) {
return client_pool.Call<distributed::BuildIndexRpc>(
distributed::IndexLabelPropertyTx{
label, property, transaction_id()}) != nullptr;
}));
index_rpc_completions.emplace(db_.index_rpc_clients().GetBuildIndexFutures(
label, property, transaction_id(), this->db_.WorkerId()));
}
// Add transaction to the build_tx_in_progress as this transaction doesn't


@@ -2,8 +2,7 @@
namespace distributed {
PlanDispatcher::PlanDispatcher(Coordination &coordination)
: clients_(coordination) {}
PlanDispatcher::PlanDispatcher(RpcWorkerClients &clients) : clients_(clients) {}
void PlanDispatcher::DispatchPlan(
int64_t plan_id, std::shared_ptr<query::plan::LogicalOperator> plan,


@@ -13,7 +13,7 @@ namespace distributed {
*/
class PlanDispatcher {
public:
explicit PlanDispatcher(Coordination &coordination);
explicit PlanDispatcher(RpcWorkerClients &clients);
/**
* Synchronously dispatch a plan to all workers and wait for their
@@ -24,7 +24,7 @@ class PlanDispatcher {
const SymbolTable &symbol_table);
private:
RpcWorkerClients clients_;
RpcWorkerClients &clients_;
};
} // namespace distributed


@@ -14,7 +14,7 @@ namespace distributed {
/** Provides access to other worker's data. */
class RemoteDataRpcClients {
public:
RemoteDataRpcClients(Coordination &coordination) : clients_(coordination) {}
RemoteDataRpcClients(RpcWorkerClients &clients) : clients_(clients) {}
/// Returns a remote worker's data for the given params. That worker must own
/// the vertex for the given id, and that vertex must be visible in given
@@ -43,7 +43,7 @@ class RemoteDataRpcClients {
gid::Gid gid);
private:
RpcWorkerClients clients_;
RpcWorkerClients &clients_;
};
template <>


@@ -21,8 +21,7 @@ class RemotePullRpcClients {
using ClientPool = communication::rpc::ClientPool;
public:
RemotePullRpcClients(Coordination &coordination)
: clients_(coordination) {}
RemotePullRpcClients(RpcWorkerClients &clients) : clients_(clients) {}
/// Calls a remote pull asynchronously. IMPORTANT: take care not to call this
/// function for the same (tx_id, worker_id, plan_id) before the previous call
@@ -95,6 +94,6 @@ class RemotePullRpcClients {
}
private:
RpcWorkerClients clients_;
RpcWorkerClients &clients_;
};
} // namespace distributed


@@ -4,7 +4,6 @@
#include <vector>
#include "database/state_delta.hpp"
#include "distributed/coordination.hpp"
#include "distributed/remote_updates_rpc_messages.hpp"
#include "distributed/rpc_worker_clients.hpp"
#include "query/exceptions.hpp"
@@ -21,8 +20,8 @@ namespace distributed {
/// apply the accumulated deferred updates, or discard them.
class RemoteUpdatesRpcClients {
public:
explicit RemoteUpdatesRpcClients(distributed::Coordination &coordination)
: worker_clients_(coordination) {}
explicit RemoteUpdatesRpcClients(RpcWorkerClients &clients)
: worker_clients_(clients) {}
/// Sends an update delta to the given worker.
RemoteUpdateResult RemoteUpdate(int worker_id,
@@ -143,7 +142,7 @@ class RemoteUpdatesRpcClients {
}
private:
RpcWorkerClients worker_clients_;
RpcWorkerClients &worker_clients_;
void RaiseIfRemoteError(RemoteUpdateResult result) {
switch (result) {


@@ -7,6 +7,9 @@
#include "communication/rpc/client_pool.hpp"
#include "distributed/coordination.hpp"
#include "distributed/index_rpc_messages.hpp"
#include "storage/types.hpp"
#include "transactions/transaction.hpp"
namespace distributed {
@@ -68,4 +71,27 @@ class RpcWorkerClients {
std::mutex lock_;
};
/** Wrapper class around an RPC call to build indices. */
class IndexRpcClients {
public:
IndexRpcClients(RpcWorkerClients &clients) : clients_(clients) {}
auto GetBuildIndexFutures(const storage::Label &label,
const storage::Property &property,
tx::transaction_id_t transaction_id,
int worker_id) {
return clients_.ExecuteOnWorkers<bool>(
worker_id, [label, property, transaction_id](
communication::rpc::ClientPool &client_pool) {
return client_pool.Call<BuildIndexRpc>(
distributed::IndexLabelPropertyTx{
label, property, transaction_id}) != nullptr;
});
}
private:
RpcWorkerClients &clients_;
};
} // namespace distributed
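
A master-side caller would then use the new wrapper roughly as follows
(hypothetical snippet; assumes `db` is the master GraphDb and that
`label`, `property`, and `transaction_id` are in scope, as in
GraphDbAccessor::BuildIndex above):

// Hypothetical call site: one future per worker; block until every
// worker reports that the index was built.
auto index_rpc_completions = db.index_rpc_clients().GetBuildIndexFutures(
    label, property, transaction_id, db.WorkerId());
for (auto &completion : index_rpc_completions) {
  CHECK(completion.get()) << "BuildIndexRpc failed on a worker";
}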


@@ -10,14 +10,14 @@ namespace storage {
template <> \
type WorkerConcurrentIdMapper<type>::RpcValueToId( \
const std::string &value) { \
auto response = rpc_client_pool_.Call<type##IdRpc>(value); \
auto response = master_client_pool_.Call<type##IdRpc>(value); \
CHECK(response) << ("Failed to obtain " #type " from master"); \
return response->member; \
} \
\
template <> \
std::string WorkerConcurrentIdMapper<type>::RpcIdToValue(type id) { \
auto response = rpc_client_pool_.Call<Id##type##Rpc>(id); \
auto response = master_client_pool_.Call<Id##type##Rpc>(id); \
CHECK(response) << ("Failed to obtain " #type " value from master"); \
return response->member; \
}
@@ -31,8 +31,8 @@ ID_VALUE_RPC_CALLS(Property)
template <typename TId>
WorkerConcurrentIdMapper<TId>::WorkerConcurrentIdMapper(
const io::network::Endpoint &master_endpoint)
: rpc_client_pool_(master_endpoint) {}
communication::rpc::ClientPool &master_client_pool)
: master_client_pool_(master_client_pool) {}
template <typename TId>
TId WorkerConcurrentIdMapper<TId>::value_to_id(const std::string &value) {


@@ -17,7 +17,7 @@ class WorkerConcurrentIdMapper : public ConcurrentIdMapper<TId> {
std::string RpcIdToValue(TId id);
public:
WorkerConcurrentIdMapper(const io::network::Endpoint &master_endpoint);
WorkerConcurrentIdMapper(communication::rpc::ClientPool &master_client_pool);
TId value_to_id(const std::string &value) override;
const std::string &id_to_value(const TId &id) override;
@@ -29,6 +29,6 @@ class WorkerConcurrentIdMapper : public ConcurrentIdMapper<TId> {
ConcurrentMap<TId, std::string> id_to_value_cache_;
// Communication to the concurrent ID master.
mutable communication::rpc::ClientPool rpc_client_pool_;
communication::rpc::ClientPool &master_client_pool_;
};
} // namespace storage


@@ -8,8 +8,8 @@
namespace tx {
WorkerEngine::WorkerEngine(const io::network::Endpoint &endpoint)
: rpc_client_pool_(endpoint) {}
WorkerEngine::WorkerEngine(communication::rpc::ClientPool &master_client_pool)
: master_client_pool_(master_client_pool) {}
WorkerEngine::~WorkerEngine() {
for (auto &kv : active_.access()) {
@@ -18,7 +18,7 @@ WorkerEngine::~WorkerEngine() {
}
Transaction *WorkerEngine::Begin() {
auto data = rpc_client_pool_.Call<BeginRpc>()->member;
auto data = master_client_pool_.Call<BeginRpc>()->member;
UpdateOldestActive(data.snapshot, data.tx_id);
Transaction *tx = new Transaction(data.tx_id, data.snapshot, *this);
auto insertion = active_.access().insert(data.tx_id, tx);
@@ -27,7 +27,7 @@ Transaction *WorkerEngine::Begin() {
}
command_id_t WorkerEngine::Advance(transaction_id_t tx_id) {
auto res = rpc_client_pool_.Call<AdvanceRpc>(tx_id);
auto res = master_client_pool_.Call<AdvanceRpc>(tx_id);
auto access = active_.access();
auto found = access.find(tx_id);
CHECK(found != access.end())
@@ -37,7 +37,7 @@ command_id_t WorkerEngine::Advance(transaction_id_t tx_id) {
}
command_id_t WorkerEngine::UpdateCommand(transaction_id_t tx_id) {
command_id_t cmd_id = rpc_client_pool_.Call<CommandRpc>(tx_id)->member;
command_id_t cmd_id = master_client_pool_.Call<CommandRpc>(tx_id)->member;
// Assume there is no concurrent work being done on this worker in the given
// transaction. This assumption is sound because command advancing needs to be
@@ -55,12 +55,12 @@ command_id_t WorkerEngine::UpdateCommand(transaction_id_t tx_id) {
}
void WorkerEngine::Commit(const Transaction &t) {
auto res = rpc_client_pool_.Call<CommitRpc>(t.id_);
auto res = master_client_pool_.Call<CommitRpc>(t.id_);
ClearSingleTransaction(t.id_);
}
void WorkerEngine::Abort(const Transaction &t) {
auto res = rpc_client_pool_.Call<AbortRpc>(t.id_);
auto res = master_client_pool_.Call<AbortRpc>(t.id_);
ClearSingleTransaction(t.id_);
}
@@ -71,7 +71,7 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
if (!(info.is_aborted() || info.is_committed())) {
// @review: this version of Call is just used because Info has no
// default constructor.
info = rpc_client_pool_.Call<ClogInfoRpc>(tid)->member;
info = master_client_pool_.Call<ClogInfoRpc>(tid)->member;
if (!info.is_active()) {
if (info.is_committed()) clog_.set_committed(tid);
if (info.is_aborted()) clog_.set_aborted(tid);
@@ -83,14 +83,14 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
}
Snapshot WorkerEngine::GlobalGcSnapshot() {
auto snapshot = std::move(rpc_client_pool_.Call<GcSnapshotRpc>()->member);
auto snapshot = std::move(master_client_pool_.Call<GcSnapshotRpc>()->member);
UpdateOldestActive(snapshot, local_last_.load());
return snapshot;
}
Snapshot WorkerEngine::GlobalActiveTransactions() {
auto snapshot =
std::move(rpc_client_pool_.Call<ActiveTransactionsRpc>()->member);
std::move(master_client_pool_.Call<ActiveTransactionsRpc>()->member);
UpdateOldestActive(snapshot, local_last_.load());
return snapshot;
}
@@ -111,7 +111,8 @@ Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id) {
auto found = accessor.find(tx_id);
if (found != accessor.end()) return found->second;
auto snapshot = std::move(rpc_client_pool_.Call<SnapshotRpc>(tx_id)->member);
auto snapshot =
std::move(master_client_pool_.Call<SnapshotRpc>(tx_id)->member);
UpdateOldestActive(snapshot, local_last_.load());
return RunningTransaction(tx_id, snapshot);
}


@@ -20,7 +20,7 @@ class WorkerEngine : public Engine {
/// expired on the master.
static constexpr std::chrono::seconds kCacheReleasePeriod{1};
WorkerEngine(const io::network::Endpoint &endpoint);
WorkerEngine(communication::rpc::ClientPool &master_client_pool);
~WorkerEngine();
Transaction *Begin() override;
@@ -53,7 +53,7 @@ class WorkerEngine : public Engine {
mutable CommitLog clog_;
// Communication to the transactional master.
mutable communication::rpc::ClientPool rpc_client_pool_;
communication::rpc::ClientPool &master_client_pool_;
// Used for clearing of caches of transactions that have expired.
// Initialize the oldest_active_ with 1 because there's never a tx with id=0


@@ -13,18 +13,22 @@ class DistributedConcurrentIdMapperTest : public ::testing::Test {
protected:
communication::rpc::Server master_server_{{kLocal, 0}};
std::experimental::optional<communication::rpc::ClientPool>
master_client_pool_;
std::experimental::optional<storage::MasterConcurrentIdMapper<TId>>
master_mapper_;
std::experimental::optional<storage::WorkerConcurrentIdMapper<TId>>
worker_mapper_;
void SetUp() override {
master_client_pool_.emplace(master_server_.endpoint());
master_mapper_.emplace(master_server_);
worker_mapper_.emplace(master_server_.endpoint());
worker_mapper_.emplace(master_client_pool_.value());
}
void TearDown() override {
worker_mapper_ = std::experimental::nullopt;
master_mapper_ = std::experimental::nullopt;
master_client_pool_ = std::experimental::nullopt;
}
};


@@ -8,9 +8,10 @@ const std::string kLocal = "127.0.0.1";
TEST(CountersDistributed, All) {
communication::rpc::Server master_server({kLocal, 0});
database::MasterCounters master(master_server);
communication::rpc::ClientPool master_client_pool(master_server.endpoint());
database::WorkerCounters w1(master_server.endpoint());
database::WorkerCounters w2(master_server.endpoint());
database::WorkerCounters w1(master_client_pool);
database::WorkerCounters w2(master_client_pool);
EXPECT_EQ(w1.Get("a"), 0);
EXPECT_EQ(w1.Get("a"), 1);


@@ -20,8 +20,9 @@ class WorkerEngineTest : public testing::Test {
Server master_server_{{local, 0}};
MasterEngine master_{master_server_};
ClientPool master_client_pool{master_server_.endpoint()};
WorkerEngine worker_{master_server_.endpoint()};
WorkerEngine worker_{master_client_pool};
};
TEST_F(WorkerEngineTest, BeginOnWorker) {