From 80d677621037fe56cb2b40bf56060601584750ad Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Thu, 17 Nov 2022 17:36:46 +0000
Subject: [PATCH 01/26] Improve simulator determinism

---
 src/io/future.hpp                        |  6 +--
 src/io/message_histogram_collector.hpp   |  4 ++
 src/io/simulator/simulator_handle.cpp    | 31 +++++++++++--
 src/io/simulator/simulator_handle.hpp    | 42 +++++++++++-------
 src/io/simulator/simulator_stats.hpp     | 14 ++++++
 src/io/simulator/simulator_transport.hpp |  7 ++-
 tests/simulation/basic_request.cpp       | 55 ++++++++++++++++++------
 7 files changed, 122 insertions(+), 37 deletions(-)

diff --git a/src/io/future.hpp b/src/io/future.hpp
index bb2ef548c..98437b496 100644
--- a/src/io/future.hpp
+++ b/src/io/future.hpp
@@ -64,7 +64,6 @@ class Shared {
     waiting_ = true;
 
     while (!item_) {
-      bool simulator_progressed = false;
       if (simulator_notifier_) [[unlikely]] {
         // We can't hold our own lock while notifying
         // the simulator because notifying the simulator
@@ -77,7 +76,7 @@ class Shared {
         // so we have to get out of its way to avoid
         // a cyclical deadlock.
         lock.unlock();
-        simulator_progressed = std::invoke(simulator_notifier_);
+        std::invoke(simulator_notifier_);
         lock.lock();
         if (item_) {
           // item may have been filled while we
@@ -85,8 +84,7 @@ class Shared {
           // the simulator of our waiting_ status.
           break;
         }
-      }
-      if (!simulator_progressed) [[likely]] {
+      } else {
         cv_.wait(lock);
       }
       MG_ASSERT(!consumed_, "Future consumed twice!");
diff --git a/src/io/message_histogram_collector.hpp b/src/io/message_histogram_collector.hpp
index e663be988..27521fdeb 100644
--- a/src/io/message_histogram_collector.hpp
+++ b/src/io/message_histogram_collector.hpp
@@ -39,6 +39,8 @@ struct LatencyHistogramSummary {
   Duration p100;
   Duration sum;
 
+  friend bool operator==(const LatencyHistogramSummary &lhs, const LatencyHistogramSummary &rhs) = default;
+
   friend std::ostream &operator<<(std::ostream &in, const LatencyHistogramSummary &histo) {
     in << "{ \"count\": " << histo.count;
     in << ", \"p0\": " << histo.p0.count();
@@ -80,6 +82,8 @@ struct LatencyHistogramSummaries {
     return output;
   }
 
+  friend bool operator==(const LatencyHistogramSummaries &lhs, const LatencyHistogramSummaries &rhs) = default;
+
   friend std::ostream &operator<<(std::ostream &in, const LatencyHistogramSummaries &histo) {
     using memgraph::utils::print_helpers::operator<<;
     in << histo.latencies;
diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp
index 74925812e..096673e21 100644
--- a/src/io/simulator/simulator_handle.cpp
+++ b/src/io/simulator/simulator_handle.cpp
@@ -46,9 +46,13 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre
     const bool all_servers_blocked = blocked_servers == server_addresses_.size();
 
     if (all_servers_blocked) {
+      spdlog::info("quiescent state detected - {} out of {} servers now blocked on receive", blocked_servers,
+                   server_addresses_.size());
       return;
     }
 
+    spdlog::info("not returning from quiescent because we see {} blocked out of {}", blocked_servers,
+                 server_addresses_.size());
     cv_.wait(lock);
   }
 }
@@ -67,9 +71,14 @@ bool SimulatorHandle::MaybeTickSimulator() {
 
   stats_.simulator_ticks++;
 
-  cv_.notify_all();
+  bool fired_cv = false;
+  bool timed_anything_out = TimeoutPromisesPastDeadline();
 
-  TimeoutPromisesPastDeadline();
+  if (timed_anything_out) {
+    fired_cv = true;
+    blocked_on_receive_.clear();
+    cv_.notify_all();
+  }
 
   if (in_flight_.empty()) {
     // return early here because there are no messages to schedule
@@ -79,9 +88,15 @@ bool SimulatorHandle::MaybeTickSimulator() {
     const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)};
     cluster_wide_time_microseconds_ += clock_advance;
 
+    if (!fired_cv) {
+      cv_.notify_all();
+      blocked_on_receive_.clear();
+      fired_cv = true;
+    }
+
     if (cluster_wide_time_microseconds_ >= config_.abort_time) {
       if (should_shut_down_) {
-        return false;
+        return true;
       }
       spdlog::error(
           "Cluster has executed beyond its configured abort_time, and something may be failing to make progress "
@@ -120,22 +135,32 @@ bool SimulatorHandle::MaybeTickSimulator() {
     if (should_drop || normal_timeout) {
       stats_.timed_out_requests++;
       dop.promise.TimeOut();
+      spdlog::info("timing out request");
     } else {
       stats_.total_responses++;
       Duration response_latency = cluster_wide_time_microseconds_ - dop.requested_at;
       auto type_info = opaque_message.type_info;
       dop.promise.Fill(std::move(opaque_message), response_latency);
       histograms_.Measure(type_info, response_latency);
+      spdlog::info("replying to request");
     }
   } else if (should_drop) {
     // don't add it anywhere, let it drop
+    spdlog::info("silently dropping request");
   } else {
     // add to can_receive_ if not
+    spdlog::info("adding message to can_receive_");
     const auto &[om_vec, inserted] =
         can_receive_.try_emplace(to_address.ToPartialAddress(), std::vector<OpaqueMessage>());
     om_vec->second.emplace_back(std::move(opaque_message));
   }
 
+  if (!fired_cv) {
+    cv_.notify_all();
+    blocked_on_receive_.clear();
+    fired_cv = true;
+  }
+
   return true;
 }
 
diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp
index 3420786c7..5e564224b 100644
--- a/src/io/simulator/simulator_handle.hpp
+++ b/src/io/simulator/simulator_handle.hpp
@@ -58,7 +58,8 @@ class SimulatorHandle {
   MessageHistogramCollector histograms_;
   RequestId request_id_counter_{0};
 
-  void TimeoutPromisesPastDeadline() {
+  bool TimeoutPromisesPastDeadline() {
+    bool timed_anything_out = false;
     const Time now = cluster_wide_time_microseconds_;
     for (auto it = promises_.begin(); it != promises_.end();) {
       auto &[promise_key, dop] = *it;
@@ -68,10 +69,12 @@ class SimulatorHandle {
         it = promises_.erase(it);
 
         stats_.timed_out_requests++;
+        timed_anything_out = true;
       } else {
         ++it;
       }
     }
+    return timed_anything_out;
   }
 
  public:
@@ -103,6 +106,7 @@ class SimulatorHandle {
   template <Message Request, Message Response>
   ResponseFuture<Response> SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout,
                                          std::function<bool()> &&maybe_tick_simulator) {
+    spdlog::info("submitting request to {}", to_address.last_known_port);
     auto type_info = TypeInfoFor(request);
 
     auto [future, promise] = memgraph::io::FuturePromisePairWithNotifier<ResponseResult<Response>>(
@@ -146,8 +150,6 @@ class SimulatorHandle {
   requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(const Address &receiver, Duration timeout) {
     std::unique_lock<std::mutex> lock(mu_);
 
-    blocked_on_receive_.emplace(receiver);
-
     const Time deadline = cluster_wide_time_microseconds_ + timeout;
 
     auto partial_address = receiver.ToPartialAddress();
@@ -164,38 +166,46 @@ class SimulatorHandle {
           auto m_opt = std::move(message).Take<Ms...>();
           MG_ASSERT(m_opt.has_value(), "Wrong message type received compared to the expected type");
 
-          blocked_on_receive_.erase(receiver);
-
           return std::move(m_opt).value();
         }
       }
 
       lock.unlock();
+      spdlog::info("server calling MaybeTickSimulator");
       bool made_progress = MaybeTickSimulator();
+      spdlog::info("server returned from MaybeTickSimulator");
       lock.lock();
       if (!should_shut_down_ && !made_progress) {
+        blocked_on_receive_.emplace(receiver);
+        cv_.notify_all();
+        spdlog::info("blocking receiver {}", receiver.ToPartialAddress().port);
         cv_.wait(lock);
       }
     }
 
-    blocked_on_receive_.erase(receiver);
-
     return TimedOut{};
   }
 
   template <Message M>
   void Send(Address to_address, Address from_address, RequestId request_id, M message) {
+    spdlog::info("sending message from {} to {}", from_address.last_known_port, to_address.last_known_port);
     auto type_info = TypeInfoFor(message);
-    std::unique_lock<std::mutex> lock(mu_);
-    std::any message_any(std::move(message));
-    OpaqueMessage om{.to_address = to_address,
-                     .from_address = from_address,
-                     .request_id = request_id,
-                     .message = std::move(message_any),
-                     .type_info = type_info};
-    in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om)));
+    {
+      std::unique_lock<std::mutex> lock(mu_);
+      std::any message_any(std::move(message));
+      OpaqueMessage om{.to_address = to_address,
+                       .from_address = from_address,
+                       .request_id = request_id,
+                       .message = std::move(message_any),
+                       .type_info = type_info};
+      in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om)));
 
-    stats_.total_messages++;
+      stats_.total_messages++;
+    }  // lock dropped before cv notification
+
+    spdlog::info("sender calling MaybeTickSimulator");
+    MaybeTickSimulator();
+    spdlog::info("sender returned from MaybeTickSimulator");
 
     cv_.notify_all();
   }
diff --git a/src/io/simulator/simulator_stats.hpp b/src/io/simulator/simulator_stats.hpp
index 7f529a456..08d1f1dd3 100644
--- a/src/io/simulator/simulator_stats.hpp
+++ b/src/io/simulator/simulator_stats.hpp
@@ -21,5 +21,19 @@ struct SimulatorStats {
   uint64_t total_requests = 0;
   uint64_t total_responses = 0;
   uint64_t simulator_ticks = 0;
+
+  friend bool operator==(const SimulatorStats &lhs, const SimulatorStats &rhs) = default;
+
+  friend std::ostream &operator<<(std::ostream &in, const SimulatorStats &stats) {
+    std::string formated = fmt::format(
+        "SimulatorStats {{ total_messages: {}, dropped_messages: {}, timed_out_requests: {}, total_requests: {}, "
+        "total_responses: {}, simulator_ticks: {} }}",
+        stats.total_messages, stats.dropped_messages, stats.timed_out_requests, stats.total_requests,
+        stats.total_responses, stats.simulator_ticks);
+
+    in << formated;
+
+    return in;
+  }
 };
 };  // namespace memgraph::io::simulator
diff --git a/src/io/simulator/simulator_transport.hpp b/src/io/simulator/simulator_transport.hpp
index 5e5a24aa9..6b6bc7c5c 100644
--- a/src/io/simulator/simulator_transport.hpp
+++ b/src/io/simulator/simulator_transport.hpp
@@ -34,7 +34,12 @@ class SimulatorTransport {
 
   template <Message RequestT, Message ResponseT>
   ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT request, Duration timeout) {
-    std::function<bool()> maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); };
+    std::function<bool()> maybe_tick_simulator = [this] {
+      spdlog::info("client calling MaybeTickSimulator");
+      bool ret = simulator_handle_->MaybeTickSimulator();
+      spdlog::info("client returned from MaybeTickSimulator");
+      return ret;
+    };
 
     return simulator_handle_->template SubmitRequest<RequestT, ResponseT>(to_address, from_address, std::move(request),
                                                                           timeout, std::move(maybe_tick_simulator));
diff --git a/tests/simulation/basic_request.cpp b/tests/simulation/basic_request.cpp
index 1f6d60f77..27f083273 100644
--- a/tests/simulation/basic_request.cpp
+++ b/tests/simulation/basic_request.cpp
@@ -9,17 +9,25 @@
 // by the Apache License, Version 2.0, included in the file
 // licenses/APL.txt.
 
+#include <memory>
 #include <thread>
 
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+#include <spdlog/cfg/env.h>
+
+#include "io/message_histogram_collector.hpp"
 #include "io/simulator/simulator.hpp"
 #include "utils/print_helpers.hpp"
 
 using memgraph::io::Address;
 using memgraph::io::Io;
+using memgraph::io::LatencyHistogramSummaries;
 using memgraph::io::ResponseFuture;
 using memgraph::io::ResponseResult;
 using memgraph::io::simulator::Simulator;
 using memgraph::io::simulator::SimulatorConfig;
+using memgraph::io::simulator::SimulatorStats;
 using memgraph::io::simulator::SimulatorTransport;
 
 struct CounterRequest {
@@ -40,6 +48,7 @@ void run_server(Io<SimulatorTransport> io) {
       std::cout << "[SERVER] Error, continue" << std::endl;
       continue;
     }
+    std::cout << "[SERVER] Got message" << std::endl;
     auto request_envelope = request_result.GetValue();
     auto req = std::get<CounterRequest>(request_envelope.message);
 
@@ -50,13 +59,7 @@ void run_server(Io<SimulatorTransport> io) {
   }
 }
 
-int main() {
-  auto config = SimulatorConfig{
-      .drop_percent = 0,
-      .perform_timeouts = true,
-      .scramble_messages = true,
-      .rng_seed = 0,
-  };
+std::pair<SimulatorStats, LatencyHistogramSummaries> RunWorkload(SimulatorConfig &config) {
   auto simulator = Simulator(config);
 
   auto cli_addr = Address::TestAddress(1);
@@ -72,21 +75,47 @@ int main() {
     // send request
     CounterRequest cli_req;
     cli_req.proposal = i;
+    spdlog::info("[CLIENT] calling Request");
     auto res_f = cli_io.Request<CounterRequest, CounterResponse>(srv_addr, cli_req);
+    spdlog::info("[CLIENT] calling Wait");
     auto res_rez = std::move(res_f).Wait();
+    spdlog::info("[CLIENT] Wait returned");
     if (!res_rez.HasError()) {
-      std::cout << "[CLIENT] Got a valid response" << std::endl;
+      spdlog::info("[CLIENT] Got a valid response");
       auto env = res_rez.GetValue();
       MG_ASSERT(env.message.highest_seen == i);
-      std::cout << "response latency: " << env.response_latency.count() << " microseconds" << std::endl;
+      spdlog::info("response latency: {} microseconds", env.response_latency.count());
     } else {
-      std::cout << "[CLIENT] Got an error" << std::endl;
+      spdlog::info("[CLIENT] Got an error");
     }
   }
 
-  using memgraph::utils::print_helpers::operator<<;
-  std::cout << "response latencies: " << cli_io.ResponseLatencies() << std::endl;
-
   simulator.ShutDown();
+
+  return std::make_pair(simulator.Stats(), cli_io.ResponseLatencies());
+}
+
+int main() {
+  spdlog::cfg::load_env_levels();
+
+  auto config = SimulatorConfig{
+      .drop_percent = 0,
+      .perform_timeouts = true,
+      .scramble_messages = true,
+      .rng_seed = 0,
+  };
+
+  auto [sim_stats_1, latency_stats_1] = RunWorkload(config);
+  auto [sim_stats_2, latency_stats_2] = RunWorkload(config);
+
+  if (sim_stats_1 != sim_stats_2 || latency_stats_1 != latency_stats_2) {
+    spdlog::info("simulator stats diverged across runs");
+    spdlog::info("run 1 simulator stats: {}", sim_stats_1);
+    spdlog::info("run 2 simulator stats: {}", sim_stats_2);
+    spdlog::info("run 1 latency:\n{}", latency_stats_1.SummaryTable());
+    spdlog::info("run 2 latency:\n{}", latency_stats_2.SummaryTable());
+    std::terminate();
+  }
+
   return 0;
 }

From 12880fc71a0e6cf09a64fb6cd0e882c7fb788d63 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Thu, 17 Nov 2022 18:27:12 +0000
Subject: [PATCH 02/26] Don't advance the simulator handle from server threads
 themselves

---
 src/io/simulator/simulator_handle.hpp | 11 +-----
 tests/simulation/raft.cpp             | 48 ++++++++++++++++++++-------
 2 files changed, 37 insertions(+), 22 deletions(-)

diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp
index 5e564224b..00aeaee25 100644
--- a/src/io/simulator/simulator_handle.hpp
+++ b/src/io/simulator/simulator_handle.hpp
@@ -170,12 +170,7 @@ class SimulatorHandle {
         }
       }
 
-      lock.unlock();
-      spdlog::info("server calling MaybeTickSimulator");
-      bool made_progress = MaybeTickSimulator();
-      spdlog::info("server returned from MaybeTickSimulator");
-      lock.lock();
-      if (!should_shut_down_ && !made_progress) {
+      if (!should_shut_down_) {
         blocked_on_receive_.emplace(receiver);
         cv_.notify_all();
         spdlog::info("blocking receiver {}", receiver.ToPartialAddress().port);
@@ -203,10 +198,6 @@ class SimulatorHandle {
       stats_.total_messages++;
     }  // lock dropped before cv notification
 
-    spdlog::info("sender calling MaybeTickSimulator");
-    MaybeTickSimulator();
-    spdlog::info("sender returned from MaybeTickSimulator");
-
     cv_.notify_all();
   }
 
diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp
index df619bd42..9adbc152f 100644
--- a/tests/simulation/raft.cpp
+++ b/tests/simulation/raft.cpp
@@ -18,7 +18,10 @@
 #include <thread>
 #include <vector>
 
+#include <spdlog/cfg/env.h>
+
 #include "io/address.hpp"
+#include "io/message_histogram_collector.hpp"
 #include "io/rsm/raft.hpp"
 #include "io/rsm/rsm_client.hpp"
 #include "io/simulator/simulator.hpp"
@@ -27,6 +30,7 @@
 using memgraph::io::Address;
 using memgraph::io::Duration;
 using memgraph::io::Io;
+using memgraph::io::LatencyHistogramSummaries;
 using memgraph::io::ResponseEnvelope;
 using memgraph::io::ResponseFuture;
 using memgraph::io::ResponseResult;
@@ -123,16 +127,7 @@ void RunRaft(Raft<IoImpl, TestState, CasRequest, CasResponse, GetRequest, GetRes
   server.Run();
 }
 
-void RunSimulation() {
-  SimulatorConfig config{
-      .drop_percent = 5,
-      .perform_timeouts = true,
-      .scramble_messages = true,
-      .rng_seed = 0,
-      .start_time = Time::min() + std::chrono::microseconds{256 * 1024},
-      .abort_time = Time::max(),
-  };
-
+std::pair<SimulatorStats, LatencyHistogramSummaries> RunSimulation(SimulatorConfig &config) {
   auto simulator = Simulator(config);
 
   auto cli_addr = Address::TestAddress(1);
@@ -240,6 +235,8 @@ void RunSimulation() {
 
   spdlog::info("========================== SUCCESS :) ==========================");
 
+  return std::make_pair(simulator.Stats(), cli_io.ResponseLatencies());
+
   /*
   this is implicit in jthread's dtor
   srv_thread_1.join();
@@ -249,12 +246,39 @@ void RunSimulation() {
 }
 
 int main() {
+  spdlog::cfg::load_env_levels();
+
   int n_tests = 50;
 
+  SimulatorConfig config{
+      .drop_percent = 5,
+      .perform_timeouts = true,
+      .scramble_messages = true,
+      .rng_seed = 0,
+      .start_time = Time::min() + std::chrono::microseconds{256 * 1024},
+      .abort_time = Time::max(),
+  };
+
   for (int i = 0; i < n_tests; i++) {
-    spdlog::info("========================== NEW SIMULATION {} ==========================", i);
+    spdlog::error("========================== NEW SIMULATION SEED {} ==========================", i);
     spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n");
-    RunSimulation();
+
+    // this is vital to cause tests to behave differently across runs
+    config.rng_seed = i;
+
+    auto [sim_stats_1, latency_stats_1] = RunSimulation(config);
+    auto [sim_stats_2, latency_stats_2] = RunSimulation(config);
+
+    if (sim_stats_1 != sim_stats_2 || latency_stats_1 != latency_stats_2) {
+      spdlog::error("simulator stats diverged across runs");
+      spdlog::error("run 1 simulator stats: {}", sim_stats_1);
+      spdlog::error("run 2 simulator stats: {}", sim_stats_2);
+      spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable());
+      spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable());
+      std::terminate();
+    }
+    spdlog::error("run 1 simulator stats: {}", sim_stats_1);
+    spdlog::error("run 2 simulator stats: {}", sim_stats_2);
   }
 
   spdlog::info("passed {} tests!", n_tests);

From 098084314e87e1acec8c9965dad05548103249d1 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Thu, 17 Nov 2022 21:22:41 +0000
Subject: [PATCH 03/26] Make TestAddress deterministically sortable

---
 src/io/address.hpp                 |  2 +-
 tests/simulation/basic_request.cpp | 10 +++++-----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/io/address.hpp b/src/io/address.hpp
index a6e372edb..533b62bcc 100644
--- a/src/io/address.hpp
+++ b/src/io/address.hpp
@@ -59,7 +59,7 @@ struct Address {
 
   static Address TestAddress(uint16_t port) {
     return Address{
-        .unique_id = boost::uuids::uuid{boost::uuids::random_generator()()},
+        .unique_id = boost::uuids::uuid{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, static_cast<unsigned char>(port)},
         .last_known_port = port,
     };
   }
diff --git a/tests/simulation/basic_request.cpp b/tests/simulation/basic_request.cpp
index 27f083273..868f4ac10 100644
--- a/tests/simulation/basic_request.cpp
+++ b/tests/simulation/basic_request.cpp
@@ -109,11 +109,11 @@ int main() {
   auto [sim_stats_2, latency_stats_2] = RunWorkload(config);
 
   if (sim_stats_1 != sim_stats_2 || latency_stats_1 != latency_stats_2) {
-    spdlog::info("simulator stats diverged across runs");
-    spdlog::info("run 1 simulator stats: {}", sim_stats_1);
-    spdlog::info("run 2 simulator stats: {}", sim_stats_2);
-    spdlog::info("run 1 latency:\n{}", latency_stats_1.SummaryTable());
-    spdlog::info("run 2 latency:\n{}", latency_stats_2.SummaryTable());
+    spdlog::error("simulator stats diverged across runs");
+    spdlog::error("run 1 simulator stats: {}", sim_stats_1);
+    spdlog::error("run 2 simulator stats: {}", sim_stats_2);
+    spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable());
+    spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable());
     std::terminate();
   }
 

From 262df5c6a2e01b09cdb241e04a2e9eb690b1fe08 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Thu, 17 Nov 2022 21:24:13 +0000
Subject: [PATCH 04/26] Avoid unordered_map in Raft code for more determinism

---
 src/io/rsm/raft.hpp | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/io/rsm/raft.hpp b/src/io/rsm/raft.hpp
index eccbf031b..4ec22ff87 100644
--- a/src/io/rsm/raft.hpp
+++ b/src/io/rsm/raft.hpp
@@ -19,7 +19,6 @@
 #include <map>
 #include <set>
 #include <thread>
-#include <unordered_map>
 #include <vector>
 
 #include <boost/core/demangle.hpp>
@@ -182,7 +181,7 @@ struct PendingClientRequest {
 
 struct Leader {
   std::map<Address, FollowerTracker> followers;
-  std::unordered_map<LogIndex, PendingClientRequest> pending_client_requests;
+  std::map<LogIndex, PendingClientRequest> pending_client_requests;
   Time last_broadcast = Time::min();
 
   std::string static ToString() { return "\tLeader   \t"; }
@@ -683,7 +682,7 @@ class Raft {
 
       return Leader{
           .followers = std::move(followers),
-          .pending_client_requests = std::unordered_map<LogIndex, PendingClientRequest>(),
+          .pending_client_requests = std::map<LogIndex, PendingClientRequest>(),
       };
     }
 

From cf73ed529d901209f42a22d2720fbea8c70b446c Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Thu, 17 Nov 2022 21:27:48 +0000
Subject: [PATCH 05/26] Block messages from being delivered upon ShutDown

---
 src/io/simulator/simulator_handle.cpp | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp
index 096673e21..df7895dd7 100644
--- a/src/io/simulator/simulator_handle.cpp
+++ b/src/io/simulator/simulator_handle.cpp
@@ -23,6 +23,12 @@ namespace memgraph::io::simulator {
 void SimulatorHandle::ShutDown() {
   std::unique_lock<std::mutex> lock(mu_);
   should_shut_down_ = true;
+  for (auto it = promises_.begin(); it != promises_.end();) {
+    auto &[promise_key, dop] = *it;
+    std::move(dop).promise.TimeOut();
+    it = promises_.erase(it);
+  }
+  can_receive_.clear();
   cv_.notify_all();
 }
 
@@ -62,7 +68,7 @@ bool SimulatorHandle::MaybeTickSimulator() {
 
   const size_t blocked_servers = blocked_on_receive_.size();
 
-  if (blocked_servers < server_addresses_.size()) {
+  if (should_shut_down_ || blocked_servers < server_addresses_.size()) {
     // we only need to advance the simulator when all
     // servers have reached a quiescent state, blocked
     // on their own futures or receive methods.

From 9c3d683942289b2eba282ee3101d83ac810c7008 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Thu, 17 Nov 2022 21:28:17 +0000
Subject: [PATCH 06/26] Explicitly join test threads before collecting test
 stats

---
 tests/simulation/raft.cpp | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp
index 9adbc152f..16fcc9906 100644
--- a/tests/simulation/raft.cpp
+++ b/tests/simulation/raft.cpp
@@ -224,6 +224,10 @@ std::pair<SimulatorStats, LatencyHistogramSummaries> RunSimulation(SimulatorConf
 
   simulator.ShutDown();
 
+  srv_thread_1.join();
+  srv_thread_2.join();
+  srv_thread_3.join();
+
   SimulatorStats stats = simulator.Stats();
 
   spdlog::info("total messages:     {}", stats.total_messages);

From f6017697d6b1841c6330abd196878762c30250c4 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Thu, 17 Nov 2022 21:32:55 +0000
Subject: [PATCH 07/26] Make raft tests fully deterministic for rng_seeds
 between 0 and 500 at 1% message loss

---
 src/io/simulator/simulator_handle.cpp    | 15 +++++++++++++--
 src/io/simulator/simulator_handle.hpp    |  9 ++++++---
 src/io/simulator/simulator_transport.hpp |  7 +------
 tests/simulation/raft.cpp                | 16 +++++-----------
 4 files changed, 25 insertions(+), 22 deletions(-)

diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp
index df7895dd7..2a5b0a247 100644
--- a/src/io/simulator/simulator_handle.cpp
+++ b/src/io/simulator/simulator_handle.cpp
@@ -75,12 +75,14 @@ bool SimulatorHandle::MaybeTickSimulator() {
     return false;
   }
 
+  spdlog::info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ simulator tick ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
   stats_.simulator_ticks++;
 
   bool fired_cv = false;
   bool timed_anything_out = TimeoutPromisesPastDeadline();
 
   if (timed_anything_out) {
+    spdlog::info("simulator progressing: timed out a request");
     fired_cv = true;
     blocked_on_receive_.clear();
     cv_.notify_all();
@@ -95,6 +97,7 @@ bool SimulatorHandle::MaybeTickSimulator() {
     cluster_wide_time_microseconds_ += clock_advance;
 
     if (!fired_cv) {
+      spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count());
       cv_.notify_all();
       blocked_on_receive_.clear();
       fired_cv = true;
@@ -109,6 +112,9 @@ bool SimulatorHandle::MaybeTickSimulator() {
           "in an expected amount of time.");
       throw utils::BasicException{"Cluster has executed beyond its configured abort_time"};
     }
+
+    // if the clock is advanced, no messages should be delivered also.
+    // do that in a future tick.
     return true;
   }
 
@@ -141,7 +147,7 @@ bool SimulatorHandle::MaybeTickSimulator() {
     if (should_drop || normal_timeout) {
       stats_.timed_out_requests++;
       dop.promise.TimeOut();
-      spdlog::info("timing out request");
+      spdlog::info("timing out request ");
     } else {
       stats_.total_responses++;
       Duration response_latency = cluster_wide_time_microseconds_ - dop.requested_at;
@@ -153,15 +159,20 @@ bool SimulatorHandle::MaybeTickSimulator() {
   } else if (should_drop) {
     // don't add it anywhere, let it drop
     spdlog::info("silently dropping request");
+
+    // we don't want to reset the block list here
+    return true;
   } else {
     // add to can_receive_ if not
-    spdlog::info("adding message to can_receive_");
+    spdlog::info("adding message to can_receive_ from {} to {}", opaque_message.from_address.last_known_port,
+                 opaque_message.to_address.last_known_port);
     const auto &[om_vec, inserted] =
         can_receive_.try_emplace(to_address.ToPartialAddress(), std::vector<OpaqueMessage>());
     om_vec->second.emplace_back(std::move(opaque_message));
   }
 
   if (!fired_cv) {
+    spdlog::info("simulator progressing: handled a message");
     cv_.notify_all();
     blocked_on_receive_.clear();
     fired_cv = true;
diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp
index 00aeaee25..ce341a50b 100644
--- a/src/io/simulator/simulator_handle.hpp
+++ b/src/io/simulator/simulator_handle.hpp
@@ -171,12 +171,15 @@ class SimulatorHandle {
       }
 
       if (!should_shut_down_) {
-        blocked_on_receive_.emplace(receiver);
-        cv_.notify_all();
-        spdlog::info("blocking receiver {}", receiver.ToPartialAddress().port);
+        if (!blocked_on_receive_.contains(receiver)) {
+          blocked_on_receive_.emplace(receiver);
+          spdlog::info("blocking receiver {}", receiver.ToPartialAddress().port);
+          cv_.notify_all();
+        }
         cv_.wait(lock);
       }
     }
+    spdlog::info("timing out receiver {}", receiver.ToPartialAddress().port);
 
     return TimedOut{};
   }
diff --git a/src/io/simulator/simulator_transport.hpp b/src/io/simulator/simulator_transport.hpp
index 6b6bc7c5c..5e5a24aa9 100644
--- a/src/io/simulator/simulator_transport.hpp
+++ b/src/io/simulator/simulator_transport.hpp
@@ -34,12 +34,7 @@ class SimulatorTransport {
 
   template <Message RequestT, Message ResponseT>
   ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT request, Duration timeout) {
-    std::function<bool()> maybe_tick_simulator = [this] {
-      spdlog::info("client calling MaybeTickSimulator");
-      bool ret = simulator_handle_->MaybeTickSimulator();
-      spdlog::info("client returned from MaybeTickSimulator");
-      return ret;
-    };
+    std::function<bool()> maybe_tick_simulator = [this] { return simulator_handle_->MaybeTickSimulator(); };
 
     return simulator_handle_->template SubmitRequest<RequestT, ResponseT>(to_address, from_address, std::move(request),
                                                                           timeout, std::move(maybe_tick_simulator));
diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp
index 16fcc9906..b69470a72 100644
--- a/tests/simulation/raft.cpp
+++ b/tests/simulation/raft.cpp
@@ -240,13 +240,6 @@ std::pair<SimulatorStats, LatencyHistogramSummaries> RunSimulation(SimulatorConf
   spdlog::info("========================== SUCCESS :) ==========================");
 
   return std::make_pair(simulator.Stats(), cli_io.ResponseLatencies());
-
-  /*
-  this is implicit in jthread's dtor
-  srv_thread_1.join();
-  srv_thread_2.join();
-  srv_thread_3.join();
-  */
 }
 
 int main() {
@@ -264,25 +257,26 @@ int main() {
   };
 
   for (int i = 0; i < n_tests; i++) {
-    spdlog::error("========================== NEW SIMULATION SEED {} ==========================", i);
     spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n");
 
     // this is vital to cause tests to behave differently across runs
     config.rng_seed = i;
 
+    spdlog::info("========================== NEW SIMULATION SEED {} ==========================", i);
     auto [sim_stats_1, latency_stats_1] = RunSimulation(config);
+    spdlog::info("========================== NEW SIMULATION SEED {} ==========================", i);
     auto [sim_stats_2, latency_stats_2] = RunSimulation(config);
 
     if (sim_stats_1 != sim_stats_2 || latency_stats_1 != latency_stats_2) {
-      spdlog::error("simulator stats diverged across runs");
+      spdlog::error("simulator stats diverged across runs for test rng_seed {}", i);
       spdlog::error("run 1 simulator stats: {}", sim_stats_1);
       spdlog::error("run 2 simulator stats: {}", sim_stats_2);
       spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable());
       spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable());
       std::terminate();
     }
-    spdlog::error("run 1 simulator stats: {}", sim_stats_1);
-    spdlog::error("run 2 simulator stats: {}", sim_stats_2);
+    spdlog::info("run 1 simulator stats: {}", sim_stats_1);
+    spdlog::info("run 2 simulator stats: {}", sim_stats_2);
   }
 
   spdlog::info("passed {} tests!", n_tests);

From 923325b8fa9b0be7014c5734e0fcb5e63fa9623c Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Fri, 18 Nov 2022 09:04:29 +0000
Subject: [PATCH 08/26] Progress the simulator clock even when there are
 messages to deliver

---
 src/io/simulator/simulator_handle.cpp | 50 ++++++++++++---------------
 src/io/simulator/simulator_handle.hpp |  2 +-
 2 files changed, 24 insertions(+), 28 deletions(-)

diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp
index 2a5b0a247..2596ad12d 100644
--- a/src/io/simulator/simulator_handle.cpp
+++ b/src/io/simulator/simulator_handle.cpp
@@ -75,6 +75,8 @@ bool SimulatorHandle::MaybeTickSimulator() {
     return false;
   }
 
+  // We allow the simulator to progress the state of the system only
+  // after all servers are blocked on receive.
   spdlog::info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ simulator tick ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
   stats_.simulator_ticks++;
 
@@ -88,37 +90,31 @@ bool SimulatorHandle::MaybeTickSimulator() {
     cv_.notify_all();
   }
 
+  const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)};
+  cluster_wide_time_microseconds_ += clock_advance;
+
+  if (!fired_cv) {
+    spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count());
+    cv_.notify_all();
+    blocked_on_receive_.clear();
+    fired_cv = true;
+  }
+
+  if (cluster_wide_time_microseconds_ >= config_.abort_time) {
+    if (should_shut_down_) {
+      return true;
+    }
+    spdlog::error(
+        "Cluster has executed beyond its configured abort_time, and something may be failing to make progress "
+        "in an expected amount of time.");
+    throw utils::BasicException{"Cluster has executed beyond its configured abort_time"};
+  }
+
   if (in_flight_.empty()) {
-    // return early here because there are no messages to schedule
-
-    // We tick the clock forward when all servers are blocked but
-    // there are no in-flight messages to schedule delivery of.
-    const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)};
-    cluster_wide_time_microseconds_ += clock_advance;
-
-    if (!fired_cv) {
-      spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count());
-      cv_.notify_all();
-      blocked_on_receive_.clear();
-      fired_cv = true;
-    }
-
-    if (cluster_wide_time_microseconds_ >= config_.abort_time) {
-      if (should_shut_down_) {
-        return true;
-      }
-      spdlog::error(
-          "Cluster has executed beyond its configured abort_time, and something may be failing to make progress "
-          "in an expected amount of time.");
-      throw utils::BasicException{"Cluster has executed beyond its configured abort_time"};
-    }
-
-    // if the clock is advanced, no messages should be delivered also.
-    // do that in a future tick.
     return true;
   }
 
-  if (config_.scramble_messages) {
+  if (config_.scramble_messages && in_flight_.size() > 1) {
     // scramble messages
     std::uniform_int_distribution<size_t> swap_distrib(0, in_flight_.size() - 1);
     const size_t swap_index = swap_distrib(rng_);
diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp
index ce341a50b..71e60acf3 100644
--- a/src/io/simulator/simulator_handle.hpp
+++ b/src/io/simulator/simulator_handle.hpp
@@ -52,7 +52,7 @@ class SimulatorHandle {
   std::set<Address> blocked_on_receive_;
   std::set<Address> server_addresses_;
   std::mt19937 rng_;
-  std::uniform_int_distribution<int> time_distrib_{5, 50};
+  std::uniform_int_distribution<int> time_distrib_{0, 50};
   std::uniform_int_distribution<int> drop_distrib_{0, 99};
   SimulatorConfig config_;
   MessageHistogramCollector histograms_;

From 6b9a617df079ae757e5a3de04d736fabadee4720 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Fri, 18 Nov 2022 09:20:15 +0000
Subject: [PATCH 09/26] Streamline simulator tick condition varible
 notification. Advance time more aggressively

---
 src/io/simulator/simulator_handle.cpp | 31 ++++++---------------------
 src/io/simulator/simulator_handle.hpp |  2 +-
 tests/simulation/raft.cpp             |  2 --
 3 files changed, 8 insertions(+), 27 deletions(-)

diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp
index 2596ad12d..53b969712 100644
--- a/src/io/simulator/simulator_handle.cpp
+++ b/src/io/simulator/simulator_handle.cpp
@@ -79,26 +79,19 @@ bool SimulatorHandle::MaybeTickSimulator() {
   // after all servers are blocked on receive.
   spdlog::info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ simulator tick ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
   stats_.simulator_ticks++;
+  blocked_on_receive_.clear();
+  cv_.notify_all();
 
-  bool fired_cv = false;
   bool timed_anything_out = TimeoutPromisesPastDeadline();
 
   if (timed_anything_out) {
     spdlog::info("simulator progressing: timed out a request");
-    fired_cv = true;
-    blocked_on_receive_.clear();
-    cv_.notify_all();
   }
 
   const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)};
   cluster_wide_time_microseconds_ += clock_advance;
 
-  if (!fired_cv) {
-    spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count());
-    cv_.notify_all();
-    blocked_on_receive_.clear();
-    fired_cv = true;
-  }
+  spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count());
 
   if (cluster_wide_time_microseconds_ >= config_.abort_time) {
     if (should_shut_down_) {
@@ -143,37 +136,27 @@ bool SimulatorHandle::MaybeTickSimulator() {
     if (should_drop || normal_timeout) {
       stats_.timed_out_requests++;
       dop.promise.TimeOut();
-      spdlog::info("timing out request ");
+      spdlog::info("simulator timing out request ");
     } else {
       stats_.total_responses++;
       Duration response_latency = cluster_wide_time_microseconds_ - dop.requested_at;
       auto type_info = opaque_message.type_info;
       dop.promise.Fill(std::move(opaque_message), response_latency);
       histograms_.Measure(type_info, response_latency);
-      spdlog::info("replying to request");
+      spdlog::info("simulator replying to request");
     }
   } else if (should_drop) {
     // don't add it anywhere, let it drop
-    spdlog::info("silently dropping request");
-
-    // we don't want to reset the block list here
-    return true;
+    spdlog::info("simulator silently dropping request");
   } else {
     // add to can_receive_ if not
-    spdlog::info("adding message to can_receive_ from {} to {}", opaque_message.from_address.last_known_port,
+    spdlog::info("simulator adding message to can_receive_ from {} to {}", opaque_message.from_address.last_known_port,
                  opaque_message.to_address.last_known_port);
     const auto &[om_vec, inserted] =
         can_receive_.try_emplace(to_address.ToPartialAddress(), std::vector<OpaqueMessage>());
     om_vec->second.emplace_back(std::move(opaque_message));
   }
 
-  if (!fired_cv) {
-    spdlog::info("simulator progressing: handled a message");
-    cv_.notify_all();
-    blocked_on_receive_.clear();
-    fired_cv = true;
-  }
-
   return true;
 }
 
diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp
index 71e60acf3..1ef0b3dfb 100644
--- a/src/io/simulator/simulator_handle.hpp
+++ b/src/io/simulator/simulator_handle.hpp
@@ -52,7 +52,7 @@ class SimulatorHandle {
   std::set<Address> blocked_on_receive_;
   std::set<Address> server_addresses_;
   std::mt19937 rng_;
-  std::uniform_int_distribution<int> time_distrib_{0, 50};
+  std::uniform_int_distribution<int> time_distrib_{0, 1000};
   std::uniform_int_distribution<int> drop_distrib_{0, 99};
   SimulatorConfig config_;
   MessageHistogramCollector histograms_;
diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp
index b69470a72..9e2583814 100644
--- a/tests/simulation/raft.cpp
+++ b/tests/simulation/raft.cpp
@@ -275,8 +275,6 @@ int main() {
       spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable());
       std::terminate();
     }
-    spdlog::info("run 1 simulator stats: {}", sim_stats_1);
-    spdlog::info("run 2 simulator stats: {}", sim_stats_2);
   }
 
   spdlog::info("passed {} tests!", n_tests);

From a37e7e4affc952405bc3936a253feea587b2ae62 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Fri, 18 Nov 2022 10:19:55 +0000
Subject: [PATCH 10/26] Add assert to ensure TestAddress will not be higher
 than the uchar max

---
 src/io/address.hpp | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/io/address.hpp b/src/io/address.hpp
index 533b62bcc..e38b1e514 100644
--- a/src/io/address.hpp
+++ b/src/io/address.hpp
@@ -58,6 +58,8 @@ struct Address {
   uint16_t last_known_port;
 
   static Address TestAddress(uint16_t port) {
+    MG_ASSERT(port <= 255);
+
     return Address{
         .unique_id = boost::uuids::uuid{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, static_cast<unsigned char>(port)},
         .last_known_port = port,

From 0f32407bdcc7280a9a90a93a473555ff84d055ca Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Fri, 18 Nov 2022 10:20:45 +0000
Subject: [PATCH 11/26] Add compare header to histogram collector header

---
 src/io/message_histogram_collector.hpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/io/message_histogram_collector.hpp b/src/io/message_histogram_collector.hpp
index 27521fdeb..a2cf266b7 100644
--- a/src/io/message_histogram_collector.hpp
+++ b/src/io/message_histogram_collector.hpp
@@ -13,6 +13,7 @@
 
 #include <chrono>
 #include <cmath>
+#include <compare>
 #include <unordered_map>
 
 #include <boost/core/demangle.hpp>

From 7115a7e75bd6c4d6a5692ed494e600d4d95e3baa Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Fri, 18 Nov 2022 10:24:19 +0000
Subject: [PATCH 12/26] Apply clang-tidy fixes

---
 src/io/address.hpp                   | 2 ++
 src/io/simulator/simulator_stats.hpp | 4 +++-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/io/address.hpp b/src/io/address.hpp
index e38b1e514..3bb518b50 100644
--- a/src/io/address.hpp
+++ b/src/io/address.hpp
@@ -20,6 +20,8 @@
 #include <boost/uuid/uuid_generators.hpp>
 #include <boost/uuid/uuid_io.hpp>
 
+#include "utils/logging.hpp"
+
 namespace memgraph::io {
 
 struct PartialAddress {
diff --git a/src/io/simulator/simulator_stats.hpp b/src/io/simulator/simulator_stats.hpp
index 08d1f1dd3..ccc5ec1de 100644
--- a/src/io/simulator/simulator_stats.hpp
+++ b/src/io/simulator/simulator_stats.hpp
@@ -13,6 +13,8 @@
 
 #include <cstdint>
 
+#include <fmt/format.h>
+
 namespace memgraph::io::simulator {
 struct SimulatorStats {
   uint64_t total_messages = 0;
@@ -22,7 +24,7 @@ struct SimulatorStats {
   uint64_t total_responses = 0;
   uint64_t simulator_ticks = 0;
 
-  friend bool operator==(const SimulatorStats &lhs, const SimulatorStats &rhs) = default;
+  friend bool operator==(const SimulatorStats & /* lhs */, const SimulatorStats & /* rhs */) = default;
 
   friend std::ostream &operator<<(std::ostream &in, const SimulatorStats &stats) {
     std::string formated = fmt::format(

From 3ad84897351f1f23edae78cd1922ef0d5197a0d7 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Fri, 18 Nov 2022 10:34:21 +0000
Subject: [PATCH 13/26] Run raft sim with random seeds over time, but allow a
 seed to be easily replayed using the RunWithSeed function

---
 tests/simulation/raft.cpp | 58 +++++++++++++++++++++++----------------
 1 file changed, 34 insertions(+), 24 deletions(-)

diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp
index 9e2583814..648814667 100644
--- a/tests/simulation/raft.cpp
+++ b/tests/simulation/raft.cpp
@@ -14,6 +14,7 @@
 #include <iostream>
 #include <map>
 #include <optional>
+#include <random>
 #include <set>
 #include <thread>
 #include <vector>
@@ -242,39 +243,48 @@ std::pair<SimulatorStats, LatencyHistogramSummaries> RunSimulation(SimulatorConf
   return std::make_pair(simulator.Stats(), cli_io.ResponseLatencies());
 }
 
-int main() {
-  spdlog::cfg::load_env_levels();
-
-  int n_tests = 50;
-
+void RunWithSeed(uint64_t seed) {
   SimulatorConfig config{
       .drop_percent = 5,
       .perform_timeouts = true,
       .scramble_messages = true,
-      .rng_seed = 0,
+      .rng_seed = seed,
       .start_time = Time::min() + std::chrono::microseconds{256 * 1024},
       .abort_time = Time::max(),
   };
 
+  spdlog::info("========================== NEW SIMULATION, replay with RunWithSeed({}) ==========================",
+               seed);
+  spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n");
+  auto [sim_stats_1, latency_stats_1] = RunSimulation(config);
+
+  spdlog::info("========================== NEW SIMULATION, replay with RunWithSeed({}) ==========================",
+               seed);
+  spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n");
+  auto [sim_stats_2, latency_stats_2] = RunSimulation(config);
+
+  if (sim_stats_1 != sim_stats_2 || latency_stats_1 != latency_stats_2) {
+    spdlog::error("simulator stats diverged across runs for test rng_seed: {}", seed);
+    spdlog::error("run 1 simulator stats: {}", sim_stats_1);
+    spdlog::error("run 2 simulator stats: {}", sim_stats_2);
+    spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable());
+    spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable());
+    std::terminate();
+  }
+}
+
+int main() {
+  spdlog::cfg::load_env_levels();
+
+  std::random_device random_device;
+  std::mt19937 generator(random_device());
+  std::uniform_int_distribution<> distribution;
+
+  int n_tests = 50;
+
   for (int i = 0; i < n_tests; i++) {
-    spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n");
-
-    // this is vital to cause tests to behave differently across runs
-    config.rng_seed = i;
-
-    spdlog::info("========================== NEW SIMULATION SEED {} ==========================", i);
-    auto [sim_stats_1, latency_stats_1] = RunSimulation(config);
-    spdlog::info("========================== NEW SIMULATION SEED {} ==========================", i);
-    auto [sim_stats_2, latency_stats_2] = RunSimulation(config);
-
-    if (sim_stats_1 != sim_stats_2 || latency_stats_1 != latency_stats_2) {
-      spdlog::error("simulator stats diverged across runs for test rng_seed {}", i);
-      spdlog::error("run 1 simulator stats: {}", sim_stats_1);
-      spdlog::error("run 2 simulator stats: {}", sim_stats_2);
-      spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable());
-      spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable());
-      std::terminate();
-    }
+    uint64_t seed = distribution(generator);
+    RunWithSeed(seed);
   }
 
   spdlog::info("passed {} tests!", n_tests);

From ce45a548c773d19caf0081b5cb2b365c23048ea3 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Fri, 18 Nov 2022 16:42:18 +0000
Subject: [PATCH 14/26] Significantly improve the determinism of the
 coordinator, UUID generation, the machine manager, the shard manager, and the
 cluster property test

---
 src/coordinator/coordinator_worker.hpp     | 32 +++++++++++------
 src/coordinator/shard_map.cpp              | 28 +++++++++++++--
 src/io/address.hpp                         | 19 ++++++++++
 src/io/transport.hpp                       |  9 ++++-
 src/machine_manager/machine_config.hpp     |  1 +
 src/machine_manager/machine_manager.hpp    | 41 ++++++++++++++++++----
 src/query/v2/interpreter.cpp               |  6 +++-
 src/storage/v3/shard_manager.hpp           |  6 ++++
 src/storage/v3/shard_worker.hpp            | 32 +++++++++++------
 tests/simulation/cluster_property_test.cpp | 20 ++++++++---
 tests/simulation/test_cluster.hpp          | 23 +++++++-----
 11 files changed, 173 insertions(+), 44 deletions(-)

diff --git a/src/coordinator/coordinator_worker.hpp b/src/coordinator/coordinator_worker.hpp
index 46bc546cd..f77d9a263 100644
--- a/src/coordinator/coordinator_worker.hpp
+++ b/src/coordinator/coordinator_worker.hpp
@@ -71,6 +71,9 @@ struct QueueInner {
   // starvation by sometimes randomizing priorities, rather than following a strict
   // prioritization.
   std::deque<Message> queue;
+
+  uint64_t submitted = 0;
+  uint64_t calls_to_pop = 0;
 };
 
 /// There are two reasons to implement our own Queue instead of using
@@ -86,6 +89,8 @@ class Queue {
       MG_ASSERT(inner_.use_count() > 0);
       std::unique_lock<std::mutex> lock(inner_->mu);
 
+      inner_->submitted++;
+
       inner_->queue.emplace_back(std::move(message));
     }  // lock dropped before notifying condition variable
 
@@ -96,6 +101,9 @@ class Queue {
     MG_ASSERT(inner_.use_count() > 0);
     std::unique_lock<std::mutex> lock(inner_->mu);
 
+    inner_->calls_to_pop++;
+    inner_->cv.notify_all();
+
     while (inner_->queue.empty()) {
       inner_->cv.wait(lock);
     }
@@ -105,6 +113,15 @@ class Queue {
 
     return message;
   }
+
+  void BlockOnQuiescence() const {
+    MG_ASSERT(inner_.use_count() > 0);
+    std::unique_lock<std::mutex> lock(inner_->mu);
+
+    while (inner_->calls_to_pop <= inner_->submitted) {
+      inner_->cv.wait(lock);
+    }
+  }
 };
 
 /// A CoordinatorWorker owns Raft<CoordinatorRsm> instances. receives messages from the MachineManager.
@@ -129,9 +146,7 @@ class CoordinatorWorker {
 
  public:
   CoordinatorWorker(io::Io<IoImpl> io, Queue queue, Coordinator coordinator)
-      : io_(std::move(io)),
-        queue_(std::move(queue)),
-        coordinator_{std::move(io_.ForkLocal()), {}, std::move(coordinator)} {}
+      : io_(std::move(io)), queue_(std::move(queue)), coordinator_{std::move(io_), {}, std::move(coordinator)} {}
 
   CoordinatorWorker(CoordinatorWorker &&) noexcept = default;
   CoordinatorWorker &operator=(CoordinatorWorker &&) noexcept = default;
@@ -140,15 +155,12 @@ class CoordinatorWorker {
   ~CoordinatorWorker() = default;
 
   void Run() {
-    while (true) {
+    bool should_continue = true;
+    while (should_continue) {
       Message message = queue_.Pop();
 
-      const bool should_continue = std::visit(
-          [this](auto &&msg) { return this->Process(std::forward<decltype(msg)>(msg)); }, std::move(message));
-
-      if (!should_continue) {
-        return;
-      }
+      should_continue = std::visit([this](auto &&msg) { return this->Process(std::forward<decltype(msg)>(msg)); },
+                                   std::move(message));
     }
   }
 };
diff --git a/src/coordinator/shard_map.cpp b/src/coordinator/shard_map.cpp
index d0b2cf9ad..687228890 100644
--- a/src/coordinator/shard_map.cpp
+++ b/src/coordinator/shard_map.cpp
@@ -228,7 +228,7 @@ Hlc ShardMap::IncrementShardMapVersion() noexcept {
   return shard_map_version;
 }
 
-// TODO(antaljanosbenjamin) use a single map for all name id 
+// TODO(antaljanosbenjamin) use a single map for all name id
 // mapping and a single counter to maintain the next id
 std::unordered_map<uint64_t, std::string> ShardMap::IdToNames() {
   std::unordered_map<uint64_t, std::string> id_to_names;
@@ -248,6 +248,25 @@ std::unordered_map<uint64_t, std::string> ShardMap::IdToNames() {
 
 Hlc ShardMap::GetHlc() const noexcept { return shard_map_version; }
 
+boost::uuids::uuid NewShardUuid(uint64_t shard_id) {
+  return boost::uuids::uuid{0,
+                            0,
+                            0,
+                            0,
+                            0,
+                            0,
+                            0,
+                            0,
+                            static_cast<unsigned char>(shard_id >> 56),
+                            static_cast<unsigned char>(shard_id >> 48),
+                            static_cast<unsigned char>(shard_id >> 40),
+                            static_cast<unsigned char>(shard_id >> 32),
+                            static_cast<unsigned char>(shard_id >> 24),
+                            static_cast<unsigned char>(shard_id >> 16),
+                            static_cast<unsigned char>(shard_id >> 8),
+                            static_cast<unsigned char>(shard_id)};
+}
+
 std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
                                                       std::set<boost::uuids::uuid> initialized) {
   std::vector<ShardToInitialize> ret{};
@@ -268,6 +287,7 @@ std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
         if (initialized.contains(aas.address.unique_id)) {
           machine_contains_shard = true;
           if (aas.status != Status::CONSENSUS_PARTICIPANT) {
+            mutated = true;
             spdlog::info("marking shard as full consensus participant: {}", aas.address.unique_id);
             aas.status = Status::CONSENSUS_PARTICIPANT;
           }
@@ -292,10 +312,13 @@ std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager,
       }
 
       if (!machine_contains_shard && shard.size() < label_space.replication_factor) {
+        // increment version for each new uuid for deterministic creation
+        IncrementShardMapVersion();
+
         Address address = storage_manager;
 
         // TODO(tyler) use deterministic UUID so that coordinators don't diverge here
-        address.unique_id = boost::uuids::uuid{boost::uuids::random_generator()()},
+        address.unique_id = NewShardUuid(shard_map_version.logical_id);
 
         spdlog::info("assigning shard manager to shard");
 
@@ -383,6 +406,7 @@ std::optional<LabelId> ShardMap::InitializeNewLabel(std::string label_name, std:
 void ShardMap::AddServer(Address server_address) {
   // Find a random place for the server to plug in
 }
+
 std::optional<LabelId> ShardMap::GetLabelId(const std::string &label) const {
   if (const auto it = labels.find(label); it != labels.end()) {
     return it->second;
diff --git a/src/io/address.hpp b/src/io/address.hpp
index 3bb518b50..4e5cb7627 100644
--- a/src/io/address.hpp
+++ b/src/io/address.hpp
@@ -68,12 +68,31 @@ struct Address {
     };
   }
 
+  // NB: don't use this in test code because it is non-deterministic
   static Address UniqueLocalAddress() {
     return Address{
         .unique_id = boost::uuids::uuid{boost::uuids::random_generator()()},
     };
   }
 
+  /// `Coordinator`s have constant UUIDs because there is at most one per ip/port pair.
+  Address ForkLocalCoordinator() {
+    return Address{
+        .unique_id = boost::uuids::uuid{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
+        .last_known_ip = last_known_ip,
+        .last_known_port = last_known_port,
+    };
+  }
+
+  /// `ShardManager`s have constant UUIDs because there is at most one per ip/port pair.
+  Address ForkLocalShardManager() {
+    return Address{
+        .unique_id = boost::uuids::uuid{2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
+        .last_known_ip = last_known_ip,
+        .last_known_port = last_known_port,
+    };
+  }
+
   /// Returns a new ID with the same IP and port but a unique UUID.
   Address ForkUniqueAddress() {
     return Address{
diff --git a/src/io/transport.hpp b/src/io/transport.hpp
index 2abf10af2..4994cb436 100644
--- a/src/io/transport.hpp
+++ b/src/io/transport.hpp
@@ -137,7 +137,14 @@ class Io {
   Address GetAddress() { return address_; }
   void SetAddress(Address address) { address_ = address; }
 
-  Io<I> ForkLocal() { return Io(implementation_, address_.ForkUniqueAddress()); }
+  Io<I> ForkLocal(boost::uuids::uuid uuid) {
+    Address new_address{
+        .unique_id = uuid,
+        .last_known_ip = address_.last_known_ip,
+        .last_known_port = address_.last_known_port,
+    };
+    return Io(implementation_, new_address);
+  }
 
   LatencyHistogramSummaries ResponseLatencies() { return implementation_.ResponseLatencies(); }
 };
diff --git a/src/machine_manager/machine_config.hpp b/src/machine_manager/machine_config.hpp
index 52711642b..8e8f503d7 100644
--- a/src/machine_manager/machine_config.hpp
+++ b/src/machine_manager/machine_config.hpp
@@ -42,6 +42,7 @@ struct MachineConfig {
   boost::asio::ip::address listen_ip;
   uint16_t listen_port;
   size_t shard_worker_threads = std::max(static_cast<unsigned int>(1), std::thread::hardware_concurrency());
+  bool sync_message_handling = false;
 };
 
 }  // namespace memgraph::machine_manager
diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp
index f9ea8ff2a..04d5a3444 100644
--- a/src/machine_manager/machine_manager.hpp
+++ b/src/machine_manager/machine_manager.hpp
@@ -78,10 +78,10 @@ class MachineManager {
   MachineManager(io::Io<IoImpl> io, MachineConfig config, Coordinator coordinator)
       : io_(io),
         config_(config),
-        coordinator_address_(io.GetAddress().ForkUniqueAddress()),
-        shard_manager_{io.ForkLocal(), config.shard_worker_threads, coordinator_address_} {
-    auto coordinator_io = io.ForkLocal();
-    coordinator_io.SetAddress(coordinator_address_);
+        coordinator_address_(io.GetAddress().ForkLocalCoordinator()),
+        shard_manager_{io.ForkLocal(io.GetAddress().ForkLocalShardManager().unique_id), config.shard_worker_threads,
+                       coordinator_address_} {
+    auto coordinator_io = io.ForkLocal(coordinator_address_.unique_id);
     CoordinatorWorker coordinator_worker{coordinator_io, coordinator_queue_, coordinator};
     coordinator_handle_ = std::jthread([coordinator = std::move(coordinator_worker)]() mutable { coordinator.Run(); });
   }
@@ -101,11 +101,23 @@ class MachineManager {
   Address CoordinatorAddress() { return coordinator_address_; }
 
   void Run() {
-    while (!io_.ShouldShutDown()) {
+    while (true) {
+      MaybeBlockOnSyncHandling();
+
+      if (io_.ShouldShutDown()) {
+        break;
+      }
+
       const auto now = io_.Now();
 
+      uint64_t now_us = now.time_since_epoch().count();
+      uint64_t next_us = next_cron_.time_since_epoch().count();
+
       if (now >= next_cron_) {
+        spdlog::info("now {} >= next_cron_ {}", now_us, next_us);
         next_cron_ = Cron();
+      } else {
+        spdlog::info("now {} < next_cron_ {}", now_us, next_us);
       }
 
       Duration receive_timeout = std::max(next_cron_, now) - now;
@@ -194,10 +206,27 @@ class MachineManager {
   }
 
  private:
+  // This method exists for controlling concurrency
+  // during deterministic simulation testing.
+  void MaybeBlockOnSyncHandling() {
+    if (!config_.sync_message_handling) {
+      return;
+    }
+
+    // block on coordinator
+    coordinator_queue_.BlockOnQuiescence();
+
+    // block on shards
+    shard_manager_.BlockOnQuiescence();
+  }
+
   Time Cron() {
     spdlog::info("running MachineManager::Cron, address {}", io_.GetAddress().ToString());
     coordinator_queue_.Push(coordinator::coordinator_worker::Cron{});
-    return shard_manager_.Cron();
+    MaybeBlockOnSyncHandling();
+    Time ret = shard_manager_.Cron();
+    MaybeBlockOnSyncHandling();
+    return ret;
   }
 };
 
diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp
index c835c3d27..d7e5a9bb6 100644
--- a/src/query/v2/interpreter.cpp
+++ b/src/query/v2/interpreter.cpp
@@ -800,7 +800,11 @@ InterpreterContext::InterpreterContext(storage::v3::Shard *db, const Interpreter
 
 Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) {
   MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL");
-  auto query_io = interpreter_context_->io.ForkLocal();
+
+  // TODO(tyler) make this deterministic so that it can be tested.
+  auto random_uuid = boost::uuids::uuid{boost::uuids::random_generator()()};
+  auto query_io = interpreter_context_->io.ForkLocal(random_uuid);
+
   shard_request_manager_ = std::make_unique<msgs::ShardRequestManager<io::local_transport::LocalTransport>>(
       coordinator::CoordinatorClient<io::local_transport::LocalTransport>(
           query_io, interpreter_context_->coordinator_address, std::vector{interpreter_context_->coordinator_address}),
diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp
index a3dc8b9b6..74ec8d2d1 100644
--- a/src/storage/v3/shard_manager.hpp
+++ b/src/storage/v3/shard_manager.hpp
@@ -190,6 +190,12 @@ class ShardManager {
                                      });
   }
 
+  void BlockOnQuiescence() {
+    for (const auto &worker : workers_) {
+      worker.BlockOnQuiescence();
+    }
+  }
+
  private:
   io::Io<IoImpl> io_;
   std::vector<shard_worker::Queue> workers_;
diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp
index 46e02e6cc..547aa0a6f 100644
--- a/src/storage/v3/shard_worker.hpp
+++ b/src/storage/v3/shard_worker.hpp
@@ -80,6 +80,9 @@ struct QueueInner {
   // starvation by sometimes randomizing priorities, rather than following a strict
   // prioritization.
   std::deque<Message> queue;
+
+  uint64_t submitted = 0;
+  uint64_t calls_to_pop = 0;
 };
 
 /// There are two reasons to implement our own Queue instead of using
@@ -95,6 +98,8 @@ class Queue {
       MG_ASSERT(inner_.use_count() > 0);
       std::unique_lock<std::mutex> lock(inner_->mu);
 
+      inner_->submitted++;
+
       inner_->queue.emplace_back(std::forward<Message>(message));
     }  // lock dropped before notifying condition variable
 
@@ -105,6 +110,9 @@ class Queue {
     MG_ASSERT(inner_.use_count() > 0);
     std::unique_lock<std::mutex> lock(inner_->mu);
 
+    inner_->calls_to_pop++;
+    inner_->cv.notify_all();
+
     while (inner_->queue.empty()) {
       inner_->cv.wait(lock);
     }
@@ -114,6 +122,15 @@ class Queue {
 
     return message;
   }
+
+  void BlockOnQuiescence() const {
+    MG_ASSERT(inner_.use_count() > 0);
+    std::unique_lock<std::mutex> lock(inner_->mu);
+
+    while (inner_->calls_to_pop <= inner_->submitted) {
+      inner_->cv.wait(lock);
+    }
+  }
 };
 
 /// A ShardWorker owns Raft<ShardRsm> instances. receives messages from the ShardManager.
@@ -122,7 +139,6 @@ class ShardWorker {
   io::Io<IoImpl> io_;
   Queue queue_;
   std::priority_queue<std::pair<Time, uuid>, std::vector<std::pair<Time, uuid>>, std::greater<>> cron_schedule_;
-  Time next_cron_ = Time::min();
   std::map<uuid, ShardRaft<IoImpl>> rsm_map_;
 
   bool Process(ShutDown && /* shut_down */) { return false; }
@@ -175,10 +191,7 @@ class ShardWorker {
       return;
     }
 
-    auto rsm_io = io_.ForkLocal();
-    auto io_addr = rsm_io.GetAddress();
-    io_addr.unique_id = to_init.uuid;
-    rsm_io.SetAddress(io_addr);
+    auto rsm_io = io_.ForkLocal(to_init.uuid);
 
     // TODO(tyler) get peers from Coordinator in HeartbeatResponse
     std::vector<Address> rsm_peers = {};
@@ -208,15 +221,12 @@ class ShardWorker {
   ~ShardWorker() = default;
 
   void Run() {
-    while (true) {
+    bool should_continue = true;
+    while (should_continue) {
       Message message = queue_.Pop();
 
-      const bool should_continue =
+      should_continue =
           std::visit([&](auto &&msg) { return Process(std::forward<decltype(msg)>(msg)); }, std::move(message));
-
-      if (!should_continue) {
-        return;
-      }
     }
   }
 };
diff --git a/tests/simulation/cluster_property_test.cpp b/tests/simulation/cluster_property_test.cpp
index 48327be5b..e74bad697 100644
--- a/tests/simulation/cluster_property_test.cpp
+++ b/tests/simulation/cluster_property_test.cpp
@@ -33,21 +33,31 @@ using io::Time;
 using io::simulator::SimulatorConfig;
 using storage::v3::kMaximumCronInterval;
 
-RC_GTEST_PROP(RandomClusterConfig, HappyPath, (ClusterConfig cluster_config, NonEmptyOpVec ops)) {
-  // TODO(tyler) set abort_time to something more restrictive than Time::max()
-
+RC_GTEST_PROP(RandomClusterConfig, HappyPath, (ClusterConfig cluster_config, NonEmptyOpVec ops, uint64_t rng_seed)) {
   spdlog::cfg::load_env_levels();
 
   SimulatorConfig sim_config{
       .drop_percent = 0,
       .perform_timeouts = false,
       .scramble_messages = true,
-      .rng_seed = 0,
+      .rng_seed = rng_seed,
       .start_time = Time::min(),
+      // TODO(tyler) set abort_time to something more restrictive than Time::max()
       .abort_time = Time::max(),
   };
 
-  RunClusterSimulation(sim_config, cluster_config, ops.ops);
+  auto [sim_stats_1, latency_stats_1] = RunClusterSimulation(sim_config, cluster_config, ops.ops);
+  auto [sim_stats_2, latency_stats_2] = RunClusterSimulation(sim_config, cluster_config, ops.ops);
+
+  if (latency_stats_1 != latency_stats_2) {
+    spdlog::error("simulator stats diverged across runs");
+    spdlog::error("run 1 simulator stats: {}", sim_stats_1);
+    spdlog::error("run 2 simulator stats: {}", sim_stats_2);
+    spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable());
+    spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable());
+    RC_ASSERT(latency_stats_1 == latency_stats_2);
+    RC_ASSERT(sim_stats_1 == sim_stats_2);
+  }
 }
 
 }  // namespace memgraph::tests::simulation
diff --git a/tests/simulation/test_cluster.hpp b/tests/simulation/test_cluster.hpp
index 6a32a391d..0ac7ca1da 100644
--- a/tests/simulation/test_cluster.hpp
+++ b/tests/simulation/test_cluster.hpp
@@ -24,6 +24,7 @@
 #include "coordinator/shard_map.hpp"
 #include "generated_operations.hpp"
 #include "io/address.hpp"
+#include "io/message_histogram_collector.hpp"
 #include "io/simulator/simulator.hpp"
 #include "io/simulator/simulator_config.hpp"
 #include "io/simulator/simulator_transport.hpp"
@@ -57,6 +58,7 @@ using io::simulator::SimulatorStats;
 using io::simulator::SimulatorTransport;
 using machine_manager::MachineConfig;
 using machine_manager::MachineManager;
+using memgraph::io::LatencyHistogramSummaries;
 using msgs::ReadRequests;
 using msgs::ReadResponses;
 using msgs::WriteRequests;
@@ -75,6 +77,8 @@ MachineManager<SimulatorTransport> MkMm(Simulator &simulator, std::vector<Addres
       .is_coordinator = true,
       .listen_ip = addr.last_known_ip,
       .listen_port = addr.last_known_port,
+      .shard_worker_threads = 4,
+      .sync_message_handling = true,
   };
 
   Io<SimulatorTransport> io = simulator.Register(addr);
@@ -210,17 +214,19 @@ struct DetachIfDropped {
   }
 };
 
-void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig &cluster_config,
-                          const std::vector<Op> &ops) {
+std::pair<SimulatorStats, LatencyHistogramSummaries> RunClusterSimulation(const SimulatorConfig &sim_config,
+                                                                          const ClusterConfig &cluster_config,
+                                                                          const std::vector<Op> &ops) {
   spdlog::info("========================== NEW SIMULATION ==========================");
 
   auto simulator = Simulator(sim_config);
 
-  auto cli_addr = Address::TestAddress(1);
-  auto machine_1_addr = cli_addr.ForkUniqueAddress();
+  auto machine_1_addr = Address::TestAddress(1);
+  auto cli_addr = Address::TestAddress(2);
+  auto cli_addr_2 = Address::TestAddress(3);
 
   Io<SimulatorTransport> cli_io = simulator.Register(cli_addr);
-  Io<SimulatorTransport> cli_io_2 = simulator.Register(Address::TestAddress(2));
+  Io<SimulatorTransport> cli_io_2 = simulator.Register(cli_addr_2);
 
   auto coordinator_addresses = std::vector{
       machine_1_addr,
@@ -232,6 +238,7 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig
   Address coordinator_address = mm_1.CoordinatorAddress();
 
   auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1));
+  simulator.IncrementServerCountAndWaitForQuiescentState(machine_1_addr);
 
   auto detach_on_error = DetachIfDropped{.handle = mm_thread_1};
 
@@ -257,6 +264,8 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig
 
   simulator.ShutDown();
 
+  mm_thread_1.join();
+
   SimulatorStats stats = simulator.Stats();
 
   spdlog::info("total messages:     {}", stats.total_messages);
@@ -268,10 +277,8 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig
 
   auto histo = cli_io_2.ResponseLatencies();
 
-  using memgraph::utils::print_helpers::operator<<;
-  std::cout << "response latencies: " << histo << std::endl;
-
   spdlog::info("========================== SUCCESS :) ==========================");
+  return std::make_pair(stats, histo);
 }
 
 }  // namespace memgraph::tests::simulation

From 04420a84c76bb63c6846b3259b35fbfb81e8a9ca Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Fri, 18 Nov 2022 16:54:38 +0000
Subject: [PATCH 15/26] Fix incorrect usage of
 IncrementServerCountAndWaitForQuiescentState in the shard_rsm.cpp simulation
 test

---
 tests/simulation/shard_rsm.cpp | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/tests/simulation/shard_rsm.cpp b/tests/simulation/shard_rsm.cpp
index 64d0a0861..230bd9222 100644
--- a/tests/simulation/shard_rsm.cpp
+++ b/tests/simulation/shard_rsm.cpp
@@ -1115,11 +1115,12 @@ int TestMessages() {
   ConcreteShardRsm shard_server3(std::move(shard_server_io_3), address_for_3, ShardRsm(std::move(shard_ptr3)));
 
   auto server_thread1 = std::jthread([&shard_server1]() { shard_server1.Run(); });
-  auto server_thread2 = std::jthread([&shard_server2]() { shard_server2.Run(); });
-  auto server_thread3 = std::jthread([&shard_server3]() { shard_server3.Run(); });
-
   simulator.IncrementServerCountAndWaitForQuiescentState(shard_server_1_address);
+
+  auto server_thread2 = std::jthread([&shard_server2]() { shard_server2.Run(); });
   simulator.IncrementServerCountAndWaitForQuiescentState(shard_server_2_address);
+
+  auto server_thread3 = std::jthread([&shard_server3]() { shard_server3.Run(); });
   simulator.IncrementServerCountAndWaitForQuiescentState(shard_server_3_address);
 
   std::cout << "Beginning test after servers have become quiescent." << std::endl;

From 45badbe21fd51ea4463b3c5af1c6c18b83da16d5 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Fri, 18 Nov 2022 17:22:50 +0000
Subject: [PATCH 16/26] Use unsigned integer literals for bit shifting in the
 NewShardUuid function

---
 src/coordinator/shard_map.cpp | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/src/coordinator/shard_map.cpp b/src/coordinator/shard_map.cpp
index 687228890..918dd4b8c 100644
--- a/src/coordinator/shard_map.cpp
+++ b/src/coordinator/shard_map.cpp
@@ -9,6 +9,7 @@
 // by the Apache License, Version 2.0, included in the file
 // licenses/APL.txt.
 
+#include <cstdint>
 #include <optional>
 #include <unordered_map>
 #include <vector>
@@ -257,13 +258,13 @@ boost::uuids::uuid NewShardUuid(uint64_t shard_id) {
                             0,
                             0,
                             0,
-                            static_cast<unsigned char>(shard_id >> 56),
-                            static_cast<unsigned char>(shard_id >> 48),
-                            static_cast<unsigned char>(shard_id >> 40),
-                            static_cast<unsigned char>(shard_id >> 32),
-                            static_cast<unsigned char>(shard_id >> 24),
-                            static_cast<unsigned char>(shard_id >> 16),
-                            static_cast<unsigned char>(shard_id >> 8),
+                            static_cast<unsigned char>(shard_id >> UINT8_C(56)),
+                            static_cast<unsigned char>(shard_id >> UINT8_C(48)),
+                            static_cast<unsigned char>(shard_id >> UINT8_C(40)),
+                            static_cast<unsigned char>(shard_id >> UINT8_C(32)),
+                            static_cast<unsigned char>(shard_id >> UINT8_C(24)),
+                            static_cast<unsigned char>(shard_id >> UINT8_C(16)),
+                            static_cast<unsigned char>(shard_id >> UINT8_C(8)),
                             static_cast<unsigned char>(shard_id)};
 }
 

From e43f4e218180fd4a9a532faa6c5f5df566f2b3bc Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Mon, 21 Nov 2022 10:08:42 +0000
Subject: [PATCH 17/26] Sort simulator in_flight_ messages based on a stable
 sort of the sender address

---
 src/io/simulator/simulator_handle.cpp | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp
index 53b969712..eb34bc201 100644
--- a/src/io/simulator/simulator_handle.cpp
+++ b/src/io/simulator/simulator_handle.cpp
@@ -63,6 +63,17 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre
   }
 }
 
+bool SortInFlight(const std::pair<Address, OpaqueMessage> &lhs, const std::pair<Address, OpaqueMessage> &rhs) {
+  // NB: never sort based on the request ID etc...
+  // This should only be used from std::stable_sort
+  // because by comparing on the from_address alone,
+  // we expect the sender ordering to remain
+  // deterministic.
+  const auto &[addr_1, om_1] = lhs;
+  const auto &[addr_2, om_2] = rhs;
+  return om_1.from_address < om_2.from_address;
+}
+
 bool SimulatorHandle::MaybeTickSimulator() {
   std::unique_lock<std::mutex> lock(mu_);
 
@@ -107,6 +118,8 @@ bool SimulatorHandle::MaybeTickSimulator() {
     return true;
   }
 
+  std::stable_sort(in_flight_.begin(), in_flight_.end(), SortInFlight);
+
   if (config_.scramble_messages && in_flight_.size() > 1) {
     // scramble messages
     std::uniform_int_distribution<size_t> swap_distrib(0, in_flight_.size() - 1);

From 71dcba331ef3c39e216aa2358a996f9905647ac1 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Mon, 21 Nov 2022 10:10:45 +0000
Subject: [PATCH 18/26] Increment simulator time by up to 30ms in ticks

---
 src/io/simulator/simulator_handle.hpp | 2 +-
 tests/simulation/raft.cpp             | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp
index 1ef0b3dfb..2fbc74c4d 100644
--- a/src/io/simulator/simulator_handle.hpp
+++ b/src/io/simulator/simulator_handle.hpp
@@ -52,7 +52,7 @@ class SimulatorHandle {
   std::set<Address> blocked_on_receive_;
   std::set<Address> server_addresses_;
   std::mt19937 rng_;
-  std::uniform_int_distribution<int> time_distrib_{0, 1000};
+  std::uniform_int_distribution<int> time_distrib_{0, 30000};
   std::uniform_int_distribution<int> drop_distrib_{0, 99};
   SimulatorConfig config_;
   MessageHistogramCollector histograms_;
diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp
index 648814667..6e7b345f9 100644
--- a/tests/simulation/raft.cpp
+++ b/tests/simulation/raft.cpp
@@ -253,8 +253,8 @@ void RunWithSeed(uint64_t seed) {
       .abort_time = Time::max(),
   };
 
-  spdlog::info("========================== NEW SIMULATION, replay with RunWithSeed({}) ==========================",
-               seed);
+  spdlog::error("========================== NEW SIMULATION, replay with RunWithSeed({}) ==========================",
+                seed);
   spdlog::info("\tTime\t\tTerm\tPort\tRole\t\tMessage\n");
   auto [sim_stats_1, latency_stats_1] = RunSimulation(config);
 

From 0f66ae31dd831f222984588efc3e6ecf520134f3 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Mon, 21 Nov 2022 11:11:39 +0000
Subject: [PATCH 19/26] Use explicit unsigned integer in right shift operation

---
 src/coordinator/shard_map.cpp | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/src/coordinator/shard_map.cpp b/src/coordinator/shard_map.cpp
index 918dd4b8c..4839c9efc 100644
--- a/src/coordinator/shard_map.cpp
+++ b/src/coordinator/shard_map.cpp
@@ -9,7 +9,6 @@
 // by the Apache License, Version 2.0, included in the file
 // licenses/APL.txt.
 
-#include <cstdint>
 #include <optional>
 #include <unordered_map>
 #include <vector>
@@ -258,13 +257,13 @@ boost::uuids::uuid NewShardUuid(uint64_t shard_id) {
                             0,
                             0,
                             0,
-                            static_cast<unsigned char>(shard_id >> UINT8_C(56)),
-                            static_cast<unsigned char>(shard_id >> UINT8_C(48)),
-                            static_cast<unsigned char>(shard_id >> UINT8_C(40)),
-                            static_cast<unsigned char>(shard_id >> UINT8_C(32)),
-                            static_cast<unsigned char>(shard_id >> UINT8_C(24)),
-                            static_cast<unsigned char>(shard_id >> UINT8_C(16)),
-                            static_cast<unsigned char>(shard_id >> UINT8_C(8)),
+                            static_cast<unsigned char>(shard_id >> 56u),
+                            static_cast<unsigned char>(shard_id >> 48u),
+                            static_cast<unsigned char>(shard_id >> 40u),
+                            static_cast<unsigned char>(shard_id >> 32u),
+                            static_cast<unsigned char>(shard_id >> 24u),
+                            static_cast<unsigned char>(shard_id >> 16u),
+                            static_cast<unsigned char>(shard_id >> 8u),
                             static_cast<unsigned char>(shard_id)};
 }
 

From 081c3e5bed6e84217df6c4cb32d101cb56ba46bd Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Mon, 21 Nov 2022 13:16:35 +0000
Subject: [PATCH 20/26] Capitalize unsigned integer literal

---
 src/coordinator/shard_map.cpp | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/coordinator/shard_map.cpp b/src/coordinator/shard_map.cpp
index 4839c9efc..ea167db87 100644
--- a/src/coordinator/shard_map.cpp
+++ b/src/coordinator/shard_map.cpp
@@ -257,13 +257,13 @@ boost::uuids::uuid NewShardUuid(uint64_t shard_id) {
                             0,
                             0,
                             0,
-                            static_cast<unsigned char>(shard_id >> 56u),
-                            static_cast<unsigned char>(shard_id >> 48u),
-                            static_cast<unsigned char>(shard_id >> 40u),
-                            static_cast<unsigned char>(shard_id >> 32u),
-                            static_cast<unsigned char>(shard_id >> 24u),
-                            static_cast<unsigned char>(shard_id >> 16u),
-                            static_cast<unsigned char>(shard_id >> 8u),
+                            static_cast<unsigned char>(shard_id >> 56U),
+                            static_cast<unsigned char>(shard_id >> 48U),
+                            static_cast<unsigned char>(shard_id >> 40U),
+                            static_cast<unsigned char>(shard_id >> 32U),
+                            static_cast<unsigned char>(shard_id >> 24U),
+                            static_cast<unsigned char>(shard_id >> 16U),
+                            static_cast<unsigned char>(shard_id >> 8U),
                             static_cast<unsigned char>(shard_id)};
 }
 

From 66f39f2681ab693ef3b1975157b8872bc59685fc Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Tue, 22 Nov 2022 08:55:48 +0000
Subject: [PATCH 21/26] Add elapsed time to the SimulatorStats

---
 src/io/simulator/simulator_handle.cpp | 1 +
 src/io/simulator/simulator_stats.hpp  | 9 +++++++--
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp
index eb34bc201..a521943f8 100644
--- a/src/io/simulator/simulator_handle.cpp
+++ b/src/io/simulator/simulator_handle.cpp
@@ -101,6 +101,7 @@ bool SimulatorHandle::MaybeTickSimulator() {
 
   const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)};
   cluster_wide_time_microseconds_ += clock_advance;
+  stats_.elapsed_time = cluster_wide_time_microseconds_ - config_.start_time;
 
   spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count());
 
diff --git a/src/io/simulator/simulator_stats.hpp b/src/io/simulator/simulator_stats.hpp
index ccc5ec1de..972d09bf8 100644
--- a/src/io/simulator/simulator_stats.hpp
+++ b/src/io/simulator/simulator_stats.hpp
@@ -15,6 +15,8 @@
 
 #include <fmt/format.h>
 
+#include "io/time.hpp"
+
 namespace memgraph::io::simulator {
 struct SimulatorStats {
   uint64_t total_messages = 0;
@@ -23,15 +25,18 @@ struct SimulatorStats {
   uint64_t total_requests = 0;
   uint64_t total_responses = 0;
   uint64_t simulator_ticks = 0;
+  Duration elapsed_time;
 
   friend bool operator==(const SimulatorStats & /* lhs */, const SimulatorStats & /* rhs */) = default;
 
   friend std::ostream &operator<<(std::ostream &in, const SimulatorStats &stats) {
+    auto elapsed_ms = stats.elapsed_time.count() / 1000;
+
     std::string formated = fmt::format(
         "SimulatorStats {{ total_messages: {}, dropped_messages: {}, timed_out_requests: {}, total_requests: {}, "
-        "total_responses: {}, simulator_ticks: {} }}",
+        "total_responses: {}, simulator_ticks: {}, elapsed_time: {}ms }}",
         stats.total_messages, stats.dropped_messages, stats.timed_out_requests, stats.total_requests,
-        stats.total_responses, stats.simulator_ticks);
+        stats.total_responses, stats.simulator_ticks, elapsed_ms);
 
     in << formated;
 

From 0b19b62b122678b749db5d92a7070e0eb28b869f Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Tue, 22 Nov 2022 11:25:24 +0000
Subject: [PATCH 22/26] Set the abort_time for raft tests to 1 simulated hour

---
 tests/simulation/raft.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp
index 6e7b345f9..f2dd82e9d 100644
--- a/tests/simulation/raft.cpp
+++ b/tests/simulation/raft.cpp
@@ -250,7 +250,7 @@ void RunWithSeed(uint64_t seed) {
       .scramble_messages = true,
       .rng_seed = seed,
       .start_time = Time::min() + std::chrono::microseconds{256 * 1024},
-      .abort_time = Time::max(),
+      .abort_time = Time::min() + std::chrono::seconds{3600},
   };
 
   spdlog::error("========================== NEW SIMULATION, replay with RunWithSeed({}) ==========================",

From c8c72de6ac6d8e8eae988a61c825d9c4c054f175 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Tue, 22 Nov 2022 11:30:24 +0000
Subject: [PATCH 23/26] Use duration_cast to ensure that we are retrieving
 milliseconds inside SimulatorStats::operator<<

---
 src/io/simulator/simulator_stats.hpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/io/simulator/simulator_stats.hpp b/src/io/simulator/simulator_stats.hpp
index 972d09bf8..b02dc9f2a 100644
--- a/src/io/simulator/simulator_stats.hpp
+++ b/src/io/simulator/simulator_stats.hpp
@@ -30,7 +30,7 @@ struct SimulatorStats {
   friend bool operator==(const SimulatorStats & /* lhs */, const SimulatorStats & /* rhs */) = default;
 
   friend std::ostream &operator<<(std::ostream &in, const SimulatorStats &stats) {
-    auto elapsed_ms = stats.elapsed_time.count() / 1000;
+    auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(stats.elapsed_time).count() / 1000;
 
     std::string formated = fmt::format(
         "SimulatorStats {{ total_messages: {}, dropped_messages: {}, timed_out_requests: {}, total_requests: {}, "

From c0a103e851c148b478a2eeb0a221e70089684fbc Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Tue, 22 Nov 2022 16:00:06 +0000
Subject: [PATCH 24/26] Do not advance the clock with every message, as this
 prevents messages of a certain request depth from ever completing

---
 src/io/simulator/simulator_handle.cpp | 19 +++++++++++++------
 src/io/simulator/simulator_stats.hpp  |  2 +-
 tests/simulation/raft.cpp             |  1 +
 3 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp
index a521943f8..f216f3c01 100644
--- a/src/io/simulator/simulator_handle.cpp
+++ b/src/io/simulator/simulator_handle.cpp
@@ -100,15 +100,22 @@ bool SimulatorHandle::MaybeTickSimulator() {
   }
 
   const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)};
-  cluster_wide_time_microseconds_ += clock_advance;
-  stats_.elapsed_time = cluster_wide_time_microseconds_ - config_.start_time;
 
-  spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count());
+  // We don't always want to advance the clock with every message that we deliver because
+  // when we advance it for every message, it causes timeouts to occur for any "request path"
+  // over a certain length. Alternatively, we don't want to simply deliver multiple messages
+  // in a single simulator tick because that would reduce the amount of concurrent message
+  // mixing that may naturally occur in production. This approach is to mod the random clock
+  // advance by a prime number (hopefully avoiding most harmonic effects that would be introduced
+  // by only advancing the clock by an even amount etc...) and only advancing the clock close to
+  // half of the time.
+  if (clock_advance.count() % 97 > 49) {
+    spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count());
+    cluster_wide_time_microseconds_ += clock_advance;
+    stats_.elapsed_time = cluster_wide_time_microseconds_ - config_.start_time;
+  }
 
   if (cluster_wide_time_microseconds_ >= config_.abort_time) {
-    if (should_shut_down_) {
-      return true;
-    }
     spdlog::error(
         "Cluster has executed beyond its configured abort_time, and something may be failing to make progress "
         "in an expected amount of time.");
diff --git a/src/io/simulator/simulator_stats.hpp b/src/io/simulator/simulator_stats.hpp
index b02dc9f2a..1ae52ef28 100644
--- a/src/io/simulator/simulator_stats.hpp
+++ b/src/io/simulator/simulator_stats.hpp
@@ -30,7 +30,7 @@ struct SimulatorStats {
   friend bool operator==(const SimulatorStats & /* lhs */, const SimulatorStats & /* rhs */) = default;
 
   friend std::ostream &operator<<(std::ostream &in, const SimulatorStats &stats) {
-    auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(stats.elapsed_time).count() / 1000;
+    auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(stats.elapsed_time).count();
 
     std::string formated = fmt::format(
         "SimulatorStats {{ total_messages: {}, dropped_messages: {}, timed_out_requests: {}, total_requests: {}, "
diff --git a/tests/simulation/raft.cpp b/tests/simulation/raft.cpp
index f2dd82e9d..d725b0712 100644
--- a/tests/simulation/raft.cpp
+++ b/tests/simulation/raft.cpp
@@ -185,6 +185,7 @@ std::pair<SimulatorStats, LatencyHistogramSummaries> RunSimulation(SimulatorConf
 
     auto write_cas_response_result = client.SendWriteRequest(cas_req);
     if (write_cas_response_result.HasError()) {
+      spdlog::debug("timed out");
       // timed out
       continue;
     }

From e0086e566620e915f5f69725a05dadd098cd4cd6 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Tue, 22 Nov 2022 16:06:35 +0000
Subject: [PATCH 25/26] Use spdlog::trace instead of info for simulator-related
 messages

---
 src/io/simulator/simulator_handle.cpp | 24 ++++++++++++------------
 src/io/simulator/simulator_handle.hpp | 10 +++++-----
 2 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp
index f216f3c01..674920d8e 100644
--- a/src/io/simulator/simulator_handle.cpp
+++ b/src/io/simulator/simulator_handle.cpp
@@ -52,13 +52,13 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre
     const bool all_servers_blocked = blocked_servers == server_addresses_.size();
 
     if (all_servers_blocked) {
-      spdlog::info("quiescent state detected - {} out of {} servers now blocked on receive", blocked_servers,
-                   server_addresses_.size());
+      spdlog::trace("quiescent state detected - {} out of {} servers now blocked on receive", blocked_servers,
+                    server_addresses_.size());
       return;
     }
 
-    spdlog::info("not returning from quiescent because we see {} blocked out of {}", blocked_servers,
-                 server_addresses_.size());
+    spdlog::trace("not returning from quiescent because we see {} blocked out of {}", blocked_servers,
+                  server_addresses_.size());
     cv_.wait(lock);
   }
 }
@@ -88,7 +88,7 @@ bool SimulatorHandle::MaybeTickSimulator() {
 
   // We allow the simulator to progress the state of the system only
   // after all servers are blocked on receive.
-  spdlog::info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ simulator tick ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
+  spdlog::trace("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ simulator tick ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
   stats_.simulator_ticks++;
   blocked_on_receive_.clear();
   cv_.notify_all();
@@ -96,7 +96,7 @@ bool SimulatorHandle::MaybeTickSimulator() {
   bool timed_anything_out = TimeoutPromisesPastDeadline();
 
   if (timed_anything_out) {
-    spdlog::info("simulator progressing: timed out a request");
+    spdlog::trace("simulator progressing: timed out a request");
   }
 
   const Duration clock_advance = std::chrono::microseconds{time_distrib_(rng_)};
@@ -110,7 +110,7 @@ bool SimulatorHandle::MaybeTickSimulator() {
   // by only advancing the clock by an even amount etc...) and only advancing the clock close to
   // half of the time.
   if (clock_advance.count() % 97 > 49) {
-    spdlog::info("simulator progressing: clock advanced by {}", clock_advance.count());
+    spdlog::trace("simulator progressing: clock advanced by {}", clock_advance.count());
     cluster_wide_time_microseconds_ += clock_advance;
     stats_.elapsed_time = cluster_wide_time_microseconds_ - config_.start_time;
   }
@@ -157,22 +157,22 @@ bool SimulatorHandle::MaybeTickSimulator() {
     if (should_drop || normal_timeout) {
       stats_.timed_out_requests++;
       dop.promise.TimeOut();
-      spdlog::info("simulator timing out request ");
+      spdlog::trace("simulator timing out request ");
     } else {
       stats_.total_responses++;
       Duration response_latency = cluster_wide_time_microseconds_ - dop.requested_at;
       auto type_info = opaque_message.type_info;
       dop.promise.Fill(std::move(opaque_message), response_latency);
       histograms_.Measure(type_info, response_latency);
-      spdlog::info("simulator replying to request");
+      spdlog::trace("simulator replying to request");
     }
   } else if (should_drop) {
     // don't add it anywhere, let it drop
-    spdlog::info("simulator silently dropping request");
+    spdlog::trace("simulator silently dropping request");
   } else {
     // add to can_receive_ if not
-    spdlog::info("simulator adding message to can_receive_ from {} to {}", opaque_message.from_address.last_known_port,
-                 opaque_message.to_address.last_known_port);
+    spdlog::trace("simulator adding message to can_receive_ from {} to {}", opaque_message.from_address.last_known_port,
+                  opaque_message.to_address.last_known_port);
     const auto &[om_vec, inserted] =
         can_receive_.try_emplace(to_address.ToPartialAddress(), std::vector<OpaqueMessage>());
     om_vec->second.emplace_back(std::move(opaque_message));
diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp
index 2fbc74c4d..5a5ad1ec0 100644
--- a/src/io/simulator/simulator_handle.hpp
+++ b/src/io/simulator/simulator_handle.hpp
@@ -64,7 +64,7 @@ class SimulatorHandle {
     for (auto it = promises_.begin(); it != promises_.end();) {
       auto &[promise_key, dop] = *it;
       if (dop.deadline < now && config_.perform_timeouts) {
-        spdlog::info("timing out request from requester {}.", promise_key.requester_address.ToString());
+        spdlog::trace("timing out request from requester {}.", promise_key.requester_address.ToString());
         std::move(dop).promise.TimeOut();
         it = promises_.erase(it);
 
@@ -106,7 +106,7 @@ class SimulatorHandle {
   template <Message Request, Message Response>
   ResponseFuture<Response> SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout,
                                          std::function<bool()> &&maybe_tick_simulator) {
-    spdlog::info("submitting request to {}", to_address.last_known_port);
+    spdlog::trace("submitting request to {}", to_address.last_known_port);
     auto type_info = TypeInfoFor(request);
 
     auto [future, promise] = memgraph::io::FuturePromisePairWithNotifier<ResponseResult<Response>>(
@@ -173,20 +173,20 @@ class SimulatorHandle {
       if (!should_shut_down_) {
         if (!blocked_on_receive_.contains(receiver)) {
           blocked_on_receive_.emplace(receiver);
-          spdlog::info("blocking receiver {}", receiver.ToPartialAddress().port);
+          spdlog::trace("blocking receiver {}", receiver.ToPartialAddress().port);
           cv_.notify_all();
         }
         cv_.wait(lock);
       }
     }
-    spdlog::info("timing out receiver {}", receiver.ToPartialAddress().port);
+    spdlog::trace("timing out receiver {}", receiver.ToPartialAddress().port);
 
     return TimedOut{};
   }
 
   template <Message M>
   void Send(Address to_address, Address from_address, RequestId request_id, M message) {
-    spdlog::info("sending message from {} to {}", from_address.last_known_port, to_address.last_known_port);
+    spdlog::trace("sending message from {} to {}", from_address.last_known_port, to_address.last_known_port);
     auto type_info = TypeInfoFor(message);
     {
       std::unique_lock<std::mutex> lock(mu_);

From ea533f43fc36bfc13684e519729f2768adbf7758 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Tue, 22 Nov 2022 16:06:57 +0000
Subject: [PATCH 26/26] Print out the simulator seed when we exceed the
 configured abort_time

---
 src/io/simulator/simulator_handle.cpp | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp
index 674920d8e..cc0bd0598 100644
--- a/src/io/simulator/simulator_handle.cpp
+++ b/src/io/simulator/simulator_handle.cpp
@@ -118,7 +118,8 @@ bool SimulatorHandle::MaybeTickSimulator() {
   if (cluster_wide_time_microseconds_ >= config_.abort_time) {
     spdlog::error(
         "Cluster has executed beyond its configured abort_time, and something may be failing to make progress "
-        "in an expected amount of time.");
+        "in an expected amount of time. The SimulatorConfig.rng_seed for this run is {}",
+        config_.rng_seed);
     throw utils::BasicException{"Cluster has executed beyond its configured abort_time"};
   }