diff --git a/experimental/distributed/src/reactors_distributed.cpp b/experimental/distributed/src/reactors_distributed.cpp index 6fb4bf04a..0d59f5e81 100644 --- a/experimental/distributed/src/reactors_distributed.cpp +++ b/experimental/distributed/src/reactors_distributed.cpp @@ -10,6 +10,9 @@ Network::Network() {} */ ReturnAddressMsg::ReturnAddressMsg() {} +ReturnAddressMsg::ReturnAddressMsg(std::string channel) + : ReturnAddressMsg(current_reactor_->name(), channel) {} + ReturnAddressMsg::ReturnAddressMsg(std::string reactor, std::string channel) : address_(FLAGS_address), port_(FLAGS_port), diff --git a/experimental/distributed/src/reactors_distributed.hpp b/experimental/distributed/src/reactors_distributed.hpp index 39fc038ff..7d122d657 100644 --- a/experimental/distributed/src/reactors_distributed.hpp +++ b/experimental/distributed/src/reactors_distributed.hpp @@ -234,7 +234,10 @@ class Network { */ class ReturnAddressMsg : public Message { public: - ReturnAddressMsg(); + /* The return address is on the current reactor, specified channel */ + ReturnAddressMsg(std::string channel); + + /* The return address is on a specified reactor/channel */ ReturnAddressMsg(std::string reactor, std::string channel); std::string Address() const; @@ -244,12 +247,16 @@ class ReturnAddressMsg : public Message { std::shared_ptr<ChannelWriter> GetReturnChannelWriter() const; - template <class Archive> + template<class Archive> void serialize(Archive &ar) { ar(cereal::virtual_base_class<Message>(this), address_, port_, reactor_, channel_); } + protected: + friend class cereal::access; + ReturnAddressMsg(); // Cereal needs access to a default constructor. + private: std::string address_; uint16_t port_; diff --git a/experimental/distributed/src/reactors_local.cpp b/experimental/distributed/src/reactors_local.cpp index cc3a60919..f300d7afb 100644 --- a/experimental/distributed/src/reactors_local.cpp +++ b/experimental/distributed/src/reactors_local.cpp @@ -4,10 +4,14 @@ void EventStream::Subscription::Unsubscribe() const { event_queue_.RemoveCb(*this); } -void EventStream::Subscription::Close() const { +void EventStream::Subscription::CloseChannel() const { event_queue_.Close(); } +const std::string& EventStream::Subscription::ChannelName() const { + return event_queue_.channel_name_; +} + thread_local Reactor* current_reactor_ = nullptr; std::string Channel::LocalChannelWriter::ReactorName() { diff --git a/experimental/distributed/src/reactors_local.hpp b/experimental/distributed/src/reactors_local.hpp index 1828e7595..396a8e7b0 100644 --- a/experimental/distributed/src/reactors_local.hpp +++ b/experimental/distributed/src/reactors_local.hpp @@ -115,7 +115,12 @@ class EventStream { /** * Close the stream. Convenience method. */ - void Close() const; + void CloseChannel() const; + + /** + * Get the name of the channel the message is delivered to. + */ + const std::string& ChannelName() const; private: friend class Reactor; diff --git a/experimental/distributed/tests/reactors_distributed_unit.cpp b/experimental/distributed/tests/reactors_distributed_unit.cpp index d616eded0..1c4d5ce4b 100644 --- a/experimental/distributed/tests/reactors_distributed_unit.cpp +++ b/experimental/distributed/tests/reactors_distributed_unit.cpp @@ -15,6 +15,9 @@ #include <vector> #include <future> +/** + * Test do the services start up without crashes. + */ TEST(SimpleTests, StartAndStopServices) { System &system = System::GetInstance(); Distributed &distributed = Distributed::GetInstance(); @@ -27,6 +30,12 @@ TEST(SimpleTests, StartAndStopServices) { distributed.StopServices(); } +/** + * Test simple message reception. + * + * Data flow: + * (1) Send an empty message from Master to Worker/main + */ TEST(SimpleTests, SendEmptyMessage) { struct Master : public Reactor { Master(std::string name) : Reactor(name) {} @@ -37,7 +46,7 @@ TEST(SimpleTests, SendEmptyMessage) { .ChainOnce<ChannelResolvedMessage>([this](const ChannelResolvedMessage& msg, const Subscription& subscription) { msg.channelWriter()->Send<Message>(); - subscription.Close(); + subscription.CloseChannel(); }); CloseChannel("main"); @@ -50,8 +59,148 @@ TEST(SimpleTests, SendEmptyMessage) { virtual void Run() { main_.first->OnEventOnce() .ChainOnce<Message>([this](const Message&, const Subscription& subscription) { - // if this message isn't delivered, the main channel will never be closed - subscription.Close(); // close "main" + // if this message isn't delivered, the main channel will never be closed and we infinite loop + subscription.CloseChannel(); // close "main" + }); + } + }; + + // emulate flags like it's a multiprocess system, these may be alredy set by default + FLAGS_address = "127.0.0.1"; + FLAGS_port = 10000; + + System &system = System::GetInstance(); + Distributed &distributed = Distributed::GetInstance(); + distributed.StartServices(); + + system.Spawn<Master>("master"); + system.Spawn<Worker>("worker"); + + system.AwaitShutdown(); // this must be called before StopServices + distributed.StopServices(); +} + +/** + * Test ReturnAddressMsg functionality. + * + * Data flow: + * (1) Send an empty message from Master to Worker/main + * (2) Send an empty message from Worker to Master/main + */ +TEST(SimpleTests, SendReturnAddressMessage) { + struct Master : public Reactor { + Master(std::string name) : Reactor(name) {} + + virtual void Run() { + Distributed::GetInstance().FindChannel("127.0.0.1", 10000, "worker", "main") + ->OnEventOnce() + .ChainOnce<ChannelResolvedMessage>([this](const ChannelResolvedMessage& msg, + const Subscription& sub) { + // send a message that will be returned to "main" + msg.channelWriter()->Send<ReturnAddressMsg>(this->name(), "main"); + // close this anonymous channel + sub.CloseChannel(); + }); + + main_.first->OnEventOnce() + .ChainOnce<Message>([this](const Message&, const Subscription& sub) { + // if this message isn't delivered, the main channel will never be closed and we infinite loop + // close the "main" channel + sub.CloseChannel(); + }); + } + }; + + struct Worker : public Reactor { + Worker(std::string name) : Reactor(name) {} + + virtual void Run() { + main_.first->OnEventOnce() + .ChainOnce<ReturnAddressMsg>([this](const ReturnAddressMsg &msg, const Subscription& sub) { + msg.GetReturnChannelWriter()->Send<Message>(); + sub.CloseChannel(); // close "main" + }); + } + }; + + // emulate flags like it's a multiprocess system, these may be alredy set by default + FLAGS_address = "127.0.0.1"; + FLAGS_port = 10000; + + System &system = System::GetInstance(); + Distributed &distributed = Distributed::GetInstance(); + distributed.StartServices(); + + system.Spawn<Master>("master"); + system.Spawn<Worker>("worker"); + + system.AwaitShutdown(); // this must be called before StopServices + distributed.StopServices(); +} + +// Apparently templates cannot be declared inside local classes, figure out how to move it in? +// For that reason I obscured the name. +struct SerializableMessage_TextMessage : public ReturnAddressMsg { + SerializableMessage_TextMessage(std::string channel, std::string arg_text, int arg_val) + : ReturnAddressMsg(channel), text(arg_text), val(arg_val) {} + std::string text; + int val; + + template<class Archive> + void serialize(Archive &ar) { + ar(cereal::virtual_base_class<ReturnAddressMsg>(this), text, val); + } + + protected: + friend class cereal::access; + SerializableMessage_TextMessage() {} // Cereal needs access to a default constructor. +}; +CEREAL_REGISTER_TYPE(SerializableMessage_TextMessage); + +/** + * Test serializability of a complex message over the network layer. + * + * Data flow: + * (1) Send ("hi", 123) from Master to Worker/main + * (2) Send ("hi back", 779) from Worker to Master/main + */ +TEST(SimpleTests, SendSerializableMessage) { + struct Master : public Reactor { + Master(std::string name) : Reactor(name) {} + + virtual void Run() { + Distributed::GetInstance().FindChannel("127.0.0.1", 10000, "worker", "main") + ->OnEventOnce() + .ChainOnce<ChannelResolvedMessage>([this](const ChannelResolvedMessage& msg, + const Subscription& sub) { + // send a message that will be returned to "main" + msg.channelWriter()->Send<SerializableMessage_TextMessage>("main", "hi", 123); + // close this anonymous channel + sub.CloseChannel(); + }); + + main_.first->OnEventOnce() + .ChainOnce<SerializableMessage_TextMessage>([this](const SerializableMessage_TextMessage& msg, const Subscription& sub) { + ASSERT_EQ(msg.text, "hi back"); + ASSERT_EQ(msg.val, 779); + // if this message isn't delivered, the main channel will never be closed and we infinite loop + // close the "main" channel + sub.CloseChannel(); + }); + } + }; + + struct Worker : public Reactor { + Worker(std::string name) : Reactor(name) {} + + virtual void Run() { + main_.first->OnEventOnce() + .ChainOnce<SerializableMessage_TextMessage>([this](const SerializableMessage_TextMessage &msg, const Subscription& sub) { + ASSERT_EQ(msg.text, "hi"); + ASSERT_EQ(msg.val, 123); + msg.GetReturnChannelWriter()->Send<SerializableMessage_TextMessage> + ("no channel, dont use this", "hi back", 779); + sub.CloseChannel(); // close "main" }); } };