From 43015f164cb2b69c9e683d9339d430b491189553 Mon Sep 17 00:00:00 2001 From: Sasa Stanko Date: Mon, 14 Aug 2017 16:09:02 +0200 Subject: [PATCH] Implement basic master and worker Summary: Add FindChannel to System which works over the network Give example of master and worker Add script to start master and worker Rewrite worker and master Reviewers: zuza, buda Reviewed By: zuza Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D662 --- experimental/distributed/main.cpp | 6 ++--- .../distributed/src/communication.cpp | 2 +- .../distributed/src/communication.hpp | 24 ++++++++++++++++++- experimental/distributed/src/protocol.cpp | 4 ++-- .../distributed/tests/connector_unit.cpp | 20 ++++++++-------- 5 files changed, 39 insertions(+), 17 deletions(-) diff --git a/experimental/distributed/main.cpp b/experimental/distributed/main.cpp index cc21bc7f3..90085bb5c 100644 --- a/experimental/distributed/main.cpp +++ b/experimental/distributed/main.cpp @@ -253,7 +253,7 @@ class Master : public Reactor { for (int64_t w_id = 0; w_id < NUM_WORKERS; ++w_id) { if (channels_[w_id] == nullptr) { // TODO: Resolve worker channel using the network service. - channels_[w_id] = system_->FindChannel(GetWorkerName(w_id), "main"); + channels_[w_id] = system_->FindLocalChannel(GetWorkerName(w_id), "main"); if (channels_[w_id] != nullptr) ++workers_found; } } @@ -345,7 +345,7 @@ class Worker : public Reactor { void FindMaster() { // TODO: Replace with network service and channel resolution. - while (!(master_channel_ = system_->FindChannel("master", "main"))) + while (!(master_channel_ = system_->FindLocalChannel("master", "main"))) std::this_thread::sleep_for(std::chrono::seconds(1)); } @@ -356,7 +356,7 @@ class Worker : public Reactor { void ClientMain(System *system) { std::shared_ptr channel = nullptr; // TODO: Replace this with network channel resolution. - while (!(channel = system->FindChannel("master", "main"))) + while (!(channel = system->FindLocalChannel("master", "main"))) std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "I/O Client Main active" << std::endl; diff --git a/experimental/distributed/src/communication.cpp b/experimental/distributed/src/communication.cpp index 649a63f92..16818c02d 100644 --- a/experimental/distributed/src/communication.cpp +++ b/experimental/distributed/src/communication.cpp @@ -161,7 +161,7 @@ std::shared_ptr SenderMessage::GetChannelToSender( System *system) const { if (address_ == system->network().Address() && port_ == system->network().Port()) { - return system->FindChannel(reactor_, channel_); + return system->FindLocalChannel(reactor_, channel_); } return system->network().Resolve(address_, port_, reactor_, channel_); } diff --git a/experimental/distributed/src/communication.hpp b/experimental/distributed/src/communication.hpp index ed9e1c4ee..b9ce836b4 100644 --- a/experimental/distributed/src/communication.hpp +++ b/experimental/distributed/src/communication.hpp @@ -716,6 +716,8 @@ CEREAL_REGISTER_TYPE(SenderMessage); * E.g. holds set of reactors, channels for all reactors. */ class System { + using Location = std::pair; + public: friend class Reactor; @@ -744,7 +746,7 @@ class System { return nullptr; } - const std::shared_ptr FindChannel(const std::string &reactor_name, + const std::shared_ptr FindLocalChannel(const std::string &reactor_name, const std::string &channel_name) { std::unique_lock lock(mutex_); auto it_reactor = reactors_.find(reactor_name); @@ -752,6 +754,25 @@ class System { return it_reactor->second.first->FindChannel(channel_name); } + /** Register process id to the given location. */ + void RegisterProcess(int64_t id, const std::string& address, uint16_t port) { + processes_[id] = Location(address, port); + } + + /** Finds channel using process's id. */ + const std::shared_ptr FindChannel(int64_t process_id, + const std::string &reactor_name, + const std::string &channel_name) { + const auto& location = processes_.at(process_id); + if (network().Address() == location.first && + network().Port() == location.second) + return FindLocalChannel(reactor_name, channel_name); + return network().Resolve(location.first, location.second, reactor_name, + channel_name); + } + + const auto& Processes() { return processes_; } + void AwaitShutdown() { for (auto &key_value : reactors_) { auto &thread = key_value.second.second; @@ -776,5 +797,6 @@ class System { std::unordered_map, std::thread>> reactors_; + std::unordered_map processes_; Network network_; }; diff --git a/experimental/distributed/src/protocol.cpp b/experimental/distributed/src/protocol.cpp index e1f965865..1ac6418c0 100644 --- a/experimental/distributed/src/protocol.cpp +++ b/experimental/distributed/src/protocol.cpp @@ -37,7 +37,7 @@ void Session::Execute() { std::cout << "Reactor: " << reactor_ << "; Channel: " << channel_ << std::endl; - auto channel = system_->FindChannel(reactor_, channel_); + auto channel = system_->FindLocalChannel(reactor_, channel_); SendSuccess(channel != nullptr); handshake_done_ = true; @@ -58,7 +58,7 @@ void Session::Execute() { iarchive(message); buffer_.Shift(len_data); - auto channel = system_->FindChannel(reactor_, channel_); + auto channel = system_->FindLocalChannel(reactor_, channel_); if (channel == nullptr) { SendSuccess(false); return; diff --git a/experimental/distributed/tests/connector_unit.cpp b/experimental/distributed/tests/connector_unit.cpp index e7195f5d3..19cf9c270 100644 --- a/experimental/distributed/tests/connector_unit.cpp +++ b/experimental/distributed/tests/connector_unit.cpp @@ -48,7 +48,7 @@ TEST(ConnectorSetUpTest, CheckMainChannelIsSet) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = system_->FindLocalChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300)); CloseConnector("main"); @@ -59,7 +59,7 @@ TEST(ConnectorSetUpTest, CheckMainChannelIsSet) { Worker(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindChannel("master", "main"))) + while (!(channel = system_->FindLocalChannel("master", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300)); CloseConnector("main"); @@ -83,7 +83,7 @@ TEST(SimpleSendTest, OneSimpleSend) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = system_->FindLocalChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send(123); CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII. @@ -118,7 +118,7 @@ TEST(SimpleSendTest, OneCallback) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = system_->FindLocalChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send(888); CloseConnector("main"); @@ -154,7 +154,7 @@ TEST(SimpleSendTest, IgnoreAfterClose) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = system_->FindLocalChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send(101); channel->Send(102); // should be ignored @@ -236,7 +236,7 @@ TEST(MultipleSendTest, UnsubscribeService) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = system_->FindLocalChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send(55); channel->Send(66); @@ -299,7 +299,7 @@ TEST(MultipleSendTest, OnEvent) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = system_->FindLocalChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send(101); @@ -357,7 +357,7 @@ TEST(MultipleSendTest, Chaining) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = system_->FindLocalChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send(55); channel->Send(66); @@ -408,7 +408,7 @@ TEST(MultipleSendTest, ChainingInRightOrder) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = system_->FindLocalChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send('a'); channel->Send(55); @@ -457,7 +457,7 @@ TEST(MultipleSendTest, ProcessManyMessages) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = system_->FindLocalChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));