Expose worker ids

Summary: Add worker ids expose to rpc_worker_clients

Reviewers: msantl

Reviewed By: msantl

Differential Revision: https://phabricator.memgraph.io/D1128
This commit is contained in:
Dominik Gleich 2018-01-22 16:59:40 +01:00
parent 912d178391
commit ca9fac8adc
7 changed files with 54 additions and 3 deletions

View File

@ -11,6 +11,10 @@ class Coordination {
/** Gets the endpoint for the given worker ID from the master. */
virtual io::network::Endpoint GetEndpoint(int worker_id) = 0;
/** Gets the connected worker ids - should only be called on a master
* instance*/
virtual std::vector<int> GetWorkerIds() = 0;
};
} // namespace distributed

View File

@ -53,8 +53,15 @@ MasterCoordination::~MasterCoordination() {
Endpoint MasterCoordination::GetEndpoint(int worker_id) {
std::lock_guard<std::mutex> guard(lock_);
auto found = workers_.find(worker_id);
CHECK(found != workers_.end()) << "No endpoint registered for worker id: "
<< worker_id;
CHECK(found != workers_.end())
<< "No endpoint registered for worker id: " << worker_id;
return found->second;
}
std::vector<int> MasterCoordination::GetWorkerIds() {
std::vector<int> worker_ids;
for (auto worker : workers_) worker_ids.push_back(worker.first);
return worker_ids;
}
} // namespace distributed

View File

@ -35,6 +35,9 @@ class MasterCoordination : public Coordination {
/** Returns the Endpoint for the given worker_id. */
Endpoint GetEndpoint(int worker_id) override;
/** Returns all workers id, this includes master id(0) */
std::vector<int> GetWorkerIds() override;
private:
communication::messaging::System &system_;
communication::rpc::Server server_;

View File

@ -1,7 +1,9 @@
#include "distributed/coordination_worker.hpp"
#include <condition_variable>
#include <mutex>
#include "distributed/coordination_worker.hpp"
#include "glog/logging.h"
namespace distributed {
@ -49,4 +51,8 @@ void WorkerCoordination::WaitForShutdown() {
// convention, but maybe better...
std::this_thread::sleep_for(100ms);
};
std::vector<int> WorkerCoordination::GetWorkerIds() {
LOG(FATAL) << "Unimplemented worker ids discovery on worker";
};
} // namespace distributed

View File

@ -26,6 +26,10 @@ class WorkerCoordination : public Coordination {
/** Gets the endpoint for the given worker ID from the master. */
Endpoint GetEndpoint(int worker_id) override;
/** Shouldn't be called on worker for now!
* TODO fix this */
std::vector<int> GetWorkerIds() override;
/** Starts listening for a remote shutdown command (issued by the master).
* Blocks the calling thread until that has finished. */
void WaitForShutdown();

View File

@ -34,6 +34,8 @@ class RpcWorkerClients {
.first->second;
}
auto GetWorkerIds() { return coordination_.GetWorkerIds(); }
private:
communication::messaging::System &system_;
// TODO make Coordination const, it's member GetEndpoint must be const too.

View File

@ -4,6 +4,7 @@
#include <unordered_set>
#include <vector>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "communication/messaging/distributed.hpp"
@ -88,3 +89,27 @@ TEST(Distributed, DesiredAndUniqueId) {
for (auto &worker : workers) worker->join();
}
TEST(Distributed, CoordinationWorkersId) {
System master_system({kLocal, 0});
std::vector<std::unique_ptr<WorkerInThread>> workers;
{
MasterCoordination master_coord(master_system);
workers.emplace_back(
std::make_unique<WorkerInThread>(master_system.endpoint(), 42));
std::this_thread::sleep_for(200ms);
workers.emplace_back(
std::make_unique<WorkerInThread>(master_system.endpoint(), 42));
std::this_thread::sleep_for(200ms);
std::vector<int> ids;
ids.push_back(0);
for (auto &worker : workers) ids.push_back(worker->worker_id());
EXPECT_THAT(master_coord.GetWorkerIds(),
testing::UnorderedElementsAreArray(ids));
}
for (auto &worker : workers) worker->join();
}