From 3df590a84239b70f6fdcd0b2a9cf5d65a525425a Mon Sep 17 00:00:00 2001 From: Matija Santl Date: Thu, 29 Mar 2018 15:24:15 +0200 Subject: [PATCH] Fix distributed master shutdown Summary: Master shouldn't stop processing rpc calls immediately on shutdown. It should wait for all workers to stop, and then destroy itself. Reviewers: dgleich, mferencevic Reviewed By: dgleich Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1330 --- src/communication/rpc/client.hpp | 2 +- src/database/graph_db.cpp | 7 ------- src/distributed/coordination_master.cpp | 3 +++ 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/communication/rpc/client.hpp b/src/communication/rpc/client.hpp index d02ffc8ff..b5712028e 100644 --- a/src/communication/rpc/client.hpp +++ b/src/communication/rpc/client.hpp @@ -46,7 +46,7 @@ class Client { return nullptr; } - if (VLOG_IS_ON(12)) { + if (VLOG_IS_ON(12) && response) { auto res_type = utils::Demangle(response->type_index().name()); LOG(INFO) << "[RpcClient] received " << (res_type ? res_type.value() : ""); diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 27410b53f..f3ab463fe 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -160,13 +160,6 @@ class Master : public PrivateBase { return index_rpc_clients_; } - ~Master() { - // The server is stopped explicitly here to disable RPC calls during the - // destruction of this object. This works because this destructor is called - // before the destructors of all objects. - server_.StopProcessingCalls(); - } - communication::rpc::Server server_{ config_.master_endpoint, static_cast(config_.rpc_num_workers)}; tx::MasterEngine tx_engine_{server_, &wal_}; diff --git a/src/distributed/coordination_master.cpp b/src/distributed/coordination_master.cpp index 8aba091dc..e63623844 100644 --- a/src/distributed/coordination_master.cpp +++ b/src/distributed/coordination_master.cpp @@ -47,6 +47,9 @@ MasterCoordination::~MasterCoordination() { auto result = client.Call(); CHECK(result) << "StopWorkerRpc failed work worker: " << kv.first; } + + // Make sure all StopWorkerRpc request/response are exchanged. + std::this_thread::sleep_for(2s); } Endpoint MasterCoordination::GetEndpoint(int worker_id) {