diff --git a/src/coordination/coordinator_data.cpp b/src/coordination/coordinator_data.cpp index 47287606d..1408472cb 100644 --- a/src/coordination/coordinator_data.cpp +++ b/src/coordination/coordinator_data.cpp @@ -57,15 +57,21 @@ CoordinatorData::CoordinatorData() { auto &instance = find_instance(coord_data, instance_name); MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name); if (bool main_alive = instance.UpdateInstanceStatus(); !main_alive) { - spdlog::warn("Main is not alive, starting automatic failover"); + spdlog::info("Main instance {} is not alive, starting automatic failover", instance_name); switch (auto failover_status = DoFailover(); failover_status) { using enum DoFailoverStatus; case ALL_REPLICAS_DOWN: spdlog::warn("Failover aborted since all replicas are down!"); + break; case MAIN_ALIVE: spdlog::warn("Failover aborted since main is alive!"); + break; case CLUSTER_UNINITIALIZED: spdlog::warn("Failover aborted since cluster is uninitialized!"); + break; + case RPC_FAILED: + spdlog::warn("Failover aborted since promoting replica to main failed!"); + break; case SUCCESS: break; } @@ -91,14 +97,12 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus { auto replica_instances = registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica); - auto chosen_replica_instance = - std::ranges::find_if(replica_instances, [](const CoordinatorInstance &instance) { return instance.IsAlive(); }); - + auto chosen_replica_instance = std::ranges::find_if(replica_instances, &CoordinatorInstance::IsAlive); if (chosen_replica_instance == replica_instances.end()) { return DoFailoverStatus::ALL_REPLICAS_DOWN; } - chosen_replica_instance->client_.PauseFrequentCheck(); + chosen_replica_instance->PrepareForFailover(); std::vector repl_clients_info; repl_clients_info.reserve(std::ranges::distance(replica_instances)); @@ -115,16 +119,11 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus { } if (!chosen_replica_instance->client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) { - // TODO: new status and rollback all changes that were done... - MG_ASSERT(false, "Promoting replica to main failed!"); + chosen_replica_instance->RestoreAfterFailedFailover(); + return DoFailoverStatus::RPC_FAILED; } - chosen_replica_instance->client_.SetSuccCallback(main_succ_cb_); - chosen_replica_instance->client_.SetFailCallback(main_fail_cb_); - chosen_replica_instance->client_.ResetReplicationClientInfo(); - chosen_replica_instance->client_.ResumeFrequentCheck(); - chosen_replica_instance->replication_role_ = replication_coordination_glue::ReplicationRole::MAIN; - + chosen_replica_instance->PostFailover(main_succ_cb_, main_fail_cb_); main_instance->replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA; return DoFailoverStatus::SUCCESS; diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index f5efa65e1..06ab0a25c 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -52,6 +52,17 @@ class CoordinatorInstance { } auto IsMain() const -> bool { return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN; } + auto PrepareForFailover() -> void { client_.PauseFrequentCheck(); } + auto RestoreAfterFailedFailover() -> void { client_.ResumeFrequentCheck(); } + + auto PostFailover(HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> void { + replication_role_ = replication_coordination_glue::ReplicationRole::MAIN; + client_.SetSuccCallback(std::move(main_succ_cb)); + client_.SetFailCallback(std::move(main_fail_cb)); + client_.ResetReplicationClientInfo(); + client_.ResumeFrequentCheck(); + } + CoordinatorClient client_; replication_coordination_glue::ReplicationRole replication_role_; std::atomic last_response_time_{}; diff --git a/src/coordination/include/coordination/failover_status.hpp b/src/coordination/include/coordination/failover_status.hpp index 9bc5cd356..d13909b5e 100644 --- a/src/coordination/include/coordination/failover_status.hpp +++ b/src/coordination/include/coordination/failover_status.hpp @@ -16,6 +16,6 @@ #include namespace memgraph::coordination { -enum class DoFailoverStatus : uint8_t { SUCCESS, ALL_REPLICAS_DOWN, MAIN_ALIVE, CLUSTER_UNINITIALIZED }; +enum class DoFailoverStatus : uint8_t { SUCCESS, ALL_REPLICAS_DOWN, MAIN_ALIVE, CLUSTER_UNINITIALIZED, RPC_FAILED }; } // namespace memgraph::coordination #endif diff --git a/src/dbms/replication_handler.cpp b/src/dbms/replication_handler.cpp index 0cbc10375..285752f76 100644 --- a/src/dbms/replication_handler.cpp +++ b/src/dbms/replication_handler.cpp @@ -100,16 +100,16 @@ auto ReplicationHandler::RegisterReplica(const memgraph::replication::Replicatio -> memgraph::utils::BasicResult { MG_ASSERT(dbms_handler_.ReplicationState().IsMain(), "Only main instance can register a replica!"); - auto instance_client = dbms_handler_.ReplicationState().RegisterReplica(config); - if (instance_client.HasError()) { - switch (instance_client.GetError()) { + auto maybe_client = dbms_handler_.ReplicationState().RegisterReplica(config); + if (maybe_client.HasError()) { + switch (maybe_client.GetError()) { case memgraph::replication::RegisterReplicaError::NOT_MAIN: MG_ASSERT(false, "Only main instance can register a replica!"); return {}; case memgraph::replication::RegisterReplicaError::NAME_EXISTS: return memgraph::dbms::RegisterReplicaError::NAME_EXISTS; - case memgraph::replication::RegisterReplicaError::END_POINT_EXISTS: - return memgraph::dbms::RegisterReplicaError::END_POINT_EXISTS; + case memgraph::replication::RegisterReplicaError::ENDPOINT_EXISTS: + return memgraph::dbms::RegisterReplicaError::ENDPOINT_EXISTS; case memgraph::replication::RegisterReplicaError::COULD_NOT_BE_PERSISTED: return memgraph::dbms::RegisterReplicaError::COULD_NOT_BE_PERSISTED; case memgraph::replication::RegisterReplicaError::SUCCESS: @@ -123,14 +123,14 @@ auto ReplicationHandler::RegisterReplica(const memgraph::replication::Replicatio #ifdef MG_ENTERPRISE // Update system before enabling individual storage <-> replica clients - dbms_handler_.SystemRestore(*instance_client.GetValue()); + dbms_handler_.SystemRestore(*maybe_client.GetValue()); #endif - const auto dbms_error = memgraph::dbms::HandleErrorOnReplicaClient(instance_client); + const auto dbms_error = memgraph::dbms::HandleRegisterReplicaStatus(maybe_client); if (dbms_error.has_value()) { return *dbms_error; } - auto &instance_client_ptr = instance_client.GetValue(); + auto &instance_client_ptr = maybe_client.GetValue(); const bool all_clients_good = memgraph::dbms::RegisterAllDatabasesClients(dbms_handler_, *instance_client_ptr); // NOTE Currently if any databases fails, we revert back @@ -141,7 +141,7 @@ auto ReplicationHandler::RegisterReplica(const memgraph::replication::Replicatio } // No client error, start instance level client - StartReplicaClient(dbms_handler_, *instance_client.GetValue()); + StartReplicaClient(dbms_handler_, *instance_client_ptr); return {}; } diff --git a/src/dbms/utils.hpp b/src/dbms/utils.hpp index a738fc386..4d4a2b19f 100644 --- a/src/dbms/utils.hpp +++ b/src/dbms/utils.hpp @@ -18,6 +18,7 @@ namespace memgraph::dbms { inline bool DoReplicaToMainPromotion(dbms::DbmsHandler &dbms_handler) { + auto &repl_state = dbms_handler.ReplicationState(); // STEP 1) bring down all REPLICA servers dbms_handler.ForEach([](DatabaseAccess db_acc) { auto *storage = db_acc->storage(); @@ -27,7 +28,7 @@ inline bool DoReplicaToMainPromotion(dbms::DbmsHandler &dbms_handler) { // STEP 2) Change to MAIN // TODO: restore replication servers if false? - if (!dbms_handler.ReplicationState().SetReplicationRoleMain()) { + if (!repl_state.SetReplicationRoleMain()) { // TODO: Handle recovery on failure??? return false; } @@ -79,7 +80,7 @@ inline bool RegisterAllDatabasesClients(dbms::DbmsHandler &dbms_handler, return all_clients_good; } -inline std::optional HandleErrorOnReplicaClient( +inline std::optional HandleRegisterReplicaStatus( utils::BasicResult &instance_client) { if (instance_client.HasError()) switch (instance_client.GetError()) { case replication::RegisterReplicaError::NOT_MAIN: diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 213e8b86f..aa3f083a9 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -559,6 +559,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { throw QueryRuntimeException("Failover aborted since main is alive!"); case CLUSTER_UNINITIALIZED: throw QueryRuntimeException("Failover aborted since cluster is uninitialized!"); + case RPC_FAILED: + throw QueryRuntimeException("Failover aborted since promoting replica to main failed!"); case SUCCESS: break; } diff --git a/src/replication/include/replication/state.hpp b/src/replication/include/replication/state.hpp index 0d12896b2..a53885aff 100644 --- a/src/replication/include/replication/state.hpp +++ b/src/replication/include/replication/state.hpp @@ -32,6 +32,7 @@ namespace memgraph::replication { enum class RolePersisted : uint8_t { UNKNOWN_OR_NO, YES }; +// TODO: (andi) Rename Error to Status enum class RegisterReplicaError : uint8_t { NAME_EXISTS, ENDPOINT_EXISTS, COULD_NOT_BE_PERSISTED, NOT_MAIN, SUCCESS }; struct RoleMainData { @@ -93,7 +94,6 @@ struct ReplicationState { utils::BasicResult RegisterReplica(const ReplicationClientConfig &config); bool SetReplicationRoleMain(); - bool SetReplicationRoleReplica(const ReplicationServerConfig &config); private: diff --git a/src/replication/state.cpp b/src/replication/state.cpp index 8dd91abab..d04a3d245 100644 --- a/src/replication/state.cpp +++ b/src/replication/state.cpp @@ -279,4 +279,5 @@ utils::BasicResult ReplicationState:: } return res; } + } // namespace memgraph::replication