diff --git a/experimental/distributed/main.cpp b/experimental/distributed/main.cpp index 90085bb5c..a4997e0f8 100644 --- a/experimental/distributed/main.cpp +++ b/experimental/distributed/main.cpp @@ -253,7 +253,7 @@ class Master : public Reactor { for (int64_t w_id = 0; w_id < NUM_WORKERS; ++w_id) { if (channels_[w_id] == nullptr) { // TODO: Resolve worker channel using the network service. - channels_[w_id] = system_->FindLocalChannel(GetWorkerName(w_id), "main"); + channels_[w_id] = system_->FindChannel(GetWorkerName(w_id), "main"); if (channels_[w_id] != nullptr) ++workers_found; } } @@ -345,7 +345,7 @@ class Worker : public Reactor { void FindMaster() { // TODO: Replace with network service and channel resolution. - while (!(master_channel_ = system_->FindLocalChannel("master", "main"))) + while (!(master_channel_ = system_->FindChannel("master", "main"))) std::this_thread::sleep_for(std::chrono::seconds(1)); } @@ -356,7 +356,7 @@ class Worker : public Reactor { void ClientMain(System *system) { std::shared_ptr channel = nullptr; // TODO: Replace this with network channel resolution. - while (!(channel = system->FindLocalChannel("master", "main"))) + while (!(channel = system->FindChannel("master", "main"))) std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "I/O Client Main active" << std::endl; @@ -378,7 +378,6 @@ int main(int argc, char *argv[]) { //google::InitGoogleLogging(argv[0]); gflags::ParseCommandLineFlags(&argc, &argv, true); System system; - system.StartServices(); system.Spawn("master"); std::thread client(ClientMain, &system); for (int i = 0; i < NUM_WORKERS; ++i) diff --git a/experimental/distributed/src/communication.cpp b/experimental/distributed/src/communication.cpp index 16818c02d..c4eedfdce 100644 --- a/experimental/distributed/src/communication.cpp +++ b/experimental/distributed/src/communication.cpp @@ -10,11 +10,11 @@ void EventStream::Subscription::unsubscribe() const { thread_local Reactor* current_reactor_ = nullptr; std::string Connector::LocalChannel::Address() { - return system_->network().Address(); + return FLAGS_address; } uint16_t Connector::LocalChannel::Port() { - return system_->network().Port(); + return FLAGS_port; } std::string Connector::LocalChannel::ReactorName() { @@ -38,7 +38,7 @@ std::pair> Reactor::Open(const std::strin + "already exists"); } auto it = connectors_.emplace(connector_name, - std::make_shared(Connector::Params{system_, name_, connector_name, mutex_, cvar_})).first; + std::make_shared(Connector::Params{name_, connector_name, mutex_, cvar_})).first; it->second->self_ptr_ = it->second; return make_pair(&it->second->stream_, it->second->LockedOpenChannel()); } @@ -50,7 +50,7 @@ std::pair> Reactor::Open() { if (connectors_.count(connector_name) == 0) { // Connector &queue = connectors_[connector_name]; auto it = connectors_.emplace(connector_name, - std::make_shared(Connector::Params{system_, name_, connector_name, mutex_, cvar_})).first; + std::make_shared(Connector::Params{name_, connector_name, mutex_, cvar_})).first; it->second->self_ptr_ = it->second; return make_pair(&it->second->stream_, it->second->LockedOpenChannel()); } @@ -158,10 +158,11 @@ std::string SenderMessage::ReactorName() const { return reactor_; } std::string SenderMessage::ChannelName() const { return channel_; } std::shared_ptr SenderMessage::GetChannelToSender( - System *system) const { - if (address_ == system->network().Address() && - port_ == system->network().Port()) { - return system->FindLocalChannel(reactor_, channel_); + System *system, Distributed *distributed) const { + if (address_ == FLAGS_address && port_ == FLAGS_port) { + return system->FindChannel(reactor_, channel_); } - return system->network().Resolve(address_, port_, reactor_, channel_); + if (distributed) + return distributed->network().Resolve(address_, port_, reactor_, channel_); + assert(false); } diff --git a/experimental/distributed/src/communication.hpp b/experimental/distributed/src/communication.hpp index fbba7b3ce..13add150b 100644 --- a/experimental/distributed/src/communication.hpp +++ b/experimental/distributed/src/communication.hpp @@ -205,7 +205,6 @@ class EventStream { typedef std::function Callback; private: - virtual void OnEventHelper(std::type_index tidx, Callback callback) = 0; }; @@ -227,8 +226,7 @@ class Connector { friend class EventStream::Subscription; Connector(Params params) - : system_(params.system), - connector_name_(params.connector_name), + : connector_name_(params.connector_name), reactor_name_(params.reactor_name), mutex_(params.mutex), cvar_(params.cvar), @@ -247,12 +245,11 @@ class Connector { friend class Connector; LocalChannel(std::shared_ptr mutex, std::string reactor_name, - std::string connector_name, std::weak_ptr queue, System *system) + std::string connector_name, std::weak_ptr queue) : mutex_(mutex), reactor_name_(reactor_name), connector_name_(connector_name), - weak_queue_(queue), - system_(system) {} + weak_queue_(queue) {} virtual void Send(std::unique_ptr m) { std::shared_ptr queue_ = weak_queue_.lock(); // Atomic, per the standard. @@ -276,7 +273,6 @@ class Connector { std::string reactor_name_; std::string connector_name_; std::weak_ptr weak_queue_; - System *system_; }; /** @@ -321,7 +317,6 @@ private: * Warning: do not forget to initialize self_ptr_ individually. Private because it shouldn't be created outside of a Reactor. */ struct Params { - System* system; std::string reactor_name; std::string connector_name; std::shared_ptr mutex; @@ -337,7 +332,7 @@ private: std::shared_ptr LockedOpenChannel() { assert(!self_ptr_.expired()); // TODO(zuza): fix this using this answer https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned - return std::make_shared(mutex_, reactor_name_, connector_name_, self_ptr_, system_); + return std::make_shared(mutex_, reactor_name_, connector_name_, self_ptr_); } std::unique_ptr LockedAwaitPop(std::unique_lock &lock) { @@ -373,7 +368,6 @@ private: assert(num_erased == 1); } - System *system_; std::string connector_name_; std::string reactor_name_; std::queue> queue_; @@ -681,6 +675,8 @@ class Message { } }; +class Distributed; + /** * Message that includes the sender channel used to respond. */ @@ -694,7 +690,8 @@ class SenderMessage : public Message { std::string ReactorName() const; std::string ChannelName() const; - std::shared_ptr GetChannelToSender(System *system) const; + std::shared_ptr GetChannelToSender(System *system, + Distributed *distributed = nullptr) const; template void serialize(Archive &ar) { @@ -716,19 +713,15 @@ CEREAL_REGISTER_TYPE(SenderMessage); * E.g. holds set of reactors, channels for all reactors. */ class System { - using Location = std::pair; - public: friend class Reactor; - System() : network_(this) {} + System() {} - void operator=(const System &) = delete; - - void StartServices() { - network_.StartClient(4); - network_.StartServer(4); - } + System(const System &) = delete; + System(System &&) = delete; + System &operator=(const System &) = delete; + System &operator=(System &&) = delete; template const std::shared_ptr Spawn(const std::string &name, @@ -746,7 +739,7 @@ class System { return nullptr; } - const std::shared_ptr FindLocalChannel(const std::string &reactor_name, + const std::shared_ptr FindChannel(const std::string &reactor_name, const std::string &channel_name) { std::unique_lock lock(mutex_); auto it_reactor = reactors_.find(reactor_name); @@ -754,36 +747,13 @@ class System { return it_reactor->second.first->FindChannel(channel_name); } - /** Register memgraph node's id to the given location. */ - void RegisterMemgraphNode(int64_t id, const std::string& address, uint16_t port) { - mnodes_[id] = Location(address, port); - } - - /** Finds channel using memgrpah node's id. */ - const std::shared_ptr FindChannel(int64_t mnid, - const std::string &reactor_name, - const std::string &channel_name) { - const auto& location = mnodes_.at(mnid); - if (network().Address() == location.first && - network().Port() == location.second) - return FindLocalChannel(reactor_name, channel_name); - return network().Resolve(location.first, location.second, reactor_name, - channel_name); - } - - const auto& Processes() { return mnodes_; } - void AwaitShutdown() { for (auto &key_value : reactors_) { auto &thread = key_value.second.second; thread.join(); } - network_.StopClient(); - network_.StopServer(); } - Network &network() { return network_; } - private: void StartReactor(Reactor& reactor) { current_reactor_ = &reactor; @@ -797,6 +767,83 @@ class System { std::unordered_map, std::thread>> reactors_; - std::unordered_map mnodes_; +}; + + +/** + * Message that will arrive on a stream returned by Distributed::FindChannel + * once and if the channel is successfully resolved. + */ +class ChannelResolvedMessage : public Message { + public: + ChannelResolvedMessage() {} + ChannelResolvedMessage(std::shared_ptr channel) + : Message(), channel_(channel) {} + + std::shared_ptr channel() const { return channel_; } + + private: + std::shared_ptr channel_; +}; + +/** + * Placeholder for all functionality related to non-local communication + * E.g. resolve remote channels by memgraph node id, etc. + */ +class Distributed { + public: + Distributed(System &system) : system_(system), network_(&system) {} + + Distributed(const Distributed &) = delete; + Distributed(Distributed &&) = delete; + Distributed &operator=(const Distributed &) = delete; + Distributed &operator=(Distributed &&) = delete; + + void StartServices() { + network_.StartClient(4); + network_.StartServer(4); + } + + void StopServices() { + network_.StopClient(); + network_.StopServer(); + } + + // TODO: Implement remote Spawn. + + /** + * Resolves remote channel. + * + * TODO: Provide asynchronous implementation of this function. + * + * @return EventStream on which message will arrive once channel is resolved. + * @warning It can only be called from local Reactor. + */ + EventStream* FindChannel(const std::string &address, + uint16_t port, + const std::string &reactor_name, + const std::string &channel_name) { + std::shared_ptr channel = nullptr; + while (!(channel = network_.Resolve(address, port, reactor_name, channel_name))) + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + auto stream_channel = current_reactor_->Open(); + stream_channel.second->Send(channel); + return stream_channel.first; + } + + System &system() { return system_; } + Network &network() { return network_; } + + protected: + System &system_; Network network_; }; + +class DistributedReactor : public Reactor { + public: + DistributedReactor(System *system, std::string name, Distributed &distributed) + : Reactor(system, name), distributed_(distributed) {} + + protected: + Distributed &distributed_; +}; \ No newline at end of file diff --git a/experimental/distributed/src/protocol.cpp b/experimental/distributed/src/protocol.cpp index 1ac6418c0..e1f965865 100644 --- a/experimental/distributed/src/protocol.cpp +++ b/experimental/distributed/src/protocol.cpp @@ -37,7 +37,7 @@ void Session::Execute() { std::cout << "Reactor: " << reactor_ << "; Channel: " << channel_ << std::endl; - auto channel = system_->FindLocalChannel(reactor_, channel_); + auto channel = system_->FindChannel(reactor_, channel_); SendSuccess(channel != nullptr); handshake_done_ = true; @@ -58,7 +58,7 @@ void Session::Execute() { iarchive(message); buffer_.Shift(len_data); - auto channel = system_->FindLocalChannel(reactor_, channel_); + auto channel = system_->FindChannel(reactor_, channel_); if (channel == nullptr) { SendSuccess(false); return; diff --git a/experimental/distributed/tests/connector_unit.cpp b/experimental/distributed/tests/connector_unit.cpp index 19cf9c270..26d1455ee 100644 --- a/experimental/distributed/tests/connector_unit.cpp +++ b/experimental/distributed/tests/connector_unit.cpp @@ -20,7 +20,6 @@ TEST(SystemTest, ReturnWithoutThrowing) { }; System system; - ASSERT_NO_THROW(system.StartServices()); ASSERT_NO_THROW(system.Spawn("master")); ASSERT_NO_THROW(system.AwaitShutdown()); } @@ -48,7 +47,7 @@ TEST(ConnectorSetUpTest, CheckMainChannelIsSet) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindLocalChannel("worker", "main"))) + while (!(channel = system_->FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300)); CloseConnector("main"); @@ -59,7 +58,7 @@ TEST(ConnectorSetUpTest, CheckMainChannelIsSet) { Worker(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindLocalChannel("master", "main"))) + while (!(channel = system_->FindChannel("master", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300)); CloseConnector("main"); @@ -83,7 +82,7 @@ TEST(SimpleSendTest, OneSimpleSend) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindLocalChannel("worker", "main"))) + while (!(channel = system_->FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send(123); CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII. @@ -118,7 +117,7 @@ TEST(SimpleSendTest, OneCallback) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindLocalChannel("worker", "main"))) + while (!(channel = system_->FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send(888); CloseConnector("main"); @@ -154,7 +153,7 @@ TEST(SimpleSendTest, IgnoreAfterClose) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindLocalChannel("worker", "main"))) + while (!(channel = system_->FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send(101); channel->Send(102); // should be ignored @@ -236,7 +235,7 @@ TEST(MultipleSendTest, UnsubscribeService) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindLocalChannel("worker", "main"))) + while (!(channel = system_->FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send(55); channel->Send(66); @@ -299,7 +298,7 @@ TEST(MultipleSendTest, OnEvent) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindLocalChannel("worker", "main"))) + while (!(channel = system_->FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send(101); @@ -357,7 +356,7 @@ TEST(MultipleSendTest, Chaining) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindLocalChannel("worker", "main"))) + while (!(channel = system_->FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send(55); channel->Send(66); @@ -408,7 +407,7 @@ TEST(MultipleSendTest, ChainingInRightOrder) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindLocalChannel("worker", "main"))) + while (!(channel = system_->FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send('a'); channel->Send(55); @@ -457,7 +456,7 @@ TEST(MultipleSendTest, ProcessManyMessages) { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; - while (!(channel = system_->FindLocalChannel("worker", "main"))) + while (!(channel = system_->FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100)); diff --git a/experimental/distributed/tests/distributed.cpp b/experimental/distributed/tests/distributed.cpp deleted file mode 100644 index 7e11d51d8..000000000 --- a/experimental/distributed/tests/distributed.cpp +++ /dev/null @@ -1,123 +0,0 @@ -#include -#include - -#include "communication.hpp" - -DEFINE_int64(my_mnid, 0, "Memgraph node id"); -DEFINE_string(config_filename, "", "File containing list of all processes"); - -/** - * About config file - * - * Each line contains three strings: - * memgraph node id, ip address of the worker, and port of the worker - * Data on the first line is used to start master. - * Data on the remaining lines is used to start workers. - */ - -/** - * Parse config file and register processes into system. - * - * @return Pair (master mnid, list of worker's id). - */ -std::pair> -ParseConfigAndRegister(const std::string& filename, System& system) { - std::ifstream file(filename, std::ifstream::in); - assert(file.good()); - int64_t master_mnid; - std::vector worker_mnids; - int64_t mnid; - std::string address; - uint16_t port; - file >> master_mnid >> address >> port; - system.RegisterMemgraphNode(master_mnid, address, port); - while (!(file >> mnid).eof()) { - file >> address >> port; - system.RegisterMemgraphNode(mnid, address, port); - worker_mnids.push_back(mnid); - } - file.close(); - return std::make_pair(master_mnid, worker_mnids); -} - - -class Master : public Reactor { - public: - Master(System* system, std::string name, int64_t mnid, - std::vector&& worker_mnids) - : Reactor(system, name), mnid_(mnid), worker_mnids_(std::move(worker_mnids)) {} - - virtual void Run() { - std::cout << "Master (" << mnid_ << ") @ " << system_->network().Address() - << ":" << system_->network().Port() << std::endl; - - auto stream = main_.first; - stream->OnEvent([this](const SenderMessage &msg, - const EventStream::Subscription& subscription) { - std::cout << "Message from " << msg.Address() << ":" << msg.Port() << "\n"; - ++workers_seen; - if (workers_seen == worker_mnids_.size()) { - subscription.unsubscribe(); - std::this_thread::sleep_for(std::chrono::seconds(4)); - CloseConnector("main"); - } - }); - - for (auto wmnid : worker_mnids_) { - std::shared_ptr channel; - while (!(channel = system_->FindChannel(wmnid, "worker", "main"))) - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - channel->Send("master", "main"); - } - } - - protected: - int64_t workers_seen = 0; - const int64_t mnid_; - std::vector worker_mnids_; -}; - -class Worker : public Reactor { - public: - Worker(System* system, std::string name, int64_t mnid, int64_t master_mnid) - : Reactor(system, name), mnid_(mnid), master_mnid_(master_mnid) {} - - virtual void Run() { - std::cout << "Worker (" << mnid_ << ") @ " << system_->network().Address() - << ":" << system_->network().Port() << std::endl; - - auto stream = main_.first; - stream->OnEvent([this](const SenderMessage &msg, - const EventStream::Subscription &subscription) { - std::cout << "Message from " << msg.Address() << ":" << msg.Port() << "\n"; - subscription.unsubscribe(); - std::this_thread::sleep_for(std::chrono::seconds(4)); - CloseConnector("main"); - }); - - std::shared_ptr channel; - while(!(channel = system_->FindChannel(master_mnid_, "master", "main"))) - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - channel->Send("worker", "main"); - } - - protected: - const int64_t mnid_; - const int64_t master_mnid_; -}; - - -int main(int argc, char *argv[]) { - gflags::ParseCommandLineFlags(&argc, &argv, true); - - System system; - auto mnids = ParseConfigAndRegister(FLAGS_config_filename, system); - system.StartServices(); - if (FLAGS_my_mnid == mnids.first) - system.Spawn("master", FLAGS_my_mnid, std::move(mnids.second)); - else - system.Spawn("worker", FLAGS_my_mnid, mnids.first); - system.AwaitShutdown(); - - return 0; -} \ No newline at end of file diff --git a/experimental/distributed/tests/distributed_test.cpp b/experimental/distributed/tests/distributed_test.cpp new file mode 100644 index 000000000..d527fc5a1 --- /dev/null +++ b/experimental/distributed/tests/distributed_test.cpp @@ -0,0 +1,175 @@ +#include +#include + +#include "communication.hpp" + +DEFINE_int64(my_mnid, 0, "Memgraph node id"); +DEFINE_string(config_filename, "", "File containing list of all processes"); + +class MemgraphDistributed : public Distributed { + private: + using Location = std::pair; + + public: + MemgraphDistributed(System &system) : Distributed(system) {} + + /** Register memgraph node id to the given location. */ + void RegisterMemgraphNode(int64_t mnid, const std::string& address, uint16_t port) { + std::unique_lock lock(mutex_); + mnodes_[mnid] = Location(address, port); + } + + EventStream* FindChannel(int64_t mnid, + const std::string &reactor, + const std::string &channel) { + std::unique_lock lock(mutex_); + const auto& location = mnodes_.at(mnid); + return Distributed::FindChannel(location.first, location.second, reactor, channel); + } + + private: + std::recursive_mutex mutex_; + std::unordered_map mnodes_; +}; + +/** + * About config file + * + * Each line contains three strings: + * memgraph node id, ip address of the worker, and port of the worker + * Data on the first line is used to start master. + * Data on the remaining lines is used to start workers. + */ + +/** + * Parse config file and register processes into system. + * + * @return Pair (master mnid, list of worker's id). + */ +std::pair> +ParseConfigAndRegister(const std::string& filename, + MemgraphDistributed& distributed) { + std::ifstream file(filename, std::ifstream::in); + assert(file.good()); + int64_t master_mnid; + std::vector worker_mnids; + int64_t mnid; + std::string address; + uint16_t port; + file >> master_mnid >> address >> port; + distributed.RegisterMemgraphNode(master_mnid, address, port); + while (file.good()) { + file >> mnid >> address >> port; + if (file.eof()) + break ; + distributed.RegisterMemgraphNode(mnid, address, port); + worker_mnids.push_back(mnid); + } + file.close(); + return std::make_pair(master_mnid, worker_mnids); +} + + +class MemgraphReactor : public Reactor { + public: + MemgraphReactor(System* system, std::string name, + MemgraphDistributed &distributed) + : Reactor(system, name), distributed_(distributed) {} + + protected: + MemgraphDistributed &distributed_; +}; + + +class Master : public MemgraphReactor { + public: + Master(System* system, std::string name, MemgraphDistributed &distributed, + int64_t mnid, std::vector&& worker_mnids) + : MemgraphReactor(system, name, distributed), mnid_(mnid), + worker_mnids_(std::move(worker_mnids)) {} + + virtual void Run() { + std::cout << "Master (" << mnid_ << ") @ " << distributed_.network().Address() + << ":" << distributed_.network().Port() << std::endl; + + auto stream = main_.first; + stream->OnEvent([this](const SenderMessage &msg, + const EventStream::Subscription& subscription) { + std::cout << "Message from " << msg.Address() << ":" << msg.Port() << "\n"; + ++workers_seen; + if (workers_seen == worker_mnids_.size()) { + subscription.unsubscribe(); + // Sleep for a while so we can read output in the terminal. + // (start_distributed.py runs each process in a new tab which is + // closed immediately after process has finished) + std::this_thread::sleep_for(std::chrono::seconds(4)); + CloseConnector("main"); + } + }); + + for (auto wmnid : worker_mnids_) { + auto stream = distributed_.FindChannel(wmnid, "worker", "main"); + stream->OnEventOnce() + .ChainOnce([this, stream](const ChannelResolvedMessage &msg){ + msg.channel()->Send("master", "main"); + stream->Close(); + }); + } + } + + protected: + int64_t workers_seen = 0; + const int64_t mnid_; + std::vector worker_mnids_; +}; + +class Worker : public MemgraphReactor { + public: + Worker(System* system, std::string name, MemgraphDistributed &distributed, + int64_t mnid, int64_t master_mnid) + : MemgraphReactor(system, name, distributed), mnid_(mnid), + master_mnid_(master_mnid) {} + + virtual void Run() { + std::cout << "Worker (" << mnid_ << ") @ " << distributed_.network().Address() + << ":" << distributed_.network().Port() << std::endl; + + auto stream = main_.first; + stream->OnEventOnce() + .ChainOnce([this](const SenderMessage &msg) { + std::cout << "Message from " << msg.Address() << ":" << msg.Port() << "\n"; + // Sleep for a while so we can read output in the terminal. + std::this_thread::sleep_for(std::chrono::seconds(4)); + CloseConnector("main"); + }); + + auto remote_stream = distributed_.FindChannel(master_mnid_, "master", "main"); + remote_stream->OnEventOnce() + .ChainOnce([this, remote_stream](const ChannelResolvedMessage &msg){ + msg.channel()->Send("worker", "main"); + remote_stream->Close(); + }); + } + + protected: + const int64_t mnid_; + const int64_t master_mnid_; +}; + + +int main(int argc, char *argv[]) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + + System system; + MemgraphDistributed distributed(system); + auto mnids = ParseConfigAndRegister(FLAGS_config_filename, distributed); + distributed.StartServices(); + if (FLAGS_my_mnid == mnids.first) + system.Spawn("master", distributed, FLAGS_my_mnid, std::move(mnids.second)); + else + system.Spawn("worker", distributed, FLAGS_my_mnid, mnids.first); + system.AwaitShutdown(); + distributed.StopServices(); + + return 0; +} \ No newline at end of file diff --git a/experimental/distributed/tests/network_chat.cpp b/experimental/distributed/tests/network_chat.cpp index bbcd830b4..b1760c6d7 100644 --- a/experimental/distributed/tests/network_chat.cpp +++ b/experimental/distributed/tests/network_chat.cpp @@ -36,9 +36,10 @@ class ChatACK : public ChatMessage { }; CEREAL_REGISTER_TYPE(ChatACK); -class ChatServer : public Reactor { +class ChatServer : public DistributedReactor { public: - ChatServer(System *system, std::string name) : Reactor(system, name) {} + ChatServer(System *system, std::string name, Distributed &distributed) + : DistributedReactor(system, name, distributed) {} virtual void Run() { std::cout << "ChatServer is active" << std::endl; @@ -63,9 +64,10 @@ class ChatServer : public Reactor { } }; -class ChatClient : public Reactor { +class ChatClient : public DistributedReactor { public: - ChatClient(System *system, std::string name) : Reactor(system, name) {} + ChatClient(System *system, std::string name, Distributed &distributed) + : DistributedReactor(system, name, distributed) {} virtual void Run() { std::cout << "ChatClient is active" << std::endl; @@ -77,7 +79,7 @@ class ChatClient : public Reactor { std::cin >> address >> port >> message; auto channel = - system_->network().Resolve(address, port, "server", "chat"); + distributed_.network().Resolve(address, port, "server", "chat"); if (channel != nullptr) { channel->Send("server", "chat", message); } else { @@ -90,9 +92,11 @@ class ChatClient : public Reactor { int main(int argc, char *argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); System system; - system.StartServices(); - system.Spawn("server"); - system.Spawn("client"); + Distributed distributed(system); + distributed.StartServices(); + system.Spawn("server", distributed); + system.Spawn("client", distributed); system.AwaitShutdown(); + distributed.StopServices(); return 0; } diff --git a/experimental/distributed/tests/network_client.cpp b/experimental/distributed/tests/network_client.cpp index 47f24d43a..88cee7c0a 100644 --- a/experimental/distributed/tests/network_client.cpp +++ b/experimental/distributed/tests/network_client.cpp @@ -3,12 +3,13 @@ int main(int argc, char *argv[]) { google::InitGoogleLogging(argv[0]); System system; - system.network().StartClient(1); - auto channel = system.network().Resolve("127.0.0.1", 10000, "master", "main"); + Distributed distributed(system); + distributed.network().StartClient(1); + auto channel = distributed.network().Resolve("127.0.0.1", 10000, "master", "main"); std::cout << channel << std::endl; if (channel != nullptr) { channel->Send("master", "main"); } - system.network().StopClient(); + distributed.network().StopClient(); return 0; } diff --git a/experimental/distributed/tests/start_distributed.py b/experimental/distributed/tests/start_distributed.py index 919416d73..5f47325ec 100755 --- a/experimental/distributed/tests/start_distributed.py +++ b/experimental/distributed/tests/start_distributed.py @@ -4,7 +4,7 @@ import os command = 'gnome-terminal' -program = './distributed' +program = './distributed_test' config_filename = 'config' flags = ' --minloglevel 2'