Remove reactors
Summary: kill reactors Reviewers: mislav.bradac Reviewed By: mislav.bradac Differential Revision: https://phabricator.memgraph.io/D1075
This commit is contained in:
parent
6d4113d9db
commit
e999207b2f
@ -7,9 +7,6 @@ set(memgraph_src_files
|
||||
communication/messaging/distributed.cpp
|
||||
communication/messaging/local.cpp
|
||||
communication/messaging/protocol.cpp
|
||||
communication/reactor/protocol.cpp
|
||||
communication/reactor/reactor_distributed.cpp
|
||||
communication/reactor/reactor_local.cpp
|
||||
communication/rpc/rpc.cpp
|
||||
data_structures/concurrent/skiplist_gc.cpp
|
||||
database/graph_db.cpp
|
||||
|
@ -251,13 +251,6 @@ class RaftMember final {
|
||||
const RaftConfig &config);
|
||||
~RaftMember();
|
||||
|
||||
/* Just to make the tests work for now until we clean up the reactor stuff. */
|
||||
std::experimental::optional<MemberId> Leader() {
|
||||
std::lock_guard<std::mutex> lock(impl_.mutex_);
|
||||
return impl_.leader_;
|
||||
}
|
||||
MemberId Id() const { return impl_.id_; }
|
||||
|
||||
ClientResult AddCommand(const typename State::Change &command, bool blocking);
|
||||
|
||||
RequestVoteReply OnRequestVote(const RequestVoteRequest &request);
|
||||
|
@ -1,52 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include "communication/reactor/reactor_local.hpp"
|
||||
|
||||
// TODO: Which of these I need to include.
|
||||
#include "cereal/archives/binary.hpp"
|
||||
#include "cereal/types/base_class.hpp"
|
||||
#include "cereal/types/memory.hpp"
|
||||
#include "cereal/types/polymorphic.hpp"
|
||||
#include "cereal/types/string.hpp"
|
||||
#include "cereal/types/utility.hpp"
|
||||
#include "cereal/types/vector.hpp"
|
||||
|
||||
DECLARE_string(reactor_address);
|
||||
DECLARE_int32(reactor_port);
|
||||
|
||||
namespace communication::reactor {
|
||||
|
||||
/**
|
||||
* Message that includes the channel on which response is expected;
|
||||
*/
|
||||
class ReturnAddressMessage : public Message {
|
||||
public:
|
||||
ReturnAddressMessage(std::string reactor, std::string channel)
|
||||
: address_(FLAGS_reactor_address),
|
||||
port_(FLAGS_reactor_port),
|
||||
reactor_(reactor),
|
||||
channel_(channel) {}
|
||||
|
||||
const std::string &address() const { return address_; }
|
||||
uint16_t port() const { return port_; }
|
||||
const std::string &reactor_name() const { return reactor_; }
|
||||
const std::string &channel_name() const { return channel_; }
|
||||
|
||||
template <class Archive>
|
||||
void serialize(Archive &ar) {
|
||||
ar(cereal::virtual_base_class<Message>(this), address_, port_, reactor_,
|
||||
channel_);
|
||||
}
|
||||
|
||||
protected:
|
||||
friend class cereal::access;
|
||||
ReturnAddressMessage() {} // Cereal needs access to a default constructor.
|
||||
|
||||
// Good luck these being const using cereal...
|
||||
std::string address_;
|
||||
uint16_t port_;
|
||||
std::string reactor_;
|
||||
std::string channel_;
|
||||
};
|
||||
}
|
||||
CEREAL_REGISTER_TYPE(communication::reactor::ReturnAddressMessage);
|
@ -1,146 +0,0 @@
|
||||
#include <sstream>
|
||||
|
||||
#include "communication/reactor/protocol.hpp"
|
||||
#include "communication/reactor/reactor_distributed.hpp"
|
||||
#include "communication/reactor/reactor_local.hpp"
|
||||
|
||||
#include "glog/logging.h"
|
||||
|
||||
namespace communication::reactor {
|
||||
|
||||
Session::Session(Socket &&socket, SessionData &data)
|
||||
: socket_(std::move(socket)), system_(data.system) {
|
||||
event_.data.ptr = this;
|
||||
}
|
||||
|
||||
bool Session::Alive() const { return alive_; }
|
||||
|
||||
std::string Session::GetStringAndShift(SizeT len) {
|
||||
std::string ret(reinterpret_cast<char *>(buffer_.data()), len);
|
||||
buffer_.Shift(len);
|
||||
return ret;
|
||||
}
|
||||
|
||||
void Session::Execute() {
|
||||
if (!handshake_done_) {
|
||||
// Note: this function can be multiple times before the buffer has the full
|
||||
// packet.
|
||||
// We currently have to check for this case and return without shifting
|
||||
// the buffer.
|
||||
// In other words, only shift anything from the buffer if you can read the
|
||||
// entire (sub)message.
|
||||
|
||||
if (buffer_.size() < 2 * sizeof(SizeT)) return;
|
||||
SizeT len_reactor = GetLength();
|
||||
SizeT len_channel = GetLength(sizeof(SizeT));
|
||||
|
||||
if (buffer_.size() < 2 * sizeof(SizeT) + len_reactor + len_channel) return;
|
||||
|
||||
// remove the length bytes from the buffer
|
||||
buffer_.Shift(2 * sizeof(SizeT));
|
||||
|
||||
reactor_ = GetStringAndShift(len_reactor);
|
||||
channel_ = GetStringAndShift(len_channel);
|
||||
|
||||
DLOG(INFO) << "Reactor: " << reactor_ << "; Channel: " << channel_
|
||||
<< std::endl;
|
||||
|
||||
LocalChannelWriter channel(reactor_, channel_, system_);
|
||||
SendSuccess(true);
|
||||
|
||||
handshake_done_ = true;
|
||||
}
|
||||
|
||||
if (buffer_.size() < sizeof(SizeT)) return;
|
||||
SizeT len_data = GetLength();
|
||||
if (buffer_.size() < sizeof(SizeT) + len_data) return;
|
||||
|
||||
// remove the length bytes from the buffer
|
||||
buffer_.Shift(sizeof(SizeT));
|
||||
|
||||
// TODO: check for exceptions
|
||||
std::istringstream stream;
|
||||
stream.str(std::string(reinterpret_cast<char *>(buffer_.data()), len_data));
|
||||
::cereal::BinaryInputArchive iarchive{stream};
|
||||
std::unique_ptr<Message> message{nullptr};
|
||||
iarchive(message);
|
||||
buffer_.Shift(len_data);
|
||||
|
||||
LocalChannelWriter channel(reactor_, channel_, system_);
|
||||
channel.Send(std::move(message));
|
||||
}
|
||||
|
||||
StreamBuffer Session::Allocate() { return buffer_.Allocate(); }
|
||||
|
||||
void Session::Written(size_t len) { buffer_.Written(len); }
|
||||
|
||||
void Session::Close() {
|
||||
DLOG(INFO) << "Closing session";
|
||||
this->socket_.Close();
|
||||
}
|
||||
|
||||
SizeT Session::GetLength(int offset) {
|
||||
SizeT ret = *reinterpret_cast<SizeT *>(buffer_.data() + offset);
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool Session::SendSuccess(bool success) {
|
||||
if (success) {
|
||||
return socket_.Write("\x80");
|
||||
}
|
||||
return socket_.Write("\x40");
|
||||
}
|
||||
|
||||
bool SendLength(Socket &socket, SizeT length) {
|
||||
return socket.Write(reinterpret_cast<uint8_t *>(&length), sizeof(SizeT));
|
||||
}
|
||||
|
||||
void SendMessage(std::string address, uint16_t port, std::string reactor,
|
||||
std::string channel, std::unique_ptr<Message> message) {
|
||||
// Initialize endpoint.
|
||||
Endpoint endpoint(address.c_str(), port);
|
||||
|
||||
// Initialize socket.
|
||||
Socket socket;
|
||||
if (!socket.Connect(endpoint)) {
|
||||
LOG(INFO) << "Couldn't connect to remote address: " << address << ":"
|
||||
<< port;
|
||||
return;
|
||||
}
|
||||
|
||||
// Send data
|
||||
if (!SendLength(socket, reactor.size())) {
|
||||
LOG(INFO) << "Couldn't send reactor size!";
|
||||
return;
|
||||
}
|
||||
if (!SendLength(socket, channel.size())) {
|
||||
LOG(INFO) << "Couldn't send channel size!";
|
||||
return;
|
||||
}
|
||||
if (!socket.Write(reactor)) {
|
||||
LOG(INFO) << "Couldn't send reactor data!";
|
||||
return;
|
||||
}
|
||||
if (!socket.Write(channel)) {
|
||||
LOG(INFO) << "Couldn't send channel data!";
|
||||
return;
|
||||
}
|
||||
|
||||
if (message == nullptr) return;
|
||||
|
||||
// Serialize and send message
|
||||
std::ostringstream stream;
|
||||
::cereal::BinaryOutputArchive oarchive(stream);
|
||||
oarchive(message);
|
||||
|
||||
const std::string &buffer = stream.str();
|
||||
if (!SendLength(socket, buffer.size())) {
|
||||
LOG(INFO) << "Couldn't send message size!";
|
||||
return;
|
||||
}
|
||||
if (!socket.Write(buffer)) {
|
||||
LOG(INFO) << "Couldn't send message data!";
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
@ -1,137 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
|
||||
#include "communication/bolt/v1/decoder/buffer.hpp"
|
||||
#include "communication/reactor/reactor_local.hpp"
|
||||
#include "io/network/epoll.hpp"
|
||||
#include "io/network/network_endpoint.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
#include "io/network/stream_buffer.hpp"
|
||||
|
||||
/**
|
||||
* @brief Protocol
|
||||
*
|
||||
* Has classes and functions that implement server and client sides of our
|
||||
* distributed protocol.
|
||||
*
|
||||
* The protocol consists of two stages.
|
||||
* The first stage is a handshake stage when the client sends to the server
|
||||
* reactor and channel names which it wants to communicate with.
|
||||
* The second stage is sending messages.
|
||||
*
|
||||
* HANDSHAKE
|
||||
*
|
||||
* Client sends:
|
||||
* len_reactor_name(SizeT) len_channel_name(SizeT) reactor_name channel_name
|
||||
* Server responds:
|
||||
* 0x80
|
||||
*
|
||||
* MESSAGES
|
||||
*
|
||||
* Client sends:
|
||||
* len_message(SizeT) cereal_encoded_binary_message
|
||||
*
|
||||
* Currently the server is implemented to handle more than one message after
|
||||
* the initial handshake, but the client can only send one message.
|
||||
* TODO: no reason to do any sort of handshake at all.
|
||||
*/
|
||||
namespace communication::reactor {
|
||||
|
||||
class Message;
|
||||
|
||||
using Endpoint = io::network::NetworkEndpoint;
|
||||
using Socket = io::network::Socket;
|
||||
using StreamBuffer = io::network::StreamBuffer;
|
||||
|
||||
// this buffer should be larger than the largest serialized message
|
||||
using Buffer = bolt::Buffer<262144>;
|
||||
using SizeT = uint16_t;
|
||||
|
||||
/**
|
||||
* Distributed Protocol Data
|
||||
*/
|
||||
struct SessionData {
|
||||
System system;
|
||||
};
|
||||
|
||||
/**
|
||||
* Distributed Protocol Session
|
||||
*
|
||||
* This class is responsible for handling a single client connection.
|
||||
*
|
||||
* @tparam Socket type of socket (could be a network socket or test socket)
|
||||
*/
|
||||
class Session {
|
||||
private:
|
||||
public:
|
||||
Session(Socket &&socket, SessionData &data);
|
||||
|
||||
int Id() const { return socket_.fd(); }
|
||||
|
||||
/**
|
||||
* Returns the protocol alive state
|
||||
*/
|
||||
bool Alive() const;
|
||||
|
||||
/**
|
||||
* Executes the protocol after data has been read into the buffer.
|
||||
* Goes through the protocol states in order to execute commands from the
|
||||
* client.
|
||||
*/
|
||||
void Execute();
|
||||
|
||||
/**
|
||||
* Allocates data from the internal buffer.
|
||||
* Used in the underlying network stack to asynchronously read data
|
||||
* from the client.
|
||||
* @returns a StreamBuffer to the allocated internal data buffer
|
||||
*/
|
||||
StreamBuffer Allocate();
|
||||
|
||||
/**
|
||||
* Notifies the internal buffer of written data.
|
||||
* Used in the underlying network stack to notify the internal buffer
|
||||
* how many bytes of data have been written.
|
||||
* @param len how many data was written to the buffer
|
||||
*/
|
||||
void Written(size_t len);
|
||||
|
||||
bool TimedOut() { return false; }
|
||||
|
||||
/**
|
||||
* Closes the session (client socket).
|
||||
*/
|
||||
void Close();
|
||||
|
||||
io::network::Epoll::Event event_;
|
||||
Socket socket_;
|
||||
System &system_;
|
||||
|
||||
std::chrono::time_point<std::chrono::steady_clock> last_event_time_;
|
||||
|
||||
private:
|
||||
SizeT GetLength(int offset = 0);
|
||||
std::string GetStringAndShift(SizeT len);
|
||||
// Should be renamed to SendHandshake.
|
||||
bool SendSuccess(bool success);
|
||||
|
||||
bool alive_{true};
|
||||
bool handshake_done_{false};
|
||||
|
||||
std::string reactor_{""};
|
||||
std::string channel_{""};
|
||||
|
||||
Buffer buffer_;
|
||||
};
|
||||
|
||||
/**
|
||||
* Distributed Protocol Send Message
|
||||
*
|
||||
* This function sends a message to the specified server.
|
||||
* If message is a nullptr then it only checks whether the remote reactor
|
||||
* and channel exist, else it returns the complete message send success.
|
||||
*/
|
||||
void SendMessage(std::string address, uint16_t port, std::string reactor,
|
||||
std::string channel, std::unique_ptr<Message> message);
|
||||
}
|
@ -1,19 +0,0 @@
|
||||
#include "communication/reactor/reactor_distributed.hpp"
|
||||
|
||||
// reactor adress can't be 0.0.0.0.
|
||||
DEFINE_string(reactor_address, "127.0.0.1", "Network server bind address");
|
||||
DEFINE_int32(reactor_port, 10000, "Network server bind port");
|
||||
|
||||
namespace communication::reactor {
|
||||
|
||||
RemoteChannelWriter::RemoteChannelWriter(const std::string &address,
|
||||
uint16_t port,
|
||||
const std::string &reactor,
|
||||
const std::string &channel,
|
||||
DistributedSystem &system)
|
||||
: network_(&system.network_),
|
||||
address_(address),
|
||||
port_(port),
|
||||
reactor_(reactor),
|
||||
channel_(channel) {}
|
||||
}
|
@ -1,225 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <cassert>
|
||||
#include <exception>
|
||||
#include <functional>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
#include <stdexcept>
|
||||
#include <tuple>
|
||||
#include <typeindex>
|
||||
#include <utility>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
|
||||
#include "communication/reactor/reactor_local.hpp"
|
||||
#include "data_structures/queue.hpp"
|
||||
#include "protocol.hpp"
|
||||
|
||||
#include "cereal/archives/binary.hpp"
|
||||
#include "cereal/types/base_class.hpp"
|
||||
#include "cereal/types/memory.hpp"
|
||||
#include "cereal/types/polymorphic.hpp"
|
||||
#include "cereal/types/string.hpp"
|
||||
#include "cereal/types/utility.hpp"
|
||||
#include "cereal/types/vector.hpp"
|
||||
|
||||
#include "communication/server.hpp"
|
||||
#include "threading/sync/spinlock.hpp"
|
||||
|
||||
DECLARE_string(reactor_address);
|
||||
DECLARE_int32(reactor_port);
|
||||
|
||||
namespace communication::reactor {
|
||||
|
||||
class DistributedSystem;
|
||||
|
||||
/**
|
||||
* Networking service.
|
||||
*/
|
||||
class Network {
|
||||
private:
|
||||
using Endpoint = io::network::NetworkEndpoint;
|
||||
using Socket = Socket;
|
||||
using ServerT = communication::Server<Session, SessionData>;
|
||||
friend class DistributedSystem;
|
||||
|
||||
struct NetworkMessage {
|
||||
NetworkMessage() {}
|
||||
|
||||
NetworkMessage(const std::string &address, uint16_t port,
|
||||
const std::string &reactor, const std::string &channel,
|
||||
std::unique_ptr<Message> &&message)
|
||||
: address(address),
|
||||
port(port),
|
||||
reactor(reactor),
|
||||
channel(channel),
|
||||
message(std::move(message)) {}
|
||||
|
||||
NetworkMessage(NetworkMessage &&nm) = default;
|
||||
NetworkMessage &operator=(NetworkMessage &&nm) = default;
|
||||
|
||||
std::string address;
|
||||
uint16_t port = 0;
|
||||
std::string reactor;
|
||||
std::string channel;
|
||||
std::unique_ptr<Message> message;
|
||||
};
|
||||
|
||||
public:
|
||||
Network() = default;
|
||||
|
||||
/** Start a threadpool that dispatches the messages from the (outgoing) queue
|
||||
* to the sockets */
|
||||
void StartClient(int worker_count) {
|
||||
LOG(INFO) << "Starting " << worker_count << " client workers";
|
||||
|
||||
// condition variables here...
|
||||
for (int i = 0; i < worker_count; ++i) {
|
||||
pool_.push_back(std::thread([this]() {
|
||||
while (true) {
|
||||
auto message = queue_.AwaitPop();
|
||||
if (message == std::experimental::nullopt) break;
|
||||
SendMessage(message->address, message->port, message->reactor,
|
||||
message->channel, std::move(message->message));
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
void StopClient() {
|
||||
while (true) {
|
||||
std::lock_guard<SpinLock> lock(mutex_);
|
||||
if (queue_.empty()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
queue_.Shutdown();
|
||||
for (size_t i = 0; i < pool_.size(); ++i) {
|
||||
pool_[i].join();
|
||||
}
|
||||
pool_.clear();
|
||||
}
|
||||
|
||||
class RemoteChannelWriter : public ChannelWriter {
|
||||
public:
|
||||
RemoteChannelWriter(const std::string &address, uint16_t port,
|
||||
const std::string &reactor, const std::string &channel,
|
||||
DistributedSystem &system);
|
||||
|
||||
// TODO: This is wrong. We should probbly have base class Address that would
|
||||
// contain everything needed to reference a channel. (address, port,
|
||||
// reactor_name, channel_name) in remote reactors and (reactor_name,
|
||||
// channel_name) in local reactors.
|
||||
virtual std::string Address() { return address_; }
|
||||
virtual uint16_t Port() { return port_; }
|
||||
std::string ReactorName() const override { return reactor_; }
|
||||
std::string Name() const override { return channel_; }
|
||||
|
||||
template <typename TMessage, typename... Args>
|
||||
void Send(Args &&... args) {
|
||||
Send(std::unique_ptr<Message>(
|
||||
std::make_unique<TMessage>(std::forward<Args>(args)...)));
|
||||
}
|
||||
|
||||
void Send(std::unique_ptr<Message> message) override {
|
||||
std::lock_guard<SpinLock> lock(network_->mutex_);
|
||||
network_->queue_.Emplace(address_, port_, reactor_, channel_,
|
||||
std::move(message));
|
||||
}
|
||||
|
||||
private:
|
||||
Network *network_;
|
||||
std::string address_;
|
||||
uint16_t port_;
|
||||
std::string reactor_;
|
||||
std::string channel_;
|
||||
};
|
||||
|
||||
// server functions
|
||||
|
||||
std::string address() const { return FLAGS_reactor_address; }
|
||||
|
||||
uint16_t port() const { return FLAGS_reactor_port; }
|
||||
|
||||
/** Start a threadpool that relays the messages from the sockets to the
|
||||
* LocalEventStreams */
|
||||
void StartServer(int workers_count) {
|
||||
if (server_ != nullptr) {
|
||||
LOG(FATAL) << "Tried to start a running server!";
|
||||
}
|
||||
|
||||
// Initialize endpoint.
|
||||
Endpoint endpoint(FLAGS_reactor_address.c_str(), FLAGS_reactor_port);
|
||||
// Initialize server
|
||||
server_ = std::make_unique<ServerT>(endpoint, protocol_data_);
|
||||
|
||||
// Start server
|
||||
thread_ = std::thread(
|
||||
[workers_count, this]() { this->server_->Start(workers_count); });
|
||||
}
|
||||
|
||||
void StopServer() {
|
||||
if (server_ != nullptr) {
|
||||
server_->Shutdown();
|
||||
thread_.join();
|
||||
server_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
// client variables
|
||||
SpinLock mutex_;
|
||||
std::vector<std::thread> pool_;
|
||||
Queue<NetworkMessage> queue_;
|
||||
|
||||
// server variables
|
||||
std::thread thread_;
|
||||
SessionData protocol_data_;
|
||||
std::unique_ptr<ServerT> server_{nullptr};
|
||||
};
|
||||
|
||||
using RemoteChannelWriter = Network::RemoteChannelWriter;
|
||||
|
||||
/**
|
||||
* Placeholder for all functionality related to non-local communication.
|
||||
* E.g. resolve remote channels by memgraph node id, etc.
|
||||
*/
|
||||
class DistributedSystem {
|
||||
public:
|
||||
DistributedSystem() {
|
||||
network_.StartClient(4);
|
||||
network_.StartServer(4);
|
||||
}
|
||||
|
||||
// Thread safe.
|
||||
std::unique_ptr<Reactor> Spawn(const std::string &name,
|
||||
std::function<void(Reactor &)> setup) {
|
||||
return system_.Spawn(name, setup);
|
||||
}
|
||||
|
||||
// Non-thread safe.
|
||||
// TODO: figure out what should be interection of this function and
|
||||
// destructor.
|
||||
void StopServices() {
|
||||
network_.StopClient();
|
||||
network_.StopServer();
|
||||
}
|
||||
|
||||
Network &network() { return network_; }
|
||||
const Network &network() const { return network_; }
|
||||
|
||||
// Should be private
|
||||
Network network_;
|
||||
|
||||
private:
|
||||
System &system_ = network_.protocol_data_.system;
|
||||
|
||||
DistributedSystem(const DistributedSystem &) = delete;
|
||||
DistributedSystem(DistributedSystem &&) = delete;
|
||||
DistributedSystem &operator=(const DistributedSystem &) = delete;
|
||||
DistributedSystem &operator=(DistributedSystem &&) = delete;
|
||||
};
|
||||
} // namespace communication::reactor
|
@ -1,169 +0,0 @@
|
||||
#include "communication/reactor/reactor_local.hpp"
|
||||
|
||||
#include <chrono>
|
||||
|
||||
#include "utils/exceptions.hpp"
|
||||
|
||||
namespace communication::reactor {
|
||||
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
void EventStream::Subscription::Unsubscribe() const {
|
||||
event_queue_.RemoveCallback(*this);
|
||||
}
|
||||
|
||||
void EventStream::Subscription::CloseChannel() const { event_queue_.Close(); }
|
||||
|
||||
const std::string &EventStream::Subscription::channel_name() const {
|
||||
return event_queue_.channel_name_;
|
||||
}
|
||||
|
||||
std::string Channel::LocalChannelWriter::ReactorName() const {
|
||||
return reactor_name_;
|
||||
}
|
||||
|
||||
void Channel::LocalChannelWriter::Send(std::unique_ptr<Message> m) {
|
||||
// Atomic, per the standard. We guarantee here that if channel exists it
|
||||
// will not be destroyed by the end of this function.
|
||||
std::shared_ptr<Channel> queue = queue_.lock();
|
||||
// Check if cached queue exists and send message.
|
||||
if (queue) {
|
||||
queue->Push(std::move(m));
|
||||
return;
|
||||
}
|
||||
// If it doesn't exist. Check if there is a new channel with same name.
|
||||
auto channel = system_.Resolve(reactor_name_, channel_name_);
|
||||
if (channel) {
|
||||
channel->Push(std::move(m));
|
||||
queue_ = channel;
|
||||
}
|
||||
}
|
||||
|
||||
std::string Channel::LocalChannelWriter::Name() const { return channel_name_; }
|
||||
|
||||
std::shared_ptr<Channel::LocalChannelWriter> Channel::LockedOpenChannel() {
|
||||
return std::make_shared<LocalChannelWriter>(reactor_name_, channel_name_,
|
||||
reactor_.system_);
|
||||
}
|
||||
|
||||
void Channel::Close() { reactor_.CloseChannel(channel_name_); }
|
||||
|
||||
Reactor::Reactor(System &system, const std::string &name,
|
||||
const std::function<void(Reactor &)> &setup)
|
||||
: system_(system),
|
||||
name_(name),
|
||||
setup_(setup),
|
||||
main_(Open("main")),
|
||||
thread_([this] {
|
||||
setup_(*this);
|
||||
RunEventLoop();
|
||||
system_.RemoveReactor(name_);
|
||||
}) {}
|
||||
|
||||
Reactor::~Reactor() {
|
||||
{
|
||||
std::unique_lock<std::mutex> guard(*mutex_);
|
||||
channels_.clear();
|
||||
}
|
||||
cvar_->notify_all();
|
||||
thread_.join();
|
||||
}
|
||||
|
||||
std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Reactor::Open(
|
||||
const std::string &channel_name) {
|
||||
std::unique_lock<std::mutex> lock(*mutex_);
|
||||
if (channels_.count(channel_name) != 0) {
|
||||
throw utils::BasicException("Channel with name " + channel_name +
|
||||
"already exists");
|
||||
}
|
||||
auto it = channels_
|
||||
.emplace(channel_name,
|
||||
std::make_shared<Channel>(Channel::Params{
|
||||
name_, channel_name, mutex_, cvar_, *this}))
|
||||
.first;
|
||||
it->second->self_ptr_ = it->second;
|
||||
return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
|
||||
}
|
||||
|
||||
std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Reactor::Open() {
|
||||
std::unique_lock<std::mutex> lock(*mutex_);
|
||||
do {
|
||||
std::string channel_name =
|
||||
"stream-" + std::to_string(channel_name_counter_++);
|
||||
if (channels_.count(channel_name) == 0) {
|
||||
auto it = channels_
|
||||
.emplace(channel_name,
|
||||
std::make_shared<Channel>(Channel::Params{
|
||||
name_, channel_name, mutex_, cvar_, *this}))
|
||||
.first;
|
||||
it->second->self_ptr_ = it->second;
|
||||
return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
|
||||
}
|
||||
} while (true);
|
||||
}
|
||||
|
||||
std::shared_ptr<Channel> Reactor::FindChannel(const std::string &channel_name) {
|
||||
std::unique_lock<std::mutex> lock(*mutex_);
|
||||
auto it_channel = channels_.find(channel_name);
|
||||
if (it_channel == channels_.end()) return nullptr;
|
||||
return it_channel->second;
|
||||
}
|
||||
|
||||
void Reactor::CloseChannel(const std::string &s) {
|
||||
std::unique_lock<std::mutex> lock(*mutex_);
|
||||
auto it = channels_.find(s);
|
||||
CHECK(it != channels_.end()) << "Trying to close nonexisting channel";
|
||||
channels_.erase(it);
|
||||
cvar_->notify_all();
|
||||
}
|
||||
|
||||
void Reactor::RunEventLoop() {
|
||||
while (true) {
|
||||
// Find (or wait) for the next Message.
|
||||
PendingMessageInfo info;
|
||||
{
|
||||
std::unique_lock<std::mutex> guard(*mutex_);
|
||||
// Exit the loop if there are no more Channels.
|
||||
cvar_->wait_for(guard, 200ms, [&] {
|
||||
if (channels_.empty()) return true;
|
||||
info = GetPendingMessages();
|
||||
return static_cast<bool>(info.message);
|
||||
});
|
||||
if (channels_.empty()) break;
|
||||
}
|
||||
|
||||
for (auto &callback_info : info.callbacks) {
|
||||
callback_info.first(*info.message, callback_info.second);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if there is any nonempty EventStream.
|
||||
*/
|
||||
Reactor::PendingMessageInfo Reactor::GetPendingMessages() {
|
||||
for (auto &channels_key_value : channels_) {
|
||||
Channel &event_queue = *channels_key_value.second;
|
||||
auto message = event_queue.LockedPop();
|
||||
if (message == nullptr) continue;
|
||||
std::type_index type_index = message->GetTypeIndex();
|
||||
|
||||
using Subscription = EventStream::Subscription;
|
||||
std::vector<std::pair<EventStream::Callback, Subscription>> callback_info;
|
||||
auto msg_type_cb_iter = event_queue.callbacks_.find(type_index);
|
||||
if (msg_type_cb_iter != event_queue.callbacks_.end()) {
|
||||
// There is a callback for this type.
|
||||
for (auto &type_index_cb_key_value : msg_type_cb_iter->second) {
|
||||
auto uid = type_index_cb_key_value.first;
|
||||
auto callback = type_index_cb_key_value.second;
|
||||
callback_info.emplace_back(callback,
|
||||
Subscription(event_queue, type_index, uid));
|
||||
}
|
||||
}
|
||||
|
||||
return PendingMessageInfo{std::move(message), std::move(callback_info)};
|
||||
}
|
||||
|
||||
return PendingMessageInfo{};
|
||||
}
|
||||
}
|
@ -1,538 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <condition_variable>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
#include <thread>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
|
||||
#include "cereal/types/memory.hpp"
|
||||
#include "glog/logging.h"
|
||||
|
||||
namespace communication::reactor {
|
||||
|
||||
class EventStream;
|
||||
class Reactor;
|
||||
class System;
|
||||
class Channel;
|
||||
|
||||
/**
|
||||
* Base class for messages.
|
||||
*/
|
||||
class Message {
|
||||
public:
|
||||
virtual ~Message() {}
|
||||
|
||||
template <class Archive>
|
||||
void serialize(Archive &) {}
|
||||
|
||||
/**
|
||||
* Run-time type identification that is used for callbacks.
|
||||
*
|
||||
* Warning: this works because of the virtual destructor, don't remove it from
|
||||
* this class
|
||||
*/
|
||||
std::type_index GetTypeIndex() { return typeid(*this); }
|
||||
};
|
||||
|
||||
/**
|
||||
* Write-end of a Channel (between two reactors).
|
||||
*/
|
||||
class ChannelWriter {
|
||||
public:
|
||||
ChannelWriter() = default;
|
||||
ChannelWriter(const ChannelWriter &) = delete;
|
||||
void operator=(const ChannelWriter &) = delete;
|
||||
ChannelWriter(ChannelWriter &&) = delete;
|
||||
void operator=(ChannelWriter &&) = delete;
|
||||
|
||||
/**
|
||||
* Construct and send the message to the channel.
|
||||
*/
|
||||
template <typename TMessage, typename... Args>
|
||||
void Send(Args &&... args) {
|
||||
Send(std::unique_ptr<Message>(
|
||||
std::make_unique<TMessage>(std::forward<Args>(args)...)));
|
||||
}
|
||||
|
||||
virtual void Send(std::unique_ptr<Message> message) = 0;
|
||||
|
||||
virtual std::string ReactorName() const = 0;
|
||||
virtual std::string Name() const = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Read-end of a Channel (between two reactors).
|
||||
*/
|
||||
class EventStream {
|
||||
public:
|
||||
class OnEventOnceChainer;
|
||||
class Subscription;
|
||||
|
||||
/**
|
||||
* Register a callback that will be called whenever an event arrives.
|
||||
*/
|
||||
template <typename TMessage>
|
||||
void OnEvent(
|
||||
std::function<void(const TMessage &, const Subscription &)> &&callback) {
|
||||
OnEventHelper(typeid(TMessage), [callback = std::move(callback)](
|
||||
const Message &base_message,
|
||||
const Subscription &subscription) {
|
||||
const auto &message = dynamic_cast<const TMessage &>(base_message);
|
||||
callback(message, subscription);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a callback that will be called only once.
|
||||
* Once event is received, channel of this EventStream is closed.
|
||||
*/
|
||||
template <typename TMessage>
|
||||
void OnEventOnceThenClose(std::function<void(const TMessage &)> &&callback) {
|
||||
OnEventHelper(typeid(TMessage), [callback = std::move(callback)](
|
||||
const Message &base_message,
|
||||
const Subscription &subscription) {
|
||||
const TMessage &message = dynamic_cast<const TMessage &>(base_message);
|
||||
subscription.CloseChannel();
|
||||
callback(message);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts a chain to register a callback that fires off only once.
|
||||
*
|
||||
* This method supports chaining (see the the class OnEventOnceChainer or the
|
||||
* tests for examples).
|
||||
* Warning: when chaining callbacks, make sure that EventStream does not
|
||||
* deallocate before the last
|
||||
* chained callback fired.
|
||||
*/
|
||||
OnEventOnceChainer OnEventOnce() { return OnEventOnceChainer(*this); }
|
||||
|
||||
/**
|
||||
* Get the name of the channel.
|
||||
*/
|
||||
virtual const std::string &ChannelName() = 0;
|
||||
|
||||
/**
|
||||
* Subscription Service.
|
||||
*
|
||||
* Unsubscribe from a callback. Lightweight object (can copy by value).
|
||||
*/
|
||||
class Subscription {
|
||||
public:
|
||||
/**
|
||||
* Unsubscribe. Call only once.
|
||||
*/
|
||||
void Unsubscribe() const;
|
||||
|
||||
/**
|
||||
* Close the stream. Convenience method.
|
||||
*/
|
||||
void CloseChannel() const;
|
||||
|
||||
/**
|
||||
* Get the name of the channel the message is delivered to.
|
||||
*/
|
||||
const std::string &channel_name() const;
|
||||
|
||||
private:
|
||||
friend class Reactor;
|
||||
friend class Channel;
|
||||
|
||||
Subscription(Channel &event_queue, std::type_index type_index,
|
||||
uint64_t callback_id)
|
||||
: event_queue_(event_queue),
|
||||
type_index_(type_index),
|
||||
callback_id_(callback_id) {}
|
||||
|
||||
Channel &event_queue_;
|
||||
std::type_index type_index_;
|
||||
uint64_t callback_id_;
|
||||
};
|
||||
|
||||
/**
|
||||
* Close this event stream, disallowing further events from getting received.
|
||||
*
|
||||
* Any subsequent call after Close() to any function will be result in
|
||||
* undefined
|
||||
* behavior (invalid pointer dereference). Can only be called from the thread
|
||||
* associated with the Reactor.
|
||||
*/
|
||||
virtual void Close() = 0;
|
||||
|
||||
/**
|
||||
* Convenience class to chain one-off callbacks.
|
||||
*
|
||||
* Usage: Create this class with OnEventOnce() and then chain callbacks using
|
||||
* ChainOnce.
|
||||
* A callback will fire only once, unsubscribe and immediately subscribe the
|
||||
* next callback to the stream.
|
||||
*
|
||||
* Example: stream->OnEventOnce().ChainOnce(firstCb).ChainOnce(secondCb);
|
||||
*
|
||||
* Implementation: This class is a temporary object that remembers the
|
||||
* callbacks that are to be installed
|
||||
* and finally installs them in the destructor. Not sure is this kosher, is
|
||||
* there another way?
|
||||
*/
|
||||
class OnEventOnceChainer {
|
||||
public:
|
||||
explicit OnEventOnceChainer(EventStream &event_stream)
|
||||
: event_stream_(event_stream) {}
|
||||
~OnEventOnceChainer() { InstallCallbacks(); }
|
||||
|
||||
template <typename TMessage>
|
||||
OnEventOnceChainer &ChainOnce(
|
||||
std::function<void(const TMessage &, const Subscription &)>
|
||||
&&callback) {
|
||||
std::function<void(const Message &, const Subscription &)>
|
||||
wrap = [callback = std::move(callback)](
|
||||
const Message &base_message, const Subscription &subscription) {
|
||||
const TMessage &message = dynamic_cast<const TMessage &>(base_message);
|
||||
subscription.Unsubscribe();
|
||||
// Warning: this can close the Channel, be careful what you put after
|
||||
// it!
|
||||
callback(message, subscription);
|
||||
};
|
||||
callbacks_.emplace_back(typeid(TMessage), std::move(wrap));
|
||||
return *this;
|
||||
}
|
||||
|
||||
private:
|
||||
void InstallCallbacks() {
|
||||
int num_callbacks = callbacks_.size();
|
||||
CHECK(num_callbacks > 0) << "No callback will be installed";
|
||||
std::function<void(const Message &, const Subscription &)> next_callback;
|
||||
std::type_index next_type = typeid(nullptr);
|
||||
|
||||
for (int i = num_callbacks - 1; i >= 0; --i) {
|
||||
std::function<void(const Message &, const Subscription &)>
|
||||
tmp_callback = [
|
||||
callback = std::move(callbacks_[i].second), next_type,
|
||||
next_callback = std::move(next_callback),
|
||||
event_stream = &this->event_stream_
|
||||
](const Message &message, const Subscription &subscription) {
|
||||
callback(message, subscription);
|
||||
if (next_callback) {
|
||||
event_stream->OnEventHelper(next_type, std::move(next_callback));
|
||||
}
|
||||
};
|
||||
next_callback = std::move(tmp_callback);
|
||||
next_type = callbacks_[i].first;
|
||||
}
|
||||
|
||||
event_stream_.OnEventHelper(next_type, std::move(next_callback));
|
||||
}
|
||||
|
||||
EventStream &event_stream_;
|
||||
std::vector<
|
||||
std::pair<std::type_index,
|
||||
std::function<void(const Message &, const Subscription &)>>>
|
||||
callbacks_;
|
||||
};
|
||||
|
||||
typedef std::function<void(const Message &, const Subscription &)> Callback;
|
||||
|
||||
private:
|
||||
virtual void OnEventHelper(std::type_index type_index, Callback callback) = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Implementation of a channel.
|
||||
*
|
||||
* This class is an internal data structure that represents the state of the
|
||||
* channel. This class is not meant to be used by the clients of the messaging
|
||||
* framework. The Channel class wraps the event queue data structure, the mutex
|
||||
* that protects concurrent access to the event queue, the local channel and the
|
||||
* event stream. The class is owned by the Reactor. It gets closed when the
|
||||
* owner reactor (the one that owns the read-end of a channel) removes/closes
|
||||
* it.
|
||||
*/
|
||||
class Channel {
|
||||
struct Params;
|
||||
|
||||
public:
|
||||
friend class Reactor; // to create a Params initialization object
|
||||
friend class EventStream::Subscription;
|
||||
|
||||
/**
|
||||
* LocalChannelWriter represents the channels to reactors living in the same
|
||||
* reactor system (write-end of the channels).
|
||||
*
|
||||
* Sending messages to the local channel requires acquiring the mutex.
|
||||
* LocalChannelWriter holds a (weak) pointer to the enclosing Channel object.
|
||||
* Messages sent to a closed channel are ignored.
|
||||
* There can be multiple LocalChannelWriters refering to the same stream if
|
||||
* needed.
|
||||
*
|
||||
* It must be outlived by System.
|
||||
*/
|
||||
class LocalChannelWriter : public ChannelWriter {
|
||||
public:
|
||||
friend class Channel;
|
||||
|
||||
LocalChannelWriter(const std::string &reactor_name,
|
||||
const std::string &channel_name, System &system)
|
||||
: reactor_name_(reactor_name),
|
||||
channel_name_(channel_name),
|
||||
system_(system) {}
|
||||
|
||||
template <typename TMessage, typename... Args>
|
||||
void Send(Args &&... args) {
|
||||
Send(std::unique_ptr<Message>(
|
||||
std::make_unique<TMessage>(std::forward<Args>(args)...)));
|
||||
}
|
||||
|
||||
void Send(std::unique_ptr<Message> m) override;
|
||||
std::string ReactorName() const override;
|
||||
std::string Name() const override;
|
||||
|
||||
private:
|
||||
std::string reactor_name_;
|
||||
std::string channel_name_;
|
||||
// TODO: we shouldn't do this kind of caching inside of LocalChannelWriter.
|
||||
std::weak_ptr<Channel> queue_;
|
||||
System &system_;
|
||||
};
|
||||
|
||||
explicit Channel(const Params ¶ms)
|
||||
: channel_name_(params.channel_name),
|
||||
reactor_name_(params.reactor_name),
|
||||
mutex_(params.mutex),
|
||||
cvar_(params.cvar),
|
||||
stream_(mutex_, this),
|
||||
reactor_(params.reactor) {}
|
||||
|
||||
/**
|
||||
* Implementation of the event stream.
|
||||
*
|
||||
* After the enclosing Channel object is destroyed (by a call to CloseChannel
|
||||
* or Close).
|
||||
*/
|
||||
class LocalEventStream : public EventStream {
|
||||
public:
|
||||
friend class Channel;
|
||||
|
||||
LocalEventStream(const std::shared_ptr<std::mutex> &mutex, Channel *queue)
|
||||
: mutex_(mutex), queue_(queue) {}
|
||||
|
||||
void OnEventHelper(std::type_index type_index, Callback callback) {
|
||||
std::unique_lock<std::mutex> lock(*mutex_);
|
||||
queue_->LockedOnEventHelper(type_index, callback);
|
||||
}
|
||||
|
||||
const std::string &ChannelName() { return queue_->channel_name_; }
|
||||
|
||||
void Close() { queue_->Close(); }
|
||||
|
||||
private:
|
||||
std::shared_ptr<std::mutex> mutex_;
|
||||
std::string channel_name_;
|
||||
Channel *queue_;
|
||||
};
|
||||
|
||||
/**
|
||||
* Close the channel. Must be called from the reactor that owns the channel.
|
||||
*/
|
||||
void Close();
|
||||
|
||||
Channel(const Channel &other) = delete;
|
||||
Channel(Channel &&other) = default;
|
||||
Channel &operator=(const Channel &other) = delete;
|
||||
Channel &operator=(Channel &&other) = default;
|
||||
|
||||
private:
|
||||
/**
|
||||
* Initialization parameters to Channel.
|
||||
* Warning: do not forget to initialize self_ptr_ individually. Private
|
||||
* because it shouldn't be created outside of a Reactor.
|
||||
*/
|
||||
struct Params {
|
||||
std::string reactor_name;
|
||||
std::string channel_name;
|
||||
std::shared_ptr<std::mutex> mutex;
|
||||
std::shared_ptr<std::condition_variable> cvar;
|
||||
Reactor &reactor;
|
||||
};
|
||||
|
||||
void Push(std::unique_ptr<Message> m) {
|
||||
std::unique_lock<std::mutex> guard(*mutex_);
|
||||
queue_.emplace(std::move(m));
|
||||
// This is OK because there is only one Reactor (thread) that can wait on
|
||||
// this Channel.
|
||||
cvar_->notify_one();
|
||||
}
|
||||
|
||||
std::shared_ptr<LocalChannelWriter> LockedOpenChannel();
|
||||
std::unique_ptr<Message> LockedPop() { return LockedRawPop(); }
|
||||
|
||||
void LockedOnEventHelper(std::type_index type_index,
|
||||
EventStream::Callback callback) {
|
||||
uint64_t callback_id = next_callback_id++;
|
||||
callbacks_[type_index][callback_id] = callback;
|
||||
}
|
||||
|
||||
std::unique_ptr<Message> LockedRawPop() {
|
||||
if (queue_.empty()) return nullptr;
|
||||
std::unique_ptr<Message> t = std::move(queue_.front());
|
||||
queue_.pop();
|
||||
return t;
|
||||
}
|
||||
|
||||
void RemoveCallback(const EventStream::Subscription &subscription) {
|
||||
std::unique_lock<std::mutex> lock(*mutex_);
|
||||
auto num_erased =
|
||||
callbacks_[subscription.type_index_].erase(subscription.callback_id_);
|
||||
CHECK(num_erased == 1) << "Expected to remove 1 element";
|
||||
}
|
||||
|
||||
std::string channel_name_;
|
||||
std::string reactor_name_;
|
||||
std::queue<std::unique_ptr<Message>> queue_;
|
||||
// Should only be locked once since it's used by a cond. var. Also caught in
|
||||
// dctor, so must be recursive.
|
||||
std::shared_ptr<std::mutex> mutex_;
|
||||
std::shared_ptr<std::condition_variable> cvar_;
|
||||
|
||||
/**
|
||||
* A weak_ptr to itself.
|
||||
*
|
||||
* There are initialization problems with this, check Params.
|
||||
*/
|
||||
std::weak_ptr<Channel> self_ptr_;
|
||||
LocalEventStream stream_;
|
||||
Reactor &reactor_;
|
||||
std::unordered_map<std::type_index,
|
||||
std::unordered_map<uint64_t, EventStream::Callback>>
|
||||
callbacks_;
|
||||
uint64_t next_callback_id = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* A single unit of concurrent execution in the system.
|
||||
*
|
||||
* E.g. one worker, one client. Owned by System. Has a thread associated with
|
||||
* it.
|
||||
*/
|
||||
class Reactor {
|
||||
friend class System;
|
||||
|
||||
public:
|
||||
Reactor(System &system, const std::string &name,
|
||||
const std::function<void(Reactor &)> &setup);
|
||||
~Reactor();
|
||||
|
||||
std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Open(
|
||||
const std::string &s);
|
||||
std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Open();
|
||||
std::shared_ptr<Channel> FindChannel(const std::string &channel_name);
|
||||
|
||||
/**
|
||||
* Close a channel by name.
|
||||
*
|
||||
* Should only be called from the Reactor thread.
|
||||
*/
|
||||
void CloseChannel(const std::string &s);
|
||||
|
||||
/**
|
||||
* Get Reactor name
|
||||
*/
|
||||
const std::string &name() const { return name_; }
|
||||
|
||||
Reactor(const Reactor &other) = delete;
|
||||
Reactor(Reactor &&other) = default;
|
||||
Reactor &operator=(const Reactor &other) = delete;
|
||||
Reactor &operator=(Reactor &&other) = default;
|
||||
|
||||
System &system_;
|
||||
std::string name_;
|
||||
std::function<void(Reactor &)> setup_;
|
||||
|
||||
/*
|
||||
* Locks all Reactor data, including all Channel's in channels_.
|
||||
*
|
||||
* This should be a shared_ptr because LocalChannelWriter can outlive Reactor.
|
||||
*/
|
||||
std::shared_ptr<std::mutex> mutex_ = std::make_shared<std::mutex>();
|
||||
std::shared_ptr<std::condition_variable> cvar_ =
|
||||
std::make_shared<std::condition_variable>();
|
||||
|
||||
/**
|
||||
* List of channels of a reactor indexed by name.
|
||||
*/
|
||||
std::unordered_map<std::string, std::shared_ptr<Channel>> channels_;
|
||||
int64_t channel_name_counter_ = 0;
|
||||
// I don't understand why ChannelWriter is shared. ChannelWriter is just
|
||||
// endpoint that could be copied to every user.
|
||||
std::pair<EventStream *, std::shared_ptr<ChannelWriter>> main_;
|
||||
|
||||
private:
|
||||
struct PendingMessageInfo {
|
||||
std::unique_ptr<Message> message;
|
||||
std::vector<std::pair<EventStream::Callback, EventStream::Subscription>>
|
||||
callbacks;
|
||||
};
|
||||
|
||||
std::thread thread_;
|
||||
|
||||
/**
|
||||
* Dispatches all waiting messages to callbacks. Shuts down when there are no
|
||||
* callbacks left.
|
||||
*/
|
||||
void RunEventLoop();
|
||||
|
||||
PendingMessageInfo GetPendingMessages();
|
||||
};
|
||||
|
||||
using LocalChannelWriter = Channel::LocalChannelWriter;
|
||||
|
||||
/**
|
||||
* Placeholder for all reactors.
|
||||
* Make sure object of this class outlives all Reactors created by it.
|
||||
*/
|
||||
class System {
|
||||
public:
|
||||
friend class Reactor;
|
||||
System() = default;
|
||||
|
||||
std::unique_ptr<Reactor> Spawn(const std::string &name,
|
||||
std::function<void(Reactor &)> setup) {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
CHECK(reactors_.find(name) == reactors_.end())
|
||||
<< "Reactor with name: '" << name << "' already exists.";
|
||||
auto reactor = std::make_unique<Reactor>(*this, name, setup);
|
||||
reactors_.emplace(name, reactor.get());
|
||||
return reactor;
|
||||
}
|
||||
|
||||
// Next two functions shouldn't be exposed.
|
||||
std::shared_ptr<Channel> Resolve(const std::string &reactor_name,
|
||||
const std::string &channel_name) {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
auto it_reactor = reactors_.find(reactor_name);
|
||||
if (it_reactor == reactors_.end()) return nullptr;
|
||||
return it_reactor->second->FindChannel(channel_name);
|
||||
}
|
||||
|
||||
void RemoveReactor(const std::string &name_) {
|
||||
std::unique_lock<std::mutex> guard(mutex_);
|
||||
auto it = reactors_.find(name_);
|
||||
CHECK(it != reactors_.end()) << "Trying to delete notexisting reactor";
|
||||
reactors_.erase(it);
|
||||
}
|
||||
|
||||
private:
|
||||
System(const System &) = delete;
|
||||
System(System &&) = delete;
|
||||
System &operator=(const System &) = delete;
|
||||
System &operator=(System &&) = delete;
|
||||
|
||||
std::mutex mutex_;
|
||||
std::unordered_map<std::string, Reactor *> reactors_;
|
||||
};
|
||||
|
||||
using Subscription = Channel::LocalEventStream::Subscription;
|
||||
} // namespace communication::reactor
|
@ -1,164 +0,0 @@
|
||||
/**
|
||||
* This test file test the Distributed Reactors API on ONLY one process (no real
|
||||
* networking).
|
||||
* In other words, we send a message from one process to itself.
|
||||
*/
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <cstdlib>
|
||||
#include <future>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include "communication/reactor/common_messages.hpp"
|
||||
#include "communication/reactor/reactor_distributed.hpp"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
using namespace communication::reactor;
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
struct MessageInt : public Message {
|
||||
MessageInt() {} // cereal needs this
|
||||
MessageInt(int x) : x(x) {}
|
||||
int x;
|
||||
|
||||
template <class Archive>
|
||||
void serialize(Archive &ar) {
|
||||
ar(cereal::virtual_base_class<Message>(this), x);
|
||||
}
|
||||
};
|
||||
CEREAL_REGISTER_TYPE(MessageInt);
|
||||
|
||||
struct RequestMessage : public ReturnAddressMessage {
|
||||
RequestMessage() {}
|
||||
RequestMessage(std::string reactor, std::string channel, int x)
|
||||
: ReturnAddressMessage(reactor, channel), x(x){};
|
||||
|
||||
template <class Archive>
|
||||
void serialize(Archive &ar) {
|
||||
ar(cereal::virtual_base_class<ReturnAddressMessage>(this), x);
|
||||
}
|
||||
|
||||
friend class cereal::access;
|
||||
int x;
|
||||
};
|
||||
CEREAL_REGISTER_TYPE(RequestMessage);
|
||||
|
||||
/**
|
||||
* Test do the services start up without crashes.
|
||||
*/
|
||||
TEST(SimpleTests, StartAndStopServices) {
|
||||
DistributedSystem system;
|
||||
// do nothing
|
||||
std::this_thread::sleep_for(500ms);
|
||||
system.StopServices();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test simple message reception.
|
||||
*
|
||||
* Data flow:
|
||||
* (1) Send an empty message from Master to Worker/main
|
||||
*/
|
||||
TEST(SimpleTests, SendEmptyMessage) {
|
||||
DistributedSystem system;
|
||||
|
||||
auto master = system.Spawn("master", [&](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
RemoteChannelWriter writer("127.0.0.1", 10000, "worker", "main", system);
|
||||
writer.Send<Message>();
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
auto worker = system.Spawn("worker", [](Reactor &r) {
|
||||
r.main_.first->OnEventOnce().ChainOnce<Message>(
|
||||
[&](const Message &, const Subscription &subscription) {
|
||||
// if this message isn't delivered, the main channel will never be
|
||||
// closed and we infinite loop
|
||||
subscription.CloseChannel(); // close "main"
|
||||
});
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(400ms);
|
||||
system.StopServices();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test ReturnAddressMsg functionality.
|
||||
*
|
||||
* Data flow:
|
||||
* (1) Send an empty message from Master to Worker/main
|
||||
* (2) Send an empty message from Worker to Master/main
|
||||
*/
|
||||
TEST(SimpleTests, SendReturnAddressMessage) {
|
||||
DistributedSystem system;
|
||||
|
||||
auto master = system.Spawn("master", [&](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
RemoteChannelWriter writer("127.0.0.1", 10000, "worker", "main", system);
|
||||
writer.Send<ReturnAddressMessage>(r.name(), "main");
|
||||
r.main_.first->OnEvent<MessageInt>(
|
||||
[&](const MessageInt &message, const Subscription &) {
|
||||
EXPECT_EQ(message.x, 5);
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
});
|
||||
auto worker = system.Spawn("worker", [&](Reactor &r) {
|
||||
r.main_.first->OnEvent<ReturnAddressMessage>(
|
||||
[&](const ReturnAddressMessage &message, const Subscription &) {
|
||||
RemoteChannelWriter writer(message.address(), message.port(),
|
||||
message.reactor_name(),
|
||||
message.channel_name(), system);
|
||||
writer.Send<MessageInt>(5);
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(400ms);
|
||||
system.StopServices();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test serializability of a complex message over the network layer.
|
||||
*
|
||||
* Data flow:
|
||||
* (1) Send ("hi", 123) from Master to Worker/main
|
||||
* (2) Send ("hi back", 779) from Worker to Master/main
|
||||
*/
|
||||
TEST(SimpleTests, SendSerializableMessage) {
|
||||
DistributedSystem system;
|
||||
|
||||
auto master = system.Spawn("master", [&](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
RemoteChannelWriter writer("127.0.0.1", 10000, "worker", "main", system);
|
||||
writer.Send<RequestMessage>(r.name(), "main", 123);
|
||||
r.main_.first->OnEvent<MessageInt>(
|
||||
[&](const MessageInt &message, const Subscription &) {
|
||||
ASSERT_EQ(message.x, 779);
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
});
|
||||
|
||||
auto worker = system.Spawn("worker", [&](Reactor &r) {
|
||||
r.main_.first->OnEvent<RequestMessage>(
|
||||
[&](const RequestMessage &message, const Subscription &) {
|
||||
ASSERT_EQ(message.x, 123);
|
||||
RemoteChannelWriter writer(message.address(), message.port(),
|
||||
message.reactor_name(),
|
||||
message.channel_name(), system);
|
||||
writer.Send<MessageInt>(779);
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(400ms);
|
||||
system.StopServices();
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
@ -1,404 +0,0 @@
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <cstdlib>
|
||||
#include <future>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include "communication/reactor/reactor_local.hpp"
|
||||
#include "gtest/gtest.h"
|
||||
#include "utils/exceptions.hpp"
|
||||
|
||||
using namespace std::literals::chrono_literals;
|
||||
using namespace communication::reactor;
|
||||
using Subscription = EventStream::Subscription;
|
||||
|
||||
TEST(SystemTest, ReturnWithoutThrowing) {
|
||||
System system;
|
||||
auto master =
|
||||
system.Spawn("master", [](Reactor &r) { r.CloseChannel("main"); });
|
||||
std::this_thread::sleep_for(100ms);
|
||||
}
|
||||
|
||||
TEST(ChannelCreationTest, ThrowOnReusingChannelName) {
|
||||
System system;
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
r.Open("channel");
|
||||
ASSERT_THROW(r.Open("channel"), utils::BasicException);
|
||||
r.CloseChannel("main");
|
||||
r.CloseChannel("channel");
|
||||
});
|
||||
std::this_thread::sleep_for(100ms);
|
||||
}
|
||||
|
||||
TEST(SimpleSendTest, OneCallback) {
|
||||
struct MessageInt : public Message {
|
||||
MessageInt(int xx) : x(xx) {}
|
||||
int x;
|
||||
};
|
||||
|
||||
System system;
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
LocalChannelWriter channel_writer("worker", "main", r.system_);
|
||||
channel_writer.Send<MessageInt>(888);
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
auto worker = system.Spawn("worker", [](Reactor &r) {
|
||||
EventStream *stream = r.main_.first;
|
||||
|
||||
stream->OnEvent<MessageInt>(
|
||||
[&r](const MessageInt &msg, const Subscription &) {
|
||||
ASSERT_EQ(msg.x, 888);
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
});
|
||||
std::this_thread::sleep_for(200ms);
|
||||
}
|
||||
|
||||
TEST(SimpleSendTest, IgnoreAfterClose) {
|
||||
struct MessageInt : public Message {
|
||||
MessageInt(int xx) : x(xx) {}
|
||||
int x;
|
||||
};
|
||||
|
||||
System system;
|
||||
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
LocalChannelWriter channel_writer("worker", "main", r.system_);
|
||||
channel_writer.Send<MessageInt>(101);
|
||||
channel_writer.Send<MessageInt>(102); // should be ignored
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
channel_writer.Send<MessageInt>(103); // should be ignored
|
||||
channel_writer.Send<MessageInt>(104); // should be ignored
|
||||
// Write-end doesn't need to be closed because it's in RAII.
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
auto worker = system.Spawn("worker", [](Reactor &r) {
|
||||
EventStream *stream = r.main_.first;
|
||||
stream->OnEvent<MessageInt>(
|
||||
[&r](const MessageInt &msg, const Subscription &) {
|
||||
r.CloseChannel("main");
|
||||
ASSERT_EQ(msg.x, 101);
|
||||
});
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
}
|
||||
|
||||
TEST(SimpleSendTest, RecreateChannelAfterClosing) {
|
||||
struct MessageInt : public Message {
|
||||
MessageInt(int xx) : x(xx) {}
|
||||
int x;
|
||||
};
|
||||
|
||||
System system;
|
||||
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
LocalChannelWriter channel_writer("worker", "main", r.system_);
|
||||
// Original "worker" reactor will die after it process this message.
|
||||
channel_writer.Send<MessageInt>(101);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
// This message will be dropped since there is no reactor with name
|
||||
// "worker".
|
||||
channel_writer.Send<MessageInt>(102);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
// This message should recieved by new "worker" reactor.
|
||||
channel_writer.Send<MessageInt>(103);
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
auto worker = system.Spawn("worker", [](Reactor &r) {
|
||||
EventStream *stream = r.main_.first;
|
||||
stream->OnEvent<MessageInt>(
|
||||
[&r](const MessageInt &msg, const Subscription &) {
|
||||
r.CloseChannel("main");
|
||||
ASSERT_EQ(msg.x, 101);
|
||||
});
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
|
||||
auto worker2 = system.Spawn("worker", [](Reactor &r) {
|
||||
EventStream *stream = r.main_.first;
|
||||
stream->OnEvent<MessageInt>(
|
||||
[&r](const MessageInt &msg, const Subscription &) {
|
||||
r.CloseChannel("main");
|
||||
ASSERT_EQ(msg.x, 103);
|
||||
});
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
}
|
||||
|
||||
TEST(SimpleSendTest, DuringFirstEvent) {
|
||||
struct MessageInt : public Message {
|
||||
MessageInt(int xx) : x(xx) {}
|
||||
int x;
|
||||
};
|
||||
|
||||
System system;
|
||||
|
||||
std::promise<int> p;
|
||||
auto f = p.get_future();
|
||||
auto master = system.Spawn("master", [&p](Reactor &r) mutable {
|
||||
EventStream *stream = r.main_.first;
|
||||
|
||||
stream->OnEvent<MessageInt>(
|
||||
[&](const Message &msg, const Subscription &subscription) {
|
||||
const MessageInt &msgint = dynamic_cast<const MessageInt &>(msg);
|
||||
if (msgint.x == 101) {
|
||||
LocalChannelWriter channel_writer("master", "main", r.system_);
|
||||
channel_writer.Send<MessageInt>(102);
|
||||
}
|
||||
if (msgint.x == 102) {
|
||||
subscription.Unsubscribe();
|
||||
r.CloseChannel("main");
|
||||
p.set_value(777);
|
||||
}
|
||||
});
|
||||
|
||||
LocalChannelWriter channel_writer("master", "main", r.system_);
|
||||
channel_writer.Send<MessageInt>(101);
|
||||
});
|
||||
|
||||
f.wait();
|
||||
ASSERT_EQ(f.get(), 777);
|
||||
}
|
||||
|
||||
TEST(MultipleSendTest, UnsubscribeService) {
|
||||
struct MessageInt : public Message {
|
||||
MessageInt(int xx) : x(xx) {}
|
||||
int x;
|
||||
};
|
||||
struct MessageChar : public Message {
|
||||
MessageChar(char xx) : x(xx) {}
|
||||
char x;
|
||||
};
|
||||
|
||||
System system;
|
||||
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
LocalChannelWriter channel_writer("worker", "main", r.system_);
|
||||
channel_writer.Send<MessageInt>(55);
|
||||
channel_writer.Send<MessageInt>(66);
|
||||
channel_writer.Send<MessageInt>(77);
|
||||
channel_writer.Send<MessageInt>(88);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
channel_writer.Send<MessageChar>('a');
|
||||
channel_writer.Send<MessageChar>('b');
|
||||
channel_writer.Send<MessageChar>('c');
|
||||
channel_writer.Send<MessageChar>('d');
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
auto worker =
|
||||
system.Spawn("worker", [num_received_messages = 0](Reactor & r) mutable {
|
||||
EventStream *stream = r.main_.first;
|
||||
|
||||
stream->OnEvent<MessageInt>(
|
||||
[&](const MessageInt &msgint, const Subscription &subscription) {
|
||||
ASSERT_TRUE(msgint.x == 55 || msgint.x == 66);
|
||||
++num_received_messages;
|
||||
if (msgint.x == 66) {
|
||||
subscription.Unsubscribe(); // receive only two of them
|
||||
}
|
||||
});
|
||||
stream->OnEvent<MessageChar>(
|
||||
[&](const MessageChar &msgchar, const Subscription &subscription) {
|
||||
char c = msgchar.x;
|
||||
++num_received_messages;
|
||||
ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c');
|
||||
if (num_received_messages == 5) {
|
||||
subscription.Unsubscribe();
|
||||
r.CloseChannel("main");
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
}
|
||||
|
||||
TEST(MultipleSendTest, OnEvent) {
|
||||
struct MessageInt : public Message {
|
||||
MessageInt(int xx) : x(xx) {}
|
||||
int x;
|
||||
};
|
||||
struct MessageChar : public Message {
|
||||
MessageChar(char xx) : x(xx) {}
|
||||
char x;
|
||||
};
|
||||
|
||||
System system;
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
LocalChannelWriter channel_writer("worker", "main", r.system_);
|
||||
channel_writer.Send<MessageInt>(101);
|
||||
channel_writer.Send<MessageChar>('a');
|
||||
channel_writer.Send<MessageInt>(103);
|
||||
channel_writer.Send<MessageChar>('b');
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
auto worker = system.Spawn("worker", [correct_vals = 0](Reactor & r) mutable {
|
||||
struct EndMessage : Message {};
|
||||
EventStream *stream = r.main_.first;
|
||||
|
||||
stream->OnEvent<MessageInt>(
|
||||
[&](const MessageInt &msgint, const Subscription &) {
|
||||
ASSERT_TRUE(msgint.x == 101 || msgint.x == 103);
|
||||
++correct_vals;
|
||||
r.main_.second->Send<EndMessage>();
|
||||
});
|
||||
|
||||
stream->OnEvent<MessageChar>(
|
||||
[&](const MessageChar &msgchar, const Subscription &) {
|
||||
ASSERT_TRUE(msgchar.x == 'a' || msgchar.x == 'b');
|
||||
++correct_vals;
|
||||
r.main_.second->Send<EndMessage>();
|
||||
});
|
||||
|
||||
stream->OnEvent<EndMessage>([&](const EndMessage &, const Subscription &) {
|
||||
ASSERT_LE(correct_vals, 4);
|
||||
if (correct_vals == 4) {
|
||||
r.CloseChannel("main");
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
}
|
||||
|
||||
TEST(MultipleSendTest, Chaining) {
|
||||
struct MessageInt : public Message {
|
||||
MessageInt(int xx) : x(xx) {}
|
||||
int x;
|
||||
};
|
||||
|
||||
System system;
|
||||
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
LocalChannelWriter channel_writer("worker", "main", r.system_);
|
||||
channel_writer.Send<MessageInt>(55);
|
||||
channel_writer.Send<MessageInt>(66);
|
||||
channel_writer.Send<MessageInt>(77);
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
auto worker = system.Spawn("worker", [](Reactor &r) {
|
||||
EventStream *stream = r.main_.first;
|
||||
|
||||
stream->OnEventOnce()
|
||||
.ChainOnce<MessageInt>([](const MessageInt &msg, const Subscription &) {
|
||||
ASSERT_EQ(msg.x, 55);
|
||||
})
|
||||
.ChainOnce<MessageInt>([](const MessageInt &msg, const Subscription &) {
|
||||
ASSERT_EQ(msg.x, 66);
|
||||
})
|
||||
.ChainOnce<MessageInt>(
|
||||
[&](const MessageInt &msg, const Subscription &) {
|
||||
ASSERT_EQ(msg.x, 77);
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
});
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
}
|
||||
|
||||
TEST(MultipleSendTest, ChainingInRightOrder) {
|
||||
struct MessageInt : public Message {
|
||||
MessageInt(int xx) : x(xx) {}
|
||||
int x;
|
||||
};
|
||||
|
||||
struct MessageChar : public Message {
|
||||
MessageChar(char xx) : x(xx) {}
|
||||
char x;
|
||||
};
|
||||
|
||||
System system;
|
||||
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
LocalChannelWriter channel_writer("worker", "main", r.system_);
|
||||
channel_writer.Send<MessageChar>('a');
|
||||
channel_writer.Send<MessageInt>(55);
|
||||
channel_writer.Send<MessageChar>('b');
|
||||
channel_writer.Send<MessageInt>(77);
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
auto worker = system.Spawn("worker", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
EventStream *stream = r.main_.first;
|
||||
stream->OnEventOnce()
|
||||
.ChainOnce<MessageInt>([](const MessageInt &msg, const Subscription &) {
|
||||
ASSERT_EQ(msg.x, 55);
|
||||
})
|
||||
.ChainOnce<MessageChar>(
|
||||
[](const MessageChar &msg, const Subscription &) {
|
||||
ASSERT_EQ(msg.x, 'b');
|
||||
})
|
||||
.ChainOnce<MessageInt>(
|
||||
[&](const MessageInt &msg, const Subscription &) {
|
||||
ASSERT_EQ(msg.x, 77);
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(300ms);
|
||||
}
|
||||
|
||||
TEST(MultipleSendTest, ProcessManyMessages) {
|
||||
const static int kNumTests = 100;
|
||||
|
||||
struct MessageInt : public Message {
|
||||
MessageInt(int xx) : x(xx) {}
|
||||
int x;
|
||||
};
|
||||
|
||||
System system;
|
||||
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
LocalChannelWriter channel_writer("worker", "main", r.system_);
|
||||
|
||||
for (int i = 0; i < kNumTests; ++i) {
|
||||
channel_writer.Send<MessageInt>(rand());
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 5));
|
||||
}
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
auto worker = system.Spawn("worker", [vals = 0](Reactor & r) mutable {
|
||||
struct EndMessage : Message {};
|
||||
EventStream *stream = r.main_.first;
|
||||
vals = 0;
|
||||
|
||||
stream->OnEvent<MessageInt>([&](const Message &, const Subscription &) {
|
||||
++vals;
|
||||
r.main_.second->Send<EndMessage>();
|
||||
});
|
||||
|
||||
stream->OnEvent<EndMessage>([&](const Message &, const Subscription &) {
|
||||
ASSERT_LE(vals, kNumTests);
|
||||
if (vals == kNumTests) {
|
||||
r.CloseChannel("main");
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(1000ms);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
Loading…
Reference in New Issue
Block a user