From 951b05811649921d532e3cfd301386d3a0d3c825 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Mon, 31 Oct 2022 18:04:30 +0000
Subject: [PATCH] Complete migration from single-threaded ShardManager to
 multi-threaded ShardWorker processing

---
 src/storage/v3/shard_manager.hpp | 96 ++++++++++++++------------------
 src/storage/v3/shard_worker.hpp  | 32 ++++++-----
 2 files changed, 60 insertions(+), 68 deletions(-)

diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp
index 0b13b1f09..db89c7ded 100644
--- a/src/storage/v3/shard_manager.hpp
+++ b/src/storage/v3/shard_manager.hpp
@@ -14,6 +14,7 @@
 #include <queue>
 #include <set>
 
+#include <boost/functional/hash.hpp>
 #include <boost/uuid/uuid.hpp>
 
 #include "coordinator/coordinator.hpp"
@@ -116,39 +117,49 @@ class ShardManager {
     // blocked on implicitly when worker_handles_ is destroyed.
   }
 
+  /// Picks the worker for an RSM: hashes the uuid with boost::hash and reduces it modulo
+  /// workers_.size(), so a given uuid always maps to the same index while the worker count
+  /// is unchanged. workers_ must be non-empty (the modulo would otherwise divide by zero).
+  size_t UuidToWorkerIndex(const uuid &to) {
+    size_t hash = boost::hash<boost::uuids::uuid>()(to);
+    return hash % workers_.size();
+  }
+
+  /// Pushes the message onto the queue of the worker at worker_index.
+  void SendToWorkerByIndex(size_t worker_index, shard_worker::Message &&message) {
+    // message is an rvalue reference, not a forwarding reference, so std::move is the matching cast
+    workers_[worker_index].Push(std::move(message));
+  }
+
+  /// Pushes the message onto the queue of the worker that UuidToWorkerIndex selects for `to`.
+  void SendToWorkerByUuid(const uuid &to, shard_worker::Message &&message) {
+    size_t worker_index = UuidToWorkerIndex(to);
+    SendToWorkerByIndex(worker_index, std::move(message));
+  }
+
   /// Periodic protocol maintenance. Returns the time that Cron should be called again
   /// in the future.
   Time Cron() {
     spdlog::info("running ShardManager::Cron, address {}", io_.GetAddress().ToString());
     Time now = io_.Now();
 
-    if (now >= next_cron_) {
+    if (now >= next_reconciliation_) {
       Reconciliation();
 
       std::uniform_int_distribution time_distrib(kMinimumCronInterval.count(), kMaximumCronInterval.count());
 
       const auto rand = io_.Rand(time_distrib);
 
-      next_cron_ = now + Duration{rand};
+      next_reconciliation_ = now + Duration{rand};
     }
 
-    if (!cron_schedule_.empty()) {
-      const auto &[time, uuid] = cron_schedule_.top();
-
-      if (time <= now) {
-        auto &rsm = rsm_map_.at(uuid);
-        Time next_for_uuid = rsm.Cron();
-
-        cron_schedule_.pop();
-        cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
-
-        const auto &[next_time, _uuid] = cron_schedule_.top();
-
-        return std::min(next_cron_, next_time);
-      }
+    for (auto &worker : workers_) {
+      worker.Push(shard_worker::Cron{});
     }
 
-    return next_cron_;
+    Time next_worker_cron = now + std::chrono::milliseconds(500);
+
+    return std::min(next_worker_cron, next_reconciliation_);
   }
 
   /// Returns the Address for our underlying Io implementation
@@ -162,18 +167,20 @@ class ShardManager {
     MG_ASSERT(address.last_known_port == to.last_known_port);
     MG_ASSERT(address.last_known_ip == to.last_known_ip);
 
-    auto &rsm = rsm_map_.at(to.unique_id);
-
-    rsm.Handle(std::forward<ShardMessages>(sm), request_id, from);
+    SendToWorkerByUuid(to.unique_id, shard_worker::RouteMessage{
+                                         .message = std::forward<ShardMessages>(sm),
+                                         .request_id = request_id,
+                                         .to = to,
+                                         .from = from,
+                                     });
   }
 
  private:
   io::Io<IoImpl> io_;
   std::vector<shard_worker::Queue> workers_;
   std::vector<std::jthread> worker_handles_;
-  std::map<uuid, ShardRaft<IoImpl>> rsm_map_;
-  std::priority_queue<std::pair<Time, uuid>, std::vector<std::pair<Time, uuid>>, std::greater<>> cron_schedule_;
-  Time next_cron_ = Time::min();
+  std::set<uuid> rsm_set_;
+  Time next_reconciliation_ = Time::min();
   Address coordinator_leader_;
   std::optional<ResponseFuture<WriteResponse<CoordinatorWriteResponses>>> heartbeat_res_;
 
@@ -227,39 +234,20 @@ class ShardManager {
   }
 
   void EnsureShardsInitialized(HeartbeatResponse hr) {
-    for (const auto &shard_to_initialize : hr.shards_to_initialize) {
-      InitializeRsm(shard_to_initialize);
-      initialized_but_not_confirmed_rsm_.emplace(shard_to_initialize.uuid);
+    for (const auto &to_init : hr.shards_to_initialize) {
+      if (rsm_set_.contains(to_init.uuid)) {
+        // Not a bug: lag may have kept the coordinator from hearing back
+        // from us, so it can resend UUIDs we already created. Skip only
+        // this shard and keep processing the rest of the list.
+        continue;
+      }
+
+      SendToWorkerByUuid(to_init.uuid, to_init);
+
+      rsm_set_.emplace(to_init.uuid);
+      initialized_but_not_confirmed_rsm_.emplace(to_init.uuid);
     }
   }
-
-  void InitializeRsm(coordinator::ShardToInitialize to_init) {
-    if (rsm_map_.contains(to_init.uuid)) {
-      // it's not a bug for the coordinator to send us UUIDs that we have
-      // already created, because there may have been lag that caused
-      // the coordinator not to hear back from us.
-      return;
-    }
-
-    auto rsm_io = io_.ForkLocal();
-    auto io_addr = rsm_io.GetAddress();
-    io_addr.unique_id = to_init.uuid;
-    rsm_io.SetAddress(io_addr);
-
-    // TODO(tyler) get peers from Coordinator in HeartbeatResponse
-    std::vector<Address> rsm_peers = {};
-
-    std::unique_ptr<Shard> shard = std::make_unique<Shard>(to_init.label_id, to_init.min_key, to_init.max_key,
-                                                           to_init.schema, to_init.config, to_init.id_to_names);
-
-    ShardRsm rsm_state{std::move(shard)};
-
-    ShardRaft<IoImpl> rsm{std::move(rsm_io), rsm_peers, std::move(rsm_state)};
-
-    spdlog::info("SM created a new shard with UUID {}", to_init.uuid);
-
-    rsm_map_.emplace(to_init.uuid, std::move(rsm));
-  }
 };
 
 }  // namespace memgraph::storage::v3
diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp
index 0cbae8c6c..3204b0362 100644
--- a/src/storage/v3/shard_worker.hpp
+++ b/src/storage/v3/shard_worker.hpp
@@ -11,6 +11,7 @@
 
 #pragma once
 
+#include <chrono>
 #include <deque>
 #include <memory>
 #include <queue>
@@ -58,9 +59,8 @@ struct ShutDown {
   io::Promise<bool> acknowledge_shutdown;
 };
 
-struct Cron {
-  io::Promise<io::Time> request_next_cron_at;
-};
+/// Payload-free message; ShardWorker::Process(Cron &&) reacts to it by running ShardWorker::Cron().
+struct Cron {};
 
 struct RouteMessage {
   ShardMessages message;
@@ -132,8 +131,9 @@ class ShardWorker {
   }
 
   bool Process(Cron &&cron) {
-    Time next_cron = Cron();
-    cron.request_next_cron_at.Fill(next_cron);
+    // The Cron message no longer carries a promise to fill, so the Time
+    // returned by the Cron() member function is discarded here.
+    Cron();
     return true;
   }
 
@@ -155,21 +153,29 @@ class ShardWorker {
     spdlog::info("running ShardManager::Cron, address {}", io_.GetAddress().ToString());
     Time now = io_.Now();
 
-    if (!cron_schedule_.empty()) {
-      const auto &[time, uuid] = cron_schedule_.top();
+    // Run Cron for each scheduled RSM whose time has arrived.
+    // NOTE(review): assumes rsm.Cron() returns a time later than now; otherwise this loop
+    // would keep re-running the same entry - confirm.
+    while (!cron_schedule_.empty()) {
+      // Copy the top entry: pop() below removes it, so a reference obtained from top()
+      // would no longer name this entry when it is re-pushed.
+      const auto [time, uuid] = cron_schedule_.top();
 
-      auto &rsm = rsm_map_.at(uuid);
-      Time next_for_uuid = rsm.Cron();
+      if (time <= now) {
+        auto &rsm = rsm_map_.at(uuid);
+        Time next_for_uuid = rsm.Cron();
 
-      cron_schedule_.pop();
-      cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
-
-      const auto &[next_time, _uuid] = cron_schedule_.top();
-
-      return std::min(next_cron_, next_time);
+        // reschedule this RSM at the time its Cron returned
+        cron_schedule_.pop();
+        cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
+      } else {
+        // the top entry is not due yet: report its time and stop
+        return time;
+      }
     }
 
-    return next_cron_;
+    // nothing is scheduled: return a time 1000 microseconds from now
+    return now + std::chrono::microseconds(1000);
   }
 
   void InitializeRsm(ShardToInitialize to_init) {
@@ -197,6 +197,10 @@ class ShardWorker {
 
     spdlog::info("SM created a new shard with UUID {}", to_init.uuid);
 
+    // perform an initial Cron call for the new RSM
+    Time next_cron = rsm.Cron();
+    cron_schedule_.push(std::make_pair(next_cron, to_init.uuid));
+
     rsm_map_.emplace(to_init.uuid, std::move(rsm));
   }