From 9d457eafa8d6f272bd4230dcc2c4fea7628e8a7b Mon Sep 17 00:00:00 2001
From: Andi Skrgat <andi8647@gmail.com>
Date: Mon, 22 Jan 2024 09:37:06 +0100
Subject: [PATCH] AF thread issue

---
 src/coordination/coordinator_client.cpp       |  41 +++--
 src/coordination/coordinator_state.cpp        | 155 ++++++++++--------
 .../coordination/coordinator_client.hpp       |  18 +-
 .../coordination/coordinator_client_info.hpp  |  21 ++-
 .../coordination/coordinator_state.hpp        |   2 +-
 5 files changed, 139 insertions(+), 98 deletions(-)

diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp
index 18b01663d..1d7b85822 100644
--- a/src/coordination/coordinator_client.cpp
+++ b/src/coordination/coordinator_client.cpp
@@ -27,15 +27,15 @@ auto CreateClientContext(const memgraph::coordination::CoordinatorClientConfig &
 }
 }  // namespace
 
-CoordinatorClient::CoordinatorClient(CoordinatorClientConfig config,
-                                     std::function<void(std::string_view)> freq_check_cb)
+CoordinatorClient::CoordinatorClient(CoordinatorState *coord_state, CoordinatorClientConfig config,
+                                     HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
     : rpc_context_{CreateClientContext(config)},
       rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port),
                   &rpc_context_},
       config_{std::move(config)},
-      freq_check_cb_{std::move(freq_check_cb)} {
-  StartFrequentCheck();
-}
+      coord_state_{coord_state},
+      succ_cb_{std::move(succ_cb)},
+      fail_cb_{std::move(fail_cb)} {}
 
 CoordinatorClient::~CoordinatorClient() {
   auto exit_job = utils::OnScopeExit([&] {
@@ -51,26 +51,31 @@ void CoordinatorClient::StartFrequentCheck() {
   MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0),
             "Health check frequency must be greater than 0");
 
-  replica_checker_.Run(
-      "Coord checker", config_.health_check_frequency_sec,
-      [instance_name = config_.instance_name, rpc_client = &rpc_client_, freq_check_cb = freq_check_cb_] {
-        try {
-          auto stream{rpc_client->Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
-          stream.AwaitResponse();
-          freq_check_cb(instance_name);
-        } catch (const rpc::RpcFailedException &) {
-          // Nothing to do...wait for a reconnect
-        }
-      });
+  std::string_view instance_name = config_.instance_name;
+  replica_checker_.Run("Coord checker", config_.health_check_frequency_sec, [this, instance_name] {
+    try {
+      spdlog::trace("Sending frequent heartbeat to machine {} on {}:{}", instance_name, rpc_client_.Endpoint().address,
+                    rpc_client_.Endpoint().port);
+      auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
+      if (stream.AwaitResponse().success) {
+        succ_cb_(coord_state_, instance_name);
+      } else {
+        fail_cb_(coord_state_, instance_name);
+      }
+    } catch (const rpc::RpcFailedException &) {
+      fail_cb_(coord_state_, instance_name);
+    }
+  });
 }
 
 void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); }
 
 auto CoordinatorClient::InstanceName() const -> std::string_view { return config_.instance_name; }
 auto CoordinatorClient::Endpoint() const -> const io::network::Endpoint * { return &rpc_client_.Endpoint(); }
-// TODO: remove this method and implement copy constructor
+// TODO: remove these method and implement copy constructor
 auto CoordinatorClient::Config() const -> CoordinatorClientConfig const & { return config_; }
-auto CoordinatorClient::Callback() const -> std::function<void(std::string_view)> const & { return freq_check_cb_; }
+auto CoordinatorClient::SuccCallback() const -> HealthCheckCallback const & { return succ_cb_; }
+auto CoordinatorClient::FailCallback() const -> HealthCheckCallback const & { return fail_cb_; }
 
 ////// AF design choice
 auto CoordinatorClient::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo const & {
diff --git a/src/coordination/coordinator_state.cpp b/src/coordination/coordinator_state.cpp
index 84c075cc7..fa8e84ea6 100644
--- a/src/coordination/coordinator_state.cpp
+++ b/src/coordination/coordinator_state.cpp
@@ -42,11 +42,14 @@ CoordinatorState::CoordinatorState() {
   MG_ASSERT(!(FLAGS_coordinator && FLAGS_coordinator_server_port),
             "Instance cannot be a coordinator and have registered coordinator server.");
 
+  spdlog::info("Executing coordinator constructor");
   if (FLAGS_coordinator_server_port) {
+    spdlog::info("Coordinator server port set");
     auto const config = CoordinatorServerConfig{
         .ip_address = kDefaultReplicationServerIp,
         .port = static_cast<uint16_t>(FLAGS_coordinator_server_port),
     };
+    spdlog::info("Executing coordinator constructor main replica");
 
     data_ = CoordinatorMainReplicaData{.coordinator_server_ = std::make_unique<CoordinatorServer>(config)};
   }
@@ -73,7 +76,13 @@ auto CoordinatorState::RegisterReplica(CoordinatorClientConfig config) -> Regist
     return name_endpoint_status;
   }
 
-  auto freq_check_cb = [&](std::string_view instance_name) -> void {
+  // TODO: Refactor so that we just know about instances, nothing more. Design struct Instance which will know about
+  // role... From CoordinatorState perspective there should just be instances, no specific handling of main vs. replica.
+  // Here after failover it can happen that replica
+  // has become main MG_ASSERT(replica_client_info != registered_replicas_info.end(), "Replica {} not found in
+  // registered replicas info",
+  //          instance_name);
+  auto find_client_info = [&](std::string_view instance_name) -> CoordinatorClientInfo & {
     MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
               "Can't execute CoordinatorClient's callback since variant holds wrong alternative");
     auto &registered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_;
@@ -82,25 +91,33 @@ auto CoordinatorState::RegisterReplica(CoordinatorClientConfig config) -> Regist
         registered_replicas_info,
         [instance_name](const CoordinatorClientInfo &replica) { return replica.instance_name_ == instance_name; });
 
-    // TODO: Refactor so that we just know about instances, nothing more. Here after failover it can happen that replica
-    // has become main MG_ASSERT(replica_client_info != registered_replicas_info.end(), "Replica {} not found in
-    // registered replicas info",
-    //          instance_name);
-
-    if (replica_client_info == registered_replicas_info.end()) {
-      auto &registered_main_info = std::get<CoordinatorData>(data_).registered_main_info_;
-      MG_ASSERT(registered_main_info->instance_name_ == instance_name, "Instance is neither a replica nor main...");
-      registered_main_info->last_response_time_.store(std::chrono::system_clock::now(), std::memory_order_release);
-    } else {
-      replica_client_info->last_response_time_.store(std::chrono::system_clock::now(), std::memory_order_release);
+    if (replica_client_info != registered_replicas_info.end()) {
+      return *replica_client_info;
     }
+
+    auto &registered_main_info = std::get<CoordinatorData>(data_).registered_main_info_;
+    MG_ASSERT(registered_main_info->instance_name_ == instance_name, "Instance is neither a replica nor main...");
+    return *registered_main_info;
   };
 
-  auto *coord_client =
-      &std::get<CoordinatorData>(data_).registered_replicas_.emplace_back(std::move(config), std::move(freq_check_cb));
+  auto repl_succ_cb = [&](std::string_view instance_name) -> void {
+    auto &client_info = find_client_info(instance_name);
+    client_info.UpdateLastResponseTime();
+  };
+
+  auto repl_fail_cb = [&](std::string_view instance_name) -> void {
+    auto &client_info = find_client_info(instance_name);
+    client_info.UpdateInstanceStatus();
+  };
+
+  // CoordinatorClient coord_client{this, std::move(config), std::move(repl_succ_cb), std::move(repl_fail_cb)};
+
+  auto *coord_client = &std::get<CoordinatorData>(data_).registered_replicas_.emplace_back(
+      this, std::move(config), std::move(repl_succ_cb), std::move(repl_fail_cb));
 
   std::get<CoordinatorData>(data_).registered_replicas_info_.emplace_back(coord_client->InstanceName(),
                                                                           coord_client->Endpoint());
+  coord_client->StartFrequentCheck();
 
   return RegisterMainReplicaCoordinatorStatus::SUCCESS;
 }
@@ -121,42 +138,54 @@ auto CoordinatorState::RegisterMain(CoordinatorClientConfig config) -> RegisterM
     return endpoint_status;
   }
 
-  auto freq_check_cb = [&](std::string_view instance_name) -> void {
-    MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
+  // TODO: (andi) How does the situation change when restoration of main is implemented regarding callbacks?
+  // We should probably at that point search for main also in instances as for replicas...
+  auto get_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo & {
+    MG_ASSERT(std::holds_alternative<CoordinatorData>(coord_state->data_),
               "Can't execute CoordinatorClient's callback since variant holds wrong alternative");
-    MG_ASSERT(std::get<CoordinatorData>(data_).registered_main_info_.has_value(),
+    MG_ASSERT(std::get<CoordinatorData>(coord_state->data_).registered_main_info_.has_value(),
               "Main info is not set, but callback is called");
 
     // TODO When we will support restoration of main, we have to assert that the instance is main or replica, not at
     // this point....
-    auto &registered_main_info = std::get<CoordinatorData>(data_).registered_main_info_;
+    auto &registered_main_info = std::get<CoordinatorData>(coord_state->data_).registered_main_info_;
     MG_ASSERT(registered_main_info->instance_name_ == instance_name,
               "Callback called for wrong instance name: {}, expected: {}", instance_name,
               registered_main_info->instance_name_);
+    return *registered_main_info;
+  };
 
-    registered_main_info->last_response_time_.store(std::chrono::system_clock::now(), std::memory_order_release);
+  auto succ_cb = [&get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
+    spdlog::trace("Executing success callback for main: {}", std::string(instance_name));
+    auto &registered_main_info = get_client_info(coord_state, instance_name);
+    registered_main_info.UpdateLastResponseTime();
+  };
 
-    // if (!registered_main_info->is_alive_) {
-    // spdlog::warn("Main is not alive, starting failover");
-    // switch (auto failover_status = DoFailover(); failover_status) {
-    //   using enum DoFailoverStatus;
-    //   case ALL_REPLICAS_DOWN:
-    //     spdlog::warn("Failover aborted since all replicas are down!");
-    //   case MAIN_ALIVE:
-    //     spdlog::warn("Failover aborted since main is alive!");
-    //   case CLUSTER_UNINITIALIZED:
-    //     spdlog::warn("Failover aborted since cluster is uninitialized!");
-    //   case SUCCESS:
-    //     break;
-    // }
-    // }
+  auto fail_cb = [&get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
+    auto &registered_main_info = get_client_info(coord_state, instance_name);
+    if (bool main_alive = registered_main_info.UpdateInstanceStatus(); !main_alive) {
+      // spdlog::warn("Main is not alive, starting failover");
+      // switch (auto failover_status = DoFailover(); failover_status) {
+      //   using enum DoFailoverStatus;
+      //   case ALL_REPLICAS_DOWN:
+      //     spdlog::warn("Failover aborted since all replicas are down!");
+      //   case MAIN_ALIVE:
+      //     spdlog::warn("Failover aborted since main is alive!");
+      //   case CLUSTER_UNINITIALIZED:
+      //     spdlog::warn("Failover aborted since cluster is uninitialized!");
+      //   case SUCCESS:
+      //     break;
+      // }
+    }
   };
 
   auto &registered_main = std::get<CoordinatorData>(data_).registered_main_;
-  registered_main = std::make_unique<CoordinatorClient>(std::move(config), std::move(freq_check_cb));
+  registered_main =
+      std::make_unique<CoordinatorClient>(this, std::move(config), std::move(succ_cb), std::move(fail_cb));
 
-  auto &registered_main_info = std::get<CoordinatorData>(data_).registered_main_info_;
-  registered_main_info.emplace(registered_main->InstanceName(), registered_main->Endpoint());
+  std::get<CoordinatorData>(data_).registered_main_info_.emplace(registered_main->InstanceName(),
+                                                                 registered_main->Endpoint());
+  registered_main->StartFrequentCheck();
 
   return RegisterMainReplicaCoordinatorStatus::SUCCESS;
 }
@@ -168,19 +197,13 @@ auto CoordinatorState::ShowReplicas() const -> std::vector<CoordinatorInstanceSt
   std::vector<CoordinatorInstanceStatus> instances_status;
   instances_status.reserve(registered_replicas_info.size());
 
-  std::ranges::transform(
-      registered_replicas_info, std::back_inserter(instances_status),
-      [](const CoordinatorClientInfo &coord_client_info) {
-        const auto sec_since_last_response = std::chrono::duration_cast<std::chrono::seconds>(
-                                                 std::chrono::system_clock::now() -
-                                                 coord_client_info.last_response_time_.load(std::memory_order_acquire))
-                                                 .count();
-
-        return CoordinatorInstanceStatus{
-            .instance_name = coord_client_info.instance_name_,
-            .socket_address = coord_client_info.endpoint->SocketAddress(),
-            .is_alive = sec_since_last_response <= CoordinatorClusterConfig::alive_response_time_difference_sec_};
-      });
+  std::ranges::transform(registered_replicas_info, std::back_inserter(instances_status),
+                         [](const CoordinatorClientInfo &coord_client_info) {
+                           return CoordinatorInstanceStatus{
+                               .instance_name = coord_client_info.instance_name_,
+                               .socket_address = coord_client_info.endpoint->SocketAddress(),
+                               .is_alive = coord_client_info.is_alive_};
+                         });
   return instances_status;
 }
 
@@ -191,15 +214,10 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
   if (!main.has_value()) {
     return std::nullopt;
   }
-  const auto sec_since_last_response =
-      std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() -
-                                                       main->last_response_time_.load(std::memory_order_acquire))
-          .count();
 
-  return CoordinatorInstanceStatus{
-      .instance_name = main->instance_name_,
-      .socket_address = main->endpoint->SocketAddress(),
-      .is_alive = sec_since_last_response <= CoordinatorClusterConfig::alive_response_time_difference_sec_};
+  return CoordinatorInstanceStatus{.instance_name = main->instance_name_,
+                                   .socket_address = main->endpoint->SocketAddress(),
+                                   .is_alive = main->is_alive_};
 };
 
 [[nodiscard]] auto CoordinatorState::DoFailover() -> DoFailoverStatus {
@@ -222,12 +240,7 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
     return DoFailoverStatus::CLUSTER_UNINITIALIZED;
   }
 
-  auto sec_since_last_response_main =
-      std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() -
-                                                       current_main_info->last_response_time_.load())
-          .count();
-
-  if (sec_since_last_response_main <= CoordinatorClusterConfig::alive_response_time_difference_sec_) {
+  if (current_main_info->is_alive_) {
     return DoFailoverStatus::MAIN_ALIVE;
   }
 
@@ -239,19 +252,14 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
   // Get all replicas and find new main
   auto &registered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_;
 
-  const auto chosen_replica_info =
-      std::ranges::find_if(registered_replicas_info, [](const CoordinatorClientInfo &client_info) {
-        auto sec_since_last_response = std::chrono::duration_cast<std::chrono::seconds>(
-                                           std::chrono::system_clock::now() - client_info.last_response_time_.load())
-                                           .count();
-        return sec_since_last_response <= CoordinatorClusterConfig::alive_response_time_difference_sec_;
-      });
+  const auto chosen_replica_info = std::ranges::find_if(
+      registered_replicas_info, [](const CoordinatorClientInfo &client_info) { return client_info.is_alive_; });
   if (chosen_replica_info == registered_replicas_info.end()) {
     return DoFailoverStatus::ALL_REPLICAS_DOWN;
   }
 
   auto &registered_replicas = std::get<CoordinatorData>(data_).registered_replicas_;
-  const auto chosen_replica =
+  auto chosen_replica =
       std::ranges::find_if(registered_replicas, [&chosen_replica_info](const CoordinatorClient &replica) {
         return replica.InstanceName() == chosen_replica_info->instance_name_;
       });
@@ -270,7 +278,8 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
   // Set on coordinator data of new main
   // allocate resources for new main, clear replication info on this replica as main
   // set last response time
-  auto potential_new_main = std::make_unique<CoordinatorClient>(chosen_replica->Config(), chosen_replica->Callback());
+  auto potential_new_main = std::make_unique<CoordinatorClient>(
+      this, chosen_replica->Config(), chosen_replica->SuccCallback(), chosen_replica->FailCallback());
   potential_new_main->ReplicationClientInfo().reset();
   auto potential_new_main_info = *chosen_replica_info;
 
@@ -291,6 +300,8 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
   registered_replicas.erase(chosen_replica);
   registered_replicas_info.erase(chosen_replica_info);
 
+  current_main->StartFrequentCheck();
+
   return DoFailoverStatus::SUCCESS;
 }
 
diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp
index 535684286..296880745 100644
--- a/src/coordination/include/coordination/coordinator_client.hpp
+++ b/src/coordination/include/coordination/coordinator_client.hpp
@@ -22,17 +22,22 @@
 
 namespace memgraph::coordination {
 
+class CoordinatorState;
+
 class CoordinatorClient {
  public:
   using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
   using ReplicationClientsInfo = std::vector<ReplClientInfo>;
 
-  explicit CoordinatorClient(CoordinatorClientConfig config, std::function<void(std::string_view)> freq_check_cb);
+  using HealthCheckCallback = std::function<void(CoordinatorState *, std::string_view)>;
+
+  explicit CoordinatorClient(CoordinatorState *coord_state_, CoordinatorClientConfig config,
+                             HealthCheckCallback succ_cb, HealthCheckCallback fail_cb);
 
   ~CoordinatorClient();
 
-  CoordinatorClient(CoordinatorClient &other) = delete;
-  CoordinatorClient &operator=(CoordinatorClient const &other) = delete;
+  CoordinatorClient(CoordinatorClient &) = delete;
+  CoordinatorClient &operator=(CoordinatorClient const &) = delete;
 
   CoordinatorClient(CoordinatorClient &&) noexcept = delete;
   CoordinatorClient &operator=(CoordinatorClient &&) noexcept = delete;
@@ -48,7 +53,8 @@ class CoordinatorClient {
   auto ReplicationClientInfo() const -> ReplClientInfo const &;
   auto ReplicationClientInfo() -> std::optional<ReplClientInfo> &;
   // TODO: We should add copy constructor and then there won't be need for this
-  auto Callback() const -> std::function<void(std::string_view)> const &;
+  auto SuccCallback() const -> HealthCheckCallback const &;
+  auto FailCallback() const -> HealthCheckCallback const &;
 
   friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) {
     return first.config_ == second.config_;
@@ -62,7 +68,9 @@ class CoordinatorClient {
   mutable rpc::Client rpc_client_;
 
   CoordinatorClientConfig config_;
-  std::function<void(std::string_view)> freq_check_cb_;
+  CoordinatorState *coord_state_;
+  HealthCheckCallback succ_cb_;
+  HealthCheckCallback fail_cb_;
 };
 
 }  // namespace memgraph::coordination
diff --git a/src/coordination/include/coordination/coordinator_client_info.hpp b/src/coordination/include/coordination/coordinator_client_info.hpp
index b8ac32a43..1a725a51c 100644
--- a/src/coordination/include/coordination/coordinator_client_info.hpp
+++ b/src/coordination/include/coordination/coordinator_client_info.hpp
@@ -13,6 +13,7 @@
 
 #ifdef MG_ENTERPRISE
 
+#include "coordination/coordinator_cluster_config.hpp"
 #include "io/network/endpoint.hpp"
 
 #include <atomic>
@@ -22,18 +23,23 @@ namespace memgraph::coordination {
 
 struct CoordinatorClientInfo {
   CoordinatorClientInfo(std::string_view instance_name, const io::network::Endpoint *endpoint)
-      : last_response_time_(std::chrono::system_clock::now()), instance_name_(instance_name), endpoint(endpoint) {}
+      : last_response_time_(std::chrono::system_clock::now()),
+        is_alive_(true),
+        instance_name_(instance_name),
+        endpoint(endpoint) {}
 
   ~CoordinatorClientInfo() = default;
 
   CoordinatorClientInfo(const CoordinatorClientInfo &other)
       : last_response_time_(other.last_response_time_.load()),
+        is_alive_(other.is_alive_),
         instance_name_(other.instance_name_),
         endpoint(other.endpoint) {}
 
   CoordinatorClientInfo &operator=(const CoordinatorClientInfo &other) {
     if (this != &other) {
       last_response_time_.store(other.last_response_time_.load());
+      is_alive_ = other.is_alive_;
       instance_name_ = other.instance_name_;
       endpoint = other.endpoint;
     }
@@ -42,21 +48,32 @@ struct CoordinatorClientInfo {
 
   CoordinatorClientInfo(CoordinatorClientInfo &&other) noexcept
       : last_response_time_(other.last_response_time_.load()),
+        is_alive_(other.is_alive_),
         instance_name_(other.instance_name_),
         endpoint(other.endpoint) {}
 
   CoordinatorClientInfo &operator=(CoordinatorClientInfo &&other) noexcept {
     if (this != &other) {
       last_response_time_.store(other.last_response_time_.load());
+      is_alive_ = other.is_alive_;
       instance_name_ = other.instance_name_;
       endpoint = other.endpoint;
     }
     return *this;
   }
 
-  /// TODO: Add a method is_alive
+  auto UpdateInstanceStatus() -> bool {
+    is_alive_ =
+        std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_.load())
+            .count() < CoordinatorClusterConfig::alive_response_time_difference_sec_;
+    return is_alive_;
+  }
 
+  auto UpdateLastResponseTime() -> void { last_response_time_.store(std::chrono::system_clock::now()); }
+
+  // TODO: (andi) Wrap in private to forbid modification
   std::atomic<std::chrono::system_clock::time_point> last_response_time_{};
+  bool is_alive_{false};
   std::string_view instance_name_;
   const io::network::Endpoint *endpoint;
 };
diff --git a/src/coordination/include/coordination/coordinator_state.hpp b/src/coordination/include/coordination/coordinator_state.hpp
index 64d3d75a5..7c63d7075 100644
--- a/src/coordination/include/coordination/coordinator_state.hpp
+++ b/src/coordination/include/coordination/coordinator_state.hpp
@@ -57,7 +57,7 @@ class CoordinatorState {
   // TODO: Data is not thread safe
   struct CoordinatorData {
     std::list<CoordinatorClient> registered_replicas_;
-    std::vector<CoordinatorClientInfo> registered_replicas_info_;
+    std::list<CoordinatorClientInfo> registered_replicas_info_;
     std::unique_ptr<CoordinatorClient> registered_main_;
     std::optional<CoordinatorClientInfo> registered_main_info_;
   };