diff --git a/src/coordination/CMakeLists.txt b/src/coordination/CMakeLists.txt index 20510e681..29061a3f9 100644 --- a/src/coordination/CMakeLists.txt +++ b/src/coordination/CMakeLists.txt @@ -11,6 +11,10 @@ target_sources(mg-coordination include/coordination/coordinator_exceptions.hpp include/coordination/coordinator_slk.hpp include/coordination/constants.hpp + include/coordination/failover_status.hpp + include/coordination/coordinator_client_info.hpp + include/coordination/coordinator_cluster_config.hpp + include/coordination/register_main_replica_coordinator_status.hpp PRIVATE coordinator_client.cpp diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp index 372080632..486565342 100644 --- a/src/coordination/coordinator_client.cpp +++ b/src/coordination/coordinator_client.cpp @@ -46,30 +46,19 @@ CoordinatorClient::~CoordinatorClient() { void CoordinatorClient::StartFrequentCheck() { MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0), "Health check frequency must be greater than 0"); - replica_checker_.Run( - "Coord checker", config_.health_check_frequency_sec, - [last_response_time = &last_response_time_, rpc_client = &rpc_client_] { - try { - { - auto stream{rpc_client->Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()}; - stream.AwaitResponse(); - last_response_time->store(std::chrono::system_clock::now(), std::memory_order_acq_rel); - } - } catch (const rpc::RpcFailedException &) { - // Nothing to do...wait for a reconnect - } - }); + replica_checker_.Run("Coord checker", config_.health_check_frequency_sec, [rpc_client = &rpc_client_] { + try { + auto stream{rpc_client->Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()}; + stream.AwaitResponse(); + // last_response_time->store(std::chrono::system_clock::now(), std::memory_order_release); + } catch (const rpc::RpcFailedException &) { + // Nothing to do...wait for a reconnect + } + }); } void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); } -bool 
CoordinatorClient::DoHealthCheck() const { - auto current_time = std::chrono::system_clock::now(); - auto duration = std::chrono::duration_cast<std::chrono::seconds>(current_time - - last_response_time_.load(std::memory_order_acquire)); - return duration.count() <= alive_response_time_difference_sec_; -} - auto CoordinatorClient::InstanceName() const -> std::string_view { return config_.instance_name; } auto CoordinatorClient::Endpoint() const -> io::network::Endpoint const & { return rpc_client_.Endpoint(); } auto CoordinatorClient::Config() const -> CoordinatorClientConfig const & { return config_; } @@ -86,12 +75,6 @@ auto CoordinatorClient::ReplicationClientInfo() -> std::optional<CoordinatorClie return config_.replication_client_info; } -void CoordinatorClient::UpdateTimeCheck(const std::chrono::system_clock::time_point &last_checked_time) { - last_response_time_.store(last_checked_time, std::memory_order_acq_rel); -} - -auto CoordinatorClient::GetLastTimeResponse() -> std::chrono::system_clock::time_point { return last_response_time_; } - auto CoordinatorClient::SendPromoteReplicaToMainRpc( std::vector<CoordinatorClientConfig::ReplicationClientInfo> replication_clients_info) const -> bool { try { diff --git a/src/coordination/coordinator_state.cpp b/src/coordination/coordinator_state.cpp index 145ac727d..0ab152aa1 100644 --- a/src/coordination/coordinator_state.cpp +++ b/src/coordination/coordinator_state.cpp @@ -10,11 +10,11 @@ // licenses/APL.txt. 
#include "coordination/coordinator_state.hpp" -#include <span> -#include "coordination/coordinator_client.hpp" #ifdef MG_ENTERPRISE +#include "coordination/coordinator_client.hpp" +#include "coordination/coordinator_cluster_config.hpp" #include "coordination/coordinator_config.hpp" #include "coordination/coordinator_entity_info.hpp" #include "flags/replication.hpp" @@ -53,8 +53,8 @@ CoordinatorState::CoordinatorState() { } } -auto CoordinatorState::RegisterReplica(const CoordinatorClientConfig &config) - -> utils::BasicResult<RegisterMainReplicaCoordinatorStatus, CoordinatorClient *> { +/// TODO: Don't return client, start here the client +auto CoordinatorState::RegisterReplica(const CoordinatorClientConfig &config) -> RegisterMainReplicaCoordinatorStatus { const auto name_endpoint_status = std::visit(memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) { return RegisterMainReplicaCoordinatorStatus::NOT_COORDINATOR; @@ -72,12 +72,32 @@ auto CoordinatorState::RegisterReplica(const CoordinatorClientConfig &config) return name_endpoint_status; } + auto callback = [this, instance_name = config.instance_name]() -> void {  // copies the name: `config` may not outlive the callback + MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), + "Can't execute CoordinatorClient's callback since variant holds wrong alternative"); + auto &registered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_; + + auto replica_client_info = std::ranges::find_if( + registered_replicas_info, + [&instance_name](const CoordinatorClientInfo &replica) { return replica.instance_name_ == instance_name; }); + + MG_ASSERT(replica_client_info != registered_replicas_info.end(), "Replica {} not found in registered replicas info", + instance_name);  // must hold before the iterator is dereferenced below + + auto sec_since_last_response = std::chrono::duration_cast<std::chrono::seconds>( + std::chrono::system_clock::now() - replica_client_info->last_response_time_.load(std::memory_order_acquire)); + + replica_client_info->is_alive_ = + sec_since_last_response.count() <= 
CoordinatorClusterConfig::alive_response_time_difference_sec_; + }; + // Maybe no need to return client if you can start replica client here - return &std::get<CoordinatorData>(data_).registered_replicas_.emplace_back(config); + auto *replica_client = &std::get<CoordinatorData>(data_).registered_replicas_.emplace_back(config); + replica_client->StartFrequentCheck(); + return RegisterMainReplicaCoordinatorStatus::SUCCESS; } -auto CoordinatorState::RegisterMain(const CoordinatorClientConfig &config) - -> utils::BasicResult<RegisterMainReplicaCoordinatorStatus, CoordinatorClient *> { +auto CoordinatorState::RegisterMain(const CoordinatorClientConfig &config) -> RegisterMainReplicaCoordinatorStatus { const auto endpoint_status = std::visit( memgraph::utils::Overloaded{ [](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) { @@ -92,7 +112,42 @@ auto CoordinatorState::RegisterMain(const CoordinatorClientConfig &config) auto &registered_main = std::get<CoordinatorData>(data_).registered_main_; registered_main = std::make_unique<CoordinatorClient>(config); - return registered_main.get(); + + auto cb = [this, instance_name = config.instance_name]() -> void {  // copies the name: `config` may not outlive the callback + MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), + "Can't execute CoordinatorClient's callback since variant holds wrong alternative"); + + auto &registered_main_info = std::get<CoordinatorData>(data_).registered_main_info_; + MG_ASSERT(registered_main_info.instance_name_ == instance_name, + "Registered main instance name {} does not match config instance name {}", + registered_main_info.instance_name_, instance_name); + + auto sec_since_last_response = std::chrono::duration_cast<std::chrono::seconds>( + std::chrono::system_clock::now() - registered_main_info.last_response_time_.load(std::memory_order_acquire)); + registered_main_info.is_alive_ = + sec_since_last_response.count() <= CoordinatorClusterConfig::alive_response_time_difference_sec_; + + if (!registered_main_info.is_alive_) { + spdlog::warn("Main is not alive, 
starting failover"); + switch (auto failover_status = DoFailover(); failover_status) { + using enum DoFailoverStatus; + case ALL_REPLICAS_DOWN: + spdlog::warn("Failover aborted since all replicas are down!"); + break; + case MAIN_ALIVE: + spdlog::warn("Failover aborted since main is alive!"); + break; + case CLUSTER_UNINITIALIZED: + spdlog::warn("Failover aborted since cluster is uninitialized!"); + break; + case SUCCESS: + break; + } + } + }; + + registered_main->StartFrequentCheck(); + return RegisterMainReplicaCoordinatorStatus::SUCCESS; } auto CoordinatorState::ShowReplicas() const -> std::vector<CoordinatorEntityInfo> { @@ -123,9 +178,9 @@ auto CoordinatorState::PingReplicas() const -> std::unordered_map<std::string_vi std::unordered_map<std::string_view, bool> result; const auto &registered_replicas = std::get<CoordinatorData>(data_).registered_replicas_; result.reserve(registered_replicas.size()); - for (const CoordinatorClient &replica_client : registered_replicas) { - result.emplace(replica_client.InstanceName(), replica_client.DoHealthCheck()); - } + // for (const CoordinatorClient &replica_client : registered_replicas) { + // result.emplace(replica_client.InstanceName(), replica_client.DoHealthCheck()); + // } return result; } @@ -135,12 +190,12 @@ auto CoordinatorState::PingMain() const -> std::optional<CoordinatorEntityHealth "Can't call show main on data_, as variant holds wrong alternative"); const auto &registered_main = std::get<CoordinatorData>(data_).registered_main_; if (registered_main) { - return CoordinatorEntityHealthInfo{registered_main->InstanceName(), registered_main->DoHealthCheck()}; + // return CoordinatorEntityHealthInfo{registered_main->InstanceName(), registered_main->DoHealthCheck()}; } return std::nullopt; } -auto CoordinatorState::DoFailover() -> DoFailoverStatus { +[[nodiscard]] auto CoordinatorState::DoFailover() -> DoFailoverStatus { // 1. MAIN is already down, stop sending frequent checks // 2. find new replica (coordinator) // 3. 
make copy replica's client as potential new main client (coordinator) @@ -149,7 +204,7 @@ auto CoordinatorState::DoFailover() -> DoFailoverStatus { // 6. remove replica which was promoted to main from all replicas -> this will shut down RPC frequent check client // (coordinator) // 7. for new main start frequent checks (coordinator) - + /* MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), "Cannot do failover since variant holds wrong alternative"); using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo; @@ -208,7 +263,7 @@ auto CoordinatorState::DoFailover() -> DoFailoverStatus { // 7. current_main->StartFrequentCheck(); - + */ return DoFailoverStatus::SUCCESS; } diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp index baf8a380a..f33c3cf9c 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/coordinator_client.hpp @@ -24,6 +24,9 @@ namespace memgraph::coordination { class CoordinatorClient { public: + using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo; + using ReplicationClientsInfo = std::vector<ReplClientInfo>; + explicit CoordinatorClient(const CoordinatorClientConfig &config); ~CoordinatorClient(); @@ -37,17 +40,13 @@ class CoordinatorClient { void StartFrequentCheck(); void StopFrequentCheck(); - auto DoHealthCheck() const -> bool; - auto SendPromoteReplicaToMainRpc( - std::vector<CoordinatorClientConfig::ReplicationClientInfo> replication_clients_info) const -> bool; + auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool; auto InstanceName() const -> std::string_view; auto Endpoint() const -> io::network::Endpoint const &; auto Config() const -> CoordinatorClientConfig const &; - auto ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo const &; - auto ReplicationClientInfo() -> 
std::optional<CoordinatorClientConfig::ReplicationClientInfo> &; - void UpdateTimeCheck(const std::chrono::system_clock::time_point &last_checked_time); - auto GetLastTimeResponse() -> std::chrono::system_clock::time_point; + auto ReplicationClientInfo() const -> ReplClientInfo const &; + auto ReplicationClientInfo() -> std::optional<ReplClientInfo> &; friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) { return first.config_ == second.config_; @@ -59,10 +58,8 @@ class CoordinatorClient { communication::ClientContext rpc_context_; mutable rpc::Client rpc_client_; - CoordinatorClientConfig config_; - std::atomic<std::chrono::system_clock::time_point> last_response_time_{}; - static constexpr int alive_response_time_difference_sec_{5}; + CoordinatorClientConfig config_; }; } // namespace memgraph::coordination diff --git a/src/coordination/include/coordination/coordinator_client_info.hpp b/src/coordination/include/coordination/coordinator_client_info.hpp new file mode 100644 index 000000000..9b31577a6 --- /dev/null +++ b/src/coordination/include/coordination/coordinator_client_info.hpp @@ -0,0 +1,33 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +#pragma once + +#ifdef MG_ENTERPRISE + +#include <atomic> +#include <chrono> +#include <string_view> + +namespace memgraph::coordination { + +// TODO: better connect this +// NOTE: the std::atomic member makes this struct non-copyable and non-movable, so containers that relocate +// elements (e.g. a growing std::vector) cannot hold it. instance_name_ is a non-owning view of a string stored elsewhere. +struct CoordinatorClientInfo { + std::atomic<std::chrono::system_clock::time_point> last_response_time_{}; + bool is_alive_{false}; + std::string_view instance_name_; +}; + +} // namespace memgraph::coordination + +#endif diff --git a/src/coordination/include/coordination/coordinator_cluster_config.hpp b/src/coordination/include/coordination/coordinator_cluster_config.hpp new file mode 100644 index 000000000..e1d91ff7d --- /dev/null +++ b/src/coordination/include/coordination/coordinator_cluster_config.hpp @@ -0,0 +1,22 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +#pragma once + +#ifdef MG_ENTERPRISE +namespace memgraph::coordination { + +struct CoordinatorClusterConfig { + static constexpr int alive_response_time_difference_sec_{5}; +}; + +} // namespace memgraph::coordination +#endif diff --git a/src/coordination/include/coordination/coordinator_state.hpp b/src/coordination/include/coordination/coordinator_state.hpp index e3cd26108..b7d814a60 100644 --- a/src/coordination/include/coordination/coordinator_state.hpp +++ b/src/coordination/include/coordination/coordinator_state.hpp @@ -14,8 +14,11 @@ #ifdef MG_ENTERPRISE #include "coordination/coordinator_client.hpp" +#include "coordination/coordinator_client_info.hpp" #include "coordination/coordinator_entity_info.hpp" #include "coordination/coordinator_server.hpp" +#include "coordination/failover_status.hpp" +#include "coordination/register_main_replica_coordinator_status.hpp" #include "rpc/server.hpp" #include "utils/result.hpp" #include "utils/rw_spin_lock.hpp" @@ -26,16 +29,6 @@ namespace memgraph::coordination { -enum class RegisterMainReplicaCoordinatorStatus : uint8_t { - NAME_EXISTS, - END_POINT_EXISTS, - COULD_NOT_BE_PERSISTED, - NOT_COORDINATOR, - SUCCESS -}; - -enum class DoFailoverStatus : uint8_t { SUCCESS, ALL_REPLICAS_DOWN, MAIN_ALIVE, CLUSTER_UNINITIALIZED }; - class CoordinatorState { public: CoordinatorState(); @@ -44,21 +37,12 @@ class CoordinatorState { CoordinatorState(const CoordinatorState &) = delete; CoordinatorState &operator=(const CoordinatorState &) = delete; - CoordinatorState(CoordinatorState &&other) noexcept : data_(std::move(other.data_)) {} + CoordinatorState(CoordinatorState &&) noexcept = delete; + CoordinatorState &operator=(CoordinatorState &&) noexcept = delete; - CoordinatorState &operator=(CoordinatorState &&other) noexcept { - if (this == &other) { - return *this; - } - data_ = std::move(other.data_); - return *this; - } + [[nodiscard]] auto RegisterReplica(const CoordinatorClientConfig &config) -> 
RegisterMainReplicaCoordinatorStatus; - auto RegisterReplica(const CoordinatorClientConfig &config) - -> utils::BasicResult<RegisterMainReplicaCoordinatorStatus, CoordinatorClient *>; - - auto RegisterMain(const CoordinatorClientConfig &config) - -> utils::BasicResult<RegisterMainReplicaCoordinatorStatus, CoordinatorClient *>; + [[nodiscard]] auto RegisterMain(const CoordinatorClientConfig &config) -> RegisterMainReplicaCoordinatorStatus; auto ShowReplicas() const -> std::vector<CoordinatorEntityInfo>; @@ -71,18 +55,17 @@ class CoordinatorState { // The client code must check that the server exists before calling this method. auto GetCoordinatorServer() const -> CoordinatorServer &; - auto DoFailover() -> DoFailoverStatus; + [[nodiscard]] auto DoFailover() -> DoFailoverStatus; private: // TODO: Data is not thread safe - - // Coordinator stores registered replicas and main struct CoordinatorData { std::list<CoordinatorClient> registered_replicas_; + std::vector<CoordinatorClientInfo> registered_replicas_info_; std::unique_ptr<CoordinatorClient> registered_main_; + CoordinatorClientInfo registered_main_info_; }; - // Data which each main and replica stores struct CoordinatorMainReplicaData { std::unique_ptr<CoordinatorServer> coordinator_server_; }; diff --git a/src/coordination/include/coordination/failover_status.hpp b/src/coordination/include/coordination/failover_status.hpp new file mode 100644 index 000000000..9bc5cd356 --- /dev/null +++ b/src/coordination/include/coordination/failover_status.hpp @@ -0,0 +1,21 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#ifdef MG_ENTERPRISE + +#include <cstdint> + +namespace memgraph::coordination { +enum class DoFailoverStatus : uint8_t { SUCCESS, ALL_REPLICAS_DOWN, MAIN_ALIVE, CLUSTER_UNINITIALIZED }; +} // namespace memgraph::coordination +#endif diff --git a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp new file mode 100644 index 000000000..9a4a50974 --- /dev/null +++ b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp @@ -0,0 +1,29 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +#pragma once + +#ifdef MG_ENTERPRISE + +#include <cstdint> + +namespace memgraph::coordination { + +enum class RegisterMainReplicaCoordinatorStatus : uint8_t { + NAME_EXISTS, + END_POINT_EXISTS, + COULD_NOT_BE_PERSISTED, + NOT_COORDINATOR, + SUCCESS +}; + +} // namespace memgraph::coordination +#endif diff --git a/src/dbms/coordinator_handler.cpp b/src/dbms/coordinator_handler.cpp index 87a426237..139824607 100644 --- a/src/dbms/coordinator_handler.cpp +++ b/src/dbms/coordinator_handler.cpp @@ -19,49 +19,14 @@ namespace memgraph::dbms { CoordinatorHandler::CoordinatorHandler(DbmsHandler &dbms_handler) : dbms_handler_(dbms_handler) {} -auto CoordinatorHandler::RegisterReplicaOnCoordinator(const memgraph::coordination::CoordinatorClientConfig &config) - -> utils::BasicResult<RegisterMainReplicaCoordinatorStatus> { - auto instance_client = dbms_handler_.CoordinatorState().RegisterReplica(config); - using repl_status = memgraph::coordination::RegisterMainReplicaCoordinatorStatus; - using dbms_status = memgraph::dbms::RegisterMainReplicaCoordinatorStatus; - if (instance_client.HasError()) { - switch (instance_client.GetError()) { - case memgraph::coordination::RegisterMainReplicaCoordinatorStatus::NOT_COORDINATOR: - MG_ASSERT(false, "Only coordinator instance can register main and replica!"); - return {}; - case repl_status::NAME_EXISTS: - return dbms_status::NAME_EXISTS; - case repl_status::END_POINT_EXISTS: - return dbms_status::END_POINT_EXISTS; - case repl_status::COULD_NOT_BE_PERSISTED: - return dbms_status::COULD_NOT_BE_PERSISTED; - case repl_status::SUCCESS: - break; - } - } - - instance_client.GetValue()->StartFrequentCheck(); - return {}; +auto CoordinatorHandler::RegisterReplicaOnCoordinator(const coordination::CoordinatorClientConfig &config) + -> coordination::RegisterMainReplicaCoordinatorStatus { + return dbms_handler_.CoordinatorState().RegisterReplica(config); } auto CoordinatorHandler::RegisterMainOnCoordinator(const 
memgraph::coordination::CoordinatorClientConfig &config) - -> utils::BasicResult<RegisterMainReplicaCoordinatorStatus> { - auto instance_client = dbms_handler_.CoordinatorState().RegisterMain(config); - if (instance_client.HasError()) switch (instance_client.GetError()) { - case memgraph::coordination::RegisterMainReplicaCoordinatorStatus::NOT_COORDINATOR: - MG_ASSERT(false, "Only coordinator instance can register main and replica!"); - case memgraph::coordination::RegisterMainReplicaCoordinatorStatus::NAME_EXISTS: - return memgraph::dbms::RegisterMainReplicaCoordinatorStatus::NAME_EXISTS; - case memgraph::coordination::RegisterMainReplicaCoordinatorStatus::END_POINT_EXISTS: - return memgraph::dbms::RegisterMainReplicaCoordinatorStatus::END_POINT_EXISTS; - case memgraph::coordination::RegisterMainReplicaCoordinatorStatus::COULD_NOT_BE_PERSISTED: - return memgraph::dbms::RegisterMainReplicaCoordinatorStatus::COULD_NOT_BE_PERSISTED; - case memgraph::coordination::RegisterMainReplicaCoordinatorStatus::SUCCESS: - break; - } - - instance_client.GetValue()->StartFrequentCheck(); - return {}; + -> coordination::RegisterMainReplicaCoordinatorStatus { + return dbms_handler_.CoordinatorState().RegisterMain(config); } auto CoordinatorHandler::ShowReplicasOnCoordinator() const -> std::vector<coordination::CoordinatorEntityInfo> { @@ -80,18 +45,8 @@ auto CoordinatorHandler::PingMainOnCoordinator() const -> std::optional<coordina return dbms_handler_.CoordinatorState().PingMain(); } -auto CoordinatorHandler::DoFailover() const -> DoFailoverStatus { - auto status = dbms_handler_.CoordinatorState().DoFailover(); - switch (status) { - case memgraph::coordination::DoFailoverStatus::ALL_REPLICAS_DOWN: - return memgraph::dbms::DoFailoverStatus::ALL_REPLICAS_DOWN; - case memgraph::coordination::DoFailoverStatus::SUCCESS: - return memgraph::dbms::DoFailoverStatus::SUCCESS; - case memgraph::coordination::DoFailoverStatus::MAIN_ALIVE: - return memgraph::dbms::DoFailoverStatus::MAIN_ALIVE; 
- case memgraph::coordination::DoFailoverStatus::CLUSTER_UNINITIALIZED: - return memgraph::dbms::DoFailoverStatus::CLUSTER_UNINITIALIZED; - } +auto CoordinatorHandler::DoFailover() const -> coordination::DoFailoverStatus { + return dbms_handler_.CoordinatorState().DoFailover(); } } // namespace memgraph::dbms diff --git a/src/dbms/coordinator_handler.hpp b/src/dbms/coordinator_handler.hpp index 565ee8bf5..b44ec7972 100644 --- a/src/dbms/coordinator_handler.hpp +++ b/src/dbms/coordinator_handler.hpp @@ -15,49 +15,38 @@ #include "utils/result.hpp" +#include "coordination/coordinator_config.hpp" +#include "coordination/coordinator_entity_info.hpp" +#include "coordination/failover_status.hpp" +#include "coordination/register_main_replica_coordinator_status.hpp" + #include <cstdint> #include <optional> #include <vector> -namespace memgraph::coordination { -struct CoordinatorEntityInfo; -struct CoordinatorEntityHealthInfo; -struct CoordinatorClientConfig; -} // namespace memgraph::coordination - namespace memgraph::dbms { -enum class RegisterMainReplicaCoordinatorStatus : uint8_t { - NAME_EXISTS, - END_POINT_EXISTS, - COULD_NOT_BE_PERSISTED, - NOT_COORDINATOR, - SUCCESS -}; - -enum class DoFailoverStatus : uint8_t { SUCCESS, ALL_REPLICAS_DOWN, MAIN_ALIVE, CLUSTER_UNINITIALIZED }; - class DbmsHandler; class CoordinatorHandler { public: explicit CoordinatorHandler(DbmsHandler &dbms_handler); - auto RegisterReplicaOnCoordinator(const memgraph::coordination::CoordinatorClientConfig &config) - -> utils::BasicResult<RegisterMainReplicaCoordinatorStatus>; + auto RegisterReplicaOnCoordinator(const coordination::CoordinatorClientConfig &config) + -> coordination::RegisterMainReplicaCoordinatorStatus; - auto RegisterMainOnCoordinator(const memgraph::coordination::CoordinatorClientConfig &config) - -> utils::BasicResult<RegisterMainReplicaCoordinatorStatus>; + auto RegisterMainOnCoordinator(const coordination::CoordinatorClientConfig &config) + -> 
coordination::RegisterMainReplicaCoordinatorStatus; - auto ShowReplicasOnCoordinator() const -> std::vector<memgraph::coordination::CoordinatorEntityInfo>; + auto ShowReplicasOnCoordinator() const -> std::vector<coordination::CoordinatorEntityInfo>; - auto ShowMainOnCoordinator() const -> std::optional<memgraph::coordination::CoordinatorEntityInfo>; + auto ShowMainOnCoordinator() const -> std::optional<coordination::CoordinatorEntityInfo>; auto PingReplicasOnCoordinator() const -> std::unordered_map<std::string_view, bool>; - auto PingMainOnCoordinator() const -> std::optional<memgraph::coordination::CoordinatorEntityHealthInfo>; + auto PingMainOnCoordinator() const -> std::optional<coordination::CoordinatorEntityHealthInfo>; - auto DoFailover() const -> DoFailoverStatus; + auto DoFailover() const -> coordination::DoFailoverStatus; private: DbmsHandler &dbms_handler_; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index c2790adaf..46035f859 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -495,8 +495,20 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { .replication_client_info = repl_config, .ssl = std::nullopt}; - if (const auto ret = coordinator_handler_.RegisterReplicaOnCoordinator(coordinator_client_config); ret.HasError()) { - throw QueryRuntimeException("Couldn't register replica on coordinator!"); + auto status = coordinator_handler_.RegisterReplicaOnCoordinator(coordinator_client_config); + switch (status) { + using enum memgraph::coordination::RegisterMainReplicaCoordinatorStatus; + case NAME_EXISTS: + throw QueryRuntimeException("Couldn't register replica instance since instance with such name already exists!"); + case END_POINT_EXISTS: + throw QueryRuntimeException( + "Couldn't register replica instance since instance with such endpoint already exists!"); + case COULD_NOT_BE_PERSISTED: + throw QueryRuntimeException("Couldn't register replica instance since it couldn't be persisted!"); 
+ case NOT_COORDINATOR: + throw QueryRuntimeException("Couldn't register replica instance since this instance is not a coordinator!"); + case SUCCESS: + break; } } @@ -515,8 +527,20 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { .health_check_frequency_sec = instance_check_frequency, .ssl = std::nullopt}; - if (const auto ret = coordinator_handler_.RegisterMainOnCoordinator(config); ret.HasError()) { - throw QueryRuntimeException("Couldn't register main on coordinator!"); + auto status = coordinator_handler_.RegisterMainOnCoordinator(config); + switch (status) { + using enum memgraph::coordination::RegisterMainReplicaCoordinatorStatus; + case NAME_EXISTS: + throw QueryRuntimeException("Couldn't register main instance since instance with such name already exists!"); + case END_POINT_EXISTS: + throw QueryRuntimeException( + "Couldn't register main instance since instance with such endpoint already exists!"); + case COULD_NOT_BE_PERSISTED: + throw QueryRuntimeException("Couldn't register main instance since it couldn't be persisted!"); + case NOT_COORDINATOR: + throw QueryRuntimeException("Couldn't register main instance since this instance is not a coordinator!"); + case SUCCESS: + break; } } @@ -528,7 +552,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { auto status = coordinator_handler_.DoFailover(); switch (status) { - using enum memgraph::dbms::DoFailoverStatus; + using enum memgraph::coordination::DoFailoverStatus; case ALL_REPLICAS_DOWN: throw QueryRuntimeException("Failover aborted since all replicas are down!"); case MAIN_ALIVE: