diff --git a/src/database/counters.cpp b/src/database/counters.cpp index a399cfb2c..62aa2616f 100644 --- a/src/database/counters.cpp +++ b/src/database/counters.cpp @@ -41,18 +41,19 @@ MasterCounters::MasterCounters(communication::rpc::Server &server) }); } -WorkerCounters::WorkerCounters(const io::network::Endpoint &master_endpoint) - : rpc_client_pool_(master_endpoint) {} +WorkerCounters::WorkerCounters( + communication::rpc::ClientPool &master_client_pool) + : master_client_pool_(master_client_pool) {} int64_t WorkerCounters::Get(const std::string &name) { - auto response = rpc_client_pool_.Call<CountersGetRpc>(name); + auto response = master_client_pool_.Call<CountersGetRpc>(name); CHECK(response) << "CountersGetRpc - failed to get response from master"; return response->member; } void WorkerCounters::Set(const std::string &name, int64_t value) { auto response = - rpc_client_pool_.Call<CountersSetRpc>(CountersSetReqData{name, value}); + master_client_pool_.Call<CountersSetRpc>(CountersSetReqData{name, value}); CHECK(response) << "CountersSetRpc - failed to get response from master"; } diff --git a/src/database/counters.hpp b/src/database/counters.hpp index 06838dea2..c5661107b 100644 --- a/src/database/counters.hpp +++ b/src/database/counters.hpp @@ -54,13 +54,13 @@ class MasterCounters : public SingleNodeCounters { /** Implementation for distributed worker. */ class WorkerCounters : public Counters { public: - WorkerCounters(const io::network::Endpoint &master_endpoint); + WorkerCounters(communication::rpc::ClientPool &master_client_pool); int64_t Get(const std::string &name) override; void Set(const std::string &name, int64_t value) override; private: - communication::rpc::ClientPool rpc_client_pool_; + communication::rpc::ClientPool &master_client_pool_; }; } // namespace database diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index f0b2c025f..9cd9d1a89 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -54,7 +54,7 @@ class PrivateBase : public GraphDb { distributed::PlanDispatcher &plan_dispatcher() override { LOG(FATAL) << "Plan dispatcher only available in distributed master."; } - distributed::RpcWorkerClients &index_rpc_clients() override { + distributed::IndexRpcClients &index_rpc_clients() override { LOG(FATAL) << "Index RPC clients only available in distributed master."; } @@ -162,7 +162,7 @@ class Master : public PrivateBase { distributed::RemotePullRpcClients &remote_pull_clients() override { return remote_pull_clients_; } - distributed::RpcWorkerClients &index_rpc_clients() override { + distributed::IndexRpcClients &index_rpc_clients() override { return index_rpc_clients_; } @@ -178,15 +178,17 @@ class Master : public PrivateBase { tx::MasterEngine tx_engine_{server_, &wal_}; StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec}; distributed::MasterCoordination coordination_{server_}; + distributed::RpcWorkerClients rpc_worker_clients_{coordination_}; TypemapPack<MasterConcurrentIdMapper> typemap_pack_{server_}; database::MasterCounters counters_{server_}; distributed::RemoteDataRpcServer remote_data_server_{*this, server_}; - distributed::RemoteDataRpcClients remote_data_clients_{coordination_}; - distributed::PlanDispatcher plan_dispatcher_{coordination_}; - distributed::RemotePullRpcClients remote_pull_clients_{coordination_}; - distributed::RpcWorkerClients index_rpc_clients_{coordination_}; + distributed::RemoteDataRpcClients remote_data_clients_{rpc_worker_clients_}; + distributed::PlanDispatcher plan_dispatcher_{rpc_worker_clients_}; + distributed::RemotePullRpcClients remote_pull_clients_{rpc_worker_clients_}; + distributed::IndexRpcClients index_rpc_clients_{rpc_worker_clients_}; distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, server_}; - distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_}; + distributed::RemoteUpdatesRpcClients remote_updates_clients_{ + rpc_worker_clients_}; distributed::RemoteDataManager remote_data_manager_{storage_, remote_data_clients_}; distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_}; @@ -223,18 +225,21 @@ class Worker : public PrivateBase { config_.worker_endpoint, static_cast<size_t>(config_.rpc_num_workers)}; distributed::WorkerCoordination coordination_{server_, config_.master_endpoint}; - tx::WorkerEngine tx_engine_{config_.master_endpoint}; + distributed::RpcWorkerClients rpc_worker_clients_{coordination_}; + tx::WorkerEngine tx_engine_{rpc_worker_clients_.GetClientPool(0)}; StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec}; - TypemapPack<WorkerConcurrentIdMapper> typemap_pack_{config_.master_endpoint}; - database::WorkerCounters counters_{config_.master_endpoint}; + TypemapPack<WorkerConcurrentIdMapper> typemap_pack_{ + rpc_worker_clients_.GetClientPool(0)}; + database::WorkerCounters counters_{rpc_worker_clients_.GetClientPool(0)}; distributed::RemoteDataRpcServer remote_data_server_{*this, server_}; - distributed::RemoteDataRpcClients remote_data_clients_{coordination_}; + distributed::RemoteDataRpcClients remote_data_clients_{rpc_worker_clients_}; distributed::PlanConsumer plan_consumer_{server_}; distributed::RemoteProduceRpcServer remote_produce_server_{ *this, tx_engine_, server_, plan_consumer_}; distributed::IndexRpcServer index_rpc_server_{*this, server_}; distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, server_}; - distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_}; + distributed::RemoteUpdatesRpcClients remote_updates_clients_{ + rpc_worker_clients_}; distributed::RemoteDataManager remote_data_manager_{storage_, remote_data_clients_}; distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_}; @@ -291,7 +296,7 @@ distributed::RemoteDataRpcClients &PublicBase::remote_data_clients() { distributed::PlanDispatcher &PublicBase::plan_dispatcher() { return impl_->plan_dispatcher(); } -distributed::RpcWorkerClients &PublicBase::index_rpc_clients() { +distributed::IndexRpcClients &PublicBase::index_rpc_clients() { return impl_->index_rpc_clients(); } distributed::PlanConsumer &PublicBase::plan_consumer() { diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 3bb3ad3e0..3f2aa9e69 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -7,7 +7,6 @@ #include "database/counters.hpp" #include "database/storage.hpp" #include "database/storage_gc.hpp" -#include "distributed/rpc_worker_clients.hpp" #include "durability/wal.hpp" #include "io/network/endpoint.hpp" #include "storage/concurrent_id_mapper.hpp" @@ -25,6 +24,7 @@ class RemoteProduceRpcServer; class RemoteUpdatesRpcServer; class RemoteUpdatesRpcClients; class RemoteDataManager; +class IndexRpcClients; } namespace database { @@ -104,7 +104,7 @@ class GraphDb { // Supported only in distributed master. virtual distributed::RemotePullRpcClients &remote_pull_clients() = 0; virtual distributed::PlanDispatcher &plan_dispatcher() = 0; - virtual distributed::RpcWorkerClients &index_rpc_clients() = 0; + virtual distributed::IndexRpcClients &index_rpc_clients() = 0; // Supported only in distributed worker. // TODO remove once end2end testing is possible. @@ -141,7 +141,7 @@ class PublicBase : public GraphDb { distributed::RemoteDataRpcServer &remote_data_server() override; distributed::RemoteDataRpcClients &remote_data_clients() override; distributed::PlanDispatcher &plan_dispatcher() override; - distributed::RpcWorkerClients &index_rpc_clients() override; + distributed::IndexRpcClients &index_rpc_clients() override; distributed::PlanConsumer &plan_consumer() override; distributed::RemotePullRpcClients &remote_pull_clients() override; distributed::RemoteProduceRpcServer &remote_produce_server() override; diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index c697f6319..008ed4809 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -5,9 +5,9 @@ #include "database/graph_db_accessor.hpp" #include "database/state_delta.hpp" -#include "distributed/index_rpc_messages.hpp" #include "distributed/remote_data_manager.hpp" #include "distributed/remote_updates_rpc_clients.hpp" +#include "distributed/rpc_worker_clients.hpp" #include "storage/address_types.hpp" #include "storage/edge.hpp" #include "storage/edge_accessor.hpp" @@ -183,15 +183,8 @@ void GraphDbAccessor::BuildIndex(storage::Label label, // Notify all workers to start building an index if we are the master since // they don't have to wait anymore if (db_.type() == GraphDb::Type::DISTRIBUTED_MASTER) { - index_rpc_completions.emplace( - db_.index_rpc_clients().ExecuteOnWorkers<bool>( - this->db_.WorkerId(), - [label, property, - this](communication::rpc::ClientPool &client_pool) { - return client_pool.Call<distributed::BuildIndexRpc>( - distributed::IndexLabelPropertyTx{ - label, property, transaction_id()}) != nullptr; - })); + index_rpc_completions.emplace(db_.index_rpc_clients().GetBuildIndexFutures( + label, property, transaction_id(), this->db_.WorkerId())); } // Add transaction to the build_tx_in_progress as this transaction doesn't diff --git a/src/distributed/plan_dispatcher.cpp b/src/distributed/plan_dispatcher.cpp index 4867ad14d..2a77f0145 100644 --- a/src/distributed/plan_dispatcher.cpp +++ b/src/distributed/plan_dispatcher.cpp @@ -2,8 +2,7 @@ namespace distributed { -PlanDispatcher::PlanDispatcher(Coordination &coordination) - : clients_(coordination) {} +PlanDispatcher::PlanDispatcher(RpcWorkerClients &clients) : clients_(clients) {} void PlanDispatcher::DispatchPlan( int64_t plan_id, std::shared_ptr<query::plan::LogicalOperator> plan, diff --git a/src/distributed/plan_dispatcher.hpp b/src/distributed/plan_dispatcher.hpp index c08aeb21a..7645435c6 100644 --- a/src/distributed/plan_dispatcher.hpp +++ b/src/distributed/plan_dispatcher.hpp @@ -13,7 +13,7 @@ namespace distributed { */ class PlanDispatcher { public: - explicit PlanDispatcher(Coordination &coordination); + explicit PlanDispatcher(RpcWorkerClients &clients); /** * Synchronously dispatch a plan to all workers and wait for their @@ -24,7 +24,7 @@ class PlanDispatcher { const SymbolTable &symbol_table); private: - RpcWorkerClients clients_; + RpcWorkerClients &clients_; }; } // namespace distributed diff --git a/src/distributed/remote_data_rpc_clients.hpp b/src/distributed/remote_data_rpc_clients.hpp index cdf74f8c1..f6cc24d57 100644 --- a/src/distributed/remote_data_rpc_clients.hpp +++ b/src/distributed/remote_data_rpc_clients.hpp @@ -14,7 +14,7 @@ namespace distributed { /** Provides access to other worker's data. */ class RemoteDataRpcClients { public: - RemoteDataRpcClients(Coordination &coordination) : clients_(coordination) {} + RemoteDataRpcClients(RpcWorkerClients &clients) : clients_(clients) {} /// Returns a remote worker's data for the given params. That worker must own /// the vertex for the given id, and that vertex must be visible in given @@ -43,7 +43,7 @@ class RemoteDataRpcClients { gid::Gid gid); private: - RpcWorkerClients clients_; + RpcWorkerClients &clients_; }; template <> diff --git a/src/distributed/remote_pull_rpc_clients.hpp b/src/distributed/remote_pull_rpc_clients.hpp index 6878df2f6..88422a0a6 100644 --- a/src/distributed/remote_pull_rpc_clients.hpp +++ b/src/distributed/remote_pull_rpc_clients.hpp @@ -21,8 +21,7 @@ class RemotePullRpcClients { using ClientPool = communication::rpc::ClientPool; public: - RemotePullRpcClients(Coordination &coordination) - : clients_(coordination) {} + RemotePullRpcClients(RpcWorkerClients &clients) : clients_(clients) {} /// Calls a remote pull asynchroniously. IMPORTANT: take care not to call this /// function for the same (tx_id, worker_id, plan_id) before the previous call @@ -95,6 +94,6 @@ class RemotePullRpcClients { } private: - RpcWorkerClients clients_; + RpcWorkerClients &clients_; }; } // namespace distributed diff --git a/src/distributed/remote_updates_rpc_clients.hpp b/src/distributed/remote_updates_rpc_clients.hpp index e75911c09..225d96e80 100644 --- a/src/distributed/remote_updates_rpc_clients.hpp +++ b/src/distributed/remote_updates_rpc_clients.hpp @@ -4,7 +4,6 @@ #include <vector> #include "database/state_delta.hpp" -#include "distributed/coordination.hpp" #include "distributed/remote_updates_rpc_messages.hpp" #include "distributed/rpc_worker_clients.hpp" #include "query/exceptions.hpp" @@ -21,8 +20,8 @@ namespace distributed { /// apply the accumulated deferred updates, or discard them. class RemoteUpdatesRpcClients { public: - explicit RemoteUpdatesRpcClients(distributed::Coordination &coordination) - : worker_clients_(coordination) {} + explicit RemoteUpdatesRpcClients(RpcWorkerClients &clients) + : worker_clients_(clients) {} /// Sends an update delta to the given worker. RemoteUpdateResult RemoteUpdate(int worker_id, @@ -143,7 +142,7 @@ class RemoteUpdatesRpcClients { } private: - RpcWorkerClients worker_clients_; + RpcWorkerClients &worker_clients_; void RaiseIfRemoteError(RemoteUpdateResult result) { switch (result) { diff --git a/src/distributed/rpc_worker_clients.hpp b/src/distributed/rpc_worker_clients.hpp index 23b4cdd4e..2c1c9bb7e 100644 --- a/src/distributed/rpc_worker_clients.hpp +++ b/src/distributed/rpc_worker_clients.hpp @@ -7,6 +7,9 @@ #include "communication/rpc/client_pool.hpp" #include "distributed/coordination.hpp" +#include "distributed/index_rpc_messages.hpp" +#include "storage/types.hpp" +#include "transactions/transaction.hpp" namespace distributed { @@ -68,4 +71,27 @@ class RpcWorkerClients { std::mutex lock_; }; +/** Wrapper class around a RPC call to build indices. + */ +class IndexRpcClients { + public: + IndexRpcClients(RpcWorkerClients &clients) : clients_(clients) {} + + auto GetBuildIndexFutures(const storage::Label &label, + const storage::Property &property, + tx::transaction_id_t transaction_id, + int worker_id) { + return clients_.ExecuteOnWorkers<bool>( + worker_id, [label, property, transaction_id]( + communication::rpc::ClientPool &client_pool) { + return client_pool.Call<BuildIndexRpc>( + distributed::IndexLabelPropertyTx{ + label, property, transaction_id}) != nullptr; + }); + } + + private: + RpcWorkerClients &clients_; +}; + } // namespace distributed diff --git a/src/storage/concurrent_id_mapper_worker.cpp b/src/storage/concurrent_id_mapper_worker.cpp index 708b981fc..be78c0d7c 100644 --- a/src/storage/concurrent_id_mapper_worker.cpp +++ b/src/storage/concurrent_id_mapper_worker.cpp @@ -10,14 +10,14 @@ namespace storage { template <> \ type WorkerConcurrentIdMapper<type>::RpcValueToId( \ const std::string &value) { \ - auto response = rpc_client_pool_.Call<type##IdRpc>(value); \ + auto response = master_client_pool_.Call<type##IdRpc>(value); \ CHECK(response) << ("Failed to obtain " #type " from master"); \ return response->member; \ } \ \ template <> \ std::string WorkerConcurrentIdMapper<type>::RpcIdToValue(type id) { \ - auto response = rpc_client_pool_.Call<Id##type##Rpc>(id); \ + auto response = master_client_pool_.Call<Id##type##Rpc>(id); \ CHECK(response) << ("Failed to obtain " #type " value from master"); \ return response->member; \ } @@ -31,8 +31,8 @@ ID_VALUE_RPC_CALLS(Property) template <typename TId> WorkerConcurrentIdMapper<TId>::WorkerConcurrentIdMapper( - const io::network::Endpoint &master_endpoint) - : rpc_client_pool_(master_endpoint) {} + communication::rpc::ClientPool &master_client_pool) + : master_client_pool_(master_client_pool) {} template <typename TId> TId WorkerConcurrentIdMapper<TId>::value_to_id(const std::string &value) { diff --git a/src/storage/concurrent_id_mapper_worker.hpp b/src/storage/concurrent_id_mapper_worker.hpp index 89da2ee41..5a45299a8 100644 --- a/src/storage/concurrent_id_mapper_worker.hpp +++ b/src/storage/concurrent_id_mapper_worker.hpp @@ -17,7 +17,7 @@ class WorkerConcurrentIdMapper : public ConcurrentIdMapper<TId> { std::string RpcIdToValue(TId id); public: - WorkerConcurrentIdMapper(const io::network::Endpoint &master_endpoint); + WorkerConcurrentIdMapper(communication::rpc::ClientPool &master_client_pool); TId value_to_id(const std::string &value) override; const std::string &id_to_value(const TId &id) override; @@ -29,6 +29,6 @@ class WorkerConcurrentIdMapper : public ConcurrentIdMapper<TId> { ConcurrentMap<TId, std::string> id_to_value_cache_; // Communication to the concurrent ID master. - mutable communication::rpc::ClientPool rpc_client_pool_; + communication::rpc::ClientPool &master_client_pool_; }; } // namespace storage diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp index df9b1eb13..0381910d9 100644 --- a/src/transactions/engine_worker.cpp +++ b/src/transactions/engine_worker.cpp @@ -8,8 +8,8 @@ namespace tx { -WorkerEngine::WorkerEngine(const io::network::Endpoint &endpoint) - : rpc_client_pool_(endpoint) {} +WorkerEngine::WorkerEngine(communication::rpc::ClientPool &master_client_pool) + : master_client_pool_(master_client_pool) {} WorkerEngine::~WorkerEngine() { for (auto &kv : active_.access()) { @@ -18,7 +18,7 @@ WorkerEngine::~WorkerEngine() { } Transaction *WorkerEngine::Begin() { - auto data = rpc_client_pool_.Call<BeginRpc>()->member; + auto data = master_client_pool_.Call<BeginRpc>()->member; UpdateOldestActive(data.snapshot, data.tx_id); Transaction *tx = new Transaction(data.tx_id, data.snapshot, *this); auto insertion = active_.access().insert(data.tx_id, tx); @@ -27,7 +27,7 @@ Transaction *WorkerEngine::Begin() { } command_id_t WorkerEngine::Advance(transaction_id_t tx_id) { - auto res = rpc_client_pool_.Call<AdvanceRpc>(tx_id); + auto res = master_client_pool_.Call<AdvanceRpc>(tx_id); auto access = active_.access(); auto found = access.find(tx_id); CHECK(found != access.end()) @@ -37,7 +37,7 @@ command_id_t WorkerEngine::Advance(transaction_id_t tx_id) { } command_id_t WorkerEngine::UpdateCommand(transaction_id_t tx_id) { - command_id_t cmd_id = rpc_client_pool_.Call<CommandRpc>(tx_id)->member; + command_id_t cmd_id = master_client_pool_.Call<CommandRpc>(tx_id)->member; // Assume there is no concurrent work being done on this worker in the given // transaction. This assumption is sound because command advancing needs to be @@ -55,12 +55,12 @@ command_id_t WorkerEngine::UpdateCommand(transaction_id_t tx_id) { } void WorkerEngine::Commit(const Transaction &t) { - auto res = rpc_client_pool_.Call<CommitRpc>(t.id_); + auto res = master_client_pool_.Call<CommitRpc>(t.id_); ClearSingleTransaction(t.id_); } void WorkerEngine::Abort(const Transaction &t) { - auto res = rpc_client_pool_.Call<AbortRpc>(t.id_); + auto res = master_client_pool_.Call<AbortRpc>(t.id_); ClearSingleTransaction(t.id_); } @@ -71,7 +71,7 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { if (!(info.is_aborted() || info.is_committed())) { // @review: this version of Call is just used because Info has no // default constructor. - info = rpc_client_pool_.Call<ClogInfoRpc>(tid)->member; + info = master_client_pool_.Call<ClogInfoRpc>(tid)->member; if (!info.is_active()) { if (info.is_committed()) clog_.set_committed(tid); if (info.is_aborted()) clog_.set_aborted(tid); @@ -83,14 +83,14 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { } Snapshot WorkerEngine::GlobalGcSnapshot() { - auto snapshot = std::move(rpc_client_pool_.Call<GcSnapshotRpc>()->member); + auto snapshot = std::move(master_client_pool_.Call<GcSnapshotRpc>()->member); UpdateOldestActive(snapshot, local_last_.load()); return snapshot; } Snapshot WorkerEngine::GlobalActiveTransactions() { auto snapshot = - std::move(rpc_client_pool_.Call<ActiveTransactionsRpc>()->member); + std::move(master_client_pool_.Call<ActiveTransactionsRpc>()->member); UpdateOldestActive(snapshot, local_last_.load()); return snapshot; } @@ -111,7 +111,8 @@ Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id) { auto found = accessor.find(tx_id); if (found != accessor.end()) return found->second; - auto snapshot = std::move(rpc_client_pool_.Call<SnapshotRpc>(tx_id)->member); + auto snapshot = + std::move(master_client_pool_.Call<SnapshotRpc>(tx_id)->member); UpdateOldestActive(snapshot, local_last_.load()); return RunningTransaction(tx_id, snapshot); } diff --git a/src/transactions/engine_worker.hpp b/src/transactions/engine_worker.hpp index 0bda358db..5e2666989 100644 --- a/src/transactions/engine_worker.hpp +++ b/src/transactions/engine_worker.hpp @@ -20,7 +20,7 @@ class WorkerEngine : public Engine { /// expired on the master. static constexpr std::chrono::seconds kCacheReleasePeriod{1}; - WorkerEngine(const io::network::Endpoint &endpoint); + WorkerEngine(communication::rpc::ClientPool &master_client_pool); ~WorkerEngine(); Transaction *Begin() override; @@ -53,7 +53,7 @@ class WorkerEngine : public Engine { mutable CommitLog clog_; // Communication to the transactional master. - mutable communication::rpc::ClientPool rpc_client_pool_; + communication::rpc::ClientPool &master_client_pool_; // Used for clearing of caches of transactions that have expired. // Initialize the oldest_active_ with 1 because there's never a tx with id=0 diff --git a/tests/unit/concurrent_id_mapper_distributed.cpp b/tests/unit/concurrent_id_mapper_distributed.cpp index 420f26943..9f0dc8629 100644 --- a/tests/unit/concurrent_id_mapper_distributed.cpp +++ b/tests/unit/concurrent_id_mapper_distributed.cpp @@ -13,18 +13,22 @@ class DistributedConcurrentIdMapperTest : public ::testing::Test { protected: communication::rpc::Server master_server_{{kLocal, 0}}; + std::experimental::optional<communication::rpc::ClientPool> + master_client_pool_; std::experimental::optional<storage::MasterConcurrentIdMapper<TId>> master_mapper_; std::experimental::optional<storage::WorkerConcurrentIdMapper<TId>> worker_mapper_; void SetUp() override { + master_client_pool_.emplace(master_server_.endpoint()); master_mapper_.emplace(master_server_); - worker_mapper_.emplace(master_server_.endpoint()); + worker_mapper_.emplace(master_client_pool_.value()); } void TearDown() override { worker_mapper_ = std::experimental::nullopt; master_mapper_ = std::experimental::nullopt; + master_client_pool_ = std::experimental::nullopt; } }; diff --git a/tests/unit/counters.cpp b/tests/unit/counters.cpp index dc0d6663c..fad665443 100644 --- a/tests/unit/counters.cpp +++ b/tests/unit/counters.cpp @@ -8,9 +8,10 @@ const std::string kLocal = "127.0.0.1"; TEST(CountersDistributed, All) { communication::rpc::Server master_server({kLocal, 0}); database::MasterCounters master(master_server); + communication::rpc::ClientPool master_client_pool(master_server.endpoint()); - database::WorkerCounters w1(master_server.endpoint()); - database::WorkerCounters w2(master_server.endpoint()); + database::WorkerCounters w1(master_client_pool); + database::WorkerCounters w2(master_client_pool); EXPECT_EQ(w1.Get("a"), 0); EXPECT_EQ(w1.Get("a"), 1); diff --git a/tests/unit/transaction_engine_distributed.cpp b/tests/unit/transaction_engine_distributed.cpp index 95d9f0b7c..418c0a627 100644 --- a/tests/unit/transaction_engine_distributed.cpp +++ b/tests/unit/transaction_engine_distributed.cpp @@ -20,8 +20,9 @@ class WorkerEngineTest : public testing::Test { Server master_server_{{local, 0}}; MasterEngine master_{master_server_}; + ClientPool master_client_pool{master_server_.endpoint()}; - WorkerEngine worker_{master_server_.endpoint()}; + WorkerEngine worker_{master_client_pool}; }; TEST_F(WorkerEngineTest, BeginOnWorker) {