Revise reactors code

Reviewers: buda, mferencevic

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D923
This commit is contained in:
Mislav Bradac 2017-10-25 14:47:46 +02:00
parent 9f7ef8e0e9
commit fe3d752904
19 changed files with 1228 additions and 1902 deletions

View File

@ -186,10 +186,11 @@ target_link_libraries(antlr_opencypher_parser_lib antlr4)
set(memgraph_src_files set(memgraph_src_files
${src_dir}/communication/bolt/v1/decoder/decoded_value.cpp ${src_dir}/communication/bolt/v1/decoder/decoded_value.cpp
${src_dir}/communication/bolt/v1/session.cpp ${src_dir}/communication/bolt/v1/session.cpp
${src_dir}/communication/reactor/reactor_local.cpp
${src_dir}/data_structures/concurrent/skiplist_gc.cpp ${src_dir}/data_structures/concurrent/skiplist_gc.cpp
${src_dir}/database/dbms.cpp
${src_dir}/database/graph_db.cpp ${src_dir}/database/graph_db.cpp
${src_dir}/database/graph_db_accessor.cpp ${src_dir}/database/graph_db_accessor.cpp
${src_dir}/database/dbms.cpp
${src_dir}/durability/recovery.cpp ${src_dir}/durability/recovery.cpp
${src_dir}/durability/snapshooter.cpp ${src_dir}/durability/snapshooter.cpp
${src_dir}/io/network/addrinfo.cpp ${src_dir}/io/network/addrinfo.cpp
@ -219,7 +220,7 @@ set(memgraph_src_files
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# memgraph_lib depend on these libraries # memgraph_lib depend on these libraries
set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools cereal
antlr_opencypher_parser_lib dl glog gflags) antlr_opencypher_parser_lib dl glog gflags)
if (USE_LTALLOC) if (USE_LTALLOC)

View File

@ -2,72 +2,60 @@
#include <iostream> #include <iostream>
#include <memory> #include <memory>
#include "reactors_distributed.hpp"
#include "memgraph_config.hpp" #include "memgraph_config.hpp"
#include "memgraph_distributed.hpp" #include "memgraph_distributed.hpp"
#include "memgraph_transactions.hpp" #include "memgraph_transactions.hpp"
#include "reactors_distributed.hpp"
/** /**
* List of queries that should be executed. * List of queries that should be executed.
*/ */
std::vector<std::string> queries = {{ std::vector<std::string> queries = {
"create vertex", {"create vertex", "create vertex", "create vertex", "create vertex",
"create vertex", "create vertex", "create vertex", "create vertex", "create vertex",
"create vertex", "create vertex", "create vertex", "vertex count", "create vertex",
"create vertex", "create vertex", "vertex count"}};
"create vertex",
"create vertex",
"create vertex",
"create vertex",
"create vertex",
"create vertex",
"vertex count",
"create vertex",
"create vertex",
"vertex count"
}};
/** /**
* This is the client that issues some hard-coded queries. * This is the client that issues some hard-coded queries.
*/ */
class Client : public Reactor { class Client : public Reactor {
public: public:
Client(std::string name) : Reactor(name) { Client(std::string name) : Reactor(name) {}
}
void IssueQueries(std::shared_ptr<ChannelWriter> channel_to_leader) { void IssueQueries(std::shared_ptr<ChannelWriter> channel_to_leader) {
// (concurrently) create a couple of vertices // (concurrently) create a couple of vertices
for (int query_idx = 0; query_idx < queries.size(); ++query_idx) { for (int query_idx = 0; query_idx < static_cast<int64_t>(queries.size());
++query_idx) {
// register callback // register callback
std::string channel_name = "query-" + std::to_string(query_idx); std::string channel_name = "query-" + std::to_string(query_idx);
auto stream = Open(channel_name).first; auto stream = Open(channel_name).first;
stream stream->OnEventOnce().ChainOnce<ResultMsg>(
->OnEventOnce() [this, query_idx](const ResultMsg &msg, const Subscription &sub) {
.ChainOnce<ResultMsg>([this, query_idx](const ResultMsg &msg, std::cout << "Result of query " << query_idx << " ("
const Subscription &sub){ << queries[query_idx] << "):" << std::endl
std::cout << "Result of query " << query_idx << " (" << " " << msg.result() << std::endl;
<< queries[query_idx] << "):" << std::endl sub.CloseChannel();
<< " " << msg.result() << std::endl; });
sub.CloseChannel();
});
// then issue the query (to avoid race conditions) // then issue the query (to avoid race conditions)
std::cout << "Issuing command " << query_idx << " (" std::cout << "Issuing command " << query_idx << " (" << queries[query_idx]
<< queries[query_idx] << ")" << std::endl; << ")" << std::endl;
channel_to_leader->Send<QueryMsg>(channel_name, queries[query_idx]); channel_to_leader->Send<QueryMsg>(channel_name, queries[query_idx]);
} }
} }
virtual void Run() { virtual void Run() {
MemgraphDistributed& memgraph = MemgraphDistributed::GetInstance(); MemgraphDistributed &memgraph = MemgraphDistributed::GetInstance();
auto mnid = memgraph.LeaderMnid(); auto mnid = memgraph.LeaderMnid();
memgraph.FindChannel(mnid, "master", "client-queries") memgraph.FindChannel(mnid, "master", "client-queries")
->OnEventOnce() ->OnEventOnce()
.ChainOnce<ChannelResolvedMessage>([this](const ChannelResolvedMessage &msg, const Subscription& sub) { .ChainOnce<ChannelResolvedMessage>(
sub.CloseChannel(); [this](const ChannelResolvedMessage &msg, const Subscription &sub) {
IssueQueries(msg.channelWriter()); sub.CloseChannel();
}); IssueQueries(msg.channelWriter());
});
} }
}; };
@ -77,7 +65,7 @@ int main(int argc, char *argv[]) {
System &system = System::GetInstance(); System &system = System::GetInstance();
Distributed &distributed = Distributed::GetInstance(); Distributed &distributed = Distributed::GetInstance();
MemgraphDistributed& memgraph = MemgraphDistributed::GetInstance(); MemgraphDistributed &memgraph = MemgraphDistributed::GetInstance();
memgraph.RegisterConfig(ParseConfig()); memgraph.RegisterConfig(ParseConfig());
distributed.StartServices(); distributed.StartServices();
@ -85,6 +73,5 @@ int main(int argc, char *argv[]) {
system.AwaitShutdown(); system.AwaitShutdown();
distributed.StopServices(); distributed.StopServices();
return 0; return 0;
} }

View File

@ -4,9 +4,9 @@
#include "reactors_distributed.hpp" #include "reactors_distributed.hpp"
#include <unordered_map>
#include <mutex> #include <mutex>
#include <string> #include <string>
#include <unordered_map>
#include <utility> #include <utility>
#include <vector> #include <vector>
@ -17,20 +17,18 @@ class MemgraphDistributed {
public: public:
/** /**
* Get the (singleton) instance of MemgraphDistributed. * Get the (singleton) instance of MemgraphDistributed.
*
* More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern
*/ */
static MemgraphDistributed &GetInstance() { static MemgraphDistributed &GetInstance() {
static MemgraphDistributed memgraph; // guaranteed to be destroyed, initialized on first use static MemgraphDistributed memgraph;
return memgraph; return memgraph;
} }
EventStream* FindChannel(MnidT mnid, EventStream *FindChannel(MnidT mnid, const std::string &reactor,
const std::string &reactor,
const std::string &channel) { const std::string &channel) {
std::unique_lock<std::recursive_mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
const auto &location = mnodes_.at(mnid); const auto &location = mnodes_.at(mnid);
return Distributed::GetInstance().FindChannel(location.first, location.second, reactor, channel); return Distributed::GetInstance().FindChannel(
location.first, location.second, reactor, channel);
} }
void RegisterConfig(const Config &config) { void RegisterConfig(const Config &config) {
@ -51,23 +49,22 @@ class MemgraphDistributed {
/** /**
* The leader is currently the first node in the config. * The leader is currently the first node in the config.
*/ */
MnidT LeaderMnid() { MnidT LeaderMnid() const { return config_.nodes.front().mnid; }
return config_.nodes.front().mnid;
}
protected: protected:
MemgraphDistributed() {} MemgraphDistributed() {}
/** Register memgraph node id to the given location. */ /** Register memgraph node id to the given location. */
void RegisterMemgraphNode(MnidT mnid, const std::string &address, uint16_t port) { void RegisterMemgraphNode(MnidT mnid, const std::string &address,
std::unique_lock<std::recursive_mutex> lock(mutex_); uint16_t port) {
std::unique_lock<std::mutex> lock(mutex_);
mnodes_[mnid] = Location(address, port); mnodes_[mnid] = Location(address, port);
} }
private: private:
Config config_; Config config_;
std::recursive_mutex mutex_; std::mutex mutex_;
std::unordered_map<MnidT, Location> mnodes_; std::unordered_map<MnidT, Location> mnodes_;
MemgraphDistributed(const MemgraphDistributed &) = delete; MemgraphDistributed(const MemgraphDistributed &) = delete;

View File

@ -1,35 +0,0 @@
#include "reactors_distributed.hpp"
DEFINE_string(address, "127.0.0.1", "Network server bind address");
DEFINE_int32(port, 10000, "Network server bind port");
Network::Network() {}
/**
* ReturnAddressMsg implementation.
*/
ReturnAddressMsg::ReturnAddressMsg() {}
ReturnAddressMsg::ReturnAddressMsg(std::string channel)
: ReturnAddressMsg(current_reactor_->name(), channel) {}
ReturnAddressMsg::ReturnAddressMsg(std::string reactor, std::string channel)
: address_(FLAGS_address),
port_(FLAGS_port),
reactor_(reactor),
channel_(channel) {}
std::string ReturnAddressMsg::Address() const { return address_; }
uint16_t ReturnAddressMsg::Port() const { return port_; }
std::string ReturnAddressMsg::ReactorName() const { return reactor_; }
std::string ReturnAddressMsg::ChannelName() const { return channel_; }
std::shared_ptr<ChannelWriter> ReturnAddressMsg::GetReturnChannelWriter() const {
if (address_ == FLAGS_address && port_ == FLAGS_port) {
return System::GetInstance().FindChannel(reactor_, channel_);
} else {
// TODO(zuza): we should probably assert here if services have been already started.
return Distributed::GetInstance().network().Resolve(address_, port_, reactor_, channel_);
}
assert(false);
}

View File

@ -1,350 +0,0 @@
#pragma once
#include <cassert>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <tuple>
#include <typeindex>
#include <utility>
#include <gflags/gflags.h>
#include "protocol.hpp"
#include "reactors_local.hpp"
#include "cereal/archives/binary.hpp"
#include "cereal/types/base_class.hpp"
#include "cereal/types/memory.hpp"
#include "cereal/types/polymorphic.hpp"
#include "cereal/types/string.hpp"
#include "cereal/types/utility.hpp" // utility has to be included because of std::pair
#include "cereal/types/vector.hpp"
#include "communication/server.hpp"
#include "threading/sync/spinlock.hpp"
DECLARE_string(address);
DECLARE_int32(port);
/**
* Networking service.
*/
class Network {
private:
using Endpoint = Protocol::Endpoint;
using Socket = Protocol::Socket;
using NetworkServer = communication::Server<Protocol::Session,
Protocol::Socket, Protocol::Data>;
struct NetworkMessage {
NetworkMessage()
: address(""), port(0), reactor(""), channel(""), message(nullptr) {}
NetworkMessage(const std::string& _address, uint16_t _port,
const std::string& _reactor, const std::string& _channel,
std::unique_ptr<Message> _message)
: address(_address),
port(_port),
reactor(_reactor),
channel(_channel),
message(std::move(_message)) {}
NetworkMessage(NetworkMessage &&nm)
: address(std::move(nm.address)),
port(std::move(nm.port)),
reactor(std::move(nm.reactor)),
channel(std::move(nm.channel)),
message(std::move(nm.message)) {}
std::string address;
uint16_t port;
std::string reactor;
std::string channel;
std::unique_ptr<Message> message;
};
public:
Network();
// client functions
std::shared_ptr<ChannelWriter> Resolve(std::string address, uint16_t port,
std::string reactor_name,
std::string channel_name) {
if (Protocol::SendMessage(address, port, reactor_name, channel_name,
nullptr)) {
return std::make_shared<RemoteChannelWriter>(this, address, port, reactor_name,
channel_name);
}
LOG(WARNING) << "Could not resolve " << address << ":" << port << " " << reactor_name << "/" << channel_name;
return nullptr;
}
std::shared_ptr<EventStream> AsyncResolve(const std::string& address, uint16_t port,
int32_t retries,
std::chrono::seconds cooldown) {
// TODO: Asynchronously resolve channel, and return an event stream
// that emits the channel after it gets resolved.
return nullptr;
}
/** Start a threadpool that dispatches the messages from the (outgoing) queue to the sockets */
void StartClient(int worker_count) {
LOG(INFO) << "Starting " << worker_count << " client workers";
client_run_ = true;
for (int i = 0; i < worker_count; ++i) {
pool_.push_back(std::thread([worker_count, this]() {
while (this->client_run_) {
this->mutex_.lock();
if (!this->queue_.empty()) {
NetworkMessage nm(std::move(this->queue_.front()));
this->queue_.pop();
this->mutex_.unlock();
// TODO: store success
bool success =
Protocol::SendMessage(nm.address, nm.port, nm.reactor,
nm.channel, std::move(nm.message));
DLOG(INFO) << "Network client message send status: " << success << std::endl;
} else {
this->mutex_.unlock();
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}));
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
void StopClient() {
while (true) {
std::lock_guard<SpinLock> lock(mutex_);
if (queue_.empty()) {
break;
}
}
client_run_ = false;
for (size_t i = 0; i < pool_.size(); ++i) {
pool_[i].join();
}
pool_.clear();
}
class RemoteChannelWriter : public ChannelWriter {
public:
RemoteChannelWriter(Network *network, std::string address, uint16_t port,
std::string reactor, std::string channel)
: network_(network),
address_(address),
port_(port),
reactor_(reactor),
channel_(channel) {}
virtual std::string Address() { return address_; }
virtual uint16_t Port() { return port_; }
virtual std::string ReactorName() { return reactor_; }
virtual std::string Name() { return channel_; }
virtual void Send(std::unique_ptr<Message> message) {
std::lock_guard<SpinLock> lock(network_->mutex_);
network_->queue_.push(NetworkMessage(address_, port_, reactor_, channel_,
std::move(message)));
}
private:
Network *network_;
std::string address_;
uint16_t port_;
std::string reactor_;
std::string channel_;
};
// server functions
std::string Address() { return FLAGS_address; }
uint16_t Port() { return FLAGS_port; }
/** Start a threadpool that relays the messages from the sockets to the LocalEventStreams */
void StartServer(int workers_count) {
if (server_ != nullptr) {
LOG(FATAL) << "Tried to start a running server!";
}
// Initialize endpoint.
Endpoint endpoint;
try {
endpoint = Endpoint(FLAGS_address.c_str(), FLAGS_port);
} catch (io::network::NetworkEndpointException &e) {
LOG(FATAL) << e.what();
}
// Initialize socket.
Socket socket;
if (!socket.Bind(endpoint)) {
LOG(FATAL) << "Cannot bind to socket on " << FLAGS_address << " at "
<< FLAGS_port;
}
if (!socket.SetNonBlocking()) {
LOG(FATAL) << "Cannot set socket to non blocking!";
}
if (!socket.Listen(1024)) {
LOG(FATAL) << "Cannot listen on socket!";
}
// Initialize server
server_ =
std::make_unique<NetworkServer>(std::move(socket), protocol_data_);
// Start server
thread_ = std::thread(
[workers_count, this]() { this->server_->Start(workers_count); });
}
void StopServer() {
if (server_ != nullptr) {
server_->Shutdown();
thread_.join();
server_ = nullptr;
}
}
private:
// client variables
SpinLock mutex_;
std::vector<std::thread> pool_;
std::queue<NetworkMessage> queue_;
std::atomic<bool> client_run_;
// server variables
std::thread thread_;
Protocol::Data protocol_data_;
std::unique_ptr<NetworkServer> server_{nullptr};
};
/**
* Message that includes the sender channel used to respond.
*/
class ReturnAddressMsg : public Message {
public:
/* The return address is on the current reactor, specified channel */
ReturnAddressMsg(std::string channel);
/* The return address is on a specified reactor/channel */
ReturnAddressMsg(std::string reactor, std::string channel);
std::string Address() const;
uint16_t Port() const;
std::string ReactorName() const;
std::string ChannelName() const;
std::shared_ptr<ChannelWriter> GetReturnChannelWriter() const;
template<class Archive>
void serialize(Archive &ar) {
ar(cereal::virtual_base_class<Message>(this), address_, port_,
reactor_, channel_);
}
protected:
friend class cereal::access;
ReturnAddressMsg(); // Cereal needs access to a default constructor.
private:
std::string address_;
uint16_t port_;
std::string reactor_;
std::string channel_;
};
CEREAL_REGISTER_TYPE(ReturnAddressMsg);
/**
* Message that will arrive on a stream returned by Distributed::FindChannel
* once and if the channel is successfully resolved.
*/
class ChannelResolvedMessage : public Message {
public:
ChannelResolvedMessage() {}
ChannelResolvedMessage(std::shared_ptr<ChannelWriter> channel_writer)
: Message(), channel_writer_(channel_writer) {}
std::shared_ptr<ChannelWriter> channelWriter() const { return channel_writer_; }
private:
std::shared_ptr<ChannelWriter> channel_writer_;
};
/**
* Placeholder for all functionality related to non-local communication.
*
* E.g. resolve remote channels by memgraph node id, etc.
* Alive through the entire process lifetime.
* Singleton class. Created automatically on first use.
* Final (can't extend) because it's a singleton. Please be careful if you're changing this.
*/
class Distributed final {
public:
/**
* Get the (singleton) instance of Distributed.
*
* More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern
*/
static Distributed &GetInstance() {
static Distributed distributed; // guaranteed to be destroyed, initialized on first use
return distributed;
}
void StartServices() {
network_.StartClient(4);
network_.StartServer(4);
}
void StopServices() {
network_.StopClient();
network_.StopServer();
}
// TODO: Implement remote Spawn.
/**
* Resolves remote channel.
*
* TODO: Provide asynchronous implementation of this function.
*
* @return EventStream on which message will arrive once channel is resolved.
* @warning It can only be called from local Reactor.
*/
EventStream* FindChannel(const std::string &address,
uint16_t port,
const std::string &reactor_name,
const std::string &channel_name) {
std::shared_ptr<ChannelWriter> channel_writer = nullptr;
while (!(channel_writer = network_.Resolve(address, port, reactor_name, channel_name)))
std::this_thread::sleep_for(std::chrono::milliseconds(200));
auto stream_channel = current_reactor_->Open();
stream_channel.second->Send<ChannelResolvedMessage>(channel_writer);
return stream_channel.first;
}
Network &network() { return network_; }
protected:
Distributed() {}
Network network_;
private:
Distributed(const Distributed &) = delete;
Distributed(Distributed &&) = delete;
Distributed &operator=(const Distributed &) = delete;
Distributed &operator=(Distributed &&) = delete;
};

View File

@ -1,134 +0,0 @@
#include "reactors_local.hpp"
void EventStream::Subscription::Unsubscribe() const {
event_queue_.RemoveCb(*this);
}
void EventStream::Subscription::CloseChannel() const {
event_queue_.Close();
}
const std::string& EventStream::Subscription::ChannelName() const {
return event_queue_.channel_name_;
}
thread_local Reactor* current_reactor_ = nullptr;
std::string Channel::LocalChannelWriter::ReactorName() {
return reactor_name_;
}
std::string Channel::LocalChannelWriter::Name() {
return channel_name_;
}
void Channel::Close() {
// TODO(zuza): there will be major problems if a reactor tries to close a stream that isn't theirs
// luckily this should never happen if the framework is used as expected.
current_reactor_->CloseChannel(channel_name_);
}
std::pair<EventStream*, std::shared_ptr<ChannelWriter>> Reactor::Open(const std::string &channel_name) {
std::unique_lock<std::mutex> lock(*mutex_);
// TODO: Improve the check that the channel name does not exist in the
// system.
if (channels_.count(channel_name) != 0) {
throw std::runtime_error("Channel with name " + channel_name
+ "already exists");
}
auto it = channels_.emplace(channel_name,
std::make_shared<Channel>(Channel::Params{name_, channel_name, mutex_, cvar_})).first;
it->second->self_ptr_ = it->second;
return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
}
std::pair<EventStream*, std::shared_ptr<ChannelWriter>> Reactor::Open() {
std::unique_lock<std::mutex> lock(*mutex_);
do {
std::string channel_name = "stream-" + std::to_string(channel_name_counter_++);
if (channels_.count(channel_name) == 0) {
// Channel &queue = channels_[channel_name];
auto it = channels_.emplace(channel_name,
std::make_shared<Channel>(Channel::Params{name_, channel_name, mutex_, cvar_})).first;
it->second->self_ptr_ = it->second;
return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
}
} while (true);
}
const std::shared_ptr<ChannelWriter> Reactor::FindChannel(
const std::string &channel_name) {
std::unique_lock<std::mutex> lock(*mutex_);
auto it_channel = channels_.find(channel_name);
if (it_channel == channels_.end()) return nullptr;
return it_channel->second->LockedOpenChannel();
}
void Reactor::CloseChannel(const std::string &s) {
std::unique_lock<std::mutex> lock(*mutex_);
auto it = channels_.find(s);
assert(it != channels_.end());
channels_.erase(it);
cvar_->notify_all();
}
void Reactor::RunEventLoop() {
bool exit_event_loop = false;
while (true) {
// Find (or wait) for the next Message.
MsgAndCbInfo msg_and_cb;
{
std::unique_lock<std::mutex> lock(*mutex_);
while (true) {
// Not fair because was taken earlier, talk to lion.
msg_and_cb = LockedGetPendingMessages();
if (msg_and_cb.first != nullptr) break;
// Exit the loop if there are no more Channels.
if (channels_.empty()) {
exit_event_loop = true;
break;
}
cvar_->wait(lock);
}
if (exit_event_loop) break;
}
for (auto &cbAndSub : msg_and_cb.second) {
auto &cb = cbAndSub.first;
const Message &msg = *msg_and_cb.first;
cb(msg, cbAndSub.second);
}
}
}
/**
* Checks if there is any nonempty EventStream.
*/
auto Reactor::LockedGetPendingMessages() -> MsgAndCbInfo {
// return type after because the scope Reactor:: is not searched before the name
for (auto &channels_key_value : channels_) {
Channel &event_queue = *channels_key_value.second;
auto msg_ptr = event_queue.LockedPop();
if (msg_ptr == nullptr) continue;
std::type_index tidx = msg_ptr->GetTypeIndex();
std::vector<std::pair<EventStream::Callback, Subscription> > cb_info;
auto msg_type_cb_iter = event_queue.callbacks_.find(tidx);
if (msg_type_cb_iter != event_queue.callbacks_.end()) { // There is a callback for this type.
for (auto &tidx_cb_key_value : msg_type_cb_iter->second) {
uint64_t uid = tidx_cb_key_value.first;
EventStream::Callback cb = tidx_cb_key_value.second;
cb_info.emplace_back(cb, Subscription(event_queue, tidx, uid));
}
}
return MsgAndCbInfo(std::move(msg_ptr), std::move(cb_info));
}
return MsgAndCbInfo(nullptr, {});
}

View File

@ -1,540 +0,0 @@
#pragma once
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
#include <utility>
#include "cereal/types/memory.hpp"
class EventStream;
class Reactor;
class System;
class Channel;
extern thread_local Reactor* current_reactor_;
/**
* Base class for messages.
*/
class Message {
public:
virtual ~Message() {}
template <class Archive>
void serialize(Archive &) {}
/** Run-time type identification that is used for callbacks.
*
* Warning: this works because of the virtual destructor, don't remove it from this class
*/
std::type_index GetTypeIndex() {
return typeid(*this);
}
};
/**
* Write-end of a Channel (between two reactors).
*/
class ChannelWriter {
public:
/**
* Construct and send the message to the channel.
*/
template<typename MsgType, typename... Args>
void Send(Args&&... args) {
Send(std::unique_ptr<Message>(new MsgType(std::forward<Args>(args)...)));
}
virtual void Send(std::unique_ptr<Message> ptr) = 0;
virtual std::string ReactorName() = 0;
virtual std::string Name() = 0;
void operator=(const ChannelWriter &) = delete;
template <class Archive>
void serialize(Archive &archive) {
archive(ReactorName(), Name());
}
};
/**
* Read-end of a Channel (between two reactors).
*/
class EventStream {
public:
class OnEventOnceChainer;
class Subscription;
/**
* Register a callback that will be called whenever an event arrives.
*/
template<typename MsgType>
void OnEvent(std::function<void(const MsgType&, const Subscription&)> &&cb) {
OnEventHelper(typeid(MsgType),
[cb = std::move(cb)](const Message &general_msg,
const Subscription &subscription) {
const MsgType &correct_msg = dynamic_cast<const MsgType&>(general_msg);
cb(correct_msg, subscription);
});
}
/**
* Register a callback that will be called only once.
* Once event is received, channel of this EventStream is closed.
*/
template<typename MsgType>
void OnEventOnceThenClose(std::function<void(const MsgType&)> &&cb) {
OnEventHelper(typeid(MsgType),
[cb = std::move(cb)](const Message &general_msg,
const Subscription &subscription) {
const MsgType &correct_msg = dynamic_cast<const MsgType&>(general_msg);
subscription.CloseChannel();
cb(correct_msg);
});
}
/**
* Starts a chain to register a callback that fires off only once.
*
* This method supports chaining (see the the class OnEventOnceChainer or the tests for examples).
* Warning: when chaining callbacks, make sure that EventStream does not deallocate before the last
* chained callback fired.
*/
OnEventOnceChainer OnEventOnce() {
return OnEventOnceChainer(*this);
}
/**
* Get the name of the channel.
*/
virtual const std::string &ChannelName() = 0;
/**
* Subscription Service.
*
* Unsubscribe from a callback. Lightweight object (can copy by value).
*/
class Subscription {
public:
/**
* Unsubscribe. Call only once.
*/
void Unsubscribe() const;
/**
* Close the stream. Convenience method.
*/
void CloseChannel() const;
/**
* Get the name of the channel the message is delivered to.
*/
const std::string& ChannelName() const;
private:
friend class Reactor;
friend class Channel;
Subscription(Channel &event_queue, std::type_index tidx, uint64_t cb_uid)
: event_queue_(event_queue), tidx_(tidx), cb_uid_(cb_uid) { }
Channel &event_queue_;
std::type_index tidx_;
uint64_t cb_uid_;
};
/**
* Close this event stream, disallowing further events from getting received.
*
* Any subsequent call after Close() to any function will be result in undefined
* behavior (invalid pointer dereference). Can only be called from the thread
* associated with the Reactor.
*/
virtual void Close() = 0;
/**
* Convenience class to chain one-off callbacks.
*
* Usage: Create this class with OnEventOnce() and then chain callbacks using ChainOnce.
* A callback will fire only once, unsubscribe and immediately subscribe the next callback to the stream.
*
* Example: stream->OnEventOnce().ChainOnce(firstCb).ChainOnce(secondCb);
*
* Implementation: This class is a temporary object that remembers the callbacks that are to be installed
* and finally installs them in the destructor. Not sure is this kosher, is there another way?
*/
class OnEventOnceChainer {
public:
OnEventOnceChainer(EventStream &event_stream) : event_stream_(event_stream) {}
~OnEventOnceChainer() {
InstallCallbacks();
}
template<typename MsgType>
OnEventOnceChainer &ChainOnce(std::function<void(const MsgType&, const Subscription&)> &&cb) {
std::function<void(const Message&, const Subscription&)> wrap =
[cb = std::move(cb)](const Message &general_msg, const Subscription &subscription) {
const MsgType &correct_msg = dynamic_cast<const MsgType&>(general_msg);
subscription.Unsubscribe();
cb(correct_msg, subscription); // Warning: this can close the Channel, be careful what you put after it!
};
cbs_.emplace_back(typeid(MsgType), std::move(wrap));
return *this;
}
private:
void InstallCallbacks() {
int num_callbacks = cbs_.size();
assert(num_callbacks > 0); // We should install at least one callback, otherwise the usage is wrong?
std::function<void(const Message&, const Subscription&)> next_cb = nullptr;
std::type_index next_type = typeid(nullptr);
for (int i = num_callbacks - 1; i >= 0; --i) {
std::function<void(const Message&, const Subscription&)> tmp_cb = nullptr;
tmp_cb = [cb = std::move(cbs_[i].second),
next_type,
next_cb = std::move(next_cb),
es_ptr = &this->event_stream_](const Message &msg, const Subscription &subscription) {
cb(msg, subscription);
if (next_cb != nullptr) {
es_ptr->OnEventHelper(next_type, std::move(next_cb));
}
};
next_cb = std::move(tmp_cb);
next_type = cbs_[i].first;
}
event_stream_.OnEventHelper(next_type, std::move(next_cb));
}
EventStream &event_stream_;
std::vector<std::pair<std::type_index, std::function<void(const Message&, const Subscription&)>>> cbs_;
};
typedef std::function<void(const Message&, const Subscription&)> Callback;
private:
virtual void OnEventHelper(std::type_index tidx, Callback callback) = 0;
};
using Subscription = EventStream::Subscription; // To write less.
/**
* Implementation of a channel.
*
* This class is an internal data structure that represents the state of the channel.
* This class is not meant to be used by the clients of the messaging framework.
* The Channel class wraps the event queue data structure, the mutex that protects
* concurrent access to the event queue, the local channel and the event stream.
* The class is owned by the Reactor. It gets closed when the owner reactor
* (the one that owns the read-end of a channel) removes/closes it.
*/
class Channel {
struct Params;
public:
friend class Reactor; // to create a Params initialization object
friend class EventStream::Subscription;
Channel(Params params)
: channel_name_(params.channel_name),
reactor_name_(params.reactor_name),
mutex_(params.mutex),
cvar_(params.cvar),
stream_(mutex_, this) {}
/**
* LocalChannelWriter represents the channels to reactors living in the same reactor system (write-end of the channels).
*
* Sending messages to the local channel requires acquiring the mutex.
* LocalChannelWriter holds a (weak) pointer to the enclosing Channel object.
* Messages sent to a closed channel are ignored.
* There can be multiple LocalChannelWriters refering to the same stream if needed.
*/
class LocalChannelWriter : public ChannelWriter {
public:
friend class Channel;
LocalChannelWriter(std::shared_ptr<std::mutex> mutex, std::string reactor_name,
std::string channel_name, std::weak_ptr<Channel> queue)
: mutex_(mutex),
reactor_name_(reactor_name),
channel_name_(channel_name),
weak_queue_(queue) {}
virtual void Send(std::unique_ptr<Message> m) {
std::shared_ptr<Channel> queue_ = weak_queue_.lock(); // Atomic, per the standard.
if (queue_) {
// We guarantee here that the Channel is not destroyed.
std::unique_lock<std::mutex> lock(*mutex_);
queue_->LockedPush(std::move(m));
}
}
virtual std::string ReactorName();
virtual std::string Name();
private:
std::shared_ptr<std::mutex> mutex_;
std::string reactor_name_;
std::string channel_name_;
std::weak_ptr<Channel> weak_queue_;
};
/**
* Implementation of the event stream.
*
* After the enclosing Channel object is destroyed (by a call to CloseChannel or Close).
*/
class LocalEventStream : public EventStream {
public:
friend class Channel;
LocalEventStream(std::shared_ptr<std::mutex> mutex, Channel *queue) : mutex_(mutex), queue_(queue) {}
void OnEventHelper(std::type_index tidx, Callback callback) {
std::unique_lock<std::mutex> lock(*mutex_);
queue_->LockedOnEventHelper(tidx, callback);
}
const std::string &ChannelName() {
return queue_->channel_name_;
}
void Close() {
queue_->Close();
}
private:
std::shared_ptr<std::mutex> mutex_;
std::string channel_name_;
Channel *queue_;
};
/**
* Close the channel. Must be called from the reactor that owns the channel.
*/
void Close();
Channel(const Channel &other) = delete;
Channel(Channel &&other) = default;
Channel &operator=(const Channel &other) = delete;
Channel &operator=(Channel &&other) = default;
private:
/**
* Initialization parameters to Channel.
* Warning: do not forget to initialize self_ptr_ individually. Private because it shouldn't be created outside of a Reactor.
*/
struct Params {
std::string reactor_name;
std::string channel_name;
std::shared_ptr<std::mutex> mutex;
std::shared_ptr<std::condition_variable> cvar;
};
void LockedPush(std::unique_ptr<Message> m) {
queue_.emplace(std::move(m));
// This is OK because there is only one Reactor (thread) that can wait on this Channel.
cvar_->notify_one();
}
std::shared_ptr<LocalChannelWriter> LockedOpenChannel() {
assert(!self_ptr_.expired()); // TODO(zuza): fix this using this answer https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned
return std::make_shared<LocalChannelWriter>(mutex_, reactor_name_, channel_name_, self_ptr_);
}
std::unique_ptr<Message> LockedPop() {
return LockedRawPop();
}
void LockedOnEventHelper(std::type_index tidx, EventStream::Callback callback) {
uint64_t cb_uid = next_cb_uid++;
callbacks_[tidx][cb_uid] = callback;
}
std::unique_ptr<Message> LockedRawPop() {
if (queue_.empty()) return nullptr;
std::unique_ptr<Message> t = std::move(queue_.front());
queue_.pop();
return t;
}
void RemoveCb(const EventStream::Subscription &subscription) {
std::unique_lock<std::mutex> lock(*mutex_);
size_t num_erased = callbacks_[subscription.tidx_].erase(subscription.cb_uid_);
assert(num_erased == 1);
}
std::string channel_name_;
std::string reactor_name_;
std::queue<std::unique_ptr<Message>> queue_;
// Should only be locked once since it's used by a cond. var. Also caught in dctor, so must be recursive.
std::shared_ptr<std::mutex> mutex_;
std::shared_ptr<std::condition_variable> cvar_;
/**
* A weak_ptr to itself.
*
* There are initialization problems with this, check Params.
*/
std::weak_ptr<Channel> self_ptr_;
LocalEventStream stream_;
std::unordered_map<std::type_index, std::unordered_map<uint64_t, EventStream::Callback> > callbacks_;
uint64_t next_cb_uid = 0;
};
/**
* A single unit of concurrent execution in the system.
*
* E.g. one worker, one client. Owned by System. Has a thread associated with it.
*/
class Reactor {
public:
friend class System;
Reactor(std::string name)
: name_(name), main_(Open("main")) {}
virtual ~Reactor() {}
virtual void Run() = 0;
std::pair<EventStream*, std::shared_ptr<ChannelWriter>> Open(const std::string &s);
std::pair<EventStream*, std::shared_ptr<ChannelWriter>> Open();
const std::shared_ptr<ChannelWriter> FindChannel(const std::string &channel_name);
/**
* Close a channel by name.
*
* Should only be called from the Reactor thread.
*/
void CloseChannel(const std::string &s);
/**
* Get Reactor name
*/
const std::string &name() { return name_; }
Reactor(const Reactor &other) = delete;
Reactor(Reactor &&other) = default;
Reactor &operator=(const Reactor &other) = delete;
Reactor &operator=(Reactor &&other) = default;
protected:
std::string name_;
/*
* Locks all Reactor data, including all Channel's in channels_.
*
* This should be a shared_ptr because LocalChannelWriter can outlive Reactor.
*/
std::shared_ptr<std::mutex> mutex_ =
std::make_shared<std::mutex>();
std::shared_ptr<std::condition_variable> cvar_ =
std::make_shared<std::condition_variable>();
/**
* List of channels of a reactor indexed by name.
*
* While the channels are owned by the reactor, a shared_ptr to solve the circular reference problem
* between ChannelWriters and EventStreams.
*/
std::unordered_map<std::string, std::shared_ptr<Channel>> channels_;
int64_t channel_name_counter_{0};
std::pair<EventStream*, std::shared_ptr<ChannelWriter>> main_;
private:
typedef std::pair<std::unique_ptr<Message>,
std::vector<std::pair<EventStream::Callback, EventStream::Subscription> > > MsgAndCbInfo;
/**
* Dispatches all waiting messages to callbacks. Shuts down when there are no callbacks left.
*/
void RunEventLoop();
// TODO: remove proof of locking evidence ?!
MsgAndCbInfo LockedGetPendingMessages();
};
/**
* Global placeholder for all reactors in the system.
*
* E.g. holds set of reactors, channels for all reactors.
* Alive through the entire process lifetime.
* Singleton class. Created automatically on first use.
* Final (can't extend) because it's a singleton. Please be careful if you're changing this.
*/
class System final {
public:
friend class Reactor;
/**
* Get the (singleton) instance of System.
*
* More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern
*/
static System &GetInstance() {
static System system; // guaranteed to be destroyed, initialized on first use
return system;
}
template <class ReactorType, class... Args>
const std::shared_ptr<ChannelWriter> Spawn(const std::string &name,
Args &&... args) {
std::unique_lock<std::recursive_mutex> lock(mutex_);
auto *raw_reactor =
new ReactorType(name, std::forward<Args>(args)...);
std::unique_ptr<Reactor> reactor(raw_reactor);
// Capturing a pointer isn't ideal, I would prefer to capture a Reactor&, but not sure how to do it.
std::thread reactor_thread(
[this, raw_reactor]() { this->StartReactor(*raw_reactor); });
assert(reactors_.count(name) == 0);
reactors_.emplace(name, std::pair<std::unique_ptr<Reactor>, std::thread>
(std::move(reactor), std::move(reactor_thread)));
return nullptr;
}
const std::shared_ptr<ChannelWriter> FindChannel(const std::string &reactor_name,
const std::string &channel_name) {
std::unique_lock<std::recursive_mutex> lock(mutex_);
auto it_reactor = reactors_.find(reactor_name);
if (it_reactor == reactors_.end()) return nullptr;
return it_reactor->second.first->FindChannel(channel_name);
}
void AwaitShutdown() {
for (auto &key_value : reactors_) {
auto &thread = key_value.second.second;
thread.join();
}
reactors_.clear(); // for testing, since System is a singleton now
}
private:
System() {}
System(const System &) = delete;
System(System &&) = delete;
System &operator=(const System &) = delete;
System &operator=(System &&) = delete;
void StartReactor(Reactor &reactor) {
current_reactor_ = &reactor;
reactor.Run();
reactor.RunEventLoop(); // Activate callbacks.
}
std::recursive_mutex mutex_;
// TODO: Replace with a map to a reactor Channel map to have more granular
// locking.
std::unordered_map<std::string,
std::pair<std::unique_ptr<Reactor>, std::thread>>
reactors_;
};

View File

@ -1,12 +1,14 @@
#include <iostream>
#include <fstream> #include <fstream>
#include <iostream>
#include <glog/logging.h> #include <glog/logging.h>
#include "memgraph_config.hpp" #include "memgraph_config.hpp"
#include "reactors_distributed.hpp" #include "reactors_distributed.hpp"
DEFINE_int64(my_mnid, 0, "Memgraph node id"); // TODO(zuza): this should be assigned by the leader once in the future DEFINE_int64(my_mnid, 0, "Memgraph node id"); // TODO(zuza): this should be
// assigned by the leader once in
// the future
class MemgraphDistributed { class MemgraphDistributed {
private: private:
@ -16,32 +18,35 @@ class MemgraphDistributed {
/** /**
* Get the (singleton) instance of MemgraphDistributed. * Get the (singleton) instance of MemgraphDistributed.
* *
* More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern * More info:
* https://stackoverflow.com/questions/1008019/c-singleton-design-pattern
*/ */
static MemgraphDistributed &GetInstance() { static MemgraphDistributed &GetInstance() {
static MemgraphDistributed memgraph; // guaranteed to be destroyed, initialized on first use static MemgraphDistributed
memgraph; // guaranteed to be destroyed, initialized on first use
return memgraph; return memgraph;
} }
/** Register memgraph node id to the given location. */ /** Register memgraph node id to the given location. */
void RegisterMemgraphNode(int64_t mnid, const std::string &address, uint16_t port) { void RegisterMemgraphNode(int64_t mnid, const std::string &address,
std::unique_lock<std::recursive_mutex> lock(mutex_); uint16_t port) {
std::unique_lock<std::mutex> lock(mutex_);
mnodes_[mnid] = Location(address, port); mnodes_[mnid] = Location(address, port);
} }
EventStream* FindChannel(int64_t mnid, EventStream *FindChannel(int64_t mnid, const std::string &reactor,
const std::string &reactor,
const std::string &channel) { const std::string &channel) {
std::unique_lock<std::recursive_mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
const auto &location = mnodes_.at(mnid); const auto &location = mnodes_.at(mnid);
return Distributed::GetInstance().FindChannel(location.first, location.second, reactor, channel); return Distributed::GetInstance().FindChannel(
location.first, location.second, reactor, channel);
} }
protected: protected:
MemgraphDistributed() {} MemgraphDistributed() {}
private: private:
std::recursive_mutex mutex_; std::mutex mutex_;
std::unordered_map<int64_t, Location> mnodes_; std::unordered_map<int64_t, Location> mnodes_;
MemgraphDistributed(const MemgraphDistributed &) = delete; MemgraphDistributed(const MemgraphDistributed &) = delete;
@ -64,8 +69,8 @@ class MemgraphDistributed {
* *
* @return Pair (master mnid, list of worker's id). * @return Pair (master mnid, list of worker's id).
*/ */
std::pair<int64_t, std::vector<int64_t>> std::pair<int64_t, std::vector<int64_t>> ParseConfigAndRegister(
ParseConfigAndRegister(const std::string &filename) { const std::string &filename) {
std::ifstream file(filename, std::ifstream::in); std::ifstream file(filename, std::ifstream::in);
assert(file.good()); assert(file.good());
int64_t master_mnid; int64_t master_mnid;
@ -78,8 +83,7 @@ std::pair<int64_t, std::vector<int64_t>>
memgraph.RegisterMemgraphNode(master_mnid, address, port); memgraph.RegisterMemgraphNode(master_mnid, address, port);
while (file.good()) { while (file.good()) {
file >> mnid >> address >> port; file >> mnid >> address >> port;
if (file.eof()) if (file.eof()) break;
break ;
memgraph.RegisterMemgraphNode(mnid, address, port); memgraph.RegisterMemgraphNode(mnid, address, port);
worker_mnids.push_back(mnid); worker_mnids.push_back(mnid);
} }
@ -91,9 +95,9 @@ std::pair<int64_t, std::vector<int64_t>>
* Sends a text message and has a return address. * Sends a text message and has a return address.
*/ */
class TextMessage : public ReturnAddressMsg { class TextMessage : public ReturnAddressMsg {
public: public:
TextMessage(std::string reactor, std::string channel, std::string s) TextMessage(std::string reactor, std::string channel, std::string s)
: ReturnAddressMsg(reactor, channel), text(s) {} : ReturnAddressMsg(reactor, channel), text(s) {}
template <class Archive> template <class Archive>
void serialize(Archive &archive) { void serialize(Archive &archive) {
@ -102,51 +106,52 @@ public:
std::string text; std::string text;
protected: protected:
friend class cereal::access; friend class cereal::access;
TextMessage() {} // Cereal needs access to a default constructor. TextMessage() {} // Cereal needs access to a default constructor.
}; };
CEREAL_REGISTER_TYPE(TextMessage); CEREAL_REGISTER_TYPE(TextMessage);
class Master : public Reactor { class Master : public Reactor {
public: public:
Master(std::string name, int64_t mnid, std::vector<int64_t> &&worker_mnids) Master(std::string name, int64_t mnid, std::vector<int64_t> &&worker_mnids)
: Reactor(name), mnid_(mnid), : Reactor(name), mnid_(mnid), worker_mnids_(std::move(worker_mnids)) {}
worker_mnids_(std::move(worker_mnids)) {}
virtual void Run() { virtual void Run() {
MemgraphDistributed &memgraph = MemgraphDistributed::GetInstance(); MemgraphDistributed &memgraph = MemgraphDistributed::GetInstance();
Distributed &distributed = Distributed::GetInstance(); Distributed &distributed = Distributed::GetInstance();
std::cout << "Master (" << mnid_ << ") @ " << distributed.network().Address() std::cout << "Master (" << mnid_ << ") @ "
<< ":" << distributed.network().Port() << std::endl; << distributed.network().Address() << ":"
<< distributed.network().Port() << std::endl;
auto stream = main_.first; auto stream = main_.first;
// wait until every worker sends a ReturnAddressMsg back, then close // wait until every worker sends a ReturnAddressMsg back, then close
stream->OnEvent<TextMessage>([this](const TextMessage &msg, stream->OnEvent<TextMessage>(
const Subscription &subscription) { [this](const TextMessage &msg, const Subscription &subscription) {
std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n"; std::cout << "Message from " << msg.Address() << ":" << msg.Port()
++workers_seen; << " .. " << msg.text << "\n";
if (workers_seen == worker_mnids_.size()) { ++workers_seen;
subscription.Unsubscribe(); if (workers_seen == static_cast<int64_t>(worker_mnids_.size())) {
// Sleep for a while so we can read output in the terminal. subscription.Unsubscribe();
// (start_distributed.py runs each process in a new tab which is // Sleep for a while so we can read output in the terminal.
// closed immediately after process has finished) // (start_distributed.py runs each process in a new tab which is
std::this_thread::sleep_for(std::chrono::seconds(4)); // closed immediately after process has finished)
CloseChannel("main"); std::this_thread::sleep_for(std::chrono::seconds(4));
} CloseChannel("main");
}); }
});
// send a TextMessage to each worker // send a TextMessage to each worker
for (auto wmnid : worker_mnids_) { for (auto wmnid : worker_mnids_) {
auto stream = memgraph.FindChannel(wmnid, "worker", "main"); auto stream = memgraph.FindChannel(wmnid, "worker", "main");
stream->OnEventOnce() stream->OnEventOnce().ChainOnce<ChannelResolvedMessage>([this, stream](
.ChainOnce<ChannelResolvedMessage>([this, stream](const ChannelResolvedMessage &msg, const Subscription&){ const ChannelResolvedMessage &msg, const Subscription &) {
msg.channelWriter()->Send<TextMessage>("master", "main", "hi from master"); msg.channelWriter()->Send<TextMessage>("master", "main",
stream->Close(); "hi from master");
}); stream->Close();
});
} }
} }
@ -159,28 +164,29 @@ class Master : public Reactor {
class Worker : public Reactor { class Worker : public Reactor {
public: public:
Worker(std::string name, int64_t mnid, int64_t master_mnid) Worker(std::string name, int64_t mnid, int64_t master_mnid)
: Reactor(name), mnid_(mnid), : Reactor(name), mnid_(mnid), master_mnid_(master_mnid) {}
master_mnid_(master_mnid) {}
virtual void Run() { virtual void Run() {
Distributed &distributed = Distributed::GetInstance(); Distributed &distributed = Distributed::GetInstance();
std::cout << "Worker (" << mnid_ << ") @ " << distributed.network().Address() std::cout << "Worker (" << mnid_ << ") @ "
<< ":" << distributed.network().Port() << std::endl; << distributed.network().Address() << ":"
<< distributed.network().Port() << std::endl;
auto stream = main_.first; auto stream = main_.first;
// wait until master sends us a TextMessage, then reply back and close // wait until master sends us a TextMessage, then reply back and close
stream->OnEventOnce() stream->OnEventOnce().ChainOnce<TextMessage>(
.ChainOnce<TextMessage>([this](const TextMessage &msg, const Subscription&) { [this](const TextMessage &msg, const Subscription &) {
std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n"; std::cout << "Message from " << msg.Address() << ":" << msg.Port()
<< " .. " << msg.text << "\n";
msg.GetReturnChannelWriter() msg.GetReturnChannelWriter()->Send<TextMessage>("worker", "main",
->Send<TextMessage>("worker", "main", "hi from worker"); "hi from worker");
// Sleep for a while so we can read output in the terminal. // Sleep for a while so we can read output in the terminal.
std::this_thread::sleep_for(std::chrono::seconds(4)); std::this_thread::sleep_for(std::chrono::seconds(4));
CloseChannel("main"); CloseChannel("main");
}); });
} }
protected: protected:
@ -188,7 +194,6 @@ class Worker : public Reactor {
const int64_t master_mnid_; const int64_t master_mnid_;
}; };
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
google::InitGoogleLogging(argv[0]); google::InitGoogleLogging(argv[0]);
gflags::ParseCommandLineFlags(&argc, &argv, true); gflags::ParseCommandLineFlags(&argc, &argv, true);

View File

@ -1,227 +0,0 @@
/**
* This test file test the Distributed Reactors API on ONLY one process (no real networking).
* In other words, we send a message from one process to itself.
*/
#include "gtest/gtest.h"
#include "reactors_distributed.hpp"
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <future>
/**
* Test do the services start up without crashes.
*/
TEST(SimpleTests, StartAndStopServices) {
System &system = System::GetInstance();
Distributed &distributed = Distributed::GetInstance();
distributed.StartServices();
// do nothing
std::this_thread::sleep_for(std::chrono::milliseconds(500));
system.AwaitShutdown();
distributed.StopServices();
}
/**
* Test simple message reception.
*
* Data flow:
* (1) Send an empty message from Master to Worker/main
*/
TEST(SimpleTests, SendEmptyMessage) {
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() {
Distributed::GetInstance().FindChannel("127.0.0.1", 10000, "worker", "main")
->OnEventOnce()
.ChainOnce<ChannelResolvedMessage>([this](const ChannelResolvedMessage& msg,
const Subscription& subscription) {
msg.channelWriter()->Send<Message>();
subscription.CloseChannel();
});
CloseChannel("main");
}
};
struct Worker : public Reactor {
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
main_.first->OnEventOnce()
.ChainOnce<Message>([this](const Message&, const Subscription& subscription) {
// if this message isn't delivered, the main channel will never be closed and we infinite loop
subscription.CloseChannel(); // close "main"
});
}
};
// emulate flags like it's a multiprocess system, these may be alredy set by default
FLAGS_address = "127.0.0.1";
FLAGS_port = 10000;
System &system = System::GetInstance();
Distributed &distributed = Distributed::GetInstance();
distributed.StartServices();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown(); // this must be called before StopServices
distributed.StopServices();
}
/**
* Test ReturnAddressMsg functionality.
*
* Data flow:
* (1) Send an empty message from Master to Worker/main
* (2) Send an empty message from Worker to Master/main
*/
TEST(SimpleTests, SendReturnAddressMessage) {
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() {
Distributed::GetInstance().FindChannel("127.0.0.1", 10000, "worker", "main")
->OnEventOnce()
.ChainOnce<ChannelResolvedMessage>([this](const ChannelResolvedMessage& msg,
const Subscription& sub) {
// send a message that will be returned to "main"
msg.channelWriter()->Send<ReturnAddressMsg>(this->name(), "main");
// close this anonymous channel
sub.CloseChannel();
});
main_.first->OnEventOnce()
.ChainOnce<Message>([this](const Message&, const Subscription& sub) {
// if this message isn't delivered, the main channel will never be closed and we infinite loop
// close the "main" channel
sub.CloseChannel();
});
}
};
struct Worker : public Reactor {
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
main_.first->OnEventOnce()
.ChainOnce<ReturnAddressMsg>([this](const ReturnAddressMsg &msg, const Subscription& sub) {
msg.GetReturnChannelWriter()->Send<Message>();
sub.CloseChannel(); // close "main"
});
}
};
// emulate flags like it's a multiprocess system, these may be alredy set by default
FLAGS_address = "127.0.0.1";
FLAGS_port = 10000;
System &system = System::GetInstance();
Distributed &distributed = Distributed::GetInstance();
distributed.StartServices();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown(); // this must be called before StopServices
distributed.StopServices();
}
// Apparently templates cannot be declared inside local classes, figure out how to move it in?
// For that reason I obscured the name.
struct SerializableMessage_TextMessage : public ReturnAddressMsg {
SerializableMessage_TextMessage(std::string channel, std::string arg_text, int arg_val)
: ReturnAddressMsg(channel), text(arg_text), val(arg_val) {}
std::string text;
int val;
template<class Archive>
void serialize(Archive &ar) {
ar(cereal::virtual_base_class<ReturnAddressMsg>(this), text, val);
}
protected:
friend class cereal::access;
SerializableMessage_TextMessage() {} // Cereal needs access to a default constructor.
};
CEREAL_REGISTER_TYPE(SerializableMessage_TextMessage);
/**
* Test serializability of a complex message over the network layer.
*
* Data flow:
* (1) Send ("hi", 123) from Master to Worker/main
* (2) Send ("hi back", 779) from Worker to Master/main
*/
TEST(SimpleTests, SendSerializableMessage) {
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() {
Distributed::GetInstance().FindChannel("127.0.0.1", 10000, "worker", "main")
->OnEventOnce()
.ChainOnce<ChannelResolvedMessage>([this](const ChannelResolvedMessage& msg,
const Subscription& sub) {
// send a message that will be returned to "main"
msg.channelWriter()->Send<SerializableMessage_TextMessage>("main", "hi", 123);
// close this anonymous channel
sub.CloseChannel();
});
main_.first->OnEventOnce()
.ChainOnce<SerializableMessage_TextMessage>([this](const SerializableMessage_TextMessage& msg, const Subscription& sub) {
ASSERT_EQ(msg.text, "hi back");
ASSERT_EQ(msg.val, 779);
// if this message isn't delivered, the main channel will never be closed and we infinite loop
// close the "main" channel
sub.CloseChannel();
});
}
};
struct Worker : public Reactor {
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
main_.first->OnEventOnce()
.ChainOnce<SerializableMessage_TextMessage>([this](const SerializableMessage_TextMessage &msg, const Subscription& sub) {
ASSERT_EQ(msg.text, "hi");
ASSERT_EQ(msg.val, 123);
msg.GetReturnChannelWriter()->Send<SerializableMessage_TextMessage>
("no channel, dont use this", "hi back", 779);
sub.CloseChannel(); // close "main"
});
}
};
// emulate flags like it's a multiprocess system, these may be alredy set by default
FLAGS_address = "127.0.0.1";
FLAGS_port = 10000;
System &system = System::GetInstance();
Distributed &distributed = Distributed::GetInstance();
distributed.StartServices();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown(); // this must be called before StopServices
distributed.StopServices();
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -1,483 +0,0 @@
#include "reactors_local.hpp"
#include "gtest/gtest.h"
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <future>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
TEST(SystemTest, ReturnWithoutThrowing) {
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() { CloseChannel("main"); }
};
System &system = System::GetInstance();
ASSERT_NO_THROW(system.Spawn<Master>("master"));
ASSERT_NO_THROW(system.AwaitShutdown());
}
TEST(ChannelCreationTest, ThrowOnReusingChannelName) {
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() {
Open("channel");
ASSERT_THROW(Open("channel"), std::runtime_error);
CloseChannel("main");
CloseChannel("channel");
}
};
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.AwaitShutdown();
}
TEST(ChannelSetUpTest, CheckMainChannelIsSet) {
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer =
System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(300));
CloseChannel("main");
}
};
struct Worker : public Reactor {
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer =
System::GetInstance().FindChannel("master", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(300));
CloseChannel("main");
}
};
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}
TEST(SimpleSendTest, OneCallback) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer =
System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel_writer->Send<MessageInt>(888);
CloseChannel("main");
}
};
struct Worker : public Reactor {
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
EventStream *stream = main_.first;
stream->OnEvent<MessageInt>(
[this](const MessageInt &msg, const Subscription &) {
ASSERT_EQ(msg.x, 888);
CloseChannel("main");
});
}
};
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}
TEST(SimpleSendTest, IgnoreAfterClose) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer =
System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel_writer->Send<MessageInt>(101);
channel_writer->Send<MessageInt>(102); // should be ignored
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel_writer->Send<MessageInt>(103); // should be ignored
channel_writer->Send<MessageInt>(104); // should be ignored
CloseChannel(
"main"); // Write-end doesn't need to be closed because it's in RAII.
}
};
struct Worker : public Reactor {
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
EventStream *stream = main_.first;
stream->OnEvent<MessageInt>(
[this](const MessageInt &msg, const Subscription &) {
CloseChannel("main");
ASSERT_EQ(msg.x, 101);
});
}
};
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}
TEST(SimpleSendTest, DuringFirstEvent) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct Master : public Reactor {
Master(std::string name, std::promise<int> p)
: Reactor(name), p_(std::move(p)) {}
virtual void Run() {
EventStream *stream = main_.first;
stream->OnEvent<MessageInt>(
[this](const Message &msg, const Subscription &subscription) {
const MessageInt &msgint = dynamic_cast<const MessageInt &>(msg);
if (msgint.x == 101) FindChannel("main")->Send<MessageInt>(102);
if (msgint.x == 102) {
subscription.Unsubscribe();
CloseChannel("main");
p_.set_value(777);
}
});
std::shared_ptr<ChannelWriter> channel_writer = FindChannel("main");
channel_writer->Send<MessageInt>(101);
}
std::promise<int> p_;
};
System &system = System::GetInstance();
std::promise<int> p;
auto f = p.get_future();
system.Spawn<Master>("master", std::move(p));
f.wait();
ASSERT_EQ(f.get(), 777);
system.AwaitShutdown();
}
TEST(MultipleSendTest, UnsubscribeService) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct MessageChar : public Message {
MessageChar(char xx) : x(xx) {}
char x;
};
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer =
System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel_writer->Send<MessageInt>(55);
channel_writer->Send<MessageInt>(66);
channel_writer->Send<MessageInt>(77);
channel_writer->Send<MessageInt>(88);
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel_writer->Send<MessageChar>('a');
channel_writer->Send<MessageChar>('b');
channel_writer->Send<MessageChar>('c');
channel_writer->Send<MessageChar>('d');
CloseChannel("main");
}
};
struct Worker : public Reactor {
Worker(std::string name) : Reactor(name) {}
int num_msgs_received = 0;
virtual void Run() {
EventStream *stream = main_.first;
stream->OnEvent<MessageInt>(
[this](const MessageInt &msgint, const Subscription &subscription) {
ASSERT_TRUE(msgint.x == 55 || msgint.x == 66);
++num_msgs_received;
if (msgint.x == 66) {
subscription.Unsubscribe(); // receive only two of them
}
});
stream->OnEvent<MessageChar>(
[this](const MessageChar &msgchar, const Subscription &subscription) {
char c = msgchar.x;
++num_msgs_received;
ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c');
if (num_msgs_received == 5) {
subscription.Unsubscribe();
CloseChannel("main");
}
});
}
};
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}
TEST(MultipleSendTest, OnEvent) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct MessageChar : public Message {
MessageChar(char xx) : x(xx) {}
char x;
};
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer =
System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel_writer->Send<MessageInt>(101);
channel_writer->Send<MessageChar>('a');
channel_writer->Send<MessageInt>(103);
channel_writer->Send<MessageChar>('b');
CloseChannel("main");
}
};
struct Worker : public Reactor {
Worker(std::string name) : Reactor(name) {}
struct EndMessage : Message {};
int correct_vals = 0;
virtual void Run() {
EventStream *stream = main_.first;
correct_vals = 0;
stream->OnEvent<MessageInt>(
[this](const MessageInt &msgint, const Subscription &) {
ASSERT_TRUE(msgint.x == 101 || msgint.x == 103);
++correct_vals;
main_.second->Send<EndMessage>();
});
stream->OnEvent<MessageChar>(
[this](const MessageChar &msgchar, const Subscription &) {
ASSERT_TRUE(msgchar.x == 'a' || msgchar.x == 'b');
++correct_vals;
main_.second->Send<EndMessage>();
});
stream->OnEvent<EndMessage>(
[this](const EndMessage &, const Subscription &) {
ASSERT_LE(correct_vals, 4);
if (correct_vals == 4) {
CloseChannel("main");
}
});
}
};
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}
TEST(MultipleSendTest, Chaining) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer =
System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel_writer->Send<MessageInt>(55);
channel_writer->Send<MessageInt>(66);
channel_writer->Send<MessageInt>(77);
CloseChannel("main");
}
};
struct Worker : public Reactor {
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
EventStream *stream = main_.first;
stream->OnEventOnce()
.ChainOnce<MessageInt>(
[this](const MessageInt &msg, const Subscription &) {
ASSERT_EQ(msg.x, 55);
})
.ChainOnce<MessageInt>(
[](const MessageInt &msg, const Subscription &) {
ASSERT_EQ(msg.x, 66);
})
.ChainOnce<MessageInt>(
[this](const MessageInt &msg, const Subscription &) {
ASSERT_EQ(msg.x, 77);
CloseChannel("main");
});
}
};
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}
TEST(MultipleSendTest, ChainingInRightOrder) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct MessageChar : public Message {
MessageChar(char xx) : x(xx) {}
char x;
};
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer =
System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel_writer->Send<MessageChar>('a');
channel_writer->Send<MessageInt>(55);
channel_writer->Send<MessageChar>('b');
channel_writer->Send<MessageInt>(77);
CloseChannel("main");
}
};
struct Worker : public Reactor {
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
EventStream *stream = main_.first;
stream->OnEventOnce()
.ChainOnce<MessageInt>(
[this](const MessageInt &msg, const Subscription &) {
ASSERT_EQ(msg.x, 55);
})
.ChainOnce<MessageChar>(
[](const MessageChar &msg, const Subscription &) {
ASSERT_EQ(msg.x, 'b');
})
.ChainOnce<MessageInt>(
[this](const MessageInt &msg, const Subscription &) {
ASSERT_EQ(msg.x, 77);
CloseChannel("main");
});
}
};
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}
TEST(MultipleSendTest, ProcessManyMessages) {
const static int num_tests = 100;
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer =
System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
for (int i = 0; i < num_tests; ++i) {
channel_writer->Send<MessageInt>(rand());
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 5));
}
CloseChannel("main");
}
};
struct Worker : public Reactor {
Worker(std::string name) : Reactor(name) {}
struct EndMessage : Message {};
int vals = 0;
virtual void Run() {
EventStream *stream = main_.first;
vals = 0;
stream->OnEvent<MessageInt>(
[this](const Message &, const Subscription &) {
++vals;
main_.second->Send<EndMessage>();
});
stream->OnEvent<EndMessage>(
[this](const Message &, const Subscription &) {
ASSERT_LE(vals, num_tests);
if (vals == num_tests) {
CloseChannel("main");
}
});
}
};
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -163,3 +163,6 @@ import_header_library(cppitertools ${CMAKE_CURRENT_SOURCE_DIR})
# Setup json # Setup json
import_header_library(json ${CMAKE_CURRENT_SOURCE_DIR}) import_header_library(json ${CMAKE_CURRENT_SOURCE_DIR})
# Setup cereal
import_header_library(cereal "${CMAKE_CURRENT_SOURCE_DIR}/cereal/include")

View File

@ -10,9 +10,9 @@ cd ${working_dir}
# antlr # antlr
antlr_generator_filename="antlr-4.6-complete.jar" antlr_generator_filename="antlr-4.6-complete.jar"
#wget -O ${antlr_generator_filename} http://www.antlr.org/download/${antlr_generator_filename} # wget -O ${antlr_generator_filename} http://www.antlr.org/download/${antlr_generator_filename}
wget -nv -O ${antlr_generator_filename} http://deps.memgraph.io/${antlr_generator_filename} wget -nv -O ${antlr_generator_filename} http://deps.memgraph.io/${antlr_generator_filename}
#git clone https://github.com/antlr/antlr4.git # git clone https://github.com/antlr/antlr4.git
git clone git://deps.memgraph.io/antlr4.git git clone git://deps.memgraph.io/antlr4.git
antlr4_tag="aacd2a2c95816d8dc1c05814051d631bfec4cf3e" # v4.6 antlr4_tag="aacd2a2c95816d8dc1c05814051d631bfec4cf3e" # v4.6
cd antlr4 cd antlr4
@ -23,7 +23,7 @@ cd ..
# Use our fork that uses experimental/optional instead of unique_ptr in # Use our fork that uses experimental/optional instead of unique_ptr in
# DerefHolder. Once we move memgraph to c++17 we can use cpp17 branch from # DerefHolder. Once we move memgraph to c++17 we can use cpp17 branch from
# original repo. # original repo.
#git clone https://github.com/memgraph/cppitertools.git # git clone https://github.com/memgraph/cppitertools.git
git clone git://deps.memgraph.io/cppitertools.git git clone git://deps.memgraph.io/cppitertools.git
cd cppitertools cd cppitertools
cppitertools_tag="4231e0bc6fba2737b2a7a8a1576cf06186b0de6a" # experimental_optional 17 Aug 2017 cppitertools_tag="4231e0bc6fba2737b2a7a8a1576cf06186b0de6a" # experimental_optional 17 Aug 2017
@ -31,7 +31,7 @@ git checkout ${cppitertools_tag}
cd .. cd ..
# fmt # fmt
#git clone https://github.com/fmtlib/fmt.git # git clone https://github.com/fmtlib/fmt.git
git clone git://deps.memgraph.io/fmt.git git clone git://deps.memgraph.io/fmt.git
fmt_tag="7fa8f8fa48b0903deab5bb42e6760477173ac485" # v3.0.1 fmt_tag="7fa8f8fa48b0903deab5bb42e6760477173ac485" # v3.0.1
# Commit which fixes an issue when compiling with C++14 and higher. # Commit which fixes an issue when compiling with C++14 and higher.
@ -42,7 +42,7 @@ git cherry-pick -n ${fmt_cxx14_fix}
cd .. cd ..
# rapidcheck # rapidcheck
#git clone https://github.com/emil-e/rapidcheck.git # git clone https://github.com/emil-e/rapidcheck.git
git clone git://deps.memgraph.io/rapidcheck.git git clone git://deps.memgraph.io/rapidcheck.git
rapidcheck_tag="853e14f0f4313a9eb3c71e24848373e7b843dfd1" # Jun 23, 2017 rapidcheck_tag="853e14f0f4313a9eb3c71e24848373e7b843dfd1" # Jun 23, 2017
cd rapidcheck cd rapidcheck
@ -50,7 +50,7 @@ git checkout ${rapidcheck_tag}
cd .. cd ..
# google benchmark # google benchmark
#git clone https://github.com/google/benchmark.git # git clone https://github.com/google/benchmark.git
git clone git://deps.memgraph.io/benchmark.git git clone git://deps.memgraph.io/benchmark.git
benchmark_tag="4f8bfeae470950ef005327973f15b0044eceaceb" # v1.1.0 benchmark_tag="4f8bfeae470950ef005327973f15b0044eceaceb" # v1.1.0
cd benchmark cd benchmark
@ -58,7 +58,7 @@ git checkout ${benchmark_tag}
cd .. cd ..
# google test # google test
#git clone https://github.com/google/googletest.git # git clone https://github.com/google/googletest.git
git clone git://deps.memgraph.io/googletest.git git clone git://deps.memgraph.io/googletest.git
googletest_tag="ec44c6c1675c25b9827aacd08c02433cccde7780" # v1.8.0 googletest_tag="ec44c6c1675c25b9827aacd08c02433cccde7780" # v1.8.0
cd googletest cd googletest
@ -66,7 +66,7 @@ git checkout ${googletest_tag}
cd .. cd ..
# google logging # google logging
#git clone https://github.com/memgraph/glog.git # git clone https://github.com/memgraph/glog.git
git clone git://deps.memgraph.io/glog.git git clone git://deps.memgraph.io/glog.git
glog_tag="a6ee5ef590190cdb9f69cccc2db99dc5994b2f92" # custom version (v0.3.5+) glog_tag="a6ee5ef590190cdb9f69cccc2db99dc5994b2f92" # custom version (v0.3.5+)
cd glog cd glog
@ -74,7 +74,7 @@ git checkout ${glog_tag}
cd .. cd ..
# lcov-to-coberatura-xml # lcov-to-coberatura-xml
#git clone https://github.com/eriwen/lcov-to-cobertura-xml.git # git clone https://github.com/eriwen/lcov-to-cobertura-xml.git
git clone git://deps.memgraph.io/lcov-to-cobertura-xml.git git clone git://deps.memgraph.io/lcov-to-cobertura-xml.git
lcov_to_xml_tag="59584761cb5da4687693faec05bf3e2b74e9dde9" # Dec 6, 2016 lcov_to_xml_tag="59584761cb5da4687693faec05bf3e2b74e9dde9" # Dec 6, 2016
cd lcov-to-cobertura-xml cd lcov-to-cobertura-xml
@ -82,7 +82,7 @@ git checkout ${lcov_to_xml_tag}
cd .. cd ..
# google flags # google flags
#git clone https://github.com/memgraph/gflags.git # git clone https://github.com/memgraph/gflags.git
git clone git://deps.memgraph.io/gflags.git git clone git://deps.memgraph.io/gflags.git
gflags_tag="b37ceb03a0e56c9f15ce80409438a555f8a67b7c" # custom version (May 6, 2017) gflags_tag="b37ceb03a0e56c9f15ce80409438a555f8a67b7c" # custom version (May 6, 2017)
cd gflags cd gflags
@ -106,14 +106,19 @@ rm postgres.tar.gz
# We use head on Sep 1, 2017 instead of last release since it was long time ago. # We use head on Sep 1, 2017 instead of last release since it was long time ago.
mkdir json mkdir json
cd json cd json
#wget "https://raw.githubusercontent.com/nlohmann/json/91e003285312167ad8365f387438ea371b465a7e/src/json.hpp" # wget "https://raw.githubusercontent.com/nlohmann/json/91e003285312167ad8365f387438ea371b465a7e/src/json.hpp"
wget -nv http://deps.memgraph.io/json.hpp wget -nv http://deps.memgraph.io/json.hpp
cd .. cd ..
#ltalloc # ltalloc
#git clone https://github.com/r-lyeh/ltalloc.git # git clone https://github.com/r-lyeh/ltalloc.git
git clone git://deps.memgraph.io/ltalloc.git git clone git://deps.memgraph.io/ltalloc.git
ltalloc_tag="aefde2afa5cd49c9d1a797aa08ec08b2bec13a36" # Sep 15, 2017 ltalloc_tag="aefde2afa5cd49c9d1a797aa08ec08b2bec13a36" # Sep 15, 2017
cd ltalloc cd ltalloc
git checkout ${ltalloc_tag} git checkout ${ltalloc_tag}
# cereal
git clone https://github.com/USCiLab/cereal.git
cd cereal
git checkout v1.2.2
cd .. cd ..

View File

@ -5,10 +5,9 @@
#include "glog/logging.h" #include "glog/logging.h"
namespace Protocol { namespace protocol {
Session::Session(Socket &&socket, Data &) Session::Session(Socket &&socket, Data &) : socket_(std::move(socket)) {
: socket_(std::move(socket)) {
event_.data.ptr = this; event_.data.ptr = this;
} }
@ -22,9 +21,12 @@ std::string Session::GetStringAndShift(SizeT len) {
void Session::Execute() { void Session::Execute() {
if (!handshake_done_) { if (!handshake_done_) {
// Note: this function can be multiple times before the buffer has the full packet. // Note: this function can be multiple times before the buffer has the full
// We currently have to check for this case and return without shifting the buffer. // packet.
// In other words, only shift anything from the buffer if you can read the entire (sub)message. // We currently have to check for this case and return without shifting
// the buffer.
// In other words, only shift anything from the buffer if you can read the
// entire (sub)message.
if (buffer_.size() < 2 * sizeof(SizeT)) return; if (buffer_.size() < 2 * sizeof(SizeT)) return;
SizeT len_reactor = GetLength(); SizeT len_reactor = GetLength();
@ -56,7 +58,7 @@ void Session::Execute() {
// TODO: check for exceptions // TODO: check for exceptions
std::istringstream stream; std::istringstream stream;
stream.str(std::string(reinterpret_cast<char*>(buffer_.data()), len_data)); stream.str(std::string(reinterpret_cast<char *>(buffer_.data()), len_data));
cereal::BinaryInputArchive iarchive{stream}; cereal::BinaryInputArchive iarchive{stream};
std::unique_ptr<Message> message{nullptr}; std::unique_ptr<Message> message{nullptr};
iarchive(message); iarchive(message);
@ -157,7 +159,7 @@ bool SendMessage(std::string address, uint16_t port, std::string reactor,
LOG(INFO) << "Couldn't send message size!"; LOG(INFO) << "Couldn't send message size!";
return false; return false;
} }
if (!socket.Write(buffer.data(), buffer.size())) { if (!socket.Write(buffer)) {
LOG(INFO) << "Couldn't send message data!"; LOG(INFO) << "Couldn't send message data!";
return false; return false;
} }

View File

@ -1,5 +1,7 @@
#pragma once #pragma once
#include <chrono>
#include "communication/bolt/v1/decoder/buffer.hpp" #include "communication/bolt/v1/decoder/buffer.hpp"
#include "io/network/epoll.hpp" #include "io/network/epoll.hpp"
#include "io/network/network_endpoint.hpp" #include "io/network/network_endpoint.hpp"
@ -40,7 +42,8 @@ class Message;
* Currently the server is implemented to handle more than one message after * Currently the server is implemented to handle more than one message after
* the initial handshake, but the client can only send one message. * the initial handshake, but the client can only send one message.
*/ */
namespace Protocol { namespace protocol {
using Endpoint = io::network::NetworkEndpoint; using Endpoint = io::network::NetworkEndpoint;
using Socket = io::network::Socket; using Socket = io::network::Socket;
using StreamBuffer = io::network::StreamBuffer; using StreamBuffer = io::network::StreamBuffer;
@ -100,6 +103,8 @@ class Session {
*/ */
void Written(size_t len); void Written(size_t len);
bool TimedOut() { return false; }
/** /**
* Closes the session (client socket). * Closes the session (client socket).
*/ */
@ -108,6 +113,8 @@ class Session {
io::network::Epoll::Event event_; io::network::Epoll::Event event_;
Socket socket_; Socket socket_;
std::chrono::time_point<std::chrono::steady_clock> last_event_time_;
private: private:
SizeT GetLength(int offset = 0); SizeT GetLength(int offset = 0);
std::string GetStringAndShift(SizeT len); std::string GetStringAndShift(SizeT len);
@ -115,6 +122,7 @@ class Session {
bool alive_{true}; bool alive_{true};
bool handshake_done_{false}; bool handshake_done_{false};
std::string reactor_{""}; std::string reactor_{""};
std::string channel_{""}; std::string channel_{""};

View File

@ -0,0 +1,141 @@
#include "communication/reactor/reactor_local.hpp"
#include "utils/exceptions.hpp"
namespace communication::reactor {
thread_local Reactor *current_reactor_ = nullptr;
void EventStream::Subscription::Unsubscribe() const {
event_queue_.RemoveCallback(*this);
}
void EventStream::Subscription::CloseChannel() const { event_queue_.Close(); }
const std::string &EventStream::Subscription::channel_name() const {
return event_queue_.channel_name_;
}
std::string Channel::LocalChannelWriter::ReactorName() const {
return reactor_name_;
}
std::string Channel::LocalChannelWriter::Name() const { return channel_name_; }
void Channel::Close() {
// TODO(zuza): there will be major problems if a reactor tries to close a
// stream that isn't theirs luckily this should never happen if the framework
// is used as expected.
current_reactor_->CloseChannel(channel_name_);
}
std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Reactor::Open(
const std::string &channel_name) {
std::unique_lock<std::mutex> lock(*mutex_);
if (channels_.count(channel_name) != 0) {
throw utils::BasicException("Channel with name " + channel_name +
"already exists");
}
auto it =
channels_
.emplace(channel_name, std::make_shared<Channel>(Channel::Params{
name_, channel_name, mutex_, cvar_}))
.first;
it->second->self_ptr_ = it->second;
return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
}
std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Reactor::Open() {
std::unique_lock<std::mutex> lock(*mutex_);
do {
std::string channel_name =
"stream-" + std::to_string(channel_name_counter_++);
if (channels_.count(channel_name) == 0) {
auto it =
channels_
.emplace(channel_name, std::make_shared<Channel>(Channel::Params{
name_, channel_name, mutex_, cvar_}))
.first;
it->second->self_ptr_ = it->second;
return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
}
} while (true);
}
std::shared_ptr<ChannelWriter> Reactor::FindChannel(
const std::string &channel_name) {
std::unique_lock<std::mutex> lock(*mutex_);
auto it_channel = channels_.find(channel_name);
if (it_channel == channels_.end()) return nullptr;
return it_channel->second->LockedOpenChannel();
}
void Reactor::CloseChannel(const std::string &s) {
std::unique_lock<std::mutex> lock(*mutex_);
auto it = channels_.find(s);
CHECK(it != channels_.end()) << "Trying to close nonexisting channel";
channels_.erase(it);
cvar_->notify_all();
}
void Reactor::RunEventLoop() {
bool exit_event_loop = false;
while (true) {
// Find (or wait) for the next Message.
PendingMessageInfo info;
{
std::unique_lock<std::mutex> guard(*mutex_);
while (true) {
// Not fair because was taken earlier, talk to lion.
info = GetPendingMessages();
if (info.message != nullptr) break;
// Exit the loop if there are no more Channels.
if (channels_.empty()) {
exit_event_loop = true;
break;
}
cvar_->wait(guard);
}
if (exit_event_loop) break;
}
for (auto &callback_info : info.callbacks) {
callback_info.first(*info.message, callback_info.second);
}
}
}
/**
* Checks if there is any nonempty EventStream.
*/
Reactor::PendingMessageInfo Reactor::GetPendingMessages() {
for (auto &channels_key_value : channels_) {
Channel &event_queue = *channels_key_value.second;
auto message = event_queue.LockedPop();
if (message == nullptr) continue;
std::type_index type_index = message->GetTypeIndex();
using Subscription = EventStream::Subscription;
std::vector<std::pair<EventStream::Callback, Subscription>> callback_info;
auto msg_type_cb_iter = event_queue.callbacks_.find(type_index);
if (msg_type_cb_iter != event_queue.callbacks_.end()) {
// There is a callback for this type.
for (auto &type_index_cb_key_value : msg_type_cb_iter->second) {
auto uid = type_index_cb_key_value.first;
auto callback = type_index_cb_key_value.second;
callback_info.emplace_back(callback,
Subscription(event_queue, type_index, uid));
}
}
return PendingMessageInfo{std::move(message), std::move(callback_info)};
}
return PendingMessageInfo{};
}
}

View File

@ -0,0 +1,553 @@
#pragma once
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
#include <utility>
#include "cereal/types/memory.hpp"
#include "glog/logging.h"
namespace communication::reactor {
class EventStream;
class Reactor;
class System;
class Channel;
extern thread_local Reactor *current_reactor_;
/**
* Base class for messages.
*/
class Message {
public:
virtual ~Message() {}
template <class Archive>
void serialize(Archive &) {}
/**
* Run-time type identification that is used for callbacks.
*
* Warning: this works because of the virtual destructor, don't remove it from
* this class
*/
std::type_index GetTypeIndex() { return typeid(*this); }
};
/**
* Write-end of a Channel (between two reactors).
*/
class ChannelWriter {
public:
ChannelWriter() = default;
ChannelWriter(const ChannelWriter &) = delete;
void operator=(const ChannelWriter &) = delete;
ChannelWriter(ChannelWriter &&) = delete;
void operator=(ChannelWriter &&) = delete;
/**
* Construct and send the message to the channel.
*/
template <typename TMessage, typename... Args>
void Send(Args &&... args) {
Send(std::unique_ptr<Message>(
std::make_unique<TMessage>(std::forward<Args>(args)...)));
}
virtual void Send(std::unique_ptr<Message> message) = 0;
virtual std::string ReactorName() const = 0;
virtual std::string Name() const = 0;
template <class Archive>
void serialize(Archive &archive) {
archive(ReactorName(), Name());
}
};
/**
* Read-end of a Channel (between two reactors).
*/
class EventStream {
public:
class OnEventOnceChainer;
class Subscription;
/**
* Register a callback that will be called whenever an event arrives.
*/
template <typename TMessage>
void OnEvent(
std::function<void(const TMessage &, const Subscription &)> &&callback) {
OnEventHelper(typeid(TMessage), [callback = std::move(callback)](
const Message &base_message,
const Subscription &subscription) {
const auto &message = dynamic_cast<const TMessage &>(base_message);
callback(message, subscription);
});
}
/**
* Register a callback that will be called only once.
* Once event is received, channel of this EventStream is closed.
*/
template <typename TMessage>
void OnEventOnceThenClose(std::function<void(const TMessage &)> &&callback) {
OnEventHelper(typeid(TMessage), [callback = std::move(callback)](
const Message &base_message,
const Subscription &subscription) {
const TMessage &message = dynamic_cast<const TMessage &>(base_message);
subscription.CloseChannel();
callback(message);
});
}
/**
* Starts a chain to register a callback that fires off only once.
*
* This method supports chaining (see the the class OnEventOnceChainer or the
* tests for examples).
* Warning: when chaining callbacks, make sure that EventStream does not
* deallocate before the last
* chained callback fired.
*/
OnEventOnceChainer OnEventOnce() { return OnEventOnceChainer(*this); }
/**
* Get the name of the channel.
*/
virtual const std::string &ChannelName() = 0;
/**
* Subscription Service.
*
* Unsubscribe from a callback. Lightweight object (can copy by value).
*/
class Subscription {
public:
/**
* Unsubscribe. Call only once.
*/
void Unsubscribe() const;
/**
* Close the stream. Convenience method.
*/
void CloseChannel() const;
/**
* Get the name of the channel the message is delivered to.
*/
const std::string &channel_name() const;
private:
friend class Reactor;
friend class Channel;
Subscription(Channel &event_queue, std::type_index type_index,
uint64_t callback_id)
: event_queue_(event_queue),
type_index_(type_index),
callback_id_(callback_id) {}
Channel &event_queue_;
std::type_index type_index_;
uint64_t callback_id_;
};
/**
* Close this event stream, disallowing further events from getting received.
*
* Any subsequent call after Close() to any function will be result in
* undefined
* behavior (invalid pointer dereference). Can only be called from the thread
* associated with the Reactor.
*/
virtual void Close() = 0;
/**
* Convenience class to chain one-off callbacks.
*
* Usage: Create this class with OnEventOnce() and then chain callbacks using
* ChainOnce.
* A callback will fire only once, unsubscribe and immediately subscribe the
* next callback to the stream.
*
* Example: stream->OnEventOnce().ChainOnce(firstCb).ChainOnce(secondCb);
*
* Implementation: This class is a temporary object that remembers the
* callbacks that are to be installed
* and finally installs them in the destructor. Not sure is this kosher, is
* there another way?
*/
class OnEventOnceChainer {
public:
OnEventOnceChainer(EventStream &event_stream)
: event_stream_(event_stream) {}
~OnEventOnceChainer() { InstallCallbacks(); }
template <typename TMessage>
OnEventOnceChainer &ChainOnce(
std::function<void(const TMessage &, const Subscription &)>
&&callback) {
std::function<void(const Message &, const Subscription &)>
wrap = [callback = std::move(callback)](
const Message &base_message, const Subscription &subscription) {
const TMessage &message = dynamic_cast<const TMessage &>(base_message);
subscription.Unsubscribe();
// Warning: this can close the Channel, be careful what you put after
// it!
callback(message, subscription);
};
callbacks_.emplace_back(typeid(TMessage), std::move(wrap));
return *this;
}
private:
void InstallCallbacks() {
int num_callbacks = callbacks_.size();
CHECK(num_callbacks > 0) << "No callback will be installed";
std::function<void(const Message &, const Subscription &)> next_callback;
std::type_index next_type = typeid(nullptr);
for (int i = num_callbacks - 1; i >= 0; --i) {
std::function<void(const Message &, const Subscription &)>
tmp_callback = [
callback = std::move(callbacks_[i].second), next_type,
next_callback = std::move(next_callback),
event_stream = &this->event_stream_
](const Message &message, const Subscription &subscription) {
callback(message, subscription);
if (next_callback) {
event_stream->OnEventHelper(next_type, std::move(next_callback));
}
};
next_callback = std::move(tmp_callback);
next_type = callbacks_[i].first;
}
event_stream_.OnEventHelper(next_type, std::move(next_callback));
}
EventStream &event_stream_;
std::vector<
std::pair<std::type_index,
std::function<void(const Message &, const Subscription &)>>>
callbacks_;
};
typedef std::function<void(const Message &, const Subscription &)> Callback;
private:
virtual void OnEventHelper(std::type_index type_index, Callback callback) = 0;
};
/**
* Implementation of a channel.
*
* This class is an internal data structure that represents the state of the
* channel. This class is not meant to be used by the clients of the messaging
* framework. The Channel class wraps the event queue data structure, the mutex
* that protects concurrent access to the event queue, the local channel and the
* event stream. The class is owned by the Reactor. It gets closed when the
* owner reactor (the one that owns the read-end of a channel) removes/closes
* it.
*/
class Channel {
struct Params;
public:
friend class Reactor; // to create a Params initialization object
friend class EventStream::Subscription;
Channel(Params params)
: channel_name_(params.channel_name),
reactor_name_(params.reactor_name),
mutex_(params.mutex),
cvar_(params.cvar),
stream_(mutex_, this) {}
/**
* LocalChannelWriter represents the channels to reactors living in the same
* reactor system (write-end of the channels).
*
* Sending messages to the local channel requires acquiring the mutex.
* LocalChannelWriter holds a (weak) pointer to the enclosing Channel object.
* Messages sent to a closed channel are ignored.
* There can be multiple LocalChannelWriters refering to the same stream if
* needed.
*/
class LocalChannelWriter : public ChannelWriter {
public:
friend class Channel;
LocalChannelWriter(std::string reactor_name, std::string channel_name,
std::weak_ptr<Channel> queue)
: reactor_name_(reactor_name),
channel_name_(channel_name),
queue_(queue) {}
void Send(std::unique_ptr<Message> m) override {
// Atomic, per the standard. We guarantee here that if channel exists it
// will not be destroyed by the end of this function.
std::shared_ptr<Channel> queue = queue_.lock();
if (queue) {
queue->Push(std::move(m));
}
// TODO: what should we do here? Channel doesn't exist so message will be
// lost.
}
std::string ReactorName() const override;
std::string Name() const override;
private:
std::string reactor_name_;
std::string channel_name_;
std::weak_ptr<Channel> queue_;
};
/**
* Implementation of the event stream.
*
* After the enclosing Channel object is destroyed (by a call to CloseChannel
* or Close).
*/
class LocalEventStream : public EventStream {
public:
friend class Channel;
LocalEventStream(std::shared_ptr<std::mutex> mutex, Channel *queue)
: mutex_(mutex), queue_(queue) {}
void OnEventHelper(std::type_index type_index, Callback callback) {
std::unique_lock<std::mutex> lock(*mutex_);
queue_->LockedOnEventHelper(type_index, callback);
}
const std::string &ChannelName() { return queue_->channel_name_; }
void Close() { queue_->Close(); }
private:
std::shared_ptr<std::mutex> mutex_;
std::string channel_name_;
Channel *queue_;
};
/**
* Close the channel. Must be called from the reactor that owns the channel.
*/
void Close();
Channel(const Channel &other) = delete;
Channel(Channel &&other) = default;
Channel &operator=(const Channel &other) = delete;
Channel &operator=(Channel &&other) = default;
private:
/**
* Initialization parameters to Channel.
* Warning: do not forget to initialize self_ptr_ individually. Private
* because it shouldn't be created outside of a Reactor.
*/
struct Params {
std::string reactor_name;
std::string channel_name;
std::shared_ptr<std::mutex> mutex;
std::shared_ptr<std::condition_variable> cvar;
};
void Push(std::unique_ptr<Message> m) {
std::unique_lock<std::mutex> guard(*mutex_);
queue_.emplace(std::move(m));
// This is OK because there is only one Reactor (thread) that can wait on
// this Channel.
cvar_->notify_one();
}
std::shared_ptr<LocalChannelWriter> LockedOpenChannel() {
// TODO(zuza): fix this CHECK using this answer
// https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned
// TODO: figure out zuza's TODO. Does that mean this CHECK is kind of flaky
// or that it doesn't fail sometimes, when it should.
CHECK(!self_ptr_.expired());
return std::make_shared<LocalChannelWriter>(reactor_name_, channel_name_,
self_ptr_);
}
std::unique_ptr<Message> LockedPop() { return LockedRawPop(); }
void LockedOnEventHelper(std::type_index type_index,
EventStream::Callback callback) {
uint64_t callback_id = next_callback_id++;
callbacks_[type_index][callback_id] = callback;
}
std::unique_ptr<Message> LockedRawPop() {
if (queue_.empty()) return nullptr;
std::unique_ptr<Message> t = std::move(queue_.front());
queue_.pop();
return t;
}
void RemoveCallback(const EventStream::Subscription &subscription) {
std::unique_lock<std::mutex> lock(*mutex_);
auto num_erased =
callbacks_[subscription.type_index_].erase(subscription.callback_id_);
CHECK(num_erased == 1) << "Expected to remove 1 element";
}
std::string channel_name_;
std::string reactor_name_;
std::queue<std::unique_ptr<Message>> queue_;
// Should only be locked once since it's used by a cond. var. Also caught in
// dctor, so must be recursive.
std::shared_ptr<std::mutex> mutex_;
std::shared_ptr<std::condition_variable> cvar_;
/**
* A weak_ptr to itself.
*
* There are initialization problems with this, check Params.
*/
std::weak_ptr<Channel> self_ptr_;
LocalEventStream stream_;
std::unordered_map<std::type_index,
std::unordered_map<uint64_t, EventStream::Callback>>
callbacks_;
uint64_t next_callback_id = 0;
};
/**
* A single unit of concurrent execution in the system.
*
* E.g. one worker, one client. Owned by System. Has a thread associated with
* it.
*/
class Reactor {
friend class System;
Reactor(System &system, std::string name,
std::function<void(Reactor &)> setup)
: system_(system), name_(name), setup_(setup), main_(Open("main")) {}
public:
~Reactor() {}
std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Open(
const std::string &s);
std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Open();
std::shared_ptr<ChannelWriter> FindChannel(const std::string &channel_name);
/**
* Close a channel by name.
*
* Should only be called from the Reactor thread.
*/
void CloseChannel(const std::string &s);
/**
* Get Reactor name
*/
const std::string &name() const { return name_; }
Reactor(const Reactor &other) = delete;
Reactor(Reactor &&other) = default;
Reactor &operator=(const Reactor &other) = delete;
Reactor &operator=(Reactor &&other) = default;
System &system_;
std::string name_;
std::function<void(Reactor &)> setup_;
/*
* Locks all Reactor data, including all Channel's in channels_.
*
* This should be a shared_ptr because LocalChannelWriter can outlive Reactor.
*/
std::shared_ptr<std::mutex> mutex_ = std::make_shared<std::mutex>();
std::shared_ptr<std::condition_variable> cvar_ =
std::make_shared<std::condition_variable>();
/**
* List of channels of a reactor indexed by name.
*/
std::unordered_map<std::string, std::shared_ptr<Channel>> channels_;
int64_t channel_name_counter_ = 0;
std::pair<EventStream *, std::shared_ptr<ChannelWriter>> main_;
private:
struct PendingMessageInfo {
std::unique_ptr<Message> message;
std::vector<std::pair<EventStream::Callback, EventStream::Subscription>>
callbacks;
};
/**
* Dispatches all waiting messages to callbacks. Shuts down when there are no
* callbacks left.
*/
void RunEventLoop();
PendingMessageInfo GetPendingMessages();
};
/**
* Placeholder for all reactors.
* Make sure object of this class outlives all Reactors created by it.
*/
class System {
public:
friend class Reactor;
System() = default;
void Spawn(const std::string &name, std::function<void(Reactor &)> setup) {
std::unique_lock<std::mutex> lock(mutex_);
std::unique_ptr<Reactor> reactor(new Reactor(*this, name, setup));
std::thread reactor_thread([ this, raw_reactor = reactor.get() ] {
current_reactor_ = raw_reactor;
raw_reactor->setup_(*raw_reactor);
raw_reactor->RunEventLoop();
});
auto got = reactors_.emplace(
name, std::pair<decltype(reactor), std::thread>{
std::move(reactor), std::move(reactor_thread)});
CHECK(got.second) << "Reactor with name: '" << name << "' already exists";
}
const std::shared_ptr<ChannelWriter> FindChannel(
const std::string &reactor_name, const std::string &channel_name) {
std::unique_lock<std::mutex> lock(mutex_);
auto it_reactor = reactors_.find(reactor_name);
if (it_reactor == reactors_.end()) return nullptr;
return it_reactor->second.first->FindChannel(channel_name);
}
// TODO: Think about interaction with destructor. Should we call this in
// destructor, complain in destructor if there are alive threads or stop them
// in some way.
void AwaitShutdown() {
for (auto &key_value : reactors_) {
auto &thread = key_value.second.second;
thread.join();
}
reactors_.clear();
}
private:
System(const System &) = delete;
System(System &&) = delete;
System &operator=(const System &) = delete;
System &operator=(System &&) = delete;
std::mutex mutex_;
std::unordered_map<std::string,
std::pair<std::unique_ptr<Reactor>, std::thread>>
reactors_;
};
}

View File

@ -220,6 +220,12 @@ bool Socket::Write(const uint8_t *data, size_t len,
return true; return true;
} }
bool Socket::Write(const std::string &s,
const std::function<bool()> &keep_retrying) {
return Write(reinterpret_cast<const uint8_t *>(s.data()), s.size(),
keep_retrying);
}
int Socket::Read(void *buffer, size_t len) { int Socket::Read(void *buffer, size_t len) {
return read(socket_, buffer, len); return read(socket_, buffer, len);
} }

View File

@ -151,6 +151,8 @@ class Socket {
*/ */
bool Write(const uint8_t *data, size_t len, bool Write(const uint8_t *data, size_t len,
const std::function<bool()> &keep_retrying = [] { return false; }); const std::function<bool()> &keep_retrying = [] { return false; });
bool Write(const std::string &s,
const std::function<bool()> &keep_retrying = [] { return false; });
/** /**
* Read data from the socket. * Read data from the socket.

View File

@ -0,0 +1,385 @@
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <future>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include "communication/reactor/reactor_local.hpp"
#include "gtest/gtest.h"
#include "utils/exceptions.hpp"
using namespace communication::reactor;
using Subscription = EventStream::Subscription;
TEST(SystemTest, ReturnWithoutThrowing) {
System system;
ASSERT_NO_THROW(
system.Spawn("master", [](Reactor &r) { r.CloseChannel("main"); }));
ASSERT_NO_THROW(system.AwaitShutdown());
}
TEST(ChannelCreationTest, ThrowOnReusingChannelName) {
System system;
system.Spawn("master", [](Reactor &r) {
r.Open("channel");
ASSERT_THROW(r.Open("channel"), utils::BasicException);
r.CloseChannel("main");
r.CloseChannel("channel");
});
system.AwaitShutdown();
}
TEST(ChannelSetUpTest, CheckMainChannelIsSet) {
System system;
system.Spawn("master", [](Reactor &r) {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer = r.system_.FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(300));
r.CloseChannel("main");
});
system.Spawn("worker", [](Reactor &r) {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer = r.system_.FindChannel("master", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(300));
r.CloseChannel("main");
});
system.AwaitShutdown();
}
TEST(SimpleSendTest, OneCallback) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
System system;
system.Spawn("master", [](Reactor &r) {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer = r.system_.FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel_writer->Send<MessageInt>(888);
r.CloseChannel("main");
});
system.Spawn("worker", [](Reactor &r) {
EventStream *stream = r.main_.first;
stream->OnEvent<MessageInt>(
[&r](const MessageInt &msg, const Subscription &) {
ASSERT_EQ(msg.x, 888);
r.CloseChannel("main");
});
});
system.AwaitShutdown();
}
TEST(SimpleSendTest, IgnoreAfterClose) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
System system;
system.Spawn("master", [](Reactor &r) {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer = r.system_.FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel_writer->Send<MessageInt>(101);
channel_writer->Send<MessageInt>(102); // should be ignored
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel_writer->Send<MessageInt>(103); // should be ignored
channel_writer->Send<MessageInt>(104); // should be ignored
// Write-end doesn't need to be closed because it's in RAII.
r.CloseChannel("main");
});
system.Spawn("worker", [](Reactor &r) {
EventStream *stream = r.main_.first;
stream->OnEvent<MessageInt>(
[&r](const MessageInt &msg, const Subscription &) {
r.CloseChannel("main");
ASSERT_EQ(msg.x, 101);
});
});
system.AwaitShutdown();
}
TEST(SimpleSendTest, DuringFirstEvent) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
System system;
std::promise<int> p;
auto f = p.get_future();
system.Spawn("master", [&p](Reactor &r) mutable {
EventStream *stream = r.main_.first;
stream->OnEvent<MessageInt>(
[&](const Message &msg, const Subscription &subscription) {
const MessageInt &msgint = dynamic_cast<const MessageInt &>(msg);
if (msgint.x == 101) r.FindChannel("main")->Send<MessageInt>(102);
if (msgint.x == 102) {
subscription.Unsubscribe();
r.CloseChannel("main");
p.set_value(777);
}
});
std::shared_ptr<ChannelWriter> channel_writer = r.FindChannel("main");
channel_writer->Send<MessageInt>(101);
});
f.wait();
ASSERT_EQ(f.get(), 777);
system.AwaitShutdown();
}
TEST(MultipleSendTest, UnsubscribeService) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct MessageChar : public Message {
MessageChar(char xx) : x(xx) {}
char x;
};
System system;
system.Spawn("master", [](Reactor &r) {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer = r.system_.FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel_writer->Send<MessageInt>(55);
channel_writer->Send<MessageInt>(66);
channel_writer->Send<MessageInt>(77);
channel_writer->Send<MessageInt>(88);
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel_writer->Send<MessageChar>('a');
channel_writer->Send<MessageChar>('b');
channel_writer->Send<MessageChar>('c');
channel_writer->Send<MessageChar>('d');
r.CloseChannel("main");
});
system.Spawn("worker", [num_received_messages = 0](Reactor & r) mutable {
EventStream *stream = r.main_.first;
stream->OnEvent<MessageInt>(
[&](const MessageInt &msgint, const Subscription &subscription) {
ASSERT_TRUE(msgint.x == 55 || msgint.x == 66);
++num_received_messages;
if (msgint.x == 66) {
subscription.Unsubscribe(); // receive only two of them
}
});
stream->OnEvent<MessageChar>(
[&](const MessageChar &msgchar, const Subscription &subscription) {
char c = msgchar.x;
++num_received_messages;
ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c');
if (num_received_messages == 5) {
subscription.Unsubscribe();
r.CloseChannel("main");
}
});
});
system.AwaitShutdown();
}
TEST(MultipleSendTest, OnEvent) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct MessageChar : public Message {
MessageChar(char xx) : x(xx) {}
char x;
};
System system;
system.Spawn("master", [](Reactor &r) {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer = r.system_.FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel_writer->Send<MessageInt>(101);
channel_writer->Send<MessageChar>('a');
channel_writer->Send<MessageInt>(103);
channel_writer->Send<MessageChar>('b');
r.CloseChannel("main");
});
system.Spawn("worker", [correct_vals = 0](Reactor & r) mutable {
struct EndMessage : Message {};
EventStream *stream = r.main_.first;
stream->OnEvent<MessageInt>(
[&](const MessageInt &msgint, const Subscription &) {
ASSERT_TRUE(msgint.x == 101 || msgint.x == 103);
++correct_vals;
r.main_.second->Send<EndMessage>();
});
stream->OnEvent<MessageChar>(
[&](const MessageChar &msgchar, const Subscription &) {
ASSERT_TRUE(msgchar.x == 'a' || msgchar.x == 'b');
++correct_vals;
r.main_.second->Send<EndMessage>();
});
stream->OnEvent<EndMessage>([&](const EndMessage &, const Subscription &) {
ASSERT_LE(correct_vals, 4);
if (correct_vals == 4) {
r.CloseChannel("main");
}
});
});
system.AwaitShutdown();
}
TEST(MultipleSendTest, Chaining) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
System system;
system.Spawn("master", [](Reactor &r) {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer = r.system_.FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel_writer->Send<MessageInt>(55);
channel_writer->Send<MessageInt>(66);
channel_writer->Send<MessageInt>(77);
r.CloseChannel("main");
});
system.Spawn("worker", [](Reactor &r) {
EventStream *stream = r.main_.first;
stream->OnEventOnce()
.ChainOnce<MessageInt>([](const MessageInt &msg, const Subscription &) {
ASSERT_EQ(msg.x, 55);
})
.ChainOnce<MessageInt>([](const MessageInt &msg, const Subscription &) {
ASSERT_EQ(msg.x, 66);
})
.ChainOnce<MessageInt>(
[&](const MessageInt &msg, const Subscription &) {
ASSERT_EQ(msg.x, 77);
r.CloseChannel("main");
});
});
system.AwaitShutdown();
}
TEST(MultipleSendTest, ChainingInRightOrder) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct MessageChar : public Message {
MessageChar(char xx) : x(xx) {}
char x;
};
System system;
system.Spawn("master", [](Reactor &r) {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer = r.system_.FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel_writer->Send<MessageChar>('a');
channel_writer->Send<MessageInt>(55);
channel_writer->Send<MessageChar>('b');
channel_writer->Send<MessageInt>(77);
r.CloseChannel("main");
});
system.Spawn("worker", [](Reactor &r) {
EventStream *stream = r.main_.first;
stream->OnEventOnce()
.ChainOnce<MessageInt>([](const MessageInt &msg, const Subscription &) {
ASSERT_EQ(msg.x, 55);
})
.ChainOnce<MessageChar>(
[](const MessageChar &msg, const Subscription &) {
ASSERT_EQ(msg.x, 'b');
})
.ChainOnce<MessageInt>(
[&](const MessageInt &msg, const Subscription &) {
ASSERT_EQ(msg.x, 77);
r.CloseChannel("main");
});
});
system.AwaitShutdown();
}
TEST(MultipleSendTest, ProcessManyMessages) {
const static int kNumTests = 100;
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
System system;
system.Spawn("master", [](Reactor &r) {
std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel_writer = r.system_.FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
for (int i = 0; i < kNumTests; ++i) {
channel_writer->Send<MessageInt>(rand());
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 5));
}
r.CloseChannel("main");
});
system.Spawn("worker", [vals = 0](Reactor & r) mutable {
struct EndMessage : Message {};
EventStream *stream = r.main_.first;
vals = 0;
stream->OnEvent<MessageInt>([&](const Message &, const Subscription &) {
++vals;
r.main_.second->Send<EndMessage>();
});
stream->OnEvent<EndMessage>([&](const Message &, const Subscription &) {
ASSERT_LE(vals, kNumTests);
if (vals == kNumTests) {
r.CloseChannel("main");
}
});
});
system.AwaitShutdown();
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}