Better OnEvent usability

Reviewers: lion, buda, sasa.stanko

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D650
This commit is contained in:
Goran Zuzic 2017-08-09 11:58:53 +02:00
parent caff5e4066
commit 5c4c4919a2
3 changed files with 25 additions and 20 deletions

View File

@ -149,7 +149,7 @@ class Master : public Reactor {
} }
} }
stream->OnEvent(typeid(Message), [this](const Message &msg, const EventStream::Subscription& subscription) { stream->OnEvent<Message>([this](const Message &msg, const EventStream::Subscription& subscription) {
std::cout << "Processing Query via Callback" << std::endl; std::cout << "Processing Query via Callback" << std::endl;
const Query &query = const Query &query =
dynamic_cast<const Query &>(msg); // exception bad_cast dynamic_cast<const Query &>(msg); // exception bad_cast

View File

@ -118,12 +118,17 @@ class EventStream {
uint64_t cb_uid_; uint64_t cb_uid_;
}; };
typedef std::function<void(const Message&, const Subscription&)> Callback;
/** /**
* Register a callback that will be called whenever an event arrives. * Register a callback that will be called whenever an event arrives.
*/ */
virtual void OnEvent(std::type_index tidx, Callback callback) = 0; template<typename MsgType>
void OnEvent(std::function<void(const MsgType&, const Subscription&)>&& cb) {
OnEventHelper(typeid(MsgType), [cb = move(cb)](const Message& general_msg,
const Subscription& subscription) {
const MsgType& correct_msg = dynamic_cast<const MsgType&>(general_msg);
cb(correct_msg, subscription);
});
}
/** /**
* Close this event stream, disallowing further events from getting received. * Close this event stream, disallowing further events from getting received.
@ -133,6 +138,12 @@ class EventStream {
* associated with the Reactor. * associated with the Reactor.
*/ */
virtual void Close() = 0; virtual void Close() = 0;
typedef std::function<void(const Message&, const Subscription&)> Callback;
private:
virtual void OnEventHelper(std::type_index tidx, Callback callback) = 0;
}; };
/** /**
@ -224,9 +235,9 @@ class Connector {
std::unique_lock<std::mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
return queue_->LockedPop(); return queue_->LockedPop();
} }
void OnEvent(std::type_index tidx, Callback callback) { void OnEventHelper(std::type_index tidx, Callback callback) {
std::unique_lock<std::mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
queue_->LockedOnEvent(tidx, callback); queue_->LockedOnEventHelper(tidx, callback);
} }
void Close(); void Close();
@ -281,7 +292,7 @@ private:
return LockedRawPop(); return LockedRawPop();
} }
void LockedOnEvent(std::type_index tidx, EventStream::Callback callback) { void LockedOnEventHelper(std::type_index tidx, EventStream::Callback callback) {
uint64_t cb_uid = next_cb_uid++; uint64_t cb_uid = next_cb_uid++;
callbacks_[tidx][cb_uid] = callback; callbacks_[tidx][cb_uid] = callback;
} }

View File

@ -129,9 +129,8 @@ TEST(SimpleSendTest, OneCallback) {
virtual void Run() { virtual void Run() {
EventStream* stream = main_.first; EventStream* stream = main_.first;
stream->OnEvent(typeid(MessageInt), [this](const Message& msg, const EventStream::Subscription&) { stream->OnEvent<MessageInt>([this](const MessageInt& msg, const EventStream::Subscription&) {
const MessageInt& msgint = dynamic_cast<const MessageInt&>(msg); ASSERT_EQ(msg.x, 888);
ASSERT_EQ(msgint.x, 888);
CloseConnector("main"); CloseConnector("main");
}); });
} }
@ -221,16 +220,14 @@ TEST(MultipleSendTest, UnsubscribeService) {
virtual void Run() { virtual void Run() {
EventStream* stream = main_.first; EventStream* stream = main_.first;
stream->OnEvent(typeid(MessageInt), [this](const Message& msg, const EventStream::Subscription& subscription) { stream->OnEvent<MessageInt>([this](const MessageInt& msgint, const EventStream::Subscription& subscription) {
const MessageInt& msgint = dynamic_cast<const MessageInt&>(msg);
ASSERT_TRUE(msgint.x == 55 || msgint.x == 66); ASSERT_TRUE(msgint.x == 55 || msgint.x == 66);
++num_msgs_received; ++num_msgs_received;
if (msgint.x == 66) { if (msgint.x == 66) {
subscription.unsubscribe(); // receive only two of them subscription.unsubscribe(); // receive only two of them
} }
}); });
stream->OnEvent(typeid(MessageChar), [this](const Message& msg, const EventStream::Subscription& subscription) { stream->OnEvent<MessageChar>([this](const MessageChar& msgchar, const EventStream::Subscription& subscription) {
const MessageChar& msgchar = dynamic_cast<const MessageChar&>(msg);
char c = msgchar.x; char c = msgchar.x;
++num_msgs_received; ++num_msgs_received;
ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c'); ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c');
@ -284,22 +281,19 @@ TEST(MultipleSendTest, OnEvent) {
EventStream* stream = main_.first; EventStream* stream = main_.first;
correct_vals = 0; correct_vals = 0;
stream->OnEvent(typeid(MessageInt), [this](const Message& msg, const EventStream::Subscription&) { stream->OnEvent<MessageInt>([this](const MessageInt& msgint, const EventStream::Subscription&) {
const MessageInt& msgint = dynamic_cast<const MessageInt&>(msg);
ASSERT_TRUE(msgint.x == 101 || msgint.x == 103); ASSERT_TRUE(msgint.x == 101 || msgint.x == 103);
++correct_vals; ++correct_vals;
main_.second->Send<EndMessage>(); main_.second->Send<EndMessage>();
}); });
stream->OnEvent(typeid(MessageChar), [this](const Message& msg, const EventStream::Subscription&) { stream->OnEvent<MessageChar>([this](const MessageChar& msgchar, const EventStream::Subscription&) {
const MessageChar& msgchar = dynamic_cast<const MessageChar&>(msg);
ASSERT_TRUE(msgchar.x == 'a' || msgchar.x == 'b'); ASSERT_TRUE(msgchar.x == 'a' || msgchar.x == 'b');
++correct_vals; ++correct_vals;
main_.second->Send<EndMessage>(); main_.second->Send<EndMessage>();
}); });
stream->OnEvent(typeid(EndMessage), [this](const Message&, const EventStream::Subscription&) { stream->OnEvent<EndMessage>([this](const EndMessage&, const EventStream::Subscription&) {
ASSERT_LE(correct_vals, 4); ASSERT_LE(correct_vals, 4);
if (correct_vals == 4) { if (correct_vals == 4) {
CloseConnector("main"); CloseConnector("main");