From e5a55a39e3596543e8318a2565a600a03718157c Mon Sep 17 00:00:00 2001 From: florijan Date: Wed, 31 Jan 2018 16:07:23 +0100 Subject: [PATCH] Fix distributed master index recovery from snapshot Summary: Change `GraphDb` so it exposes index clients in the same convention as other members. Reviewers: dgleich, mculinovic Reviewed By: mculinovic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1159 --- src/database/graph_db.cpp | 13 +++++++++---- src/database/graph_db.hpp | 5 ++--- src/database/graph_db_accessor.cpp | 17 ++++++++--------- src/database/graph_db_accessor.hpp | 9 --------- 4 files changed, 19 insertions(+), 25 deletions(-) diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index ef7b901d4..6e071d9dd 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -50,6 +50,9 @@ class PrivateBase : public GraphDb { distributed::PlanDispatcher &plan_dispatcher() override { LOG(FATAL) << "Plan dispatcher only available in distributed master."; } + distributed::RpcWorkerClients &index_rpc_clients() override { + LOG(FATAL) << "Index RPC clients only available in distributed master."; + } protected: Storage storage_{config_.worker_id}; @@ -129,6 +132,9 @@ class Master : public PrivateBase { distributed::RemotePullRpcClients &remote_pull_clients() override { return remote_pull_clients_; } + distributed::RpcWorkerClients &index_rpc_clients() override { + return index_rpc_clients_; + } communication::rpc::System system_{config_.master_endpoint}; tx::MasterEngine tx_engine_{system_, &wal_}; @@ -223,6 +229,9 @@ distributed::RemoteDataRpcClients &PublicBase::remote_data_clients() { distributed::PlanDispatcher &PublicBase::plan_dispatcher() { return impl_->plan_dispatcher(); } +distributed::RpcWorkerClients &PublicBase::index_rpc_clients() { + return impl_->index_rpc_clients(); +} distributed::PlanConsumer &PublicBase::plan_consumer() { return impl_->plan_consumer(); } @@ -286,10 +295,6 @@ io::network::Endpoint Master::GetEndpoint(int worker_id) { ->coordination_.GetEndpoint(worker_id); } -distributed::RpcWorkerClients &Master::GetIndexRpcClients() { - return dynamic_cast(impl_.get())->index_rpc_clients_; -} - Worker::Worker(Config config) : PublicBase(std::make_unique(config)) {} diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 2c0897306..1f8b7bc83 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -95,6 +95,7 @@ class GraphDb { // Supported only in distributed master. virtual distributed::RemotePullRpcClients &remote_pull_clients() = 0; virtual distributed::PlanDispatcher &plan_dispatcher() = 0; + virtual distributed::RpcWorkerClients &index_rpc_clients() = 0; // Supported only in distributed worker. // TODO remove once end2end testing is possible. @@ -130,6 +131,7 @@ class PublicBase : public GraphDb { distributed::RemoteDataRpcServer &remote_data_server() override; distributed::RemoteDataRpcClients &remote_data_clients() override; distributed::PlanDispatcher &plan_dispatcher() override; + distributed::RpcWorkerClients &index_rpc_clients() override; distributed::PlanConsumer &plan_consumer() override; distributed::RemotePullRpcClients &remote_pull_clients() override; distributed::RemoteProduceRpcServer &remote_produce_server() override; @@ -173,9 +175,6 @@ class Master : public MasterBase { /** Gets the endpoint of the worker with the given id. */ // TODO make const once Coordination::GetEndpoint is const. io::network::Endpoint GetEndpoint(int worker_id); - - /** Gets the index rpc workers*/ - distributed::RpcWorkerClients &GetIndexRpcClients(); }; class Worker : public impl::PublicBase { diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index d513c1971..d4270ffb0 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -169,15 +169,14 @@ void GraphDbAccessor::BuildIndex(storage::Label label, // Notify all workers to start building an index if we are the master since // they don't have to wait anymore if (db_.type() == GraphDb::Type::DISTRIBUTED_MASTER) { - auto &rpc_clients = MasterGraphDb().GetIndexRpcClients(); - - index_rpc_completions.emplace(rpc_clients.ExecuteOnWorkers( - this->db_.WorkerId(), - [label, property, this](communication::rpc::Client &client) { - return client.Call( - distributed::IndexLabelPropertyTx{ - label, property, transaction_id()}) != nullptr; - })); + index_rpc_completions.emplace( + db_.index_rpc_clients().ExecuteOnWorkers( + this->db_.WorkerId(), + [label, property, this](communication::rpc::Client &client) { + return client.Call( + distributed::IndexLabelPropertyTx{ + label, property, transaction_id()}) != nullptr; + })); } // Add transaction to the build_tx_in_progress as this transaction doesn't diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp index 22530bd6c..b51fe8f6c 100644 --- a/src/database/graph_db_accessor.hpp +++ b/src/database/graph_db_accessor.hpp @@ -590,15 +590,6 @@ class GraphDbAccessor { return *single_node_engine; } - /** Casts the GraphDb to MasterGraphDb and returns it. If the - * GraphDb is not a MasterGraphDb, a call to this method will crash MG. */ - Master &MasterGraphDb() { - auto *master_graph_db = dynamic_cast(&db_); - DCHECK(master_graph_db) - << "Asked for Master Graph db on a distributed worker or single node"; - return *master_graph_db; - } - /** * Insert this vertex into corresponding label and label+property (if it * exists) index.