diff --git a/experimental/distributed/main.cpp b/experimental/distributed/main.cpp index a4997e0f8..62582e319 100644 --- a/experimental/distributed/main.cpp +++ b/experimental/distributed/main.cpp @@ -6,7 +6,7 @@ #include #include -#include "communication.hpp" +#include "reactors_distributed.hpp" const int NUM_WORKERS = 1; diff --git a/experimental/distributed/src/protocol.cpp b/experimental/distributed/src/protocol.cpp index e1f965865..d59ca1fa9 100644 --- a/experimental/distributed/src/protocol.cpp +++ b/experimental/distributed/src/protocol.cpp @@ -1,7 +1,7 @@ #include #include "protocol.hpp" -#include "communication.hpp" +#include "reactors_distributed.hpp" #include "glog/logging.h" diff --git a/experimental/distributed/src/reactors_distributed.cpp b/experimental/distributed/src/reactors_distributed.cpp new file mode 100644 index 000000000..8740655c3 --- /dev/null +++ b/experimental/distributed/src/reactors_distributed.cpp @@ -0,0 +1,32 @@ +#include "reactors_distributed.hpp" + +DEFINE_string(address, "127.0.0.1", "Network server bind address"); +DEFINE_int32(port, 10000, "Network server bind port"); + +Network::Network(System *system) : system_(system), protocol_data_(system_) {} + +/** + * SenderMessage implementation. + */ +SenderMessage::SenderMessage() {} + +SenderMessage::SenderMessage(std::string reactor, std::string channel) + : address_(FLAGS_address), + port_(FLAGS_port), + reactor_(reactor), + channel_(channel) {} + +std::string SenderMessage::Address() const { return address_; } +uint16_t SenderMessage::Port() const { return port_; } +std::string SenderMessage::ReactorName() const { return reactor_; } +std::string SenderMessage::ChannelName() const { return channel_; } + +std::shared_ptr SenderMessage::GetChannelToSender( + System *system, Distributed *distributed) const { + if (address_ == FLAGS_address && port_ == FLAGS_port) { + return system->FindChannel(reactor_, channel_); + } + if (distributed) + return distributed->network().Resolve(address_, port_, reactor_, channel_); + assert(false); +} diff --git a/experimental/distributed/src/reactors_distributed.hpp b/experimental/distributed/src/reactors_distributed.hpp new file mode 100644 index 000000000..1bf79dcb8 --- /dev/null +++ b/experimental/distributed/src/reactors_distributed.hpp @@ -0,0 +1,340 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "protocol.hpp" +#include "reactors_local.hpp" + +#include "cereal/archives/binary.hpp" +#include "cereal/types/base_class.hpp" +#include "cereal/types/memory.hpp" +#include "cereal/types/polymorphic.hpp" +#include "cereal/types/string.hpp" +#include "cereal/types/utility.hpp" // utility has to be included because of std::pair +#include "cereal/types/vector.hpp" + +#include "communication/server.hpp" +#include "threading/sync/spinlock.hpp" + +DECLARE_string(address); +DECLARE_int32(port); + +/** + * Networking service. + */ +class Network { + private: + using Endpoint = Protocol::Endpoint; + using Socket = Protocol::Socket; + using NetworkServer = communication::Server; + + struct NetworkMessage { + NetworkMessage() + : address(""), port(0), reactor(""), channel(""), message(nullptr) {} + + NetworkMessage(const std::string& _address, uint16_t _port, + const std::string& _reactor, const std::string& _channel, + std::unique_ptr _message) + : address(_address), + port(_port), + reactor(_reactor), + channel(_channel), + message(std::move(_message)) {} + + NetworkMessage(NetworkMessage &&nm) + : address(std::move(nm.address)), + port(std::move(nm.port)), + reactor(std::move(nm.reactor)), + channel(std::move(nm.channel)), + message(std::move(nm.message)) {} + + std::string address; + uint16_t port; + std::string reactor; + std::string channel; + std::unique_ptr message; + }; + + public: + Network(System *system); + + // client functions + + std::shared_ptr Resolve(std::string address, uint16_t port, + std::string reactor_name, + std::string channel_name) { + if (Protocol::SendMessage(address, port, reactor_name, channel_name, + nullptr)) { + return std::make_shared(this, address, port, reactor_name, + channel_name); + } + return nullptr; + } + + std::shared_ptr AsyncResolve(const std::string& address, uint16_t port, + int32_t retries, + std::chrono::seconds cooldown) { + // TODO: Asynchronously resolve channel, and return an event stream + // that emits the channel after it gets resolved. + return nullptr; + } + + /** Start a threadpool that dispatches the messages from the (outgoing) queue to the sockets */ + void StartClient(int worker_count) { + LOG(INFO) << "Starting " << worker_count << " client workers"; + for (int i = 0; i < worker_count; ++i) { + pool_.push_back(std::thread([worker_count, this]() { + while (this->client_run_) { + this->mutex_.lock(); + if (!this->queue_.empty()) { + NetworkMessage nm(std::move(this->queue_.front())); + this->queue_.pop(); + this->mutex_.unlock(); + // TODO: store success + bool success = + Protocol::SendMessage(nm.address, nm.port, nm.reactor, + nm.channel, std::move(nm.message)); + std::cout << "Network client message send status: " << success << std::endl; + } else { + this->mutex_.unlock(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + })); + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + } + + void StopClient() { + while (true) { + std::lock_guard lock(mutex_); + if (queue_.empty()) { + break; + } + } + client_run_ = false; + for (size_t i = 0; i < pool_.size(); ++i) { + pool_[i].join(); + } + } + + class RemoteChannel : public Channel { + public: + RemoteChannel(Network *network, std::string address, uint16_t port, + std::string reactor, std::string channel) + : network_(network), + address_(address), + port_(port), + reactor_(reactor), + channel_(channel) {} + + virtual std::string Address() { return address_; } + + virtual uint16_t Port() { return port_; } + + virtual std::string ReactorName() { return reactor_; } + + virtual std::string Name() { return channel_; } + + virtual void Send(std::unique_ptr message) { + std::lock_guard lock(network_->mutex_); + network_->queue_.push(NetworkMessage(address_, port_, reactor_, channel_, + std::move(message))); + } + + private: + Network *network_; + std::string address_; + uint16_t port_; + std::string reactor_; + std::string channel_; + }; + + // server functions + + std::string Address() { return FLAGS_address; } + + uint16_t Port() { return FLAGS_port; } + + /** Start a threadpool that relays the messages from the sockets to the LocalEventStreams */ + void StartServer(int workers_count) { + if (server_ != nullptr) { + LOG(FATAL) << "Tried to start a running server!"; + } + + // Initialize endpoint. + Endpoint endpoint; + try { + endpoint = Endpoint(FLAGS_address.c_str(), FLAGS_port); + } catch (io::network::NetworkEndpointException &e) { + LOG(FATAL) << e.what(); + } + + // Initialize socket. + Socket socket; + if (!socket.Bind(endpoint)) { + LOG(FATAL) << "Cannot bind to socket on " << FLAGS_address << " at " + << FLAGS_port; + } + if (!socket.SetNonBlocking()) { + LOG(FATAL) << "Cannot set socket to non blocking!"; + } + if (!socket.Listen(1024)) { + LOG(FATAL) << "Cannot listen on socket!"; + } + + // Initialize server + server_ = + std::make_unique(std::move(socket), protocol_data_); + + // Start server + thread_ = std::thread( + [workers_count, this]() { this->server_->Start(workers_count); }); + } + + void StopServer() { + if (server_ != nullptr) { + server_->Shutdown(); + thread_.join(); + } + } + + private: + System *system_; + + // client variables + SpinLock mutex_; + std::vector pool_; + std::queue queue_; + std::atomic client_run_{true}; + + // server variables + std::thread thread_; + Protocol::Data protocol_data_; + std::unique_ptr server_{nullptr}; +}; + +class Distributed; + +/** + * Message that includes the sender channel used to respond. + */ +class SenderMessage : public Message { + public: + SenderMessage(); + SenderMessage(std::string reactor, std::string channel); + + std::string Address() const; + uint16_t Port() const; + std::string ReactorName() const; + std::string ChannelName() const; + + std::shared_ptr GetChannelToSender(System *system, + Distributed *distributed = nullptr) const; + + template + void serialize(Archive &ar) { + ar(cereal::virtual_base_class(this), address_, port_, + reactor_, channel_); + } + + private: + std::string address_; + uint16_t port_; + std::string reactor_; + std::string channel_; +}; +CEREAL_REGISTER_TYPE(SenderMessage); + + +/** + * Message that will arrive on a stream returned by Distributed::FindChannel + * once and if the channel is successfully resolved. + */ +class ChannelResolvedMessage : public Message { + public: + ChannelResolvedMessage() {} + ChannelResolvedMessage(std::shared_ptr channel) + : Message(), channel_(channel) {} + + std::shared_ptr channel() const { return channel_; } + + private: + std::shared_ptr channel_; +}; + +/** + * Placeholder for all functionality related to non-local communication + * E.g. resolve remote channels by memgraph node id, etc. + */ +class Distributed { + public: + Distributed(System &system) : system_(system), network_(&system) {} + + Distributed(const Distributed &) = delete; + Distributed(Distributed &&) = delete; + Distributed &operator=(const Distributed &) = delete; + Distributed &operator=(Distributed &&) = delete; + + void StartServices() { + network_.StartClient(4); + network_.StartServer(4); + } + + void StopServices() { + network_.StopClient(); + network_.StopServer(); + } + + // TODO: Implement remote Spawn. + + /** + * Resolves remote channel. + * + * TODO: Provide asynchronous implementation of this function. + * + * @return EventStream on which message will arrive once channel is resolved. + * @warning It can only be called from local Reactor. + */ + EventStream* FindChannel(const std::string &address, + uint16_t port, + const std::string &reactor_name, + const std::string &channel_name) { + std::shared_ptr channel = nullptr; + while (!(channel = network_.Resolve(address, port, reactor_name, channel_name))) + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + auto stream_channel = current_reactor_->Open(); + stream_channel.second->Send(channel); + return stream_channel.first; + } + + System &system() { return system_; } + Network &network() { return network_; } + + protected: + System &system_; + Network network_; +}; + + +class DistributedReactor : public Reactor { + public: + DistributedReactor(System *system, std::string name, Distributed &distributed) + : Reactor(system, name), distributed_(distributed) {} + + protected: + Distributed &distributed_; +}; diff --git a/experimental/distributed/src/communication.cpp b/experimental/distributed/src/reactors_local.cpp similarity index 76% rename from experimental/distributed/src/communication.cpp rename to experimental/distributed/src/reactors_local.cpp index c4eedfdce..57feeaf28 100644 --- a/experimental/distributed/src/communication.cpp +++ b/experimental/distributed/src/reactors_local.cpp @@ -1,7 +1,4 @@ -#include "communication.hpp" - -DEFINE_string(address, "127.0.0.1", "Network server bind address"); -DEFINE_int32(port, 10000, "Network server bind port"); +#include "reactors_local.hpp" void EventStream::Subscription::unsubscribe() const { event_queue_.RemoveCb(*this); @@ -9,14 +6,6 @@ void EventStream::Subscription::unsubscribe() const { thread_local Reactor* current_reactor_ = nullptr; -std::string Connector::LocalChannel::Address() { - return FLAGS_address; -} - -uint16_t Connector::LocalChannel::Port() { - return FLAGS_port; -} - std::string Connector::LocalChannel::ReactorName() { return reactor_name_; } @@ -138,31 +127,3 @@ auto Reactor::LockedGetPendingMessages() -> MsgAndCbInfo { return MsgAndCbInfo(nullptr, {}); } - -Network::Network(System *system) : system_(system), protocol_data_(system_) {} - -/** - * SenderMessage implementation. - */ -SenderMessage::SenderMessage() {} - -SenderMessage::SenderMessage(std::string reactor, std::string channel) - : address_(FLAGS_address), - port_(FLAGS_port), - reactor_(reactor), - channel_(channel) {} - -std::string SenderMessage::Address() const { return address_; } -uint16_t SenderMessage::Port() const { return port_; } -std::string SenderMessage::ReactorName() const { return reactor_; } -std::string SenderMessage::ChannelName() const { return channel_; } - -std::shared_ptr SenderMessage::GetChannelToSender( - System *system, Distributed *distributed) const { - if (address_ == FLAGS_address && port_ == FLAGS_port) { - return system->FindChannel(reactor_, channel_); - } - if (distributed) - return distributed->network().Resolve(address_, port_, reactor_, channel_); - assert(false); -} diff --git a/experimental/distributed/src/communication.hpp b/experimental/distributed/src/reactors_local.hpp similarity index 62% rename from experimental/distributed/src/communication.hpp rename to experimental/distributed/src/reactors_local.hpp index 13add150b..4ab5dc1a0 100644 --- a/experimental/distributed/src/communication.hpp +++ b/experimental/distributed/src/reactors_local.hpp @@ -2,38 +2,15 @@ #include #include -#include -#include -#include #include #include #include -#include #include -#include -#include #include #include -#include - -#include "protocol.hpp" - -#include "cereal/archives/binary.hpp" -#include "cereal/types/base_class.hpp" #include "cereal/types/memory.hpp" -#include "cereal/types/polymorphic.hpp" -#include "cereal/types/string.hpp" -#include "cereal/types/utility.hpp" // utility has to be included because of std::pair -#include "cereal/types/vector.hpp" -#include "communication/server.hpp" -#include "threading/sync/spinlock.hpp" - -DECLARE_string(address); -DECLARE_int32(port); - -class Message; class EventStream; class Reactor; class System; @@ -41,6 +18,25 @@ class Connector; extern thread_local Reactor* current_reactor_; +/** + * Base class for messages. + */ +class Message { + public: + virtual ~Message() {} + + template + void serialize(Archive &) {} + + /** Run-time type identification that is used for callbacks. + * + * Warning: this works because of the virtual destructor, don't remove it from this class + */ + std::type_index GetTypeIndex() { + return typeid(*this); + } +}; + /** * Write-end of a Connector (between two reactors). */ @@ -56,10 +52,6 @@ class Channel { virtual void Send(std::unique_ptr ptr) = 0; - virtual std::string Address() = 0; - - virtual uint16_t Port() = 0; - virtual std::string ReactorName() = 0; virtual std::string Name() = 0; @@ -68,7 +60,7 @@ class Channel { template void serialize(Archive &archive) { - archive(Address(), Port(), ReactorName(), Name()); + archive(ReactorName(), Name()); } }; @@ -260,10 +252,6 @@ class Connector { } } - virtual std::string Address(); - - virtual uint16_t Port(); - virtual std::string ReactorName(); virtual std::string Name(); @@ -460,252 +448,6 @@ class Reactor { MsgAndCbInfo LockedGetPendingMessages(); }; -/** - * Networking service. - */ -class Network { - private: - using Endpoint = Protocol::Endpoint; - using Socket = Protocol::Socket; - using NetworkServer = communication::Server; - - struct NetworkMessage { - NetworkMessage() - : address(""), port(0), reactor(""), channel(""), message(nullptr) {} - - NetworkMessage(const std::string& _address, uint16_t _port, - const std::string& _reactor, const std::string& _channel, - std::unique_ptr _message) - : address(_address), - port(_port), - reactor(_reactor), - channel(_channel), - message(std::move(_message)) {} - - NetworkMessage(NetworkMessage &&nm) - : address(std::move(nm.address)), - port(std::move(nm.port)), - reactor(std::move(nm.reactor)), - channel(std::move(nm.channel)), - message(std::move(nm.message)) {} - - std::string address; - uint16_t port; - std::string reactor; - std::string channel; - std::unique_ptr message; - }; - - public: - Network(System *system); - - // client functions - - std::shared_ptr Resolve(std::string address, uint16_t port, - std::string reactor_name, - std::string channel_name) { - if (Protocol::SendMessage(address, port, reactor_name, channel_name, - nullptr)) { - return std::make_shared(this, address, port, reactor_name, - channel_name); - } - return nullptr; - } - - std::shared_ptr AsyncResolve(const std::string& address, uint16_t port, - int32_t retries, - std::chrono::seconds cooldown) { - // TODO: Asynchronously resolve channel, and return an event stream - // that emits the channel after it gets resolved. - return nullptr; - } - - /** Start a threadpool that dispatches the messages from the (outgoing) queue to the sockets */ - void StartClient(int worker_count) { - LOG(INFO) << "Starting " << worker_count << " client workers"; - for (int i = 0; i < worker_count; ++i) { - pool_.push_back(std::thread([worker_count, this]() { - while (this->client_run_) { - this->mutex_.lock(); - if (!this->queue_.empty()) { - NetworkMessage nm(std::move(this->queue_.front())); - this->queue_.pop(); - this->mutex_.unlock(); - // TODO: store success - bool success = - Protocol::SendMessage(nm.address, nm.port, nm.reactor, - nm.channel, std::move(nm.message)); - std::cout << "Network client message send status: " << success << std::endl; - } else { - this->mutex_.unlock(); - } - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - } - })); - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } - } - - void StopClient() { - while (true) { - std::lock_guard lock(mutex_); - if (queue_.empty()) { - break; - } - } - client_run_ = false; - for (int i = 0; i < pool_.size(); ++i) { - pool_[i].join(); - } - } - - class RemoteChannel : public Channel { - public: - RemoteChannel(Network *network, std::string address, uint16_t port, - std::string reactor, std::string channel) - : network_(network), - address_(address), - port_(port), - reactor_(reactor), - channel_(channel) {} - - virtual std::string Address() { return address_; } - - virtual uint16_t Port() { return port_; } - - virtual std::string ReactorName() { return reactor_; } - - virtual std::string Name() { return channel_; } - - virtual void Send(std::unique_ptr message) { - std::lock_guard lock(network_->mutex_); - network_->queue_.push(NetworkMessage(address_, port_, reactor_, channel_, - std::move(message))); - } - - private: - Network *network_; - std::string address_; - uint16_t port_; - std::string reactor_; - std::string channel_; - }; - - // server functions - - std::string Address() { return FLAGS_address; } - - uint16_t Port() { return FLAGS_port; } - - /** Start a threadpool that relays the messages from the sockets to the LocalEventStreams */ - void StartServer(int workers_count) { - if (server_ != nullptr) { - LOG(FATAL) << "Tried to start a running server!"; - } - - // Initialize endpoint. - Endpoint endpoint; - try { - endpoint = Endpoint(FLAGS_address.c_str(), FLAGS_port); - } catch (io::network::NetworkEndpointException &e) { - LOG(FATAL) << e.what(); - } - - // Initialize socket. - Socket socket; - if (!socket.Bind(endpoint)) { - LOG(FATAL) << "Cannot bind to socket on " << FLAGS_address << " at " - << FLAGS_port; - } - if (!socket.SetNonBlocking()) { - LOG(FATAL) << "Cannot set socket to non blocking!"; - } - if (!socket.Listen(1024)) { - LOG(FATAL) << "Cannot listen on socket!"; - } - - // Initialize server - server_ = - std::make_unique(std::move(socket), protocol_data_); - - // Start server - thread_ = std::thread( - [workers_count, this]() { this->server_->Start(workers_count); }); - } - - void StopServer() { - if (server_ != nullptr) { - server_->Shutdown(); - thread_.join(); - } - } - - private: - System *system_; - - // client variables - SpinLock mutex_; - std::vector pool_; - std::queue queue_; - std::atomic client_run_{true}; - - // server variables - std::thread thread_; - Protocol::Data protocol_data_; - std::unique_ptr server_{nullptr}; -}; - -/** - * Base class for messages. - */ -class Message { - public: - virtual ~Message() {} - - template - void serialize(Archive &) {} - - /** Run-time type identification that is used for callbacks. - * - * Warning: this works because of the virtual destructor, don't remove it from this class - */ - std::type_index GetTypeIndex() { - return typeid(*this); - } -}; - -class Distributed; - -/** - * Message that includes the sender channel used to respond. - */ -class SenderMessage : public Message { - public: - SenderMessage(); - SenderMessage(std::string reactor, std::string channel); - - std::string Address() const; - uint16_t Port() const; - std::string ReactorName() const; - std::string ChannelName() const; - - std::shared_ptr GetChannelToSender(System *system, - Distributed *distributed = nullptr) const; - - template - void serialize(Archive &ar) { - ar(cereal::virtual_base_class(this), address_, port_, - reactor_, channel_); - } - - private: - std::string address_; - uint16_t port_; - std::string reactor_; - std::string channel_; -}; -CEREAL_REGISTER_TYPE(SenderMessage); /** * Global placeholder for all reactors in the system. Alive through the entire process lifetime. @@ -768,82 +510,3 @@ class System { std::pair, std::thread>> reactors_; }; - - -/** - * Message that will arrive on a stream returned by Distributed::FindChannel - * once and if the channel is successfully resolved. - */ -class ChannelResolvedMessage : public Message { - public: - ChannelResolvedMessage() {} - ChannelResolvedMessage(std::shared_ptr channel) - : Message(), channel_(channel) {} - - std::shared_ptr channel() const { return channel_; } - - private: - std::shared_ptr channel_; -}; - -/** - * Placeholder for all functionality related to non-local communication - * E.g. resolve remote channels by memgraph node id, etc. - */ -class Distributed { - public: - Distributed(System &system) : system_(system), network_(&system) {} - - Distributed(const Distributed &) = delete; - Distributed(Distributed &&) = delete; - Distributed &operator=(const Distributed &) = delete; - Distributed &operator=(Distributed &&) = delete; - - void StartServices() { - network_.StartClient(4); - network_.StartServer(4); - } - - void StopServices() { - network_.StopClient(); - network_.StopServer(); - } - - // TODO: Implement remote Spawn. - - /** - * Resolves remote channel. - * - * TODO: Provide asynchronous implementation of this function. - * - * @return EventStream on which message will arrive once channel is resolved. - * @warning It can only be called from local Reactor. - */ - EventStream* FindChannel(const std::string &address, - uint16_t port, - const std::string &reactor_name, - const std::string &channel_name) { - std::shared_ptr channel = nullptr; - while (!(channel = network_.Resolve(address, port, reactor_name, channel_name))) - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - auto stream_channel = current_reactor_->Open(); - stream_channel.second->Send(channel); - return stream_channel.first; - } - - System &system() { return system_; } - Network &network() { return network_; } - - protected: - System &system_; - Network network_; -}; - -class DistributedReactor : public Reactor { - public: - DistributedReactor(System *system, std::string name, Distributed &distributed) - : Reactor(system, name), distributed_(distributed) {} - - protected: - Distributed &distributed_; -}; \ No newline at end of file diff --git a/experimental/distributed/tests/distributed_test.cpp b/experimental/distributed/tests/distributed_test.cpp index d527fc5a1..1e129a31b 100644 --- a/experimental/distributed/tests/distributed_test.cpp +++ b/experimental/distributed/tests/distributed_test.cpp @@ -1,9 +1,9 @@ #include #include -#include "communication.hpp" +#include "reactors_distributed.hpp" -DEFINE_int64(my_mnid, 0, "Memgraph node id"); +DEFINE_int64(my_mnid, 0, "Memgraph node id"); // TODO(zuza): this should be assigned by the leader once in the future DEFINE_string(config_filename, "", "File containing list of all processes"); class MemgraphDistributed : public Distributed { @@ -172,4 +172,4 @@ int main(int argc, char *argv[]) { distributed.StopServices(); return 0; -} \ No newline at end of file +} diff --git a/experimental/distributed/tests/network_chat.cpp b/experimental/distributed/tests/network_chat.cpp index b1760c6d7..179eccb47 100644 --- a/experimental/distributed/tests/network_chat.cpp +++ b/experimental/distributed/tests/network_chat.cpp @@ -1,7 +1,7 @@ // command to run: // gnome-terminal --tab -e './network_chat --port 10000 --minloglevel 2' --tab -e './network_chat --port 10001 --minloglevel 2' -#include "communication.hpp" +#include "reactors_distributed.hpp" class ChatMessage : public SenderMessage { public: diff --git a/experimental/distributed/tests/network_client.cpp b/experimental/distributed/tests/network_client.cpp index 88cee7c0a..710347b02 100644 --- a/experimental/distributed/tests/network_client.cpp +++ b/experimental/distributed/tests/network_client.cpp @@ -1,4 +1,4 @@ -#include "communication.hpp" +#include "reactors_distributed.hpp" int main(int argc, char *argv[]) { google::InitGoogleLogging(argv[0]); diff --git a/experimental/distributed/tests/connector_unit.cpp b/experimental/distributed/tests/reactors_local_unit.cpp similarity index 99% rename from experimental/distributed/tests/connector_unit.cpp rename to experimental/distributed/tests/reactors_local_unit.cpp index 26d1455ee..f8aff1467 100644 --- a/experimental/distributed/tests/connector_unit.cpp +++ b/experimental/distributed/tests/reactors_local_unit.cpp @@ -9,7 +9,7 @@ #include #include -#include "communication.hpp" +#include "reactors_local.hpp" TEST(SystemTest, ReturnWithoutThrowing) { struct Master : public Reactor { @@ -478,7 +478,7 @@ TEST(MultipleSendTest, ProcessManyMessages) { EventStream* stream = main_.first; vals = 0; - stream->OnEvent([this](const Message& msg, const EventStream::Subscription&) { + stream->OnEvent([this](const Message&, const EventStream::Subscription&) { ++vals; main_.second->Send(); });