diff --git a/CMakeLists.txt b/CMakeLists.txt index 14b31f933..cb93b97ef 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -186,10 +186,11 @@ target_link_libraries(antlr_opencypher_parser_lib antlr4) set(memgraph_src_files ${src_dir}/communication/bolt/v1/decoder/decoded_value.cpp ${src_dir}/communication/bolt/v1/session.cpp + ${src_dir}/communication/reactor/reactor_local.cpp ${src_dir}/data_structures/concurrent/skiplist_gc.cpp + ${src_dir}/database/dbms.cpp ${src_dir}/database/graph_db.cpp ${src_dir}/database/graph_db_accessor.cpp - ${src_dir}/database/dbms.cpp ${src_dir}/durability/recovery.cpp ${src_dir}/durability/snapshooter.cpp ${src_dir}/io/network/addrinfo.cpp @@ -219,7 +220,7 @@ set(memgraph_src_files # ----------------------------------------------------------------------------- # memgraph_lib depend on these libraries -set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools +set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools cereal antlr_opencypher_parser_lib dl glog gflags) if (USE_LTALLOC) diff --git a/experimental/distributed/main-client.cpp b/experimental/distributed/main-client.cpp index 77c87e016..3c0c15cd1 100644 --- a/experimental/distributed/main-client.cpp +++ b/experimental/distributed/main-client.cpp @@ -2,72 +2,60 @@ #include #include -#include "reactors_distributed.hpp" #include "memgraph_config.hpp" #include "memgraph_distributed.hpp" #include "memgraph_transactions.hpp" +#include "reactors_distributed.hpp" /** * List of queries that should be executed. */ -std::vector queries = {{ - "create vertex", - "create vertex", - "create vertex", - "create vertex", - "create vertex", - "create vertex", - "create vertex", - "create vertex", - "create vertex", - "create vertex", - "vertex count", - "create vertex", - "create vertex", - "vertex count" -}}; +std::vector queries = { + {"create vertex", "create vertex", "create vertex", "create vertex", + "create vertex", "create vertex", "create vertex", "create vertex", + "create vertex", "create vertex", "vertex count", "create vertex", + "create vertex", "vertex count"}}; /** * This is the client that issues some hard-coded queries. */ class Client : public Reactor { public: - Client(std::string name) : Reactor(name) { - } + Client(std::string name) : Reactor(name) {} void IssueQueries(std::shared_ptr channel_to_leader) { // (concurrently) create a couple of vertices - for (int query_idx = 0; query_idx < queries.size(); ++query_idx) { + for (int query_idx = 0; query_idx < static_cast(queries.size()); + ++query_idx) { // register callback std::string channel_name = "query-" + std::to_string(query_idx); auto stream = Open(channel_name).first; - stream - ->OnEventOnce() - .ChainOnce([this, query_idx](const ResultMsg &msg, - const Subscription &sub){ - std::cout << "Result of query " << query_idx << " (" - << queries[query_idx] << "):" << std::endl - << " " << msg.result() << std::endl; - sub.CloseChannel(); - }); + stream->OnEventOnce().ChainOnce( + [this, query_idx](const ResultMsg &msg, const Subscription &sub) { + std::cout << "Result of query " << query_idx << " (" + << queries[query_idx] << "):" << std::endl + << " " << msg.result() << std::endl; + sub.CloseChannel(); + }); // then issue the query (to avoid race conditions) - std::cout << "Issuing command " << query_idx << " (" - << queries[query_idx] << ")" << std::endl; + std::cout << "Issuing command " << query_idx << " (" << queries[query_idx] + << ")" << std::endl; channel_to_leader->Send(channel_name, queries[query_idx]); } } virtual void Run() { - MemgraphDistributed& memgraph = MemgraphDistributed::GetInstance(); + MemgraphDistributed &memgraph = MemgraphDistributed::GetInstance(); auto mnid = memgraph.LeaderMnid(); memgraph.FindChannel(mnid, "master", "client-queries") - ->OnEventOnce() - .ChainOnce([this](const ChannelResolvedMessage &msg, const Subscription& sub) { - sub.CloseChannel(); - IssueQueries(msg.channelWriter()); - }); + ->OnEventOnce() + .ChainOnce( + [this](const ChannelResolvedMessage &msg, const Subscription &sub) { + sub.CloseChannel(); + IssueQueries(msg.channelWriter()); + }); } }; @@ -77,7 +65,7 @@ int main(int argc, char *argv[]) { System &system = System::GetInstance(); Distributed &distributed = Distributed::GetInstance(); - MemgraphDistributed& memgraph = MemgraphDistributed::GetInstance(); + MemgraphDistributed &memgraph = MemgraphDistributed::GetInstance(); memgraph.RegisterConfig(ParseConfig()); distributed.StartServices(); @@ -85,6 +73,5 @@ int main(int argc, char *argv[]) { system.AwaitShutdown(); distributed.StopServices(); - return 0; } diff --git a/experimental/distributed/src/memgraph_distributed.hpp b/experimental/distributed/src/memgraph_distributed.hpp index aee089d4d..ca57ce8d2 100644 --- a/experimental/distributed/src/memgraph_distributed.hpp +++ b/experimental/distributed/src/memgraph_distributed.hpp @@ -4,9 +4,9 @@ #include "reactors_distributed.hpp" -#include #include #include +#include #include #include @@ -17,20 +17,18 @@ class MemgraphDistributed { public: /** * Get the (singleton) instance of MemgraphDistributed. - * - * More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern */ static MemgraphDistributed &GetInstance() { - static MemgraphDistributed memgraph; // guaranteed to be destroyed, initialized on first use + static MemgraphDistributed memgraph; return memgraph; } - EventStream* FindChannel(MnidT mnid, - const std::string &reactor, + EventStream *FindChannel(MnidT mnid, const std::string &reactor, const std::string &channel) { - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); const auto &location = mnodes_.at(mnid); - return Distributed::GetInstance().FindChannel(location.first, location.second, reactor, channel); + return Distributed::GetInstance().FindChannel( + location.first, location.second, reactor, channel); } void RegisterConfig(const Config &config) { @@ -51,23 +49,22 @@ class MemgraphDistributed { /** * The leader is currently the first node in the config. */ - MnidT LeaderMnid() { - return config_.nodes.front().mnid; - } + MnidT LeaderMnid() const { return config_.nodes.front().mnid; } protected: MemgraphDistributed() {} /** Register memgraph node id to the given location. */ - void RegisterMemgraphNode(MnidT mnid, const std::string &address, uint16_t port) { - std::unique_lock lock(mutex_); + void RegisterMemgraphNode(MnidT mnid, const std::string &address, + uint16_t port) { + std::unique_lock lock(mutex_); mnodes_[mnid] = Location(address, port); } private: Config config_; - std::recursive_mutex mutex_; + std::mutex mutex_; std::unordered_map mnodes_; MemgraphDistributed(const MemgraphDistributed &) = delete; diff --git a/experimental/distributed/src/reactors_distributed.cpp b/experimental/distributed/src/reactors_distributed.cpp deleted file mode 100644 index 0d59f5e81..000000000 --- a/experimental/distributed/src/reactors_distributed.cpp +++ /dev/null @@ -1,35 +0,0 @@ -#include "reactors_distributed.hpp" - -DEFINE_string(address, "127.0.0.1", "Network server bind address"); -DEFINE_int32(port, 10000, "Network server bind port"); - -Network::Network() {} - -/** - * ReturnAddressMsg implementation. - */ -ReturnAddressMsg::ReturnAddressMsg() {} - -ReturnAddressMsg::ReturnAddressMsg(std::string channel) - : ReturnAddressMsg(current_reactor_->name(), channel) {} - -ReturnAddressMsg::ReturnAddressMsg(std::string reactor, std::string channel) - : address_(FLAGS_address), - port_(FLAGS_port), - reactor_(reactor), - channel_(channel) {} - -std::string ReturnAddressMsg::Address() const { return address_; } -uint16_t ReturnAddressMsg::Port() const { return port_; } -std::string ReturnAddressMsg::ReactorName() const { return reactor_; } -std::string ReturnAddressMsg::ChannelName() const { return channel_; } - -std::shared_ptr ReturnAddressMsg::GetReturnChannelWriter() const { - if (address_ == FLAGS_address && port_ == FLAGS_port) { - return System::GetInstance().FindChannel(reactor_, channel_); - } else { - // TODO(zuza): we should probably assert here if services have been already started. - return Distributed::GetInstance().network().Resolve(address_, port_, reactor_, channel_); - } - assert(false); -} diff --git a/experimental/distributed/src/reactors_distributed.hpp b/experimental/distributed/src/reactors_distributed.hpp deleted file mode 100644 index 5e4ed839d..000000000 --- a/experimental/distributed/src/reactors_distributed.hpp +++ /dev/null @@ -1,350 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "protocol.hpp" -#include "reactors_local.hpp" - -#include "cereal/archives/binary.hpp" -#include "cereal/types/base_class.hpp" -#include "cereal/types/memory.hpp" -#include "cereal/types/polymorphic.hpp" -#include "cereal/types/string.hpp" -#include "cereal/types/utility.hpp" // utility has to be included because of std::pair -#include "cereal/types/vector.hpp" - -#include "communication/server.hpp" -#include "threading/sync/spinlock.hpp" - -DECLARE_string(address); -DECLARE_int32(port); - -/** - * Networking service. - */ -class Network { - private: - using Endpoint = Protocol::Endpoint; - using Socket = Protocol::Socket; - using NetworkServer = communication::Server; - - struct NetworkMessage { - NetworkMessage() - : address(""), port(0), reactor(""), channel(""), message(nullptr) {} - - NetworkMessage(const std::string& _address, uint16_t _port, - const std::string& _reactor, const std::string& _channel, - std::unique_ptr _message) - : address(_address), - port(_port), - reactor(_reactor), - channel(_channel), - message(std::move(_message)) {} - - NetworkMessage(NetworkMessage &&nm) - : address(std::move(nm.address)), - port(std::move(nm.port)), - reactor(std::move(nm.reactor)), - channel(std::move(nm.channel)), - message(std::move(nm.message)) {} - - std::string address; - uint16_t port; - std::string reactor; - std::string channel; - std::unique_ptr message; - }; - - public: - Network(); - - // client functions - - std::shared_ptr Resolve(std::string address, uint16_t port, - std::string reactor_name, - std::string channel_name) { - if (Protocol::SendMessage(address, port, reactor_name, channel_name, - nullptr)) { - return std::make_shared(this, address, port, reactor_name, - channel_name); - } - LOG(WARNING) << "Could not resolve " << address << ":" << port << " " << reactor_name << "/" << channel_name; - return nullptr; - } - - std::shared_ptr AsyncResolve(const std::string& address, uint16_t port, - int32_t retries, - std::chrono::seconds cooldown) { - // TODO: Asynchronously resolve channel, and return an event stream - // that emits the channel after it gets resolved. - return nullptr; - } - - /** Start a threadpool that dispatches the messages from the (outgoing) queue to the sockets */ - void StartClient(int worker_count) { - LOG(INFO) << "Starting " << worker_count << " client workers"; - client_run_ = true; - - for (int i = 0; i < worker_count; ++i) { - pool_.push_back(std::thread([worker_count, this]() { - while (this->client_run_) { - this->mutex_.lock(); - if (!this->queue_.empty()) { - NetworkMessage nm(std::move(this->queue_.front())); - this->queue_.pop(); - this->mutex_.unlock(); - // TODO: store success - bool success = - Protocol::SendMessage(nm.address, nm.port, nm.reactor, - nm.channel, std::move(nm.message)); - DLOG(INFO) << "Network client message send status: " << success << std::endl; - } else { - this->mutex_.unlock(); - } - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - } - })); - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } - } - - void StopClient() { - while (true) { - std::lock_guard lock(mutex_); - if (queue_.empty()) { - break; - } - } - client_run_ = false; - for (size_t i = 0; i < pool_.size(); ++i) { - pool_[i].join(); - } - pool_.clear(); - } - - class RemoteChannelWriter : public ChannelWriter { - public: - RemoteChannelWriter(Network *network, std::string address, uint16_t port, - std::string reactor, std::string channel) - : network_(network), - address_(address), - port_(port), - reactor_(reactor), - channel_(channel) {} - - virtual std::string Address() { return address_; } - - virtual uint16_t Port() { return port_; } - - virtual std::string ReactorName() { return reactor_; } - - virtual std::string Name() { return channel_; } - - virtual void Send(std::unique_ptr message) { - std::lock_guard lock(network_->mutex_); - network_->queue_.push(NetworkMessage(address_, port_, reactor_, channel_, - std::move(message))); - } - - private: - Network *network_; - std::string address_; - uint16_t port_; - std::string reactor_; - std::string channel_; - }; - - // server functions - - std::string Address() { return FLAGS_address; } - - uint16_t Port() { return FLAGS_port; } - - /** Start a threadpool that relays the messages from the sockets to the LocalEventStreams */ - void StartServer(int workers_count) { - if (server_ != nullptr) { - LOG(FATAL) << "Tried to start a running server!"; - } - - // Initialize endpoint. - Endpoint endpoint; - try { - endpoint = Endpoint(FLAGS_address.c_str(), FLAGS_port); - } catch (io::network::NetworkEndpointException &e) { - LOG(FATAL) << e.what(); - } - - // Initialize socket. - Socket socket; - if (!socket.Bind(endpoint)) { - LOG(FATAL) << "Cannot bind to socket on " << FLAGS_address << " at " - << FLAGS_port; - } - if (!socket.SetNonBlocking()) { - LOG(FATAL) << "Cannot set socket to non blocking!"; - } - if (!socket.Listen(1024)) { - LOG(FATAL) << "Cannot listen on socket!"; - } - - // Initialize server - server_ = - std::make_unique(std::move(socket), protocol_data_); - - // Start server - thread_ = std::thread( - [workers_count, this]() { this->server_->Start(workers_count); }); - } - - void StopServer() { - if (server_ != nullptr) { - server_->Shutdown(); - thread_.join(); - server_ = nullptr; - } - } - - private: - // client variables - SpinLock mutex_; - std::vector pool_; - std::queue queue_; - std::atomic client_run_; - - // server variables - std::thread thread_; - Protocol::Data protocol_data_; - std::unique_ptr server_{nullptr}; -}; - -/** - * Message that includes the sender channel used to respond. - */ -class ReturnAddressMsg : public Message { - public: - /* The return address is on the current reactor, specified channel */ - ReturnAddressMsg(std::string channel); - - /* The return address is on a specified reactor/channel */ - ReturnAddressMsg(std::string reactor, std::string channel); - - std::string Address() const; - uint16_t Port() const; - std::string ReactorName() const; - std::string ChannelName() const; - - std::shared_ptr GetReturnChannelWriter() const; - - template - void serialize(Archive &ar) { - ar(cereal::virtual_base_class(this), address_, port_, - reactor_, channel_); - } - - protected: - friend class cereal::access; - ReturnAddressMsg(); // Cereal needs access to a default constructor. - - private: - std::string address_; - uint16_t port_; - std::string reactor_; - std::string channel_; -}; -CEREAL_REGISTER_TYPE(ReturnAddressMsg); - - -/** - * Message that will arrive on a stream returned by Distributed::FindChannel - * once and if the channel is successfully resolved. - */ -class ChannelResolvedMessage : public Message { - public: - ChannelResolvedMessage() {} - ChannelResolvedMessage(std::shared_ptr channel_writer) - : Message(), channel_writer_(channel_writer) {} - - std::shared_ptr channelWriter() const { return channel_writer_; } - - private: - std::shared_ptr channel_writer_; -}; - -/** - * Placeholder for all functionality related to non-local communication. - * - * E.g. resolve remote channels by memgraph node id, etc. - * Alive through the entire process lifetime. - * Singleton class. Created automatically on first use. - * Final (can't extend) because it's a singleton. Please be careful if you're changing this. - */ -class Distributed final { - public: - /** - * Get the (singleton) instance of Distributed. - * - * More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern - */ - static Distributed &GetInstance() { - static Distributed distributed; // guaranteed to be destroyed, initialized on first use - return distributed; - } - - void StartServices() { - network_.StartClient(4); - network_.StartServer(4); - } - - void StopServices() { - network_.StopClient(); - network_.StopServer(); - } - - // TODO: Implement remote Spawn. - - /** - * Resolves remote channel. - * - * TODO: Provide asynchronous implementation of this function. - * - * @return EventStream on which message will arrive once channel is resolved. - * @warning It can only be called from local Reactor. - */ - EventStream* FindChannel(const std::string &address, - uint16_t port, - const std::string &reactor_name, - const std::string &channel_name) { - std::shared_ptr channel_writer = nullptr; - while (!(channel_writer = network_.Resolve(address, port, reactor_name, channel_name))) - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - auto stream_channel = current_reactor_->Open(); - stream_channel.second->Send(channel_writer); - return stream_channel.first; - } - - Network &network() { return network_; } - - protected: - Distributed() {} - - Network network_; - - private: - Distributed(const Distributed &) = delete; - Distributed(Distributed &&) = delete; - Distributed &operator=(const Distributed &) = delete; - Distributed &operator=(Distributed &&) = delete; -}; diff --git a/experimental/distributed/src/reactors_local.cpp b/experimental/distributed/src/reactors_local.cpp deleted file mode 100644 index f300d7afb..000000000 --- a/experimental/distributed/src/reactors_local.cpp +++ /dev/null @@ -1,134 +0,0 @@ -#include "reactors_local.hpp" - -void EventStream::Subscription::Unsubscribe() const { - event_queue_.RemoveCb(*this); -} - -void EventStream::Subscription::CloseChannel() const { - event_queue_.Close(); -} - -const std::string& EventStream::Subscription::ChannelName() const { - return event_queue_.channel_name_; -} - -thread_local Reactor* current_reactor_ = nullptr; - -std::string Channel::LocalChannelWriter::ReactorName() { - return reactor_name_; -} - -std::string Channel::LocalChannelWriter::Name() { - return channel_name_; -} - -void Channel::Close() { - // TODO(zuza): there will be major problems if a reactor tries to close a stream that isn't theirs - // luckily this should never happen if the framework is used as expected. - current_reactor_->CloseChannel(channel_name_); -} - -std::pair> Reactor::Open(const std::string &channel_name) { - std::unique_lock lock(*mutex_); - // TODO: Improve the check that the channel name does not exist in the - // system. - if (channels_.count(channel_name) != 0) { - throw std::runtime_error("Channel with name " + channel_name - + "already exists"); - } - auto it = channels_.emplace(channel_name, - std::make_shared(Channel::Params{name_, channel_name, mutex_, cvar_})).first; - it->second->self_ptr_ = it->second; - return make_pair(&it->second->stream_, it->second->LockedOpenChannel()); -} - -std::pair> Reactor::Open() { - std::unique_lock lock(*mutex_); - do { - std::string channel_name = "stream-" + std::to_string(channel_name_counter_++); - if (channels_.count(channel_name) == 0) { - // Channel &queue = channels_[channel_name]; - auto it = channels_.emplace(channel_name, - std::make_shared(Channel::Params{name_, channel_name, mutex_, cvar_})).first; - it->second->self_ptr_ = it->second; - return make_pair(&it->second->stream_, it->second->LockedOpenChannel()); - } - } while (true); -} - -const std::shared_ptr Reactor::FindChannel( - const std::string &channel_name) { - std::unique_lock lock(*mutex_); - auto it_channel = channels_.find(channel_name); - if (it_channel == channels_.end()) return nullptr; - return it_channel->second->LockedOpenChannel(); -} - -void Reactor::CloseChannel(const std::string &s) { - std::unique_lock lock(*mutex_); - auto it = channels_.find(s); - assert(it != channels_.end()); - channels_.erase(it); - cvar_->notify_all(); -} - -void Reactor::RunEventLoop() { - bool exit_event_loop = false; - - while (true) { - // Find (or wait) for the next Message. - MsgAndCbInfo msg_and_cb; - { - std::unique_lock lock(*mutex_); - - while (true) { - // Not fair because was taken earlier, talk to lion. - msg_and_cb = LockedGetPendingMessages(); - if (msg_and_cb.first != nullptr) break; - - // Exit the loop if there are no more Channels. - if (channels_.empty()) { - exit_event_loop = true; - break; - } - - cvar_->wait(lock); - } - - if (exit_event_loop) break; - } - - for (auto &cbAndSub : msg_and_cb.second) { - auto &cb = cbAndSub.first; - const Message &msg = *msg_and_cb.first; - cb(msg, cbAndSub.second); - } - } -} - -/** - * Checks if there is any nonempty EventStream. - */ -auto Reactor::LockedGetPendingMessages() -> MsgAndCbInfo { - // return type after because the scope Reactor:: is not searched before the name - for (auto &channels_key_value : channels_) { - Channel &event_queue = *channels_key_value.second; - auto msg_ptr = event_queue.LockedPop(); - if (msg_ptr == nullptr) continue; - std::type_index tidx = msg_ptr->GetTypeIndex(); - - std::vector > cb_info; - auto msg_type_cb_iter = event_queue.callbacks_.find(tidx); - if (msg_type_cb_iter != event_queue.callbacks_.end()) { // There is a callback for this type. - for (auto &tidx_cb_key_value : msg_type_cb_iter->second) { - uint64_t uid = tidx_cb_key_value.first; - EventStream::Callback cb = tidx_cb_key_value.second; - cb_info.emplace_back(cb, Subscription(event_queue, tidx, uid)); - } - } - - return MsgAndCbInfo(std::move(msg_ptr), std::move(cb_info)); - } - - return MsgAndCbInfo(nullptr, {}); -} diff --git a/experimental/distributed/src/reactors_local.hpp b/experimental/distributed/src/reactors_local.hpp deleted file mode 100644 index 828e02230..000000000 --- a/experimental/distributed/src/reactors_local.hpp +++ /dev/null @@ -1,540 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "cereal/types/memory.hpp" - -class EventStream; -class Reactor; -class System; -class Channel; - -extern thread_local Reactor* current_reactor_; - -/** - * Base class for messages. - */ -class Message { - public: - virtual ~Message() {} - - template - void serialize(Archive &) {} - - /** Run-time type identification that is used for callbacks. - * - * Warning: this works because of the virtual destructor, don't remove it from this class - */ - std::type_index GetTypeIndex() { - return typeid(*this); - } -}; - -/** - * Write-end of a Channel (between two reactors). - */ -class ChannelWriter { - public: - /** - * Construct and send the message to the channel. - */ - template - void Send(Args&&... args) { - Send(std::unique_ptr(new MsgType(std::forward(args)...))); - } - - virtual void Send(std::unique_ptr ptr) = 0; - - virtual std::string ReactorName() = 0; - - virtual std::string Name() = 0; - - void operator=(const ChannelWriter &) = delete; - - template - void serialize(Archive &archive) { - archive(ReactorName(), Name()); - } -}; - -/** - * Read-end of a Channel (between two reactors). - */ -class EventStream { - public: - class OnEventOnceChainer; - class Subscription; - - /** - * Register a callback that will be called whenever an event arrives. - */ - template - void OnEvent(std::function &&cb) { - OnEventHelper(typeid(MsgType), - [cb = std::move(cb)](const Message &general_msg, - const Subscription &subscription) { - const MsgType &correct_msg = dynamic_cast(general_msg); - cb(correct_msg, subscription); - }); - } - - /** - * Register a callback that will be called only once. - * Once event is received, channel of this EventStream is closed. - */ - template - void OnEventOnceThenClose(std::function &&cb) { - OnEventHelper(typeid(MsgType), - [cb = std::move(cb)](const Message &general_msg, - const Subscription &subscription) { - const MsgType &correct_msg = dynamic_cast(general_msg); - subscription.CloseChannel(); - cb(correct_msg); - }); - } - - /** - * Starts a chain to register a callback that fires off only once. - * - * This method supports chaining (see the the class OnEventOnceChainer or the tests for examples). - * Warning: when chaining callbacks, make sure that EventStream does not deallocate before the last - * chained callback fired. - */ - OnEventOnceChainer OnEventOnce() { - return OnEventOnceChainer(*this); - } - - /** - * Get the name of the channel. - */ - virtual const std::string &ChannelName() = 0; - - /** - * Subscription Service. - * - * Unsubscribe from a callback. Lightweight object (can copy by value). - */ - class Subscription { - public: - /** - * Unsubscribe. Call only once. - */ - void Unsubscribe() const; - - /** - * Close the stream. Convenience method. - */ - void CloseChannel() const; - - /** - * Get the name of the channel the message is delivered to. - */ - const std::string& ChannelName() const; - - private: - friend class Reactor; - friend class Channel; - - Subscription(Channel &event_queue, std::type_index tidx, uint64_t cb_uid) - : event_queue_(event_queue), tidx_(tidx), cb_uid_(cb_uid) { } - - Channel &event_queue_; - std::type_index tidx_; - uint64_t cb_uid_; - }; - - /** - * Close this event stream, disallowing further events from getting received. - * - * Any subsequent call after Close() to any function will be result in undefined - * behavior (invalid pointer dereference). Can only be called from the thread - * associated with the Reactor. - */ - virtual void Close() = 0; - - /** - * Convenience class to chain one-off callbacks. - * - * Usage: Create this class with OnEventOnce() and then chain callbacks using ChainOnce. - * A callback will fire only once, unsubscribe and immediately subscribe the next callback to the stream. - * - * Example: stream->OnEventOnce().ChainOnce(firstCb).ChainOnce(secondCb); - * - * Implementation: This class is a temporary object that remembers the callbacks that are to be installed - * and finally installs them in the destructor. Not sure is this kosher, is there another way? - */ - class OnEventOnceChainer { - public: - OnEventOnceChainer(EventStream &event_stream) : event_stream_(event_stream) {} - ~OnEventOnceChainer() { - InstallCallbacks(); - } - - template - OnEventOnceChainer &ChainOnce(std::function &&cb) { - std::function wrap = - [cb = std::move(cb)](const Message &general_msg, const Subscription &subscription) { - const MsgType &correct_msg = dynamic_cast(general_msg); - subscription.Unsubscribe(); - cb(correct_msg, subscription); // Warning: this can close the Channel, be careful what you put after it! - }; - cbs_.emplace_back(typeid(MsgType), std::move(wrap)); - return *this; - } - - private: - void InstallCallbacks() { - int num_callbacks = cbs_.size(); - assert(num_callbacks > 0); // We should install at least one callback, otherwise the usage is wrong? - std::function next_cb = nullptr; - std::type_index next_type = typeid(nullptr); - - for (int i = num_callbacks - 1; i >= 0; --i) { - std::function tmp_cb = nullptr; - tmp_cb = [cb = std::move(cbs_[i].second), - next_type, - next_cb = std::move(next_cb), - es_ptr = &this->event_stream_](const Message &msg, const Subscription &subscription) { - cb(msg, subscription); - if (next_cb != nullptr) { - es_ptr->OnEventHelper(next_type, std::move(next_cb)); - } - }; - next_cb = std::move(tmp_cb); - next_type = cbs_[i].first; - } - - event_stream_.OnEventHelper(next_type, std::move(next_cb)); - } - - EventStream &event_stream_; - std::vector>> cbs_; - }; - typedef std::function Callback; - -private: - virtual void OnEventHelper(std::type_index tidx, Callback callback) = 0; -}; - -using Subscription = EventStream::Subscription; // To write less. - -/** - * Implementation of a channel. - * - * This class is an internal data structure that represents the state of the channel. - * This class is not meant to be used by the clients of the messaging framework. - * The Channel class wraps the event queue data structure, the mutex that protects - * concurrent access to the event queue, the local channel and the event stream. - * The class is owned by the Reactor. It gets closed when the owner reactor - * (the one that owns the read-end of a channel) removes/closes it. - */ -class Channel { - struct Params; - - public: - friend class Reactor; // to create a Params initialization object - friend class EventStream::Subscription; - - Channel(Params params) - : channel_name_(params.channel_name), - reactor_name_(params.reactor_name), - mutex_(params.mutex), - cvar_(params.cvar), - stream_(mutex_, this) {} - - /** - * LocalChannelWriter represents the channels to reactors living in the same reactor system (write-end of the channels). - * - * Sending messages to the local channel requires acquiring the mutex. - * LocalChannelWriter holds a (weak) pointer to the enclosing Channel object. - * Messages sent to a closed channel are ignored. - * There can be multiple LocalChannelWriters refering to the same stream if needed. - */ - class LocalChannelWriter : public ChannelWriter { - public: - friend class Channel; - - LocalChannelWriter(std::shared_ptr mutex, std::string reactor_name, - std::string channel_name, std::weak_ptr queue) - : mutex_(mutex), - reactor_name_(reactor_name), - channel_name_(channel_name), - weak_queue_(queue) {} - - virtual void Send(std::unique_ptr m) { - std::shared_ptr queue_ = weak_queue_.lock(); // Atomic, per the standard. - if (queue_) { - // We guarantee here that the Channel is not destroyed. - std::unique_lock lock(*mutex_); - queue_->LockedPush(std::move(m)); - } - } - - virtual std::string ReactorName(); - - virtual std::string Name(); - - private: - std::shared_ptr mutex_; - std::string reactor_name_; - std::string channel_name_; - std::weak_ptr weak_queue_; - }; - - /** - * Implementation of the event stream. - * - * After the enclosing Channel object is destroyed (by a call to CloseChannel or Close). - */ - class LocalEventStream : public EventStream { - public: - friend class Channel; - - LocalEventStream(std::shared_ptr mutex, Channel *queue) : mutex_(mutex), queue_(queue) {} - - void OnEventHelper(std::type_index tidx, Callback callback) { - std::unique_lock lock(*mutex_); - queue_->LockedOnEventHelper(tidx, callback); - } - - const std::string &ChannelName() { - return queue_->channel_name_; - } - - void Close() { - queue_->Close(); - } - - private: - std::shared_ptr mutex_; - std::string channel_name_; - Channel *queue_; - }; - - /** - * Close the channel. Must be called from the reactor that owns the channel. - */ - void Close(); - - Channel(const Channel &other) = delete; - Channel(Channel &&other) = default; - Channel &operator=(const Channel &other) = delete; - Channel &operator=(Channel &&other) = default; - -private: - /** - * Initialization parameters to Channel. - * Warning: do not forget to initialize self_ptr_ individually. Private because it shouldn't be created outside of a Reactor. - */ - struct Params { - std::string reactor_name; - std::string channel_name; - std::shared_ptr mutex; - std::shared_ptr cvar; - }; - - - void LockedPush(std::unique_ptr m) { - queue_.emplace(std::move(m)); - // This is OK because there is only one Reactor (thread) that can wait on this Channel. - cvar_->notify_one(); - } - - std::shared_ptr LockedOpenChannel() { - assert(!self_ptr_.expired()); // TODO(zuza): fix this using this answer https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned - return std::make_shared(mutex_, reactor_name_, channel_name_, self_ptr_); - } - - std::unique_ptr LockedPop() { - return LockedRawPop(); - } - - void LockedOnEventHelper(std::type_index tidx, EventStream::Callback callback) { - uint64_t cb_uid = next_cb_uid++; - callbacks_[tidx][cb_uid] = callback; - } - - std::unique_ptr LockedRawPop() { - if (queue_.empty()) return nullptr; - std::unique_ptr t = std::move(queue_.front()); - queue_.pop(); - return t; - } - - void RemoveCb(const EventStream::Subscription &subscription) { - std::unique_lock lock(*mutex_); - size_t num_erased = callbacks_[subscription.tidx_].erase(subscription.cb_uid_); - assert(num_erased == 1); - } - - std::string channel_name_; - std::string reactor_name_; - std::queue> queue_; - // Should only be locked once since it's used by a cond. var. Also caught in dctor, so must be recursive. - std::shared_ptr mutex_; - std::shared_ptr cvar_; - /** - * A weak_ptr to itself. - * - * There are initialization problems with this, check Params. - */ - std::weak_ptr self_ptr_; - LocalEventStream stream_; - std::unordered_map > callbacks_; - uint64_t next_cb_uid = 0; -}; - -/** - * A single unit of concurrent execution in the system. - * - * E.g. one worker, one client. Owned by System. Has a thread associated with it. - */ -class Reactor { - public: - friend class System; - - Reactor(std::string name) - : name_(name), main_(Open("main")) {} - - virtual ~Reactor() {} - - virtual void Run() = 0; - - std::pair> Open(const std::string &s); - std::pair> Open(); - const std::shared_ptr FindChannel(const std::string &channel_name); - - /** - * Close a channel by name. - * - * Should only be called from the Reactor thread. - */ - void CloseChannel(const std::string &s); - - /** - * Get Reactor name - */ - const std::string &name() { return name_; } - - Reactor(const Reactor &other) = delete; - Reactor(Reactor &&other) = default; - Reactor &operator=(const Reactor &other) = delete; - Reactor &operator=(Reactor &&other) = default; - - protected: - std::string name_; - /* - * Locks all Reactor data, including all Channel's in channels_. - * - * This should be a shared_ptr because LocalChannelWriter can outlive Reactor. - */ - std::shared_ptr mutex_ = - std::make_shared(); - std::shared_ptr cvar_ = - std::make_shared(); - - /** - * List of channels of a reactor indexed by name. - * - * While the channels are owned by the reactor, a shared_ptr to solve the circular reference problem - * between ChannelWriters and EventStreams. - */ - std::unordered_map> channels_; - int64_t channel_name_counter_{0}; - std::pair> main_; - - private: - typedef std::pair, - std::vector > > MsgAndCbInfo; - - /** - * Dispatches all waiting messages to callbacks. Shuts down when there are no callbacks left. - */ - void RunEventLoop(); - - // TODO: remove proof of locking evidence ?! - MsgAndCbInfo LockedGetPendingMessages(); -}; - - -/** - * Global placeholder for all reactors in the system. - * - * E.g. holds set of reactors, channels for all reactors. - * Alive through the entire process lifetime. - * Singleton class. Created automatically on first use. - * Final (can't extend) because it's a singleton. Please be careful if you're changing this. - */ -class System final { - public: - friend class Reactor; - - /** - * Get the (singleton) instance of System. - * - * More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern - */ - static System &GetInstance() { - static System system; // guaranteed to be destroyed, initialized on first use - return system; - } - - template - const std::shared_ptr Spawn(const std::string &name, - Args &&... args) { - std::unique_lock lock(mutex_); - auto *raw_reactor = - new ReactorType(name, std::forward(args)...); - std::unique_ptr reactor(raw_reactor); - // Capturing a pointer isn't ideal, I would prefer to capture a Reactor&, but not sure how to do it. - std::thread reactor_thread( - [this, raw_reactor]() { this->StartReactor(*raw_reactor); }); - assert(reactors_.count(name) == 0); - reactors_.emplace(name, std::pair, std::thread> - (std::move(reactor), std::move(reactor_thread))); - return nullptr; - } - - const std::shared_ptr FindChannel(const std::string &reactor_name, - const std::string &channel_name) { - std::unique_lock lock(mutex_); - auto it_reactor = reactors_.find(reactor_name); - if (it_reactor == reactors_.end()) return nullptr; - return it_reactor->second.first->FindChannel(channel_name); - } - - void AwaitShutdown() { - for (auto &key_value : reactors_) { - auto &thread = key_value.second.second; - thread.join(); - } - reactors_.clear(); // for testing, since System is a singleton now - } - - private: - System() {} - System(const System &) = delete; - System(System &&) = delete; - System &operator=(const System &) = delete; - System &operator=(System &&) = delete; - - void StartReactor(Reactor &reactor) { - current_reactor_ = &reactor; - reactor.Run(); - reactor.RunEventLoop(); // Activate callbacks. - } - - std::recursive_mutex mutex_; - // TODO: Replace with a map to a reactor Channel map to have more granular - // locking. - std::unordered_map, std::thread>> - reactors_; -}; diff --git a/experimental/distributed/tests/distributed_test.cpp b/experimental/distributed/tests/distributed_test.cpp index a2b684d74..bb20439c7 100644 --- a/experimental/distributed/tests/distributed_test.cpp +++ b/experimental/distributed/tests/distributed_test.cpp @@ -1,12 +1,14 @@ -#include #include +#include #include #include "memgraph_config.hpp" #include "reactors_distributed.hpp" -DEFINE_int64(my_mnid, 0, "Memgraph node id"); // TODO(zuza): this should be assigned by the leader once in the future +DEFINE_int64(my_mnid, 0, "Memgraph node id"); // TODO(zuza): this should be + // assigned by the leader once in + // the future class MemgraphDistributed { private: @@ -16,32 +18,35 @@ class MemgraphDistributed { /** * Get the (singleton) instance of MemgraphDistributed. * - * More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern + * More info: + * https://stackoverflow.com/questions/1008019/c-singleton-design-pattern */ static MemgraphDistributed &GetInstance() { - static MemgraphDistributed memgraph; // guaranteed to be destroyed, initialized on first use + static MemgraphDistributed + memgraph; // guaranteed to be destroyed, initialized on first use return memgraph; } /** Register memgraph node id to the given location. */ - void RegisterMemgraphNode(int64_t mnid, const std::string &address, uint16_t port) { - std::unique_lock lock(mutex_); + void RegisterMemgraphNode(int64_t mnid, const std::string &address, + uint16_t port) { + std::unique_lock lock(mutex_); mnodes_[mnid] = Location(address, port); } - EventStream* FindChannel(int64_t mnid, - const std::string &reactor, + EventStream *FindChannel(int64_t mnid, const std::string &reactor, const std::string &channel) { - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); const auto &location = mnodes_.at(mnid); - return Distributed::GetInstance().FindChannel(location.first, location.second, reactor, channel); + return Distributed::GetInstance().FindChannel( + location.first, location.second, reactor, channel); } protected: MemgraphDistributed() {} private: - std::recursive_mutex mutex_; + std::mutex mutex_; std::unordered_map mnodes_; MemgraphDistributed(const MemgraphDistributed &) = delete; @@ -64,8 +69,8 @@ class MemgraphDistributed { * * @return Pair (master mnid, list of worker's id). */ -std::pair> - ParseConfigAndRegister(const std::string &filename) { +std::pair> ParseConfigAndRegister( + const std::string &filename) { std::ifstream file(filename, std::ifstream::in); assert(file.good()); int64_t master_mnid; @@ -78,8 +83,7 @@ std::pair> memgraph.RegisterMemgraphNode(master_mnid, address, port); while (file.good()) { file >> mnid >> address >> port; - if (file.eof()) - break ; + if (file.eof()) break; memgraph.RegisterMemgraphNode(mnid, address, port); worker_mnids.push_back(mnid); } @@ -91,9 +95,9 @@ std::pair> * Sends a text message and has a return address. */ class TextMessage : public ReturnAddressMsg { -public: + public: TextMessage(std::string reactor, std::string channel, std::string s) - : ReturnAddressMsg(reactor, channel), text(s) {} + : ReturnAddressMsg(reactor, channel), text(s) {} template void serialize(Archive &archive) { @@ -102,51 +106,52 @@ public: std::string text; -protected: + protected: friend class cereal::access; - TextMessage() {} // Cereal needs access to a default constructor. + TextMessage() {} // Cereal needs access to a default constructor. }; CEREAL_REGISTER_TYPE(TextMessage); - class Master : public Reactor { public: Master(std::string name, int64_t mnid, std::vector &&worker_mnids) - : Reactor(name), mnid_(mnid), - worker_mnids_(std::move(worker_mnids)) {} + : Reactor(name), mnid_(mnid), worker_mnids_(std::move(worker_mnids)) {} virtual void Run() { MemgraphDistributed &memgraph = MemgraphDistributed::GetInstance(); Distributed &distributed = Distributed::GetInstance(); - std::cout << "Master (" << mnid_ << ") @ " << distributed.network().Address() - << ":" << distributed.network().Port() << std::endl; + std::cout << "Master (" << mnid_ << ") @ " + << distributed.network().Address() << ":" + << distributed.network().Port() << std::endl; auto stream = main_.first; // wait until every worker sends a ReturnAddressMsg back, then close - stream->OnEvent([this](const TextMessage &msg, - const Subscription &subscription) { - std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n"; - ++workers_seen; - if (workers_seen == worker_mnids_.size()) { - subscription.Unsubscribe(); - // Sleep for a while so we can read output in the terminal. - // (start_distributed.py runs each process in a new tab which is - // closed immediately after process has finished) - std::this_thread::sleep_for(std::chrono::seconds(4)); - CloseChannel("main"); - } - }); + stream->OnEvent( + [this](const TextMessage &msg, const Subscription &subscription) { + std::cout << "Message from " << msg.Address() << ":" << msg.Port() + << " .. " << msg.text << "\n"; + ++workers_seen; + if (workers_seen == static_cast(worker_mnids_.size())) { + subscription.Unsubscribe(); + // Sleep for a while so we can read output in the terminal. + // (start_distributed.py runs each process in a new tab which is + // closed immediately after process has finished) + std::this_thread::sleep_for(std::chrono::seconds(4)); + CloseChannel("main"); + } + }); // send a TextMessage to each worker for (auto wmnid : worker_mnids_) { auto stream = memgraph.FindChannel(wmnid, "worker", "main"); - stream->OnEventOnce() - .ChainOnce([this, stream](const ChannelResolvedMessage &msg, const Subscription&){ - msg.channelWriter()->Send("master", "main", "hi from master"); - stream->Close(); - }); + stream->OnEventOnce().ChainOnce([this, stream]( + const ChannelResolvedMessage &msg, const Subscription &) { + msg.channelWriter()->Send("master", "main", + "hi from master"); + stream->Close(); + }); } } @@ -159,28 +164,29 @@ class Master : public Reactor { class Worker : public Reactor { public: Worker(std::string name, int64_t mnid, int64_t master_mnid) - : Reactor(name), mnid_(mnid), - master_mnid_(master_mnid) {} + : Reactor(name), mnid_(mnid), master_mnid_(master_mnid) {} virtual void Run() { Distributed &distributed = Distributed::GetInstance(); - std::cout << "Worker (" << mnid_ << ") @ " << distributed.network().Address() - << ":" << distributed.network().Port() << std::endl; + std::cout << "Worker (" << mnid_ << ") @ " + << distributed.network().Address() << ":" + << distributed.network().Port() << std::endl; auto stream = main_.first; // wait until master sends us a TextMessage, then reply back and close - stream->OnEventOnce() - .ChainOnce([this](const TextMessage &msg, const Subscription&) { - std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n"; + stream->OnEventOnce().ChainOnce( + [this](const TextMessage &msg, const Subscription &) { + std::cout << "Message from " << msg.Address() << ":" << msg.Port() + << " .. " << msg.text << "\n"; - msg.GetReturnChannelWriter() - ->Send("worker", "main", "hi from worker"); + msg.GetReturnChannelWriter()->Send("worker", "main", + "hi from worker"); - // Sleep for a while so we can read output in the terminal. - std::this_thread::sleep_for(std::chrono::seconds(4)); - CloseChannel("main"); - }); + // Sleep for a while so we can read output in the terminal. + std::this_thread::sleep_for(std::chrono::seconds(4)); + CloseChannel("main"); + }); } protected: @@ -188,7 +194,6 @@ class Worker : public Reactor { const int64_t master_mnid_; }; - int main(int argc, char *argv[]) { google::InitGoogleLogging(argv[0]); gflags::ParseCommandLineFlags(&argc, &argv, true); diff --git a/experimental/distributed/tests/reactors_distributed_unit.cpp b/experimental/distributed/tests/reactors_distributed_unit.cpp deleted file mode 100644 index 1c4d5ce4b..000000000 --- a/experimental/distributed/tests/reactors_distributed_unit.cpp +++ /dev/null @@ -1,227 +0,0 @@ -/** - * This test file test the Distributed Reactors API on ONLY one process (no real networking). - * In other words, we send a message from one process to itself. - */ - -#include "gtest/gtest.h" -#include "reactors_distributed.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include - -/** - * Test do the services start up without crashes. - */ -TEST(SimpleTests, StartAndStopServices) { - System &system = System::GetInstance(); - Distributed &distributed = Distributed::GetInstance(); - distributed.StartServices(); - - // do nothing - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - - system.AwaitShutdown(); - distributed.StopServices(); -} - -/** - * Test simple message reception. - * - * Data flow: - * (1) Send an empty message from Master to Worker/main - */ -TEST(SimpleTests, SendEmptyMessage) { - struct Master : public Reactor { - Master(std::string name) : Reactor(name) {} - - virtual void Run() { - Distributed::GetInstance().FindChannel("127.0.0.1", 10000, "worker", "main") - ->OnEventOnce() - .ChainOnce([this](const ChannelResolvedMessage& msg, - const Subscription& subscription) { - msg.channelWriter()->Send(); - subscription.CloseChannel(); - }); - - CloseChannel("main"); - } - }; - - struct Worker : public Reactor { - Worker(std::string name) : Reactor(name) {} - - virtual void Run() { - main_.first->OnEventOnce() - .ChainOnce([this](const Message&, const Subscription& subscription) { - // if this message isn't delivered, the main channel will never be closed and we infinite loop - subscription.CloseChannel(); // close "main" - }); - } - }; - - // emulate flags like it's a multiprocess system, these may be alredy set by default - FLAGS_address = "127.0.0.1"; - FLAGS_port = 10000; - - System &system = System::GetInstance(); - Distributed &distributed = Distributed::GetInstance(); - distributed.StartServices(); - - system.Spawn("master"); - system.Spawn("worker"); - - system.AwaitShutdown(); // this must be called before StopServices - distributed.StopServices(); -} - -/** - * Test ReturnAddressMsg functionality. - * - * Data flow: - * (1) Send an empty message from Master to Worker/main - * (2) Send an empty message from Worker to Master/main - */ -TEST(SimpleTests, SendReturnAddressMessage) { - struct Master : public Reactor { - Master(std::string name) : Reactor(name) {} - - virtual void Run() { - Distributed::GetInstance().FindChannel("127.0.0.1", 10000, "worker", "main") - ->OnEventOnce() - .ChainOnce([this](const ChannelResolvedMessage& msg, - const Subscription& sub) { - // send a message that will be returned to "main" - msg.channelWriter()->Send(this->name(), "main"); - // close this anonymous channel - sub.CloseChannel(); - }); - - main_.first->OnEventOnce() - .ChainOnce([this](const Message&, const Subscription& sub) { - // if this message isn't delivered, the main channel will never be closed and we infinite loop - // close the "main" channel - sub.CloseChannel(); - }); - } - }; - - struct Worker : public Reactor { - Worker(std::string name) : Reactor(name) {} - - virtual void Run() { - main_.first->OnEventOnce() - .ChainOnce([this](const ReturnAddressMsg &msg, const Subscription& sub) { - msg.GetReturnChannelWriter()->Send(); - sub.CloseChannel(); // close "main" - }); - } - }; - - // emulate flags like it's a multiprocess system, these may be alredy set by default - FLAGS_address = "127.0.0.1"; - FLAGS_port = 10000; - - System &system = System::GetInstance(); - Distributed &distributed = Distributed::GetInstance(); - distributed.StartServices(); - - system.Spawn("master"); - system.Spawn("worker"); - - system.AwaitShutdown(); // this must be called before StopServices - distributed.StopServices(); -} - -// Apparently templates cannot be declared inside local classes, figure out how to move it in? -// For that reason I obscured the name. -struct SerializableMessage_TextMessage : public ReturnAddressMsg { - SerializableMessage_TextMessage(std::string channel, std::string arg_text, int arg_val) - : ReturnAddressMsg(channel), text(arg_text), val(arg_val) {} - std::string text; - int val; - - template - void serialize(Archive &ar) { - ar(cereal::virtual_base_class(this), text, val); - } - - protected: - friend class cereal::access; - SerializableMessage_TextMessage() {} // Cereal needs access to a default constructor. -}; -CEREAL_REGISTER_TYPE(SerializableMessage_TextMessage); - -/** - * Test serializability of a complex message over the network layer. - * - * Data flow: - * (1) Send ("hi", 123) from Master to Worker/main - * (2) Send ("hi back", 779) from Worker to Master/main - */ -TEST(SimpleTests, SendSerializableMessage) { - struct Master : public Reactor { - Master(std::string name) : Reactor(name) {} - - virtual void Run() { - Distributed::GetInstance().FindChannel("127.0.0.1", 10000, "worker", "main") - ->OnEventOnce() - .ChainOnce([this](const ChannelResolvedMessage& msg, - const Subscription& sub) { - // send a message that will be returned to "main" - msg.channelWriter()->Send("main", "hi", 123); - // close this anonymous channel - sub.CloseChannel(); - }); - - main_.first->OnEventOnce() - .ChainOnce([this](const SerializableMessage_TextMessage& msg, const Subscription& sub) { - ASSERT_EQ(msg.text, "hi back"); - ASSERT_EQ(msg.val, 779); - // if this message isn't delivered, the main channel will never be closed and we infinite loop - // close the "main" channel - sub.CloseChannel(); - }); - } - }; - - struct Worker : public Reactor { - Worker(std::string name) : Reactor(name) {} - - virtual void Run() { - main_.first->OnEventOnce() - .ChainOnce([this](const SerializableMessage_TextMessage &msg, const Subscription& sub) { - ASSERT_EQ(msg.text, "hi"); - ASSERT_EQ(msg.val, 123); - msg.GetReturnChannelWriter()->Send - ("no channel, dont use this", "hi back", 779); - sub.CloseChannel(); // close "main" - }); - } - }; - - // emulate flags like it's a multiprocess system, these may be alredy set by default - FLAGS_address = "127.0.0.1"; - FLAGS_port = 10000; - - System &system = System::GetInstance(); - Distributed &distributed = Distributed::GetInstance(); - distributed.StartServices(); - - system.Spawn("master"); - system.Spawn("worker"); - - system.AwaitShutdown(); // this must be called before StopServices - distributed.StopServices(); -} - - -int main(int argc, char **argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/experimental/distributed/tests/reactors_local_unit.cpp b/experimental/distributed/tests/reactors_local_unit.cpp deleted file mode 100644 index 639698afd..000000000 --- a/experimental/distributed/tests/reactors_local_unit.cpp +++ /dev/null @@ -1,483 +0,0 @@ -#include "reactors_local.hpp" -#include "gtest/gtest.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -TEST(SystemTest, ReturnWithoutThrowing) { - struct Master : public Reactor { - Master(std::string name) : Reactor(name) {} - virtual void Run() { CloseChannel("main"); } - }; - - System &system = System::GetInstance(); - ASSERT_NO_THROW(system.Spawn("master")); - ASSERT_NO_THROW(system.AwaitShutdown()); -} - -TEST(ChannelCreationTest, ThrowOnReusingChannelName) { - struct Master : public Reactor { - Master(std::string name) : Reactor(name) {} - virtual void Run() { - Open("channel"); - ASSERT_THROW(Open("channel"), std::runtime_error); - CloseChannel("main"); - CloseChannel("channel"); - } - }; - - System &system = System::GetInstance(); - system.Spawn("master"); - system.AwaitShutdown(); -} - -TEST(ChannelSetUpTest, CheckMainChannelIsSet) { - struct Master : public Reactor { - Master(std::string name) : Reactor(name) {} - virtual void Run() { - std::shared_ptr channel_writer; - while (!(channel_writer = - System::GetInstance().FindChannel("worker", "main"))) - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - CloseChannel("main"); - } - }; - - struct Worker : public Reactor { - Worker(std::string name) : Reactor(name) {} - virtual void Run() { - std::shared_ptr channel_writer; - while (!(channel_writer = - System::GetInstance().FindChannel("master", "main"))) - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - CloseChannel("main"); - } - }; - - System &system = System::GetInstance(); - system.Spawn("master"); - system.Spawn("worker"); - system.AwaitShutdown(); -} - -TEST(SimpleSendTest, OneCallback) { - struct MessageInt : public Message { - MessageInt(int xx) : x(xx) {} - int x; - }; - - struct Master : public Reactor { - Master(std::string name) : Reactor(name) {} - virtual void Run() { - std::shared_ptr channel_writer; - while (!(channel_writer = - System::GetInstance().FindChannel("worker", "main"))) - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel_writer->Send(888); - CloseChannel("main"); - } - }; - - struct Worker : public Reactor { - Worker(std::string name) : Reactor(name) {} - virtual void Run() { - EventStream *stream = main_.first; - - stream->OnEvent( - [this](const MessageInt &msg, const Subscription &) { - ASSERT_EQ(msg.x, 888); - CloseChannel("main"); - }); - } - }; - - System &system = System::GetInstance(); - system.Spawn("master"); - system.Spawn("worker"); - system.AwaitShutdown(); -} - -TEST(SimpleSendTest, IgnoreAfterClose) { - struct MessageInt : public Message { - MessageInt(int xx) : x(xx) {} - int x; - }; - - struct Master : public Reactor { - Master(std::string name) : Reactor(name) {} - virtual void Run() { - std::shared_ptr channel_writer; - while (!(channel_writer = - System::GetInstance().FindChannel("worker", "main"))) - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel_writer->Send(101); - channel_writer->Send(102); // should be ignored - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel_writer->Send(103); // should be ignored - channel_writer->Send(104); // should be ignored - CloseChannel( - "main"); // Write-end doesn't need to be closed because it's in RAII. - } - }; - - struct Worker : public Reactor { - Worker(std::string name) : Reactor(name) {} - virtual void Run() { - EventStream *stream = main_.first; - - stream->OnEvent( - [this](const MessageInt &msg, const Subscription &) { - CloseChannel("main"); - ASSERT_EQ(msg.x, 101); - }); - } - }; - - System &system = System::GetInstance(); - system.Spawn("master"); - system.Spawn("worker"); - system.AwaitShutdown(); -} - -TEST(SimpleSendTest, DuringFirstEvent) { - struct MessageInt : public Message { - MessageInt(int xx) : x(xx) {} - int x; - }; - - struct Master : public Reactor { - Master(std::string name, std::promise p) - : Reactor(name), p_(std::move(p)) {} - virtual void Run() { - EventStream *stream = main_.first; - - stream->OnEvent( - [this](const Message &msg, const Subscription &subscription) { - const MessageInt &msgint = dynamic_cast(msg); - if (msgint.x == 101) FindChannel("main")->Send(102); - if (msgint.x == 102) { - subscription.Unsubscribe(); - CloseChannel("main"); - p_.set_value(777); - } - }); - - std::shared_ptr channel_writer = FindChannel("main"); - channel_writer->Send(101); - } - std::promise p_; - }; - - System &system = System::GetInstance(); - std::promise p; - auto f = p.get_future(); - system.Spawn("master", std::move(p)); - f.wait(); - ASSERT_EQ(f.get(), 777); - system.AwaitShutdown(); -} - -TEST(MultipleSendTest, UnsubscribeService) { - struct MessageInt : public Message { - MessageInt(int xx) : x(xx) {} - int x; - }; - struct MessageChar : public Message { - MessageChar(char xx) : x(xx) {} - char x; - }; - - struct Master : public Reactor { - Master(std::string name) : Reactor(name) {} - virtual void Run() { - std::shared_ptr channel_writer; - while (!(channel_writer = - System::GetInstance().FindChannel("worker", "main"))) - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel_writer->Send(55); - channel_writer->Send(66); - channel_writer->Send(77); - channel_writer->Send(88); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel_writer->Send('a'); - channel_writer->Send('b'); - channel_writer->Send('c'); - channel_writer->Send('d'); - CloseChannel("main"); - } - }; - - struct Worker : public Reactor { - Worker(std::string name) : Reactor(name) {} - - int num_msgs_received = 0; - - virtual void Run() { - EventStream *stream = main_.first; - - stream->OnEvent( - [this](const MessageInt &msgint, const Subscription &subscription) { - ASSERT_TRUE(msgint.x == 55 || msgint.x == 66); - ++num_msgs_received; - if (msgint.x == 66) { - subscription.Unsubscribe(); // receive only two of them - } - }); - stream->OnEvent( - [this](const MessageChar &msgchar, const Subscription &subscription) { - char c = msgchar.x; - ++num_msgs_received; - ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c'); - if (num_msgs_received == 5) { - subscription.Unsubscribe(); - CloseChannel("main"); - } - }); - } - }; - - System &system = System::GetInstance(); - system.Spawn("master"); - system.Spawn("worker"); - system.AwaitShutdown(); -} - -TEST(MultipleSendTest, OnEvent) { - struct MessageInt : public Message { - MessageInt(int xx) : x(xx) {} - int x; - }; - struct MessageChar : public Message { - MessageChar(char xx) : x(xx) {} - char x; - }; - - struct Master : public Reactor { - Master(std::string name) : Reactor(name) {} - virtual void Run() { - std::shared_ptr channel_writer; - while (!(channel_writer = - System::GetInstance().FindChannel("worker", "main"))) - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - - channel_writer->Send(101); - channel_writer->Send('a'); - channel_writer->Send(103); - channel_writer->Send('b'); - CloseChannel("main"); - } - }; - - struct Worker : public Reactor { - Worker(std::string name) : Reactor(name) {} - - struct EndMessage : Message {}; - int correct_vals = 0; - - virtual void Run() { - EventStream *stream = main_.first; - correct_vals = 0; - - stream->OnEvent( - [this](const MessageInt &msgint, const Subscription &) { - ASSERT_TRUE(msgint.x == 101 || msgint.x == 103); - ++correct_vals; - main_.second->Send(); - }); - - stream->OnEvent( - [this](const MessageChar &msgchar, const Subscription &) { - ASSERT_TRUE(msgchar.x == 'a' || msgchar.x == 'b'); - ++correct_vals; - main_.second->Send(); - }); - - stream->OnEvent( - [this](const EndMessage &, const Subscription &) { - ASSERT_LE(correct_vals, 4); - if (correct_vals == 4) { - CloseChannel("main"); - } - }); - } - }; - - System &system = System::GetInstance(); - system.Spawn("master"); - system.Spawn("worker"); - system.AwaitShutdown(); -} - -TEST(MultipleSendTest, Chaining) { - struct MessageInt : public Message { - MessageInt(int xx) : x(xx) {} - int x; - }; - - struct Master : public Reactor { - Master(std::string name) : Reactor(name) {} - virtual void Run() { - std::shared_ptr channel_writer; - while (!(channel_writer = - System::GetInstance().FindChannel("worker", "main"))) - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel_writer->Send(55); - channel_writer->Send(66); - channel_writer->Send(77); - CloseChannel("main"); - } - }; - - struct Worker : public Reactor { - Worker(std::string name) : Reactor(name) {} - - virtual void Run() { - EventStream *stream = main_.first; - - stream->OnEventOnce() - .ChainOnce( - [this](const MessageInt &msg, const Subscription &) { - ASSERT_EQ(msg.x, 55); - }) - .ChainOnce( - [](const MessageInt &msg, const Subscription &) { - ASSERT_EQ(msg.x, 66); - }) - .ChainOnce( - [this](const MessageInt &msg, const Subscription &) { - ASSERT_EQ(msg.x, 77); - CloseChannel("main"); - }); - } - }; - - System &system = System::GetInstance(); - system.Spawn("master"); - system.Spawn("worker"); - system.AwaitShutdown(); -} - -TEST(MultipleSendTest, ChainingInRightOrder) { - struct MessageInt : public Message { - MessageInt(int xx) : x(xx) {} - int x; - }; - - struct MessageChar : public Message { - MessageChar(char xx) : x(xx) {} - char x; - }; - - struct Master : public Reactor { - Master(std::string name) : Reactor(name) {} - virtual void Run() { - std::shared_ptr channel_writer; - while (!(channel_writer = - System::GetInstance().FindChannel("worker", "main"))) - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel_writer->Send('a'); - channel_writer->Send(55); - channel_writer->Send('b'); - channel_writer->Send(77); - CloseChannel("main"); - } - }; - - struct Worker : public Reactor { - Worker(std::string name) : Reactor(name) {} - - virtual void Run() { - EventStream *stream = main_.first; - - stream->OnEventOnce() - .ChainOnce( - [this](const MessageInt &msg, const Subscription &) { - ASSERT_EQ(msg.x, 55); - }) - .ChainOnce( - [](const MessageChar &msg, const Subscription &) { - ASSERT_EQ(msg.x, 'b'); - }) - .ChainOnce( - [this](const MessageInt &msg, const Subscription &) { - ASSERT_EQ(msg.x, 77); - CloseChannel("main"); - }); - } - }; - - System &system = System::GetInstance(); - system.Spawn("master"); - system.Spawn("worker"); - system.AwaitShutdown(); -} - -TEST(MultipleSendTest, ProcessManyMessages) { - const static int num_tests = 100; - - struct MessageInt : public Message { - MessageInt(int xx) : x(xx) {} - int x; - }; - - struct Master : public Reactor { - Master(std::string name) : Reactor(name) {} - virtual void Run() { - std::shared_ptr channel_writer; - while (!(channel_writer = - System::GetInstance().FindChannel("worker", "main"))) - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - - std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100)); - for (int i = 0; i < num_tests; ++i) { - channel_writer->Send(rand()); - std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 5)); - } - CloseChannel("main"); - } - }; - - struct Worker : public Reactor { - Worker(std::string name) : Reactor(name) {} - - struct EndMessage : Message {}; - int vals = 0; - - virtual void Run() { - EventStream *stream = main_.first; - vals = 0; - - stream->OnEvent( - [this](const Message &, const Subscription &) { - ++vals; - main_.second->Send(); - }); - - stream->OnEvent( - [this](const Message &, const Subscription &) { - ASSERT_LE(vals, num_tests); - if (vals == num_tests) { - CloseChannel("main"); - } - }); - } - }; - - System &system = System::GetInstance(); - system.Spawn("master"); - system.Spawn("worker"); - system.AwaitShutdown(); -} - -int main(int argc, char **argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt index 2d29c6a62..04d1a69ad 100644 --- a/libs/CMakeLists.txt +++ b/libs/CMakeLists.txt @@ -163,3 +163,6 @@ import_header_library(cppitertools ${CMAKE_CURRENT_SOURCE_DIR}) # Setup json import_header_library(json ${CMAKE_CURRENT_SOURCE_DIR}) + +# Setup cereal +import_header_library(cereal "${CMAKE_CURRENT_SOURCE_DIR}/cereal/include") diff --git a/libs/setup.sh b/libs/setup.sh index aaf8b9811..5c62cd20b 100755 --- a/libs/setup.sh +++ b/libs/setup.sh @@ -10,9 +10,9 @@ cd ${working_dir} # antlr antlr_generator_filename="antlr-4.6-complete.jar" -#wget -O ${antlr_generator_filename} http://www.antlr.org/download/${antlr_generator_filename} +# wget -O ${antlr_generator_filename} http://www.antlr.org/download/${antlr_generator_filename} wget -nv -O ${antlr_generator_filename} http://deps.memgraph.io/${antlr_generator_filename} -#git clone https://github.com/antlr/antlr4.git +# git clone https://github.com/antlr/antlr4.git git clone git://deps.memgraph.io/antlr4.git antlr4_tag="aacd2a2c95816d8dc1c05814051d631bfec4cf3e" # v4.6 cd antlr4 @@ -23,7 +23,7 @@ cd .. # Use our fork that uses experimental/optional instead of unique_ptr in # DerefHolder. Once we move memgraph to c++17 we can use cpp17 branch from # original repo. -#git clone https://github.com/memgraph/cppitertools.git +# git clone https://github.com/memgraph/cppitertools.git git clone git://deps.memgraph.io/cppitertools.git cd cppitertools cppitertools_tag="4231e0bc6fba2737b2a7a8a1576cf06186b0de6a" # experimental_optional 17 Aug 2017 @@ -31,7 +31,7 @@ git checkout ${cppitertools_tag} cd .. # fmt -#git clone https://github.com/fmtlib/fmt.git +# git clone https://github.com/fmtlib/fmt.git git clone git://deps.memgraph.io/fmt.git fmt_tag="7fa8f8fa48b0903deab5bb42e6760477173ac485" # v3.0.1 # Commit which fixes an issue when compiling with C++14 and higher. @@ -42,7 +42,7 @@ git cherry-pick -n ${fmt_cxx14_fix} cd .. # rapidcheck -#git clone https://github.com/emil-e/rapidcheck.git +# git clone https://github.com/emil-e/rapidcheck.git git clone git://deps.memgraph.io/rapidcheck.git rapidcheck_tag="853e14f0f4313a9eb3c71e24848373e7b843dfd1" # Jun 23, 2017 cd rapidcheck @@ -50,7 +50,7 @@ git checkout ${rapidcheck_tag} cd .. # google benchmark -#git clone https://github.com/google/benchmark.git +# git clone https://github.com/google/benchmark.git git clone git://deps.memgraph.io/benchmark.git benchmark_tag="4f8bfeae470950ef005327973f15b0044eceaceb" # v1.1.0 cd benchmark @@ -58,7 +58,7 @@ git checkout ${benchmark_tag} cd .. # google test -#git clone https://github.com/google/googletest.git +# git clone https://github.com/google/googletest.git git clone git://deps.memgraph.io/googletest.git googletest_tag="ec44c6c1675c25b9827aacd08c02433cccde7780" # v1.8.0 cd googletest @@ -66,7 +66,7 @@ git checkout ${googletest_tag} cd .. # google logging -#git clone https://github.com/memgraph/glog.git +# git clone https://github.com/memgraph/glog.git git clone git://deps.memgraph.io/glog.git glog_tag="a6ee5ef590190cdb9f69cccc2db99dc5994b2f92" # custom version (v0.3.5+) cd glog @@ -74,7 +74,7 @@ git checkout ${glog_tag} cd .. # lcov-to-coberatura-xml -#git clone https://github.com/eriwen/lcov-to-cobertura-xml.git +# git clone https://github.com/eriwen/lcov-to-cobertura-xml.git git clone git://deps.memgraph.io/lcov-to-cobertura-xml.git lcov_to_xml_tag="59584761cb5da4687693faec05bf3e2b74e9dde9" # Dec 6, 2016 cd lcov-to-cobertura-xml @@ -82,7 +82,7 @@ git checkout ${lcov_to_xml_tag} cd .. # google flags -#git clone https://github.com/memgraph/gflags.git +# git clone https://github.com/memgraph/gflags.git git clone git://deps.memgraph.io/gflags.git gflags_tag="b37ceb03a0e56c9f15ce80409438a555f8a67b7c" # custom version (May 6, 2017) cd gflags @@ -106,14 +106,19 @@ rm postgres.tar.gz # We use head on Sep 1, 2017 instead of last release since it was long time ago. mkdir json cd json -#wget "https://raw.githubusercontent.com/nlohmann/json/91e003285312167ad8365f387438ea371b465a7e/src/json.hpp" +# wget "https://raw.githubusercontent.com/nlohmann/json/91e003285312167ad8365f387438ea371b465a7e/src/json.hpp" wget -nv http://deps.memgraph.io/json.hpp cd .. -#ltalloc -#git clone https://github.com/r-lyeh/ltalloc.git +# ltalloc +# git clone https://github.com/r-lyeh/ltalloc.git git clone git://deps.memgraph.io/ltalloc.git ltalloc_tag="aefde2afa5cd49c9d1a797aa08ec08b2bec13a36" # Sep 15, 2017 cd ltalloc git checkout ${ltalloc_tag} + +# cereal +git clone https://github.com/USCiLab/cereal.git +cd cereal +git checkout v1.2.2 cd .. diff --git a/experimental/distributed/src/protocol.cpp b/src/communication/reactor/protocol.cpp similarity index 92% rename from experimental/distributed/src/protocol.cpp rename to src/communication/reactor/protocol.cpp index fd2fb3d28..b68523ae3 100644 --- a/experimental/distributed/src/protocol.cpp +++ b/src/communication/reactor/protocol.cpp @@ -5,10 +5,9 @@ #include "glog/logging.h" -namespace Protocol { +namespace protocol { -Session::Session(Socket &&socket, Data &) - : socket_(std::move(socket)) { +Session::Session(Socket &&socket, Data &) : socket_(std::move(socket)) { event_.data.ptr = this; } @@ -22,9 +21,12 @@ std::string Session::GetStringAndShift(SizeT len) { void Session::Execute() { if (!handshake_done_) { - // Note: this function can be multiple times before the buffer has the full packet. - // We currently have to check for this case and return without shifting the buffer. - // In other words, only shift anything from the buffer if you can read the entire (sub)message. + // Note: this function can be multiple times before the buffer has the full + // packet. + // We currently have to check for this case and return without shifting + // the buffer. + // In other words, only shift anything from the buffer if you can read the + // entire (sub)message. if (buffer_.size() < 2 * sizeof(SizeT)) return; SizeT len_reactor = GetLength(); @@ -56,7 +58,7 @@ void Session::Execute() { // TODO: check for exceptions std::istringstream stream; - stream.str(std::string(reinterpret_cast(buffer_.data()), len_data)); + stream.str(std::string(reinterpret_cast(buffer_.data()), len_data)); cereal::BinaryInputArchive iarchive{stream}; std::unique_ptr message{nullptr}; iarchive(message); @@ -157,7 +159,7 @@ bool SendMessage(std::string address, uint16_t port, std::string reactor, LOG(INFO) << "Couldn't send message size!"; return false; } - if (!socket.Write(buffer.data(), buffer.size())) { + if (!socket.Write(buffer)) { LOG(INFO) << "Couldn't send message data!"; return false; } diff --git a/experimental/distributed/src/protocol.hpp b/src/communication/reactor/protocol.hpp similarity index 95% rename from experimental/distributed/src/protocol.hpp rename to src/communication/reactor/protocol.hpp index c56964dc7..773acb238 100644 --- a/experimental/distributed/src/protocol.hpp +++ b/src/communication/reactor/protocol.hpp @@ -1,5 +1,7 @@ #pragma once +#include + #include "communication/bolt/v1/decoder/buffer.hpp" #include "io/network/epoll.hpp" #include "io/network/network_endpoint.hpp" @@ -40,7 +42,8 @@ class Message; * Currently the server is implemented to handle more than one message after * the initial handshake, but the client can only send one message. */ -namespace Protocol { +namespace protocol { + using Endpoint = io::network::NetworkEndpoint; using Socket = io::network::Socket; using StreamBuffer = io::network::StreamBuffer; @@ -100,6 +103,8 @@ class Session { */ void Written(size_t len); + bool TimedOut() { return false; } + /** * Closes the session (client socket). */ @@ -108,6 +113,8 @@ class Session { io::network::Epoll::Event event_; Socket socket_; + std::chrono::time_point last_event_time_; + private: SizeT GetLength(int offset = 0); std::string GetStringAndShift(SizeT len); @@ -115,6 +122,7 @@ class Session { bool alive_{true}; bool handshake_done_{false}; + std::string reactor_{""}; std::string channel_{""}; diff --git a/src/communication/reactor/reactor_local.cpp b/src/communication/reactor/reactor_local.cpp new file mode 100644 index 000000000..7d5649e14 --- /dev/null +++ b/src/communication/reactor/reactor_local.cpp @@ -0,0 +1,141 @@ +#include "communication/reactor/reactor_local.hpp" + +#include "utils/exceptions.hpp" + +namespace communication::reactor { + +thread_local Reactor *current_reactor_ = nullptr; + +void EventStream::Subscription::Unsubscribe() const { + event_queue_.RemoveCallback(*this); +} + +void EventStream::Subscription::CloseChannel() const { event_queue_.Close(); } + +const std::string &EventStream::Subscription::channel_name() const { + return event_queue_.channel_name_; +} + +std::string Channel::LocalChannelWriter::ReactorName() const { + return reactor_name_; +} + +std::string Channel::LocalChannelWriter::Name() const { return channel_name_; } + +void Channel::Close() { + // TODO(zuza): there will be major problems if a reactor tries to close a + // stream that isn't theirs luckily this should never happen if the framework + // is used as expected. + current_reactor_->CloseChannel(channel_name_); +} + +std::pair> Reactor::Open( + const std::string &channel_name) { + std::unique_lock lock(*mutex_); + if (channels_.count(channel_name) != 0) { + throw utils::BasicException("Channel with name " + channel_name + + "already exists"); + } + auto it = + channels_ + .emplace(channel_name, std::make_shared(Channel::Params{ + name_, channel_name, mutex_, cvar_})) + .first; + it->second->self_ptr_ = it->second; + return make_pair(&it->second->stream_, it->second->LockedOpenChannel()); +} + +std::pair> Reactor::Open() { + std::unique_lock lock(*mutex_); + do { + std::string channel_name = + "stream-" + std::to_string(channel_name_counter_++); + if (channels_.count(channel_name) == 0) { + auto it = + channels_ + .emplace(channel_name, std::make_shared(Channel::Params{ + name_, channel_name, mutex_, cvar_})) + .first; + it->second->self_ptr_ = it->second; + return make_pair(&it->second->stream_, it->second->LockedOpenChannel()); + } + } while (true); +} + +std::shared_ptr Reactor::FindChannel( + const std::string &channel_name) { + std::unique_lock lock(*mutex_); + auto it_channel = channels_.find(channel_name); + if (it_channel == channels_.end()) return nullptr; + return it_channel->second->LockedOpenChannel(); +} + +void Reactor::CloseChannel(const std::string &s) { + std::unique_lock lock(*mutex_); + auto it = channels_.find(s); + CHECK(it != channels_.end()) << "Trying to close nonexisting channel"; + channels_.erase(it); + cvar_->notify_all(); +} + +void Reactor::RunEventLoop() { + bool exit_event_loop = false; + + while (true) { + // Find (or wait) for the next Message. + PendingMessageInfo info; + { + std::unique_lock guard(*mutex_); + + while (true) { + // Not fair because was taken earlier, talk to lion. + info = GetPendingMessages(); + if (info.message != nullptr) break; + + // Exit the loop if there are no more Channels. + if (channels_.empty()) { + exit_event_loop = true; + break; + } + + cvar_->wait(guard); + } + + if (exit_event_loop) break; + } + + for (auto &callback_info : info.callbacks) { + callback_info.first(*info.message, callback_info.second); + } + } +} + +/** + * Checks if there is any nonempty EventStream. + */ +Reactor::PendingMessageInfo Reactor::GetPendingMessages() { + for (auto &channels_key_value : channels_) { + Channel &event_queue = *channels_key_value.second; + auto message = event_queue.LockedPop(); + if (message == nullptr) continue; + std::type_index type_index = message->GetTypeIndex(); + + using Subscription = EventStream::Subscription; + std::vector> callback_info; + auto msg_type_cb_iter = event_queue.callbacks_.find(type_index); + if (msg_type_cb_iter != event_queue.callbacks_.end()) { + // There is a callback for this type. + for (auto &type_index_cb_key_value : msg_type_cb_iter->second) { + auto uid = type_index_cb_key_value.first; + auto callback = type_index_cb_key_value.second; + callback_info.emplace_back(callback, + Subscription(event_queue, type_index, uid)); + } + } + + return PendingMessageInfo{std::move(message), std::move(callback_info)}; + } + + return PendingMessageInfo{}; +} +} diff --git a/src/communication/reactor/reactor_local.hpp b/src/communication/reactor/reactor_local.hpp new file mode 100644 index 000000000..d96101cba --- /dev/null +++ b/src/communication/reactor/reactor_local.hpp @@ -0,0 +1,553 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "cereal/types/memory.hpp" +#include "glog/logging.h" + +namespace communication::reactor { + +class EventStream; +class Reactor; +class System; +class Channel; + +extern thread_local Reactor *current_reactor_; + +/** + * Base class for messages. + */ +class Message { + public: + virtual ~Message() {} + + template + void serialize(Archive &) {} + + /** + * Run-time type identification that is used for callbacks. + * + * Warning: this works because of the virtual destructor, don't remove it from + * this class + */ + std::type_index GetTypeIndex() { return typeid(*this); } +}; + +/** + * Write-end of a Channel (between two reactors). + */ +class ChannelWriter { + public: + ChannelWriter() = default; + ChannelWriter(const ChannelWriter &) = delete; + void operator=(const ChannelWriter &) = delete; + ChannelWriter(ChannelWriter &&) = delete; + void operator=(ChannelWriter &&) = delete; + + /** + * Construct and send the message to the channel. + */ + template + void Send(Args &&... args) { + Send(std::unique_ptr( + std::make_unique(std::forward(args)...))); + } + + virtual void Send(std::unique_ptr message) = 0; + + virtual std::string ReactorName() const = 0; + virtual std::string Name() const = 0; + + template + void serialize(Archive &archive) { + archive(ReactorName(), Name()); + } +}; + +/** + * Read-end of a Channel (between two reactors). + */ +class EventStream { + public: + class OnEventOnceChainer; + class Subscription; + + /** + * Register a callback that will be called whenever an event arrives. + */ + template + void OnEvent( + std::function &&callback) { + OnEventHelper(typeid(TMessage), [callback = std::move(callback)]( + const Message &base_message, + const Subscription &subscription) { + const auto &message = dynamic_cast(base_message); + callback(message, subscription); + }); + } + + /** + * Register a callback that will be called only once. + * Once event is received, channel of this EventStream is closed. + */ + template + void OnEventOnceThenClose(std::function &&callback) { + OnEventHelper(typeid(TMessage), [callback = std::move(callback)]( + const Message &base_message, + const Subscription &subscription) { + const TMessage &message = dynamic_cast(base_message); + subscription.CloseChannel(); + callback(message); + }); + } + + /** + * Starts a chain to register a callback that fires off only once. + * + * This method supports chaining (see the the class OnEventOnceChainer or the + * tests for examples). + * Warning: when chaining callbacks, make sure that EventStream does not + * deallocate before the last + * chained callback fired. + */ + OnEventOnceChainer OnEventOnce() { return OnEventOnceChainer(*this); } + + /** + * Get the name of the channel. + */ + virtual const std::string &ChannelName() = 0; + + /** + * Subscription Service. + * + * Unsubscribe from a callback. Lightweight object (can copy by value). + */ + class Subscription { + public: + /** + * Unsubscribe. Call only once. + */ + void Unsubscribe() const; + + /** + * Close the stream. Convenience method. + */ + void CloseChannel() const; + + /** + * Get the name of the channel the message is delivered to. + */ + const std::string &channel_name() const; + + private: + friend class Reactor; + friend class Channel; + + Subscription(Channel &event_queue, std::type_index type_index, + uint64_t callback_id) + : event_queue_(event_queue), + type_index_(type_index), + callback_id_(callback_id) {} + + Channel &event_queue_; + std::type_index type_index_; + uint64_t callback_id_; + }; + + /** + * Close this event stream, disallowing further events from getting received. + * + * Any subsequent call after Close() to any function will be result in + * undefined + * behavior (invalid pointer dereference). Can only be called from the thread + * associated with the Reactor. + */ + virtual void Close() = 0; + + /** + * Convenience class to chain one-off callbacks. + * + * Usage: Create this class with OnEventOnce() and then chain callbacks using + * ChainOnce. + * A callback will fire only once, unsubscribe and immediately subscribe the + * next callback to the stream. + * + * Example: stream->OnEventOnce().ChainOnce(firstCb).ChainOnce(secondCb); + * + * Implementation: This class is a temporary object that remembers the + * callbacks that are to be installed + * and finally installs them in the destructor. Not sure is this kosher, is + * there another way? + */ + class OnEventOnceChainer { + public: + OnEventOnceChainer(EventStream &event_stream) + : event_stream_(event_stream) {} + ~OnEventOnceChainer() { InstallCallbacks(); } + + template + OnEventOnceChainer &ChainOnce( + std::function + &&callback) { + std::function + wrap = [callback = std::move(callback)]( + const Message &base_message, const Subscription &subscription) { + const TMessage &message = dynamic_cast(base_message); + subscription.Unsubscribe(); + // Warning: this can close the Channel, be careful what you put after + // it! + callback(message, subscription); + }; + callbacks_.emplace_back(typeid(TMessage), std::move(wrap)); + return *this; + } + + private: + void InstallCallbacks() { + int num_callbacks = callbacks_.size(); + CHECK(num_callbacks > 0) << "No callback will be installed"; + std::function next_callback; + std::type_index next_type = typeid(nullptr); + + for (int i = num_callbacks - 1; i >= 0; --i) { + std::function + tmp_callback = [ + callback = std::move(callbacks_[i].second), next_type, + next_callback = std::move(next_callback), + event_stream = &this->event_stream_ + ](const Message &message, const Subscription &subscription) { + callback(message, subscription); + if (next_callback) { + event_stream->OnEventHelper(next_type, std::move(next_callback)); + } + }; + next_callback = std::move(tmp_callback); + next_type = callbacks_[i].first; + } + + event_stream_.OnEventHelper(next_type, std::move(next_callback)); + } + + EventStream &event_stream_; + std::vector< + std::pair>> + callbacks_; + }; + + typedef std::function Callback; + + private: + virtual void OnEventHelper(std::type_index type_index, Callback callback) = 0; +}; + +/** + * Implementation of a channel. + * + * This class is an internal data structure that represents the state of the + * channel. This class is not meant to be used by the clients of the messaging + * framework. The Channel class wraps the event queue data structure, the mutex + * that protects concurrent access to the event queue, the local channel and the + * event stream. The class is owned by the Reactor. It gets closed when the + * owner reactor (the one that owns the read-end of a channel) removes/closes + * it. + */ +class Channel { + struct Params; + + public: + friend class Reactor; // to create a Params initialization object + friend class EventStream::Subscription; + + Channel(Params params) + : channel_name_(params.channel_name), + reactor_name_(params.reactor_name), + mutex_(params.mutex), + cvar_(params.cvar), + stream_(mutex_, this) {} + + /** + * LocalChannelWriter represents the channels to reactors living in the same + * reactor system (write-end of the channels). + * + * Sending messages to the local channel requires acquiring the mutex. + * LocalChannelWriter holds a (weak) pointer to the enclosing Channel object. + * Messages sent to a closed channel are ignored. + * There can be multiple LocalChannelWriters refering to the same stream if + * needed. + */ + class LocalChannelWriter : public ChannelWriter { + public: + friend class Channel; + + LocalChannelWriter(std::string reactor_name, std::string channel_name, + std::weak_ptr queue) + : reactor_name_(reactor_name), + channel_name_(channel_name), + queue_(queue) {} + + void Send(std::unique_ptr m) override { + // Atomic, per the standard. We guarantee here that if channel exists it + // will not be destroyed by the end of this function. + std::shared_ptr queue = queue_.lock(); + if (queue) { + queue->Push(std::move(m)); + } + // TODO: what should we do here? Channel doesn't exist so message will be + // lost. + } + + std::string ReactorName() const override; + std::string Name() const override; + + private: + std::string reactor_name_; + std::string channel_name_; + std::weak_ptr queue_; + }; + + /** + * Implementation of the event stream. + * + * After the enclosing Channel object is destroyed (by a call to CloseChannel + * or Close). + */ + class LocalEventStream : public EventStream { + public: + friend class Channel; + + LocalEventStream(std::shared_ptr mutex, Channel *queue) + : mutex_(mutex), queue_(queue) {} + + void OnEventHelper(std::type_index type_index, Callback callback) { + std::unique_lock lock(*mutex_); + queue_->LockedOnEventHelper(type_index, callback); + } + + const std::string &ChannelName() { return queue_->channel_name_; } + + void Close() { queue_->Close(); } + + private: + std::shared_ptr mutex_; + std::string channel_name_; + Channel *queue_; + }; + + /** + * Close the channel. Must be called from the reactor that owns the channel. + */ + void Close(); + + Channel(const Channel &other) = delete; + Channel(Channel &&other) = default; + Channel &operator=(const Channel &other) = delete; + Channel &operator=(Channel &&other) = default; + + private: + /** + * Initialization parameters to Channel. + * Warning: do not forget to initialize self_ptr_ individually. Private + * because it shouldn't be created outside of a Reactor. + */ + struct Params { + std::string reactor_name; + std::string channel_name; + std::shared_ptr mutex; + std::shared_ptr cvar; + }; + + void Push(std::unique_ptr m) { + std::unique_lock guard(*mutex_); + queue_.emplace(std::move(m)); + // This is OK because there is only one Reactor (thread) that can wait on + // this Channel. + cvar_->notify_one(); + } + + std::shared_ptr LockedOpenChannel() { + // TODO(zuza): fix this CHECK using this answer + // https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned + // TODO: figure out zuza's TODO. Does that mean this CHECK is kind of flaky + // or that it doesn't fail sometimes, when it should. + CHECK(!self_ptr_.expired()); + return std::make_shared(reactor_name_, channel_name_, + self_ptr_); + } + + std::unique_ptr LockedPop() { return LockedRawPop(); } + + void LockedOnEventHelper(std::type_index type_index, + EventStream::Callback callback) { + uint64_t callback_id = next_callback_id++; + callbacks_[type_index][callback_id] = callback; + } + + std::unique_ptr LockedRawPop() { + if (queue_.empty()) return nullptr; + std::unique_ptr t = std::move(queue_.front()); + queue_.pop(); + return t; + } + + void RemoveCallback(const EventStream::Subscription &subscription) { + std::unique_lock lock(*mutex_); + auto num_erased = + callbacks_[subscription.type_index_].erase(subscription.callback_id_); + CHECK(num_erased == 1) << "Expected to remove 1 element"; + } + + std::string channel_name_; + std::string reactor_name_; + std::queue> queue_; + // Should only be locked once since it's used by a cond. var. Also caught in + // dctor, so must be recursive. + std::shared_ptr mutex_; + std::shared_ptr cvar_; + /** + * A weak_ptr to itself. + * + * There are initialization problems with this, check Params. + */ + std::weak_ptr self_ptr_; + LocalEventStream stream_; + std::unordered_map> + callbacks_; + uint64_t next_callback_id = 0; +}; + +/** + * A single unit of concurrent execution in the system. + * + * E.g. one worker, one client. Owned by System. Has a thread associated with + * it. + */ +class Reactor { + friend class System; + + Reactor(System &system, std::string name, + std::function setup) + : system_(system), name_(name), setup_(setup), main_(Open("main")) {} + + public: + ~Reactor() {} + + std::pair> Open( + const std::string &s); + std::pair> Open(); + std::shared_ptr FindChannel(const std::string &channel_name); + + /** + * Close a channel by name. + * + * Should only be called from the Reactor thread. + */ + void CloseChannel(const std::string &s); + + /** + * Get Reactor name + */ + const std::string &name() const { return name_; } + + Reactor(const Reactor &other) = delete; + Reactor(Reactor &&other) = default; + Reactor &operator=(const Reactor &other) = delete; + Reactor &operator=(Reactor &&other) = default; + + System &system_; + std::string name_; + std::function setup_; + + /* + * Locks all Reactor data, including all Channel's in channels_. + * + * This should be a shared_ptr because LocalChannelWriter can outlive Reactor. + */ + std::shared_ptr mutex_ = std::make_shared(); + std::shared_ptr cvar_ = + std::make_shared(); + + /** + * List of channels of a reactor indexed by name. + */ + std::unordered_map> channels_; + int64_t channel_name_counter_ = 0; + std::pair> main_; + + private: + struct PendingMessageInfo { + std::unique_ptr message; + std::vector> + callbacks; + }; + + /** + * Dispatches all waiting messages to callbacks. Shuts down when there are no + * callbacks left. + */ + void RunEventLoop(); + + PendingMessageInfo GetPendingMessages(); +}; + +/** + * Placeholder for all reactors. + * Make sure object of this class outlives all Reactors created by it. + */ +class System { + public: + friend class Reactor; + System() = default; + + void Spawn(const std::string &name, std::function setup) { + std::unique_lock lock(mutex_); + std::unique_ptr reactor(new Reactor(*this, name, setup)); + std::thread reactor_thread([ this, raw_reactor = reactor.get() ] { + current_reactor_ = raw_reactor; + raw_reactor->setup_(*raw_reactor); + raw_reactor->RunEventLoop(); + }); + auto got = reactors_.emplace( + name, std::pair{ + std::move(reactor), std::move(reactor_thread)}); + CHECK(got.second) << "Reactor with name: '" << name << "' already exists"; + } + + const std::shared_ptr FindChannel( + const std::string &reactor_name, const std::string &channel_name) { + std::unique_lock lock(mutex_); + auto it_reactor = reactors_.find(reactor_name); + if (it_reactor == reactors_.end()) return nullptr; + return it_reactor->second.first->FindChannel(channel_name); + } + + // TODO: Think about interaction with destructor. Should we call this in + // destructor, complain in destructor if there are alive threads or stop them + // in some way. + void AwaitShutdown() { + for (auto &key_value : reactors_) { + auto &thread = key_value.second.second; + thread.join(); + } + reactors_.clear(); + } + + private: + System(const System &) = delete; + System(System &&) = delete; + System &operator=(const System &) = delete; + System &operator=(System &&) = delete; + + std::mutex mutex_; + std::unordered_map, std::thread>> + reactors_; +}; +} diff --git a/src/io/network/socket.cpp b/src/io/network/socket.cpp index d15e80014..a90eaeac8 100644 --- a/src/io/network/socket.cpp +++ b/src/io/network/socket.cpp @@ -220,6 +220,12 @@ bool Socket::Write(const uint8_t *data, size_t len, return true; } +bool Socket::Write(const std::string &s, + const std::function &keep_retrying) { + return Write(reinterpret_cast(s.data()), s.size(), + keep_retrying); +} + int Socket::Read(void *buffer, size_t len) { return read(socket_, buffer, len); } diff --git a/src/io/network/socket.hpp b/src/io/network/socket.hpp index 8a19828d2..6158da3fd 100644 --- a/src/io/network/socket.hpp +++ b/src/io/network/socket.hpp @@ -151,6 +151,8 @@ class Socket { */ bool Write(const uint8_t *data, size_t len, const std::function &keep_retrying = [] { return false; }); + bool Write(const std::string &s, + const std::function &keep_retrying = [] { return false; }); /** * Read data from the socket. diff --git a/tests/unit/reactor_local.cpp b/tests/unit/reactor_local.cpp new file mode 100644 index 000000000..828c0ed46 --- /dev/null +++ b/tests/unit/reactor_local.cpp @@ -0,0 +1,385 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "communication/reactor/reactor_local.hpp" +#include "gtest/gtest.h" +#include "utils/exceptions.hpp" + +using namespace communication::reactor; +using Subscription = EventStream::Subscription; + +TEST(SystemTest, ReturnWithoutThrowing) { + System system; + ASSERT_NO_THROW( + system.Spawn("master", [](Reactor &r) { r.CloseChannel("main"); })); + ASSERT_NO_THROW(system.AwaitShutdown()); +} + +TEST(ChannelCreationTest, ThrowOnReusingChannelName) { + System system; + system.Spawn("master", [](Reactor &r) { + r.Open("channel"); + ASSERT_THROW(r.Open("channel"), utils::BasicException); + r.CloseChannel("main"); + r.CloseChannel("channel"); + }); + system.AwaitShutdown(); +} + +TEST(ChannelSetUpTest, CheckMainChannelIsSet) { + System system; + + system.Spawn("master", [](Reactor &r) { + std::shared_ptr channel_writer; + while (!(channel_writer = r.system_.FindChannel("worker", "main"))) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + r.CloseChannel("main"); + }); + + system.Spawn("worker", [](Reactor &r) { + std::shared_ptr channel_writer; + while (!(channel_writer = r.system_.FindChannel("master", "main"))) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + r.CloseChannel("main"); + }); + + system.AwaitShutdown(); +} + +TEST(SimpleSendTest, OneCallback) { + struct MessageInt : public Message { + MessageInt(int xx) : x(xx) {} + int x; + }; + + System system; + system.Spawn("master", [](Reactor &r) { + std::shared_ptr channel_writer; + while (!(channel_writer = r.system_.FindChannel("worker", "main"))) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + channel_writer->Send(888); + r.CloseChannel("main"); + }); + + system.Spawn("worker", [](Reactor &r) { + EventStream *stream = r.main_.first; + + stream->OnEvent( + [&r](const MessageInt &msg, const Subscription &) { + ASSERT_EQ(msg.x, 888); + r.CloseChannel("main"); + }); + }); + + system.AwaitShutdown(); +} + +TEST(SimpleSendTest, IgnoreAfterClose) { + struct MessageInt : public Message { + MessageInt(int xx) : x(xx) {} + int x; + }; + + System system; + + system.Spawn("master", [](Reactor &r) { + std::shared_ptr channel_writer; + while (!(channel_writer = r.system_.FindChannel("worker", "main"))) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + channel_writer->Send(101); + channel_writer->Send(102); // should be ignored + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + channel_writer->Send(103); // should be ignored + channel_writer->Send(104); // should be ignored + // Write-end doesn't need to be closed because it's in RAII. + r.CloseChannel("main"); + }); + + system.Spawn("worker", [](Reactor &r) { + EventStream *stream = r.main_.first; + stream->OnEvent( + [&r](const MessageInt &msg, const Subscription &) { + r.CloseChannel("main"); + ASSERT_EQ(msg.x, 101); + }); + }); + + system.AwaitShutdown(); +} + +TEST(SimpleSendTest, DuringFirstEvent) { + struct MessageInt : public Message { + MessageInt(int xx) : x(xx) {} + int x; + }; + + System system; + + std::promise p; + auto f = p.get_future(); + system.Spawn("master", [&p](Reactor &r) mutable { + EventStream *stream = r.main_.first; + + stream->OnEvent( + [&](const Message &msg, const Subscription &subscription) { + const MessageInt &msgint = dynamic_cast(msg); + if (msgint.x == 101) r.FindChannel("main")->Send(102); + if (msgint.x == 102) { + subscription.Unsubscribe(); + r.CloseChannel("main"); + p.set_value(777); + } + }); + + std::shared_ptr channel_writer = r.FindChannel("main"); + channel_writer->Send(101); + }); + + f.wait(); + ASSERT_EQ(f.get(), 777); + system.AwaitShutdown(); +} + +TEST(MultipleSendTest, UnsubscribeService) { + struct MessageInt : public Message { + MessageInt(int xx) : x(xx) {} + int x; + }; + struct MessageChar : public Message { + MessageChar(char xx) : x(xx) {} + char x; + }; + + System system; + + system.Spawn("master", [](Reactor &r) { + std::shared_ptr channel_writer; + while (!(channel_writer = r.system_.FindChannel("worker", "main"))) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + channel_writer->Send(55); + channel_writer->Send(66); + channel_writer->Send(77); + channel_writer->Send(88); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + channel_writer->Send('a'); + channel_writer->Send('b'); + channel_writer->Send('c'); + channel_writer->Send('d'); + r.CloseChannel("main"); + }); + + system.Spawn("worker", [num_received_messages = 0](Reactor & r) mutable { + EventStream *stream = r.main_.first; + + stream->OnEvent( + [&](const MessageInt &msgint, const Subscription &subscription) { + ASSERT_TRUE(msgint.x == 55 || msgint.x == 66); + ++num_received_messages; + if (msgint.x == 66) { + subscription.Unsubscribe(); // receive only two of them + } + }); + stream->OnEvent( + [&](const MessageChar &msgchar, const Subscription &subscription) { + char c = msgchar.x; + ++num_received_messages; + ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c'); + if (num_received_messages == 5) { + subscription.Unsubscribe(); + r.CloseChannel("main"); + } + }); + }); + + system.AwaitShutdown(); +} + +TEST(MultipleSendTest, OnEvent) { + struct MessageInt : public Message { + MessageInt(int xx) : x(xx) {} + int x; + }; + struct MessageChar : public Message { + MessageChar(char xx) : x(xx) {} + char x; + }; + + System system; + system.Spawn("master", [](Reactor &r) { + std::shared_ptr channel_writer; + while (!(channel_writer = r.system_.FindChannel("worker", "main"))) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + channel_writer->Send(101); + channel_writer->Send('a'); + channel_writer->Send(103); + channel_writer->Send('b'); + r.CloseChannel("main"); + }); + + system.Spawn("worker", [correct_vals = 0](Reactor & r) mutable { + struct EndMessage : Message {}; + EventStream *stream = r.main_.first; + + stream->OnEvent( + [&](const MessageInt &msgint, const Subscription &) { + ASSERT_TRUE(msgint.x == 101 || msgint.x == 103); + ++correct_vals; + r.main_.second->Send(); + }); + + stream->OnEvent( + [&](const MessageChar &msgchar, const Subscription &) { + ASSERT_TRUE(msgchar.x == 'a' || msgchar.x == 'b'); + ++correct_vals; + r.main_.second->Send(); + }); + + stream->OnEvent([&](const EndMessage &, const Subscription &) { + ASSERT_LE(correct_vals, 4); + if (correct_vals == 4) { + r.CloseChannel("main"); + } + }); + }); + + system.AwaitShutdown(); +} + +TEST(MultipleSendTest, Chaining) { + struct MessageInt : public Message { + MessageInt(int xx) : x(xx) {} + int x; + }; + + System system; + + system.Spawn("master", [](Reactor &r) { + std::shared_ptr channel_writer; + while (!(channel_writer = r.system_.FindChannel("worker", "main"))) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + channel_writer->Send(55); + channel_writer->Send(66); + channel_writer->Send(77); + r.CloseChannel("main"); + }); + + system.Spawn("worker", [](Reactor &r) { + EventStream *stream = r.main_.first; + + stream->OnEventOnce() + .ChainOnce([](const MessageInt &msg, const Subscription &) { + ASSERT_EQ(msg.x, 55); + }) + .ChainOnce([](const MessageInt &msg, const Subscription &) { + ASSERT_EQ(msg.x, 66); + }) + .ChainOnce( + [&](const MessageInt &msg, const Subscription &) { + ASSERT_EQ(msg.x, 77); + r.CloseChannel("main"); + }); + }); + + system.AwaitShutdown(); +} + +TEST(MultipleSendTest, ChainingInRightOrder) { + struct MessageInt : public Message { + MessageInt(int xx) : x(xx) {} + int x; + }; + + struct MessageChar : public Message { + MessageChar(char xx) : x(xx) {} + char x; + }; + + System system; + + system.Spawn("master", [](Reactor &r) { + std::shared_ptr channel_writer; + while (!(channel_writer = r.system_.FindChannel("worker", "main"))) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + channel_writer->Send('a'); + channel_writer->Send(55); + channel_writer->Send('b'); + channel_writer->Send(77); + r.CloseChannel("main"); + }); + + system.Spawn("worker", [](Reactor &r) { + EventStream *stream = r.main_.first; + stream->OnEventOnce() + .ChainOnce([](const MessageInt &msg, const Subscription &) { + ASSERT_EQ(msg.x, 55); + }) + .ChainOnce( + [](const MessageChar &msg, const Subscription &) { + ASSERT_EQ(msg.x, 'b'); + }) + .ChainOnce( + [&](const MessageInt &msg, const Subscription &) { + ASSERT_EQ(msg.x, 77); + r.CloseChannel("main"); + }); + }); + + system.AwaitShutdown(); +} + +TEST(MultipleSendTest, ProcessManyMessages) { + const static int kNumTests = 100; + + struct MessageInt : public Message { + MessageInt(int xx) : x(xx) {} + int x; + }; + + System system; + + system.Spawn("master", [](Reactor &r) { + std::shared_ptr channel_writer; + while (!(channel_writer = r.system_.FindChannel("worker", "main"))) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100)); + for (int i = 0; i < kNumTests; ++i) { + channel_writer->Send(rand()); + std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 5)); + } + r.CloseChannel("main"); + }); + + system.Spawn("worker", [vals = 0](Reactor & r) mutable { + struct EndMessage : Message {}; + EventStream *stream = r.main_.first; + vals = 0; + + stream->OnEvent([&](const Message &, const Subscription &) { + ++vals; + r.main_.second->Send(); + }); + + stream->OnEvent([&](const Message &, const Subscription &) { + ASSERT_LE(vals, kNumTests); + if (vals == kNumTests) { + r.CloseChannel("main"); + } + }); + }); + system.AwaitShutdown(); +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}