From e4e61cd00a5b524fd06d4d1ea0804f4106964d36 Mon Sep 17 00:00:00 2001 From: Goran Zuzic Date: Thu, 24 Aug 2017 13:58:13 +0200 Subject: [PATCH] Expanded distributed tests Summary: 1. add subscription functionality (it can close the channel and get the name) 2. new test for ReturnAddrMsg functionality 3. new test for serialization 4. proper serialization access permissions Reviewers: sasa.stanko Reviewed By: sasa.stanko Subscribers: lion, buda Differential Revision: https://phabricator.memgraph.io/D705 --- .../distributed/src/reactors_distributed.cpp | 3 + .../distributed/src/reactors_distributed.hpp | 11 +- .../distributed/src/reactors_local.cpp | 6 +- .../distributed/src/reactors_local.hpp | 7 +- .../tests/reactors_distributed_unit.cpp | 155 +++++++++++++++++- 5 files changed, 175 insertions(+), 7 deletions(-) diff --git a/experimental/distributed/src/reactors_distributed.cpp b/experimental/distributed/src/reactors_distributed.cpp index 6fb4bf04a..0d59f5e81 100644 --- a/experimental/distributed/src/reactors_distributed.cpp +++ b/experimental/distributed/src/reactors_distributed.cpp @@ -10,6 +10,9 @@ Network::Network() {} */ ReturnAddressMsg::ReturnAddressMsg() {} +ReturnAddressMsg::ReturnAddressMsg(std::string channel) + : ReturnAddressMsg(current_reactor_->name(), channel) {} + ReturnAddressMsg::ReturnAddressMsg(std::string reactor, std::string channel) : address_(FLAGS_address), port_(FLAGS_port), diff --git a/experimental/distributed/src/reactors_distributed.hpp b/experimental/distributed/src/reactors_distributed.hpp index 39fc038ff..7d122d657 100644 --- a/experimental/distributed/src/reactors_distributed.hpp +++ b/experimental/distributed/src/reactors_distributed.hpp @@ -234,7 +234,10 @@ class Network { */ class ReturnAddressMsg : public Message { public: - ReturnAddressMsg(); + /* The return address is on the current reactor, specified channel */ + ReturnAddressMsg(std::string channel); + + /* The return address is on a specified reactor/channel */ ReturnAddressMsg(std::string reactor, std::string channel); std::string Address() const; @@ -244,12 +247,16 @@ class ReturnAddressMsg : public Message { std::shared_ptr GetReturnChannelWriter() const; - template + template void serialize(Archive &ar) { ar(cereal::virtual_base_class(this), address_, port_, reactor_, channel_); } + protected: + friend class cereal::access; + ReturnAddressMsg(); // Cereal needs access to a default constructor. + private: std::string address_; uint16_t port_; diff --git a/experimental/distributed/src/reactors_local.cpp b/experimental/distributed/src/reactors_local.cpp index cc3a60919..f300d7afb 100644 --- a/experimental/distributed/src/reactors_local.cpp +++ b/experimental/distributed/src/reactors_local.cpp @@ -4,10 +4,14 @@ void EventStream::Subscription::Unsubscribe() const { event_queue_.RemoveCb(*this); } -void EventStream::Subscription::Close() const { +void EventStream::Subscription::CloseChannel() const { event_queue_.Close(); } +const std::string& EventStream::Subscription::ChannelName() const { + return event_queue_.channel_name_; +} + thread_local Reactor* current_reactor_ = nullptr; std::string Channel::LocalChannelWriter::ReactorName() { diff --git a/experimental/distributed/src/reactors_local.hpp b/experimental/distributed/src/reactors_local.hpp index 1828e7595..396a8e7b0 100644 --- a/experimental/distributed/src/reactors_local.hpp +++ b/experimental/distributed/src/reactors_local.hpp @@ -115,7 +115,12 @@ class EventStream { /** * Close the stream. Convenience method. */ - void Close() const; + void CloseChannel() const; + + /** + * Get the name of the channel the message is delivered to. + */ + const std::string& ChannelName() const; private: friend class Reactor; diff --git a/experimental/distributed/tests/reactors_distributed_unit.cpp b/experimental/distributed/tests/reactors_distributed_unit.cpp index d616eded0..1c4d5ce4b 100644 --- a/experimental/distributed/tests/reactors_distributed_unit.cpp +++ b/experimental/distributed/tests/reactors_distributed_unit.cpp @@ -15,6 +15,9 @@ #include #include +/** + * Test do the services start up without crashes. + */ TEST(SimpleTests, StartAndStopServices) { System &system = System::GetInstance(); Distributed &distributed = Distributed::GetInstance(); @@ -27,6 +30,12 @@ TEST(SimpleTests, StartAndStopServices) { distributed.StopServices(); } +/** + * Test simple message reception. + * + * Data flow: + * (1) Send an empty message from Master to Worker/main + */ TEST(SimpleTests, SendEmptyMessage) { struct Master : public Reactor { Master(std::string name) : Reactor(name) {} @@ -37,7 +46,7 @@ TEST(SimpleTests, SendEmptyMessage) { .ChainOnce([this](const ChannelResolvedMessage& msg, const Subscription& subscription) { msg.channelWriter()->Send(); - subscription.Close(); + subscription.CloseChannel(); }); CloseChannel("main"); @@ -50,8 +59,148 @@ TEST(SimpleTests, SendEmptyMessage) { virtual void Run() { main_.first->OnEventOnce() .ChainOnce([this](const Message&, const Subscription& subscription) { - // if this message isn't delivered, the main channel will never be closed - subscription.Close(); // close "main" + // if this message isn't delivered, the main channel will never be closed and we infinite loop + subscription.CloseChannel(); // close "main" + }); + } + }; + + // emulate flags like it's a multiprocess system, these may be alredy set by default + FLAGS_address = "127.0.0.1"; + FLAGS_port = 10000; + + System &system = System::GetInstance(); + Distributed &distributed = Distributed::GetInstance(); + distributed.StartServices(); + + system.Spawn("master"); + system.Spawn("worker"); + + system.AwaitShutdown(); // this must be called before StopServices + distributed.StopServices(); +} + +/** + * Test ReturnAddressMsg functionality. + * + * Data flow: + * (1) Send an empty message from Master to Worker/main + * (2) Send an empty message from Worker to Master/main + */ +TEST(SimpleTests, SendReturnAddressMessage) { + struct Master : public Reactor { + Master(std::string name) : Reactor(name) {} + + virtual void Run() { + Distributed::GetInstance().FindChannel("127.0.0.1", 10000, "worker", "main") + ->OnEventOnce() + .ChainOnce([this](const ChannelResolvedMessage& msg, + const Subscription& sub) { + // send a message that will be returned to "main" + msg.channelWriter()->Send(this->name(), "main"); + // close this anonymous channel + sub.CloseChannel(); + }); + + main_.first->OnEventOnce() + .ChainOnce([this](const Message&, const Subscription& sub) { + // if this message isn't delivered, the main channel will never be closed and we infinite loop + // close the "main" channel + sub.CloseChannel(); + }); + } + }; + + struct Worker : public Reactor { + Worker(std::string name) : Reactor(name) {} + + virtual void Run() { + main_.first->OnEventOnce() + .ChainOnce([this](const ReturnAddressMsg &msg, const Subscription& sub) { + msg.GetReturnChannelWriter()->Send(); + sub.CloseChannel(); // close "main" + }); + } + }; + + // emulate flags like it's a multiprocess system, these may be alredy set by default + FLAGS_address = "127.0.0.1"; + FLAGS_port = 10000; + + System &system = System::GetInstance(); + Distributed &distributed = Distributed::GetInstance(); + distributed.StartServices(); + + system.Spawn("master"); + system.Spawn("worker"); + + system.AwaitShutdown(); // this must be called before StopServices + distributed.StopServices(); +} + +// Apparently templates cannot be declared inside local classes, figure out how to move it in? +// For that reason I obscured the name. +struct SerializableMessage_TextMessage : public ReturnAddressMsg { + SerializableMessage_TextMessage(std::string channel, std::string arg_text, int arg_val) + : ReturnAddressMsg(channel), text(arg_text), val(arg_val) {} + std::string text; + int val; + + template + void serialize(Archive &ar) { + ar(cereal::virtual_base_class(this), text, val); + } + + protected: + friend class cereal::access; + SerializableMessage_TextMessage() {} // Cereal needs access to a default constructor. +}; +CEREAL_REGISTER_TYPE(SerializableMessage_TextMessage); + +/** + * Test serializability of a complex message over the network layer. + * + * Data flow: + * (1) Send ("hi", 123) from Master to Worker/main + * (2) Send ("hi back", 779) from Worker to Master/main + */ +TEST(SimpleTests, SendSerializableMessage) { + struct Master : public Reactor { + Master(std::string name) : Reactor(name) {} + + virtual void Run() { + Distributed::GetInstance().FindChannel("127.0.0.1", 10000, "worker", "main") + ->OnEventOnce() + .ChainOnce([this](const ChannelResolvedMessage& msg, + const Subscription& sub) { + // send a message that will be returned to "main" + msg.channelWriter()->Send("main", "hi", 123); + // close this anonymous channel + sub.CloseChannel(); + }); + + main_.first->OnEventOnce() + .ChainOnce([this](const SerializableMessage_TextMessage& msg, const Subscription& sub) { + ASSERT_EQ(msg.text, "hi back"); + ASSERT_EQ(msg.val, 779); + // if this message isn't delivered, the main channel will never be closed and we infinite loop + // close the "main" channel + sub.CloseChannel(); + }); + } + }; + + struct Worker : public Reactor { + Worker(std::string name) : Reactor(name) {} + + virtual void Run() { + main_.first->OnEventOnce() + .ChainOnce([this](const SerializableMessage_TextMessage &msg, const Subscription& sub) { + ASSERT_EQ(msg.text, "hi"); + ASSERT_EQ(msg.val, 123); + msg.GetReturnChannelWriter()->Send + ("no channel, dont use this", "hi back", 779); + sub.CloseChannel(); // close "main" }); } };