Add few connector tests

Reviewers: zuza, buda

Reviewed By: zuza

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D656
This commit is contained in:
Sasa Stanko 2017-08-10 10:35:24 +02:00
parent 3fecc4f357
commit b5a61681d2

View File

@ -7,6 +7,7 @@
#include <string>
#include <thread>
#include <vector>
#include <future>
#include "communication.hpp"
@ -156,7 +157,7 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(101);
channel->Send<MessageInt>(102);
channel->Send<MessageInt>(102); // should be ignored
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(103); // should be ignored
channel->Send<MessageInt>(104); // should be ignored
@ -183,6 +184,44 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
}
TEST(SimpleSendTest, DuringFirstEvent) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct Master : public Reactor {
Master(System *system, std::string name, std::promise<int> p) : Reactor(system, name), p_(std::move(p)) {}
virtual void Run() {
EventStream* stream = main_.first;
stream->OnEvent<MessageInt>([this](const Message& msg, const EventStream::Subscription& subscription) {
const MessageInt& msgint = dynamic_cast<const MessageInt&>(msg);
if (msgint.x == 101)
FindChannel("main")->Send<MessageInt>(102);
if (msgint.x == 102) {
subscription.unsubscribe();
CloseConnector("main");
p_.set_value(777);
}
});
std::shared_ptr<Channel> channel = FindChannel("main");
channel->Send<MessageInt>(101);
}
std::promise<int> p_;
};
System system;
std::promise<int> p;
auto f = p.get_future();
system.Spawn<Master>("master", std::move(p));
f.wait();
ASSERT_EQ(f.get(), 777);
system.AwaitShutdown();
}
TEST(MultipleSendTest, UnsubscribeService) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
@ -354,6 +393,113 @@ TEST(MultipleSendTest, Chaining) {
}
TEST(MultipleSendTest, ChainingInRightOrder) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct MessageChar : public Message {
MessageChar(char xx) : x(xx) {}
char x;
};
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageChar>('a');
channel->Send<MessageInt>(55);
channel->Send<MessageChar>('b');
channel->Send<MessageInt>(77);
CloseConnector("main");
}
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
EventStream* stream = main_.first;
stream->OnEventOnce()
.ChainOnce<MessageInt>([this](const MessageInt& msg) {
ASSERT_EQ(msg.x, 55);
})
.ChainOnce<MessageChar>([](const MessageChar& msg) {
ASSERT_EQ(msg.x, 'b');
})
.ChainOnce<MessageInt>([this](const MessageInt& msg) {
ASSERT_EQ(msg.x, 77);
CloseConnector("main");
});
}
};
System system;
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}
TEST(MultipleSendTest, ProcessManyMessages) {
const static int num_tests = 100;
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
for (int i = 0; i < num_tests; ++i) {
channel->Send<MessageInt>(rand());
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 5));
}
CloseConnector("main");
}
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
struct EndMessage : Message {};
int vals = 0;
virtual void Run() {
EventStream* stream = main_.first;
vals = 0;
stream->OnEvent<MessageInt>([this](const Message& msg, const EventStream::Subscription&) {
++vals;
main_.second->Send<EndMessage>();
});
stream->OnEvent<EndMessage>([this](const Message&, const EventStream::Subscription&) {
ASSERT_LE(vals, num_tests);
if (vals == num_tests) {
CloseConnector("main");
}
});
}
};
System system;
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}