diff --git a/experimental/distributed/src/communication.hpp b/experimental/distributed/src/communication.hpp index b9ce836b4..fbba7b3ce 100644 --- a/experimental/distributed/src/communication.hpp +++ b/experimental/distributed/src/communication.hpp @@ -754,16 +754,16 @@ class System { return it_reactor->second.first->FindChannel(channel_name); } - /** Register process id to the given location. */ - void RegisterProcess(int64_t id, const std::string& address, uint16_t port) { - processes_[id] = Location(address, port); + /** Register memgraph node's id to the given location. */ + void RegisterMemgraphNode(int64_t id, const std::string& address, uint16_t port) { + mnodes_[id] = Location(address, port); } - /** Finds channel using process's id. */ - const std::shared_ptr FindChannel(int64_t process_id, + /** Finds channel using memgrpah node's id. */ + const std::shared_ptr FindChannel(int64_t mnid, const std::string &reactor_name, const std::string &channel_name) { - const auto& location = processes_.at(process_id); + const auto& location = mnodes_.at(mnid); if (network().Address() == location.first && network().Port() == location.second) return FindLocalChannel(reactor_name, channel_name); @@ -771,7 +771,7 @@ class System { channel_name); } - const auto& Processes() { return processes_; } + const auto& Processes() { return mnodes_; } void AwaitShutdown() { for (auto &key_value : reactors_) { @@ -797,6 +797,6 @@ class System { std::unordered_map, std::thread>> reactors_; - std::unordered_map processes_; + std::unordered_map mnodes_; Network network_; }; diff --git a/experimental/distributed/tests/config b/experimental/distributed/tests/config new file mode 100644 index 000000000..cfc9046b9 --- /dev/null +++ b/experimental/distributed/tests/config @@ -0,0 +1,3 @@ +0 127.0.0.1 10000 +2 127.0.0.1 10001 +3 127.0.0.1 10002 diff --git a/experimental/distributed/tests/distributed.cpp b/experimental/distributed/tests/distributed.cpp new file mode 100644 index 000000000..7e11d51d8 --- /dev/null +++ b/experimental/distributed/tests/distributed.cpp @@ -0,0 +1,123 @@ +#include +#include + +#include "communication.hpp" + +DEFINE_int64(my_mnid, 0, "Memgraph node id"); +DEFINE_string(config_filename, "", "File containing list of all processes"); + +/** + * About config file + * + * Each line contains three strings: + * memgraph node id, ip address of the worker, and port of the worker + * Data on the first line is used to start master. + * Data on the remaining lines is used to start workers. + */ + +/** + * Parse config file and register processes into system. + * + * @return Pair (master mnid, list of worker's id). + */ +std::pair> +ParseConfigAndRegister(const std::string& filename, System& system) { + std::ifstream file(filename, std::ifstream::in); + assert(file.good()); + int64_t master_mnid; + std::vector worker_mnids; + int64_t mnid; + std::string address; + uint16_t port; + file >> master_mnid >> address >> port; + system.RegisterMemgraphNode(master_mnid, address, port); + while (!(file >> mnid).eof()) { + file >> address >> port; + system.RegisterMemgraphNode(mnid, address, port); + worker_mnids.push_back(mnid); + } + file.close(); + return std::make_pair(master_mnid, worker_mnids); +} + + +class Master : public Reactor { + public: + Master(System* system, std::string name, int64_t mnid, + std::vector&& worker_mnids) + : Reactor(system, name), mnid_(mnid), worker_mnids_(std::move(worker_mnids)) {} + + virtual void Run() { + std::cout << "Master (" << mnid_ << ") @ " << system_->network().Address() + << ":" << system_->network().Port() << std::endl; + + auto stream = main_.first; + stream->OnEvent([this](const SenderMessage &msg, + const EventStream::Subscription& subscription) { + std::cout << "Message from " << msg.Address() << ":" << msg.Port() << "\n"; + ++workers_seen; + if (workers_seen == worker_mnids_.size()) { + subscription.unsubscribe(); + std::this_thread::sleep_for(std::chrono::seconds(4)); + CloseConnector("main"); + } + }); + + for (auto wmnid : worker_mnids_) { + std::shared_ptr channel; + while (!(channel = system_->FindChannel(wmnid, "worker", "main"))) + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + channel->Send("master", "main"); + } + } + + protected: + int64_t workers_seen = 0; + const int64_t mnid_; + std::vector worker_mnids_; +}; + +class Worker : public Reactor { + public: + Worker(System* system, std::string name, int64_t mnid, int64_t master_mnid) + : Reactor(system, name), mnid_(mnid), master_mnid_(master_mnid) {} + + virtual void Run() { + std::cout << "Worker (" << mnid_ << ") @ " << system_->network().Address() + << ":" << system_->network().Port() << std::endl; + + auto stream = main_.first; + stream->OnEvent([this](const SenderMessage &msg, + const EventStream::Subscription &subscription) { + std::cout << "Message from " << msg.Address() << ":" << msg.Port() << "\n"; + subscription.unsubscribe(); + std::this_thread::sleep_for(std::chrono::seconds(4)); + CloseConnector("main"); + }); + + std::shared_ptr channel; + while(!(channel = system_->FindChannel(master_mnid_, "master", "main"))) + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + channel->Send("worker", "main"); + } + + protected: + const int64_t mnid_; + const int64_t master_mnid_; +}; + + +int main(int argc, char *argv[]) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + + System system; + auto mnids = ParseConfigAndRegister(FLAGS_config_filename, system); + system.StartServices(); + if (FLAGS_my_mnid == mnids.first) + system.Spawn("master", FLAGS_my_mnid, std::move(mnids.second)); + else + system.Spawn("worker", FLAGS_my_mnid, mnids.first); + system.AwaitShutdown(); + + return 0; +} \ No newline at end of file diff --git a/experimental/distributed/tests/start_distributed.py b/experimental/distributed/tests/start_distributed.py new file mode 100755 index 000000000..919416d73 --- /dev/null +++ b/experimental/distributed/tests/start_distributed.py @@ -0,0 +1,21 @@ +# Unfortunately I don't know how to force CMake to copy this script to +# the test folder so for now you will have to do it yourself. + +import os + +command = 'gnome-terminal' +program = './distributed' +config_filename = 'config' +flags = ' --minloglevel 2' + +f = open(config_filename, 'r') +for line in f: + data = line.strip().split(' ') + my_mnid = data[0] + address = data[1] + port = data[2] + call = program + flags + ' --my_mnid ' + my_mnid + ' --address ' + address +\ + ' --port ' + port + ' --config_filename ' + config_filename + command += " --tab -e '" + call + "'" + +os.system(command) \ No newline at end of file