From 9448e23dc9bfe74decf0a1bbfa6525777f10a264 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Thu, 27 Oct 2022 13:36:53 +0000
Subject: [PATCH 01/31] Check-in basic shard scheduler skeleton

---
 src/storage/v3/shard_manager.hpp   |  4 ++-
 src/storage/v3/shard_scheduler.hpp | 43 ++++++++++++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)
 create mode 100644 src/storage/v3/shard_scheduler.hpp

diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp
index a119148e9..ba59c403f 100644
--- a/src/storage/v3/shard_manager.hpp
+++ b/src/storage/v3/shard_manager.hpp
@@ -28,6 +28,7 @@
 #include <storage/v3/shard_rsm.hpp>
 #include "coordinator/shard_map.hpp"
 #include "storage/v3/config.hpp"
+#include "storage/v3/shard_scheduler.hpp"
 
 namespace memgraph::storage::v3 {
 
@@ -78,7 +79,7 @@ template <typename IoImpl>
 class ShardManager {
  public:
   ShardManager(io::Io<IoImpl> io, Address coordinator_leader, coordinator::ShardMap shard_map)
-      : io_(io), coordinator_leader_(coordinator_leader), shard_map_{std::move(shard_map)} {}
+      : io_(io), coordinator_leader_(coordinator_leader), shard_map_{std::move(shard_map)}, shard_scheduler_(io) {}
 
   /// Periodic protocol maintenance. Returns the time that Cron should be called again
   /// in the future.
@@ -133,6 +134,7 @@ class ShardManager {
 
  private:
   io::Io<IoImpl> io_;
+  ShardScheduler<IoImpl> shard_scheduler_;
   std::map<uuid, ShardRaft<IoImpl>> rsm_map_;
   std::priority_queue<std::pair<Time, uuid>, std::vector<std::pair<Time, uuid>>, std::greater<>> cron_schedule_;
   Time next_cron_ = Time::min();
diff --git a/src/storage/v3/shard_scheduler.hpp b/src/storage/v3/shard_scheduler.hpp
new file mode 100644
index 000000000..6cb1bf421
--- /dev/null
+++ b/src/storage/v3/shard_scheduler.hpp
@@ -0,0 +1,43 @@
+// Copyright 2022 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+#include <boost/asio/thread_pool.hpp>
+#include <boost/uuid/uuid.hpp>
+
+#include "io/rsm/raft.hpp"
+#include "query/v2/requests.hpp"
+#include "storage/v3/shard_manager.hpp"
+
+namespace memgraph::storage::v3 {
+
+using memgraph::io::rsm::Raft;
+using memgraph::io::rsm::WriteRequest;
+using memgraph::io::rsm::WriteResponse;
+using memgraph::msgs::ReadRequests;
+using memgraph::msgs::ReadResponses;
+using memgraph::msgs::WriteRequests;
+using memgraph::msgs::WriteResponses;
+
+template <typename IoImpl>
+using ShardRaft = Raft<IoImpl, ShardRsm, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
+
+template <class IoImpl>
+class ShardScheduler {
+  std::map<boost::uuids::uuid, ShardRaft<IoImpl>> rsm_map_;
+  io::Io<IoImpl> io_;
+
+ public:
+  ShardScheduler(io::Io<IoImpl> io) : io_(io) {}
+};
+
+}  // namespace memgraph::storage::v3

From cebe6f62fa1d9f17663163efd177032311ba8788 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Mon, 31 Oct 2022 16:03:47 +0000
Subject: [PATCH 02/31] Implement skeleton worker threadpool for the
 ShardManager

---
 src/machine_manager/machine_config.hpp  |   5 +
 src/machine_manager/machine_manager.hpp |   4 +-
 src/storage/v3/shard_manager.hpp        | 107 ++++++++++-----
 src/storage/v3/shard_scheduler.hpp      |  43 ------
 src/storage/v3/shard_worker.hpp         | 173 ++++++++++++++++++++++++
 5 files changed, 252 insertions(+), 80 deletions(-)
 delete mode 100644 src/storage/v3/shard_scheduler.hpp
 create mode 100644 src/storage/v3/shard_worker.hpp

diff --git a/src/machine_manager/machine_config.hpp b/src/machine_manager/machine_config.hpp
index 6e46d2b83..1ddc34ae2 100644
--- a/src/machine_manager/machine_config.hpp
+++ b/src/machine_manager/machine_config.hpp
@@ -11,7 +11,11 @@
 
 #pragma once
 
+#include <algorithm>
+#include <thread>
+
 #include <boost/asio/ip/tcp.hpp>
+
 #include "io/address.hpp"
 #include "storage/v3/property_value.hpp"
 #include "storage/v3/schemas.hpp"
@@ -37,6 +41,7 @@ struct MachineConfig {
   bool is_query_engine;
   boost::asio::ip::address listen_ip;
   uint16_t listen_port;
+  size_t n_shard_worker_threads = std::max(static_cast<unsigned int>(1), std::thread::hardware_concurrency());
 };
 
 }  // namespace memgraph::machine_manager
diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp
index aab658429..eb43e588e 100644
--- a/src/machine_manager/machine_manager.hpp
+++ b/src/machine_manager/machine_manager.hpp
@@ -70,11 +70,11 @@ class MachineManager {
  public:
   // TODO initialize ShardManager with "real" coordinator addresses instead of io.GetAddress
   // which is only true for single-machine config.
-  MachineManager(io::Io<IoImpl> io, MachineConfig config, Coordinator coordinator, coordinator::ShardMap &shard_map)
+  MachineManager(io::Io<IoImpl> io, MachineConfig config, Coordinator coordinator, coordinator::ShardMap shard_map)
       : io_(io),
         config_(config),
         coordinator_{std::move(io.ForkLocal()), {}, std::move(coordinator)},
-        shard_manager_{io.ForkLocal(), coordinator_.GetAddress(), shard_map} {}
+        shard_manager_{io.ForkLocal(), config.n_shard_worker_threads, coordinator_.GetAddress(), shard_map} {}
 
   Address CoordinatorAddress() { return coordinator_.GetAddress(); }
 
diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp
index ba59c403f..964b6ba9f 100644
--- a/src/storage/v3/shard_manager.hpp
+++ b/src/storage/v3/shard_manager.hpp
@@ -16,45 +16,45 @@
 
 #include <boost/uuid/uuid.hpp>
 
-#include <coordinator/coordinator.hpp>
-#include <io/address.hpp>
-#include <io/message_conversion.hpp>
-#include <io/messages.hpp>
-#include <io/rsm/raft.hpp>
-#include <io/time.hpp>
-#include <io/transport.hpp>
-#include <query/v2/requests.hpp>
-#include <storage/v3/shard.hpp>
-#include <storage/v3/shard_rsm.hpp>
+#include "coordinator/coordinator.hpp"
 #include "coordinator/shard_map.hpp"
+#include "io/address.hpp"
+#include "io/message_conversion.hpp"
+#include "io/messages.hpp"
+#include "io/rsm/raft.hpp"
+#include "io/time.hpp"
+#include "io/transport.hpp"
+#include "query/v2/requests.hpp"
 #include "storage/v3/config.hpp"
-#include "storage/v3/shard_scheduler.hpp"
+#include "storage/v3/shard.hpp"
+#include "storage/v3/shard_rsm.hpp"
+#include "storage/v3/shard_worker.hpp"
 
 namespace memgraph::storage::v3 {
 
 using boost::uuids::uuid;
 
-using memgraph::coordinator::CoordinatorWriteRequests;
-using memgraph::coordinator::CoordinatorWriteResponses;
-using memgraph::coordinator::HeartbeatRequest;
-using memgraph::coordinator::HeartbeatResponse;
-using memgraph::io::Address;
-using memgraph::io::Duration;
-using memgraph::io::Message;
-using memgraph::io::RequestId;
-using memgraph::io::ResponseFuture;
-using memgraph::io::Time;
-using memgraph::io::messages::CoordinatorMessages;
-using memgraph::io::messages::ShardManagerMessages;
-using memgraph::io::messages::ShardMessages;
-using memgraph::io::rsm::Raft;
-using memgraph::io::rsm::WriteRequest;
-using memgraph::io::rsm::WriteResponse;
-using memgraph::msgs::ReadRequests;
-using memgraph::msgs::ReadResponses;
-using memgraph::msgs::WriteRequests;
-using memgraph::msgs::WriteResponses;
-using memgraph::storage::v3::ShardRsm;
+using coordinator::CoordinatorWriteRequests;
+using coordinator::CoordinatorWriteResponses;
+using coordinator::HeartbeatRequest;
+using coordinator::HeartbeatResponse;
+using io::Address;
+using io::Duration;
+using io::Message;
+using io::RequestId;
+using io::ResponseFuture;
+using io::Time;
+using io::messages::CoordinatorMessages;
+using io::messages::ShardManagerMessages;
+using io::messages::ShardMessages;
+using io::rsm::Raft;
+using io::rsm::WriteRequest;
+using io::rsm::WriteResponse;
+using msgs::ReadRequests;
+using msgs::ReadResponses;
+using msgs::WriteRequests;
+using msgs::WriteResponses;
+using storage::v3::ShardRsm;
 
 using ShardManagerOrRsmMessage = std::variant<ShardMessages, ShardManagerMessages>;
 using TimeUuidPair = std::pair<Time, uuid>;
@@ -78,8 +78,44 @@ static_assert(kMinimumCronInterval < kMaximumCronInterval,
 template <typename IoImpl>
 class ShardManager {
  public:
-  ShardManager(io::Io<IoImpl> io, Address coordinator_leader, coordinator::ShardMap shard_map)
-      : io_(io), coordinator_leader_(coordinator_leader), shard_map_{std::move(shard_map)}, shard_scheduler_(io) {}
+  ShardManager(io::Io<IoImpl> io, size_t n_shard_worker_threads, Address coordinator_leader,
+               coordinator::ShardMap shard_map)
+      : io_(io), coordinator_leader_(coordinator_leader), shard_map_{std::move(shard_map)} {
+    MG_ASSERT(n_shard_worker_threads >= 1);
+
+    shard_worker::Queue queue;
+
+    for (int i = 0; i < n_shard_worker_threads; i++) {
+      shard_worker::Queue queue;
+      shard_worker::ShardWorker worker{io, queue};
+      auto worker_handle = std::jthread([worker = std::move(worker)]() mutable { worker.Run(); });
+
+      workers_.emplace_back(queue);
+      worker_handles_.emplace_back(std::move(worker_handle));
+    }
+  }
+
+  ShardManager(ShardManager &&) = default;
+  ShardManager &operator=(ShardManager &&) = default;
+  ShardManager(const ShardManager &) = delete;
+  ShardManager &operator=(const ShardManager &) = delete;
+
+  ~ShardManager() {
+    auto shutdown_acks = std::vector<io::Future<bool>>{};
+    for (auto worker : workers_) {
+      auto [future, promise] = io::FuturePromisePair<bool>();
+      worker.Push(shard_worker::ShutDown{.acknowledge_shutdown = std::move(promise)});
+      shutdown_acks.emplace_back(std::move(future));
+    }
+
+    for (auto &&ack : shutdown_acks) {
+      bool acked = std::move(ack).Wait();
+      MG_ASSERT(acked);
+    }
+
+    // The jthread handles for our shard worker threads will be
+    // blocked on implicitly when worker_handles_ is destroyed.
+  }
 
   /// Periodic protocol maintenance. Returns the time that Cron should be called again
   /// in the future.
@@ -134,7 +170,8 @@ class ShardManager {
 
  private:
   io::Io<IoImpl> io_;
-  ShardScheduler<IoImpl> shard_scheduler_;
+  std::vector<shard_worker::Queue> workers_;
+  std::vector<std::jthread> worker_handles_;
   std::map<uuid, ShardRaft<IoImpl>> rsm_map_;
   std::priority_queue<std::pair<Time, uuid>, std::vector<std::pair<Time, uuid>>, std::greater<>> cron_schedule_;
   Time next_cron_ = Time::min();
diff --git a/src/storage/v3/shard_scheduler.hpp b/src/storage/v3/shard_scheduler.hpp
deleted file mode 100644
index 6cb1bf421..000000000
--- a/src/storage/v3/shard_scheduler.hpp
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright 2022 Memgraph Ltd.
-//
-// Use of this software is governed by the Business Source License
-// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
-// License, and you may not use this file except in compliance with the Business Source License.
-//
-// As of the Change Date specified in that file, in accordance with
-// the Business Source License, use of this software will be governed
-// by the Apache License, Version 2.0, included in the file
-// licenses/APL.txt.
-
-#pragma once
-
-#include <boost/asio/thread_pool.hpp>
-#include <boost/uuid/uuid.hpp>
-
-#include "io/rsm/raft.hpp"
-#include "query/v2/requests.hpp"
-#include "storage/v3/shard_manager.hpp"
-
-namespace memgraph::storage::v3 {
-
-using memgraph::io::rsm::Raft;
-using memgraph::io::rsm::WriteRequest;
-using memgraph::io::rsm::WriteResponse;
-using memgraph::msgs::ReadRequests;
-using memgraph::msgs::ReadResponses;
-using memgraph::msgs::WriteRequests;
-using memgraph::msgs::WriteResponses;
-
-template <typename IoImpl>
-using ShardRaft = Raft<IoImpl, ShardRsm, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
-
-template <class IoImpl>
-class ShardScheduler {
-  std::map<boost::uuids::uuid, ShardRaft<IoImpl>> rsm_map_;
-  io::Io<IoImpl> io_;
-
- public:
-  ShardScheduler(io::Io<IoImpl> io) : io_(io) {}
-};
-
-}  // namespace memgraph::storage::v3
diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp
new file mode 100644
index 000000000..e47f02e97
--- /dev/null
+++ b/src/storage/v3/shard_worker.hpp
@@ -0,0 +1,173 @@
+// Copyright 2022 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+#include <deque>
+#include <memory>
+#include <queue>
+#include <variant>
+
+#include <boost/uuid/uuid.hpp>
+
+#include "io/future.hpp"
+#include "io/rsm/raft.hpp"
+#include "io/time.hpp"
+#include "io/transport.hpp"
+#include "query/v2/requests.hpp"
+#include "storage/v3/shard_rsm.hpp"
+
+namespace memgraph::storage::v3::shard_worker {
+
+/// Obligations:
+/// * ShutDown
+/// * Cron
+/// * Handle
+/// * InitializeRsm
+
+using boost::uuids::uuid;
+
+using io::Time;
+using io::rsm::Raft;
+using msgs::ReadRequests;
+using msgs::ReadResponses;
+using msgs::WriteRequests;
+using msgs::WriteResponses;
+using storage::v3::ShardRsm;
+
+template <typename IoImpl>
+using ShardRaft = Raft<IoImpl, ShardRsm, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
+
+struct ShutDown {
+  io::Promise<bool> acknowledge_shutdown;
+};
+
+struct Cron {
+  io::Promise<io::Time> request_next_cron_at;
+};
+
+struct InitializeRsm {};
+
+struct Handle {};
+
+using Message = std::variant<ShutDown, Cron, InitializeRsm, Handle>;
+
+struct QueueInner {
+  std::mutex mu{};
+  std::condition_variable cv;
+  // TODO(tyler) handle simulator communication: std::shared_ptr<std::atomic<int>> blocked;
+
+  // TODO(tyler) investigate using a priority queue that prioritizes messages in a way that
+  // improves overall QoS. For example, maybe we want to schedule raft Append messages
+  // ahead of Read messages, or generally writes before reads, to shed load from the
+  // overall system sooner. When we do this, we need to make sure to avoid
+  // starvation by occasionally randomizing priorities rather than following a strict
+  // prioritization.
+  std::deque<Message> queue;
+};
+
+/// There are two reasons to implement our own Queue instead of using
+/// one off-the-shelf:
+/// 1. we will need to know in the simulator when all threads are waiting
+/// 2. we will want to implement our own priority queue within this for QoS
+class Queue {
+  std::shared_ptr<QueueInner> inner_ = std::make_shared<QueueInner>();
+
+ public:
+  void Push(Message &&message) {
+    {
+      std::unique_lock<std::mutex> lock(inner_->mu);
+
+      inner_->queue.push_back(std::forward<Message>(message));
+    }  // lock dropped before notifying condition variable
+
+    inner_->cv.notify_all();
+  }
+
+  Message Pop() {
+    std::unique_lock<std::mutex> lock(inner_->mu);
+
+    while (inner_->queue.empty()) {
+      inner_->cv.wait(lock);
+    }
+
+    Message message = std::move(inner_->queue.front());
+    inner_->queue.pop_front();
+
+    return message;
+  }
+};
+
+/// A ShardWorker owns Raft<ShardRsm> instances and receives messages from the ShardManager.
+template <class IoImpl>
+class ShardWorker {
+  io::Io<IoImpl> io_;
+  Queue queue_;
+  std::priority_queue<std::pair<Time, uuid>, std::vector<std::pair<Time, uuid>>, std::greater<>> cron_schedule_;
+  Time next_cron_ = Time::min();
+  std::map<uuid, ShardRaft<IoImpl>> rsm_map_;
+
+  bool Process(ShutDown &&shut_down) {
+    shut_down.acknowledge_shutdown.Fill(true);
+    return false;
+  }
+
+  bool Process(Cron &&cron) {
+    Time ret = Cron();
+    return true;
+  }
+  bool Process(InitializeRsm &&initialize_rsm) { return true; }
+  bool Process(Handle &&handle) { return true; }
+
+  Time Cron() {
+    spdlog::info("running ShardWorker::Cron, address {}", io_.GetAddress().ToString());
+    Time now = io_.Now();
+
+    if (!cron_schedule_.empty()) {
+      const auto &[time, uuid] = cron_schedule_.top();
+
+      auto &rsm = rsm_map_.at(uuid);
+      Time next_for_uuid = rsm.Cron();
+
+      cron_schedule_.pop();
+      cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
+
+      const auto &[next_time, _uuid] = cron_schedule_.top();
+
+      return std::min(next_cron_, next_time);
+    }
+
+    return next_cron_;
+  }
+
+ public:
+  ShardWorker(io::Io<IoImpl> io, Queue queue) : io_(io), queue_(queue) {}
+  ShardWorker(ShardWorker &&) = default;
+  ShardWorker &operator=(ShardWorker &&) = default;
+  ShardWorker(const ShardWorker &) = delete;
+  ShardWorker &operator=(const ShardWorker &) = delete;
+  ~ShardWorker() = default;
+
+  void Run() {
+    while (true) {
+      Message message = queue_.Pop();
+
+      const bool should_continue =
+          std::visit([&](auto &&msg) { return Process(std::forward<decltype(msg)>(msg)); }, std::move(message));
+
+      if (!should_continue) {
+        return;
+      }
+    }
+  }
+};
+
+}  // namespace memgraph::storage::v3::shard_worker
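
A note on the pattern this patch introduces: each worker drains a hand-rolled
queue and dispatches every Message variant through std::visit, with a Process
overload returning false (only for ShutDown) to end the worker loop. Below is
a minimal standalone sketch of that queue-plus-dispatch shape using only
standard-library types; the Tick message and the two-alternative variant are
illustrative stand-ins, not the in-tree types.

    #include <condition_variable>
    #include <deque>
    #include <memory>
    #include <mutex>
    #include <thread>
    #include <type_traits>
    #include <variant>

    struct ShutDown {};
    struct Tick {};
    using Message = std::variant<ShutDown, Tick>;

    class Queue {
      struct Inner {
        std::mutex mu;
        std::condition_variable cv;
        std::deque<Message> queue;
      };
      // a shared_ptr lets the manager and each worker hold the same queue by value
      std::shared_ptr<Inner> inner_ = std::make_shared<Inner>();

     public:
      void Push(Message &&message) {
        {
          std::unique_lock<std::mutex> lock(inner_->mu);
          inner_->queue.push_back(std::move(message));
        }  // release the lock before waking the consumer
        inner_->cv.notify_all();
      }

      Message Pop() {
        std::unique_lock<std::mutex> lock(inner_->mu);
        inner_->cv.wait(lock, [&] { return !inner_->queue.empty(); });
        Message message = std::move(inner_->queue.front());
        inner_->queue.pop_front();
        return message;
      }
    };

    int main() {
      Queue queue;
      std::jthread worker([queue]() mutable {
        while (true) {
          // dispatch on the variant; only ShutDown ends the loop,
          // mirroring Process(ShutDown &&) returning false
          const bool keep_going = std::visit(
              [](auto &&msg) { return !std::is_same_v<std::decay_t<decltype(msg)>, ShutDown>; },
              queue.Pop());
          if (!keep_going) {
            return;
          }
        }
      });
      queue.Push(Tick{});
      queue.Push(ShutDown{});  // worker drains Tick first, then exits
    }

Sharing the state behind a shared_ptr is what lets the ShardManager keep
by-value Queue handles in workers_ while each worker thread holds its own copy.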

From d7bc93c55fe93745c5f3a0823a6b6708c082bbfb Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Mon, 31 Oct 2022 16:17:34 +0000
Subject: [PATCH 03/31] Fill Cron next time promise from each worker thread

---
 src/storage/v3/shard_worker.hpp | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp
index e47f02e97..3ec1d9bb9 100644
--- a/src/storage/v3/shard_worker.hpp
+++ b/src/storage/v3/shard_worker.hpp
@@ -18,6 +18,9 @@
 
 #include <boost/uuid/uuid.hpp>
 
+#include "coordinator/coordinator.hpp"
+#include "coordinator/shard_map.hpp"
+#include "io/address.hpp"
 #include "io/future.hpp"
 #include "io/rsm/raft.hpp"
 #include "io/time.hpp"
@@ -121,10 +124,13 @@ class ShardWorker {
   }
 
   bool Process(Cron &&cron) {
-    Time ret = Cron();
+    Time next_cron = Cron();
+    cron.request_next_cron_at.Fill(next_cron);
     return true;
   }
+
   bool Process(InitializeRsm &&initialize_rsm) { return true; }
+
   bool Process(Handle &&handle) { return true; }
 
   Time Cron() {
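
The shape of this change: the manager's Cron message carries a Promise, and
the worker fills it with the time it next wants to be woken. A
standard-library analogue of that Fill/Wait handshake, with
std::promise/std::future standing in for the in-tree io::Promise/io::Future:

    #include <chrono>
    #include <future>
    #include <iostream>
    #include <thread>

    using Time = std::chrono::steady_clock::time_point;

    int main() {
      std::promise<Time> request_next_cron_at;
      std::future<Time> next_cron_at = request_next_cron_at.get_future();

      // worker side: run its Cron(), then fill the promise with the next deadline
      std::jthread worker([p = std::move(request_next_cron_at)]() mutable {
        const Time next = std::chrono::steady_clock::now() + std::chrono::milliseconds(1);
        p.set_value(next);  // analogous to cron.request_next_cron_at.Fill(next_cron)
      });

      // manager side: block until the worker reports when it wants the next Cron
      const Time wake_at = next_cron_at.get();
      std::cout << "next cron requested at: " << wake_at.time_since_epoch().count() << "\n";
    }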

From d0cad6e6baae84e03943d9ec13109de4dfbedaee Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Mon, 31 Oct 2022 17:25:08 +0000
Subject: [PATCH 04/31] Temporarily duplicate shard management logic from
 ShardManager in ShardWorker

---
 src/storage/v3/shard_manager.hpp |  6 ++--
 src/storage/v3/shard_worker.hpp  | 62 +++++++++++++++++++++++++++-----
 2 files changed, 56 insertions(+), 12 deletions(-)

diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp
index 8f53c7a90..0b13b1f09 100644
--- a/src/storage/v3/shard_manager.hpp
+++ b/src/storage/v3/shard_manager.hpp
@@ -78,9 +78,8 @@ static_assert(kMinimumCronInterval < kMaximumCronInterval,
 template <typename IoImpl>
 class ShardManager {
  public:
-  ShardManager(io::Io<IoImpl> io, size_t n_shard_worker_threads, Address coordinator_leader,
-               coordinator::ShardMap shard_map)
-      : io_(io), coordinator_leader_(coordinator_leader), shard_map_{std::move(shard_map)} {
+  ShardManager(io::Io<IoImpl> io, size_t n_shard_worker_threads, Address coordinator_leader)
+      : io_(io), coordinator_leader_(coordinator_leader) {
     MG_ASSERT(n_shard_worker_threads >= 1);
 
     shard_worker::Queue queue;
@@ -234,7 +233,6 @@ class ShardManager {
     }
   }
 
-  /// Returns true if the RSM was able to be initialized, and false if it was already initialized
   void InitializeRsm(coordinator::ShardToInitialize to_init) {
     if (rsm_map_.contains(to_init.uuid)) {
       // it's not a bug for the coordinator to send us UUIDs that we have
diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp
index 3ec1d9bb9..0cbae8c6c 100644
--- a/src/storage/v3/shard_worker.hpp
+++ b/src/storage/v3/shard_worker.hpp
@@ -22,6 +22,7 @@
 #include "coordinator/shard_map.hpp"
 #include "io/address.hpp"
 #include "io/future.hpp"
+#include "io/messages.hpp"
 #include "io/rsm/raft.hpp"
 #include "io/time.hpp"
 #include "io/transport.hpp"
@@ -33,12 +34,16 @@ namespace memgraph::storage::v3::shard_worker {
 /// Obligations:
 /// * ShutDown
 /// * Cron
-/// * Handle
-/// * InitializeRsm
+/// * RouteMessage
+/// * ShardToInitialize
 
 using boost::uuids::uuid;
 
+using coordinator::ShardToInitialize;
+using io::Address;
+using io::RequestId;
 using io::Time;
+using io::messages::ShardMessages;
 using io::rsm::Raft;
 using msgs::ReadRequests;
 using msgs::ReadResponses;
@@ -57,11 +62,14 @@ struct Cron {
   io::Promise<io::Time> request_next_cron_at;
 };
 
-struct InitializeRsm {};
+struct RouteMessage {
+  ShardMessages message;
+  RequestId request_id;
+  Address to;
+  Address from;
+};
 
-struct Handle {};
-
-using Message = std::variant<ShutDown, Cron, InitializeRsm, Handle>;
+using Message = std::variant<ShutDown, Cron, ShardToInitialize, RouteMessage>;
 
 struct QueueInner {
   std::mutex mu{};
@@ -129,9 +137,19 @@ class ShardWorker {
     return true;
   }
 
-  bool Process(InitializeRsm &&initialize_rsm) { return true; }
+  bool Process(ShardToInitialize &&shard_to_initialize) {
+    InitializeRsm(std::forward<ShardToInitialize>(shard_to_initialize));
 
-  bool Process(Handle &&handle) { return true; }
+    return true;
+  }
+
+  bool Process(RouteMessage &&route_message) {
+    auto &rsm = rsm_map_.at(route_message.to.unique_id);
+
+    rsm.Handle(std::move(route_message.message), route_message.request_id, route_message.from);
+
+    return true;
+  }
 
   Time Cron() {
     spdlog::info("running ShardWorker::Cron, address {}", io_.GetAddress().ToString());
@@ -154,6 +172,34 @@ class ShardWorker {
     return next_cron_;
   }
 
+  void InitializeRsm(ShardToInitialize to_init) {
+    if (rsm_map_.contains(to_init.uuid)) {
+      // it's not a bug for the coordinator to send us UUIDs that we have
+      // already created, because there may have been lag that caused
+      // the coordinator not to hear back from us.
+      return;
+    }
+
+    auto rsm_io = io_.ForkLocal();
+    auto io_addr = rsm_io.GetAddress();
+    io_addr.unique_id = to_init.uuid;
+    rsm_io.SetAddress(io_addr);
+
+    // TODO(tyler) get peers from Coordinator in HeartbeatResponse
+    std::vector<Address> rsm_peers = {};
+
+    std::unique_ptr<Shard> shard = std::make_unique<Shard>(to_init.label_id, to_init.min_key, to_init.max_key,
+                                                           to_init.schema, to_init.config, to_init.id_to_names);
+
+    ShardRsm rsm_state{std::move(shard)};
+
+    ShardRaft<IoImpl> rsm{std::move(rsm_io), rsm_peers, std::move(rsm_state)};
+
+    spdlog::info("SM created a new shard with UUID {}", to_init.uuid);
+
+    rsm_map_.emplace(to_init.uuid, std::move(rsm));
+  }
+
  public:
   ShardWorker(io::Io<IoImpl> io, Queue queue) : io_(io), queue_(queue) {}
   ShardWorker(ShardWorker &&) = default;
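
The duplicated logic hinges on InitializeRsm being idempotent: the coordinator
may resend a UUID it has not yet heard an acknowledgement for, so repeat
initialization must be a no-op, after which RouteMessage can look the RSM up
by UUID unconditionally. A toy sketch of that initialize-then-route contract,
where an int key and a string message stand in for boost uuids and
ShardMessages:

    #include <iostream>
    #include <map>
    #include <string>

    struct Rsm {
      void Handle(const std::string &message) { std::cout << message << "\n"; }
    };

    std::map<int, Rsm> rsm_map;  // keyed by boost uuid in the real code

    void InitializeRsm(int uuid) {
      if (rsm_map.contains(uuid)) {
        // not a bug: the coordinator may resend UUIDs it has not yet heard
        // an acknowledgement for, so re-initialization must be a no-op
        return;
      }
      rsm_map.emplace(uuid, Rsm{});
    }

    int main() {
      InitializeRsm(1);
      InitializeRsm(1);  // duplicate from a lagging heartbeat: silently ignored
      rsm_map.at(1).Handle("routed message");  // the RouteMessage path
    }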

From 951b05811649921d532e3cfd301386d3a0d3c825 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Mon, 31 Oct 2022 18:04:30 +0000
Subject: [PATCH 05/31] Complete migration from single-threaded ShardManager to
 multi-threaded ShardWorker processing

---
 src/storage/v3/shard_manager.hpp | 96 ++++++++++++++------------------
 src/storage/v3/shard_worker.hpp  | 32 ++++++-----
 2 files changed, 60 insertions(+), 68 deletions(-)

diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp
index 0b13b1f09..db89c7ded 100644
--- a/src/storage/v3/shard_manager.hpp
+++ b/src/storage/v3/shard_manager.hpp
@@ -14,6 +14,7 @@
 #include <queue>
 #include <set>
 
+#include <boost/functional/hash.hpp>
 #include <boost/uuid/uuid.hpp>
 
 #include "coordinator/coordinator.hpp"
@@ -116,39 +117,43 @@ class ShardManager {
     // blocked on implicitly when worker_handles_ is destroyed.
   }
 
+  size_t UuidToWorkerIndex(const uuid &to) {
+    size_t hash = boost::hash<boost::uuids::uuid>()(to);
+    return hash % workers_.size();
+  }
+
+  void SendToWorkerByIndex(size_t worker_index, shard_worker::Message &&message) {
+    workers_[worker_index].Push(std::forward<shard_worker::Message>(message));
+  }
+
+  void SendToWorkerByUuid(const uuid &to, shard_worker::Message &&message) {
+    size_t worker_index = UuidToWorkerIndex(to);
+    SendToWorkerByIndex(worker_index, std::forward<shard_worker::Message>(message));
+  }
+
   /// Periodic protocol maintenance. Returns the time that Cron should be called again
   /// in the future.
   Time Cron() {
     spdlog::info("running ShardManager::Cron, address {}", io_.GetAddress().ToString());
     Time now = io_.Now();
 
-    if (now >= next_cron_) {
+    if (now >= next_reconciliation_) {
       Reconciliation();
 
       std::uniform_int_distribution time_distrib(kMinimumCronInterval.count(), kMaximumCronInterval.count());
 
       const auto rand = io_.Rand(time_distrib);
 
-      next_cron_ = now + Duration{rand};
+      next_reconciliation_ = now + Duration{rand};
     }
 
-    if (!cron_schedule_.empty()) {
-      const auto &[time, uuid] = cron_schedule_.top();
-
-      if (time <= now) {
-        auto &rsm = rsm_map_.at(uuid);
-        Time next_for_uuid = rsm.Cron();
-
-        cron_schedule_.pop();
-        cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
-
-        const auto &[next_time, _uuid] = cron_schedule_.top();
-
-        return std::min(next_cron_, next_time);
-      }
+    for (auto &worker : workers_) {
+      worker.Push(shard_worker::Cron{});
     }
 
-    return next_cron_;
+    Time next_worker_cron = now + std::chrono::milliseconds(500);
+
+    return std::min(next_worker_cron, next_reconciliation_);
   }
 
   /// Returns the Address for our underlying Io implementation
@@ -162,18 +167,20 @@ class ShardManager {
     MG_ASSERT(address.last_known_port == to.last_known_port);
     MG_ASSERT(address.last_known_ip == to.last_known_ip);
 
-    auto &rsm = rsm_map_.at(to.unique_id);
-
-    rsm.Handle(std::forward<ShardMessages>(sm), request_id, from);
+    SendToWorkerByUuid(to.unique_id, shard_worker::RouteMessage{
+                                         .message = std::forward<ShardMessages>(sm),
+                                         .request_id = request_id,
+                                         .to = to,
+                                         .from = from,
+                                     });
   }
 
  private:
   io::Io<IoImpl> io_;
   std::vector<shard_worker::Queue> workers_;
   std::vector<std::jthread> worker_handles_;
-  std::map<uuid, ShardRaft<IoImpl>> rsm_map_;
-  std::priority_queue<std::pair<Time, uuid>, std::vector<std::pair<Time, uuid>>, std::greater<>> cron_schedule_;
-  Time next_cron_ = Time::min();
+  std::set<uuid> rsm_set_;
+  Time next_reconciliation_ = Time::min();
   Address coordinator_leader_;
   std::optional<ResponseFuture<WriteResponse<CoordinatorWriteResponses>>> heartbeat_res_;
 
@@ -227,39 +234,20 @@ class ShardManager {
   }
 
   void EnsureShardsInitialized(HeartbeatResponse hr) {
-    for (const auto &shard_to_initialize : hr.shards_to_initialize) {
-      InitializeRsm(shard_to_initialize);
-      initialized_but_not_confirmed_rsm_.emplace(shard_to_initialize.uuid);
+    for (const auto &to_init : hr.shards_to_initialize) {
+      if (rsm_set_.contains(to_init.uuid)) {
+        // it's not a bug for the coordinator to send us UUIDs that we have
+        // already created, because there may have been lag that caused
+        // the coordinator not to hear back from us.
+        continue;
+      }
+
+      SendToWorkerByUuid(to_init.uuid, to_init);
+
+      rsm_set_.emplace(to_init.uuid);
+      initialized_but_not_confirmed_rsm_.emplace(to_init.uuid);
     }
   }
-
-  void InitializeRsm(coordinator::ShardToInitialize to_init) {
-    if (rsm_map_.contains(to_init.uuid)) {
-      // it's not a bug for the coordinator to send us UUIDs that we have
-      // already created, because there may have been lag that caused
-      // the coordinator not to hear back from us.
-      return;
-    }
-
-    auto rsm_io = io_.ForkLocal();
-    auto io_addr = rsm_io.GetAddress();
-    io_addr.unique_id = to_init.uuid;
-    rsm_io.SetAddress(io_addr);
-
-    // TODO(tyler) get peers from Coordinator in HeartbeatResponse
-    std::vector<Address> rsm_peers = {};
-
-    std::unique_ptr<Shard> shard = std::make_unique<Shard>(to_init.label_id, to_init.min_key, to_init.max_key,
-                                                           to_init.schema, to_init.config, to_init.id_to_names);
-
-    ShardRsm rsm_state{std::move(shard)};
-
-    ShardRaft<IoImpl> rsm{std::move(rsm_io), rsm_peers, std::move(rsm_state)};
-
-    spdlog::info("SM created a new shard with UUID {}", to_init.uuid);
-
-    rsm_map_.emplace(to_init.uuid, std::move(rsm));
-  }
 };
 
 }  // namespace memgraph::storage::v3
diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp
index 0cbae8c6c..3204b0362 100644
--- a/src/storage/v3/shard_worker.hpp
+++ b/src/storage/v3/shard_worker.hpp
@@ -11,6 +11,7 @@
 
 #pragma once
 
+#include <chrono>
 #include <deque>
 #include <memory>
 #include <queue>
@@ -58,9 +59,7 @@ struct ShutDown {
   io::Promise<bool> acknowledge_shutdown;
 };
 
-struct Cron {
-  io::Promise<io::Time> request_next_cron_at;
-};
+struct Cron {};
 
 struct RouteMessage {
   ShardMessages message;
@@ -132,8 +131,7 @@ class ShardWorker {
   }
 
   bool Process(Cron &&cron) {
-    Time next_cron = Cron();
-    cron.request_next_cron_at.Fill(next_cron);
+    Cron();
     return true;
   }
 
@@ -155,21 +153,23 @@ class ShardWorker {
     spdlog::info("running ShardWorker::Cron, address {}", io_.GetAddress().ToString());
     Time now = io_.Now();
 
-    if (!cron_schedule_.empty()) {
+    while (!cron_schedule_.empty()) {
       const auto &[time, uuid] = cron_schedule_.top();
 
-      auto &rsm = rsm_map_.at(uuid);
-      Time next_for_uuid = rsm.Cron();
+      if (time <= now) {
+        auto &rsm = rsm_map_.at(uuid);
+        Time next_for_uuid = rsm.Cron();
 
-      cron_schedule_.pop();
-      cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
+        cron_schedule_.pop();
+        cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
 
-      const auto &[next_time, _uuid] = cron_schedule_.top();
-
-      return std::min(next_cron_, next_time);
+        const auto &[next_time, _uuid] = cron_schedule_.top();
+      } else {
+        return time;
+      }
     }
 
-    return next_cron_;
+    return now + std::chrono::microseconds(1000);
   }
 
   void InitializeRsm(ShardToInitialize to_init) {
@@ -197,6 +197,10 @@ class ShardWorker {
 
     spdlog::info("SM created a new shard with UUID {}", to_init.uuid);
 
+    // perform an initial Cron call for the new RSM
+    Time next_cron = rsm.Cron();
+    cron_schedule_.push(std::make_pair(next_cron, to_init.uuid));
+
     rsm_map_.emplace(to_init.uuid, std::move(rsm));
   }
 

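Worth spelling out the cron mechanics that moved into the worker here:
(next-due-time, uuid) pairs sit in a min-heap (std::priority_queue with
std::greater), Cron() services every entry whose deadline has passed,
reschedules each at whatever time the shard's own Cron() reports, and returns
the earliest remaining deadline so the caller knows how long it may sleep. A
self-contained sketch of that loop; the 5ms reschedule interval is
illustrative, while the 1ms idle default mirrors the patch:

    #include <chrono>
    #include <functional>
    #include <iostream>
    #include <queue>
    #include <utility>
    #include <vector>

    using Clock = std::chrono::steady_clock;
    using Time = Clock::time_point;
    using Entry = std::pair<Time, int>;  // (next due time, shard uuid)

    // std::greater turns the default max-heap into a min-heap on due time
    std::priority_queue<Entry, std::vector<Entry>, std::greater<>> cron_schedule;

    Time Cron(Time now) {
      while (!cron_schedule.empty()) {
        const auto [time, uuid] = cron_schedule.top();
        if (time > now) {
          return time;  // nothing else is due; caller may sleep until then
        }
        cron_schedule.pop();
        // the real code calls rsm.Cron() here and reschedules at whatever
        // time it reports; we pretend every shard asks to run again in 5ms
        cron_schedule.push({now + std::chrono::milliseconds(5), uuid});
      }
      return now + std::chrono::milliseconds(1);  // idle default, as in the patch
    }

    int main() {
      const Time now = Clock::now();
      cron_schedule.push({now, 1});                            // due immediately
      cron_schedule.push({now + std::chrono::seconds(1), 2});  // not due yet
      const Time next = Cron(now);
      std::cout << "sleep for "
                << std::chrono::duration_cast<std::chrono::milliseconds>(next - now).count()
                << "ms\n";  // prints 5: shard 1 was rescheduled, shard 2 untouched
    }
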
From bb1e8aa1640e9081f8ca2ffb3b3b2e97163a8aeb Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Tue, 1 Nov 2022 10:46:25 +0000
Subject: [PATCH 06/31] Improve histogram output by adding a pretty table
 printing function

---
 src/io/local_transport/local_transport.hpp    |  4 +--
 .../local_transport_handle.hpp                |  2 +-
 src/io/message_histogram_collector.hpp        | 34 +++++++++++++++++--
 src/io/simulator/simulator_handle.cpp         |  2 +-
 src/io/simulator/simulator_handle.hpp         |  2 +-
 src/io/simulator/simulator_transport.hpp      |  4 +--
 src/io/transport.hpp                          |  4 +--
 tests/unit/1k_shards_1k_create_scanall.cpp    | 26 +++++++++++---
 8 files changed, 59 insertions(+), 19 deletions(-)

diff --git a/src/io/local_transport/local_transport.hpp b/src/io/local_transport/local_transport.hpp
index 4fc6a4361..bf1991d21 100644
--- a/src/io/local_transport/local_transport.hpp
+++ b/src/io/local_transport/local_transport.hpp
@@ -61,8 +61,6 @@ class LocalTransport {
     return distrib(rng);
   }
 
-  std::unordered_map<std::string, LatencyHistogramSummary> ResponseLatencies() {
-    return local_transport_handle_->ResponseLatencies();
-  }
+  LatencyHistogramSummaries ResponseLatencies() { return local_transport_handle_->ResponseLatencies(); }
 };
 };  // namespace memgraph::io::local_transport
diff --git a/src/io/local_transport/local_transport_handle.hpp b/src/io/local_transport/local_transport_handle.hpp
index ad257c926..6cab1a24f 100644
--- a/src/io/local_transport/local_transport_handle.hpp
+++ b/src/io/local_transport/local_transport_handle.hpp
@@ -56,7 +56,7 @@ class LocalTransportHandle {
     return should_shut_down_;
   }
 
-  std::unordered_map<std::string, LatencyHistogramSummary> ResponseLatencies() {
+  LatencyHistogramSummaries ResponseLatencies() {
     std::unique_lock<std::mutex> lock(mu_);
     return histograms_.ResponseLatencies();
   }
diff --git a/src/io/message_histogram_collector.hpp b/src/io/message_histogram_collector.hpp
index 4c99f7f1f..542767568 100644
--- a/src/io/message_histogram_collector.hpp
+++ b/src/io/message_histogram_collector.hpp
@@ -20,6 +20,7 @@
 #include "io/time.hpp"
 #include "utils/histogram.hpp"
 #include "utils/logging.hpp"
+#include "utils/print_helpers.hpp"
 #include "utils/type_info_ref.hpp"
 
 namespace memgraph::io {
@@ -57,6 +58,35 @@ struct LatencyHistogramSummary {
   }
 };
 
+struct LatencyHistogramSummaries {
+  std::unordered_map<std::string, LatencyHistogramSummary> latencies;
+
+  std::string SummaryTable() {
+    std::string output = "";
+
+    const auto row = [&output](const auto &c1, const auto &c2, const auto &c3, const auto &c4, const auto &c5,
+                               const auto &c6, const auto &c7) {
+      output +=
+          fmt::format("{: >50} | {: >8} | {: >8} | {: >8} | {: >8} | {: >8} | {: >8}\n", c1, c2, c3, c4, c5, c6, c7);
+    };
+    row("name", "count", "min (μs)", "med (μs)", "p99 (μs)", "max (μs)", "sum (μs)");
+
+    for (const auto &[name, histo] : latencies) {
+      row(name, histo.count, histo.p0.count(), histo.p50.count(), histo.p99.count(), histo.p100.count(),
+          histo.sum.count());
+    }
+
+    output += "\n";
+    return output;
+  }
+
+  friend std::ostream &operator<<(std::ostream &in, const LatencyHistogramSummaries &histo) {
+    using memgraph::utils::print_helpers::operator<<;
+    in << histo.latencies;
+    return in;
+  }
+};
+
 class MessageHistogramCollector {
   std::unordered_map<utils::TypeInfoRef, utils::Histogram, utils::TypeInfoHasher, utils::TypeInfoEqualTo> histograms_;
 
@@ -66,7 +96,7 @@ class MessageHistogramCollector {
     histo.Measure(duration.count());
   }
 
-  std::unordered_map<std::string, LatencyHistogramSummary> ResponseLatencies() {
+  LatencyHistogramSummaries ResponseLatencies() {
     std::unordered_map<std::string, LatencyHistogramSummary> ret{};
 
     for (const auto &[type_id, histo] : histograms_) {
@@ -90,7 +120,7 @@ class MessageHistogramCollector {
       ret.emplace(demangled_name, latency_histogram_summary);
     }
 
-    return ret;
+    return LatencyHistogramSummaries{.latencies = ret};
   }
 };
 
diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp
index d71ecd0a9..74b2854a0 100644
--- a/src/io/simulator/simulator_handle.cpp
+++ b/src/io/simulator/simulator_handle.cpp
@@ -31,7 +31,7 @@ bool SimulatorHandle::ShouldShutDown() const {
   return should_shut_down_;
 }
 
-std::unordered_map<std::string, LatencyHistogramSummary> SimulatorHandle::ResponseLatencies() {
+LatencyHistogramSummaries SimulatorHandle::ResponseLatencies() {
   std::unique_lock<std::mutex> lock(mu_);
   return histograms_.ResponseLatencies();
 }
diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp
index 75bf8fb5e..a781f6ee6 100644
--- a/src/io/simulator/simulator_handle.hpp
+++ b/src/io/simulator/simulator_handle.hpp
@@ -78,7 +78,7 @@ class SimulatorHandle {
   explicit SimulatorHandle(SimulatorConfig config)
       : cluster_wide_time_microseconds_(config.start_time), rng_(config.rng_seed), config_(config) {}
 
-  std::unordered_map<std::string, LatencyHistogramSummary> ResponseLatencies();
+  LatencyHistogramSummaries ResponseLatencies();
 
   ~SimulatorHandle() {
     for (auto it = promises_.begin(); it != promises_.end();) {
diff --git a/src/io/simulator/simulator_transport.hpp b/src/io/simulator/simulator_transport.hpp
index f1c68230d..1aacfed9a 100644
--- a/src/io/simulator/simulator_transport.hpp
+++ b/src/io/simulator/simulator_transport.hpp
@@ -64,8 +64,6 @@ class SimulatorTransport {
     return distrib(rng_);
   }
 
-  std::unordered_map<std::string, LatencyHistogramSummary> ResponseLatencies() {
-    return simulator_handle_->ResponseLatencies();
-  }
+  LatencyHistogramSummaries ResponseLatencies() { return simulator_handle_->ResponseLatencies(); }
 };
 };  // namespace memgraph::io::simulator
diff --git a/src/io/transport.hpp b/src/io/transport.hpp
index a1a337ddb..7640e07ac 100644
--- a/src/io/transport.hpp
+++ b/src/io/transport.hpp
@@ -143,8 +143,6 @@ class Io {
 
   Io<I> ForkLocal() { return Io(implementation_, address_.ForkUniqueAddress()); }
 
-  std::unordered_map<std::string, LatencyHistogramSummary> ResponseLatencies() {
-    return implementation_.ResponseLatencies();
-  }
+  LatencyHistogramSummaries ResponseLatencies() { return implementation_.ResponseLatencies(); }
 };
 };  // namespace memgraph::io
diff --git a/tests/unit/1k_shards_1k_create_scanall.cpp b/tests/unit/1k_shards_1k_create_scanall.cpp
index 5fdbfafa8..cba9cc8ab 100644
--- a/tests/unit/1k_shards_1k_create_scanall.cpp
+++ b/tests/unit/1k_shards_1k_create_scanall.cpp
@@ -31,7 +31,6 @@
 #include "machine_manager/machine_manager.hpp"
 #include "query/v2/requests.hpp"
 #include "query/v2/shard_request_manager.hpp"
-#include "utils/print_helpers.hpp"
 #include "utils/variant_helpers.hpp"
 
 namespace memgraph::tests::simulation {
@@ -95,7 +94,7 @@ MachineManager<LocalTransport> MkMm(LocalSystem &local_system, std::vector<Addre
 
   Coordinator coordinator{shard_map};
 
-  return MachineManager{io, config, coordinator, shard_map};
+  return MachineManager{io, config, coordinator};
 }
 
 void RunMachine(MachineManager<LocalTransport> mm) { mm.Run(); }
@@ -225,7 +224,9 @@ TEST(MachineManager, ManyShards) {
   auto replication_factor = 1;
   auto create_ops = 1000;
 
+  auto time_before_shard_map_creation = cli_io_2.Now();
   ShardMap initialization_sm = TestShardMap(shard_splits, replication_factor);
+  auto time_after_shard_map_creation = cli_io_2.Now();
 
   auto mm_1 = MkMm(local_system, coordinator_addresses, machine_1_addr, initialization_sm);
   Address coordinator_address = mm_1.CoordinatorAddress();
@@ -233,7 +234,10 @@ TEST(MachineManager, ManyShards) {
   auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1));
 
   CoordinatorClient<LocalTransport> coordinator_client(cli_io, coordinator_address, {coordinator_address});
+
+  auto time_before_shard_stabilization = cli_io_2.Now();
   WaitForShardsToInitialize(coordinator_client);
+  auto time_after_shard_stabilization = cli_io_2.Now();
 
   msgs::ShardRequestManager<LocalTransport> shard_request_manager(std::move(coordinator_client), std::move(cli_io));
 
@@ -241,18 +245,30 @@ TEST(MachineManager, ManyShards) {
 
   auto correctness_model = std::set<CompoundKey>{};
 
+  auto time_before_creates = cli_io_2.Now();
+
   for (int i = 0; i < create_ops; i++) {
     ExecuteOp(shard_request_manager, correctness_model, CreateVertex{.first = i, .second = i});
   }
 
+  auto time_after_creates = cli_io_2.Now();
+
   ExecuteOp(shard_request_manager, correctness_model, ScanAll{});
 
+  auto time_after_scan = cli_io_2.Now();
+
   local_system.ShutDown();
 
-  auto histo = cli_io_2.ResponseLatencies();
+  auto latencies = cli_io_2.ResponseLatencies();
 
-  using memgraph::utils::print_helpers::operator<<;
-  std::cout << "response latencies: " << histo << std::endl;
+  std::cout << "response latencies: \n" << latencies.SummaryTable();
+
+  std::cout << "split shard map:     " << (time_after_shard_map_creation - time_before_shard_map_creation).count()
+            << std::endl;
+  std::cout << "shard stabilization: " << (time_after_shard_stabilization - time_before_shard_stabilization).count()
+            << std::endl;
+  std::cout << "create nodes:        " << (time_after_creates - time_before_creates).count() << std::endl;
+  std::cout << "scan nodes:          " << (time_after_scan - time_after_creates).count() << std::endl;
 }
 
 }  // namespace memgraph::tests::simulation
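
The SummaryTable format string relies on fmt's fill/align/width specifier:
"{: >50}" pads with spaces and right-aligns within a 50-character field. A
standalone sketch of the same row helper, with trimmed column widths and
illustrative values:

    #include <fmt/format.h>

    #include <iostream>
    #include <string>

    int main() {
      std::string output;
      const auto row = [&output](const auto &c1, const auto &c2, const auto &c3) {
        // "{: >N}": fill with spaces, right-align, field width N
        output += fmt::format("{: >25} | {: >8} | {: >8}\n", c1, c2, c3);
      };
      row("name", "count", "med (us)");
      row("ScanVerticesRequest", 1024, 173);  // values are illustrative only
      std::cout << output;
    }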

From a13f260236486fa1f637d350a5de2fd9f6b86105 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Tue, 1 Nov 2022 13:57:57 +0000
Subject: [PATCH 07/31] Parameterize shard worker threads in the MachineConfig
 and simplify test output

---
 src/io/message_histogram_collector.hpp     |  4 +-
 src/machine_manager/machine_config.hpp     |  2 +-
 src/machine_manager/machine_manager.hpp    |  2 +-
 src/storage/v3/shard_manager.hpp           |  6 +-
 src/storage/v3/shard_worker.hpp            |  2 -
 tests/unit/1k_shards_1k_create_scanall.cpp | 68 ++++++++++++++++------
 6 files changed, 56 insertions(+), 28 deletions(-)

diff --git a/src/io/message_histogram_collector.hpp b/src/io/message_histogram_collector.hpp
index 542767568..644fe272b 100644
--- a/src/io/message_histogram_collector.hpp
+++ b/src/io/message_histogram_collector.hpp
@@ -69,11 +69,11 @@ struct LatencyHistogramSummaries {
       output +=
           fmt::format("{: >50} | {: >8} | {: >8} | {: >8} | {: >8} | {: >8} | {: >8}\n", c1, c2, c3, c4, c5, c6, c7);
     };
-    row("name", "count", "min (μs)", "med (μs)", "p99 (μs)", "max (μs)", "sum (μs)");
+    row("name", "count", "min (μs)", "med (μs)", "p99 (μs)", "max (μs)", "sum (ms)");
 
     for (const auto &[name, histo] : latencies) {
       row(name, histo.count, histo.p0.count(), histo.p50.count(), histo.p99.count(), histo.p100.count(),
-          histo.sum.count());
+          histo.sum.count() / 1000);
     }
 
     output += "\n";
diff --git a/src/machine_manager/machine_config.hpp b/src/machine_manager/machine_config.hpp
index 1ddc34ae2..52711642b 100644
--- a/src/machine_manager/machine_config.hpp
+++ b/src/machine_manager/machine_config.hpp
@@ -41,7 +41,7 @@ struct MachineConfig {
   bool is_query_engine;
   boost::asio::ip::address listen_ip;
   uint16_t listen_port;
-  size_t n_shard_worker_threads = std::max(static_cast<unsigned int>(1), std::thread::hardware_concurrency());
+  size_t shard_worker_threads = std::max(static_cast<unsigned int>(1), std::thread::hardware_concurrency());
 };
 
 }  // namespace memgraph::machine_manager
diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp
index e41938caa..388a27e2f 100644
--- a/src/machine_manager/machine_manager.hpp
+++ b/src/machine_manager/machine_manager.hpp
@@ -73,7 +73,7 @@ class MachineManager {
       : io_(io),
         config_(config),
         coordinator_{std::move(io.ForkLocal()), {}, std::move(coordinator)},
-        shard_manager_{io.ForkLocal(), config.n_shard_worker_threads, coordinator_.GetAddress()} {}
+        shard_manager_{io.ForkLocal(), config.shard_worker_threads, coordinator_.GetAddress()} {}
 
   Address CoordinatorAddress() { return coordinator_.GetAddress(); }
 
diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp
index db89c7ded..a849c8c17 100644
--- a/src/storage/v3/shard_manager.hpp
+++ b/src/storage/v3/shard_manager.hpp
@@ -79,13 +79,13 @@ static_assert(kMinimumCronInterval < kMaximumCronInterval,
 template <typename IoImpl>
 class ShardManager {
  public:
-  ShardManager(io::Io<IoImpl> io, size_t n_shard_worker_threads, Address coordinator_leader)
+  ShardManager(io::Io<IoImpl> io, size_t shard_worker_threads, Address coordinator_leader)
       : io_(io), coordinator_leader_(coordinator_leader) {
-    MG_ASSERT(n_shard_worker_threads >= 1);
+    MG_ASSERT(shard_worker_threads >= 1);
 
     shard_worker::Queue queue;
 
-    for (int i = 0; i < n_shard_worker_threads; i++) {
+    for (int i = 0; i < shard_worker_threads; i++) {
       shard_worker::Queue queue;
       shard_worker::ShardWorker worker{io, queue};
       auto worker_handle = std::jthread([worker = std::move(worker)]() mutable { worker.Run(); });
diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp
index 3204b0362..ae590c541 100644
--- a/src/storage/v3/shard_worker.hpp
+++ b/src/storage/v3/shard_worker.hpp
@@ -162,8 +162,6 @@ class ShardWorker {
 
         cron_schedule_.pop();
         cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
-
-        const auto &[next_time, _uuid] = cron_schedule_.top();
       } else {
         return time;
       }
diff --git a/tests/unit/1k_shards_1k_create_scanall.cpp b/tests/unit/1k_shards_1k_create_scanall.cpp
index cba9cc8ab..3e95f9a6a 100644
--- a/tests/unit/1k_shards_1k_create_scanall.cpp
+++ b/tests/unit/1k_shards_1k_create_scanall.cpp
@@ -81,13 +81,14 @@ struct ScanAll {
 };
 
 MachineManager<LocalTransport> MkMm(LocalSystem &local_system, std::vector<Address> coordinator_addresses, Address addr,
-                                    ShardMap shard_map) {
+                                    ShardMap shard_map, size_t shard_worker_threads) {
   MachineConfig config{
       .coordinator_addresses = coordinator_addresses,
       .is_storage = true,
       .is_coordinator = true,
       .listen_ip = addr.last_known_ip,
       .listen_port = addr.last_known_port,
+      .shard_worker_threads = shard_worker_threads,
   };
 
   Io<LocalTransport> io = local_system.Register(addr);
@@ -123,7 +124,7 @@ void WaitForShardsToInitialize(CoordinatorClient<LocalTransport> &coordinator_cl
   }
 }
 
-ShardMap TestShardMap(int n_splits, int replication_factor) {
+ShardMap TestShardMap(int shards, int replication_factor, int gap_between_shards) {
   ShardMap sm{};
 
   const std::string label_name = std::string("test_label");
@@ -146,8 +147,8 @@ ShardMap TestShardMap(int n_splits, int replication_factor) {
   MG_ASSERT(label_id.has_value());
 
   // split the shard at N split points
-  for (int64_t i = 1; i < n_splits; ++i) {
-    const auto key1 = memgraph::storage::v3::PropertyValue(i);
+  for (int64_t i = 1; i < shards; ++i) {
+    const auto key1 = memgraph::storage::v3::PropertyValue(i * gap_between_shards);
     const auto key2 = memgraph::storage::v3::PropertyValue(0);
 
     const auto split_point = {key1, key2};
@@ -207,7 +208,16 @@ void ExecuteOp(msgs::ShardRequestManager<LocalTransport> &shard_request_manager,
   }
 }
 
-TEST(MachineManager, ManyShards) {
+void RunWorkload(int shards, int replication_factor, int create_ops, int scan_ops, int shard_worker_threads,
+                 int gap_between_shards) {
+  // std::cout << "======================== NEW TEST ======================== \n";
+  // std::cout << "shards:               " << shards << std::endl;
+  // std::cout << "replication factor:   " << replication_factor << std::endl;
+  // std::cout << "create ops:           " << create_ops << std::endl;
+  // std::cout << "scan all ops:         " << scan_ops << std::endl;
+  // std::cout << "shard worker threads: " << shard_worker_threads << std::endl;
+  // std::cout << "gap between shards:   " << gap_between_shards << std::endl;
+
   LocalSystem local_system;
 
   auto cli_addr = Address::TestAddress(1);
@@ -220,15 +230,11 @@ TEST(MachineManager, ManyShards) {
       machine_1_addr,
   };
 
-  auto shard_splits = 1024;
-  auto replication_factor = 1;
-  auto create_ops = 1000;
-
   auto time_before_shard_map_creation = cli_io_2.Now();
-  ShardMap initialization_sm = TestShardMap(shard_splits, replication_factor);
+  ShardMap initialization_sm = TestShardMap(shards, replication_factor, gap_between_shards);
   auto time_after_shard_map_creation = cli_io_2.Now();
 
-  auto mm_1 = MkMm(local_system, coordinator_addresses, machine_1_addr, initialization_sm);
+  auto mm_1 = MkMm(local_system, coordinator_addresses, machine_1_addr, initialization_sm, shard_worker_threads);
   Address coordinator_address = mm_1.CoordinatorAddress();
 
   auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1));
@@ -253,7 +259,9 @@ TEST(MachineManager, ManyShards) {
 
   auto time_after_creates = cli_io_2.Now();
 
-  ExecuteOp(shard_request_manager, correctness_model, ScanAll{});
+  for (int i = 0; i < scan_ops; i++) {
+    ExecuteOp(shard_request_manager, correctness_model, ScanAll{});
+  }
 
   auto time_after_scan = cli_io_2.Now();
 
@@ -261,14 +269,36 @@ TEST(MachineManager, ManyShards) {
 
   auto latencies = cli_io_2.ResponseLatencies();
 
-  std::cout << "response latencies: \n" << latencies.SummaryTable();
+  // std::cout << "response latencies: \n" << latencies.SummaryTable();
 
-  std::cout << "split shard map:     " << (time_after_shard_map_creation - time_before_shard_map_creation).count()
-            << std::endl;
-  std::cout << "shard stabilization: " << (time_after_shard_stabilization - time_before_shard_stabilization).count()
-            << std::endl;
-  std::cout << "create nodes:        " << (time_after_creates - time_before_creates).count() << std::endl;
-  std::cout << "scan nodes:          " << (time_after_scan - time_after_creates).count() << std::endl;
+  // std::cout << "serial time break-down: (μs)\n";
+
+  // std::cout << fmt::format("{: >20}: {: >10}\n", "split shard map",
+  //                          (time_after_shard_map_creation - time_before_shard_map_creation).count());
+  // std::cout << fmt::format("{: >20}: {: >10}\n", "shard stabilization",
+  //                          (time_after_shard_stabilization - time_before_shard_stabilization).count());
+  // std::cout << fmt::format("{: >20}: {: >10}\n", "create nodes",
+  //                          (time_after_creates - time_before_creates).count());
+  // std::cout << fmt::format("{: >20}: {: >10}\n", "scan nodes",
+  //                          (time_after_scan - time_after_creates).count());
+
+  std::cout << fmt::format("{} {} {}\n", shards, shard_worker_threads, (time_after_scan - time_after_creates).count());
+}
+
+TEST(MachineManager, ManyShards) {
+  auto shards_attempts = {1,  2,  3,  4,  5,  6,  7,  8,  9,  10, 11, 12, 13,
+                          14, 15, 16, 17, 18, 19, 20, 22, 24, 26, 28, 30, 32};
+  auto shard_worker_thread_attempts = {1, 2, 3, 4, 6, 8};
+  auto replication_factor = 1;
+  auto create_ops = 1024;
+  auto scan_ops = 1;
+
+  std::cout << "splits threads scan_all_microseconds\n";
+
+  for (const auto shards : shards_attempts) {
+    auto gap_between_shards = create_ops / shards;
+
+    for (const auto shard_worker_threads : shard_worker_thread_attempts) {
+      RunWorkload(shards, replication_factor, create_ops, scan_ops, shard_worker_threads, gap_between_shards);
+    }
+  }
 }
 
 }  // namespace memgraph::tests::simulation
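
One detail behind the shard_worker_threads default:
std::thread::hardware_concurrency() is allowed to return 0 when the core count
cannot be determined, which is why the default clamps to at least one worker
thread. A minimal illustration:

    #include <algorithm>
    #include <cstddef>
    #include <iostream>
    #include <thread>

    int main() {
      // hardware_concurrency() may return 0 if the value is not computable,
      // so clamp the default to at least one worker thread
      const size_t shard_worker_threads =
          std::max(static_cast<unsigned int>(1), std::thread::hardware_concurrency());
      std::cout << "spawning " << shard_worker_threads << " shard worker threads\n";
    }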

From 5d3eaf6a55c2cbcc48bbf24bf543223eae8cd694 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Tue, 1 Nov 2022 14:02:07 +0000
Subject: [PATCH 08/31] Rename 1k_shards_1k_create_scanall test to
 high_density_shard_create_scan

---
 ...s_1k_create_scanall.cpp => high_density_shard_create_scan.cpp} | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 rename tests/unit/{1k_shards_1k_create_scanall.cpp => high_density_shard_create_scan.cpp} (100%)

diff --git a/tests/unit/1k_shards_1k_create_scanall.cpp b/tests/unit/high_density_shard_create_scan.cpp
similarity index 100%
rename from tests/unit/1k_shards_1k_create_scanall.cpp
rename to tests/unit/high_density_shard_create_scan.cpp

From a6add80fc962913223e29b68f90e89023edfcc0e Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Tue, 1 Nov 2022 14:52:38 +0000
Subject: [PATCH 09/31] Use static RSM partitioning function for achieving a
 smooth Shard->ShardWorker distribution

---
 src/storage/v3/shard_manager.hpp | 41 ++++++++++++++++++++++++++------
 tests/unit/CMakeLists.txt        |  6 ++---
 2 files changed, 37 insertions(+), 10 deletions(-)

diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp
index a849c8c17..4d7507e87 100644
--- a/src/storage/v3/shard_manager.hpp
+++ b/src/storage/v3/shard_manager.hpp
@@ -13,6 +13,7 @@
 
 #include <queue>
 #include <set>
+#include <unordered_map>
 
 #include <boost/functional/hash.hpp>
 #include <boost/uuid/uuid.hpp>
@@ -92,6 +93,7 @@ class ShardManager {
 
       workers_.emplace_back(queue);
       worker_handles_.emplace_back(std::move(worker_handle));
+      worker_rsm_counts_.emplace_back(0);
     }
   }
 
@@ -118,8 +120,29 @@ class ShardManager {
   }
 
   size_t UuidToWorkerIndex(const uuid &to) {
-    size_t hash = boost::hash<boost::uuids::uuid>()(to);
-    return hash % workers_.size();
+    if (rsm_worker_mapping_.contains(to)) {
+      return rsm_worker_mapping_.at(to);
+    }
+
+    // We will now create a mapping for this (probably new) shard
+    // by choosing the worker with the lowest number of existing
+    // mappings.
+
+    size_t min_index = 0;
+    size_t min_count = worker_rsm_counts_.at(min_index);
+
+    for (int i = 0; i < worker_rsm_counts_.size(); i++) {
+      size_t worker_count = worker_rsm_counts_.at(i);
+      if (worker_count <= min_count) {
+        min_count = worker_count;
+        min_index = i;
+      }
+    }
+
+    worker_rsm_counts_[min_index]++;
+    rsm_worker_mapping_.emplace(to, min_index);
+
+    return min_index;
   }
 
   void SendToWorkerByIndex(size_t worker_index, shard_worker::Message &&message) {
@@ -179,7 +202,8 @@ class ShardManager {
   io::Io<IoImpl> io_;
   std::vector<shard_worker::Queue> workers_;
   std::vector<std::jthread> worker_handles_;
-  std::set<uuid> rsm_set_;
+  std::vector<size_t> worker_rsm_counts_;
+  std::unordered_map<uuid, size_t, boost::hash<boost::uuids::uuid>> rsm_worker_mapping_;
   Time next_reconciliation_ = Time::min();
   Address coordinator_leader_;
   std::optional<ResponseFuture<WriteResponse<CoordinatorWriteResponses>>> heartbeat_res_;
@@ -235,17 +259,20 @@ class ShardManager {
 
   void EnsureShardsInitialized(HeartbeatResponse hr) {
     for (const auto &to_init : hr.shards_to_initialize) {
-      if (rsm_set_.contains(to_init.uuid)) {
+      initialized_but_not_confirmed_rsm_.emplace(to_init.uuid);
+
+      if (rsm_worker_mapping_.contains(to_init.uuid)) {
         // it's not a bug for the coordinator to send us UUIDs that we have
         // already created, because there may have been lag that caused
         // the coordinator not to hear back from us.
         continue;
       }
 
-      SendToWorkerByUuid(to_init.uuid, to_init);
+      size_t worker_index = UuidToWorkerIndex(to_init.uuid);
 
-      rsm_set_.emplace(to_init.uuid);
-      initialized_but_not_confirmed_rsm_.emplace(to_init.uuid);
+      SendToWorkerByIndex(worker_index, to_init);
+
+      rsm_worker_mapping_.emplace(to_init.uuid, worker_index);
     }
   }
 };
diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt
index 52e3a7eef..7cf045443 100644
--- a/tests/unit/CMakeLists.txt
+++ b/tests/unit/CMakeLists.txt
@@ -443,6 +443,6 @@ target_link_libraries(${test_prefix}pretty_print_ast_to_original_expression_test
 add_unit_test(coordinator_shard_map.cpp)
 target_link_libraries(${test_prefix}coordinator_shard_map mg-coordinator)
 
-# Tests for 1000 shards, 1000 creates, scan
-add_unit_test(1k_shards_1k_create_scanall.cpp)
-target_link_libraries(${test_prefix}1k_shards_1k_create_scanall mg-io mg-coordinator mg-storage-v3 mg-query-v2)
+# Tests for many shards, many creates, scan
+add_unit_test(high_density_shard_create_scan.cpp)
+target_link_libraries(${test_prefix}high_density_shard_create_scan mg-io mg-coordinator mg-storage-v3 mg-query-v2)
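
The replacement for hash-modulo partitioning is a sticky least-loaded
assignment: the first time a UUID is seen it goes to the worker owning the
fewest RSMs, and the mapping is remembered so a shard never migrates between
workers. A reduced sketch of that placement with toy int uuids and four
workers; note the in-tree loop uses <= (preferring the last minimal worker)
where this sketch uses < (preferring the first):

    #include <cstddef>
    #include <iostream>
    #include <unordered_map>
    #include <vector>

    std::vector<size_t> worker_rsm_counts(4, 0);         // RSMs owned per worker
    std::unordered_map<int, size_t> rsm_worker_mapping;  // uuid -> worker index

    size_t UuidToWorkerIndex(int uuid) {
      if (const auto it = rsm_worker_mapping.find(uuid); it != rsm_worker_mapping.end()) {
        return it->second;  // sticky: a shard never migrates between workers
      }
      // new shard: pick the worker that currently owns the fewest RSMs
      size_t min_index = 0;
      for (size_t i = 1; i < worker_rsm_counts.size(); i++) {
        if (worker_rsm_counts[i] < worker_rsm_counts[min_index]) {
          min_index = i;
        }
      }
      worker_rsm_counts[min_index]++;
      rsm_worker_mapping.emplace(uuid, min_index);
      return min_index;
    }

    int main() {
      for (int uuid = 0; uuid < 8; uuid++) {
        std::cout << "uuid " << uuid << " -> worker " << UuidToWorkerIndex(uuid) << "\n";
      }  // eight shards spread evenly across the four workers
    }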

From 7596e8535879d63e2f090e28191dfa9bee887502 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Wed, 2 Nov 2022 12:37:34 +0000
Subject: [PATCH 10/31] When message conversion fails, demangle the concrete
 type name and log a useful error message

---
 src/io/message_conversion.hpp | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/io/message_conversion.hpp b/src/io/message_conversion.hpp
index 11c045123..c4cce732b 100644
--- a/src/io/message_conversion.hpp
+++ b/src/io/message_conversion.hpp
@@ -11,6 +11,8 @@
 
 #pragma once
 
+#include <boost/core/demangle.hpp>
+
 #include "io/transport.hpp"
 #include "utils/type_info_ref.hpp"
 
@@ -90,6 +92,10 @@ struct OpaqueMessage {
       };
     }
 
+    std::string demangled_name = "\"" + boost::core::demangle(message.type().name()) + "\"";
+    spdlog::error("failed to cast message of type {} to expected request type (probably in Receive argument types)",
+                  demangled_name);
+
     return std::nullopt;
   }
 };
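
boost::core::demangle converts the implementation-defined string returned by std::type_info::name() into a human-readable type name, which is what makes the log line above actionable. A minimal sketch of the same pattern against std::any, with a hypothetical PingRequest and TryCast standing in for the OpaqueMessage machinery:

    #include <any>
    #include <iostream>
    #include <optional>
    #include <string>

    #include <boost/core/demangle.hpp>

    struct PingRequest {};

    // Try to recover a concrete type from a type-erased message, logging a
    // readable name on failure.
    template <typename T>
    std::optional<T> TryCast(const std::any &message) {
      if (const T *value = std::any_cast<T>(&message)) {
        return *value;
      }
      std::string demangled = boost::core::demangle(message.type().name());
      std::cerr << "failed to cast message of type \"" << demangled << "\"\n";
      return std::nullopt;
    }

    int main() {
      std::any message = std::string{"not a ping"};
      auto result = TryCast<PingRequest>(message);  // logs the demangled std::string name
      std::cout << std::boolalpha << result.has_value() << '\n';  // false
    }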

From 6239f4fc3e360ec39388e26ee0f8758f90309b3a Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Wed, 2 Nov 2022 17:11:26 +0000
Subject: [PATCH 11/31] Simplify usage of PromiseKey in LocalTransportHandle to
 avoid replier address

---
 src/io/local_transport/local_transport_handle.hpp | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/src/io/local_transport/local_transport_handle.hpp b/src/io/local_transport/local_transport_handle.hpp
index 6cab1a24f..869a7e9a7 100644
--- a/src/io/local_transport/local_transport_handle.hpp
+++ b/src/io/local_transport/local_transport_handle.hpp
@@ -113,8 +113,7 @@ class LocalTransportHandle {
                                  .message = std::move(message_any),
                                  .type_info = type_info};
 
-    PromiseKey promise_key{
-        .requester_address = to_address, .request_id = opaque_message.request_id, .replier_address = from_address};
+    PromiseKey promise_key{.requester_address = to_address, .request_id = opaque_message.request_id};
 
     {
       std::unique_lock<std::mutex> lock(mu_);
@@ -152,8 +151,7 @@ class LocalTransportHandle {
     {
       std::unique_lock<std::mutex> lock(mu_);
 
-      PromiseKey promise_key{
-          .requester_address = from_address, .request_id = request_id, .replier_address = to_address};
+      PromiseKey promise_key{.requester_address = from_address, .request_id = request_id};
       OpaquePromise opaque_promise(std::move(promise).ToUnique());
       DeadlineAndOpaquePromise dop{.requested_at = now, .deadline = deadline, .promise = std::move(opaque_promise)};
       promises_.emplace(std::move(promise_key), std::move(dop));
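
With the replier address gone, a pending request is identified by (requester address, request id) alone, so a response may legally arrive from a server other than the one originally addressed -- the property direct server return (DSR) relies on. A reduced sketch of the two-field key, using a hypothetical simplified Address:

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>

    // Hypothetical simplified Address; the real one carries ip, port and a uuid.
    struct Address {
      std::string name;
      friend bool operator<(const Address &lhs, const Address &rhs) { return lhs.name < rhs.name; }
    };

    // The reduced key: the replier's address no longer participates, so a
    // response from any server can complete the pending promise.
    struct PromiseKey {
      Address requester_address;
      std::uint64_t request_id;

      friend bool operator<(const PromiseKey &lhs, const PromiseKey &rhs) {
        if (lhs.requester_address < rhs.requester_address) return true;
        if (rhs.requester_address < lhs.requester_address) return false;
        return lhs.request_id < rhs.request_id;
      }
    };

    int main() {
      std::map<PromiseKey, std::string> promises;
      promises.emplace(PromiseKey{{"client-1"}, 1}, "pending");
      // Completing the promise requires only the requester and the request id.
      std::cout << promises.count(PromiseKey{{"client-1"}, 1}) << '\n';  // 1
    }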

From 78528bd60972f7c5fe333afe4857acfa8e4166bc Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Wed, 2 Nov 2022 17:12:21 +0000
Subject: [PATCH 12/31] Avoid the explicit ShutDown acknowledgement, since
 jthread already blocks on worker shutdown

---
 src/storage/v3/shard_manager.hpp | 12 +-----------
 src/storage/v3/shard_worker.hpp  | 11 +++--------
 2 files changed, 4 insertions(+), 19 deletions(-)

diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp
index 4d7507e87..559f51afe 100644
--- a/src/storage/v3/shard_manager.hpp
+++ b/src/storage/v3/shard_manager.hpp
@@ -84,8 +84,6 @@ class ShardManager {
       : io_(io), coordinator_leader_(coordinator_leader) {
     MG_ASSERT(shard_worker_threads >= 1);
 
-    shard_worker::Queue queue;
-
     for (int i = 0; i < shard_worker_threads; i++) {
       shard_worker::Queue queue;
       shard_worker::ShardWorker worker{io, queue};
@@ -103,16 +101,8 @@ class ShardManager {
   ShardManager &operator=(const ShardManager &) = delete;
 
   ~ShardManager() {
-    auto shutdown_acks = std::vector<io::Future<bool>>{};
     for (auto worker : workers_) {
-      auto [future, promise] = io::FuturePromisePair<bool>();
-      worker.Push(shard_worker::ShutDown{.acknowledge_shutdown = std::move(promise)});
-      shutdown_acks.emplace_back(std::move(future));
-    }
-
-    for (auto &&ack : shutdown_acks) {
-      bool acked = std::move(ack).Wait();
-      MG_ASSERT(acked);
+      worker.Push(shard_worker::ShutDown{});
     }
 
     // The jthread handles for our shard worker threads will be
diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp
index ae590c541..4cca118e7 100644
--- a/src/storage/v3/shard_worker.hpp
+++ b/src/storage/v3/shard_worker.hpp
@@ -55,9 +55,7 @@ using storage::v3::ShardRsm;
 template <typename IoImpl>
 using ShardRaft = Raft<IoImpl, ShardRsm, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
 
-struct ShutDown {
-  io::Promise<bool> acknowledge_shutdown;
-};
+struct ShutDown {};
 
 struct Cron {};
 
@@ -96,7 +94,7 @@ class Queue {
     {
       std::unique_lock<std::mutex> lock(inner_->mu);
 
-      inner_->queue.push_back(std::forward<Message>(message));
+      inner_->queue.emplace_back(std::forward<Message>(message));
     }  // lock dropped before notifying condition variable
 
     inner_->cv.notify_all();
@@ -125,10 +123,7 @@ class ShardWorker {
   Time next_cron_ = Time::min();
   std::map<uuid, ShardRaft<IoImpl>> rsm_map_;
 
-  bool Process(ShutDown &&shut_down) {
-    shut_down.acknowledge_shutdown.Fill(true);
-    return false;
-  }
+  bool Process(ShutDown && /* shut_down */) { return false; }
 
   bool Process(Cron &&cron) {
     Cron();
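
This simplification leans on std::jthread joining in its destructor: pushing ShutDown makes each worker's run loop return, and destroying the vector of jthread handles then blocks until every thread has exited, so the explicit acknowledgement future added nothing. A reduced model of that shutdown path, assuming a single worker and a simplified Queue:

    #include <condition_variable>
    #include <deque>
    #include <iostream>
    #include <mutex>
    #include <thread>
    #include <variant>

    struct ShutDown {};
    using Message = std::variant<int, ShutDown>;

    struct Queue {
      std::mutex mu;
      std::condition_variable cv;
      std::deque<Message> q;

      void Push(Message m) {
        {
          std::lock_guard lock(mu);
          q.push_back(std::move(m));
        }  // lock dropped before notifying, as in the worker queues above
        cv.notify_all();
      }

      Message Pop() {
        std::unique_lock lock(mu);
        cv.wait(lock, [&] { return !q.empty(); });
        Message m = std::move(q.front());
        q.pop_front();
        return m;
      }
    };

    int main() {
      Queue queue;
      std::jthread worker([&queue] {
        while (true) {
          Message m = queue.Pop();
          if (std::holds_alternative<ShutDown>(m)) return;  // no explicit ack needed
          std::cout << "processing " << std::get<int>(m) << '\n';
        }
      });
      queue.Push(1);
      queue.Push(ShutDown{});
      // the jthread destructor joins here, so shutdown is fully observed before main returns
    }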

From a815ec9617d57101c45a7ce70d41d6ae461d43a9 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Wed, 2 Nov 2022 17:15:52 +0000
Subject: [PATCH 13/31] Handle Coordinator work on a separate thread,
 unblocking the MachineManager to route additional messages to Shards

---
 src/coordinator/coordinator_worker.hpp  | 157 ++++++++++++++++++++++++
 src/machine_manager/machine_manager.hpp | 105 ++++++++++------
 2 files changed, 224 insertions(+), 38 deletions(-)
 create mode 100644 src/coordinator/coordinator_worker.hpp

diff --git a/src/coordinator/coordinator_worker.hpp b/src/coordinator/coordinator_worker.hpp
new file mode 100644
index 000000000..b89a9c496
--- /dev/null
+++ b/src/coordinator/coordinator_worker.hpp
@@ -0,0 +1,157 @@
+// Copyright 2022 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+#include <chrono>
+#include <deque>
+#include <memory>
+#include <queue>
+#include <variant>
+
+#include <boost/uuid/uuid.hpp>
+
+#include "coordinator/coordinator.hpp"
+#include "coordinator/shard_map.hpp"
+#include "io/address.hpp"
+#include "io/future.hpp"
+#include "io/messages.hpp"
+#include "io/rsm/raft.hpp"
+#include "io/time.hpp"
+#include "io/transport.hpp"
+#include "query/v2/requests.hpp"
+
+namespace memgraph::coordinator::coordinator_worker {
+
+/// Obligations:
+/// * ShutDown
+/// * Cron
+/// * RouteMessage
+
+using boost::uuids::uuid;
+
+using coordinator::Coordinator;
+using coordinator::CoordinatorRsm;
+using coordinator::ShardToInitialize;
+using io::Address;
+using io::RequestId;
+using io::Time;
+using io::messages::CoordinatorMessages;
+using io::rsm::Raft;
+using msgs::ReadRequests;
+using msgs::ReadResponses;
+using msgs::WriteRequests;
+using msgs::WriteResponses;
+
+struct ShutDown {};
+
+struct Cron {};
+
+struct RouteMessage {
+  CoordinatorMessages message;
+  RequestId request_id;
+  Address to;
+  Address from;
+};
+
+using Message = std::variant<RouteMessage, Cron, ShutDown>;
+
+struct QueueInner {
+  std::mutex mu{};
+  std::condition_variable cv;
+  // TODO(tyler) handle simulator communication std::shared_ptr<std::atomic<int>> blocked;
+
+  // TODO(tyler) investigate using a priority queue that prioritizes messages in a way that
+  // improves overall QoS. For example, maybe we want to schedule raft Append messages
+  // ahead of Read messages or generally writes before reads for lowering the load on the
+  // overall system faster etc... When we do this, we need to make sure to avoid
+  // starvation by sometimes randomizing priorities, rather than following a strict
+  // prioritization.
+  std::deque<Message> queue;
+};
+
+/// There are two reasons to implement our own Queue instead of using
+/// one off-the-shelf:
+/// 1. we will need to know in the simulator when all threads are waiting
+/// 2. we will want to implement our own priority queue within this for QoS
+class Queue {
+  std::shared_ptr<QueueInner> inner_ = std::make_shared<QueueInner>();
+
+ public:
+  void Push(Message &&message) {
+    {
+      std::unique_lock<std::mutex> lock(inner_->mu);
+
+      inner_->queue.emplace_back(std::forward<Message>(message));
+    }  // lock dropped before notifying condition variable
+
+    inner_->cv.notify_all();
+  }
+
+  Message Pop() {
+    std::unique_lock<std::mutex> lock(inner_->mu);
+
+    while (inner_->queue.empty()) {
+      inner_->cv.wait(lock);
+    }
+
+    Message message = std::move(inner_->queue.front());
+    inner_->queue.pop_front();
+
+    return message;
+  }
+};
+
+/// A CoordinatorWorker owns Raft<CoordinatorRsm> instances and receives messages from the MachineManager.
+template <class IoImpl>
+class CoordinatorWorker {
+  io::Io<IoImpl> io_;
+  Queue queue_;
+  CoordinatorRsm<IoImpl> coordinator_;
+
+  bool Process(ShutDown && /*shut_down*/) { return false; }
+
+  bool Process(Cron &&cron) {
+    coordinator_.Cron();
+    return true;
+  }
+
+  bool Process(RouteMessage &&route_message) {
+    coordinator_.Handle(std::move(route_message.message), route_message.request_id, route_message.from);
+
+    return true;
+  }
+
+ public:
+  CoordinatorWorker(io::Io<IoImpl> io, Queue queue, Coordinator coordinator)
+      : io_(io), queue_(queue), coordinator_{std::move(io.ForkLocal()), {}, std::move(coordinator)} {}
+
+  CoordinatorWorker(CoordinatorWorker &&) = default;
+  CoordinatorWorker &operator=(CoordinatorWorker &&) = default;
+  CoordinatorWorker(const CoordinatorWorker &) = delete;
+  CoordinatorWorker &operator=(const CoordinatorWorker &) = delete;
+  ~CoordinatorWorker() = default;
+
+  void Run() {
+    while (true) {
+      Message message = queue_.Pop();
+
+      const bool should_continue =
+          std::visit([&](auto &&msg) { return Process(std::forward<decltype(msg)>(msg)); }, std::move(message));
+
+      if (!should_continue) {
+        return;
+      }
+    }
+  }
+};
+
+}  // namespace memgraph::coordinator::coordinator_worker
diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp
index 388a27e2f..75dd0d564 100644
--- a/src/machine_manager/machine_manager.hpp
+++ b/src/machine_manager/machine_manager.hpp
@@ -11,39 +11,43 @@
 
 #pragma once
 
-#include <coordinator/coordinator_rsm.hpp>
-#include <io/message_conversion.hpp>
-#include <io/messages.hpp>
-#include <io/rsm/rsm_client.hpp>
-#include <io/time.hpp>
-#include <machine_manager/machine_config.hpp>
-#include <storage/v3/shard_manager.hpp>
+#include "coordinator/coordinator_rsm.hpp"
+#include "coordinator/coordinator_worker.hpp"
+#include "io/message_conversion.hpp"
+#include "io/messages.hpp"
+#include "io/rsm/rsm_client.hpp"
+#include "io/time.hpp"
+#include "machine_manager/machine_config.hpp"
+#include "storage/v3/shard_manager.hpp"
 
 namespace memgraph::machine_manager {
 
-using memgraph::coordinator::Coordinator;
-using memgraph::coordinator::CoordinatorReadRequests;
-using memgraph::coordinator::CoordinatorReadResponses;
-using memgraph::coordinator::CoordinatorRsm;
-using memgraph::coordinator::CoordinatorWriteRequests;
-using memgraph::coordinator::CoordinatorWriteResponses;
-using memgraph::io::ConvertVariant;
-using memgraph::io::Duration;
-using memgraph::io::RequestId;
-using memgraph::io::Time;
-using memgraph::io::messages::CoordinatorMessages;
-using memgraph::io::messages::ShardManagerMessages;
-using memgraph::io::messages::ShardMessages;
-using memgraph::io::messages::StorageReadRequest;
-using memgraph::io::messages::StorageWriteRequest;
-using memgraph::io::rsm::AppendRequest;
-using memgraph::io::rsm::AppendResponse;
-using memgraph::io::rsm::ReadRequest;
-using memgraph::io::rsm::VoteRequest;
-using memgraph::io::rsm::VoteResponse;
-using memgraph::io::rsm::WriteRequest;
-using memgraph::io::rsm::WriteResponse;
-using memgraph::storage::v3::ShardManager;
+using coordinator::Coordinator;
+using coordinator::CoordinatorReadRequests;
+using coordinator::CoordinatorReadResponses;
+using coordinator::CoordinatorRsm;
+using coordinator::CoordinatorWriteRequests;
+using coordinator::CoordinatorWriteResponses;
+using coordinator::coordinator_worker::CoordinatorWorker;
+using CoordinatorRouteMessage = coordinator::coordinator_worker::RouteMessage;
+using CoordinatorQueue = coordinator::coordinator_worker::Queue;
+using io::ConvertVariant;
+using io::Duration;
+using io::RequestId;
+using io::Time;
+using io::messages::CoordinatorMessages;
+using io::messages::ShardManagerMessages;
+using io::messages::ShardMessages;
+using io::messages::StorageReadRequest;
+using io::messages::StorageWriteRequest;
+using io::rsm::AppendRequest;
+using io::rsm::AppendResponse;
+using io::rsm::ReadRequest;
+using io::rsm::VoteRequest;
+using io::rsm::VoteResponse;
+using io::rsm::WriteRequest;
+using io::rsm::WriteResponse;
+using storage::v3::ShardManager;
 
 /// The MachineManager is responsible for:
 /// * starting the entire system and ensuring that high-level
@@ -62,7 +66,9 @@ template <typename IoImpl>
 class MachineManager {
   io::Io<IoImpl> io_;
   MachineConfig config_;
-  CoordinatorRsm<IoImpl> coordinator_;
+  Address coordinator_address_;
+  CoordinatorQueue coordinator_queue_;
+  std::jthread coordinator_handle_;
   ShardManager<IoImpl> shard_manager_;
   Time next_cron_ = Time::min();
 
@@ -72,10 +78,27 @@ class MachineManager {
   MachineManager(io::Io<IoImpl> io, MachineConfig config, Coordinator coordinator)
       : io_(io),
         config_(config),
-        coordinator_{std::move(io.ForkLocal()), {}, std::move(coordinator)},
-        shard_manager_{io.ForkLocal(), config.shard_worker_threads, coordinator_.GetAddress()} {}
+        coordinator_address_(io.GetAddress().ForkUniqueAddress()),
+        shard_manager_{io.ForkLocal(), config.shard_worker_threads, coordinator_address_} {
+    auto coordinator_io = io.ForkLocal();
+    coordinator_io.SetAddress(coordinator_address_);
+    CoordinatorWorker coordinator_worker{coordinator_io, coordinator_queue_, coordinator};
+    coordinator_handle_ = std::jthread([coordinator = std::move(coordinator_worker)]() mutable { coordinator.Run(); });
+  }
 
-  Address CoordinatorAddress() { return coordinator_.GetAddress(); }
+  MachineManager(MachineManager &&) = default;
+  MachineManager &operator=(MachineManager &&) = default;
+  MachineManager(const MachineManager &) = delete;
+  MachineManager &operator=(const MachineManager &) = delete;
+
+  ~MachineManager() {
+    coordinator_queue_.Push(coordinator::coordinator_worker::ShutDown{});
+    spdlog::error("a");
+    std::move(coordinator_handle_).join();
+    spdlog::error("b");
+  }
+
+  Address CoordinatorAddress() { return coordinator_address_; }
 
   void Run() {
     while (!io_.ShouldShutDown()) {
@@ -85,7 +108,7 @@ class MachineManager {
         next_cron_ = Cron();
       }
 
-      Duration receive_timeout = next_cron_ - now;
+      Duration receive_timeout = std::max(next_cron_, now) - now;
 
       // Note: this parameter pack must be kept in-sync with the ReceiveWithTimeout parameter pack below
       using AllMessages =
@@ -113,7 +136,7 @@ class MachineManager {
       spdlog::info("MM got message to {}", request_envelope.to_address.ToString());
 
       // If message is for the coordinator, cast it to subset and pass it to the coordinator
-      bool to_coordinator = coordinator_.GetAddress() == request_envelope.to_address;
+      bool to_coordinator = coordinator_address_ == request_envelope.to_address;
       if (to_coordinator) {
         std::optional<CoordinatorMessages> conversion_attempt =
             ConvertVariant<AllMessages, ReadRequest<CoordinatorReadRequests>, AppendRequest<CoordinatorWriteRequests>,
@@ -126,8 +149,13 @@ class MachineManager {
 
         CoordinatorMessages &&cm = std::move(conversion_attempt.value());
 
-        coordinator_.Handle(std::forward<CoordinatorMessages>(cm), request_envelope.request_id,
-                            request_envelope.from_address);
+        CoordinatorRouteMessage route_message{
+            .message = std::forward<CoordinatorMessages>(cm),
+            .request_id = request_envelope.request_id,
+            .to = request_envelope.to_address,
+            .from = request_envelope.from_address,
+        };
+        coordinator_queue_.Push(std::move(route_message));
         continue;
       }
 
@@ -168,6 +196,7 @@ class MachineManager {
  private:
   Time Cron() {
     spdlog::info("running MachineManager::Cron, address {}", io_.GetAddress().ToString());
+    coordinator_queue_.Push(coordinator::coordinator_worker::Cron{});
     return shard_manager_.Cron();
   }
 };
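
One detail above is easy to miss: the receive timeout is computed as std::max(next_cron_, now) - now rather than next_cron_ - now, so a cron deadline that has already passed yields a zero timeout instead of a negative duration. In isolation:

    #include <algorithm>
    #include <chrono>
    #include <iostream>

    int main() {
      using namespace std::chrono;
      const auto now = steady_clock::now();
      const auto next_cron = now - milliseconds(5);  // cron deadline already passed

      const auto naive = next_cron - now;                   // negative duration
      const auto clamped = std::max(next_cron, now) - now;  // zero duration

      std::cout << duration_cast<milliseconds>(naive).count() << "ms vs "
                << duration_cast<milliseconds>(clamped).count() << "ms\n";
    }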

From fa1ddfea12781cf8817af5c0d09ccf15bf457aff Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Wed, 2 Nov 2022 17:45:27 +0000
Subject: [PATCH 14/31] Fix a bug where the MachineManager's destructor became
 incorrect after being moved

---
 src/coordinator/coordinator_worker.hpp  | 3 ++-
 src/machine_manager/machine_manager.hpp | 8 ++++----
 src/storage/v3/shard_worker.hpp         | 1 +
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/src/coordinator/coordinator_worker.hpp b/src/coordinator/coordinator_worker.hpp
index b89a9c496..46ca39b02 100644
--- a/src/coordinator/coordinator_worker.hpp
+++ b/src/coordinator/coordinator_worker.hpp
@@ -88,6 +88,7 @@ class Queue {
  public:
   void Push(Message &&message) {
     {
+      MG_ASSERT(inner_.use_count() > 0);
       std::unique_lock<std::mutex> lock(inner_->mu);
 
       inner_->queue.emplace_back(std::forward<Message>(message));
@@ -132,7 +133,7 @@ class CoordinatorWorker {
 
  public:
   CoordinatorWorker(io::Io<IoImpl> io, Queue queue, Coordinator coordinator)
-      : io_(io), queue_(queue), coordinator_{std::move(io.ForkLocal()), {}, std::move(coordinator)} {}
+      : io_(io), queue_(std::move(queue)), coordinator_{std::move(io.ForkLocal()), {}, std::move(coordinator)} {}
 
   CoordinatorWorker(CoordinatorWorker &&) = default;
   CoordinatorWorker &operator=(CoordinatorWorker &&) = default;
diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp
index 75dd0d564..2bbc99441 100644
--- a/src/machine_manager/machine_manager.hpp
+++ b/src/machine_manager/machine_manager.hpp
@@ -92,10 +92,10 @@ class MachineManager {
   MachineManager &operator=(const MachineManager &) = delete;
 
   ~MachineManager() {
-    coordinator_queue_.Push(coordinator::coordinator_worker::ShutDown{});
-    spdlog::error("a");
-    std::move(coordinator_handle_).join();
-    spdlog::error("b");
+    if (coordinator_handle_.joinable()) {
+      coordinator_queue_.Push(coordinator::coordinator_worker::ShutDown{});
+      std::move(coordinator_handle_).join();
+    }
   }
 
   Address CoordinatorAddress() { return coordinator_address_; }
diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp
index 4cca118e7..b1fefdfd9 100644
--- a/src/storage/v3/shard_worker.hpp
+++ b/src/storage/v3/shard_worker.hpp
@@ -92,6 +92,7 @@ class Queue {
  public:
   void Push(Message &&message) {
     {
+      MG_ASSERT(inner_.use_count() > 0);
       std::unique_lock<std::mutex> lock(inner_->mu);
 
       inner_->queue.emplace_back(std::forward<Message>(message));
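
Both guards target the same hazard: operating on a moved-from object. A moved-from std::jthread is no longer joinable, and a moved-from Queue holds a null shared_ptr (use_count() == 0), which the new MG_ASSERT in Push catches before a null dereference. The destructor side of the hazard in miniature, with a hypothetical Worker standing in for MachineManager:

    #include <iostream>
    #include <thread>
    #include <utility>

    struct Worker {
      std::jthread handle;

      Worker() : handle([] { /* run loop would live here */ }) {}
      Worker(Worker &&) noexcept = default;
      Worker &operator=(Worker &&) noexcept = default;

      ~Worker() {
        // Without this check, the destructor of a moved-from Worker would try to
        // signal and join a thread it no longer owns.
        if (handle.joinable()) {
          handle.join();
        }
      }
    };

    int main() {
      Worker a;
      Worker b = std::move(a);  // a's destructor must now be a no-op
      std::cout << std::boolalpha << a.handle.joinable() << ' ' << b.handle.joinable() << '\n';
    }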

From dd8dd4f6c41c1695810bba5ccd7c463700809411 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Thu, 3 Nov 2022 09:31:06 +0000
Subject: [PATCH 15/31] Restructure responsibilities for assigning request ids
 to the transport handles. Simplify promise tracking to avoid replier
 addresses, enabling eventual direct server return (DSR)

---
 src/io/local_transport/local_transport.hpp      | 11 +++--------
 .../local_transport/local_transport_handle.hpp  | 14 ++++++++++++--
 src/io/message_conversion.hpp                   |  9 +--------
 src/io/simulator/simulator_handle.cpp           |  4 +---
 src/io/simulator/simulator_handle.hpp           | 17 ++++++++++++-----
 src/io/simulator/simulator_transport.hpp        | 11 +++--------
 src/io/transport.hpp                            |  8 ++------
 7 files changed, 34 insertions(+), 40 deletions(-)

diff --git a/src/io/local_transport/local_transport.hpp b/src/io/local_transport/local_transport.hpp
index bf1991d21..258df6385 100644
--- a/src/io/local_transport/local_transport.hpp
+++ b/src/io/local_transport/local_transport.hpp
@@ -31,14 +31,9 @@ class LocalTransport {
       : local_transport_handle_(std::move(local_transport_handle)) {}
 
   template <Message RequestT, Message ResponseT>
-  ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestId request_id, RequestT request,
-                                    Duration timeout) {
-    auto [future, promise] = memgraph::io::FuturePromisePair<ResponseResult<ResponseT>>();
-
-    local_transport_handle_->SubmitRequest(to_address, from_address, request_id, std::move(request), timeout,
-                                           std::move(promise));
-
-    return std::move(future);
+  ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT request, Duration timeout) {
+    return local_transport_handle_->template SubmitRequest<RequestT, ResponseT>(to_address, from_address,
+                                                                                std::move(request), timeout);
   }
 
   template <Message... Ms>
diff --git a/src/io/local_transport/local_transport_handle.hpp b/src/io/local_transport/local_transport_handle.hpp
index 869a7e9a7..25a106e9b 100644
--- a/src/io/local_transport/local_transport_handle.hpp
+++ b/src/io/local_transport/local_transport_handle.hpp
@@ -30,6 +30,7 @@ class LocalTransportHandle {
   mutable std::condition_variable cv_;
   bool should_shut_down_ = false;
   MessageHistogramCollector histograms_;
+  RequestId request_id_counter_ = 0;
 
   // the responses to requests that are being waited on
   std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
@@ -138,8 +139,10 @@ class LocalTransportHandle {
   }
 
   template <Message RequestT, Message ResponseT>
-  void SubmitRequest(Address to_address, Address from_address, RequestId request_id, RequestT &&request,
-                     Duration timeout, ResponsePromise<ResponseT> promise) {
+  ResponseFuture<ResponseT> SubmitRequest(Address to_address, Address from_address, RequestT &&request,
+                                          Duration timeout) {
+    auto [future, promise] = memgraph::io::FuturePromisePair<ResponseResult<ResponseT>>();
+
     const bool port_matches = to_address.last_known_port == from_address.last_known_port;
     const bool ip_matches = to_address.last_known_ip == from_address.last_known_ip;
 
@@ -148,16 +151,23 @@ class LocalTransportHandle {
     const auto now = Now();
     const Time deadline = now + timeout;
 
+    RequestId request_id;
     {
       std::unique_lock<std::mutex> lock(mu_);
 
+      request_id = ++request_id_counter_;
       PromiseKey promise_key{.requester_address = from_address, .request_id = request_id};
       OpaquePromise opaque_promise(std::move(promise).ToUnique());
       DeadlineAndOpaquePromise dop{.requested_at = now, .deadline = deadline, .promise = std::move(opaque_promise)};
+
+      // TODO(tyler) assert not already present
+
       promises_.emplace(std::move(promise_key), std::move(dop));
     }  // lock dropped
 
     Send(to_address, from_address, request_id, std::forward<RequestT>(request));
+
+    return std::move(future);
   }
 };
 
diff --git a/src/io/message_conversion.hpp b/src/io/message_conversion.hpp
index c4cce732b..1463abc06 100644
--- a/src/io/message_conversion.hpp
+++ b/src/io/message_conversion.hpp
@@ -21,9 +21,6 @@ namespace memgraph::io {
 struct PromiseKey {
   Address requester_address;
   uint64_t request_id;
-  // TODO(tyler) possibly remove replier_address from promise key
-  // once we want to support DSR.
-  Address replier_address;
 
  public:
   friend bool operator<(const PromiseKey &lhs, const PromiseKey &rhs) {
@@ -31,11 +28,7 @@ struct PromiseKey {
       return lhs.requester_address < rhs.requester_address;
     }
 
-    if (lhs.request_id != rhs.request_id) {
-      return lhs.request_id < rhs.request_id;
-    }
-
-    return lhs.replier_address < rhs.replier_address;
+    return lhs.request_id < rhs.request_id;
   }
 };
 
diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp
index 74b2854a0..74925812e 100644
--- a/src/io/simulator/simulator_handle.cpp
+++ b/src/io/simulator/simulator_handle.cpp
@@ -108,9 +108,7 @@ bool SimulatorHandle::MaybeTickSimulator() {
     stats_.dropped_messages++;
   }
 
-  PromiseKey promise_key{.requester_address = to_address,
-                         .request_id = opaque_message.request_id,
-                         .replier_address = opaque_message.from_address};
+  PromiseKey promise_key{.requester_address = to_address, .request_id = opaque_message.request_id};
 
   if (promises_.contains(promise_key)) {
     // complete waiting promise if it's there
diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp
index a781f6ee6..2b236f0ec 100644
--- a/src/io/simulator/simulator_handle.hpp
+++ b/src/io/simulator/simulator_handle.hpp
@@ -56,14 +56,14 @@ class SimulatorHandle {
   std::uniform_int_distribution<int> drop_distrib_{0, 99};
   SimulatorConfig config_;
   MessageHistogramCollector histograms_;
+  RequestId request_id_counter_{0};
 
   void TimeoutPromisesPastDeadline() {
     const Time now = cluster_wide_time_microseconds_;
     for (auto it = promises_.begin(); it != promises_.end();) {
       auto &[promise_key, dop] = *it;
       if (dop.deadline < now && config_.perform_timeouts) {
-        spdlog::info("timing out request from requester {} to replier {}.", promise_key.requester_address.ToString(),
-                     promise_key.replier_address.ToString());
+        spdlog::info("timing out request from requester {}.", promise_key.requester_address.ToString());
         std::move(dop).promise.TimeOut();
         it = promises_.erase(it);
 
@@ -101,12 +101,17 @@ class SimulatorHandle {
   bool ShouldShutDown() const;
 
   template <Message Request, Message Response>
-  void SubmitRequest(Address to_address, Address from_address, RequestId request_id, Request &&request,
-                     Duration timeout, ResponsePromise<Response> &&promise) {
+  ResponseFuture<Response> SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout,
+                                         std::function<bool()> &&maybe_tick_simulator) {
     auto type_info = TypeInfoFor(request);
 
+    auto [future, promise] = memgraph::io::FuturePromisePairWithNotifier<ResponseResult<Response>>(
+        std::forward<std::function<bool()>>(maybe_tick_simulator));
+
     std::unique_lock<std::mutex> lock(mu_);
 
+    RequestId request_id = ++request_id_counter_;
+
     const Time deadline = cluster_wide_time_microseconds_ + timeout;
 
     std::any message(request);
@@ -117,7 +122,7 @@ class SimulatorHandle {
                      .type_info = type_info};
     in_flight_.emplace_back(std::make_pair(to_address, std::move(om)));
 
-    PromiseKey promise_key{.requester_address = from_address, .request_id = request_id, .replier_address = to_address};
+    PromiseKey promise_key{.requester_address = from_address, .request_id = request_id};
     OpaquePromise opaque_promise(std::move(promise).ToUnique());
     DeadlineAndOpaquePromise dop{
         .requested_at = cluster_wide_time_microseconds_,
@@ -130,6 +135,8 @@ class SimulatorHandle {
     stats_.total_requests++;
 
     cv_.notify_all();
+
+    return std::move(future);
   }
 
   template <Message... Ms>
diff --git a/src/io/simulator/simulator_transport.hpp b/src/io/simulator/simulator_transport.hpp
index 1aacfed9a..5e5a24aa9 100644
--- a/src/io/simulator/simulator_transport.hpp
+++ b/src/io/simulator/simulator_transport.hpp
@@ -33,16 +33,11 @@ class SimulatorTransport {
       : simulator_handle_(simulator_handle), address_(address), rng_(std::mt19937{seed}) {}
 
   template <Message RequestT, Message ResponseT>
-  ResponseFuture<ResponseT> Request(Address to_address, Address from_address, uint64_t request_id, RequestT request,
-                                    Duration timeout) {
+  ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT request, Duration timeout) {
     std::function<bool()> maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); };
-    auto [future, promise] =
-        memgraph::io::FuturePromisePairWithNotifier<ResponseResult<ResponseT>>(maybe_tick_simulator);
 
-    simulator_handle_->SubmitRequest(to_address, from_address, request_id, std::move(request), timeout,
-                                     std::move(promise));
-
-    return std::move(future);
+    return simulator_handle_->template SubmitRequest<RequestT, ResponseT>(to_address, from_address, std::move(request),
+                                                                          timeout, std::move(maybe_tick_simulator));
   }
 
   template <Message... Ms>
diff --git a/src/io/transport.hpp b/src/io/transport.hpp
index 7640e07ac..2abf10af2 100644
--- a/src/io/transport.hpp
+++ b/src/io/transport.hpp
@@ -68,7 +68,6 @@ template <typename I>
 class Io {
   I implementation_;
   Address address_;
-  RequestId request_id_counter_ = 0;
   Duration default_timeout_ = std::chrono::microseconds{100000};
 
  public:
@@ -84,20 +83,17 @@ class Io {
   /// Issue a request with an explicit timeout in microseconds provided. This tends to be used by clients.
   template <Message RequestT, Message ResponseT>
   ResponseFuture<ResponseT> RequestWithTimeout(Address address, RequestT request, Duration timeout) {
-    const RequestId request_id = ++request_id_counter_;
     const Address from_address = address_;
-    return implementation_.template Request<RequestT, ResponseT>(address, from_address, request_id, request, timeout);
+    return implementation_.template Request<RequestT, ResponseT>(address, from_address, request, timeout);
   }
 
   /// Issue a request that times out after the default timeout. This tends
   /// to be used by clients.
   template <Message RequestT, Message ResponseT>
   ResponseFuture<ResponseT> Request(Address to_address, RequestT request) {
-    const RequestId request_id = ++request_id_counter_;
     const Duration timeout = default_timeout_;
     const Address from_address = address_;
-    return implementation_.template Request<RequestT, ResponseT>(to_address, from_address, request_id,
-                                                                 std::move(request), timeout);
+    return implementation_.template Request<RequestT, ResponseT>(to_address, from_address, std::move(request), timeout);
   }
 
   /// Wait for an explicit number of microseconds for a request of one of the
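
Moving the counter into the shared transport handle means request ids are allocated under the same mutex that guards the promise map, so independently cloned Io instances can no longer hand out colliding ids. A condensed sketch of the new SubmitRequest shape, using std::promise/std::future and a plain string payload rather than the actual Future/Promise and message types:

    #include <cstdint>
    #include <future>
    #include <iostream>
    #include <map>
    #include <mutex>
    #include <string>
    #include <utility>

    using RequestId = std::uint64_t;

    class TransportHandle {
      std::mutex mu_;
      RequestId request_id_counter_ = 0;
      std::map<RequestId, std::promise<std::string>> promises_;

     public:
      std::future<std::string> SubmitRequest(std::string request) {
        std::promise<std::string> promise;
        std::future<std::string> future = promise.get_future();
        RequestId request_id = 0;
        {
          std::lock_guard lock(mu_);
          request_id = ++request_id_counter_;  // id allocated under the promise-map lock
          promises_.emplace(request_id, std::move(promise));
        }  // lock dropped before sending
        Send(request_id, std::move(request));
        return future;
      }

     private:
      void Send(RequestId /*id*/, std::string /*request*/) { /* deliver to the peer */ }
    };

    int main() {
      TransportHandle handle;
      auto future = handle.SubmitRequest("ping");  // gets id 1; a clone of the handle could not reuse it
      std::cout << future.valid() << '\n';         // 1: the future is wired to the stored promise
    }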

From 4db83b81596cbdb81cfc36e7a920a0fabf3ead21 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Thu, 3 Nov 2022 09:41:28 +0000
Subject: [PATCH 16/31] Add a few safety checks to the new concurrent Queue
 structures and the promise maps in the transport layer

---
 src/coordinator/coordinator_worker.hpp            | 1 +
 src/io/local_transport/local_transport_handle.hpp | 3 +--
 src/io/simulator/simulator_handle.hpp             | 3 +++
 src/storage/v3/shard_worker.hpp                   | 1 +
 4 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/coordinator/coordinator_worker.hpp b/src/coordinator/coordinator_worker.hpp
index 46ca39b02..4cf74111e 100644
--- a/src/coordinator/coordinator_worker.hpp
+++ b/src/coordinator/coordinator_worker.hpp
@@ -98,6 +98,7 @@ class Queue {
   }
 
   Message Pop() {
+    MG_ASSERT(inner_.use_count() > 0);
     std::unique_lock<std::mutex> lock(inner_->mu);
 
     while (inner_->queue.empty()) {
diff --git a/src/io/local_transport/local_transport_handle.hpp b/src/io/local_transport/local_transport_handle.hpp
index 25a106e9b..1c2ce2510 100644
--- a/src/io/local_transport/local_transport_handle.hpp
+++ b/src/io/local_transport/local_transport_handle.hpp
@@ -160,8 +160,7 @@ class LocalTransportHandle {
       OpaquePromise opaque_promise(std::move(promise).ToUnique());
       DeadlineAndOpaquePromise dop{.requested_at = now, .deadline = deadline, .promise = std::move(opaque_promise)};
 
-      // TODO(tyler) assert not already present
-
+      MG_ASSERT(!promises_.contains(promise_key));
       promises_.emplace(std::move(promise_key), std::move(dop));
     }  // lock dropped
 
diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp
index 2b236f0ec..3420786c7 100644
--- a/src/io/simulator/simulator_handle.hpp
+++ b/src/io/simulator/simulator_handle.hpp
@@ -129,6 +129,9 @@ class SimulatorHandle {
         .deadline = deadline,
         .promise = std::move(opaque_promise),
     };
+
+    MG_ASSERT(!promises_.contains(promise_key));
+
     promises_.emplace(std::move(promise_key), std::move(dop));
 
     stats_.total_messages++;
diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp
index b1fefdfd9..87d991cfd 100644
--- a/src/storage/v3/shard_worker.hpp
+++ b/src/storage/v3/shard_worker.hpp
@@ -102,6 +102,7 @@ class Queue {
   }
 
   Message Pop() {
+    MG_ASSERT(inner_.use_count() > 0);
     std::unique_lock<std::mutex> lock(inner_->mu);
 
     while (inner_->queue.empty()) {

From b83fb287adc60b2b64dea898b7e10a70a159cf6f Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Thu, 3 Nov 2022 11:00:07 +0000
Subject: [PATCH 17/31] Apply feedback from clang-tidy

---
 src/coordinator/coordinator_worker.hpp            | 13 ++++---------
 src/io/local_transport/local_transport_handle.hpp |  2 +-
 src/io/message_histogram_collector.hpp            |  2 +-
 src/machine_manager/machine_manager.hpp           |  4 ++--
 src/storage/v3/shard_manager.hpp                  |  4 ++--
 src/storage/v3/shard_worker.hpp                   |  6 +++---
 6 files changed, 13 insertions(+), 18 deletions(-)

diff --git a/src/coordinator/coordinator_worker.hpp b/src/coordinator/coordinator_worker.hpp
index 4cf74111e..f61a536d2 100644
--- a/src/coordinator/coordinator_worker.hpp
+++ b/src/coordinator/coordinator_worker.hpp
@@ -17,9 +17,8 @@
 #include <queue>
 #include <variant>
 
-#include <boost/uuid/uuid.hpp>
-
 #include "coordinator/coordinator.hpp"
+#include "coordinator/coordinator_rsm.hpp"
 #include "coordinator/shard_map.hpp"
 #include "io/address.hpp"
 #include "io/future.hpp"
@@ -36,16 +35,12 @@ namespace memgraph::coordinator::coordinator_worker {
 /// * Cron
 /// * RouteMessage
 
-using boost::uuids::uuid;
-
 using coordinator::Coordinator;
 using coordinator::CoordinatorRsm;
-using coordinator::ShardToInitialize;
 using io::Address;
 using io::RequestId;
 using io::Time;
 using io::messages::CoordinatorMessages;
-using io::rsm::Raft;
 using msgs::ReadRequests;
 using msgs::ReadResponses;
 using msgs::WriteRequests;
@@ -121,7 +116,7 @@ class CoordinatorWorker {
 
   bool Process(ShutDown && /*shut_down*/) { return false; }
 
-  bool Process(Cron &&cron) {
+  bool Process(Cron && /* cron */) {
     coordinator_.Cron();
     return true;
   }
@@ -136,8 +131,8 @@ class CoordinatorWorker {
   CoordinatorWorker(io::Io<IoImpl> io, Queue queue, Coordinator coordinator)
       : io_(io), queue_(std::move(queue)), coordinator_{std::move(io.ForkLocal()), {}, std::move(coordinator)} {}
 
-  CoordinatorWorker(CoordinatorWorker &&) = default;
-  CoordinatorWorker &operator=(CoordinatorWorker &&) = default;
+  CoordinatorWorker(CoordinatorWorker &&) noexcept = default;
+  CoordinatorWorker &operator=(CoordinatorWorker &&) noexcept = default;
   CoordinatorWorker(const CoordinatorWorker &) = delete;
   CoordinatorWorker &operator=(const CoordinatorWorker &) = delete;
   ~CoordinatorWorker() = default;
diff --git a/src/io/local_transport/local_transport_handle.hpp b/src/io/local_transport/local_transport_handle.hpp
index 1c2ce2510..2303ae735 100644
--- a/src/io/local_transport/local_transport_handle.hpp
+++ b/src/io/local_transport/local_transport_handle.hpp
@@ -151,7 +151,7 @@ class LocalTransportHandle {
     const auto now = Now();
     const Time deadline = now + timeout;
 
-    RequestId request_id;
+    RequestId request_id = 0;
     {
       std::unique_lock<std::mutex> lock(mu_);
 
diff --git a/src/io/message_histogram_collector.hpp b/src/io/message_histogram_collector.hpp
index 644fe272b..e663be988 100644
--- a/src/io/message_histogram_collector.hpp
+++ b/src/io/message_histogram_collector.hpp
@@ -62,7 +62,7 @@ struct LatencyHistogramSummaries {
   std::unordered_map<std::string, LatencyHistogramSummary> latencies;
 
   std::string SummaryTable() {
-    std::string output = "";
+    std::string output;
 
     const auto row = [&output](const auto &c1, const auto &c2, const auto &c3, const auto &c4, const auto &c5,
                                const auto &c6, const auto &c7) {
diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp
index 2bbc99441..8436099fd 100644
--- a/src/machine_manager/machine_manager.hpp
+++ b/src/machine_manager/machine_manager.hpp
@@ -86,8 +86,8 @@ class MachineManager {
     coordinator_handle_ = std::jthread([coordinator = std::move(coordinator_worker)]() mutable { coordinator.Run(); });
   }
 
-  MachineManager(MachineManager &&) = default;
-  MachineManager &operator=(MachineManager &&) = default;
+  MachineManager(MachineManager &&) noexcept = default;
+  MachineManager &operator=(MachineManager &&) noexcept = default;
   MachineManager(const MachineManager &) = delete;
   MachineManager &operator=(const MachineManager &) = delete;
 
diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp
index 559f51afe..60c7af9db 100644
--- a/src/storage/v3/shard_manager.hpp
+++ b/src/storage/v3/shard_manager.hpp
@@ -95,8 +95,8 @@ class ShardManager {
     }
   }
 
-  ShardManager(ShardManager &&) = default;
-  ShardManager &operator=(ShardManager &&) = default;
+  ShardManager(ShardManager &&) noexcept = default;
+  ShardManager &operator=(ShardManager &&) noexcept = default;
   ShardManager(const ShardManager &) = delete;
   ShardManager &operator=(const ShardManager &) = delete;
 
diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp
index 87d991cfd..0c8ee69c6 100644
--- a/src/storage/v3/shard_worker.hpp
+++ b/src/storage/v3/shard_worker.hpp
@@ -127,7 +127,7 @@ class ShardWorker {
 
   bool Process(ShutDown && /* shut_down */) { return false; }
 
-  bool Process(Cron &&cron) {
+  bool Process(Cron && /* cron */) {
     Cron();
     return true;
   }
@@ -201,8 +201,8 @@ class ShardWorker {
 
  public:
   ShardWorker(io::Io<IoImpl> io, Queue queue) : io_(io), queue_(queue) {}
-  ShardWorker(ShardWorker &&) = default;
-  ShardWorker &operator=(ShardWorker &&) = default;
+  ShardWorker(ShardWorker &&) noexcept = default;
+  ShardWorker &operator=(ShardWorker &&) noexcept = default;
   ShardWorker(const ShardWorker &) = delete;
   ShardWorker &operator=(const ShardWorker &) = delete;
   ~ShardWorker() = default;

From 9235515dabb00f79e3d8ab31d1460d1b60479649 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Thu, 3 Nov 2022 12:38:12 +0000
Subject: [PATCH 18/31] Reduce high_density_shard_create_scan shard+thread
 combinations. Log Raft write request demangled names

---
 src/io/rsm/raft.hpp                           | 16 +++++++++++++++-
 tests/unit/high_density_shard_create_scan.cpp |  5 ++---
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/src/io/rsm/raft.hpp b/src/io/rsm/raft.hpp
index 8d825f9e9..a1814b77a 100644
--- a/src/io/rsm/raft.hpp
+++ b/src/io/rsm/raft.hpp
@@ -22,6 +22,8 @@
 #include <unordered_map>
 #include <vector>
 
+#include <boost/core/demangle.hpp>
+
 #include "io/message_conversion.hpp"
 #include "io/simulator/simulator.hpp"
 #include "io/transport.hpp"
@@ -109,6 +111,16 @@ utils::TypeInfoRef TypeInfoFor(const WriteResponse<WriteReturn> & /* write_respo
   return typeid(WriteReturn);
 }
 
+template <class WriteOperation>
+utils::TypeInfoRef TypeInfoFor(const WriteRequest<WriteOperation> & /* write_request */) {
+  return typeid(WriteOperation);
+}
+
+template <class... WriteOperations>
+utils::TypeInfoRef TypeInfoFor(const WriteRequest<std::variant<WriteOperations...>> &write_request) {
+  return TypeInfoForVariant(write_request.operation);
+}
+
 /// AppendRequest is a raft-level message that the Leader
 /// periodically broadcasts to all Follower peers. This
 /// serves three main roles:
@@ -918,7 +930,9 @@ class Raft {
   // only leaders actually handle replication requests from clients
   std::optional<Role> Handle(Leader &leader, WriteRequest<WriteOperation> &&req, RequestId request_id,
                              Address from_address) {
-    Log("handling WriteRequest");
+    auto type_info = TypeInfoFor(req);
+    std::string demangled_name = boost::core::demangle(type_info.get().name());
+    Log("handling WriteRequest<" + demangled_name + ">");
 
     // we are the leader. add item to log and send Append to peers
     MG_ASSERT(state_.term >= LastLogTerm());
diff --git a/tests/unit/high_density_shard_create_scan.cpp b/tests/unit/high_density_shard_create_scan.cpp
index 69c97ca19..6d0c23065 100644
--- a/tests/unit/high_density_shard_create_scan.cpp
+++ b/tests/unit/high_density_shard_create_scan.cpp
@@ -283,9 +283,8 @@ void RunWorkload(int shards, int replication_factor, int create_ops, int scan_op
 }
 
 TEST(MachineManager, ManyShards) {
-  auto shards_attempts = {1,  2,  3,  4,  5,  6,  7,  8,  9,  10, 11, 12, 13,
-                          14, 15, 16, 17, 18, 19, 20, 22, 24, 26, 28, 30, 32};
-  auto shard_worker_thread_attempts = {1, 2, 3, 4, 6, 8};
+  auto shards_attempts = {1, 2, 4, 8};
+  auto shard_worker_thread_attempts = {1, 2, 4, 8};
   auto replication_factor = 1;
   auto create_ops = 1024;
   auto scan_ops = 1;
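
The two TypeInfoFor overloads added above dispatch on whether the write operation is a concrete type or a variant; in the variant case, TypeInfoForVariant resolves the typeid of the currently held alternative. A freestanding sketch of that dispatch, assuming a std::visit-based TypeInfoForVariant and returning const std::type_info& where the real code uses utils::TypeInfoRef:

    #include <iostream>
    #include <typeinfo>
    #include <variant>

    #include <boost/core/demangle.hpp>

    template <class... Ts>
    const std::type_info &TypeInfoForVariant(const std::variant<Ts...> &v) {
      return std::visit([](const auto &alt) -> const std::type_info & { return typeid(alt); }, v);
    }

    template <class WriteOperation>
    struct WriteRequest {
      WriteOperation operation;
    };

    template <class WriteOperation>
    const std::type_info &TypeInfoFor(const WriteRequest<WriteOperation> &) {
      return typeid(WriteOperation);
    }

    template <class... WriteOperations>
    const std::type_info &TypeInfoFor(const WriteRequest<std::variant<WriteOperations...>> &req) {
      return TypeInfoForVariant(req.operation);
    }

    struct CreateVertex {};
    struct CreateEdge {};

    int main() {
      WriteRequest<std::variant<CreateVertex, CreateEdge>> req{CreateEdge{}};
      std::cout << boost::core::demangle(TypeInfoFor(req).name()) << '\n';  // prints CreateEdge
    }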

From 0364311dd036d4875926652e619e4dcace7bb933 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Thu, 3 Nov 2022 12:44:47 +0000
Subject: [PATCH 19/31] Log latency histograms in the
 high_density_shard_create_scan test

---
 tests/unit/high_density_shard_create_scan.cpp | 29 ++++++++++---------
 1 file changed, 15 insertions(+), 14 deletions(-)

diff --git a/tests/unit/high_density_shard_create_scan.cpp b/tests/unit/high_density_shard_create_scan.cpp
index 6d0c23065..64c1c4876 100644
--- a/tests/unit/high_density_shard_create_scan.cpp
+++ b/tests/unit/high_density_shard_create_scan.cpp
@@ -210,13 +210,13 @@ void ExecuteOp(msgs::ShardRequestManager<LocalTransport> &shard_request_manager,
 
 void RunWorkload(int shards, int replication_factor, int create_ops, int scan_ops, int shard_worker_threads,
                  int gap_between_shards) {
-  // std::cout << "======================== NEW TEST ======================== \n";
-  // std::cout << "shards:               " << shards << std::endl;
-  // std::cout << "replication factor:   " << replication_factor << std::endl;
-  // std::cout << "create ops:           " << create_ops << std::endl;
-  // std::cout << "scan all ops:         " << scan_ops << std::endl;
-  // std::cout << "shard worker threads: " << shard_worker_threads << std::endl;
-  // std::cout << "gap between shards:   " << gap_between_shards << std::endl;
+  spdlog::info("======================== NEW TEST ========================");
+  spdlog::info("shards:               ", shards);
+  spdlog::info("replication factor:   ", replication_factor);
+  spdlog::info("create ops:           ", create_ops);
+  spdlog::info("scan all ops:         ", scan_ops);
+  spdlog::info("shard worker threads: ", shard_worker_threads);
+  spdlog::info("gap between shards:   ", gap_between_shards);
 
   LocalSystem local_system;
 
@@ -269,15 +269,16 @@ void RunWorkload(int shards, int replication_factor, int create_ops, int scan_op
 
   auto latencies = cli_io_2.ResponseLatencies();
 
-  // std::cout << "response latencies: \n" << latencies.SummaryTable();
+  spdlog::info("response latencies: \n{}", latencies.SummaryTable());
 
-  // std::cout << "serial time break-down: (μs)\n";
+  spdlog::info("serial time break-down: (μs)");
 
-  // std::cout << fmt::format("{: >20}: {: >10}\n", "split shard map", (time_after_shard_map_creation -
-  // time_before_shard_map_creation).count()); std::cout << fmt::format("{: >20}: {: >10}\n", "shard stabilization",
-  // (time_after_shard_stabilization - time_before_shard_stabilization).count()); std::cout << fmt::format("{: >20}: {:
-  // >10}\n", "create nodes", (time_after_creates - time_before_creates).count()); std::cout << fmt::format("{: >20}: {:
-  // >10}\n", "scan nodes", (time_after_scan - time_after_creates).count());
+  spdlog::info("{: >20}: {: >10}", "split shard map",
+               (time_after_shard_map_creation - time_before_shard_map_creation).count());
+  spdlog::info("{: >20}: {: >10}", "shard stabilization",
+               (time_after_shard_stabilization - time_before_shard_stabilization).count());
+  spdlog::info("{: >20}: {: >10}", "create nodes", (time_after_creates - time_before_creates).count());
+  spdlog::info("{: >20}: {: >10}", "scan nodes", (time_after_scan - time_after_creates).count());
 
   std::cout << fmt::format("{} {} {}\n", shards, shard_worker_threads, (time_after_scan - time_after_creates).count());
 }

From 2de1d6c3591914199f74ad9791b2f5d1e0614da3 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Thu, 3 Nov 2022 13:27:45 +0000
Subject: [PATCH 20/31] Fix UB due to integer overflow

---
 src/io/rsm/raft.hpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/io/rsm/raft.hpp b/src/io/rsm/raft.hpp
index a1814b77a..eccbf031b 100644
--- a/src/io/rsm/raft.hpp
+++ b/src/io/rsm/raft.hpp
@@ -581,7 +581,7 @@ class Raft {
     const Time now = io_.Now();
     const Duration broadcast_timeout = RandomTimeout(kMinimumBroadcastTimeout, kMaximumBroadcastTimeout);
 
-    if (now - leader.last_broadcast > broadcast_timeout) {
+    if (now > leader.last_broadcast + broadcast_timeout) {
       BroadcastAppendEntries(leader.followers);
       leader.last_broadcast = now;
     }
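
The hazard fixed here: leader.last_broadcast starts at Time::min(), so now - leader.last_broadcast spans nearly the whole range of the duration's signed representation and can overflow, which is undefined behavior. Comparing now > last_broadcast + broadcast_timeout only ever adds a small offset. In isolation:

    #include <chrono>
    #include <iostream>

    int main() {
      using namespace std::chrono;
      using Time = time_point<steady_clock, microseconds>;

      const Time last_broadcast = Time::min();  // initial value before any broadcast
      const Time now = Time(microseconds(1'000'000));
      const microseconds timeout(100);

      // now - last_broadcast would exceed the int64 microsecond range here:
      // evaluating it risks signed overflow, which is undefined behavior.
      // The rewritten comparison only ever adds a small timeout:
      const bool should_broadcast = now > last_broadcast + timeout;
      std::cout << std::boolalpha << should_broadcast << '\n';  // true
    }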

From 25fdb1a1f0dc105a6c03e894179fb558ab3633ec Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Thu, 3 Nov 2022 13:48:11 +0000
Subject: [PATCH 21/31] Make the high_density_shard_create_scan test run much
 faster

---
 tests/unit/high_density_shard_create_scan.cpp | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/unit/high_density_shard_create_scan.cpp b/tests/unit/high_density_shard_create_scan.cpp
index 64c1c4876..9c2d1cfd7 100644
--- a/tests/unit/high_density_shard_create_scan.cpp
+++ b/tests/unit/high_density_shard_create_scan.cpp
@@ -284,10 +284,10 @@ void RunWorkload(int shards, int replication_factor, int create_ops, int scan_op
 }
 
 TEST(MachineManager, ManyShards) {
-  auto shards_attempts = {1, 2, 4, 8};
-  auto shard_worker_thread_attempts = {1, 2, 4, 8};
+  auto shards_attempts = {1, 64};
+  auto shard_worker_thread_attempts = {1, 32};
   auto replication_factor = 1;
-  auto create_ops = 1024;
+  auto create_ops = 128;
   auto scan_ops = 1;
 
   std::cout << "splits threads scan_all_microseconds\n";

From 7e6ec8bb269144b19351b101513d9191d31e599b Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Thu, 3 Nov 2022 14:38:16 +0000
Subject: [PATCH 22/31] Capture this instead of all references in scope in the
 std::visit call that routes CoordinatorWorker messages

---
 src/coordinator/coordinator_worker.hpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/coordinator/coordinator_worker.hpp b/src/coordinator/coordinator_worker.hpp
index f61a536d2..26b314946 100644
--- a/src/coordinator/coordinator_worker.hpp
+++ b/src/coordinator/coordinator_worker.hpp
@@ -142,7 +142,7 @@ class CoordinatorWorker {
       Message message = queue_.Pop();
 
       const bool should_continue =
-          std::visit([&](auto &&msg) { return Process(std::forward<decltype(msg)>(msg)); }, std::move(message));
+          std::visit([this](auto &&msg) { return Process(std::forward<decltype(msg)>(msg)); }, std::move(message));
 
       if (!should_continue) {
         return;

From 8598f6edf44889d0e7e4e42dd5842d516c5f333e Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Fri, 4 Nov 2022 11:14:39 +0000
Subject: [PATCH 23/31] Fix a race condition that happens when logging from a
 detached thread in the cluster property test. Improve the ShardManager dtor
 and log statements

---
 src/storage/v3/shard_manager.hpp           |  2 ++
 src/storage/v3/shard_worker.hpp            |  2 +-
 tests/simulation/cluster_property_test.cpp |  3 +++
 tests/simulation/test_cluster.hpp          | 25 +++++++++++++++++++---
 4 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp
index 60c7af9db..b8f93b509 100644
--- a/src/storage/v3/shard_manager.hpp
+++ b/src/storage/v3/shard_manager.hpp
@@ -105,6 +105,8 @@ class ShardManager {
       worker.Push(shard_worker::ShutDown{});
     }
 
+    workers_.clear();
+
     // The jthread handles for our shard worker threads will be
     // blocked on implicitly when worker_handles_ is destroyed.
   }
diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp
index 0c8ee69c6..46e02e6cc 100644
--- a/src/storage/v3/shard_worker.hpp
+++ b/src/storage/v3/shard_worker.hpp
@@ -147,7 +147,7 @@ class ShardWorker {
   }
 
   Time Cron() {
-    spdlog::info("running ShardManager::Cron, address {}", io_.GetAddress().ToString());
+    spdlog::info("running ShardWorker::Cron, address {}", io_.GetAddress().ToString());
     Time now = io_.Now();
 
     while (!cron_schedule_.empty()) {
diff --git a/tests/simulation/cluster_property_test.cpp b/tests/simulation/cluster_property_test.cpp
index 0de5c21fc..48327be5b 100644
--- a/tests/simulation/cluster_property_test.cpp
+++ b/tests/simulation/cluster_property_test.cpp
@@ -18,6 +18,7 @@
 #include <gtest/gtest.h>
 #include <rapidcheck.h>
 #include <rapidcheck/gtest.h>
+#include <spdlog/cfg/env.h>
 
 #include "generated_operations.hpp"
 #include "io/simulator/simulator_config.hpp"
@@ -35,6 +36,8 @@ using storage::v3::kMaximumCronInterval;
 RC_GTEST_PROP(RandomClusterConfig, HappyPath, (ClusterConfig cluster_config, NonEmptyOpVec ops)) {
   // TODO(tyler) set abort_time to something more restrictive than Time::max()
 
+  spdlog::cfg::load_env_levels();
+
   SimulatorConfig sim_config{
       .drop_percent = 0,
       .perform_timeouts = false,
diff --git a/tests/simulation/test_cluster.hpp b/tests/simulation/test_cluster.hpp
index b87bda998..6a32a391d 100644
--- a/tests/simulation/test_cluster.hpp
+++ b/tests/simulation/test_cluster.hpp
@@ -194,6 +194,22 @@ void ExecuteOp(msgs::ShardRequestManager<SimulatorTransport> &shard_request_mana
   }
 }
 
+/// This struct exists as a way of detaching
+/// a thread if something causes an uncaught
+/// exception - because that thread would not
+/// receive a ShutDown message otherwise, and
+/// would cause the test to hang forever.
+struct DetachIfDropped {
+  std::jthread &handle;
+  bool detach = true;
+
+  ~DetachIfDropped() {
+    if (detach && handle.joinable()) {
+      handle.detach();
+    }
+  }
+};
+
 void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig &cluster_config,
                           const std::vector<Op> &ops) {
   spdlog::info("========================== NEW SIMULATION ==========================");
@@ -217,9 +233,7 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig
 
   auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1));
 
-  // Need to detach this thread so that the destructor does not
-  // block before we can propagate assertion failures.
-  mm_thread_1.detach();
+  auto detach_on_error = DetachIfDropped{.handle = mm_thread_1};
 
   // TODO(tyler) clarify addresses of coordinator etc... as it's a mess
 
@@ -236,6 +250,11 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig
     std::visit([&](auto &o) { ExecuteOp(shard_request_manager, correctness_model, o); }, op.inner);
   }
 
+  // We have now completed our workload without failing any assertions, so we can
+  // disable detaching the worker thread, which will cause the mm_thread_1 jthread
+  // to be joined when this function returns.
+  detach_on_error.detach = false;
+
   simulator.ShutDown();
 
   SimulatorStats stats = simulator.Stats();

From 24864ff7d23c5e107fd346c953e99d929d2d8dce Mon Sep 17 00:00:00 2001
From: Tyler Neely <tyler.neely@memgraph.io>
Date: Fri, 4 Nov 2022 12:17:57 +0100
Subject: [PATCH 24/31] Update src/coordinator/coordinator_worker.hpp

Co-authored-by: Kostas Kyrimis  <kostaskyrim@gmail.com>
---
 src/coordinator/coordinator_worker.hpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/coordinator/coordinator_worker.hpp b/src/coordinator/coordinator_worker.hpp
index 26b314946..888d6283a 100644
--- a/src/coordinator/coordinator_worker.hpp
+++ b/src/coordinator/coordinator_worker.hpp
@@ -108,7 +108,7 @@ class Queue {
 };
 
 /// A CoordinatorWorker owns Raft<CoordinatorRsm> instances and receives messages from the MachineManager.
-template <class IoImpl>
+template <typename IoImpl>
 class CoordinatorWorker {
   io::Io<IoImpl> io_;
   Queue queue_;

From 43ad5855c48703cd4f3b36c1c3b03eeee6b48bee Mon Sep 17 00:00:00 2001
From: Tyler Neely <tyler.neely@memgraph.io>
Date: Fri, 4 Nov 2022 12:27:40 +0100
Subject: [PATCH 25/31] Update src/machine_manager/machine_manager.hpp

Co-authored-by: Kostas Kyrimis  <kostaskyrim@gmail.com>
---
 src/machine_manager/machine_manager.hpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp
index 8436099fd..64fc5a3b3 100644
--- a/src/machine_manager/machine_manager.hpp
+++ b/src/machine_manager/machine_manager.hpp
@@ -94,7 +94,7 @@ class MachineManager {
   ~MachineManager() {
     if (coordinator_handle_.joinable()) {
       coordinator_queue_.Push(coordinator::coordinator_worker::ShutDown{});
-      std::move(coordinator_handle_).join();
+      coordinator_handle_.join();
     }
   }
 

From 486231b1b90bbad9746aa9b2f7baf407eee0fcce Mon Sep 17 00:00:00 2001
From: Tyler Neely <tyler.neely@memgraph.io>
Date: Fri, 4 Nov 2022 12:28:01 +0100
Subject: [PATCH 26/31] Update src/machine_manager/machine_manager.hpp

Co-authored-by: Kostas Kyrimis  <kostaskyrim@gmail.com>
---
 src/machine_manager/machine_manager.hpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp
index 64fc5a3b3..c1c9188b2 100644
--- a/src/machine_manager/machine_manager.hpp
+++ b/src/machine_manager/machine_manager.hpp
@@ -150,7 +150,7 @@ class MachineManager {
         CoordinatorMessages &&cm = std::move(conversion_attempt.value());
 
         CoordinatorRouteMessage route_message{
-            .message = std::forward<CoordinatorMessages>(cm),
+            .message = std::move<CoordinatorMessages>(cm),
             .request_id = request_envelope.request_id,
             .to = request_envelope.to_address,
             .from = request_envelope.from_address,

From c745f8c8774dc8c153bc4ce1d667840262f90d00 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Fri, 4 Nov 2022 11:32:17 +0000
Subject: [PATCH 27/31] Fix build after breaking code suggestion

---
 src/machine_manager/machine_manager.hpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp
index c1c9188b2..f9ea8ff2a 100644
--- a/src/machine_manager/machine_manager.hpp
+++ b/src/machine_manager/machine_manager.hpp
@@ -150,7 +150,7 @@ class MachineManager {
         CoordinatorMessages &&cm = std::move(conversion_attempt.value());
 
         CoordinatorRouteMessage route_message{
-            .message = std::move<CoordinatorMessages>(cm),
+            .message = std::move(cm),
             .request_id = request_envelope.request_id,
             .to = request_envelope.to_address,
             .from = request_envelope.from_address,

From fa5c9a2568222def0bd42d4588531891d3cae1a3 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Fri, 4 Nov 2022 11:33:02 +0000
Subject: [PATCH 28/31] Make items popped from *worker::Queue const. Use
 std::move instead of std::forward in one place

---
 src/coordinator/coordinator_worker.hpp | 4 ++--
 src/storage/v3/shard_worker.hpp        | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/coordinator/coordinator_worker.hpp b/src/coordinator/coordinator_worker.hpp
index 888d6283a..f449e63e9 100644
--- a/src/coordinator/coordinator_worker.hpp
+++ b/src/coordinator/coordinator_worker.hpp
@@ -86,7 +86,7 @@ class Queue {
       MG_ASSERT(inner_.use_count() > 0);
       std::unique_lock<std::mutex> lock(inner_->mu);
 
-      inner_->queue.emplace_back(std::forward<Message>(message));
+      inner_->queue.emplace_back(std::move(message));
     }  // lock dropped before notifying condition variable
 
     inner_->cv.notify_all();
@@ -100,7 +100,7 @@ class Queue {
       inner_->cv.wait(lock);
     }
 
-    Message message = std::move(inner_->queue.front());
+    const Message message = std::move(inner_->queue.front());
     inner_->queue.pop_front();
 
     return message;
diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp
index 46e02e6cc..52677567a 100644
--- a/src/storage/v3/shard_worker.hpp
+++ b/src/storage/v3/shard_worker.hpp
@@ -109,7 +109,7 @@ class Queue {
       inner_->cv.wait(lock);
     }
 
-    Message message = std::move(inner_->queue.front());
+    const Message message = std::move(inner_->queue.front());
     inner_->queue.pop_front();
 
     return message;

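For reference, the Push/Pop pattern these hunks touch is a conventional
condition-variable queue: the lock is released before notifying so a woken
thread can acquire it immediately, and Pop blocks until a message arrives. A
minimal sketch of the same shape (a simplification; the real Queue shares its
state through an inner struct):

    #include <condition_variable>
    #include <deque>
    #include <mutex>

    template <typename Message>
    class BlockingQueue {
      std::mutex mu_;
      std::condition_variable cv_;
      std::deque<Message> queue_;

     public:
      void Push(Message message) {
        {
          std::unique_lock<std::mutex> lock(mu_);
          queue_.emplace_back(std::move(message));
        }  // lock dropped before notifying the condition variable
        cv_.notify_all();
      }

      Message Pop() {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [&] { return !queue_.empty(); });
        Message message = std::move(queue_.front());
        queue_.pop_front();
        return message;
      }
    };
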
From bb7c7f762783fa17962c619e9e50df0b192d4aee Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Fri, 4 Nov 2022 14:07:49 +0000
Subject: [PATCH 29/31] Make popped messages non-const to allow for RVO

---
 src/coordinator/coordinator_worker.hpp | 2 +-
 src/storage/v3/shard_worker.hpp        | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/coordinator/coordinator_worker.hpp b/src/coordinator/coordinator_worker.hpp
index f449e63e9..ec7410f8b 100644
--- a/src/coordinator/coordinator_worker.hpp
+++ b/src/coordinator/coordinator_worker.hpp
@@ -100,7 +100,7 @@ class Queue {
       inner_->cv.wait(lock);
     }
 
-    const Message message = std::move(inner_->queue.front());
+    Message message = std::move(inner_->queue.front());
     inner_->queue.pop_front();
 
     return message;
diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp
index 52677567a..46e02e6cc 100644
--- a/src/storage/v3/shard_worker.hpp
+++ b/src/storage/v3/shard_worker.hpp
@@ -109,7 +109,7 @@ class Queue {
       inner_->cv.wait(lock);
     }
 
-    const Message message = std::move(inner_->queue.front());
+    Message message = std::move(inner_->queue.front());
     inner_->queue.pop_front();
 
     return message;

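The revert above is about the implicit move on return. Declaring the popped
local const does not by itself defeat NRVO, but whenever elision does not
happen, the return statement falls back to treating the local as an rvalue, and
a const object cannot bind to the move constructor's parameter, so a copy is
made instead. Sketch:

    #include <string>

    std::string PopConst() {
      const std::string s = "payload";
      return s;  // if NRVO does not apply, const blocks the move: a copy is made
    }

    std::string PopMutable() {
      std::string s = "payload";
      return s;  // if NRVO does not apply, s is implicitly moved into the result
    }
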
From 528e30a9bed9a260430b159637968ccd7c58d60d Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Fri, 4 Nov 2022 14:13:15 +0000
Subject: [PATCH 30/31] Avoid warning for not using captured this. Use
 std::move instead of forward where appropriate

---
 src/coordinator/coordinator_worker.hpp | 4 ++--
 src/storage/v3/shard_manager.hpp       | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/coordinator/coordinator_worker.hpp b/src/coordinator/coordinator_worker.hpp
index ec7410f8b..600ebdc31 100644
--- a/src/coordinator/coordinator_worker.hpp
+++ b/src/coordinator/coordinator_worker.hpp
@@ -141,8 +141,8 @@ class CoordinatorWorker {
     while (true) {
       Message message = queue_.Pop();
 
-      const bool should_continue =
-          std::visit([this](auto &&msg) { return Process(std::forward<decltype(msg)>(msg)); }, std::move(message));
+      const bool should_continue = std::visit(
+          [this](auto &&msg) { return this->Process(std::forward<decltype(msg)>(msg)); }, std::move(message));
 
       if (!should_continue) {
         return;
diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp
index b8f93b509..a3dc8b9b6 100644
--- a/src/storage/v3/shard_manager.hpp
+++ b/src/storage/v3/shard_manager.hpp
@@ -183,7 +183,7 @@ class ShardManager {
     MG_ASSERT(address.last_known_ip == to.last_known_ip);
 
     SendToWorkerByUuid(to.unique_id, shard_worker::RouteMessage{
-                                         .message = std::forward<ShardMessages>(sm),
+                                         .message = std::move(sm),
                                          .request_id = request_id,
                                          .to = to,
                                          .from = from,

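Two things happen in the visit call above: the explicit this-> makes the use of
the captured pointer visible (addressing the unused-capture warning this patch
mentions), and std::visit over std::move(message) hands each alternative to the
generic lambda as an rvalue, which std::forward<decltype(msg)>(msg) preserves
when calling Process. A minimal sketch with hypothetical message types A and B:

    #include <utility>
    #include <variant>

    struct A {};
    struct B {};

    struct Worker {
      bool Process(A &&) { return true; }
      bool Process(B &&) { return false; }

      bool Dispatch(std::variant<A, B> message) {
        // visiting a moved variant passes each alternative as an rvalue;
        // decltype(msg) preserves that category through the forward
        return std::visit(
            [this](auto &&msg) { return this->Process(std::forward<decltype(msg)>(msg)); },
            std::move(message));
      }
    };

    int main() { return Worker{}.Dispatch(A{}) ? 0 : 1; }
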
From 1abfe288066097ab0a8133f55e302a9aee9bb5ce Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Fri, 4 Nov 2022 15:11:32 +0000
Subject: [PATCH 31/31] Correctly use a moved transport interface while
 constructing CoordinatorWorker

---
 src/coordinator/coordinator_worker.hpp | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/coordinator/coordinator_worker.hpp b/src/coordinator/coordinator_worker.hpp
index 600ebdc31..46bc546cd 100644
--- a/src/coordinator/coordinator_worker.hpp
+++ b/src/coordinator/coordinator_worker.hpp
@@ -129,7 +129,9 @@ class CoordinatorWorker {
 
  public:
   CoordinatorWorker(io::Io<IoImpl> io, Queue queue, Coordinator coordinator)
-      : io_(io), queue_(std::move(queue)), coordinator_{std::move(io.ForkLocal()), {}, std::move(coordinator)} {}
+      : io_(std::move(io)),
+        queue_(std::move(queue)),
+        coordinator_{std::move(io_.ForkLocal()), {}, std::move(coordinator)} {}
 
   CoordinatorWorker(CoordinatorWorker &&) noexcept = default;
   CoordinatorWorker &operator=(CoordinatorWorker &&) noexcept = default;
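The fix above hinges on member initialization order: members are initialized in
declaration order (io_ before coordinator_), so moving the io parameter into
io_ first and then forking from the already-initialized member avoids reading
the parameter after its contents have been moved elsewhere. (The std::move
around io_.ForkLocal() is redundant if ForkLocal() returns by value, but
harmless.) A minimal sketch of the pattern, with hypothetical Io/Fork names:

    #include <utility>

    struct Io {
      Io Fork() { return Io{}; }  // stand-in for ForkLocal()
    };

    struct Worker {
      Io io_;      // initialized first: declaration order, not
      Io forked_;  // initializer-list order, decides this

      explicit Worker(Io io)
          : io_(std::move(io)),     // consume the parameter exactly once
            forked_(io_.Fork()) {}  // then fork from the live member
    };

    int main() { Worker w{Io{}}; }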