diff --git a/experimental/distributed/CMakeLists.txt b/experimental/distributed/CMakeLists.txt index f4ad9fd03..ca154e6a8 100644 --- a/experimental/distributed/CMakeLists.txt +++ b/experimental/distributed/CMakeLists.txt @@ -28,11 +28,20 @@ file(GLOB_RECURSE src_files ${src_dir}/*.cpp) add_library(distributed_lib STATIC ${src_files}) ## executable -set(executable_name distributed) -add_executable(${executable_name} ${PROJECT_SOURCE_DIR}/main.cpp) -target_link_libraries(${executable_name} distributed_lib) -target_link_libraries(${executable_name} memgraph_lib) -target_link_libraries(${executable_name} ${MEMGRAPH_ALL_LIBS}) +#### HACK: there is temporarily no working main file as the API is changing +# set(executable_name distributed) +# add_executable(${executable_name} ${PROJECT_SOURCE_DIR}/main.cpp) +# target_link_libraries(${executable_name} distributed_lib) +# target_link_libraries(${executable_name} memgraph_lib) +# target_link_libraries(${executable_name} ${MEMGRAPH_ALL_LIBS}) # tests add_subdirectory(${PROJECT_SOURCE_DIR}/tests) + +# copy test scripts into the build/ directory +configure_file(${PROJECT_SOURCE_DIR}/tests/start_distributed.py + ${PROJECT_BINARY_DIR}/tests/start_distributed.py COPYONLY) + +configure_file(${PROJECT_SOURCE_DIR}/tests/config + ${PROJECT_BINARY_DIR}/tests/config COPYONLY) + diff --git a/experimental/distributed/README.md b/experimental/distributed/README.md index 31d38956b..92e096794 100644 --- a/experimental/distributed/README.md +++ b/experimental/distributed/README.md @@ -9,7 +9,7 @@ This subdirectory structure implements distributed infrastructure of Memgraph. * Node: a computer that performs (distributed) work. * Vertex: an abstract graph concept. * Reactor: a unit of concurrent execution, lives on its own thread. -* Connector: a communication abstraction between Reactors. The reactors can be on the same machine or on different processes. +* Connector: a (one-way) communication abstraction between Reactors. The reactors can be on the same machine or on different processes. * EventStream: read-end of a connector, is owned by exactly one Reactor/thread. * Channel: write-end of a connector, can be owned (wrote into) by multiple threads. diff --git a/experimental/distributed/main.cpp b/experimental/distributed/main.cpp deleted file mode 100644 index 62582e319..000000000 --- a/experimental/distributed/main.cpp +++ /dev/null @@ -1,387 +0,0 @@ -#include <atomic> -#include <chrono> -#include <cstdlib> -#include <iostream> -#include <string> -#include <thread> -#include <vector> - -#include "reactors_distributed.hpp" - -const int NUM_WORKERS = 1; - -class Txn : public SenderMessage { - public: - Txn(std::string reactor, std::string channel, int64_t id) : SenderMessage(reactor, channel), id_(id) {} - int64_t id() const { return id_; } - - template <class Archive> - void serialize(Archive &archive) { - archive(cereal::base_class<SenderMessage>(this), id_); - } - - private: - int64_t id_; -}; - -class CreateNodeTxn : public Txn { - public: - CreateNodeTxn(std::string reactor, std::string channel, int64_t id) : Txn(reactor, channel, id) {} - - template <class Archive> - void serialize(Archive &archive) { - archive(cereal::base_class<Txn>(this)); - } -}; - -class CountNodesTxn : public Txn { - public: - CountNodesTxn(std::string reactor, std::string channel, int64_t id) : Txn(reactor, channel, id) {} - - template <class Archive> - void serialize(Archive &archive) { - archive(cereal::base_class<Txn>(this)); - } -}; - -class CountNodesTxnResult : public Message { - public: - CountNodesTxnResult(int64_t count) : count_(count) {} - int64_t count() const { return count_; } - - template <class Archive> - void serialize(Archive &archive) { - archive(count_); - } - - private: - int64_t count_; -}; - -class CommitRequest : public SenderMessage { - public: - CommitRequest(std::string reactor, std::string channel, int64_t worker_id) - : SenderMessage(reactor, channel), worker_id_(worker_id) {} - int64_t worker_id() { return worker_id_; } - - template <class Archive> - void serialize(Archive &archive) { - archive(cereal::base_class<SenderMessage>(this), worker_id_); - } - - private: - int64_t worker_id_; -}; - -class AbortRequest : public SenderMessage { - public: - AbortRequest(std::string reactor, std::string channel, int64_t worker_id) - : SenderMessage(reactor, channel), worker_id_(worker_id) {} - int64_t worker_id() { return worker_id_; } - - template <class Archive> - void serialize(Archive &archive) { - archive(cereal::base_class<SenderMessage>(this), worker_id_); - } - - private: - int64_t worker_id_; -}; - -class CommitDirective : public Message { - template <class Archive> - void serialize(Archive &archive) { - archive(cereal::base_class<Message>(this)); - } -}; - -class AbortDirective : public Message { - template <class Archive> - void serialize(Archive &archive) { - archive(cereal::base_class<Message>(this)); - } -}; - -class Query : public Message { - public: - Query(std::string query) : Message(), query_(query) {} - std::string query() const { return query_; } - - template <class Archive> - void serialize(Archive &archive) { - archive(cereal::base_class<Message>(this), query_); - } - - private: - std::string query_; -}; - -class Quit : public Message { - template <class Archive> - void serialize(Archive &archive) { - archive(cereal::base_class<Message>(this)); - } -}; - -class Master : public Reactor { - public: - Master(System *system, std::string name) : Reactor(system, name), next_xid_(1) {} - - virtual void Run() { - auto stream = main_.first; - FindWorkers(); - - std::cout << "Master is active" << std::endl; - while (true) { - auto m = stream->AwaitEvent(); - if (Query *query = dynamic_cast<Query *>(m.get())) { - ProcessQuery(query); - break; // process only the first query - } else if (SenderMessage *msg = dynamic_cast<SenderMessage *>(m.get())) { - std::cout << "SenderMessage received!" << std::endl; - std::cout << " Address: " << msg->Address() << std::endl; - std::cout << " Port: " << msg->Port() << std::endl; - std::cout << " Reactor: " << msg->ReactorName() << std::endl; - std::cout << " Channel: " << msg->ChannelName() << std::endl; - } else { - std::cerr << "unknown message\n"; - exit(1); - } - } - - stream->OnEvent<Message>([this](const Message &msg, const EventStream::Subscription& subscription) { - std::cout << "Processing Query via Callback" << std::endl; - const Query &query = - dynamic_cast<const Query &>(msg); // exception bad_cast - ProcessQuery(&query); - subscription.unsubscribe(); - }); - } - - private: - void ProcessQuery(const Query *query) { - if (query->query() == "create node") { - PerformCreateNode(); - } else if (query->query() == "count nodes") { - PerformCountNodes(); - } else { - std::cout << "got query: " << query->query() << std::endl; - } - } - - void PerformCreateNode() { - int worker_id = rand() % NUM_WORKERS; - int64_t xid = GetTransactionId(); - std::string txn_channel_name = GetTxnName(xid); - auto connector = Open(txn_channel_name); - auto stream = connector.first; - - channels_[worker_id]->Send<CreateNodeTxn>("master", "main", xid); - auto m = stream->AwaitEvent(); - if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) { - req->GetChannelToSender(system_)->Send<CommitDirective>(); - } else if (AbortRequest *req = dynamic_cast<AbortRequest *>(m.get())) { - req->GetChannelToSender(system_)->Send<AbortDirective>(); - } else { - std::cerr << "unknown message\n"; - exit(1); - } - CloseConnector(txn_channel_name); - } - - void PerformCountNodes() { - int64_t xid = GetTransactionId(); - std::string txn_channel_name = GetTxnName(xid); - auto connector = Open(txn_channel_name); - auto stream = connector.first; - for (int w_id = 0; w_id < NUM_WORKERS; ++w_id) - channels_[w_id]->Send<CountNodesTxn>("master", "main", xid); - - std::vector<std::shared_ptr<Channel>> txn_channels; - txn_channels.resize(NUM_WORKERS, nullptr); - bool commit = true; - for (int responds = 0; responds < NUM_WORKERS; ++responds) { - auto m = stream->AwaitEvent(); - if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) { - txn_channels[req->worker_id()] = req->GetChannelToSender(system_); - commit &= true; - } else if (AbortRequest *req = dynamic_cast<AbortRequest *>(m.get())) { - txn_channels[req->worker_id()] = req->GetChannelToSender(system_); - commit = false; - } else { - std::cerr << "unknown message\n"; - exit(1); - } - } - - if (commit) { - for (int w_id = 0; w_id < NUM_WORKERS; ++w_id) - txn_channels[w_id]->Send<CommitDirective>(); - } else { - for (int w_id = 0; w_id < NUM_WORKERS; ++w_id) - txn_channels[w_id]->Send<AbortDirective>(); - } - - int64_t count = 0; - for (int responds = 0; responds < NUM_WORKERS; ++responds) { - auto m = stream->AwaitEvent(); - if (CountNodesTxnResult *cnt = - dynamic_cast<CountNodesTxnResult *>(m.get())) { - count += cnt->count(); - } else { - std::cerr << "unknown message\n"; - exit(1); - } - } - - CloseConnector(txn_channel_name); - std::cout << "graph has " << count << " vertices" << std::endl; - } - - int64_t GetTransactionId() { return next_xid_++; } - - std::string GetWorkerName(int worker_id) { - return "worker" + std::to_string(worker_id); - } - - std::string GetTxnName(int txn_id) { return "txn" + std::to_string(txn_id); } - - void FindWorkers() { - channels_.resize(NUM_WORKERS, nullptr); - int workers_found = 0; - while (workers_found < NUM_WORKERS) { - for (int64_t w_id = 0; w_id < NUM_WORKERS; ++w_id) { - if (channels_[w_id] == nullptr) { - // TODO: Resolve worker channel using the network service. - channels_[w_id] = system_->FindChannel(GetWorkerName(w_id), "main"); - if (channels_[w_id] != nullptr) ++workers_found; - } - } - if (workers_found < NUM_WORKERS) - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - } - - // TODO: Why is master atomic, it should be unique? - std::atomic<int64_t> next_xid_; - std::vector<std::shared_ptr<Channel>> channels_; -}; - -class Worker : public Reactor { - public: - Worker(System *system, std::string name, int64_t id) : Reactor(system, name), - worker_id_(id) {} - - virtual void Run() { - std::cout << "worker " << worker_id_ << " is active" << std::endl; - auto stream = main_.first; - FindMaster(); - while (true) { - auto m = stream->AwaitEvent(); - if (Txn *txn = dynamic_cast<Txn *>(m.get())) { - HandleTransaction(txn); - } else { - std::cerr << "unknown message\n"; - exit(1); - } - } - } - - private: - void HandleTransaction(Txn *txn) { - if (CreateNodeTxn *create_txn = dynamic_cast<CreateNodeTxn *>(txn)) { - HandleCreateNode(create_txn); - } else if (CountNodesTxn *cnt_txn = dynamic_cast<CountNodesTxn *>(txn)) { - HandleCountNodes(cnt_txn); - } else { - std::cerr << "unknown transaction\n"; - exit(1); - } - } - - void HandleCreateNode(CreateNodeTxn *txn) { - auto connector = Open(GetTxnChannelName(txn->id())); - auto stream = connector.first; - auto masterChannel = txn->GetChannelToSender(system_); - // TODO: Do the actual commit. - masterChannel->Send<CommitRequest>("master", "main", worker_id_); - auto m = stream->AwaitEvent(); - if (dynamic_cast<CommitDirective *>(m.get())) { - // TODO: storage_.CreateNode(); - } else if (dynamic_cast<AbortDirective *>(m.get())) { - // TODO: Rollback. - } else { - std::cerr << "unknown message\n"; - exit(1); - } - CloseConnector(GetTxnChannelName(txn->id())); - } - - void HandleCountNodes(CountNodesTxn *txn) { - auto connector = Open(GetTxnChannelName(txn->id())); - auto stream = connector.first; - auto masterChannel = txn->GetChannelToSender(system_); - - // TODO: Fix this hack -- use the storage. - int num = 123; - - masterChannel->Send<CommitRequest>("master", "main", worker_id_); - auto m = stream->AwaitEvent(); - if (dynamic_cast<CommitDirective *>(m.get())) { - masterChannel->Send<CountNodesTxnResult>(num); - } else if (dynamic_cast<AbortDirective *>(m.get())) { - // send nothing - } else { - std::cerr << "unknown message\n"; - exit(1); - } - CloseConnector(GetTxnChannelName(txn->id())); - } - - // TODO: Don't repeat code from Master. - std::string GetTxnChannelName(int64_t transaction_id) { - return "txn" + std::to_string(transaction_id); - } - - void FindMaster() { - // TODO: Replace with network service and channel resolution. - while (!(master_channel_ = system_->FindChannel("master", "main"))) - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - - std::shared_ptr<Channel> master_channel_ = nullptr; - int worker_id_; -}; - -void ClientMain(System *system) { - std::shared_ptr<Channel> channel = nullptr; - // TODO: Replace this with network channel resolution. - while (!(channel = system->FindChannel("master", "main"))) - std::this_thread::sleep_for(std::chrono::seconds(1)); - std::cout << "I/O Client Main active" << std::endl; - - bool active = true; - while (active) { - std::string s; - std::getline(std::cin, s); - if (s == "quit") { - active = false; - channel->Send<Quit>(); - } else { - channel->Send<Query>(s); - } - } -} - - -int main(int argc, char *argv[]) { - //google::InitGoogleLogging(argv[0]); - gflags::ParseCommandLineFlags(&argc, &argv, true); - System system; - system.Spawn<Master>("master"); - std::thread client(ClientMain, &system); - for (int i = 0; i < NUM_WORKERS; ++i) - system.Spawn<Worker>("worker" + std::to_string(i), i); - system.AwaitShutdown(); - return 0; -} diff --git a/experimental/distributed/src/protocol.cpp b/experimental/distributed/src/protocol.cpp index d59ca1fa9..fd2fb3d28 100644 --- a/experimental/distributed/src/protocol.cpp +++ b/experimental/distributed/src/protocol.cpp @@ -7,14 +7,14 @@ namespace Protocol { -Session::Session(Socket &&socket, Data &data) - : socket_(std::move(socket)), system_(data.system) { +Session::Session(Socket &&socket, Data &) + : socket_(std::move(socket)) { event_.data.ptr = this; } bool Session::Alive() const { return alive_; } -std::string Session::GetString(SizeT len) { +std::string Session::GetStringAndShift(SizeT len) { std::string ret(reinterpret_cast<char *>(buffer_.data()), len); buffer_.Shift(len); return ret; @@ -22,30 +22,34 @@ std::string Session::GetString(SizeT len) { void Session::Execute() { if (!handshake_done_) { - SizeT len_reactor = GetLength(); - SizeT len_channel = GetLength(2); + // Note: this function can be multiple times before the buffer has the full packet. + // We currently have to check for this case and return without shifting the buffer. + // In other words, only shift anything from the buffer if you can read the entire (sub)message. - if (len_reactor == 0 || len_channel == 0) return; - if (buffer_.size() < len_reactor + len_channel) return; + if (buffer_.size() < 2 * sizeof(SizeT)) return; + SizeT len_reactor = GetLength(); + SizeT len_channel = GetLength(sizeof(SizeT)); + + if (buffer_.size() < 2 * sizeof(SizeT) + len_reactor + len_channel) return; // remove the length bytes from the buffer buffer_.Shift(2 * sizeof(SizeT)); - reactor_ = GetString(len_reactor); - channel_ = GetString(len_channel); + reactor_ = GetStringAndShift(len_reactor); + channel_ = GetStringAndShift(len_channel); - std::cout << "Reactor: " << reactor_ << "; Channel: " << channel_ - << std::endl; + DLOG(INFO) << "Reactor: " << reactor_ << "; Channel: " << channel_ + << std::endl; - auto channel = system_->FindChannel(reactor_, channel_); + auto channel = System::GetInstance().FindChannel(reactor_, channel_); SendSuccess(channel != nullptr); handshake_done_ = true; } + if (buffer_.size() < sizeof(SizeT)) return; SizeT len_data = GetLength(); - if (len_data == 0) return; - if (buffer_.size() < len_data) return; + if (buffer_.size() < sizeof(SizeT) + len_data) return; // remove the length bytes from the buffer buffer_.Shift(sizeof(SizeT)); @@ -58,14 +62,13 @@ void Session::Execute() { iarchive(message); buffer_.Shift(len_data); - auto channel = system_->FindChannel(reactor_, channel_); + auto channel = System::GetInstance().FindChannel(reactor_, channel_); if (channel == nullptr) { SendSuccess(false); return; } channel->Send(std::move(message)); - SendSuccess(true); } @@ -79,7 +82,6 @@ void Session::Close() { } SizeT Session::GetLength(int offset) { - if (buffer_.size() - offset < sizeof(SizeT)) return 0; SizeT ret = *reinterpret_cast<SizeT *>(buffer_.data() + offset); return ret; } diff --git a/experimental/distributed/src/protocol.hpp b/experimental/distributed/src/protocol.hpp index d056f426d..f69003ad1 100644 --- a/experimental/distributed/src/protocol.hpp +++ b/experimental/distributed/src/protocol.hpp @@ -7,7 +7,6 @@ #include "io/network/stream_buffer.hpp" class Message; -class System; /** * @brief Protocol @@ -53,11 +52,10 @@ using SizeT = uint16_t; /** * Distributed Protocol Data * - * This class is responsible for holding a pointer to System. + * This typically holds living data shared by all sessions. Currently empty. */ struct Data { - Data(System *_system) : system(_system) {} - System *system; + // empty }; /** @@ -110,7 +108,7 @@ class Session { private: SizeT GetLength(int offset = 0); - std::string GetString(SizeT len); + std::string GetStringAndShift(SizeT len); bool SendSuccess(bool success); bool alive_{true}; @@ -119,8 +117,6 @@ class Session { std::string channel_{""}; Buffer buffer_; - - System *system_; }; /** diff --git a/experimental/distributed/src/reactors_distributed.cpp b/experimental/distributed/src/reactors_distributed.cpp index 8740655c3..e09a02ce2 100644 --- a/experimental/distributed/src/reactors_distributed.cpp +++ b/experimental/distributed/src/reactors_distributed.cpp @@ -3,7 +3,7 @@ DEFINE_string(address, "127.0.0.1", "Network server bind address"); DEFINE_int32(port, 10000, "Network server bind port"); -Network::Network(System *system) : system_(system), protocol_data_(system_) {} +Network::Network() {} /** * SenderMessage implementation. @@ -22,9 +22,9 @@ std::string SenderMessage::ReactorName() const { return reactor_; } std::string SenderMessage::ChannelName() const { return channel_; } std::shared_ptr<Channel> SenderMessage::GetChannelToSender( - System *system, Distributed *distributed) const { + Distributed *distributed) const { if (address_ == FLAGS_address && port_ == FLAGS_port) { - return system->FindChannel(reactor_, channel_); + return System::GetInstance().FindChannel(reactor_, channel_); } if (distributed) return distributed->network().Resolve(address_, port_, reactor_, channel_); diff --git a/experimental/distributed/src/reactors_distributed.hpp b/experimental/distributed/src/reactors_distributed.hpp index 1bf79dcb8..ccdeb6f4f 100644 --- a/experimental/distributed/src/reactors_distributed.hpp +++ b/experimental/distributed/src/reactors_distributed.hpp @@ -69,7 +69,7 @@ class Network { }; public: - Network(System *system); + Network(); // client functions @@ -213,8 +213,6 @@ class Network { } private: - System *system_; - // client variables SpinLock mutex_; std::vector<std::thread> pool_; @@ -242,8 +240,7 @@ class SenderMessage : public Message { std::string ReactorName() const; std::string ChannelName() const; - std::shared_ptr<Channel> GetChannelToSender(System *system, - Distributed *distributed = nullptr) const; + std::shared_ptr<Channel> GetChannelToSender(Distributed *distributed = nullptr) const; template <class Archive> void serialize(Archive &ar) { @@ -282,7 +279,7 @@ class ChannelResolvedMessage : public Message { */ class Distributed { public: - Distributed(System &system) : system_(system), network_(&system) {} + Distributed() {} Distributed(const Distributed &) = delete; Distributed(Distributed &&) = delete; @@ -321,19 +318,17 @@ class Distributed { return stream_channel.first; } - System &system() { return system_; } Network &network() { return network_; } protected: - System &system_; Network network_; }; class DistributedReactor : public Reactor { public: - DistributedReactor(System *system, std::string name, Distributed &distributed) - : Reactor(system, name), distributed_(distributed) {} + DistributedReactor(std::string name, Distributed &distributed) + : Reactor(name), distributed_(distributed) {} protected: Distributed &distributed_; diff --git a/experimental/distributed/src/reactors_local.cpp b/experimental/distributed/src/reactors_local.cpp index 57feeaf28..43d1df18f 100644 --- a/experimental/distributed/src/reactors_local.cpp +++ b/experimental/distributed/src/reactors_local.cpp @@ -93,9 +93,9 @@ void Reactor::RunEventLoop() { if (exit_event_loop) break; } - for (auto& cbAndSub : msg_and_cb.second) { - auto& cb = cbAndSub.first; - const Message& msg = *msg_and_cb.first; + for (auto &cbAndSub : msg_and_cb.second) { + auto &cb = cbAndSub.first; + const Message &msg = *msg_and_cb.first; cb(msg, cbAndSub.second); } } @@ -106,8 +106,8 @@ void Reactor::RunEventLoop() { */ auto Reactor::LockedGetPendingMessages() -> MsgAndCbInfo { // return type after because the scope Reactor:: is not searched before the name - for (auto& connectors_key_value : connectors_) { - Connector& event_queue = *connectors_key_value.second; + for (auto &connectors_key_value : connectors_) { + Connector &event_queue = *connectors_key_value.second; auto msg_ptr = event_queue.LockedPop(); if (msg_ptr == nullptr) continue; std::type_index tidx = msg_ptr->GetTypeIndex(); @@ -115,7 +115,7 @@ auto Reactor::LockedGetPendingMessages() -> MsgAndCbInfo { std::vector<std::pair<EventStream::Callback, EventStream::Subscription> > cb_info; auto msg_type_cb_iter = event_queue.callbacks_.find(tidx); if (msg_type_cb_iter != event_queue.callbacks_.end()) { // There is a callback for this type. - for (auto& tidx_cb_key_value : msg_type_cb_iter->second) { + for (auto &tidx_cb_key_value : msg_type_cb_iter->second) { uint64_t uid = tidx_cb_key_value.first; EventStream::Callback cb = tidx_cb_key_value.second; cb_info.emplace_back(cb, EventStream::Subscription(event_queue, tidx, uid)); diff --git a/experimental/distributed/src/reactors_local.hpp b/experimental/distributed/src/reactors_local.hpp index 4ab5dc1a0..4f053e31d 100644 --- a/experimental/distributed/src/reactors_local.hpp +++ b/experimental/distributed/src/reactors_local.hpp @@ -79,6 +79,10 @@ class EventStream { */ virtual std::unique_ptr<Message> PopEvent() = 0; + /** + * Get the name of the connector. + */ + virtual const std::string &ConnectorName() = 0; /** * Subscription Service. * @@ -95,10 +99,10 @@ class EventStream { friend class Reactor; friend class Connector; - Subscription(Connector& event_queue, std::type_index tidx, uint64_t cb_uid) + Subscription(Connector &event_queue, std::type_index tidx, uint64_t cb_uid) : event_queue_(event_queue), tidx_(tidx), cb_uid_(cb_uid) { } - Connector& event_queue_; + Connector &event_queue_; std::type_index tidx_; uint64_t cb_uid_; }; @@ -107,10 +111,10 @@ class EventStream { * Register a callback that will be called whenever an event arrives. */ template<typename MsgType> - void OnEvent(std::function<void(const MsgType&, const Subscription&)>&& cb) { - OnEventHelper(typeid(MsgType), [cb = move(cb)](const Message& general_msg, - const Subscription& subscription) { - const MsgType& correct_msg = dynamic_cast<const MsgType&>(general_msg); + void OnEvent(std::function<void(const MsgType&, const Subscription&)> &&cb) { + OnEventHelper(typeid(MsgType), [cb = move(cb)](const Message &general_msg, + const Subscription &subscription) { + const MsgType &correct_msg = dynamic_cast<const MsgType&>(general_msg); cb(correct_msg, subscription); }); } @@ -149,16 +153,16 @@ class EventStream { */ class OnEventOnceChainer { public: - OnEventOnceChainer(EventStream& event_stream) : event_stream_(event_stream) {} + OnEventOnceChainer(EventStream &event_stream) : event_stream_(event_stream) {} ~OnEventOnceChainer() { InstallCallbacks(); } template<typename MsgType> - OnEventOnceChainer& ChainOnce(std::function<void(const MsgType&)>&& cb) { + OnEventOnceChainer &ChainOnce(std::function<void(const MsgType&)> &&cb) { std::function<void(const Message&, const Subscription&)> wrap = - [cb = std::move(cb)](const Message& general_msg, const Subscription& subscription) { - const MsgType& correct_msg = dynamic_cast<const MsgType&>(general_msg); + [cb = std::move(cb)](const Message &general_msg, const Subscription &subscription) { + const MsgType &correct_msg = dynamic_cast<const MsgType&>(general_msg); subscription.unsubscribe(); cb(correct_msg); // Warning: this can close the Channel, be careful what you put after it! }; @@ -178,7 +182,7 @@ class EventStream { tmp_cb = [cb = std::move(cbs_[i].second), next_type, next_cb = std::move(next_cb), - es_ptr = &this->event_stream_](const Message& msg, const Subscription& subscription) { + es_ptr = &this->event_stream_](const Message &msg, const Subscription &subscription) { cb(msg, subscription); if (next_cb != nullptr) { es_ptr->OnEventHelper(next_type, std::move(next_cb)); @@ -191,7 +195,7 @@ class EventStream { event_stream_.OnEventHelper(next_type, std::move(next_cb)); } - EventStream& event_stream_; + EventStream &event_stream_; std::vector<std::pair<std::type_index, std::function<void(const Message&, const Subscription&)>>> cbs_; }; typedef std::function<void(const Message&, const Subscription&)> Callback; @@ -286,6 +290,9 @@ class Connector { std::unique_lock<std::mutex> lock(*mutex_); queue_->LockedOnEventHelper(tidx, callback); } + const std::string &ConnectorName() { + return queue_->connector_name_; + } void Close(); private: @@ -350,7 +357,7 @@ private: return t; } - void RemoveCb(const EventStream::Subscription& subscription) { + void RemoveCb(const EventStream::Subscription &subscription) { std::unique_lock<std::mutex> lock(*mutex_); size_t num_erased = callbacks_[subscription.tidx_].erase(subscription.cb_uid_); assert(num_erased == 1); @@ -382,8 +389,8 @@ class Reactor { public: friend class System; - Reactor(System *system, std::string name) - : system_(system), name_(name), main_(Open("main")) {} + Reactor(std::string name) + : name_(name), main_(Open("main")) {} virtual ~Reactor() {} @@ -407,13 +414,17 @@ class Reactor { */ void CloseAllConnectors(); + /** + * Get Reactor name + */ + const std::string &name() { return name_; } + Reactor(const Reactor &other) = delete; Reactor(Reactor &&other) = default; Reactor &operator=(const Reactor &other) = delete; Reactor &operator=(Reactor &&other) = default; protected: - System *system_; std::string name_; /* * Locks all Reactor data, including all Connector's in connectors_. @@ -450,27 +461,32 @@ class Reactor { /** - * Global placeholder for all reactors in the system. Alive through the entire process lifetime. + * Global placeholder for all reactors in the system. * * E.g. holds set of reactors, channels for all reactors. + * Alive through the entire process lifetime. + * Singleton class. Created automatically. */ class System { public: friend class Reactor; - System() {} - - System(const System &) = delete; - System(System &&) = delete; - System &operator=(const System &) = delete; - System &operator=(System &&) = delete; + /** + * Get the (singleton) instance of System. + * + * More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern + */ + static System &GetInstance() { + static System system; // guaranteed to be destroyed, initialized on first use + return system; + } template <class ReactorType, class... Args> const std::shared_ptr<Channel> Spawn(const std::string &name, Args &&... args) { std::unique_lock<std::recursive_mutex> lock(mutex_); auto *raw_reactor = - new ReactorType(this, name, std::forward<Args>(args)...); + new ReactorType(name, std::forward<Args>(args)...); std::unique_ptr<Reactor> reactor(raw_reactor); // Capturing a pointer isn't ideal, I would prefer to capture a Reactor&, but not sure how to do it. std::thread reactor_thread( @@ -494,10 +510,17 @@ class System { auto &thread = key_value.second.second; thread.join(); } + reactors_.clear(); // for testing, since System is a singleton now } private: - void StartReactor(Reactor& reactor) { + System() {} + System(const System &) = delete; + System(System &&) = delete; + System &operator=(const System &) = delete; + System &operator=(System &&) = delete; + + void StartReactor(Reactor &reactor) { current_reactor_ = &reactor; reactor.Run(); reactor.RunEventLoop(); // Activate callbacks. diff --git a/experimental/distributed/tests/distributed_test.cpp b/experimental/distributed/tests/distributed_test.cpp index 1e129a31b..52d37351d 100644 --- a/experimental/distributed/tests/distributed_test.cpp +++ b/experimental/distributed/tests/distributed_test.cpp @@ -1,7 +1,9 @@ +#include "reactors_distributed.hpp" + #include <iostream> #include <fstream> -#include "reactors_distributed.hpp" +#include <glog/logging.h> DEFINE_int64(my_mnid, 0, "Memgraph node id"); // TODO(zuza): this should be assigned by the leader once in the future DEFINE_string(config_filename, "", "File containing list of all processes"); @@ -11,10 +13,10 @@ class MemgraphDistributed : public Distributed { using Location = std::pair<std::string, uint16_t>; public: - MemgraphDistributed(System &system) : Distributed(system) {} + MemgraphDistributed(System &system) : Distributed() {} /** Register memgraph node id to the given location. */ - void RegisterMemgraphNode(int64_t mnid, const std::string& address, uint16_t port) { + void RegisterMemgraphNode(int64_t mnid, const std::string &address, uint16_t port) { std::unique_lock<std::recursive_mutex> lock(mutex_); mnodes_[mnid] = Location(address, port); } @@ -23,7 +25,7 @@ class MemgraphDistributed : public Distributed { const std::string &reactor, const std::string &channel) { std::unique_lock<std::recursive_mutex> lock(mutex_); - const auto& location = mnodes_.at(mnid); + const auto &location = mnodes_.at(mnid); return Distributed::FindChannel(location.first, location.second, reactor, channel); } @@ -47,45 +49,70 @@ class MemgraphDistributed : public Distributed { * @return Pair (master mnid, list of worker's id). */ std::pair<int64_t, std::vector<int64_t>> -ParseConfigAndRegister(const std::string& filename, - MemgraphDistributed& distributed) { + ParseConfigAndRegister(const std::string &filename, + MemgraphDistributed &distributed) { std::ifstream file(filename, std::ifstream::in); assert(file.good()); int64_t master_mnid; std::vector<int64_t> worker_mnids; int64_t mnid; - std::string address; - uint16_t port; - file >> master_mnid >> address >> port; - distributed.RegisterMemgraphNode(master_mnid, address, port); + std::string address; + uint16_t port; + file >> master_mnid >> address >> port; + distributed.RegisterMemgraphNode(master_mnid, address, port); while (file.good()) { - file >> mnid >> address >> port; + file >> mnid >> address >> port; if (file.eof()) break ; distributed.RegisterMemgraphNode(mnid, address, port); worker_mnids.push_back(mnid); - } - file.close(); - return std::make_pair(master_mnid, worker_mnids); + } + file.close(); + return std::make_pair(master_mnid, worker_mnids); } - +/** + * Interface to the Memgraph distributed system. + * + * E.g. get channel to other Memgraph nodes. + */ class MemgraphReactor : public Reactor { public: - MemgraphReactor(System* system, std::string name, - MemgraphDistributed &distributed) - : Reactor(system, name), distributed_(distributed) {} + MemgraphReactor(std::string name, + MemgraphDistributed &distributed) + : Reactor(name), distributed_(distributed) {} protected: MemgraphDistributed &distributed_; }; +/** + * Sends a text message and has a return address. + */ +class TextMessage : public SenderMessage { +public: + TextMessage(std::string reactor, std::string channel, std::string s) + : SenderMessage(reactor, channel), text(s) {} + + template <class Archive> + void serialize(Archive &archive) { + archive(cereal::virtual_base_class<SenderMessage>(this), text); + } + + std::string text; + +protected: + friend class cereal::access; + TextMessage() {} // Cereal needs access to a default constructor. +}; +CEREAL_REGISTER_TYPE(TextMessage); + class Master : public MemgraphReactor { public: - Master(System* system, std::string name, MemgraphDistributed &distributed, - int64_t mnid, std::vector<int64_t>&& worker_mnids) - : MemgraphReactor(system, name, distributed), mnid_(mnid), + Master(std::string name, MemgraphDistributed &distributed, + int64_t mnid, std::vector<int64_t> &&worker_mnids) + : MemgraphReactor(name, distributed), mnid_(mnid), worker_mnids_(std::move(worker_mnids)) {} virtual void Run() { @@ -93,9 +120,11 @@ class Master : public MemgraphReactor { << ":" << distributed_.network().Port() << std::endl; auto stream = main_.first; - stream->OnEvent<SenderMessage>([this](const SenderMessage &msg, - const EventStream::Subscription& subscription) { - std::cout << "Message from " << msg.Address() << ":" << msg.Port() << "\n"; + + // wait until every worker sends a SenderMessage back, then close + stream->OnEvent<TextMessage>([this](const TextMessage &msg, + const EventStream::Subscription &subscription) { + std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n"; ++workers_seen; if (workers_seen == worker_mnids_.size()) { subscription.unsubscribe(); @@ -107,11 +136,12 @@ class Master : public MemgraphReactor { } }); + // send a TextMessage to each worker for (auto wmnid : worker_mnids_) { auto stream = distributed_.FindChannel(wmnid, "worker", "main"); stream->OnEventOnce() .ChainOnce<ChannelResolvedMessage>([this, stream](const ChannelResolvedMessage &msg){ - msg.channel()->Send<SenderMessage>("master", "main"); + msg.channel()->Send<TextMessage>("master", "main", "hi from master"); stream->Close(); }); } @@ -125,9 +155,9 @@ class Master : public MemgraphReactor { class Worker : public MemgraphReactor { public: - Worker(System* system, std::string name, MemgraphDistributed &distributed, + Worker(std::string name, MemgraphDistributed &distributed, int64_t mnid, int64_t master_mnid) - : MemgraphReactor(system, name, distributed), mnid_(mnid), + : MemgraphReactor(name, distributed), mnid_(mnid), master_mnid_(master_mnid) {} virtual void Run() { @@ -135,20 +165,18 @@ class Worker : public MemgraphReactor { << ":" << distributed_.network().Port() << std::endl; auto stream = main_.first; + // wait until master sends us a TextMessage, then reply back and close stream->OnEventOnce() - .ChainOnce<SenderMessage>([this](const SenderMessage &msg) { - std::cout << "Message from " << msg.Address() << ":" << msg.Port() << "\n"; + .ChainOnce<TextMessage>([this](const TextMessage &msg) { + std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n"; + + msg.GetChannelToSender(&distributed_) + ->Send<TextMessage>("worker", "main", "hi from worker"); + // Sleep for a while so we can read output in the terminal. std::this_thread::sleep_for(std::chrono::seconds(4)); CloseConnector("main"); }); - - auto remote_stream = distributed_.FindChannel(master_mnid_, "master", "main"); - remote_stream->OnEventOnce() - .ChainOnce<ChannelResolvedMessage>([this, remote_stream](const ChannelResolvedMessage &msg){ - msg.channel()->Send<SenderMessage>("worker", "main"); - remote_stream->Close(); - }); } protected: @@ -158,9 +186,10 @@ class Worker : public MemgraphReactor { int main(int argc, char *argv[]) { + google::InitGoogleLogging(argv[0]); gflags::ParseCommandLineFlags(&argc, &argv, true); - System system; + System &system = System::GetInstance(); MemgraphDistributed distributed(system); auto mnids = ParseConfigAndRegister(FLAGS_config_filename, distributed); distributed.StartServices(); diff --git a/experimental/distributed/tests/local_memgraph.cpp b/experimental/distributed/tests/local_memgraph.cpp new file mode 100644 index 000000000..1e01b8188 --- /dev/null +++ b/experimental/distributed/tests/local_memgraph.cpp @@ -0,0 +1,389 @@ +// This is a deprecated implementation! It is using the deprecated AwaitEvent, I'm changing it to use OnEvent. WIP + +// #include <atomic> +// #include <chrono> +// #include <cstdlib> +// #include <iostream> +// #include <string> +// #include <thread> +// #include <vector> + +// #include "reactors_distributed.hpp" + +// const int NUM_WORKERS = 1; + +// class Txn : public SenderMessage { +// public: +// Txn(std::string reactor, std::string channel, int64_t id) : SenderMessage(reactor, channel), id_(id) {} +// int64_t id() const { return id_; } + +// template <class Archive> +// void serialize(Archive &archive) { +// archive(cereal::base_class<SenderMessage>(this), id_); +// } + +// private: +// int64_t id_; +// }; + +// class CreateNodeTxn : public Txn { +// public: +// CreateNodeTxn(std::string reactor, std::string channel, int64_t id) : Txn(reactor, channel, id) {} + +// template <class Archive> +// void serialize(Archive &archive) { +// archive(cereal::base_class<Txn>(this)); +// } +// }; + +// class CountNodesTxn : public Txn { +// public: +// CountNodesTxn(std::string reactor, std::string channel, int64_t id) : Txn(reactor, channel, id) {} + +// template <class Archive> +// void serialize(Archive &archive) { +// archive(cereal::base_class<Txn>(this)); +// } +// }; + +// class CountNodesTxnResult : public Message { +// public: +// CountNodesTxnResult(int64_t count) : count_(count) {} +// int64_t count() const { return count_; } + +// template <class Archive> +// void serialize(Archive &archive) { +// archive(count_); +// } + +// private: +// int64_t count_; +// }; + +// class CommitRequest : public SenderMessage { +// public: +// CommitRequest(std::string reactor, std::string channel, int64_t worker_id) +// : SenderMessage(reactor, channel), worker_id_(worker_id) {} +// int64_t worker_id() { return worker_id_; } + +// template <class Archive> +// void serialize(Archive &archive) { +// archive(cereal::base_class<SenderMessage>(this), worker_id_); +// } + +// private: +// int64_t worker_id_; +// }; + +// class AbortRequest : public SenderMessage { +// public: +// AbortRequest(std::string reactor, std::string channel, int64_t worker_id) +// : SenderMessage(reactor, channel), worker_id_(worker_id) {} +// int64_t worker_id() { return worker_id_; } + +// template <class Archive> +// void serialize(Archive &archive) { +// archive(cereal::base_class<SenderMessage>(this), worker_id_); +// } + +// private: +// int64_t worker_id_; +// }; + +// class CommitDirective : public Message { +// template <class Archive> +// void serialize(Archive &archive) { +// archive(cereal::base_class<Message>(this)); +// } +// }; + +// class AbortDirective : public Message { +// template <class Archive> +// void serialize(Archive &archive) { +// archive(cereal::base_class<Message>(this)); +// } +// }; + +// class Query : public Message { +// public: +// Query(std::string query) : Message(), query_(query) {} +// std::string query() const { return query_; } + +// template <class Archive> +// void serialize(Archive &archive) { +// archive(cereal::base_class<Message>(this), query_); +// } + +// private: +// std::string query_; +// }; + +// class Quit : public Message { +// template <class Archive> +// void serialize(Archive &archive) { +// archive(cereal::base_class<Message>(this)); +// } +// }; + +// class Master : public Reactor { +// public: +// Master(System *system, std::string name) : Reactor(system, name), next_xid_(1) {} + +// virtual void Run() { +// auto stream = main_.first; +// FindWorkers(); + +// std::cout << "Master is active" << std::endl; +// while (true) { +// auto m = stream->AwaitEvent(); +// if (Query *query = dynamic_cast<Query *>(m.get())) { +// ProcessQuery(query); +// break; // process only the first query +// } else if (SenderMessage *msg = dynamic_cast<SenderMessage *>(m.get())) { +// std::cout << "SenderMessage received!" << std::endl; +// std::cout << " Address: " << msg->Address() << std::endl; +// std::cout << " Port: " << msg->Port() << std::endl; +// std::cout << " Reactor: " << msg->ReactorName() << std::endl; +// std::cout << " Channel: " << msg->ChannelName() << std::endl; +// } else { +// std::cerr << "unknown message\n"; +// exit(1); +// } +// } + +// stream->OnEvent<Message>([this](const Message &msg, const EventStream::Subscription& subscription) { +// std::cout << "Processing Query via Callback" << std::endl; +// const Query &query = +// dynamic_cast<const Query &>(msg); // exception bad_cast +// ProcessQuery(&query); +// subscription.unsubscribe(); +// }); +// } + +// private: +// void ProcessQuery(const Query *query) { +// if (query->query() == "create node") { +// PerformCreateNode(); +// } else if (query->query() == "count nodes") { +// PerformCountNodes(); +// } else { +// std::cout << "got query: " << query->query() << std::endl; +// } +// } + +// void PerformCreateNode() { +// int worker_id = rand() % NUM_WORKERS; +// int64_t xid = GetTransactionId(); +// std::string txn_channel_name = GetTxnName(xid); +// auto connector = Open(txn_channel_name); +// auto stream = connector.first; + +// channels_[worker_id]->Send<CreateNodeTxn>("master", "main", xid); +// auto m = stream->AwaitEvent(); +// if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) { +// req->GetChannelToSender(system_)->Send<CommitDirective>(); +// } else if (AbortRequest *req = dynamic_cast<AbortRequest *>(m.get())) { +// req->GetChannelToSender(system_)->Send<AbortDirective>(); +// } else { +// std::cerr << "unknown message\n"; +// exit(1); +// } +// CloseConnector(txn_channel_name); +// } + +// void PerformCountNodes() { +// int64_t xid = GetTransactionId(); +// std::string txn_channel_name = GetTxnName(xid); +// auto connector = Open(txn_channel_name); +// auto stream = connector.first; +// for (int w_id = 0; w_id < NUM_WORKERS; ++w_id) +// channels_[w_id]->Send<CountNodesTxn>("master", "main", xid); + +// std::vector<std::shared_ptr<Channel>> txn_channels; +// txn_channels.resize(NUM_WORKERS, nullptr); +// bool commit = true; +// for (int responds = 0; responds < NUM_WORKERS; ++responds) { +// auto m = stream->AwaitEvent(); +// if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) { +// txn_channels[req->worker_id()] = req->GetChannelToSender(system_); +// commit &= true; +// } else if (AbortRequest *req = dynamic_cast<AbortRequest *>(m.get())) { +// txn_channels[req->worker_id()] = req->GetChannelToSender(system_); +// commit = false; +// } else { +// std::cerr << "unknown message\n"; +// exit(1); +// } +// } + +// if (commit) { +// for (int w_id = 0; w_id < NUM_WORKERS; ++w_id) +// txn_channels[w_id]->Send<CommitDirective>(); +// } else { +// for (int w_id = 0; w_id < NUM_WORKERS; ++w_id) +// txn_channels[w_id]->Send<AbortDirective>(); +// } + +// int64_t count = 0; +// for (int responds = 0; responds < NUM_WORKERS; ++responds) { +// auto m = stream->AwaitEvent(); +// if (CountNodesTxnResult *cnt = +// dynamic_cast<CountNodesTxnResult *>(m.get())) { +// count += cnt->count(); +// } else { +// std::cerr << "unknown message\n"; +// exit(1); +// } +// } + +// CloseConnector(txn_channel_name); +// std::cout << "graph has " << count << " vertices" << std::endl; +// } + +// int64_t GetTransactionId() { return next_xid_++; } + +// std::string GetWorkerName(int worker_id) { +// return "worker" + std::to_string(worker_id); +// } + +// std::string GetTxnName(int txn_id) { return "txn" + std::to_string(txn_id); } + +// void FindWorkers() { +// channels_.resize(NUM_WORKERS, nullptr); +// int workers_found = 0; +// while (workers_found < NUM_WORKERS) { +// for (int64_t w_id = 0; w_id < NUM_WORKERS; ++w_id) { +// if (channels_[w_id] == nullptr) { +// // TODO: Resolve worker channel using the network service. +// channels_[w_id] = system_->FindChannel(GetWorkerName(w_id), "main"); +// if (channels_[w_id] != nullptr) ++workers_found; +// } +// } +// if (workers_found < NUM_WORKERS) +// std::this_thread::sleep_for(std::chrono::seconds(1)); +// } +// } + +// // TODO: Why is master atomic, it should be unique? +// std::atomic<int64_t> next_xid_; +// std::vector<std::shared_ptr<Channel>> channels_; +// }; + +// class Worker : public Reactor { +// public: +// Worker(System *system, std::string name, int64_t id) : Reactor(system, name), +// worker_id_(id) {} + +// virtual void Run() { +// std::cout << "worker " << worker_id_ << " is active" << std::endl; +// auto stream = main_.first; +// FindMaster(); +// while (true) { +// auto m = stream->AwaitEvent(); +// if (Txn *txn = dynamic_cast<Txn *>(m.get())) { +// HandleTransaction(txn); +// } else { +// std::cerr << "unknown message\n"; +// exit(1); +// } +// } +// } + +// private: +// void HandleTransaction(Txn *txn) { +// if (CreateNodeTxn *create_txn = dynamic_cast<CreateNodeTxn *>(txn)) { +// HandleCreateNode(create_txn); +// } else if (CountNodesTxn *cnt_txn = dynamic_cast<CountNodesTxn *>(txn)) { +// HandleCountNodes(cnt_txn); +// } else { +// std::cerr << "unknown transaction\n"; +// exit(1); +// } +// } + +// void HandleCreateNode(CreateNodeTxn *txn) { +// auto connector = Open(GetTxnChannelName(txn->id())); +// auto stream = connector.first; +// auto masterChannel = txn->GetChannelToSender(system_); +// // TODO: Do the actual commit. +// masterChannel->Send<CommitRequest>("master", "main", worker_id_); +// auto m = stream->AwaitEvent(); +// if (dynamic_cast<CommitDirective *>(m.get())) { +// // TODO: storage_.CreateNode(); +// } else if (dynamic_cast<AbortDirective *>(m.get())) { +// // TODO: Rollback. +// } else { +// std::cerr << "unknown message\n"; +// exit(1); +// } +// CloseConnector(GetTxnChannelName(txn->id())); +// } + +// void HandleCountNodes(CountNodesTxn *txn) { +// auto connector = Open(GetTxnChannelName(txn->id())); +// auto stream = connector.first; +// auto masterChannel = txn->GetChannelToSender(system_); + +// // TODO: Fix this hack -- use the storage. +// int num = 123; + +// masterChannel->Send<CommitRequest>("master", "main", worker_id_); +// auto m = stream->AwaitEvent(); +// if (dynamic_cast<CommitDirective *>(m.get())) { +// masterChannel->Send<CountNodesTxnResult>(num); +// } else if (dynamic_cast<AbortDirective *>(m.get())) { +// // send nothing +// } else { +// std::cerr << "unknown message\n"; +// exit(1); +// } +// CloseConnector(GetTxnChannelName(txn->id())); +// } + +// // TODO: Don't repeat code from Master. +// std::string GetTxnChannelName(int64_t transaction_id) { +// return "txn" + std::to_string(transaction_id); +// } + +// void FindMaster() { +// // TODO: Replace with network service and channel resolution. +// while (!(master_channel_ = system_->FindChannel("master", "main"))) +// std::this_thread::sleep_for(std::chrono::seconds(1)); +// } + +// std::shared_ptr<Channel> master_channel_ = nullptr; +// int worker_id_; +// }; + +// void ClientMain(System *system) { +// std::shared_ptr<Channel> channel = nullptr; +// // TODO: Replace this with network channel resolution. +// while (!(channel = system->FindChannel("master", "main"))) +// std::this_thread::sleep_for(std::chrono::seconds(1)); +// std::cout << "I/O Client Main active" << std::endl; + +// bool active = true; +// while (active) { +// std::string s; +// std::getline(std::cin, s); +// if (s == "quit") { +// active = false; +// channel->Send<Quit>(); +// } else { +// channel->Send<Query>(s); +// } +// } +// } + + +// int main(int argc, char *argv[]) { +// //google::InitGoogleLogging(argv[0]); +// gflags::ParseCommandLineFlags(&argc, &argv, true); +// System system; +// system.Spawn<Master>("master"); +// std::thread client(ClientMain, &system); +// for (int i = 0; i < NUM_WORKERS; ++i) +// system.Spawn<Worker>("worker" + std::to_string(i), i); +// system.AwaitShutdown(); +// return 0; +// } diff --git a/experimental/distributed/tests/network_chat.cpp b/experimental/distributed/tests/network_chat.cpp index 179eccb47..2e624e5db 100644 --- a/experimental/distributed/tests/network_chat.cpp +++ b/experimental/distributed/tests/network_chat.cpp @@ -38,8 +38,8 @@ CEREAL_REGISTER_TYPE(ChatACK); class ChatServer : public DistributedReactor { public: - ChatServer(System *system, std::string name, Distributed &distributed) - : DistributedReactor(system, name, distributed) {} + ChatServer(std::string name, Distributed &distributed) + : DistributedReactor(name, distributed) {} virtual void Run() { std::cout << "ChatServer is active" << std::endl; @@ -56,7 +56,7 @@ class ChatServer : public DistributedReactor { std::cout << "Received message from " << msg.Address() << ":" << msg.Port() << " -> '" << msg.Message() << "'" << std::endl; - auto channel = msg.GetChannelToSender(system_); + auto channel = msg.GetChannelToSender(); if (channel != nullptr) { channel->Send<ChatACK>("server", "chat", msg.Message()); } @@ -66,8 +66,8 @@ class ChatServer : public DistributedReactor { class ChatClient : public DistributedReactor { public: - ChatClient(System *system, std::string name, Distributed &distributed) - : DistributedReactor(system, name, distributed) {} + ChatClient(std::string name, Distributed &distributed) + : DistributedReactor(name, distributed) {} virtual void Run() { std::cout << "ChatClient is active" << std::endl; @@ -91,8 +91,8 @@ class ChatClient : public DistributedReactor { int main(int argc, char *argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); - System system; - Distributed distributed(system); + System& system = System::GetInstance(); + Distributed distributed; distributed.StartServices(); system.Spawn<ChatServer>("server", distributed); system.Spawn<ChatClient>("client", distributed); diff --git a/experimental/distributed/tests/network_client.cpp b/experimental/distributed/tests/network_client.cpp index 710347b02..b8ab44236 100644 --- a/experimental/distributed/tests/network_client.cpp +++ b/experimental/distributed/tests/network_client.cpp @@ -2,8 +2,7 @@ int main(int argc, char *argv[]) { google::InitGoogleLogging(argv[0]); - System system; - Distributed distributed(system); + Distributed distributed; distributed.network().StartClient(1); auto channel = distributed.network().Resolve("127.0.0.1", 10000, "master", "main"); std::cout << channel << std::endl; diff --git a/experimental/distributed/tests/reactors_local_unit.cpp b/experimental/distributed/tests/reactors_local_unit.cpp index f8aff1467..9079ad464 100644 --- a/experimental/distributed/tests/reactors_local_unit.cpp +++ b/experimental/distributed/tests/reactors_local_unit.cpp @@ -13,13 +13,13 @@ TEST(SystemTest, ReturnWithoutThrowing) { struct Master : public Reactor { - Master(System *system, std::string name) : Reactor(system, name) {} + Master(std::string name) : Reactor(name) {} virtual void Run() { CloseConnector("main"); } }; - System system; + System &system = System::GetInstance(); ASSERT_NO_THROW(system.Spawn<Master>("master")); ASSERT_NO_THROW(system.AwaitShutdown()); } @@ -27,7 +27,7 @@ TEST(SystemTest, ReturnWithoutThrowing) { TEST(ChannelCreationTest, ThrowOnReusingChannelName) { struct Master : public Reactor { - Master(System *system, std::string name) : Reactor(system, name) {} + Master(std::string name) : Reactor(name) {} virtual void Run() { Open("channel"); ASSERT_THROW(Open("channel"), std::runtime_error); @@ -36,7 +36,7 @@ TEST(ChannelCreationTest, ThrowOnReusingChannelName) { } }; - System system; + System &system = System::GetInstance(); system.Spawn<Master>("master"); system.AwaitShutdown(); } @@ -44,10 +44,10 @@ TEST(ChannelCreationTest, ThrowOnReusingChannelName) { TEST(ConnectorSetUpTest, CheckMainChannelIsSet) { struct Master : public Reactor { - Master(System *system, std::string name) : Reactor(system, name) {} + Master(std::string name) : Reactor(name) {} virtual void Run() { std::shared_ptr<Channel> channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300)); CloseConnector("main"); @@ -55,17 +55,17 @@ TEST(ConnectorSetUpTest, CheckMainChannelIsSet) { }; struct Worker : public Reactor { - Worker(System *system, std::string name) : Reactor(system, name) {} + Worker(std::string name) : Reactor(name) {} virtual void Run() { std::shared_ptr<Channel> channel; - while (!(channel = system_->FindChannel("master", "main"))) + while (!(channel = System::GetInstance().FindChannel("master", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300)); CloseConnector("main"); } }; - System system; + System &system = System::GetInstance(); system.Spawn<Master>("master"); system.Spawn<Worker>("worker"); system.AwaitShutdown(); @@ -79,10 +79,10 @@ TEST(SimpleSendTest, OneSimpleSend) { }; struct Master : public Reactor { - Master(System *system, std::string name) : Reactor(system, name) {} + Master(std::string name) : Reactor(name) {} virtual void Run() { std::shared_ptr<Channel> channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send<MessageInt>(123); CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII. @@ -90,7 +90,7 @@ TEST(SimpleSendTest, OneSimpleSend) { }; struct Worker : public Reactor { - Worker(System *system, std::string name) : Reactor(system, name) {} + Worker(std::string name) : Reactor(name) {} virtual void Run() { EventStream* stream = main_.first; std::unique_ptr<Message> m_uptr = stream->AwaitEvent(); @@ -101,7 +101,7 @@ TEST(SimpleSendTest, OneSimpleSend) { } }; - System system; + System &system = System::GetInstance(); system.Spawn<Master>("master"); system.Spawn<Worker>("worker"); system.AwaitShutdown(); @@ -114,10 +114,10 @@ TEST(SimpleSendTest, OneCallback) { }; struct Master : public Reactor { - Master(System *system, std::string name) : Reactor(system, name) {} + Master(std::string name) : Reactor(name) {} virtual void Run() { std::shared_ptr<Channel> channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send<MessageInt>(888); CloseConnector("main"); @@ -125,18 +125,18 @@ TEST(SimpleSendTest, OneCallback) { }; struct Worker : public Reactor { - Worker(System *system, std::string name) : Reactor(system, name) {} + Worker(std::string name) : Reactor(name) {} virtual void Run() { EventStream* stream = main_.first; - stream->OnEvent<MessageInt>([this](const MessageInt& msg, const EventStream::Subscription&) { + stream->OnEvent<MessageInt>([this](const MessageInt &msg, const EventStream::Subscription&) { ASSERT_EQ(msg.x, 888); CloseConnector("main"); }); } }; - System system; + System &system = System::GetInstance(); system.Spawn<Master>("master"); system.Spawn<Worker>("worker"); system.AwaitShutdown(); @@ -150,10 +150,10 @@ TEST(SimpleSendTest, IgnoreAfterClose) { }; struct Master : public Reactor { - Master(System *system, std::string name) : Reactor(system, name) {} + Master(std::string name) : Reactor(name) {} virtual void Run() { std::shared_ptr<Channel> channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send<MessageInt>(101); channel->Send<MessageInt>(102); // should be ignored @@ -165,7 +165,7 @@ TEST(SimpleSendTest, IgnoreAfterClose) { }; struct Worker : public Reactor { - Worker(System *system, std::string name) : Reactor(system, name) {} + Worker(std::string name) : Reactor(name) {} virtual void Run() { EventStream* stream = main_.first; std::unique_ptr<Message> m_uptr = stream->AwaitEvent(); @@ -176,7 +176,7 @@ TEST(SimpleSendTest, IgnoreAfterClose) { } }; - System system; + System &system = System::GetInstance(); system.Spawn<Master>("master"); system.Spawn<Worker>("worker"); system.AwaitShutdown(); @@ -190,12 +190,12 @@ TEST(SimpleSendTest, DuringFirstEvent) { }; struct Master : public Reactor { - Master(System *system, std::string name, std::promise<int> p) : Reactor(system, name), p_(std::move(p)) {} + Master(std::string name, std::promise<int> p) : Reactor(name), p_(std::move(p)) {} virtual void Run() { EventStream* stream = main_.first; - stream->OnEvent<MessageInt>([this](const Message& msg, const EventStream::Subscription& subscription) { - const MessageInt& msgint = dynamic_cast<const MessageInt&>(msg); + stream->OnEvent<MessageInt>([this](const Message &msg, const EventStream::Subscription &subscription) { + const MessageInt &msgint = dynamic_cast<const MessageInt&>(msg); if (msgint.x == 101) FindChannel("main")->Send<MessageInt>(102); if (msgint.x == 102) { @@ -211,7 +211,7 @@ TEST(SimpleSendTest, DuringFirstEvent) { std::promise<int> p_; }; - System system; + System &system = System::GetInstance(); std::promise<int> p; auto f = p.get_future(); system.Spawn<Master>("master", std::move(p)); @@ -232,10 +232,10 @@ TEST(MultipleSendTest, UnsubscribeService) { }; struct Master : public Reactor { - Master(System *system, std::string name) : Reactor(system, name) {} + Master(std::string name) : Reactor(name) {} virtual void Run() { std::shared_ptr<Channel> channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send<MessageInt>(55); channel->Send<MessageInt>(66); @@ -251,21 +251,21 @@ TEST(MultipleSendTest, UnsubscribeService) { }; struct Worker : public Reactor { - Worker(System *system, std::string name) : Reactor(system, name) {} + Worker(std::string name) : Reactor(name) {} int num_msgs_received = 0; virtual void Run() { EventStream* stream = main_.first; - stream->OnEvent<MessageInt>([this](const MessageInt& msgint, const EventStream::Subscription& subscription) { + stream->OnEvent<MessageInt>([this](const MessageInt &msgint, const EventStream::Subscription &subscription) { ASSERT_TRUE(msgint.x == 55 || msgint.x == 66); ++num_msgs_received; if (msgint.x == 66) { subscription.unsubscribe(); // receive only two of them } }); - stream->OnEvent<MessageChar>([this](const MessageChar& msgchar, const EventStream::Subscription& subscription) { + stream->OnEvent<MessageChar>([this](const MessageChar &msgchar, const EventStream::Subscription &subscription) { char c = msgchar.x; ++num_msgs_received; ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c'); @@ -277,7 +277,7 @@ TEST(MultipleSendTest, UnsubscribeService) { } }; - System system; + System &system = System::GetInstance(); system.Spawn<Master>("master"); system.Spawn<Worker>("worker"); system.AwaitShutdown(); @@ -295,10 +295,10 @@ TEST(MultipleSendTest, OnEvent) { }; struct Master : public Reactor { - Master(System *system, std::string name) : Reactor(system, name) {} + Master(std::string name) : Reactor(name) {} virtual void Run() { std::shared_ptr<Channel> channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send<MessageInt>(101); @@ -310,7 +310,7 @@ TEST(MultipleSendTest, OnEvent) { }; struct Worker : public Reactor { - Worker(System *system, std::string name) : Reactor(system, name) {} + Worker(std::string name) : Reactor(name) {} struct EndMessage : Message {}; int correct_vals = 0; @@ -319,13 +319,13 @@ TEST(MultipleSendTest, OnEvent) { EventStream* stream = main_.first; correct_vals = 0; - stream->OnEvent<MessageInt>([this](const MessageInt& msgint, const EventStream::Subscription&) { + stream->OnEvent<MessageInt>([this](const MessageInt &msgint, const EventStream::Subscription&) { ASSERT_TRUE(msgint.x == 101 || msgint.x == 103); ++correct_vals; main_.second->Send<EndMessage>(); }); - stream->OnEvent<MessageChar>([this](const MessageChar& msgchar, const EventStream::Subscription&) { + stream->OnEvent<MessageChar>([this](const MessageChar &msgchar, const EventStream::Subscription&) { ASSERT_TRUE(msgchar.x == 'a' || msgchar.x == 'b'); ++correct_vals; main_.second->Send<EndMessage>(); @@ -340,7 +340,7 @@ TEST(MultipleSendTest, OnEvent) { } }; - System system; + System &system = System::GetInstance(); system.Spawn<Master>("master"); system.Spawn<Worker>("worker"); system.AwaitShutdown(); @@ -353,10 +353,10 @@ TEST(MultipleSendTest, Chaining) { }; struct Master : public Reactor { - Master(System *system, std::string name) : Reactor(system, name) {} + Master(std::string name) : Reactor(name) {} virtual void Run() { std::shared_ptr<Channel> channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send<MessageInt>(55); channel->Send<MessageInt>(66); @@ -366,26 +366,26 @@ TEST(MultipleSendTest, Chaining) { }; struct Worker : public Reactor { - Worker(System *system, std::string name) : Reactor(system, name) {} + Worker(std::string name) : Reactor(name) {} virtual void Run() { EventStream* stream = main_.first; stream->OnEventOnce() - .ChainOnce<MessageInt>([this](const MessageInt& msg) { + .ChainOnce<MessageInt>([this](const MessageInt &msg) { ASSERT_EQ(msg.x, 55); }) - .ChainOnce<MessageInt>([](const MessageInt& msg) { + .ChainOnce<MessageInt>([](const MessageInt &msg) { ASSERT_EQ(msg.x, 66); }) - .ChainOnce<MessageInt>([this](const MessageInt& msg) { + .ChainOnce<MessageInt>([this](const MessageInt &msg) { ASSERT_EQ(msg.x, 77); CloseConnector("main"); }); } }; - System system; + System &system = System::GetInstance(); system.Spawn<Master>("master"); system.Spawn<Worker>("worker"); system.AwaitShutdown(); @@ -404,10 +404,10 @@ TEST(MultipleSendTest, ChainingInRightOrder) { }; struct Master : public Reactor { - Master(System *system, std::string name) : Reactor(system, name) {} + Master(std::string name) : Reactor(name) {} virtual void Run() { std::shared_ptr<Channel> channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel->Send<MessageChar>('a'); channel->Send<MessageInt>(55); @@ -418,26 +418,26 @@ TEST(MultipleSendTest, ChainingInRightOrder) { }; struct Worker : public Reactor { - Worker(System *system, std::string name) : Reactor(system, name) {} + Worker(std::string name) : Reactor(name) {} virtual void Run() { EventStream* stream = main_.first; stream->OnEventOnce() - .ChainOnce<MessageInt>([this](const MessageInt& msg) { + .ChainOnce<MessageInt>([this](const MessageInt &msg) { ASSERT_EQ(msg.x, 55); }) - .ChainOnce<MessageChar>([](const MessageChar& msg) { + .ChainOnce<MessageChar>([](const MessageChar &msg) { ASSERT_EQ(msg.x, 'b'); }) - .ChainOnce<MessageInt>([this](const MessageInt& msg) { + .ChainOnce<MessageInt>([this](const MessageInt &msg) { ASSERT_EQ(msg.x, 77); CloseConnector("main"); }); } }; - System system; + System &system = System::GetInstance(); system.Spawn<Master>("master"); system.Spawn<Worker>("worker"); system.AwaitShutdown(); @@ -453,10 +453,10 @@ TEST(MultipleSendTest, ProcessManyMessages) { }; struct Master : public Reactor { - Master(System *system, std::string name) : Reactor(system, name) {} + Master(std::string name) : Reactor(name) {} virtual void Run() { std::shared_ptr<Channel> channel; - while (!(channel = system_->FindChannel("worker", "main"))) + while (!(channel = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100)); @@ -469,7 +469,7 @@ TEST(MultipleSendTest, ProcessManyMessages) { }; struct Worker : public Reactor { - Worker(System *system, std::string name) : Reactor(system, name) {} + Worker(std::string name) : Reactor(name) {} struct EndMessage : Message {}; int vals = 0; @@ -492,7 +492,7 @@ TEST(MultipleSendTest, ProcessManyMessages) { } }; - System system; + System &system = System::GetInstance(); system.Spawn<Master>("master"); system.Spawn<Worker>("worker"); system.AwaitShutdown(); diff --git a/experimental/distributed/tests/start_distributed.py b/experimental/distributed/tests/start_distributed.py index 5f47325ec..e07656272 100755 --- a/experimental/distributed/tests/start_distributed.py +++ b/experimental/distributed/tests/start_distributed.py @@ -6,7 +6,7 @@ import os command = 'gnome-terminal' program = './distributed_test' config_filename = 'config' -flags = ' --minloglevel 2' +flags = '--minloglevel=2' f = open(config_filename, 'r') for line in f: @@ -14,8 +14,9 @@ for line in f: my_mnid = data[0] address = data[1] port = data[2] - call = program + flags + ' --my_mnid ' + my_mnid + ' --address ' + address +\ - ' --port ' + port + ' --config_filename ' + config_filename - command += " --tab -e '" + call + "'" + call = "{} {} --my_mnid {} --address {} --port {} --config_filename={}".format( + program, flags, my_mnid, address, port, config_filename) + command += " --tab -e '{}'".format(call) -os.system(command) \ No newline at end of file +print(command) +os.system(command)