From 4cbfc800b8d728b369e1c1ce24d9560640b3a171 Mon Sep 17 00:00:00 2001 From: Matija Santl Date: Tue, 10 Apr 2018 13:07:01 +0200 Subject: [PATCH] Check that workers desired id is equal to the assigned id from master. Summary: Returns -1 from coordinations `AddWorker` method and propagate it to worker if master can't assign the desired worker id. Reviewers: dgleich, florijan Reviewed By: dgleich Subscribers: pullbot, buda Differential Revision: https://phabricator.memgraph.io/D1352 --- src/database/config.cpp | 8 ++++--- src/distributed/cluster_discovery_master.cpp | 19 ++++++++-------- src/distributed/cluster_discovery_worker.cpp | 10 ++++----- src/distributed/cluster_discovery_worker.hpp | 6 ++--- src/distributed/coordination_master.cpp | 22 ++++++++----------- src/distributed/coordination_master.hpp | 18 ++++++--------- src/distributed/coordination_rpc_messages.hpp | 10 ++++----- src/distributed/coordination_worker.cpp | 3 +-- src/distributed/coordination_worker.hpp | 2 +- tests/unit/distributed_coordination.cpp | 22 +++++++++++-------- 10 files changed, 59 insertions(+), 61 deletions(-) diff --git a/src/database/config.cpp b/src/database/config.cpp index 43e338d8f..b96c35c74 100644 --- a/src/database/config.cpp +++ b/src/database/config.cpp @@ -1,6 +1,7 @@ #include #include "database/graph_db.hpp" +#include "storage/gid.hpp" #include "utils/flag_validation.hpp" // Durability flags. @@ -27,9 +28,10 @@ DEFINE_int32(gc_cycle_sec, 30, "-1 to turn off."); // Distributed master/worker flags. -DEFINE_HIDDEN_int32(worker_id, 0, - "ID of a worker in a distributed system. Igored in " - "single-node and distributed-master."); +DEFINE_VALIDATED_HIDDEN_int32(worker_id, 0, + "ID of a worker in a distributed system. Igored " + "in single-node.", + FLAG_IN_RANGE(0, 1 << gid::kWorkerIdSize)); DEFINE_HIDDEN_string(master_host, "0.0.0.0", "For master node indicates the host served on. For worker " "node indicates the master location."); diff --git a/src/distributed/cluster_discovery_master.cpp b/src/distributed/cluster_discovery_master.cpp index 839d06c5b..63b2e6acf 100644 --- a/src/distributed/cluster_discovery_master.cpp +++ b/src/distributed/cluster_discovery_master.cpp @@ -12,19 +12,20 @@ ClusterDiscoveryMaster::ClusterDiscoveryMaster( coordination_(coordination), rpc_worker_clients_(rpc_worker_clients) { server_.Register([this](const RegisterWorkerReq &req) { - int assigned_worker_id = + bool registration_successful = this->coordination_.RegisterWorker(req.desired_worker_id, req.endpoint); - rpc_worker_clients_.ExecuteOnWorkers( - 0, - [assigned_worker_id, req](communication::rpc::ClientPool &client_pool) { - auto result = client_pool.Call( - assigned_worker_id, req.endpoint); - CHECK(result) << "ClusterDiscoveryRpc failed"; - }); + if (registration_successful) { + rpc_worker_clients_.ExecuteOnWorkers( + 0, [req](communication::rpc::ClientPool &client_pool) { + auto result = client_pool.Call( + req.desired_worker_id, req.endpoint); + CHECK(result) << "ClusterDiscoveryRpc failed"; + }); + } return std::make_unique( - assigned_worker_id, this->coordination_.GetWorkers()); + registration_successful, this->coordination_.GetWorkers()); }); } diff --git a/src/distributed/cluster_discovery_worker.cpp b/src/distributed/cluster_discovery_worker.cpp index b4c833130..1636b1e28 100644 --- a/src/distributed/cluster_discovery_worker.cpp +++ b/src/distributed/cluster_discovery_worker.cpp @@ -14,16 +14,16 @@ ClusterDiscoveryWorker::ClusterDiscoveryWorker( }); } -int ClusterDiscoveryWorker::RegisterWorker(int desired_worker_id) { - auto result = client_pool_.Call(desired_worker_id, - server_.endpoint()); +void ClusterDiscoveryWorker::RegisterWorker(int worker_id) { + auto result = + client_pool_.Call(worker_id, server_.endpoint()); CHECK(result) << "RegisterWorkerRpc failed"; + CHECK(result->registration_successful) << "Unable to assign requested ID (" + << worker_id << ") to worker!"; for (auto &kv : result->workers) { coordination_.RegisterWorker(kv.first, kv.second); } - - return result->assigned_worker_id; } } // namespace distributed diff --git a/src/distributed/cluster_discovery_worker.hpp b/src/distributed/cluster_discovery_worker.hpp index 28af8b658..723521925 100644 --- a/src/distributed/cluster_discovery_worker.hpp +++ b/src/distributed/cluster_discovery_worker.hpp @@ -22,10 +22,10 @@ class ClusterDiscoveryWorker final { /** * Registers a worker with the master. * - * @param worker_id - Desired ID. If -1, or if the desired ID is already - * taken, the worker gets the next available ID. + * @param worker_id - Desired ID. If master can't assign the desired worker + * id, worker will exit. */ - int RegisterWorker(int desired_worker_id = -1); + void RegisterWorker(int worker_id); private: Server &server_; diff --git a/src/distributed/coordination_master.cpp b/src/distributed/coordination_master.cpp index 7c22b9000..4a527af00 100644 --- a/src/distributed/coordination_master.cpp +++ b/src/distributed/coordination_master.cpp @@ -12,25 +12,21 @@ namespace distributed { MasterCoordination::MasterCoordination(const Endpoint &master_endpoint) : Coordination(master_endpoint) {} -int MasterCoordination::RegisterWorker(int desired_worker_id, - Endpoint endpoint) { +bool MasterCoordination::RegisterWorker(int desired_worker_id, + Endpoint endpoint) { std::lock_guard guard(lock_); auto workers = GetWorkers(); - // If there is a desired worker ID, try to set it. - if (desired_worker_id >= 0) { - if (workers.find(desired_worker_id) == workers.end()) { - AddWorker(desired_worker_id, endpoint); - return desired_worker_id; - } + // Check if the desired worker id already exists. + if (workers.find(desired_worker_id) != workers.end()) { LOG(WARNING) << "Unable to assign requested ID (" << desired_worker_id << ") to worker at: " << endpoint; + // If the desired worker ID is already assigned, return -1 and don't add + // that worker to master coordination. + return false; } - // Look for the next ID that's not used. - int worker_id = 1; - while (workers.find(worker_id) != workers.end()) ++worker_id; - AddWorker(worker_id, endpoint); - return worker_id; + AddWorker(desired_worker_id, endpoint); + return true; } Endpoint MasterCoordination::GetEndpoint(int worker_id) { diff --git a/src/distributed/coordination_master.hpp b/src/distributed/coordination_master.hpp index 7b5ebeee9..d57d8ba02 100644 --- a/src/distributed/coordination_master.hpp +++ b/src/distributed/coordination_master.hpp @@ -18,17 +18,13 @@ class MasterCoordination final : public Coordination { /** Shuts down all the workers and this master server. */ ~MasterCoordination(); - /** - * Registers a new worker with this master server. Notifies all the known - * workers of the new worker. - * - * @param desired_worker_id - The ID the worker would like to have. Set to - * -1 if the worker doesn't care. Does not guarantee that the desired ID will - * be returned, it is possible it's already occupied. If that's an error (for - * example in recovery), the worker should handle it as such. - * @return The assigned ID for the worker asking to become registered. - */ - int RegisterWorker(int desired_worker_id, Endpoint endpoint); + /** Registers a new worker with this master coordination. + * + * @param desired_worker_id - The ID the worker would like to have. + * @return True if the desired ID for the worker is available, or false + * if the desired ID is already taken. + */ + bool RegisterWorker(int desired_worker_id, Endpoint endpoint); Endpoint GetEndpoint(int worker_id); diff --git a/src/distributed/coordination_rpc_messages.hpp b/src/distributed/coordination_rpc_messages.hpp index f1bbba957..5b992f764 100644 --- a/src/distributed/coordination_rpc_messages.hpp +++ b/src/distributed/coordination_rpc_messages.hpp @@ -34,11 +34,11 @@ struct RegisterWorkerReq : public Message { }; struct RegisterWorkerRes : public Message { - RegisterWorkerRes(int assigned_worker_id, - std::unordered_map workers) - : assigned_worker_id(assigned_worker_id), workers(std::move(workers)) {} + RegisterWorkerRes(bool registration_successful, + const std::unordered_map &workers) + : registration_successful(registration_successful), workers(workers) {} - int assigned_worker_id; + bool registration_successful; std::unordered_map workers; private: @@ -48,7 +48,7 @@ struct RegisterWorkerRes : public Message { template void serialize(TArchive &ar, unsigned int) { ar &boost::serialization::base_object(*this); - ar &assigned_worker_id; + ar ®istration_successful; ar &workers; } }; diff --git a/src/distributed/coordination_worker.cpp b/src/distributed/coordination_worker.cpp index b1ea621d0..49b4f0404 100644 --- a/src/distributed/coordination_worker.cpp +++ b/src/distributed/coordination_worker.cpp @@ -16,10 +16,9 @@ WorkerCoordination::WorkerCoordination(communication::rpc::Server &server, const Endpoint &master_endpoint) : Coordination(master_endpoint), server_(server) {} -int WorkerCoordination::RegisterWorker(int worker_id, Endpoint endpoint) { +void WorkerCoordination::RegisterWorker(int worker_id, Endpoint endpoint) { std::lock_guard guard(lock_); AddWorker(worker_id, endpoint); - return worker_id; } void WorkerCoordination::WaitForShutdown() { diff --git a/src/distributed/coordination_worker.hpp b/src/distributed/coordination_worker.hpp index 01a656e9f..d18e44e74 100644 --- a/src/distributed/coordination_worker.hpp +++ b/src/distributed/coordination_worker.hpp @@ -18,7 +18,7 @@ class WorkerCoordination final : public Coordination { const Endpoint &master_endpoint); /** Registers the worker with the given endpoint. */ - int RegisterWorker(int worker_id, Endpoint endpoint); + void RegisterWorker(int worker_id, Endpoint endpoint); /** Starts listening for a remote shutdown command (issued by the master). * Blocks the calling thread until that has finished. */ diff --git a/tests/unit/distributed_coordination.cpp b/tests/unit/distributed_coordination.cpp index 7aab09da9..b8fbd3080 100644 --- a/tests/unit/distributed_coordination.cpp +++ b/tests/unit/distributed_coordination.cpp @@ -28,7 +28,7 @@ const std::string kLocal = "127.0.0.1"; class WorkerCoordinationInThread { public: WorkerCoordinationInThread(io::network::Endpoint master_endpoint, - int desired_id = -1) { + int desired_id) { std::atomic init_done{false}; worker_thread_ = std::thread([this, master_endpoint, desired_id, &init_done] { @@ -36,7 +36,10 @@ class WorkerCoordinationInThread { coord_.emplace(*server_, master_endpoint); client_pool_.emplace(master_endpoint); discovery_.emplace(*server_, *coord_, *client_pool_); - worker_id_ = discovery_->RegisterWorker(desired_id); + // Try and register the worker with the desired id. If another worker + // is already using the desired id it will exit here. + discovery_->RegisterWorker(desired_id); + worker_id_ = desired_id; init_done = true; coord_->WaitForShutdown(); }); @@ -68,9 +71,9 @@ TEST(Distributed, Coordination) { ClusterDiscoveryMaster master_discovery_(master_server, master_coord, rpc_worker_clients); - for (int i = 0; i < kWorkerCount; ++i) + for (int i = 1; i <= kWorkerCount; ++i) workers.emplace_back(std::make_unique( - master_server.endpoint())); + master_server.endpoint(), i)); // Expect that all workers have a different ID. std::unordered_set worker_ids; @@ -99,11 +102,12 @@ TEST(Distributed, DesiredAndUniqueId) { workers.emplace_back(std::make_unique( master_server.endpoint(), 42)); - workers.emplace_back(std::make_unique( - master_server.endpoint(), 42)); - EXPECT_EQ(workers[0]->worker_id(), 42); - EXPECT_NE(workers[1]->worker_id(), 42); + + EXPECT_DEATH( + workers.emplace_back(std::make_unique( + master_server.endpoint(), 42)), + ""); } for (auto &worker : workers) worker->join(); @@ -121,7 +125,7 @@ TEST(Distributed, CoordinationWorkersId) { workers.emplace_back(std::make_unique( master_server.endpoint(), 42)); workers.emplace_back(std::make_unique( - master_server.endpoint(), 42)); + master_server.endpoint(), 43)); std::vector ids; ids.push_back(0);