Callback Chainer Convenience Class

Reviewers: buda, sasa.stanko, lion

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D653
This commit is contained in:
Goran Zuzic 2017-08-09 16:52:02 +02:00
parent 2446571770
commit 3fecc4f357
2 changed files with 115 additions and 0 deletions

View File

@ -130,6 +130,18 @@ class EventStream {
});
}
class OnEventOnceChainer;
/**
* Starts a chain to register a callback that fires off only once.
*
* This method supports chaining (see the the class OnEventOnceChainer or the tests for examples).
* Warning: when chaining callbacks, make sure that EventStream does not deallocate before the last
* chained callback fired.
*/
OnEventOnceChainer OnEventOnce() {
return OnEventOnceChainer(*this);
}
/**
* Close this event stream, disallowing further events from getting received.
*
@ -139,6 +151,64 @@ class EventStream {
*/
virtual void Close() = 0;
/**
* Convenience class to chain one-off callbacks.
*
* Usage: Create this class with OnEventOnce() and then chain callbacks using ChainOnce.
* A callback will fire only once, unsubscribe and immediately subscribe the next callback to the stream.
*
* Example: stream->OnEventOnce().ChainOnce(firstCb).ChainOnce(secondCb);
*
* Implementation: This class is a temporary object that remembers the callbacks that are to be installed
* and finally installs them in the destructor. Not sure is this kosher, is there another way?
*/
class OnEventOnceChainer {
public:
OnEventOnceChainer(EventStream& event_stream) : event_stream_(event_stream) {}
~OnEventOnceChainer() {
InstallCallbacks();
}
template<typename MsgType>
OnEventOnceChainer& ChainOnce(std::function<void(const MsgType&)>&& cb) {
std::function<void(const Message&, const Subscription&)> wrap =
[cb = std::move(cb)](const Message& general_msg, const Subscription& subscription) {
const MsgType& correct_msg = dynamic_cast<const MsgType&>(general_msg);
subscription.unsubscribe();
cb(correct_msg); // Warning: this can close the Channel, be careful what you put after it!
};
cbs_.emplace_back(typeid(MsgType), std::move(wrap));
return *this;
}
private:
void InstallCallbacks() {
int num_callbacks = cbs_.size();
assert(num_callbacks > 0); // We should install at least one callback, otherwise the usage is wrong?
std::function<void(const Message&, const Subscription&)> next_cb = nullptr;
std::type_index next_type = typeid(nullptr);
for (int i = num_callbacks - 1; i >= 0; --i) {
std::function<void(const Message&, const Subscription&)> tmp_cb = nullptr;
tmp_cb = [cb = std::move(cbs_[i].second),
next_type,
next_cb = std::move(next_cb),
es_ptr = &this->event_stream_](const Message& msg, const Subscription& subscription) {
cb(msg, subscription);
if (next_cb != nullptr) {
es_ptr->OnEventHelper(next_type, std::move(next_cb));
}
};
next_cb = std::move(tmp_cb);
next_type = cbs_[i].first;
}
event_stream_.OnEventHelper(next_type, std::move(next_cb));
}
EventStream& event_stream_;
std::vector<std::pair<std::type_index, std::function<void(const Message&, const Subscription&)>>> cbs_;
};
typedef std::function<void(const Message&, const Subscription&)> Callback;
private:

View File

@ -308,6 +308,51 @@ TEST(MultipleSendTest, OnEvent) {
system.AwaitShutdown();
}
TEST(MultipleSendTest, Chaining) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(55);
channel->Send<MessageInt>(66);
channel->Send<MessageInt>(77);
CloseConnector("main");
}
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
EventStream* stream = main_.first;
stream->OnEventOnce()
.ChainOnce<MessageInt>([this](const MessageInt& msg) {
ASSERT_EQ(msg.x, 55);
})
.ChainOnce<MessageInt>([](const MessageInt& msg) {
ASSERT_EQ(msg.x, 66);
})
.ChainOnce<MessageInt>([this](const MessageInt& msg) {
ASSERT_EQ(msg.x, 77);
CloseConnector("main");
});
}
};
System system;
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}