Fix distributed reactors
Reviewers: buda, mferencevic Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D935
This commit is contained in:
parent
d62feb56fc
commit
e6d3edf9a9
@ -186,7 +186,9 @@ target_link_libraries(antlr_opencypher_parser_lib antlr4)
|
|||||||
set(memgraph_src_files
|
set(memgraph_src_files
|
||||||
${src_dir}/communication/bolt/v1/decoder/decoded_value.cpp
|
${src_dir}/communication/bolt/v1/decoder/decoded_value.cpp
|
||||||
${src_dir}/communication/bolt/v1/session.cpp
|
${src_dir}/communication/bolt/v1/session.cpp
|
||||||
|
${src_dir}/communication/reactor/protocol.cpp
|
||||||
${src_dir}/communication/reactor/reactor_local.cpp
|
${src_dir}/communication/reactor/reactor_local.cpp
|
||||||
|
${src_dir}/communication/reactor/reactor_distributed.cpp
|
||||||
${src_dir}/data_structures/concurrent/skiplist_gc.cpp
|
${src_dir}/data_structures/concurrent/skiplist_gc.cpp
|
||||||
${src_dir}/database/dbms.cpp
|
${src_dir}/database/dbms.cpp
|
||||||
${src_dir}/database/graph_db.cpp
|
${src_dir}/database/graph_db.cpp
|
||||||
|
56
src/communication/reactor/common_messages.hpp
Normal file
56
src/communication/reactor/common_messages.hpp
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "communication/reactor/reactor_local.hpp"
|
||||||
|
|
||||||
|
// TODO: Which of these I need to include.
|
||||||
|
#include "cereal/archives/binary.hpp"
|
||||||
|
#include "cereal/types/base_class.hpp"
|
||||||
|
#include "cereal/types/memory.hpp"
|
||||||
|
#include "cereal/types/polymorphic.hpp"
|
||||||
|
#include "cereal/types/string.hpp"
|
||||||
|
#include "cereal/types/utility.hpp"
|
||||||
|
#include "cereal/types/vector.hpp"
|
||||||
|
|
||||||
|
DECLARE_string(reactor_address);
|
||||||
|
DECLARE_int32(reactor_port);
|
||||||
|
|
||||||
|
namespace communication::reactor {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Message that includes the channel on which response is expected;
|
||||||
|
*/
|
||||||
|
class ReturnAddressMessage : public Message {
|
||||||
|
public:
|
||||||
|
ReturnAddressMessage(std::string reactor, std::string channel)
|
||||||
|
: address_(FLAGS_reactor_address),
|
||||||
|
port_(FLAGS_reactor_port),
|
||||||
|
reactor_(reactor),
|
||||||
|
channel_(channel) {}
|
||||||
|
|
||||||
|
const std::string &address() const { return address_; }
|
||||||
|
uint16_t port() const { return port_; }
|
||||||
|
const std::string &reactor_name() const { return reactor_; }
|
||||||
|
const std::string &channel_name() const { return channel_; }
|
||||||
|
|
||||||
|
template <class Archive>
|
||||||
|
void serialize(Archive &ar) {
|
||||||
|
ar(cereal::virtual_base_class<Message>(this), address_, port_, reactor_,
|
||||||
|
channel_);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto FindChannel(ChannelFinder &finder) const {
|
||||||
|
return finder.FindChannel(address_, port_, reactor_, channel_);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected:
|
||||||
|
friend class cereal::access;
|
||||||
|
ReturnAddressMessage() {} // Cereal needs access to a default constructor.
|
||||||
|
|
||||||
|
// Good luck these being const using cereal...
|
||||||
|
std::string address_;
|
||||||
|
uint16_t port_;
|
||||||
|
std::string reactor_;
|
||||||
|
std::string channel_;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
CEREAL_REGISTER_TYPE(communication::reactor::ReturnAddressMessage);
|
@ -1,13 +1,14 @@
|
|||||||
#include <sstream>
|
#include <sstream>
|
||||||
|
|
||||||
#include "protocol.hpp"
|
#include "communication/reactor/protocol.hpp"
|
||||||
#include "reactors_distributed.hpp"
|
#include "communication/reactor/reactor_distributed.hpp"
|
||||||
|
|
||||||
#include "glog/logging.h"
|
#include "glog/logging.h"
|
||||||
|
|
||||||
namespace protocol {
|
namespace communication::reactor {
|
||||||
|
|
||||||
Session::Session(Socket &&socket, Data &) : socket_(std::move(socket)) {
|
Session::Session(Socket &&socket, SessionData &data)
|
||||||
|
: socket_(std::move(socket)), system_(data.system) {
|
||||||
event_.data.ptr = this;
|
event_.data.ptr = this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -43,7 +44,7 @@ void Session::Execute() {
|
|||||||
DLOG(INFO) << "Reactor: " << reactor_ << "; Channel: " << channel_
|
DLOG(INFO) << "Reactor: " << reactor_ << "; Channel: " << channel_
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
|
|
||||||
auto channel = System::GetInstance().FindChannel(reactor_, channel_);
|
auto channel = system_.FindChannel(reactor_, channel_);
|
||||||
SendSuccess(channel != nullptr);
|
SendSuccess(channel != nullptr);
|
||||||
|
|
||||||
handshake_done_ = true;
|
handshake_done_ = true;
|
||||||
@ -59,12 +60,12 @@ void Session::Execute() {
|
|||||||
// TODO: check for exceptions
|
// TODO: check for exceptions
|
||||||
std::istringstream stream;
|
std::istringstream stream;
|
||||||
stream.str(std::string(reinterpret_cast<char *>(buffer_.data()), len_data));
|
stream.str(std::string(reinterpret_cast<char *>(buffer_.data()), len_data));
|
||||||
cereal::BinaryInputArchive iarchive{stream};
|
::cereal::BinaryInputArchive iarchive{stream};
|
||||||
std::unique_ptr<Message> message{nullptr};
|
std::unique_ptr<Message> message{nullptr};
|
||||||
iarchive(message);
|
iarchive(message);
|
||||||
buffer_.Shift(len_data);
|
buffer_.Shift(len_data);
|
||||||
|
|
||||||
auto channel = System::GetInstance().FindChannel(reactor_, channel_);
|
auto channel = system_.FindChannel(reactor_, channel_);
|
||||||
if (channel == nullptr) {
|
if (channel == nullptr) {
|
||||||
SendSuccess(false);
|
SendSuccess(false);
|
||||||
return;
|
return;
|
||||||
@ -145,13 +146,13 @@ bool SendMessage(std::string address, uint16_t port, std::string reactor,
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool success = GetSuccess(socket);
|
bool success = GetSuccess(socket);
|
||||||
if (message == nullptr or !success) {
|
if (message == nullptr || !success) {
|
||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Serialize and send message
|
// Serialize and send message
|
||||||
std::ostringstream stream;
|
std::ostringstream stream;
|
||||||
cereal::BinaryOutputArchive oarchive(stream);
|
::cereal::BinaryOutputArchive oarchive(stream);
|
||||||
oarchive(message);
|
oarchive(message);
|
||||||
|
|
||||||
const std::string &buffer = stream.str();
|
const std::string &buffer = stream.str();
|
||||||
@ -164,6 +165,7 @@ bool SendMessage(std::string address, uint16_t port, std::string reactor,
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: send message is blocking because of this. This is potential problem.
|
||||||
return GetSuccess(socket);
|
return GetSuccess(socket);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,13 +3,12 @@
|
|||||||
#include <chrono>
|
#include <chrono>
|
||||||
|
|
||||||
#include "communication/bolt/v1/decoder/buffer.hpp"
|
#include "communication/bolt/v1/decoder/buffer.hpp"
|
||||||
|
#include "communication/reactor/reactor_local.hpp"
|
||||||
#include "io/network/epoll.hpp"
|
#include "io/network/epoll.hpp"
|
||||||
#include "io/network/network_endpoint.hpp"
|
#include "io/network/network_endpoint.hpp"
|
||||||
#include "io/network/socket.hpp"
|
#include "io/network/socket.hpp"
|
||||||
#include "io/network/stream_buffer.hpp"
|
#include "io/network/stream_buffer.hpp"
|
||||||
|
|
||||||
class Message;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Protocol
|
* @brief Protocol
|
||||||
*
|
*
|
||||||
@ -42,14 +41,16 @@ class Message;
|
|||||||
* Currently the server is implemented to handle more than one message after
|
* Currently the server is implemented to handle more than one message after
|
||||||
* the initial handshake, but the client can only send one message.
|
* the initial handshake, but the client can only send one message.
|
||||||
*/
|
*/
|
||||||
namespace protocol {
|
namespace communication::reactor {
|
||||||
|
|
||||||
|
class Message;
|
||||||
|
|
||||||
using Endpoint = io::network::NetworkEndpoint;
|
using Endpoint = io::network::NetworkEndpoint;
|
||||||
using Socket = io::network::Socket;
|
using Socket = io::network::Socket;
|
||||||
using StreamBuffer = io::network::StreamBuffer;
|
using StreamBuffer = io::network::StreamBuffer;
|
||||||
|
|
||||||
// this buffer should be larger than the largest serialized message
|
// this buffer should be larger than the largest serialized message
|
||||||
using Buffer = communication::bolt::Buffer<262144>;
|
using Buffer = bolt::Buffer<262144>;
|
||||||
using SizeT = uint16_t;
|
using SizeT = uint16_t;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -57,8 +58,8 @@ using SizeT = uint16_t;
|
|||||||
*
|
*
|
||||||
* This typically holds living data shared by all sessions. Currently empty.
|
* This typically holds living data shared by all sessions. Currently empty.
|
||||||
*/
|
*/
|
||||||
struct Data {
|
struct SessionData {
|
||||||
// empty
|
System system;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -71,7 +72,7 @@ struct Data {
|
|||||||
class Session {
|
class Session {
|
||||||
private:
|
private:
|
||||||
public:
|
public:
|
||||||
Session(Socket &&socket, Data &data);
|
Session(Socket &&socket, SessionData &data);
|
||||||
|
|
||||||
int Id() const { return socket_.fd(); }
|
int Id() const { return socket_.fd(); }
|
||||||
|
|
||||||
@ -112,6 +113,7 @@ class Session {
|
|||||||
|
|
||||||
io::network::Epoll::Event event_;
|
io::network::Epoll::Event event_;
|
||||||
Socket socket_;
|
Socket socket_;
|
||||||
|
System &system_;
|
||||||
|
|
||||||
std::chrono::time_point<std::chrono::steady_clock> last_event_time_;
|
std::chrono::time_point<std::chrono::steady_clock> last_event_time_;
|
||||||
|
|
||||||
|
5
src/communication/reactor/reactor_distributed.cpp
Normal file
5
src/communication/reactor/reactor_distributed.cpp
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
#include "communication/reactor/reactor_distributed.hpp"
|
||||||
|
|
||||||
|
// reactor adress can't be 0.0.0.0.
|
||||||
|
DEFINE_string(reactor_address, "127.0.0.1", "Network server bind address");
|
||||||
|
DEFINE_int32(reactor_port, 10000, "Network server bind port");
|
274
src/communication/reactor/reactor_distributed.hpp
Normal file
274
src/communication/reactor/reactor_distributed.hpp
Normal file
@ -0,0 +1,274 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <cassert>
|
||||||
|
#include <exception>
|
||||||
|
#include <functional>
|
||||||
|
#include <iostream>
|
||||||
|
#include <memory>
|
||||||
|
#include <mutex>
|
||||||
|
#include <queue>
|
||||||
|
#include <stdexcept>
|
||||||
|
#include <tuple>
|
||||||
|
#include <typeindex>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
#include <gflags/gflags.h>
|
||||||
|
|
||||||
|
#include "communication/reactor/reactor_local.hpp"
|
||||||
|
#include "protocol.hpp"
|
||||||
|
|
||||||
|
#include "cereal/archives/binary.hpp"
|
||||||
|
#include "cereal/types/base_class.hpp"
|
||||||
|
#include "cereal/types/memory.hpp"
|
||||||
|
#include "cereal/types/polymorphic.hpp"
|
||||||
|
#include "cereal/types/string.hpp"
|
||||||
|
#include "cereal/types/utility.hpp"
|
||||||
|
#include "cereal/types/vector.hpp"
|
||||||
|
|
||||||
|
#include "communication/server.hpp"
|
||||||
|
#include "threading/sync/spinlock.hpp"
|
||||||
|
|
||||||
|
DECLARE_string(reactor_address);
|
||||||
|
DECLARE_int32(reactor_port);
|
||||||
|
|
||||||
|
namespace communication::reactor {
|
||||||
|
|
||||||
|
class DistributedSystem;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Networking service.
|
||||||
|
*/
|
||||||
|
class Network {
|
||||||
|
private:
|
||||||
|
using Endpoint = io::network::NetworkEndpoint;
|
||||||
|
using Socket = Socket;
|
||||||
|
using ServerT = communication::Server<Session, SessionData>;
|
||||||
|
friend class DistributedSystem;
|
||||||
|
|
||||||
|
struct NetworkMessage {
|
||||||
|
NetworkMessage() {}
|
||||||
|
|
||||||
|
NetworkMessage(const std::string &address, uint16_t port,
|
||||||
|
const std::string &reactor, const std::string &channel,
|
||||||
|
std::unique_ptr<Message> message)
|
||||||
|
: address(address),
|
||||||
|
port(port),
|
||||||
|
reactor(reactor),
|
||||||
|
channel(channel),
|
||||||
|
message(std::move(message)) {}
|
||||||
|
|
||||||
|
NetworkMessage(NetworkMessage &&nm) = default;
|
||||||
|
NetworkMessage &operator=(NetworkMessage &&nm) = default;
|
||||||
|
|
||||||
|
std::string address;
|
||||||
|
uint16_t port = 0;
|
||||||
|
std::string reactor;
|
||||||
|
std::string channel;
|
||||||
|
std::unique_ptr<Message> message;
|
||||||
|
};
|
||||||
|
|
||||||
|
public:
|
||||||
|
Network() = default;
|
||||||
|
|
||||||
|
// client functions
|
||||||
|
|
||||||
|
std::shared_ptr<ChannelWriter> Resolve(std::string address, uint16_t port,
|
||||||
|
std::string reactor_name,
|
||||||
|
std::string channel_name) {
|
||||||
|
if (SendMessage(address, port, reactor_name, channel_name, nullptr)) {
|
||||||
|
return std::make_shared<RemoteChannelWriter>(this, address, port,
|
||||||
|
reactor_name, channel_name);
|
||||||
|
}
|
||||||
|
LOG(WARNING) << "Could not resolve " << address << ":" << port << " "
|
||||||
|
<< reactor_name << "/" << channel_name;
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Start a threadpool that dispatches the messages from the (outgoing) queue
|
||||||
|
* to the sockets */
|
||||||
|
void StartClient(int worker_count) {
|
||||||
|
LOG(INFO) << "Starting " << worker_count << " client workers";
|
||||||
|
client_run_ = true;
|
||||||
|
|
||||||
|
for (int i = 0; i < worker_count; ++i) {
|
||||||
|
pool_.push_back(std::thread([worker_count, this]() {
|
||||||
|
while (this->client_run_) {
|
||||||
|
this->mutex_.lock();
|
||||||
|
if (!this->queue_.empty()) {
|
||||||
|
NetworkMessage nm(std::move(this->queue_.front()));
|
||||||
|
this->queue_.pop();
|
||||||
|
this->mutex_.unlock();
|
||||||
|
// TODO: store success
|
||||||
|
bool success = SendMessage(nm.address, nm.port, nm.reactor,
|
||||||
|
nm.channel, std::move(nm.message));
|
||||||
|
DLOG(INFO) << "Network client message send status: " << success
|
||||||
|
<< std::endl;
|
||||||
|
} else {
|
||||||
|
this->mutex_.unlock();
|
||||||
|
}
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void StopClient() {
|
||||||
|
while (true) {
|
||||||
|
std::lock_guard<SpinLock> lock(mutex_);
|
||||||
|
if (queue_.empty()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
client_run_ = false;
|
||||||
|
for (size_t i = 0; i < pool_.size(); ++i) {
|
||||||
|
pool_[i].join();
|
||||||
|
}
|
||||||
|
pool_.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
class RemoteChannelWriter : public ChannelWriter {
|
||||||
|
public:
|
||||||
|
RemoteChannelWriter(Network *network, std::string address, uint16_t port,
|
||||||
|
std::string reactor, std::string channel)
|
||||||
|
: network_(network),
|
||||||
|
address_(address),
|
||||||
|
port_(port),
|
||||||
|
reactor_(reactor),
|
||||||
|
channel_(channel) {}
|
||||||
|
|
||||||
|
virtual std::string Address() { return address_; }
|
||||||
|
|
||||||
|
virtual uint16_t Port() { return port_; }
|
||||||
|
|
||||||
|
std::string ReactorName() const override { return reactor_; }
|
||||||
|
|
||||||
|
std::string Name() const override { return channel_; }
|
||||||
|
|
||||||
|
void Send(std::unique_ptr<Message> message) override {
|
||||||
|
std::lock_guard<SpinLock> lock(network_->mutex_);
|
||||||
|
network_->queue_.push(NetworkMessage(address_, port_, reactor_, channel_,
|
||||||
|
std::move(message)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
Network *network_;
|
||||||
|
std::string address_;
|
||||||
|
uint16_t port_;
|
||||||
|
std::string reactor_;
|
||||||
|
std::string channel_;
|
||||||
|
};
|
||||||
|
|
||||||
|
// server functions
|
||||||
|
|
||||||
|
std::string address() const { return FLAGS_reactor_address; }
|
||||||
|
|
||||||
|
uint16_t port() const { return FLAGS_reactor_port; }
|
||||||
|
|
||||||
|
/** Start a threadpool that relays the messages from the sockets to the
|
||||||
|
* LocalEventStreams */
|
||||||
|
void StartServer(int workers_count) {
|
||||||
|
if (server_ != nullptr) {
|
||||||
|
LOG(FATAL) << "Tried to start a running server!";
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize endpoint.
|
||||||
|
Endpoint endpoint;
|
||||||
|
try {
|
||||||
|
endpoint = Endpoint(FLAGS_reactor_address.c_str(), FLAGS_reactor_port);
|
||||||
|
} catch (io::network::NetworkEndpointException &e) {
|
||||||
|
LOG(FATAL) << e.what();
|
||||||
|
}
|
||||||
|
// Initialize server
|
||||||
|
server_ = std::make_unique<ServerT>(endpoint, protocol_data_);
|
||||||
|
|
||||||
|
// Start server
|
||||||
|
thread_ = std::thread(
|
||||||
|
[workers_count, this]() { this->server_->Start(workers_count); });
|
||||||
|
}
|
||||||
|
|
||||||
|
void StopServer() {
|
||||||
|
if (server_ != nullptr) {
|
||||||
|
server_->Shutdown();
|
||||||
|
thread_.join();
|
||||||
|
server_ = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
// client variables
|
||||||
|
SpinLock mutex_;
|
||||||
|
std::vector<std::thread> pool_;
|
||||||
|
std::queue<NetworkMessage> queue_;
|
||||||
|
std::atomic<bool> client_run_;
|
||||||
|
|
||||||
|
// server variables
|
||||||
|
std::thread thread_;
|
||||||
|
SessionData protocol_data_;
|
||||||
|
std::unique_ptr<ServerT> server_{nullptr};
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Placeholder for all functionality related to non-local communication.
|
||||||
|
* E.g. resolve remote channels by memgraph node id, etc.
|
||||||
|
*/
|
||||||
|
class DistributedSystem : public ChannelFinder {
|
||||||
|
public:
|
||||||
|
DistributedSystem() {
|
||||||
|
network_.StartClient(4);
|
||||||
|
network_.StartServer(4);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Thread safe.
|
||||||
|
void Spawn(const std::string &name, std::function<void(Reactor &)> setup) {
|
||||||
|
system_.Spawn(name, setup, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Non-thread safe.
|
||||||
|
// TODO: figure out what should be intereaction of this function and
|
||||||
|
// destructor.
|
||||||
|
void StopServices() {
|
||||||
|
system_.AwaitShutdown();
|
||||||
|
network_.StopClient();
|
||||||
|
network_.StopServer();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::shared_ptr<ChannelWriter> FindChannel(
|
||||||
|
const std::string &reactor_name,
|
||||||
|
const std::string &channel_name) override {
|
||||||
|
return system_.FindChannel(reactor_name, channel_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resolves remote channel synchronously.
|
||||||
|
*
|
||||||
|
* @return EventStream on which message will arrive once channel is resolved.
|
||||||
|
* @warning It can only be called from local Reactor.
|
||||||
|
*/
|
||||||
|
std::shared_ptr<ChannelWriter> FindChannel(
|
||||||
|
const std::string &address, uint16_t port,
|
||||||
|
const std::string &reactor_name,
|
||||||
|
const std::string &channel_name) override {
|
||||||
|
// Yeah... Unneeded shared ptr... once again. We love that.
|
||||||
|
std::shared_ptr<ChannelWriter> channel_writer = nullptr;
|
||||||
|
// TODO: Check if this is actually local channel.
|
||||||
|
while (!(channel_writer =
|
||||||
|
network_.Resolve(address, port, reactor_name, channel_name))) {
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||||
|
}
|
||||||
|
return channel_writer;
|
||||||
|
}
|
||||||
|
|
||||||
|
Network &network() { return network_; }
|
||||||
|
const Network &network() const { return network_; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
Network network_;
|
||||||
|
System &system_ = network_.protocol_data_.system;
|
||||||
|
|
||||||
|
DistributedSystem(const DistributedSystem &) = delete;
|
||||||
|
DistributedSystem(DistributedSystem &&) = delete;
|
||||||
|
DistributedSystem &operator=(const DistributedSystem &) = delete;
|
||||||
|
DistributedSystem &operator=(DistributedSystem &&) = delete;
|
||||||
|
};
|
||||||
|
}
|
@ -4,8 +4,6 @@
|
|||||||
|
|
||||||
namespace communication::reactor {
|
namespace communication::reactor {
|
||||||
|
|
||||||
thread_local Reactor *current_reactor_ = nullptr;
|
|
||||||
|
|
||||||
void EventStream::Subscription::Unsubscribe() const {
|
void EventStream::Subscription::Unsubscribe() const {
|
||||||
event_queue_.RemoveCallback(*this);
|
event_queue_.RemoveCallback(*this);
|
||||||
}
|
}
|
||||||
@ -26,7 +24,7 @@ void Channel::Close() {
|
|||||||
// TODO(zuza): there will be major problems if a reactor tries to close a
|
// TODO(zuza): there will be major problems if a reactor tries to close a
|
||||||
// stream that isn't theirs luckily this should never happen if the framework
|
// stream that isn't theirs luckily this should never happen if the framework
|
||||||
// is used as expected.
|
// is used as expected.
|
||||||
current_reactor_->CloseChannel(channel_name_);
|
reactor_.CloseChannel(channel_name_);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Reactor::Open(
|
std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Reactor::Open(
|
||||||
@ -36,11 +34,11 @@ std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Reactor::Open(
|
|||||||
throw utils::BasicException("Channel with name " + channel_name +
|
throw utils::BasicException("Channel with name " + channel_name +
|
||||||
"already exists");
|
"already exists");
|
||||||
}
|
}
|
||||||
auto it =
|
auto it = channels_
|
||||||
channels_
|
.emplace(channel_name,
|
||||||
.emplace(channel_name, std::make_shared<Channel>(Channel::Params{
|
std::make_shared<Channel>(Channel::Params{
|
||||||
name_, channel_name, mutex_, cvar_}))
|
name_, channel_name, mutex_, cvar_, *this}))
|
||||||
.first;
|
.first;
|
||||||
it->second->self_ptr_ = it->second;
|
it->second->self_ptr_ = it->second;
|
||||||
return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
|
return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
|
||||||
}
|
}
|
||||||
@ -51,11 +49,11 @@ std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Reactor::Open() {
|
|||||||
std::string channel_name =
|
std::string channel_name =
|
||||||
"stream-" + std::to_string(channel_name_counter_++);
|
"stream-" + std::to_string(channel_name_counter_++);
|
||||||
if (channels_.count(channel_name) == 0) {
|
if (channels_.count(channel_name) == 0) {
|
||||||
auto it =
|
auto it = channels_
|
||||||
channels_
|
.emplace(channel_name,
|
||||||
.emplace(channel_name, std::make_shared<Channel>(Channel::Params{
|
std::make_shared<Channel>(Channel::Params{
|
||||||
name_, channel_name, mutex_, cvar_}))
|
name_, channel_name, mutex_, cvar_, *this}))
|
||||||
.first;
|
.first;
|
||||||
it->second->self_ptr_ = it->second;
|
it->second->self_ptr_ = it->second;
|
||||||
return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
|
return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
|
||||||
}
|
}
|
||||||
|
@ -18,8 +18,6 @@ class Reactor;
|
|||||||
class System;
|
class System;
|
||||||
class Channel;
|
class Channel;
|
||||||
|
|
||||||
extern thread_local Reactor *current_reactor_;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base class for messages.
|
* Base class for messages.
|
||||||
*/
|
*/
|
||||||
@ -63,11 +61,20 @@ class ChannelWriter {
|
|||||||
|
|
||||||
virtual std::string ReactorName() const = 0;
|
virtual std::string ReactorName() const = 0;
|
||||||
virtual std::string Name() const = 0;
|
virtual std::string Name() const = 0;
|
||||||
|
};
|
||||||
|
|
||||||
template <class Archive>
|
class ChannelFinder {
|
||||||
void serialize(Archive &archive) {
|
public:
|
||||||
archive(ReactorName(), Name());
|
virtual ~ChannelFinder() {}
|
||||||
}
|
|
||||||
|
// Find local channel.
|
||||||
|
virtual std::shared_ptr<ChannelWriter> FindChannel(
|
||||||
|
const std::string &reactor_name, const std::string &channel_name) = 0;
|
||||||
|
|
||||||
|
// Find remote channel.
|
||||||
|
virtual std::shared_ptr<ChannelWriter> FindChannel(
|
||||||
|
const std::string &address, uint16_t port,
|
||||||
|
const std::string &reactor_name, const std::string &channel_name) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -270,7 +277,8 @@ class Channel {
|
|||||||
reactor_name_(params.reactor_name),
|
reactor_name_(params.reactor_name),
|
||||||
mutex_(params.mutex),
|
mutex_(params.mutex),
|
||||||
cvar_(params.cvar),
|
cvar_(params.cvar),
|
||||||
stream_(mutex_, this) {}
|
stream_(mutex_, this),
|
||||||
|
reactor_(params.reactor) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* LocalChannelWriter represents the channels to reactors living in the same
|
* LocalChannelWriter represents the channels to reactors living in the same
|
||||||
@ -361,6 +369,7 @@ class Channel {
|
|||||||
std::string channel_name;
|
std::string channel_name;
|
||||||
std::shared_ptr<std::mutex> mutex;
|
std::shared_ptr<std::mutex> mutex;
|
||||||
std::shared_ptr<std::condition_variable> cvar;
|
std::shared_ptr<std::condition_variable> cvar;
|
||||||
|
Reactor &reactor;
|
||||||
};
|
};
|
||||||
|
|
||||||
void Push(std::unique_ptr<Message> m) {
|
void Push(std::unique_ptr<Message> m) {
|
||||||
@ -410,6 +419,7 @@ class Channel {
|
|||||||
// dctor, so must be recursive.
|
// dctor, so must be recursive.
|
||||||
std::shared_ptr<std::mutex> mutex_;
|
std::shared_ptr<std::mutex> mutex_;
|
||||||
std::shared_ptr<std::condition_variable> cvar_;
|
std::shared_ptr<std::condition_variable> cvar_;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A weak_ptr to itself.
|
* A weak_ptr to itself.
|
||||||
*
|
*
|
||||||
@ -417,6 +427,7 @@ class Channel {
|
|||||||
*/
|
*/
|
||||||
std::weak_ptr<Channel> self_ptr_;
|
std::weak_ptr<Channel> self_ptr_;
|
||||||
LocalEventStream stream_;
|
LocalEventStream stream_;
|
||||||
|
Reactor &reactor_;
|
||||||
std::unordered_map<std::type_index,
|
std::unordered_map<std::type_index,
|
||||||
std::unordered_map<uint64_t, EventStream::Callback>>
|
std::unordered_map<uint64_t, EventStream::Callback>>
|
||||||
callbacks_;
|
callbacks_;
|
||||||
@ -432,7 +443,7 @@ class Channel {
|
|||||||
class Reactor {
|
class Reactor {
|
||||||
friend class System;
|
friend class System;
|
||||||
|
|
||||||
Reactor(System &system, std::string name,
|
Reactor(ChannelFinder &system, std::string name,
|
||||||
std::function<void(Reactor &)> setup)
|
std::function<void(Reactor &)> setup)
|
||||||
: system_(system), name_(name), setup_(setup), main_(Open("main")) {}
|
: system_(system), name_(name), setup_(setup), main_(Open("main")) {}
|
||||||
|
|
||||||
@ -461,7 +472,7 @@ class Reactor {
|
|||||||
Reactor &operator=(const Reactor &other) = delete;
|
Reactor &operator=(const Reactor &other) = delete;
|
||||||
Reactor &operator=(Reactor &&other) = default;
|
Reactor &operator=(Reactor &&other) = default;
|
||||||
|
|
||||||
System &system_;
|
ChannelFinder &system_;
|
||||||
std::string name_;
|
std::string name_;
|
||||||
std::function<void(Reactor &)> setup_;
|
std::function<void(Reactor &)> setup_;
|
||||||
|
|
||||||
@ -479,6 +490,8 @@ class Reactor {
|
|||||||
*/
|
*/
|
||||||
std::unordered_map<std::string, std::shared_ptr<Channel>> channels_;
|
std::unordered_map<std::string, std::shared_ptr<Channel>> channels_;
|
||||||
int64_t channel_name_counter_ = 0;
|
int64_t channel_name_counter_ = 0;
|
||||||
|
// I don't understand why ChannelWriter is shared. ChannelWriter is just
|
||||||
|
// endpoint that could be copied to every user.
|
||||||
std::pair<EventStream *, std::shared_ptr<ChannelWriter>> main_;
|
std::pair<EventStream *, std::shared_ptr<ChannelWriter>> main_;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -501,18 +514,21 @@ class Reactor {
|
|||||||
* Placeholder for all reactors.
|
* Placeholder for all reactors.
|
||||||
* Make sure object of this class outlives all Reactors created by it.
|
* Make sure object of this class outlives all Reactors created by it.
|
||||||
*/
|
*/
|
||||||
class System {
|
class System : public ChannelFinder {
|
||||||
public:
|
public:
|
||||||
friend class Reactor;
|
friend class Reactor;
|
||||||
System() = default;
|
System() = default;
|
||||||
|
|
||||||
void Spawn(const std::string &name, std::function<void(Reactor &)> setup) {
|
void Spawn(const std::string &name, std::function<void(Reactor &)> setup,
|
||||||
|
ChannelFinder *finder = nullptr) {
|
||||||
|
if (!finder) {
|
||||||
|
finder = this;
|
||||||
|
}
|
||||||
std::unique_lock<std::mutex> lock(mutex_);
|
std::unique_lock<std::mutex> lock(mutex_);
|
||||||
std::unique_ptr<Reactor> reactor(new Reactor(*this, name, setup));
|
std::unique_ptr<Reactor> reactor(new Reactor(*finder, name, setup));
|
||||||
std::thread reactor_thread([ this, raw_reactor = reactor.get() ] {
|
std::thread reactor_thread([reactor = reactor.get()] {
|
||||||
current_reactor_ = raw_reactor;
|
reactor->setup_(*reactor);
|
||||||
raw_reactor->setup_(*raw_reactor);
|
reactor->RunEventLoop();
|
||||||
raw_reactor->RunEventLoop();
|
|
||||||
});
|
});
|
||||||
auto got = reactors_.emplace(
|
auto got = reactors_.emplace(
|
||||||
name, std::pair<decltype(reactor), std::thread>{
|
name, std::pair<decltype(reactor), std::thread>{
|
||||||
@ -520,16 +536,28 @@ class System {
|
|||||||
CHECK(got.second) << "Reactor with name: '" << name << "' already exists";
|
CHECK(got.second) << "Reactor with name: '" << name << "' already exists";
|
||||||
}
|
}
|
||||||
|
|
||||||
const std::shared_ptr<ChannelWriter> FindChannel(
|
std::shared_ptr<ChannelWriter> FindChannel(
|
||||||
const std::string &reactor_name, const std::string &channel_name) {
|
const std::string &reactor_name,
|
||||||
|
const std::string &channel_name) override {
|
||||||
std::unique_lock<std::mutex> lock(mutex_);
|
std::unique_lock<std::mutex> lock(mutex_);
|
||||||
auto it_reactor = reactors_.find(reactor_name);
|
auto it_reactor = reactors_.find(reactor_name);
|
||||||
if (it_reactor == reactors_.end()) return nullptr;
|
if (it_reactor == reactors_.end()) return nullptr;
|
||||||
return it_reactor->second.first->FindChannel(channel_name);
|
return it_reactor->second.first->FindChannel(channel_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::shared_ptr<ChannelWriter> FindChannel(const std::string &, uint16_t,
|
||||||
|
const std::string &,
|
||||||
|
const std::string &) override {
|
||||||
|
// TODO: This is awful design, but at this point I just want to make
|
||||||
|
// reactors work. We should templatize Reactor by system instead of dealing
|
||||||
|
// with interfaces then System would spawn Reactor<System> and
|
||||||
|
// DistributedSystem would spawn Reactor<DistributedSystem>.
|
||||||
|
LOG(FATAL) << "Tried to resolve remote channel in local System";
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: Think about interaction with destructor. Should we call this in
|
// TODO: Think about interaction with destructor. Should we call this in
|
||||||
// destructor, complain in destructor if there are alive threads or stop them
|
// destructor, complain in destructor if there are alive threads or stop
|
||||||
|
// them
|
||||||
// in some way.
|
// in some way.
|
||||||
void AwaitShutdown() {
|
void AwaitShutdown() {
|
||||||
for (auto &key_value : reactors_) {
|
for (auto &key_value : reactors_) {
|
||||||
@ -550,4 +578,6 @@ class System {
|
|||||||
std::pair<std::unique_ptr<Reactor>, std::thread>>
|
std::pair<std::unique_ptr<Reactor>, std::thread>>
|
||||||
reactors_;
|
reactors_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
using Subscription = Channel::LocalEventStream::Subscription;
|
||||||
}
|
}
|
||||||
|
151
tests/unit/reactor_distributed.cpp
Normal file
151
tests/unit/reactor_distributed.cpp
Normal file
@ -0,0 +1,151 @@
|
|||||||
|
/**
|
||||||
|
* This test file test the Distributed Reactors API on ONLY one process (no real
|
||||||
|
* networking).
|
||||||
|
* In other words, we send a message from one process to itself.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <chrono>
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <future>
|
||||||
|
#include <iostream>
|
||||||
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include "communication/reactor/common_messages.hpp"
|
||||||
|
#include "communication/reactor/reactor_distributed.hpp"
|
||||||
|
#include "gtest/gtest.h"
|
||||||
|
|
||||||
|
using namespace communication::reactor;
|
||||||
|
|
||||||
|
struct MessageInt : public Message {
|
||||||
|
MessageInt() {} // cereal needs this
|
||||||
|
MessageInt(int x) : x(x) {}
|
||||||
|
int x;
|
||||||
|
|
||||||
|
template <class Archive>
|
||||||
|
void serialize(Archive &ar) {
|
||||||
|
ar(cereal::virtual_base_class<Message>(this), x);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
CEREAL_REGISTER_TYPE(MessageInt);
|
||||||
|
|
||||||
|
struct RequestMessage : public ReturnAddressMessage {
|
||||||
|
RequestMessage() {}
|
||||||
|
RequestMessage(std::string reactor, std::string channel, int x)
|
||||||
|
: ReturnAddressMessage(reactor, channel), x(x){};
|
||||||
|
|
||||||
|
template <class Archive>
|
||||||
|
void serialize(Archive &ar) {
|
||||||
|
ar(cereal::virtual_base_class<ReturnAddressMessage>(this), x);
|
||||||
|
}
|
||||||
|
|
||||||
|
friend class cereal::access;
|
||||||
|
int x;
|
||||||
|
};
|
||||||
|
CEREAL_REGISTER_TYPE(RequestMessage);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test do the services start up without crashes.
|
||||||
|
*/
|
||||||
|
TEST(SimpleTests, StartAndStopServices) {
|
||||||
|
DistributedSystem system;
|
||||||
|
// do nothing
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||||
|
system.StopServices();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test simple message reception.
|
||||||
|
*
|
||||||
|
* Data flow:
|
||||||
|
* (1) Send an empty message from Master to Worker/main
|
||||||
|
*/
|
||||||
|
TEST(SimpleTests, SendEmptyMessage) {
|
||||||
|
DistributedSystem system;
|
||||||
|
|
||||||
|
system.Spawn("master", [](Reactor &r) {
|
||||||
|
auto writer = r.system_.FindChannel("127.0.0.1", 10000, "worker", "main");
|
||||||
|
writer->Send<Message>();
|
||||||
|
r.CloseChannel("main");
|
||||||
|
});
|
||||||
|
|
||||||
|
system.Spawn("worker", [](Reactor &r) {
|
||||||
|
r.main_.first->OnEventOnce().ChainOnce<Message>(
|
||||||
|
[&](const Message &, const Subscription &subscription) {
|
||||||
|
// if this message isn't delivered, the main channel will never be
|
||||||
|
// closed and we infinite loop
|
||||||
|
subscription.CloseChannel(); // close "main"
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
system.StopServices();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test ReturnAddressMsg functionality.
|
||||||
|
*
|
||||||
|
* Data flow:
|
||||||
|
* (1) Send an empty message from Master to Worker/main
|
||||||
|
* (2) Send an empty message from Worker to Master/main
|
||||||
|
*/
|
||||||
|
TEST(SimpleTests, SendReturnAddressMessage) {
|
||||||
|
DistributedSystem system;
|
||||||
|
|
||||||
|
system.Spawn("master", [](Reactor &r) {
|
||||||
|
auto writer = r.system_.FindChannel("127.0.0.1", 10000, "worker", "main");
|
||||||
|
writer->Send<ReturnAddressMessage>(r.name(), "main");
|
||||||
|
r.main_.first->OnEvent<MessageInt>(
|
||||||
|
[&](const MessageInt &message, const Subscription &) {
|
||||||
|
EXPECT_EQ(message.x, 5);
|
||||||
|
r.CloseChannel("main");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
system.Spawn("worker", [](Reactor &r) {
|
||||||
|
r.main_.first->OnEvent<ReturnAddressMessage>(
|
||||||
|
[&](const ReturnAddressMessage &message, const Subscription &) {
|
||||||
|
message.FindChannel(r.system_)->Send<MessageInt>(5);
|
||||||
|
r.CloseChannel("main");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
system.StopServices();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test serializability of a complex message over the network layer.
|
||||||
|
*
|
||||||
|
* Data flow:
|
||||||
|
* (1) Send ("hi", 123) from Master to Worker/main
|
||||||
|
* (2) Send ("hi back", 779) from Worker to Master/main
|
||||||
|
*/
|
||||||
|
TEST(SimpleTests, SendSerializableMessage) {
|
||||||
|
DistributedSystem system;
|
||||||
|
|
||||||
|
system.Spawn("master", [](Reactor &r) {
|
||||||
|
auto writer = r.system_.FindChannel("127.0.0.1", 10000, "worker", "main");
|
||||||
|
writer->Send<RequestMessage>(r.name(), "main", 123);
|
||||||
|
r.main_.first->OnEvent<MessageInt>(
|
||||||
|
[&](const MessageInt &message, const Subscription &) {
|
||||||
|
ASSERT_EQ(message.x, 779);
|
||||||
|
r.CloseChannel("main");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
system.Spawn("worker", [](Reactor &r) {
|
||||||
|
r.main_.first->OnEvent<RequestMessage>(
|
||||||
|
[&](const RequestMessage &message, const Subscription &) {
|
||||||
|
ASSERT_EQ(message.x, 123);
|
||||||
|
message.FindChannel(r.system_)->Send<MessageInt>(779);
|
||||||
|
r.CloseChannel("main");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
system.StopServices();
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char **argv) {
|
||||||
|
::testing::InitGoogleTest(&argc, argv);
|
||||||
|
return RUN_ALL_TESTS();
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user