From fe3d7529042ce51111e2960e1824fa9febf65cab Mon Sep 17 00:00:00 2001
From: Mislav Bradac <mislav.bradac@memgraph.io>
Date: Wed, 25 Oct 2017 14:47:46 +0200
Subject: [PATCH] Revise reactors code

Reviewers: buda, mferencevic

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D923
---
 CMakeLists.txt                                |   5 +-
 experimental/distributed/main-client.cpp      |  65 +-
 .../distributed/src/memgraph_distributed.hpp  |  25 +-
 .../distributed/src/reactors_distributed.cpp  |  35 --
 .../distributed/src/reactors_distributed.hpp  | 350 -----------
 .../distributed/src/reactors_local.cpp        | 134 -----
 .../distributed/src/reactors_local.hpp        | 540 -----------------
 .../distributed/tests/distributed_test.cpp    | 117 ++--
 .../tests/reactors_distributed_unit.cpp       | 227 -------
 .../distributed/tests/reactors_local_unit.cpp | 483 ---------------
 libs/CMakeLists.txt                           |   3 +
 libs/setup.sh                                 |  31 +-
 .../communication/reactor}/protocol.cpp       |  18 +-
 .../communication/reactor}/protocol.hpp       |  10 +-
 src/communication/reactor/reactor_local.cpp   | 141 +++++
 src/communication/reactor/reactor_local.hpp   | 553 ++++++++++++++++++
 src/io/network/socket.cpp                     |   6 +
 src/io/network/socket.hpp                     |   2 +
 tests/unit/reactor_local.cpp                  | 385 ++++++++++++
 19 files changed, 1228 insertions(+), 1902 deletions(-)
 delete mode 100644 experimental/distributed/src/reactors_distributed.cpp
 delete mode 100644 experimental/distributed/src/reactors_distributed.hpp
 delete mode 100644 experimental/distributed/src/reactors_local.cpp
 delete mode 100644 experimental/distributed/src/reactors_local.hpp
 delete mode 100644 experimental/distributed/tests/reactors_distributed_unit.cpp
 delete mode 100644 experimental/distributed/tests/reactors_local_unit.cpp
 rename {experimental/distributed/src => src/communication/reactor}/protocol.cpp (92%)
 rename {experimental/distributed/src => src/communication/reactor}/protocol.hpp (95%)
 create mode 100644 src/communication/reactor/reactor_local.cpp
 create mode 100644 src/communication/reactor/reactor_local.hpp
 create mode 100644 tests/unit/reactor_local.cpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 14b31f933..cb93b97ef 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -186,10 +186,11 @@ target_link_libraries(antlr_opencypher_parser_lib antlr4)
 set(memgraph_src_files
     ${src_dir}/communication/bolt/v1/decoder/decoded_value.cpp
     ${src_dir}/communication/bolt/v1/session.cpp
+    ${src_dir}/communication/reactor/reactor_local.cpp
     ${src_dir}/data_structures/concurrent/skiplist_gc.cpp
+    ${src_dir}/database/dbms.cpp
     ${src_dir}/database/graph_db.cpp
     ${src_dir}/database/graph_db_accessor.cpp
-    ${src_dir}/database/dbms.cpp
     ${src_dir}/durability/recovery.cpp
     ${src_dir}/durability/snapshooter.cpp
     ${src_dir}/io/network/addrinfo.cpp
@@ -219,7 +220,7 @@ set(memgraph_src_files
 # -----------------------------------------------------------------------------
 
 # memgraph_lib depend on these libraries
-set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools
+set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools cereal
     antlr_opencypher_parser_lib dl glog gflags)
 
 if (USE_LTALLOC)
diff --git a/experimental/distributed/main-client.cpp b/experimental/distributed/main-client.cpp
index 77c87e016..3c0c15cd1 100644
--- a/experimental/distributed/main-client.cpp
+++ b/experimental/distributed/main-client.cpp
@@ -2,72 +2,60 @@
 #include <iostream>
 #include <memory>
 
-#include "reactors_distributed.hpp"
 #include "memgraph_config.hpp"
 #include "memgraph_distributed.hpp"
 #include "memgraph_transactions.hpp"
+#include "reactors_distributed.hpp"
 
 /**
  * List of queries that should be executed.
  */
-std::vector<std::string> queries = {{
-  "create vertex",
-  "create vertex",
-  "create vertex",
-  "create vertex",
-  "create vertex",
-  "create vertex",
-  "create vertex",
-  "create vertex",
-  "create vertex",
-  "create vertex",
-  "vertex count",
-  "create vertex",
-  "create vertex",
-  "vertex count"
-}};
+std::vector<std::string> queries = {
+    {"create vertex", "create vertex", "create vertex", "create vertex",
+     "create vertex", "create vertex", "create vertex", "create vertex",
+     "create vertex", "create vertex", "vertex count", "create vertex",
+     "create vertex", "vertex count"}};
 
 /**
  * This is the client that issues some hard-coded queries.
  */
 class Client : public Reactor {
  public:
-  Client(std::string name) : Reactor(name) {
-  }
+  Client(std::string name) : Reactor(name) {}
 
   void IssueQueries(std::shared_ptr<ChannelWriter> channel_to_leader) {
     // (concurrently) create a couple of vertices
-    for (int query_idx = 0; query_idx < queries.size(); ++query_idx) {
+    for (int query_idx = 0; query_idx < static_cast<int64_t>(queries.size());
+         ++query_idx) {
       // register callback
       std::string channel_name = "query-" + std::to_string(query_idx);
       auto stream = Open(channel_name).first;
-      stream
-        ->OnEventOnce()
-        .ChainOnce<ResultMsg>([this, query_idx](const ResultMsg &msg,
-                                               const Subscription &sub){
-          std::cout << "Result of query " << query_idx << " ("
-                    << queries[query_idx] << "):" << std::endl
-                    << "  " << msg.result() << std::endl;
-          sub.CloseChannel();
-        });
+      stream->OnEventOnce().ChainOnce<ResultMsg>(
+          [this, query_idx](const ResultMsg &msg, const Subscription &sub) {
+            std::cout << "Result of query " << query_idx << " ("
+                      << queries[query_idx] << "):" << std::endl
+                      << "  " << msg.result() << std::endl;
+            sub.CloseChannel();
+          });
 
       // then issue the query (to avoid race conditions)
-      std::cout << "Issuing command " << query_idx << " ("
-                << queries[query_idx] << ")" << std::endl;
+      std::cout << "Issuing command " << query_idx << " (" << queries[query_idx]
+                << ")" << std::endl;
       channel_to_leader->Send<QueryMsg>(channel_name, queries[query_idx]);
     }
   }
 
   virtual void Run() {
-    MemgraphDistributed& memgraph = MemgraphDistributed::GetInstance();
+    MemgraphDistributed &memgraph = MemgraphDistributed::GetInstance();
     auto mnid = memgraph.LeaderMnid();
 
     memgraph.FindChannel(mnid, "master", "client-queries")
-      ->OnEventOnce()
-      .ChainOnce<ChannelResolvedMessage>([this](const ChannelResolvedMessage &msg, const Subscription& sub) {
-          sub.CloseChannel();
-          IssueQueries(msg.channelWriter());
-        });
+        ->OnEventOnce()
+        .ChainOnce<ChannelResolvedMessage>(
+            [this](const ChannelResolvedMessage &msg, const Subscription &sub) {
+              sub.CloseChannel();
+              IssueQueries(msg.channelWriter());
+            });
   }
 };
 
@@ -77,7 +65,7 @@ int main(int argc, char *argv[]) {
 
   System &system = System::GetInstance();
   Distributed &distributed = Distributed::GetInstance();
-  MemgraphDistributed& memgraph = MemgraphDistributed::GetInstance();
+  MemgraphDistributed &memgraph = MemgraphDistributed::GetInstance();
   memgraph.RegisterConfig(ParseConfig());
   distributed.StartServices();
 
@@ -85,6 +73,5 @@ int main(int argc, char *argv[]) {
 
   system.AwaitShutdown();
   distributed.StopServices();
-
   return 0;
 }
diff --git a/experimental/distributed/src/memgraph_distributed.hpp b/experimental/distributed/src/memgraph_distributed.hpp
index aee089d4d..ca57ce8d2 100644
--- a/experimental/distributed/src/memgraph_distributed.hpp
+++ b/experimental/distributed/src/memgraph_distributed.hpp
@@ -4,9 +4,9 @@
 
 #include "reactors_distributed.hpp"
 
-#include <unordered_map>
 #include <mutex>
 #include <string>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
@@ -17,20 +17,18 @@ class MemgraphDistributed {
  public:
   /**
    * Get the (singleton) instance of MemgraphDistributed.
-   *
-   * More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern
    */
   static MemgraphDistributed &GetInstance() {
-    static MemgraphDistributed memgraph; // guaranteed to be destroyed, initialized on first use
+    static MemgraphDistributed memgraph;
     return memgraph;
   }
 
-  EventStream* FindChannel(MnidT mnid,
-                           const std::string &reactor,
+  EventStream *FindChannel(MnidT mnid, const std::string &reactor,
                            const std::string &channel) {
-    std::unique_lock<std::recursive_mutex> lock(mutex_);
+    std::unique_lock<std::mutex> lock(mutex_);
     const auto &location = mnodes_.at(mnid);
-    return Distributed::GetInstance().FindChannel(location.first, location.second, reactor, channel);
+    return Distributed::GetInstance().FindChannel(
+        location.first, location.second, reactor, channel);
   }
 
   void RegisterConfig(const Config &config) {
@@ -51,23 +49,22 @@ class MemgraphDistributed {
   /**
    * The leader is currently the first node in the config.
    */
-  MnidT LeaderMnid() {
-    return config_.nodes.front().mnid;
-  }
+  MnidT LeaderMnid() const { return config_.nodes.front().mnid; }
 
  protected:
   MemgraphDistributed() {}
 
   /** Register memgraph node id to the given location. */
-  void RegisterMemgraphNode(MnidT mnid, const std::string &address, uint16_t port) {
-    std::unique_lock<std::recursive_mutex> lock(mutex_);
+  void RegisterMemgraphNode(MnidT mnid, const std::string &address,
+                            uint16_t port) {
+    std::unique_lock<std::mutex> lock(mutex_);
     mnodes_[mnid] = Location(address, port);
   }
 
  private:
   Config config_;
 
-  std::recursive_mutex mutex_;
+  std::mutex mutex_;
   std::unordered_map<MnidT, Location> mnodes_;
 
   MemgraphDistributed(const MemgraphDistributed &) = delete;
diff --git a/experimental/distributed/src/reactors_distributed.cpp b/experimental/distributed/src/reactors_distributed.cpp
deleted file mode 100644
index 0d59f5e81..000000000
--- a/experimental/distributed/src/reactors_distributed.cpp
+++ /dev/null
@@ -1,35 +0,0 @@
-#include "reactors_distributed.hpp"
-
-DEFINE_string(address, "127.0.0.1", "Network server bind address");
-DEFINE_int32(port, 10000, "Network server bind port");
-
-Network::Network() {}
-
-/**
- * ReturnAddressMsg implementation.
- */
-ReturnAddressMsg::ReturnAddressMsg() {}
-
-ReturnAddressMsg::ReturnAddressMsg(std::string channel)
-    : ReturnAddressMsg(current_reactor_->name(), channel) {}
-
-ReturnAddressMsg::ReturnAddressMsg(std::string reactor, std::string channel)
-    : address_(FLAGS_address),
-      port_(FLAGS_port),
-      reactor_(reactor),
-      channel_(channel) {}
-
-std::string ReturnAddressMsg::Address() const { return address_; }
-uint16_t ReturnAddressMsg::Port() const { return port_; }
-std::string ReturnAddressMsg::ReactorName() const { return reactor_; }
-std::string ReturnAddressMsg::ChannelName() const { return channel_; }
-
-std::shared_ptr<ChannelWriter> ReturnAddressMsg::GetReturnChannelWriter() const {
-  if (address_ == FLAGS_address && port_ == FLAGS_port) {
-    return System::GetInstance().FindChannel(reactor_, channel_);
-  } else {
-    // TODO(zuza): we should probably assert here if services have been already started.
-    return Distributed::GetInstance().network().Resolve(address_, port_, reactor_, channel_);
-  }
-  assert(false);
-}
diff --git a/experimental/distributed/src/reactors_distributed.hpp b/experimental/distributed/src/reactors_distributed.hpp
deleted file mode 100644
index 5e4ed839d..000000000
--- a/experimental/distributed/src/reactors_distributed.hpp
+++ /dev/null
@@ -1,350 +0,0 @@
-#pragma once
-
-#include <cassert>
-#include <exception>
-#include <functional>
-#include <iostream>
-#include <memory>
-#include <mutex>
-#include <queue>
-#include <stdexcept>
-#include <tuple>
-#include <typeindex>
-#include <utility>
-
-#include <gflags/gflags.h>
-
-#include "protocol.hpp"
-#include "reactors_local.hpp"
-
-#include "cereal/archives/binary.hpp"
-#include "cereal/types/base_class.hpp"
-#include "cereal/types/memory.hpp"
-#include "cereal/types/polymorphic.hpp"
-#include "cereal/types/string.hpp"
-#include "cereal/types/utility.hpp"  // utility has to be included because of std::pair
-#include "cereal/types/vector.hpp"
-
-#include "communication/server.hpp"
-#include "threading/sync/spinlock.hpp"
-
-DECLARE_string(address);
-DECLARE_int32(port);
-
-/**
- * Networking service.
- */
-class Network {
- private:
-  using Endpoint = Protocol::Endpoint;
-  using Socket = Protocol::Socket;
-  using NetworkServer = communication::Server<Protocol::Session,
-                                              Protocol::Socket, Protocol::Data>;
-
-  struct NetworkMessage {
-    NetworkMessage()
-      : address(""), port(0), reactor(""), channel(""), message(nullptr) {}
-
-    NetworkMessage(const std::string& _address, uint16_t _port,
-                   const std::string& _reactor, const std::string& _channel,
-                   std::unique_ptr<Message> _message)
-        : address(_address),
-          port(_port),
-          reactor(_reactor),
-          channel(_channel),
-          message(std::move(_message)) {}
-
-    NetworkMessage(NetworkMessage &&nm)
-      : address(std::move(nm.address)),
-        port(std::move(nm.port)),
-        reactor(std::move(nm.reactor)),
-        channel(std::move(nm.channel)),
-        message(std::move(nm.message)) {}
-
-    std::string address;
-    uint16_t port;
-    std::string reactor;
-    std::string channel;
-    std::unique_ptr<Message> message;
-  };
-
- public:
-  Network();
-
-  // client functions
-
-  std::shared_ptr<ChannelWriter> Resolve(std::string address, uint16_t port,
-                                   std::string reactor_name,
-                                   std::string channel_name) {
-    if (Protocol::SendMessage(address, port, reactor_name, channel_name,
-                              nullptr)) {
-      return std::make_shared<RemoteChannelWriter>(this, address, port, reactor_name,
-                                             channel_name);
-    }
-    LOG(WARNING) << "Could not resolve " << address << ":" << port << " " << reactor_name << "/" << channel_name;
-    return nullptr;
-  }
-
-  std::shared_ptr<EventStream> AsyncResolve(const std::string& address, uint16_t port,
-                                            int32_t retries,
-                                            std::chrono::seconds cooldown) {
-    // TODO: Asynchronously resolve channel, and return an event stream
-    // that emits the channel after it gets resolved.
-    return nullptr;
-  }
-
-  /** Start a threadpool that dispatches the messages from the (outgoing) queue to the sockets */
-  void StartClient(int worker_count) {
-    LOG(INFO) << "Starting " << worker_count << " client workers";
-    client_run_ = true;
-
-    for (int i = 0; i < worker_count; ++i) {
-      pool_.push_back(std::thread([worker_count, this]() {
-        while (this->client_run_) {
-          this->mutex_.lock();
-          if (!this->queue_.empty()) {
-            NetworkMessage nm(std::move(this->queue_.front()));
-            this->queue_.pop();
-            this->mutex_.unlock();
-            // TODO: store success
-            bool success =
-                Protocol::SendMessage(nm.address, nm.port, nm.reactor,
-                                      nm.channel, std::move(nm.message));
-            DLOG(INFO) << "Network client message send status: " << success << std::endl;
-          } else {
-            this->mutex_.unlock();
-          }
-          std::this_thread::sleep_for(std::chrono::milliseconds(50));
-        }
-      }));
-      std::this_thread::sleep_for(std::chrono::milliseconds(5));
-    }
-  }
-
-  void StopClient() {
-    while (true) {
-      std::lock_guard<SpinLock> lock(mutex_);
-      if (queue_.empty()) {
-        break;
-      }
-    }
-    client_run_ = false;
-    for (size_t i = 0; i < pool_.size(); ++i) {
-      pool_[i].join();
-    }
-    pool_.clear();
-  }
-
-  class RemoteChannelWriter : public ChannelWriter {
-   public:
-    RemoteChannelWriter(Network *network, std::string address, uint16_t port,
-                  std::string reactor, std::string channel)
-        : network_(network),
-          address_(address),
-          port_(port),
-          reactor_(reactor),
-          channel_(channel) {}
-
-    virtual std::string Address() { return address_; }
-
-    virtual uint16_t Port() { return port_; }
-
-    virtual std::string ReactorName() { return reactor_; }
-
-    virtual std::string Name() { return channel_; }
-
-    virtual void Send(std::unique_ptr<Message> message) {
-      std::lock_guard<SpinLock> lock(network_->mutex_);
-      network_->queue_.push(NetworkMessage(address_, port_, reactor_, channel_,
-                                           std::move(message)));
-    }
-
-   private:
-    Network *network_;
-    std::string address_;
-    uint16_t port_;
-    std::string reactor_;
-    std::string channel_;
-  };
-
-  // server functions
-
-  std::string Address() { return FLAGS_address; }
-
-  uint16_t Port() { return FLAGS_port; }
-
-  /** Start a threadpool that relays the messages from the sockets to the LocalEventStreams */
-  void StartServer(int workers_count) {
-    if (server_ != nullptr) {
-      LOG(FATAL) << "Tried to start a running server!";
-    }
-
-    // Initialize endpoint.
-    Endpoint endpoint;
-    try {
-      endpoint = Endpoint(FLAGS_address.c_str(), FLAGS_port);
-    } catch (io::network::NetworkEndpointException &e) {
-      LOG(FATAL) << e.what();
-    }
-
-    // Initialize socket.
-    Socket socket;
-    if (!socket.Bind(endpoint)) {
-      LOG(FATAL) << "Cannot bind to socket on " << FLAGS_address << " at "
-                 << FLAGS_port;
-    }
-    if (!socket.SetNonBlocking()) {
-      LOG(FATAL) << "Cannot set socket to non blocking!";
-    }
-    if (!socket.Listen(1024)) {
-      LOG(FATAL) << "Cannot listen on socket!";
-    }
-
-    // Initialize server
-    server_ =
-        std::make_unique<NetworkServer>(std::move(socket), protocol_data_);
-
-    // Start server
-    thread_ = std::thread(
-        [workers_count, this]() { this->server_->Start(workers_count); });
-  }
-
-  void StopServer() {
-    if (server_ != nullptr) {
-      server_->Shutdown();
-      thread_.join();
-      server_ = nullptr;
-    }
-  }
-
- private:
-  // client variables
-  SpinLock mutex_;
-  std::vector<std::thread> pool_;
-  std::queue<NetworkMessage> queue_;
-  std::atomic<bool> client_run_;
-
-  // server variables
-  std::thread thread_;
-  Protocol::Data protocol_data_;
-  std::unique_ptr<NetworkServer> server_{nullptr};
-};
-
-/**
- * Message that includes the sender channel used to respond.
- */
-class ReturnAddressMsg : public Message {
- public:
-  /* The return address is on the current reactor, specified channel */
-  ReturnAddressMsg(std::string channel);
-
-  /* The return address is on a specified reactor/channel */
-  ReturnAddressMsg(std::string reactor, std::string channel);
-
-  std::string Address() const;
-  uint16_t Port() const;
-  std::string ReactorName() const;
-  std::string ChannelName() const;
-
-  std::shared_ptr<ChannelWriter> GetReturnChannelWriter() const;
-
-  template<class Archive>
-  void serialize(Archive &ar) {
-    ar(cereal::virtual_base_class<Message>(this), address_, port_,
-       reactor_, channel_);
-  }
-
- protected:
-  friend class cereal::access;
-  ReturnAddressMsg(); // Cereal needs access to a default constructor.
-
- private:
-  std::string address_;
-  uint16_t port_;
-  std::string reactor_;
-  std::string channel_;
-};
-CEREAL_REGISTER_TYPE(ReturnAddressMsg);
-
-
-/**
- * Message that will arrive on a stream returned by Distributed::FindChannel
- * once and if the channel is successfully resolved.
- */
-class ChannelResolvedMessage : public Message {
- public:
-  ChannelResolvedMessage() {}
-  ChannelResolvedMessage(std::shared_ptr<ChannelWriter> channel_writer)
-    : Message(), channel_writer_(channel_writer) {}
-
-  std::shared_ptr<ChannelWriter> channelWriter() const { return channel_writer_; }
-
- private:
-  std::shared_ptr<ChannelWriter> channel_writer_;
-};
-
-/**
- * Placeholder for all functionality related to non-local communication.
- *
- * E.g. resolve remote channels by memgraph node id, etc.
- * Alive through the entire process lifetime.
- * Singleton class. Created automatically on first use.
- * Final (can't extend) because it's a singleton. Please be careful if you're changing this.
- */
-class Distributed final {
- public:
-  /**
-   * Get the (singleton) instance of Distributed.
-   *
-   * More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern
-   */
-  static Distributed &GetInstance() {
-    static Distributed distributed; // guaranteed to be destroyed, initialized on first use
-    return distributed;
-  }
-
-  void StartServices() {
-    network_.StartClient(4);
-    network_.StartServer(4);
-  }
-
-  void StopServices() {
-    network_.StopClient();
-    network_.StopServer();
-  }
-
-  // TODO: Implement remote Spawn.
-
-  /**
-   * Resolves remote channel.
-   *
-   * TODO: Provide asynchronous implementation of this function.
-   *
-   * @return EventStream on which message will arrive once channel is resolved.
-   * @warning It can only be called from local Reactor.
-   */
-  EventStream* FindChannel(const std::string &address,
-                           uint16_t port,
-                           const std::string &reactor_name,
-                           const std::string &channel_name) {
-    std::shared_ptr<ChannelWriter> channel_writer = nullptr;
-    while (!(channel_writer = network_.Resolve(address, port, reactor_name, channel_name)))
-      std::this_thread::sleep_for(std::chrono::milliseconds(200));
-    auto stream_channel = current_reactor_->Open();
-    stream_channel.second->Send<ChannelResolvedMessage>(channel_writer);
-    return stream_channel.first;
-  }
-
-  Network &network() { return network_; }
-
- protected:
-  Distributed() {}
-
-  Network network_;
-
- private:
-  Distributed(const Distributed &) = delete;
-  Distributed(Distributed &&) = delete;
-  Distributed &operator=(const Distributed &) = delete;
-  Distributed &operator=(Distributed &&) = delete;
-};
diff --git a/experimental/distributed/src/reactors_local.cpp b/experimental/distributed/src/reactors_local.cpp
deleted file mode 100644
index f300d7afb..000000000
--- a/experimental/distributed/src/reactors_local.cpp
+++ /dev/null
@@ -1,134 +0,0 @@
-#include "reactors_local.hpp"
-
-void EventStream::Subscription::Unsubscribe() const {
-  event_queue_.RemoveCb(*this);
-}
-
-void EventStream::Subscription::CloseChannel() const {
-  event_queue_.Close();
-}
-
-const std::string& EventStream::Subscription::ChannelName() const {
-  return event_queue_.channel_name_;
-}
-
-thread_local Reactor* current_reactor_ = nullptr;
-
-std::string Channel::LocalChannelWriter::ReactorName() {
-  return reactor_name_;
-}
-
-std::string Channel::LocalChannelWriter::Name() {
-  return channel_name_;
-}
-
-void Channel::Close() {
-  // TODO(zuza): there will be major problems if a reactor tries to close a stream that isn't theirs
-  //   luckily this should never happen if the framework is used as expected.
-  current_reactor_->CloseChannel(channel_name_);
-}
-
-std::pair<EventStream*, std::shared_ptr<ChannelWriter>> Reactor::Open(const std::string &channel_name) {
-  std::unique_lock<std::mutex> lock(*mutex_);
-  // TODO: Improve the check that the channel name does not exist in the
-  // system.
-  if (channels_.count(channel_name) != 0) {
-    throw std::runtime_error("Channel with name " + channel_name
-        + "already exists");
-  }
-  auto it = channels_.emplace(channel_name,
-    std::make_shared<Channel>(Channel::Params{name_, channel_name, mutex_, cvar_})).first;
-  it->second->self_ptr_ = it->second;
-  return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
-}
-
-std::pair<EventStream*, std::shared_ptr<ChannelWriter>> Reactor::Open() {
-  std::unique_lock<std::mutex> lock(*mutex_);
-  do {
-    std::string channel_name = "stream-" + std::to_string(channel_name_counter_++);
-    if (channels_.count(channel_name) == 0) {
-      // Channel &queue = channels_[channel_name];
-      auto it = channels_.emplace(channel_name,
-        std::make_shared<Channel>(Channel::Params{name_, channel_name, mutex_, cvar_})).first;
-      it->second->self_ptr_ = it->second;
-      return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
-    }
-  } while (true);
-}
-
-const std::shared_ptr<ChannelWriter> Reactor::FindChannel(
-    const std::string &channel_name) {
-  std::unique_lock<std::mutex> lock(*mutex_);
-  auto it_channel = channels_.find(channel_name);
-  if (it_channel == channels_.end()) return nullptr;
-  return it_channel->second->LockedOpenChannel();
-}
-
-void Reactor::CloseChannel(const std::string &s) {
-  std::unique_lock<std::mutex> lock(*mutex_);
-  auto it = channels_.find(s);
-  assert(it != channels_.end());
-  channels_.erase(it);
-  cvar_->notify_all();
-}
-
-void Reactor::RunEventLoop() {
-  bool exit_event_loop = false;
-
-  while (true) {
-    // Find (or wait) for the next Message.
-    MsgAndCbInfo msg_and_cb;
-    {
-      std::unique_lock<std::mutex> lock(*mutex_);
-
-      while (true) {
-        // Not fair because was taken earlier, talk to lion.
-        msg_and_cb = LockedGetPendingMessages();
-        if (msg_and_cb.first != nullptr) break;
-
-        // Exit the loop if there are no more Channels.
-        if (channels_.empty()) {
-          exit_event_loop = true;
-          break;
-        }
-
-        cvar_->wait(lock);
-      }
-
-      if (exit_event_loop) break;
-    }
-
-    for (auto &cbAndSub : msg_and_cb.second) {
-      auto &cb = cbAndSub.first;
-      const Message &msg = *msg_and_cb.first;
-      cb(msg, cbAndSub.second);
-    }
-  }
-}
-
-/**
- * Checks if there is any nonempty EventStream.
- */
-auto Reactor::LockedGetPendingMessages() -> MsgAndCbInfo {
-  // return type after because the scope Reactor:: is not searched before the name
-  for (auto &channels_key_value : channels_) {
-    Channel &event_queue = *channels_key_value.second;
-    auto msg_ptr = event_queue.LockedPop();
-    if (msg_ptr == nullptr) continue;
-    std::type_index tidx = msg_ptr->GetTypeIndex();
-
-    std::vector<std::pair<EventStream::Callback, Subscription> > cb_info;
-    auto msg_type_cb_iter = event_queue.callbacks_.find(tidx);
-    if (msg_type_cb_iter != event_queue.callbacks_.end()) { // There is a callback for this type.
-      for (auto &tidx_cb_key_value : msg_type_cb_iter->second) {
-        uint64_t uid = tidx_cb_key_value.first;
-        EventStream::Callback cb = tidx_cb_key_value.second;
-        cb_info.emplace_back(cb, Subscription(event_queue, tidx, uid));
-      }
-    }
-
-    return MsgAndCbInfo(std::move(msg_ptr), std::move(cb_info));
-  }
-
-  return MsgAndCbInfo(nullptr, {});
-}
diff --git a/experimental/distributed/src/reactors_local.hpp b/experimental/distributed/src/reactors_local.hpp
deleted file mode 100644
index 828e02230..000000000
--- a/experimental/distributed/src/reactors_local.hpp
+++ /dev/null
@@ -1,540 +0,0 @@
-#pragma once
-
-#include <cassert>
-#include <condition_variable>
-#include <memory>
-#include <mutex>
-#include <queue>
-#include <thread>
-#include <unordered_map>
-#include <utility>
-
-#include "cereal/types/memory.hpp"
-
-class EventStream;
-class Reactor;
-class System;
-class Channel;
-
-extern thread_local Reactor* current_reactor_;
-
-/**
- * Base class for messages.
- */
-class Message {
- public:
-  virtual ~Message() {}
-
-  template <class Archive>
-  void serialize(Archive &) {}
-
-  /** Run-time type identification that is used for callbacks.
-   *
-   * Warning: this works because of the virtual destructor, don't remove it from this class
-   */
-  std::type_index GetTypeIndex() {
-    return typeid(*this);
-  }
-};
-
-/**
- * Write-end of a Channel (between two reactors).
- */
-class ChannelWriter {
- public:
-  /**
-   * Construct and send the message to the channel.
-   */
-  template<typename MsgType, typename... Args>
-  void Send(Args&&... args) {
-    Send(std::unique_ptr<Message>(new MsgType(std::forward<Args>(args)...)));
-  }
-
-  virtual void Send(std::unique_ptr<Message> ptr) = 0;
-
-  virtual std::string ReactorName() = 0;
-
-  virtual std::string Name() = 0;
-
-  void operator=(const ChannelWriter &) = delete;
-
-  template <class Archive>
-  void serialize(Archive &archive) {
-    archive(ReactorName(), Name());
-  }
-};
-
-/**
- * Read-end of a Channel (between two reactors).
- */
-class EventStream {
- public:
-  class OnEventOnceChainer;
-  class Subscription;
-
-  /**
-   * Register a callback that will be called whenever an event arrives.
-   */
-  template<typename MsgType>
-  void OnEvent(std::function<void(const MsgType&, const Subscription&)> &&cb) {
-    OnEventHelper(typeid(MsgType),
-                  [cb = std::move(cb)](const Message &general_msg,
-                                       const Subscription &subscription) {
-        const MsgType &correct_msg = dynamic_cast<const MsgType&>(general_msg);
-        cb(correct_msg, subscription);
-      });
-  }
-
-  /**
-   * Register a callback that will be called only once.
-   * Once event is received, channel of this EventStream is closed.
-   */
-  template<typename MsgType>
-  void OnEventOnceThenClose(std::function<void(const MsgType&)> &&cb) {
-    OnEventHelper(typeid(MsgType),
-                  [cb = std::move(cb)](const Message &general_msg,
-                                       const Subscription &subscription) {
-        const MsgType &correct_msg = dynamic_cast<const MsgType&>(general_msg);
-        subscription.CloseChannel();
-        cb(correct_msg);
-      });
-  }
-
-  /**
-   * Starts a chain to register a callback that fires off only once.
-   *
-   * This method supports chaining (see the the class OnEventOnceChainer or the tests for examples).
-   * Warning: when chaining callbacks, make sure that EventStream does not deallocate before the last
-   * chained callback fired.
-   */
-  OnEventOnceChainer OnEventOnce() {
-    return OnEventOnceChainer(*this);
-  }
-
-  /**
-   * Get the name of the channel.
-   */
-  virtual const std::string &ChannelName() = 0;
-
-  /**
-   * Subscription Service.
-   *
-   * Unsubscribe from a callback. Lightweight object (can copy by value).
-   */
-  class Subscription {
-   public:
-    /**
-     * Unsubscribe. Call only once.
-     */
-    void Unsubscribe() const;
-
-    /**
-     * Close the stream. Convenience method.
-     */
-    void CloseChannel() const;
-
-    /**
-     * Get the name of the channel the message is delivered to.
-     */
-    const std::string& ChannelName() const;
-
-   private:
-    friend class Reactor;
-    friend class Channel;
-
-    Subscription(Channel &event_queue, std::type_index tidx, uint64_t cb_uid)
-      : event_queue_(event_queue), tidx_(tidx), cb_uid_(cb_uid) { }
-
-    Channel &event_queue_;
-    std::type_index tidx_;
-    uint64_t cb_uid_;
-  };
-
-  /**
-   * Close this event stream, disallowing further events from getting received.
-   *
-   * Any subsequent call after Close() to any function will be result in undefined
-   * behavior (invalid pointer dereference). Can only be called from the thread
-   * associated with the Reactor.
-   */
-  virtual void Close() = 0;
-
-  /**
-   * Convenience class to chain one-off callbacks.
-   *
-   * Usage: Create this class with OnEventOnce() and then chain callbacks using ChainOnce.
-   * A callback will fire only once, unsubscribe and immediately subscribe the next callback to the stream.
-   *
-   * Example: stream->OnEventOnce().ChainOnce(firstCb).ChainOnce(secondCb);
-   *
-   * Implementation: This class is a temporary object that remembers the callbacks that are to be installed
-   * and finally installs them in the destructor. Not sure is this kosher, is there another way?
-   */
-  class OnEventOnceChainer {
-   public:
-    OnEventOnceChainer(EventStream &event_stream) : event_stream_(event_stream) {}
-    ~OnEventOnceChainer() {
-      InstallCallbacks();
-    }
-
-    template<typename MsgType>
-    OnEventOnceChainer &ChainOnce(std::function<void(const MsgType&, const Subscription&)> &&cb) {
-      std::function<void(const Message&, const Subscription&)> wrap =
-        [cb = std::move(cb)](const Message &general_msg, const Subscription &subscription) {
-          const MsgType &correct_msg = dynamic_cast<const MsgType&>(general_msg);
-          subscription.Unsubscribe();
-          cb(correct_msg, subscription); // Warning: this can close the Channel, be careful what you put after it!
-      };
-      cbs_.emplace_back(typeid(MsgType), std::move(wrap));
-      return *this;
-    }
-
-  private:
-    void InstallCallbacks() {
-      int num_callbacks = cbs_.size();
-      assert(num_callbacks > 0); // We should install at least one callback, otherwise the usage is wrong?
-      std::function<void(const Message&, const Subscription&)> next_cb = nullptr;
-      std::type_index next_type = typeid(nullptr);
-
-      for (int i = num_callbacks - 1; i >= 0; --i) {
-        std::function<void(const Message&, const Subscription&)> tmp_cb = nullptr;
-        tmp_cb = [cb = std::move(cbs_[i].second),
-                  next_type,
-                  next_cb = std::move(next_cb),
-                  es_ptr = &this->event_stream_](const Message &msg, const Subscription &subscription) {
-          cb(msg, subscription);
-          if (next_cb != nullptr) {
-            es_ptr->OnEventHelper(next_type, std::move(next_cb));
-          }
-        };
-        next_cb = std::move(tmp_cb);
-        next_type = cbs_[i].first;
-      }
-
-      event_stream_.OnEventHelper(next_type, std::move(next_cb));
-    }
-
-    EventStream &event_stream_;
-    std::vector<std::pair<std::type_index, std::function<void(const Message&, const Subscription&)>>> cbs_;
-  };
-  typedef std::function<void(const Message&, const Subscription&)> Callback;
-
-private:
-  virtual void OnEventHelper(std::type_index tidx, Callback callback) = 0;
-};
-
-using Subscription = EventStream::Subscription; // To write less.
-
-/**
- * Implementation of a channel.
- *
- * This class is an internal data structure that represents the state of the channel.
- * This class is not meant to be used by the clients of the messaging framework.
- * The Channel class wraps the event queue data structure, the mutex that protects
- * concurrent access to the event queue, the local channel and the event stream.
- * The class is owned by the Reactor. It gets closed when the owner reactor
- * (the one that owns the read-end of a channel) removes/closes it.
- */
-class Channel {
- struct Params;
-
- public:
-  friend class Reactor; // to create a Params initialization object
-  friend class EventStream::Subscription;
-
-  Channel(Params params)
-      : channel_name_(params.channel_name),
-        reactor_name_(params.reactor_name),
-        mutex_(params.mutex),
-        cvar_(params.cvar),
-        stream_(mutex_, this) {}
-
-  /**
-   * LocalChannelWriter represents the channels to reactors living in the same reactor system (write-end of the channels).
-   *
-   * Sending messages to the local channel requires acquiring the mutex.
-   * LocalChannelWriter holds a (weak) pointer to the enclosing Channel object.
-   * Messages sent to a closed channel are ignored.
-   * There can be multiple LocalChannelWriters refering to the same stream if needed.
-   */
-  class LocalChannelWriter : public ChannelWriter {
-   public:
-    friend class Channel;
-
-    LocalChannelWriter(std::shared_ptr<std::mutex> mutex, std::string reactor_name,
-                 std::string channel_name, std::weak_ptr<Channel> queue)
-        : mutex_(mutex),
-          reactor_name_(reactor_name),
-          channel_name_(channel_name),
-          weak_queue_(queue) {}
-
-    virtual void Send(std::unique_ptr<Message> m) {
-      std::shared_ptr<Channel> queue_ = weak_queue_.lock(); // Atomic, per the standard.
-      if (queue_) {
-        // We guarantee here that the Channel is not destroyed.
-        std::unique_lock<std::mutex> lock(*mutex_);
-        queue_->LockedPush(std::move(m));
-      }
-    }
-
-    virtual std::string ReactorName();
-
-    virtual std::string Name();
-
-   private:
-    std::shared_ptr<std::mutex> mutex_;
-    std::string reactor_name_;
-    std::string channel_name_;
-    std::weak_ptr<Channel> weak_queue_;
-  };
-
-  /**
-   * Implementation of the event stream.
-   *
-   * After the enclosing Channel object is destroyed (by a call to CloseChannel or Close).
-   */
-  class LocalEventStream : public EventStream {
-   public:
-    friend class Channel;
-
-    LocalEventStream(std::shared_ptr<std::mutex> mutex, Channel *queue) : mutex_(mutex), queue_(queue) {}
-
-    void OnEventHelper(std::type_index tidx, Callback callback) {
-      std::unique_lock<std::mutex> lock(*mutex_);
-      queue_->LockedOnEventHelper(tidx, callback);
-    }
-
-    const std::string &ChannelName() {
-      return queue_->channel_name_;
-    }
-
-    void Close() {
-      queue_->Close();
-    }
-
-   private:
-    std::shared_ptr<std::mutex> mutex_;
-    std::string channel_name_;
-    Channel *queue_;
-  };
-
-  /**
-   * Close the channel. Must be called from the reactor that owns the channel.
-   */
-  void Close();
-
-  Channel(const Channel &other) = delete;
-  Channel(Channel &&other) = default;
-  Channel &operator=(const Channel &other) = delete;
-  Channel &operator=(Channel &&other) = default;
-
-private:
-  /**
-   * Initialization parameters to Channel.
-   * Warning: do not forget to initialize self_ptr_ individually. Private because it shouldn't be created outside of a Reactor.
-   */
-  struct Params {
-    std::string reactor_name;
-    std::string channel_name;
-    std::shared_ptr<std::mutex> mutex;
-    std::shared_ptr<std::condition_variable> cvar;
-  };
-
-
-  void LockedPush(std::unique_ptr<Message> m) {
-    queue_.emplace(std::move(m));
-    // This is OK because there is only one Reactor (thread) that can wait on this Channel.
-    cvar_->notify_one();
-  }
-
-  std::shared_ptr<LocalChannelWriter> LockedOpenChannel() {
-    assert(!self_ptr_.expired()); // TODO(zuza): fix this using this answer https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned
-    return std::make_shared<LocalChannelWriter>(mutex_, reactor_name_, channel_name_, self_ptr_);
-  }
-
-  std::unique_ptr<Message> LockedPop() {
-    return LockedRawPop();
-  }
-
-  void LockedOnEventHelper(std::type_index tidx, EventStream::Callback callback) {
-    uint64_t cb_uid = next_cb_uid++;
-    callbacks_[tidx][cb_uid] = callback;
-  }
-
-  std::unique_ptr<Message> LockedRawPop() {
-    if (queue_.empty()) return nullptr;
-    std::unique_ptr<Message> t = std::move(queue_.front());
-    queue_.pop();
-    return t;
-  }
-
-  void RemoveCb(const EventStream::Subscription &subscription) {
-    std::unique_lock<std::mutex> lock(*mutex_);
-    size_t num_erased = callbacks_[subscription.tidx_].erase(subscription.cb_uid_);
-    assert(num_erased == 1);
-  }
-
-  std::string channel_name_;
-  std::string reactor_name_;
-  std::queue<std::unique_ptr<Message>> queue_;
-  // Should only be locked once since it's used by a cond. var. Also caught in dctor, so must be recursive.
-  std::shared_ptr<std::mutex> mutex_;
-  std::shared_ptr<std::condition_variable> cvar_;
-  /**
-   * A weak_ptr to itself.
-   *
-   * There are initialization problems with this, check Params.
-   */
-  std::weak_ptr<Channel> self_ptr_;
-  LocalEventStream stream_;
-  std::unordered_map<std::type_index, std::unordered_map<uint64_t, EventStream::Callback> > callbacks_;
-  uint64_t next_cb_uid = 0;
-};
-
-/**
- * A single unit of concurrent execution in the system.
- *
- * E.g. one worker, one client. Owned by System. Has a thread associated with it.
- */
-class Reactor {
- public:
-  friend class System;
-
-  Reactor(std::string name)
-      : name_(name), main_(Open("main")) {}
-
-  virtual ~Reactor() {}
-
-  virtual void Run() = 0;
-
-  std::pair<EventStream*, std::shared_ptr<ChannelWriter>> Open(const std::string &s);
-  std::pair<EventStream*, std::shared_ptr<ChannelWriter>> Open();
-  const std::shared_ptr<ChannelWriter> FindChannel(const std::string &channel_name);
-
-  /**
-   * Close a channel by name.
-   *
-   * Should only be called from the Reactor thread.
-   */
-  void CloseChannel(const std::string &s);
-
-  /**
-   * Get Reactor name
-   */
-  const std::string &name() { return name_; }
-
-  Reactor(const Reactor &other) = delete;
-  Reactor(Reactor &&other) = default;
-  Reactor &operator=(const Reactor &other) = delete;
-  Reactor &operator=(Reactor &&other) = default;
-
- protected:
-  std::string name_;
-  /*
-   * Locks all Reactor data, including all Channel's in channels_.
-   *
-   * This should be a shared_ptr because LocalChannelWriter can outlive Reactor.
-   */
-  std::shared_ptr<std::mutex> mutex_ =
-      std::make_shared<std::mutex>();
-  std::shared_ptr<std::condition_variable> cvar_ =
-      std::make_shared<std::condition_variable>();
-
-  /**
-   * List of channels of a reactor indexed by name.
-   *
-   * While the channels are owned by the reactor, a shared_ptr to solve the circular reference problem
-   * between ChannelWriters and EventStreams.
-   */
-  std::unordered_map<std::string, std::shared_ptr<Channel>> channels_;
-  int64_t channel_name_counter_{0};
-  std::pair<EventStream*, std::shared_ptr<ChannelWriter>> main_;
-
- private:
-  typedef std::pair<std::unique_ptr<Message>,
-                    std::vector<std::pair<EventStream::Callback, EventStream::Subscription> > > MsgAndCbInfo;
-
-  /**
-   * Dispatches all waiting messages to callbacks. Shuts down when there are no callbacks left.
-   */
-  void RunEventLoop();
-
-  // TODO: remove proof of locking evidence ?!
-  MsgAndCbInfo LockedGetPendingMessages();
-};
-
-
-/**
- * Global placeholder for all reactors in the system.
- *
- * E.g. holds set of reactors, channels for all reactors.
- * Alive through the entire process lifetime.
- * Singleton class. Created automatically on first use.
- * Final (can't extend) because it's a singleton. Please be careful if you're changing this.
- */
-class System final {
- public:
-  friend class Reactor;
-
-  /**
-   * Get the (singleton) instance of System.
-   *
-   * More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern
-   */
-  static System &GetInstance() {
-    static System system; // guaranteed to be destroyed, initialized on first use
-    return system;
-  }
-
-  template <class ReactorType, class... Args>
-  const std::shared_ptr<ChannelWriter> Spawn(const std::string &name,
-                                       Args &&... args) {
-    std::unique_lock<std::recursive_mutex> lock(mutex_);
-    auto *raw_reactor =
-        new ReactorType(name, std::forward<Args>(args)...);
-    std::unique_ptr<Reactor> reactor(raw_reactor);
-    // Capturing a pointer isn't ideal, I would prefer to capture a Reactor&, but not sure how to do it.
-    std::thread reactor_thread(
-        [this, raw_reactor]() { this->StartReactor(*raw_reactor); });
-    assert(reactors_.count(name) == 0);
-    reactors_.emplace(name, std::pair<std::unique_ptr<Reactor>, std::thread>
-                      (std::move(reactor), std::move(reactor_thread)));
-    return nullptr;
-  }
-
-  const std::shared_ptr<ChannelWriter> FindChannel(const std::string &reactor_name,
-                                             const std::string &channel_name) {
-    std::unique_lock<std::recursive_mutex> lock(mutex_);
-    auto it_reactor = reactors_.find(reactor_name);
-    if (it_reactor == reactors_.end()) return nullptr;
-    return it_reactor->second.first->FindChannel(channel_name);
-  }
-
-  void AwaitShutdown() {
-    for (auto &key_value : reactors_) {
-      auto &thread = key_value.second.second;
-      thread.join();
-    }
-    reactors_.clear(); // for testing, since System is a singleton now
-  }
-
- private:
-  System() {}
-  System(const System &) = delete;
-  System(System &&) = delete;
-  System &operator=(const System &) = delete;
-  System &operator=(System &&) = delete;
-
-  void StartReactor(Reactor &reactor) {
-    current_reactor_ = &reactor;
-    reactor.Run();
-    reactor.RunEventLoop();  // Activate callbacks.
-  }
-
-  std::recursive_mutex mutex_;
-  // TODO: Replace with a map to a reactor Channel map to have more granular
-  // locking.
-  std::unordered_map<std::string,
-                     std::pair<std::unique_ptr<Reactor>, std::thread>>
-      reactors_;
-};
diff --git a/experimental/distributed/tests/distributed_test.cpp b/experimental/distributed/tests/distributed_test.cpp
index a2b684d74..bb20439c7 100644
--- a/experimental/distributed/tests/distributed_test.cpp
+++ b/experimental/distributed/tests/distributed_test.cpp
@@ -1,12 +1,14 @@
-#include <iostream>
 #include <fstream>
+#include <iostream>
 
 #include <glog/logging.h>
 
 #include "memgraph_config.hpp"
 #include "reactors_distributed.hpp"
 
-DEFINE_int64(my_mnid, 0, "Memgraph node id"); // TODO(zuza): this should be assigned by the leader once in the future
+DEFINE_int64(my_mnid, 0, "Memgraph node id");  // TODO(zuza): this should be
+                                               // assigned by the leader once in
+                                               // the future
 
 class MemgraphDistributed {
  private:
@@ -16,32 +18,35 @@ class MemgraphDistributed {
   /**
    * Get the (singleton) instance of MemgraphDistributed.
    *
-   * More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern
+   * More info:
+   * https://stackoverflow.com/questions/1008019/c-singleton-design-pattern
    */
   static MemgraphDistributed &GetInstance() {
-    static MemgraphDistributed memgraph; // guaranteed to be destroyed, initialized on first use
+    static MemgraphDistributed
+        memgraph;  // guaranteed to be destroyed, initialized on first use
     return memgraph;
   }
 
   /** Register memgraph node id to the given location. */
-  void RegisterMemgraphNode(int64_t mnid, const std::string &address, uint16_t port) {
-    std::unique_lock<std::recursive_mutex> lock(mutex_);
+  void RegisterMemgraphNode(int64_t mnid, const std::string &address,
+                            uint16_t port) {
+    std::unique_lock<std::mutex> lock(mutex_);
     mnodes_[mnid] = Location(address, port);
   }
 
-  EventStream* FindChannel(int64_t mnid,
-                           const std::string &reactor,
+  EventStream *FindChannel(int64_t mnid, const std::string &reactor,
                            const std::string &channel) {
-    std::unique_lock<std::recursive_mutex> lock(mutex_);
+    std::unique_lock<std::mutex> lock(mutex_);
     const auto &location = mnodes_.at(mnid);
-    return Distributed::GetInstance().FindChannel(location.first, location.second, reactor, channel);
+    return Distributed::GetInstance().FindChannel(
+        location.first, location.second, reactor, channel);
   }
 
  protected:
   MemgraphDistributed() {}
 
  private:
-  std::recursive_mutex mutex_;
+  std::mutex mutex_;
   std::unordered_map<int64_t, Location> mnodes_;
 
   MemgraphDistributed(const MemgraphDistributed &) = delete;
@@ -64,8 +69,8 @@ class MemgraphDistributed {
  *
  * @return Pair (master mnid, list of worker's id).
  */
-std::pair<int64_t, std::vector<int64_t>>
-  ParseConfigAndRegister(const std::string &filename) {
+std::pair<int64_t, std::vector<int64_t>> ParseConfigAndRegister(
+    const std::string &filename) {
   std::ifstream file(filename, std::ifstream::in);
   assert(file.good());
   int64_t master_mnid;
@@ -78,8 +83,7 @@ std::pair<int64_t, std::vector<int64_t>>
   memgraph.RegisterMemgraphNode(master_mnid, address, port);
   while (file.good()) {
     file >> mnid >> address >> port;
-    if (file.eof())
-      break ;
+    if (file.eof()) break;
     memgraph.RegisterMemgraphNode(mnid, address, port);
     worker_mnids.push_back(mnid);
   }
@@ -91,9 +95,9 @@ std::pair<int64_t, std::vector<int64_t>>
  * Sends a text message and has a return address.
  */
 class TextMessage : public ReturnAddressMsg {
-public:
+ public:
   TextMessage(std::string reactor, std::string channel, std::string s)
-    : ReturnAddressMsg(reactor, channel), text(s) {}
+      : ReturnAddressMsg(reactor, channel), text(s) {}
 
   template <class Archive>
   void serialize(Archive &archive) {
@@ -102,51 +106,52 @@ public:
 
   std::string text;
 
-protected:
+ protected:
   friend class cereal::access;
-  TextMessage() {} // Cereal needs access to a default constructor.
+  TextMessage() {}  // Cereal needs access to a default constructor.
 };
 CEREAL_REGISTER_TYPE(TextMessage);
 
-
 class Master : public Reactor {
  public:
   Master(std::string name, int64_t mnid, std::vector<int64_t> &&worker_mnids)
-    : Reactor(name), mnid_(mnid),
-      worker_mnids_(std::move(worker_mnids)) {}
+      : Reactor(name), mnid_(mnid), worker_mnids_(std::move(worker_mnids)) {}
 
   virtual void Run() {
     MemgraphDistributed &memgraph = MemgraphDistributed::GetInstance();
     Distributed &distributed = Distributed::GetInstance();
 
-    std::cout << "Master (" << mnid_ << ") @ " << distributed.network().Address()
-              << ":" << distributed.network().Port() << std::endl;
+    std::cout << "Master (" << mnid_ << ") @ "
+              << distributed.network().Address() << ":"
+              << distributed.network().Port() << std::endl;
 
     auto stream = main_.first;
 
     // wait until every worker sends a ReturnAddressMsg back, then close
-    stream->OnEvent<TextMessage>([this](const TextMessage &msg,
-                                          const Subscription &subscription) {
-      std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n";
-      ++workers_seen;
-      if (workers_seen == worker_mnids_.size()) {
-        subscription.Unsubscribe();
-        // Sleep for a while so we can read output in the terminal.
-        // (start_distributed.py runs each process in a new tab which is
-        //  closed immediately after process has finished)
-        std::this_thread::sleep_for(std::chrono::seconds(4));
-        CloseChannel("main");
-      }
-    });
+    stream->OnEvent<TextMessage>(
+        [this](const TextMessage &msg, const Subscription &subscription) {
+          std::cout << "Message from " << msg.Address() << ":" << msg.Port()
+                    << " .. " << msg.text << "\n";
+          ++workers_seen;
+          if (workers_seen == static_cast<int64_t>(worker_mnids_.size())) {
+            subscription.Unsubscribe();
+            // Sleep for a while so we can read output in the terminal.
+            // (start_distributed.py runs each process in a new tab which is
+            //  closed immediately after process has finished)
+            std::this_thread::sleep_for(std::chrono::seconds(4));
+            CloseChannel("main");
+          }
+        });
 
     // send a TextMessage to each worker
     for (auto wmnid : worker_mnids_) {
       auto stream = memgraph.FindChannel(wmnid, "worker", "main");
-      stream->OnEventOnce()
-        .ChainOnce<ChannelResolvedMessage>([this, stream](const ChannelResolvedMessage &msg, const Subscription&){
-          msg.channelWriter()->Send<TextMessage>("master", "main", "hi from master");
-          stream->Close();
-        });
+      stream->OnEventOnce().ChainOnce<ChannelResolvedMessage>([this, stream](
+          const ChannelResolvedMessage &msg, const Subscription &) {
+        msg.channelWriter()->Send<TextMessage>("master", "main",
+                                               "hi from master");
+        stream->Close();
+      });
     }
   }
 
@@ -159,28 +164,29 @@ class Master : public Reactor {
 class Worker : public Reactor {
  public:
   Worker(std::string name, int64_t mnid, int64_t master_mnid)
-      : Reactor(name), mnid_(mnid),
-        master_mnid_(master_mnid) {}
+      : Reactor(name), mnid_(mnid), master_mnid_(master_mnid) {}
 
   virtual void Run() {
     Distributed &distributed = Distributed::GetInstance();
 
-    std::cout << "Worker (" << mnid_ << ") @ " << distributed.network().Address()
-              << ":" << distributed.network().Port() << std::endl;
+    std::cout << "Worker (" << mnid_ << ") @ "
+              << distributed.network().Address() << ":"
+              << distributed.network().Port() << std::endl;
 
     auto stream = main_.first;
     // wait until master sends us a TextMessage, then reply back and close
-    stream->OnEventOnce()
-      .ChainOnce<TextMessage>([this](const TextMessage &msg, const Subscription&) {
-      std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n";
+    stream->OnEventOnce().ChainOnce<TextMessage>(
+        [this](const TextMessage &msg, const Subscription &) {
+          std::cout << "Message from " << msg.Address() << ":" << msg.Port()
+                    << " .. " << msg.text << "\n";
 
-      msg.GetReturnChannelWriter()
-        ->Send<TextMessage>("worker", "main", "hi from worker");
+          msg.GetReturnChannelWriter()->Send<TextMessage>("worker", "main",
+                                                          "hi from worker");
 
-      // Sleep for a while so we can read output in the terminal.
-      std::this_thread::sleep_for(std::chrono::seconds(4));
-      CloseChannel("main");
-    });
+          // Sleep for a while so we can read output in the terminal.
+          std::this_thread::sleep_for(std::chrono::seconds(4));
+          CloseChannel("main");
+        });
   }
 
  protected:
@@ -188,7 +194,6 @@ class Worker : public Reactor {
   const int64_t master_mnid_;
 };
 
-
 int main(int argc, char *argv[]) {
   google::InitGoogleLogging(argv[0]);
   gflags::ParseCommandLineFlags(&argc, &argv, true);
diff --git a/experimental/distributed/tests/reactors_distributed_unit.cpp b/experimental/distributed/tests/reactors_distributed_unit.cpp
deleted file mode 100644
index 1c4d5ce4b..000000000
--- a/experimental/distributed/tests/reactors_distributed_unit.cpp
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * This test file test the Distributed Reactors API on ONLY one process (no real networking).
- * In other words, we send a message from one process to itself.
- */
-
-#include "gtest/gtest.h"
-#include "reactors_distributed.hpp"
-
-#include <atomic>
-#include <chrono>
-#include <cstdlib>
-#include <iostream>
-#include <string>
-#include <thread>
-#include <vector>
-#include <future>
-
-/**
-  * Test do the services start up without crashes.
-  */
-TEST(SimpleTests, StartAndStopServices) {
-  System &system = System::GetInstance();
-  Distributed &distributed = Distributed::GetInstance();
-  distributed.StartServices();
-
-  // do nothing
-  std::this_thread::sleep_for(std::chrono::milliseconds(500));
-
-  system.AwaitShutdown();
-  distributed.StopServices();
-}
-
-/**
-  * Test simple message reception.
-  *
-  * Data flow:
-  * (1) Send an empty message from Master to Worker/main
-  */
-TEST(SimpleTests, SendEmptyMessage) {
-  struct Master : public Reactor {
-    Master(std::string name) : Reactor(name) {}
-
-    virtual void Run() {
-      Distributed::GetInstance().FindChannel("127.0.0.1", 10000, "worker", "main")
-        ->OnEventOnce()
-        .ChainOnce<ChannelResolvedMessage>([this](const ChannelResolvedMessage& msg,
-                                                  const Subscription& subscription) {
-            msg.channelWriter()->Send<Message>();
-            subscription.CloseChannel();
-          });
-
-      CloseChannel("main");
-    }
-  };
-
-  struct Worker : public Reactor {
-    Worker(std::string name) : Reactor(name) {}
-
-    virtual void Run() {
-      main_.first->OnEventOnce()
-        .ChainOnce<Message>([this](const Message&, const Subscription& subscription) {
-            // if this message isn't delivered, the main channel will never be closed and we infinite loop
-            subscription.CloseChannel(); // close "main"
-          });
-    }
-  };
-
-  // emulate flags like it's a multiprocess system, these may be alredy set by default
-  FLAGS_address = "127.0.0.1";
-  FLAGS_port = 10000;
-
-  System &system = System::GetInstance();
-  Distributed &distributed = Distributed::GetInstance();
-  distributed.StartServices();
-
-  system.Spawn<Master>("master");
-  system.Spawn<Worker>("worker");
-
-  system.AwaitShutdown(); // this must be called before StopServices
-  distributed.StopServices();
-}
-
-/**
-  * Test ReturnAddressMsg functionality.
-  *
-  * Data flow:
-  * (1) Send an empty message from Master to Worker/main
-  * (2) Send an empty message from Worker to Master/main
-  */
-TEST(SimpleTests, SendReturnAddressMessage) {
-  struct Master : public Reactor {
-    Master(std::string name) : Reactor(name) {}
-
-    virtual void Run() {
-      Distributed::GetInstance().FindChannel("127.0.0.1", 10000, "worker", "main")
-        ->OnEventOnce()
-        .ChainOnce<ChannelResolvedMessage>([this](const ChannelResolvedMessage& msg,
-                                                  const Subscription& sub) {
-            // send a message that will be returned to "main"
-            msg.channelWriter()->Send<ReturnAddressMsg>(this->name(), "main");
-            // close this anonymous channel
-            sub.CloseChannel();
-          });
-
-      main_.first->OnEventOnce()
-        .ChainOnce<Message>([this](const Message&, const Subscription& sub) {
-            // if this message isn't delivered, the main channel will never be closed and we infinite loop
-            // close the "main" channel
-            sub.CloseChannel();
-          });
-    }
-  };
-
-  struct Worker : public Reactor {
-    Worker(std::string name) : Reactor(name) {}
-
-    virtual void Run() {
-      main_.first->OnEventOnce()
-        .ChainOnce<ReturnAddressMsg>([this](const ReturnAddressMsg &msg, const Subscription& sub) {
-            msg.GetReturnChannelWriter()->Send<Message>();
-            sub.CloseChannel(); // close "main"
-          });
-    }
-  };
-
-  // emulate flags like it's a multiprocess system, these may be alredy set by default
-  FLAGS_address = "127.0.0.1";
-  FLAGS_port = 10000;
-
-  System &system = System::GetInstance();
-  Distributed &distributed = Distributed::GetInstance();
-  distributed.StartServices();
-
-  system.Spawn<Master>("master");
-  system.Spawn<Worker>("worker");
-
-  system.AwaitShutdown(); // this must be called before StopServices
-  distributed.StopServices();
-}
-
-// Apparently templates cannot be declared inside local classes, figure out how to move it in?
-// For that reason I obscured the name.
-struct SerializableMessage_TextMessage : public ReturnAddressMsg {
-  SerializableMessage_TextMessage(std::string channel, std::string arg_text, int arg_val)
-    : ReturnAddressMsg(channel), text(arg_text), val(arg_val) {}
-  std::string text;
-  int val;
-
-  template<class Archive>
-  void serialize(Archive &ar) {
-    ar(cereal::virtual_base_class<ReturnAddressMsg>(this), text, val);
-  }
-
- protected:
-  friend class cereal::access;
-  SerializableMessage_TextMessage() {} // Cereal needs access to a default constructor.
-};
-CEREAL_REGISTER_TYPE(SerializableMessage_TextMessage);
-
-/**
-  * Test serializability of a complex message over the network layer.
-  *
-  * Data flow:
-  * (1) Send ("hi", 123) from Master to Worker/main
-  * (2) Send ("hi back", 779) from Worker to Master/main
-  */
-TEST(SimpleTests, SendSerializableMessage) {
-  struct Master : public Reactor {
-    Master(std::string name) : Reactor(name) {}
-
-    virtual void Run() {
-      Distributed::GetInstance().FindChannel("127.0.0.1", 10000, "worker", "main")
-        ->OnEventOnce()
-        .ChainOnce<ChannelResolvedMessage>([this](const ChannelResolvedMessage& msg,
-                                                  const Subscription& sub) {
-            // send a message that will be returned to "main"
-            msg.channelWriter()->Send<SerializableMessage_TextMessage>("main", "hi", 123);
-            // close this anonymous channel
-            sub.CloseChannel();
-          });
-
-      main_.first->OnEventOnce()
-        .ChainOnce<SerializableMessage_TextMessage>([this](const SerializableMessage_TextMessage& msg, const Subscription& sub) {
-            ASSERT_EQ(msg.text, "hi back");
-            ASSERT_EQ(msg.val, 779);
-            // if this message isn't delivered, the main channel will never be closed and we infinite loop
-            // close the "main" channel
-            sub.CloseChannel();
-          });
-    }
-  };
-
-  struct Worker : public Reactor {
-    Worker(std::string name) : Reactor(name) {}
-
-    virtual void Run() {
-      main_.first->OnEventOnce()
-        .ChainOnce<SerializableMessage_TextMessage>([this](const SerializableMessage_TextMessage &msg, const Subscription& sub) {
-            ASSERT_EQ(msg.text, "hi");
-            ASSERT_EQ(msg.val, 123);
-            msg.GetReturnChannelWriter()->Send<SerializableMessage_TextMessage>
-              ("no channel, dont use this", "hi back", 779);
-            sub.CloseChannel(); // close "main"
-          });
-    }
-  };
-
-  // emulate flags like it's a multiprocess system, these may be alredy set by default
-  FLAGS_address = "127.0.0.1";
-  FLAGS_port = 10000;
-
-  System &system = System::GetInstance();
-  Distributed &distributed = Distributed::GetInstance();
-  distributed.StartServices();
-
-  system.Spawn<Master>("master");
-  system.Spawn<Worker>("worker");
-
-  system.AwaitShutdown(); // this must be called before StopServices
-  distributed.StopServices();
-}
-
-
-int main(int argc, char **argv) {
-  ::testing::InitGoogleTest(&argc, argv);
-  return RUN_ALL_TESTS();
-}
diff --git a/experimental/distributed/tests/reactors_local_unit.cpp b/experimental/distributed/tests/reactors_local_unit.cpp
deleted file mode 100644
index 639698afd..000000000
--- a/experimental/distributed/tests/reactors_local_unit.cpp
+++ /dev/null
@@ -1,483 +0,0 @@
-#include "reactors_local.hpp"
-#include "gtest/gtest.h"
-
-#include <atomic>
-#include <chrono>
-#include <cstdlib>
-#include <future>
-#include <iostream>
-#include <string>
-#include <thread>
-#include <vector>
-
-TEST(SystemTest, ReturnWithoutThrowing) {
-  struct Master : public Reactor {
-    Master(std::string name) : Reactor(name) {}
-    virtual void Run() { CloseChannel("main"); }
-  };
-
-  System &system = System::GetInstance();
-  ASSERT_NO_THROW(system.Spawn<Master>("master"));
-  ASSERT_NO_THROW(system.AwaitShutdown());
-}
-
-TEST(ChannelCreationTest, ThrowOnReusingChannelName) {
-  struct Master : public Reactor {
-    Master(std::string name) : Reactor(name) {}
-    virtual void Run() {
-      Open("channel");
-      ASSERT_THROW(Open("channel"), std::runtime_error);
-      CloseChannel("main");
-      CloseChannel("channel");
-    }
-  };
-
-  System &system = System::GetInstance();
-  system.Spawn<Master>("master");
-  system.AwaitShutdown();
-}
-
-TEST(ChannelSetUpTest, CheckMainChannelIsSet) {
-  struct Master : public Reactor {
-    Master(std::string name) : Reactor(name) {}
-    virtual void Run() {
-      std::shared_ptr<ChannelWriter> channel_writer;
-      while (!(channel_writer =
-                   System::GetInstance().FindChannel("worker", "main")))
-        std::this_thread::sleep_for(std::chrono::milliseconds(300));
-      std::this_thread::sleep_for(std::chrono::milliseconds(300));
-      CloseChannel("main");
-    }
-  };
-
-  struct Worker : public Reactor {
-    Worker(std::string name) : Reactor(name) {}
-    virtual void Run() {
-      std::shared_ptr<ChannelWriter> channel_writer;
-      while (!(channel_writer =
-                   System::GetInstance().FindChannel("master", "main")))
-        std::this_thread::sleep_for(std::chrono::milliseconds(300));
-      std::this_thread::sleep_for(std::chrono::milliseconds(300));
-      CloseChannel("main");
-    }
-  };
-
-  System &system = System::GetInstance();
-  system.Spawn<Master>("master");
-  system.Spawn<Worker>("worker");
-  system.AwaitShutdown();
-}
-
-TEST(SimpleSendTest, OneCallback) {
-  struct MessageInt : public Message {
-    MessageInt(int xx) : x(xx) {}
-    int x;
-  };
-
-  struct Master : public Reactor {
-    Master(std::string name) : Reactor(name) {}
-    virtual void Run() {
-      std::shared_ptr<ChannelWriter> channel_writer;
-      while (!(channel_writer =
-                   System::GetInstance().FindChannel("worker", "main")))
-        std::this_thread::sleep_for(std::chrono::milliseconds(300));
-      channel_writer->Send<MessageInt>(888);
-      CloseChannel("main");
-    }
-  };
-
-  struct Worker : public Reactor {
-    Worker(std::string name) : Reactor(name) {}
-    virtual void Run() {
-      EventStream *stream = main_.first;
-
-      stream->OnEvent<MessageInt>(
-          [this](const MessageInt &msg, const Subscription &) {
-            ASSERT_EQ(msg.x, 888);
-            CloseChannel("main");
-          });
-    }
-  };
-
-  System &system = System::GetInstance();
-  system.Spawn<Master>("master");
-  system.Spawn<Worker>("worker");
-  system.AwaitShutdown();
-}
-
-TEST(SimpleSendTest, IgnoreAfterClose) {
-  struct MessageInt : public Message {
-    MessageInt(int xx) : x(xx) {}
-    int x;
-  };
-
-  struct Master : public Reactor {
-    Master(std::string name) : Reactor(name) {}
-    virtual void Run() {
-      std::shared_ptr<ChannelWriter> channel_writer;
-      while (!(channel_writer =
-                   System::GetInstance().FindChannel("worker", "main")))
-        std::this_thread::sleep_for(std::chrono::milliseconds(300));
-      channel_writer->Send<MessageInt>(101);
-      channel_writer->Send<MessageInt>(102);  // should be ignored
-      std::this_thread::sleep_for(std::chrono::milliseconds(300));
-      channel_writer->Send<MessageInt>(103);  // should be ignored
-      channel_writer->Send<MessageInt>(104);  // should be ignored
-      CloseChannel(
-          "main");  // Write-end doesn't need to be closed because it's in RAII.
-    }
-  };
-
-  struct Worker : public Reactor {
-    Worker(std::string name) : Reactor(name) {}
-    virtual void Run() {
-      EventStream *stream = main_.first;
-
-      stream->OnEvent<MessageInt>(
-          [this](const MessageInt &msg, const Subscription &) {
-            CloseChannel("main");
-            ASSERT_EQ(msg.x, 101);
-          });
-    }
-  };
-
-  System &system = System::GetInstance();
-  system.Spawn<Master>("master");
-  system.Spawn<Worker>("worker");
-  system.AwaitShutdown();
-}
-
-TEST(SimpleSendTest, DuringFirstEvent) {
-  struct MessageInt : public Message {
-    MessageInt(int xx) : x(xx) {}
-    int x;
-  };
-
-  struct Master : public Reactor {
-    Master(std::string name, std::promise<int> p)
-        : Reactor(name), p_(std::move(p)) {}
-    virtual void Run() {
-      EventStream *stream = main_.first;
-
-      stream->OnEvent<MessageInt>(
-          [this](const Message &msg, const Subscription &subscription) {
-            const MessageInt &msgint = dynamic_cast<const MessageInt &>(msg);
-            if (msgint.x == 101) FindChannel("main")->Send<MessageInt>(102);
-            if (msgint.x == 102) {
-              subscription.Unsubscribe();
-              CloseChannel("main");
-              p_.set_value(777);
-            }
-          });
-
-      std::shared_ptr<ChannelWriter> channel_writer = FindChannel("main");
-      channel_writer->Send<MessageInt>(101);
-    }
-    std::promise<int> p_;
-  };
-
-  System &system = System::GetInstance();
-  std::promise<int> p;
-  auto f = p.get_future();
-  system.Spawn<Master>("master", std::move(p));
-  f.wait();
-  ASSERT_EQ(f.get(), 777);
-  system.AwaitShutdown();
-}
-
-TEST(MultipleSendTest, UnsubscribeService) {
-  struct MessageInt : public Message {
-    MessageInt(int xx) : x(xx) {}
-    int x;
-  };
-  struct MessageChar : public Message {
-    MessageChar(char xx) : x(xx) {}
-    char x;
-  };
-
-  struct Master : public Reactor {
-    Master(std::string name) : Reactor(name) {}
-    virtual void Run() {
-      std::shared_ptr<ChannelWriter> channel_writer;
-      while (!(channel_writer =
-                   System::GetInstance().FindChannel("worker", "main")))
-        std::this_thread::sleep_for(std::chrono::milliseconds(300));
-      channel_writer->Send<MessageInt>(55);
-      channel_writer->Send<MessageInt>(66);
-      channel_writer->Send<MessageInt>(77);
-      channel_writer->Send<MessageInt>(88);
-      std::this_thread::sleep_for(std::chrono::milliseconds(300));
-      channel_writer->Send<MessageChar>('a');
-      channel_writer->Send<MessageChar>('b');
-      channel_writer->Send<MessageChar>('c');
-      channel_writer->Send<MessageChar>('d');
-      CloseChannel("main");
-    }
-  };
-
-  struct Worker : public Reactor {
-    Worker(std::string name) : Reactor(name) {}
-
-    int num_msgs_received = 0;
-
-    virtual void Run() {
-      EventStream *stream = main_.first;
-
-      stream->OnEvent<MessageInt>(
-          [this](const MessageInt &msgint, const Subscription &subscription) {
-            ASSERT_TRUE(msgint.x == 55 || msgint.x == 66);
-            ++num_msgs_received;
-            if (msgint.x == 66) {
-              subscription.Unsubscribe();  // receive only two of them
-            }
-          });
-      stream->OnEvent<MessageChar>(
-          [this](const MessageChar &msgchar, const Subscription &subscription) {
-            char c = msgchar.x;
-            ++num_msgs_received;
-            ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c');
-            if (num_msgs_received == 5) {
-              subscription.Unsubscribe();
-              CloseChannel("main");
-            }
-          });
-    }
-  };
-
-  System &system = System::GetInstance();
-  system.Spawn<Master>("master");
-  system.Spawn<Worker>("worker");
-  system.AwaitShutdown();
-}
-
-TEST(MultipleSendTest, OnEvent) {
-  struct MessageInt : public Message {
-    MessageInt(int xx) : x(xx) {}
-    int x;
-  };
-  struct MessageChar : public Message {
-    MessageChar(char xx) : x(xx) {}
-    char x;
-  };
-
-  struct Master : public Reactor {
-    Master(std::string name) : Reactor(name) {}
-    virtual void Run() {
-      std::shared_ptr<ChannelWriter> channel_writer;
-      while (!(channel_writer =
-                   System::GetInstance().FindChannel("worker", "main")))
-        std::this_thread::sleep_for(std::chrono::milliseconds(300));
-
-      channel_writer->Send<MessageInt>(101);
-      channel_writer->Send<MessageChar>('a');
-      channel_writer->Send<MessageInt>(103);
-      channel_writer->Send<MessageChar>('b');
-      CloseChannel("main");
-    }
-  };
-
-  struct Worker : public Reactor {
-    Worker(std::string name) : Reactor(name) {}
-
-    struct EndMessage : Message {};
-    int correct_vals = 0;
-
-    virtual void Run() {
-      EventStream *stream = main_.first;
-      correct_vals = 0;
-
-      stream->OnEvent<MessageInt>(
-          [this](const MessageInt &msgint, const Subscription &) {
-            ASSERT_TRUE(msgint.x == 101 || msgint.x == 103);
-            ++correct_vals;
-            main_.second->Send<EndMessage>();
-          });
-
-      stream->OnEvent<MessageChar>(
-          [this](const MessageChar &msgchar, const Subscription &) {
-            ASSERT_TRUE(msgchar.x == 'a' || msgchar.x == 'b');
-            ++correct_vals;
-            main_.second->Send<EndMessage>();
-          });
-
-      stream->OnEvent<EndMessage>(
-          [this](const EndMessage &, const Subscription &) {
-            ASSERT_LE(correct_vals, 4);
-            if (correct_vals == 4) {
-              CloseChannel("main");
-            }
-          });
-    }
-  };
-
-  System &system = System::GetInstance();
-  system.Spawn<Master>("master");
-  system.Spawn<Worker>("worker");
-  system.AwaitShutdown();
-}
-
-TEST(MultipleSendTest, Chaining) {
-  struct MessageInt : public Message {
-    MessageInt(int xx) : x(xx) {}
-    int x;
-  };
-
-  struct Master : public Reactor {
-    Master(std::string name) : Reactor(name) {}
-    virtual void Run() {
-      std::shared_ptr<ChannelWriter> channel_writer;
-      while (!(channel_writer =
-                   System::GetInstance().FindChannel("worker", "main")))
-        std::this_thread::sleep_for(std::chrono::milliseconds(300));
-      channel_writer->Send<MessageInt>(55);
-      channel_writer->Send<MessageInt>(66);
-      channel_writer->Send<MessageInt>(77);
-      CloseChannel("main");
-    }
-  };
-
-  struct Worker : public Reactor {
-    Worker(std::string name) : Reactor(name) {}
-
-    virtual void Run() {
-      EventStream *stream = main_.first;
-
-      stream->OnEventOnce()
-          .ChainOnce<MessageInt>(
-              [this](const MessageInt &msg, const Subscription &) {
-                ASSERT_EQ(msg.x, 55);
-              })
-          .ChainOnce<MessageInt>(
-              [](const MessageInt &msg, const Subscription &) {
-                ASSERT_EQ(msg.x, 66);
-              })
-          .ChainOnce<MessageInt>(
-              [this](const MessageInt &msg, const Subscription &) {
-                ASSERT_EQ(msg.x, 77);
-                CloseChannel("main");
-              });
-    }
-  };
-
-  System &system = System::GetInstance();
-  system.Spawn<Master>("master");
-  system.Spawn<Worker>("worker");
-  system.AwaitShutdown();
-}
-
-TEST(MultipleSendTest, ChainingInRightOrder) {
-  struct MessageInt : public Message {
-    MessageInt(int xx) : x(xx) {}
-    int x;
-  };
-
-  struct MessageChar : public Message {
-    MessageChar(char xx) : x(xx) {}
-    char x;
-  };
-
-  struct Master : public Reactor {
-    Master(std::string name) : Reactor(name) {}
-    virtual void Run() {
-      std::shared_ptr<ChannelWriter> channel_writer;
-      while (!(channel_writer =
-                   System::GetInstance().FindChannel("worker", "main")))
-        std::this_thread::sleep_for(std::chrono::milliseconds(300));
-      channel_writer->Send<MessageChar>('a');
-      channel_writer->Send<MessageInt>(55);
-      channel_writer->Send<MessageChar>('b');
-      channel_writer->Send<MessageInt>(77);
-      CloseChannel("main");
-    }
-  };
-
-  struct Worker : public Reactor {
-    Worker(std::string name) : Reactor(name) {}
-
-    virtual void Run() {
-      EventStream *stream = main_.first;
-
-      stream->OnEventOnce()
-          .ChainOnce<MessageInt>(
-              [this](const MessageInt &msg, const Subscription &) {
-                ASSERT_EQ(msg.x, 55);
-              })
-          .ChainOnce<MessageChar>(
-              [](const MessageChar &msg, const Subscription &) {
-                ASSERT_EQ(msg.x, 'b');
-              })
-          .ChainOnce<MessageInt>(
-              [this](const MessageInt &msg, const Subscription &) {
-                ASSERT_EQ(msg.x, 77);
-                CloseChannel("main");
-              });
-    }
-  };
-
-  System &system = System::GetInstance();
-  system.Spawn<Master>("master");
-  system.Spawn<Worker>("worker");
-  system.AwaitShutdown();
-}
-
-TEST(MultipleSendTest, ProcessManyMessages) {
-  const static int num_tests = 100;
-
-  struct MessageInt : public Message {
-    MessageInt(int xx) : x(xx) {}
-    int x;
-  };
-
-  struct Master : public Reactor {
-    Master(std::string name) : Reactor(name) {}
-    virtual void Run() {
-      std::shared_ptr<ChannelWriter> channel_writer;
-      while (!(channel_writer =
-                   System::GetInstance().FindChannel("worker", "main")))
-        std::this_thread::sleep_for(std::chrono::milliseconds(300));
-
-      std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
-      for (int i = 0; i < num_tests; ++i) {
-        channel_writer->Send<MessageInt>(rand());
-        std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 5));
-      }
-      CloseChannel("main");
-    }
-  };
-
-  struct Worker : public Reactor {
-    Worker(std::string name) : Reactor(name) {}
-
-    struct EndMessage : Message {};
-    int vals = 0;
-
-    virtual void Run() {
-      EventStream *stream = main_.first;
-      vals = 0;
-
-      stream->OnEvent<MessageInt>(
-          [this](const Message &, const Subscription &) {
-            ++vals;
-            main_.second->Send<EndMessage>();
-          });
-
-      stream->OnEvent<EndMessage>(
-          [this](const Message &, const Subscription &) {
-            ASSERT_LE(vals, num_tests);
-            if (vals == num_tests) {
-              CloseChannel("main");
-            }
-          });
-    }
-  };
-
-  System &system = System::GetInstance();
-  system.Spawn<Master>("master");
-  system.Spawn<Worker>("worker");
-  system.AwaitShutdown();
-}
-
-int main(int argc, char **argv) {
-  ::testing::InitGoogleTest(&argc, argv);
-  return RUN_ALL_TESTS();
-}
diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt
index 2d29c6a62..04d1a69ad 100644
--- a/libs/CMakeLists.txt
+++ b/libs/CMakeLists.txt
@@ -163,3 +163,6 @@ import_header_library(cppitertools ${CMAKE_CURRENT_SOURCE_DIR})
 
 # Setup json
 import_header_library(json ${CMAKE_CURRENT_SOURCE_DIR})
+
+# Setup cereal
+import_header_library(cereal "${CMAKE_CURRENT_SOURCE_DIR}/cereal/include")
diff --git a/libs/setup.sh b/libs/setup.sh
index aaf8b9811..5c62cd20b 100755
--- a/libs/setup.sh
+++ b/libs/setup.sh
@@ -10,9 +10,9 @@ cd ${working_dir}
 
 # antlr
 antlr_generator_filename="antlr-4.6-complete.jar"
-#wget -O ${antlr_generator_filename} http://www.antlr.org/download/${antlr_generator_filename}
+# wget -O ${antlr_generator_filename} http://www.antlr.org/download/${antlr_generator_filename}
 wget -nv -O ${antlr_generator_filename} http://deps.memgraph.io/${antlr_generator_filename}
-#git clone https://github.com/antlr/antlr4.git
+# git clone https://github.com/antlr/antlr4.git
 git clone git://deps.memgraph.io/antlr4.git
 antlr4_tag="aacd2a2c95816d8dc1c05814051d631bfec4cf3e" # v4.6
 cd antlr4
@@ -23,7 +23,7 @@ cd ..
 # Use our fork that uses experimental/optional instead of unique_ptr in
 # DerefHolder. Once we move memgraph to c++17 we can use cpp17 branch from
 # original repo.
-#git clone https://github.com/memgraph/cppitertools.git
+# git clone https://github.com/memgraph/cppitertools.git
 git clone git://deps.memgraph.io/cppitertools.git
 cd cppitertools
 cppitertools_tag="4231e0bc6fba2737b2a7a8a1576cf06186b0de6a" # experimental_optional 17 Aug 2017
@@ -31,7 +31,7 @@ git checkout ${cppitertools_tag}
 cd ..
 
 # fmt
-#git clone https://github.com/fmtlib/fmt.git
+# git clone https://github.com/fmtlib/fmt.git
 git clone git://deps.memgraph.io/fmt.git
 fmt_tag="7fa8f8fa48b0903deab5bb42e6760477173ac485" # v3.0.1
 # Commit which fixes an issue when compiling with C++14 and higher.
@@ -42,7 +42,7 @@ git cherry-pick -n ${fmt_cxx14_fix}
 cd ..
 
 # rapidcheck
-#git clone https://github.com/emil-e/rapidcheck.git
+# git clone https://github.com/emil-e/rapidcheck.git
 git clone git://deps.memgraph.io/rapidcheck.git
 rapidcheck_tag="853e14f0f4313a9eb3c71e24848373e7b843dfd1" # Jun 23, 2017
 cd rapidcheck
@@ -50,7 +50,7 @@ git checkout ${rapidcheck_tag}
 cd ..
 
 # google benchmark
-#git clone https://github.com/google/benchmark.git
+# git clone https://github.com/google/benchmark.git
 git clone git://deps.memgraph.io/benchmark.git
 benchmark_tag="4f8bfeae470950ef005327973f15b0044eceaceb" # v1.1.0
 cd benchmark
@@ -58,7 +58,7 @@ git checkout ${benchmark_tag}
 cd ..
 
 # google test
-#git clone https://github.com/google/googletest.git
+# git clone https://github.com/google/googletest.git
 git clone git://deps.memgraph.io/googletest.git
 googletest_tag="ec44c6c1675c25b9827aacd08c02433cccde7780" # v1.8.0
 cd googletest
@@ -66,7 +66,7 @@ git checkout ${googletest_tag}
 cd ..
 
 # google logging
-#git clone https://github.com/memgraph/glog.git
+# git clone https://github.com/memgraph/glog.git
 git clone git://deps.memgraph.io/glog.git
 glog_tag="a6ee5ef590190cdb9f69cccc2db99dc5994b2f92" # custom version (v0.3.5+)
 cd glog
@@ -74,7 +74,7 @@ git checkout ${glog_tag}
 cd ..
 
 # lcov-to-coberatura-xml
-#git clone https://github.com/eriwen/lcov-to-cobertura-xml.git
+# git clone https://github.com/eriwen/lcov-to-cobertura-xml.git
 git clone git://deps.memgraph.io/lcov-to-cobertura-xml.git
 lcov_to_xml_tag="59584761cb5da4687693faec05bf3e2b74e9dde9" # Dec 6, 2016
 cd lcov-to-cobertura-xml
@@ -82,7 +82,7 @@ git checkout ${lcov_to_xml_tag}
 cd ..
 
 # google flags
-#git clone https://github.com/memgraph/gflags.git
+# git clone https://github.com/memgraph/gflags.git
 git clone git://deps.memgraph.io/gflags.git
 gflags_tag="b37ceb03a0e56c9f15ce80409438a555f8a67b7c" # custom version (May 6, 2017)
 cd gflags
@@ -106,14 +106,19 @@ rm postgres.tar.gz
 # We use head on Sep 1, 2017 instead of last release since it was long time ago.
 mkdir json
 cd json
-#wget "https://raw.githubusercontent.com/nlohmann/json/91e003285312167ad8365f387438ea371b465a7e/src/json.hpp"
+# wget "https://raw.githubusercontent.com/nlohmann/json/91e003285312167ad8365f387438ea371b465a7e/src/json.hpp"
 wget -nv http://deps.memgraph.io/json.hpp
 cd ..
 
-#ltalloc
-#git clone https://github.com/r-lyeh/ltalloc.git
+# ltalloc
+# git clone https://github.com/r-lyeh/ltalloc.git
 git clone git://deps.memgraph.io/ltalloc.git
 ltalloc_tag="aefde2afa5cd49c9d1a797aa08ec08b2bec13a36" # Sep 15, 2017
 cd ltalloc
 git checkout ${ltalloc_tag}
+
+# cereal
+git clone https://github.com/USCiLab/cereal.git
+cd cereal
+git checkout v1.2.2
 cd ..
diff --git a/experimental/distributed/src/protocol.cpp b/src/communication/reactor/protocol.cpp
similarity index 92%
rename from experimental/distributed/src/protocol.cpp
rename to src/communication/reactor/protocol.cpp
index fd2fb3d28..b68523ae3 100644
--- a/experimental/distributed/src/protocol.cpp
+++ b/src/communication/reactor/protocol.cpp
@@ -5,10 +5,9 @@
 
 #include "glog/logging.h"
 
-namespace Protocol {
+namespace protocol {
 
-Session::Session(Socket &&socket, Data &)
-    : socket_(std::move(socket)) {
+Session::Session(Socket &&socket, Data &) : socket_(std::move(socket)) {
   event_.data.ptr = this;
 }
 
@@ -22,9 +21,12 @@ std::string Session::GetStringAndShift(SizeT len) {
 
 void Session::Execute() {
   if (!handshake_done_) {
-    // Note: this function can be multiple times before the buffer has the full packet.
-    //   We currently have to check for this case and return without shifting the buffer.
-    //   In other words, only shift anything from the buffer if you can read the entire (sub)message.
+    // Note: this function can be multiple times before the buffer has the full
+    // packet.
+    //   We currently have to check for this case and return without shifting
+    //   the buffer.
+    //   In other words, only shift anything from the buffer if you can read the
+    //   entire (sub)message.
 
     if (buffer_.size() < 2 * sizeof(SizeT)) return;
     SizeT len_reactor = GetLength();
@@ -56,7 +58,7 @@ void Session::Execute() {
 
   // TODO: check for exceptions
   std::istringstream stream;
-  stream.str(std::string(reinterpret_cast<char*>(buffer_.data()), len_data));
+  stream.str(std::string(reinterpret_cast<char *>(buffer_.data()), len_data));
   cereal::BinaryInputArchive iarchive{stream};
   std::unique_ptr<Message> message{nullptr};
   iarchive(message);
@@ -157,7 +159,7 @@ bool SendMessage(std::string address, uint16_t port, std::string reactor,
     LOG(INFO) << "Couldn't send message size!";
     return false;
   }
-  if (!socket.Write(buffer.data(), buffer.size())) {
+  if (!socket.Write(buffer)) {
     LOG(INFO) << "Couldn't send message data!";
     return false;
   }
diff --git a/experimental/distributed/src/protocol.hpp b/src/communication/reactor/protocol.hpp
similarity index 95%
rename from experimental/distributed/src/protocol.hpp
rename to src/communication/reactor/protocol.hpp
index c56964dc7..773acb238 100644
--- a/experimental/distributed/src/protocol.hpp
+++ b/src/communication/reactor/protocol.hpp
@@ -1,5 +1,7 @@
 #pragma once
 
+#include <chrono>
+
 #include "communication/bolt/v1/decoder/buffer.hpp"
 #include "io/network/epoll.hpp"
 #include "io/network/network_endpoint.hpp"
@@ -40,7 +42,8 @@ class Message;
  * Currently the server is implemented to handle more than one message after
  * the initial handshake, but the client can only send one message.
  */
-namespace Protocol {
+namespace protocol {
+
 using Endpoint = io::network::NetworkEndpoint;
 using Socket = io::network::Socket;
 using StreamBuffer = io::network::StreamBuffer;
@@ -100,6 +103,8 @@ class Session {
    */
   void Written(size_t len);
 
+  bool TimedOut() { return false; }
+
   /**
    * Closes the session (client socket).
    */
@@ -108,6 +113,8 @@ class Session {
   io::network::Epoll::Event event_;
   Socket socket_;
 
+  std::chrono::time_point<std::chrono::steady_clock> last_event_time_;
+
  private:
   SizeT GetLength(int offset = 0);
   std::string GetStringAndShift(SizeT len);
@@ -115,6 +122,7 @@ class Session {
 
   bool alive_{true};
   bool handshake_done_{false};
+
   std::string reactor_{""};
   std::string channel_{""};
 
diff --git a/src/communication/reactor/reactor_local.cpp b/src/communication/reactor/reactor_local.cpp
new file mode 100644
index 000000000..7d5649e14
--- /dev/null
+++ b/src/communication/reactor/reactor_local.cpp
@@ -0,0 +1,141 @@
+#include "communication/reactor/reactor_local.hpp"
+
+#include "utils/exceptions.hpp"
+
+namespace communication::reactor {
+
+thread_local Reactor *current_reactor_ = nullptr;
+
+void EventStream::Subscription::Unsubscribe() const {
+  event_queue_.RemoveCallback(*this);
+}
+
+void EventStream::Subscription::CloseChannel() const { event_queue_.Close(); }
+
+const std::string &EventStream::Subscription::channel_name() const {
+  return event_queue_.channel_name_;
+}
+
+std::string Channel::LocalChannelWriter::ReactorName() const {
+  return reactor_name_;
+}
+
+std::string Channel::LocalChannelWriter::Name() const { return channel_name_; }
+
+void Channel::Close() {
+  // TODO(zuza): there will be major problems if a reactor tries to close a
+  // stream that isn't theirs luckily this should never happen if the framework
+  // is used as expected.
+  current_reactor_->CloseChannel(channel_name_);
+}
+
+std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Reactor::Open(
+    const std::string &channel_name) {
+  std::unique_lock<std::mutex> lock(*mutex_);
+  if (channels_.count(channel_name) != 0) {
+    throw utils::BasicException("Channel with name " + channel_name +
+                                "already exists");
+  }
+  auto it =
+      channels_
+          .emplace(channel_name, std::make_shared<Channel>(Channel::Params{
+                                     name_, channel_name, mutex_, cvar_}))
+          .first;
+  it->second->self_ptr_ = it->second;
+  return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
+}
+
+std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Reactor::Open() {
+  std::unique_lock<std::mutex> lock(*mutex_);
+  do {
+    std::string channel_name =
+        "stream-" + std::to_string(channel_name_counter_++);
+    if (channels_.count(channel_name) == 0) {
+      auto it =
+          channels_
+              .emplace(channel_name, std::make_shared<Channel>(Channel::Params{
+                                         name_, channel_name, mutex_, cvar_}))
+              .first;
+      it->second->self_ptr_ = it->second;
+      return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
+    }
+  } while (true);
+}
+
+std::shared_ptr<ChannelWriter> Reactor::FindChannel(
+    const std::string &channel_name) {
+  std::unique_lock<std::mutex> lock(*mutex_);
+  auto it_channel = channels_.find(channel_name);
+  if (it_channel == channels_.end()) return nullptr;
+  return it_channel->second->LockedOpenChannel();
+}
+
+void Reactor::CloseChannel(const std::string &s) {
+  std::unique_lock<std::mutex> lock(*mutex_);
+  auto it = channels_.find(s);
+  CHECK(it != channels_.end()) << "Trying to close nonexisting channel";
+  channels_.erase(it);
+  cvar_->notify_all();
+}
+
+void Reactor::RunEventLoop() {
+  bool exit_event_loop = false;
+
+  while (true) {
+    // Find (or wait) for the next Message.
+    PendingMessageInfo info;
+    {
+      std::unique_lock<std::mutex> guard(*mutex_);
+
+      while (true) {
+        // Not fair because was taken earlier, talk to lion.
+        info = GetPendingMessages();
+        if (info.message != nullptr) break;
+
+        // Exit the loop if there are no more Channels.
+        if (channels_.empty()) {
+          exit_event_loop = true;
+          break;
+        }
+
+        cvar_->wait(guard);
+      }
+
+      if (exit_event_loop) break;
+    }
+
+    for (auto &callback_info : info.callbacks) {
+      callback_info.first(*info.message, callback_info.second);
+    }
+  }
+}
+
+/**
+ * Checks if there is any nonempty EventStream.
+ */
+Reactor::PendingMessageInfo Reactor::GetPendingMessages() {
+  for (auto &channels_key_value : channels_) {
+    Channel &event_queue = *channels_key_value.second;
+    auto message = event_queue.LockedPop();
+    if (message == nullptr) continue;
+    std::type_index type_index = message->GetTypeIndex();
+
+    using Subscription = EventStream::Subscription;
+    std::vector<std::pair<EventStream::Callback, Subscription>> callback_info;
+    auto msg_type_cb_iter = event_queue.callbacks_.find(type_index);
+    if (msg_type_cb_iter != event_queue.callbacks_.end()) {
+      // There is a callback for this type.
+      for (auto &type_index_cb_key_value : msg_type_cb_iter->second) {
+        auto uid = type_index_cb_key_value.first;
+        auto callback = type_index_cb_key_value.second;
+        callback_info.emplace_back(callback,
+                                   Subscription(event_queue, type_index, uid));
+      }
+    }
+
+    return PendingMessageInfo{std::move(message), std::move(callback_info)};
+  }
+
+  return PendingMessageInfo{};
+}
+}
diff --git a/src/communication/reactor/reactor_local.hpp b/src/communication/reactor/reactor_local.hpp
new file mode 100644
index 000000000..d96101cba
--- /dev/null
+++ b/src/communication/reactor/reactor_local.hpp
@@ -0,0 +1,553 @@
+#pragma once
+
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <unordered_map>
+#include <utility>
+
+#include "cereal/types/memory.hpp"
+#include "glog/logging.h"
+
+namespace communication::reactor {
+
+class EventStream;
+class Reactor;
+class System;
+class Channel;
+
+extern thread_local Reactor *current_reactor_;
+
+/**
+ * Base class for messages.
+ */
+class Message {
+ public:
+  virtual ~Message() {}
+
+  template <class Archive>
+  void serialize(Archive &) {}
+
+  /**
+   * Run-time type identification that is used for callbacks.
+   *
+   * Warning: this works because of the virtual destructor, don't remove it from
+   * this class
+   */
+  std::type_index GetTypeIndex() { return typeid(*this); }
+};
+
+/**
+ * Write-end of a Channel (between two reactors).
+ */
+class ChannelWriter {
+ public:
+  ChannelWriter() = default;
+  ChannelWriter(const ChannelWriter &) = delete;
+  void operator=(const ChannelWriter &) = delete;
+  ChannelWriter(ChannelWriter &&) = delete;
+  void operator=(ChannelWriter &&) = delete;
+
+  /**
+   * Construct and send the message to the channel.
+   */
+  template <typename TMessage, typename... Args>
+  void Send(Args &&... args) {
+    Send(std::unique_ptr<Message>(
+        std::make_unique<TMessage>(std::forward<Args>(args)...)));
+  }
+
+  virtual void Send(std::unique_ptr<Message> message) = 0;
+
+  virtual std::string ReactorName() const = 0;
+  virtual std::string Name() const = 0;
+
+  template <class Archive>
+  void serialize(Archive &archive) {
+    archive(ReactorName(), Name());
+  }
+};
+
+/**
+ * Read-end of a Channel (between two reactors).
+ */
+class EventStream {
+ public:
+  class OnEventOnceChainer;
+  class Subscription;
+
+  /**
+   * Register a callback that will be called whenever an event arrives.
+   */
+  template <typename TMessage>
+  void OnEvent(
+      std::function<void(const TMessage &, const Subscription &)> &&callback) {
+    OnEventHelper(typeid(TMessage), [callback = std::move(callback)](
+                                        const Message &base_message,
+                                        const Subscription &subscription) {
+      const auto &message = dynamic_cast<const TMessage &>(base_message);
+      callback(message, subscription);
+    });
+  }
+
+  /**
+   * Register a callback that will be called only once.
+   * Once event is received, channel of this EventStream is closed.
+   */
+  template <typename TMessage>
+  void OnEventOnceThenClose(std::function<void(const TMessage &)> &&callback) {
+    OnEventHelper(typeid(TMessage), [callback = std::move(callback)](
+                                        const Message &base_message,
+                                        const Subscription &subscription) {
+      const TMessage &message = dynamic_cast<const TMessage &>(base_message);
+      subscription.CloseChannel();
+      callback(message);
+    });
+  }
+
+  /**
+   * Starts a chain to register a callback that fires off only once.
+   *
+   * This method supports chaining (see the the class OnEventOnceChainer or the
+   * tests for examples).
+   * Warning: when chaining callbacks, make sure that EventStream does not
+   * deallocate before the last
+   * chained callback fired.
+   */
+  OnEventOnceChainer OnEventOnce() { return OnEventOnceChainer(*this); }
+
+  /**
+   * Get the name of the channel.
+   */
+  virtual const std::string &ChannelName() = 0;
+
+  /**
+   * Subscription Service.
+   *
+   * Unsubscribe from a callback. Lightweight object (can copy by value).
+   */
+  class Subscription {
+   public:
+    /**
+     * Unsubscribe. Call only once.
+     */
+    void Unsubscribe() const;
+
+    /**
+     * Close the stream. Convenience method.
+     */
+    void CloseChannel() const;
+
+    /**
+     * Get the name of the channel the message is delivered to.
+     */
+    const std::string &channel_name() const;
+
+   private:
+    friend class Reactor;
+    friend class Channel;
+
+    Subscription(Channel &event_queue, std::type_index type_index,
+                 uint64_t callback_id)
+        : event_queue_(event_queue),
+          type_index_(type_index),
+          callback_id_(callback_id) {}
+
+    Channel &event_queue_;
+    std::type_index type_index_;
+    uint64_t callback_id_;
+  };
+
+  /**
+   * Close this event stream, disallowing further events from getting received.
+   *
+   * Any subsequent call after Close() to any function will be result in
+   * undefined
+   * behavior (invalid pointer dereference). Can only be called from the thread
+   * associated with the Reactor.
+   */
+  virtual void Close() = 0;
+
+  /**
+   * Convenience class to chain one-off callbacks.
+   *
+   * Usage: Create this class with OnEventOnce() and then chain callbacks using
+   * ChainOnce.
+   * A callback will fire only once, unsubscribe and immediately subscribe the
+   * next callback to the stream.
+   *
+   * Example: stream->OnEventOnce().ChainOnce(firstCb).ChainOnce(secondCb);
+   *
+   * Implementation: This class is a temporary object that remembers the
+   * callbacks that are to be installed
+   * and finally installs them in the destructor. Not sure is this kosher, is
+   * there another way?
+   */
+  class OnEventOnceChainer {
+   public:
+    OnEventOnceChainer(EventStream &event_stream)
+        : event_stream_(event_stream) {}
+    ~OnEventOnceChainer() { InstallCallbacks(); }
+
+    template <typename TMessage>
+    OnEventOnceChainer &ChainOnce(
+        std::function<void(const TMessage &, const Subscription &)>
+            &&callback) {
+      std::function<void(const Message &, const Subscription &)>
+          wrap = [callback = std::move(callback)](
+              const Message &base_message, const Subscription &subscription) {
+        const TMessage &message = dynamic_cast<const TMessage &>(base_message);
+        subscription.Unsubscribe();
+        // Warning: this can close the Channel, be careful what you put after
+        // it!
+        callback(message, subscription);
+      };
+      callbacks_.emplace_back(typeid(TMessage), std::move(wrap));
+      return *this;
+    }
+
+   private:
+    void InstallCallbacks() {
+      int num_callbacks = callbacks_.size();
+      CHECK(num_callbacks > 0) << "No callback will be installed";
+      std::function<void(const Message &, const Subscription &)> next_callback;
+      std::type_index next_type = typeid(nullptr);
+
+      for (int i = num_callbacks - 1; i >= 0; --i) {
+        std::function<void(const Message &, const Subscription &)>
+            tmp_callback = [
+              callback = std::move(callbacks_[i].second), next_type,
+              next_callback = std::move(next_callback),
+              event_stream = &this->event_stream_
+            ](const Message &message, const Subscription &subscription) {
+          callback(message, subscription);
+          if (next_callback) {
+            event_stream->OnEventHelper(next_type, std::move(next_callback));
+          }
+        };
+        next_callback = std::move(tmp_callback);
+        next_type = callbacks_[i].first;
+      }
+
+      event_stream_.OnEventHelper(next_type, std::move(next_callback));
+    }
+
+    EventStream &event_stream_;
+    std::vector<
+        std::pair<std::type_index,
+                  std::function<void(const Message &, const Subscription &)>>>
+        callbacks_;
+  };
+
+  typedef std::function<void(const Message &, const Subscription &)> Callback;
+
+ private:
+  virtual void OnEventHelper(std::type_index type_index, Callback callback) = 0;
+};
+
+/**
+ * Implementation of a channel.
+ *
+ * This class is an internal data structure that represents the state of the
+ * channel. This class is not meant to be used by the clients of the messaging
+ * framework. The Channel class wraps the event queue data structure, the mutex
+ * that protects concurrent access to the event queue, the local channel and the
+ * event stream. The class is owned by the Reactor. It gets closed when the
+ * owner reactor (the one that owns the read-end of a channel) removes/closes
+ * it.
+ */
+class Channel {
+  struct Params;
+
+ public:
+  friend class Reactor;  // to create a Params initialization object
+  friend class EventStream::Subscription;
+
+  Channel(Params params)
+      : channel_name_(params.channel_name),
+        reactor_name_(params.reactor_name),
+        mutex_(params.mutex),
+        cvar_(params.cvar),
+        stream_(mutex_, this) {}
+
+  /**
+   * LocalChannelWriter represents the channels to reactors living in the same
+   * reactor system (write-end of the channels).
+   *
+   * Sending messages to the local channel requires acquiring the mutex.
+   * LocalChannelWriter holds a (weak) pointer to the enclosing Channel object.
+   * Messages sent to a closed channel are ignored.
+   * There can be multiple LocalChannelWriters refering to the same stream if
+   * needed.
+   */
+  class LocalChannelWriter : public ChannelWriter {
+   public:
+    friend class Channel;
+
+    LocalChannelWriter(std::string reactor_name, std::string channel_name,
+                       std::weak_ptr<Channel> queue)
+        : reactor_name_(reactor_name),
+          channel_name_(channel_name),
+          queue_(queue) {}
+
+    void Send(std::unique_ptr<Message> m) override {
+      // Atomic, per the standard.  We guarantee here that if channel exists it
+      // will not be destroyed by the end of this function.
+      std::shared_ptr<Channel> queue = queue_.lock();
+      if (queue) {
+        queue->Push(std::move(m));
+      }
+      // TODO: what should we do here? Channel doesn't exist so message will be
+      // lost.
+    }
+
+    std::string ReactorName() const override;
+    std::string Name() const override;
+
+   private:
+    std::string reactor_name_;
+    std::string channel_name_;
+    std::weak_ptr<Channel> queue_;
+  };
+
+  /**
+   * Implementation of the event stream.
+   *
+   * After the enclosing Channel object is destroyed (by a call to CloseChannel
+   * or Close).
+   */
+  class LocalEventStream : public EventStream {
+   public:
+    friend class Channel;
+
+    LocalEventStream(std::shared_ptr<std::mutex> mutex, Channel *queue)
+        : mutex_(mutex), queue_(queue) {}
+
+    void OnEventHelper(std::type_index type_index, Callback callback) {
+      std::unique_lock<std::mutex> lock(*mutex_);
+      queue_->LockedOnEventHelper(type_index, callback);
+    }
+
+    const std::string &ChannelName() { return queue_->channel_name_; }
+
+    void Close() { queue_->Close(); }
+
+   private:
+    std::shared_ptr<std::mutex> mutex_;
+    std::string channel_name_;
+    Channel *queue_;
+  };
+
+  /**
+   * Close the channel. Must be called from the reactor that owns the channel.
+   */
+  void Close();
+
+  Channel(const Channel &other) = delete;
+  Channel(Channel &&other) = default;
+  Channel &operator=(const Channel &other) = delete;
+  Channel &operator=(Channel &&other) = default;
+
+ private:
+  /**
+   * Initialization parameters to Channel.
+   * Warning: do not forget to initialize self_ptr_ individually. Private
+   * because it shouldn't be created outside of a Reactor.
+   */
+  struct Params {
+    std::string reactor_name;
+    std::string channel_name;
+    std::shared_ptr<std::mutex> mutex;
+    std::shared_ptr<std::condition_variable> cvar;
+  };
+
+  void Push(std::unique_ptr<Message> m) {
+    std::unique_lock<std::mutex> guard(*mutex_);
+    queue_.emplace(std::move(m));
+    // This is OK because there is only one Reactor (thread) that can wait on
+    // this Channel.
+    cvar_->notify_one();
+  }
+
+  std::shared_ptr<LocalChannelWriter> LockedOpenChannel() {
+    // TODO(zuza): fix this CHECK using this answer
+    // https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned
+    // TODO: figure out zuza's TODO. Does that mean this CHECK is kind of flaky
+    // or that it doesn't fail sometimes, when it should.
+    CHECK(!self_ptr_.expired());
+    return std::make_shared<LocalChannelWriter>(reactor_name_, channel_name_,
+                                                self_ptr_);
+  }
+
+  std::unique_ptr<Message> LockedPop() { return LockedRawPop(); }
+
+  void LockedOnEventHelper(std::type_index type_index,
+                           EventStream::Callback callback) {
+    uint64_t callback_id = next_callback_id++;
+    callbacks_[type_index][callback_id] = callback;
+  }
+
+  std::unique_ptr<Message> LockedRawPop() {
+    if (queue_.empty()) return nullptr;
+    std::unique_ptr<Message> t = std::move(queue_.front());
+    queue_.pop();
+    return t;
+  }
+
+  void RemoveCallback(const EventStream::Subscription &subscription) {
+    std::unique_lock<std::mutex> lock(*mutex_);
+    auto num_erased =
+        callbacks_[subscription.type_index_].erase(subscription.callback_id_);
+    CHECK(num_erased == 1) << "Expected to remove 1 element";
+  }
+
+  std::string channel_name_;
+  std::string reactor_name_;
+  std::queue<std::unique_ptr<Message>> queue_;
+  // Should only be locked once since it's used by a cond. var. Also caught in
+  // dctor, so must be recursive.
+  std::shared_ptr<std::mutex> mutex_;
+  std::shared_ptr<std::condition_variable> cvar_;
+  /**
+   * A weak_ptr to itself.
+   *
+   * There are initialization problems with this, check Params.
+   */
+  std::weak_ptr<Channel> self_ptr_;
+  LocalEventStream stream_;
+  std::unordered_map<std::type_index,
+                     std::unordered_map<uint64_t, EventStream::Callback>>
+      callbacks_;
+  uint64_t next_callback_id = 0;
+};
+
+/**
+ * A single unit of concurrent execution in the system.
+ *
+ * E.g. one worker, one client. Owned by System. Has a thread associated with
+ * it.
+ */
+class Reactor {
+  friend class System;
+
+  Reactor(System &system, std::string name,
+          std::function<void(Reactor &)> setup)
+      : system_(system), name_(name), setup_(setup), main_(Open("main")) {}
+
+ public:
+  ~Reactor() {}
+
+  std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Open(
+      const std::string &s);
+  std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Open();
+  std::shared_ptr<ChannelWriter> FindChannel(const std::string &channel_name);
+
+  /**
+   * Close a channel by name.
+   *
+   * Should only be called from the Reactor thread.
+   */
+  void CloseChannel(const std::string &s);
+
+  /**
+   * Get Reactor name
+   */
+  const std::string &name() const { return name_; }
+
+  Reactor(const Reactor &other) = delete;
+  Reactor(Reactor &&other) = default;
+  Reactor &operator=(const Reactor &other) = delete;
+  Reactor &operator=(Reactor &&other) = default;
+
+  System &system_;
+  std::string name_;
+  std::function<void(Reactor &)> setup_;
+
+  /*
+   * Locks all Reactor data, including all Channel's in channels_.
+   *
+   * This should be a shared_ptr because LocalChannelWriter can outlive Reactor.
+   */
+  std::shared_ptr<std::mutex> mutex_ = std::make_shared<std::mutex>();
+  std::shared_ptr<std::condition_variable> cvar_ =
+      std::make_shared<std::condition_variable>();
+
+  /**
+   * List of channels of a reactor indexed by name.
+   */
+  std::unordered_map<std::string, std::shared_ptr<Channel>> channels_;
+  int64_t channel_name_counter_ = 0;
+  std::pair<EventStream *, std::shared_ptr<ChannelWriter>> main_;
+
+ private:
+  struct PendingMessageInfo {
+    std::unique_ptr<Message> message;
+    std::vector<std::pair<EventStream::Callback, EventStream::Subscription>>
+        callbacks;
+  };
+
+  /**
+   * Dispatches all waiting messages to callbacks. Shuts down when there are no
+   * callbacks left.
+   */
+  void RunEventLoop();
+
+  PendingMessageInfo GetPendingMessages();
+};
+
+/**
+ * Placeholder for all reactors.
+ * Make sure object of this class outlives all Reactors created by it.
+ */
+class System {
+ public:
+  friend class Reactor;
+  System() = default;
+
+  void Spawn(const std::string &name, std::function<void(Reactor &)> setup) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    std::unique_ptr<Reactor> reactor(new Reactor(*this, name, setup));
+    std::thread reactor_thread([ this, raw_reactor = reactor.get() ] {
+      current_reactor_ = raw_reactor;
+      raw_reactor->setup_(*raw_reactor);
+      raw_reactor->RunEventLoop();
+    });
+    auto got = reactors_.emplace(
+        name, std::pair<decltype(reactor), std::thread>{
+                  std::move(reactor), std::move(reactor_thread)});
+    CHECK(got.second) << "Reactor with name: '" << name << "' already exists";
+  }
+
+  const std::shared_ptr<ChannelWriter> FindChannel(
+      const std::string &reactor_name, const std::string &channel_name) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    auto it_reactor = reactors_.find(reactor_name);
+    if (it_reactor == reactors_.end()) return nullptr;
+    return it_reactor->second.first->FindChannel(channel_name);
+  }
+
+  // TODO: Think about interaction with destructor. Should we call this in
+  // destructor, complain in destructor if there are alive threads or stop them
+  // in some way.
+  void AwaitShutdown() {
+    for (auto &key_value : reactors_) {
+      auto &thread = key_value.second.second;
+      thread.join();
+    }
+    reactors_.clear();
+  }
+
+ private:
+  System(const System &) = delete;
+  System(System &&) = delete;
+  System &operator=(const System &) = delete;
+  System &operator=(System &&) = delete;
+
+  std::mutex mutex_;
+  std::unordered_map<std::string,
+                     std::pair<std::unique_ptr<Reactor>, std::thread>>
+      reactors_;
+};
+}
diff --git a/src/io/network/socket.cpp b/src/io/network/socket.cpp
index d15e80014..a90eaeac8 100644
--- a/src/io/network/socket.cpp
+++ b/src/io/network/socket.cpp
@@ -220,6 +220,12 @@ bool Socket::Write(const uint8_t *data, size_t len,
   return true;
 }
 
+bool Socket::Write(const std::string &s,
+                   const std::function<bool()> &keep_retrying) {
+  return Write(reinterpret_cast<const uint8_t *>(s.data()), s.size(),
+               keep_retrying);
+}
+
 int Socket::Read(void *buffer, size_t len) {
   return read(socket_, buffer, len);
 }
diff --git a/src/io/network/socket.hpp b/src/io/network/socket.hpp
index 8a19828d2..6158da3fd 100644
--- a/src/io/network/socket.hpp
+++ b/src/io/network/socket.hpp
@@ -151,6 +151,8 @@ class Socket {
    */
   bool Write(const uint8_t *data, size_t len,
              const std::function<bool()> &keep_retrying = [] { return false; });
+  bool Write(const std::string &s,
+             const std::function<bool()> &keep_retrying = [] { return false; });
 
   /**
    * Read data from the socket.
diff --git a/tests/unit/reactor_local.cpp b/tests/unit/reactor_local.cpp
new file mode 100644
index 000000000..828c0ed46
--- /dev/null
+++ b/tests/unit/reactor_local.cpp
@@ -0,0 +1,385 @@
+#include <atomic>
+#include <chrono>
+#include <cstdlib>
+#include <future>
+#include <iostream>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "communication/reactor/reactor_local.hpp"
+#include "gtest/gtest.h"
+#include "utils/exceptions.hpp"
+
+using namespace communication::reactor;
+using Subscription = EventStream::Subscription;
+
+TEST(SystemTest, ReturnWithoutThrowing) {
+  System system;
+  ASSERT_NO_THROW(
+      system.Spawn("master", [](Reactor &r) { r.CloseChannel("main"); }));
+  ASSERT_NO_THROW(system.AwaitShutdown());
+}
+
+TEST(ChannelCreationTest, ThrowOnReusingChannelName) {
+  System system;
+  system.Spawn("master", [](Reactor &r) {
+    r.Open("channel");
+    ASSERT_THROW(r.Open("channel"), utils::BasicException);
+    r.CloseChannel("main");
+    r.CloseChannel("channel");
+  });
+  system.AwaitShutdown();
+}
+
+TEST(ChannelSetUpTest, CheckMainChannelIsSet) {
+  System system;
+
+  system.Spawn("master", [](Reactor &r) {
+    std::shared_ptr<ChannelWriter> channel_writer;
+    while (!(channel_writer = r.system_.FindChannel("worker", "main")))
+      std::this_thread::sleep_for(std::chrono::milliseconds(300));
+    std::this_thread::sleep_for(std::chrono::milliseconds(300));
+    r.CloseChannel("main");
+  });
+
+  system.Spawn("worker", [](Reactor &r) {
+    std::shared_ptr<ChannelWriter> channel_writer;
+    while (!(channel_writer = r.system_.FindChannel("master", "main")))
+      std::this_thread::sleep_for(std::chrono::milliseconds(300));
+    std::this_thread::sleep_for(std::chrono::milliseconds(300));
+    r.CloseChannel("main");
+  });
+
+  system.AwaitShutdown();
+}
+
+TEST(SimpleSendTest, OneCallback) {
+  struct MessageInt : public Message {
+    MessageInt(int xx) : x(xx) {}
+    int x;
+  };
+
+  System system;
+  system.Spawn("master", [](Reactor &r) {
+    std::shared_ptr<ChannelWriter> channel_writer;
+    while (!(channel_writer = r.system_.FindChannel("worker", "main")))
+      std::this_thread::sleep_for(std::chrono::milliseconds(300));
+    channel_writer->Send<MessageInt>(888);
+    r.CloseChannel("main");
+  });
+
+  system.Spawn("worker", [](Reactor &r) {
+    EventStream *stream = r.main_.first;
+
+    stream->OnEvent<MessageInt>(
+        [&r](const MessageInt &msg, const Subscription &) {
+          ASSERT_EQ(msg.x, 888);
+          r.CloseChannel("main");
+        });
+  });
+
+  system.AwaitShutdown();
+}
+
+TEST(SimpleSendTest, IgnoreAfterClose) {
+  struct MessageInt : public Message {
+    MessageInt(int xx) : x(xx) {}
+    int x;
+  };
+
+  System system;
+
+  system.Spawn("master", [](Reactor &r) {
+    std::shared_ptr<ChannelWriter> channel_writer;
+    while (!(channel_writer = r.system_.FindChannel("worker", "main")))
+      std::this_thread::sleep_for(std::chrono::milliseconds(300));
+    channel_writer->Send<MessageInt>(101);
+    channel_writer->Send<MessageInt>(102);  // should be ignored
+    std::this_thread::sleep_for(std::chrono::milliseconds(300));
+    channel_writer->Send<MessageInt>(103);  // should be ignored
+    channel_writer->Send<MessageInt>(104);  // should be ignored
+    // Write-end doesn't need to be closed because it's in RAII.
+    r.CloseChannel("main");
+  });
+
+  system.Spawn("worker", [](Reactor &r) {
+    EventStream *stream = r.main_.first;
+    stream->OnEvent<MessageInt>(
+        [&r](const MessageInt &msg, const Subscription &) {
+          r.CloseChannel("main");
+          ASSERT_EQ(msg.x, 101);
+        });
+  });
+
+  system.AwaitShutdown();
+}
+
+TEST(SimpleSendTest, DuringFirstEvent) {
+  struct MessageInt : public Message {
+    MessageInt(int xx) : x(xx) {}
+    int x;
+  };
+
+  System system;
+
+  std::promise<int> p;
+  auto f = p.get_future();
+  system.Spawn("master", [&p](Reactor &r) mutable {
+    EventStream *stream = r.main_.first;
+
+    stream->OnEvent<MessageInt>(
+        [&](const Message &msg, const Subscription &subscription) {
+          const MessageInt &msgint = dynamic_cast<const MessageInt &>(msg);
+          if (msgint.x == 101) r.FindChannel("main")->Send<MessageInt>(102);
+          if (msgint.x == 102) {
+            subscription.Unsubscribe();
+            r.CloseChannel("main");
+            p.set_value(777);
+          }
+        });
+
+    std::shared_ptr<ChannelWriter> channel_writer = r.FindChannel("main");
+    channel_writer->Send<MessageInt>(101);
+  });
+
+  f.wait();
+  ASSERT_EQ(f.get(), 777);
+  system.AwaitShutdown();
+}
+
+TEST(MultipleSendTest, UnsubscribeService) {
+  struct MessageInt : public Message {
+    MessageInt(int xx) : x(xx) {}
+    int x;
+  };
+  struct MessageChar : public Message {
+    MessageChar(char xx) : x(xx) {}
+    char x;
+  };
+
+  System system;
+
+  system.Spawn("master", [](Reactor &r) {
+    std::shared_ptr<ChannelWriter> channel_writer;
+    while (!(channel_writer = r.system_.FindChannel("worker", "main")))
+      std::this_thread::sleep_for(std::chrono::milliseconds(300));
+    channel_writer->Send<MessageInt>(55);
+    channel_writer->Send<MessageInt>(66);
+    channel_writer->Send<MessageInt>(77);
+    channel_writer->Send<MessageInt>(88);
+    std::this_thread::sleep_for(std::chrono::milliseconds(300));
+    channel_writer->Send<MessageChar>('a');
+    channel_writer->Send<MessageChar>('b');
+    channel_writer->Send<MessageChar>('c');
+    channel_writer->Send<MessageChar>('d');
+    r.CloseChannel("main");
+  });
+
+  system.Spawn("worker", [num_received_messages = 0](Reactor & r) mutable {
+    EventStream *stream = r.main_.first;
+
+    stream->OnEvent<MessageInt>(
+        [&](const MessageInt &msgint, const Subscription &subscription) {
+          ASSERT_TRUE(msgint.x == 55 || msgint.x == 66);
+          ++num_received_messages;
+          if (msgint.x == 66) {
+            subscription.Unsubscribe();  // receive only two of them
+          }
+        });
+    stream->OnEvent<MessageChar>(
+        [&](const MessageChar &msgchar, const Subscription &subscription) {
+          char c = msgchar.x;
+          ++num_received_messages;
+          ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c');
+          if (num_received_messages == 5) {
+            subscription.Unsubscribe();
+            r.CloseChannel("main");
+          }
+        });
+  });
+
+  system.AwaitShutdown();
+}
+
+TEST(MultipleSendTest, OnEvent) {
+  struct MessageInt : public Message {
+    MessageInt(int xx) : x(xx) {}
+    int x;
+  };
+  struct MessageChar : public Message {
+    MessageChar(char xx) : x(xx) {}
+    char x;
+  };
+
+  System system;
+  system.Spawn("master", [](Reactor &r) {
+    std::shared_ptr<ChannelWriter> channel_writer;
+    while (!(channel_writer = r.system_.FindChannel("worker", "main")))
+      std::this_thread::sleep_for(std::chrono::milliseconds(300));
+
+    channel_writer->Send<MessageInt>(101);
+    channel_writer->Send<MessageChar>('a');
+    channel_writer->Send<MessageInt>(103);
+    channel_writer->Send<MessageChar>('b');
+    r.CloseChannel("main");
+  });
+
+  system.Spawn("worker", [correct_vals = 0](Reactor & r) mutable {
+    struct EndMessage : Message {};
+    EventStream *stream = r.main_.first;
+
+    stream->OnEvent<MessageInt>(
+        [&](const MessageInt &msgint, const Subscription &) {
+          ASSERT_TRUE(msgint.x == 101 || msgint.x == 103);
+          ++correct_vals;
+          r.main_.second->Send<EndMessage>();
+        });
+
+    stream->OnEvent<MessageChar>(
+        [&](const MessageChar &msgchar, const Subscription &) {
+          ASSERT_TRUE(msgchar.x == 'a' || msgchar.x == 'b');
+          ++correct_vals;
+          r.main_.second->Send<EndMessage>();
+        });
+
+    stream->OnEvent<EndMessage>([&](const EndMessage &, const Subscription &) {
+      ASSERT_LE(correct_vals, 4);
+      if (correct_vals == 4) {
+        r.CloseChannel("main");
+      }
+    });
+  });
+
+  system.AwaitShutdown();
+}
+
+TEST(MultipleSendTest, Chaining) {
+  struct MessageInt : public Message {
+    MessageInt(int xx) : x(xx) {}
+    int x;
+  };
+
+  System system;
+
+  system.Spawn("master", [](Reactor &r) {
+    std::shared_ptr<ChannelWriter> channel_writer;
+    while (!(channel_writer = r.system_.FindChannel("worker", "main")))
+      std::this_thread::sleep_for(std::chrono::milliseconds(300));
+    channel_writer->Send<MessageInt>(55);
+    channel_writer->Send<MessageInt>(66);
+    channel_writer->Send<MessageInt>(77);
+    r.CloseChannel("main");
+  });
+
+  system.Spawn("worker", [](Reactor &r) {
+    EventStream *stream = r.main_.first;
+
+    stream->OnEventOnce()
+        .ChainOnce<MessageInt>([](const MessageInt &msg, const Subscription &) {
+          ASSERT_EQ(msg.x, 55);
+        })
+        .ChainOnce<MessageInt>([](const MessageInt &msg, const Subscription &) {
+          ASSERT_EQ(msg.x, 66);
+        })
+        .ChainOnce<MessageInt>(
+            [&](const MessageInt &msg, const Subscription &) {
+              ASSERT_EQ(msg.x, 77);
+              r.CloseChannel("main");
+            });
+  });
+
+  system.AwaitShutdown();
+}
+
+TEST(MultipleSendTest, ChainingInRightOrder) {
+  struct MessageInt : public Message {
+    MessageInt(int xx) : x(xx) {}
+    int x;
+  };
+
+  struct MessageChar : public Message {
+    MessageChar(char xx) : x(xx) {}
+    char x;
+  };
+
+  System system;
+
+  system.Spawn("master", [](Reactor &r) {
+    std::shared_ptr<ChannelWriter> channel_writer;
+    while (!(channel_writer = r.system_.FindChannel("worker", "main")))
+      std::this_thread::sleep_for(std::chrono::milliseconds(300));
+    channel_writer->Send<MessageChar>('a');
+    channel_writer->Send<MessageInt>(55);
+    channel_writer->Send<MessageChar>('b');
+    channel_writer->Send<MessageInt>(77);
+    r.CloseChannel("main");
+  });
+
+  system.Spawn("worker", [](Reactor &r) {
+    EventStream *stream = r.main_.first;
+    stream->OnEventOnce()
+        .ChainOnce<MessageInt>([](const MessageInt &msg, const Subscription &) {
+          ASSERT_EQ(msg.x, 55);
+        })
+        .ChainOnce<MessageChar>(
+            [](const MessageChar &msg, const Subscription &) {
+              ASSERT_EQ(msg.x, 'b');
+            })
+        .ChainOnce<MessageInt>(
+            [&](const MessageInt &msg, const Subscription &) {
+              ASSERT_EQ(msg.x, 77);
+              r.CloseChannel("main");
+            });
+  });
+
+  system.AwaitShutdown();
+}
+
+TEST(MultipleSendTest, ProcessManyMessages) {
+  const static int kNumTests = 100;
+
+  struct MessageInt : public Message {
+    MessageInt(int xx) : x(xx) {}
+    int x;
+  };
+
+  System system;
+
+  system.Spawn("master", [](Reactor &r) {
+    std::shared_ptr<ChannelWriter> channel_writer;
+    while (!(channel_writer = r.system_.FindChannel("worker", "main")))
+      std::this_thread::sleep_for(std::chrono::milliseconds(300));
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
+    for (int i = 0; i < kNumTests; ++i) {
+      channel_writer->Send<MessageInt>(rand());
+      std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 5));
+    }
+    r.CloseChannel("main");
+  });
+
+  system.Spawn("worker", [vals = 0](Reactor & r) mutable {
+    struct EndMessage : Message {};
+    EventStream *stream = r.main_.first;
+    vals = 0;
+
+    stream->OnEvent<MessageInt>([&](const Message &, const Subscription &) {
+      ++vals;
+      r.main_.second->Send<EndMessage>();
+    });
+
+    stream->OnEvent<EndMessage>([&](const Message &, const Subscription &) {
+      ASSERT_LE(vals, kNumTests);
+      if (vals == kNumTests) {
+        r.CloseChannel("main");
+      }
+    });
+  });
+  system.AwaitShutdown();
+}
+
+int main(int argc, char **argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}