Add ClusterDiscovery RPC for distributed BFS
Summary: Implemented cluster discovery in distributed memgraph. When a worker registers, it sends an RPC request to the master. The master assigns that worker an id and sends the information about the other workers (pairs of <worker_id, endpoint>) to the new worker. The master also sends the information about the new worker to all existing workers during worker registration. After the last worker registers, all memgraph instances in the cluster should know about each other. Reviewers: mtomic, buda, florijan Reviewed By: mtomic Subscribers: teon.banek, dgleich, pullbot Differential Revision: https://phabricator.memgraph.io/D1339
This commit is contained in:
parent
71565287b8
commit
7b88e514b8
@ -13,6 +13,9 @@ set(memgraph_src_files
|
||||
database/graph_db.cpp
|
||||
database/graph_db_accessor.cpp
|
||||
database/state_delta.cpp
|
||||
distributed/cluster_discovery_master.cpp
|
||||
distributed/cluster_discovery_worker.cpp
|
||||
distributed/coordination.cpp
|
||||
distributed/coordination_master.cpp
|
||||
distributed/coordination_worker.cpp
|
||||
distributed/durability_rpc_clients.cpp
|
||||
|
@ -56,8 +56,8 @@ BOOST_CLASS_EXPORT(tx::GlobalLastRes);
|
||||
// Distributed coordination.
|
||||
BOOST_CLASS_EXPORT(distributed::RegisterWorkerReq);
|
||||
BOOST_CLASS_EXPORT(distributed::RegisterWorkerRes);
|
||||
BOOST_CLASS_EXPORT(distributed::GetEndpointReq);
|
||||
BOOST_CLASS_EXPORT(distributed::GetEndpointRes);
|
||||
BOOST_CLASS_EXPORT(distributed::ClusterDiscoveryReq);
|
||||
BOOST_CLASS_EXPORT(distributed::ClusterDiscoveryRes);
|
||||
BOOST_CLASS_EXPORT(distributed::StopWorkerReq);
|
||||
BOOST_CLASS_EXPORT(distributed::StopWorkerRes);
|
||||
|
||||
|
@ -5,6 +5,8 @@
|
||||
#include "database/storage_gc_master.hpp"
|
||||
#include "database/storage_gc_single_node.hpp"
|
||||
#include "database/storage_gc_worker.hpp"
|
||||
#include "distributed/cluster_discovery_master.hpp"
|
||||
#include "distributed/cluster_discovery_worker.hpp"
|
||||
#include "distributed/coordination_master.hpp"
|
||||
#include "distributed/coordination_worker.hpp"
|
||||
#include "distributed/data_manager.hpp"
|
||||
@ -194,7 +196,7 @@ class Master : public PrivateBase {
|
||||
communication::rpc::Server server_{
|
||||
config_.master_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
|
||||
tx::MasterEngine tx_engine_{server_, rpc_worker_clients_, &wal_};
|
||||
distributed::MasterCoordination coordination_{server_};
|
||||
distributed::MasterCoordination coordination_{server_.endpoint()};
|
||||
StorageGcMaster storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec,
|
||||
server_, coordination_};
|
||||
distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
|
||||
@ -212,12 +214,14 @@ class Master : public PrivateBase {
|
||||
distributed::DataManager data_manager_{storage_, data_clients_};
|
||||
distributed::TransactionalCacheCleaner cache_cleaner_{
|
||||
tx_engine_, updates_server_, data_manager_};
|
||||
distributed::ClusterDiscoveryMaster cluster_discovery_{server_, coordination_,
|
||||
rpc_worker_clients_};
|
||||
};
|
||||
|
||||
class Worker : public PrivateBase {
|
||||
public:
|
||||
explicit Worker(const Config &config) : PrivateBase(config) {
|
||||
coordination_.RegisterWorker(config.worker_id);
|
||||
cluster_discovery_.RegisterWorker(config.worker_id);
|
||||
}
|
||||
|
||||
GraphDb::Type type() const override {
|
||||
@ -261,6 +265,8 @@ class Worker : public PrivateBase {
|
||||
distributed::WorkerTransactionalCacheCleaner cache_cleaner_{
|
||||
tx_engine_, server_, produce_server_, updates_server_, data_manager_};
|
||||
distributed::DurabilityRpcServer durability_rpc_server_{*this, server_};
|
||||
distributed::ClusterDiscoveryWorker cluster_discovery_{
|
||||
server_, coordination_, rpc_worker_clients_.GetClientPool(0)};
|
||||
};
|
||||
|
||||
#undef IMPL_GETTERS
|
||||
|
31
src/distributed/cluster_discovery_master.cpp
Normal file
31
src/distributed/cluster_discovery_master.cpp
Normal file
@ -0,0 +1,31 @@
|
||||
#include "distributed/cluster_discovery_master.hpp"
|
||||
#include "communication/rpc/client_pool.hpp"
|
||||
#include "distributed/coordination_rpc_messages.hpp"
|
||||
|
||||
namespace distributed {
|
||||
using Server = communication::rpc::Server;
|
||||
|
||||
/**
 * Stores the given collaborators and installs the RegisterWorkerRpc handler
 * on `server`.
 *
 * For each registration request the handler:
 *   1. asks `coordination` to register the worker and obtains the id that was
 *      actually assigned,
 *   2. issues a ClusterDiscoveryRpc carrying the new worker's id and endpoint
 *      through `rpc_worker_clients`,
 *   3. replies with the assigned id together with the worker map returned by
 *      `coordination.GetWorkers()`.
 *
 * NOTE(review): the leading `0` passed to ExecuteOnWorkers is presumably the
 * worker id to skip (the master itself) -- confirm against RpcWorkerClients.
 */
ClusterDiscoveryMaster::ClusterDiscoveryMaster(
    Server &server, MasterCoordination &coordination,
    RpcWorkerClients &rpc_worker_clients)
    : server_(server),
      coordination_(coordination),
      rpc_worker_clients_(rpc_worker_clients) {
  server_.Register<RegisterWorkerRpc>([this](const RegisterWorkerReq &req) {
    const int new_worker_id =
        coordination_.RegisterWorker(req.desired_worker_id, req.endpoint);

    // Only the endpoint is needed by the per-worker callback, so copy just
    // that into the closure.
    auto announce = [new_worker_id, new_worker_endpoint = req.endpoint](
                        communication::rpc::ClientPool &client_pool) {
      auto response = client_pool.Call<ClusterDiscoveryRpc>(
          new_worker_id, new_worker_endpoint);
      CHECK(response) << "ClusterDiscoveryRpc failed";
    };
    rpc_worker_clients_.ExecuteOnWorkers<void>(0, announce);

    // The worker map is read after the registration above, so it already
    // contains the newly registered worker.
    return std::make_unique<RegisterWorkerRes>(new_worker_id,
                                               coordination_.GetWorkers());
  });
}
|
||||
|
||||
} // namespace distributed
|
27
src/distributed/cluster_discovery_master.hpp
Normal file
27
src/distributed/cluster_discovery_master.hpp
Normal file
@ -0,0 +1,27 @@
|
||||
#pragma once
|
||||
|
||||
#include "communication/rpc/server.hpp"
|
||||
#include "distributed/coordination_master.hpp"
|
||||
#include "distributed/rpc_worker_clients.hpp"
|
||||
|
||||
namespace distributed {
|
||||
using Server = communication::rpc::Server;
|
||||
|
||||
/** Handle cluster discovery on master.
|
||||
*
|
||||
* Cluster discovery on master handles worker registration and broadcasts new
|
||||
* worker information to already registered workers, and already registered
|
||||
* worker information to the new worker.
|
||||
*/
|
||||
class ClusterDiscoveryMaster final {
|
||||
public:
|
||||
ClusterDiscoveryMaster(Server &server, MasterCoordination &coordination,
|
||||
RpcWorkerClients &rpc_worker_clients);
|
||||
|
||||
private:
|
||||
Server &server_;
|
||||
MasterCoordination &coordination_;
|
||||
RpcWorkerClients &rpc_worker_clients_;
|
||||
};
|
||||
|
||||
} // namespace distributed
|
29
src/distributed/cluster_discovery_worker.cpp
Normal file
29
src/distributed/cluster_discovery_worker.cpp
Normal file
@ -0,0 +1,29 @@
|
||||
#include "distributed/cluster_discovery_worker.hpp"
|
||||
#include "distributed/coordination_rpc_messages.hpp"
|
||||
|
||||
namespace distributed {
|
||||
using Server = communication::rpc::Server;
|
||||
|
||||
/**
 * Stores the given collaborators and installs the ClusterDiscoveryRpc handler
 * on `server`.
 *
 * The handler records the announced (worker id, endpoint) pair in
 * `coordination` and answers with an empty ClusterDiscoveryRes.
 */
ClusterDiscoveryWorker::ClusterDiscoveryWorker(
    Server &server, WorkerCoordination &coordination,
    communication::rpc::ClientPool &client_pool)
    : server_(server), coordination_(coordination), client_pool_(client_pool) {
  auto on_discovery = [this](const ClusterDiscoveryReq &announcement) {
    coordination_.RegisterWorker(announcement.worker_id,
                                 announcement.endpoint);
    return std::make_unique<ClusterDiscoveryRes>();
  };
  server_.Register<ClusterDiscoveryRpc>(on_discovery);
}
|
||||
|
||||
int ClusterDiscoveryWorker::RegisterWorker(int desired_worker_id) {
|
||||
auto result = client_pool_.Call<RegisterWorkerRpc>(desired_worker_id,
|
||||
server_.endpoint());
|
||||
CHECK(result) << "RegisterWorkerRpc failed";
|
||||
|
||||
for (auto &kv : result->workers) {
|
||||
coordination_.RegisterWorker(kv.first, kv.second);
|
||||
}
|
||||
|
||||
return result->assigned_worker_id;
|
||||
}
|
||||
|
||||
} // namespace distributed
|
36
src/distributed/cluster_discovery_worker.hpp
Normal file
36
src/distributed/cluster_discovery_worker.hpp
Normal file
@ -0,0 +1,36 @@
|
||||
#pragma once
|
||||
|
||||
#include "communication/rpc/client_pool.hpp"
|
||||
#include "communication/rpc/server.hpp"
|
||||
#include "distributed/coordination_worker.hpp"
|
||||
|
||||
namespace distributed {
|
||||
using Server = communication::rpc::Server;
|
||||
using ClientPool = communication::rpc::ClientPool;
|
||||
|
||||
/** Handle cluster discovery on worker.
|
||||
*
|
||||
* Cluster discovery on worker handles worker registration by sending an rpc
|
||||
* request to master and processes received rpc response with other worker
|
||||
* information.
|
||||
*/
|
||||
class ClusterDiscoveryWorker final {
|
||||
public:
|
||||
ClusterDiscoveryWorker(Server &server, WorkerCoordination &coordination,
|
||||
ClientPool &client_pool);
|
||||
|
||||
/**
|
||||
* Registers a worker with the master.
|
||||
*
|
||||
* @param worker_id - Desired ID. If -1, or if the desired ID is already
|
||||
* taken, the worker gets the next available ID.
|
||||
*/
|
||||
int RegisterWorker(int desired_worker_id = -1);
|
||||
|
||||
private:
|
||||
Server &server_;
|
||||
WorkerCoordination &coordination_;
|
||||
communication::rpc::ClientPool &client_pool_;
|
||||
};
|
||||
|
||||
} // namespace distributed
|
34
src/distributed/coordination.cpp
Normal file
34
src/distributed/coordination.cpp
Normal file
@ -0,0 +1,34 @@
|
||||
#include "glog/logging.h"
|
||||
|
||||
#include "distributed/coordination.hpp"
|
||||
|
||||
namespace distributed {
|
||||
using Endpoint = io::network::Endpoint;
|
||||
|
||||
Coordination::Coordination(const Endpoint &master_endpoint) {
|
||||
// The master is always worker 0.
|
||||
workers_.emplace(0, master_endpoint);
|
||||
}
|
||||
|
||||
/**
 * Looks up the endpoint stored for `worker_id`.
 *
 * An unknown id trips the CHECK below.
 *
 * @return a copy of the stored endpoint.
 */
Endpoint Coordination::GetEndpoint(int worker_id) {
  const auto entry = workers_.find(worker_id);
  CHECK(entry != workers_.end())
      << "No endpoint registered for worker id: " << worker_id;
  return entry->second;
}
|
||||
|
||||
std::vector<int> Coordination::GetWorkerIds() const {
|
||||
std::vector<int> worker_ids;
|
||||
for (auto worker : workers_) worker_ids.push_back(worker.first);
|
||||
return worker_ids;
|
||||
}
|
||||
|
||||
/**
 * Stores `endpoint` under `worker_id`.
 *
 * If an entry for `worker_id` already exists it is left untouched
 * (unordered_map insertion does not overwrite).
 */
void Coordination::AddWorker(int worker_id, Endpoint endpoint) {
  workers_.insert({worker_id, endpoint});
}
|
||||
|
||||
std::unordered_map<int, Endpoint> Coordination::GetWorkers() {
|
||||
return workers_;
|
||||
}
|
||||
|
||||
} // namespace distributed
|
@ -1,20 +1,36 @@
|
||||
#pragma once
|
||||
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "io/network/endpoint.hpp"
|
||||
|
||||
namespace distributed {
|
||||
|
||||
/** API for the distributed coordination class. */
|
||||
/** Coordination base class. This class is not thread safe. */
|
||||
class Coordination {
|
||||
public:
|
||||
virtual ~Coordination() {}
|
||||
explicit Coordination(const io::network::Endpoint &master_endpoint);
|
||||
|
||||
/** Gets the endpoint for the given worker ID from the master. */
|
||||
virtual io::network::Endpoint GetEndpoint(int worker_id) = 0;
|
||||
io::network::Endpoint GetEndpoint(int worker_id);
|
||||
|
||||
/** Gets the connected worker ids - should only be called on a master
|
||||
* instance*/
|
||||
virtual std::vector<int> GetWorkerIds() const = 0;
|
||||
/** Returns the ids of all workers, including the master (id 0). */
|
||||
std::vector<int> GetWorkerIds() const;
|
||||
|
||||
/** Gets the mapping of worker id to worker endpoint including master (worker
|
||||
* id = 0).
|
||||
*/
|
||||
std::unordered_map<int, io::network::Endpoint> GetWorkers();
|
||||
|
||||
protected:
|
||||
~Coordination() {}
|
||||
|
||||
/** Adds a worker to coordination. */
|
||||
void AddWorker(int worker_id, io::network::Endpoint endpoint);
|
||||
|
||||
private:
|
||||
std::unordered_map<int, io::network::Endpoint> workers_;
|
||||
};
|
||||
|
||||
} // namespace distributed
|
||||
|
@ -1,30 +1,25 @@
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
|
||||
#include "glog/logging.h"
|
||||
|
||||
#include "communication/rpc/client.hpp"
|
||||
#include "distributed/coordination_master.hpp"
|
||||
#include "distributed/coordination_rpc_messages.hpp"
|
||||
|
||||
namespace distributed {
|
||||
|
||||
MasterCoordination::MasterCoordination(communication::rpc::Server &server)
|
||||
: server_(server) {
|
||||
// The master is always worker 0.
|
||||
workers_.emplace(0, server_.endpoint());
|
||||
|
||||
server_.Register<RegisterWorkerRpc>([this](const RegisterWorkerReq &req) {
|
||||
auto worker_id = RegisterWorker(req.desired_worker_id, req.endpoint);
|
||||
return std::make_unique<RegisterWorkerRes>(worker_id);
|
||||
});
|
||||
server_.Register<GetEndpointRpc>([this](const GetEndpointReq &req) {
|
||||
return std::make_unique<GetEndpointRes>(GetEndpoint(req.member));
|
||||
});
|
||||
}
|
||||
MasterCoordination::MasterCoordination(const Endpoint &master_endpoint)
|
||||
: Coordination(master_endpoint) {}
|
||||
|
||||
int MasterCoordination::RegisterWorker(int desired_worker_id,
|
||||
Endpoint endpoint) {
|
||||
std::lock_guard<std::mutex> guard(lock_);
|
||||
|
||||
auto workers = GetWorkers();
|
||||
// If there is a desired worker ID, try to set it.
|
||||
if (desired_worker_id >= 0) {
|
||||
if (workers_.find(desired_worker_id) == workers_.end()) {
|
||||
workers_.emplace(desired_worker_id, endpoint);
|
||||
if (workers.find(desired_worker_id) == workers.end()) {
|
||||
AddWorker(desired_worker_id, endpoint);
|
||||
return desired_worker_id;
|
||||
}
|
||||
LOG(WARNING) << "Unable to assign requested ID (" << desired_worker_id
|
||||
@ -33,37 +28,30 @@ int MasterCoordination::RegisterWorker(int desired_worker_id,
|
||||
|
||||
// Look for the next ID that's not used.
|
||||
int worker_id = 1;
|
||||
while (workers_.find(worker_id) != workers_.end()) ++worker_id;
|
||||
workers_.emplace(worker_id, endpoint);
|
||||
while (workers.find(worker_id) != workers.end()) ++worker_id;
|
||||
AddWorker(worker_id, endpoint);
|
||||
return worker_id;
|
||||
}
|
||||
|
||||
MasterCoordination::~MasterCoordination() {
|
||||
Endpoint MasterCoordination::GetEndpoint(int worker_id) {
|
||||
std::lock_guard<std::mutex> guard(lock_);
|
||||
for (const auto &kv : workers_) {
|
||||
return Coordination::GetEndpoint(worker_id);
|
||||
}
|
||||
|
||||
MasterCoordination::~MasterCoordination() {
|
||||
using namespace std::chrono_literals;
|
||||
std::lock_guard<std::mutex> guard(lock_);
|
||||
auto workers = GetWorkers();
|
||||
for (const auto &kv : workers) {
|
||||
// Skip master (self).
|
||||
if (kv.first == 0) continue;
|
||||
communication::rpc::Client client(kv.second);
|
||||
auto result = client.Call<StopWorkerRpc>();
|
||||
CHECK(result) << "StopWorkerRpc failed work worker: " << kv.first;
|
||||
CHECK(result) << "StopWorkerRpc failed for worker: " << kv.first;
|
||||
}
|
||||
|
||||
// Make sure all StopWorkerRpc request/response are exchanged.
|
||||
std::this_thread::sleep_for(2s);
|
||||
}
|
||||
|
||||
Endpoint MasterCoordination::GetEndpoint(int worker_id) {
|
||||
std::lock_guard<std::mutex> guard(lock_);
|
||||
auto found = workers_.find(worker_id);
|
||||
CHECK(found != workers_.end()) << "No endpoint registered for worker id: "
|
||||
<< worker_id;
|
||||
return found->second;
|
||||
}
|
||||
|
||||
std::vector<int> MasterCoordination::GetWorkerIds() const {
|
||||
std::vector<int> worker_ids;
|
||||
for (auto worker : workers_) worker_ids.push_back(worker.first);
|
||||
return worker_ids;
|
||||
}
|
||||
|
||||
} // namespace distributed
|
||||
|
@ -3,8 +3,6 @@
|
||||
#include <mutex>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "communication/rpc/client.hpp"
|
||||
#include "communication/rpc/server.hpp"
|
||||
#include "distributed/coordination.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
|
||||
@ -13,35 +11,29 @@ using Endpoint = io::network::Endpoint;
|
||||
|
||||
/** Handles worker registration, getting of other workers' endpoints and
|
||||
* coordinated shutdown in a distributed memgraph. Master side. */
|
||||
class MasterCoordination : public Coordination {
|
||||
/**
|
||||
* Registers a new worker with this master server. Notifies all the known
|
||||
* workers of the new worker.
|
||||
*
|
||||
* @param desired_worker_id - The ID the worker would like to have. Set to
|
||||
* -1 if the worker doesn't care. Does not guarantee that the desired ID will
|
||||
* be returned, it is possible it's already occupied. If that's an error (for
|
||||
* example in recovery), the worker should handle it as such.
|
||||
* @return The assigned ID for the worker asking to become registered.
|
||||
*/
|
||||
int RegisterWorker(int desired_worker_id, Endpoint endpoint);
|
||||
|
||||
class MasterCoordination final : public Coordination {
|
||||
public:
|
||||
explicit MasterCoordination(communication::rpc::Server &server);
|
||||
explicit MasterCoordination(const Endpoint &master_endpoint);
|
||||
|
||||
/** Shuts down all the workers and this master server. */
|
||||
~MasterCoordination();
|
||||
|
||||
/** Returns the Endpoint for the given worker_id. */
|
||||
Endpoint GetEndpoint(int worker_id) override;
|
||||
/**
|
||||
* Registers a new worker with this master server. Notifies all the known
|
||||
* workers of the new worker.
|
||||
*
|
||||
* @param desired_worker_id - The ID the worker would like to have. Set to
|
||||
* -1 if the worker doesn't care. Does not guarantee that the desired ID will
|
||||
* be returned, it is possible it's already occupied. If that's an error (for
|
||||
* example in recovery), the worker should handle it as such.
|
||||
* @return The assigned ID for the worker asking to become registered.
|
||||
*/
|
||||
int RegisterWorker(int desired_worker_id, Endpoint endpoint);
|
||||
|
||||
/** Returns all workers id, this includes master id(0) */
|
||||
std::vector<int> GetWorkerIds() const override;
|
||||
Endpoint GetEndpoint(int worker_id);
|
||||
|
||||
private:
|
||||
communication::rpc::Server &server_;
|
||||
// Most master functions aren't thread-safe.
|
||||
mutable std::mutex lock_;
|
||||
std::unordered_map<int, Endpoint> workers_;
|
||||
};
|
||||
} // namespace distributed
|
||||
|
@ -1,7 +1,10 @@
|
||||
#pragma once
|
||||
|
||||
#include <unordered_map>
|
||||
|
||||
#include "boost/serialization/access.hpp"
|
||||
#include "boost/serialization/base_object.hpp"
|
||||
#include "boost/serialization/unordered_map.hpp"
|
||||
|
||||
#include "communication/rpc/messages.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
@ -12,7 +15,6 @@ using communication::rpc::Message;
|
||||
using Endpoint = io::network::Endpoint;
|
||||
|
||||
struct RegisterWorkerReq : public Message {
|
||||
RegisterWorkerReq() {}
|
||||
// Set desired_worker_id to -1 to get an automatically assigned ID.
|
||||
RegisterWorkerReq(int desired_worker_id, const Endpoint &endpoint)
|
||||
: desired_worker_id(desired_worker_id), endpoint(endpoint) {}
|
||||
@ -21,6 +23,7 @@ struct RegisterWorkerReq : public Message {
|
||||
|
||||
private:
|
||||
friend class boost::serialization::access;
|
||||
RegisterWorkerReq() {}
|
||||
|
||||
template <class TArchive>
|
||||
void serialize(TArchive &ar, unsigned int) {
|
||||
@ -30,17 +33,55 @@ struct RegisterWorkerReq : public Message {
|
||||
}
|
||||
};
|
||||
|
||||
RPC_SINGLE_MEMBER_MESSAGE(RegisterWorkerRes, int);
|
||||
RPC_SINGLE_MEMBER_MESSAGE(GetEndpointReq, int);
|
||||
RPC_SINGLE_MEMBER_MESSAGE(GetEndpointRes, Endpoint);
|
||||
struct RegisterWorkerRes : public Message {
|
||||
RegisterWorkerRes(int assigned_worker_id,
|
||||
std::unordered_map<int, Endpoint> workers)
|
||||
: assigned_worker_id(assigned_worker_id), workers(std::move(workers)) {}
|
||||
|
||||
int assigned_worker_id;
|
||||
std::unordered_map<int, Endpoint> workers;
|
||||
|
||||
private:
|
||||
friend class boost::serialization::access;
|
||||
RegisterWorkerRes() {}
|
||||
|
||||
template <class TArchive>
|
||||
void serialize(TArchive &ar, unsigned int) {
|
||||
ar &boost::serialization::base_object<Message>(*this);
|
||||
ar &assigned_worker_id;
|
||||
ar &workers;
|
||||
}
|
||||
};
|
||||
|
||||
struct ClusterDiscoveryReq : public Message {
|
||||
ClusterDiscoveryReq(int worker_id, Endpoint endpoint)
|
||||
: worker_id(worker_id), endpoint(endpoint) {}
|
||||
|
||||
int worker_id;
|
||||
Endpoint endpoint;
|
||||
|
||||
private:
|
||||
friend class boost::serialization::access;
|
||||
ClusterDiscoveryReq() {}
|
||||
|
||||
template <class TArchive>
|
||||
void serialize(TArchive &ar, unsigned int) {
|
||||
ar &boost::serialization::base_object<Message>(*this);
|
||||
ar &worker_id;
|
||||
ar &endpoint;
|
||||
}
|
||||
};
|
||||
|
||||
RPC_NO_MEMBER_MESSAGE(ClusterDiscoveryRes);
|
||||
RPC_NO_MEMBER_MESSAGE(StopWorkerReq);
|
||||
RPC_NO_MEMBER_MESSAGE(StopWorkerRes);
|
||||
|
||||
using RegisterWorkerRpc =
|
||||
communication::rpc::RequestResponse<RegisterWorkerReq, RegisterWorkerRes>;
|
||||
using GetEndpointRpc =
|
||||
communication::rpc::RequestResponse<GetEndpointReq, GetEndpointRes>;
|
||||
using StopWorkerRpc =
|
||||
communication::rpc::RequestResponse<StopWorkerReq, StopWorkerRes>;
|
||||
using ClusterDiscoveryRpc =
|
||||
communication::rpc::RequestResponse<ClusterDiscoveryReq,
|
||||
ClusterDiscoveryRes>;
|
||||
|
||||
} // namespace distributed
|
||||
|
@ -1,36 +1,29 @@
|
||||
#include "distributed/coordination_worker.hpp"
|
||||
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
|
||||
#include "glog/logging.h"
|
||||
|
||||
#include "distributed/coordination_rpc_messages.hpp"
|
||||
#include "distributed/coordination_worker.hpp"
|
||||
|
||||
namespace distributed {
|
||||
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
WorkerCoordination::WorkerCoordination(communication::rpc::Server &server,
|
||||
const Endpoint &master_endpoint)
|
||||
: server_(server), client_pool_(master_endpoint) {}
|
||||
: Coordination(master_endpoint), server_(server) {}
|
||||
|
||||
int WorkerCoordination::RegisterWorker(int desired_worker_id) {
|
||||
auto result = client_pool_.Call<RegisterWorkerRpc>(desired_worker_id,
|
||||
server_.endpoint());
|
||||
CHECK(result) << "RegisterWorkerRpc failed";
|
||||
return result->member;
|
||||
}
|
||||
|
||||
Endpoint WorkerCoordination::GetEndpoint(int worker_id) {
|
||||
auto accessor = endpoint_cache_.access();
|
||||
auto found = accessor.find(worker_id);
|
||||
if (found != accessor.end()) return found->second;
|
||||
auto result = client_pool_.Call<GetEndpointRpc>(worker_id);
|
||||
CHECK(result) << "GetEndpointRpc failed";
|
||||
accessor.insert(worker_id, result->member);
|
||||
return result->member;
|
||||
int WorkerCoordination::RegisterWorker(int worker_id, Endpoint endpoint) {
|
||||
std::lock_guard<std::mutex> guard(lock_);
|
||||
AddWorker(worker_id, endpoint);
|
||||
return worker_id;
|
||||
}
|
||||
|
||||
void WorkerCoordination::WaitForShutdown() {
|
||||
using namespace std::chrono_literals;
|
||||
std::mutex mutex;
|
||||
std::condition_variable cv;
|
||||
bool shutdown = false;
|
||||
@ -52,7 +45,9 @@ void WorkerCoordination::WaitForShutdown() {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
};
|
||||
|
||||
std::vector<int> WorkerCoordination::GetWorkerIds() const {
|
||||
LOG(FATAL) << "Unimplemented worker ids discovery on worker";
|
||||
};
|
||||
Endpoint WorkerCoordination::GetEndpoint(int worker_id) {
|
||||
std::lock_guard<std::mutex> guard(lock_);
|
||||
return Coordination::GetEndpoint(worker_id);
|
||||
}
|
||||
|
||||
} // namespace distributed
|
||||
|
@ -1,44 +1,33 @@
|
||||
#pragma once
|
||||
|
||||
#include "communication/rpc/client_pool.hpp"
|
||||
#include <mutex>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "communication/rpc/server.hpp"
|
||||
#include "data_structures/concurrent/concurrent_map.hpp"
|
||||
#include "distributed/coordination.hpp"
|
||||
#include "distributed/coordination_rpc_messages.hpp"
|
||||
|
||||
namespace distributed {
|
||||
|
||||
/** Handles worker registration, getting of other workers' endpoints and
|
||||
* coordinated shutdown in a distributed memgraph. Worker side. */
|
||||
class WorkerCoordination : public Coordination {
|
||||
class WorkerCoordination final : public Coordination {
|
||||
using Endpoint = io::network::Endpoint;
|
||||
|
||||
public:
|
||||
WorkerCoordination(communication::rpc::Server &server,
|
||||
const Endpoint &master_endpoint);
|
||||
|
||||
/**
|
||||
* Registers a worker with the master.
|
||||
*
|
||||
* @param worker_id - Desired ID. If -1, or if the desired ID is already
|
||||
* taken, the worker gets the next available ID.
|
||||
*/
|
||||
int RegisterWorker(int desired_worker_id = -1);
|
||||
|
||||
/** Gets the endpoint for the given worker ID from the master. */
|
||||
Endpoint GetEndpoint(int worker_id) override;
|
||||
|
||||
/** Shouldn't be called on worker for now!
|
||||
* TODO fix this */
|
||||
std::vector<int> GetWorkerIds() const override;
|
||||
/** Registers the worker with the given endpoint. */
|
||||
int RegisterWorker(int worker_id, Endpoint endpoint);
|
||||
|
||||
/** Starts listening for a remote shutdown command (issued by the master).
|
||||
* Blocks the calling thread until that has finished. */
|
||||
void WaitForShutdown();
|
||||
|
||||
Endpoint GetEndpoint(int worker_id);
|
||||
|
||||
private:
|
||||
communication::rpc::Server &server_;
|
||||
communication::rpc::ClientPool client_pool_;
|
||||
mutable ConcurrentMap<int, Endpoint> endpoint_cache_;
|
||||
mutable std::mutex lock_;
|
||||
};
|
||||
} // namespace distributed
|
||||
|
@ -8,12 +8,17 @@
|
||||
#include "gmock/gmock.h"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "communication/rpc/client_pool.hpp"
|
||||
#include "communication/rpc/server.hpp"
|
||||
#include "distributed/cluster_discovery_master.hpp"
|
||||
#include "distributed/cluster_discovery_worker.hpp"
|
||||
#include "distributed/coordination_master.hpp"
|
||||
#include "distributed/coordination_worker.hpp"
|
||||
#include "distributed/rpc_worker_clients.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
|
||||
using communication::rpc::Server;
|
||||
using communication::rpc::ClientPool;
|
||||
using namespace distributed;
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
@ -29,7 +34,9 @@ class WorkerCoordinationInThread {
|
||||
std::thread([this, master_endpoint, desired_id, &init_done] {
|
||||
server_.emplace(Endpoint(kLocal, 0));
|
||||
coord_.emplace(*server_, master_endpoint);
|
||||
worker_id_ = coord_->RegisterWorker(desired_id);
|
||||
client_pool_.emplace(master_endpoint);
|
||||
discovery_.emplace(*server_, *coord_, *client_pool_);
|
||||
worker_id_ = discovery_->RegisterWorker(desired_id);
|
||||
init_done = true;
|
||||
coord_->WaitForShutdown();
|
||||
});
|
||||
@ -40,12 +47,15 @@ class WorkerCoordinationInThread {
|
||||
int worker_id() const { return worker_id_; }
|
||||
auto endpoint() const { return server_->endpoint(); }
|
||||
auto worker_endpoint(int worker_id) { return coord_->GetEndpoint(worker_id); }
|
||||
auto worker_ids() { return coord_->GetWorkerIds(); }
|
||||
void join() { worker_thread_.join(); }
|
||||
|
||||
private:
|
||||
std::thread worker_thread_;
|
||||
std::experimental::optional<Server> server_;
|
||||
std::experimental::optional<WorkerCoordination> coord_;
|
||||
std::experimental::optional<ClientPool> client_pool_;
|
||||
std::experimental::optional<ClusterDiscoveryWorker> discovery_;
|
||||
std::atomic<int> worker_id_{0};
|
||||
};
|
||||
|
||||
@ -53,7 +63,10 @@ TEST(Distributed, Coordination) {
|
||||
Server master_server({kLocal, 0});
|
||||
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
|
||||
{
|
||||
MasterCoordination master_coord(master_server);
|
||||
MasterCoordination master_coord(master_server.endpoint());
|
||||
RpcWorkerClients rpc_worker_clients(master_coord);
|
||||
ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
|
||||
rpc_worker_clients);
|
||||
|
||||
for (int i = 0; i < kWorkerCount; ++i)
|
||||
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
|
||||
@ -79,7 +92,10 @@ TEST(Distributed, DesiredAndUniqueId) {
|
||||
Server master_server({kLocal, 0});
|
||||
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
|
||||
{
|
||||
MasterCoordination master_coord(master_server);
|
||||
MasterCoordination master_coord(master_server.endpoint());
|
||||
RpcWorkerClients rpc_worker_clients(master_coord);
|
||||
ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
|
||||
rpc_worker_clients);
|
||||
|
||||
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
|
||||
master_server.endpoint(), 42));
|
||||
@ -97,7 +113,10 @@ TEST(Distributed, CoordinationWorkersId) {
|
||||
Server master_server({kLocal, 0});
|
||||
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
|
||||
{
|
||||
MasterCoordination master_coord(master_server);
|
||||
MasterCoordination master_coord(master_server.endpoint());
|
||||
RpcWorkerClients rpc_worker_clients(master_coord);
|
||||
ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
|
||||
rpc_worker_clients);
|
||||
|
||||
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
|
||||
master_server.endpoint(), 42));
|
||||
@ -114,3 +133,33 @@ TEST(Distributed, CoordinationWorkersId) {
|
||||
|
||||
for (auto &worker : workers) worker->join();
|
||||
}
|
||||
|
||||
// Starts a master and ten workers with desired ids 1..10, then verifies that
// the master and every worker report the same set of ids (0..10).
TEST(Distributed, ClusterDiscovery) {
  Server master_server({kLocal, 0});
  std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
  {
    MasterCoordination master_coord(master_server.endpoint());
    RpcWorkerClients rpc_worker_clients(master_coord);
    ClusterDiscoveryMaster master_discovery(master_server, master_coord,
                                            rpc_worker_clients);

    int worker_count = 10;
    // Id 0 is the master.
    std::vector<int> expected_ids{0};
    for (int id = 1; id <= worker_count; ++id) {
      workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
          master_server.endpoint(), id));
      expected_ids.push_back(id);
    }

    EXPECT_THAT(master_coord.GetWorkerIds(),
                testing::UnorderedElementsAreArray(expected_ids));
    for (auto &worker : workers) {
      EXPECT_THAT(worker->worker_ids(),
                  testing::UnorderedElementsAreArray(expected_ids));
    }
    // Leaving this scope destroys master_coord, whose destructor sends
    // StopWorkerRpc to the workers.
  }

  for (auto &worker : workers) worker->join();
}
|
||||
|
@ -6,6 +6,8 @@
|
||||
|
||||
#include "communication/rpc/messages.hpp"
|
||||
#include "communication/rpc/server.hpp"
|
||||
#include "distributed/cluster_discovery_master.hpp"
|
||||
#include "distributed/cluster_discovery_worker.hpp"
|
||||
#include "distributed/coordination_master.hpp"
|
||||
#include "distributed/coordination_worker.hpp"
|
||||
#include "distributed/rpc_worker_clients.hpp"
|
||||
@ -38,7 +40,13 @@ class RpcWorkerClientsTest : public ::testing::Test {
|
||||
std::make_unique<distributed::WorkerCoordination>(
|
||||
*workers_server_.back(), master_server_.endpoint()));
|
||||
|
||||
workers_coord_.back()->RegisterWorker(i);
|
||||
cluster_discovery_.emplace_back(
|
||||
std::make_unique<distributed::ClusterDiscoveryWorker>(
|
||||
*workers_server_.back(), *workers_coord_.back(),
|
||||
rpc_workers_.GetClientPool(0)));
|
||||
|
||||
cluster_discovery_.back()->RegisterWorker(i);
|
||||
|
||||
workers_server_.back()->Register<distributed::IncrementCounterRpc>(
|
||||
[this, i](const distributed::IncrementCounterReq &) {
|
||||
workers_cnt_[i]++;
|
||||
@ -61,13 +69,17 @@ class RpcWorkerClientsTest : public ::testing::Test {
|
||||
|
||||
std::vector<std::unique_ptr<communication::rpc::Server>> workers_server_;
|
||||
std::vector<std::unique_ptr<distributed::WorkerCoordination>> workers_coord_;
|
||||
std::vector<std::unique_ptr<distributed::ClusterDiscoveryWorker>>
|
||||
cluster_discovery_;
|
||||
std::unordered_map<int, int> workers_cnt_;
|
||||
|
||||
communication::rpc::Server master_server_{kLocalHost};
|
||||
std::experimental::optional<distributed::MasterCoordination> master_coord_{
|
||||
master_server_};
|
||||
master_server_.endpoint()};
|
||||
|
||||
distributed::RpcWorkerClients rpc_workers_{*master_coord_};
|
||||
distributed::ClusterDiscoveryMaster cluster_disocvery_{
|
||||
master_server_, *master_coord_, rpc_workers_};
|
||||
};
|
||||
|
||||
TEST_F(RpcWorkerClientsTest, GetWorkerIds) {
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "communication/rpc/server.hpp"
|
||||
#include "distributed/cluster_discovery_master.hpp"
|
||||
#include "distributed/coordination_master.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "transactions/engine_master.hpp"
|
||||
@ -21,8 +22,10 @@ class WorkerEngineTest : public testing::Test {
|
||||
const std::string local{"127.0.0.1"};
|
||||
|
||||
Server master_server_{{local, 0}};
|
||||
MasterCoordination master_coordination_{master_server_};
|
||||
MasterCoordination master_coordination_{master_server_.endpoint()};
|
||||
RpcWorkerClients rpc_worker_clients_{master_coordination_};
|
||||
ClusterDiscoveryMaster cluster_disocvery_{
|
||||
master_server_, master_coordination_, rpc_worker_clients_};
|
||||
|
||||
MasterEngine master_{master_server_, rpc_worker_clients_};
|
||||
ClientPool master_client_pool{master_server_.endpoint()};
|
||||
|
Loading…
Reference in New Issue
Block a user