From 3fecc4f357619a7252b982e2044b4286e7829d6c Mon Sep 17 00:00:00 2001 From: Goran Zuzic Date: Wed, 9 Aug 2017 16:52:02 +0200 Subject: [PATCH] Callback Chainer Convenience Class Reviewers: buda, sasa.stanko, lion Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D653 --- .../distributed/src/communication.hpp | 70 +++++++++++++++++++ .../distributed/tests/connector_unit.cpp | 45 ++++++++++++ 2 files changed, 115 insertions(+) diff --git a/experimental/distributed/src/communication.hpp b/experimental/distributed/src/communication.hpp index 6c449a9b0..f1244d5bc 100644 --- a/experimental/distributed/src/communication.hpp +++ b/experimental/distributed/src/communication.hpp @@ -130,6 +130,18 @@ class EventStream { }); } + class OnEventOnceChainer; + /** + * Starts a chain to register a callback that fires off only once. + * + * This method supports chaining (see the the class OnEventOnceChainer or the tests for examples). + * Warning: when chaining callbacks, make sure that EventStream does not deallocate before the last + * chained callback fired. + */ + OnEventOnceChainer OnEventOnce() { + return OnEventOnceChainer(*this); + } + /** * Close this event stream, disallowing further events from getting received. * @@ -139,6 +151,64 @@ class EventStream { */ virtual void Close() = 0; + /** + * Convenience class to chain one-off callbacks. + * + * Usage: Create this class with OnEventOnce() and then chain callbacks using ChainOnce. + * A callback will fire only once, unsubscribe and immediately subscribe the next callback to the stream. + * + * Example: stream->OnEventOnce().ChainOnce(firstCb).ChainOnce(secondCb); + * + * Implementation: This class is a temporary object that remembers the callbacks that are to be installed + * and finally installs them in the destructor. Not sure is this kosher, is there another way? + */ + class OnEventOnceChainer { + public: + OnEventOnceChainer(EventStream& event_stream) : event_stream_(event_stream) {} + ~OnEventOnceChainer() { + InstallCallbacks(); + } + + template + OnEventOnceChainer& ChainOnce(std::function&& cb) { + std::function wrap = + [cb = std::move(cb)](const Message& general_msg, const Subscription& subscription) { + const MsgType& correct_msg = dynamic_cast(general_msg); + subscription.unsubscribe(); + cb(correct_msg); // Warning: this can close the Channel, be careful what you put after it! + }; + cbs_.emplace_back(typeid(MsgType), std::move(wrap)); + return *this; + } + + private: + void InstallCallbacks() { + int num_callbacks = cbs_.size(); + assert(num_callbacks > 0); // We should install at least one callback, otherwise the usage is wrong? + std::function next_cb = nullptr; + std::type_index next_type = typeid(nullptr); + + for (int i = num_callbacks - 1; i >= 0; --i) { + std::function tmp_cb = nullptr; + tmp_cb = [cb = std::move(cbs_[i].second), + next_type, + next_cb = std::move(next_cb), + es_ptr = &this->event_stream_](const Message& msg, const Subscription& subscription) { + cb(msg, subscription); + if (next_cb != nullptr) { + es_ptr->OnEventHelper(next_type, std::move(next_cb)); + } + }; + next_cb = std::move(tmp_cb); + next_type = cbs_[i].first; + } + + event_stream_.OnEventHelper(next_type, std::move(next_cb)); + } + + EventStream& event_stream_; + std::vector>> cbs_; + }; typedef std::function Callback; private: diff --git a/experimental/distributed/tests/connector_unit.cpp b/experimental/distributed/tests/connector_unit.cpp index 383f57c37..0890ebf89 100644 --- a/experimental/distributed/tests/connector_unit.cpp +++ b/experimental/distributed/tests/connector_unit.cpp @@ -308,6 +308,51 @@ TEST(MultipleSendTest, OnEvent) { system.AwaitShutdown(); } +TEST(MultipleSendTest, Chaining) { + struct MessageInt : public Message { + MessageInt(int xx) : x(xx) {} + int x; + }; + + struct Master : public Reactor { + Master(System *system, std::string name) : Reactor(system, name) {} + virtual void Run() { + std::shared_ptr channel; + while (!(channel = system_->FindChannel("worker", "main"))) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + channel->Send(55); + channel->Send(66); + channel->Send(77); + CloseConnector("main"); + } + }; + + struct Worker : public Reactor { + Worker(System *system, std::string name) : Reactor(system, name) {} + + virtual void Run() { + EventStream* stream = main_.first; + + stream->OnEventOnce() + .ChainOnce([this](const MessageInt& msg) { + ASSERT_EQ(msg.x, 55); + }) + .ChainOnce([](const MessageInt& msg) { + ASSERT_EQ(msg.x, 66); + }) + .ChainOnce([this](const MessageInt& msg) { + ASSERT_EQ(msg.x, 77); + CloseConnector("main"); + }); + } + }; + + System system; + system.Spawn("master"); + system.Spawn("worker"); + system.AwaitShutdown(); +} +