From 742f3a2e4c869d8eb0daef17ac91fe32a0d62094 Mon Sep 17 00:00:00 2001 From: antoniofilipovic Date: Tue, 12 Mar 2024 16:53:10 +0100 Subject: [PATCH] fix issue with follower callback --- src/coordination/coordinator_instance.cpp | 55 +++++++++++++++++++++-- src/coordination/raft_state.cpp | 3 +- 2 files changed, 53 insertions(+), 5 deletions(-) Note: is_running_ is stored with std::memory_order::release, because acquire is not a permitted ordering for an atomic store. The early `return;` in the follower callback is kept as submitted, so the statements after it are still unreachable. diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index 46a32bb60..814f9e228 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -61,23 +61,38 @@ CoordinatorInstance::CoordinatorInstance() std::ranges::for_each(repl_instances_, [this](auto &instance) { instance.SetNewMainUUID(raft_state_.GetUUID()); // TODO: (andi) Rename - instance.StartFrequentCheck(); + instance.ResumeFrequentCheck(); }); + is_running_.store(true, std::memory_order::release); }, [this]() { - spdlog::info("Leader changed, trying to stop all replication instances!"); + spdlog::info("Leader changed, trying to stop all replication instances, thread id {}!", + std::this_thread::get_id()); /// TODO Add to pool - auto lock = std::lock_guard{coord_instance_lock_}; // RAII + // auto lock = std::lock_guard{coord_instance_lock_}; // RAII + // std::ranges::for_each(repl_instances_) repl_instances_.clear(); spdlog::info("Leader changed, stopped all replication instances!"); + return; + is_running_.store(false, std::memory_order::release); + pool_.AddTask([this]() { + auto lock = std::lock_guard{coord_instance_lock_}; + std::ranges::for_each(repl_instances_, [](auto &instance) { instance.PauseFrequentCheck(); }); + }); })) { client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { + if (!self->is_running_) { + return; + } auto lock = std::lock_guard{self->coord_instance_lock_}; // RAII auto &repl_instance = self->FindReplicationInstance(repl_instance_name); std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name); 
}; client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { + if (!self->is_running_) { + return; + } auto lock = std::lock_guard{self->coord_instance_lock_}; auto &repl_instance = self->FindReplicationInstance(repl_instance_name); std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name); @@ -149,6 +164,9 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector { } auto CoordinatorInstance::TryFailover() -> void { + if (!is_running_) { + return; + } auto const is_replica = [this](ReplicationInstance const &instance) { return IsReplica(instance.InstanceName()); }; auto alive_replicas = @@ -219,9 +237,12 @@ auto CoordinatorInstance::TryFailover() -> void { spdlog::warn("Failover failed since promoting replica to main failed!"); return; } - spdlog::info("Promote to main done, trying to append update uuid"); + spdlog::info("Promote to main done, trying to append update uuid, thread id {}, is leader {}", + std::this_thread::get_id(), raft_state_.IsLeader()); // TODO (antoniofilipovic) : This can fail and we don't know we change uuid if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) { + spdlog::trace("Append entry update uuid failed, thread id {}, is leader {}", std::this_thread::get_id(), + raft_state_.IsLeader()); return; } @@ -229,6 +250,8 @@ auto CoordinatorInstance::TryFailover() -> void { spdlog::info("Promote to main done, trying to set instance as main log"); // TODO (antoniofilipovic): This can fail and we don't know we have set up new MAIN if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name)) { + spdlog::trace("Append set instance as main failed, thread id {}, is leader {}", std::this_thread::get_id(), + raft_state_.IsLeader()); return; } @@ -244,6 +267,10 @@ auto CoordinatorInstance::TryFailover() -> void { auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance_name) -> SetInstanceToMainCoordinatorStatus { + if (!is_running_) { + return 
SetInstanceToMainCoordinatorStatus::NOT_LEADER; + ; + } auto lock = std::lock_guard{coord_instance_lock_}; if (raft_state_.MainExists()) { @@ -307,6 +334,10 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig const &config) -> RegisterInstanceCoordinatorStatus { + if (!is_running_) { + return RegisterInstanceCoordinatorStatus::NOT_LEADER; + ; + } auto lock = std::lock_guard{coord_instance_lock_}; if (std::ranges::any_of(repl_instances_, [instance_name = config.instance_name](ReplicationInstance const &instance) { @@ -353,6 +384,10 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instance_name) -> UnregisterInstanceCoordinatorStatus { + if (!is_running_) { + return UnregisterInstanceCoordinatorStatus::NOT_LEADER; + ; + } auto lock = std::lock_guard{coord_instance_lock_}; if (!raft_state_.RequestLeadership()) { @@ -402,6 +437,9 @@ auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32 } void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) { + if (!is_running_) { + return; + } spdlog::trace("Instance {} performing main fail callback", repl_instance_name); auto &repl_instance = FindReplicationInstance(repl_instance_name); repl_instance.OnFailPing(); @@ -416,6 +454,9 @@ void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) } void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name) { + if (!is_running_) { + return; + } spdlog::trace("Instance {} performing main successful callback", repl_instance_name); auto &repl_instance = FindReplicationInstance(repl_instance_name); @@ -464,6 +505,9 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam } void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name) 
{ + if (!is_running_) { + return; + } spdlog::trace("Instance {} performing replica successful callback", repl_instance_name); auto &repl_instance = FindReplicationInstance(repl_instance_name); @@ -484,6 +528,9 @@ void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_ } void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) { + if (!is_running_) { + return; + } spdlog::trace("Instance {} performing replica failure callback", repl_instance_name); auto &repl_instance = FindReplicationInstance(repl_instance_name); diff --git a/src/coordination/raft_state.cpp b/src/coordination/raft_state.cpp index 0210447a3..d9b96f09c 100644 --- a/src/coordination/raft_state.cpp +++ b/src/coordination/raft_state.cpp @@ -59,7 +59,8 @@ auto RaftState::InitRaftServer() -> void { raft_server::init_options init_opts; init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode { - spdlog::info("Received some message, my id {}, leader id {}", param->myId, param->leaderId); + spdlog::info("Received some message, my id {}, leader id {}, event_type {}, thread_id {}", param->myId, + param->leaderId, event_type, std::this_thread::get_id()); if (event_type == cb_func::BecomeLeader) { spdlog::info("Node {} became leader", param->leaderId); become_leader_cb_();