diff --git a/.gitignore b/.gitignore index 72a2e4a86..ed6de26ee 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,4 @@ tags ve/ ve3/ perf.data* +TAGS diff --git a/experimental/distributed/main.cpp b/experimental/distributed/main.cpp index da954042a..2fa2f1692 100644 --- a/experimental/distributed/main.cpp +++ b/experimental/distributed/main.cpp @@ -12,7 +12,7 @@ const int NUM_WORKERS = 1; class Txn : public SenderMessage { public: - Txn(ChannelRefT channel, int64_t id) : SenderMessage(channel), id_(id) {} + Txn(std::shared_ptr channel, int64_t id) : SenderMessage(channel), id_(id) {} int64_t id() const { return id_; } template @@ -26,7 +26,7 @@ class Txn : public SenderMessage { class CreateNodeTxn : public Txn { public: - CreateNodeTxn(ChannelRefT channel, int64_t id) : Txn(channel, id) {} + CreateNodeTxn(std::shared_ptr channel, int64_t id) : Txn(channel, id) {} template void serialize(Archive &archive) { @@ -36,7 +36,7 @@ class CreateNodeTxn : public Txn { class CountNodesTxn : public Txn { public: - CountNodesTxn(ChannelRefT channel, int64_t id) : Txn(channel, id) {} + CountNodesTxn(std::shared_ptr channel, int64_t id) : Txn(channel, id) {} template void serialize(Archive &archive) { @@ -60,7 +60,7 @@ class CountNodesTxnResult : public Message { class CommitRequest : public SenderMessage { public: - CommitRequest(ChannelRefT sender, int64_t worker_id) + CommitRequest(std::shared_ptr sender, int64_t worker_id) : SenderMessage(sender), worker_id_(worker_id) {} int64_t worker_id() { return worker_id_; } @@ -75,7 +75,7 @@ class CommitRequest : public SenderMessage { class AbortRequest : public SenderMessage { public: - AbortRequest(ChannelRefT sender, int64_t worker_id) + AbortRequest(std::shared_ptr sender, int64_t worker_id) : SenderMessage(sender), worker_id_(worker_id) {} int64_t worker_id() { return worker_id_; } @@ -182,7 +182,7 @@ class Master : public Reactor { std::cerr << "unknown message\n"; exit(1); } - Close(txn_channel_name); + CloseConnector(txn_channel_name); } void PerformCountNodes() { @@ -194,7 +194,7 @@ class Master : public Reactor { channels_[w_id]->Send( std::make_unique(connector.second, xid)); - std::vector txn_channels; + std::vector> txn_channels; txn_channels.resize(NUM_WORKERS, nullptr); bool commit = true; for (int responds = 0; responds < NUM_WORKERS; ++responds) { @@ -231,7 +231,7 @@ class Master : public Reactor { } } - Close(txn_channel_name); + CloseConnector(txn_channel_name); std::cout << "graph has " << count << " vertices" << std::endl; } @@ -312,7 +312,7 @@ class Worker : public Reactor { std::cerr << "unknown message\n"; exit(1); } - Close(GetTxnChannelName(txn->id())); + CloseConnector(GetTxnChannelName(txn->id())); } void HandleCountNodes(CountNodesTxn *txn) { @@ -334,7 +334,7 @@ class Worker : public Reactor { std::cerr << "unknown message\n"; exit(1); } - Close(GetTxnChannelName(txn->id())); + CloseConnector(GetTxnChannelName(txn->id())); } // TODO: Don't repeat code from Master. @@ -350,7 +350,6 @@ class Worker : public Reactor { std::shared_ptr master_channel_ = nullptr; int worker_id_; - // Storage storage_; }; void ClientMain(System *system) { diff --git a/experimental/distributed/src/communication.cpp b/experimental/distributed/src/communication.cpp index 7a80636b1..c5d6089d0 100644 --- a/experimental/distributed/src/communication.cpp +++ b/experimental/distributed/src/communication.cpp @@ -6,107 +6,96 @@ void EventStream::Subscription::unsubscribe() { thread_local Reactor* current_reactor_ = nullptr; -std::string EventQueue::LocalChannel::Hostname() { +std::string Connector::LocalChannel::Hostname() { return system_->network().Hostname(); } -int32_t EventQueue::LocalChannel::Port() { +int32_t Connector::LocalChannel::Port() { return system_->network().Port(); } -std::string EventQueue::LocalChannel::ReactorName() { +std::string Connector::LocalChannel::ReactorName() { return reactor_name_; } -std::string EventQueue::LocalChannel::Name() { +std::string Connector::LocalChannel::Name() { return name_; } -void EventQueue::LocalEventStream::Close() { - current_reactor_->Close(name_); -} - -ConnectorT Reactor::Open(const std::string &channel_name) { - std::unique_lock lock(*mutex_); +std::pair> Reactor::Open(const std::string &channel_name) { + std::unique_lock lock(*mutex_); // TODO: Improve the check that the channel name does not exist in the // system. assert(connectors_.count(channel_name) == 0); auto it = connectors_.emplace(channel_name, - EventQueue::Params{system_, name_, channel_name, mutex_, cvar_}).first; - return ConnectorT(it->second.stream_, it->second.channel_); + std::make_shared(Connector::Params{system_, name_, channel_name, mutex_, cvar_})).first; + it->second->self_ptr_ = it->second; + return make_pair(&it->second->stream_, it->second->LockedOpenChannel()); } -ConnectorT Reactor::Open() { - std::unique_lock lock(*mutex_); +std::pair> Reactor::Open() { + std::unique_lock lock(*mutex_); do { std::string channel_name = "stream-" + std::to_string(channel_name_counter_++); if (connectors_.count(channel_name) == 0) { - // EventQueue &queue = connectors_[channel_name]; + // Connector &queue = connectors_[channel_name]; auto it = connectors_.emplace(channel_name, - EventQueue::Params{system_, name_, channel_name, mutex_, cvar_}).first; - return ConnectorT(it->second.stream_, it->second.channel_); + std::make_shared(Connector::Params{system_, name_, channel_name, mutex_, cvar_})).first; + it->second->self_ptr_ = it->second; + return make_pair(&it->second->stream_, it->second->LockedOpenChannel()); } } while (true); } const std::shared_ptr Reactor::FindChannel( const std::string &channel_name) { - std::unique_lock lock(*mutex_); + std::unique_lock lock(*mutex_); auto it_connector = connectors_.find(channel_name); if (it_connector == connectors_.end()) return nullptr; - return it_connector->second.channel_; + return it_connector->second->LockedOpenChannel(); } -void Reactor::Close(const std::string &s) { - std::unique_lock lock(*mutex_); +void Reactor::CloseConnector(const std::string &s) { + std::unique_lock lock(*mutex_); auto it = connectors_.find(s); assert(it != connectors_.end()); - LockedCloseInternal(it->second); - connectors_.erase(it); // this calls the EventQueue destructor that catches the mutex, ugh. + connectors_.erase(it); } -void Reactor::LockedCloseInternal(EventQueue& event_queue) { - // TODO(zuza): figure this out! @@@@ - std::cout << "Close Channel! Reactor name = " << name_ << " Channel name = " << event_queue.name_ << std::endl; +void Reactor::CloseAllConnectors() { + std::unique_lock lock(*mutex_); + connectors_.clear(); } void Reactor::RunEventLoop() { - std::cout << "event loop is run!" << std::endl; + bool exit_event_loop = false; + while (true) { - // Clean up EventQueues without callbacks. + // Find (or wait) for the next Message. + MsgAndCbInfo msg_and_cb; { - std::unique_lock lock(*mutex_); - for (auto connectors_it = connectors_.begin(); connectors_it != connectors_.end(); ) { - EventQueue& event_queue = connectors_it->second; - if (event_queue.LockedCanBeClosed()) { - LockedCloseInternal(event_queue); - connectors_it = connectors_.erase(connectors_it); // This removes the element from the collection. - } else { - ++connectors_it; - } - } - } - - // Process and wait for events to dispatch. - MsgAndCbInfo msgAndCb; - { - std::unique_lock lock(*mutex_); - - // Exit the loop if there are no more EventQueues. - if (connectors_.empty()) { - return; - } + std::unique_lock lock(*mutex_); while (true) { - msgAndCb = LockedGetPendingMessages(lock); - if (msgAndCb.first != nullptr) break; + // Exit the loop if there are no more Connectors. + if (connectors_.empty()) { + exit_event_loop = true; + break; + } + + // Not fair because was taken earlier, talk to lion. + msg_and_cb = LockedGetPendingMessages(lock); + if (msg_and_cb.first != nullptr) break; + cvar_->wait(lock); } + + if (exit_event_loop) break; } - for (auto& cbAndSub : msgAndCb.second) { + for (auto& cbAndSub : msg_and_cb.second) { auto& cb = cbAndSub.first; - const Message& msg = *msgAndCb.first; + const Message& msg = *msg_and_cb.first; cb(msg, cbAndSub.second); } } @@ -115,10 +104,10 @@ void Reactor::RunEventLoop() { /** * Checks if there is any nonempty EventStream. */ -auto Reactor::LockedGetPendingMessages(std::unique_lock &lock) -> MsgAndCbInfo { +auto Reactor::LockedGetPendingMessages(std::unique_lock &lock) -> MsgAndCbInfo { // return type after because the scope Reactor:: is not searched before the name for (auto& connectors_key_value : connectors_) { - EventQueue& event_queue = connectors_key_value.second; + Connector& event_queue = *connectors_key_value.second; auto msg_ptr = event_queue.LockedPop(lock); if (msg_ptr == nullptr) continue; diff --git a/experimental/distributed/src/communication.hpp b/experimental/distributed/src/communication.hpp index 68441b144..6794afb55 100644 --- a/experimental/distributed/src/communication.hpp +++ b/experimental/distributed/src/communication.hpp @@ -19,7 +19,7 @@ class Message; class EventStream; class Reactor; class System; -class EventQueue; +class Connector; extern thread_local Reactor* current_reactor_; @@ -74,10 +74,10 @@ class EventStream { private: friend class Reactor; - Subscription(EventQueue& event_queue, uint64_t cb_uid) : event_queue_(event_queue) { + Subscription(Connector& event_queue, uint64_t cb_uid) : event_queue_(event_queue) { cb_uid_ = cb_uid; } - EventQueue& event_queue_; + Connector& event_queue_; uint64_t cb_uid_; }; @@ -87,11 +87,6 @@ class EventStream { * Register a callback that will be called whenever an event arrives. */ virtual void OnEvent(Callback callback) = 0; - - /** - * Close this event stream, disallowing further events from getting received. - */ - virtual void Close() = 0; }; /** @@ -99,88 +94,51 @@ class EventStream { * * This class is an internal data structure that represents the state of the connector. * This class is not meant to be used by the clients of the messaging framework. - * The EventQueue class wraps the event queue data structure, the mutex that protects + * The Connector class wraps the event queue data structure, the mutex that protects * concurrent access to the event queue, the local channel and the event stream. - * The class is owned by the Reactor, but its LocalChannel can outlive it. - * See the LocalChannel and LocalEventStream nested classes for further information. + * The class is owned by the Reactor. It gets closed when the owner reactor + * (the one that owns the read-end of a connector) removes/closes it. */ -class EventQueue { +class Connector { + class Params; + public: - friend class Reactor; + friend class Reactor; // to create a Params initialization object friend class EventStream::Subscription; - struct Params { - System* system; - std::string reactor_name; - std::string name; - std::shared_ptr mutex; - std::shared_ptr cvar; - }; - - EventQueue(Params params) + Connector(Params params) : system_(params.system), - reactor_name_(params.reactor_name), name_(params.name), + reactor_name_(params.reactor_name), mutex_(params.mutex), - cvar_(params.cvar) {} + cvar_(params.cvar), + stream_(mutex_, name_, this) {} /** - * The destructor locks the mutex of the EventQueue and sets queue pointer to null. - */ - ~EventQueue() { - // Ugly: this is the ONLY thing that is allowed to lock this recursive mutex twice. - // This is because we can't make a locked and a unlocked version of the destructor. - std::unique_lock lock(*mutex_); - stream_->queue_ = nullptr; - channel_->queue_ = nullptr; - } - - void LockedPush(std::unique_lock &, std::unique_ptr m) { - queue_.push(std::move(m)); - cvar_->notify_one(); - } - - std::unique_ptr LockedAwaitPop(std::unique_lock &lock) { - std::unique_ptr m; - while (!(m = LockedRawPop())) { - cvar_->wait(lock); - } - return m; - } - - std::unique_ptr LockedPop(std::unique_lock &lock) { - return LockedRawPop(); - } - - void LockedOnEvent(EventStream::Callback callback) { - uint64_t cb_uid = next_cb_uid++; - callbacks_[cb_uid] = callback; - } - - /** - * LocalChannel represents the channels to reactors living in the same reactor system. + * LocalChannel represents the channels to reactors living in the same reactor system (write-end of the connectors). * * Sending messages to the local channel requires acquiring the mutex. - * LocalChannel holds a pointer to the enclosing EventQueue object. - * The enclosing EventQueue object is destroyed when the reactor calls Close. - * When this happens, the pointer to the enclosing EventQueue object is set to null. - * After this, all the message sends on this channel are dropped. + * LocalChannel holds a (weak) pointer to the enclosing Connector object. + * Messages sent to a closed channel are ignored. + * There can be multiple LocalChannels refering to the same stream if needed. */ class LocalChannel : public Channel { public: - friend class EventQueue; + friend class Connector; - LocalChannel(std::shared_ptr mutex, std::string reactor_name, - std::string name, EventQueue *queue, System *system) + LocalChannel(std::shared_ptr mutex, std::string reactor_name, + std::string name, std::weak_ptr queue, System *system) : mutex_(mutex), reactor_name_(reactor_name), name_(name), - queue_(queue), + weak_queue_(queue), system_(system) {} virtual void Send(std::unique_ptr m) { - std::unique_lock lock(*mutex_); - if (queue_ != nullptr) { + std::shared_ptr queue_ = weak_queue_.lock(); // Atomic, per the standard. + if (queue_) { + // We guarantee here that the Connector is not destroyed. + std::unique_lock lock(*mutex_); queue_->LockedPush(lock, std::move(m)); } } @@ -194,27 +152,26 @@ class EventQueue { virtual std::string Name(); private: - std::shared_ptr mutex_; + std::shared_ptr mutex_; std::string reactor_name_; std::string name_; - EventQueue *queue_; + std::weak_ptr weak_queue_; System *system_; }; /** * Implementation of the event stream. * - * After the enclosing EventQueue object is destroyed (by a call to Close), - * it is no longer legal to call any of the event stream methods. + * After the enclosing Connector object is destroyed (by a call to CloseChannel or Close). */ class LocalEventStream : public EventStream { public: - friend class EventQueue; + friend class Connector; - LocalEventStream(std::shared_ptr mutex, std::string name, - EventQueue *queue) : mutex_(mutex), name_(name), queue_(queue) {} + LocalEventStream(std::shared_ptr mutex, std::string name, + Connector *queue) : mutex_(mutex), name_(name), queue_(queue) {} std::unique_ptr AwaitEvent() { - std::unique_lock lock(*mutex_); + std::unique_lock lock(*mutex_); if (queue_ != nullptr) { return queue_->LockedAwaitPop(lock); } @@ -222,7 +179,7 @@ class EventQueue { "Cannot call method after connector was closed."); } std::unique_ptr PopEvent() { - std::unique_lock lock(*mutex_); + std::unique_lock lock(*mutex_); if (queue_ != nullptr) { return queue_->LockedPop(lock); } @@ -230,7 +187,7 @@ class EventQueue { "Cannot call method after connector was closed."); } void OnEvent(EventStream::Callback callback) { - std::unique_lock lock(*mutex_); + std::unique_lock lock(*mutex_); if (queue_ != nullptr) { queue_->LockedOnEvent(callback); return; @@ -239,67 +196,97 @@ class EventQueue { "Cannot call method after connector was closed."); } - void Close(); - private: - std::shared_ptr mutex_; + std::shared_ptr mutex_; std::string name_; - EventQueue *queue_; + Connector *queue_; + }; + + Connector(const Connector &other) = delete; + Connector(Connector &&other) = default; + Connector &operator=(const Connector &other) = delete; + Connector &operator=(Connector &&other) = default; + +private: + /** + * Initialization parameters to Connector. + * Warning: do not forget to initialize self_ptr_ individually. Private because it shouldn't be created outside of a Reactor. + */ + struct Params { + System* system; + std::string reactor_name; + /** + * Connector name. + */ + std::string name; + std::shared_ptr mutex; + std::shared_ptr cvar; }; - private: + + void LockedPush(std::unique_lock &, std::unique_ptr m) { + queue_.push(std::move(m)); + // This is OK because there is only one Reactor (thread) that can wait on this Connector. + cvar_->notify_one(); + } + + std::shared_ptr LockedOpenChannel() { + assert(!self_ptr_.expired()); // TODO(zuza): fix this using this answer https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned + return std::make_shared(mutex_, reactor_name_, name_, self_ptr_, system_); + } + + std::unique_ptr LockedAwaitPop(std::unique_lock &lock) { + std::unique_ptr m; + while (!(m = LockedRawPop())) { + cvar_->wait(lock); + } + return m; + } + + std::unique_ptr LockedPop(std::unique_lock &lock) { + return LockedRawPop(); + } + + void LockedOnEvent(EventStream::Callback callback) { + uint64_t cb_uid = next_cb_uid++; + callbacks_[cb_uid] = callback; + } + std::unique_ptr LockedRawPop() { if (queue_.empty()) return nullptr; std::unique_ptr t = std::move(queue_.front()); queue_.pop(); - return std::move(t); - } - - /** - * Should the owner close this EventQueue? - * - * Currently only checks if there are no more messages and all callbacks have unsubscribed? - * This assumes the event loop has been started. - */ - bool LockedCanBeClosed() { - return callbacks_.empty() && queue_.empty(); + return t; } void RemoveCbByUid(uint64_t uid) { - std::unique_lock lock(*mutex_); + std::unique_lock lock(*mutex_); size_t num_erased = callbacks_.erase(uid); assert(num_erased == 1); - - // TODO(zuza): if no more callbacks, shut down the class (and the eventloop is started). First, figure out ownership of EventQueue? } System *system_; std::string name_; std::string reactor_name_; - std::queue> queue_; // Should only be locked once since it's used by a cond. var. Also caught in dctor, so must be recursive. - std::shared_ptr mutex_; - std::shared_ptr cvar_; - std::shared_ptr stream_ = - std::make_shared(mutex_, name_, this); - std::shared_ptr channel_ = - std::make_shared(mutex_, reactor_name_, name_, this, system_); + std::shared_ptr mutex_; + std::shared_ptr cvar_; + /** + * A weak_ptr to itself. + * + * There are initialization problems with this, check Params. + */ + std::weak_ptr self_ptr_; + LocalEventStream stream_; std::unordered_map callbacks_; uint64_t next_cb_uid = 0; - }; -/** - * Pair composed of read-end and write-end of a connection. - */ -using ConnectorT = std::pair, std::shared_ptr>; -using ChannelRefT = std::shared_ptr; - /** * A single unit of concurrent execution in the system. * - * E.g. one worker, one client. Owned by System. + * E.g. one worker, one client. Owned by System. Has a thread associated with it. */ class Reactor { public: @@ -307,31 +294,56 @@ class Reactor { Reactor(System *system, std::string name) : system_(system), name_(name), main_(Open("main")) {} - + virtual ~Reactor() {} virtual void Run() = 0; - ConnectorT Open(const std::string &s); - ConnectorT Open(); + std::pair> Open(const std::string &s); + std::pair> Open(); const std::shared_ptr FindChannel(const std::string &channel_name); - void Close(const std::string &s); + /** + * Close a connector by name. + * + * Should only be called from the Reactor thread. + */ + void CloseConnector(const std::string &s); + + /** + * close all connectors (typically during shutdown). + * + * Should only be called from the Reactor thread. + */ + void CloseAllConnectors(); + + Reactor(const Reactor &other) = delete; + Reactor(Reactor &&other) = default; + Reactor &operator=(const Reactor &other) = delete; + Reactor &operator=(Reactor &&other) = default; + protected: System *system_; std::string name_; /* - * Locks all Reactor data, including all EventQueue's in connectors_. + * Locks all Reactor data, including all Connector's in connectors_. * * This should be a shared_ptr because LocalChannel can outlive Reactor. */ - std::shared_ptr mutex_ = - std::make_shared(); - std::shared_ptr cvar_ = - std::make_shared(); - std::unordered_map connectors_; + std::shared_ptr mutex_ = + std::make_shared(); + std::shared_ptr cvar_ = + std::make_shared(); + + /** + * List of connectors of a reactor indexed by name. + * + * While the connectors are owned by the reactor, a shared_ptr to solve the circular reference problem + * between Channels and EventStreams. + */ + std::unordered_map> connectors_; int64_t channel_name_counter_{0}; - ConnectorT main_; + std::pair> main_; private: typedef std::pair, @@ -342,10 +354,8 @@ class Reactor { */ void RunEventLoop(); - void LockedCloseInternal(EventQueue& event_queue); - // TODO: remove proof of locking evidence ?! - MsgAndCbInfo LockedGetPendingMessages(std::unique_lock &lock); + MsgAndCbInfo LockedGetPendingMessages(std::unique_lock &lock); }; /** @@ -437,9 +447,9 @@ class Message { */ class SenderMessage : public Message { public: - SenderMessage(ChannelRefT sender) : sender_(sender) {} + SenderMessage(std::shared_ptr sender) : sender_(sender) {} - ChannelRefT sender() { return sender_; } + std::shared_ptr sender() { return sender_; } template void serialize(Archive &ar) { @@ -447,7 +457,7 @@ class SenderMessage : public Message { } private: - ChannelRefT sender_; + std::shared_ptr sender_; }; /** @@ -534,7 +544,7 @@ class System { } std::recursive_mutex mutex_; - // TODO: Replace with a map to a reactor EventQueue map to have more granular + // TODO: Replace with a map to a reactor Connector map to have more granular // locking. std::unordered_map, std::thread>> diff --git a/experimental/distributed/tests/connector_unit.cpp b/experimental/distributed/tests/connector_unit.cpp new file mode 100644 index 000000000..fe4c6165d --- /dev/null +++ b/experimental/distributed/tests/connector_unit.cpp @@ -0,0 +1,125 @@ +#include "gtest/gtest.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "communication.hpp" + +TEST(ConnectorSetUpTest, CheckMainChannelIsSet) { + struct Master : public Reactor { + Master(System *system, std::string name) : Reactor(system, name) {} + virtual void Run() { + std::shared_ptr channel; + while (!(channel = system_->FindChannel("worker", "main"))) + std::this_thread::sleep_for(std::chrono::seconds(1)); + std::this_thread::sleep_for(std::chrono::seconds(1)); + CloseConnector("main"); + } + }; + + struct Worker : public Reactor { + Worker(System *system, std::string name) : Reactor(system, name) {} + virtual void Run() { + std::shared_ptr channel; + while (!(channel = system_->FindChannel("master", "main"))) + std::this_thread::sleep_for(std::chrono::seconds(1)); + std::this_thread::sleep_for(std::chrono::seconds(1)); + CloseConnector("main"); + } + }; + + System system; + system.Spawn("master"); + system.Spawn("worker"); + system.AwaitShutdown(); +} + + +TEST(SimpleSendTest, OneSimpleSend) { + struct MessageInt : public Message { + MessageInt(int xx) : x(xx) {} + int x; + }; + + struct Master : public Reactor { + Master(System *system, std::string name) : Reactor(system, name) {} + virtual void Run() { + std::shared_ptr channel; + while (!(channel = system_->FindChannel("worker", "main"))) + std::this_thread::sleep_for(std::chrono::seconds(1)); + channel->Send(std::make_unique(123)); + CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII. + } + }; + + struct Worker : public Reactor { + Worker(System *system, std::string name) : Reactor(system, name) {} + virtual void Run() { + EventStream* stream = main_.first; + std::unique_ptr m_uptr = stream->AwaitEvent(); + CloseConnector("main"); + MessageInt* msg = dynamic_cast(m_uptr.get()); + ASSERT_NE(msg, nullptr); + ASSERT_EQ(msg->x, 123); + } + }; + + System system; + system.Spawn("master"); + system.Spawn("worker"); + system.AwaitShutdown(); +} + + +TEST(SimpleSendTest, IgnoreAfterClose) { + struct MessageInt : public Message { + MessageInt(int xx) : x(xx) {} + int x; + }; + + struct Master : public Reactor { + Master(System *system, std::string name) : Reactor(system, name) {} + virtual void Run() { + std::shared_ptr channel; + while (!(channel = system_->FindChannel("worker", "main"))) + std::this_thread::sleep_for(std::chrono::seconds(1)); + channel->Send(std::make_unique(101)); + channel->Send(std::make_unique(102)); + std::this_thread::sleep_for(std::chrono::seconds(1)); + channel->Send(std::make_unique(103)); // these ones should be ignored + channel->Send(std::make_unique(104)); + CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII. + } + }; + + struct Worker : public Reactor { + Worker(System *system, std::string name) : Reactor(system, name) {} + virtual void Run() { + EventStream* stream = main_.first; + std::unique_ptr m_uptr = stream->AwaitEvent(); + CloseConnector("main"); + MessageInt* msg = dynamic_cast(m_uptr.get()); + ASSERT_NE(msg, nullptr); + ASSERT_EQ(msg->x, 101); + } + }; + + System system; + system.Spawn("master"); + system.Spawn("worker"); + system.AwaitShutdown(); +} + + + + + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}