Distributed tests

Summary:
1. added distributed tests, currently only one can be run (TODO)
2. usability changes: add Close() to Subscription
3. EventStream::Subscription -> Subscription
4. Add Subscription to ChainOnce
5. Added README.md conventions5

Reviewers: sasa.stanko

Reviewed By: sasa.stanko

Subscribers: pullbot, buda

Differential Revision: https://phabricator.memgraph.io/D701
This commit is contained in:
Goran Zuzic 2017-08-24 10:29:21 +02:00
parent 2954276ca8
commit 7f1c7a46cc
9 changed files with 165 additions and 77 deletions

View File

@ -3,22 +3,32 @@
This subdirectory structure implements distributed infrastructure of Memgraph.
## terminology
## Terminology
* Memgraph Node Id (mnid): a machine (processs) that runs a (distributed) Memgraph program.
* Node: a computer that performs (distributed) work.
* Vertex: an abstract graph concept.
* Reactor: a unit of concurrent execution, lives on its own thread.
* Channel: a (one-way) communication abstraction between Reactors. The reactors can be on the same machine or on different processes.
* Message: gets sent through channels. Must be serializable if sent via network layer (library: cereal).
* Event: arrival of a (subclass of) Message. You can register callbacks. Register exact callbacks (not for derivated/subclasses).
* EventStream: read-end of a channel, is owned by exactly one Reactor/thread.
* ChannelWriter: write-end of a channel, can be owned (wrote into) by multiple threads.
## conventions
## Ownership:
1. Locked: A method having a Locked... prefix indicates that you
have to lock the appropriate mutex before calling this function.
* System, Distributed are singletons. They should be always alive.
* ChannelWriter (write-end) should be lightweight and can be copied arbitrarily.
* EventStream (read-end) should never be written by anyone except the owner (the reactor that created it).
## dependencies
## Code Conventions
* Locked: A method having a Locked... prefix indicates that you
have to lock the appropriate mutex before calling this function.
* ALWAYS close channels. You will memory leak if you don't.
Reactor::CloseChannel or Subscription::Close will do the trick.
## Dependencies
* cereal
* <other memgraph dependencies>

View File

@ -95,6 +95,8 @@ class Network {
/** Start a threadpool that dispatches the messages from the (outgoing) queue to the sockets */
void StartClient(int worker_count) {
LOG(INFO) << "Starting " << worker_count << " client workers";
client_run_ = true;
for (int i = 0; i < worker_count; ++i) {
pool_.push_back(std::thread([worker_count, this]() {
while (this->client_run_) {
@ -129,6 +131,7 @@ class Network {
for (size_t i = 0; i < pool_.size(); ++i) {
pool_[i].join();
}
pool_.clear();
}
class RemoteChannelWriter : public ChannelWriter {
@ -209,6 +212,7 @@ class Network {
if (server_ != nullptr) {
server_->Shutdown();
thread_.join();
server_ = nullptr;
}
}
@ -217,7 +221,7 @@ class Network {
SpinLock mutex_;
std::vector<std::thread> pool_;
std::queue<NetworkMessage> queue_;
std::atomic<bool> client_run_{true};
std::atomic<bool> client_run_;
// server variables
std::thread thread_;

View File

@ -1,9 +1,13 @@
#include "reactors_local.hpp"
void EventStream::Subscription::unsubscribe() const {
void EventStream::Subscription::Unsubscribe() const {
event_queue_.RemoveCb(*this);
}
void EventStream::Subscription::Close() const {
event_queue_.Close();
}
thread_local Reactor* current_reactor_ = nullptr;
std::string Channel::LocalChannelWriter::ReactorName() {
@ -14,7 +18,9 @@ std::string Channel::LocalChannelWriter::Name() {
return channel_name_;
}
void Channel::LocalEventStream::Close() {
void Channel::Close() {
// TODO(zuza): there will be major problems if a reactor tries to close a stream that isn't theirs
// luckily this should never happen if the framework is used as expected.
current_reactor_->CloseChannel(channel_name_);
}
@ -59,11 +65,7 @@ void Reactor::CloseChannel(const std::string &s) {
auto it = channels_.find(s);
assert(it != channels_.end());
channels_.erase(it);
}
void Reactor::CloseAllChannels() {
std::unique_lock<std::mutex> lock(*mutex_);
channels_.clear();
cvar_->notify_all();
}
void Reactor::RunEventLoop() {
@ -76,6 +78,9 @@ void Reactor::RunEventLoop() {
std::unique_lock<std::mutex> lock(*mutex_);
while (true) {
// Not fair because was taken earlier, talk to lion.
msg_and_cb = LockedGetPendingMessages();
if (msg_and_cb.first != nullptr) break;
// Exit the loop if there are no more Channels.
if (channels_.empty()) {
@ -83,10 +88,6 @@ void Reactor::RunEventLoop() {
break;
}
// Not fair because was taken earlier, talk to lion.
msg_and_cb = LockedGetPendingMessages();
if (msg_and_cb.first != nullptr) break;
cvar_->wait(lock);
}
@ -112,13 +113,13 @@ auto Reactor::LockedGetPendingMessages() -> MsgAndCbInfo {
if (msg_ptr == nullptr) continue;
std::type_index tidx = msg_ptr->GetTypeIndex();
std::vector<std::pair<EventStream::Callback, EventStream::Subscription> > cb_info;
std::vector<std::pair<EventStream::Callback, Subscription> > cb_info;
auto msg_type_cb_iter = event_queue.callbacks_.find(tidx);
if (msg_type_cb_iter != event_queue.callbacks_.end()) { // There is a callback for this type.
for (auto &tidx_cb_key_value : msg_type_cb_iter->second) {
uint64_t uid = tidx_cb_key_value.first;
EventStream::Callback cb = tidx_cb_key_value.second;
cb_info.emplace_back(cb, EventStream::Subscription(event_queue, tidx, uid));
cb_info.emplace_back(cb, Subscription(event_queue, tidx, uid));
}
}

View File

@ -69,8 +69,8 @@ class ChannelWriter {
*/
class EventStream {
public:
class Subscription;
class OnEventOnceChainer;
class Subscription;
/**
* Register a callback that will be called whenever an event arrives.
@ -99,6 +99,7 @@ class EventStream {
* Get the name of the channel.
*/
virtual const std::string &ChannelName() = 0;
/**
* Subscription Service.
*
@ -109,7 +110,12 @@ class EventStream {
/**
* Unsubscribe. Call only once.
*/
void unsubscribe() const;
void Unsubscribe() const;
/**
* Close the stream. Convenience method.
*/
void Close() const;
private:
friend class Reactor;
@ -151,12 +157,12 @@ class EventStream {
}
template<typename MsgType>
OnEventOnceChainer &ChainOnce(std::function<void(const MsgType&)> &&cb) {
OnEventOnceChainer &ChainOnce(std::function<void(const MsgType&, const Subscription&)> &&cb) {
std::function<void(const Message&, const Subscription&)> wrap =
[cb = std::move(cb)](const Message &general_msg, const Subscription &subscription) {
const MsgType &correct_msg = dynamic_cast<const MsgType&>(general_msg);
subscription.unsubscribe();
cb(correct_msg); // Warning: this can close the Channel, be careful what you put after it!
subscription.Unsubscribe();
cb(correct_msg, subscription); // Warning: this can close the Channel, be careful what you put after it!
};
cbs_.emplace_back(typeid(MsgType), std::move(wrap));
return *this;
@ -196,6 +202,8 @@ private:
virtual void OnEventHelper(std::type_index tidx, Callback callback) = 0;
};
using Subscription = EventStream::Subscription; // To write less.
/**
* Implementation of a channel.
*
@ -218,7 +226,7 @@ class Channel {
reactor_name_(params.reactor_name),
mutex_(params.mutex),
cvar_(params.cvar),
stream_(mutex_, channel_name_, this) {}
stream_(mutex_, this) {}
/**
* LocalChannelWriter represents the channels to reactors living in the same reactor system (write-end of the channels).
@ -268,20 +276,20 @@ class Channel {
public:
friend class Channel;
LocalEventStream(std::shared_ptr<std::mutex> mutex, std::string channel_name,
Channel *queue) : mutex_(mutex), channel_name_(channel_name), queue_(queue) {}
std::unique_ptr<Message> AwaitEvent() {
std::unique_lock<std::mutex> lock(*mutex_);
return queue_->LockedAwaitPop(lock);
}
LocalEventStream(std::shared_ptr<std::mutex> mutex, Channel *queue) : mutex_(mutex), queue_(queue) {}
void OnEventHelper(std::type_index tidx, Callback callback) {
std::unique_lock<std::mutex> lock(*mutex_);
queue_->LockedOnEventHelper(tidx, callback);
}
const std::string &ChannelName() {
return queue_->channel_name_;
}
void Close();
void Close() {
queue_->Close();
}
private:
std::shared_ptr<std::mutex> mutex_;
@ -289,6 +297,11 @@ class Channel {
Channel *queue_;
};
/**
* Close the channel. Must be called from the reactor that owns the channel.
*/
void Close();
Channel(const Channel &other) = delete;
Channel(Channel &&other) = default;
Channel &operator=(const Channel &other) = delete;
@ -318,17 +331,6 @@ private:
return std::make_shared<LocalChannelWriter>(mutex_, reactor_name_, channel_name_, self_ptr_);
}
std::unique_ptr<Message> LockedAwaitPop(std::unique_lock<std::mutex> &lock) {
while (true) {
std::unique_ptr<Message> m = LockedRawPop();
if (!m) {
cvar_->wait(lock);
} else {
return m;
}
}
}
std::unique_ptr<Message> LockedPop() {
return LockedRawPop();
}
@ -395,13 +397,6 @@ class Reactor {
*/
void CloseChannel(const std::string &s);
/**
* close all channels (typically during shutdown).
*
* Should only be called from the Reactor thread.
*/
void CloseAllChannels();
/**
* Get Reactor name
*/

View File

@ -126,11 +126,11 @@ class Master : public Reactor {
// wait until every worker sends a ReturnAddressMsg back, then close
stream->OnEvent<TextMessage>([this](const TextMessage &msg,
const EventStream::Subscription &subscription) {
const Subscription &subscription) {
std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n";
++workers_seen;
if (workers_seen == worker_mnids_.size()) {
subscription.unsubscribe();
subscription.Unsubscribe();
// Sleep for a while so we can read output in the terminal.
// (start_distributed.py runs each process in a new tab which is
// closed immediately after process has finished)
@ -143,7 +143,7 @@ class Master : public Reactor {
for (auto wmnid : worker_mnids_) {
auto stream = memgraph.FindChannel(wmnid, "worker", "main");
stream->OnEventOnce()
.ChainOnce<ChannelResolvedMessage>([this, stream](const ChannelResolvedMessage &msg){
.ChainOnce<ChannelResolvedMessage>([this, stream](const ChannelResolvedMessage &msg, const Subscription&){
msg.channelWriter()->Send<TextMessage>("master", "main", "hi from master");
stream->Close();
});
@ -171,7 +171,7 @@ class Worker : public Reactor {
auto stream = main_.first;
// wait until master sends us a TextMessage, then reply back and close
stream->OnEventOnce()
.ChainOnce<TextMessage>([this](const TextMessage &msg) {
.ChainOnce<TextMessage>([this](const TextMessage &msg, const Subscription&) {
std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n";
msg.GetReturnChannelWriter()

View File

@ -151,12 +151,12 @@
// }
// }
// stream->OnEvent<Message>([this](const Message &msg, const EventStream::Subscription& subscription) {
// stream->OnEvent<Message>([this](const Message &msg, const Subscription& subscription) {
// std::cout << "Processing Query via Callback" << std::endl;
// const Query &query =
// dynamic_cast<const Query &>(msg); // exception bad_cast
// ProcessQuery(&query);
// subscription.unsubscribe();
// subscription.Unsubscribe();
// });
// }

View File

@ -46,13 +46,13 @@ class ChatServer : public Reactor {
auto chat = Open("chat").first;
chat->OnEvent<ChatACK>([](const ChatACK& ack, const EventStream::Subscription&) {
chat->OnEvent<ChatACK>([](const ChatACK& ack, const Subscription&) {
std::cout << "Received ACK from " << ack.Address() << ":"
<< ack.Port() << " -> '" << ack.Message() << "'"
<< std::endl;
});
chat->OnEvent<ChatMessage>([this](const ChatMessage& msg, const EventStream::Subscription&) {
chat->OnEvent<ChatMessage>([this](const ChatMessage& msg, const Subscription&) {
std::cout << "Received message from " << msg.Address() << ":"
<< msg.Port() << " -> '" << msg.Message() << "'"
<< std::endl;

View File

@ -0,0 +1,78 @@
/**
* This test file test the Distributed Reactors API on ONLY one process (no real networking).
* In other words, we send a message from one process to itself.
*/
#include "gtest/gtest.h"
#include "reactors_distributed.hpp"
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <future>
TEST(SimpleTests, StartAndStopServices) {
System &system = System::GetInstance();
Distributed &distributed = Distributed::GetInstance();
distributed.StartServices();
// do nothing
std::this_thread::sleep_for(std::chrono::milliseconds(500));
system.AwaitShutdown();
distributed.StopServices();
}
TEST(SimpleTests, SendEmptyMessage) {
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() {
Distributed::GetInstance().FindChannel("127.0.0.1", 10000, "worker", "main")
->OnEventOnce()
.ChainOnce<ChannelResolvedMessage>([this](const ChannelResolvedMessage& msg,
const Subscription& subscription) {
msg.channelWriter()->Send<Message>();
subscription.Close();
});
CloseChannel("main");
}
};
struct Worker : public Reactor {
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
main_.first->OnEventOnce()
.ChainOnce<Message>([this](const Message&, const Subscription& subscription) {
// if this message isn't delivered, the main channel will never be closed
subscription.Close(); // close "main"
});
}
};
// emulate flags like it's a multiprocess system, these may be alredy set by default
FLAGS_address = "127.0.0.1";
FLAGS_port = 10000;
System &system = System::GetInstance();
Distributed &distributed = Distributed::GetInstance();
distributed.StartServices();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown(); // this must be called before StopServices
distributed.StopServices();
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -92,7 +92,7 @@ TEST(SimpleSendTest, OneCallback) {
virtual void Run() {
EventStream* stream = main_.first;
stream->OnEvent<MessageInt>([this](const MessageInt &msg, const EventStream::Subscription&) {
stream->OnEvent<MessageInt>([this](const MessageInt &msg, const Subscription&) {
ASSERT_EQ(msg.x, 888);
CloseChannel("main");
});
@ -132,7 +132,7 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
virtual void Run() {
EventStream* stream = main_.first;
stream->OnEvent<MessageInt>([this](const MessageInt& msg, const EventStream::Subscription&) {
stream->OnEvent<MessageInt>([this](const MessageInt& msg, const Subscription&) {
CloseChannel("main");
ASSERT_EQ(msg.x, 101);
});
@ -156,12 +156,12 @@ TEST(SimpleSendTest, DuringFirstEvent) {
virtual void Run() {
EventStream* stream = main_.first;
stream->OnEvent<MessageInt>([this](const Message &msg, const EventStream::Subscription &subscription) {
stream->OnEvent<MessageInt>([this](const Message &msg, const Subscription &subscription) {
const MessageInt &msgint = dynamic_cast<const MessageInt&>(msg);
if (msgint.x == 101)
FindChannel("main")->Send<MessageInt>(102);
if (msgint.x == 102) {
subscription.unsubscribe();
subscription.Unsubscribe();
CloseChannel("main");
p_.set_value(777);
}
@ -220,19 +220,19 @@ TEST(MultipleSendTest, UnsubscribeService) {
virtual void Run() {
EventStream* stream = main_.first;
stream->OnEvent<MessageInt>([this](const MessageInt &msgint, const EventStream::Subscription &subscription) {
stream->OnEvent<MessageInt>([this](const MessageInt &msgint, const Subscription &subscription) {
ASSERT_TRUE(msgint.x == 55 || msgint.x == 66);
++num_msgs_received;
if (msgint.x == 66) {
subscription.unsubscribe(); // receive only two of them
subscription.Unsubscribe(); // receive only two of them
}
});
stream->OnEvent<MessageChar>([this](const MessageChar &msgchar, const EventStream::Subscription &subscription) {
stream->OnEvent<MessageChar>([this](const MessageChar &msgchar, const Subscription &subscription) {
char c = msgchar.x;
++num_msgs_received;
ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c');
if (num_msgs_received == 5) {
subscription.unsubscribe();
subscription.Unsubscribe();
CloseChannel("main");
}
});
@ -281,19 +281,19 @@ TEST(MultipleSendTest, OnEvent) {
EventStream* stream = main_.first;
correct_vals = 0;
stream->OnEvent<MessageInt>([this](const MessageInt &msgint, const EventStream::Subscription&) {
stream->OnEvent<MessageInt>([this](const MessageInt &msgint, const Subscription&) {
ASSERT_TRUE(msgint.x == 101 || msgint.x == 103);
++correct_vals;
main_.second->Send<EndMessage>();
});
stream->OnEvent<MessageChar>([this](const MessageChar &msgchar, const EventStream::Subscription&) {
stream->OnEvent<MessageChar>([this](const MessageChar &msgchar, const Subscription&) {
ASSERT_TRUE(msgchar.x == 'a' || msgchar.x == 'b');
++correct_vals;
main_.second->Send<EndMessage>();
});
stream->OnEvent<EndMessage>([this](const EndMessage&, const EventStream::Subscription&) {
stream->OnEvent<EndMessage>([this](const EndMessage&, const Subscription&) {
ASSERT_LE(correct_vals, 4);
if (correct_vals == 4) {
CloseChannel("main");
@ -334,13 +334,13 @@ TEST(MultipleSendTest, Chaining) {
EventStream* stream = main_.first;
stream->OnEventOnce()
.ChainOnce<MessageInt>([this](const MessageInt &msg) {
.ChainOnce<MessageInt>([this](const MessageInt &msg, const Subscription&) {
ASSERT_EQ(msg.x, 55);
})
.ChainOnce<MessageInt>([](const MessageInt &msg) {
.ChainOnce<MessageInt>([](const MessageInt &msg, const Subscription&) {
ASSERT_EQ(msg.x, 66);
})
.ChainOnce<MessageInt>([this](const MessageInt &msg) {
.ChainOnce<MessageInt>([this](const MessageInt &msg, const Subscription&) {
ASSERT_EQ(msg.x, 77);
CloseChannel("main");
});
@ -386,13 +386,13 @@ TEST(MultipleSendTest, ChainingInRightOrder) {
EventStream* stream = main_.first;
stream->OnEventOnce()
.ChainOnce<MessageInt>([this](const MessageInt &msg) {
.ChainOnce<MessageInt>([this](const MessageInt &msg, const Subscription&) {
ASSERT_EQ(msg.x, 55);
})
.ChainOnce<MessageChar>([](const MessageChar &msg) {
.ChainOnce<MessageChar>([](const MessageChar &msg, const Subscription&) {
ASSERT_EQ(msg.x, 'b');
})
.ChainOnce<MessageInt>([this](const MessageInt &msg) {
.ChainOnce<MessageInt>([this](const MessageInt &msg, const Subscription&) {
ASSERT_EQ(msg.x, 77);
CloseChannel("main");
});
@ -440,12 +440,12 @@ TEST(MultipleSendTest, ProcessManyMessages) {
EventStream* stream = main_.first;
vals = 0;
stream->OnEvent<MessageInt>([this](const Message&, const EventStream::Subscription&) {
stream->OnEvent<MessageInt>([this](const Message&, const Subscription&) {
++vals;
main_.second->Send<EndMessage>();
});
stream->OnEvent<EndMessage>([this](const Message&, const EventStream::Subscription&) {
stream->OnEvent<EndMessage>([this](const Message&, const Subscription&) {
ASSERT_LE(vals, num_tests);
if (vals == num_tests) {
CloseChannel("main");