diff --git a/src/coordination/coordinator_cluster_state.cpp b/src/coordination/coordinator_cluster_state.cpp
index 00bbc1336..2ee95ae6d 100644
--- a/src/coordination/coordinator_cluster_state.cpp
+++ b/src/coordination/coordinator_cluster_state.cpp
@@ -19,36 +19,47 @@ namespace memgraph::coordination {

 void to_json(nlohmann::json &j, ReplicationInstanceState const &instance_state) {
-  j = nlohmann::json{{"config", instance_state.config}, {"status", instance_state.status}};
+  j = nlohmann::json{
+      {"config", instance_state.config}, {"status", instance_state.status}, {"uuid", instance_state.instance_uuid}};
 }

 void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state) {
   j.at("config").get_to(instance_state.config);
   j.at("status").get_to(instance_state.status);
+  j.at("uuid").get_to(instance_state.instance_uuid);
 }

-CoordinatorClusterState::CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances)
-    : repl_instances_{std::move(instances)} {}
+CoordinatorClusterState::CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,
+                                                 utils::UUID const &current_main_uuid, bool is_lock_opened)
+    : repl_instances_{std::move(instances)}, current_main_uuid_(current_main_uuid), is_lock_opened_(is_lock_opened) {}

 CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState const &other)
-    : repl_instances_{other.repl_instances_} {}
+    : repl_instances_{other.repl_instances_},
+      current_main_uuid_(other.current_main_uuid_),
+      is_lock_opened_(other.is_lock_opened_) {}

 CoordinatorClusterState &CoordinatorClusterState::operator=(CoordinatorClusterState const &other) {
   if (this == &other) {
     return *this;
   }
   repl_instances_ = other.repl_instances_;
+  current_main_uuid_ = other.current_main_uuid_;
+  is_lock_opened_ = other.is_lock_opened_;
   return *this;
 }

 CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState &&other) noexcept
-    : repl_instances_{std::move(other.repl_instances_)} {}
+    : repl_instances_{std::move(other.repl_instances_)},
+      current_main_uuid_(other.current_main_uuid_),
+      is_lock_opened_(other.is_lock_opened_) {}

 CoordinatorClusterState &CoordinatorClusterState::operator=(CoordinatorClusterState &&other) noexcept {
   if (this == &other) {
     return *this;
   }
   repl_instances_ = std::move(other.repl_instances_);
+  current_main_uuid_ = other.current_main_uuid_;
+  is_lock_opened_ = other.is_lock_opened_;
   return *this;
 }

@@ -58,68 +69,127 @@ auto CoordinatorClusterState::MainExists() const -> bool {
                              [](auto const &entry) { return entry.second.status == ReplicationRole::MAIN; });
 }

-auto CoordinatorClusterState::IsMain(std::string_view instance_name) const -> bool {
+auto CoordinatorClusterState::HasMainState(std::string_view instance_name) const -> bool {
   auto lock = std::shared_lock{log_lock_};
   auto const it = repl_instances_.find(instance_name);
   return it != repl_instances_.end() && it->second.status == ReplicationRole::MAIN;
 }

-auto CoordinatorClusterState::IsReplica(std::string_view instance_name) const -> bool {
+auto CoordinatorClusterState::HasReplicaState(std::string_view instance_name) const -> bool {
   auto lock = std::shared_lock{log_lock_};
   auto const it = repl_instances_.find(instance_name);
   return it != repl_instances_.end() && it->second.status == ReplicationRole::REPLICA;
 }

-auto CoordinatorClusterState::InsertInstance(std::string instance_name, ReplicationInstanceState instance_state)
-    -> void {
-  auto lock = std::lock_guard{log_lock_};
-  repl_instances_.insert_or_assign(std::move(instance_name), std::move(instance_state));
+auto CoordinatorClusterState::IsCurrentMain(std::string_view instance_name) const -> bool {
+  auto lock = std::shared_lock{log_lock_};
+  auto const it = repl_instances_.find(instance_name);
+  return it != repl_instances_.end() && it->second.status == ReplicationRole::MAIN &&
+         it->second.instance_uuid == current_main_uuid_;
 }

 auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void {
   auto lock = std::lock_guard{log_lock_};
   switch (log_action) {
+    // end of OPEN_LOCK_REGISTER_REPLICATION_INSTANCE
     case RaftLogAction::REGISTER_REPLICATION_INSTANCE: {
       auto const &config = std::get<CoordinatorToReplicaConfig>(log_entry);
-      repl_instances_[config.instance_name] = ReplicationInstanceState{config, ReplicationRole::REPLICA};
+      spdlog::trace("DoAction: register replication instance {}", config.instance_name);
+      // Set the instance uuid to a random value; if the registration fails, the instance stays in this random state
+      repl_instances_.emplace(config.instance_name,
+                              ReplicationInstanceState{config, ReplicationRole::REPLICA, utils::UUID{}});
+      is_lock_opened_ = false;
       break;
     }
+    // end of OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE
     case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE: {
       auto const instance_name = std::get<std::string>(log_entry);
+      spdlog::trace("DoAction: unregister replication instance {}", instance_name);
       repl_instances_.erase(instance_name);
+      is_lock_opened_ = false;
       break;
     }
+    // end of OPEN_LOCK_SET_INSTANCE_AS_MAIN and OPEN_LOCK_FAILOVER
     case RaftLogAction::SET_INSTANCE_AS_MAIN: {
-      auto const instance_name = std::get<std::string>(log_entry);
-      auto it = repl_instances_.find(instance_name);
+      auto const instance_uuid_change = std::get<InstanceUUIDUpdate>(log_entry);
+      auto it = repl_instances_.find(instance_uuid_change.instance_name);
       MG_ASSERT(it != repl_instances_.end(), "Instance does not exist as part of raft state!");
       it->second.status = ReplicationRole::MAIN;
+      it->second.instance_uuid = instance_uuid_change.uuid;
+      is_lock_opened_ = false;
+      spdlog::trace("DoAction: set replication instance {} as main with uuid {}", instance_uuid_change.instance_name,
+                    std::string{instance_uuid_change.uuid});
       break;
     }
+    // end of OPEN_LOCK_SET_INSTANCE_AS_REPLICA
    case RaftLogAction::SET_INSTANCE_AS_REPLICA: {
       auto const instance_name = std::get<std::string>(log_entry);
       auto it = repl_instances_.find(instance_name);
       MG_ASSERT(it != repl_instances_.end(), "Instance does not exist as part of raft state!");
       it->second.status = ReplicationRole::REPLICA;
+      is_lock_opened_ = false;
+      spdlog::trace("DoAction: set replication instance {} as replica", instance_name);
       break;
     }
-    case RaftLogAction::UPDATE_UUID: {
-      uuid_ = std::get<utils::UUID>(log_entry);
+    case RaftLogAction::UPDATE_UUID_OF_NEW_MAIN: {
+      current_main_uuid_ = std::get<utils::UUID>(log_entry);
+      spdlog::trace("DoAction: update uuid of new main {}", std::string{current_main_uuid_});
+      break;
+    }
+    case RaftLogAction::UPDATE_UUID_FOR_INSTANCE: {
+      auto const instance_uuid_change = std::get<InstanceUUIDUpdate>(log_entry);
+      auto it = repl_instances_.find(instance_uuid_change.instance_name);
+      MG_ASSERT(it != repl_instances_.end(), "Instance doesn't exist as part of RAFT state");
+      it->second.instance_uuid = instance_uuid_change.uuid;
+      spdlog::trace("DoAction: update uuid for instance {} to {}", instance_uuid_change.instance_name,
+                    std::string{instance_uuid_change.uuid});
       break;
     }
     case RaftLogAction::ADD_COORDINATOR_INSTANCE: {
       auto const &config = std::get<CoordinatorToCoordinatorConfig>(log_entry);
       coordinators_.emplace_back(CoordinatorInstanceState{config});
+      spdlog::trace("DoAction: add coordinator instance {}", config.coordinator_server_id);
       break;
     }
+    case RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE: {
+      is_lock_opened_ = true;
+      spdlog::trace("DoAction: open lock register");
+      break;
+      // TODO(antoniofilipovic) save what we are doing to be able to undo....
+    }
+    case RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE: {
+      is_lock_opened_ = true;
+      spdlog::trace("DoAction: open lock unregister");
+      break;
+      // TODO(antoniofilipovic) save what we are doing
+    }
+    case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN: {
+      is_lock_opened_ = true;
+      spdlog::trace("DoAction: open lock set instance as main");
+      break;
+      // TODO(antoniofilipovic) save what we are doing
+    }
+    case RaftLogAction::OPEN_LOCK_FAILOVER: {
+      is_lock_opened_ = true;
+      spdlog::trace("DoAction: open lock failover");
+      break;
+      // TODO(antoniofilipovic) save what we are doing
+    }
+    case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA: {
+      is_lock_opened_ = true;
+      spdlog::trace("DoAction: open lock set instance as replica");
+      break;
+      // TODO(antoniofilipovic) save what we need to undo
+    }
   }
 }

 auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
   auto lock = std::shared_lock{log_lock_};
-
-  auto const log = nlohmann::json(repl_instances_).dump();
-
+  nlohmann::json j = {{"repl_instances", repl_instances_},
+                      {"is_lock_opened", is_lock_opened_},
+                      {"current_main_uuid", current_main_uuid_}};
+  auto const log = j.dump();
   data = buffer::alloc(sizeof(uint32_t) + log.size());
   buffer_serializer bs(data);
   bs.put_str(log);
@@ -128,9 +198,10 @@ auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
 auto CoordinatorClusterState::Deserialize(buffer &data) -> CoordinatorClusterState {
   buffer_serializer bs(data);
   auto const j = nlohmann::json::parse(bs.get_str());
-  auto instances = j.get<std::map<std::string, ReplicationInstanceState, std::less<>>>();
-
-  return CoordinatorClusterState{std::move(instances)};
+  auto instances = j["repl_instances"].get<std::map<std::string, ReplicationInstanceState, std::less<>>>();
+  auto current_main_uuid = j["current_main_uuid"].get<utils::UUID>();
+  bool is_lock_opened = j["is_lock_opened"].get<bool>();
+  return CoordinatorClusterState{std::move(instances), current_main_uuid, is_lock_opened};
 }

@@ -138,12 +209,24 @@ auto CoordinatorClusterState::GetReplicationInstances() const -> std::vector<ReplicationInstanceState> {
   return repl_instances_ | ranges::views::values | ranges::to<std::vector>;
 }

+auto CoordinatorClusterState::GetCurrentMainUUID() const -> utils::UUID { return current_main_uuid_; }
+
+auto CoordinatorClusterState::GetInstanceUUID(std::string_view instance_name) const -> utils::UUID {
+  auto lock = std::shared_lock{log_lock_};
+  auto const it = repl_instances_.find(instance_name);
+  MG_ASSERT(it != repl_instances_.end(), "Instance with that name doesn't exist.");
+  return it->second.instance_uuid;
+}
+
 auto CoordinatorClusterState::GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState> {
   auto lock = std::shared_lock{log_lock_};
   return coordinators_;
 }

-auto CoordinatorClusterState::GetUUID() const -> utils::UUID { return uuid_; }
+auto CoordinatorClusterState::IsLockOpened() const -> bool {
+  auto lock = std::shared_lock{log_lock_};
+  return is_lock_opened_;
+}

 }  // namespace memgraph::coordination
 #endif
diff --git a/src/coordination/coordinator_communication_config.cpp b/src/coordination/coordinator_communication_config.cpp
index 31ed20fd0..43e7fbc37 100644
--- a/src/coordination/coordinator_communication_config.cpp
+++ b/src/coordination/coordinator_communication_config.cpp
@@ -60,5 +60,14 @@ void from_json(nlohmann::json const &j, CoordinatorToReplicaConfig &config) {
   config.replication_client_info = j.at("replication_client_info").get<ReplicationClientInfo>();
 }

+void from_json(nlohmann::json const &j, InstanceUUIDUpdate &instance_uuid_change) {
+  instance_uuid_change.uuid = j.at("uuid").get<utils::UUID>();
+  instance_uuid_change.instance_name = j.at("instance_name").get<std::string>();
+}
+
+void to_json(nlohmann::json &j, InstanceUUIDUpdate const &instance_uuid_change) {
+  j = nlohmann::json{{"instance_name", instance_uuid_change.instance_name}, {"uuid", instance_uuid_change.uuid}};
+}
+
 }  // namespace memgraph::coordination
 #endif
diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp
index 2182e2405..6dc4a2eaf 100644
--- a/src/coordination/coordinator_instance.cpp
+++ b/src/coordination/coordinator_instance.cpp
@@ -30,7 +30,8 @@ using nuraft::ptr;
 using nuraft::srv_config;

 CoordinatorInstance::CoordinatorInstance()
-    : raft_state_(RaftState::MakeRaftState(
+    : thread_pool_{1},
+      raft_state_(RaftState::MakeRaftState(
          [this]() {
            spdlog::info("Leader changed, starting all replication instances!");
            auto const instances = raft_state_.GetReplicationInstances();
@@ -55,23 +56,34 @@ CoordinatorInstance::CoordinatorInstance()
                                           &CoordinatorInstance::MainFailCallback);
            });

-            std::ranges::for_each(repl_instances_, [this](auto &instance) {
-              instance.SetNewMainUUID(raft_state_.GetUUID());
-              instance.StartFrequentCheck();
-            });
+            std::ranges::for_each(repl_instances_, [](auto &instance) { instance.StartFrequentCheck(); });
          },
          [this]() {
-            spdlog::info("Leader changed, stopping all replication instances!");
-            repl_instances_.clear();
+            thread_pool_.AddTask([this]() {
+              spdlog::info("Leader changed, trying to stop all replication instances' frequent checks!");
+              // We need to stop the checks before taking the lock: a deadlock can happen if an instance waits
+              // to take the lock in its frequent check while this thread already holds the lock and waits for the
+              // instance to finish its frequent check
+              for (auto &repl_instance : repl_instances_) {
+                repl_instance.StopFrequentCheck();
+              }
+              auto lock = std::unique_lock{coord_instance_lock_};
+              repl_instances_.clear();
+              spdlog::info("Stopped all replication instance frequent checks.");
+            });
          })) {
  client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
-    auto lock = std::lock_guard{self->coord_instance_lock_};
+    auto lock = std::unique_lock{self->coord_instance_lock_};
+    // When the coordinator is becoming a follower, it wants to stop all threads doing frequent checks.
+    // A thread can get stuck here waiting for the lock, so we need to frequently check whether we are in shutdown state
+
    auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
    std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name);
  };

  client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
-    auto lock = std::lock_guard{self->coord_instance_lock_};
+    auto lock = std::unique_lock{self->coord_instance_lock_};
+
    auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
    std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name);
  };

@@ -100,7 +112,7 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
  if (raft_state_.IsLeader()) {
    auto const stringify_repl_role = [this](ReplicationInstance const &instance) -> std::string {
      if (!instance.IsAlive()) return "unknown";
-      if (raft_state_.IsMain(instance.InstanceName())) return "main";
+      if (raft_state_.IsCurrentMain(instance.InstanceName())) return "main";
      return "replica";
    };

@@ -121,26 +133,36 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
      std::ranges::transform(repl_instances_, std::back_inserter(instances_status), process_repl_instance_as_leader);
    }
  } else {
-    auto const stringify_inst_status = [](ReplicationRole status) -> std::string {
-      return status == ReplicationRole::MAIN ? "main" : "replica";
+    auto const stringify_inst_status = [raft_state_ptr = &raft_state_](
+                                           utils::UUID const &main_uuid,
+                                           ReplicationInstanceState const &instance) -> std::string {
+      if (raft_state_ptr->IsCurrentMain(instance.config.instance_name)) {
+        return "main";
+      }
+      if (raft_state_ptr->HasMainState(instance.config.instance_name)) {
+        return "unknown";
+      }
+      return "replica";
    };

    // TODO: (andi) Add capability that followers can also return socket addresses
-    auto process_repl_instance_as_follower = [&stringify_inst_status](auto const &instance) -> InstanceStatus {
+    auto process_repl_instance_as_follower =
+        [this, &stringify_inst_status](ReplicationInstanceState const &instance) -> InstanceStatus {
      return {.instance_name = instance.config.instance_name,
-              .cluster_role = stringify_inst_status(instance.status),
+              .cluster_role = stringify_inst_status(raft_state_.GetCurrentMainUUID(), instance),
              .health = "unknown"};
    };

    std::ranges::transform(raft_state_.GetReplicationInstances(), std::back_inserter(instances_status),
                           process_repl_instance_as_follower);
  }
-
  return instances_status;
 }

 auto CoordinatorInstance::TryFailover() -> void {
-  auto const is_replica = [this](ReplicationInstance const &instance) { return IsReplica(instance.InstanceName()); };
+  auto const is_replica = [this](ReplicationInstance const &instance) {
+    return HasReplicaState(instance.InstanceName());
+  };

  auto alive_replicas =
      repl_instances_ | ranges::views::filter(is_replica) | ranges::views::filter(&ReplicationInstance::IsAlive);

@@ -150,11 +172,6 @@ auto CoordinatorInstance::TryFailover() -> void {
    return;
  }

-  if (!raft_state_.RequestLeadership()) {
-    spdlog::error("Failover failed since the instance is not the leader!");
-    return;
-  }
-
  auto const get_ts = [](ReplicationInstance &replica) { return replica.GetClient().SendGetInstanceTimestampsRpc(); };

  auto maybe_instance_db_histories = alive_replicas | ranges::views::transform(get_ts) | ranges::to<std::vector>();

@@ -182,6 +199,10 @@ auto CoordinatorInstance::TryFailover() -> void {

  auto *new_main = &FindReplicationInstance(most_up_to_date_instance);

+  if (!raft_state_.AppendOpenLockFailover(most_up_to_date_instance)) {
+    spdlog::error("Aborting failover as the instance is no longer the leader.");
+    return;
+  }
  new_main->PauseFrequentCheck();
  utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};

@@ -191,16 +212,18 @@ auto CoordinatorInstance::TryFailover() -> void {

  auto const new_main_uuid = utils::UUID{};

-  auto const failed_to_swap = [&new_main_uuid](ReplicationInstance &instance) {
-    return !instance.SendSwapAndUpdateUUID(new_main_uuid);
+  auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstance &instance) {
+    return !instance.SendSwapAndUpdateUUID(new_main_uuid) ||
+           !raft_state_.AppendUpdateUUIDForInstanceLog(instance.InstanceName(), new_main_uuid);
  };

  // If for some replicas swap fails, for others on successful ping we will revert back on next change
  // or we will do failover first again and then it will be consistent again
  if (std::ranges::any_of(alive_replicas | ranges::views::filter(is_not_new_main), failed_to_swap)) {
-    spdlog::error("Failed to swap uuid for all instances");
+    spdlog::error("Aborting failover. Failed to swap uuid for all alive instances.");
Failed to swap uuid for all alive instances."); return; } + auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) | ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) | ranges::to(); @@ -211,27 +234,36 @@ auto CoordinatorInstance::TryFailover() -> void { return; } - if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) { + if (!raft_state_.AppendUpdateUUIDForNewMainLog(new_main_uuid)) { return; } auto const new_main_instance_name = new_main->InstanceName(); - if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name)) { + if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name, new_main_uuid)) { return; } + if (!new_main->EnableWritingOnMain()) { + spdlog::error("Failover successful but couldn't enable writing on instance."); + } + spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName()); } auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance_name) -> SetInstanceToMainCoordinatorStatus { auto lock = std::lock_guard{coord_instance_lock_}; + if (raft_state_.IsLockOpened()) { + return SetInstanceToMainCoordinatorStatus::LOCK_OPENED; + } if (raft_state_.MainExists()) { return SetInstanceToMainCoordinatorStatus::MAIN_ALREADY_EXISTS; } + // TODO(antoniofilipovic) Check if request leadership can cause problems due to changing of leadership while other + // doing failover if (!raft_state_.RequestLeadership()) { return SetInstanceToMainCoordinatorStatus::NOT_LEADER; } @@ -248,6 +280,10 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME; } + if (!raft_state_.AppendOpenLockSetInstanceToMain(instance_name)) { + return SetInstanceToMainCoordinatorStatus::OPEN_LOCK; + } + new_main->PauseFrequentCheck(); utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }}; @@ -257,12 +293,13 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance auto const new_main_uuid = utils::UUID{}; - auto const failed_to_swap = [&new_main_uuid](ReplicationInstance &instance) { - return !instance.SendSwapAndUpdateUUID(new_main_uuid); + auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstance &instance) { + return !instance.SendSwapAndUpdateUUID(new_main_uuid) || + !raft_state_.AppendUpdateUUIDForInstanceLog(instance.InstanceName(), new_main_uuid); }; if (std::ranges::any_of(repl_instances_ | ranges::views::filter(is_not_new_main), failed_to_swap)) { - spdlog::error("Failed to swap uuid for all instances"); + spdlog::error("Failed to swap uuid for all currently alive instances."); return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED; } @@ -274,22 +311,28 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance &CoordinatorInstance::MainFailCallback)) { return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN; } - - if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) { + if (!raft_state_.AppendUpdateUUIDForNewMainLog(new_main_uuid)) { return SetInstanceToMainCoordinatorStatus::RAFT_LOG_ERROR; } - if (!raft_state_.AppendSetInstanceAsMainLog(instance_name)) { + if (!raft_state_.AppendSetInstanceAsMainLog(instance_name, new_main_uuid)) { return SetInstanceToMainCoordinatorStatus::RAFT_LOG_ERROR; } spdlog::info("Instance {} promoted to main on leader", instance_name); + + if (!new_main->EnableWritingOnMain()) { + return SetInstanceToMainCoordinatorStatus::ENABLE_WRITING_FAILED; + } return 
 }

 auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig const &config)
    -> RegisterInstanceCoordinatorStatus {
  auto lock = std::lock_guard{coord_instance_lock_};

+  if (raft_state_.IsLockOpened()) {
+    return RegisterInstanceCoordinatorStatus::LOCK_OPENED;
+  }
+
  if (std::ranges::any_of(repl_instances_,
                          [instance_name = config.instance_name](ReplicationInstance const &instance) {
                            return instance.InstanceName() == instance_name;
                          })) {
@@ -309,11 +352,14 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig const &config)
    return RegisterInstanceCoordinatorStatus::REPL_ENDPOINT_EXISTS;
  }

+  // TODO(antoniofilipovic) Check if this is an issue
  if (!raft_state_.RequestLeadership()) {
    return RegisterInstanceCoordinatorStatus::NOT_LEADER;
  }

-  auto const undo_action_ = [this]() { repl_instances_.pop_back(); };
+  if (!raft_state_.AppendOpenLockRegister(config)) {
+    return RegisterInstanceCoordinatorStatus::OPEN_LOCK;
+  }

  auto *new_instance = &repl_instances_.emplace_back(this, config, client_succ_cb_, client_fail_cb_,
                                                     &CoordinatorInstance::ReplicaSuccessCallback,
@@ -321,15 +367,12 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig const &config)

  if (!new_instance->SendDemoteToReplicaRpc()) {
    spdlog::error("Failed to send demote to replica rpc for instance {}", config.instance_name);
-    undo_action_();
    return RegisterInstanceCoordinatorStatus::RPC_FAILED;
  }

  if (!raft_state_.AppendRegisterReplicationInstanceLog(config)) {
-    undo_action_();
    return RegisterInstanceCoordinatorStatus::RAFT_LOG_ERROR;
  }
-
  new_instance->StartFrequentCheck();

  spdlog::info("Instance {} registered", config.instance_name);
@@ -340,6 +383,11 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instance_name)
    -> UnregisterInstanceCoordinatorStatus {
  auto lock = std::lock_guard{coord_instance_lock_};

+  if (raft_state_.IsLockOpened()) {
+    return UnregisterInstanceCoordinatorStatus::LOCK_OPENED;
+  }
+
+  // TODO(antoniofilipovic) Check if this is an issue
  if (!raft_state_.RequestLeadership()) {
    return UnregisterInstanceCoordinatorStatus::NOT_LEADER;
  }
@@ -353,19 +401,23 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instance_name)
    return UnregisterInstanceCoordinatorStatus::NO_INSTANCE_WITH_NAME;
  }

-  auto const is_main = [this](ReplicationInstance const &instance) {
-    return IsMain(instance.InstanceName()) && instance.GetMainUUID() == raft_state_.GetUUID() && instance.IsAlive();
+  auto const is_current_main = [this](ReplicationInstance const &instance) {
+    return raft_state_.IsCurrentMain(instance.InstanceName()) && instance.IsAlive();
  };

-  if (is_main(*inst_to_remove)) {
+  if (is_current_main(*inst_to_remove)) {
    return UnregisterInstanceCoordinatorStatus::IS_MAIN;
  }

+  if (!raft_state_.AppendOpenLockUnregister(instance_name)) {
+    return UnregisterInstanceCoordinatorStatus::OPEN_LOCK;
+  }
+
  inst_to_remove->StopFrequentCheck();

-  auto curr_main = std::ranges::find_if(repl_instances_, is_main);
+  auto curr_main = std::ranges::find_if(repl_instances_, is_current_main);

-  if (curr_main != repl_instances_.end() && curr_main->IsAlive()) {
+  if (curr_main != repl_instances_.end()) {
    if (!curr_main->SendUnregisterReplicaRpc(instance_name)) {
      inst_to_remove->StartFrequentCheck();
      return UnregisterInstanceCoordinatorStatus::RPC_FAILED;
@@ -383,7 +435,7 @@ auto CoordinatorInstance::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void {
  raft_state_.AddCoordinatorInstance(config);
-  // NOTE: We ignore error we added coordinator instance to networkign stuff but not in raft log.
+  // NOTE: We ignore the error; the coordinator instance was added to the networking layer but not to the raft log.
  if (!raft_state_.AppendAddCoordinatorInstanceLog(config)) {
    spdlog::error("Failed to append add coordinator instance log");
  }
@@ -391,13 +443,15 @@ auto CoordinatorInstance::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config)

 void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) {
  spdlog::trace("Instance {} performing main fail callback", repl_instance_name);
+  if (raft_state_.IsLockOpened()) {
+    spdlog::error("Returning from main fail callback as the last action didn't successfully finish");
+    return;
+  }
+
  auto &repl_instance = FindReplicationInstance(repl_instance_name);
  repl_instance.OnFailPing();
-  const auto &repl_instance_uuid = repl_instance.GetMainUUID();
-  MG_ASSERT(repl_instance_uuid.has_value(), "Replication instance must have uuid set");

  // NOLINTNEXTLINE
-  if (!repl_instance.IsAlive() && raft_state_.GetUUID() == repl_instance_uuid.value()) {
+  if (!repl_instance.IsAlive() && raft_state_.IsCurrentMain(repl_instance_name)) {
    spdlog::info("Cluster without main instance, trying automatic failover");
    TryFailover();
  }
@@ -405,6 +459,12 @@ void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name)

 void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name) {
  spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
+
+  if (raft_state_.IsLockOpened()) {
+    spdlog::error("Stopping main successful callback as the last action didn't successfully finish");
+    return;
+  }
+
  auto &repl_instance = FindReplicationInstance(repl_instance_name);

  if (repl_instance.IsAlive()) {
@@ -412,11 +472,8 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name)
    return;
  }

-  const auto &repl_instance_uuid = repl_instance.GetMainUUID();
-  MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set.");
-
  // NOLINTNEXTLINE
-  if (raft_state_.GetUUID() == repl_instance_uuid.value()) {
+  if (raft_state_.IsCurrentMain(repl_instance.InstanceName())) {
    if (!repl_instance.EnableWritingOnMain()) {
      spdlog::error("Failed to enable writing on main instance {}", repl_instance_name);
      return;
@@ -426,9 +483,8 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name)
    return;
  }

-  if (!raft_state_.RequestLeadership()) {
-    spdlog::error("Demoting main instance {} to replica failed since the instance is not the leader!",
-                  repl_instance_name);
+  if (!raft_state_.AppendOpenLockSetInstanceToReplica(repl_instance.InstanceName())) {
+    spdlog::error("Failed to open lock for demoting OLD MAIN {} to REPLICA", repl_instance_name);
    return;
  }

@@ -441,29 +497,38 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name)
    return;
  }

-  if (!repl_instance.SendSwapAndUpdateUUID(raft_state_.GetUUID())) {
+  if (!repl_instance.SendSwapAndUpdateUUID(raft_state_.GetCurrentMainUUID())) {
    spdlog::error("Failed to swap uuid for demoted main instance {}", repl_instance_name);
    return;
  }

+  if (!raft_state_.AppendUpdateUUIDForInstanceLog(repl_instance_name, raft_state_.GetCurrentMainUUID())) {
+    spdlog::error("Failed to update log of changing instance uuid {} to {}", repl_instance_name,
+                  std::string{raft_state_.GetCurrentMainUUID()});
+    return;
+  }
+
  if (!raft_state_.AppendSetInstanceAsReplicaLog(repl_instance_name)) {
+    spdlog::error("Failed to append log that OLD MAIN was demoted to REPLICA {}", repl_instance_name);
    return;
  }
 }

 void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name) {
  spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
-  auto &repl_instance = FindReplicationInstance(repl_instance_name);

-  if (!IsReplica(repl_instance_name)) {
-    spdlog::error("Aborting replica callback since instance {} is not replica anymore", repl_instance_name);
+  if (raft_state_.IsLockOpened()) {
+    spdlog::error("Stopping replica successful callback as the last action didn't successfully finish");
    return;
  }
+
+  auto &repl_instance = FindReplicationInstance(repl_instance_name);
+
  // We need to get replicas UUID from time to time to ensure replica is listening to correct main
  // and that it didn't go down for less time than we could notice
  // We need to get id of main replica is listening to
  // and swap if necessary
-  if (!repl_instance.EnsureReplicaHasCorrectMainUUID(raft_state_.GetUUID())) {
+  if (!repl_instance.EnsureReplicaHasCorrectMainUUID(raft_state_.GetCurrentMainUUID())) {
    spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName());
    return;
  }
@@ -473,13 +538,14 @@ void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name)

 void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) {
  spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
-  auto &repl_instance = FindReplicationInstance(repl_instance_name);

-  if (!IsReplica(repl_instance_name)) {
-    spdlog::error("Aborting replica fail callback since instance {} is not replica anymore", repl_instance_name);
+  if (raft_state_.IsLockOpened()) {
+    spdlog::error("Stopping replica fail callback as the last action didn't successfully finish.");
    return;
  }

+  auto &repl_instance = FindReplicationInstance(repl_instance_name);
+
  repl_instance.OnFailPing();
 }

@@ -551,12 +617,12 @@ auto CoordinatorInstance::ChooseMostUpToDateInstance(std::span<InstanceNameDbHistories> histories)

-auto CoordinatorInstance::IsMain(std::string_view instance_name) const -> bool {
-  return raft_state_.IsMain(instance_name);
+auto CoordinatorInstance::HasMainState(std::string_view instance_name) const -> bool {
+  return raft_state_.HasMainState(instance_name);
 }

-auto CoordinatorInstance::IsReplica(std::string_view instance_name) const -> bool {
-  return raft_state_.IsReplica(instance_name);
+auto CoordinatorInstance::HasReplicaState(std::string_view instance_name) const -> bool {
+  return raft_state_.HasReplicaState(instance_name);
 }

 auto CoordinatorInstance::GetRoutingTable(std::map<std::string, std::string> const &routing) -> RoutingTable {
diff --git a/src/coordination/coordinator_state_machine.cpp b/src/coordination/coordinator_state_machine.cpp
index 789ac2e5e..28c8b0768 100644
--- a/src/coordination/coordinator_state_machine.cpp
+++ b/src/coordination/coordinator_state_machine.cpp
@@ -22,12 +22,12 @@ namespace memgraph::coordination {

 auto CoordinatorStateMachine::MainExists() const -> bool { return cluster_state_.MainExists(); }

-auto CoordinatorStateMachine::IsMain(std::string_view instance_name) const -> bool {
-  return cluster_state_.IsMain(instance_name);
+auto CoordinatorStateMachine::HasMainState(std::string_view instance_name) const -> bool {
+  return cluster_state_.HasMainState(instance_name);
 }

-auto CoordinatorStateMachine::IsReplica(std::string_view instance_name) const -> bool {
-  return cluster_state_.IsReplica(instance_name);
+auto CoordinatorStateMachine::HasReplicaState(std::string_view instance_name) const -> bool {
+  return cluster_state_.HasReplicaState(instance_name);
 }

 auto CoordinatorStateMachine::CreateLog(nlohmann::json &&log) -> ptr<buffer> {
@@ -38,6 +38,23 @@ auto CoordinatorStateMachine::CreateLog(nlohmann::json &&log) -> ptr<buffer> {
   return log_buf;
 }

+auto CoordinatorStateMachine::SerializeOpenLockRegister(CoordinatorToReplicaConfig const &config) -> ptr<buffer> {
+  return CreateLog({{"action", RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE}, {"info", config}});
+}
+
+auto CoordinatorStateMachine::SerializeOpenLockUnregister(std::string_view instance_name) -> ptr<buffer> {
+  return CreateLog(
+      {{"action", RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE}, {"info", std::string{instance_name}}});
+}
+
+auto CoordinatorStateMachine::SerializeOpenLockFailover(std::string_view instance_name) -> ptr<buffer> {
+  return CreateLog({{"action", RaftLogAction::OPEN_LOCK_FAILOVER}, {"info", std::string(instance_name)}});
+}
+
+auto CoordinatorStateMachine::SerializeOpenLockSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer> {
+  return CreateLog({{"action", RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN}, {"info", std::string(instance_name)}});
+}
+
 auto CoordinatorStateMachine::SerializeRegisterInstance(CoordinatorToReplicaConfig const &config) -> ptr<buffer> {
   return CreateLog({{"action", RaftLogAction::REGISTER_REPLICATION_INSTANCE}, {"info", config}});
 }
@@ -46,16 +63,22 @@ auto CoordinatorStateMachine::SerializeUnregisterInstance(std::string_view instance_name) -> ptr<buffer> {
   return CreateLog({{"action", RaftLogAction::UNREGISTER_REPLICATION_INSTANCE}, {"info", instance_name}});
 }

-auto CoordinatorStateMachine::SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer> {
-  return CreateLog({{"action", RaftLogAction::SET_INSTANCE_AS_MAIN}, {"info", instance_name}});
+auto CoordinatorStateMachine::SerializeSetInstanceAsMain(InstanceUUIDUpdate const &instance_uuid_change)
+    -> ptr<buffer> {
+  return CreateLog({{"action", RaftLogAction::SET_INSTANCE_AS_MAIN}, {"info", instance_uuid_change}});
 }

 auto CoordinatorStateMachine::SerializeSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer> {
   return CreateLog({{"action", RaftLogAction::SET_INSTANCE_AS_REPLICA}, {"info", instance_name}});
 }

-auto CoordinatorStateMachine::SerializeUpdateUUID(utils::UUID const &uuid) -> ptr<buffer> {
-  return CreateLog({{"action", RaftLogAction::UPDATE_UUID}, {"info", uuid}});
+auto CoordinatorStateMachine::SerializeUpdateUUIDForNewMain(utils::UUID const &uuid) -> ptr<buffer> {
+  return CreateLog({{"action", RaftLogAction::UPDATE_UUID_OF_NEW_MAIN}, {"info", uuid}});
+}
+
+auto CoordinatorStateMachine::SerializeUpdateUUIDForInstance(InstanceUUIDUpdate const &instance_uuid_change)
+    -> ptr<buffer> {
+  return CreateLog({{"action", RaftLogAction::UPDATE_UUID_FOR_INSTANCE}, {"info", instance_uuid_change}});
 }

 auto CoordinatorStateMachine::SerializeAddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config)
@@ -63,20 +86,37 @@ auto CoordinatorStateMachine::SerializeAddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config)
   return CreateLog({{"action", RaftLogAction::ADD_COORDINATOR_INSTANCE}, {"info", config}});
 }

+auto CoordinatorStateMachine::SerializeOpenLockSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer> {
+  return CreateLog({{"action", RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA}, {"info", instance_name}});
+}
+
 auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, RaftLogAction> {
   buffer_serializer bs(data);
   auto const json = nlohmann::json::parse(bs.get_str());

   auto const action = json["action"].get<RaftLogAction>();
json["action"].get(); auto const &info = json["info"]; switch (action) { + case RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE: { + return {info.get(), action}; + } + case RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE: + [[fallthrough]]; + case RaftLogAction::OPEN_LOCK_FAILOVER: + [[fallthrough]]; + case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN: + [[fallthrough]]; + case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA: { + return {info.get(), action}; + } case RaftLogAction::REGISTER_REPLICATION_INSTANCE: return {info.get(), action}; - case RaftLogAction::UPDATE_UUID: + case RaftLogAction::UPDATE_UUID_OF_NEW_MAIN: return {info.get(), action}; - case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE: + case RaftLogAction::UPDATE_UUID_FOR_INSTANCE: case RaftLogAction::SET_INSTANCE_AS_MAIN: + return {info.get(), action}; + case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE: [[fallthrough]]; case RaftLogAction::SET_INSTANCE_AS_REPLICA: return {info.get(), action}; @@ -214,11 +254,20 @@ auto CoordinatorStateMachine::GetReplicationInstances() const -> std::vector utils::UUID { return cluster_state_.GetCurrentMainUUID(); } + +auto CoordinatorStateMachine::IsCurrentMain(std::string_view instance_name) const -> bool { + return cluster_state_.IsCurrentMain(instance_name); +} auto CoordinatorStateMachine::GetCoordinatorInstances() const -> std::vector { return cluster_state_.GetCoordinatorInstances(); } -auto CoordinatorStateMachine::GetUUID() const -> utils::UUID { return cluster_state_.GetUUID(); } +auto CoordinatorStateMachine::GetInstanceUUID(std::string_view instance_name) const -> utils::UUID { + return cluster_state_.GetInstanceUUID(instance_name); +} + +auto CoordinatorStateMachine::IsLockOpened() const -> bool { return cluster_state_.IsLockOpened(); } } // namespace memgraph::coordination #endif diff --git a/src/coordination/include/coordination/coordinator_communication_config.hpp b/src/coordination/include/coordination/coordinator_communication_config.hpp index 4f11b188f..56453d3ea 100644 --- a/src/coordination/include/coordination/coordinator_communication_config.hpp +++ b/src/coordination/include/coordination/coordinator_communication_config.hpp @@ -24,6 +24,7 @@ #include #include "json/json.hpp" +#include "utils/uuid.hpp" namespace memgraph::coordination { @@ -88,6 +89,11 @@ struct ManagementServerConfig { friend bool operator==(ManagementServerConfig const &, ManagementServerConfig const &) = default; }; +struct InstanceUUIDUpdate { + std::string instance_name; + memgraph::utils::UUID uuid; +}; + void to_json(nlohmann::json &j, CoordinatorToReplicaConfig const &config); void from_json(nlohmann::json const &j, CoordinatorToReplicaConfig &config); @@ -97,5 +103,8 @@ void from_json(nlohmann::json const &j, CoordinatorToCoordinatorConfig &config); void to_json(nlohmann::json &j, ReplicationClientInfo const &config); void from_json(nlohmann::json const &j, ReplicationClientInfo &config); +void to_json(nlohmann::json &j, InstanceUUIDUpdate const &config); +void from_json(nlohmann::json const &j, InstanceUUIDUpdate &config); + } // namespace memgraph::coordination #endif diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index a778d1238..5f74d1410 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -62,9 +62,11 @@ class CoordinatorInstance { static auto 
-  private:
-  HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
+  auto HasMainState(std::string_view instance_name) const -> bool;
+  auto HasReplicaState(std::string_view instance_name) const -> bool;
+
+ private:
   auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &;

   void MainFailCallback(std::string_view);
@@ -75,13 +77,14 @@ class CoordinatorInstance {

   void ReplicaFailCallback(std::string_view);

-  auto IsMain(std::string_view instance_name) const -> bool;
-  auto IsReplica(std::string_view instance_name) const -> bool;
-
+  HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
   // NOTE: Must be std::list because we rely on pointer stability.
   std::list<ReplicationInstance> repl_instances_;
   mutable utils::ResourceLock coord_instance_lock_{};

+  // The thread pool needs to be constructed before the raft state, as the raft state can call into the thread pool
+  utils::ThreadPool thread_pool_;
+
   RaftState raft_state_;
 };
diff --git a/src/coordination/include/coordination/raft_state.hpp b/src/coordination/include/coordination/raft_state.hpp
index c4958a5ba..03e00df06 100644
--- a/src/coordination/include/coordination/raft_state.hpp
+++ b/src/coordination/include/coordination/raft_state.hpp
@@ -64,22 +64,32 @@ class RaftState {
   auto RequestLeadership() -> bool;
   auto IsLeader() const -> bool;

-  auto MainExists() const -> bool;
-  auto IsMain(std::string_view instance_name) const -> bool;
-  auto IsReplica(std::string_view instance_name) const -> bool;
-
   auto AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig const &config) -> bool;
   auto AppendUnregisterReplicationInstanceLog(std::string_view instance_name) -> bool;
-  auto AppendSetInstanceAsMainLog(std::string_view instance_name) -> bool;
+  auto AppendSetInstanceAsMainLog(std::string_view instance_name, utils::UUID const &uuid) -> bool;
   auto AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool;
-  auto AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool;
+  auto AppendUpdateUUIDForNewMainLog(utils::UUID const &uuid) -> bool;
+  auto AppendUpdateUUIDForInstanceLog(std::string_view instance_name, utils::UUID const &uuid) -> bool;
+  auto AppendOpenLockRegister(CoordinatorToReplicaConfig const &) -> bool;
+  auto AppendOpenLockUnregister(std::string_view) -> bool;
+  auto AppendOpenLockFailover(std::string_view instance_name) -> bool;
+  auto AppendOpenLockSetInstanceToMain(std::string_view instance_name) -> bool;
+  auto AppendOpenLockSetInstanceToReplica(std::string_view instance_name) -> bool;
   auto AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig const &config) -> bool;

   auto GetReplicationInstances() const -> std::vector<ReplicationInstanceState>;
   // TODO: (andi) Do we need then GetAllCoordinators?
   auto GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState>;

-  auto GetUUID() const -> utils::UUID;
+  auto MainExists() const -> bool;
+  auto HasMainState(std::string_view instance_name) const -> bool;
+  auto HasReplicaState(std::string_view instance_name) const -> bool;
+  auto IsCurrentMain(std::string_view instance_name) const -> bool;
+
+  auto GetCurrentMainUUID() const -> utils::UUID;
+  auto GetInstanceUUID(std::string_view) const -> utils::UUID;
+
+  auto IsLockOpened() const -> bool;

  private:
   // TODO: (andi) I think variables below can be abstracted/clean them.
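The RaftState API above is easiest to read as a two-phase protocol over the Raft log: every state-changing operation first appends an open-lock entry (replicating the intent and setting is_lock_opened_ on every coordinator), then performs the actual RPCs against the instances, and finally appends the action entry whose DoAction handler resets the lock. A minimal sketch of the calling convention follows; DemoteSketch is a hypothetical helper written for illustration, not part of this patch:

    // Hypothetical illustration of the open-lock protocol, mirroring how
    // MainSuccessCallback drives a demotion through RaftState.
    auto DemoteSketch(RaftState &raft_state, std::string_view name, utils::UUID const &main_uuid) -> bool {
      // Phase 1: replicate the intent; applying OPEN_LOCK_SET_INSTANCE_AS_REPLICA
      // sets is_lock_opened_ = true on every coordinator.
      if (!raft_state.AppendOpenLockSetInstanceToReplica(name)) return false;
      // ... send the demote and swap-uuid RPCs to the instance here ...
      // Phase 2: replicate the outcome; applying SET_INSTANCE_AS_REPLICA resets the lock.
      return raft_state.AppendUpdateUUIDForInstanceLog(name, main_uuid) &&
             raft_state.AppendSetInstanceAsReplicaLog(name);
    }

If the coordinator dies between the two phases, the lock stays open in the replicated state, and IsLockOpened() makes the callbacks and user-facing operations bail out until a later action closes it again.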
diff --git a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp
index 13b58ff9f..4366d20a5 100644
--- a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp
+++ b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp
@@ -25,7 +25,9 @@ enum class RegisterInstanceCoordinatorStatus : uint8_t {
   NOT_LEADER,
   RPC_FAILED,
   RAFT_LOG_ERROR,
-  SUCCESS
+  SUCCESS,
+  LOCK_OPENED,
+  OPEN_LOCK
 };

 enum class UnregisterInstanceCoordinatorStatus : uint8_t {
@@ -36,6 +38,8 @@ enum class UnregisterInstanceCoordinatorStatus : uint8_t {
   NOT_LEADER,
   RAFT_LOG_ERROR,
   SUCCESS,
+  LOCK_OPENED,
+  OPEN_LOCK
 };

 enum class SetInstanceToMainCoordinatorStatus : uint8_t {
@@ -47,6 +51,9 @@ enum class SetInstanceToMainCoordinatorStatus : uint8_t {
   COULD_NOT_PROMOTE_TO_MAIN,
   SWAP_UUID_FAILED,
   SUCCESS,
+  LOCK_OPENED,
+  OPEN_LOCK,
+  ENABLE_WRITING_FAILED
 };

 }  // namespace memgraph::coordination
diff --git a/src/coordination/include/coordination/replication_instance.hpp b/src/coordination/include/coordination/replication_instance.hpp
index 1e6c042c5..19127e7eb 100644
--- a/src/coordination/include/coordination/replication_instance.hpp
+++ b/src/coordination/include/coordination/replication_instance.hpp
@@ -79,10 +79,6 @@ class ReplicationInstance {

   auto EnableWritingOnMain() -> bool;

-  auto SetNewMainUUID(utils::UUID const &main_uuid) -> void;
-  auto ResetMainUUID() -> void;
-  auto GetMainUUID() const -> std::optional<utils::UUID> const &;
-
   auto GetSuccessCallback() -> HealthCheckInstanceCallback &;
   auto GetFailCallback() -> HealthCheckInstanceCallback &;

@@ -92,19 +88,12 @@ class ReplicationInstance {
   bool is_alive_{false};
   std::chrono::system_clock::time_point last_check_of_uuid_{};

-  // for replica this is main uuid of current main
-  // for "main" main this same as in CoordinatorData
-  // it is set to nullopt when replica is down
-  // TLDR; when replica is down and comes back up we reset uuid of main replica is listening to
-  // so we need to send swap uuid again
-  std::optional<utils::UUID> main_uuid_;
-
   HealthCheckInstanceCallback succ_cb_;
   HealthCheckInstanceCallback fail_cb_;

   friend bool operator==(ReplicationInstance const &first, ReplicationInstance const &second) {
     return first.client_ == second.client_ && first.last_response_time_ == second.last_response_time_ &&
-           first.is_alive_ == second.is_alive_ && first.main_uuid_ == second.main_uuid_;
+           first.is_alive_ == second.is_alive_;
   }
 };
diff --git a/src/coordination/include/nuraft/coordinator_cluster_state.hpp b/src/coordination/include/nuraft/coordinator_cluster_state.hpp
index 5d9afe89e..5bd733c43 100644
--- a/src/coordination/include/nuraft/coordinator_cluster_state.hpp
+++ b/src/coordination/include/nuraft/coordinator_cluster_state.hpp
@@ -36,8 +36,15 @@ struct ReplicationInstanceState {
   CoordinatorToReplicaConfig config;
   ReplicationRole status;

+  // For a REPLICA, this is the uuid of the MAIN it listens to.
+  // For a MAIN, this is the same as current_main_uuid_.
+  // When a replica goes down and comes back up, it resets the uuid of the main it listens to,
+  // so we need to send the swap-uuid request again.
+  // For MAIN we don't enable writing until the cluster is in a healthy state.
+  utils::UUID instance_uuid;
+
   friend auto operator==(ReplicationInstanceState const &lhs, ReplicationInstanceState const &rhs) -> bool {
-    return lhs.config == rhs.config && lhs.status == rhs.status;
+    return lhs.config == rhs.config && lhs.status == rhs.status && lhs.instance_uuid == rhs.instance_uuid;
   }
 };

@@ -54,7 +61,8 @@ struct CoordinatorInstanceState {
 void to_json(nlohmann::json &j, ReplicationInstanceState const &instance_state);
 void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state);

-using TRaftLog = std::variant<CoordinatorToReplicaConfig, std::string, utils::UUID, CoordinatorToCoordinatorConfig>;
+using TRaftLog = std::variant<CoordinatorToReplicaConfig, std::string, utils::UUID, CoordinatorToCoordinatorConfig,
+                              InstanceUUIDUpdate>;

 using nuraft::buffer;
 using nuraft::buffer_serializer;
@@ -63,7 +71,8 @@ using nuraft::ptr;
 class CoordinatorClusterState {
  public:
   CoordinatorClusterState() = default;
-  explicit CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances);
+  explicit CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,
+                                   utils::UUID const &current_main_uuid, bool is_lock_opened);

   CoordinatorClusterState(CoordinatorClusterState const &);
   CoordinatorClusterState &operator=(CoordinatorClusterState const &);
@@ -74,11 +83,11 @@ class CoordinatorClusterState {

   auto MainExists() const -> bool;

-  auto IsMain(std::string_view instance_name) const -> bool;
+  auto HasMainState(std::string_view instance_name) const -> bool;

-  auto IsReplica(std::string_view instance_name) const -> bool;
+  auto HasReplicaState(std::string_view instance_name) const -> bool;

-  auto InsertInstance(std::string instance_name, ReplicationInstanceState instance_state) -> void;
+  auto IsCurrentMain(std::string_view instance_name) const -> bool;

   auto DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void;

@@ -88,15 +97,20 @@ class CoordinatorClusterState {

   auto GetReplicationInstances() const -> std::vector<ReplicationInstanceState>;

-  auto GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState>;
+  auto GetCurrentMainUUID() const -> utils::UUID;

-  auto GetUUID() const -> utils::UUID;
+  auto GetInstanceUUID(std::string_view) const -> utils::UUID;
+
+  auto IsLockOpened() const -> bool;
+
+  auto GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState>;

  private:
   std::vector<CoordinatorInstanceState> coordinators_{};
   std::map<std::string, ReplicationInstanceState, std::less<>> repl_instances_{};
-  utils::UUID uuid_{};
+  utils::UUID current_main_uuid_{};
   mutable utils::ResourceLock log_lock_{};
+  bool is_lock_opened_{false};
 };

 }  // namespace memgraph::coordination
diff --git a/src/coordination/include/nuraft/coordinator_state_machine.hpp b/src/coordination/include/nuraft/coordinator_state_machine.hpp
index 6340cf604..754cb45af 100644
--- a/src/coordination/include/nuraft/coordinator_state_machine.hpp
+++ b/src/coordination/include/nuraft/coordinator_state_machine.hpp
@@ -40,20 +40,21 @@ class CoordinatorStateMachine : public state_machine {
   CoordinatorStateMachine &operator=(CoordinatorStateMachine const &) = delete;
   CoordinatorStateMachine(CoordinatorStateMachine &&) = delete;
   CoordinatorStateMachine &operator=(CoordinatorStateMachine &&) = delete;
-  ~CoordinatorStateMachine() override {}
-
-  // TODO: (andi) Check API of this class.
-  auto MainExists() const -> bool;
-  auto IsMain(std::string_view instance_name) const -> bool;
-  auto IsReplica(std::string_view instance_name) const -> bool;
+  ~CoordinatorStateMachine() override = default;

   static auto CreateLog(nlohmann::json &&log) -> ptr<buffer>;
+  static auto SerializeOpenLockRegister(CoordinatorToReplicaConfig const &config) -> ptr<buffer>;
+  static auto SerializeOpenLockUnregister(std::string_view instance_name) -> ptr<buffer>;
+  static auto SerializeOpenLockSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer>;
+  static auto SerializeOpenLockFailover(std::string_view instance_name) -> ptr<buffer>;
   static auto SerializeRegisterInstance(CoordinatorToReplicaConfig const &config) -> ptr<buffer>;
   static auto SerializeUnregisterInstance(std::string_view instance_name) -> ptr<buffer>;
-  static auto SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer>;
+  static auto SerializeSetInstanceAsMain(InstanceUUIDUpdate const &instance_uuid_change) -> ptr<buffer>;
   static auto SerializeSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer>;
-  static auto SerializeUpdateUUID(utils::UUID const &uuid) -> ptr<buffer>;
+  static auto SerializeUpdateUUIDForNewMain(utils::UUID const &uuid) -> ptr<buffer>;
+  static auto SerializeUpdateUUIDForInstance(InstanceUUIDUpdate const &instance_uuid_change) -> ptr<buffer>;
   static auto SerializeAddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config) -> ptr<buffer>;
+  static auto SerializeOpenLockSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer>;

   static auto DecodeLog(buffer &data) -> std::pair<TRaftLog, RaftLogAction>;

@@ -85,7 +86,15 @@ class CoordinatorStateMachine : public state_machine {

   auto GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState>;

-  auto GetUUID() const -> utils::UUID;
+  // Getters
+  auto MainExists() const -> bool;
+  auto HasMainState(std::string_view instance_name) const -> bool;
+  auto HasReplicaState(std::string_view instance_name) const -> bool;
+  auto IsCurrentMain(std::string_view instance_name) const -> bool;
+
+  auto GetCurrentMainUUID() const -> utils::UUID;
+  auto GetInstanceUUID(std::string_view instance_name) const -> utils::UUID;
+  auto IsLockOpened() const -> bool;

  private:
   struct SnapshotCtx {
diff --git a/src/coordination/include/nuraft/raft_log_action.hpp b/src/coordination/include/nuraft/raft_log_action.hpp
index b9cdd233a..ea1a4b9d7 100644
--- a/src/coordination/include/nuraft/raft_log_action.hpp
+++ b/src/coordination/include/nuraft/raft_log_action.hpp
@@ -23,20 +23,34 @@ namespace memgraph::coordination {

 enum class RaftLogAction : uint8_t {
+  OPEN_LOCK_REGISTER_REPLICATION_INSTANCE,
+  OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE,
+  OPEN_LOCK_FAILOVER,
+  OPEN_LOCK_SET_INSTANCE_AS_MAIN,
+  OPEN_LOCK_SET_INSTANCE_AS_REPLICA,
   REGISTER_REPLICATION_INSTANCE,
   UNREGISTER_REPLICATION_INSTANCE,
   SET_INSTANCE_AS_MAIN,
   SET_INSTANCE_AS_REPLICA,
-  UPDATE_UUID,
-  ADD_COORDINATOR_INSTANCE
+  UPDATE_UUID_OF_NEW_MAIN,
+  ADD_COORDINATOR_INSTANCE,
+  UPDATE_UUID_FOR_INSTANCE,
 };

-NLOHMANN_JSON_SERIALIZE_ENUM(RaftLogAction, {{RaftLogAction::REGISTER_REPLICATION_INSTANCE, "register"},
-                                             {RaftLogAction::UNREGISTER_REPLICATION_INSTANCE, "unregister"},
-                                             {RaftLogAction::SET_INSTANCE_AS_MAIN, "promote"},
-                                             {RaftLogAction::SET_INSTANCE_AS_REPLICA, "demote"},
-                                             {RaftLogAction::UPDATE_UUID, "update_uuid"},
-                                             {RaftLogAction::ADD_COORDINATOR_INSTANCE, "add_coordinator_instance"}})
+NLOHMANN_JSON_SERIALIZE_ENUM(RaftLogAction,
+                             {{RaftLogAction::REGISTER_REPLICATION_INSTANCE, "register"},
+                              {RaftLogAction::UNREGISTER_REPLICATION_INSTANCE, "unregister"},
+                              {RaftLogAction::SET_INSTANCE_AS_MAIN, "promote"},
+                              {RaftLogAction::SET_INSTANCE_AS_REPLICA, "demote"},
+                              {RaftLogAction::UPDATE_UUID_OF_NEW_MAIN, "update_uuid_of_new_main"},
+                              {RaftLogAction::ADD_COORDINATOR_INSTANCE, "add_coordinator_instance"},
+                              {RaftLogAction::UPDATE_UUID_FOR_INSTANCE, "update_uuid_for_instance"},
+                              {RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE, "open_lock_register_instance"},
+                              {RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE,
+                               "open_lock_unregister_instance"},
+                              {RaftLogAction::OPEN_LOCK_FAILOVER, "open_lock_failover"},
+                              {RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN, "open_lock_set_instance_as_main"},
+                              {RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA, "open_lock_set_instance_as_replica"}})

 }  // namespace memgraph::coordination
 #endif
diff --git a/src/coordination/raft_state.cpp b/src/coordination/raft_state.cpp
index 3c1cbd158..db88169c8 100644
--- a/src/coordination/raft_state.cpp
+++ b/src/coordination/raft_state.cpp
@@ -12,7 +12,6 @@
 #ifdef MG_ENTERPRISE

 #include
-#include

 #include "coordination/coordinator_communication_config.hpp"
 #include "coordination/coordinator_exceptions.hpp"
 #include "coordination/raft_state.hpp"
@@ -63,13 +62,18 @@ auto RaftState::InitRaftServer() -> void {
   params.leadership_expiry_ = 200;

   raft_server::init_options init_opts;
+
   init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode {
     if (event_type == cb_func::BecomeLeader) {
       spdlog::info("Node {} became leader", param->leaderId);
       become_leader_cb_();
     } else if (event_type == cb_func::BecomeFollower) {
-      spdlog::info("Node {} became follower", param->myId);
+      // TODO(antoniofilipovic) Check what happens when becoming follower while doing failover
+      // There is no way to stop becoming a follower:
+      // https://github.com/eBay/NuRaft/blob/188947bcc73ce38ab1c3cf9d01015ca8a29decd9/src/raft_server.cxx#L1334-L1335
+      spdlog::trace("Got request to become follower");
       become_follower_cb_();
+      spdlog::trace("Node {} became follower", param->myId);
     }
     return CbReturnCode::Ok;
   };
@@ -82,7 +86,6 @@ auto RaftState::InitRaftServer() -> void {
   if (!raft_server_) {
     throw RaftServerStartException("Failed to launch raft server on {}", raft_endpoint_.SocketAddress());
   }
-
   auto maybe_stop = utils::ResettableCounter<20>();
   do {
     if (raft_server_->is_initialized()) {
@@ -157,6 +160,78 @@ auto RaftState::IsLeader() const -> bool { return raft_server_->is_leader(); }

 auto RaftState::RequestLeadership() -> bool { return raft_server_->is_leader() || raft_server_->request_leadership(); }

+auto RaftState::AppendOpenLockRegister(CoordinatorToReplicaConfig const &config) -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeOpenLockRegister(config);
+  auto const res = raft_server_->append_entries({new_log});
+
+  if (!res->get_accepted()) {
+    spdlog::error("Failed to accept request to open lock to register instance {}", config.instance_name);
+    return false;
+  }
+
+  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
+    spdlog::error("Failed to open lock for registering instance {} with error code {}", config.instance_name,
+                  int(res->get_result_code()));
+    return false;
+  }
+
+  return true;
+}
+
+auto RaftState::AppendOpenLockUnregister(std::string_view instance_name) -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeOpenLockUnregister(instance_name);
+  auto const res = raft_server_->append_entries({new_log});
+
+  if (!res->get_accepted()) {
+    spdlog::error("Failed to accept request to open lock to unregister instance {}.", instance_name);
+    return false;
+  }
+
+  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
+    spdlog::error("Failed to open lock for unregistering instance {} with error code {}", instance_name,
+                  int(res->get_result_code()));
+    return false;
+  }
+
+  return true;
+}
+
+auto RaftState::AppendOpenLockFailover(std::string_view instance_name) -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeOpenLockFailover(instance_name);
+  auto const res = raft_server_->append_entries({new_log});
+
+  if (!res->get_accepted()) {
+    spdlog::error("Failed to accept request to open lock for failover {}", instance_name);
+    return false;
+  }
+
+  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
+    spdlog::error("Failed to open lock for failover to instance {} with error code {}", instance_name,
+                  int(res->get_result_code()));
+    return false;
+  }
+
+  return true;
+}
+
+auto RaftState::AppendOpenLockSetInstanceToMain(std::string_view instance_name) -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeOpenLockSetInstanceAsMain(instance_name);
+  auto const res = raft_server_->append_entries({new_log});
+
+  if (!res->get_accepted()) {
+    spdlog::error("Failed to accept request to open lock and set instance {} to MAIN", instance_name);
+    return false;
+  }
+
+  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
+    spdlog::error("Failed to open lock to set instance {} to MAIN with error code {}", instance_name,
+                  int(res->get_result_code()));
+    return false;
+  }
+
+  return true;
+}
+
 auto RaftState::AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig const &config) -> bool {
   auto new_log = CoordinatorStateMachine::SerializeRegisterInstance(config);
   auto const res = raft_server_->append_entries({new_log});
@@ -201,8 +276,9 @@ auto RaftState::AppendUnregisterReplicationInstanceLog(std::string_view instance_name)
   return true;
 }

-auto RaftState::AppendSetInstanceAsMainLog(std::string_view instance_name) -> bool {
-  auto new_log = CoordinatorStateMachine::SerializeSetInstanceAsMain(instance_name);
+auto RaftState::AppendSetInstanceAsMainLog(std::string_view instance_name, utils::UUID const &uuid) -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeSetInstanceAsMain(
+      InstanceUUIDUpdate{.instance_name = std::string{instance_name}, .uuid = uuid});
   auto const res = raft_server_->append_entries({new_log});
   if (!res->get_accepted()) {
     spdlog::error(
@@ -241,8 +317,28 @@ auto RaftState::AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool {
   return true;
 }

-auto RaftState::AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool {
-  auto new_log = CoordinatorStateMachine::SerializeUpdateUUID(uuid);
+auto RaftState::AppendOpenLockSetInstanceToReplica(std::string_view instance_name) -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeOpenLockSetInstanceAsReplica(instance_name);
+  auto const res = raft_server_->append_entries({new_log});
+  if (!res->get_accepted()) {
+    spdlog::error(
+        "Failed to accept request for demoting instance {}. Most likely the reason is that the instance is not "
+        "the leader.",
+        instance_name);
+        "Failed to accept request for demoting instance {}. Most likely the reason is that the instance is not "
+        "the leader.",
+        instance_name);
+    return false;
+  }
+  spdlog::info("Request for demoting instance {} accepted", instance_name);
+
+  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
+    spdlog::error("Failed to demote instance {} with error code {}", instance_name, int(res->get_result_code()));
+    return false;
+  }
+
+  return true;
+}
+
+auto RaftState::AppendUpdateUUIDForNewMainLog(utils::UUID const &uuid) -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeUpdateUUIDForNewMain(uuid);
   auto const res = raft_server_->append_entries({new_log});
   if (!res->get_accepted()) {
     spdlog::error(
@@ -250,7 +346,7 @@ auto RaftState::AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool {
         "the leader.");
     return false;
   }
-  spdlog::info("Request for updating UUID accepted");
+  spdlog::trace("Request for updating UUID accepted");

   if (res->get_result_code() != nuraft::cmd_result_code::OK) {
     spdlog::error("Failed to update UUID with error code {}", int(res->get_result_code()));
@@ -282,23 +378,53 @@ auto RaftState::AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig c
   return true;
 }

+auto RaftState::AppendUpdateUUIDForInstanceLog(std::string_view instance_name, const utils::UUID &uuid) -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeUpdateUUIDForInstance(
+      {.instance_name = std::string{instance_name}, .uuid = uuid});
+  auto const res = raft_server_->append_entries({new_log});
+  if (!res->get_accepted()) {
+    spdlog::error("Failed to accept request for updating UUID of instance.");
+    return false;
+  }
+  spdlog::trace("Request for updating UUID of instance accepted");
+
+  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
+    spdlog::error("Failed to update UUID of instance with error code {}", int(res->get_result_code()));
+    return false;
+  }
+
+  return true;
+}
+
 auto RaftState::MainExists() const -> bool { return state_machine_->MainExists(); }

-auto RaftState::IsMain(std::string_view instance_name) const -> bool { return state_machine_->IsMain(instance_name); }
+auto RaftState::HasMainState(std::string_view instance_name) const -> bool {
+  return state_machine_->HasMainState(instance_name);
+}

-auto RaftState::IsReplica(std::string_view instance_name) const -> bool {
-  return state_machine_->IsReplica(instance_name);
+auto RaftState::HasReplicaState(std::string_view instance_name) const -> bool {
+  return state_machine_->HasReplicaState(instance_name);
 }

 auto RaftState::GetReplicationInstances() const -> std::vector<ReplicationInstanceState> {
   return state_machine_->GetReplicationInstances();
 }

+auto RaftState::GetCurrentMainUUID() const -> utils::UUID { return state_machine_->GetCurrentMainUUID(); }
+
+auto RaftState::IsCurrentMain(std::string_view instance_name) const -> bool {
+  return state_machine_->IsCurrentMain(instance_name);
+}
+
+auto RaftState::IsLockOpened() const -> bool { return state_machine_->IsLockOpened(); }
+
+auto RaftState::GetInstanceUUID(std::string_view instance_name) const -> utils::UUID {
+  return state_machine_->GetInstanceUUID(instance_name);
+}
+
 auto RaftState::GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState> {
   return state_machine_->GetCoordinatorInstances();
 }

-auto RaftState::GetUUID() const -> utils::UUID { return state_machine_->GetUUID(); }
-
 }  // namespace memgraph::coordination
#endif
diff --git a/src/coordination/replication_instance.cpp b/src/coordination/replication_instance.cpp
index 34d889775..00e4a98e0 100644
--- a/src/coordination/replication_instance.cpp
+++ b/src/coordination/replication_instance.cpp
@@ -56,7 +56,6 @@ auto ReplicationInstance::PromoteToMain(utils::UUID const &new_uuid, Replication
     return false;
   }

-  main_uuid_ = new_uuid;
   succ_cb_ = main_succ_cb;
   fail_cb_ = main_fail_cb;
@@ -91,9 +90,6 @@ auto ReplicationInstance::GetFailCallback() -> HealthCheckInstanceCallback & { r
 auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; }

-auto ReplicationInstance::SetNewMainUUID(utils::UUID const &main_uuid) -> void { main_uuid_ = main_uuid; }
-auto ReplicationInstance::GetMainUUID() const -> std::optional<utils::UUID> const & { return main_uuid_; }
-
 auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {
   if (!IsReadyForUUIDPing()) {
     return true;
@@ -116,7 +112,6 @@ auto ReplicationInstance::SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid
   if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), new_main_uuid)) {
     return false;
   }
-  SetNewMainUUID(new_main_uuid);
   return true;
 }
diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp
index 87eccca87..1d02f8435 100644
--- a/src/query/interpreter.cpp
+++ b/src/query/interpreter.cpp
@@ -407,6 +407,11 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
       case RPC_FAILED:
         throw QueryRuntimeException(
             "Couldn't unregister replica instance because current main instance couldn't unregister replica!");
+      case LOCK_OPENED:
+        throw QueryRuntimeException("Couldn't unregister replica because the last action didn't finish successfully!");
+      case OPEN_LOCK:
+        throw QueryRuntimeException(
+            "Couldn't unregister instance as cluster didn't accept entering unregistration state!");
       case SUCCESS:
         break;
     }
@@ -469,6 +474,12 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
         throw QueryRuntimeException(
             "Couldn't register replica instance because setting instance to replica failed! Check logs on replica to "
             "find out more info!");
+      case LOCK_OPENED:
+        throw QueryRuntimeException(
+            "Couldn't register replica instance because the last action didn't finish successfully!");
+      case OPEN_LOCK:
+        throw QueryRuntimeException(
+            "Couldn't register replica instance because cluster didn't accept registration query!");
       case SUCCESS:
         break;
     }
@@ -514,6 +525,14 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
             "Couldn't set replica instance to main! Check coordinator and replica for more logs");
       case SWAP_UUID_FAILED:
         throw QueryRuntimeException("Couldn't set replica instance to main. Replicas didn't swap uuid of new main.");
+      case OPEN_LOCK:
+        throw QueryRuntimeException(
+            "Couldn't set replica instance to main as cluster didn't accept setting instance state.");
+      case LOCK_OPENED:
+        throw QueryRuntimeException(
+            "Couldn't set replica instance to main because the last action didn't finish successfully!");
+      case ENABLE_WRITING_FAILED:
+        throw QueryRuntimeException("Instance promoted to MAIN, but couldn't enable writing to instance.");
       case SUCCESS:
         break;
     }
@@ -529,7 +548,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
 #endif

 /// returns false if the replication role can't be set
-/// @throw QueryRuntimeException if an error ocurred.
+/// @throw QueryRuntimeException if an error occurred.
 Callback HandleAuthQuery(AuthQuery *auth_query, InterpreterContext *interpreter_context, const Parameters &parameters,
                          Interpreter &interpreter) {
diff --git a/src/replication/state.cpp b/src/replication/state.cpp
index 2e00670ec..f04d00761 100644
--- a/src/replication/state.cpp
+++ b/src/replication/state.cpp
@@ -254,7 +254,8 @@ bool ReplicationState::SetReplicationRoleMain(const utils::UUID &main_uuid) {
     return false;
   }

-  replication_data_ = RoleMainData{ReplicationEpoch{new_epoch}, true, main_uuid};
+  // By default, writing on MAIN is disabled until the cluster is in a healthy state
+  replication_data_ = RoleMainData{ReplicationEpoch{new_epoch}, /*is_writing_enabled=*/false, main_uuid};
   return true;
 }
diff --git a/src/utils/thread_pool.cpp b/src/utils/thread_pool.cpp
index 8d16e6c0b..215cca35c 100644
--- a/src/utils/thread_pool.cpp
+++ b/src/utils/thread_pool.cpp
@@ -1,4 +1,4 @@
-// Copyright 2022 Memgraph Ltd.
+// Copyright 2024 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -10,7 +10,6 @@
 // licenses/APL.txt.

 #include "utils/thread_pool.hpp"
-
 namespace memgraph::utils {

 ThreadPool::ThreadPool(const size_t pool_size) {
diff --git a/tests/e2e/high_availability/coord_cluster_registration.py b/tests/e2e/high_availability/coord_cluster_registration.py
index 16f91214d..ffb51e0e1 100644
--- a/tests/e2e/high_availability/coord_cluster_registration.py
+++ b/tests/e2e/high_availability/coord_cluster_registration.py
@@ -241,7 +241,10 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {

 def test_coordinators_communication_with_restarts():
+    # 1 Start all instances
     safe_execute(shutil.rmtree, TEMP_DIR)
+
+    # 1
     interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)

     coordinator3_cursor = connect(host="localhost", port=7692).cursor()
diff --git a/tests/e2e/high_availability/distributed_coords.py b/tests/e2e/high_availability/distributed_coords.py
index b863ca519..dff668ec1 100644
--- a/tests/e2e/high_availability/distributed_coords.py
+++ b/tests/e2e/high_availability/distributed_coords.py
@@ -13,6 +13,7 @@ import os
 import shutil
 import sys
 import tempfile
+import time

 import interactive_mg_runner
 import pytest
@@ -261,7 +262,7 @@ def test_old_main_comes_back_on_new_leader_as_replica():
         ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
         ("instance_1", "", "", "unknown", "main"),
         ("instance_2", "", "", "unknown", "replica"),
-        ("instance_3", "", "", "unknown", "main"),  # TODO: (andi) Will become unknown.
+        ("instance_3", "", "", "unknown", "unknown"),
     ]
     mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
     mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2])
@@ -456,7 +457,7 @@ def test_distributed_automatic_failover_with_leadership_change():
         ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
         ("instance_1", "", "", "unknown", "main"),
         ("instance_2", "", "", "unknown", "replica"),
-        ("instance_3", "", "", "unknown", "main"),  # TODO: (andi) Will become unknown.
+ ("instance_3", "", "", "unknown", "unknown"), ] mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2]) mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2]) @@ -1092,8 +1093,8 @@ def test_multiple_failovers_in_row_no_leadership_change(): "", "", "unknown", - "main", - ), # TODO(antoniofilipovic) change to unknown after PR with transitions + "unknown", + ), ] ) @@ -1119,9 +1120,9 @@ def test_multiple_failovers_in_row_no_leadership_change(): follower_data.extend(coordinator_data) follower_data.extend( [ - ("instance_1", "", "", "unknown", "main"), - ("instance_2", "", "", "unknown", "main"), # TODO(antoniofilipovic) change to unknown - ("instance_3", "", "", "unknown", "main"), # TODO(antoniofilipovic) change to unknown + ("instance_1", "", "", "unknown", "unknown"), + ("instance_2", "", "", "unknown", "main"), + ("instance_3", "", "", "unknown", "unknown"), ] ) @@ -1149,7 +1150,7 @@ def test_multiple_failovers_in_row_no_leadership_change(): follower_data.extend(coordinator_data) follower_data.extend( [ - ("instance_1", "", "", "unknown", "main"), # TODO(antoniofilipovic) change to unknown + ("instance_1", "", "", "unknown", "unknown"), ("instance_2", "", "", "unknown", "main"), ("instance_3", "", "", "unknown", "replica"), ] @@ -1177,8 +1178,8 @@ def test_multiple_failovers_in_row_no_leadership_change(): follower_data.extend(coordinator_data) follower_data.extend( [ - ("instance_1", "", "", "unknown", "main"), # TODO(antoniofilipovic) change to unknown - ("instance_2", "", "", "unknown", "main"), # TODO(antoniofilipovic) change to unknown + ("instance_1", "", "", "unknown", "unknown"), + ("instance_2", "", "", "unknown", "unknown"), ("instance_3", "", "", "unknown", "main"), ] ) @@ -1258,5 +1259,166 @@ def test_multiple_failovers_in_row_no_leadership_change(): mg_sleep_and_assert(1, get_vertex_count_func(connect(port=7688, host="localhost").cursor())) +def test_multiple_old_mains_single_failover(): + # Goal of this test is to check when leadership changes + # and we have old MAIN down, that we don't start failover + # 1. Start all instances. + # 2. Kill the main instance + # 3. Do failover + # 4. Kill other main + # 5. Kill leader + # 6. Leave first main down, and start second main + # 7. 
+    # 7. Second main should write data to the new instance all the time
+
+    # 1
+    safe_execute(shutil.rmtree, TEMP_DIR)
+    inner_instances_description = get_instances_description_no_setup()
+
+    interactive_mg_runner.start_all(inner_instances_description)
+
+    setup_queries = [
+        "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}",
+        "ADD COORDINATOR 2 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'coordinator_server': '127.0.0.1:10112'}",
+        "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};",
+        "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};",
+        "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};",
+        "SET INSTANCE instance_3 TO MAIN",
+    ]
+    coord_cursor_3 = connect(host="localhost", port=7692).cursor()
+    for query in setup_queries:
+        execute_and_fetch_all(coord_cursor_3, query)
+
+    def retrieve_data_show_repl_cluster():
+        return sorted(list(execute_and_fetch_all(coord_cursor_3, "SHOW INSTANCES;")))
+
+    coordinators = [
+        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
+        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
+        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
+    ]
+
+    basic_instances = [
+        ("instance_1", "", "127.0.0.1:10011", "up", "replica"),
+        ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
+        ("instance_3", "", "127.0.0.1:10013", "up", "main"),
+    ]
+
+    expected_data_on_coord = []
+    expected_data_on_coord.extend(coordinators)
+    expected_data_on_coord.extend(basic_instances)
+
+    mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
+
+    # 2
+
+    interactive_mg_runner.kill(inner_instances_description, "instance_3")
+
+    # 3
+
+    basic_instances = [
+        ("instance_1", "", "127.0.0.1:10011", "up", "main"),
+        ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
+        ("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
+    ]
+
+    expected_data_on_coord = []
+    expected_data_on_coord.extend(coordinators)
+    expected_data_on_coord.extend(basic_instances)
+
+    mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
+
+    # 4
+
+    interactive_mg_runner.kill(inner_instances_description, "instance_1")
+
+    # 5
+    interactive_mg_runner.kill(inner_instances_description, "coordinator_3")
+
+    # 6
+
+    interactive_mg_runner.start(inner_instances_description, "instance_1")
+
+    # 7
+
+    coord_cursor_1 = connect(host="localhost", port=7690).cursor()
+
+    def show_instances_coord1():
+        return sorted(list(execute_and_fetch_all(coord_cursor_1, "SHOW INSTANCES;")))
+
+    coord_cursor_2 = connect(host="localhost", port=7691).cursor()
+
+    def show_instances_coord2():
+        return sorted(list(execute_and_fetch_all(coord_cursor_2, "SHOW INSTANCES;")))
+
+    leader_data = [
+        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
+        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
+        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
+        ("instance_1", "", "127.0.0.1:10011", "up", "main"),
+        ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
+        ("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
+    ]
+    mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
+
[ + ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"), + ("instance_1", "", "", "unknown", "main"), + ("instance_2", "", "", "unknown", "replica"), + ("instance_3", "", "", "unknown", "unknown"), + ] + mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2]) + mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2]) + + instance_1_cursor = connect(host="localhost", port=7687).cursor() + + def show_replicas(): + return sorted(list(execute_and_fetch_all(instance_1_cursor, "SHOW REPLICAS;"))) + + replicas = [ + ( + "instance_2", + "127.0.0.1:10002", + "sync", + {"behind": None, "status": "ready", "ts": 0}, + {"memgraph": {"behind": 0, "status": "ready", "ts": 0}}, + ), + ( + "instance_3", + "127.0.0.1:10003", + "sync", + {"behind": None, "status": "invalid", "ts": 0}, + {"memgraph": {"behind": 0, "status": "invalid", "ts": 0}}, + ), + ] + mg_sleep_and_assert_collection(replicas, show_replicas) + + def get_vertex_count_func(cursor): + def get_vertex_count(): + return execute_and_fetch_all(cursor, "MATCH (n) RETURN count(n)")[0][0] + + return get_vertex_count + + vertex_count = 0 + instance_1_cursor = connect(port=7687, host="localhost").cursor() + instance_2_cursor = connect(port=7688, host="localhost").cursor() + + mg_sleep_and_assert(vertex_count, get_vertex_count_func(instance_1_cursor)) + mg_sleep_and_assert(vertex_count, get_vertex_count_func(instance_2_cursor)) + + time_slept = 0 + failover_time = 5 + while time_slept < failover_time: + with pytest.raises(Exception) as e: + execute_and_fetch_all(instance_1_cursor, "CREATE ();") + vertex_count += 1 + + assert vertex_count == execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] + assert vertex_count == execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] + time.sleep(0.1) + time_slept += 0.1 + + if __name__ == "__main__": sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/unit/raft_log_serialization.cpp b/tests/unit/raft_log_serialization.cpp index bda690855..3f24b43c7 100644 --- a/tests/unit/raft_log_serialization.cpp +++ b/tests/unit/raft_log_serialization.cpp @@ -101,8 +101,8 @@ TEST_F(RaftLogSerialization, RaftLogActionDemote) { ASSERT_EQ(action, action2); } -TEST_F(RaftLogSerialization, RaftLogActionUpdateUUID) { - auto action = RaftLogAction::UPDATE_UUID; +TEST_F(RaftLogSerialization, RaftLogActionUpdateUUIDForInstance) { + auto action = RaftLogAction::UPDATE_UUID_FOR_INSTANCE; nlohmann::json j = action; RaftLogAction action2 = j.get(); @@ -135,10 +135,14 @@ TEST_F(RaftLogSerialization, UnregisterInstance) { } TEST_F(RaftLogSerialization, SetInstanceAsMain) { - auto buffer = CoordinatorStateMachine::SerializeSetInstanceAsMain("instance3"); + auto instance_uuid_update = + memgraph::coordination::InstanceUUIDUpdate{.instance_name = "instance3", .uuid = memgraph::utils::UUID{}}; + auto buffer = CoordinatorStateMachine::SerializeSetInstanceAsMain(instance_uuid_update); auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer); ASSERT_EQ(action, RaftLogAction::SET_INSTANCE_AS_MAIN); - ASSERT_EQ("instance3", std::get(payload)); + ASSERT_EQ(instance_uuid_update.instance_name, + std::get(payload).instance_name); + ASSERT_EQ(instance_uuid_update.uuid, std::get(payload).uuid); } TEST_F(RaftLogSerialization, SetInstanceAsReplica) { @@ -148,10 
@@ -148,10 +152,10 @@ TEST_F(RaftLogSerialization, SetInstanceAsReplica) {
   ASSERT_EQ("instance3", std::get<std::string>(payload));
 }

-TEST_F(RaftLogSerialization, UpdateUUID) {
+TEST_F(RaftLogSerialization, UpdateUUIDForNewMain) {
   UUID uuid;
-  auto buffer = CoordinatorStateMachine::SerializeUpdateUUID(uuid);
+  auto buffer = CoordinatorStateMachine::SerializeUpdateUUIDForNewMain(uuid);
   auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
-  ASSERT_EQ(action, RaftLogAction::UPDATE_UUID);
+  ASSERT_EQ(action, RaftLogAction::UPDATE_UUID_OF_NEW_MAIN);
   ASSERT_EQ(uuid, std::get<UUID>(payload));
 }
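The new OPEN_LOCK_* log actions, together with the LOCK_OPENED and OPEN_LOCK error paths in the query handler, all follow one protocol: commit an "open lock" entry, perform the operation's side effects, then commit the closing action entry that releases the lock. The sketch below is a minimal, self-contained model of that idea only, not this patch's API; the names ClusterState, RegisterInstance, LogAction, and the crash_in_between flag are invented for illustration. It shows why a coordinator that dies between the two entries leaves the replicated lock open, so the next operation is rejected until the cluster is reconciled.

#include <iostream>
#include <string>

enum class LogAction { kOpenLockRegister, kRegisterInstance };

// Toy stand-in for the replicated cluster state: only the lock flag matters here.
struct ClusterState {
  bool is_lock_opened_ = false;

  // Mirrors the state machine's apply step: the opening entry sets the flag,
  // the closing action entry clears it again.
  void DoAction(LogAction action, const std::string &instance) {
    switch (action) {
      case LogAction::kOpenLockRegister:
        is_lock_opened_ = true;  // stays true if the closing entry is never committed
        break;
      case LogAction::kRegisterInstance:
        std::cout << "registered " << instance << '\n';
        is_lock_opened_ = false;  // action finished successfully, lock released
        break;
    }
  }
};

// Two-step append flow: open the lock, then commit the real action.
bool RegisterInstance(ClusterState &state, const std::string &name, bool crash_in_between) {
  if (state.is_lock_opened_) {
    // Corresponds to the LOCK_OPENED error: the last action didn't finish successfully.
    std::cout << "rejected " << name << ": last action didn't finish successfully\n";
    return false;
  }
  state.DoAction(LogAction::kOpenLockRegister, name);
  if (crash_in_between) return false;  // simulated coordinator failure mid-operation
  state.DoAction(LogAction::kRegisterInstance, name);
  return true;
}

int main() {
  ClusterState state;
  RegisterInstance(state, "instance_1", /*crash_in_between=*/false);  // registered
  RegisterInstance(state, "instance_2", /*crash_in_between=*/true);   // lock left open
  RegisterInstance(state, "instance_3", /*crash_in_between=*/false);  // rejected
}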