memgraph/tests/unit/reactor_distributed.cpp
Mislav Bradac e6d3edf9a9 Fix distributed reactors
Reviewers: buda, mferencevic

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D935
2017-10-27 11:26:04 +02:00

152 lines
4.1 KiB
C++

/**
* This test file tests the Distributed Reactors API on ONLY one process (no
* real networking).
* In other words, we send a message from one process to itself.
*/
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <future>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include "communication/reactor/common_messages.hpp"
#include "communication/reactor/reactor_distributed.hpp"
#include "gtest/gtest.h"
using namespace communication::reactor;
/**
 * Test message carrying a single integer payload.
 *
 * serialize() archives the Message base followed by x, and the type is
 * registered with cereal right after the definition.
 */
struct MessageInt : public Message {
  MessageInt() {}  // cereal needs this

  // explicit: a plain int must not silently convert into a message.
  explicit MessageInt(int x) : x(x) {}

  // Payload. Zero-initialized so that a default-constructed message never
  // holds an indeterminate value.
  int x{0};

  template <class Archive>
  void serialize(Archive &ar) {
    ar(cereal::virtual_base_class<Message>(this), x);
  }
};
CEREAL_REGISTER_TYPE(MessageInt);
/**
 * Test request carrying an integer payload in addition to the reactor and
 * channel names handed to the ReturnAddressMessage base.
 *
 * serialize() archives the ReturnAddressMessage base followed by x, and the
 * type is registered with cereal right after the definition.
 */
struct RequestMessage : public ReturnAddressMessage {
  RequestMessage() {}  // default constructor, used when cereal loads the type

  /**
   * @param reactor name forwarded to ReturnAddressMessage
   * @param channel name forwarded to ReturnAddressMessage
   * @param x integer payload
   */
  RequestMessage(std::string reactor, std::string channel, int x)
      : ReturnAddressMessage(reactor, channel), x(x) {}

  template <class Archive>
  void serialize(Archive &ar) {
    ar(cereal::virtual_base_class<ReturnAddressMessage>(this), x);
  }

  friend class cereal::access;

  // Payload. Zero-initialized so that a default-constructed request never
  // holds an indeterminate value.
  int x{0};
};
CEREAL_REGISTER_TYPE(RequestMessage);
/**
* Test that the services start up and stop without crashing.
*/
TEST(SimpleTests, StartAndStopServices) {
  // How long the system is left idle between construction and shutdown.
  const std::chrono::milliseconds idle_period(500);

  DistributedSystem system;
  // No reactors are spawned; the system is only given time to sit idle.
  std::this_thread::sleep_for(idle_period);
  system.StopServices();
}
/**
* Test simple message reception.
*
* Data flow:
* (1) Send an empty message from Master to Worker/main
*/
TEST(SimpleTests, SendEmptyMessage) {
  DistributedSystem system;

  // Master: looks up the worker's "main" channel, sends one plain Message
  // into it and then closes its own "main" channel.
  const auto master_setup = [](Reactor &reactor) {
    auto worker_main =
        reactor.system_.FindChannel("127.0.0.1", 10000, "worker", "main");
    worker_main->Send<Message>();
    reactor.CloseChannel("main");
  };

  // Worker: waits for a single Message on its "main" channel.
  const auto worker_setup = [](Reactor &reactor) {
    reactor.main_.first->OnEventOnce().ChainOnce<Message>(
        [&](const Message &, const Subscription &subscription) {
          // This is the only place the worker's "main" channel gets closed:
          // if the message is never delivered, the channel stays open and
          // the test loops forever.
          subscription.CloseChannel();
        });
  };

  system.Spawn("master", master_setup);
  system.Spawn("worker", worker_setup);
  system.StopServices();
}
/**
* Test ReturnAddressMessage functionality.
*
* Data flow:
* (1) Send a ReturnAddressMessage from Master to Worker/main
* (2) Send MessageInt(5) from Worker back to Master/main
*/
TEST(SimpleTests, SendReturnAddressMessage) {
  DistributedSystem system;

  // Master: sends its own return address (reactor name + "main") to the
  // worker, then waits for the integer reply on its "main" channel.
  const auto master_setup = [](Reactor &reactor) {
    auto worker_main =
        reactor.system_.FindChannel("127.0.0.1", 10000, "worker", "main");
    worker_main->Send<ReturnAddressMessage>(reactor.name(), "main");
    reactor.main_.first->OnEvent<MessageInt>(
        [&reactor](const MessageInt &reply, const Subscription &) {
          EXPECT_EQ(reply.x, 5);
          reactor.CloseChannel("main");
        });
  };

  // Worker: answers on the channel named by the received return address
  // and then closes its own "main" channel.
  const auto worker_setup = [](Reactor &reactor) {
    reactor.main_.first->OnEvent<ReturnAddressMessage>(
        [&reactor](const ReturnAddressMessage &request, const Subscription &) {
          auto reply_channel = request.FindChannel(reactor.system_);
          reply_channel->Send<MessageInt>(5);
          reactor.CloseChannel("main");
        });
  };

  system.Spawn("master", master_setup);
  system.Spawn("worker", worker_setup);
  system.StopServices();
}
/**
* Test serializability of a complex message over the network layer.
*
* Data flow:
* (1) Send RequestMessage (return address, 123) from Master to Worker/main
* (2) Send MessageInt (779) from Worker to Master/main
*/
TEST(SimpleTests, SendSerializableMessage) {
  DistributedSystem system;

  // Master: sends RequestMessage(own name, "main", 123) to the worker and
  // waits for a MessageInt reply on its "main" channel.
  system.Spawn("master", [](Reactor &r) {
    auto writer = r.system_.FindChannel("127.0.0.1", 10000, "worker", "main");
    writer->Send<RequestMessage>(r.name(), "main", 123);
    r.main_.first->OnEvent<MessageInt>(
        [&](const MessageInt &message, const Subscription &) {
          // EXPECT, not ASSERT: a failing ASSERT returns from this callback
          // immediately and would skip CloseChannel below; per the note in
          // SendEmptyMessage, an unclosed main channel means the test loops
          // forever instead of reporting the failure.
          EXPECT_EQ(message.x, 779);
          r.CloseChannel("main");
        });
  });

  // Worker: checks the request payload, replies with MessageInt(779) on the
  // channel named by the request's return address, then closes "main".
  system.Spawn("worker", [](Reactor &r) {
    r.main_.first->OnEvent<RequestMessage>(
        [&](const RequestMessage &message, const Subscription &) {
          // EXPECT for the same reason as above: the reply must still be
          // sent and the channel closed so that both reactors can finish.
          EXPECT_EQ(message.x, 123);
          message.FindChannel(r.system_)->Send<MessageInt>(779);
          r.CloseChannel("main");
        });
  });

  system.StopServices();
}
/**
 * Entry point of the test binary: passes the command line to Google Test and
 * returns the result of running all registered tests.
 */
int main(int argc, char **argv) {
  ::testing::InitGoogleTest(&argc, argv);
  const int status = RUN_ALL_TESTS();
  return status;
}