fix issue with follower callback

This commit is contained in:
antoniofilipovic 2024-03-12 16:53:10 +01:00
parent 7d9352beb5
commit 742f3a2e4c
2 changed files with 53 additions and 5 deletions

View File

@ -61,23 +61,38 @@ CoordinatorInstance::CoordinatorInstance()
std::ranges::for_each(repl_instances_, [this](auto &instance) { std::ranges::for_each(repl_instances_, [this](auto &instance) {
instance.SetNewMainUUID(raft_state_.GetUUID()); // TODO: (andi) Rename instance.SetNewMainUUID(raft_state_.GetUUID()); // TODO: (andi) Rename
instance.StartFrequentCheck(); instance.ResumeFrequentCheck();
}); });
is_running_.store(true, std::memory_order::acquire);
}, },
[this]() { [this]() {
spdlog::info("Leader changed, trying to stop all replication instances!"); spdlog::info("Leader changed, trying to stop all replication instances, thread id {}!",
std::this_thread::get_id());
/// TODO Add to pool /// TODO Add to pool
auto lock = std::lock_guard{coord_instance_lock_}; // RAII // auto lock = std::lock_guard{coord_instance_lock_}; // RAII
// std::ranges::for_each(repl_instances_)
repl_instances_.clear(); repl_instances_.clear();
spdlog::info("Leader changed, stopped all replication instances!"); spdlog::info("Leader changed, stopped all replication instances!");
return;
is_running_.store(false, std::memory_order::acquire);
pool_.AddTask([this]() {
auto lock = std::lock_guard{coord_instance_lock_};
std::ranges::for_each(repl_instances_, [](auto &instance) { instance.PauseFrequentCheck(); });
});
})) { })) {
client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
if (!self->is_running_) {
return;
}
auto lock = std::lock_guard{self->coord_instance_lock_}; // RAII auto lock = std::lock_guard{self->coord_instance_lock_}; // RAII
auto &repl_instance = self->FindReplicationInstance(repl_instance_name); auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name); std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name);
}; };
client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
if (!self->is_running_) {
return;
}
auto lock = std::lock_guard{self->coord_instance_lock_}; auto lock = std::lock_guard{self->coord_instance_lock_};
auto &repl_instance = self->FindReplicationInstance(repl_instance_name); auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name); std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name);
@ -149,6 +164,9 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
} }
auto CoordinatorInstance::TryFailover() -> void { auto CoordinatorInstance::TryFailover() -> void {
if (!is_running_) {
return;
}
auto const is_replica = [this](ReplicationInstance const &instance) { return IsReplica(instance.InstanceName()); }; auto const is_replica = [this](ReplicationInstance const &instance) { return IsReplica(instance.InstanceName()); };
auto alive_replicas = auto alive_replicas =
@ -219,9 +237,12 @@ auto CoordinatorInstance::TryFailover() -> void {
spdlog::warn("Failover failed since promoting replica to main failed!"); spdlog::warn("Failover failed since promoting replica to main failed!");
return; return;
} }
spdlog::info("Promote to main done, trying to append update uuid"); spdlog::info("Promote to main done, trying to append update uuid, thread id {}, is leader {}",
std::this_thread::get_id(), raft_state_.IsLeader());
// TODO (antoniofilipovic) : This can fail and we don't know we change uuid // TODO (antoniofilipovic) : This can fail and we don't know we change uuid
if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) { if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) {
spdlog::trace("Append entry update uuid failed, thread id {}, is leader {}", std::this_thread::get_id(),
raft_state_.IsLeader());
return; return;
} }
@ -229,6 +250,8 @@ auto CoordinatorInstance::TryFailover() -> void {
spdlog::info("Promote to main done, trying to set instance as main log"); spdlog::info("Promote to main done, trying to set instance as main log");
// TODO (antoniofilipovic): This can fail and we don't know we have set up new MAIN // TODO (antoniofilipovic): This can fail and we don't know we have set up new MAIN
if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name)) { if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name)) {
spdlog::trace("Append set instance as main failed, thread id {}, is leader {}", std::this_thread::get_id(),
raft_state_.IsLeader());
return; return;
} }
@ -244,6 +267,10 @@ auto CoordinatorInstance::TryFailover() -> void {
auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance_name) auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance_name)
-> SetInstanceToMainCoordinatorStatus { -> SetInstanceToMainCoordinatorStatus {
if (!is_running_) {
return SetInstanceToMainCoordinatorStatus::NOT_LEADER;
;
}
auto lock = std::lock_guard{coord_instance_lock_}; auto lock = std::lock_guard{coord_instance_lock_};
if (raft_state_.MainExists()) { if (raft_state_.MainExists()) {
@ -307,6 +334,10 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig const &config) auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig const &config)
-> RegisterInstanceCoordinatorStatus { -> RegisterInstanceCoordinatorStatus {
if (!is_running_) {
return RegisterInstanceCoordinatorStatus::NOT_LEADER;
;
}
auto lock = std::lock_guard{coord_instance_lock_}; auto lock = std::lock_guard{coord_instance_lock_};
if (std::ranges::any_of(repl_instances_, [instance_name = config.instance_name](ReplicationInstance const &instance) { if (std::ranges::any_of(repl_instances_, [instance_name = config.instance_name](ReplicationInstance const &instance) {
@ -353,6 +384,10 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instance_name) auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instance_name)
-> UnregisterInstanceCoordinatorStatus { -> UnregisterInstanceCoordinatorStatus {
if (!is_running_) {
return UnregisterInstanceCoordinatorStatus::NOT_LEADER;
;
}
auto lock = std::lock_guard{coord_instance_lock_}; auto lock = std::lock_guard{coord_instance_lock_};
if (!raft_state_.RequestLeadership()) { if (!raft_state_.RequestLeadership()) {
@ -402,6 +437,9 @@ auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32
} }
void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) { void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) {
if (!is_running_) {
return;
}
spdlog::trace("Instance {} performing main fail callback", repl_instance_name); spdlog::trace("Instance {} performing main fail callback", repl_instance_name);
auto &repl_instance = FindReplicationInstance(repl_instance_name); auto &repl_instance = FindReplicationInstance(repl_instance_name);
repl_instance.OnFailPing(); repl_instance.OnFailPing();
@ -416,6 +454,9 @@ void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name)
} }
void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name) { void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name) {
if (!is_running_) {
return;
}
spdlog::trace("Instance {} performing main successful callback", repl_instance_name); spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
auto &repl_instance = FindReplicationInstance(repl_instance_name); auto &repl_instance = FindReplicationInstance(repl_instance_name);
@ -464,6 +505,9 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
} }
void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name) { void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name) {
if (!is_running_) {
return;
}
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name); spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
auto &repl_instance = FindReplicationInstance(repl_instance_name); auto &repl_instance = FindReplicationInstance(repl_instance_name);
@ -484,6 +528,9 @@ void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_
} }
void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) { void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) {
if (!is_running_) {
return;
}
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name); spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
auto &repl_instance = FindReplicationInstance(repl_instance_name); auto &repl_instance = FindReplicationInstance(repl_instance_name);

View File

@ -59,7 +59,8 @@ auto RaftState::InitRaftServer() -> void {
raft_server::init_options init_opts; raft_server::init_options init_opts;
init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode { init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode {
spdlog::info("Received some message, my id {}, leader id {}", param->myId, param->leaderId); spdlog::info("Received some message, my id {}, leader id {}, event_type {}, thread_id {}", param->myId,
param->leaderId, event_type, std::this_thread::get_id());
if (event_type == cb_func::BecomeLeader) { if (event_type == cb_func::BecomeLeader) {
spdlog::info("Node {} became leader", param->leaderId); spdlog::info("Node {} became leader", param->leaderId);
become_leader_cb_(); become_leader_cb_();