Fix distributed master index recovery from snapshot

Summary:
Change `GraphDb` so it exposes index clients in the same
convention as other members.

Reviewers: dgleich, mculinovic

Reviewed By: mculinovic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1159
This commit is contained in:
florijan 2018-01-31 16:07:23 +01:00
parent 03e571ea3a
commit e5a55a39e3
4 changed files with 19 additions and 25 deletions

View File

@ -50,6 +50,9 @@ class PrivateBase : public GraphDb {
distributed::PlanDispatcher &plan_dispatcher() override { distributed::PlanDispatcher &plan_dispatcher() override {
LOG(FATAL) << "Plan dispatcher only available in distributed master."; LOG(FATAL) << "Plan dispatcher only available in distributed master.";
} }
distributed::RpcWorkerClients &index_rpc_clients() override {
LOG(FATAL) << "Index RPC clients only available in distributed master.";
}
protected: protected:
Storage storage_{config_.worker_id}; Storage storage_{config_.worker_id};
@ -129,6 +132,9 @@ class Master : public PrivateBase {
distributed::RemotePullRpcClients &remote_pull_clients() override { distributed::RemotePullRpcClients &remote_pull_clients() override {
return remote_pull_clients_; return remote_pull_clients_;
} }
distributed::RpcWorkerClients &index_rpc_clients() override {
return index_rpc_clients_;
}
communication::rpc::System system_{config_.master_endpoint}; communication::rpc::System system_{config_.master_endpoint};
tx::MasterEngine tx_engine_{system_, &wal_}; tx::MasterEngine tx_engine_{system_, &wal_};
@ -223,6 +229,9 @@ distributed::RemoteDataRpcClients &PublicBase::remote_data_clients() {
distributed::PlanDispatcher &PublicBase::plan_dispatcher() { distributed::PlanDispatcher &PublicBase::plan_dispatcher() {
return impl_->plan_dispatcher(); return impl_->plan_dispatcher();
} }
distributed::RpcWorkerClients &PublicBase::index_rpc_clients() {
return impl_->index_rpc_clients();
}
distributed::PlanConsumer &PublicBase::plan_consumer() { distributed::PlanConsumer &PublicBase::plan_consumer() {
return impl_->plan_consumer(); return impl_->plan_consumer();
} }
@ -286,10 +295,6 @@ io::network::Endpoint Master::GetEndpoint(int worker_id) {
->coordination_.GetEndpoint(worker_id); ->coordination_.GetEndpoint(worker_id);
} }
distributed::RpcWorkerClients &Master::GetIndexRpcClients() {
return dynamic_cast<impl::Master *>(impl_.get())->index_rpc_clients_;
}
Worker::Worker(Config config) Worker::Worker(Config config)
: PublicBase(std::make_unique<impl::Worker>(config)) {} : PublicBase(std::make_unique<impl::Worker>(config)) {}

View File

@ -95,6 +95,7 @@ class GraphDb {
// Supported only in distributed master. // Supported only in distributed master.
virtual distributed::RemotePullRpcClients &remote_pull_clients() = 0; virtual distributed::RemotePullRpcClients &remote_pull_clients() = 0;
virtual distributed::PlanDispatcher &plan_dispatcher() = 0; virtual distributed::PlanDispatcher &plan_dispatcher() = 0;
virtual distributed::RpcWorkerClients &index_rpc_clients() = 0;
// Supported only in distributed worker. // Supported only in distributed worker.
// TODO remove once end2end testing is possible. // TODO remove once end2end testing is possible.
@ -130,6 +131,7 @@ class PublicBase : public GraphDb {
distributed::RemoteDataRpcServer &remote_data_server() override; distributed::RemoteDataRpcServer &remote_data_server() override;
distributed::RemoteDataRpcClients &remote_data_clients() override; distributed::RemoteDataRpcClients &remote_data_clients() override;
distributed::PlanDispatcher &plan_dispatcher() override; distributed::PlanDispatcher &plan_dispatcher() override;
distributed::RpcWorkerClients &index_rpc_clients() override;
distributed::PlanConsumer &plan_consumer() override; distributed::PlanConsumer &plan_consumer() override;
distributed::RemotePullRpcClients &remote_pull_clients() override; distributed::RemotePullRpcClients &remote_pull_clients() override;
distributed::RemoteProduceRpcServer &remote_produce_server() override; distributed::RemoteProduceRpcServer &remote_produce_server() override;
@ -173,9 +175,6 @@ class Master : public MasterBase {
/** Gets the endpoint of the worker with the given id. */ /** Gets the endpoint of the worker with the given id. */
// TODO make const once Coordination::GetEndpoint is const. // TODO make const once Coordination::GetEndpoint is const.
io::network::Endpoint GetEndpoint(int worker_id); io::network::Endpoint GetEndpoint(int worker_id);
/** Gets the index rpc workers*/
distributed::RpcWorkerClients &GetIndexRpcClients();
}; };
class Worker : public impl::PublicBase { class Worker : public impl::PublicBase {

View File

@ -169,9 +169,8 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
// Notify all workers to start building an index if we are the master since // Notify all workers to start building an index if we are the master since
// they don't have to wait anymore // they don't have to wait anymore
if (db_.type() == GraphDb::Type::DISTRIBUTED_MASTER) { if (db_.type() == GraphDb::Type::DISTRIBUTED_MASTER) {
auto &rpc_clients = MasterGraphDb().GetIndexRpcClients(); index_rpc_completions.emplace(
db_.index_rpc_clients().ExecuteOnWorkers<bool>(
index_rpc_completions.emplace(rpc_clients.ExecuteOnWorkers<bool>(
this->db_.WorkerId(), this->db_.WorkerId(),
[label, property, this](communication::rpc::Client &client) { [label, property, this](communication::rpc::Client &client) {
return client.Call<distributed::BuildIndexRpc>( return client.Call<distributed::BuildIndexRpc>(

View File

@ -590,15 +590,6 @@ class GraphDbAccessor {
return *single_node_engine; return *single_node_engine;
} }
/** Casts the GraphDb to MasterGraphDb and returns it. If the
* GraphDb is not a MasterGraphDb, a call to this method will crash MG. */
Master &MasterGraphDb() {
auto *master_graph_db = dynamic_cast<Master *>(&db_);
DCHECK(master_graph_db)
<< "Asked for Master Graph db on a distributed worker or single node";
return *master_graph_db;
}
/** /**
* Insert this vertex into corresponding label and label+property (if it * Insert this vertex into corresponding label and label+property (if it
* exists) index. * exists) index.