fix up coordinator instance

antoniofilipovic 2024-03-22 09:49:30 +01:00
parent 9b37ba0ff6
commit 32bb99d919
4 changed files with 36 additions and 18 deletions

View File

@@ -70,10 +70,6 @@ void CoordinatorClient::StartFrequentCheck() {
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
stream.AwaitResponse();
}
// Subtle race condition:
// the lock needs to be acquired before the callback is invoked, as the callback registered for
// an instance can otherwise change between the check and the call
// (failover case, when the instance is promoted to MAIN)
succ_cb_(coord_instance_, instance_name);
} catch (rpc::RpcFailedException const &) {
fail_cb_(coord_instance_, instance_name);
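The comment removed here describes a check-then-act race: which callback applies to an instance (MAIN or REPLICA) can change, for example during failover, between the health check and the call, so the lock has to be taken before the dispatch decision is made. A minimal sketch of that pattern follows; FindReplicationInstance, IsMainUnderCoordinator, MainSuccessCallback and ReplicaSuccessCallback are hypothetical names used for illustration, not the actual Memgraph API.

// Sketch only: take the coordinator lock first, then decide which callback applies, so a
// concurrent failover cannot flip the instance to MAIN between the role check and the call.
void CoordinatorInstance::InstanceSuccessCallback(std::string_view instance_name) {
  auto lock = std::unique_lock{coord_instance_lock_};       // lock before dispatch
  auto &instance = FindReplicationInstance(instance_name);  // hypothetical lookup helper
  if (instance.IsMainUnderCoordinator()) {                  // hypothetical role check
    MainSuccessCallback(instance);                          // MAIN-specific handling
  } else {
    ReplicaSuccessCallback(instance);                       // REPLICA-specific handling
  }
}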

View File

@@ -33,7 +33,7 @@ CoordinatorInstance::CoordinatorInstance()
: thread_pool_{1},
raft_state_(RaftState::MakeRaftState(
[this]() {
if (!raft_state_.IsHealthy()) {
if (raft_state_.IsLockOpened()) {
spdlog::error("Leader hasn't encountered healthy state, doing force reset of cluster.");
thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
return;
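The leader callback now checks raft_state_.IsLockOpened() instead of a generic health flag: if a previous multi-step operation left its Raft "lock" open, the newly elected leader schedules a force reset on the thread pool rather than continuing as if the cluster state were consistent. A toy illustration of that open-lock idea follows, under the assumption that the open/close entries bracket a multi-step cluster action; ToyRaftLog, AppendCloseLock and OnBecomeLeader are illustrative only.

// Toy model (not the Memgraph RaftState API): an "open lock" log entry marks a multi-step
// action in flight; if leadership changes while it is still open, the new leader must repair
// the cluster instead of resuming normally.
struct ToyRaftLog {
  bool lock_opened = false;
  bool AppendOpenLock() { lock_opened = true; return true; }    // start of a multi-step action
  bool AppendCloseLock() { lock_opened = false; return true; }  // action fully committed
  bool IsLockOpened() const { return lock_opened; }
};

template <typename ScheduleForceReset>
void OnBecomeLeader(const ToyRaftLog &log, ScheduleForceReset schedule_force_reset) {
  if (log.IsLockOpened()) {
    // The previous leader died (or failed) mid-action: repair rather than resume.
    schedule_force_reset();  // e.g. enqueue ForceResetCluster() on a worker thread pool
  }
}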
@@ -166,12 +166,16 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
}
void CoordinatorInstance::ForceResetCluster() {
// Force reset tries to return the cluster to a state in which all the replicas we had before are
// present, and then to fail over to a new MAIN. Only then is the force reset considered successful.
spdlog::info("Force resetting cluster!");
is_shutting_down_ = true;
auto lock = std::unique_lock{coord_instance_lock_};
// Ordering is important here: we must stop the frequent checks before taking the lock,
// otherwise we can deadlock: while holding the lock we would wait for a check thread to stop,
// and that thread would be blocked trying to take the lock we hold.
std::ranges::for_each(repl_instances_, [](auto &repl_instance) { repl_instance.StopFrequentCheck(); });
auto lock = std::unique_lock{coord_instance_lock_};
repl_instances_.clear();
is_shutting_down_ = false;
spdlog::info("Stopped all replication instance frequent checks.");
if (!raft_state_.IsLeader()) {
@@ -179,20 +183,24 @@ void CoordinatorInstance::ForceResetCluster() {
return;
}
// TODO change on scope exit to add task for force reset
if (!raft_state_.AppendOpenLockForceReset()) {
thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
MG_ASSERT(!raft_state_.IsLeader(), "Coordinator is not follower");
spdlog::trace("Appending log force reset failed, aborting force reset");
}
utils::OnScopeExit do_another_reset{[this]() {
if (raft_state_.IsLockOpened()) {
thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
}
}};
// TODO(antoniofilipovic): Additional action is needed only in the unregister case, to register the instance again
if (!raft_state_.AppendOpenLockForceReset()) {
MG_ASSERT(!raft_state_.IsLeader(), "Coordinator is not follower, something is wrong.");
spdlog::trace("Appending log force reset failed, aborting force reset");
return;
}
auto const instances = raft_state_.GetReplicationInstances();
// Try to contact each instance 5 times
// and set only the alive ones
// to REPLICA with the new MAIN UUID
// We send an RPC to each instance.
// If the RPC fails, we consider the instance dead;
// otherwise we consider the instance alive.
// If at any later point an RPC fails for an alive instance, we treat that as a failure.
std::ranges::for_each(instances, [this](auto &replica) {
repl_instances_.emplace_back(this, replica.config, client_succ_cb_, client_fail_cb_,
@@ -212,6 +220,16 @@ void CoordinatorInstance::ForceResetCluster() {
}
return false;
};
for (auto const &repl_instance : repl_instances_ | ranges::views::filter([&new_uuid](ReplicationInstance &instance) {
instance.OnSuccessPing();
return instance.SendSwapAndUpdateUUID(new_uuid);
})) {
if (!raft_state_.AppendUpdateUUIDForInstanceLog(repl_instance.InstanceName(), new_uuid)) {
return;
}
}
if (std::ranges::any_of(repl_instances_, append_log_update_uuid_failed)) {
spdlog::error("Force reset failed since update log or swap uuid failed, assuming coordinator is now follower.");
MG_ASSERT(!raft_state_.IsLeader(), "Coordinator is not follower");
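Taken together, the hunks above give ForceResetCluster the following overall shape. This is a condensed sketch of the flow as shown in this diff, not the exact implementation: signatures are simplified, the emplace_back arguments are trimmed, and new_uuid / utils::UUID stand in for however the new MAIN epoch identifier is actually produced.

// Condensed sketch of the force-reset flow implemented by this diff (simplified).
void CoordinatorInstance::ForceResetClusterSketch() {
  // 1. Stop frequent checks BEFORE taking the lock, so no check thread is left blocked on
  //    coord_instance_lock_ while we wait for it to stop.
  std::ranges::for_each(repl_instances_, [](auto &repl_instance) { repl_instance.StopFrequentCheck(); });
  auto lock = std::unique_lock{coord_instance_lock_};
  repl_instances_.clear();

  if (!raft_state_.IsLeader()) return;

  // 2. If anything below fails and leaves the Raft lock open, retry the whole reset later.
  utils::OnScopeExit do_another_reset{[this]() {
    if (raft_state_.IsLockOpened()) thread_pool_.AddTask([this]() { ForceResetCluster(); });
  }};

  // 3. Mark the multi-step action as in flight in the Raft log.
  if (!raft_state_.AppendOpenLockForceReset()) return;

  // 4. Recreate the instances from Raft state, swap reachable ones to the new MAIN UUID and
  //    record every successful swap in the Raft log.
  auto const new_uuid = utils::UUID{};  // placeholder for the real new MAIN UUID
  for (auto const &replica : raft_state_.GetReplicationInstances()) {
    repl_instances_.emplace_back(this, replica.config, client_succ_cb_,
                                 client_fail_cb_ /* further arguments trimmed */);
  }
  for (auto &repl_instance : repl_instances_) {
    repl_instance.OnSuccessPing();
    if (!repl_instance.SendSwapAndUpdateUUID(new_uuid)) continue;  // unreachable: considered dead
    if (!raft_state_.AppendUpdateUUIDForInstanceLog(repl_instance.InstanceName(), new_uuid)) return;
  }
}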

View File

@@ -85,6 +85,9 @@ class CoordinatorClient {
CoordinatorToReplicaConfig config_;
CoordinatorInstance *coord_instance_;
// The reason we store HealthCheckClientCallback is that the lock needs to be acquired before
// dispatching to the correct callback (MAIN or REPLICA); otherwise we could enter the REPLICA
// callback for an instance that was promoted to MAIN just beforehand.
HealthCheckClientCallback succ_cb_;
HealthCheckClientCallback fail_cb_;
};
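For context, the callbacks stored here are invoked from CoordinatorClient's frequent-check loop as succ_cb_(coord_instance_, instance_name) and fail_cb_(coord_instance_, instance_name). A plausible shape of the alias, consistent with those call sites, is sketched below; the exact definition in the codebase may differ.

#include <functional>
#include <string_view>

class CoordinatorInstance;  // forward declaration, as the real header would provide

// Assumed alias shape: one generic success and one generic failure callback per client; the
// role-specific (MAIN vs REPLICA) handling is resolved inside the callback, under
// coord_instance_lock_, as the comment above explains.
using HealthCheckClientCallback = std::function<void(CoordinatorInstance *, std::string_view)>;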

View File

@@ -81,6 +81,7 @@ class CoordinatorInstance {
HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
// NOTE: Must be std::list because we rely on pointer stability.
// TODO(antoniofilipovic): do we still rely on pointer stability?
std::list<ReplicationInstance> repl_instances_;
mutable utils::ResourceLock coord_instance_lock_{};
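The NOTE above is about element address stability: the health-check callbacks and check threads presumably keep a pointer or reference to "their" ReplicationInstance, so the container must never relocate elements when new instances are emplaced (as ForceResetCluster now does). A small self-contained illustration of the difference; Instance and Sketch are illustrative only.

#include <list>
#include <vector>

struct Instance { int id; };

void Sketch() {
  std::list<Instance> stable;
  Instance *held = &stable.emplace_back(Instance{1});
  stable.emplace_back(Instance{2});   // held stays valid: std::list never relocates elements
  (void)held;

  std::vector<Instance> unstable;
  Instance *maybe_dangling = &unstable.emplace_back(Instance{1});
  unstable.emplace_back(Instance{2}); // may reallocate: maybe_dangling can now be dangling
  (void)maybe_dangling;
}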