Expanded distributed tests

Summary:
1. add subscription functionality (it can close the channel and get the name)
2. new test for ReturnAddrMsg functionality
3. new test for serialization
4. proper serialization access permissions

Reviewers: sasa.stanko

Reviewed By: sasa.stanko

Subscribers: lion, buda

Differential Revision: https://phabricator.memgraph.io/D705
This commit is contained in:
Goran Zuzic 2017-08-24 13:58:13 +02:00
parent c11b777234
commit e4e61cd00a
5 changed files with 175 additions and 7 deletions

View File

@ -10,6 +10,9 @@ Network::Network() {}
*/ */
ReturnAddressMsg::ReturnAddressMsg() {} ReturnAddressMsg::ReturnAddressMsg() {}
ReturnAddressMsg::ReturnAddressMsg(std::string channel)
: ReturnAddressMsg(current_reactor_->name(), channel) {}
ReturnAddressMsg::ReturnAddressMsg(std::string reactor, std::string channel) ReturnAddressMsg::ReturnAddressMsg(std::string reactor, std::string channel)
: address_(FLAGS_address), : address_(FLAGS_address),
port_(FLAGS_port), port_(FLAGS_port),

View File

@ -234,7 +234,10 @@ class Network {
*/ */
class ReturnAddressMsg : public Message { class ReturnAddressMsg : public Message {
public: public:
ReturnAddressMsg(); /* The return address is on the current reactor, specified channel */
ReturnAddressMsg(std::string channel);
/* The return address is on a specified reactor/channel */
ReturnAddressMsg(std::string reactor, std::string channel); ReturnAddressMsg(std::string reactor, std::string channel);
std::string Address() const; std::string Address() const;
@ -244,12 +247,16 @@ class ReturnAddressMsg : public Message {
std::shared_ptr<ChannelWriter> GetReturnChannelWriter() const; std::shared_ptr<ChannelWriter> GetReturnChannelWriter() const;
template <class Archive> template<class Archive>
void serialize(Archive &ar) { void serialize(Archive &ar) {
ar(cereal::virtual_base_class<Message>(this), address_, port_, ar(cereal::virtual_base_class<Message>(this), address_, port_,
reactor_, channel_); reactor_, channel_);
} }
protected:
friend class cereal::access;
ReturnAddressMsg(); // Cereal needs access to a default constructor.
private: private:
std::string address_; std::string address_;
uint16_t port_; uint16_t port_;

View File

@ -4,10 +4,14 @@ void EventStream::Subscription::Unsubscribe() const {
event_queue_.RemoveCb(*this); event_queue_.RemoveCb(*this);
} }
void EventStream::Subscription::Close() const { void EventStream::Subscription::CloseChannel() const {
event_queue_.Close(); event_queue_.Close();
} }
const std::string& EventStream::Subscription::ChannelName() const {
return event_queue_.channel_name_;
}
thread_local Reactor* current_reactor_ = nullptr; thread_local Reactor* current_reactor_ = nullptr;
std::string Channel::LocalChannelWriter::ReactorName() { std::string Channel::LocalChannelWriter::ReactorName() {

View File

@ -115,7 +115,12 @@ class EventStream {
/** /**
* Close the stream. Convenience method. * Close the stream. Convenience method.
*/ */
void Close() const; void CloseChannel() const;
/**
* Get the name of the channel the message is delivered to.
*/
const std::string& ChannelName() const;
private: private:
friend class Reactor; friend class Reactor;

View File

@ -15,6 +15,9 @@
#include <vector> #include <vector>
#include <future> #include <future>
/**
* Test do the services start up without crashes.
*/
TEST(SimpleTests, StartAndStopServices) { TEST(SimpleTests, StartAndStopServices) {
System &system = System::GetInstance(); System &system = System::GetInstance();
Distributed &distributed = Distributed::GetInstance(); Distributed &distributed = Distributed::GetInstance();
@ -27,6 +30,12 @@ TEST(SimpleTests, StartAndStopServices) {
distributed.StopServices(); distributed.StopServices();
} }
/**
* Test simple message reception.
*
* Data flow:
* (1) Send an empty message from Master to Worker/main
*/
TEST(SimpleTests, SendEmptyMessage) { TEST(SimpleTests, SendEmptyMessage) {
struct Master : public Reactor { struct Master : public Reactor {
Master(std::string name) : Reactor(name) {} Master(std::string name) : Reactor(name) {}
@ -37,7 +46,7 @@ TEST(SimpleTests, SendEmptyMessage) {
.ChainOnce<ChannelResolvedMessage>([this](const ChannelResolvedMessage& msg, .ChainOnce<ChannelResolvedMessage>([this](const ChannelResolvedMessage& msg,
const Subscription& subscription) { const Subscription& subscription) {
msg.channelWriter()->Send<Message>(); msg.channelWriter()->Send<Message>();
subscription.Close(); subscription.CloseChannel();
}); });
CloseChannel("main"); CloseChannel("main");
@ -50,8 +59,148 @@ TEST(SimpleTests, SendEmptyMessage) {
virtual void Run() { virtual void Run() {
main_.first->OnEventOnce() main_.first->OnEventOnce()
.ChainOnce<Message>([this](const Message&, const Subscription& subscription) { .ChainOnce<Message>([this](const Message&, const Subscription& subscription) {
// if this message isn't delivered, the main channel will never be closed // if this message isn't delivered, the main channel will never be closed and we infinite loop
subscription.Close(); // close "main" subscription.CloseChannel(); // close "main"
});
}
};
// emulate flags like it's a multiprocess system, these may be alredy set by default
FLAGS_address = "127.0.0.1";
FLAGS_port = 10000;
System &system = System::GetInstance();
Distributed &distributed = Distributed::GetInstance();
distributed.StartServices();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown(); // this must be called before StopServices
distributed.StopServices();
}
/**
* Test ReturnAddressMsg functionality.
*
* Data flow:
* (1) Send an empty message from Master to Worker/main
* (2) Send an empty message from Worker to Master/main
*/
TEST(SimpleTests, SendReturnAddressMessage) {
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() {
Distributed::GetInstance().FindChannel("127.0.0.1", 10000, "worker", "main")
->OnEventOnce()
.ChainOnce<ChannelResolvedMessage>([this](const ChannelResolvedMessage& msg,
const Subscription& sub) {
// send a message that will be returned to "main"
msg.channelWriter()->Send<ReturnAddressMsg>(this->name(), "main");
// close this anonymous channel
sub.CloseChannel();
});
main_.first->OnEventOnce()
.ChainOnce<Message>([this](const Message&, const Subscription& sub) {
// if this message isn't delivered, the main channel will never be closed and we infinite loop
// close the "main" channel
sub.CloseChannel();
});
}
};
struct Worker : public Reactor {
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
main_.first->OnEventOnce()
.ChainOnce<ReturnAddressMsg>([this](const ReturnAddressMsg &msg, const Subscription& sub) {
msg.GetReturnChannelWriter()->Send<Message>();
sub.CloseChannel(); // close "main"
});
}
};
// emulate flags like it's a multiprocess system, these may be alredy set by default
FLAGS_address = "127.0.0.1";
FLAGS_port = 10000;
System &system = System::GetInstance();
Distributed &distributed = Distributed::GetInstance();
distributed.StartServices();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown(); // this must be called before StopServices
distributed.StopServices();
}
// Apparently templates cannot be declared inside local classes, figure out how to move it in?
// For that reason I obscured the name.
struct SerializableMessage_TextMessage : public ReturnAddressMsg {
SerializableMessage_TextMessage(std::string channel, std::string arg_text, int arg_val)
: ReturnAddressMsg(channel), text(arg_text), val(arg_val) {}
std::string text;
int val;
template<class Archive>
void serialize(Archive &ar) {
ar(cereal::virtual_base_class<ReturnAddressMsg>(this), text, val);
}
protected:
friend class cereal::access;
SerializableMessage_TextMessage() {} // Cereal needs access to a default constructor.
};
CEREAL_REGISTER_TYPE(SerializableMessage_TextMessage);
/**
* Test serializability of a complex message over the network layer.
*
* Data flow:
* (1) Send ("hi", 123) from Master to Worker/main
* (2) Send ("hi back", 779) from Worker to Master/main
*/
TEST(SimpleTests, SendSerializableMessage) {
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() {
Distributed::GetInstance().FindChannel("127.0.0.1", 10000, "worker", "main")
->OnEventOnce()
.ChainOnce<ChannelResolvedMessage>([this](const ChannelResolvedMessage& msg,
const Subscription& sub) {
// send a message that will be returned to "main"
msg.channelWriter()->Send<SerializableMessage_TextMessage>("main", "hi", 123);
// close this anonymous channel
sub.CloseChannel();
});
main_.first->OnEventOnce()
.ChainOnce<SerializableMessage_TextMessage>([this](const SerializableMessage_TextMessage& msg, const Subscription& sub) {
ASSERT_EQ(msg.text, "hi back");
ASSERT_EQ(msg.val, 779);
// if this message isn't delivered, the main channel will never be closed and we infinite loop
// close the "main" channel
sub.CloseChannel();
});
}
};
struct Worker : public Reactor {
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
main_.first->OnEventOnce()
.ChainOnce<SerializableMessage_TextMessage>([this](const SerializableMessage_TextMessage &msg, const Subscription& sub) {
ASSERT_EQ(msg.text, "hi");
ASSERT_EQ(msg.val, 123);
msg.GetReturnChannelWriter()->Send<SerializableMessage_TextMessage>
("no channel, dont use this", "hi back", 779);
sub.CloseChannel(); // close "main"
}); });
} }
}; };