From a815ec9617d57101c45a7ce70d41d6ae461d43a9 Mon Sep 17 00:00:00 2001
From: Tyler Neely <t@jujit.su>
Date: Wed, 2 Nov 2022 17:15:52 +0000
Subject: [PATCH] Handle Coordinator work on a separate thread, unblocking the
 MachineManager to route additional messages to Shards

---
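For reviewers: a minimal, self-contained sketch (C++20) of the producer/consumer
handoff this patch introduces. The message types are simplified stand-ins; the
real Queue and worker loop live in src/coordinator/coordinator_worker.hpp.

    #include <condition_variable>
    #include <deque>
    #include <iostream>
    #include <mutex>
    #include <thread>
    #include <variant>

    struct ShutDown {};
    struct Cron {};
    using Message = std::variant<Cron, ShutDown>;

    // Same shape as the patch's Queue: a mutex and condition variable
    // guarding a deque of messages.
    class Queue {
      std::mutex mu_;
      std::condition_variable cv_;
      std::deque<Message> queue_;

     public:
      void Push(Message &&message) {
        {
          std::unique_lock<std::mutex> lock(mu_);
          queue_.emplace_back(std::move(message));
        }  // lock dropped before notifying the condition variable
        cv_.notify_all();
      }

      Message Pop() {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [&] { return !queue_.empty(); });
        Message message = std::move(queue_.front());
        queue_.pop_front();
        return message;
      }
    };

    int main() {
      Queue queue;
      // Stand-in for CoordinatorWorker::Run: pop until ShutDown arrives.
      std::jthread worker([&] {
        while (true) {
          Message message = queue.Pop();
          if (std::holds_alternative<ShutDown>(message)) return;
          std::cout << "processed Cron\n";
        }
      });
      queue.Push(Cron{});
      queue.Push(ShutDown{});  // worker exits; jthread joins on destruction
    }
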
 src/coordinator/coordinator_worker.hpp  | 165 +++++++++++++++++++++++
 src/machine_manager/machine_manager.hpp | 106 ++++++++++------
 2 files changed, 233 insertions(+), 38 deletions(-)
 create mode 100644 src/coordinator/coordinator_worker.hpp

diff --git a/src/coordinator/coordinator_worker.hpp b/src/coordinator/coordinator_worker.hpp
new file mode 100644
index 000000000..b89a9c496
--- /dev/null
+++ b/src/coordinator/coordinator_worker.hpp
@@ -0,0 +1,165 @@
+// Copyright 2022 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+#include <chrono>
+#include <condition_variable>
+#include <deque>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <variant>
+
+#include <boost/uuid/uuid.hpp>
+
+#include "coordinator/coordinator.hpp"
+#include "coordinator/shard_map.hpp"
+#include "io/address.hpp"
+#include "io/future.hpp"
+#include "io/messages.hpp"
+#include "io/rsm/raft.hpp"
+#include "io/time.hpp"
+#include "io/transport.hpp"
+#include "query/v2/requests.hpp"
+
+namespace memgraph::coordinator::coordinator_worker {
+
+/// Obligations:
+/// * ShutDown
+/// * Cron
+/// * RouteMessage
+
+using boost::uuids::uuid;
+
+using coordinator::Coordinator;
+using coordinator::CoordinatorRsm;
+using coordinator::ShardToInitialize;
+using io::Address;
+using io::RequestId;
+using io::Time;
+using io::messages::CoordinatorMessages;
+using io::rsm::Raft;
+using msgs::ReadRequests;
+using msgs::ReadResponses;
+using msgs::WriteRequests;
+using msgs::WriteResponses;
+
+struct ShutDown {};
+
+struct Cron {};
+
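+/// A Coordinator-bound message along with the envelope metadata needed to handle it.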
+struct RouteMessage {
+  CoordinatorMessages message;
+  RequestId request_id;
+  Address to;
+  Address from;
+};
+
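+/// The full set of messages that a CoordinatorWorker pulls off its Queue.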
+using Message = std::variant<RouteMessage, Cron, ShutDown>;
+
+struct QueueInner {
+  std::mutex mu{};
+  std::condition_variable cv;
+  // TODO(tyler) handle simulator communication: std::shared_ptr<std::atomic<int>> blocked;
+
+  // TODO(tyler) investigate using a priority queue that prioritizes messages in a way that
+  // improves overall QoS. For example, we may want to schedule Raft Append messages
+  // ahead of Read messages, or generally writes before reads, to shed load from the
+  // overall system sooner. When we do this, we need to make sure to avoid
+  // starvation by sometimes randomizing priorities, rather than following a strict
+  // prioritization.
+  std::deque<Message> queue;
+};
+
+/// There are two reasons to implement our own Queue instead of using
+/// one off-the-shelf:
+/// 1. we will need to know in the simulator when all threads are waiting
+/// 2. we will want to implement our own priority queue within this for QoS
+class Queue {
+  std::shared_ptr<QueueInner> inner_ = std::make_shared<QueueInner>();
+
+ public:
+  void Push(Message &&message) {
+    {
+      std::unique_lock<std::mutex> lock(inner_->mu);
+
+      inner_->queue.emplace_back(std::move(message));
+    }  // lock dropped before notifying condition variable
+
+    inner_->cv.notify_all();
+  }
+
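+  /// Blocks until a message is available.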
+  Message Pop() {
+    std::unique_lock<std::mutex> lock(inner_->mu);
+
+    while (inner_->queue.empty()) {
+      inner_->cv.wait(lock);
+    }
+
+    Message message = std::move(inner_->queue.front());
+    inner_->queue.pop_front();
+
+    return message;
+  }
+};
+
+/// A CoordinatorWorker owns a CoordinatorRsm instance and processes the messages that the MachineManager routes to it.
+template <class IoImpl>
+class CoordinatorWorker {
+  io::Io<IoImpl> io_;
+  Queue queue_;
+  CoordinatorRsm<IoImpl> coordinator_;
+
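+  // Each Process overload returns true if the Run loop should keep handling messages.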
+  bool Process(ShutDown && /*shut_down*/) { return false; }
+
+  bool Process(Cron && /*cron*/) {
+    coordinator_.Cron();
+    return true;
+  }
+
+  bool Process(RouteMessage &&route_message) {
+    coordinator_.Handle(std::move(route_message.message), route_message.request_id, route_message.from);
+
+    return true;
+  }
+
+ public:
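+  /// The underlying CoordinatorRsm is driven by an Io handle forked from the one passed in.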
+  CoordinatorWorker(io::Io<IoImpl> io, Queue queue, Coordinator coordinator)
+      : io_(io), queue_(queue), coordinator_{io.ForkLocal(), {}, std::move(coordinator)} {}
+
+  CoordinatorWorker(CoordinatorWorker &&) = default;
+  CoordinatorWorker &operator=(CoordinatorWorker &&) = default;
+  CoordinatorWorker(const CoordinatorWorker &) = delete;
+  CoordinatorWorker &operator=(const CoordinatorWorker &) = delete;
+  ~CoordinatorWorker() = default;
+
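+  /// Processes messages until a ShutDown message is received.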
+  void Run() {
+    while (true) {
+      Message message = queue_.Pop();
+
+      const bool should_continue =
+          std::visit([&](auto &&msg) { return Process(std::forward<decltype(msg)>(msg)); }, std::move(message));
+
+      if (!should_continue) {
+        return;
+      }
+    }
+  }
+};
+
+}  // namespace memgraph::coordinator::coordinator_worker
diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp
index 388a27e2f..75dd0d564 100644
--- a/src/machine_manager/machine_manager.hpp
+++ b/src/machine_manager/machine_manager.hpp
@@ -11,39 +11,45 @@
 
 #pragma once
 
-#include <coordinator/coordinator_rsm.hpp>
-#include <io/message_conversion.hpp>
-#include <io/messages.hpp>
-#include <io/rsm/rsm_client.hpp>
-#include <io/time.hpp>
-#include <machine_manager/machine_config.hpp>
-#include <storage/v3/shard_manager.hpp>
+#include "coordinator/coordinator_rsm.hpp"
+#include "coordinator/coordinator_worker.hpp"
+#include "io/message_conversion.hpp"
+#include "io/messages.hpp"
+#include "io/rsm/rsm_client.hpp"
+#include "io/time.hpp"
+#include "machine_manager/machine_config.hpp"
+#include "storage/v3/shard_manager.hpp"
 
 namespace memgraph::machine_manager {
 
-using memgraph::coordinator::Coordinator;
-using memgraph::coordinator::CoordinatorReadRequests;
-using memgraph::coordinator::CoordinatorReadResponses;
-using memgraph::coordinator::CoordinatorRsm;
-using memgraph::coordinator::CoordinatorWriteRequests;
-using memgraph::coordinator::CoordinatorWriteResponses;
-using memgraph::io::ConvertVariant;
-using memgraph::io::Duration;
-using memgraph::io::RequestId;
-using memgraph::io::Time;
-using memgraph::io::messages::CoordinatorMessages;
-using memgraph::io::messages::ShardManagerMessages;
-using memgraph::io::messages::ShardMessages;
-using memgraph::io::messages::StorageReadRequest;
-using memgraph::io::messages::StorageWriteRequest;
-using memgraph::io::rsm::AppendRequest;
-using memgraph::io::rsm::AppendResponse;
-using memgraph::io::rsm::ReadRequest;
-using memgraph::io::rsm::VoteRequest;
-using memgraph::io::rsm::VoteResponse;
-using memgraph::io::rsm::WriteRequest;
-using memgraph::io::rsm::WriteResponse;
-using memgraph::storage::v3::ShardManager;
+using coordinator::Coordinator;
+using coordinator::CoordinatorReadRequests;
+using coordinator::CoordinatorReadResponses;
+using coordinator::CoordinatorRsm;
+using coordinator::CoordinatorWriteRequests;
+using coordinator::CoordinatorWriteResponses;
+using coordinator::coordinator_worker::CoordinatorWorker;
+using CoordinatorRouteMessage = coordinator::coordinator_worker::RouteMessage;
+using CoordinatorQueue = coordinator::coordinator_worker::Queue;
+using io::ConvertVariant;
+using io::Duration;
+using io::RequestId;
+using io::Time;
+using io::messages::CoordinatorMessages;
+using io::messages::ShardManagerMessages;
+using io::messages::ShardMessages;
+using io::messages::StorageReadRequest;
+using io::messages::StorageWriteRequest;
+using io::rsm::AppendRequest;
+using io::rsm::AppendResponse;
+using io::rsm::ReadRequest;
+using io::rsm::VoteRequest;
+using io::rsm::VoteResponse;
+using io::rsm::WriteRequest;
+using io::rsm::WriteResponse;
+using storage::v3::ShardManager;
 
 /// The MachineManager is responsible for:
 /// * starting the entire system and ensuring that high-level
@@ -62,7 +68,9 @@ template <typename IoImpl>
 class MachineManager {
   io::Io<IoImpl> io_;
   MachineConfig config_;
-  CoordinatorRsm<IoImpl> coordinator_;
+  Address coordinator_address_;
+  CoordinatorQueue coordinator_queue_;
+  std::jthread coordinator_handle_;
   ShardManager<IoImpl> shard_manager_;
   Time next_cron_ = Time::min();
 
@@ -72,10 +80,25 @@ class MachineManager {
   MachineManager(io::Io<IoImpl> io, MachineConfig config, Coordinator coordinator)
       : io_(io),
         config_(config),
-        coordinator_{std::move(io.ForkLocal()), {}, std::move(coordinator)},
-        shard_manager_{io.ForkLocal(), config.shard_worker_threads, coordinator_.GetAddress()} {}
+        coordinator_address_(io.GetAddress().ForkUniqueAddress()),
+        shard_manager_{io.ForkLocal(), config.shard_worker_threads, coordinator_address_} {
+    auto coordinator_io = io.ForkLocal();
+    coordinator_io.SetAddress(coordinator_address_);
+    CoordinatorWorker coordinator_worker{coordinator_io, coordinator_queue_, std::move(coordinator)};
+    coordinator_handle_ = std::jthread([coordinator = std::move(coordinator_worker)]() mutable { coordinator.Run(); });
+  }
 
-  Address CoordinatorAddress() { return coordinator_.GetAddress(); }
+  MachineManager(MachineManager &&) = default;
+  MachineManager &operator=(MachineManager &&) = default;
+  MachineManager(const MachineManager &) = delete;
+  MachineManager &operator=(const MachineManager &) = delete;
+
+  ~MachineManager() {
+    coordinator_queue_.Push(coordinator::coordinator_worker::ShutDown{});
+    coordinator_handle_.join();
+  }
+
+  Address CoordinatorAddress() { return coordinator_address_; }
 
   void Run() {
     while (!io_.ShouldShutDown()) {
@@ -85,7 +108,7 @@ class MachineManager {
         next_cron_ = Cron();
       }
 
-      Duration receive_timeout = next_cron_ - now;
+      Duration receive_timeout = std::max(next_cron_, now) - now;
 
       // Note: this parameter pack must be kept in-sync with the ReceiveWithTimeout parameter pack below
       using AllMessages =
@@ -113,7 +136,7 @@ class MachineManager {
       spdlog::info("MM got message to {}", request_envelope.to_address.ToString());
 
       // If message is for the coordinator, cast it to subset and pass it to the coordinator
-      bool to_coordinator = coordinator_.GetAddress() == request_envelope.to_address;
+      bool to_coordinator = coordinator_address_ == request_envelope.to_address;
       if (to_coordinator) {
         std::optional<CoordinatorMessages> conversion_attempt =
             ConvertVariant<AllMessages, ReadRequest<CoordinatorReadRequests>, AppendRequest<CoordinatorWriteRequests>,
@@ -126,8 +149,13 @@ class MachineManager {
 
         CoordinatorMessages &&cm = std::move(conversion_attempt.value());
 
-        coordinator_.Handle(std::forward<CoordinatorMessages>(cm), request_envelope.request_id,
-                            request_envelope.from_address);
+        CoordinatorRouteMessage route_message{
+            .message = std::forward<CoordinatorMessages>(cm),
+            .request_id = request_envelope.request_id,
+            .to = request_envelope.to_address,
+            .from = request_envelope.from_address,
+        };
+        coordinator_queue_.Push(std::move(route_message));
         continue;
       }
 
@@ -168,6 +196,8 @@
  private:
   Time Cron() {
     spdlog::info("running MachineManager::Cron, address {}", io_.GetAddress().ToString());
+    coordinator_queue_.Push(coordinator::coordinator_worker::Cron{});
     return shard_manager_.Cron();
   }
 };