Implemented EventStream::Close

Summary:
1. Consistency Changes in Reactors
2. Renamed Channel -> Connector where appropriate

Merge branch 'dev' of https://phabricator.memgraph.io/diffusion/MG/memgraph into dev

Reviewers: lion, buda

Reviewed By: buda

Subscribers: pullbot, sasa.stanko

Differential Revision: https://phabricator.memgraph.io/D638
This commit is contained in:
Goran Zuzic 2017-08-07 14:12:04 +02:00
parent 0b8d71ee8f
commit 59b395626c
3 changed files with 58 additions and 56 deletions

View File

@ -12,12 +12,13 @@ BASE_FLAGS = [
'-Wno-variadic-macros',
'-fexceptions',
'-ferror-limit=10000',
'-DNDEBUG',
'-std=c++1z',
'-xc++',
'-I/usr/lib/',
'-I/usr/include/',
'-I./src',
'-I./src',
'-I./experimental/distributed/src',
'-I./experimental/distributed/libs/cereal/include',
'-I./include',
'-I./libs/fmt',
'-I./libs/yaml-cpp',

View File

@ -19,16 +19,20 @@ std::string Connector::LocalChannel::ReactorName() {
}
std::string Connector::LocalChannel::Name() {
return name_;
return connector_name_;
}
std::pair<EventStream*, std::shared_ptr<Channel>> Reactor::Open(const std::string &channel_name) {
void Connector::LocalEventStream::Close() {
current_reactor_->CloseConnector(connector_name_);
}
std::pair<EventStream*, std::shared_ptr<Channel>> Reactor::Open(const std::string &connector_name) {
std::unique_lock<std::mutex> lock(*mutex_);
// TODO: Improve the check that the channel name does not exist in the
// system.
assert(connectors_.count(channel_name) == 0);
auto it = connectors_.emplace(channel_name,
std::make_shared<Connector>(Connector::Params{system_, name_, channel_name, mutex_, cvar_})).first;
assert(connectors_.count(connector_name) == 0);
auto it = connectors_.emplace(connector_name,
std::make_shared<Connector>(Connector::Params{system_, name_, connector_name, mutex_, cvar_})).first;
it->second->self_ptr_ = it->second;
return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
}
@ -36,11 +40,11 @@ std::pair<EventStream*, std::shared_ptr<Channel>> Reactor::Open(const std::strin
std::pair<EventStream*, std::shared_ptr<Channel>> Reactor::Open() {
std::unique_lock<std::mutex> lock(*mutex_);
do {
std::string channel_name = "stream-" + std::to_string(channel_name_counter_++);
if (connectors_.count(channel_name) == 0) {
// Connector &queue = connectors_[channel_name];
auto it = connectors_.emplace(channel_name,
std::make_shared<Connector>(Connector::Params{system_, name_, channel_name, mutex_, cvar_})).first;
std::string connector_name = "stream-" + std::to_string(connector_name_counter_++);
if (connectors_.count(connector_name) == 0) {
// Connector &queue = connectors_[connector_name];
auto it = connectors_.emplace(connector_name,
std::make_shared<Connector>(Connector::Params{system_, name_, connector_name, mutex_, cvar_})).first;
it->second->self_ptr_ = it->second;
return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
}
@ -69,7 +73,7 @@ void Reactor::CloseAllConnectors() {
void Reactor::RunEventLoop() {
bool exit_event_loop = false;
while (true) {
// Find (or wait) for the next Message.
MsgAndCbInfo msg_and_cb;
@ -110,7 +114,7 @@ auto Reactor::LockedGetPendingMessages(std::unique_lock<std::mutex> &lock) -> Ms
Connector& event_queue = *connectors_key_value.second;
auto msg_ptr = event_queue.LockedPop(lock);
if (msg_ptr == nullptr) continue;
std::vector<std::pair<EventStream::Callback, EventStream::Subscription> > cb_info;
for (auto& callbacks_key_value : event_queue.callbacks_) {
uint64_t uid = callbacks_key_value.first;

View File

@ -50,7 +50,7 @@ class Channel {
* Read-end of a Connector (between two reactors).
*/
class EventStream {
public:
public:
/**
* Blocks until a message arrives.
*/
@ -62,7 +62,9 @@ class EventStream {
virtual std::unique_ptr<Message> PopEvent() = 0;
/**
* Subscription Service. Lightweight object (can copy by value).
* Subscription Service.
*
* Unsubscribe from a callback. Lightweight object (can copy by value).
*/
class Subscription {
public:
@ -80,13 +82,23 @@ class EventStream {
Connector& event_queue_;
uint64_t cb_uid_;
};
typedef std::function<void(const Message&, Subscription&)> Callback;
/**
* Register a callback that will be called whenever an event arrives.
*/
virtual void OnEvent(Callback callback) = 0;
/**
* Close this event stream, disallowing further events from getting received.
*
* Any subsequent call after Close() to any function will be result in undefined
* behavior (invalid pointer dereference). Can only be called from the thread
* associated with the Reactor.
*/
virtual void Close() = 0;
};
/**
@ -100,7 +112,7 @@ class EventStream {
* (the one that owns the read-end of a connector) removes/closes it.
*/
class Connector {
class Params;
struct Params;
public:
friend class Reactor; // to create a Params initialization object
@ -108,11 +120,11 @@ class Connector {
Connector(Params params)
: system_(params.system),
name_(params.name),
connector_name_(params.connector_name),
reactor_name_(params.reactor_name),
mutex_(params.mutex),
cvar_(params.cvar),
stream_(mutex_, name_, this) {}
stream_(mutex_, connector_name_, this) {}
/**
* LocalChannel represents the channels to reactors living in the same reactor system (write-end of the connectors).
@ -127,10 +139,10 @@ class Connector {
friend class Connector;
LocalChannel(std::shared_ptr<std::mutex> mutex, std::string reactor_name,
std::string name, std::weak_ptr<Connector> queue, System *system)
std::string connector_name, std::weak_ptr<Connector> queue, System *system)
: mutex_(mutex),
reactor_name_(reactor_name),
name_(name),
connector_name_(connector_name),
weak_queue_(queue),
system_(system) {}
@ -154,7 +166,7 @@ class Connector {
private:
std::shared_ptr<std::mutex> mutex_;
std::string reactor_name_;
std::string name_;
std::string connector_name_;
std::weak_ptr<Connector> weak_queue_;
System *system_;
};
@ -168,40 +180,28 @@ class Connector {
public:
friend class Connector;
LocalEventStream(std::shared_ptr<std::mutex> mutex, std::string name,
Connector *queue) : mutex_(mutex), name_(name), queue_(queue) {}
LocalEventStream(std::shared_ptr<std::mutex> mutex, std::string connector_name,
Connector *queue) : mutex_(mutex), connector_name_(connector_name), queue_(queue) {}
std::unique_ptr<Message> AwaitEvent() {
std::unique_lock<std::mutex> lock(*mutex_);
if (queue_ != nullptr) {
return queue_->LockedAwaitPop(lock);
}
throw std::runtime_error(
"Cannot call method after connector was closed.");
return queue_->LockedAwaitPop(lock);
}
std::unique_ptr<Message> PopEvent() {
std::unique_lock<std::mutex> lock(*mutex_);
if (queue_ != nullptr) {
return queue_->LockedPop(lock);
}
throw std::runtime_error(
"Cannot call method after connector was closed.");
return queue_->LockedPop(lock);
}
void OnEvent(EventStream::Callback callback) {
std::unique_lock<std::mutex> lock(*mutex_);
if (queue_ != nullptr) {
queue_->LockedOnEvent(callback);
return;
}
throw std::runtime_error(
"Cannot call method after connector was closed.");
queue_->LockedOnEvent(callback);
}
void Close();
private:
std::shared_ptr<std::mutex> mutex_;
std::string name_;
std::string connector_name_;
Connector *queue_;
};
Connector(const Connector &other) = delete;
Connector(Connector &&other) = default;
Connector &operator=(const Connector &other) = delete;
@ -215,10 +215,7 @@ private:
struct Params {
System* system;
std::string reactor_name;
/**
* Connector name.
*/
std::string name;
std::string connector_name;
std::shared_ptr<std::mutex> mutex;
std::shared_ptr<std::condition_variable> cvar;
};
@ -232,7 +229,7 @@ private:
std::shared_ptr<LocalChannel> LockedOpenChannel() {
assert(!self_ptr_.expired()); // TODO(zuza): fix this using this answer https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned
return std::make_shared<LocalChannel>(mutex_, reactor_name_, name_, self_ptr_, system_);
return std::make_shared<LocalChannel>(mutex_, reactor_name_, connector_name_, self_ptr_, system_);
}
std::unique_ptr<Message> LockedAwaitPop(std::unique_lock<std::mutex> &lock) {
@ -251,7 +248,7 @@ private:
uint64_t cb_uid = next_cb_uid++;
callbacks_[cb_uid] = callback;
}
std::unique_ptr<Message> LockedRawPop() {
if (queue_.empty()) return nullptr;
std::unique_ptr<Message> t = std::move(queue_.front());
@ -266,13 +263,13 @@ private:
}
System *system_;
std::string name_;
std::string connector_name_;
std::string reactor_name_;
std::queue<std::unique_ptr<Message>> queue_;
// Should only be locked once since it's used by a cond. var. Also caught in dctor, so must be recursive.
std::shared_ptr<std::mutex> mutex_;
std::shared_ptr<std::mutex> mutex_;
std::shared_ptr<std::condition_variable> cvar_;
/**
/**
* A weak_ptr to itself.
*
* There are initialization problems with this, check Params.
@ -285,7 +282,7 @@ private:
/**
* A single unit of concurrent execution in the system.
*
*
* E.g. one worker, one client. Owned by System. Has a thread associated with it.
*/
class Reactor {
@ -294,7 +291,7 @@ class Reactor {
Reactor(System *system, std::string name)
: system_(system), name_(name), main_(Open("main")) {}
virtual ~Reactor() {}
virtual void Run() = 0;
@ -321,7 +318,7 @@ class Reactor {
Reactor(Reactor &&other) = default;
Reactor &operator=(const Reactor &other) = delete;
Reactor &operator=(Reactor &&other) = default;
protected:
System *system_;
std::string name_;
@ -342,7 +339,7 @@ class Reactor {
* between Channels and EventStreams.
*/
std::unordered_map<std::string, std::shared_ptr<Connector>> connectors_;
int64_t channel_name_counter_{0};
int64_t connector_name_counter_{0};
std::pair<EventStream*, std::shared_ptr<Channel>> main_;
private: