Connected RemoteChannel to Callbacks by removing type_index from where it isn't needed

Reviewers: sasa.stanko, buda

Reviewed By: sasa.stanko

Subscribers: pullbot, mferencevic

Differential Revision: https://phabricator.memgraph.io/D660
This commit is contained in:
Goran Zuzic 2017-08-11 17:01:03 +02:00
parent 0703724295
commit 34121a676c
2 changed files with 44 additions and 41 deletions

View File

@ -120,11 +120,11 @@ auto Reactor::LockedGetPendingMessages() -> MsgAndCbInfo {
for (auto& connectors_key_value : connectors_) { for (auto& connectors_key_value : connectors_) {
Connector& event_queue = *connectors_key_value.second; Connector& event_queue = *connectors_key_value.second;
auto msg_ptr = event_queue.LockedPop(); auto msg_ptr = event_queue.LockedPop();
if (msg_ptr.second == nullptr) continue; if (msg_ptr == nullptr) continue;
const std::type_index& tidx = msg_ptr.first; std::type_index tidx = msg_ptr->GetTypeIndex();
std::vector<std::pair<EventStream::Callback, EventStream::Subscription> > cb_info; std::vector<std::pair<EventStream::Callback, EventStream::Subscription> > cb_info;
auto msg_type_cb_iter = event_queue.callbacks_.find(msg_ptr.first); auto msg_type_cb_iter = event_queue.callbacks_.find(tidx);
if (msg_type_cb_iter != event_queue.callbacks_.end()) { // There is a callback for this type. if (msg_type_cb_iter != event_queue.callbacks_.end()) { // There is a callback for this type.
for (auto& tidx_cb_key_value : msg_type_cb_iter->second) { for (auto& tidx_cb_key_value : msg_type_cb_iter->second) {
uint64_t uid = tidx_cb_key_value.first; uint64_t uid = tidx_cb_key_value.first;
@ -133,7 +133,7 @@ auto Reactor::LockedGetPendingMessages() -> MsgAndCbInfo {
} }
} }
return MsgAndCbInfo(std::move(msg_ptr.second), std::move(cb_info)); return MsgAndCbInfo(std::move(msg_ptr), std::move(cb_info));
} }
return MsgAndCbInfo(nullptr, {}); return MsgAndCbInfo(nullptr, {});

View File

@ -51,13 +51,10 @@ class Channel {
*/ */
template<typename MsgType, typename... Args> template<typename MsgType, typename... Args>
void Send(Args&&... args) { void Send(Args&&... args) {
SendHelper(typeid(MsgType), std::unique_ptr<Message>(new MsgType(std::forward<Args>(args)...))); Send(std::unique_ptr<Message>(new MsgType(std::forward<Args>(args)...)));
} }
template<typename MsgType> virtual void Send(std::unique_ptr<Message> ptr) = 0;
void Send(std::unique_ptr<MsgType>&& msg_ptr) {
SendHelper(typeid(MsgType), std::move(msg_ptr));
}
virtual std::string Address() = 0; virtual std::string Address() = 0;
@ -73,8 +70,6 @@ class Channel {
void serialize(Archive &archive) { void serialize(Archive &archive) {
archive(Address(), Port(), ReactorName(), Name()); archive(Address(), Port(), ReactorName(), Name());
} }
virtual void SendHelper(const std::type_index&, std::unique_ptr<Message>) = 0;
}; };
/** /**
@ -85,14 +80,12 @@ class EventStream {
/** /**
* Blocks until a message arrives. * Blocks until a message arrives.
*/ */
virtual std::pair<std::type_index, std::unique_ptr<Message>> AwaitTypedEvent() = 0; virtual std::unique_ptr<Message> AwaitEvent() = 0;
std::unique_ptr<Message> AwaitEvent() { return AwaitTypedEvent().second; }
/** /**
* Polls if there is a message available, returning null if there is none. * Polls if there is a message available, returning null if there is none.
*/ */
virtual std::pair<std::type_index, std::unique_ptr<Message>> PopTypedEvent() = 0; virtual std::unique_ptr<Message> PopEvent() = 0;
std::unique_ptr<Message> PopEvent() { return PopTypedEvent().second; }
/** /**
* Subscription Service. * Subscription Service.
@ -261,12 +254,12 @@ class Connector {
weak_queue_(queue), weak_queue_(queue),
system_(system) {} system_(system) {}
virtual void SendHelper(const std::type_index& tidx, std::unique_ptr<Message> m) { virtual void Send(std::unique_ptr<Message> m) {
std::shared_ptr<Connector> queue_ = weak_queue_.lock(); // Atomic, per the standard. std::shared_ptr<Connector> queue_ = weak_queue_.lock(); // Atomic, per the standard.
if (queue_) { if (queue_) {
// We guarantee here that the Connector is not destroyed. // We guarantee here that the Connector is not destroyed.
std::unique_lock<std::mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
queue_->LockedPush(tidx, std::move(m)); queue_->LockedPush(std::move(m));
} }
} }
@ -297,11 +290,11 @@ class Connector {
LocalEventStream(std::shared_ptr<std::mutex> mutex, std::string connector_name, LocalEventStream(std::shared_ptr<std::mutex> mutex, std::string connector_name,
Connector *queue) : mutex_(mutex), connector_name_(connector_name), queue_(queue) {} Connector *queue) : mutex_(mutex), connector_name_(connector_name), queue_(queue) {}
std::pair<std::type_index, std::unique_ptr<Message>> AwaitTypedEvent() { std::unique_ptr<Message> AwaitEvent() {
std::unique_lock<std::mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
return queue_->LockedAwaitPop(lock); return queue_->LockedAwaitPop(lock);
} }
std::pair<std::type_index, std::unique_ptr<Message>> PopTypedEvent() { std::unique_ptr<Message> PopEvent() {
std::unique_lock<std::mutex> lock(*mutex_); std::unique_lock<std::mutex> lock(*mutex_);
return queue_->LockedPop(); return queue_->LockedPop();
} }
@ -336,8 +329,8 @@ private:
}; };
void LockedPush(const std::type_index& tidx, std::unique_ptr<Message> m) { void LockedPush(std::unique_ptr<Message> m) {
queue_.emplace(tidx, std::move(m)); queue_.emplace(std::move(m));
// This is OK because there is only one Reactor (thread) that can wait on this Connector. // This is OK because there is only one Reactor (thread) that can wait on this Connector.
cvar_->notify_one(); cvar_->notify_one();
} }
@ -347,10 +340,10 @@ private:
return std::make_shared<LocalChannel>(mutex_, reactor_name_, connector_name_, self_ptr_, system_); return std::make_shared<LocalChannel>(mutex_, reactor_name_, connector_name_, self_ptr_, system_);
} }
std::pair<std::type_index, std::unique_ptr<Message>> LockedAwaitPop(std::unique_lock<std::mutex> &lock) { std::unique_ptr<Message> LockedAwaitPop(std::unique_lock<std::mutex> &lock) {
while (true) { while (true) {
std::pair<std::type_index, std::unique_ptr<Message>> m = LockedRawPop(); std::unique_ptr<Message> m = LockedRawPop();
if (!m.second) { if (!m) {
cvar_->wait(lock); cvar_->wait(lock);
} else { } else {
return m; return m;
@ -358,7 +351,7 @@ private:
} }
} }
std::pair<std::type_index, std::unique_ptr<Message>> LockedPop() { std::unique_ptr<Message> LockedPop() {
return LockedRawPop(); return LockedRawPop();
} }
@ -367,9 +360,9 @@ private:
callbacks_[tidx][cb_uid] = callback; callbacks_[tidx][cb_uid] = callback;
} }
std::pair<std::type_index, std::unique_ptr<Message>> LockedRawPop() { std::unique_ptr<Message> LockedRawPop() {
if (queue_.empty()) return std::pair<std::type_index, std::unique_ptr<Message>>{typeid(nullptr), nullptr}; if (queue_.empty()) return nullptr;
std::pair<std::type_index, std::unique_ptr<Message> > t = std::move(queue_.front()); std::unique_ptr<Message> t = std::move(queue_.front());
queue_.pop(); queue_.pop();
return t; return t;
} }
@ -383,7 +376,7 @@ private:
System *system_; System *system_;
std::string connector_name_; std::string connector_name_;
std::string reactor_name_; std::string reactor_name_;
std::queue<std::pair<std::type_index, std::unique_ptr<Message>>> queue_; std::queue<std::unique_ptr<Message>> queue_;
// Should only be locked once since it's used by a cond. var. Also caught in dctor, so must be recursive. // Should only be locked once since it's used by a cond. var. Also caught in dctor, so must be recursive.
std::shared_ptr<std::mutex> mutex_; std::shared_ptr<std::mutex> mutex_;
std::shared_ptr<std::condition_variable> cvar_; std::shared_ptr<std::condition_variable> cvar_;
@ -485,10 +478,11 @@ class Network {
struct NetworkMessage { struct NetworkMessage {
NetworkMessage() NetworkMessage()
: address(""), port(0), reactor(""), channel(""), message(nullptr) {} : address(""), port(0), reactor(""), channel(""), message(nullptr) {}
NetworkMessage(std::string _address, uint16_t _port, std::string _reactor, NetworkMessage(const std::string& _address, uint16_t _port,
std::string _channel, std::unique_ptr<Message> _message) const std::string& _reactor, const std::string& _channel,
std::unique_ptr<Message> _message)
: address(_address), : address(_address),
port(_port), port(_port),
reactor(_reactor), reactor(_reactor),
@ -496,11 +490,11 @@ class Network {
message(std::move(_message)) {} message(std::move(_message)) {}
NetworkMessage(NetworkMessage &&nm) NetworkMessage(NetworkMessage &&nm)
: address(nm.address), : address(std::move(nm.address)),
port(nm.port), port(std::move(nm.port)),
reactor(nm.reactor), reactor(std::move(nm.reactor)),
channel(nm.channel), channel(std::move(nm.channel)),
message(std::move(nm.message)) {} message(std::move(nm.message)) {}
std::string address; std::string address;
uint16_t port; uint16_t port;
@ -525,7 +519,7 @@ class Network {
return nullptr; return nullptr;
} }
std::shared_ptr<EventStream> AsyncResolve(std::string address, uint16_t port, std::shared_ptr<EventStream> AsyncResolve(const std::string& address, uint16_t port,
int32_t retries, int32_t retries,
std::chrono::seconds cooldown) { std::chrono::seconds cooldown) {
// TODO: Asynchronously resolve channel, and return an event stream // TODO: Asynchronously resolve channel, and return an event stream
@ -533,6 +527,7 @@ class Network {
return nullptr; return nullptr;
} }
/** Start a threadpool that dispatches the messages from the (outgoing) queue to the sockets */
void StartClient(int worker_count) { void StartClient(int worker_count) {
LOG(INFO) << "Starting " << worker_count << " client workers"; LOG(INFO) << "Starting " << worker_count << " client workers";
for (int i = 0; i < worker_count; ++i) { for (int i = 0; i < worker_count; ++i) {
@ -589,11 +584,10 @@ class Network {
virtual std::string Name() { return channel_; } virtual std::string Name() { return channel_; }
virtual void SendHelper(const std::type_index& tidx, std::unique_ptr<Message> message) { virtual void Send(std::unique_ptr<Message> message) {
network_->mutex_.lock(); std::lock_guard<SpinLock> lock(network_->mutex_);
network_->queue_.push(NetworkMessage(address_, port_, reactor_, channel_, network_->queue_.push(NetworkMessage(address_, port_, reactor_, channel_,
std::move(message))); std::move(message)));
network_->mutex_.unlock();
} }
private: private:
@ -610,6 +604,7 @@ class Network {
uint16_t Port() { return FLAGS_port; } uint16_t Port() { return FLAGS_port; }
/** Start a threadpool that relays the messages from the sockets to the LocalEventStreams */
void StartServer(int workers_count) { void StartServer(int workers_count) {
if (server_ != nullptr) { if (server_ != nullptr) {
LOG(FATAL) << "Tried to start a running server!"; LOG(FATAL) << "Tried to start a running server!";
@ -676,6 +671,14 @@ class Message {
template <class Archive> template <class Archive>
void serialize(Archive &) {} void serialize(Archive &) {}
/** Run-time type identification that is used for callbacks.
*
* Warning: this works because of the virtual destructor, don't remove it from this class
*/
std::type_index GetTypeIndex() {
return typeid(*this);
}
}; };
/** /**