Removed Double Locking, Simplified Ownership or LocalEventStream, ....

Summary:
1. added TAGS to .gitignore
2. Commented: LockedGetPendingMessages is not fair.
3. Removed lock from EventQueue dtor.
4. Changed recursive_mutex -> mutex in Reactor and EventQueue.
5. [main change] made LocalEventStream a composite part of EventQueue, the new and only way to close EQ is to call close() in Reactor
6. [main change] make LocalChannel non-unique, it can be freely created and shared around.
7. renamed Reactor::Close into Reactor::CloseConnector

Reviewers: lion, buda

Reviewed By: lion

Subscribers: mislav.bradac, pullbot

Differential Revision: https://phabricator.memgraph.io/D630
This commit is contained in:
Goran Zuzic 2017-08-04 17:32:13 +02:00
parent 599e5651af
commit b68265823c
5 changed files with 320 additions and 196 deletions

1
.gitignore vendored
View File

@ -34,3 +34,4 @@ tags
ve/ ve/
ve3/ ve3/
perf.data* perf.data*
TAGS

View File

@ -12,7 +12,7 @@ const int NUM_WORKERS = 1;
class Txn : public SenderMessage { class Txn : public SenderMessage {
public: public:
Txn(ChannelRefT channel, int64_t id) : SenderMessage(channel), id_(id) {} Txn(std::shared_ptr<Channel> channel, int64_t id) : SenderMessage(channel), id_(id) {}
int64_t id() const { return id_; } int64_t id() const { return id_; }
template <class Archive> template <class Archive>
@ -26,7 +26,7 @@ class Txn : public SenderMessage {
class CreateNodeTxn : public Txn { class CreateNodeTxn : public Txn {
public: public:
CreateNodeTxn(ChannelRefT channel, int64_t id) : Txn(channel, id) {} CreateNodeTxn(std::shared_ptr<Channel> channel, int64_t id) : Txn(channel, id) {}
template <class Archive> template <class Archive>
void serialize(Archive &archive) { void serialize(Archive &archive) {
@ -36,7 +36,7 @@ class CreateNodeTxn : public Txn {
class CountNodesTxn : public Txn { class CountNodesTxn : public Txn {
public: public:
CountNodesTxn(ChannelRefT channel, int64_t id) : Txn(channel, id) {} CountNodesTxn(std::shared_ptr<Channel> channel, int64_t id) : Txn(channel, id) {}
template <class Archive> template <class Archive>
void serialize(Archive &archive) { void serialize(Archive &archive) {
@ -60,7 +60,7 @@ class CountNodesTxnResult : public Message {
class CommitRequest : public SenderMessage { class CommitRequest : public SenderMessage {
public: public:
CommitRequest(ChannelRefT sender, int64_t worker_id) CommitRequest(std::shared_ptr<Channel> sender, int64_t worker_id)
: SenderMessage(sender), worker_id_(worker_id) {} : SenderMessage(sender), worker_id_(worker_id) {}
int64_t worker_id() { return worker_id_; } int64_t worker_id() { return worker_id_; }
@ -75,7 +75,7 @@ class CommitRequest : public SenderMessage {
class AbortRequest : public SenderMessage { class AbortRequest : public SenderMessage {
public: public:
AbortRequest(ChannelRefT sender, int64_t worker_id) AbortRequest(std::shared_ptr<Channel> sender, int64_t worker_id)
: SenderMessage(sender), worker_id_(worker_id) {} : SenderMessage(sender), worker_id_(worker_id) {}
int64_t worker_id() { return worker_id_; } int64_t worker_id() { return worker_id_; }
@ -182,7 +182,7 @@ class Master : public Reactor {
std::cerr << "unknown message\n"; std::cerr << "unknown message\n";
exit(1); exit(1);
} }
Close(txn_channel_name); CloseConnector(txn_channel_name);
} }
void PerformCountNodes() { void PerformCountNodes() {
@ -194,7 +194,7 @@ class Master : public Reactor {
channels_[w_id]->Send( channels_[w_id]->Send(
std::make_unique<CountNodesTxn>(connector.second, xid)); std::make_unique<CountNodesTxn>(connector.second, xid));
std::vector<ChannelRefT> txn_channels; std::vector<std::shared_ptr<Channel>> txn_channels;
txn_channels.resize(NUM_WORKERS, nullptr); txn_channels.resize(NUM_WORKERS, nullptr);
bool commit = true; bool commit = true;
for (int responds = 0; responds < NUM_WORKERS; ++responds) { for (int responds = 0; responds < NUM_WORKERS; ++responds) {
@ -231,7 +231,7 @@ class Master : public Reactor {
} }
} }
Close(txn_channel_name); CloseConnector(txn_channel_name);
std::cout << "graph has " << count << " vertices" << std::endl; std::cout << "graph has " << count << " vertices" << std::endl;
} }
@ -312,7 +312,7 @@ class Worker : public Reactor {
std::cerr << "unknown message\n"; std::cerr << "unknown message\n";
exit(1); exit(1);
} }
Close(GetTxnChannelName(txn->id())); CloseConnector(GetTxnChannelName(txn->id()));
} }
void HandleCountNodes(CountNodesTxn *txn) { void HandleCountNodes(CountNodesTxn *txn) {
@ -334,7 +334,7 @@ class Worker : public Reactor {
std::cerr << "unknown message\n"; std::cerr << "unknown message\n";
exit(1); exit(1);
} }
Close(GetTxnChannelName(txn->id())); CloseConnector(GetTxnChannelName(txn->id()));
} }
// TODO: Don't repeat code from Master. // TODO: Don't repeat code from Master.
@ -350,7 +350,6 @@ class Worker : public Reactor {
std::shared_ptr<Channel> master_channel_ = nullptr; std::shared_ptr<Channel> master_channel_ = nullptr;
int worker_id_; int worker_id_;
// Storage storage_;
}; };
void ClientMain(System *system) { void ClientMain(System *system) {

View File

@ -6,107 +6,96 @@ void EventStream::Subscription::unsubscribe() {
thread_local Reactor* current_reactor_ = nullptr; thread_local Reactor* current_reactor_ = nullptr;
std::string EventQueue::LocalChannel::Hostname() { std::string Connector::LocalChannel::Hostname() {
return system_->network().Hostname(); return system_->network().Hostname();
} }
int32_t EventQueue::LocalChannel::Port() { int32_t Connector::LocalChannel::Port() {
return system_->network().Port(); return system_->network().Port();
} }
std::string EventQueue::LocalChannel::ReactorName() { std::string Connector::LocalChannel::ReactorName() {
return reactor_name_; return reactor_name_;
} }
std::string EventQueue::LocalChannel::Name() { std::string Connector::LocalChannel::Name() {
return name_; return name_;
} }
void EventQueue::LocalEventStream::Close() { std::pair<EventStream*, std::shared_ptr<Channel>> Reactor::Open(const std::string &channel_name) {
current_reactor_->Close(name_); std::unique_lock<std::mutex> lock(*mutex_);
}
ConnectorT Reactor::Open(const std::string &channel_name) {
std::unique_lock<std::recursive_mutex> lock(*mutex_);
// TODO: Improve the check that the channel name does not exist in the // TODO: Improve the check that the channel name does not exist in the
// system. // system.
assert(connectors_.count(channel_name) == 0); assert(connectors_.count(channel_name) == 0);
auto it = connectors_.emplace(channel_name, auto it = connectors_.emplace(channel_name,
EventQueue::Params{system_, name_, channel_name, mutex_, cvar_}).first; std::make_shared<Connector>(Connector::Params{system_, name_, channel_name, mutex_, cvar_})).first;
return ConnectorT(it->second.stream_, it->second.channel_); it->second->self_ptr_ = it->second;
return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
} }
ConnectorT Reactor::Open() { std::pair<EventStream*, std::shared_ptr<Channel>> Reactor::Open() {
std::unique_lock<std::recursive_mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
do { do {
std::string channel_name = "stream-" + std::to_string(channel_name_counter_++); std::string channel_name = "stream-" + std::to_string(channel_name_counter_++);
if (connectors_.count(channel_name) == 0) { if (connectors_.count(channel_name) == 0) {
// EventQueue &queue = connectors_[channel_name]; // Connector &queue = connectors_[channel_name];
auto it = connectors_.emplace(channel_name, auto it = connectors_.emplace(channel_name,
EventQueue::Params{system_, name_, channel_name, mutex_, cvar_}).first; std::make_shared<Connector>(Connector::Params{system_, name_, channel_name, mutex_, cvar_})).first;
return ConnectorT(it->second.stream_, it->second.channel_); it->second->self_ptr_ = it->second;
return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
} }
} while (true); } while (true);
} }
const std::shared_ptr<Channel> Reactor::FindChannel( const std::shared_ptr<Channel> Reactor::FindChannel(
const std::string &channel_name) { const std::string &channel_name) {
std::unique_lock<std::recursive_mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
auto it_connector = connectors_.find(channel_name); auto it_connector = connectors_.find(channel_name);
if (it_connector == connectors_.end()) return nullptr; if (it_connector == connectors_.end()) return nullptr;
return it_connector->second.channel_; return it_connector->second->LockedOpenChannel();
} }
void Reactor::Close(const std::string &s) { void Reactor::CloseConnector(const std::string &s) {
std::unique_lock<std::recursive_mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
auto it = connectors_.find(s); auto it = connectors_.find(s);
assert(it != connectors_.end()); assert(it != connectors_.end());
LockedCloseInternal(it->second); connectors_.erase(it);
connectors_.erase(it); // this calls the EventQueue destructor that catches the mutex, ugh.
} }
void Reactor::LockedCloseInternal(EventQueue& event_queue) { void Reactor::CloseAllConnectors() {
// TODO(zuza): figure this out! @@@@ std::unique_lock<std::mutex> lock(*mutex_);
std::cout << "Close Channel! Reactor name = " << name_ << " Channel name = " << event_queue.name_ << std::endl; connectors_.clear();
} }
void Reactor::RunEventLoop() { void Reactor::RunEventLoop() {
std::cout << "event loop is run!" << std::endl; bool exit_event_loop = false;
while (true) { while (true) {
// Clean up EventQueues without callbacks. // Find (or wait) for the next Message.
MsgAndCbInfo msg_and_cb;
{ {
std::unique_lock<std::recursive_mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
for (auto connectors_it = connectors_.begin(); connectors_it != connectors_.end(); ) {
EventQueue& event_queue = connectors_it->second;
if (event_queue.LockedCanBeClosed()) {
LockedCloseInternal(event_queue);
connectors_it = connectors_.erase(connectors_it); // This removes the element from the collection.
} else {
++connectors_it;
}
}
}
// Process and wait for events to dispatch.
MsgAndCbInfo msgAndCb;
{
std::unique_lock<std::recursive_mutex> lock(*mutex_);
// Exit the loop if there are no more EventQueues.
if (connectors_.empty()) {
return;
}
while (true) { while (true) {
msgAndCb = LockedGetPendingMessages(lock); // Exit the loop if there are no more Connectors.
if (msgAndCb.first != nullptr) break; if (connectors_.empty()) {
exit_event_loop = true;
break;
}
// Not fair because was taken earlier, talk to lion.
msg_and_cb = LockedGetPendingMessages(lock);
if (msg_and_cb.first != nullptr) break;
cvar_->wait(lock); cvar_->wait(lock);
} }
if (exit_event_loop) break;
} }
for (auto& cbAndSub : msgAndCb.second) { for (auto& cbAndSub : msg_and_cb.second) {
auto& cb = cbAndSub.first; auto& cb = cbAndSub.first;
const Message& msg = *msgAndCb.first; const Message& msg = *msg_and_cb.first;
cb(msg, cbAndSub.second); cb(msg, cbAndSub.second);
} }
} }
@ -115,10 +104,10 @@ void Reactor::RunEventLoop() {
/** /**
* Checks if there is any nonempty EventStream. * Checks if there is any nonempty EventStream.
*/ */
auto Reactor::LockedGetPendingMessages(std::unique_lock<std::recursive_mutex> &lock) -> MsgAndCbInfo { auto Reactor::LockedGetPendingMessages(std::unique_lock<std::mutex> &lock) -> MsgAndCbInfo {
// return type after because the scope Reactor:: is not searched before the name // return type after because the scope Reactor:: is not searched before the name
for (auto& connectors_key_value : connectors_) { for (auto& connectors_key_value : connectors_) {
EventQueue& event_queue = connectors_key_value.second; Connector& event_queue = *connectors_key_value.second;
auto msg_ptr = event_queue.LockedPop(lock); auto msg_ptr = event_queue.LockedPop(lock);
if (msg_ptr == nullptr) continue; if (msg_ptr == nullptr) continue;

View File

@ -19,7 +19,7 @@ class Message;
class EventStream; class EventStream;
class Reactor; class Reactor;
class System; class System;
class EventQueue; class Connector;
extern thread_local Reactor* current_reactor_; extern thread_local Reactor* current_reactor_;
@ -74,10 +74,10 @@ class EventStream {
private: private:
friend class Reactor; friend class Reactor;
Subscription(EventQueue& event_queue, uint64_t cb_uid) : event_queue_(event_queue) { Subscription(Connector& event_queue, uint64_t cb_uid) : event_queue_(event_queue) {
cb_uid_ = cb_uid; cb_uid_ = cb_uid;
} }
EventQueue& event_queue_; Connector& event_queue_;
uint64_t cb_uid_; uint64_t cb_uid_;
}; };
@ -87,11 +87,6 @@ class EventStream {
* Register a callback that will be called whenever an event arrives. * Register a callback that will be called whenever an event arrives.
*/ */
virtual void OnEvent(Callback callback) = 0; virtual void OnEvent(Callback callback) = 0;
/**
* Close this event stream, disallowing further events from getting received.
*/
virtual void Close() = 0;
}; };
/** /**
@ -99,88 +94,51 @@ class EventStream {
* *
* This class is an internal data structure that represents the state of the connector. * This class is an internal data structure that represents the state of the connector.
* This class is not meant to be used by the clients of the messaging framework. * This class is not meant to be used by the clients of the messaging framework.
* The EventQueue class wraps the event queue data structure, the mutex that protects * The Connector class wraps the event queue data structure, the mutex that protects
* concurrent access to the event queue, the local channel and the event stream. * concurrent access to the event queue, the local channel and the event stream.
* The class is owned by the Reactor, but its LocalChannel can outlive it. * The class is owned by the Reactor. It gets closed when the owner reactor
* See the LocalChannel and LocalEventStream nested classes for further information. * (the one that owns the read-end of a connector) removes/closes it.
*/ */
class EventQueue { class Connector {
class Params;
public: public:
friend class Reactor; friend class Reactor; // to create a Params initialization object
friend class EventStream::Subscription; friend class EventStream::Subscription;
struct Params { Connector(Params params)
System* system;
std::string reactor_name;
std::string name;
std::shared_ptr<std::recursive_mutex> mutex;
std::shared_ptr<std::condition_variable_any> cvar;
};
EventQueue(Params params)
: system_(params.system), : system_(params.system),
reactor_name_(params.reactor_name),
name_(params.name), name_(params.name),
reactor_name_(params.reactor_name),
mutex_(params.mutex), mutex_(params.mutex),
cvar_(params.cvar) {} cvar_(params.cvar),
stream_(mutex_, name_, this) {}
/** /**
* The destructor locks the mutex of the EventQueue and sets queue pointer to null. * LocalChannel represents the channels to reactors living in the same reactor system (write-end of the connectors).
*/
~EventQueue() {
// Ugly: this is the ONLY thing that is allowed to lock this recursive mutex twice.
// This is because we can't make a locked and a unlocked version of the destructor.
std::unique_lock<std::recursive_mutex> lock(*mutex_);
stream_->queue_ = nullptr;
channel_->queue_ = nullptr;
}
void LockedPush(std::unique_lock<std::recursive_mutex> &, std::unique_ptr<Message> m) {
queue_.push(std::move(m));
cvar_->notify_one();
}
std::unique_ptr<Message> LockedAwaitPop(std::unique_lock<std::recursive_mutex> &lock) {
std::unique_ptr<Message> m;
while (!(m = LockedRawPop())) {
cvar_->wait(lock);
}
return m;
}
std::unique_ptr<Message> LockedPop(std::unique_lock<std::recursive_mutex> &lock) {
return LockedRawPop();
}
void LockedOnEvent(EventStream::Callback callback) {
uint64_t cb_uid = next_cb_uid++;
callbacks_[cb_uid] = callback;
}
/**
* LocalChannel represents the channels to reactors living in the same reactor system.
* *
* Sending messages to the local channel requires acquiring the mutex. * Sending messages to the local channel requires acquiring the mutex.
* LocalChannel holds a pointer to the enclosing EventQueue object. * LocalChannel holds a (weak) pointer to the enclosing Connector object.
* The enclosing EventQueue object is destroyed when the reactor calls Close. * Messages sent to a closed channel are ignored.
* When this happens, the pointer to the enclosing EventQueue object is set to null. * There can be multiple LocalChannels refering to the same stream if needed.
* After this, all the message sends on this channel are dropped.
*/ */
class LocalChannel : public Channel { class LocalChannel : public Channel {
public: public:
friend class EventQueue; friend class Connector;
LocalChannel(std::shared_ptr<std::recursive_mutex> mutex, std::string reactor_name, LocalChannel(std::shared_ptr<std::mutex> mutex, std::string reactor_name,
std::string name, EventQueue *queue, System *system) std::string name, std::weak_ptr<Connector> queue, System *system)
: mutex_(mutex), : mutex_(mutex),
reactor_name_(reactor_name), reactor_name_(reactor_name),
name_(name), name_(name),
queue_(queue), weak_queue_(queue),
system_(system) {} system_(system) {}
virtual void Send(std::unique_ptr<Message> m) { virtual void Send(std::unique_ptr<Message> m) {
std::unique_lock<std::recursive_mutex> lock(*mutex_); std::shared_ptr<Connector> queue_ = weak_queue_.lock(); // Atomic, per the standard.
if (queue_ != nullptr) { if (queue_) {
// We guarantee here that the Connector is not destroyed.
std::unique_lock<std::mutex> lock(*mutex_);
queue_->LockedPush(lock, std::move(m)); queue_->LockedPush(lock, std::move(m));
} }
} }
@ -194,27 +152,26 @@ class EventQueue {
virtual std::string Name(); virtual std::string Name();
private: private:
std::shared_ptr<std::recursive_mutex> mutex_; std::shared_ptr<std::mutex> mutex_;
std::string reactor_name_; std::string reactor_name_;
std::string name_; std::string name_;
EventQueue *queue_; std::weak_ptr<Connector> weak_queue_;
System *system_; System *system_;
}; };
/** /**
* Implementation of the event stream. * Implementation of the event stream.
* *
* After the enclosing EventQueue object is destroyed (by a call to Close), * After the enclosing Connector object is destroyed (by a call to CloseChannel or Close).
* it is no longer legal to call any of the event stream methods.
*/ */
class LocalEventStream : public EventStream { class LocalEventStream : public EventStream {
public: public:
friend class EventQueue; friend class Connector;
LocalEventStream(std::shared_ptr<std::recursive_mutex> mutex, std::string name, LocalEventStream(std::shared_ptr<std::mutex> mutex, std::string name,
EventQueue *queue) : mutex_(mutex), name_(name), queue_(queue) {} Connector *queue) : mutex_(mutex), name_(name), queue_(queue) {}
std::unique_ptr<Message> AwaitEvent() { std::unique_ptr<Message> AwaitEvent() {
std::unique_lock<std::recursive_mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
if (queue_ != nullptr) { if (queue_ != nullptr) {
return queue_->LockedAwaitPop(lock); return queue_->LockedAwaitPop(lock);
} }
@ -222,7 +179,7 @@ class EventQueue {
"Cannot call method after connector was closed."); "Cannot call method after connector was closed.");
} }
std::unique_ptr<Message> PopEvent() { std::unique_ptr<Message> PopEvent() {
std::unique_lock<std::recursive_mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
if (queue_ != nullptr) { if (queue_ != nullptr) {
return queue_->LockedPop(lock); return queue_->LockedPop(lock);
} }
@ -230,7 +187,7 @@ class EventQueue {
"Cannot call method after connector was closed."); "Cannot call method after connector was closed.");
} }
void OnEvent(EventStream::Callback callback) { void OnEvent(EventStream::Callback callback) {
std::unique_lock<std::recursive_mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
if (queue_ != nullptr) { if (queue_ != nullptr) {
queue_->LockedOnEvent(callback); queue_->LockedOnEvent(callback);
return; return;
@ -239,67 +196,97 @@ class EventQueue {
"Cannot call method after connector was closed."); "Cannot call method after connector was closed.");
} }
void Close();
private: private:
std::shared_ptr<std::recursive_mutex> mutex_; std::shared_ptr<std::mutex> mutex_;
std::string name_; std::string name_;
EventQueue *queue_; Connector *queue_;
};
Connector(const Connector &other) = delete;
Connector(Connector &&other) = default;
Connector &operator=(const Connector &other) = delete;
Connector &operator=(Connector &&other) = default;
private:
/**
* Initialization parameters to Connector.
* Warning: do not forget to initialize self_ptr_ individually. Private because it shouldn't be created outside of a Reactor.
*/
struct Params {
System* system;
std::string reactor_name;
/**
* Connector name.
*/
std::string name;
std::shared_ptr<std::mutex> mutex;
std::shared_ptr<std::condition_variable> cvar;
}; };
private:
void LockedPush(std::unique_lock<std::mutex> &, std::unique_ptr<Message> m) {
queue_.push(std::move(m));
// This is OK because there is only one Reactor (thread) that can wait on this Connector.
cvar_->notify_one();
}
std::shared_ptr<LocalChannel> LockedOpenChannel() {
assert(!self_ptr_.expired()); // TODO(zuza): fix this using this answer https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned
return std::make_shared<LocalChannel>(mutex_, reactor_name_, name_, self_ptr_, system_);
}
std::unique_ptr<Message> LockedAwaitPop(std::unique_lock<std::mutex> &lock) {
std::unique_ptr<Message> m;
while (!(m = LockedRawPop())) {
cvar_->wait(lock);
}
return m;
}
std::unique_ptr<Message> LockedPop(std::unique_lock<std::mutex> &lock) {
return LockedRawPop();
}
void LockedOnEvent(EventStream::Callback callback) {
uint64_t cb_uid = next_cb_uid++;
callbacks_[cb_uid] = callback;
}
std::unique_ptr<Message> LockedRawPop() { std::unique_ptr<Message> LockedRawPop() {
if (queue_.empty()) return nullptr; if (queue_.empty()) return nullptr;
std::unique_ptr<Message> t = std::move(queue_.front()); std::unique_ptr<Message> t = std::move(queue_.front());
queue_.pop(); queue_.pop();
return std::move(t); return t;
}
/**
* Should the owner close this EventQueue?
*
* Currently only checks if there are no more messages and all callbacks have unsubscribed?
* This assumes the event loop has been started.
*/
bool LockedCanBeClosed() {
return callbacks_.empty() && queue_.empty();
} }
void RemoveCbByUid(uint64_t uid) { void RemoveCbByUid(uint64_t uid) {
std::unique_lock<std::recursive_mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
size_t num_erased = callbacks_.erase(uid); size_t num_erased = callbacks_.erase(uid);
assert(num_erased == 1); assert(num_erased == 1);
// TODO(zuza): if no more callbacks, shut down the class (and the eventloop is started). First, figure out ownership of EventQueue?
} }
System *system_; System *system_;
std::string name_; std::string name_;
std::string reactor_name_; std::string reactor_name_;
std::queue<std::unique_ptr<Message>> queue_; std::queue<std::unique_ptr<Message>> queue_;
// Should only be locked once since it's used by a cond. var. Also caught in dctor, so must be recursive. // Should only be locked once since it's used by a cond. var. Also caught in dctor, so must be recursive.
std::shared_ptr<std::recursive_mutex> mutex_; std::shared_ptr<std::mutex> mutex_;
std::shared_ptr<std::condition_variable_any> cvar_; std::shared_ptr<std::condition_variable> cvar_;
std::shared_ptr<LocalEventStream> stream_ = /**
std::make_shared<LocalEventStream>(mutex_, name_, this); * A weak_ptr to itself.
std::shared_ptr<LocalChannel> channel_ = *
std::make_shared<LocalChannel>(mutex_, reactor_name_, name_, this, system_); * There are initialization problems with this, check Params.
*/
std::weak_ptr<Connector> self_ptr_;
LocalEventStream stream_;
std::unordered_map<uint64_t, EventStream::Callback> callbacks_; std::unordered_map<uint64_t, EventStream::Callback> callbacks_;
uint64_t next_cb_uid = 0; uint64_t next_cb_uid = 0;
}; };
/**
* Pair composed of read-end and write-end of a connection.
*/
using ConnectorT = std::pair<std::shared_ptr<EventStream>, std::shared_ptr<Channel>>;
using ChannelRefT = std::shared_ptr<Channel>;
/** /**
* A single unit of concurrent execution in the system. * A single unit of concurrent execution in the system.
* *
* E.g. one worker, one client. Owned by System. * E.g. one worker, one client. Owned by System. Has a thread associated with it.
*/ */
class Reactor { class Reactor {
public: public:
@ -307,31 +294,56 @@ class Reactor {
Reactor(System *system, std::string name) Reactor(System *system, std::string name)
: system_(system), name_(name), main_(Open("main")) {} : system_(system), name_(name), main_(Open("main")) {}
virtual ~Reactor() {} virtual ~Reactor() {}
virtual void Run() = 0; virtual void Run() = 0;
ConnectorT Open(const std::string &s); std::pair<EventStream*, std::shared_ptr<Channel>> Open(const std::string &s);
ConnectorT Open(); std::pair<EventStream*, std::shared_ptr<Channel>> Open();
const std::shared_ptr<Channel> FindChannel(const std::string &channel_name); const std::shared_ptr<Channel> FindChannel(const std::string &channel_name);
void Close(const std::string &s);
/**
* Close a connector by name.
*
* Should only be called from the Reactor thread.
*/
void CloseConnector(const std::string &s);
/**
* close all connectors (typically during shutdown).
*
* Should only be called from the Reactor thread.
*/
void CloseAllConnectors();
Reactor(const Reactor &other) = delete;
Reactor(Reactor &&other) = default;
Reactor &operator=(const Reactor &other) = delete;
Reactor &operator=(Reactor &&other) = default;
protected: protected:
System *system_; System *system_;
std::string name_; std::string name_;
/* /*
* Locks all Reactor data, including all EventQueue's in connectors_. * Locks all Reactor data, including all Connector's in connectors_.
* *
* This should be a shared_ptr because LocalChannel can outlive Reactor. * This should be a shared_ptr because LocalChannel can outlive Reactor.
*/ */
std::shared_ptr<std::recursive_mutex> mutex_ = std::shared_ptr<std::mutex> mutex_ =
std::make_shared<std::recursive_mutex>(); std::make_shared<std::mutex>();
std::shared_ptr<std::condition_variable_any> cvar_ = std::shared_ptr<std::condition_variable> cvar_ =
std::make_shared<std::condition_variable_any>(); std::make_shared<std::condition_variable>();
std::unordered_map<std::string, EventQueue> connectors_;
/**
* List of connectors of a reactor indexed by name.
*
* While the connectors are owned by the reactor, a shared_ptr to solve the circular reference problem
* between Channels and EventStreams.
*/
std::unordered_map<std::string, std::shared_ptr<Connector>> connectors_;
int64_t channel_name_counter_{0}; int64_t channel_name_counter_{0};
ConnectorT main_; std::pair<EventStream*, std::shared_ptr<Channel>> main_;
private: private:
typedef std::pair<std::unique_ptr<Message>, typedef std::pair<std::unique_ptr<Message>,
@ -342,10 +354,8 @@ class Reactor {
*/ */
void RunEventLoop(); void RunEventLoop();
void LockedCloseInternal(EventQueue& event_queue);
// TODO: remove proof of locking evidence ?! // TODO: remove proof of locking evidence ?!
MsgAndCbInfo LockedGetPendingMessages(std::unique_lock<std::recursive_mutex> &lock); MsgAndCbInfo LockedGetPendingMessages(std::unique_lock<std::mutex> &lock);
}; };
/** /**
@ -437,9 +447,9 @@ class Message {
*/ */
class SenderMessage : public Message { class SenderMessage : public Message {
public: public:
SenderMessage(ChannelRefT sender) : sender_(sender) {} SenderMessage(std::shared_ptr<Channel> sender) : sender_(sender) {}
ChannelRefT sender() { return sender_; } std::shared_ptr<Channel> sender() { return sender_; }
template <class Archive> template <class Archive>
void serialize(Archive &ar) { void serialize(Archive &ar) {
@ -447,7 +457,7 @@ class SenderMessage : public Message {
} }
private: private:
ChannelRefT sender_; std::shared_ptr<Channel> sender_;
}; };
/** /**
@ -534,7 +544,7 @@ class System {
} }
std::recursive_mutex mutex_; std::recursive_mutex mutex_;
// TODO: Replace with a map to a reactor EventQueue map to have more granular // TODO: Replace with a map to a reactor Connector map to have more granular
// locking. // locking.
std::unordered_map<std::string, std::unordered_map<std::string,
std::pair<std::unique_ptr<Reactor>, std::thread>> std::pair<std::unique_ptr<Reactor>, std::thread>>

View File

@ -0,0 +1,125 @@
#include "gtest/gtest.h"
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include "communication.hpp"
TEST(ConnectorSetUpTest, CheckMainChannelIsSet) {
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::seconds(1));
CloseConnector("main");
}
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("master", "main")))
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::seconds(1));
CloseConnector("main");
}
};
System system;
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}
TEST(SimpleSendTest, OneSimpleSend) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::seconds(1));
channel->Send(std::make_unique<MessageInt>(123));
CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII.
}
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
EventStream* stream = main_.first;
std::unique_ptr<Message> m_uptr = stream->AwaitEvent();
CloseConnector("main");
MessageInt* msg = dynamic_cast<MessageInt *>(m_uptr.get());
ASSERT_NE(msg, nullptr);
ASSERT_EQ(msg->x, 123);
}
};
System system;
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}
TEST(SimpleSendTest, IgnoreAfterClose) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::seconds(1));
channel->Send(std::make_unique<MessageInt>(101));
channel->Send(std::make_unique<MessageInt>(102));
std::this_thread::sleep_for(std::chrono::seconds(1));
channel->Send(std::make_unique<MessageInt>(103)); // these ones should be ignored
channel->Send(std::make_unique<MessageInt>(104));
CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII.
}
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
EventStream* stream = main_.first;
std::unique_ptr<Message> m_uptr = stream->AwaitEvent();
CloseConnector("main");
MessageInt* msg = dynamic_cast<MessageInt *>(m_uptr.get());
ASSERT_NE(msg, nullptr);
ASSERT_EQ(msg->x, 101);
}
};
System system;
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}