Check that workers desired id is equal to the assigned id from master.

Summary:
Returns -1 from coordinations `AddWorker` method and propagate it to
worker if master can't assign the desired worker id.

Reviewers: dgleich, florijan

Reviewed By: dgleich

Subscribers: pullbot, buda

Differential Revision: https://phabricator.memgraph.io/D1352
This commit is contained in:
Matija Santl 2018-04-10 13:07:01 +02:00
parent a01c26439b
commit 4cbfc800b8
10 changed files with 59 additions and 61 deletions

View File

@ -1,6 +1,7 @@
#include <limits> #include <limits>
#include "database/graph_db.hpp" #include "database/graph_db.hpp"
#include "storage/gid.hpp"
#include "utils/flag_validation.hpp" #include "utils/flag_validation.hpp"
// Durability flags. // Durability flags.
@ -27,9 +28,10 @@ DEFINE_int32(gc_cycle_sec, 30,
"-1 to turn off."); "-1 to turn off.");
// Distributed master/worker flags. // Distributed master/worker flags.
DEFINE_HIDDEN_int32(worker_id, 0, DEFINE_VALIDATED_HIDDEN_int32(worker_id, 0,
"ID of a worker in a distributed system. Igored in " "ID of a worker in a distributed system. Igored "
"single-node and distributed-master."); "in single-node.",
FLAG_IN_RANGE(0, 1 << gid::kWorkerIdSize));
DEFINE_HIDDEN_string(master_host, "0.0.0.0", DEFINE_HIDDEN_string(master_host, "0.0.0.0",
"For master node indicates the host served on. For worker " "For master node indicates the host served on. For worker "
"node indicates the master location."); "node indicates the master location.");

View File

@ -12,19 +12,20 @@ ClusterDiscoveryMaster::ClusterDiscoveryMaster(
coordination_(coordination), coordination_(coordination),
rpc_worker_clients_(rpc_worker_clients) { rpc_worker_clients_(rpc_worker_clients) {
server_.Register<RegisterWorkerRpc>([this](const RegisterWorkerReq &req) { server_.Register<RegisterWorkerRpc>([this](const RegisterWorkerReq &req) {
int assigned_worker_id = bool registration_successful =
this->coordination_.RegisterWorker(req.desired_worker_id, req.endpoint); this->coordination_.RegisterWorker(req.desired_worker_id, req.endpoint);
rpc_worker_clients_.ExecuteOnWorkers<void>( if (registration_successful) {
0, rpc_worker_clients_.ExecuteOnWorkers<void>(
[assigned_worker_id, req](communication::rpc::ClientPool &client_pool) { 0, [req](communication::rpc::ClientPool &client_pool) {
auto result = client_pool.Call<ClusterDiscoveryRpc>( auto result = client_pool.Call<ClusterDiscoveryRpc>(
assigned_worker_id, req.endpoint); req.desired_worker_id, req.endpoint);
CHECK(result) << "ClusterDiscoveryRpc failed"; CHECK(result) << "ClusterDiscoveryRpc failed";
}); });
}
return std::make_unique<RegisterWorkerRes>( return std::make_unique<RegisterWorkerRes>(
assigned_worker_id, this->coordination_.GetWorkers()); registration_successful, this->coordination_.GetWorkers());
}); });
} }

View File

@ -14,16 +14,16 @@ ClusterDiscoveryWorker::ClusterDiscoveryWorker(
}); });
} }
int ClusterDiscoveryWorker::RegisterWorker(int desired_worker_id) { void ClusterDiscoveryWorker::RegisterWorker(int worker_id) {
auto result = client_pool_.Call<RegisterWorkerRpc>(desired_worker_id, auto result =
server_.endpoint()); client_pool_.Call<RegisterWorkerRpc>(worker_id, server_.endpoint());
CHECK(result) << "RegisterWorkerRpc failed"; CHECK(result) << "RegisterWorkerRpc failed";
CHECK(result->registration_successful) << "Unable to assign requested ID ("
<< worker_id << ") to worker!";
for (auto &kv : result->workers) { for (auto &kv : result->workers) {
coordination_.RegisterWorker(kv.first, kv.second); coordination_.RegisterWorker(kv.first, kv.second);
} }
return result->assigned_worker_id;
} }
} // namespace distributed } // namespace distributed

View File

@ -22,10 +22,10 @@ class ClusterDiscoveryWorker final {
/** /**
* Registers a worker with the master. * Registers a worker with the master.
* *
* @param worker_id - Desired ID. If -1, or if the desired ID is already * @param worker_id - Desired ID. If master can't assign the desired worker
* taken, the worker gets the next available ID. * id, worker will exit.
*/ */
int RegisterWorker(int desired_worker_id = -1); void RegisterWorker(int worker_id);
private: private:
Server &server_; Server &server_;

View File

@ -12,25 +12,21 @@ namespace distributed {
MasterCoordination::MasterCoordination(const Endpoint &master_endpoint) MasterCoordination::MasterCoordination(const Endpoint &master_endpoint)
: Coordination(master_endpoint) {} : Coordination(master_endpoint) {}
int MasterCoordination::RegisterWorker(int desired_worker_id, bool MasterCoordination::RegisterWorker(int desired_worker_id,
Endpoint endpoint) { Endpoint endpoint) {
std::lock_guard<std::mutex> guard(lock_); std::lock_guard<std::mutex> guard(lock_);
auto workers = GetWorkers(); auto workers = GetWorkers();
// If there is a desired worker ID, try to set it. // Check if the desired worker id already exists.
if (desired_worker_id >= 0) { if (workers.find(desired_worker_id) != workers.end()) {
if (workers.find(desired_worker_id) == workers.end()) {
AddWorker(desired_worker_id, endpoint);
return desired_worker_id;
}
LOG(WARNING) << "Unable to assign requested ID (" << desired_worker_id LOG(WARNING) << "Unable to assign requested ID (" << desired_worker_id
<< ") to worker at: " << endpoint; << ") to worker at: " << endpoint;
// If the desired worker ID is already assigned, return -1 and don't add
// that worker to master coordination.
return false;
} }
// Look for the next ID that's not used. AddWorker(desired_worker_id, endpoint);
int worker_id = 1; return true;
while (workers.find(worker_id) != workers.end()) ++worker_id;
AddWorker(worker_id, endpoint);
return worker_id;
} }
Endpoint MasterCoordination::GetEndpoint(int worker_id) { Endpoint MasterCoordination::GetEndpoint(int worker_id) {

View File

@ -18,17 +18,13 @@ class MasterCoordination final : public Coordination {
/** Shuts down all the workers and this master server. */ /** Shuts down all the workers and this master server. */
~MasterCoordination(); ~MasterCoordination();
/** /** Registers a new worker with this master coordination.
* Registers a new worker with this master server. Notifies all the known *
* workers of the new worker. * @param desired_worker_id - The ID the worker would like to have.
* * @return True if the desired ID for the worker is available, or false
* @param desired_worker_id - The ID the worker would like to have. Set to * if the desired ID is already taken.
* -1 if the worker doesn't care. Does not guarantee that the desired ID will */
* be returned, it is possible it's already occupied. If that's an error (for bool RegisterWorker(int desired_worker_id, Endpoint endpoint);
* example in recovery), the worker should handle it as such.
* @return The assigned ID for the worker asking to become registered.
*/
int RegisterWorker(int desired_worker_id, Endpoint endpoint);
Endpoint GetEndpoint(int worker_id); Endpoint GetEndpoint(int worker_id);

View File

@ -34,11 +34,11 @@ struct RegisterWorkerReq : public Message {
}; };
struct RegisterWorkerRes : public Message { struct RegisterWorkerRes : public Message {
RegisterWorkerRes(int assigned_worker_id, RegisterWorkerRes(bool registration_successful,
std::unordered_map<int, Endpoint> workers) const std::unordered_map<int, Endpoint> &workers)
: assigned_worker_id(assigned_worker_id), workers(std::move(workers)) {} : registration_successful(registration_successful), workers(workers) {}
int assigned_worker_id; bool registration_successful;
std::unordered_map<int, Endpoint> workers; std::unordered_map<int, Endpoint> workers;
private: private:
@ -48,7 +48,7 @@ struct RegisterWorkerRes : public Message {
template <class TArchive> template <class TArchive>
void serialize(TArchive &ar, unsigned int) { void serialize(TArchive &ar, unsigned int) {
ar &boost::serialization::base_object<Message>(*this); ar &boost::serialization::base_object<Message>(*this);
ar &assigned_worker_id; ar &registration_successful;
ar &workers; ar &workers;
} }
}; };

View File

@ -16,10 +16,9 @@ WorkerCoordination::WorkerCoordination(communication::rpc::Server &server,
const Endpoint &master_endpoint) const Endpoint &master_endpoint)
: Coordination(master_endpoint), server_(server) {} : Coordination(master_endpoint), server_(server) {}
int WorkerCoordination::RegisterWorker(int worker_id, Endpoint endpoint) { void WorkerCoordination::RegisterWorker(int worker_id, Endpoint endpoint) {
std::lock_guard<std::mutex> guard(lock_); std::lock_guard<std::mutex> guard(lock_);
AddWorker(worker_id, endpoint); AddWorker(worker_id, endpoint);
return worker_id;
} }
void WorkerCoordination::WaitForShutdown() { void WorkerCoordination::WaitForShutdown() {

View File

@ -18,7 +18,7 @@ class WorkerCoordination final : public Coordination {
const Endpoint &master_endpoint); const Endpoint &master_endpoint);
/** Registers the worker with the given endpoint. */ /** Registers the worker with the given endpoint. */
int RegisterWorker(int worker_id, Endpoint endpoint); void RegisterWorker(int worker_id, Endpoint endpoint);
/** Starts listening for a remote shutdown command (issued by the master). /** Starts listening for a remote shutdown command (issued by the master).
* Blocks the calling thread until that has finished. */ * Blocks the calling thread until that has finished. */

View File

@ -28,7 +28,7 @@ const std::string kLocal = "127.0.0.1";
class WorkerCoordinationInThread { class WorkerCoordinationInThread {
public: public:
WorkerCoordinationInThread(io::network::Endpoint master_endpoint, WorkerCoordinationInThread(io::network::Endpoint master_endpoint,
int desired_id = -1) { int desired_id) {
std::atomic<bool> init_done{false}; std::atomic<bool> init_done{false};
worker_thread_ = worker_thread_ =
std::thread([this, master_endpoint, desired_id, &init_done] { std::thread([this, master_endpoint, desired_id, &init_done] {
@ -36,7 +36,10 @@ class WorkerCoordinationInThread {
coord_.emplace(*server_, master_endpoint); coord_.emplace(*server_, master_endpoint);
client_pool_.emplace(master_endpoint); client_pool_.emplace(master_endpoint);
discovery_.emplace(*server_, *coord_, *client_pool_); discovery_.emplace(*server_, *coord_, *client_pool_);
worker_id_ = discovery_->RegisterWorker(desired_id); // Try and register the worker with the desired id. If another worker
// is already using the desired id it will exit here.
discovery_->RegisterWorker(desired_id);
worker_id_ = desired_id;
init_done = true; init_done = true;
coord_->WaitForShutdown(); coord_->WaitForShutdown();
}); });
@ -68,9 +71,9 @@ TEST(Distributed, Coordination) {
ClusterDiscoveryMaster master_discovery_(master_server, master_coord, ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
rpc_worker_clients); rpc_worker_clients);
for (int i = 0; i < kWorkerCount; ++i) for (int i = 1; i <= kWorkerCount; ++i)
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>( workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint())); master_server.endpoint(), i));
// Expect that all workers have a different ID. // Expect that all workers have a different ID.
std::unordered_set<int> worker_ids; std::unordered_set<int> worker_ids;
@ -99,11 +102,12 @@ TEST(Distributed, DesiredAndUniqueId) {
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>( workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), 42)); master_server.endpoint(), 42));
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), 42));
EXPECT_EQ(workers[0]->worker_id(), 42); EXPECT_EQ(workers[0]->worker_id(), 42);
EXPECT_NE(workers[1]->worker_id(), 42);
EXPECT_DEATH(
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), 42)),
"");
} }
for (auto &worker : workers) worker->join(); for (auto &worker : workers) worker->join();
@ -121,7 +125,7 @@ TEST(Distributed, CoordinationWorkersId) {
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>( workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), 42)); master_server.endpoint(), 42));
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>( workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), 42)); master_server.endpoint(), 43));
std::vector<int> ids; std::vector<int> ids;
ids.push_back(0); ids.push_back(0);