diff --git a/experimental/distributed/CMakeLists.txt b/experimental/distributed/CMakeLists.txt index ca154e6a8..15ec7a016 100644 --- a/experimental/distributed/CMakeLists.txt +++ b/experimental/distributed/CMakeLists.txt @@ -27,21 +27,34 @@ include_directories(SYSTEM ${CMAKE_BINARY_DIR}/libs/gflags/include) file(GLOB_RECURSE src_files ${src_dir}/*.cpp) add_library(distributed_lib STATIC ${src_files}) -## executable -#### HACK: there is temporarily no working main file as the API is changing -# set(executable_name distributed) -# add_executable(${executable_name} ${PROJECT_SOURCE_DIR}/main.cpp) -# target_link_libraries(${executable_name} distributed_lib) -# target_link_libraries(${executable_name} memgraph_lib) -# target_link_libraries(${executable_name} ${MEMGRAPH_ALL_LIBS}) +## distributed Memgraph executable +set(executable_name main) +add_executable(${executable_name} ${PROJECT_SOURCE_DIR}/main.cpp) +target_link_libraries(${executable_name} distributed_lib) +target_link_libraries(${executable_name} memgraph_lib) +target_link_libraries(${executable_name} ${MEMGRAPH_ALL_LIBS}) + +## dummy distributed Memgraph client +set(executable_name main-client) +add_executable(${executable_name} ${PROJECT_SOURCE_DIR}/main-client.cpp) +target_link_libraries(${executable_name} distributed_lib) +target_link_libraries(${executable_name} memgraph_lib) +target_link_libraries(${executable_name} ${MEMGRAPH_ALL_LIBS}) # tests add_subdirectory(${PROJECT_SOURCE_DIR}/tests) -# copy test scripts into the build/ directory +# copy test scripts into the build/ directory (for distributed tests) configure_file(${PROJECT_SOURCE_DIR}/tests/start_distributed.py ${PROJECT_BINARY_DIR}/tests/start_distributed.py COPYONLY) configure_file(${PROJECT_SOURCE_DIR}/tests/config ${PROJECT_BINARY_DIR}/tests/config COPYONLY) +# copy main scripts into build/ directory (for distributed Memgraph) +configure_file(${PROJECT_SOURCE_DIR}/start_main.py + ${PROJECT_BINARY_DIR}/start_main.py COPYONLY) + +configure_file(${PROJECT_SOURCE_DIR}/config + ${PROJECT_BINARY_DIR}/config COPYONLY) + diff --git a/experimental/distributed/config b/experimental/distributed/config new file mode 100644 index 000000000..c6ec12a0f --- /dev/null +++ b/experimental/distributed/config @@ -0,0 +1,3 @@ +0 127.0.0.1 10010 +1 127.0.0.1 10011 +2 127.0.0.1 10012 diff --git a/experimental/distributed/main-client.cpp b/experimental/distributed/main-client.cpp new file mode 100644 index 000000000..fdc9974c4 --- /dev/null +++ b/experimental/distributed/main-client.cpp @@ -0,0 +1,41 @@ +#include "memgraph_distributed.hpp" +#include "memgraph_config.hpp" + +#include "reactors_distributed.hpp" + +#include +#include + +#include +#include + +/** + * This is the client that issues some hard-coded queries. + */ +class Client : public Reactor { + public: + Client(std::string name) : Reactor(name) { + } + + virtual void Run() { + + } +}; + +int main(int argc, char *argv[]) { + google::InitGoogleLogging(argv[0]); + gflags::ParseCommandLineFlags(&argc, &argv, true); + + System &system = System::GetInstance(); + Distributed &distributed = Distributed::GetInstance(); + MemgraphDistributed& memgraph = MemgraphDistributed::GetInstance(); + memgraph.RegisterConfig(ParseConfig()); + distributed.StartServices(); + + system.Spawn("client"); + + system.AwaitShutdown(); + distributed.StopServices(); + + return 0; +} diff --git a/experimental/distributed/main.cpp b/experimental/distributed/main.cpp new file mode 100644 index 000000000..559ea59e9 --- /dev/null +++ b/experimental/distributed/main.cpp @@ -0,0 +1,134 @@ +#include "memgraph_distributed.hpp" +#include "memgraph_config.hpp" + +#include "reactors_distributed.hpp" + +#include +#include + +#include + +DEFINE_uint64(my_mnid, -1, "Memgraph node id"); // TODO(zuza): this should be assigned by the leader once in the future + +/** + * Sends a text message and has a return address. + */ +class TextMessage : public ReturnAddressMsg { +public: + TextMessage(std::string reactor, std::string channel, std::string s) + : ReturnAddressMsg(reactor, channel), text(s) {} + + template + void serialize(Archive &archive) { + archive(cereal::virtual_base_class(this), text); + } + + std::string text; + +protected: + friend class cereal::access; + TextMessage() {} // Cereal needs access to a default constructor. +}; +CEREAL_REGISTER_TYPE(TextMessage); + +class Master : public Reactor { + public: + Master(std::string name, MnidT mnid) + : Reactor(name), mnid_(mnid) { + worker_mnids_ = MemgraphDistributed::GetInstance().GetAllMnids(); + worker_mnids_.erase(worker_mnids_.begin()); // remove the master from the beginning + } + + virtual void Run() { + MemgraphDistributed &memgraph = MemgraphDistributed::GetInstance(); + Distributed &distributed = Distributed::GetInstance(); + + std::cout << "Master (" << mnid_ << ") @ " << distributed.network().Address() + << ":" << distributed.network().Port() << std::endl; + + auto stream = main_.first; + + // wait until every worker sends a ReturnAddressMsg back, then close + stream->OnEvent([this](const TextMessage &msg, + const Subscription &subscription) { + std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n"; + ++workers_seen; + if (workers_seen == worker_mnids_.size()) { + subscription.Unsubscribe(); + // Sleep for a while so we can read output in the terminal. + // (start_distributed.py runs each process in a new tab which is + // closed immediately after process has finished) + std::this_thread::sleep_for(std::chrono::seconds(4)); + CloseChannel("main"); + } + }); + + // send a TextMessage to each worker + for (auto wmnid : worker_mnids_) { + std::cout << "wmnid_ = " << wmnid << std::endl; + + auto stream = memgraph.FindChannel(wmnid, "worker", "main"); + stream->OnEventOnce() + .ChainOnce([this, stream](const ChannelResolvedMessage &msg, const Subscription&){ + msg.channelWriter()->Send("master", "main", "hi from master"); + stream->Close(); + }); + } + } + + protected: + MnidT workers_seen = 0; + const MnidT mnid_; + std::vector worker_mnids_; +}; + +class Worker : public Reactor { + public: + Worker(std::string name, MnidT mnid) + : Reactor(name), mnid_(mnid) {} + + virtual void Run() { + Distributed &distributed = Distributed::GetInstance(); + + std::cout << "Worker (" << mnid_ << ") @ " << distributed.network().Address() + << ":" << distributed.network().Port() << std::endl; + + auto stream = main_.first; + // wait until master sends us a TextMessage, then reply back and close + stream->OnEventOnce() + .ChainOnce([this](const TextMessage &msg, const Subscription&) { + std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n"; + + msg.GetReturnChannelWriter() + ->Send("worker", "main", "hi from worker"); + + // Sleep for a while so we can read output in the terminal. + std::this_thread::sleep_for(std::chrono::seconds(4)); + CloseChannel("main"); + }); + } + + protected: + const MnidT mnid_; +}; + +int main(int argc, char *argv[]) { + google::InitGoogleLogging(argv[0]); + gflags::ParseCommandLineFlags(&argc, &argv, true); + + System &system = System::GetInstance(); + Distributed& distributed = Distributed::GetInstance(); + MemgraphDistributed& memgraph = MemgraphDistributed::GetInstance(); + memgraph.RegisterConfig(ParseConfig()); + distributed.StartServices(); + + if (FLAGS_my_mnid == memgraph.LeaderMnid()) { + system.Spawn("master", FLAGS_my_mnid); + } else { + system.Spawn("worker", FLAGS_my_mnid); + } + system.AwaitShutdown(); + distributed.StopServices(); + + return 0; +} diff --git a/experimental/distributed/src/memgraph_config.cpp b/experimental/distributed/src/memgraph_config.cpp new file mode 100644 index 000000000..cbc30ae1c --- /dev/null +++ b/experimental/distributed/src/memgraph_config.cpp @@ -0,0 +1,25 @@ +#include "memgraph_config.hpp" + +DEFINE_string(config_filename, "", "File containing list of all processes"); + +Config ParseConfig(const std::string &filename) { + std::ifstream file(filename, std::ifstream::in); + assert(file.good()); + + Config config; + + while (file.good()) { + MnidT mnid; + std::string address; + uint16_t port; + + file >> mnid >> address >> port; + if (file.eof()) + break; + + config.nodes.push_back(Config::NodeConfig{mnid, address, port}); + } + + file.close(); + return config; +} diff --git a/experimental/distributed/src/memgraph_config.hpp b/experimental/distributed/src/memgraph_config.hpp new file mode 100644 index 000000000..b3048f845 --- /dev/null +++ b/experimental/distributed/src/memgraph_config.hpp @@ -0,0 +1,39 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +/** + * About config file + * + * Each line contains three strings: + * memgraph node id, ip address of the worker, and port of the worker + * Data on the first line is used to start master. + * Data on the remaining lines is used to start workers. + */ +DECLARE_string(config_filename); + +using MnidT = uint64_t; + +struct Config { + struct NodeConfig { + MnidT mnid; + std::string address; + uint16_t port; + }; + + std::vector nodes; +}; + +/** + * Parse config file. + * + * @return config object. + */ +Config ParseConfig(const std::string &filename = FLAGS_config_filename); diff --git a/experimental/distributed/src/memgraph_distributed.hpp b/experimental/distributed/src/memgraph_distributed.hpp new file mode 100644 index 000000000..aee089d4d --- /dev/null +++ b/experimental/distributed/src/memgraph_distributed.hpp @@ -0,0 +1,77 @@ +#pragma once + +#include "memgraph_config.hpp" + +#include "reactors_distributed.hpp" + +#include +#include +#include +#include +#include + +class MemgraphDistributed { + private: + using Location = std::pair; + + public: + /** + * Get the (singleton) instance of MemgraphDistributed. + * + * More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern + */ + static MemgraphDistributed &GetInstance() { + static MemgraphDistributed memgraph; // guaranteed to be destroyed, initialized on first use + return memgraph; + } + + EventStream* FindChannel(MnidT mnid, + const std::string &reactor, + const std::string &channel) { + std::unique_lock lock(mutex_); + const auto &location = mnodes_.at(mnid); + return Distributed::GetInstance().FindChannel(location.first, location.second, reactor, channel); + } + + void RegisterConfig(const Config &config) { + config_ = config; + for (auto &node : config_.nodes) { + RegisterMemgraphNode(node.mnid, node.address, node.port); + } + } + + std::vector GetAllMnids() { + std::vector mnids; + for (auto &node : config_.nodes) { + mnids.push_back(node.mnid); + } + return mnids; + } + + /** + * The leader is currently the first node in the config. + */ + MnidT LeaderMnid() { + return config_.nodes.front().mnid; + } + + protected: + MemgraphDistributed() {} + + /** Register memgraph node id to the given location. */ + void RegisterMemgraphNode(MnidT mnid, const std::string &address, uint16_t port) { + std::unique_lock lock(mutex_); + mnodes_[mnid] = Location(address, port); + } + + private: + Config config_; + + std::recursive_mutex mutex_; + std::unordered_map mnodes_; + + MemgraphDistributed(const MemgraphDistributed &) = delete; + MemgraphDistributed(MemgraphDistributed &&) = delete; + MemgraphDistributed &operator=(const MemgraphDistributed &) = delete; + MemgraphDistributed &operator=(MemgraphDistributed &&) = delete; +}; diff --git a/experimental/distributed/start_main.py b/experimental/distributed/start_main.py new file mode 100644 index 000000000..ed6715134 --- /dev/null +++ b/experimental/distributed/start_main.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 +# Automatically copied to the build/ directory during Makefile (configured by cmake) + +import os + +command = 'gnome-terminal' +config_filename = 'config' +glog_flags = '-alsologtostderr --minloglevel=2' + + +def GetMainCall(my_mnid, address, port): + return "./main {} --my_mnid {} --address {} --port {} --config_filename={}".format( + glog_flags, my_mnid, address, port, config_filename) + + +def GetClientCall(): + return "./main-client {} --address 127.0.0.1 --port 10000 --config_filename={}".format( + glog_flags, config_filename) + + +def NamedGnomeTab(name, command): + return " --tab -e \"bash -c 'printf \\\"\\033]0;{}\\007\\\"; {}'\" ".format(name, command) + + +if __name__ == "__main__": + f = open(config_filename, 'r') + for line in f: + data = line.strip().split(' ') + my_mnid = data[0] + address = data[1] + port = data[2] + command += NamedGnomeTab("mnid={}".format(my_mnid), GetMainCall(my_mnid, address, port)) + + command += NamedGnomeTab("client", GetClientCall()) + print(command) + os.system(command) diff --git a/experimental/distributed/tests/start_distributed.py b/experimental/distributed/tests/start_distributed.py index 3fb483fdb..648db169c 100755 --- a/experimental/distributed/tests/start_distributed.py +++ b/experimental/distributed/tests/start_distributed.py @@ -1,5 +1,4 @@ -# Unfortunately I don't know how to force CMake to copy this script to -# the test folder so for now you will have to do it yourself. +# Automatically copied to the build/ directory during Makefile (configured by cmake) import os