Renamings

Summary:
1. SenderMessage -> ReturnAddressMsg and GetChannelToSender -> GetReturnChannelWriter
2. Channel -> ChannelWriter
3. Connector -> Channel
4. removed old serialize_test
5. removed AwaitEvent and PopEvent

Reviewers: sasa.stanko

Reviewed By: sasa.stanko

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D698
This commit is contained in:
Goran Zuzic 2017-08-23 15:16:26 +02:00
parent 10e98b5c2e
commit 3ae35fa161
11 changed files with 260 additions and 436 deletions

View File

@ -9,9 +9,9 @@ This subdirectory structure implements distributed infrastructure of Memgraph.
* Node: a computer that performs (distributed) work. * Node: a computer that performs (distributed) work.
* Vertex: an abstract graph concept. * Vertex: an abstract graph concept.
* Reactor: a unit of concurrent execution, lives on its own thread. * Reactor: a unit of concurrent execution, lives on its own thread.
* Connector: a (one-way) communication abstraction between Reactors. The reactors can be on the same machine or on different processes. * Channel: a (one-way) communication abstraction between Reactors. The reactors can be on the same machine or on different processes.
* EventStream: read-end of a connector, is owned by exactly one Reactor/thread. * EventStream: read-end of a channel, is owned by exactly one Reactor/thread.
* Channel: write-end of a connector, can be owned (wrote into) by multiple threads. * ChannelWriter: write-end of a channel, can be owned (wrote into) by multiple threads.
## conventions ## conventions

View File

@ -6,22 +6,22 @@ DEFINE_int32(port, 10000, "Network server bind port");
Network::Network() {} Network::Network() {}
/** /**
* SenderMessage implementation. * ReturnAddressMsg implementation.
*/ */
SenderMessage::SenderMessage() {} ReturnAddressMsg::ReturnAddressMsg() {}
SenderMessage::SenderMessage(std::string reactor, std::string channel) ReturnAddressMsg::ReturnAddressMsg(std::string reactor, std::string channel)
: address_(FLAGS_address), : address_(FLAGS_address),
port_(FLAGS_port), port_(FLAGS_port),
reactor_(reactor), reactor_(reactor),
channel_(channel) {} channel_(channel) {}
std::string SenderMessage::Address() const { return address_; } std::string ReturnAddressMsg::Address() const { return address_; }
uint16_t SenderMessage::Port() const { return port_; } uint16_t ReturnAddressMsg::Port() const { return port_; }
std::string SenderMessage::ReactorName() const { return reactor_; } std::string ReturnAddressMsg::ReactorName() const { return reactor_; }
std::string SenderMessage::ChannelName() const { return channel_; } std::string ReturnAddressMsg::ChannelName() const { return channel_; }
std::shared_ptr<Channel> SenderMessage::GetChannelToSender() const { std::shared_ptr<ChannelWriter> ReturnAddressMsg::GetReturnChannelWriter() const {
if (address_ == FLAGS_address && port_ == FLAGS_port) { if (address_ == FLAGS_address && port_ == FLAGS_port) {
return System::GetInstance().FindChannel(reactor_, channel_); return System::GetInstance().FindChannel(reactor_, channel_);
} else { } else {

View File

@ -73,12 +73,12 @@ class Network {
// client functions // client functions
std::shared_ptr<Channel> Resolve(std::string address, uint16_t port, std::shared_ptr<ChannelWriter> Resolve(std::string address, uint16_t port,
std::string reactor_name, std::string reactor_name,
std::string channel_name) { std::string channel_name) {
if (Protocol::SendMessage(address, port, reactor_name, channel_name, if (Protocol::SendMessage(address, port, reactor_name, channel_name,
nullptr)) { nullptr)) {
return std::make_shared<RemoteChannel>(this, address, port, reactor_name, return std::make_shared<RemoteChannelWriter>(this, address, port, reactor_name,
channel_name); channel_name);
} }
return nullptr; return nullptr;
@ -131,9 +131,9 @@ class Network {
} }
} }
class RemoteChannel : public Channel { class RemoteChannelWriter : public ChannelWriter {
public: public:
RemoteChannel(Network *network, std::string address, uint16_t port, RemoteChannelWriter(Network *network, std::string address, uint16_t port,
std::string reactor, std::string channel) std::string reactor, std::string channel)
: network_(network), : network_(network),
address_(address), address_(address),
@ -228,17 +228,17 @@ class Network {
/** /**
* Message that includes the sender channel used to respond. * Message that includes the sender channel used to respond.
*/ */
class SenderMessage : public Message { class ReturnAddressMsg : public Message {
public: public:
SenderMessage(); ReturnAddressMsg();
SenderMessage(std::string reactor, std::string channel); ReturnAddressMsg(std::string reactor, std::string channel);
std::string Address() const; std::string Address() const;
uint16_t Port() const; uint16_t Port() const;
std::string ReactorName() const; std::string ReactorName() const;
std::string ChannelName() const; std::string ChannelName() const;
std::shared_ptr<Channel> GetChannelToSender() const; std::shared_ptr<ChannelWriter> GetReturnChannelWriter() const;
template <class Archive> template <class Archive>
void serialize(Archive &ar) { void serialize(Archive &ar) {
@ -252,7 +252,7 @@ class SenderMessage : public Message {
std::string reactor_; std::string reactor_;
std::string channel_; std::string channel_;
}; };
CEREAL_REGISTER_TYPE(SenderMessage); CEREAL_REGISTER_TYPE(ReturnAddressMsg);
/** /**
@ -262,13 +262,13 @@ CEREAL_REGISTER_TYPE(SenderMessage);
class ChannelResolvedMessage : public Message { class ChannelResolvedMessage : public Message {
public: public:
ChannelResolvedMessage() {} ChannelResolvedMessage() {}
ChannelResolvedMessage(std::shared_ptr<Channel> channel) ChannelResolvedMessage(std::shared_ptr<ChannelWriter> channel_writer)
: Message(), channel_(channel) {} : Message(), channel_writer_(channel_writer) {}
std::shared_ptr<Channel> channel() const { return channel_; } std::shared_ptr<ChannelWriter> channelWriter() const { return channel_writer_; }
private: private:
std::shared_ptr<Channel> channel_; std::shared_ptr<ChannelWriter> channel_writer_;
}; };
/** /**
@ -315,11 +315,11 @@ class Distributed final {
uint16_t port, uint16_t port,
const std::string &reactor_name, const std::string &reactor_name,
const std::string &channel_name) { const std::string &channel_name) {
std::shared_ptr<Channel> channel = nullptr; std::shared_ptr<ChannelWriter> channel_writer = nullptr;
while (!(channel = network_.Resolve(address, port, reactor_name, channel_name))) while (!(channel_writer = network_.Resolve(address, port, reactor_name, channel_name)))
std::this_thread::sleep_for(std::chrono::milliseconds(200)); std::this_thread::sleep_for(std::chrono::milliseconds(200));
auto stream_channel = current_reactor_->Open(); auto stream_channel = current_reactor_->Open();
stream_channel.second->Send<ChannelResolvedMessage>(channel); stream_channel.second->Send<ChannelResolvedMessage>(channel_writer);
return stream_channel.first; return stream_channel.first;
} }

View File

@ -6,64 +6,64 @@ void EventStream::Subscription::unsubscribe() const {
thread_local Reactor* current_reactor_ = nullptr; thread_local Reactor* current_reactor_ = nullptr;
std::string Connector::LocalChannel::ReactorName() { std::string Channel::LocalChannelWriter::ReactorName() {
return reactor_name_; return reactor_name_;
} }
std::string Connector::LocalChannel::Name() { std::string Channel::LocalChannelWriter::Name() {
return connector_name_; return channel_name_;
} }
void Connector::LocalEventStream::Close() { void Channel::LocalEventStream::Close() {
current_reactor_->CloseConnector(connector_name_); current_reactor_->CloseChannel(channel_name_);
} }
std::pair<EventStream*, std::shared_ptr<Channel>> Reactor::Open(const std::string &connector_name) { std::pair<EventStream*, std::shared_ptr<ChannelWriter>> Reactor::Open(const std::string &channel_name) {
std::unique_lock<std::mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
// TODO: Improve the check that the channel name does not exist in the // TODO: Improve the check that the channel name does not exist in the
// system. // system.
if (connectors_.count(connector_name) != 0) { if (channels_.count(channel_name) != 0) {
throw std::runtime_error("Connector with name " + connector_name throw std::runtime_error("Channel with name " + channel_name
+ "already exists"); + "already exists");
} }
auto it = connectors_.emplace(connector_name, auto it = channels_.emplace(channel_name,
std::make_shared<Connector>(Connector::Params{name_, connector_name, mutex_, cvar_})).first; std::make_shared<Channel>(Channel::Params{name_, channel_name, mutex_, cvar_})).first;
it->second->self_ptr_ = it->second; it->second->self_ptr_ = it->second;
return make_pair(&it->second->stream_, it->second->LockedOpenChannel()); return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
} }
std::pair<EventStream*, std::shared_ptr<Channel>> Reactor::Open() { std::pair<EventStream*, std::shared_ptr<ChannelWriter>> Reactor::Open() {
std::unique_lock<std::mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
do { do {
std::string connector_name = "stream-" + std::to_string(connector_name_counter_++); std::string channel_name = "stream-" + std::to_string(channel_name_counter_++);
if (connectors_.count(connector_name) == 0) { if (channels_.count(channel_name) == 0) {
// Connector &queue = connectors_[connector_name]; // Channel &queue = channels_[channel_name];
auto it = connectors_.emplace(connector_name, auto it = channels_.emplace(channel_name,
std::make_shared<Connector>(Connector::Params{name_, connector_name, mutex_, cvar_})).first; std::make_shared<Channel>(Channel::Params{name_, channel_name, mutex_, cvar_})).first;
it->second->self_ptr_ = it->second; it->second->self_ptr_ = it->second;
return make_pair(&it->second->stream_, it->second->LockedOpenChannel()); return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
} }
} while (true); } while (true);
} }
const std::shared_ptr<Channel> Reactor::FindChannel( const std::shared_ptr<ChannelWriter> Reactor::FindChannel(
const std::string &channel_name) { const std::string &channel_name) {
std::unique_lock<std::mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
auto it_connector = connectors_.find(channel_name); auto it_channel = channels_.find(channel_name);
if (it_connector == connectors_.end()) return nullptr; if (it_channel == channels_.end()) return nullptr;
return it_connector->second->LockedOpenChannel(); return it_channel->second->LockedOpenChannel();
} }
void Reactor::CloseConnector(const std::string &s) { void Reactor::CloseChannel(const std::string &s) {
std::unique_lock<std::mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
auto it = connectors_.find(s); auto it = channels_.find(s);
assert(it != connectors_.end()); assert(it != channels_.end());
connectors_.erase(it); channels_.erase(it);
} }
void Reactor::CloseAllConnectors() { void Reactor::CloseAllChannels() {
std::unique_lock<std::mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
connectors_.clear(); channels_.clear();
} }
void Reactor::RunEventLoop() { void Reactor::RunEventLoop() {
@ -77,8 +77,8 @@ void Reactor::RunEventLoop() {
while (true) { while (true) {
// Exit the loop if there are no more Connectors. // Exit the loop if there are no more Channels.
if (connectors_.empty()) { if (channels_.empty()) {
exit_event_loop = true; exit_event_loop = true;
break; break;
} }
@ -106,8 +106,8 @@ void Reactor::RunEventLoop() {
*/ */
auto Reactor::LockedGetPendingMessages() -> MsgAndCbInfo { auto Reactor::LockedGetPendingMessages() -> MsgAndCbInfo {
// return type after because the scope Reactor:: is not searched before the name // return type after because the scope Reactor:: is not searched before the name
for (auto &connectors_key_value : connectors_) { for (auto &channels_key_value : channels_) {
Connector &event_queue = *connectors_key_value.second; Channel &event_queue = *channels_key_value.second;
auto msg_ptr = event_queue.LockedPop(); auto msg_ptr = event_queue.LockedPop();
if (msg_ptr == nullptr) continue; if (msg_ptr == nullptr) continue;
std::type_index tidx = msg_ptr->GetTypeIndex(); std::type_index tidx = msg_ptr->GetTypeIndex();

View File

@ -14,7 +14,7 @@
class EventStream; class EventStream;
class Reactor; class Reactor;
class System; class System;
class Connector; class Channel;
extern thread_local Reactor* current_reactor_; extern thread_local Reactor* current_reactor_;
@ -38,9 +38,9 @@ class Message {
}; };
/** /**
* Write-end of a Connector (between two reactors). * Write-end of a Channel (between two reactors).
*/ */
class Channel { class ChannelWriter {
public: public:
/** /**
* Construct and send the message to the channel. * Construct and send the message to the channel.
@ -56,7 +56,7 @@ class Channel {
virtual std::string Name() = 0; virtual std::string Name() = 0;
void operator=(const Channel &) = delete; void operator=(const ChannelWriter &) = delete;
template <class Archive> template <class Archive>
void serialize(Archive &archive) { void serialize(Archive &archive) {
@ -65,24 +65,40 @@ class Channel {
}; };
/** /**
* Read-end of a Connector (between two reactors). * Read-end of a Channel (between two reactors).
*/ */
class EventStream { class EventStream {
public: public:
/** class Subscription;
* Blocks until a message arrives. class OnEventOnceChainer;
*/
virtual std::unique_ptr<Message> AwaitEvent() = 0;
/** /**
* Polls if there is a message available, returning null if there is none. * Register a callback that will be called whenever an event arrives.
*/ */
virtual std::unique_ptr<Message> PopEvent() = 0; template<typename MsgType>
void OnEvent(std::function<void(const MsgType&, const Subscription&)> &&cb) {
OnEventHelper(typeid(MsgType), [cb = move(cb)](const Message &general_msg,
const Subscription &subscription) {
const MsgType &correct_msg = dynamic_cast<const MsgType&>(general_msg);
cb(correct_msg, subscription);
});
}
/** /**
* Get the name of the connector. * Starts a chain to register a callback that fires off only once.
*
* This method supports chaining (see the the class OnEventOnceChainer or the tests for examples).
* Warning: when chaining callbacks, make sure that EventStream does not deallocate before the last
* chained callback fired.
*/ */
virtual const std::string &ConnectorName() = 0; OnEventOnceChainer OnEventOnce() {
return OnEventOnceChainer(*this);
}
/**
* Get the name of the channel.
*/
virtual const std::string &ChannelName() = 0;
/** /**
* Subscription Service. * Subscription Service.
* *
@ -97,40 +113,16 @@ class EventStream {
private: private:
friend class Reactor; friend class Reactor;
friend class Connector; friend class Channel;
Subscription(Connector &event_queue, std::type_index tidx, uint64_t cb_uid) Subscription(Channel &event_queue, std::type_index tidx, uint64_t cb_uid)
: event_queue_(event_queue), tidx_(tidx), cb_uid_(cb_uid) { } : event_queue_(event_queue), tidx_(tidx), cb_uid_(cb_uid) { }
Connector &event_queue_; Channel &event_queue_;
std::type_index tidx_; std::type_index tidx_;
uint64_t cb_uid_; uint64_t cb_uid_;
}; };
/**
* Register a callback that will be called whenever an event arrives.
*/
template<typename MsgType>
void OnEvent(std::function<void(const MsgType&, const Subscription&)> &&cb) {
OnEventHelper(typeid(MsgType), [cb = move(cb)](const Message &general_msg,
const Subscription &subscription) {
const MsgType &correct_msg = dynamic_cast<const MsgType&>(general_msg);
cb(correct_msg, subscription);
});
}
class OnEventOnceChainer;
/**
* Starts a chain to register a callback that fires off only once.
*
* This method supports chaining (see the the class OnEventOnceChainer or the tests for examples).
* Warning: when chaining callbacks, make sure that EventStream does not deallocate before the last
* chained callback fired.
*/
OnEventOnceChainer OnEventOnce() {
return OnEventOnceChainer(*this);
}
/** /**
* Close this event stream, disallowing further events from getting received. * Close this event stream, disallowing further events from getting received.
* *
@ -205,52 +197,52 @@ private:
}; };
/** /**
* Implementation of a connector. * Implementation of a channel.
* *
* This class is an internal data structure that represents the state of the connector. * This class is an internal data structure that represents the state of the channel.
* This class is not meant to be used by the clients of the messaging framework. * This class is not meant to be used by the clients of the messaging framework.
* The Connector class wraps the event queue data structure, the mutex that protects * The Channel class wraps the event queue data structure, the mutex that protects
* concurrent access to the event queue, the local channel and the event stream. * concurrent access to the event queue, the local channel and the event stream.
* The class is owned by the Reactor. It gets closed when the owner reactor * The class is owned by the Reactor. It gets closed when the owner reactor
* (the one that owns the read-end of a connector) removes/closes it. * (the one that owns the read-end of a channel) removes/closes it.
*/ */
class Connector { class Channel {
struct Params; struct Params;
public: public:
friend class Reactor; // to create a Params initialization object friend class Reactor; // to create a Params initialization object
friend class EventStream::Subscription; friend class EventStream::Subscription;
Connector(Params params) Channel(Params params)
: connector_name_(params.connector_name), : channel_name_(params.channel_name),
reactor_name_(params.reactor_name), reactor_name_(params.reactor_name),
mutex_(params.mutex), mutex_(params.mutex),
cvar_(params.cvar), cvar_(params.cvar),
stream_(mutex_, connector_name_, this) {} stream_(mutex_, channel_name_, this) {}
/** /**
* LocalChannel represents the channels to reactors living in the same reactor system (write-end of the connectors). * LocalChannelWriter represents the channels to reactors living in the same reactor system (write-end of the channels).
* *
* Sending messages to the local channel requires acquiring the mutex. * Sending messages to the local channel requires acquiring the mutex.
* LocalChannel holds a (weak) pointer to the enclosing Connector object. * LocalChannelWriter holds a (weak) pointer to the enclosing Channel object.
* Messages sent to a closed channel are ignored. * Messages sent to a closed channel are ignored.
* There can be multiple LocalChannels refering to the same stream if needed. * There can be multiple LocalChannelWriters refering to the same stream if needed.
*/ */
class LocalChannel : public Channel { class LocalChannelWriter : public ChannelWriter {
public: public:
friend class Connector; friend class Channel;
LocalChannel(std::shared_ptr<std::mutex> mutex, std::string reactor_name, LocalChannelWriter(std::shared_ptr<std::mutex> mutex, std::string reactor_name,
std::string connector_name, std::weak_ptr<Connector> queue) std::string channel_name, std::weak_ptr<Channel> queue)
: mutex_(mutex), : mutex_(mutex),
reactor_name_(reactor_name), reactor_name_(reactor_name),
connector_name_(connector_name), channel_name_(channel_name),
weak_queue_(queue) {} weak_queue_(queue) {}
virtual void Send(std::unique_ptr<Message> m) { virtual void Send(std::unique_ptr<Message> m) {
std::shared_ptr<Connector> queue_ = weak_queue_.lock(); // Atomic, per the standard. std::shared_ptr<Channel> queue_ = weak_queue_.lock(); // Atomic, per the standard.
if (queue_) { if (queue_) {
// We guarantee here that the Connector is not destroyed. // We guarantee here that the Channel is not destroyed.
std::unique_lock<std::mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
queue_->LockedPush(std::move(m)); queue_->LockedPush(std::move(m));
} }
@ -263,57 +255,53 @@ class Connector {
private: private:
std::shared_ptr<std::mutex> mutex_; std::shared_ptr<std::mutex> mutex_;
std::string reactor_name_; std::string reactor_name_;
std::string connector_name_; std::string channel_name_;
std::weak_ptr<Connector> weak_queue_; std::weak_ptr<Channel> weak_queue_;
}; };
/** /**
* Implementation of the event stream. * Implementation of the event stream.
* *
* After the enclosing Connector object is destroyed (by a call to CloseChannel or Close). * After the enclosing Channel object is destroyed (by a call to CloseChannel or Close).
*/ */
class LocalEventStream : public EventStream { class LocalEventStream : public EventStream {
public: public:
friend class Connector; friend class Channel;
LocalEventStream(std::shared_ptr<std::mutex> mutex, std::string connector_name, LocalEventStream(std::shared_ptr<std::mutex> mutex, std::string channel_name,
Connector *queue) : mutex_(mutex), connector_name_(connector_name), queue_(queue) {} Channel *queue) : mutex_(mutex), channel_name_(channel_name), queue_(queue) {}
std::unique_ptr<Message> AwaitEvent() { std::unique_ptr<Message> AwaitEvent() {
std::unique_lock<std::mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
return queue_->LockedAwaitPop(lock); return queue_->LockedAwaitPop(lock);
} }
std::unique_ptr<Message> PopEvent() {
std::unique_lock<std::mutex> lock(*mutex_);
return queue_->LockedPop();
}
void OnEventHelper(std::type_index tidx, Callback callback) { void OnEventHelper(std::type_index tidx, Callback callback) {
std::unique_lock<std::mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
queue_->LockedOnEventHelper(tidx, callback); queue_->LockedOnEventHelper(tidx, callback);
} }
const std::string &ConnectorName() { const std::string &ChannelName() {
return queue_->connector_name_; return queue_->channel_name_;
} }
void Close(); void Close();
private: private:
std::shared_ptr<std::mutex> mutex_; std::shared_ptr<std::mutex> mutex_;
std::string connector_name_; std::string channel_name_;
Connector *queue_; Channel *queue_;
}; };
Connector(const Connector &other) = delete; Channel(const Channel &other) = delete;
Connector(Connector &&other) = default; Channel(Channel &&other) = default;
Connector &operator=(const Connector &other) = delete; Channel &operator=(const Channel &other) = delete;
Connector &operator=(Connector &&other) = default; Channel &operator=(Channel &&other) = default;
private: private:
/** /**
* Initialization parameters to Connector. * Initialization parameters to Channel.
* Warning: do not forget to initialize self_ptr_ individually. Private because it shouldn't be created outside of a Reactor. * Warning: do not forget to initialize self_ptr_ individually. Private because it shouldn't be created outside of a Reactor.
*/ */
struct Params { struct Params {
std::string reactor_name; std::string reactor_name;
std::string connector_name; std::string channel_name;
std::shared_ptr<std::mutex> mutex; std::shared_ptr<std::mutex> mutex;
std::shared_ptr<std::condition_variable> cvar; std::shared_ptr<std::condition_variable> cvar;
}; };
@ -321,13 +309,13 @@ private:
void LockedPush(std::unique_ptr<Message> m) { void LockedPush(std::unique_ptr<Message> m) {
queue_.emplace(std::move(m)); queue_.emplace(std::move(m));
// This is OK because there is only one Reactor (thread) that can wait on this Connector. // This is OK because there is only one Reactor (thread) that can wait on this Channel.
cvar_->notify_one(); cvar_->notify_one();
} }
std::shared_ptr<LocalChannel> LockedOpenChannel() { std::shared_ptr<LocalChannelWriter> LockedOpenChannel() {
assert(!self_ptr_.expired()); // TODO(zuza): fix this using this answer https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned assert(!self_ptr_.expired()); // TODO(zuza): fix this using this answer https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned
return std::make_shared<LocalChannel>(mutex_, reactor_name_, connector_name_, self_ptr_); return std::make_shared<LocalChannelWriter>(mutex_, reactor_name_, channel_name_, self_ptr_);
} }
std::unique_ptr<Message> LockedAwaitPop(std::unique_lock<std::mutex> &lock) { std::unique_ptr<Message> LockedAwaitPop(std::unique_lock<std::mutex> &lock) {
@ -363,7 +351,7 @@ private:
assert(num_erased == 1); assert(num_erased == 1);
} }
std::string connector_name_; std::string channel_name_;
std::string reactor_name_; std::string reactor_name_;
std::queue<std::unique_ptr<Message>> queue_; std::queue<std::unique_ptr<Message>> queue_;
// Should only be locked once since it's used by a cond. var. Also caught in dctor, so must be recursive. // Should only be locked once since it's used by a cond. var. Also caught in dctor, so must be recursive.
@ -374,7 +362,7 @@ private:
* *
* There are initialization problems with this, check Params. * There are initialization problems with this, check Params.
*/ */
std::weak_ptr<Connector> self_ptr_; std::weak_ptr<Channel> self_ptr_;
LocalEventStream stream_; LocalEventStream stream_;
std::unordered_map<std::type_index, std::unordered_map<uint64_t, EventStream::Callback> > callbacks_; std::unordered_map<std::type_index, std::unordered_map<uint64_t, EventStream::Callback> > callbacks_;
uint64_t next_cb_uid = 0; uint64_t next_cb_uid = 0;
@ -396,23 +384,23 @@ class Reactor {
virtual void Run() = 0; virtual void Run() = 0;
std::pair<EventStream*, std::shared_ptr<Channel>> Open(const std::string &s); std::pair<EventStream*, std::shared_ptr<ChannelWriter>> Open(const std::string &s);
std::pair<EventStream*, std::shared_ptr<Channel>> Open(); std::pair<EventStream*, std::shared_ptr<ChannelWriter>> Open();
const std::shared_ptr<Channel> FindChannel(const std::string &channel_name); const std::shared_ptr<ChannelWriter> FindChannel(const std::string &channel_name);
/** /**
* Close a connector by name. * Close a channel by name.
* *
* Should only be called from the Reactor thread. * Should only be called from the Reactor thread.
*/ */
void CloseConnector(const std::string &s); void CloseChannel(const std::string &s);
/** /**
* close all connectors (typically during shutdown). * close all channels (typically during shutdown).
* *
* Should only be called from the Reactor thread. * Should only be called from the Reactor thread.
*/ */
void CloseAllConnectors(); void CloseAllChannels();
/** /**
* Get Reactor name * Get Reactor name
@ -427,9 +415,9 @@ class Reactor {
protected: protected:
std::string name_; std::string name_;
/* /*
* Locks all Reactor data, including all Connector's in connectors_. * Locks all Reactor data, including all Channel's in channels_.
* *
* This should be a shared_ptr because LocalChannel can outlive Reactor. * This should be a shared_ptr because LocalChannelWriter can outlive Reactor.
*/ */
std::shared_ptr<std::mutex> mutex_ = std::shared_ptr<std::mutex> mutex_ =
std::make_shared<std::mutex>(); std::make_shared<std::mutex>();
@ -437,14 +425,14 @@ class Reactor {
std::make_shared<std::condition_variable>(); std::make_shared<std::condition_variable>();
/** /**
* List of connectors of a reactor indexed by name. * List of channels of a reactor indexed by name.
* *
* While the connectors are owned by the reactor, a shared_ptr to solve the circular reference problem * While the channels are owned by the reactor, a shared_ptr to solve the circular reference problem
* between Channels and EventStreams. * between ChannelWriters and EventStreams.
*/ */
std::unordered_map<std::string, std::shared_ptr<Connector>> connectors_; std::unordered_map<std::string, std::shared_ptr<Channel>> channels_;
int64_t connector_name_counter_{0}; int64_t channel_name_counter_{0};
std::pair<EventStream*, std::shared_ptr<Channel>> main_; std::pair<EventStream*, std::shared_ptr<ChannelWriter>> main_;
private: private:
typedef std::pair<std::unique_ptr<Message>, typedef std::pair<std::unique_ptr<Message>,
@ -483,7 +471,7 @@ class System final {
} }
template <class ReactorType, class... Args> template <class ReactorType, class... Args>
const std::shared_ptr<Channel> Spawn(const std::string &name, const std::shared_ptr<ChannelWriter> Spawn(const std::string &name,
Args &&... args) { Args &&... args) {
std::unique_lock<std::recursive_mutex> lock(mutex_); std::unique_lock<std::recursive_mutex> lock(mutex_);
auto *raw_reactor = auto *raw_reactor =
@ -498,7 +486,7 @@ class System final {
return nullptr; return nullptr;
} }
const std::shared_ptr<Channel> FindChannel(const std::string &reactor_name, const std::shared_ptr<ChannelWriter> FindChannel(const std::string &reactor_name,
const std::string &channel_name) { const std::string &channel_name) {
std::unique_lock<std::recursive_mutex> lock(mutex_); std::unique_lock<std::recursive_mutex> lock(mutex_);
auto it_reactor = reactors_.find(reactor_name); auto it_reactor = reactors_.find(reactor_name);
@ -528,7 +516,7 @@ class System final {
} }
std::recursive_mutex mutex_; std::recursive_mutex mutex_;
// TODO: Replace with a map to a reactor Connector map to have more granular // TODO: Replace with a map to a reactor Channel map to have more granular
// locking. // locking.
std::unordered_map<std::string, std::unordered_map<std::string,
std::pair<std::unique_ptr<Reactor>, std::thread>> std::pair<std::unique_ptr<Reactor>, std::thread>>

View File

@ -90,14 +90,14 @@ std::pair<int64_t, std::vector<int64_t>>
/** /**
* Sends a text message and has a return address. * Sends a text message and has a return address.
*/ */
class TextMessage : public SenderMessage { class TextMessage : public ReturnAddressMsg {
public: public:
TextMessage(std::string reactor, std::string channel, std::string s) TextMessage(std::string reactor, std::string channel, std::string s)
: SenderMessage(reactor, channel), text(s) {} : ReturnAddressMsg(reactor, channel), text(s) {}
template <class Archive> template <class Archive>
void serialize(Archive &archive) { void serialize(Archive &archive) {
archive(cereal::virtual_base_class<SenderMessage>(this), text); archive(cereal::virtual_base_class<ReturnAddressMsg>(this), text);
} }
std::string text; std::string text;
@ -124,7 +124,7 @@ class Master : public Reactor {
auto stream = main_.first; auto stream = main_.first;
// wait until every worker sends a SenderMessage back, then close // wait until every worker sends a ReturnAddressMsg back, then close
stream->OnEvent<TextMessage>([this](const TextMessage &msg, stream->OnEvent<TextMessage>([this](const TextMessage &msg,
const EventStream::Subscription &subscription) { const EventStream::Subscription &subscription) {
std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n"; std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n";
@ -135,7 +135,7 @@ class Master : public Reactor {
// (start_distributed.py runs each process in a new tab which is // (start_distributed.py runs each process in a new tab which is
// closed immediately after process has finished) // closed immediately after process has finished)
std::this_thread::sleep_for(std::chrono::seconds(4)); std::this_thread::sleep_for(std::chrono::seconds(4));
CloseConnector("main"); CloseChannel("main");
} }
}); });
@ -144,7 +144,7 @@ class Master : public Reactor {
auto stream = memgraph.FindChannel(wmnid, "worker", "main"); auto stream = memgraph.FindChannel(wmnid, "worker", "main");
stream->OnEventOnce() stream->OnEventOnce()
.ChainOnce<ChannelResolvedMessage>([this, stream](const ChannelResolvedMessage &msg){ .ChainOnce<ChannelResolvedMessage>([this, stream](const ChannelResolvedMessage &msg){
msg.channel()->Send<TextMessage>("master", "main", "hi from master"); msg.channelWriter()->Send<TextMessage>("master", "main", "hi from master");
stream->Close(); stream->Close();
}); });
} }
@ -174,12 +174,12 @@ class Worker : public Reactor {
.ChainOnce<TextMessage>([this](const TextMessage &msg) { .ChainOnce<TextMessage>([this](const TextMessage &msg) {
std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n"; std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n";
msg.GetChannelToSender() msg.GetReturnChannelWriter()
->Send<TextMessage>("worker", "main", "hi from worker"); ->Send<TextMessage>("worker", "main", "hi from worker");
// Sleep for a while so we can read output in the terminal. // Sleep for a while so we can read output in the terminal.
std::this_thread::sleep_for(std::chrono::seconds(4)); std::this_thread::sleep_for(std::chrono::seconds(4));
CloseConnector("main"); CloseChannel("main");
}); });
} }

View File

@ -12,14 +12,14 @@
// const int NUM_WORKERS = 1; // const int NUM_WORKERS = 1;
// class Txn : public SenderMessage { // class Txn : public ReturnAddressMsg {
// public: // public:
// Txn(std::string reactor, std::string channel, int64_t id) : SenderMessage(reactor, channel), id_(id) {} // Txn(std::string reactor, std::string channel, int64_t id) : ReturnAddressMsg(reactor, channel), id_(id) {}
// int64_t id() const { return id_; } // int64_t id() const { return id_; }
// template <class Archive> // template <class Archive>
// void serialize(Archive &archive) { // void serialize(Archive &archive) {
// archive(cereal::base_class<SenderMessage>(this), id_); // archive(cereal::base_class<ReturnAddressMsg>(this), id_);
// } // }
// private: // private:
@ -60,30 +60,30 @@
// int64_t count_; // int64_t count_;
// }; // };
// class CommitRequest : public SenderMessage { // class CommitRequest : public ReturnAddressMsg {
// public: // public:
// CommitRequest(std::string reactor, std::string channel, int64_t worker_id) // CommitRequest(std::string reactor, std::string channel, int64_t worker_id)
// : SenderMessage(reactor, channel), worker_id_(worker_id) {} // : ReturnAddressMsg(reactor, channel), worker_id_(worker_id) {}
// int64_t worker_id() { return worker_id_; } // int64_t worker_id() { return worker_id_; }
// template <class Archive> // template <class Archive>
// void serialize(Archive &archive) { // void serialize(Archive &archive) {
// archive(cereal::base_class<SenderMessage>(this), worker_id_); // archive(cereal::base_class<ReturnAddressMsg>(this), worker_id_);
// } // }
// private: // private:
// int64_t worker_id_; // int64_t worker_id_;
// }; // };
// class AbortRequest : public SenderMessage { // class AbortRequest : public ReturnAddressMsg {
// public: // public:
// AbortRequest(std::string reactor, std::string channel, int64_t worker_id) // AbortRequest(std::string reactor, std::string channel, int64_t worker_id)
// : SenderMessage(reactor, channel), worker_id_(worker_id) {} // : ReturnAddressMsg(reactor, channel), worker_id_(worker_id) {}
// int64_t worker_id() { return worker_id_; } // int64_t worker_id() { return worker_id_; }
// template <class Archive> // template <class Archive>
// void serialize(Archive &archive) { // void serialize(Archive &archive) {
// archive(cereal::base_class<SenderMessage>(this), worker_id_); // archive(cereal::base_class<ReturnAddressMsg>(this), worker_id_);
// } // }
// private: // private:
@ -139,8 +139,8 @@
// if (Query *query = dynamic_cast<Query *>(m.get())) { // if (Query *query = dynamic_cast<Query *>(m.get())) {
// ProcessQuery(query); // ProcessQuery(query);
// break; // process only the first query // break; // process only the first query
// } else if (SenderMessage *msg = dynamic_cast<SenderMessage *>(m.get())) { // } else if (ReturnAddressMsg *msg = dynamic_cast<ReturnAddressMsg *>(m.get())) {
// std::cout << "SenderMessage received!" << std::endl; // std::cout << "ReturnAddressMsg received!" << std::endl;
// std::cout << " Address: " << msg->Address() << std::endl; // std::cout << " Address: " << msg->Address() << std::endl;
// std::cout << " Port: " << msg->Port() << std::endl; // std::cout << " Port: " << msg->Port() << std::endl;
// std::cout << " Reactor: " << msg->ReactorName() << std::endl; // std::cout << " Reactor: " << msg->ReactorName() << std::endl;
@ -175,27 +175,27 @@
// int worker_id = rand() % NUM_WORKERS; // int worker_id = rand() % NUM_WORKERS;
// int64_t xid = GetTransactionId(); // int64_t xid = GetTransactionId();
// std::string txn_channel_name = GetTxnName(xid); // std::string txn_channel_name = GetTxnName(xid);
// auto connector = Open(txn_channel_name); // auto channel = Open(txn_channel_name);
// auto stream = connector.first; // auto stream = channel.first;
// channels_[worker_id]->Send<CreateNodeTxn>("master", "main", xid); // channels_[worker_id]->Send<CreateNodeTxn>("master", "main", xid);
// auto m = stream->AwaitEvent(); // auto m = stream->AwaitEvent();
// if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) { // if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) {
// req->GetChannelToSender(system_)->Send<CommitDirective>(); // req->GetReturnChannelWriter(system_)->Send<CommitDirective>();
// } else if (AbortRequest *req = dynamic_cast<AbortRequest *>(m.get())) { // } else if (AbortRequest *req = dynamic_cast<AbortRequest *>(m.get())) {
// req->GetChannelToSender(system_)->Send<AbortDirective>(); // req->GetReturnChannelWriter(system_)->Send<AbortDirective>();
// } else { // } else {
// std::cerr << "unknown message\n"; // std::cerr << "unknown message\n";
// exit(1); // exit(1);
// } // }
// CloseConnector(txn_channel_name); // CloseChannel(txn_channel_name);
// } // }
// void PerformCountNodes() { // void PerformCountNodes() {
// int64_t xid = GetTransactionId(); // int64_t xid = GetTransactionId();
// std::string txn_channel_name = GetTxnName(xid); // std::string txn_channel_name = GetTxnName(xid);
// auto connector = Open(txn_channel_name); // auto channel = Open(txn_channel_name);
// auto stream = connector.first; // auto stream = channel.first;
// for (int w_id = 0; w_id < NUM_WORKERS; ++w_id) // for (int w_id = 0; w_id < NUM_WORKERS; ++w_id)
// channels_[w_id]->Send<CountNodesTxn>("master", "main", xid); // channels_[w_id]->Send<CountNodesTxn>("master", "main", xid);
@ -205,10 +205,10 @@
// for (int responds = 0; responds < NUM_WORKERS; ++responds) { // for (int responds = 0; responds < NUM_WORKERS; ++responds) {
// auto m = stream->AwaitEvent(); // auto m = stream->AwaitEvent();
// if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) { // if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) {
// txn_channels[req->worker_id()] = req->GetChannelToSender(system_); // txn_channels[req->worker_id()] = req->GetReturnChannelWriter(system_);
// commit &= true; // commit &= true;
// } else if (AbortRequest *req = dynamic_cast<AbortRequest *>(m.get())) { // } else if (AbortRequest *req = dynamic_cast<AbortRequest *>(m.get())) {
// txn_channels[req->worker_id()] = req->GetChannelToSender(system_); // txn_channels[req->worker_id()] = req->GetReturnChannelWriter(system_);
// commit = false; // commit = false;
// } else { // } else {
// std::cerr << "unknown message\n"; // std::cerr << "unknown message\n";
@ -236,7 +236,7 @@
// } // }
// } // }
// CloseConnector(txn_channel_name); // CloseChannel(txn_channel_name);
// std::cout << "graph has " << count << " vertices" << std::endl; // std::cout << "graph has " << count << " vertices" << std::endl;
// } // }
@ -302,9 +302,9 @@
// } // }
// void HandleCreateNode(CreateNodeTxn *txn) { // void HandleCreateNode(CreateNodeTxn *txn) {
// auto connector = Open(GetTxnChannelName(txn->id())); // auto channel = Open(GetTxnChannelName(txn->id()));
// auto stream = connector.first; // auto stream = channel.first;
// auto masterChannel = txn->GetChannelToSender(system_); // auto masterChannel = txn->GetReturnChannelWriter(system_);
// // TODO: Do the actual commit. // // TODO: Do the actual commit.
// masterChannel->Send<CommitRequest>("master", "main", worker_id_); // masterChannel->Send<CommitRequest>("master", "main", worker_id_);
// auto m = stream->AwaitEvent(); // auto m = stream->AwaitEvent();
@ -316,13 +316,13 @@
// std::cerr << "unknown message\n"; // std::cerr << "unknown message\n";
// exit(1); // exit(1);
// } // }
// CloseConnector(GetTxnChannelName(txn->id())); // CloseChannel(GetTxnChannelName(txn->id()));
// } // }
// void HandleCountNodes(CountNodesTxn *txn) { // void HandleCountNodes(CountNodesTxn *txn) {
// auto connector = Open(GetTxnChannelName(txn->id())); // auto channel = Open(GetTxnChannelName(txn->id()));
// auto stream = connector.first; // auto stream = channel.first;
// auto masterChannel = txn->GetChannelToSender(system_); // auto masterChannel = txn->GetReturnChannelWriter(system_);
// // TODO: Fix this hack -- use the storage. // // TODO: Fix this hack -- use the storage.
// int num = 123; // int num = 123;
@ -337,7 +337,7 @@
// std::cerr << "unknown message\n"; // std::cerr << "unknown message\n";
// exit(1); // exit(1);
// } // }
// CloseConnector(GetTxnChannelName(txn->id())); // CloseChannel(GetTxnChannelName(txn->id()));
// } // }
// // TODO: Don't repeat code from Master. // // TODO: Don't repeat code from Master.

View File

@ -3,18 +3,18 @@
#include "reactors_distributed.hpp" #include "reactors_distributed.hpp"
class ChatMessage : public SenderMessage { class ChatMessage : public ReturnAddressMsg {
public: public:
ChatMessage() : SenderMessage(), message_("") {} ChatMessage() : ReturnAddressMsg(), message_("") {}
ChatMessage(std::string reactor, std::string channel, std::string message) ChatMessage(std::string reactor, std::string channel, std::string message)
: SenderMessage(reactor, channel), message_(message) {} : ReturnAddressMsg(reactor, channel), message_(message) {}
std::string Message() const { return message_; } std::string Message() const { return message_; }
template <class Archive> template <class Archive>
void serialize(Archive &ar) { void serialize(Archive &ar) {
ar(cereal::base_class<SenderMessage>(this), message_); ar(cereal::base_class<ReturnAddressMsg>(this), message_);
} }
private: private:
@ -56,7 +56,7 @@ class ChatServer : public Reactor {
std::cout << "Received message from " << msg.Address() << ":" std::cout << "Received message from " << msg.Address() << ":"
<< msg.Port() << " -> '" << msg.Message() << "'" << msg.Port() << " -> '" << msg.Message() << "'"
<< std::endl; << std::endl;
auto channel = msg.GetChannelToSender(); auto channel = msg.GetReturnChannelWriter();
if (channel != nullptr) { if (channel != nullptr) {
channel->Send<ChatACK>("server", "chat", msg.Message()); channel->Send<ChatACK>("server", "chat", msg.Message());
} }

View File

@ -7,7 +7,7 @@ int main(int argc, char *argv[]) {
auto channel = distributed.network().Resolve("127.0.0.1", 10000, "master", "main"); auto channel = distributed.network().Resolve("127.0.0.1", 10000, "master", "main");
std::cout << channel << std::endl; std::cout << channel << std::endl;
if (channel != nullptr) { if (channel != nullptr) {
channel->Send<SenderMessage>("master", "main"); channel->Send<ReturnAddressMsg>("master", "main");
} }
distributed.network().StopClient(); distributed.network().StopClient();
return 0; return 0;

View File

@ -1,4 +1,5 @@
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "reactors_local.hpp"
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
@ -9,13 +10,11 @@
#include <vector> #include <vector>
#include <future> #include <future>
#include "reactors_local.hpp"
TEST(SystemTest, ReturnWithoutThrowing) { TEST(SystemTest, ReturnWithoutThrowing) {
struct Master : public Reactor { struct Master : public Reactor {
Master(std::string name) : Reactor(name) {} Master(std::string name) : Reactor(name) {}
virtual void Run() { virtual void Run() {
CloseConnector("main"); CloseChannel("main");
} }
}; };
@ -31,8 +30,8 @@ TEST(ChannelCreationTest, ThrowOnReusingChannelName) {
virtual void Run() { virtual void Run() {
Open("channel"); Open("channel");
ASSERT_THROW(Open("channel"), std::runtime_error); ASSERT_THROW(Open("channel"), std::runtime_error);
CloseConnector("main"); CloseChannel("main");
CloseConnector("channel"); CloseChannel("channel");
} }
}; };
@ -42,62 +41,26 @@ TEST(ChannelCreationTest, ThrowOnReusingChannelName) {
} }
TEST(ConnectorSetUpTest, CheckMainChannelIsSet) { TEST(ChannelSetUpTest, CheckMainChannelIsSet) {
struct Master : public Reactor { struct Master : public Reactor {
Master(std::string name) : Reactor(name) {} Master(std::string name) : Reactor(name) {}
virtual void Run() { virtual void Run() {
std::shared_ptr<Channel> channel; std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel = System::GetInstance().FindChannel("worker", "main"))) while (!(channel_writer = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300));
CloseConnector("main"); CloseChannel("main");
} }
}; };
struct Worker : public Reactor { struct Worker : public Reactor {
Worker(std::string name) : Reactor(name) {} Worker(std::string name) : Reactor(name) {}
virtual void Run() { virtual void Run() {
std::shared_ptr<Channel> channel; std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel = System::GetInstance().FindChannel("master", "main"))) while (!(channel_writer = System::GetInstance().FindChannel("master", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300));
CloseConnector("main"); CloseChannel("main");
}
};
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}
TEST(SimpleSendTest, OneSimpleSend) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct Master : public Reactor {
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(123);
CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII.
}
};
struct Worker : public Reactor {
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
EventStream* stream = main_.first;
std::unique_ptr<Message> m_uptr = stream->AwaitEvent();
CloseConnector("main");
MessageInt* msg = dynamic_cast<MessageInt *>(m_uptr.get());
ASSERT_NE(msg, nullptr);
ASSERT_EQ(msg->x, 123);
} }
}; };
@ -116,11 +79,11 @@ TEST(SimpleSendTest, OneCallback) {
struct Master : public Reactor { struct Master : public Reactor {
Master(std::string name) : Reactor(name) {} Master(std::string name) : Reactor(name) {}
virtual void Run() { virtual void Run() {
std::shared_ptr<Channel> channel; std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel = System::GetInstance().FindChannel("worker", "main"))) while (!(channel_writer = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(888); channel_writer->Send<MessageInt>(888);
CloseConnector("main"); CloseChannel("main");
} }
}; };
@ -131,7 +94,7 @@ TEST(SimpleSendTest, OneCallback) {
stream->OnEvent<MessageInt>([this](const MessageInt &msg, const EventStream::Subscription&) { stream->OnEvent<MessageInt>([this](const MessageInt &msg, const EventStream::Subscription&) {
ASSERT_EQ(msg.x, 888); ASSERT_EQ(msg.x, 888);
CloseConnector("main"); CloseChannel("main");
}); });
} }
}; };
@ -152,15 +115,15 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
struct Master : public Reactor { struct Master : public Reactor {
Master(std::string name) : Reactor(name) {} Master(std::string name) : Reactor(name) {}
virtual void Run() { virtual void Run() {
std::shared_ptr<Channel> channel; std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel = System::GetInstance().FindChannel("worker", "main"))) while (!(channel_writer = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(101); channel_writer->Send<MessageInt>(101);
channel->Send<MessageInt>(102); // should be ignored channel_writer->Send<MessageInt>(102); // should be ignored
std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(103); // should be ignored channel_writer->Send<MessageInt>(103); // should be ignored
channel->Send<MessageInt>(104); // should be ignored channel_writer->Send<MessageInt>(104); // should be ignored
CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII. CloseChannel("main"); // Write-end doesn't need to be closed because it's in RAII.
} }
}; };
@ -168,11 +131,11 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
Worker(std::string name) : Reactor(name) {} Worker(std::string name) : Reactor(name) {}
virtual void Run() { virtual void Run() {
EventStream* stream = main_.first; EventStream* stream = main_.first;
std::unique_ptr<Message> m_uptr = stream->AwaitEvent();
CloseConnector("main"); stream->OnEvent<MessageInt>([this](const MessageInt& msg, const EventStream::Subscription&) {
MessageInt* msg = dynamic_cast<MessageInt *>(m_uptr.get()); CloseChannel("main");
ASSERT_NE(msg, nullptr); ASSERT_EQ(msg.x, 101);
ASSERT_EQ(msg->x, 101); });
} }
}; };
@ -182,7 +145,6 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
system.AwaitShutdown(); system.AwaitShutdown();
} }
TEST(SimpleSendTest, DuringFirstEvent) { TEST(SimpleSendTest, DuringFirstEvent) {
struct MessageInt : public Message { struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {} MessageInt(int xx) : x(xx) {}
@ -200,13 +162,13 @@ TEST(SimpleSendTest, DuringFirstEvent) {
FindChannel("main")->Send<MessageInt>(102); FindChannel("main")->Send<MessageInt>(102);
if (msgint.x == 102) { if (msgint.x == 102) {
subscription.unsubscribe(); subscription.unsubscribe();
CloseConnector("main"); CloseChannel("main");
p_.set_value(777); p_.set_value(777);
} }
}); });
std::shared_ptr<Channel> channel = FindChannel("main"); std::shared_ptr<ChannelWriter> channel_writer = FindChannel("main");
channel->Send<MessageInt>(101); channel_writer->Send<MessageInt>(101);
} }
std::promise<int> p_; std::promise<int> p_;
}; };
@ -234,19 +196,19 @@ TEST(MultipleSendTest, UnsubscribeService) {
struct Master : public Reactor { struct Master : public Reactor {
Master(std::string name) : Reactor(name) {} Master(std::string name) : Reactor(name) {}
virtual void Run() { virtual void Run() {
std::shared_ptr<Channel> channel; std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel = System::GetInstance().FindChannel("worker", "main"))) while (!(channel_writer = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(55); channel_writer->Send<MessageInt>(55);
channel->Send<MessageInt>(66); channel_writer->Send<MessageInt>(66);
channel->Send<MessageInt>(77); channel_writer->Send<MessageInt>(77);
channel->Send<MessageInt>(88); channel_writer->Send<MessageInt>(88);
std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageChar>('a'); channel_writer->Send<MessageChar>('a');
channel->Send<MessageChar>('b'); channel_writer->Send<MessageChar>('b');
channel->Send<MessageChar>('c'); channel_writer->Send<MessageChar>('c');
channel->Send<MessageChar>('d'); channel_writer->Send<MessageChar>('d');
CloseConnector("main"); CloseChannel("main");
} }
}; };
@ -271,7 +233,7 @@ TEST(MultipleSendTest, UnsubscribeService) {
ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c'); ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c');
if (num_msgs_received == 5) { if (num_msgs_received == 5) {
subscription.unsubscribe(); subscription.unsubscribe();
CloseConnector("main"); CloseChannel("main");
} }
}); });
} }
@ -297,15 +259,15 @@ TEST(MultipleSendTest, OnEvent) {
struct Master : public Reactor { struct Master : public Reactor {
Master(std::string name) : Reactor(name) {} Master(std::string name) : Reactor(name) {}
virtual void Run() { virtual void Run() {
std::shared_ptr<Channel> channel; std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel = System::GetInstance().FindChannel("worker", "main"))) while (!(channel_writer = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(101); channel_writer->Send<MessageInt>(101);
channel->Send<MessageChar>('a'); channel_writer->Send<MessageChar>('a');
channel->Send<MessageInt>(103); channel_writer->Send<MessageInt>(103);
channel->Send<MessageChar>('b'); channel_writer->Send<MessageChar>('b');
CloseConnector("main"); CloseChannel("main");
} }
}; };
@ -334,7 +296,7 @@ TEST(MultipleSendTest, OnEvent) {
stream->OnEvent<EndMessage>([this](const EndMessage&, const EventStream::Subscription&) { stream->OnEvent<EndMessage>([this](const EndMessage&, const EventStream::Subscription&) {
ASSERT_LE(correct_vals, 4); ASSERT_LE(correct_vals, 4);
if (correct_vals == 4) { if (correct_vals == 4) {
CloseConnector("main"); CloseChannel("main");
} }
}); });
} }
@ -355,13 +317,13 @@ TEST(MultipleSendTest, Chaining) {
struct Master : public Reactor { struct Master : public Reactor {
Master(std::string name) : Reactor(name) {} Master(std::string name) : Reactor(name) {}
virtual void Run() { virtual void Run() {
std::shared_ptr<Channel> channel; std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel = System::GetInstance().FindChannel("worker", "main"))) while (!(channel_writer = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(55); channel_writer->Send<MessageInt>(55);
channel->Send<MessageInt>(66); channel_writer->Send<MessageInt>(66);
channel->Send<MessageInt>(77); channel_writer->Send<MessageInt>(77);
CloseConnector("main"); CloseChannel("main");
} }
}; };
@ -380,7 +342,7 @@ TEST(MultipleSendTest, Chaining) {
}) })
.ChainOnce<MessageInt>([this](const MessageInt &msg) { .ChainOnce<MessageInt>([this](const MessageInt &msg) {
ASSERT_EQ(msg.x, 77); ASSERT_EQ(msg.x, 77);
CloseConnector("main"); CloseChannel("main");
}); });
} }
}; };
@ -406,14 +368,14 @@ TEST(MultipleSendTest, ChainingInRightOrder) {
struct Master : public Reactor { struct Master : public Reactor {
Master(std::string name) : Reactor(name) {} Master(std::string name) : Reactor(name) {}
virtual void Run() { virtual void Run() {
std::shared_ptr<Channel> channel; std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel = System::GetInstance().FindChannel("worker", "main"))) while (!(channel_writer = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageChar>('a'); channel_writer->Send<MessageChar>('a');
channel->Send<MessageInt>(55); channel_writer->Send<MessageInt>(55);
channel->Send<MessageChar>('b'); channel_writer->Send<MessageChar>('b');
channel->Send<MessageInt>(77); channel_writer->Send<MessageInt>(77);
CloseConnector("main"); CloseChannel("main");
} }
}; };
@ -432,7 +394,7 @@ TEST(MultipleSendTest, ChainingInRightOrder) {
}) })
.ChainOnce<MessageInt>([this](const MessageInt &msg) { .ChainOnce<MessageInt>([this](const MessageInt &msg) {
ASSERT_EQ(msg.x, 77); ASSERT_EQ(msg.x, 77);
CloseConnector("main"); CloseChannel("main");
}); });
} }
}; };
@ -455,16 +417,16 @@ TEST(MultipleSendTest, ProcessManyMessages) {
struct Master : public Reactor { struct Master : public Reactor {
Master(std::string name) : Reactor(name) {} Master(std::string name) : Reactor(name) {}
virtual void Run() { virtual void Run() {
std::shared_ptr<Channel> channel; std::shared_ptr<ChannelWriter> channel_writer;
while (!(channel = System::GetInstance().FindChannel("worker", "main"))) while (!(channel_writer = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100)); std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
for (int i = 0; i < num_tests; ++i) { for (int i = 0; i < num_tests; ++i) {
channel->Send<MessageInt>(rand()); channel_writer->Send<MessageInt>(rand());
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 5)); std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 5));
} }
CloseConnector("main"); CloseChannel("main");
} }
}; };
@ -486,7 +448,7 @@ TEST(MultipleSendTest, ProcessManyMessages) {
stream->OnEvent<EndMessage>([this](const Message&, const EventStream::Subscription&) { stream->OnEvent<EndMessage>([this](const Message&, const EventStream::Subscription&) {
ASSERT_LE(vals, num_tests); ASSERT_LE(vals, num_tests);
if (vals == num_tests) { if (vals == num_tests) {
CloseConnector("main"); CloseChannel("main");
} }
}); });
} }

View File

@ -1,126 +0,0 @@
#include <fstream>
#include <streambuf>
#include "cereal/archives/binary.hpp"
#include "cereal/types/memory.hpp"
#include "cereal/types/string.hpp"
#include "cereal/types/utility.hpp" // utility has to be included because of std::pair
#include "cereal/types/vector.hpp"
struct BasicSerializable {
int64_t x_;
std::string y_;
BasicSerializable() = default;
BasicSerializable(int64_t x, std::string y) : x_(x), y_(y) {}
template <class Archive>
void serialize(Archive &ar) {
ar(x_, y_);
}
template <typename Archive>
static void load_and_construct(
Archive &ar, cereal::construct<BasicSerializable> &construct) {
int64_t x;
std::string y;
ar(x, y);
construct(x, y);
}
};
struct ComplexSerializable {
using VectorT = std::vector<float>;
using VectorPairT = std::vector<std::pair<std::string, BasicSerializable>>;
BasicSerializable x_;
VectorT y_;
VectorPairT z_;
ComplexSerializable(const BasicSerializable &x, const VectorT &y,
const VectorPairT &z)
: x_(x), y_(y), z_(z) {}
template <typename Archive>
void serialize(Archive &ar) {
ar(x_, y_, z_);
}
template <typename Archive>
static void load_and_construct(
Archive &ar, cereal::construct<ComplexSerializable> &construct) {
BasicSerializable x;
VectorT y;
VectorPairT z;
ar(x, y, z);
construct(x, y, z);
}
};
class DummyStreamBuf : public std::basic_streambuf<char> {
protected:
std::streamsize xsputn(const char *data, std::streamsize count) override {
for (std::streamsize i = 0; i < count; ++i) {
data_.push_back(data[i]);
}
return count;
}
std::streamsize xsgetn(char *data, std::streamsize count) override {
if (count < 0) return 0;
if (static_cast<size_t>(position_ + count) > data_.size()) {
count = data_.size() - position_;
position_ = data_.size();
}
memcpy(data, data_.data() + position_, count);
position_ += count;
return count;
}
private:
std::vector<char> data_;
std::streamsize position_{0};
};
int main() {
DummyStreamBuf sb;
std::iostream iostream(&sb);
// serialization
cereal::BinaryOutputArchive oarchive{iostream};
std::unique_ptr<BasicSerializable const> const basic_serializable_object{
new BasicSerializable{100, "Test"}};
std::unique_ptr<ComplexSerializable const> const complex_serializable_object{
new ComplexSerializable{
{100, "test"},
{3.4, 3.4},
{{"first", {10, "Basic1"}}, {"second", {20, "Basic2"}}}}};
oarchive(basic_serializable_object);
oarchive(complex_serializable_object);
// deserialization
cereal::BinaryInputArchive iarchive{iostream};
std::unique_ptr<BasicSerializable> basic_deserialized_object{nullptr};
std::unique_ptr<ComplexSerializable> complex_deserialized_object{nullptr};
iarchive(basic_deserialized_object);
iarchive(complex_deserialized_object);
// output
std::cout << "Basic Deserialized: " << basic_deserialized_object->x_ << "; "
<< basic_deserialized_object->y_ << std::endl;
auto x = complex_deserialized_object->x_;
auto y = complex_deserialized_object->y_;
auto z = complex_deserialized_object->z_;
std::cout << "Complex Deserialized" << std::endl;
std::cout << " x_ -> " << x.x_ << "; " << x.y_ << std::endl;
std::cout << " y_ -> ";
for (const auto v_item : y) std::cout << v_item << " ";
std::cout << std::endl;
std::cout << " z_ -> ";
for (const auto v_item : z)
std::cout << v_item.first << " | Pair: (" << v_item.second.x_ << ", "
<< v_item.second.y_ << ")"
<< "::";
std::cout << std::endl;
return 0;
}