From 5c4c4919a2be1b68185b9aafea1aeebb8f196b13 Mon Sep 17 00:00:00 2001 From: Goran Zuzic Date: Wed, 9 Aug 2017 11:58:53 +0200 Subject: [PATCH] Better OnEvent usability Reviewers: lion, buda, sasa.stanko Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D650 --- experimental/distributed/main.cpp | 2 +- .../distributed/src/communication.hpp | 23 ++++++++++++++----- .../distributed/tests/connector_unit.cpp | 20 ++++++---------- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/experimental/distributed/main.cpp b/experimental/distributed/main.cpp index f858d3951..cc21bc7f3 100644 --- a/experimental/distributed/main.cpp +++ b/experimental/distributed/main.cpp @@ -149,7 +149,7 @@ class Master : public Reactor { } } - stream->OnEvent(typeid(Message), [this](const Message &msg, const EventStream::Subscription& subscription) { + stream->OnEvent([this](const Message &msg, const EventStream::Subscription& subscription) { std::cout << "Processing Query via Callback" << std::endl; const Query &query = dynamic_cast(msg); // exception bad_cast diff --git a/experimental/distributed/src/communication.hpp b/experimental/distributed/src/communication.hpp index a32b95fd3..6c449a9b0 100644 --- a/experimental/distributed/src/communication.hpp +++ b/experimental/distributed/src/communication.hpp @@ -118,12 +118,17 @@ class EventStream { uint64_t cb_uid_; }; - typedef std::function Callback; - /** * Register a callback that will be called whenever an event arrives. */ - virtual void OnEvent(std::type_index tidx, Callback callback) = 0; + template + void OnEvent(std::function&& cb) { + OnEventHelper(typeid(MsgType), [cb = move(cb)](const Message& general_msg, + const Subscription& subscription) { + const MsgType& correct_msg = dynamic_cast(general_msg); + cb(correct_msg, subscription); + }); + } /** * Close this event stream, disallowing further events from getting received. @@ -133,6 +138,12 @@ class EventStream { * associated with the Reactor. */ virtual void Close() = 0; + + typedef std::function Callback; + +private: + + virtual void OnEventHelper(std::type_index tidx, Callback callback) = 0; }; /** @@ -224,9 +235,9 @@ class Connector { std::unique_lock lock(*mutex_); return queue_->LockedPop(); } - void OnEvent(std::type_index tidx, Callback callback) { + void OnEventHelper(std::type_index tidx, Callback callback) { std::unique_lock lock(*mutex_); - queue_->LockedOnEvent(tidx, callback); + queue_->LockedOnEventHelper(tidx, callback); } void Close(); @@ -281,7 +292,7 @@ private: return LockedRawPop(); } - void LockedOnEvent(std::type_index tidx, EventStream::Callback callback) { + void LockedOnEventHelper(std::type_index tidx, EventStream::Callback callback) { uint64_t cb_uid = next_cb_uid++; callbacks_[tidx][cb_uid] = callback; } diff --git a/experimental/distributed/tests/connector_unit.cpp b/experimental/distributed/tests/connector_unit.cpp index 3144f5fe3..383f57c37 100644 --- a/experimental/distributed/tests/connector_unit.cpp +++ b/experimental/distributed/tests/connector_unit.cpp @@ -129,9 +129,8 @@ TEST(SimpleSendTest, OneCallback) { virtual void Run() { EventStream* stream = main_.first; - stream->OnEvent(typeid(MessageInt), [this](const Message& msg, const EventStream::Subscription&) { - const MessageInt& msgint = dynamic_cast(msg); - ASSERT_EQ(msgint.x, 888); + stream->OnEvent([this](const MessageInt& msg, const EventStream::Subscription&) { + ASSERT_EQ(msg.x, 888); CloseConnector("main"); }); } @@ -221,16 +220,14 @@ TEST(MultipleSendTest, UnsubscribeService) { virtual void Run() { EventStream* stream = main_.first; - stream->OnEvent(typeid(MessageInt), [this](const Message& msg, const EventStream::Subscription& subscription) { - const MessageInt& msgint = dynamic_cast(msg); + stream->OnEvent([this](const MessageInt& msgint, const EventStream::Subscription& subscription) { ASSERT_TRUE(msgint.x == 55 || msgint.x == 66); ++num_msgs_received; if (msgint.x == 66) { subscription.unsubscribe(); // receive only two of them } }); - stream->OnEvent(typeid(MessageChar), [this](const Message& msg, const EventStream::Subscription& subscription) { - const MessageChar& msgchar = dynamic_cast(msg); + stream->OnEvent([this](const MessageChar& msgchar, const EventStream::Subscription& subscription) { char c = msgchar.x; ++num_msgs_received; ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c'); @@ -284,22 +281,19 @@ TEST(MultipleSendTest, OnEvent) { EventStream* stream = main_.first; correct_vals = 0; - stream->OnEvent(typeid(MessageInt), [this](const Message& msg, const EventStream::Subscription&) { - const MessageInt& msgint = dynamic_cast(msg); + stream->OnEvent([this](const MessageInt& msgint, const EventStream::Subscription&) { ASSERT_TRUE(msgint.x == 101 || msgint.x == 103); ++correct_vals; main_.second->Send(); }); - stream->OnEvent(typeid(MessageChar), [this](const Message& msg, const EventStream::Subscription&) { - const MessageChar& msgchar = dynamic_cast(msg); - + stream->OnEvent([this](const MessageChar& msgchar, const EventStream::Subscription&) { ASSERT_TRUE(msgchar.x == 'a' || msgchar.x == 'b'); ++correct_vals; main_.second->Send(); }); - stream->OnEvent(typeid(EndMessage), [this](const Message&, const EventStream::Subscription&) { + stream->OnEvent([this](const EndMessage&, const EventStream::Subscription&) { ASSERT_LE(correct_vals, 4); if (correct_vals == 4) { CloseConnector("main");