add impl for force sync

antoniofilipovic 2024-03-22 17:10:27 +01:00
parent 3cc2dfafc4
commit 71f0f4a4b1
4 changed files with 130 additions and 67 deletions

View File

@@ -72,7 +72,11 @@ CoordinatorInstance::CoordinatorInstance()
repl_instance.StopFrequentCheck();
}
auto lock = std::unique_lock{coord_instance_lock_};
std::ranges::for_each(repl_instances_, [](auto &repl_instance) { repl_instance.StopFrequentCheck(); });
std::ranges::for_each(repl_instances_, [](auto &repl_instance) {
spdlog::trace("Stopping frequent checks for instance {}", repl_instance.InstanceName());
repl_instance.StopFrequentCheck();
spdlog::trace("Stopped frequent checks for instance {}", repl_instance.InstanceName());
});
repl_instances_.clear();
spdlog::info("Stopped all replication instance frequent checks.");
});
@@ -167,11 +171,24 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
void CoordinatorInstance::ForceResetCluster() {
// Force reset tries to return the cluster to the state in which it had all the replicas it had before,
// and then tries to do a failover to a new MAIN. Only then is the force reset successful.
// 0. Open lock
// 1. Try to demote each instance to replica
// 2. Instances which are demoted proceed to the next step as part of the selection process
// 3. For selected instances try to send SWAP UUID and update log -> both must succeed
// 4. Do failover
// 5. For instances which were down, set the correct callbacks as before
// 6. After an instance gets back up, do the steps needed to recover
spdlog::info("Force resetting cluster!");
// Ordering is important here: we must stop the frequent checks before taking the lock, to avoid a deadlock
// between us stopping the check thread and that thread trying to take the lock we already hold.
std::ranges::for_each(repl_instances_, [](auto &repl_instance) { repl_instance.StopFrequentCheck(); });
std::ranges::for_each(repl_instances_, [](auto &repl_instance) {
spdlog::trace("Stopping frequent check for instance {}", repl_instance.InstanceName());
repl_instance.StopFrequentCheck();
spdlog::trace("Stopped frequent check for instance {}", repl_instance.InstanceName());
});
auto lock = std::unique_lock{coord_instance_lock_};
repl_instances_.clear();
@@ -209,23 +226,34 @@ void CoordinatorInstance::ForceResetCluster() {
&CoordinatorInstance::ReplicaFailCallback);
});
auto instances_mapped_to_resp =
repl_instances_ | ranges::views::transform([](ReplicationInstance &instance) {
return std::pair{instance.InstanceName(), instance.DemoteToReplica(&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback)};
}) |
ranges::to<std::unordered_map<std::string, bool>>();
auto alive_instances =
repl_instances_ | ranges::views::filter([&instances_mapped_to_resp](ReplicationInstance &instance) {
return instances_mapped_to_resp[instance.InstanceName()];
});
if (std::ranges::any_of(alive_instances, [this](ReplicationInstance &instance) {
return !raft_state_.AppendSetInstanceAsReplicaLog(instance.InstanceName());
})) {
spdlog::error("Failed to send log instance demoted to replica.");
return;
}
auto const new_uuid = utils::UUID{};
auto const update_on_success_ping_on_swap_uuid = [&new_uuid](ReplicationInstance &instance) {
if (!instance.SendSwapAndUpdateUUID(new_uuid)) {
return;
}
instance.OnSuccessPing();
};
std::ranges::for_each(repl_instances_, update_on_success_ping_on_swap_uuid);
auto const is_swap_uuid_success = [](ReplicationInstance &instance) { return instance.IsAlive(); };
auto instances_with_swapped_uuid = repl_instances_ | ranges::views::filter(is_swap_uuid_success);
auto update_uuid_failed = [&new_uuid, this](auto &repl_instance) {
if (!repl_instance.SendSwapAndUpdateUUID(new_uuid)) {
return true;
}
return !raft_state_.AppendUpdateUUIDForInstanceLog(repl_instance.InstanceName(), new_uuid);
};
if (std::ranges::any_of(instances_with_swapped_uuid, update_uuid_failed)) {
if (std::ranges::any_of(alive_instances, update_uuid_failed)) {
spdlog::error("Force reset failed since update log swap uuid failed, assuming coordinator is now follower.");
return;
}
@@ -234,34 +262,19 @@ void CoordinatorInstance::ForceResetCluster() {
spdlog::error("Update log for new MAIN failed, assuming coordinator is now follower");
return;
}
// TODO(antoniofilipovic) Think about how we can isolate this part into a common function
auto const get_ts = [](ReplicationInstance &replica) { return replica.GetClient().SendGetInstanceTimestampsRpc(); };
auto maybe_instance_db_histories =
instances_with_swapped_uuid | ranges::views::transform(get_ts) | ranges::to<std::vector>();
auto maybe_most_up_to_date_instance = GetMostUpToDateInstanceFromHistories(alive_instances);
auto const ts_has_value = [](auto const &zipped) -> bool {
auto &[replica, res] = zipped;
return res.HasValue();
};
if (!maybe_most_up_to_date_instance) {
spdlog::error("Couldn't choose an instance for failover, check logs for more details.");
return;
}
auto transform_to_pairs = ranges::views::transform([](auto const &zipped) {
auto &[replica, res] = zipped;
return std::make_pair(replica.InstanceName(), res.GetValue());
});
auto instance_db_histories = ranges::views::zip(instances_with_swapped_uuid, maybe_instance_db_histories) |
ranges::views::filter(ts_has_value) | transform_to_pairs | ranges::to<std::vector>();
auto [most_up_to_date_instance, latest_epoch, latest_commit_timestamp] =
ChooseMostUpToDateInstance(instance_db_histories);
auto &new_main = FindReplicationInstance(most_up_to_date_instance);
auto &new_main = FindReplicationInstance(*maybe_most_up_to_date_instance);
auto const is_not_new_main = [&new_main](ReplicationInstance const &repl_instance) {
return repl_instance.InstanceName() != new_main.InstanceName();
};
auto repl_clients_info = instances_with_swapped_uuid | ranges::views::filter(is_not_new_main) |
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
ranges::to<ReplicationClientsInfo>();
@@ -272,11 +285,36 @@ void CoordinatorInstance::ForceResetCluster() {
}
// This will set the cluster into a healthy state again
if (!raft_state_.AppendSetInstanceAsMainLog(most_up_to_date_instance, new_uuid)) {
if (!raft_state_.AppendSetInstanceAsMainLog(*maybe_most_up_to_date_instance, new_uuid)) {
spdlog::error("Update log for new MAIN failed");
return;
}
// Go through instances which were down and update callbacks
std::ranges::for_each(repl_instances_, [&instances_mapped_to_resp, this](ReplicationInstance &repl_instance) {
if (instances_mapped_to_resp[repl_instance.InstanceName()]) {
if (raft_state_.HasReplicaState(repl_instance.InstanceName())) {
MG_ASSERT(repl_instance.GetSuccessCallback() == &CoordinatorInstance::ReplicaSuccessCallback &&
repl_instance.GetFailCallback() == &CoordinatorInstance::ReplicaFailCallback,
"Callbacks are wrong");
}
if (raft_state_.HasMainState(repl_instance.InstanceName())) {
MG_ASSERT(repl_instance.GetSuccessCallback() == &CoordinatorInstance::MainSuccessCallback &&
repl_instance.GetFailCallback() == &CoordinatorInstance::MainFailCallback);
}
} else {
if (raft_state_.HasReplicaState(repl_instance.InstanceName())) {
repl_instance.SetCallbacks(&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback);
} else {
repl_instance.SetCallbacks(&CoordinatorInstance::MainSuccessCallback, &CoordinatorInstance::MainFailCallback);
}
}
});
std::ranges::for_each(repl_instances_, [](auto &instance) { instance.StartFrequentCheck(); });
MG_ASSERT(!raft_state_.IsLockOpened(), "After force reset we need to be in a healthy state.");
}
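The step comments at the top of ForceResetCluster outline the intended sequence: stop the frequent checks, open the lock, demote everything that responds, swap UUIDs, fail over to the most up-to-date instance, then restore callbacks and checks. A minimal standalone sketch of that flow, using hypothetical stand-in types and helper names rather than the real CoordinatorInstance/ReplicationInstance APIs:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for a replication instance; not the real ReplicationInstance API.
struct FakeInstance {
  std::string name;
  bool reachable{true};
  std::uint64_t commit_timestamp{0};  // crude "most up to date" criterion for the sketch
};

// Sketch of the force-reset sequence from the step comments: demote what responds,
// pick the most up-to-date instance as the new MAIN, leave the rest to recover later.
std::optional<std::string> ForceResetSketch(std::vector<FakeInstance> const &instances) {
  // 1./2. Only instances that respond to the demote request take part in the selection.
  std::vector<const FakeInstance *> demoted;
  for (auto const &instance : instances) {
    if (instance.reachable) demoted.push_back(&instance);
  }
  if (demoted.empty()) return std::nullopt;

  // 3. Swapping UUIDs and appending Raft logs would happen here (no-op in this sketch).

  // 4. Failover: choose the demoted instance with the latest commit timestamp as the new MAIN.
  auto const *new_main = *std::max_element(
      demoted.begin(), demoted.end(),
      [](auto const *lhs, auto const *rhs) { return lhs->commit_timestamp < rhs->commit_timestamp; });

  // 5./6. Instances that were down keep their previous role until they come back up and recover.
  return new_main->name;
}

int main() {
  std::vector<FakeInstance> const cluster{{"instance_1", true, 42}, {"instance_2", true, 57}, {"instance_3", false, 99}};
  if (auto const new_main = ForceResetSketch(cluster)) {
    std::cout << "new MAIN: " << *new_main << '\n';  // instance_2: instance_3 is down and skipped
  }
}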
@@ -293,32 +331,13 @@ auto CoordinatorInstance::TryFailover() -> void {
return;
}
auto const get_ts = [](ReplicationInstance &replica) { return replica.GetClient().SendGetInstanceTimestampsRpc(); };
auto maybe_most_up_to_date_instance = GetMostUpToDateInstanceFromHistories(alive_replicas);
auto maybe_instance_db_histories = alive_replicas | ranges::views::transform(get_ts) | ranges::to<std::vector>();
auto const ts_has_error = [](auto const &res) -> bool { return res.HasError(); };
if (std::ranges::any_of(maybe_instance_db_histories, ts_has_error)) {
spdlog::error("Aborting failover as at least one instance didn't provide per database history.");
return;
if (!maybe_most_up_to_date_instance) {
spdlog::error("Couldn't choose an instance for failover, check logs for more details.");
return;
}
auto transform_to_pairs = ranges::views::transform([](auto const &zipped) {
auto &[replica, res] = zipped;
return std::make_pair(replica.InstanceName(), res.GetValue());
});
auto instance_db_histories =
ranges::views::zip(alive_replicas, maybe_instance_db_histories) | transform_to_pairs | ranges::to<std::vector>();
auto [most_up_to_date_instance, latest_epoch, latest_commit_timestamp] =
ChooseMostUpToDateInstance(instance_db_histories);
spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp",
most_up_to_date_instance, latest_epoch, latest_commit_timestamp); // NOLINT
auto *new_main = &FindReplicationInstance(most_up_to_date_instance);
auto &new_main = FindReplicationInstance(*maybe_most_up_to_date_instance);
if (!raft_state_.AppendOpenLock()) {
spdlog::error("Aborting failover as instance is not anymore leader.");
@@ -331,11 +350,12 @@ auto CoordinatorInstance::TryFailover() -> void {
}
}};
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
// We don't need to stop the frequent check since we hold the lock, and we will swap the callback functions
// during the locked phase. The frequent check takes the lock first, and only then decides which success or
// fail callback (MAIN or REPLICA) it needs to call.
auto const is_not_new_main = [&new_main](ReplicationInstance &instance) {
return instance.InstanceName() != new_main->InstanceName();
return instance.InstanceName() != new_main.InstanceName();
};
auto const new_main_uuid = utils::UUID{};
@@ -356,8 +376,8 @@ auto CoordinatorInstance::TryFailover() -> void {
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
ranges::to<ReplicationClientsInfo>();
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
&CoordinatorInstance::MainFailCallback)) {
if (!new_main.PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
&CoordinatorInstance::MainFailCallback)) {
spdlog::warn("Failover failed since promoting replica to main failed!");
return;
}
@@ -366,17 +386,17 @@ auto CoordinatorInstance::TryFailover() -> void {
return;
}
auto const new_main_instance_name = new_main->InstanceName();
auto const new_main_instance_name = new_main.InstanceName();
if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name, new_main_uuid)) {
return;
}
if (!new_main->EnableWritingOnMain()) {
if (!new_main.EnableWritingOnMain()) {
spdlog::error("Failover successful but couldn't enable writing on instance.");
}
spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName());
spdlog::info("Failover successful! Instance {} promoted to main.", new_main.InstanceName());
}
auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance_name)
@@ -506,6 +526,7 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig
&CoordinatorInstance::ReplicaFailCallback);
if (!new_instance->SendDemoteToReplicaRpc()) {
// TODO(antoniofilipovic) We don't need to do a force reset here, only close the lock later on
spdlog::error("Failed to send demote to replica rpc for instance {}", config.instance_name);
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}

View File

@@ -67,6 +67,41 @@ class CoordinatorInstance {
auto HasReplicaState(std::string_view instance_name) const -> bool;
private:
template <ranges::forward_range R>
auto GetMostUpToDateInstanceFromHistories(R &&alive_instances) -> std::optional<std::string> {
auto const get_ts = [](ReplicationInstance &replica) { return replica.GetClient().SendGetInstanceTimestampsRpc(); };
auto maybe_instance_db_histories = alive_instances | ranges::views::transform(get_ts) | ranges::to<std::vector>();
auto const ts_has_error = [](auto const &res) -> bool { return res.HasError(); };
if (std::ranges::any_of(maybe_instance_db_histories, ts_has_error)) {
spdlog::error("At least one instance which was alive didn't provide per database history.");
return std::nullopt;
}
auto const ts_has_value = [](auto const &zipped) -> bool {
auto &[replica, res] = zipped;
return res.HasValue();
};
auto transform_to_pairs = ranges::views::transform([](auto const &zipped) {
auto &[replica, res] = zipped;
return std::make_pair(replica.InstanceName(), res.GetValue());
});
auto instance_db_histories = ranges::views::zip(alive_instances, maybe_instance_db_histories) |
ranges::views::filter(ts_has_value) | transform_to_pairs | ranges::to<std::vector>();
auto [most_up_to_date_instance, latest_epoch, latest_commit_timestamp] =
ChooseMostUpToDateInstance(instance_db_histories);
spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp",
most_up_to_date_instance, latest_epoch, latest_commit_timestamp); // NOLINT
return most_up_to_date_instance;
}
auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &;
void MainFailCallback(std::string_view);
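GetMostUpToDateInstanceFromHistories above collects per-instance histories and delegates the actual choice to ChooseMostUpToDateInstance, whose implementation is not part of this diff. A rough sketch of the kind of selection suggested by the (instance, epoch, timestamp) structured binding, using hypothetical types and a simplified latest-commit-timestamp criterion:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <tuple>
#include <vector>

// Hypothetical flattened history entry: one (epoch, latest commit timestamp) pair per instance.
struct HistoryEntry {
  std::string instance_name;
  std::string epoch;
  std::uint64_t latest_commit_timestamp{0};
};

// Sketch only: prefer the instance with the highest commit timestamp. The real
// ChooseMostUpToDateInstance also reasons about epochs and per-database histories.
std::optional<std::tuple<std::string, std::string, std::uint64_t>> ChooseSketch(
    std::vector<HistoryEntry> const &entries) {
  if (entries.empty()) return std::nullopt;
  auto const best = *std::max_element(entries.begin(), entries.end(), [](auto const &lhs, auto const &rhs) {
    return lhs.latest_commit_timestamp < rhs.latest_commit_timestamp;
  });
  return std::tuple{best.instance_name, best.epoch, best.latest_commit_timestamp};
}

int main() {
  std::vector<HistoryEntry> const histories{{"instance_1", "epoch_1", 40}, {"instance_2", "epoch_1", 57}};
  if (auto const choice = ChooseSketch(histories)) {
    auto const &[name, epoch, ts] = *choice;
    std::cout << name << " wins with epoch " << epoch << " and commit timestamp " << ts << '\n';
  }
}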

View File

@@ -82,6 +82,8 @@ class ReplicationInstance {
auto GetSuccessCallback() -> HealthCheckInstanceCallback &;
auto GetFailCallback() -> HealthCheckInstanceCallback &;
void SetCallbacks(HealthCheckInstanceCallback succ_cb, HealthCheckInstanceCallback fail_cb);
private:
CoordinatorClient client_;
std::chrono::system_clock::time_point last_response_time_{};

View File

@@ -128,5 +128,10 @@ auto ReplicationInstance::SendGetInstanceUUID()
void ReplicationInstance::UpdateReplicaLastResponseUUID() { last_check_of_uuid_ = std::chrono::system_clock::now(); }
void ReplicationInstance::SetCallbacks(HealthCheckInstanceCallback succ_cb, HealthCheckInstanceCallback fail_cb) {
succ_cb_ = succ_cb;
fail_cb_ = fail_cb;
}
} // namespace memgraph::coordination
#endif
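SetCallbacks above only swaps the stored success/fail callbacks; the comment in TryFailover notes that the frequent check takes the coordinator lock first and only then decides which callback to invoke. A minimal sketch of that dispatch pattern, with hypothetical types rather than the real CoordinatorInstance/ReplicationInstance APIs:

#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical instance holding swappable health-check callbacks, mirroring the
// SetCallbacks(succ_cb, fail_cb) idea above (not the real ReplicationInstance).
struct CheckedInstance {
  std::string name;
  std::function<void(std::string_view)> succ_cb;
  std::function<void(std::string_view)> fail_cb;

  void SetCallbacks(std::function<void(std::string_view)> s, std::function<void(std::string_view)> f) {
    succ_cb = std::move(s);
    fail_cb = std::move(f);
  }
};

// One frequent-check tick: the result of the ping is dispatched only after the
// coordinator lock is taken, through whichever callbacks are currently installed.
void FrequentCheckTick(std::mutex &coord_lock, CheckedInstance &instance, bool ping_ok) {
  std::lock_guard lock{coord_lock};
  if (ping_ok) {
    instance.succ_cb(instance.name);
  } else {
    instance.fail_cb(instance.name);
  }
}

int main() {
  std::mutex coord_lock;
  CheckedInstance instance{"instance_1"};
  // After a failover or force reset, the coordinator can swap in the callbacks
  // matching the instance's new role without restarting the check thread.
  instance.SetCallbacks([](std::string_view n) { std::cout << "replica success: " << n << '\n'; },
                        [](std::string_view n) { std::cout << "replica fail: " << n << '\n'; });
  FrequentCheckTick(coord_lock, instance, /*ping_ok=*/true);  // prints "replica success: instance_1"
}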