#include #include #include #include #include #include #include #include #include "communication/reactor/reactor_local.hpp" #include "gtest/gtest.h" #include "utils/exceptions.hpp" using namespace std::literals::chrono_literals; using namespace communication::reactor; using Subscription = EventStream::Subscription; TEST(SystemTest, ReturnWithoutThrowing) { System system; auto master = system.Spawn("master", [](Reactor &r) { r.CloseChannel("main"); }); std::this_thread::sleep_for(100ms); } TEST(ChannelCreationTest, ThrowOnReusingChannelName) { System system; auto master = system.Spawn("master", [](Reactor &r) { r.Open("channel"); ASSERT_THROW(r.Open("channel"), utils::BasicException); r.CloseChannel("main"); r.CloseChannel("channel"); }); std::this_thread::sleep_for(100ms); } TEST(SimpleSendTest, OneCallback) { struct MessageInt : public Message { MessageInt(int xx) : x(xx) {} int x; }; System system; auto master = system.Spawn("master", [](Reactor &r) { std::this_thread::sleep_for(100ms); LocalChannelWriter channel_writer("worker", "main", r.system_); channel_writer.Send(888); r.CloseChannel("main"); }); auto worker = system.Spawn("worker", [](Reactor &r) { EventStream *stream = r.main_.first; stream->OnEvent( [&r](const MessageInt &msg, const Subscription &) { ASSERT_EQ(msg.x, 888); r.CloseChannel("main"); }); }); std::this_thread::sleep_for(200ms); } TEST(SimpleSendTest, IgnoreAfterClose) { struct MessageInt : public Message { MessageInt(int xx) : x(xx) {} int x; }; System system; auto master = system.Spawn("master", [](Reactor &r) { std::this_thread::sleep_for(100ms); LocalChannelWriter channel_writer("worker", "main", r.system_); channel_writer.Send(101); channel_writer.Send(102); // should be ignored std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel_writer.Send(103); // should be ignored channel_writer.Send(104); // should be ignored // Write-end doesn't need to be closed because it's in RAII. r.CloseChannel("main"); }); auto worker = system.Spawn("worker", [](Reactor &r) { EventStream *stream = r.main_.first; stream->OnEvent( [&r](const MessageInt &msg, const Subscription &) { r.CloseChannel("main"); ASSERT_EQ(msg.x, 101); }); }); std::this_thread::sleep_for(std::chrono::milliseconds(500)); } TEST(SimpleSendTest, RecreateChannelAfterClosing) { struct MessageInt : public Message { MessageInt(int xx) : x(xx) {} int x; }; System system; auto master = system.Spawn("master", [](Reactor &r) { std::this_thread::sleep_for(100ms); LocalChannelWriter channel_writer("worker", "main", r.system_); // Original "worker" reactor will die after it process this message. channel_writer.Send(101); std::this_thread::sleep_for(std::chrono::milliseconds(100)); // This message will be dropped since there is no reactor with name // "worker". channel_writer.Send(102); std::this_thread::sleep_for(std::chrono::milliseconds(200)); // This message should recieved by new "worker" reactor. channel_writer.Send(103); r.CloseChannel("main"); }); auto worker = system.Spawn("worker", [](Reactor &r) { EventStream *stream = r.main_.first; stream->OnEvent( [&r](const MessageInt &msg, const Subscription &) { r.CloseChannel("main"); ASSERT_EQ(msg.x, 101); }); }); std::this_thread::sleep_for(std::chrono::milliseconds(300)); auto worker2 = system.Spawn("worker", [](Reactor &r) { EventStream *stream = r.main_.first; stream->OnEvent( [&r](const MessageInt &msg, const Subscription &) { r.CloseChannel("main"); ASSERT_EQ(msg.x, 103); }); }); std::this_thread::sleep_for(std::chrono::milliseconds(300)); } TEST(SimpleSendTest, DuringFirstEvent) { struct MessageInt : public Message { MessageInt(int xx) : x(xx) {} int x; }; System system; std::promise p; auto f = p.get_future(); auto master = system.Spawn("master", [&p](Reactor &r) mutable { EventStream *stream = r.main_.first; stream->OnEvent( [&](const Message &msg, const Subscription &subscription) { const MessageInt &msgint = dynamic_cast(msg); if (msgint.x == 101) { LocalChannelWriter channel_writer("master", "main", r.system_); channel_writer.Send(102); } if (msgint.x == 102) { subscription.Unsubscribe(); r.CloseChannel("main"); p.set_value(777); } }); LocalChannelWriter channel_writer("master", "main", r.system_); channel_writer.Send(101); }); f.wait(); ASSERT_EQ(f.get(), 777); } TEST(MultipleSendTest, UnsubscribeService) { struct MessageInt : public Message { MessageInt(int xx) : x(xx) {} int x; }; struct MessageChar : public Message { MessageChar(char xx) : x(xx) {} char x; }; System system; auto master = system.Spawn("master", [](Reactor &r) { std::this_thread::sleep_for(100ms); LocalChannelWriter channel_writer("worker", "main", r.system_); channel_writer.Send(55); channel_writer.Send(66); channel_writer.Send(77); channel_writer.Send(88); std::this_thread::sleep_for(std::chrono::milliseconds(300)); channel_writer.Send('a'); channel_writer.Send('b'); channel_writer.Send('c'); channel_writer.Send('d'); r.CloseChannel("main"); }); auto worker = system.Spawn("worker", [num_received_messages = 0](Reactor & r) mutable { EventStream *stream = r.main_.first; stream->OnEvent( [&](const MessageInt &msgint, const Subscription &subscription) { ASSERT_TRUE(msgint.x == 55 || msgint.x == 66); ++num_received_messages; if (msgint.x == 66) { subscription.Unsubscribe(); // receive only two of them } }); stream->OnEvent( [&](const MessageChar &msgchar, const Subscription &subscription) { char c = msgchar.x; ++num_received_messages; ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c'); if (num_received_messages == 5) { subscription.Unsubscribe(); r.CloseChannel("main"); } }); }); std::this_thread::sleep_for(std::chrono::milliseconds(500)); } TEST(MultipleSendTest, OnEvent) { struct MessageInt : public Message { MessageInt(int xx) : x(xx) {} int x; }; struct MessageChar : public Message { MessageChar(char xx) : x(xx) {} char x; }; System system; auto master = system.Spawn("master", [](Reactor &r) { std::this_thread::sleep_for(100ms); LocalChannelWriter channel_writer("worker", "main", r.system_); channel_writer.Send(101); channel_writer.Send('a'); channel_writer.Send(103); channel_writer.Send('b'); r.CloseChannel("main"); }); auto worker = system.Spawn("worker", [correct_vals = 0](Reactor & r) mutable { struct EndMessage : Message {}; EventStream *stream = r.main_.first; stream->OnEvent( [&](const MessageInt &msgint, const Subscription &) { ASSERT_TRUE(msgint.x == 101 || msgint.x == 103); ++correct_vals; r.main_.second->Send(); }); stream->OnEvent( [&](const MessageChar &msgchar, const Subscription &) { ASSERT_TRUE(msgchar.x == 'a' || msgchar.x == 'b'); ++correct_vals; r.main_.second->Send(); }); stream->OnEvent([&](const EndMessage &, const Subscription &) { ASSERT_LE(correct_vals, 4); if (correct_vals == 4) { r.CloseChannel("main"); } }); }); std::this_thread::sleep_for(std::chrono::milliseconds(300)); } TEST(MultipleSendTest, Chaining) { struct MessageInt : public Message { MessageInt(int xx) : x(xx) {} int x; }; System system; auto master = system.Spawn("master", [](Reactor &r) { std::this_thread::sleep_for(100ms); LocalChannelWriter channel_writer("worker", "main", r.system_); channel_writer.Send(55); channel_writer.Send(66); channel_writer.Send(77); r.CloseChannel("main"); }); auto worker = system.Spawn("worker", [](Reactor &r) { EventStream *stream = r.main_.first; stream->OnEventOnce() .ChainOnce([](const MessageInt &msg, const Subscription &) { ASSERT_EQ(msg.x, 55); }) .ChainOnce([](const MessageInt &msg, const Subscription &) { ASSERT_EQ(msg.x, 66); }) .ChainOnce( [&](const MessageInt &msg, const Subscription &) { ASSERT_EQ(msg.x, 77); r.CloseChannel("main"); }); }); std::this_thread::sleep_for(std::chrono::milliseconds(300)); } TEST(MultipleSendTest, ChainingInRightOrder) { struct MessageInt : public Message { MessageInt(int xx) : x(xx) {} int x; }; struct MessageChar : public Message { MessageChar(char xx) : x(xx) {} char x; }; System system; auto master = system.Spawn("master", [](Reactor &r) { std::this_thread::sleep_for(100ms); LocalChannelWriter channel_writer("worker", "main", r.system_); channel_writer.Send('a'); channel_writer.Send(55); channel_writer.Send('b'); channel_writer.Send(77); r.CloseChannel("main"); }); auto worker = system.Spawn("worker", [](Reactor &r) { std::this_thread::sleep_for(100ms); EventStream *stream = r.main_.first; stream->OnEventOnce() .ChainOnce([](const MessageInt &msg, const Subscription &) { ASSERT_EQ(msg.x, 55); }) .ChainOnce( [](const MessageChar &msg, const Subscription &) { ASSERT_EQ(msg.x, 'b'); }) .ChainOnce( [&](const MessageInt &msg, const Subscription &) { ASSERT_EQ(msg.x, 77); r.CloseChannel("main"); }); }); std::this_thread::sleep_for(300ms); } TEST(MultipleSendTest, ProcessManyMessages) { const static int kNumTests = 100; struct MessageInt : public Message { MessageInt(int xx) : x(xx) {} int x; }; System system; auto master = system.Spawn("master", [](Reactor &r) { std::this_thread::sleep_for(100ms); LocalChannelWriter channel_writer("worker", "main", r.system_); for (int i = 0; i < kNumTests; ++i) { channel_writer.Send(rand()); std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 5)); } r.CloseChannel("main"); }); auto worker = system.Spawn("worker", [vals = 0](Reactor & r) mutable { struct EndMessage : Message {}; EventStream *stream = r.main_.first; vals = 0; stream->OnEvent([&](const Message &, const Subscription &) { ++vals; r.main_.second->Send(); }); stream->OnEvent([&](const Message &, const Subscription &) { ASSERT_LE(vals, kNumTests); if (vals == kNumTests) { r.CloseChannel("main"); } }); }); std::this_thread::sleep_for(1000ms); } int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }