From e6d3edf9a928990fc35ae8cd8f2c6860d52e6e5c Mon Sep 17 00:00:00 2001 From: Mislav Bradac Date: Fri, 27 Oct 2017 10:33:48 +0200 Subject: [PATCH] Fix distributed reactors Reviewers: buda, mferencevic Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D935 --- CMakeLists.txt | 2 + src/communication/reactor/common_messages.hpp | 56 ++++ src/communication/reactor/protocol.cpp | 20 +- src/communication/reactor/protocol.hpp | 16 +- .../reactor/reactor_distributed.cpp | 5 + .../reactor/reactor_distributed.hpp | 274 ++++++++++++++++++ src/communication/reactor/reactor_local.cpp | 24 +- src/communication/reactor/reactor_local.hpp | 68 +++-- tests/unit/reactor_distributed.cpp | 151 ++++++++++ 9 files changed, 568 insertions(+), 48 deletions(-) create mode 100644 src/communication/reactor/common_messages.hpp create mode 100644 src/communication/reactor/reactor_distributed.cpp create mode 100644 src/communication/reactor/reactor_distributed.hpp create mode 100644 tests/unit/reactor_distributed.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index cb93b97ef..1b7b1c6bc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -186,7 +186,9 @@ target_link_libraries(antlr_opencypher_parser_lib antlr4) set(memgraph_src_files ${src_dir}/communication/bolt/v1/decoder/decoded_value.cpp ${src_dir}/communication/bolt/v1/session.cpp + ${src_dir}/communication/reactor/protocol.cpp ${src_dir}/communication/reactor/reactor_local.cpp + ${src_dir}/communication/reactor/reactor_distributed.cpp ${src_dir}/data_structures/concurrent/skiplist_gc.cpp ${src_dir}/database/dbms.cpp ${src_dir}/database/graph_db.cpp diff --git a/src/communication/reactor/common_messages.hpp b/src/communication/reactor/common_messages.hpp new file mode 100644 index 000000000..ff32f1506 --- /dev/null +++ b/src/communication/reactor/common_messages.hpp @@ -0,0 +1,56 @@ +#pragma once + +#include "communication/reactor/reactor_local.hpp" + +// TODO: Which of these I need to include. +#include "cereal/archives/binary.hpp" +#include "cereal/types/base_class.hpp" +#include "cereal/types/memory.hpp" +#include "cereal/types/polymorphic.hpp" +#include "cereal/types/string.hpp" +#include "cereal/types/utility.hpp" +#include "cereal/types/vector.hpp" + +DECLARE_string(reactor_address); +DECLARE_int32(reactor_port); + +namespace communication::reactor { + +/** + * Message that includes the channel on which response is expected; + */ +class ReturnAddressMessage : public Message { + public: + ReturnAddressMessage(std::string reactor, std::string channel) + : address_(FLAGS_reactor_address), + port_(FLAGS_reactor_port), + reactor_(reactor), + channel_(channel) {} + + const std::string &address() const { return address_; } + uint16_t port() const { return port_; } + const std::string &reactor_name() const { return reactor_; } + const std::string &channel_name() const { return channel_; } + + template + void serialize(Archive &ar) { + ar(cereal::virtual_base_class(this), address_, port_, reactor_, + channel_); + } + + auto FindChannel(ChannelFinder &finder) const { + return finder.FindChannel(address_, port_, reactor_, channel_); + } + + protected: + friend class cereal::access; + ReturnAddressMessage() {} // Cereal needs access to a default constructor. + + // Good luck these being const using cereal... + std::string address_; + uint16_t port_; + std::string reactor_; + std::string channel_; +}; +} +CEREAL_REGISTER_TYPE(communication::reactor::ReturnAddressMessage); diff --git a/src/communication/reactor/protocol.cpp b/src/communication/reactor/protocol.cpp index b68523ae3..6e9d2f332 100644 --- a/src/communication/reactor/protocol.cpp +++ b/src/communication/reactor/protocol.cpp @@ -1,13 +1,14 @@ #include -#include "protocol.hpp" -#include "reactors_distributed.hpp" +#include "communication/reactor/protocol.hpp" +#include "communication/reactor/reactor_distributed.hpp" #include "glog/logging.h" -namespace protocol { +namespace communication::reactor { -Session::Session(Socket &&socket, Data &) : socket_(std::move(socket)) { +Session::Session(Socket &&socket, SessionData &data) + : socket_(std::move(socket)), system_(data.system) { event_.data.ptr = this; } @@ -43,7 +44,7 @@ void Session::Execute() { DLOG(INFO) << "Reactor: " << reactor_ << "; Channel: " << channel_ << std::endl; - auto channel = System::GetInstance().FindChannel(reactor_, channel_); + auto channel = system_.FindChannel(reactor_, channel_); SendSuccess(channel != nullptr); handshake_done_ = true; @@ -59,12 +60,12 @@ void Session::Execute() { // TODO: check for exceptions std::istringstream stream; stream.str(std::string(reinterpret_cast(buffer_.data()), len_data)); - cereal::BinaryInputArchive iarchive{stream}; + ::cereal::BinaryInputArchive iarchive{stream}; std::unique_ptr message{nullptr}; iarchive(message); buffer_.Shift(len_data); - auto channel = System::GetInstance().FindChannel(reactor_, channel_); + auto channel = system_.FindChannel(reactor_, channel_); if (channel == nullptr) { SendSuccess(false); return; @@ -145,13 +146,13 @@ bool SendMessage(std::string address, uint16_t port, std::string reactor, } bool success = GetSuccess(socket); - if (message == nullptr or !success) { + if (message == nullptr || !success) { return success; } // Serialize and send message std::ostringstream stream; - cereal::BinaryOutputArchive oarchive(stream); + ::cereal::BinaryOutputArchive oarchive(stream); oarchive(message); const std::string &buffer = stream.str(); @@ -164,6 +165,7 @@ bool SendMessage(std::string address, uint16_t port, std::string reactor, return false; } + // TODO: send message is blocking because of this. This is potential problem. return GetSuccess(socket); } } diff --git a/src/communication/reactor/protocol.hpp b/src/communication/reactor/protocol.hpp index 773acb238..c333be291 100644 --- a/src/communication/reactor/protocol.hpp +++ b/src/communication/reactor/protocol.hpp @@ -3,13 +3,12 @@ #include #include "communication/bolt/v1/decoder/buffer.hpp" +#include "communication/reactor/reactor_local.hpp" #include "io/network/epoll.hpp" #include "io/network/network_endpoint.hpp" #include "io/network/socket.hpp" #include "io/network/stream_buffer.hpp" -class Message; - /** * @brief Protocol * @@ -42,14 +41,16 @@ class Message; * Currently the server is implemented to handle more than one message after * the initial handshake, but the client can only send one message. */ -namespace protocol { +namespace communication::reactor { + +class Message; using Endpoint = io::network::NetworkEndpoint; using Socket = io::network::Socket; using StreamBuffer = io::network::StreamBuffer; // this buffer should be larger than the largest serialized message -using Buffer = communication::bolt::Buffer<262144>; +using Buffer = bolt::Buffer<262144>; using SizeT = uint16_t; /** @@ -57,8 +58,8 @@ using SizeT = uint16_t; * * This typically holds living data shared by all sessions. Currently empty. */ -struct Data { - // empty +struct SessionData { + System system; }; /** @@ -71,7 +72,7 @@ struct Data { class Session { private: public: - Session(Socket &&socket, Data &data); + Session(Socket &&socket, SessionData &data); int Id() const { return socket_.fd(); } @@ -112,6 +113,7 @@ class Session { io::network::Epoll::Event event_; Socket socket_; + System &system_; std::chrono::time_point last_event_time_; diff --git a/src/communication/reactor/reactor_distributed.cpp b/src/communication/reactor/reactor_distributed.cpp new file mode 100644 index 000000000..46e1e1d8f --- /dev/null +++ b/src/communication/reactor/reactor_distributed.cpp @@ -0,0 +1,5 @@ +#include "communication/reactor/reactor_distributed.hpp" + +// reactor adress can't be 0.0.0.0. +DEFINE_string(reactor_address, "127.0.0.1", "Network server bind address"); +DEFINE_int32(reactor_port, 10000, "Network server bind port"); diff --git a/src/communication/reactor/reactor_distributed.hpp b/src/communication/reactor/reactor_distributed.hpp new file mode 100644 index 000000000..aec240a3f --- /dev/null +++ b/src/communication/reactor/reactor_distributed.hpp @@ -0,0 +1,274 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "communication/reactor/reactor_local.hpp" +#include "protocol.hpp" + +#include "cereal/archives/binary.hpp" +#include "cereal/types/base_class.hpp" +#include "cereal/types/memory.hpp" +#include "cereal/types/polymorphic.hpp" +#include "cereal/types/string.hpp" +#include "cereal/types/utility.hpp" +#include "cereal/types/vector.hpp" + +#include "communication/server.hpp" +#include "threading/sync/spinlock.hpp" + +DECLARE_string(reactor_address); +DECLARE_int32(reactor_port); + +namespace communication::reactor { + +class DistributedSystem; + +/** + * Networking service. + */ +class Network { + private: + using Endpoint = io::network::NetworkEndpoint; + using Socket = Socket; + using ServerT = communication::Server; + friend class DistributedSystem; + + struct NetworkMessage { + NetworkMessage() {} + + NetworkMessage(const std::string &address, uint16_t port, + const std::string &reactor, const std::string &channel, + std::unique_ptr message) + : address(address), + port(port), + reactor(reactor), + channel(channel), + message(std::move(message)) {} + + NetworkMessage(NetworkMessage &&nm) = default; + NetworkMessage &operator=(NetworkMessage &&nm) = default; + + std::string address; + uint16_t port = 0; + std::string reactor; + std::string channel; + std::unique_ptr message; + }; + + public: + Network() = default; + + // client functions + + std::shared_ptr Resolve(std::string address, uint16_t port, + std::string reactor_name, + std::string channel_name) { + if (SendMessage(address, port, reactor_name, channel_name, nullptr)) { + return std::make_shared(this, address, port, + reactor_name, channel_name); + } + LOG(WARNING) << "Could not resolve " << address << ":" << port << " " + << reactor_name << "/" << channel_name; + return nullptr; + } + + /** Start a threadpool that dispatches the messages from the (outgoing) queue + * to the sockets */ + void StartClient(int worker_count) { + LOG(INFO) << "Starting " << worker_count << " client workers"; + client_run_ = true; + + for (int i = 0; i < worker_count; ++i) { + pool_.push_back(std::thread([worker_count, this]() { + while (this->client_run_) { + this->mutex_.lock(); + if (!this->queue_.empty()) { + NetworkMessage nm(std::move(this->queue_.front())); + this->queue_.pop(); + this->mutex_.unlock(); + // TODO: store success + bool success = SendMessage(nm.address, nm.port, nm.reactor, + nm.channel, std::move(nm.message)); + DLOG(INFO) << "Network client message send status: " << success + << std::endl; + } else { + this->mutex_.unlock(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + })); + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + } + + void StopClient() { + while (true) { + std::lock_guard lock(mutex_); + if (queue_.empty()) { + break; + } + } + client_run_ = false; + for (size_t i = 0; i < pool_.size(); ++i) { + pool_[i].join(); + } + pool_.clear(); + } + + class RemoteChannelWriter : public ChannelWriter { + public: + RemoteChannelWriter(Network *network, std::string address, uint16_t port, + std::string reactor, std::string channel) + : network_(network), + address_(address), + port_(port), + reactor_(reactor), + channel_(channel) {} + + virtual std::string Address() { return address_; } + + virtual uint16_t Port() { return port_; } + + std::string ReactorName() const override { return reactor_; } + + std::string Name() const override { return channel_; } + + void Send(std::unique_ptr message) override { + std::lock_guard lock(network_->mutex_); + network_->queue_.push(NetworkMessage(address_, port_, reactor_, channel_, + std::move(message))); + } + + private: + Network *network_; + std::string address_; + uint16_t port_; + std::string reactor_; + std::string channel_; + }; + + // server functions + + std::string address() const { return FLAGS_reactor_address; } + + uint16_t port() const { return FLAGS_reactor_port; } + + /** Start a threadpool that relays the messages from the sockets to the + * LocalEventStreams */ + void StartServer(int workers_count) { + if (server_ != nullptr) { + LOG(FATAL) << "Tried to start a running server!"; + } + + // Initialize endpoint. + Endpoint endpoint; + try { + endpoint = Endpoint(FLAGS_reactor_address.c_str(), FLAGS_reactor_port); + } catch (io::network::NetworkEndpointException &e) { + LOG(FATAL) << e.what(); + } + // Initialize server + server_ = std::make_unique(endpoint, protocol_data_); + + // Start server + thread_ = std::thread( + [workers_count, this]() { this->server_->Start(workers_count); }); + } + + void StopServer() { + if (server_ != nullptr) { + server_->Shutdown(); + thread_.join(); + server_ = nullptr; + } + } + + private: + // client variables + SpinLock mutex_; + std::vector pool_; + std::queue queue_; + std::atomic client_run_; + + // server variables + std::thread thread_; + SessionData protocol_data_; + std::unique_ptr server_{nullptr}; +}; + +/** + * Placeholder for all functionality related to non-local communication. + * E.g. resolve remote channels by memgraph node id, etc. + */ +class DistributedSystem : public ChannelFinder { + public: + DistributedSystem() { + network_.StartClient(4); + network_.StartServer(4); + } + + // Thread safe. + void Spawn(const std::string &name, std::function setup) { + system_.Spawn(name, setup, this); + } + + // Non-thread safe. + // TODO: figure out what should be intereaction of this function and + // destructor. + void StopServices() { + system_.AwaitShutdown(); + network_.StopClient(); + network_.StopServer(); + } + + std::shared_ptr FindChannel( + const std::string &reactor_name, + const std::string &channel_name) override { + return system_.FindChannel(reactor_name, channel_name); + } + + /** + * Resolves remote channel synchronously. + * + * @return EventStream on which message will arrive once channel is resolved. + * @warning It can only be called from local Reactor. + */ + std::shared_ptr FindChannel( + const std::string &address, uint16_t port, + const std::string &reactor_name, + const std::string &channel_name) override { + // Yeah... Unneeded shared ptr... once again. We love that. + std::shared_ptr channel_writer = nullptr; + // TODO: Check if this is actually local channel. + while (!(channel_writer = + network_.Resolve(address, port, reactor_name, channel_name))) { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + return channel_writer; + } + + Network &network() { return network_; } + const Network &network() const { return network_; } + + private: + Network network_; + System &system_ = network_.protocol_data_.system; + + DistributedSystem(const DistributedSystem &) = delete; + DistributedSystem(DistributedSystem &&) = delete; + DistributedSystem &operator=(const DistributedSystem &) = delete; + DistributedSystem &operator=(DistributedSystem &&) = delete; +}; +} diff --git a/src/communication/reactor/reactor_local.cpp b/src/communication/reactor/reactor_local.cpp index 7d5649e14..f33486718 100644 --- a/src/communication/reactor/reactor_local.cpp +++ b/src/communication/reactor/reactor_local.cpp @@ -4,8 +4,6 @@ namespace communication::reactor { -thread_local Reactor *current_reactor_ = nullptr; - void EventStream::Subscription::Unsubscribe() const { event_queue_.RemoveCallback(*this); } @@ -26,7 +24,7 @@ void Channel::Close() { // TODO(zuza): there will be major problems if a reactor tries to close a // stream that isn't theirs luckily this should never happen if the framework // is used as expected. - current_reactor_->CloseChannel(channel_name_); + reactor_.CloseChannel(channel_name_); } std::pair> Reactor::Open( @@ -36,11 +34,11 @@ std::pair> Reactor::Open( throw utils::BasicException("Channel with name " + channel_name + "already exists"); } - auto it = - channels_ - .emplace(channel_name, std::make_shared(Channel::Params{ - name_, channel_name, mutex_, cvar_})) - .first; + auto it = channels_ + .emplace(channel_name, + std::make_shared(Channel::Params{ + name_, channel_name, mutex_, cvar_, *this})) + .first; it->second->self_ptr_ = it->second; return make_pair(&it->second->stream_, it->second->LockedOpenChannel()); } @@ -51,11 +49,11 @@ std::pair> Reactor::Open() { std::string channel_name = "stream-" + std::to_string(channel_name_counter_++); if (channels_.count(channel_name) == 0) { - auto it = - channels_ - .emplace(channel_name, std::make_shared(Channel::Params{ - name_, channel_name, mutex_, cvar_})) - .first; + auto it = channels_ + .emplace(channel_name, + std::make_shared(Channel::Params{ + name_, channel_name, mutex_, cvar_, *this})) + .first; it->second->self_ptr_ = it->second; return make_pair(&it->second->stream_, it->second->LockedOpenChannel()); } diff --git a/src/communication/reactor/reactor_local.hpp b/src/communication/reactor/reactor_local.hpp index d96101cba..36138c4b9 100644 --- a/src/communication/reactor/reactor_local.hpp +++ b/src/communication/reactor/reactor_local.hpp @@ -18,8 +18,6 @@ class Reactor; class System; class Channel; -extern thread_local Reactor *current_reactor_; - /** * Base class for messages. */ @@ -63,11 +61,20 @@ class ChannelWriter { virtual std::string ReactorName() const = 0; virtual std::string Name() const = 0; +}; - template - void serialize(Archive &archive) { - archive(ReactorName(), Name()); - } +class ChannelFinder { + public: + virtual ~ChannelFinder() {} + + // Find local channel. + virtual std::shared_ptr FindChannel( + const std::string &reactor_name, const std::string &channel_name) = 0; + + // Find remote channel. + virtual std::shared_ptr FindChannel( + const std::string &address, uint16_t port, + const std::string &reactor_name, const std::string &channel_name) = 0; }; /** @@ -270,7 +277,8 @@ class Channel { reactor_name_(params.reactor_name), mutex_(params.mutex), cvar_(params.cvar), - stream_(mutex_, this) {} + stream_(mutex_, this), + reactor_(params.reactor) {} /** * LocalChannelWriter represents the channels to reactors living in the same @@ -361,6 +369,7 @@ class Channel { std::string channel_name; std::shared_ptr mutex; std::shared_ptr cvar; + Reactor &reactor; }; void Push(std::unique_ptr m) { @@ -410,6 +419,7 @@ class Channel { // dctor, so must be recursive. std::shared_ptr mutex_; std::shared_ptr cvar_; + /** * A weak_ptr to itself. * @@ -417,6 +427,7 @@ class Channel { */ std::weak_ptr self_ptr_; LocalEventStream stream_; + Reactor &reactor_; std::unordered_map> callbacks_; @@ -432,7 +443,7 @@ class Channel { class Reactor { friend class System; - Reactor(System &system, std::string name, + Reactor(ChannelFinder &system, std::string name, std::function setup) : system_(system), name_(name), setup_(setup), main_(Open("main")) {} @@ -461,7 +472,7 @@ class Reactor { Reactor &operator=(const Reactor &other) = delete; Reactor &operator=(Reactor &&other) = default; - System &system_; + ChannelFinder &system_; std::string name_; std::function setup_; @@ -479,6 +490,8 @@ class Reactor { */ std::unordered_map> channels_; int64_t channel_name_counter_ = 0; + // I don't understand why ChannelWriter is shared. ChannelWriter is just + // endpoint that could be copied to every user. std::pair> main_; private: @@ -501,18 +514,21 @@ class Reactor { * Placeholder for all reactors. * Make sure object of this class outlives all Reactors created by it. */ -class System { +class System : public ChannelFinder { public: friend class Reactor; System() = default; - void Spawn(const std::string &name, std::function setup) { + void Spawn(const std::string &name, std::function setup, + ChannelFinder *finder = nullptr) { + if (!finder) { + finder = this; + } std::unique_lock lock(mutex_); - std::unique_ptr reactor(new Reactor(*this, name, setup)); - std::thread reactor_thread([ this, raw_reactor = reactor.get() ] { - current_reactor_ = raw_reactor; - raw_reactor->setup_(*raw_reactor); - raw_reactor->RunEventLoop(); + std::unique_ptr reactor(new Reactor(*finder, name, setup)); + std::thread reactor_thread([reactor = reactor.get()] { + reactor->setup_(*reactor); + reactor->RunEventLoop(); }); auto got = reactors_.emplace( name, std::pair{ @@ -520,16 +536,28 @@ class System { CHECK(got.second) << "Reactor with name: '" << name << "' already exists"; } - const std::shared_ptr FindChannel( - const std::string &reactor_name, const std::string &channel_name) { + std::shared_ptr FindChannel( + const std::string &reactor_name, + const std::string &channel_name) override { std::unique_lock lock(mutex_); auto it_reactor = reactors_.find(reactor_name); if (it_reactor == reactors_.end()) return nullptr; return it_reactor->second.first->FindChannel(channel_name); } + std::shared_ptr FindChannel(const std::string &, uint16_t, + const std::string &, + const std::string &) override { + // TODO: This is awful design, but at this point I just want to make + // reactors work. We should templatize Reactor by system instead of dealing + // with interfaces then System would spawn Reactor and + // DistributedSystem would spawn Reactor. + LOG(FATAL) << "Tried to resolve remote channel in local System"; + } + // TODO: Think about interaction with destructor. Should we call this in - // destructor, complain in destructor if there are alive threads or stop them + // destructor, complain in destructor if there are alive threads or stop + // them // in some way. void AwaitShutdown() { for (auto &key_value : reactors_) { @@ -550,4 +578,6 @@ class System { std::pair, std::thread>> reactors_; }; + +using Subscription = Channel::LocalEventStream::Subscription; } diff --git a/tests/unit/reactor_distributed.cpp b/tests/unit/reactor_distributed.cpp new file mode 100644 index 000000000..292e09b41 --- /dev/null +++ b/tests/unit/reactor_distributed.cpp @@ -0,0 +1,151 @@ +/** + * This test file test the Distributed Reactors API on ONLY one process (no real + * networking). + * In other words, we send a message from one process to itself. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "communication/reactor/common_messages.hpp" +#include "communication/reactor/reactor_distributed.hpp" +#include "gtest/gtest.h" + +using namespace communication::reactor; + +struct MessageInt : public Message { + MessageInt() {} // cereal needs this + MessageInt(int x) : x(x) {} + int x; + + template + void serialize(Archive &ar) { + ar(cereal::virtual_base_class(this), x); + } +}; +CEREAL_REGISTER_TYPE(MessageInt); + +struct RequestMessage : public ReturnAddressMessage { + RequestMessage() {} + RequestMessage(std::string reactor, std::string channel, int x) + : ReturnAddressMessage(reactor, channel), x(x){}; + + template + void serialize(Archive &ar) { + ar(cereal::virtual_base_class(this), x); + } + + friend class cereal::access; + int x; +}; +CEREAL_REGISTER_TYPE(RequestMessage); + +/** + * Test do the services start up without crashes. + */ +TEST(SimpleTests, StartAndStopServices) { + DistributedSystem system; + // do nothing + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + system.StopServices(); +} + +/** + * Test simple message reception. + * + * Data flow: + * (1) Send an empty message from Master to Worker/main + */ +TEST(SimpleTests, SendEmptyMessage) { + DistributedSystem system; + + system.Spawn("master", [](Reactor &r) { + auto writer = r.system_.FindChannel("127.0.0.1", 10000, "worker", "main"); + writer->Send(); + r.CloseChannel("main"); + }); + + system.Spawn("worker", [](Reactor &r) { + r.main_.first->OnEventOnce().ChainOnce( + [&](const Message &, const Subscription &subscription) { + // if this message isn't delivered, the main channel will never be + // closed and we infinite loop + subscription.CloseChannel(); // close "main" + }); + }); + + system.StopServices(); +} + +/** + * Test ReturnAddressMsg functionality. + * + * Data flow: + * (1) Send an empty message from Master to Worker/main + * (2) Send an empty message from Worker to Master/main + */ +TEST(SimpleTests, SendReturnAddressMessage) { + DistributedSystem system; + + system.Spawn("master", [](Reactor &r) { + auto writer = r.system_.FindChannel("127.0.0.1", 10000, "worker", "main"); + writer->Send(r.name(), "main"); + r.main_.first->OnEvent( + [&](const MessageInt &message, const Subscription &) { + EXPECT_EQ(message.x, 5); + r.CloseChannel("main"); + }); + }); + system.Spawn("worker", [](Reactor &r) { + r.main_.first->OnEvent( + [&](const ReturnAddressMessage &message, const Subscription &) { + message.FindChannel(r.system_)->Send(5); + r.CloseChannel("main"); + }); + }); + + system.StopServices(); +} + +/** + * Test serializability of a complex message over the network layer. + * + * Data flow: + * (1) Send ("hi", 123) from Master to Worker/main + * (2) Send ("hi back", 779) from Worker to Master/main + */ +TEST(SimpleTests, SendSerializableMessage) { + DistributedSystem system; + + system.Spawn("master", [](Reactor &r) { + auto writer = r.system_.FindChannel("127.0.0.1", 10000, "worker", "main"); + writer->Send(r.name(), "main", 123); + r.main_.first->OnEvent( + [&](const MessageInt &message, const Subscription &) { + ASSERT_EQ(message.x, 779); + r.CloseChannel("main"); + }); + }); + + system.Spawn("worker", [](Reactor &r) { + r.main_.first->OnEvent( + [&](const RequestMessage &message, const Subscription &) { + ASSERT_EQ(message.x, 123); + message.FindChannel(r.system_)->Send(779); + r.CloseChannel("main"); + }); + }); + + system.StopServices(); +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}