diff --git a/experimental/distributed/README.md b/experimental/distributed/README.md index 92e096794..3cb0d5caa 100644 --- a/experimental/distributed/README.md +++ b/experimental/distributed/README.md @@ -9,9 +9,9 @@ This subdirectory structure implements distributed infrastructure of Memgraph. * Node: a computer that performs (distributed) work. * Vertex: an abstract graph concept. * Reactor: a unit of concurrent execution, lives on its own thread. -* Connector: a (one-way) communication abstraction between Reactors. The reactors can be on the same machine or on different processes. -* EventStream: read-end of a connector, is owned by exactly one Reactor/thread. -* Channel: write-end of a connector, can be owned (wrote into) by multiple threads. +* Channel: a (one-way) communication abstraction between Reactors. The reactors can be on the same machine or on different processes. +* EventStream: read-end of a channel, is owned by exactly one Reactor/thread. +* ChannelWriter: write-end of a channel, can be owned (wrote into) by multiple threads. ## conventions diff --git a/experimental/distributed/src/reactors_distributed.cpp b/experimental/distributed/src/reactors_distributed.cpp index 09379fc10..6fb4bf04a 100644 --- a/experimental/distributed/src/reactors_distributed.cpp +++ b/experimental/distributed/src/reactors_distributed.cpp @@ -6,22 +6,22 @@ DEFINE_int32(port, 10000, "Network server bind port"); Network::Network() {} /** - * SenderMessage implementation. + * ReturnAddressMsg implementation. */ -SenderMessage::SenderMessage() {} +ReturnAddressMsg::ReturnAddressMsg() {} -SenderMessage::SenderMessage(std::string reactor, std::string channel) +ReturnAddressMsg::ReturnAddressMsg(std::string reactor, std::string channel) : address_(FLAGS_address), port_(FLAGS_port), reactor_(reactor), channel_(channel) {} -std::string SenderMessage::Address() const { return address_; } -uint16_t SenderMessage::Port() const { return port_; } -std::string SenderMessage::ReactorName() const { return reactor_; } -std::string SenderMessage::ChannelName() const { return channel_; } +std::string ReturnAddressMsg::Address() const { return address_; } +uint16_t ReturnAddressMsg::Port() const { return port_; } +std::string ReturnAddressMsg::ReactorName() const { return reactor_; } +std::string ReturnAddressMsg::ChannelName() const { return channel_; } -std::shared_ptr SenderMessage::GetChannelToSender() const { +std::shared_ptr ReturnAddressMsg::GetReturnChannelWriter() const { if (address_ == FLAGS_address && port_ == FLAGS_port) { return System::GetInstance().FindChannel(reactor_, channel_); } else { diff --git a/experimental/distributed/src/reactors_distributed.hpp b/experimental/distributed/src/reactors_distributed.hpp index 0d5bd230f..323848a91 100644 --- a/experimental/distributed/src/reactors_distributed.hpp +++ b/experimental/distributed/src/reactors_distributed.hpp @@ -73,12 +73,12 @@ class Network { // client functions - std::shared_ptr Resolve(std::string address, uint16_t port, + std::shared_ptr Resolve(std::string address, uint16_t port, std::string reactor_name, std::string channel_name) { if (Protocol::SendMessage(address, port, reactor_name, channel_name, nullptr)) { - return std::make_shared(this, address, port, reactor_name, + return std::make_shared(this, address, port, reactor_name, channel_name); } return nullptr; @@ -131,9 +131,9 @@ class Network { } } - class RemoteChannel : public Channel { + class RemoteChannelWriter : public ChannelWriter { public: - RemoteChannel(Network *network, std::string address, uint16_t port, + RemoteChannelWriter(Network *network, std::string address, uint16_t port, std::string reactor, std::string channel) : network_(network), address_(address), @@ -228,17 +228,17 @@ class Network { /** * Message that includes the sender channel used to respond. */ -class SenderMessage : public Message { +class ReturnAddressMsg : public Message { public: - SenderMessage(); - SenderMessage(std::string reactor, std::string channel); + ReturnAddressMsg(); + ReturnAddressMsg(std::string reactor, std::string channel); std::string Address() const; uint16_t Port() const; std::string ReactorName() const; std::string ChannelName() const; - std::shared_ptr GetChannelToSender() const; + std::shared_ptr GetReturnChannelWriter() const; template void serialize(Archive &ar) { @@ -252,7 +252,7 @@ class SenderMessage : public Message { std::string reactor_; std::string channel_; }; -CEREAL_REGISTER_TYPE(SenderMessage); +CEREAL_REGISTER_TYPE(ReturnAddressMsg); /** @@ -262,13 +262,13 @@ CEREAL_REGISTER_TYPE(SenderMessage); class ChannelResolvedMessage : public Message { public: ChannelResolvedMessage() {} - ChannelResolvedMessage(std::shared_ptr channel) - : Message(), channel_(channel) {} + ChannelResolvedMessage(std::shared_ptr channel_writer) + : Message(), channel_writer_(channel_writer) {} - std::shared_ptr channel() const { return channel_; } + std::shared_ptr channelWriter() const { return channel_writer_; } private: - std::shared_ptr channel_; + std::shared_ptr channel_writer_; }; /** @@ -315,11 +315,11 @@ class Distributed final { uint16_t port, const std::string &reactor_name, const std::string &channel_name) { - std::shared_ptr channel = nullptr; - while (!(channel = network_.Resolve(address, port, reactor_name, channel_name))) + std::shared_ptr channel_writer = nullptr; + while (!(channel_writer = network_.Resolve(address, port, reactor_name, channel_name))) std::this_thread::sleep_for(std::chrono::milliseconds(200)); auto stream_channel = current_reactor_->Open(); - stream_channel.second->Send(channel); + stream_channel.second->Send(channel_writer); return stream_channel.first; } diff --git a/experimental/distributed/src/reactors_local.cpp b/experimental/distributed/src/reactors_local.cpp index 43d1df18f..2c3254b19 100644 --- a/experimental/distributed/src/reactors_local.cpp +++ b/experimental/distributed/src/reactors_local.cpp @@ -6,64 +6,64 @@ void EventStream::Subscription::unsubscribe() const { thread_local Reactor* current_reactor_ = nullptr; -std::string Connector::LocalChannel::ReactorName() { +std::string Channel::LocalChannelWriter::ReactorName() { return reactor_name_; } -std::string Connector::LocalChannel::Name() { - return connector_name_; +std::string Channel::LocalChannelWriter::Name() { + return channel_name_; } -void Connector::LocalEventStream::Close() { - current_reactor_->CloseConnector(connector_name_); +void Channel::LocalEventStream::Close() { + current_reactor_->CloseChannel(channel_name_); } -std::pair> Reactor::Open(const std::string &connector_name) { +std::pair> Reactor::Open(const std::string &channel_name) { std::unique_lock lock(*mutex_); // TODO: Improve the check that the channel name does not exist in the // system. - if (connectors_.count(connector_name) != 0) { - throw std::runtime_error("Connector with name " + connector_name + if (channels_.count(channel_name) != 0) { + throw std::runtime_error("Channel with name " + channel_name + "already exists"); } - auto it = connectors_.emplace(connector_name, - std::make_shared(Connector::Params{name_, connector_name, mutex_, cvar_})).first; + auto it = channels_.emplace(channel_name, + std::make_shared(Channel::Params{name_, channel_name, mutex_, cvar_})).first; it->second->self_ptr_ = it->second; return make_pair(&it->second->stream_, it->second->LockedOpenChannel()); } -std::pair> Reactor::Open() { +std::pair> Reactor::Open() { std::unique_lock lock(*mutex_); do { - std::string connector_name = "stream-" + std::to_string(connector_name_counter_++); - if (connectors_.count(connector_name) == 0) { - // Connector &queue = connectors_[connector_name]; - auto it = connectors_.emplace(connector_name, - std::make_shared(Connector::Params{name_, connector_name, mutex_, cvar_})).first; + std::string channel_name = "stream-" + std::to_string(channel_name_counter_++); + if (channels_.count(channel_name) == 0) { + // Channel &queue = channels_[channel_name]; + auto it = channels_.emplace(channel_name, + std::make_shared(Channel::Params{name_, channel_name, mutex_, cvar_})).first; it->second->self_ptr_ = it->second; return make_pair(&it->second->stream_, it->second->LockedOpenChannel()); } } while (true); } -const std::shared_ptr Reactor::FindChannel( +const std::shared_ptr Reactor::FindChannel( const std::string &channel_name) { std::unique_lock lock(*mutex_); - auto it_connector = connectors_.find(channel_name); - if (it_connector == connectors_.end()) return nullptr; - return it_connector->second->LockedOpenChannel(); + auto it_channel = channels_.find(channel_name); + if (it_channel == channels_.end()) return nullptr; + return it_channel->second->LockedOpenChannel(); } -void Reactor::CloseConnector(const std::string &s) { +void Reactor::CloseChannel(const std::string &s) { std::unique_lock lock(*mutex_); - auto it = connectors_.find(s); - assert(it != connectors_.end()); - connectors_.erase(it); + auto it = channels_.find(s); + assert(it != channels_.end()); + channels_.erase(it); } -void Reactor::CloseAllConnectors() { +void Reactor::CloseAllChannels() { std::unique_lock lock(*mutex_); - connectors_.clear(); + channels_.clear(); } void Reactor::RunEventLoop() { @@ -77,8 +77,8 @@ void Reactor::RunEventLoop() { while (true) { - // Exit the loop if there are no more Connectors. - if (connectors_.empty()) { + // Exit the loop if there are no more Channels. + if (channels_.empty()) { exit_event_loop = true; break; } @@ -106,8 +106,8 @@ void Reactor::RunEventLoop() { */ auto Reactor::LockedGetPendingMessages() -> MsgAndCbInfo { // return type after because the scope Reactor:: is not searched before the name - for (auto &connectors_key_value : connectors_) { - Connector &event_queue = *connectors_key_value.second; + for (auto &channels_key_value : channels_) { + Channel &event_queue = *channels_key_value.second; auto msg_ptr = event_queue.LockedPop(); if (msg_ptr == nullptr) continue; std::type_index tidx = msg_ptr->GetTypeIndex(); diff --git a/experimental/distributed/src/reactors_local.hpp b/experimental/distributed/src/reactors_local.hpp index 5e782a3a1..6c0664aa4 100644 --- a/experimental/distributed/src/reactors_local.hpp +++ b/experimental/distributed/src/reactors_local.hpp @@ -14,7 +14,7 @@ class EventStream; class Reactor; class System; -class Connector; +class Channel; extern thread_local Reactor* current_reactor_; @@ -38,9 +38,9 @@ class Message { }; /** - * Write-end of a Connector (between two reactors). + * Write-end of a Channel (between two reactors). */ -class Channel { +class ChannelWriter { public: /** * Construct and send the message to the channel. @@ -56,7 +56,7 @@ class Channel { virtual std::string Name() = 0; - void operator=(const Channel &) = delete; + void operator=(const ChannelWriter &) = delete; template void serialize(Archive &archive) { @@ -65,24 +65,40 @@ class Channel { }; /** - * Read-end of a Connector (between two reactors). + * Read-end of a Channel (between two reactors). */ class EventStream { public: - /** - * Blocks until a message arrives. - */ - virtual std::unique_ptr AwaitEvent() = 0; + class Subscription; + class OnEventOnceChainer; /** - * Polls if there is a message available, returning null if there is none. + * Register a callback that will be called whenever an event arrives. */ - virtual std::unique_ptr PopEvent() = 0; + template + void OnEvent(std::function &&cb) { + OnEventHelper(typeid(MsgType), [cb = move(cb)](const Message &general_msg, + const Subscription &subscription) { + const MsgType &correct_msg = dynamic_cast(general_msg); + cb(correct_msg, subscription); + }); + } /** - * Get the name of the connector. + * Starts a chain to register a callback that fires off only once. + * + * This method supports chaining (see the the class OnEventOnceChainer or the tests for examples). + * Warning: when chaining callbacks, make sure that EventStream does not deallocate before the last + * chained callback fired. */ - virtual const std::string &ConnectorName() = 0; + OnEventOnceChainer OnEventOnce() { + return OnEventOnceChainer(*this); + } + + /** + * Get the name of the channel. + */ + virtual const std::string &ChannelName() = 0; /** * Subscription Service. * @@ -97,40 +113,16 @@ class EventStream { private: friend class Reactor; - friend class Connector; + friend class Channel; - Subscription(Connector &event_queue, std::type_index tidx, uint64_t cb_uid) + Subscription(Channel &event_queue, std::type_index tidx, uint64_t cb_uid) : event_queue_(event_queue), tidx_(tidx), cb_uid_(cb_uid) { } - Connector &event_queue_; + Channel &event_queue_; std::type_index tidx_; uint64_t cb_uid_; }; - /** - * Register a callback that will be called whenever an event arrives. - */ - template - void OnEvent(std::function &&cb) { - OnEventHelper(typeid(MsgType), [cb = move(cb)](const Message &general_msg, - const Subscription &subscription) { - const MsgType &correct_msg = dynamic_cast(general_msg); - cb(correct_msg, subscription); - }); - } - - class OnEventOnceChainer; - /** - * Starts a chain to register a callback that fires off only once. - * - * This method supports chaining (see the the class OnEventOnceChainer or the tests for examples). - * Warning: when chaining callbacks, make sure that EventStream does not deallocate before the last - * chained callback fired. - */ - OnEventOnceChainer OnEventOnce() { - return OnEventOnceChainer(*this); - } - /** * Close this event stream, disallowing further events from getting received. * @@ -205,52 +197,52 @@ private: }; /** - * Implementation of a connector. + * Implementation of a channel. * - * This class is an internal data structure that represents the state of the connector. + * This class is an internal data structure that represents the state of the channel. * This class is not meant to be used by the clients of the messaging framework. - * The Connector class wraps the event queue data structure, the mutex that protects + * The Channel class wraps the event queue data structure, the mutex that protects * concurrent access to the event queue, the local channel and the event stream. * The class is owned by the Reactor. It gets closed when the owner reactor - * (the one that owns the read-end of a connector) removes/closes it. + * (the one that owns the read-end of a channel) removes/closes it. */ -class Connector { +class Channel { struct Params; public: friend class Reactor; // to create a Params initialization object friend class EventStream::Subscription; - Connector(Params params) - : connector_name_(params.connector_name), + Channel(Params params) + : channel_name_(params.channel_name), reactor_name_(params.reactor_name), mutex_(params.mutex), cvar_(params.cvar), - stream_(mutex_, connector_name_, this) {} + stream_(mutex_, channel_name_, this) {} /** - * LocalChannel represents the channels to reactors living in the same reactor system (write-end of the connectors). + * LocalChannelWriter represents the channels to reactors living in the same reactor system (write-end of the channels). * * Sending messages to the local channel requires acquiring the mutex. - * LocalChannel holds a (weak) pointer to the enclosing Connector object. + * LocalChannelWriter holds a (weak) pointer to the enclosing Channel object. * Messages sent to a closed channel are ignored. - * There can be multiple LocalChannels refering to the same stream if needed. + * There can be multiple LocalChannelWriters refering to the same stream if needed. */ - class LocalChannel : public Channel { + class LocalChannelWriter : public ChannelWriter { public: - friend class Connector; + friend class Channel; - LocalChannel(std::shared_ptr mutex, std::string reactor_name, - std::string connector_name, std::weak_ptr queue) + LocalChannelWriter(std::shared_ptr mutex, std::string reactor_name, + std::string channel_name, std::weak_ptr queue) : mutex_(mutex), reactor_name_(reactor_name), - connector_name_(connector_name), + channel_name_(channel_name), weak_queue_(queue) {} virtual void Send(std::unique_ptr m) { - std::shared_ptr queue_ = weak_queue_.lock(); // Atomic, per the standard. + std::shared_ptr queue_ = weak_queue_.lock(); // Atomic, per the standard. if (queue_) { - // We guarantee here that the Connector is not destroyed. + // We guarantee here that the Channel is not destroyed. std::unique_lock lock(*mutex_); queue_->LockedPush(std::move(m)); } @@ -263,57 +255,53 @@ class Connector { private: std::shared_ptr mutex_; std::string reactor_name_; - std::string connector_name_; - std::weak_ptr weak_queue_; + std::string channel_name_; + std::weak_ptr weak_queue_; }; /** * Implementation of the event stream. * - * After the enclosing Connector object is destroyed (by a call to CloseChannel or Close). + * After the enclosing Channel object is destroyed (by a call to CloseChannel or Close). */ class LocalEventStream : public EventStream { public: - friend class Connector; + friend class Channel; - LocalEventStream(std::shared_ptr mutex, std::string connector_name, - Connector *queue) : mutex_(mutex), connector_name_(connector_name), queue_(queue) {} + LocalEventStream(std::shared_ptr mutex, std::string channel_name, + Channel *queue) : mutex_(mutex), channel_name_(channel_name), queue_(queue) {} std::unique_ptr AwaitEvent() { std::unique_lock lock(*mutex_); return queue_->LockedAwaitPop(lock); } - std::unique_ptr PopEvent() { - std::unique_lock lock(*mutex_); - return queue_->LockedPop(); - } void OnEventHelper(std::type_index tidx, Callback callback) { std::unique_lock lock(*mutex_); queue_->LockedOnEventHelper(tidx, callback); } - const std::string &ConnectorName() { - return queue_->connector_name_; + const std::string &ChannelName() { + return queue_->channel_name_; } void Close(); private: std::shared_ptr mutex_; - std::string connector_name_; - Connector *queue_; + std::string channel_name_; + Channel *queue_; }; - Connector(const Connector &other) = delete; - Connector(Connector &&other) = default; - Connector &operator=(const Connector &other) = delete; - Connector &operator=(Connector &&other) = default; + Channel(const Channel &other) = delete; + Channel(Channel &&other) = default; + Channel &operator=(const Channel &other) = delete; + Channel &operator=(Channel &&other) = default; private: /** - * Initialization parameters to Connector. + * Initialization parameters to Channel. * Warning: do not forget to initialize self_ptr_ individually. Private because it shouldn't be created outside of a Reactor. */ struct Params { std::string reactor_name; - std::string connector_name; + std::string channel_name; std::shared_ptr mutex; std::shared_ptr cvar; }; @@ -321,13 +309,13 @@ private: void LockedPush(std::unique_ptr m) { queue_.emplace(std::move(m)); - // This is OK because there is only one Reactor (thread) that can wait on this Connector. + // This is OK because there is only one Reactor (thread) that can wait on this Channel. cvar_->notify_one(); } - std::shared_ptr LockedOpenChannel() { + std::shared_ptr LockedOpenChannel() { assert(!self_ptr_.expired()); // TODO(zuza): fix this using this answer https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned - return std::make_shared(mutex_, reactor_name_, connector_name_, self_ptr_); + return std::make_shared(mutex_, reactor_name_, channel_name_, self_ptr_); } std::unique_ptr LockedAwaitPop(std::unique_lock &lock) { @@ -363,7 +351,7 @@ private: assert(num_erased == 1); } - std::string connector_name_; + std::string channel_name_; std::string reactor_name_; std::queue> queue_; // Should only be locked once since it's used by a cond. var. Also caught in dctor, so must be recursive. @@ -374,7 +362,7 @@ private: * * There are initialization problems with this, check Params. */ - std::weak_ptr self_ptr_; + std::weak_ptr self_ptr_; LocalEventStream stream_; std::unordered_map > callbacks_; uint64_t next_cb_uid = 0; @@ -396,23 +384,23 @@ class Reactor { virtual void Run() = 0; - std::pair> Open(const std::string &s); - std::pair> Open(); - const std::shared_ptr FindChannel(const std::string &channel_name); + std::pair> Open(const std::string &s); + std::pair> Open(); + const std::shared_ptr FindChannel(const std::string &channel_name); /** - * Close a connector by name. + * Close a channel by name. * * Should only be called from the Reactor thread. */ - void CloseConnector(const std::string &s); + void CloseChannel(const std::string &s); /** - * close all connectors (typically during shutdown). + * close all channels (typically during shutdown). * * Should only be called from the Reactor thread. */ - void CloseAllConnectors(); + void CloseAllChannels(); /** * Get Reactor name @@ -427,9 +415,9 @@ class Reactor { protected: std::string name_; /* - * Locks all Reactor data, including all Connector's in connectors_. + * Locks all Reactor data, including all Channel's in channels_. * - * This should be a shared_ptr because LocalChannel can outlive Reactor. + * This should be a shared_ptr because LocalChannelWriter can outlive Reactor. */ std::shared_ptr mutex_ = std::make_shared(); @@ -437,14 +425,14 @@ class Reactor { std::make_shared(); /** - * List of connectors of a reactor indexed by name. + * List of channels of a reactor indexed by name. * - * While the connectors are owned by the reactor, a shared_ptr to solve the circular reference problem - * between Channels and EventStreams. + * While the channels are owned by the reactor, a shared_ptr to solve the circular reference problem + * between ChannelWriters and EventStreams. */ - std::unordered_map> connectors_; - int64_t connector_name_counter_{0}; - std::pair> main_; + std::unordered_map> channels_; + int64_t channel_name_counter_{0}; + std::pair> main_; private: typedef std::pair, @@ -483,7 +471,7 @@ class System final { } template - const std::shared_ptr Spawn(const std::string &name, + const std::shared_ptr Spawn(const std::string &name, Args &&... args) { std::unique_lock lock(mutex_); auto *raw_reactor = @@ -498,7 +486,7 @@ class System final { return nullptr; } - const std::shared_ptr FindChannel(const std::string &reactor_name, + const std::shared_ptr FindChannel(const std::string &reactor_name, const std::string &channel_name) { std::unique_lock lock(mutex_); auto it_reactor = reactors_.find(reactor_name); @@ -528,7 +516,7 @@ class System final { } std::recursive_mutex mutex_; - // TODO: Replace with a map to a reactor Connector map to have more granular + // TODO: Replace with a map to a reactor Channel map to have more granular // locking. std::unordered_map, std::thread>> diff --git a/experimental/distributed/tests/distributed_test.cpp b/experimental/distributed/tests/distributed_test.cpp index aff6928ea..b0dc665d1 100644 --- a/experimental/distributed/tests/distributed_test.cpp +++ b/experimental/distributed/tests/distributed_test.cpp @@ -90,14 +90,14 @@ std::pair> /** * Sends a text message and has a return address. */ -class TextMessage : public SenderMessage { +class TextMessage : public ReturnAddressMsg { public: TextMessage(std::string reactor, std::string channel, std::string s) - : SenderMessage(reactor, channel), text(s) {} + : ReturnAddressMsg(reactor, channel), text(s) {} template void serialize(Archive &archive) { - archive(cereal::virtual_base_class(this), text); + archive(cereal::virtual_base_class(this), text); } std::string text; @@ -124,7 +124,7 @@ class Master : public Reactor { auto stream = main_.first; - // wait until every worker sends a SenderMessage back, then close + // wait until every worker sends a ReturnAddressMsg back, then close stream->OnEvent([this](const TextMessage &msg, const EventStream::Subscription &subscription) { std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n"; @@ -135,7 +135,7 @@ class Master : public Reactor { // (start_distributed.py runs each process in a new tab which is // closed immediately after process has finished) std::this_thread::sleep_for(std::chrono::seconds(4)); - CloseConnector("main"); + CloseChannel("main"); } }); @@ -144,7 +144,7 @@ class Master : public Reactor { auto stream = memgraph.FindChannel(wmnid, "worker", "main"); stream->OnEventOnce() .ChainOnce([this, stream](const ChannelResolvedMessage &msg){ - msg.channel()->Send("master", "main", "hi from master"); + msg.channelWriter()->Send("master", "main", "hi from master"); stream->Close(); }); } @@ -174,12 +174,12 @@ class Worker : public Reactor { .ChainOnce([this](const TextMessage &msg) { std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n"; - msg.GetChannelToSender() + msg.GetReturnChannelWriter() ->Send("worker", "main", "hi from worker"); // Sleep for a while so we can read output in the terminal. std::this_thread::sleep_for(std::chrono::seconds(4)); - CloseConnector("main"); + CloseChannel("main"); }); } diff --git a/experimental/distributed/tests/local_memgraph.cpp b/experimental/distributed/tests/local_memgraph.cpp index 1e01b8188..c7e18cd7f 100644 --- a/experimental/distributed/tests/local_memgraph.cpp +++ b/experimental/distributed/tests/local_memgraph.cpp @@ -12,14 +12,14 @@ // const int NUM_WORKERS = 1; -// class Txn : public SenderMessage { +// class Txn : public ReturnAddressMsg { // public: -// Txn(std::string reactor, std::string channel, int64_t id) : SenderMessage(reactor, channel), id_(id) {} +// Txn(std::string reactor, std::string channel, int64_t id) : ReturnAddressMsg(reactor, channel), id_(id) {} // int64_t id() const { return id_; } // template // void serialize(Archive &archive) { -// archive(cereal::base_class(this), id_); +// archive(cereal::base_class(this), id_); // } // private: @@ -60,30 +60,30 @@ // int64_t count_; // }; -// class CommitRequest : public SenderMessage { +// class CommitRequest : public ReturnAddressMsg { // public: // CommitRequest(std::string reactor, std::string channel, int64_t worker_id) -// : SenderMessage(reactor, channel), worker_id_(worker_id) {} +// : ReturnAddressMsg(reactor, channel), worker_id_(worker_id) {} // int64_t worker_id() { return worker_id_; } // template // void serialize(Archive &archive) { -// archive(cereal::base_class(this), worker_id_); +// archive(cereal::base_class(this), worker_id_); // } // private: // int64_t worker_id_; // }; -// class AbortRequest : public SenderMessage { +// class AbortRequest : public ReturnAddressMsg { // public: // AbortRequest(std::string reactor, std::string channel, int64_t worker_id) -// : SenderMessage(reactor, channel), worker_id_(worker_id) {} +// : ReturnAddressMsg(reactor, channel), worker_id_(worker_id) {} // int64_t worker_id() { return worker_id_; } // template // void serialize(Archive &archive) { -// archive(cereal::base_class(this), worker_id_); +// archive(cereal::base_class(this), worker_id_); // } // private: @@ -139,8 +139,8 @@ // if (Query *query = dynamic_cast(m.get())) { // ProcessQuery(query); // break; // process only the first query -// } else if (SenderMessage *msg = dynamic_cast(m.get())) { -// std::cout << "SenderMessage received!" << std::endl; +// } else if (ReturnAddressMsg *msg = dynamic_cast(m.get())) { +// std::cout << "ReturnAddressMsg received!" << std::endl; // std::cout << " Address: " << msg->Address() << std::endl; // std::cout << " Port: " << msg->Port() << std::endl; // std::cout << " Reactor: " << msg->ReactorName() << std::endl; @@ -175,27 +175,27 @@ // int worker_id = rand() % NUM_WORKERS; // int64_t xid = GetTransactionId(); // std::string txn_channel_name = GetTxnName(xid); -// auto connector = Open(txn_channel_name); -// auto stream = connector.first; +// auto channel = Open(txn_channel_name); +// auto stream = channel.first; // channels_[worker_id]->Send("master", "main", xid); // auto m = stream->AwaitEvent(); // if (CommitRequest *req = dynamic_cast(m.get())) { -// req->GetChannelToSender(system_)->Send(); +// req->GetReturnChannelWriter(system_)->Send(); // } else if (AbortRequest *req = dynamic_cast(m.get())) { -// req->GetChannelToSender(system_)->Send(); +// req->GetReturnChannelWriter(system_)->Send(); // } else { // std::cerr << "unknown message\n"; // exit(1); // } -// CloseConnector(txn_channel_name); +// CloseChannel(txn_channel_name); // } // void PerformCountNodes() { // int64_t xid = GetTransactionId(); // std::string txn_channel_name = GetTxnName(xid); -// auto connector = Open(txn_channel_name); -// auto stream = connector.first; +// auto channel = Open(txn_channel_name); +// auto stream = channel.first; // for (int w_id = 0; w_id < NUM_WORKERS; ++w_id) // channels_[w_id]->Send("master", "main", xid); @@ -205,10 +205,10 @@ // for (int responds = 0; responds < NUM_WORKERS; ++responds) { // auto m = stream->AwaitEvent(); // if (CommitRequest *req = dynamic_cast(m.get())) { -// txn_channels[req->worker_id()] = req->GetChannelToSender(system_); +// txn_channels[req->worker_id()] = req->GetReturnChannelWriter(system_); // commit &= true; // } else if (AbortRequest *req = dynamic_cast(m.get())) { -// txn_channels[req->worker_id()] = req->GetChannelToSender(system_); +// txn_channels[req->worker_id()] = req->GetReturnChannelWriter(system_); // commit = false; // } else { // std::cerr << "unknown message\n"; @@ -236,7 +236,7 @@ // } // } -// CloseConnector(txn_channel_name); +// CloseChannel(txn_channel_name); // std::cout << "graph has " << count << " vertices" << std::endl; // } @@ -302,9 +302,9 @@ // } // void HandleCreateNode(CreateNodeTxn *txn) { -// auto connector = Open(GetTxnChannelName(txn->id())); -// auto stream = connector.first; -// auto masterChannel = txn->GetChannelToSender(system_); +// auto channel = Open(GetTxnChannelName(txn->id())); +// auto stream = channel.first; +// auto masterChannel = txn->GetReturnChannelWriter(system_); // // TODO: Do the actual commit. // masterChannel->Send("master", "main", worker_id_); // auto m = stream->AwaitEvent(); @@ -316,13 +316,13 @@ // std::cerr << "unknown message\n"; // exit(1); // } -// CloseConnector(GetTxnChannelName(txn->id())); +// CloseChannel(GetTxnChannelName(txn->id())); // } // void HandleCountNodes(CountNodesTxn *txn) { -// auto connector = Open(GetTxnChannelName(txn->id())); -// auto stream = connector.first; -// auto masterChannel = txn->GetChannelToSender(system_); +// auto channel = Open(GetTxnChannelName(txn->id())); +// auto stream = channel.first; +// auto masterChannel = txn->GetReturnChannelWriter(system_); // // TODO: Fix this hack -- use the storage. // int num = 123; @@ -337,7 +337,7 @@ // std::cerr << "unknown message\n"; // exit(1); // } -// CloseConnector(GetTxnChannelName(txn->id())); +// CloseChannel(GetTxnChannelName(txn->id())); // } // // TODO: Don't repeat code from Master. diff --git a/experimental/distributed/tests/network_chat.cpp b/experimental/distributed/tests/network_chat.cpp index 10174b8bb..52447bf25 100644 --- a/experimental/distributed/tests/network_chat.cpp +++ b/experimental/distributed/tests/network_chat.cpp @@ -3,18 +3,18 @@ #include "reactors_distributed.hpp" -class ChatMessage : public SenderMessage { +class ChatMessage : public ReturnAddressMsg { public: - ChatMessage() : SenderMessage(), message_("") {} + ChatMessage() : ReturnAddressMsg(), message_("") {} ChatMessage(std::string reactor, std::string channel, std::string message) - : SenderMessage(reactor, channel), message_(message) {} + : ReturnAddressMsg(reactor, channel), message_(message) {} std::string Message() const { return message_; } template void serialize(Archive &ar) { - ar(cereal::base_class(this), message_); + ar(cereal::base_class(this), message_); } private: @@ -56,7 +56,7 @@ class ChatServer : public Reactor { std::cout << "Received message from " << msg.Address() << ":" << msg.Port() << " -> '" << msg.Message() << "'" << std::endl; - auto channel = msg.GetChannelToSender(); + auto channel = msg.GetReturnChannelWriter(); if (channel != nullptr) { channel->Send("server", "chat", msg.Message()); } diff --git a/experimental/distributed/tests/network_client.cpp b/experimental/distributed/tests/network_client.cpp index 0d3ebe096..bcaa67688 100644 --- a/experimental/distributed/tests/network_client.cpp +++ b/experimental/distributed/tests/network_client.cpp @@ -7,7 +7,7 @@ int main(int argc, char *argv[]) { auto channel = distributed.network().Resolve("127.0.0.1", 10000, "master", "main"); std::cout << channel << std::endl; if (channel != nullptr) { - channel->Send("master", "main"); + channel->Send("master", "main"); } distributed.network().StopClient(); return 0; diff --git a/experimental/distributed/tests/reactors_local_unit.cpp b/experimental/distributed/tests/reactors_local_unit.cpp index 9079ad464..b3fa3e367 100644 --- a/experimental/distributed/tests/reactors_local_unit.cpp +++ b/experimental/distributed/tests/reactors_local_unit.cpp @@ -1,4 +1,5 @@ #include "gtest/gtest.h" +#include "reactors_local.hpp" #include #include @@ -9,13 +10,11 @@ #include #include -#include "reactors_local.hpp" - TEST(SystemTest, ReturnWithoutThrowing) { struct Master : public Reactor { Master(std::string name) : Reactor(name) {} virtual void Run() { - CloseConnector("main"); + CloseChannel("main"); } }; @@ -31,8 +30,8 @@ TEST(ChannelCreationTest, ThrowOnReusingChannelName) { virtual void Run() { Open("channel"); ASSERT_THROW(Open("channel"), std::runtime_error); - CloseConnector("main"); - CloseConnector("channel"); + CloseChannel("main"); + CloseChannel("channel"); } }; @@ -42,62 +41,26 @@ TEST(ChannelCreationTest, ThrowOnReusingChannelName) { } -TEST(ConnectorSetUpTest, CheckMainChannelIsSet) { +TEST(ChannelSetUpTest, CheckMainChannelIsSet) { struct Master : public Reactor { Master(std::string name) : Reactor(name) {} virtual void Run() { - std::shared_ptr channel; - while (!(channel = System::GetInstance().FindChannel("worker", "main"))) + std::shared_ptr channel_writer; + while (!(channel_writer = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300)); - CloseConnector("main"); + CloseChannel("main"); } }; struct Worker : public Reactor { Worker(std::string name) : Reactor(name) {} virtual void Run() { - std::shared_ptr channel; - while (!(channel = System::GetInstance().FindChannel("master", "main"))) + std::shared_ptr channel_writer; + while (!(channel_writer = System::GetInstance().FindChannel("master", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300)); - CloseConnector("main"); - } - }; - - System &system = System::GetInstance(); - system.Spawn("master"); - system.Spawn("worker"); - system.AwaitShutdown(); -} - - -TEST(SimpleSendTest, OneSimpleSend) { - struct MessageInt : public Message { - MessageInt(int xx) : x(xx) {} - int x; - }; - - struct Master : public Reactor { - Master(std::string name) : Reactor(name) {} - virtual void Run() { - std::shared_ptr channel; - while (!(channel = System::GetInstance().FindChannel("worker", "main"))) - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel->Send(123); - CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII. - } - }; - - struct Worker : public Reactor { - Worker(std::string name) : Reactor(name) {} - virtual void Run() { - EventStream* stream = main_.first; - std::unique_ptr m_uptr = stream->AwaitEvent(); - CloseConnector("main"); - MessageInt* msg = dynamic_cast(m_uptr.get()); - ASSERT_NE(msg, nullptr); - ASSERT_EQ(msg->x, 123); + CloseChannel("main"); } }; @@ -116,11 +79,11 @@ TEST(SimpleSendTest, OneCallback) { struct Master : public Reactor { Master(std::string name) : Reactor(name) {} virtual void Run() { - std::shared_ptr channel; - while (!(channel = System::GetInstance().FindChannel("worker", "main"))) + std::shared_ptr channel_writer; + while (!(channel_writer = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel->Send(888); - CloseConnector("main"); + channel_writer->Send(888); + CloseChannel("main"); } }; @@ -131,7 +94,7 @@ TEST(SimpleSendTest, OneCallback) { stream->OnEvent([this](const MessageInt &msg, const EventStream::Subscription&) { ASSERT_EQ(msg.x, 888); - CloseConnector("main"); + CloseChannel("main"); }); } }; @@ -152,15 +115,15 @@ TEST(SimpleSendTest, IgnoreAfterClose) { struct Master : public Reactor { Master(std::string name) : Reactor(name) {} virtual void Run() { - std::shared_ptr channel; - while (!(channel = System::GetInstance().FindChannel("worker", "main"))) + std::shared_ptr channel_writer; + while (!(channel_writer = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel->Send(101); - channel->Send(102); // should be ignored + channel_writer->Send(101); + channel_writer->Send(102); // should be ignored std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel->Send(103); // should be ignored - channel->Send(104); // should be ignored - CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII. + channel_writer->Send(103); // should be ignored + channel_writer->Send(104); // should be ignored + CloseChannel("main"); // Write-end doesn't need to be closed because it's in RAII. } }; @@ -168,11 +131,11 @@ TEST(SimpleSendTest, IgnoreAfterClose) { Worker(std::string name) : Reactor(name) {} virtual void Run() { EventStream* stream = main_.first; - std::unique_ptr m_uptr = stream->AwaitEvent(); - CloseConnector("main"); - MessageInt* msg = dynamic_cast(m_uptr.get()); - ASSERT_NE(msg, nullptr); - ASSERT_EQ(msg->x, 101); + + stream->OnEvent([this](const MessageInt& msg, const EventStream::Subscription&) { + CloseChannel("main"); + ASSERT_EQ(msg.x, 101); + }); } }; @@ -182,7 +145,6 @@ TEST(SimpleSendTest, IgnoreAfterClose) { system.AwaitShutdown(); } - TEST(SimpleSendTest, DuringFirstEvent) { struct MessageInt : public Message { MessageInt(int xx) : x(xx) {} @@ -200,13 +162,13 @@ TEST(SimpleSendTest, DuringFirstEvent) { FindChannel("main")->Send(102); if (msgint.x == 102) { subscription.unsubscribe(); - CloseConnector("main"); + CloseChannel("main"); p_.set_value(777); } }); - std::shared_ptr channel = FindChannel("main"); - channel->Send(101); + std::shared_ptr channel_writer = FindChannel("main"); + channel_writer->Send(101); } std::promise p_; }; @@ -234,19 +196,19 @@ TEST(MultipleSendTest, UnsubscribeService) { struct Master : public Reactor { Master(std::string name) : Reactor(name) {} virtual void Run() { - std::shared_ptr channel; - while (!(channel = System::GetInstance().FindChannel("worker", "main"))) + std::shared_ptr channel_writer; + while (!(channel_writer = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel->Send(55); - channel->Send(66); - channel->Send(77); - channel->Send(88); + channel_writer->Send(55); + channel_writer->Send(66); + channel_writer->Send(77); + channel_writer->Send(88); std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel->Send('a'); - channel->Send('b'); - channel->Send('c'); - channel->Send('d'); - CloseConnector("main"); + channel_writer->Send('a'); + channel_writer->Send('b'); + channel_writer->Send('c'); + channel_writer->Send('d'); + CloseChannel("main"); } }; @@ -271,7 +233,7 @@ TEST(MultipleSendTest, UnsubscribeService) { ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c'); if (num_msgs_received == 5) { subscription.unsubscribe(); - CloseConnector("main"); + CloseChannel("main"); } }); } @@ -297,15 +259,15 @@ TEST(MultipleSendTest, OnEvent) { struct Master : public Reactor { Master(std::string name) : Reactor(name) {} virtual void Run() { - std::shared_ptr channel; - while (!(channel = System::GetInstance().FindChannel("worker", "main"))) + std::shared_ptr channel_writer; + while (!(channel_writer = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel->Send(101); - channel->Send('a'); - channel->Send(103); - channel->Send('b'); - CloseConnector("main"); + channel_writer->Send(101); + channel_writer->Send('a'); + channel_writer->Send(103); + channel_writer->Send('b'); + CloseChannel("main"); } }; @@ -334,7 +296,7 @@ TEST(MultipleSendTest, OnEvent) { stream->OnEvent([this](const EndMessage&, const EventStream::Subscription&) { ASSERT_LE(correct_vals, 4); if (correct_vals == 4) { - CloseConnector("main"); + CloseChannel("main"); } }); } @@ -355,13 +317,13 @@ TEST(MultipleSendTest, Chaining) { struct Master : public Reactor { Master(std::string name) : Reactor(name) {} virtual void Run() { - std::shared_ptr channel; - while (!(channel = System::GetInstance().FindChannel("worker", "main"))) + std::shared_ptr channel_writer; + while (!(channel_writer = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel->Send(55); - channel->Send(66); - channel->Send(77); - CloseConnector("main"); + channel_writer->Send(55); + channel_writer->Send(66); + channel_writer->Send(77); + CloseChannel("main"); } }; @@ -380,7 +342,7 @@ TEST(MultipleSendTest, Chaining) { }) .ChainOnce([this](const MessageInt &msg) { ASSERT_EQ(msg.x, 77); - CloseConnector("main"); + CloseChannel("main"); }); } }; @@ -406,14 +368,14 @@ TEST(MultipleSendTest, ChainingInRightOrder) { struct Master : public Reactor { Master(std::string name) : Reactor(name) {} virtual void Run() { - std::shared_ptr channel; - while (!(channel = System::GetInstance().FindChannel("worker", "main"))) + std::shared_ptr channel_writer; + while (!(channel_writer = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel->Send('a'); - channel->Send(55); - channel->Send('b'); - channel->Send(77); - CloseConnector("main"); + channel_writer->Send('a'); + channel_writer->Send(55); + channel_writer->Send('b'); + channel_writer->Send(77); + CloseChannel("main"); } }; @@ -432,7 +394,7 @@ TEST(MultipleSendTest, ChainingInRightOrder) { }) .ChainOnce([this](const MessageInt &msg) { ASSERT_EQ(msg.x, 77); - CloseConnector("main"); + CloseChannel("main"); }); } }; @@ -455,16 +417,16 @@ TEST(MultipleSendTest, ProcessManyMessages) { struct Master : public Reactor { Master(std::string name) : Reactor(name) {} virtual void Run() { - std::shared_ptr channel; - while (!(channel = System::GetInstance().FindChannel("worker", "main"))) + std::shared_ptr channel_writer; + while (!(channel_writer = System::GetInstance().FindChannel("worker", "main"))) std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100)); for (int i = 0; i < num_tests; ++i) { - channel->Send(rand()); + channel_writer->Send(rand()); std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 5)); } - CloseConnector("main"); + CloseChannel("main"); } }; @@ -486,7 +448,7 @@ TEST(MultipleSendTest, ProcessManyMessages) { stream->OnEvent([this](const Message&, const EventStream::Subscription&) { ASSERT_LE(vals, num_tests); if (vals == num_tests) { - CloseConnector("main"); + CloseChannel("main"); } }); } diff --git a/experimental/distributed/tests/serialize_test.cpp b/experimental/distributed/tests/serialize_test.cpp deleted file mode 100644 index 1da6a1e43..000000000 --- a/experimental/distributed/tests/serialize_test.cpp +++ /dev/null @@ -1,126 +0,0 @@ -#include -#include - -#include "cereal/archives/binary.hpp" -#include "cereal/types/memory.hpp" -#include "cereal/types/string.hpp" -#include "cereal/types/utility.hpp" // utility has to be included because of std::pair -#include "cereal/types/vector.hpp" - -struct BasicSerializable { - int64_t x_; - std::string y_; - - BasicSerializable() = default; - BasicSerializable(int64_t x, std::string y) : x_(x), y_(y) {} - - template - void serialize(Archive &ar) { - ar(x_, y_); - } - - template - static void load_and_construct( - Archive &ar, cereal::construct &construct) { - int64_t x; - std::string y; - ar(x, y); - construct(x, y); - } -}; - -struct ComplexSerializable { - using VectorT = std::vector; - using VectorPairT = std::vector>; - - BasicSerializable x_; - VectorT y_; - VectorPairT z_; - - ComplexSerializable(const BasicSerializable &x, const VectorT &y, - const VectorPairT &z) - : x_(x), y_(y), z_(z) {} - - template - void serialize(Archive &ar) { - ar(x_, y_, z_); - } - - template - static void load_and_construct( - Archive &ar, cereal::construct &construct) { - BasicSerializable x; - VectorT y; - VectorPairT z; - ar(x, y, z); - construct(x, y, z); - } -}; - -class DummyStreamBuf : public std::basic_streambuf { - protected: - std::streamsize xsputn(const char *data, std::streamsize count) override { - for (std::streamsize i = 0; i < count; ++i) { - data_.push_back(data[i]); - } - return count; - } - std::streamsize xsgetn(char *data, std::streamsize count) override { - if (count < 0) return 0; - if (static_cast(position_ + count) > data_.size()) { - count = data_.size() - position_; - position_ = data_.size(); - } - memcpy(data, data_.data() + position_, count); - position_ += count; - return count; - } - - private: - std::vector data_; - std::streamsize position_{0}; -}; - -int main() { - DummyStreamBuf sb; - std::iostream iostream(&sb); - - // serialization - cereal::BinaryOutputArchive oarchive{iostream}; - std::unique_ptr const basic_serializable_object{ - new BasicSerializable{100, "Test"}}; - std::unique_ptr const complex_serializable_object{ - new ComplexSerializable{ - {100, "test"}, - {3.4, 3.4}, - {{"first", {10, "Basic1"}}, {"second", {20, "Basic2"}}}}}; - oarchive(basic_serializable_object); - oarchive(complex_serializable_object); - - // deserialization - cereal::BinaryInputArchive iarchive{iostream}; - std::unique_ptr basic_deserialized_object{nullptr}; - std::unique_ptr complex_deserialized_object{nullptr}; - iarchive(basic_deserialized_object); - iarchive(complex_deserialized_object); - - // output - std::cout << "Basic Deserialized: " << basic_deserialized_object->x_ << "; " - << basic_deserialized_object->y_ << std::endl; - auto x = complex_deserialized_object->x_; - auto y = complex_deserialized_object->y_; - auto z = complex_deserialized_object->z_; - std::cout << "Complex Deserialized" << std::endl; - std::cout << " x_ -> " << x.x_ << "; " << x.y_ << std::endl; - std::cout << " y_ -> "; - for (const auto v_item : y) std::cout << v_item << " "; - std::cout << std::endl; - std::cout << " z_ -> "; - for (const auto v_item : z) - std::cout << v_item.first << " | Pair: (" << v_item.second.x_ << ", " - << v_item.second.y_ << ")" - << "::"; - std::cout << std::endl; - - return 0; -}