diff --git a/experimental/distributed/src/communication.cpp b/experimental/distributed/src/communication.cpp index 761a069c7..649a63f92 100644 --- a/experimental/distributed/src/communication.cpp +++ b/experimental/distributed/src/communication.cpp @@ -120,11 +120,11 @@ auto Reactor::LockedGetPendingMessages() -> MsgAndCbInfo { for (auto& connectors_key_value : connectors_) { Connector& event_queue = *connectors_key_value.second; auto msg_ptr = event_queue.LockedPop(); - if (msg_ptr.second == nullptr) continue; - const std::type_index& tidx = msg_ptr.first; + if (msg_ptr == nullptr) continue; + std::type_index tidx = msg_ptr->GetTypeIndex(); std::vector > cb_info; - auto msg_type_cb_iter = event_queue.callbacks_.find(msg_ptr.first); + auto msg_type_cb_iter = event_queue.callbacks_.find(tidx); if (msg_type_cb_iter != event_queue.callbacks_.end()) { // There is a callback for this type. for (auto& tidx_cb_key_value : msg_type_cb_iter->second) { uint64_t uid = tidx_cb_key_value.first; @@ -133,7 +133,7 @@ auto Reactor::LockedGetPendingMessages() -> MsgAndCbInfo { } } - return MsgAndCbInfo(std::move(msg_ptr.second), std::move(cb_info)); + return MsgAndCbInfo(std::move(msg_ptr), std::move(cb_info)); } return MsgAndCbInfo(nullptr, {}); diff --git a/experimental/distributed/src/communication.hpp b/experimental/distributed/src/communication.hpp index f1244d5bc..ed9e1c4ee 100644 --- a/experimental/distributed/src/communication.hpp +++ b/experimental/distributed/src/communication.hpp @@ -51,13 +51,10 @@ class Channel { */ template void Send(Args&&... args) { - SendHelper(typeid(MsgType), std::unique_ptr(new MsgType(std::forward(args)...))); + Send(std::unique_ptr(new MsgType(std::forward(args)...))); } - template - void Send(std::unique_ptr&& msg_ptr) { - SendHelper(typeid(MsgType), std::move(msg_ptr)); - } + virtual void Send(std::unique_ptr ptr) = 0; virtual std::string Address() = 0; @@ -73,8 +70,6 @@ class Channel { void serialize(Archive &archive) { archive(Address(), Port(), ReactorName(), Name()); } - - virtual void SendHelper(const std::type_index&, std::unique_ptr) = 0; }; /** @@ -85,14 +80,12 @@ class EventStream { /** * Blocks until a message arrives. */ - virtual std::pair> AwaitTypedEvent() = 0; - std::unique_ptr AwaitEvent() { return AwaitTypedEvent().second; } + virtual std::unique_ptr AwaitEvent() = 0; /** * Polls if there is a message available, returning null if there is none. */ - virtual std::pair> PopTypedEvent() = 0; - std::unique_ptr PopEvent() { return PopTypedEvent().second; } + virtual std::unique_ptr PopEvent() = 0; /** * Subscription Service. @@ -261,12 +254,12 @@ class Connector { weak_queue_(queue), system_(system) {} - virtual void SendHelper(const std::type_index& tidx, std::unique_ptr m) { + virtual void Send(std::unique_ptr m) { std::shared_ptr queue_ = weak_queue_.lock(); // Atomic, per the standard. if (queue_) { // We guarantee here that the Connector is not destroyed. std::unique_lock lock(*mutex_); - queue_->LockedPush(tidx, std::move(m)); + queue_->LockedPush(std::move(m)); } } @@ -297,11 +290,11 @@ class Connector { LocalEventStream(std::shared_ptr mutex, std::string connector_name, Connector *queue) : mutex_(mutex), connector_name_(connector_name), queue_(queue) {} - std::pair> AwaitTypedEvent() { + std::unique_ptr AwaitEvent() { std::unique_lock lock(*mutex_); return queue_->LockedAwaitPop(lock); } - std::pair> PopTypedEvent() { + std::unique_ptr PopEvent() { std::unique_lock lock(*mutex_); return queue_->LockedPop(); } @@ -336,8 +329,8 @@ private: }; - void LockedPush(const std::type_index& tidx, std::unique_ptr m) { - queue_.emplace(tidx, std::move(m)); + void LockedPush(std::unique_ptr m) { + queue_.emplace(std::move(m)); // This is OK because there is only one Reactor (thread) that can wait on this Connector. cvar_->notify_one(); } @@ -347,10 +340,10 @@ private: return std::make_shared(mutex_, reactor_name_, connector_name_, self_ptr_, system_); } - std::pair> LockedAwaitPop(std::unique_lock &lock) { + std::unique_ptr LockedAwaitPop(std::unique_lock &lock) { while (true) { - std::pair> m = LockedRawPop(); - if (!m.second) { + std::unique_ptr m = LockedRawPop(); + if (!m) { cvar_->wait(lock); } else { return m; @@ -358,7 +351,7 @@ private: } } - std::pair> LockedPop() { + std::unique_ptr LockedPop() { return LockedRawPop(); } @@ -367,9 +360,9 @@ private: callbacks_[tidx][cb_uid] = callback; } - std::pair> LockedRawPop() { - if (queue_.empty()) return std::pair>{typeid(nullptr), nullptr}; - std::pair > t = std::move(queue_.front()); + std::unique_ptr LockedRawPop() { + if (queue_.empty()) return nullptr; + std::unique_ptr t = std::move(queue_.front()); queue_.pop(); return t; } @@ -383,7 +376,7 @@ private: System *system_; std::string connector_name_; std::string reactor_name_; - std::queue>> queue_; + std::queue> queue_; // Should only be locked once since it's used by a cond. var. Also caught in dctor, so must be recursive. std::shared_ptr mutex_; std::shared_ptr cvar_; @@ -485,10 +478,11 @@ class Network { struct NetworkMessage { NetworkMessage() - : address(""), port(0), reactor(""), channel(""), message(nullptr) {} + : address(""), port(0), reactor(""), channel(""), message(nullptr) {} - NetworkMessage(std::string _address, uint16_t _port, std::string _reactor, - std::string _channel, std::unique_ptr _message) + NetworkMessage(const std::string& _address, uint16_t _port, + const std::string& _reactor, const std::string& _channel, + std::unique_ptr _message) : address(_address), port(_port), reactor(_reactor), @@ -496,11 +490,11 @@ class Network { message(std::move(_message)) {} NetworkMessage(NetworkMessage &&nm) - : address(nm.address), - port(nm.port), - reactor(nm.reactor), - channel(nm.channel), - message(std::move(nm.message)) {} + : address(std::move(nm.address)), + port(std::move(nm.port)), + reactor(std::move(nm.reactor)), + channel(std::move(nm.channel)), + message(std::move(nm.message)) {} std::string address; uint16_t port; @@ -525,7 +519,7 @@ class Network { return nullptr; } - std::shared_ptr AsyncResolve(std::string address, uint16_t port, + std::shared_ptr AsyncResolve(const std::string& address, uint16_t port, int32_t retries, std::chrono::seconds cooldown) { // TODO: Asynchronously resolve channel, and return an event stream @@ -533,6 +527,7 @@ class Network { return nullptr; } + /** Start a threadpool that dispatches the messages from the (outgoing) queue to the sockets */ void StartClient(int worker_count) { LOG(INFO) << "Starting " << worker_count << " client workers"; for (int i = 0; i < worker_count; ++i) { @@ -589,11 +584,10 @@ class Network { virtual std::string Name() { return channel_; } - virtual void SendHelper(const std::type_index& tidx, std::unique_ptr message) { - network_->mutex_.lock(); + virtual void Send(std::unique_ptr message) { + std::lock_guard lock(network_->mutex_); network_->queue_.push(NetworkMessage(address_, port_, reactor_, channel_, std::move(message))); - network_->mutex_.unlock(); } private: @@ -610,6 +604,7 @@ class Network { uint16_t Port() { return FLAGS_port; } + /** Start a threadpool that relays the messages from the sockets to the LocalEventStreams */ void StartServer(int workers_count) { if (server_ != nullptr) { LOG(FATAL) << "Tried to start a running server!"; @@ -676,6 +671,14 @@ class Message { template void serialize(Archive &) {} + + /** Run-time type identification that is used for callbacks. + * + * Warning: this works because of the virtual destructor, don't remove it from this class + */ + std::type_index GetTypeIndex() { + return typeid(*this); + } }; /**