diff --git a/experimental/distributed/main.cpp b/experimental/distributed/main.cpp index 51fe74861..4e9a17c45 100644 --- a/experimental/distributed/main.cpp +++ b/experimental/distributed/main.cpp @@ -149,7 +149,7 @@ class Master : public Reactor { } } - stream->OnEvent(typeid(Message), [this](const Message &msg, EventStream::Subscription& subscription) { + stream->OnEvent(typeid(Message), [this](const Message &msg, const EventStream::Subscription& subscription) { std::cout << "Processing Query via Callback" << std::endl; const Query &query = dynamic_cast(msg); // exception bad_cast @@ -178,12 +178,12 @@ class Master : public Reactor { auto create_node_txn = std::make_unique("master", "main", xid); - channels_[worker_id]->Send(typeid(nullptr), std::move(create_node_txn)); + channels_[worker_id]->SendHelper(typeid(nullptr), std::move(create_node_txn)); auto m = stream->AwaitEvent(); if (CommitRequest *req = dynamic_cast(m.get())) { - req->GetChannelToSender(system_)->Send(typeid(nullptr), std::make_unique()); + req->GetChannelToSender(system_)->Send(); } else if (AbortRequest *req = dynamic_cast(m.get())) { - req->GetChannelToSender(system_)->Send(typeid(nullptr), std::make_unique()); + req->GetChannelToSender(system_)->Send(); } else { std::cerr << "unknown message\n"; exit(1); @@ -197,8 +197,7 @@ class Master : public Reactor { auto connector = Open(txn_channel_name); auto stream = connector.first; for (int w_id = 0; w_id < NUM_WORKERS; ++w_id) - channels_[w_id]->Send(typeid(nullptr), - std::make_unique("master", "main", xid)); + channels_[w_id]->Send("master", "main", xid); std::vector> txn_channels; txn_channels.resize(NUM_WORKERS, nullptr); @@ -219,10 +218,10 @@ class Master : public Reactor { if (commit) { for (int w_id = 0; w_id < NUM_WORKERS; ++w_id) - txn_channels[w_id]->Send(typeid(nullptr), std::make_unique()); + txn_channels[w_id]->Send(); } else { for (int w_id = 0; w_id < NUM_WORKERS; ++w_id) - txn_channels[w_id]->Send(typeid(nullptr), std::make_unique()); + txn_channels[w_id]->Send(); } int64_t count = 0; @@ -307,7 +306,7 @@ class Worker : public Reactor { auto stream = connector.first; auto masterChannel = txn->GetChannelToSender(system_); // TODO: Do the actual commit. - masterChannel->Send(typeid(nullptr), + masterChannel->SendHelper(typeid(nullptr), std::make_unique("master", "main", worker_id_)); auto m = stream->AwaitEvent(); if (dynamic_cast(m.get())) { @@ -329,11 +328,11 @@ class Worker : public Reactor { // TODO: Fix this hack -- use the storage. int num = 123; - masterChannel->Send(typeid(nullptr), + masterChannel->SendHelper(typeid(nullptr), std::make_unique("master", "main", worker_id_)); auto m = stream->AwaitEvent(); if (dynamic_cast(m.get())) { - masterChannel->Send(typeid(nullptr), std::make_unique(num)); + masterChannel->SendHelper(typeid(nullptr), std::make_unique(num)); } else if (dynamic_cast(m.get())) { // send nothing } else { @@ -371,9 +370,9 @@ void ClientMain(System *system) { std::getline(std::cin, s); if (s == "quit") { active = false; - channel->Send(typeid(nullptr), std::make_unique()); + channel->SendHelper(typeid(nullptr), std::make_unique()); } else { - channel->Send(typeid(nullptr), std::make_unique(s)); + channel->SendHelper(typeid(nullptr), std::make_unique(s)); } } } diff --git a/experimental/distributed/src/communication.cpp b/experimental/distributed/src/communication.cpp index 05ae9a21e..761a069c7 100644 --- a/experimental/distributed/src/communication.cpp +++ b/experimental/distributed/src/communication.cpp @@ -3,7 +3,7 @@ DEFINE_string(address, "127.0.0.1", "Network server bind address"); DEFINE_int32(port, 10000, "Network server bind port"); -void EventStream::Subscription::unsubscribe() { +void EventStream::Subscription::unsubscribe() const { event_queue_.RemoveCb(*this); } diff --git a/experimental/distributed/src/communication.hpp b/experimental/distributed/src/communication.hpp index 1051c98f8..56f0b7391 100644 --- a/experimental/distributed/src/communication.hpp +++ b/experimental/distributed/src/communication.hpp @@ -11,8 +11,9 @@ #include #include #include -#include #include +#include +#include #include @@ -45,7 +46,18 @@ extern thread_local Reactor* current_reactor_; */ class Channel { public: - virtual void Send(const std::type_index&, std::unique_ptr) = 0; + /** + * Construct and send the message to the channel. + */ + template + void Send(Args&&... args) { + SendHelper(typeid(MsgType), std::unique_ptr(new MsgType(std::forward(args)...))); + } + + template + void Send(std::unique_ptr&& msg_ptr) { + SendHelper(typeid(MsgType), std::move(msg_ptr)); + } virtual std::string Address() = 0; @@ -61,6 +73,8 @@ class Channel { void serialize(Archive &archive) { archive(Address(), Port(), ReactorName(), Name()); } + + virtual void SendHelper(const std::type_index&, std::unique_ptr) = 0; }; /** @@ -90,7 +104,7 @@ class EventStream { /** * Unsubscribe. Call only once. */ - void unsubscribe(); + void unsubscribe() const; private: friend class Reactor; @@ -104,7 +118,7 @@ class EventStream { uint64_t cb_uid_; }; - typedef std::function Callback; + typedef std::function Callback; /** * Register a callback that will be called whenever an event arrives. @@ -166,7 +180,7 @@ class Connector { weak_queue_(queue), system_(system) {} - virtual void Send(const std::type_index& tidx, std::unique_ptr m) { + virtual void SendHelper(const std::type_index& tidx, std::unique_ptr m) { std::shared_ptr queue_ = weak_queue_.lock(); // Atomic, per the standard. if (queue_) { // We guarantee here that the Connector is not destroyed. @@ -494,7 +508,7 @@ class Network { virtual std::string Name() { return channel_; } - virtual void Send(const std::type_index &tidx, std::unique_ptr message) { + virtual void SendHelper(const std::type_index &tidx, std::unique_ptr message) { network_->mutex_.lock(); network_->queue_.push(NetworkMessage(address_, port_, reactor_, channel_, std::move(message))); diff --git a/experimental/distributed/src/protocol.cpp b/experimental/distributed/src/protocol.cpp index dc38caff0..72b3bd69e 100644 --- a/experimental/distributed/src/protocol.cpp +++ b/experimental/distributed/src/protocol.cpp @@ -64,7 +64,7 @@ void Session::Execute() { return; } - channel->Send(typeid(nullptr), std::move(message)); + channel->SendHelper(typeid(nullptr), std::move(message)); SendSuccess(true); } diff --git a/experimental/distributed/tests/connector_unit.cpp b/experimental/distributed/tests/connector_unit.cpp index 09a147135..3144f5fe3 100644 --- a/experimental/distributed/tests/connector_unit.cpp +++ b/experimental/distributed/tests/connector_unit.cpp @@ -48,8 +48,8 @@ TEST(ConnectorSetUpTest, CheckMainChannelIsSet) { virtual void Run() { std::shared_ptr channel; while (!(channel = system_->FindChannel("worker", "main"))) - std::this_thread::sleep_for(std::chrono::seconds(1)); - std::this_thread::sleep_for(std::chrono::seconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); CloseConnector("main"); } }; @@ -59,8 +59,8 @@ TEST(ConnectorSetUpTest, CheckMainChannelIsSet) { virtual void Run() { std::shared_ptr channel; while (!(channel = system_->FindChannel("master", "main"))) - std::this_thread::sleep_for(std::chrono::seconds(1)); - std::this_thread::sleep_for(std::chrono::seconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); CloseConnector("main"); } }; @@ -83,8 +83,8 @@ TEST(SimpleSendTest, OneSimpleSend) { virtual void Run() { std::shared_ptr channel; while (!(channel = system_->FindChannel("worker", "main"))) - std::this_thread::sleep_for(std::chrono::seconds(1)); - channel->Send(typeid(nullptr), std::make_unique(123)); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + channel->Send(123); CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII. } }; @@ -107,6 +107,42 @@ TEST(SimpleSendTest, OneSimpleSend) { system.AwaitShutdown(); } +TEST(SimpleSendTest, OneCallback) { + struct MessageInt : public Message { + MessageInt(int xx) : x(xx) {} + int x; + }; + + struct Master : public Reactor { + Master(System *system, std::string name) : Reactor(system, name) {} + virtual void Run() { + std::shared_ptr channel; + while (!(channel = system_->FindChannel("worker", "main"))) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + channel->Send(888); + CloseConnector("main"); + } + }; + + struct Worker : public Reactor { + Worker(System *system, std::string name) : Reactor(system, name) {} + virtual void Run() { + EventStream* stream = main_.first; + + stream->OnEvent(typeid(MessageInt), [this](const Message& msg, const EventStream::Subscription&) { + const MessageInt& msgint = dynamic_cast(msg); + ASSERT_EQ(msgint.x, 888); + CloseConnector("main"); + }); + } + }; + + System system; + system.Spawn("master"); + system.Spawn("worker"); + system.AwaitShutdown(); +} + TEST(SimpleSendTest, IgnoreAfterClose) { struct MessageInt : public Message { @@ -119,12 +155,12 @@ TEST(SimpleSendTest, IgnoreAfterClose) { virtual void Run() { std::shared_ptr channel; while (!(channel = system_->FindChannel("worker", "main"))) - std::this_thread::sleep_for(std::chrono::seconds(1)); - channel->Send(typeid(nullptr), std::make_unique(101)); - channel->Send(typeid(nullptr), std::make_unique(102)); - std::this_thread::sleep_for(std::chrono::seconds(1)); - channel->Send(typeid(nullptr), std::make_unique(103)); // these ones should be ignored - channel->Send(typeid(nullptr), std::make_unique(104)); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + channel->Send(101); + channel->Send(102); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + channel->Send(103); // should be ignored + channel->Send(104); // should be ignored CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII. } }; @@ -148,7 +184,7 @@ TEST(SimpleSendTest, IgnoreAfterClose) { } -TEST(SimpleSendTest, OnEvent) { +TEST(MultipleSendTest, UnsubscribeService) { struct MessageInt : public Message { MessageInt(int xx) : x(xx) {} int x; @@ -158,19 +194,83 @@ TEST(SimpleSendTest, OnEvent) { char x; }; + struct Master : public Reactor { + Master(System *system, std::string name) : Reactor(system, name) {} + virtual void Run() { + std::shared_ptr channel; + while (!(channel = system_->FindChannel("worker", "main"))) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + channel->Send(55); + channel->Send(66); + channel->Send(77); + channel->Send(88); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + channel->Send('a'); + channel->Send('b'); + channel->Send('c'); + channel->Send('d'); + CloseConnector("main"); + } + }; + + struct Worker : public Reactor { + Worker(System *system, std::string name) : Reactor(system, name) {} + + int num_msgs_received = 0; + + virtual void Run() { + EventStream* stream = main_.first; + + stream->OnEvent(typeid(MessageInt), [this](const Message& msg, const EventStream::Subscription& subscription) { + const MessageInt& msgint = dynamic_cast(msg); + ASSERT_TRUE(msgint.x == 55 || msgint.x == 66); + ++num_msgs_received; + if (msgint.x == 66) { + subscription.unsubscribe(); // receive only two of them + } + }); + stream->OnEvent(typeid(MessageChar), [this](const Message& msg, const EventStream::Subscription& subscription) { + const MessageChar& msgchar = dynamic_cast(msg); + char c = msgchar.x; + ++num_msgs_received; + ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c'); + if (num_msgs_received == 5) { + subscription.unsubscribe(); + CloseConnector("main"); + } + }); + } + }; + + System system; + system.Spawn("master"); + system.Spawn("worker"); + system.AwaitShutdown(); +} + + +TEST(MultipleSendTest, OnEvent) { + struct MessageInt : public Message { + MessageInt(int xx) : x(xx) {} + int x; + }; + struct MessageChar : public Message { + MessageChar(char xx) : x(xx) {} + char x; + }; struct Master : public Reactor { Master(System *system, std::string name) : Reactor(system, name) {} virtual void Run() { std::shared_ptr channel; while (!(channel = system_->FindChannel("worker", "main"))) - std::this_thread::sleep_for(std::chrono::seconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); - channel->Send(typeid(MessageInt), std::make_unique(101)); - channel->Send(typeid(MessageChar), std::make_unique('a')); - channel->Send(typeid(MessageInt), std::make_unique(103)); // these ones should be ignored - channel->Send(typeid(MessageChar), std::make_unique('b')); - CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII. + channel->Send(101); + channel->Send('a'); + channel->Send(103); + channel->Send('b'); + CloseConnector("main"); } }; @@ -184,24 +284,22 @@ TEST(SimpleSendTest, OnEvent) { EventStream* stream = main_.first; correct_vals = 0; - stream->OnEvent(typeid(MessageInt), [this](const Message& msg, EventStream::Subscription& subscription) { + stream->OnEvent(typeid(MessageInt), [this](const Message& msg, const EventStream::Subscription&) { const MessageInt& msgint = dynamic_cast(msg); - std::cout << "msg has int " << msgint.x << std::endl; ASSERT_TRUE(msgint.x == 101 || msgint.x == 103); ++correct_vals; - main_.second->Send(typeid(EndMessage), std::make_unique()); + main_.second->Send(); }); - stream->OnEvent(typeid(MessageChar), [this](const Message& msg, EventStream::Subscription& subscription) { + stream->OnEvent(typeid(MessageChar), [this](const Message& msg, const EventStream::Subscription&) { const MessageChar& msgchar = dynamic_cast(msg); - std::cout << "msg has char " << msgchar.x << std::endl; ASSERT_TRUE(msgchar.x == 'a' || msgchar.x == 'b'); ++correct_vals; - main_.second->Send(typeid(EndMessage), std::make_unique()); + main_.second->Send(); }); - stream->OnEvent(typeid(EndMessage), [this](const Message&, EventStream::Subscription&) { + stream->OnEvent(typeid(EndMessage), [this](const Message&, const EventStream::Subscription&) { ASSERT_LE(correct_vals, 4); if (correct_vals == 4) { CloseConnector("main"); diff --git a/experimental/distributed/tests/network_chat.cpp b/experimental/distributed/tests/network_chat.cpp index b2e8b7fa6..8933b5f04 100644 --- a/experimental/distributed/tests/network_chat.cpp +++ b/experimental/distributed/tests/network_chat.cpp @@ -54,7 +54,7 @@ class ChatServer : public Reactor { << std::endl; auto channel = msg->GetChannelToSender(system_); if (channel != nullptr) { - channel->Send(typeid(nullptr), + channel->SendHelper(typeid(nullptr), std::make_unique("server", "chat", msg->Message())); } } else { @@ -81,7 +81,7 @@ class ChatClient : public Reactor { auto channel = system_->network().Resolve(address, port, "server", "chat"); if (channel != nullptr) { - channel->Send(typeid(nullptr), std::make_unique("server", "chat", message)); + channel->SendHelper(typeid(nullptr), std::make_unique("server", "chat", message)); } else { std::cerr << "Couldn't resolve that server!" << std::endl; } diff --git a/experimental/distributed/tests/network_client.cpp b/experimental/distributed/tests/network_client.cpp index 2c3218890..7616519ba 100644 --- a/experimental/distributed/tests/network_client.cpp +++ b/experimental/distributed/tests/network_client.cpp @@ -8,7 +8,7 @@ int main(int argc, char *argv[]) { std::cout << channel << std::endl; if (channel != nullptr) { auto message = std::make_unique("master", "main"); - channel->Send(typeid(SenderMessage), std::move(message)); + channel->SendHelper(typeid(SenderMessage), std::move(message)); } system.network().StopClient(); return 0;