From 2d6675df634775286f2efcb5c33733a6fbc50574 Mon Sep 17 00:00:00 2001
From: Mislav Bradac <mislav.bradac@memgraph.io>
Date: Wed, 15 Nov 2017 16:26:10 +0100
Subject: [PATCH] Fix FindChannel implementation

Summary:
Change queue implementation in distributed reactor

Fix FindChannel implementations

Reviewers: mtomic, buda, dgleich

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D986
---
 src/communication/raft/raft.cpp               |  58 ++---
 src/communication/raft/raft.hpp               |   2 +
 src/communication/raft/raft_network.hpp       |   4 +-
 .../reactor/reactor_distributed.hpp           |  65 ++----
 src/communication/reactor/reactor_local.cpp   |  83 +++++--
 src/communication/reactor/reactor_local.hpp   |  83 +++----
 src/data_structures/queue.hpp                 |  23 +-
 tests/manual/raft_experiments.cpp             |   6 +-
 tests/unit/queue.cpp                          |  17 +-
 tests/unit/raft.cpp                           |  11 +-
 tests/unit/reactor_distributed.cpp            |  21 +-
 tests/unit/reactor_local.cpp                  | 211 ++++++++++--------
 12 files changed, 303 insertions(+), 281 deletions(-)

diff --git a/src/communication/raft/raft.cpp b/src/communication/raft/raft.cpp
index 3d0c500f2..25940eaa2 100644
--- a/src/communication/raft/raft.cpp
+++ b/src/communication/raft/raft.cpp
@@ -39,45 +39,33 @@ RaftMember::RaftMember(System &system, const std::string &id,
               }
             }
           },
-          true) {
-  system.Spawn(id, [this](Reactor &r) {
-    EventStream *stream = r.main_.first;
+          true),
+      reactor_(system.Spawn(id, [this](Reactor &r) {
+        EventStream *stream = r.main_.first;
 
-    stream->OnEvent<MLeaderTimeout>([this](
-        const MLeaderTimeout &, const Subscription &) { RunElection(); });
+        stream->OnEvent<MLeaderTimeout>([this](
+            const MLeaderTimeout &, const Subscription &) { RunElection(); });
 
-    stream->OnEvent<MRequestVote>(
-        [this](const MRequestVote &req, const Subscription &) {
-          network_.RequestVoteReply(req.sender_id, OnRequestVote(req));
-        });
-    stream->OnEvent<MRequestVoteReply>(
-        [this](const MRequestVoteReply &req, const Subscription &) {
-          OnRequestVoteReply(req);
-        });
+        stream->OnEvent<MRequestVote>(
+            [this](const MRequestVote &req, const Subscription &) {
+              network_.RequestVoteReply(req.sender_id, OnRequestVote(req));
+            });
+        stream->OnEvent<MRequestVoteReply>(
+            [this](const MRequestVoteReply &req, const Subscription &) {
+              OnRequestVoteReply(req);
+            });
 
-    stream->OnEvent<MAppendEntries>(
-        [this](const MAppendEntries &req, const Subscription &) {
-          network_.AppendEntriesReply(req.sender_id, OnAppendEntries(req));
-        });
-    stream->OnEvent<MAppendEntriesReply>(
-        [this](const MAppendEntriesReply &rep, const Subscription &) {
-          OnAppendEntriesReply(rep);
-        });
+        stream->OnEvent<MAppendEntries>(
+            [this](const MAppendEntries &req, const Subscription &) {
+              network_.AppendEntriesReply(req.sender_id, OnAppendEntries(req));
+            });
+        stream->OnEvent<MAppendEntriesReply>(
+            [this](const MAppendEntriesReply &rep, const Subscription &) {
+              OnAppendEntriesReply(rep);
+            });
+      })) {}
 
-    stream->OnEvent<MShutdown>([&r](const MShutdown &, const Subscription &) {
-      r.CloseChannel("main");
-    });
-  });
-}
-
-RaftMember::~RaftMember() {
-  LogInfo("Shutting down...");
-
-  auto channel = system_.FindChannel(id_, "main");
-  if (channel) {
-    channel->Send<MShutdown>();
-  }
-}
+RaftMember::~RaftMember() { LogInfo("Shutting down..."); }
 
 template <class... Args>
 void RaftMember::LogInfo(const std::string &format, Args &&... args) {
diff --git a/src/communication/raft/raft.hpp b/src/communication/raft/raft.hpp
index 07a4bf7c0..1aaa73ad9 100644
--- a/src/communication/raft/raft.hpp
+++ b/src/communication/raft/raft.hpp
@@ -49,6 +49,8 @@ class RaftMember {
   Watchdog leader_watchdog_;
   Watchdog heartbeat_watchdog_;
 
+  std::unique_ptr<reactor::Reactor> reactor_;
+
   void RunElection();
   void TransitionToFollower();
   void TransitionToCandidate();
diff --git a/src/communication/raft/raft_network.hpp b/src/communication/raft/raft_network.hpp
index 85d6c6a1a..ec522fadb 100644
--- a/src/communication/raft/raft_network.hpp
+++ b/src/communication/raft/raft_network.hpp
@@ -7,7 +7,6 @@
 namespace communication::raft {
 
 struct MLeaderTimeout : public communication::reactor::Message {};
-struct MShutdown : public communication::reactor::Message {};
 
 struct RaftMessage : public communication::reactor::Message {
   RaftMessage(int term, const std::string &sender_id)
@@ -98,7 +97,8 @@ class LocalReactorNetworkInterface : public RaftNetworkInterface {
 
 class FakeNetworkInterface : public RaftNetworkInterface {
  public:
-  explicit FakeNetworkInterface(communication::reactor::System &system) : system_(system) {}
+  explicit FakeNetworkInterface(communication::reactor::System &system)
+      : system_(system) {}
 
   bool RequestVote(const std::string &recipient,
                    const MRequestVote &msg) override {
diff --git a/src/communication/reactor/reactor_distributed.hpp b/src/communication/reactor/reactor_distributed.hpp
index 4af8cd7b6..b4c592bd7 100644
--- a/src/communication/reactor/reactor_distributed.hpp
+++ b/src/communication/reactor/reactor_distributed.hpp
@@ -15,6 +15,7 @@
 #include <gflags/gflags.h>
 
 #include "communication/reactor/reactor_local.hpp"
+#include "data_structures/queue.hpp"
 #include "protocol.hpp"
 
 #include "cereal/archives/binary.hpp"
@@ -70,46 +71,21 @@ class Network {
  public:
   Network() = default;
 
-  // client functions
-
-  std::shared_ptr<ChannelWriter> Resolve(std::string address, uint16_t port,
-                                         std::string reactor_name,
-                                         std::string channel_name) {
-    if (SendMessage(address, port, reactor_name, channel_name, nullptr)) {
-      return std::make_shared<RemoteChannelWriter>(this, address, port,
-                                                   reactor_name, channel_name);
-    }
-    LOG(WARNING) << "Could not resolve " << address << ":" << port << " "
-                 << reactor_name << "/" << channel_name;
-    return nullptr;
-  }
-
   /** Start a threadpool that dispatches the messages from the (outgoing) queue
    * to the sockets */
   void StartClient(int worker_count) {
     LOG(INFO) << "Starting " << worker_count << " client workers";
-    client_run_ = true;
 
+    // condition variables here...
     for (int i = 0; i < worker_count; ++i) {
       pool_.push_back(std::thread([this]() {
-        while (this->client_run_) {
-          this->mutex_.lock();
-          if (!this->queue_.empty()) {
-            NetworkMessage nm(std::move(this->queue_.front()));
-            this->queue_.pop();
-            this->mutex_.unlock();
-            // TODO: store success
-            bool success = SendMessage(nm.address, nm.port, nm.reactor,
-                                       nm.channel, std::move(nm.message));
-            DLOG(INFO) << "Network client message send status: " << success
-                       << std::endl;
-          } else {
-            this->mutex_.unlock();
-          }
-          std::this_thread::sleep_for(std::chrono::milliseconds(50));
+        while (true) {
+          auto message = queue_.AwaitPop();
+          if (message == std::experimental::nullopt) break;
+          SendMessage(message->address, message->port, message->reactor,
+                      message->channel, std::move(message->message));
         }
       }));
-      std::this_thread::sleep_for(std::chrono::milliseconds(5));
     }
   }
 
@@ -120,7 +96,7 @@ class Network {
         break;
       }
     }
-    client_run_ = false;
+    queue_.Signal();
     for (size_t i = 0; i < pool_.size(); ++i) {
       pool_[i].join();
     }
@@ -148,8 +124,8 @@ class Network {
 
     void Send(std::unique_ptr<Message> message) override {
       std::lock_guard<SpinLock> lock(network_->mutex_);
-      network_->queue_.push(NetworkMessage(address_, port_, reactor_, channel_,
-                                           std::move(message)));
+      network_->queue_.Emplace(address_, port_, reactor_, channel_,
+                               std::move(message));
     }
 
    private:
@@ -200,8 +176,7 @@ class Network {
   // client variables
   SpinLock mutex_;
   std::vector<std::thread> pool_;
-  std::queue<NetworkMessage> queue_;
-  std::atomic<bool> client_run_;
+  Queue<NetworkMessage> queue_;
 
   // server variables
   std::thread thread_;
@@ -221,15 +196,15 @@ class DistributedSystem : public ChannelFinder {
   }
 
   // Thread safe.
-  void Spawn(const std::string &name, std::function<void(Reactor &)> setup) {
-    system_.Spawn(name, setup, this);
+  std::unique_ptr<Reactor> Spawn(const std::string &name,
+                                 std::function<void(Reactor &)> setup) {
+    return system_.Spawn(name, setup, this);
   }
 
   // Non-thread safe.
-  // TODO: figure out what should be intereaction of this function and
+  // TODO: figure out what should be interection of this function and
   // destructor.
   void StopServices() {
-    system_.AwaitShutdown();
     network_.StopClient();
     network_.StopServer();
   }
@@ -250,14 +225,8 @@ class DistributedSystem : public ChannelFinder {
       const std::string &address, uint16_t port,
       const std::string &reactor_name,
       const std::string &channel_name) override {
-    // Yeah... Unneeded shared ptr... once again. We love that.
-    std::shared_ptr<ChannelWriter> channel_writer = nullptr;
-    // TODO: Check if this is actually local channel.
-    while (!(channel_writer =
-                 network_.Resolve(address, port, reactor_name, channel_name))) {
-      std::this_thread::sleep_for(std::chrono::milliseconds(200));
-    }
-    return channel_writer;
+    return std::make_shared<Network::RemoteChannelWriter>(
+        &network_, address, port, reactor_name, channel_name);
   }
 
   Network &network() { return network_; }
diff --git a/src/communication/reactor/reactor_local.cpp b/src/communication/reactor/reactor_local.cpp
index f33486718..db333271c 100644
--- a/src/communication/reactor/reactor_local.cpp
+++ b/src/communication/reactor/reactor_local.cpp
@@ -1,9 +1,13 @@
 #include "communication/reactor/reactor_local.hpp"
 
+#include <chrono>
+
 #include "utils/exceptions.hpp"
 
 namespace communication::reactor {
 
+using namespace std::literals::chrono_literals;
+
 void EventStream::Subscription::Unsubscribe() const {
   event_queue_.RemoveCallback(*this);
 }
@@ -18,13 +22,59 @@ std::string Channel::LocalChannelWriter::ReactorName() const {
   return reactor_name_;
 }
 
+void Channel::LocalChannelWriter::Send(std::unique_ptr<Message> m) {
+  // Atomic, per the standard.  We guarantee here that if channel exists it
+  // will not be destroyed by the end of this function.
+  for (int i = 0; i < 2; ++i) {
+    std::shared_ptr<Channel> queue = queue_.lock();
+    // Check if cached queue exists and send message.
+    if (queue) {
+      queue->Push(std::move(m));
+      break;
+    }
+    // If it doesn't exist. Check if there is a new channel with same name.
+    auto new_channel = system_.FindChannel(reactor_name_, channel_name_);
+    auto t =
+        std::dynamic_pointer_cast<Channel::LocalChannelWriter>(new_channel);
+    CHECK(t) << "t is of unexpected type";
+    queue_ = t->queue_;
+  }
+}
+
 std::string Channel::LocalChannelWriter::Name() const { return channel_name_; }
 
-void Channel::Close() {
-  // TODO(zuza): there will be major problems if a reactor tries to close a
-  // stream that isn't theirs luckily this should never happen if the framework
-  // is used as expected.
-  reactor_.CloseChannel(channel_name_);
+std::shared_ptr<Channel::LocalChannelWriter> Channel::LockedOpenChannel() {
+  // TODO(zuza): fix this CHECK using this answer
+  // https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned
+  // TODO: figure out zuza's TODO. Does that mean this CHECK is kind of flaky
+  // or that it doesn't fail sometimes, when it should.
+  CHECK(!self_ptr_.expired());
+  return std::make_shared<LocalChannelWriter>(reactor_name_, channel_name_,
+                                              self_ptr_, reactor_.system_);
+}
+
+void Channel::Close() { reactor_.CloseChannel(channel_name_); }
+
+Reactor::Reactor(ChannelFinder &system, const std::string &name,
+                 const std::function<void(Reactor &)> &setup, System &system2)
+    : system_(system),
+      system2_(system2),
+      name_(name),
+      setup_(setup),
+      main_(Open("main")),
+      thread_([this] {
+        setup_(*this);
+        RunEventLoop();
+        system2_.RemoveReactor(name_);
+      }) {}
+
+Reactor::~Reactor() {
+  {
+    std::unique_lock<std::mutex> guard(*mutex_);
+    channels_.clear();
+  }
+  cvar_->notify_all();
+  thread_.join();
 }
 
 std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Reactor::Open(
@@ -77,29 +127,18 @@ void Reactor::CloseChannel(const std::string &s) {
 }
 
 void Reactor::RunEventLoop() {
-  bool exit_event_loop = false;
-
   while (true) {
     // Find (or wait) for the next Message.
     PendingMessageInfo info;
     {
       std::unique_lock<std::mutex> guard(*mutex_);
-
-      while (true) {
-        // Not fair because was taken earlier, talk to lion.
+      // Exit the loop if there are no more Channels.
+      cvar_->wait_for(guard, 200ms, [&] {
+        if (channels_.empty()) return true;
         info = GetPendingMessages();
-        if (info.message != nullptr) break;
-
-        // Exit the loop if there are no more Channels.
-        if (channels_.empty()) {
-          exit_event_loop = true;
-          break;
-        }
-
-        cvar_->wait(guard);
-      }
-
-      if (exit_event_loop) break;
+        return static_cast<bool>(info.message);
+      });
+      if (channels_.empty()) break;
     }
 
     for (auto &callback_info : info.callbacks) {
diff --git a/src/communication/reactor/reactor_local.hpp b/src/communication/reactor/reactor_local.hpp
index fd24e7b05..72455bfdd 100644
--- a/src/communication/reactor/reactor_local.hpp
+++ b/src/communication/reactor/reactor_local.hpp
@@ -289,6 +289,8 @@ class Channel {
    * Messages sent to a closed channel are ignored.
    * There can be multiple LocalChannelWriters refering to the same stream if
    * needed.
+   *
+   * It must outlive System.
    */
   class LocalChannelWriter : public ChannelWriter {
    public:
@@ -296,22 +298,14 @@ class Channel {
 
     LocalChannelWriter(const std::string &reactor_name,
                        const std::string &channel_name,
-                       const std::weak_ptr<Channel> &queue)
+                       const std::weak_ptr<Channel> &queue,
+                       ChannelFinder &system)
         : reactor_name_(reactor_name),
           channel_name_(channel_name),
-          queue_(queue) {}
-
-    void Send(std::unique_ptr<Message> m) override {
-      // Atomic, per the standard.  We guarantee here that if channel exists it
-      // will not be destroyed by the end of this function.
-      std::shared_ptr<Channel> queue = queue_.lock();
-      if (queue) {
-        queue->Push(std::move(m));
-      }
-      // TODO: what should we do here? Channel doesn't exist so message will be
-      // lost.
-    }
+          queue_(queue),
+          system_(system) {}
 
+    void Send(std::unique_ptr<Message> m) override;
     std::string ReactorName() const override;
     std::string Name() const override;
 
@@ -319,6 +313,7 @@ class Channel {
     std::string reactor_name_;
     std::string channel_name_;
     std::weak_ptr<Channel> queue_;
+    ChannelFinder &system_;
   };
 
   /**
@@ -381,16 +376,7 @@ class Channel {
     cvar_->notify_one();
   }
 
-  std::shared_ptr<LocalChannelWriter> LockedOpenChannel() {
-    // TODO(zuza): fix this CHECK using this answer
-    // https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned
-    // TODO: figure out zuza's TODO. Does that mean this CHECK is kind of flaky
-    // or that it doesn't fail sometimes, when it should.
-    CHECK(!self_ptr_.expired());
-    return std::make_shared<LocalChannelWriter>(reactor_name_, channel_name_,
-                                                self_ptr_);
-  }
-
+  std::shared_ptr<LocalChannelWriter> LockedOpenChannel();
   std::unique_ptr<Message> LockedPop() { return LockedRawPop(); }
 
   void LockedOnEventHelper(std::type_index type_index,
@@ -444,12 +430,10 @@ class Channel {
 class Reactor {
   friend class System;
 
-  Reactor(ChannelFinder &system, const std::string &name,
-          const std::function<void(Reactor &)> &setup)
-      : system_(system), name_(name), setup_(setup), main_(Open("main")) {}
-
  public:
-  ~Reactor() {}
+  Reactor(ChannelFinder &system, const std::string &name,
+          const std::function<void(Reactor &)> &setup, System &system2);
+  ~Reactor();
 
   std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Open(
       const std::string &s);
@@ -474,6 +458,7 @@ class Reactor {
   Reactor &operator=(Reactor &&other) = default;
 
   ChannelFinder &system_;
+  System &system2_;
   std::string name_;
   std::function<void(Reactor &)> setup_;
 
@@ -502,6 +487,8 @@ class Reactor {
         callbacks;
   };
 
+  std::thread thread_;
+
   /**
    * Dispatches all waiting messages to callbacks. Shuts down when there are no
    * callbacks left.
@@ -520,21 +507,18 @@ class System : public ChannelFinder {
   friend class Reactor;
   System() = default;
 
-  void Spawn(const std::string &name, std::function<void(Reactor &)> setup,
-             ChannelFinder *finder = nullptr) {
+  std::unique_ptr<Reactor> Spawn(const std::string &name,
+                                 std::function<void(Reactor &)> setup,
+                                 ChannelFinder *finder = nullptr) {
     if (!finder) {
       finder = this;
     }
     std::unique_lock<std::mutex> lock(mutex_);
     CHECK(reactors_.find(name) == reactors_.end())
         << "Reactor with name: '" << name << "' already exists.";
-    std::unique_ptr<Reactor> reactor(new Reactor(*finder, name, setup));
-    std::thread reactor_thread([reactor = reactor.get()] {
-      reactor->setup_(*reactor);
-      reactor->RunEventLoop();
-    });
-    reactors_.emplace(name, std::pair<decltype(reactor), std::thread>{
-                                std::move(reactor), std::move(reactor_thread)});
+    auto reactor = std::make_unique<Reactor>(*finder, name, setup, *this);
+    reactors_.emplace(name, reactor.get());
+    return reactor;
   }
 
   std::shared_ptr<ChannelWriter> FindChannel(
@@ -542,8 +526,10 @@ class System : public ChannelFinder {
       const std::string &channel_name) override {
     std::unique_lock<std::mutex> lock(mutex_);
     auto it_reactor = reactors_.find(reactor_name);
-    if (it_reactor == reactors_.end()) return nullptr;
-    return it_reactor->second.first->FindChannel(channel_name);
+    if (it_reactor == reactors_.end())
+      return std::shared_ptr<ChannelWriter>(new Channel::LocalChannelWriter(
+          reactor_name, channel_name, {}, *this));
+    return it_reactor->second->FindChannel(channel_name);
   }
 
   std::shared_ptr<ChannelWriter> FindChannel(const std::string &, uint16_t,
@@ -556,16 +542,11 @@ class System : public ChannelFinder {
     LOG(FATAL) << "Tried to resolve remote channel in local System";
   }
 
-  // TODO: Think about interaction with destructor. Should we call this in
-  // destructor, complain in destructor if there are alive threads or stop
-  // them
-  // in some way.
-  void AwaitShutdown() {
-    for (auto &key_value : reactors_) {
-      auto &thread = key_value.second.second;
-      thread.join();
-    }
-    reactors_.clear();
+  void RemoveReactor(const std::string &name_) {
+    std::unique_lock<std::mutex> guard(mutex_);
+    auto it = reactors_.find(name_);
+    CHECK(it != reactors_.end()) << "Trying to delete notexisting reactor";
+    reactors_.erase(it);
   }
 
  private:
@@ -575,9 +556,7 @@ class System : public ChannelFinder {
   System &operator=(System &&) = delete;
 
   std::mutex mutex_;
-  std::unordered_map<std::string,
-                     std::pair<std::unique_ptr<Reactor>, std::thread>>
-      reactors_;
+  std::unordered_map<std::string, Reactor *> reactors_;
 };
 
 using Subscription = Channel::LocalEventStream::Subscription;
diff --git a/src/data_structures/queue.hpp b/src/data_structures/queue.hpp
index 230a046f8..a13dc98a9 100644
--- a/src/data_structures/queue.hpp
+++ b/src/data_structures/queue.hpp
@@ -1,5 +1,6 @@
 #pragma once
 
+#include <atomic>
 #include <condition_variable>
 #include <cstdint>
 #include <experimental/optional>
@@ -17,6 +18,8 @@ class Queue {
   Queue(Queue &&) = delete;
   Queue &operator=(Queue &&) = delete;
 
+  ~Queue() { Signal(); }
+
   void Push(T x) {
     std::unique_lock<std::mutex> guard(mutex_);
     queue_.emplace(std::move(x));
@@ -42,23 +45,35 @@ class Queue {
     return queue_.empty();
   }
 
-  T AwaitPop() {
+  // Block until there is an element in the queue and then pop it from the queue
+  // and return it. Function can return nullopt only if Queue is signaled via
+  // Signal function.
+  std::experimental::optional<T> AwaitPop() {
     std::unique_lock<std::mutex> guard(mutex_);
-    cvar_.wait(guard, [this]() { return !queue_.empty(); });
-    auto x = std::move(queue_.front());
+    cvar_.wait(guard, [this] { return !queue_.empty() || signaled_; });
+    if (queue_.empty()) return std::experimental::nullopt;
+    std::experimental::optional<T> x(std::move(queue_.front()));
     queue_.pop();
     return x;
   }
 
+  // Nonblocking version of above function.
   std::experimental::optional<T> MaybePop() {
     std::unique_lock<std::mutex> guard(mutex_);
     if (queue_.empty()) return std::experimental::nullopt;
-    auto x = std::move(queue_.front());
+    std::experimental::optional<T> x(std::move(queue_.front()));
     queue_.pop();
     return x;
   }
 
+  // Notify all threads waiting on conditional variable to stop waiting.
+  void Signal() {
+    signaled_ = true;
+    cvar_.notify_all();
+  }
+
  private:
+  std::atomic<bool> signaled_{false};
   std::queue<T> queue_;
   std::condition_variable cvar_;
   mutable std::mutex mutex_;
diff --git a/tests/manual/raft_experiments.cpp b/tests/manual/raft_experiments.cpp
index 3508dd841..3292708b7 100644
--- a/tests/manual/raft_experiments.cpp
+++ b/tests/manual/raft_experiments.cpp
@@ -51,8 +51,6 @@ milliseconds InitialElection(const RaftConfig &config) {
     end = std::chrono::system_clock::now();
   }
 
-  sys.AwaitShutdown();
-
   return std::chrono::duration_cast<milliseconds>(end - start);
 }
 
@@ -104,8 +102,6 @@ milliseconds Reelection(const RaftConfig &config) {
     end = std::chrono::system_clock::now();
   }
 
-  sys.AwaitShutdown();
-
   return std::chrono::duration_cast<milliseconds>(end - start);
 }
 
@@ -125,7 +121,7 @@ std::vector<milliseconds> RunTest(const std::string &name,
   return results;
 }
 
-int main(int argc, char *argv[]) {
+int main(int, char *argv[]) {
   google::InitGoogleLogging(argv[0]);
 
   RaftConfig config{{"a", "b", "c", "d", "e"}, 150ms, 300ms, 70ms};
diff --git a/tests/unit/queue.cpp b/tests/unit/queue.cpp
index 812538815..310781821 100644
--- a/tests/unit/queue.cpp
+++ b/tests/unit/queue.cpp
@@ -78,11 +78,18 @@ TEST(Queue, AwaitPop) {
     q.Push(4);
   });
 
-  EXPECT_EQ(q.AwaitPop(), 1);
-  EXPECT_EQ(q.AwaitPop(), 2);
-  EXPECT_EQ(q.AwaitPop(), 3);
-  EXPECT_EQ(q.AwaitPop(), 4);
+  EXPECT_EQ(*q.AwaitPop(), 1);
+  EXPECT_EQ(*q.AwaitPop(), 2);
+  EXPECT_EQ(*q.AwaitPop(), 3);
+  EXPECT_EQ(*q.AwaitPop(), 4);
   t.join();
+
+  std::thread t2([&] {
+    std::this_thread::sleep_for(100ms);
+    q.Signal();
+  });
+  EXPECT_EQ(q.AwaitPop(), std::experimental::nullopt);
+  t2.join();
 }
 
 TEST(Queue, Concurrent) {
@@ -112,7 +119,7 @@ TEST(Queue, Concurrent) {
           while (true) {
             int count = num_retrieved++;
             if (count >= kNumProducers * kNumElementsPerProducer) break;
-            retrieved[thread_id].push_back(q.AwaitPop());
+            retrieved[thread_id].push_back(*q.AwaitPop());
           }
         },
         i);
diff --git a/tests/unit/raft.cpp b/tests/unit/raft.cpp
index ad154a7c4..4e5a0439c 100644
--- a/tests/unit/raft.cpp
+++ b/tests/unit/raft.cpp
@@ -27,8 +27,8 @@ TEST(Raft, InitialElection) {
   {
     std::vector<std::unique_ptr<RaftMemberTest>> members;
     for (const auto &member_id : test_config.members) {
-      members.push_back(
-          std::make_unique<RaftMemberTest>(sys, member_id, test_config, network));
+      members.push_back(std::make_unique<RaftMemberTest>(sys, member_id,
+                                                         test_config, network));
       network.Connect(member_id);
     }
 
@@ -39,7 +39,6 @@ TEST(Raft, InitialElection) {
       EXPECT_EQ(member->Leader(), leader);
     }
   }
-  sys.AwaitShutdown();
 }
 
 TEST(Raft, Reelection) {
@@ -49,8 +48,8 @@ TEST(Raft, Reelection) {
   {
     std::vector<std::unique_ptr<RaftMemberTest>> members;
     for (const auto &member_id : test_config.members) {
-      members.push_back(
-          std::make_unique<RaftMemberTest>(sys, member_id, test_config, network));
+      members.push_back(std::make_unique<RaftMemberTest>(sys, member_id,
+                                                         test_config, network));
       network.Connect(member_id);
     }
 
@@ -76,6 +75,4 @@ TEST(Raft, Reelection) {
       EXPECT_EQ(member->Leader(), second_leader);
     }
   }
-
-  sys.AwaitShutdown();
 }
diff --git a/tests/unit/reactor_distributed.cpp b/tests/unit/reactor_distributed.cpp
index 292e09b41..9d8fd8d6d 100644
--- a/tests/unit/reactor_distributed.cpp
+++ b/tests/unit/reactor_distributed.cpp
@@ -18,6 +18,7 @@
 #include "gtest/gtest.h"
 
 using namespace communication::reactor;
+using namespace std::literals::chrono_literals;
 
 struct MessageInt : public Message {
   MessageInt() {}  // cereal needs this
@@ -52,7 +53,7 @@ CEREAL_REGISTER_TYPE(RequestMessage);
 TEST(SimpleTests, StartAndStopServices) {
   DistributedSystem system;
   // do nothing
-  std::this_thread::sleep_for(std::chrono::milliseconds(500));
+  std::this_thread::sleep_for(500ms);
   system.StopServices();
 }
 
@@ -65,13 +66,14 @@ TEST(SimpleTests, StartAndStopServices) {
 TEST(SimpleTests, SendEmptyMessage) {
   DistributedSystem system;
 
-  system.Spawn("master", [](Reactor &r) {
+  auto master = system.Spawn("master", [](Reactor &r) {
+    std::this_thread::sleep_for(100ms);
     auto writer = r.system_.FindChannel("127.0.0.1", 10000, "worker", "main");
     writer->Send<Message>();
     r.CloseChannel("main");
   });
 
-  system.Spawn("worker", [](Reactor &r) {
+  auto worker = system.Spawn("worker", [](Reactor &r) {
     r.main_.first->OnEventOnce().ChainOnce<Message>(
         [&](const Message &, const Subscription &subscription) {
           // if this message isn't delivered, the main channel will never be
@@ -80,6 +82,7 @@ TEST(SimpleTests, SendEmptyMessage) {
         });
   });
 
+  std::this_thread::sleep_for(400ms);
   system.StopServices();
 }
 
@@ -93,7 +96,8 @@ TEST(SimpleTests, SendEmptyMessage) {
 TEST(SimpleTests, SendReturnAddressMessage) {
   DistributedSystem system;
 
-  system.Spawn("master", [](Reactor &r) {
+  auto master = system.Spawn("master", [](Reactor &r) {
+    std::this_thread::sleep_for(100ms);
     auto writer = r.system_.FindChannel("127.0.0.1", 10000, "worker", "main");
     writer->Send<ReturnAddressMessage>(r.name(), "main");
     r.main_.first->OnEvent<MessageInt>(
@@ -102,7 +106,7 @@ TEST(SimpleTests, SendReturnAddressMessage) {
           r.CloseChannel("main");
         });
   });
-  system.Spawn("worker", [](Reactor &r) {
+  auto worker = system.Spawn("worker", [](Reactor &r) {
     r.main_.first->OnEvent<ReturnAddressMessage>(
         [&](const ReturnAddressMessage &message, const Subscription &) {
           message.FindChannel(r.system_)->Send<MessageInt>(5);
@@ -110,6 +114,7 @@ TEST(SimpleTests, SendReturnAddressMessage) {
         });
   });
 
+  std::this_thread::sleep_for(400ms);
   system.StopServices();
 }
 
@@ -123,7 +128,8 @@ TEST(SimpleTests, SendReturnAddressMessage) {
 TEST(SimpleTests, SendSerializableMessage) {
   DistributedSystem system;
 
-  system.Spawn("master", [](Reactor &r) {
+  auto master = system.Spawn("master", [](Reactor &r) {
+    std::this_thread::sleep_for(100ms);
     auto writer = r.system_.FindChannel("127.0.0.1", 10000, "worker", "main");
     writer->Send<RequestMessage>(r.name(), "main", 123);
     r.main_.first->OnEvent<MessageInt>(
@@ -133,7 +139,7 @@ TEST(SimpleTests, SendSerializableMessage) {
         });
   });
 
-  system.Spawn("worker", [](Reactor &r) {
+  auto worker = system.Spawn("worker", [](Reactor &r) {
     r.main_.first->OnEvent<RequestMessage>(
         [&](const RequestMessage &message, const Subscription &) {
           ASSERT_EQ(message.x, 123);
@@ -142,6 +148,7 @@ TEST(SimpleTests, SendSerializableMessage) {
         });
   });
 
+  std::this_thread::sleep_for(400ms);
   system.StopServices();
 }
 
diff --git a/tests/unit/reactor_local.cpp b/tests/unit/reactor_local.cpp
index 828c0ed46..08f302aca 100644
--- a/tests/unit/reactor_local.cpp
+++ b/tests/unit/reactor_local.cpp
@@ -11,47 +11,26 @@
 #include "gtest/gtest.h"
 #include "utils/exceptions.hpp"
 
+using namespace std::literals::chrono_literals;
 using namespace communication::reactor;
 using Subscription = EventStream::Subscription;
 
 TEST(SystemTest, ReturnWithoutThrowing) {
   System system;
-  ASSERT_NO_THROW(
-      system.Spawn("master", [](Reactor &r) { r.CloseChannel("main"); }));
-  ASSERT_NO_THROW(system.AwaitShutdown());
+  auto master =
+      system.Spawn("master", [](Reactor &r) { r.CloseChannel("main"); });
+  std::this_thread::sleep_for(100ms);
 }
 
 TEST(ChannelCreationTest, ThrowOnReusingChannelName) {
   System system;
-  system.Spawn("master", [](Reactor &r) {
+  auto master = system.Spawn("master", [](Reactor &r) {
     r.Open("channel");
     ASSERT_THROW(r.Open("channel"), utils::BasicException);
     r.CloseChannel("main");
     r.CloseChannel("channel");
   });
-  system.AwaitShutdown();
-}
-
-TEST(ChannelSetUpTest, CheckMainChannelIsSet) {
-  System system;
-
-  system.Spawn("master", [](Reactor &r) {
-    std::shared_ptr<ChannelWriter> channel_writer;
-    while (!(channel_writer = r.system_.FindChannel("worker", "main")))
-      std::this_thread::sleep_for(std::chrono::milliseconds(300));
-    std::this_thread::sleep_for(std::chrono::milliseconds(300));
-    r.CloseChannel("main");
-  });
-
-  system.Spawn("worker", [](Reactor &r) {
-    std::shared_ptr<ChannelWriter> channel_writer;
-    while (!(channel_writer = r.system_.FindChannel("master", "main")))
-      std::this_thread::sleep_for(std::chrono::milliseconds(300));
-    std::this_thread::sleep_for(std::chrono::milliseconds(300));
-    r.CloseChannel("main");
-  });
-
-  system.AwaitShutdown();
+  std::this_thread::sleep_for(100ms);
 }
 
 TEST(SimpleSendTest, OneCallback) {
@@ -61,15 +40,14 @@ TEST(SimpleSendTest, OneCallback) {
   };
 
   System system;
-  system.Spawn("master", [](Reactor &r) {
-    std::shared_ptr<ChannelWriter> channel_writer;
-    while (!(channel_writer = r.system_.FindChannel("worker", "main")))
-      std::this_thread::sleep_for(std::chrono::milliseconds(300));
+  auto master = system.Spawn("master", [](Reactor &r) {
+    std::this_thread::sleep_for(100ms);
+    auto channel_writer = r.system_.FindChannel("worker", "main");
     channel_writer->Send<MessageInt>(888);
     r.CloseChannel("main");
   });
 
-  system.Spawn("worker", [](Reactor &r) {
+  auto worker = system.Spawn("worker", [](Reactor &r) {
     EventStream *stream = r.main_.first;
 
     stream->OnEvent<MessageInt>(
@@ -78,8 +56,7 @@ TEST(SimpleSendTest, OneCallback) {
           r.CloseChannel("main");
         });
   });
-
-  system.AwaitShutdown();
+  std::this_thread::sleep_for(200ms);
 }
 
 TEST(SimpleSendTest, IgnoreAfterClose) {
@@ -90,10 +67,10 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
 
   System system;
 
-  system.Spawn("master", [](Reactor &r) {
-    std::shared_ptr<ChannelWriter> channel_writer;
-    while (!(channel_writer = r.system_.FindChannel("worker", "main")))
-      std::this_thread::sleep_for(std::chrono::milliseconds(300));
+  auto master = system.Spawn("master", [](Reactor &r) {
+    std::this_thread::sleep_for(100ms);
+    std::shared_ptr<ChannelWriter> channel_writer =
+        r.system_.FindChannel("worker", "main");
     channel_writer->Send<MessageInt>(101);
     channel_writer->Send<MessageInt>(102);  // should be ignored
     std::this_thread::sleep_for(std::chrono::milliseconds(300));
@@ -103,7 +80,7 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
     r.CloseChannel("main");
   });
 
-  system.Spawn("worker", [](Reactor &r) {
+  auto worker = system.Spawn("worker", [](Reactor &r) {
     EventStream *stream = r.main_.first;
     stream->OnEvent<MessageInt>(
         [&r](const MessageInt &msg, const Subscription &) {
@@ -112,7 +89,53 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
         });
   });
 
-  system.AwaitShutdown();
+  std::this_thread::sleep_for(std::chrono::milliseconds(500));
+}
+
+TEST(SimpleSendTest, RecreateChannelAfterClosing) {
+  struct MessageInt : public Message {
+    MessageInt(int xx) : x(xx) {}
+    int x;
+  };
+
+  System system;
+
+  auto master = system.Spawn("master", [](Reactor &r) {
+    std::this_thread::sleep_for(100ms);
+    auto channel_writer = r.system_.FindChannel("worker", "main");
+    // Original "worker" reactor will die after it process this message.
+    channel_writer->Send<MessageInt>(101);
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    // This message will be dropped since there is no reactor with name
+    // "worker".
+    channel_writer->Send<MessageInt>(102);
+    std::this_thread::sleep_for(std::chrono::milliseconds(200));
+    // This message should recieved by new "worker" reactor.
+    channel_writer->Send<MessageInt>(103);
+    r.CloseChannel("main");
+  });
+
+  auto worker = system.Spawn("worker", [](Reactor &r) {
+    EventStream *stream = r.main_.first;
+    stream->OnEvent<MessageInt>(
+        [&r](const MessageInt &msg, const Subscription &) {
+          r.CloseChannel("main");
+          ASSERT_EQ(msg.x, 101);
+        });
+  });
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(300));
+
+  auto worker2 = system.Spawn("worker", [](Reactor &r) {
+    EventStream *stream = r.main_.first;
+    stream->OnEvent<MessageInt>(
+        [&r](const MessageInt &msg, const Subscription &) {
+          r.CloseChannel("main");
+          ASSERT_EQ(msg.x, 103);
+        });
+  });
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(300));
 }
 
 TEST(SimpleSendTest, DuringFirstEvent) {
@@ -125,7 +148,7 @@ TEST(SimpleSendTest, DuringFirstEvent) {
 
   std::promise<int> p;
   auto f = p.get_future();
-  system.Spawn("master", [&p](Reactor &r) mutable {
+  auto master = system.Spawn("master", [&p](Reactor &r) mutable {
     EventStream *stream = r.main_.first;
 
     stream->OnEvent<MessageInt>(
@@ -145,7 +168,6 @@ TEST(SimpleSendTest, DuringFirstEvent) {
 
   f.wait();
   ASSERT_EQ(f.get(), 777);
-  system.AwaitShutdown();
 }
 
 TEST(MultipleSendTest, UnsubscribeService) {
@@ -160,10 +182,10 @@ TEST(MultipleSendTest, UnsubscribeService) {
 
   System system;
 
-  system.Spawn("master", [](Reactor &r) {
-    std::shared_ptr<ChannelWriter> channel_writer;
-    while (!(channel_writer = r.system_.FindChannel("worker", "main")))
-      std::this_thread::sleep_for(std::chrono::milliseconds(300));
+  auto master = system.Spawn("master", [](Reactor &r) {
+    std::this_thread::sleep_for(100ms);
+    std::shared_ptr<ChannelWriter> channel_writer =
+        r.system_.FindChannel("worker", "main");
     channel_writer->Send<MessageInt>(55);
     channel_writer->Send<MessageInt>(66);
     channel_writer->Send<MessageInt>(77);
@@ -176,30 +198,31 @@ TEST(MultipleSendTest, UnsubscribeService) {
     r.CloseChannel("main");
   });
 
-  system.Spawn("worker", [num_received_messages = 0](Reactor & r) mutable {
-    EventStream *stream = r.main_.first;
+  auto worker =
+      system.Spawn("worker", [num_received_messages = 0](Reactor & r) mutable {
+        EventStream *stream = r.main_.first;
 
-    stream->OnEvent<MessageInt>(
-        [&](const MessageInt &msgint, const Subscription &subscription) {
-          ASSERT_TRUE(msgint.x == 55 || msgint.x == 66);
-          ++num_received_messages;
-          if (msgint.x == 66) {
-            subscription.Unsubscribe();  // receive only two of them
-          }
-        });
-    stream->OnEvent<MessageChar>(
-        [&](const MessageChar &msgchar, const Subscription &subscription) {
-          char c = msgchar.x;
-          ++num_received_messages;
-          ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c');
-          if (num_received_messages == 5) {
-            subscription.Unsubscribe();
-            r.CloseChannel("main");
-          }
-        });
-  });
+        stream->OnEvent<MessageInt>(
+            [&](const MessageInt &msgint, const Subscription &subscription) {
+              ASSERT_TRUE(msgint.x == 55 || msgint.x == 66);
+              ++num_received_messages;
+              if (msgint.x == 66) {
+                subscription.Unsubscribe();  // receive only two of them
+              }
+            });
+        stream->OnEvent<MessageChar>(
+            [&](const MessageChar &msgchar, const Subscription &subscription) {
+              char c = msgchar.x;
+              ++num_received_messages;
+              ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c');
+              if (num_received_messages == 5) {
+                subscription.Unsubscribe();
+                r.CloseChannel("main");
+              }
+            });
+      });
 
-  system.AwaitShutdown();
+  std::this_thread::sleep_for(std::chrono::milliseconds(500));
 }
 
 TEST(MultipleSendTest, OnEvent) {
@@ -213,10 +236,10 @@ TEST(MultipleSendTest, OnEvent) {
   };
 
   System system;
-  system.Spawn("master", [](Reactor &r) {
-    std::shared_ptr<ChannelWriter> channel_writer;
-    while (!(channel_writer = r.system_.FindChannel("worker", "main")))
-      std::this_thread::sleep_for(std::chrono::milliseconds(300));
+  auto master = system.Spawn("master", [](Reactor &r) {
+    std::this_thread::sleep_for(100ms);
+    std::shared_ptr<ChannelWriter> channel_writer =
+        r.system_.FindChannel("worker", "main");
 
     channel_writer->Send<MessageInt>(101);
     channel_writer->Send<MessageChar>('a');
@@ -225,7 +248,7 @@ TEST(MultipleSendTest, OnEvent) {
     r.CloseChannel("main");
   });
 
-  system.Spawn("worker", [correct_vals = 0](Reactor & r) mutable {
+  auto worker = system.Spawn("worker", [correct_vals = 0](Reactor & r) mutable {
     struct EndMessage : Message {};
     EventStream *stream = r.main_.first;
 
@@ -251,7 +274,7 @@ TEST(MultipleSendTest, OnEvent) {
     });
   });
 
-  system.AwaitShutdown();
+  std::this_thread::sleep_for(std::chrono::milliseconds(300));
 }
 
 TEST(MultipleSendTest, Chaining) {
@@ -262,17 +285,17 @@ TEST(MultipleSendTest, Chaining) {
 
   System system;
 
-  system.Spawn("master", [](Reactor &r) {
-    std::shared_ptr<ChannelWriter> channel_writer;
-    while (!(channel_writer = r.system_.FindChannel("worker", "main")))
-      std::this_thread::sleep_for(std::chrono::milliseconds(300));
+  auto master = system.Spawn("master", [](Reactor &r) {
+    std::this_thread::sleep_for(100ms);
+    std::shared_ptr<ChannelWriter> channel_writer =
+        r.system_.FindChannel("worker", "main");
     channel_writer->Send<MessageInt>(55);
     channel_writer->Send<MessageInt>(66);
     channel_writer->Send<MessageInt>(77);
     r.CloseChannel("main");
   });
 
-  system.Spawn("worker", [](Reactor &r) {
+  auto worker = system.Spawn("worker", [](Reactor &r) {
     EventStream *stream = r.main_.first;
 
     stream->OnEventOnce()
@@ -288,8 +311,7 @@ TEST(MultipleSendTest, Chaining) {
               r.CloseChannel("main");
             });
   });
-
-  system.AwaitShutdown();
+  std::this_thread::sleep_for(std::chrono::milliseconds(300));
 }
 
 TEST(MultipleSendTest, ChainingInRightOrder) {
@@ -305,10 +327,10 @@ TEST(MultipleSendTest, ChainingInRightOrder) {
 
   System system;
 
-  system.Spawn("master", [](Reactor &r) {
-    std::shared_ptr<ChannelWriter> channel_writer;
-    while (!(channel_writer = r.system_.FindChannel("worker", "main")))
-      std::this_thread::sleep_for(std::chrono::milliseconds(300));
+  auto master = system.Spawn("master", [](Reactor &r) {
+    std::this_thread::sleep_for(100ms);
+    std::shared_ptr<ChannelWriter> channel_writer =
+        r.system_.FindChannel("worker", "main");
     channel_writer->Send<MessageChar>('a');
     channel_writer->Send<MessageInt>(55);
     channel_writer->Send<MessageChar>('b');
@@ -316,7 +338,8 @@ TEST(MultipleSendTest, ChainingInRightOrder) {
     r.CloseChannel("main");
   });
 
-  system.Spawn("worker", [](Reactor &r) {
+  auto worker = system.Spawn("worker", [](Reactor &r) {
+    std::this_thread::sleep_for(100ms);
     EventStream *stream = r.main_.first;
     stream->OnEventOnce()
         .ChainOnce<MessageInt>([](const MessageInt &msg, const Subscription &) {
@@ -333,7 +356,7 @@ TEST(MultipleSendTest, ChainingInRightOrder) {
             });
   });
 
-  system.AwaitShutdown();
+  std::this_thread::sleep_for(300ms);
 }
 
 TEST(MultipleSendTest, ProcessManyMessages) {
@@ -346,12 +369,11 @@ TEST(MultipleSendTest, ProcessManyMessages) {
 
   System system;
 
-  system.Spawn("master", [](Reactor &r) {
-    std::shared_ptr<ChannelWriter> channel_writer;
-    while (!(channel_writer = r.system_.FindChannel("worker", "main")))
-      std::this_thread::sleep_for(std::chrono::milliseconds(300));
+  auto master = system.Spawn("master", [](Reactor &r) {
+    std::this_thread::sleep_for(100ms);
+    std::shared_ptr<ChannelWriter> channel_writer =
+        r.system_.FindChannel("worker", "main");
 
-    std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
     for (int i = 0; i < kNumTests; ++i) {
       channel_writer->Send<MessageInt>(rand());
       std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 5));
@@ -359,7 +381,7 @@ TEST(MultipleSendTest, ProcessManyMessages) {
     r.CloseChannel("main");
   });
 
-  system.Spawn("worker", [vals = 0](Reactor & r) mutable {
+  auto worker = system.Spawn("worker", [vals = 0](Reactor & r) mutable {
     struct EndMessage : Message {};
     EventStream *stream = r.main_.first;
     vals = 0;
@@ -376,7 +398,8 @@ TEST(MultipleSendTest, ProcessManyMessages) {
       }
     });
   });
-  system.AwaitShutdown();
+
+  std::this_thread::sleep_for(1000ms);
 }
 
 int main(int argc, char **argv) {