From ce788f5f65e2a6296a21cc3ca7253a392e033627 Mon Sep 17 00:00:00 2001
From: Tyler Neely <tylerneely@gmail.com>
Date: Thu, 22 Sep 2022 13:55:16 +0200
Subject: [PATCH] Machine manager and shard stitch (#569)

---
 src/coordinator/shard_map.hpp           |  1 +
 src/io/messages.hpp                     |  8 +++-----
 src/io/simulator/simulator_handle.cpp   |  4 ++--
 src/io/simulator/simulator_handle.hpp   |  8 ++++----
 src/machine_manager/machine_manager.hpp |  7 ++-----
 src/storage/v3/shard_manager.hpp        | 24 ++++++++++++++----------
 tests/unit/machine_manager.cpp          | 24 +++++++++++++++---------
 7 files changed, 41 insertions(+), 35 deletions(-)

diff --git a/src/coordinator/shard_map.hpp b/src/coordinator/shard_map.hpp
index 923406e00..a842d798d 100644
--- a/src/coordinator/shard_map.hpp
+++ b/src/coordinator/shard_map.hpp
@@ -26,6 +26,7 @@
 #include "storage/v3/id_types.hpp"
 #include "storage/v3/property_value.hpp"
 #include "storage/v3/schemas.hpp"
+#include "storage/v3/temporal.hpp"
 
 namespace memgraph::coordinator {
 
diff --git a/src/io/messages.hpp b/src/io/messages.hpp
index fc74a309a..46c2665ce 100644
--- a/src/io/messages.hpp
+++ b/src/io/messages.hpp
@@ -15,7 +15,7 @@
 
 #include <coordinator/coordinator.hpp>
 #include <io/rsm/raft.hpp>
-#include <io/rsm/shard_rsm.hpp>
+#include "query/v2/requests.hpp"
 #include "utils/concepts.hpp"
 
 namespace memgraph::io::messages {
@@ -23,10 +23,8 @@ namespace memgraph::io::messages {
 using memgraph::coordinator::CoordinatorReadRequests;
 using memgraph::coordinator::CoordinatorWriteRequests;
 using memgraph::coordinator::CoordinatorWriteResponses;
-
-// TODO(everbody) change these to the real shard messages
-using memgraph::io::rsm::StorageReadRequest;
-using memgraph::io::rsm::StorageWriteRequest;
+using StorageReadRequest = msgs::ReadRequests;
+using StorageWriteRequest = msgs::WriteRequests;
 
 using memgraph::io::rsm::AppendRequest;
 using memgraph::io::rsm::AppendResponse;
diff --git a/src/io/simulator/simulator_handle.cpp b/src/io/simulator/simulator_handle.cpp
index 16b2b71a1..fe4bc8c1f 100644
--- a/src/io/simulator/simulator_handle.cpp
+++ b/src/io/simulator/simulator_handle.cpp
@@ -38,7 +38,7 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre
   server_addresses_.insert(address);
 
   while (true) {
-    const size_t blocked_servers = blocked_on_receive_;
+    const size_t blocked_servers = blocked_on_receive_.size();
 
     const bool all_servers_blocked = blocked_servers == server_addresses_.size();
 
@@ -53,7 +53,7 @@ void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address addre
 bool SimulatorHandle::MaybeTickSimulator() {
   std::unique_lock<std::mutex> lock(mu_);
 
-  const size_t blocked_servers = blocked_on_receive_;
+  const size_t blocked_servers = blocked_on_receive_.size();
 
   if (blocked_servers < server_addresses_.size()) {
     // we only need to advance the simulator when all
diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp
index a673b917c..ba64da3a5 100644
--- a/src/io/simulator/simulator_handle.hpp
+++ b/src/io/simulator/simulator_handle.hpp
@@ -48,7 +48,7 @@ class SimulatorHandle {
   Time cluster_wide_time_microseconds_;
   bool should_shut_down_ = false;
   SimulatorStats stats_;
-  size_t blocked_on_receive_ = 0;
+  std::set<Address> blocked_on_receive_;
   std::set<Address> server_addresses_;
   std::mt19937 rng_;
   SimulatorConfig config_;
@@ -115,7 +115,7 @@ class SimulatorHandle {
   requires(sizeof...(Ms) > 0) RequestResult<Ms...> Receive(const Address &receiver, Duration timeout) {
     std::unique_lock<std::mutex> lock(mu_);
 
-    blocked_on_receive_ += 1;
+    blocked_on_receive_.emplace(receiver);
 
     const Time deadline = cluster_wide_time_microseconds_ + timeout;
 
@@ -130,7 +130,7 @@ class SimulatorHandle {
           // than asserting that the last item in can_rx matches.
           auto m_opt = std::move(message).Take<Ms...>();
 
-          blocked_on_receive_ -= 1;
+          blocked_on_receive_.erase(receiver);
 
           return std::move(m_opt).value();
         }
@@ -144,7 +144,7 @@ class SimulatorHandle {
       }
     }
 
-    blocked_on_receive_ -= 1;
+    blocked_on_receive_.erase(receiver);
 
     return TimedOut{};
   }
diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp
index 909d7235b..fe18b18b2 100644
--- a/src/machine_manager/machine_manager.hpp
+++ b/src/machine_manager/machine_manager.hpp
@@ -11,8 +11,6 @@
 
 #pragma once
 
-#include <boost/uuid/uuid.hpp>
-
 #include <coordinator/coordinator_rsm.hpp>
 #include <io/message_conversion.hpp>
 #include <io/messages.hpp>
@@ -36,6 +34,8 @@ using memgraph::io::Time;
 using memgraph::io::messages::CoordinatorMessages;
 using memgraph::io::messages::ShardManagerMessages;
 using memgraph::io::messages::ShardMessages;
+using memgraph::io::messages::StorageReadRequest;
+using memgraph::io::messages::StorageWriteRequest;
 using memgraph::io::rsm::AppendRequest;
 using memgraph::io::rsm::AppendResponse;
 using memgraph::io::rsm::ReadRequest;
@@ -45,9 +45,6 @@ using memgraph::io::rsm::WriteRequest;
 using memgraph::io::rsm::WriteResponse;
 using memgraph::storage::v3::ShardManager;
 
-using memgraph::io::rsm::StorageReadRequest;
-using memgraph::io::rsm::StorageWriteRequest;
-
 /// The MachineManager is responsible for:
 /// * starting the entire system and ensuring that high-level
 ///   operational requirements continue to be met
diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp
index 2f0ec2d49..6288c011d 100644
--- a/src/storage/v3/shard_manager.hpp
+++ b/src/storage/v3/shard_manager.hpp
@@ -21,9 +21,12 @@
 #include <io/message_conversion.hpp>
 #include <io/messages.hpp>
 #include <io/rsm/raft.hpp>
-#include <io/rsm/shard_rsm.hpp>
 #include <io/time.hpp>
 #include <io/transport.hpp>
+#include <query/v2/requests.hpp>
+#include <storage/v3/shard.hpp>
+#include <storage/v3/shard_rsm.hpp>
+#include "storage/v3/config.hpp"
 
 namespace memgraph::storage::v3 {
 
@@ -43,20 +46,19 @@ using memgraph::io::messages::CoordinatorMessages;
 using memgraph::io::messages::ShardManagerMessages;
 using memgraph::io::messages::ShardMessages;
 using memgraph::io::rsm::Raft;
-using memgraph::io::rsm::ShardRsm;
-using memgraph::io::rsm::StorageReadRequest;
-using memgraph::io::rsm::StorageReadResponse;
-using memgraph::io::rsm::StorageWriteRequest;
-using memgraph::io::rsm::StorageWriteResponse;
 using memgraph::io::rsm::WriteRequest;
 using memgraph::io::rsm::WriteResponse;
+using memgraph::msgs::ReadRequests;
+using memgraph::msgs::ReadResponses;
+using memgraph::msgs::WriteRequests;
+using memgraph::msgs::WriteResponses;
+using memgraph::storage::v3::ShardRsm;
 
 using ShardManagerOrRsmMessage = std::variant<ShardMessages, ShardManagerMessages>;
 using TimeUuidPair = std::pair<Time, uuid>;
 
 template <typename IoImpl>
-using ShardRaft =
-    Raft<IoImpl, ShardRsm, StorageWriteRequest, StorageWriteResponse, StorageReadRequest, StorageReadResponse>;
+using ShardRaft = Raft<IoImpl, ShardRsm, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
 
 using namespace std::chrono_literals;
 static constexpr Duration kMinimumCronInterval = 1000ms;
@@ -208,8 +210,10 @@ class ShardManager {
     // TODO(tyler) get geers from Coordinator in HeartbeatResponse
     std::vector<Address> rsm_peers = {};
 
-    // TODO(everbody) change this to storage::Shard
-    ShardRsm rsm_state{};
+    std::unique_ptr<Shard> shard =
+        std::make_unique<Shard>(to_init.label_id, to_init.min_key, to_init.max_key, to_init.config);
+
+    ShardRsm rsm_state{std::move(shard)};
 
     ShardRaft<IoImpl> rsm{std::move(rsm_io), rsm_peers, std::move(rsm_state)};
 
diff --git a/tests/unit/machine_manager.cpp b/tests/unit/machine_manager.cpp
index 3c3a69ff1..93f2cef23 100644
--- a/tests/unit/machine_manager.cpp
+++ b/tests/unit/machine_manager.cpp
@@ -25,8 +25,8 @@
 #include <io/transport.hpp>
 #include <machine_manager/machine_config.hpp>
 #include <machine_manager/machine_manager.hpp>
+#include <query/v2/requests.hpp>
 #include "io/rsm/rsm_client.hpp"
-#include "io/rsm/shard_rsm.hpp"
 #include "storage/v3/id_types.hpp"
 #include "storage/v3/schemas.hpp"
 
@@ -45,19 +45,19 @@ using memgraph::coordinator::ShardMap;
 using memgraph::io::Io;
 using memgraph::io::local_transport::LocalSystem;
 using memgraph::io::local_transport::LocalTransport;
-using memgraph::io::rsm::RsmClient;
-using memgraph::io::rsm::StorageReadRequest;
-using memgraph::io::rsm::StorageReadResponse;
-using memgraph::io::rsm::StorageWriteRequest;
-using memgraph::io::rsm::StorageWriteResponse;
 using memgraph::machine_manager::MachineConfig;
 using memgraph::machine_manager::MachineManager;
 using memgraph::storage::v3::LabelId;
 using memgraph::storage::v3::SchemaProperty;
 
+using memgraph::io::rsm::RsmClient;
+using memgraph::msgs::ReadRequests;
+using memgraph::msgs::ReadResponses;
+using memgraph::msgs::WriteRequests;
+using memgraph::msgs::WriteResponses;
+
 using CompoundKey = std::vector<memgraph::storage::v3::PropertyValue>;
-using ShardClient =
-    RsmClient<LocalTransport, StorageWriteRequest, StorageWriteResponse, StorageReadRequest, StorageReadResponse>;
+using ShardClient = RsmClient<LocalTransport, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
 
 ShardMap TestShardMap() {
   ShardMap sm{};
@@ -192,16 +192,22 @@ TEST(MachineManager, BasicFunctionality) {
   // submit a read request and assert that the requested key does not yet exist
 
   LabelId label_id = sm.labels.at(label_name);
-  StorageReadRequest storage_get_req;
+
+  ReadRequests storage_get_req;
+  /*
+  TODO(tyler,kostas) set this to a real request
   storage_get_req.label_id = label_id;
   storage_get_req.key = compound_key;
   storage_get_req.transaction_id = hlc_response.new_hlc;
+  */
 
   auto get_response_result = shard_client.SendReadRequest(storage_get_req);
   auto get_response = get_response_result.GetValue();
+  /*
   auto val = get_response.value;
 
   MG_ASSERT(!val.has_value());
+  */
 
   local_system.ShutDown();
 };