diff --git a/experimental/distributed/README.md b/experimental/distributed/README.md index 62ebf1dda..9fa0b113c 100644 --- a/experimental/distributed/README.md +++ b/experimental/distributed/README.md @@ -20,10 +20,11 @@ This subdirectory structure implements distributed infrastructure of Memgraph. * System, Distributed are singletons. They should be always alive. * ChannelWriter (write-end) should be lightweight and can be copied arbitrarily. * EventStream (read-end) should never be written by anyone except the owner (the reactor that created it). +* In general: always think about who owns an object. Preferably write it in its comment block. ## Code Conventions -* Locked: A method having a Locked... prefix indicates that you +* Locked: A method having a "Locked..." prefix indicates that you have to lock the appropriate mutex before calling this function. * ALWAYS close channels. You will memory leak if you don't. Reactor::CloseChannel or Subscription::Close will do the trick. diff --git a/experimental/distributed/main-client.cpp b/experimental/distributed/main-client.cpp index fdc9974c4..b9030de5b 100644 --- a/experimental/distributed/main-client.cpp +++ b/experimental/distributed/main-client.cpp @@ -1,13 +1,12 @@ -#include "memgraph_distributed.hpp" -#include "memgraph_config.hpp" - #include "reactors_distributed.hpp" -#include -#include +#include "memgraph_config.hpp" +#include "memgraph_distributed.hpp" +#include "memgraph_transactions.hpp" -#include -#include +#include +#include +#include /** * This is the client that issues some hard-coded queries. @@ -17,14 +16,54 @@ class Client : public Reactor { Client(std::string name) : Reactor(name) { } - virtual void Run() { + void IssueQueries(std::shared_ptr channel_to_leader) { + const int NUM_VERTS = 10; + // (concurrently) create a couple of vertices + for (int num_vert = 0; num_vert < NUM_VERTS; ++num_vert) { + // register callback + std::string channel_name = "create-node-" + std::to_string(num_vert); + // TODO(zuza): this is actually pretty bad because if SuccessQueryCreateVertex arrives, then + // FailureQueryCreateVertex never gets unsubscribed. This could cause memory leaks + // in the future (not currently since all callbacks get destroyed when channel is closed). + // The best thing to do is to implement a ThenOnce and Either. Perhaps even a ThenClose. + auto stream = Open(channel_name).first; + stream + ->OnEventOnce() + .ChainOnce([this, num_vert](const SuccessQueryCreateVertex&, const Subscription& sub) { + LOG(INFO) << "successfully created vertex " << num_vert+1 << std::endl; + sub.CloseChannel(); + }); + + stream + ->OnEventOnce() + .ChainOnce([this, num_vert](const FailureQueryCreateVertex&, const Subscription& sub) { + LOG(INFO) << "failed on creating vertex " << num_vert+1 << std::endl; + sub.CloseChannel(); + }); + + // then issue the query (to avoid race conditions) + LOG(INFO) << "Issuing command to create vertex " << num_vert+1; + channel_to_leader->Send(channel_name); + } + } + + virtual void Run() { + MemgraphDistributed& memgraph = MemgraphDistributed::GetInstance(); + int mnid = memgraph.LeaderMnid(); + + memgraph.FindChannel(mnid, "master", "client-queries") + ->OnEventOnce() + .ChainOnce([this](const ChannelResolvedMessage &msg, const Subscription& sub) { + sub.CloseChannel(); + IssueQueries(msg.channelWriter()); + }); } }; int main(int argc, char *argv[]) { - google::InitGoogleLogging(argv[0]); gflags::ParseCommandLineFlags(&argc, &argv, true); + google::InitGoogleLogging(argv[0]); System &system = System::GetInstance(); Distributed &distributed = Distributed::GetInstance(); diff --git a/experimental/distributed/main.cpp b/experimental/distributed/main.cpp index 559ea59e9..5fed57528 100644 --- a/experimental/distributed/main.cpp +++ b/experimental/distributed/main.cpp @@ -1,11 +1,13 @@ -#include "memgraph_distributed.hpp" #include "memgraph_config.hpp" +#include "memgraph_distributed.hpp" +#include "memgraph_transactions.hpp" #include "reactors_distributed.hpp" #include #include +#include #include DEFINE_uint64(my_mnid, -1, "Memgraph node id"); // TODO(zuza): this should be assigned by the leader once in the future @@ -14,7 +16,7 @@ DEFINE_uint64(my_mnid, -1, "Memgraph node id"); // TODO(zuza): this should be as * Sends a text message and has a return address. */ class TextMessage : public ReturnAddressMsg { -public: + public: TextMessage(std::string reactor, std::string channel, std::string s) : ReturnAddressMsg(reactor, channel), text(s) {} @@ -25,7 +27,7 @@ public: std::string text; -protected: + protected: friend class cereal::access; TextMessage() {} // Cereal needs access to a default constructor. }; @@ -35,45 +37,35 @@ class Master : public Reactor { public: Master(std::string name, MnidT mnid) : Reactor(name), mnid_(mnid) { - worker_mnids_ = MemgraphDistributed::GetInstance().GetAllMnids(); - worker_mnids_.erase(worker_mnids_.begin()); // remove the master from the beginning + MemgraphDistributed& memgraph = MemgraphDistributed::GetInstance(); + worker_mnids_ = memgraph.GetAllMnids(); + // remove the leader (itself), because its not a worker + auto leader_it = std::find(worker_mnids_.begin(), worker_mnids_.end(), memgraph.LeaderMnid()); + worker_mnids_.erase(leader_it); } virtual void Run() { - MemgraphDistributed &memgraph = MemgraphDistributed::GetInstance(); Distributed &distributed = Distributed::GetInstance(); - std::cout << "Master (" << mnid_ << ") @ " << distributed.network().Address() + LOG(INFO) << "Master (" << mnid_ << ") @ " << distributed.network().Address() << ":" << distributed.network().Port() << std::endl; - auto stream = main_.first; + // TODO(zuza): check if all workers are up - // wait until every worker sends a ReturnAddressMsg back, then close - stream->OnEvent([this](const TextMessage &msg, - const Subscription &subscription) { - std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n"; - ++workers_seen; - if (workers_seen == worker_mnids_.size()) { - subscription.Unsubscribe(); - // Sleep for a while so we can read output in the terminal. - // (start_distributed.py runs each process in a new tab which is - // closed immediately after process has finished) - std::this_thread::sleep_for(std::chrono::seconds(4)); - CloseChannel("main"); - } - }); + auto stream = Open("client-queries").first; + stream->OnEvent([this](const QueryCreateVertex& msg, const Subscription&) { + std::random_device rd; // slow random number generator - // send a TextMessage to each worker - for (auto wmnid : worker_mnids_) { - std::cout << "wmnid_ = " << wmnid << std::endl; + // succeed and fail with 50-50 + if (rd() % 2 == 0) { + msg.GetReturnChannelWriter() + ->Send(); + } else { + msg.GetReturnChannelWriter() + ->Send(); + } + }); - auto stream = memgraph.FindChannel(wmnid, "worker", "main"); - stream->OnEventOnce() - .ChainOnce([this, stream](const ChannelResolvedMessage &msg, const Subscription&){ - msg.channelWriter()->Send("master", "main", "hi from master"); - stream->Close(); - }); - } } protected: @@ -90,22 +82,8 @@ class Worker : public Reactor { virtual void Run() { Distributed &distributed = Distributed::GetInstance(); - std::cout << "Worker (" << mnid_ << ") @ " << distributed.network().Address() + LOG(INFO) << "Worker (" << mnid_ << ") @ " << distributed.network().Address() << ":" << distributed.network().Port() << std::endl; - - auto stream = main_.first; - // wait until master sends us a TextMessage, then reply back and close - stream->OnEventOnce() - .ChainOnce([this](const TextMessage &msg, const Subscription&) { - std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n"; - - msg.GetReturnChannelWriter() - ->Send("worker", "main", "hi from worker"); - - // Sleep for a while so we can read output in the terminal. - std::this_thread::sleep_for(std::chrono::seconds(4)); - CloseChannel("main"); - }); } protected: @@ -113,8 +91,9 @@ class Worker : public Reactor { }; int main(int argc, char *argv[]) { - google::InitGoogleLogging(argv[0]); - gflags::ParseCommandLineFlags(&argc, &argv, true); + gflags::ParseCommandLineFlags(&argc, &argv, /* remove flags from command line */ true); + std::string logging_name = std::string(argv[0]) + "-mnid-" + std::to_string(FLAGS_my_mnid); + google::InitGoogleLogging(logging_name.c_str()); System &system = System::GetInstance(); Distributed& distributed = Distributed::GetInstance(); diff --git a/experimental/distributed/src/memgraph_transactions.hpp b/experimental/distributed/src/memgraph_transactions.hpp new file mode 100644 index 000000000..ac41c8147 --- /dev/null +++ b/experimental/distributed/src/memgraph_transactions.hpp @@ -0,0 +1,42 @@ +#pragma once + +#include "reactors_local.hpp" +#include "reactors_distributed.hpp" + +class QueryCreateVertex : public ReturnAddressMsg { +public: + QueryCreateVertex(std::string return_channel) : ReturnAddressMsg(return_channel) {} + + template + void serialize(Archive &archive) { + archive(cereal::virtual_base_class(this)); + } + +protected: + friend class cereal::access; + QueryCreateVertex() {} // Cereal needs access to a default constructor. +}; +CEREAL_REGISTER_TYPE(QueryCreateVertex); + +class SuccessQueryCreateVertex : public Message { +public: + SuccessQueryCreateVertex() {} + + template + void serialize(Archive &archive) { + archive(cereal::virtual_base_class(this)); + } +}; +CEREAL_REGISTER_TYPE(SuccessQueryCreateVertex); + + +class FailureQueryCreateVertex : public Message { +public: + FailureQueryCreateVertex() {} + + template + void serialize(Archive &archive) { + archive(cereal::virtual_base_class(this)); + } +}; +CEREAL_REGISTER_TYPE(FailureQueryCreateVertex); diff --git a/experimental/distributed/src/reactors_distributed.hpp b/experimental/distributed/src/reactors_distributed.hpp index 7d122d657..5e4ed839d 100644 --- a/experimental/distributed/src/reactors_distributed.hpp +++ b/experimental/distributed/src/reactors_distributed.hpp @@ -81,6 +81,7 @@ class Network { return std::make_shared(this, address, port, reactor_name, channel_name); } + LOG(WARNING) << "Could not resolve " << address << ":" << port << " " << reactor_name << "/" << channel_name; return nullptr; } diff --git a/experimental/distributed/start_main.py b/experimental/distributed/start_main.py index ed6715134..6e71ed608 100644 --- a/experimental/distributed/start_main.py +++ b/experimental/distributed/start_main.py @@ -3,19 +3,26 @@ import os -command = 'gnome-terminal' -config_filename = 'config' -glog_flags = '-alsologtostderr --minloglevel=2' +terminal_command = 'gnome-terminal' +terminal_flags = ' --geometry=200x50 ' # columns x rows +config_filename = 'config' +log_dir = "logs" +glog_flags = '--alsologtostderr --logbufsecs=0 --minloglevel=0 --log_dir="{}" '.format(log_dir) def GetMainCall(my_mnid, address, port): - return "./main {} --my_mnid {} --address {} --port {} --config_filename={}".format( + ret = "./main {} --my_mnid {} --address {} --port {} --config_filename={}".format( glog_flags, my_mnid, address, port, config_filename) + print(ret) + return ret + def GetClientCall(): - return "./main-client {} --address 127.0.0.1 --port 10000 --config_filename={}".format( + ret = "./main-client {} --address 127.0.0.1 --port 10000 --config_filename={}".format( glog_flags, config_filename) + print(ret) + return ret def NamedGnomeTab(name, command): @@ -23,6 +30,8 @@ def NamedGnomeTab(name, command): if __name__ == "__main__": + command = "{} {}".format(terminal_command, terminal_flags) + f = open(config_filename, 'r') for line in f: data = line.strip().split(' ') @@ -33,4 +42,5 @@ if __name__ == "__main__": command += NamedGnomeTab("client", GetClientCall()) print(command) + os.system('mkdir -p {}'.format(log_dir)) os.system(command) diff --git a/src/communication/server.hpp b/src/communication/server.hpp index 5dada284d..df9bcc779 100644 --- a/src/communication/server.hpp +++ b/src/communication/server.hpp @@ -52,7 +52,7 @@ class Server } void Start(size_t n) { - std::cout << fmt::format("Starting {} workers", n) << std::endl; + LOG(INFO) << fmt::format("Starting {} workers", n) << std::endl; workers_.reserve(n); for (size_t i = 0; i < n; ++i) { workers_.push_back( @@ -60,8 +60,8 @@ class Server session_data_)); workers_.back()->Start(alive_); } - std::cout << "Server is fully armed and operational" << std::endl; - std::cout << fmt::format("Listening on {} at {}", + LOG(INFO) << "Server is fully armed and operational" << std::endl; + LOG(INFO) << fmt::format("Listening on {} at {}", socket_.endpoint().address(), socket_.endpoint().port()) << std::endl; @@ -69,7 +69,7 @@ class Server this->WaitAndProcessEvents(); } - std::cout << "Shutting down..." << std::endl; + LOG(INFO) << "Shutting down..." << std::endl; for (auto &worker : workers_) worker->thread_.join(); }