Idempotent failover from coordinator
This commit is contained in:
parent
e9c5cc3b82
commit
1133bb8ecb
@ -57,15 +57,21 @@ CoordinatorData::CoordinatorData() {
|
||||
auto &instance = find_instance(coord_data, instance_name);
|
||||
MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name);
|
||||
if (bool main_alive = instance.UpdateInstanceStatus(); !main_alive) {
|
||||
spdlog::warn("Main is not alive, starting automatic failover");
|
||||
spdlog::info("Main instance {} is not alive, starting automatic failover", instance_name);
|
||||
switch (auto failover_status = DoFailover(); failover_status) {
|
||||
using enum DoFailoverStatus;
|
||||
case ALL_REPLICAS_DOWN:
|
||||
spdlog::warn("Failover aborted since all replicas are down!");
|
||||
break;
|
||||
case MAIN_ALIVE:
|
||||
spdlog::warn("Failover aborted since main is alive!");
|
||||
break;
|
||||
case CLUSTER_UNINITIALIZED:
|
||||
spdlog::warn("Failover aborted since cluster is uninitialized!");
|
||||
break;
|
||||
case RPC_FAILED:
|
||||
spdlog::warn("Failover aborted since promoting replica to main failed!");
|
||||
break;
|
||||
case SUCCESS:
|
||||
break;
|
||||
}
|
||||
@ -91,14 +97,12 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus {
|
||||
|
||||
auto replica_instances = registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica);
|
||||
|
||||
auto chosen_replica_instance =
|
||||
std::ranges::find_if(replica_instances, [](const CoordinatorInstance &instance) { return instance.IsAlive(); });
|
||||
|
||||
auto chosen_replica_instance = std::ranges::find_if(replica_instances, &CoordinatorInstance::IsAlive);
|
||||
if (chosen_replica_instance == replica_instances.end()) {
|
||||
return DoFailoverStatus::ALL_REPLICAS_DOWN;
|
||||
}
|
||||
|
||||
chosen_replica_instance->client_.PauseFrequentCheck();
|
||||
chosen_replica_instance->PrepareForFailover();
|
||||
|
||||
std::vector<ReplicationClientInfo> repl_clients_info;
|
||||
repl_clients_info.reserve(std::ranges::distance(replica_instances));
|
||||
@ -115,16 +119,11 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus {
|
||||
}
|
||||
|
||||
if (!chosen_replica_instance->client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) {
|
||||
// TODO: new status and rollback all changes that were done...
|
||||
MG_ASSERT(false, "Promoting replica to main failed!");
|
||||
chosen_replica_instance->RestoreAfterFailedFailover();
|
||||
return DoFailoverStatus::RPC_FAILED;
|
||||
}
|
||||
|
||||
chosen_replica_instance->client_.SetSuccCallback(main_succ_cb_);
|
||||
chosen_replica_instance->client_.SetFailCallback(main_fail_cb_);
|
||||
chosen_replica_instance->client_.ResetReplicationClientInfo();
|
||||
chosen_replica_instance->client_.ResumeFrequentCheck();
|
||||
chosen_replica_instance->replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
|
||||
|
||||
chosen_replica_instance->PostFailover(main_succ_cb_, main_fail_cb_);
|
||||
main_instance->replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA;
|
||||
|
||||
return DoFailoverStatus::SUCCESS;
|
||||
|
@ -52,6 +52,17 @@ class CoordinatorInstance {
|
||||
}
|
||||
auto IsMain() const -> bool { return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN; }
|
||||
|
||||
auto PrepareForFailover() -> void { client_.PauseFrequentCheck(); }
|
||||
auto RestoreAfterFailedFailover() -> void { client_.ResumeFrequentCheck(); }
|
||||
|
||||
auto PostFailover(HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> void {
|
||||
replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
|
||||
client_.SetSuccCallback(std::move(main_succ_cb));
|
||||
client_.SetFailCallback(std::move(main_fail_cb));
|
||||
client_.ResetReplicationClientInfo();
|
||||
client_.ResumeFrequentCheck();
|
||||
}
|
||||
|
||||
CoordinatorClient client_;
|
||||
replication_coordination_glue::ReplicationRole replication_role_;
|
||||
std::atomic<std::chrono::system_clock::time_point> last_response_time_{};
|
||||
|
@ -16,6 +16,6 @@
|
||||
#include <cstdint>
|
||||
|
||||
namespace memgraph::coordination {
|
||||
enum class DoFailoverStatus : uint8_t { SUCCESS, ALL_REPLICAS_DOWN, MAIN_ALIVE, CLUSTER_UNINITIALIZED };
|
||||
enum class DoFailoverStatus : uint8_t { SUCCESS, ALL_REPLICAS_DOWN, MAIN_ALIVE, CLUSTER_UNINITIALIZED, RPC_FAILED };
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
||||
|
@ -100,16 +100,16 @@ auto ReplicationHandler::RegisterReplica(const memgraph::replication::Replicatio
|
||||
-> memgraph::utils::BasicResult<RegisterReplicaError> {
|
||||
MG_ASSERT(dbms_handler_.ReplicationState().IsMain(), "Only main instance can register a replica!");
|
||||
|
||||
auto instance_client = dbms_handler_.ReplicationState().RegisterReplica(config);
|
||||
if (instance_client.HasError()) {
|
||||
switch (instance_client.GetError()) {
|
||||
auto maybe_client = dbms_handler_.ReplicationState().RegisterReplica(config);
|
||||
if (maybe_client.HasError()) {
|
||||
switch (maybe_client.GetError()) {
|
||||
case memgraph::replication::RegisterReplicaError::NOT_MAIN:
|
||||
MG_ASSERT(false, "Only main instance can register a replica!");
|
||||
return {};
|
||||
case memgraph::replication::RegisterReplicaError::NAME_EXISTS:
|
||||
return memgraph::dbms::RegisterReplicaError::NAME_EXISTS;
|
||||
case memgraph::replication::RegisterReplicaError::END_POINT_EXISTS:
|
||||
return memgraph::dbms::RegisterReplicaError::END_POINT_EXISTS;
|
||||
case memgraph::replication::RegisterReplicaError::ENDPOINT_EXISTS:
|
||||
return memgraph::dbms::RegisterReplicaError::ENDPOINT_EXISTS;
|
||||
case memgraph::replication::RegisterReplicaError::COULD_NOT_BE_PERSISTED:
|
||||
return memgraph::dbms::RegisterReplicaError::COULD_NOT_BE_PERSISTED;
|
||||
case memgraph::replication::RegisterReplicaError::SUCCESS:
|
||||
@ -123,14 +123,14 @@ auto ReplicationHandler::RegisterReplica(const memgraph::replication::Replicatio
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
// Update system before enabling individual storage <-> replica clients
|
||||
dbms_handler_.SystemRestore(*instance_client.GetValue());
|
||||
dbms_handler_.SystemRestore(*maybe_client.GetValue());
|
||||
#endif
|
||||
|
||||
const auto dbms_error = memgraph::dbms::HandleErrorOnReplicaClient(instance_client);
|
||||
const auto dbms_error = memgraph::dbms::HandleRegisterReplicaStatus(maybe_client);
|
||||
if (dbms_error.has_value()) {
|
||||
return *dbms_error;
|
||||
}
|
||||
auto &instance_client_ptr = instance_client.GetValue();
|
||||
auto &instance_client_ptr = maybe_client.GetValue();
|
||||
const bool all_clients_good = memgraph::dbms::RegisterAllDatabasesClients(dbms_handler_, *instance_client_ptr);
|
||||
|
||||
// NOTE Currently if any databases fails, we revert back
|
||||
@ -141,7 +141,7 @@ auto ReplicationHandler::RegisterReplica(const memgraph::replication::Replicatio
|
||||
}
|
||||
|
||||
// No client error, start instance level client
|
||||
StartReplicaClient(dbms_handler_, *instance_client.GetValue());
|
||||
StartReplicaClient(dbms_handler_, *instance_client_ptr);
|
||||
return {};
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
namespace memgraph::dbms {
|
||||
|
||||
inline bool DoReplicaToMainPromotion(dbms::DbmsHandler &dbms_handler) {
|
||||
auto &repl_state = dbms_handler.ReplicationState();
|
||||
// STEP 1) bring down all REPLICA servers
|
||||
dbms_handler.ForEach([](DatabaseAccess db_acc) {
|
||||
auto *storage = db_acc->storage();
|
||||
@ -27,7 +28,7 @@ inline bool DoReplicaToMainPromotion(dbms::DbmsHandler &dbms_handler) {
|
||||
|
||||
// STEP 2) Change to MAIN
|
||||
// TODO: restore replication servers if false?
|
||||
if (!dbms_handler.ReplicationState().SetReplicationRoleMain()) {
|
||||
if (!repl_state.SetReplicationRoleMain()) {
|
||||
// TODO: Handle recovery on failure???
|
||||
return false;
|
||||
}
|
||||
@ -79,7 +80,7 @@ inline bool RegisterAllDatabasesClients(dbms::DbmsHandler &dbms_handler,
|
||||
return all_clients_good;
|
||||
}
|
||||
|
||||
inline std::optional<RegisterReplicaError> HandleErrorOnReplicaClient(
|
||||
inline std::optional<RegisterReplicaError> HandleRegisterReplicaStatus(
|
||||
utils::BasicResult<replication::RegisterReplicaError, replication::ReplicationClient *> &instance_client) {
|
||||
if (instance_client.HasError()) switch (instance_client.GetError()) {
|
||||
case replication::RegisterReplicaError::NOT_MAIN:
|
||||
|
@ -559,6 +559,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
|
||||
throw QueryRuntimeException("Failover aborted since main is alive!");
|
||||
case CLUSTER_UNINITIALIZED:
|
||||
throw QueryRuntimeException("Failover aborted since cluster is uninitialized!");
|
||||
case RPC_FAILED:
|
||||
throw QueryRuntimeException("Failover aborted since promoting replica to main failed!");
|
||||
case SUCCESS:
|
||||
break;
|
||||
}
|
||||
|
@ -32,6 +32,7 @@ namespace memgraph::replication {
|
||||
|
||||
enum class RolePersisted : uint8_t { UNKNOWN_OR_NO, YES };
|
||||
|
||||
// TODO: (andi) Rename Error to Status
|
||||
enum class RegisterReplicaError : uint8_t { NAME_EXISTS, ENDPOINT_EXISTS, COULD_NOT_BE_PERSISTED, NOT_MAIN, SUCCESS };
|
||||
|
||||
struct RoleMainData {
|
||||
@ -93,7 +94,6 @@ struct ReplicationState {
|
||||
utils::BasicResult<RegisterReplicaError, ReplicationClient *> RegisterReplica(const ReplicationClientConfig &config);
|
||||
|
||||
bool SetReplicationRoleMain();
|
||||
|
||||
bool SetReplicationRoleReplica(const ReplicationServerConfig &config);
|
||||
|
||||
private:
|
||||
|
@ -279,4 +279,5 @@ utils::BasicResult<RegisterReplicaError, ReplicationClient *> ReplicationState::
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
} // namespace memgraph::replication
|
||||
|
Loading…
Reference in New Issue
Block a user