Notify master of worker recovery

Summary: Notify master when workers finish recovering such that master doesn't start doing stuff before they recovered

Reviewers: buda, msantl, mferencevic

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1389
This commit is contained in:
Dominik Gleich 2018-05-09 13:26:22 +02:00
parent 30506f44f5
commit 797bd9e435
13 changed files with 104 additions and 6 deletions

View File

@ -62,6 +62,8 @@ BOOST_CLASS_EXPORT(distributed::ClusterDiscoveryReq);
BOOST_CLASS_EXPORT(distributed::ClusterDiscoveryRes);
BOOST_CLASS_EXPORT(distributed::StopWorkerReq);
BOOST_CLASS_EXPORT(distributed::StopWorkerRes);
BOOST_CLASS_EXPORT(distributed::NotifyWorkerRecoveredReq);
BOOST_CLASS_EXPORT(distributed::NotifyWorkerRecoveredRes);
// Distributed data exchange.
BOOST_CLASS_EXPORT(distributed::EdgeReq);

View File

@ -54,6 +54,10 @@ DEFINE_VALIDATED_HIDDEN_int32(rpc_num_workers,
std::max(std::thread::hardware_concurrency(), 1U),
"Number of workers (RPC)",
FLAG_IN_RANGE(1, INT32_MAX));
DEFINE_VALIDATED_int32(recovering_cluster_size, 0,
"Number of workers (including master) in the "
"previously snapshooted/wal cluster",
FLAG_IN_RANGE(0, INT32_MAX));
#endif
// clang-format off
@ -76,7 +80,8 @@ database::Config::Config()
master_endpoint{FLAGS_master_host,
static_cast<uint16_t>(FLAGS_master_port)},
worker_endpoint{FLAGS_worker_host,
static_cast<uint16_t>(FLAGS_worker_port)}
static_cast<uint16_t>(FLAGS_worker_port)},
recovering_cluster_size{FLAGS_recovering_cluster_size}
#endif
{}
// clang-format on

View File

@ -333,11 +333,25 @@ PublicBase::PublicBase(std::unique_ptr<PrivateBase> impl)
case Type::DISTRIBUTED_MASTER:
dynamic_cast<impl::Master *>(impl_.get())
->coordination_.SetRecoveryInfo(recovery_info);
if (recovery_info) {
CHECK(impl_->config_.recovering_cluster_size > 0)
<< "Invalid cluster recovery size flag. Recovered cluster size "
"should be at least 1";
while (dynamic_cast<impl::Master *>(impl_.get())
->coordination_.CountRecoveredWorkers() !=
impl_->config_.recovering_cluster_size - 1) {
LOG(INFO) << "Waiting for workers to finish recovering..";
std::this_thread::sleep_for(2s);
}
}
break;
case Type::DISTRIBUTED_WORKER:
if (required_recovery_info != recovery_info)
LOG(FATAL) << "Memgraph worker failed to recover the database state "
"recovered on the master";
dynamic_cast<impl::Worker *>(impl_.get())
->cluster_discovery_.NotifyWorkerRecovered();
break;
case Type::SINGLE_NODE:
break;

View File

@ -49,6 +49,7 @@ struct Config {
int worker_id{0};
io::network::Endpoint master_endpoint{"0.0.0.0", 0};
io::network::Endpoint worker_endpoint{"0.0.0.0", 0};
int recovering_cluster_size{0};
};
/**

View File

@ -1,5 +1,5 @@
#include "distributed/cluster_discovery_master.hpp"
#include "communication/rpc/client_pool.hpp"
#include "distributed/cluster_discovery_master.hpp"
#include "distributed/coordination_rpc_messages.hpp"
namespace distributed {
@ -28,6 +28,12 @@ ClusterDiscoveryMaster::ClusterDiscoveryMaster(
registration_successful, this->coordination_.RecoveryInfo(),
this->coordination_.GetWorkers());
});
server_.Register<NotifyWorkerRecoveredRpc>(
[this](const NotifyWorkerRecoveredReq &req) {
this->coordination_.WorkerRecovered(req.member);
return std::make_unique<NotifyWorkerRecoveredRes>();
});
}
} // namespace distributed

View File

@ -18,13 +18,22 @@ void ClusterDiscoveryWorker::RegisterWorker(int worker_id) {
auto result =
client_pool_.Call<RegisterWorkerRpc>(worker_id, server_.endpoint());
CHECK(result) << "RegisterWorkerRpc failed";
CHECK(result->registration_successful) << "Unable to assign requested ID ("
<< worker_id << ") to worker!";
CHECK(result->registration_successful)
<< "Unable to assign requested ID (" << worker_id << ") to worker!";
worker_id_ = worker_id;
for (auto &kv : result->workers) {
coordination_.RegisterWorker(kv.first, kv.second);
}
recovery_info_ = result->recovery_info;
}
void ClusterDiscoveryWorker::NotifyWorkerRecovered() {
CHECK(worker_id_ >= 0)
<< "Workers id is not yet assigned, preform registration before "
"notifying that the recovery finished";
auto result = client_pool_.Call<NotifyWorkerRecoveredRpc>(worker_id_);
CHECK(result) << "NotifyWorkerRecoveredRpc failed";
}
} // namespace distributed

View File

@ -30,10 +30,17 @@ class ClusterDiscoveryWorker final {
*/
void RegisterWorker(int worker_id);
/**
* Notifies the master that the worker finished recovering. Assumes that the
* worker was already registered with master.
*/
void NotifyWorkerRecovered();
/** Returns the recovery info. Valid only after registration. */
auto recovery_info() const { return recovery_info_; }
private:
int worker_id_{-1};
Server &server_;
WorkerCoordination &coordination_;
communication::rpc::ClientPool &client_pool_;

View File

@ -40,6 +40,11 @@ bool MasterCoordination::RegisterWorker(int desired_worker_id,
return true;
}
void MasterCoordination::WorkerRecovered(int worker_id) {
CHECK(recovered_workers_.insert(worker_id).second)
<< "Worker already notified about finishing recovery";
}
Endpoint MasterCoordination::GetEndpoint(int worker_id) {
std::lock_guard<std::mutex> guard(lock_);
return Coordination::GetEndpoint(worker_id);
@ -73,6 +78,10 @@ void MasterCoordination::SetRecoveryInfo(
recovery_info_ = info;
}
int MasterCoordination::CountRecoveredWorkers() const {
return recovered_workers_.size();
}
std::experimental::optional<durability::RecoveryInfo>
MasterCoordination::RecoveryInfo() const {
std::lock_guard<std::mutex> guard(lock_);

View File

@ -2,6 +2,7 @@
#include <experimental/optional>
#include <mutex>
#include <set>
#include <unordered_map>
#include "distributed/coordination.hpp"
@ -28,6 +29,12 @@ class MasterCoordination final : public Coordination {
*/
bool RegisterWorker(int desired_worker_id, Endpoint endpoint);
/*
* Worker `worker_id` finished with recovering, adds it to the set of
* recovered workers.
*/
void WorkerRecovered(int worker_id);
Endpoint GetEndpoint(int worker_id);
/// Sets the recovery info. nullopt indicates nothing was recovered.
@ -36,6 +43,8 @@ class MasterCoordination final : public Coordination {
std::experimental::optional<durability::RecoveryInfo> RecoveryInfo() const;
int CountRecoveredWorkers() const;
private:
// Most master functions aren't thread-safe.
mutable std::mutex lock_;
@ -43,6 +52,8 @@ class MasterCoordination final : public Coordination {
/// Durabiliry recovery info.
/// Indicates if the recovery phase is done.
bool recovery_done_{false};
/// Set of workers that finished sucesfully recovering
std::set<int> recovered_workers_;
/// If nullopt nothing was recovered.
std::experimental::optional<durability::RecoveryInfo> recovery_info_;
};

View File

@ -81,15 +81,21 @@ struct ClusterDiscoveryReq : public Message {
};
RPC_NO_MEMBER_MESSAGE(ClusterDiscoveryRes);
RPC_NO_MEMBER_MESSAGE(StopWorkerReq);
RPC_NO_MEMBER_MESSAGE(StopWorkerRes);
RPC_SINGLE_MEMBER_MESSAGE(NotifyWorkerRecoveredReq, int);
RPC_NO_MEMBER_MESSAGE(NotifyWorkerRecoveredRes);
using RegisterWorkerRpc =
communication::rpc::RequestResponse<RegisterWorkerReq, RegisterWorkerRes>;
using StopWorkerRpc =
communication::rpc::RequestResponse<StopWorkerReq, StopWorkerRes>;
using NotifyWorkerRecoveredRpc =
communication::rpc::RequestResponse<NotifyWorkerRecoveredReq,
NotifyWorkerRecoveredRes>;
using ClusterDiscoveryRpc =
communication::rpc::RequestResponse<ClusterDiscoveryReq,
ClusterDiscoveryRes>;
} // namespace distributed

View File

@ -56,6 +56,7 @@ class MgCluster:
"--query-vertex-count-to-expand-existing", "-1",
"--num-workers", str(WORKERS),
"--rpc-num-workers", str(WORKERS),
"--recovering-cluster-size", str(len(self._workers) + 1)
])
# sleep to allow the master to startup

View File

@ -41,9 +41,14 @@ class DistributedGraphDbTest : public ::testing::Test {
master_config.master_endpoint = {kLocal, 0};
master_config.query_execution_time_sec = QueryExecutionTimeSec(0);
master_config.durability_directory = tmp_dir_;
// This is semantically wrong since this is not a cluster of size 1 but of
// size kWorkerCount+1, but it's hard to wait here for workers to recover
// and simultaneously assign the port to which the workers must connect
// TODO(dgleich): Fix sometime in the future - not mission critical
master_config.recovering_cluster_size = 1;
master_ = std::make_unique<database::Master>(modify_config(master_config));
std::this_thread::sleep_for(kInitTime);
std::this_thread::sleep_for(kInitTime);
auto worker_config = [this](int worker_id) {
database::Config config;
config.worker_id = worker_id;

View File

@ -60,6 +60,7 @@ class WorkerCoordinationInThread {
}
auto worker_ids() { return worker->coord.GetWorkerIds(); }
void join() { worker_thread_.join(); }
void NotifyWorkerRecovered() { worker->discovery.NotifyWorkerRecovered(); }
private:
std::thread worker_thread_;
@ -176,6 +177,27 @@ TEST(Distributed, ClusterDiscovery) {
for (auto &worker : workers) worker->join();
}
TEST(Distributed, KeepsTrackOfRecovered) {
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
{
MasterCoordination master_coord(master_server.endpoint());
master_coord.SetRecoveryInfo(std::experimental::nullopt);
RpcWorkerClients rpc_worker_clients(master_coord);
ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
rpc_worker_clients);
int worker_count = 10;
for (int i = 1; i <= worker_count; ++i) {
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), i));
workers.back()->NotifyWorkerRecovered();
EXPECT_THAT(master_coord.CountRecoveredWorkers(), i);
}
}
for (auto &worker : workers) worker->join();
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
::testing::FLAGS_gtest_death_test_style = "threadsafe";