diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp index 18b01663d..1d7b85822 100644 --- a/src/coordination/coordinator_client.cpp +++ b/src/coordination/coordinator_client.cpp @@ -27,15 +27,15 @@ auto CreateClientContext(const memgraph::coordination::CoordinatorClientConfig & } } // namespace -CoordinatorClient::CoordinatorClient(CoordinatorClientConfig config, - std::function freq_check_cb) +CoordinatorClient::CoordinatorClient(CoordinatorState *coord_state, CoordinatorClientConfig config, + HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) : rpc_context_{CreateClientContext(config)}, rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port), &rpc_context_}, config_{std::move(config)}, - freq_check_cb_{std::move(freq_check_cb)} { - StartFrequentCheck(); -} + coord_state_{coord_state}, + succ_cb_{std::move(succ_cb)}, + fail_cb_{std::move(fail_cb)} {} CoordinatorClient::~CoordinatorClient() { auto exit_job = utils::OnScopeExit([&] { @@ -51,26 +51,31 @@ void CoordinatorClient::StartFrequentCheck() { MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0), "Health check frequency must be greater than 0"); - replica_checker_.Run( - "Coord checker", config_.health_check_frequency_sec, - [instance_name = config_.instance_name, rpc_client = &rpc_client_, freq_check_cb = freq_check_cb_] { - try { - auto stream{rpc_client->Stream()}; - stream.AwaitResponse(); - freq_check_cb(instance_name); - } catch (const rpc::RpcFailedException &) { - // Nothing to do...wait for a reconnect - } - }); + std::string_view instance_name = config_.instance_name; + replica_checker_.Run("Coord checker", config_.health_check_frequency_sec, [this, instance_name] { + try { + spdlog::trace("Sending frequent heartbeat to machine {} on {}:{}", instance_name, rpc_client_.Endpoint().address, + rpc_client_.Endpoint().port); + auto stream{rpc_client_.Stream()}; + if (stream.AwaitResponse().success) { + succ_cb_(coord_state_, instance_name); + } else { + fail_cb_(coord_state_, instance_name); + } + } catch (const rpc::RpcFailedException &) { + fail_cb_(coord_state_, instance_name); + } + }); } void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); } auto CoordinatorClient::InstanceName() const -> std::string_view { return config_.instance_name; } auto CoordinatorClient::Endpoint() const -> const io::network::Endpoint * { return &rpc_client_.Endpoint(); } -// TODO: remove this method and implement copy constructor +// TODO: remove these method and implement copy constructor auto CoordinatorClient::Config() const -> CoordinatorClientConfig const & { return config_; } -auto CoordinatorClient::Callback() const -> std::function const & { return freq_check_cb_; } +auto CoordinatorClient::SuccCallback() const -> HealthCheckCallback const & { return succ_cb_; } +auto CoordinatorClient::FailCallback() const -> HealthCheckCallback const & { return fail_cb_; } ////// AF design choice auto CoordinatorClient::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo const & { diff --git a/src/coordination/coordinator_state.cpp b/src/coordination/coordinator_state.cpp index 84c075cc7..fa8e84ea6 100644 --- a/src/coordination/coordinator_state.cpp +++ b/src/coordination/coordinator_state.cpp @@ -42,11 +42,14 @@ CoordinatorState::CoordinatorState() { MG_ASSERT(!(FLAGS_coordinator && FLAGS_coordinator_server_port), "Instance cannot be a coordinator and have registered coordinator server."); + spdlog::info("Executing coordinator constructor"); if (FLAGS_coordinator_server_port) { + spdlog::info("Coordinator server port set"); auto const config = CoordinatorServerConfig{ .ip_address = kDefaultReplicationServerIp, .port = static_cast(FLAGS_coordinator_server_port), }; + spdlog::info("Executing coordinator constructor main replica"); data_ = CoordinatorMainReplicaData{.coordinator_server_ = std::make_unique(config)}; } @@ -73,7 +76,13 @@ auto CoordinatorState::RegisterReplica(CoordinatorClientConfig config) -> Regist return name_endpoint_status; } - auto freq_check_cb = [&](std::string_view instance_name) -> void { + // TODO: Refactor so that we just know about instances, nothing more. Design struct Instance which will know about + // role... From CoordinatorState perspective there should just be instances, no specific handling of main vs. replica. + // Here after failover it can happen that replica + // has become main MG_ASSERT(replica_client_info != registered_replicas_info.end(), "Replica {} not found in + // registered replicas info", + // instance_name); + auto find_client_info = [&](std::string_view instance_name) -> CoordinatorClientInfo & { MG_ASSERT(std::holds_alternative(data_), "Can't execute CoordinatorClient's callback since variant holds wrong alternative"); auto ®istered_replicas_info = std::get(data_).registered_replicas_info_; @@ -82,25 +91,33 @@ auto CoordinatorState::RegisterReplica(CoordinatorClientConfig config) -> Regist registered_replicas_info, [instance_name](const CoordinatorClientInfo &replica) { return replica.instance_name_ == instance_name; }); - // TODO: Refactor so that we just know about instances, nothing more. Here after failover it can happen that replica - // has become main MG_ASSERT(replica_client_info != registered_replicas_info.end(), "Replica {} not found in - // registered replicas info", - // instance_name); - - if (replica_client_info == registered_replicas_info.end()) { - auto ®istered_main_info = std::get(data_).registered_main_info_; - MG_ASSERT(registered_main_info->instance_name_ == instance_name, "Instance is neither a replica nor main..."); - registered_main_info->last_response_time_.store(std::chrono::system_clock::now(), std::memory_order_release); - } else { - replica_client_info->last_response_time_.store(std::chrono::system_clock::now(), std::memory_order_release); + if (replica_client_info != registered_replicas_info.end()) { + return *replica_client_info; } + + auto ®istered_main_info = std::get(data_).registered_main_info_; + MG_ASSERT(registered_main_info->instance_name_ == instance_name, "Instance is neither a replica nor main..."); + return *registered_main_info; }; - auto *coord_client = - &std::get(data_).registered_replicas_.emplace_back(std::move(config), std::move(freq_check_cb)); + auto repl_succ_cb = [&](std::string_view instance_name) -> void { + auto &client_info = find_client_info(instance_name); + client_info.UpdateLastResponseTime(); + }; + + auto repl_fail_cb = [&](std::string_view instance_name) -> void { + auto &client_info = find_client_info(instance_name); + client_info.UpdateInstanceStatus(); + }; + + // CoordinatorClient coord_client{this, std::move(config), std::move(repl_succ_cb), std::move(repl_fail_cb)}; + + auto *coord_client = &std::get(data_).registered_replicas_.emplace_back( + this, std::move(config), std::move(repl_succ_cb), std::move(repl_fail_cb)); std::get(data_).registered_replicas_info_.emplace_back(coord_client->InstanceName(), coord_client->Endpoint()); + coord_client->StartFrequentCheck(); return RegisterMainReplicaCoordinatorStatus::SUCCESS; } @@ -121,42 +138,54 @@ auto CoordinatorState::RegisterMain(CoordinatorClientConfig config) -> RegisterM return endpoint_status; } - auto freq_check_cb = [&](std::string_view instance_name) -> void { - MG_ASSERT(std::holds_alternative(data_), + // TODO: (andi) How does the situation change when restoration of main is implemented regarding callbacks? + // We should probably at that point search for main also in instances as for replicas... + auto get_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo & { + MG_ASSERT(std::holds_alternative(coord_state->data_), "Can't execute CoordinatorClient's callback since variant holds wrong alternative"); - MG_ASSERT(std::get(data_).registered_main_info_.has_value(), + MG_ASSERT(std::get(coord_state->data_).registered_main_info_.has_value(), "Main info is not set, but callback is called"); // TODO When we will support restoration of main, we have to assert that the instance is main or replica, not at // this point.... - auto ®istered_main_info = std::get(data_).registered_main_info_; + auto ®istered_main_info = std::get(coord_state->data_).registered_main_info_; MG_ASSERT(registered_main_info->instance_name_ == instance_name, "Callback called for wrong instance name: {}, expected: {}", instance_name, registered_main_info->instance_name_); + return *registered_main_info; + }; - registered_main_info->last_response_time_.store(std::chrono::system_clock::now(), std::memory_order_release); + auto succ_cb = [&get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { + spdlog::trace("Executing success callback for main: {}", std::string(instance_name)); + auto ®istered_main_info = get_client_info(coord_state, instance_name); + registered_main_info.UpdateLastResponseTime(); + }; - // if (!registered_main_info->is_alive_) { - // spdlog::warn("Main is not alive, starting failover"); - // switch (auto failover_status = DoFailover(); failover_status) { - // using enum DoFailoverStatus; - // case ALL_REPLICAS_DOWN: - // spdlog::warn("Failover aborted since all replicas are down!"); - // case MAIN_ALIVE: - // spdlog::warn("Failover aborted since main is alive!"); - // case CLUSTER_UNINITIALIZED: - // spdlog::warn("Failover aborted since cluster is uninitialized!"); - // case SUCCESS: - // break; - // } - // } + auto fail_cb = [&get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { + auto ®istered_main_info = get_client_info(coord_state, instance_name); + if (bool main_alive = registered_main_info.UpdateInstanceStatus(); !main_alive) { + // spdlog::warn("Main is not alive, starting failover"); + // switch (auto failover_status = DoFailover(); failover_status) { + // using enum DoFailoverStatus; + // case ALL_REPLICAS_DOWN: + // spdlog::warn("Failover aborted since all replicas are down!"); + // case MAIN_ALIVE: + // spdlog::warn("Failover aborted since main is alive!"); + // case CLUSTER_UNINITIALIZED: + // spdlog::warn("Failover aborted since cluster is uninitialized!"); + // case SUCCESS: + // break; + // } + } }; auto ®istered_main = std::get(data_).registered_main_; - registered_main = std::make_unique(std::move(config), std::move(freq_check_cb)); + registered_main = + std::make_unique(this, std::move(config), std::move(succ_cb), std::move(fail_cb)); - auto ®istered_main_info = std::get(data_).registered_main_info_; - registered_main_info.emplace(registered_main->InstanceName(), registered_main->Endpoint()); + std::get(data_).registered_main_info_.emplace(registered_main->InstanceName(), + registered_main->Endpoint()); + registered_main->StartFrequentCheck(); return RegisterMainReplicaCoordinatorStatus::SUCCESS; } @@ -168,19 +197,13 @@ auto CoordinatorState::ShowReplicas() const -> std::vector instances_status; instances_status.reserve(registered_replicas_info.size()); - std::ranges::transform( - registered_replicas_info, std::back_inserter(instances_status), - [](const CoordinatorClientInfo &coord_client_info) { - const auto sec_since_last_response = std::chrono::duration_cast( - std::chrono::system_clock::now() - - coord_client_info.last_response_time_.load(std::memory_order_acquire)) - .count(); - - return CoordinatorInstanceStatus{ - .instance_name = coord_client_info.instance_name_, - .socket_address = coord_client_info.endpoint->SocketAddress(), - .is_alive = sec_since_last_response <= CoordinatorClusterConfig::alive_response_time_difference_sec_}; - }); + std::ranges::transform(registered_replicas_info, std::back_inserter(instances_status), + [](const CoordinatorClientInfo &coord_client_info) { + return CoordinatorInstanceStatus{ + .instance_name = coord_client_info.instance_name_, + .socket_address = coord_client_info.endpoint->SocketAddress(), + .is_alive = coord_client_info.is_alive_}; + }); return instances_status; } @@ -191,15 +214,10 @@ auto CoordinatorState::ShowMain() const -> std::optional(std::chrono::system_clock::now() - - main->last_response_time_.load(std::memory_order_acquire)) - .count(); - return CoordinatorInstanceStatus{ - .instance_name = main->instance_name_, - .socket_address = main->endpoint->SocketAddress(), - .is_alive = sec_since_last_response <= CoordinatorClusterConfig::alive_response_time_difference_sec_}; + return CoordinatorInstanceStatus{.instance_name = main->instance_name_, + .socket_address = main->endpoint->SocketAddress(), + .is_alive = main->is_alive_}; }; [[nodiscard]] auto CoordinatorState::DoFailover() -> DoFailoverStatus { @@ -222,12 +240,7 @@ auto CoordinatorState::ShowMain() const -> std::optional(std::chrono::system_clock::now() - - current_main_info->last_response_time_.load()) - .count(); - - if (sec_since_last_response_main <= CoordinatorClusterConfig::alive_response_time_difference_sec_) { + if (current_main_info->is_alive_) { return DoFailoverStatus::MAIN_ALIVE; } @@ -239,19 +252,14 @@ auto CoordinatorState::ShowMain() const -> std::optional(data_).registered_replicas_info_; - const auto chosen_replica_info = - std::ranges::find_if(registered_replicas_info, [](const CoordinatorClientInfo &client_info) { - auto sec_since_last_response = std::chrono::duration_cast( - std::chrono::system_clock::now() - client_info.last_response_time_.load()) - .count(); - return sec_since_last_response <= CoordinatorClusterConfig::alive_response_time_difference_sec_; - }); + const auto chosen_replica_info = std::ranges::find_if( + registered_replicas_info, [](const CoordinatorClientInfo &client_info) { return client_info.is_alive_; }); if (chosen_replica_info == registered_replicas_info.end()) { return DoFailoverStatus::ALL_REPLICAS_DOWN; } auto ®istered_replicas = std::get(data_).registered_replicas_; - const auto chosen_replica = + auto chosen_replica = std::ranges::find_if(registered_replicas, [&chosen_replica_info](const CoordinatorClient &replica) { return replica.InstanceName() == chosen_replica_info->instance_name_; }); @@ -270,7 +278,8 @@ auto CoordinatorState::ShowMain() const -> std::optional(chosen_replica->Config(), chosen_replica->Callback()); + auto potential_new_main = std::make_unique( + this, chosen_replica->Config(), chosen_replica->SuccCallback(), chosen_replica->FailCallback()); potential_new_main->ReplicationClientInfo().reset(); auto potential_new_main_info = *chosen_replica_info; @@ -291,6 +300,8 @@ auto CoordinatorState::ShowMain() const -> std::optionalStartFrequentCheck(); + return DoFailoverStatus::SUCCESS; } diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp index 535684286..296880745 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/coordinator_client.hpp @@ -22,17 +22,22 @@ namespace memgraph::coordination { +class CoordinatorState; + class CoordinatorClient { public: using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo; using ReplicationClientsInfo = std::vector; - explicit CoordinatorClient(CoordinatorClientConfig config, std::function freq_check_cb); + using HealthCheckCallback = std::function; + + explicit CoordinatorClient(CoordinatorState *coord_state_, CoordinatorClientConfig config, + HealthCheckCallback succ_cb, HealthCheckCallback fail_cb); ~CoordinatorClient(); - CoordinatorClient(CoordinatorClient &other) = delete; - CoordinatorClient &operator=(CoordinatorClient const &other) = delete; + CoordinatorClient(CoordinatorClient &) = delete; + CoordinatorClient &operator=(CoordinatorClient const &) = delete; CoordinatorClient(CoordinatorClient &&) noexcept = delete; CoordinatorClient &operator=(CoordinatorClient &&) noexcept = delete; @@ -48,7 +53,8 @@ class CoordinatorClient { auto ReplicationClientInfo() const -> ReplClientInfo const &; auto ReplicationClientInfo() -> std::optional &; // TODO: We should add copy constructor and then there won't be need for this - auto Callback() const -> std::function const &; + auto SuccCallback() const -> HealthCheckCallback const &; + auto FailCallback() const -> HealthCheckCallback const &; friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) { return first.config_ == second.config_; @@ -62,7 +68,9 @@ class CoordinatorClient { mutable rpc::Client rpc_client_; CoordinatorClientConfig config_; - std::function freq_check_cb_; + CoordinatorState *coord_state_; + HealthCheckCallback succ_cb_; + HealthCheckCallback fail_cb_; }; } // namespace memgraph::coordination diff --git a/src/coordination/include/coordination/coordinator_client_info.hpp b/src/coordination/include/coordination/coordinator_client_info.hpp index b8ac32a43..1a725a51c 100644 --- a/src/coordination/include/coordination/coordinator_client_info.hpp +++ b/src/coordination/include/coordination/coordinator_client_info.hpp @@ -13,6 +13,7 @@ #ifdef MG_ENTERPRISE +#include "coordination/coordinator_cluster_config.hpp" #include "io/network/endpoint.hpp" #include @@ -22,18 +23,23 @@ namespace memgraph::coordination { struct CoordinatorClientInfo { CoordinatorClientInfo(std::string_view instance_name, const io::network::Endpoint *endpoint) - : last_response_time_(std::chrono::system_clock::now()), instance_name_(instance_name), endpoint(endpoint) {} + : last_response_time_(std::chrono::system_clock::now()), + is_alive_(true), + instance_name_(instance_name), + endpoint(endpoint) {} ~CoordinatorClientInfo() = default; CoordinatorClientInfo(const CoordinatorClientInfo &other) : last_response_time_(other.last_response_time_.load()), + is_alive_(other.is_alive_), instance_name_(other.instance_name_), endpoint(other.endpoint) {} CoordinatorClientInfo &operator=(const CoordinatorClientInfo &other) { if (this != &other) { last_response_time_.store(other.last_response_time_.load()); + is_alive_ = other.is_alive_; instance_name_ = other.instance_name_; endpoint = other.endpoint; } @@ -42,21 +48,32 @@ struct CoordinatorClientInfo { CoordinatorClientInfo(CoordinatorClientInfo &&other) noexcept : last_response_time_(other.last_response_time_.load()), + is_alive_(other.is_alive_), instance_name_(other.instance_name_), endpoint(other.endpoint) {} CoordinatorClientInfo &operator=(CoordinatorClientInfo &&other) noexcept { if (this != &other) { last_response_time_.store(other.last_response_time_.load()); + is_alive_ = other.is_alive_; instance_name_ = other.instance_name_; endpoint = other.endpoint; } return *this; } - /// TODO: Add a method is_alive + auto UpdateInstanceStatus() -> bool { + is_alive_ = + std::chrono::duration_cast(std::chrono::system_clock::now() - last_response_time_.load()) + .count() < CoordinatorClusterConfig::alive_response_time_difference_sec_; + return is_alive_; + } + auto UpdateLastResponseTime() -> void { last_response_time_.store(std::chrono::system_clock::now()); } + + // TODO: (andi) Wrap in private to forbid modification std::atomic last_response_time_{}; + bool is_alive_{false}; std::string_view instance_name_; const io::network::Endpoint *endpoint; }; diff --git a/src/coordination/include/coordination/coordinator_state.hpp b/src/coordination/include/coordination/coordinator_state.hpp index 64d3d75a5..7c63d7075 100644 --- a/src/coordination/include/coordination/coordinator_state.hpp +++ b/src/coordination/include/coordination/coordinator_state.hpp @@ -57,7 +57,7 @@ class CoordinatorState { // TODO: Data is not thread safe struct CoordinatorData { std::list registered_replicas_; - std::vector registered_replicas_info_; + std::list registered_replicas_info_; std::unique_ptr registered_main_; std::optional registered_main_info_; };