Set instance to main shared with commit

This commit is contained in:
Andi Skrgat 2024-02-27 12:43:36 +01:00
parent 2069cfefbd
commit f0f7ed104d
3 changed files with 235 additions and 58 deletions

View File

@ -518,57 +518,58 @@ auto CoordinatorInstance::TryFailover() -> void {
auto *new_main = &new_repl_instance;
>>>>>>> 9081c5c24 (Optional main on unregistering)
if (!raft_state_.RequestLeadership()) {
spdlog::error("Failed to request leadership for promoting instance to main {}.", new_main->InstanceName());
return;
}
// if (!raft_state_.RequestLeadership()) {
// spdlog::error("Failed to request leadership for promoting instance to main {}.", new_main->InstanceName());
// return;
// }
// TODO: (andi) Improve std::string, appending...
auto const res = raft_state_.AppendSetInstanceAsMain(new_main->InstanceName());
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for promoting instance {}. Most likely the reason is that the instance is not the "
"leader.",
new_main->InstanceName());
return;
}
// // TODO: (andi) Improve std::string, appending...
// auto const res = raft_state_.AppendSetInstanceAsMain(new_main->InstanceName());
// if (!res->get_accepted()) {
// spdlog::error(
// "Failed to accept request for promoting instance {}. Most likely the reason is that the instance is not the "
// "leader.",
// new_main->InstanceName());
// return;
// }
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to promote instance {} with error code {}", new_main->InstanceName(), res->get_result_code());
return;
}
// if (res->get_result_code() != nuraft::cmd_result_code::OK) {
// spdlog::error("Failed to promote instance {} with error code {}", new_main->InstanceName(),
// res->get_result_code()); return;
// }
spdlog::info("Request for promoting instance {} accepted", new_main->InstanceName());
// spdlog::info("Request for promoting instance {} accepted", new_main->InstanceName());
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
// new_main->PauseFrequentCheck();
// utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
auto const is_not_new_main = [&new_main](ReplicationInstance &instance) {
return instance.InstanceName() != new_main->InstanceName();
};
// auto const is_not_new_main = [&new_main](ReplicationInstance &instance) {
// return instance.InstanceName() != new_main->InstanceName();
// };
auto const new_main_uuid = utils::UUID{};
// If for some replicas swap fails, for others on successful ping we will revert back on next change
// or we will do failover first again and then it will be consistent again
for (auto &other_replica_instance : alive_replicas | ranges::views::filter(is_not_new_main)) {
if (!other_replica_instance.SendSwapAndUpdateUUID(new_main_uuid)) {
spdlog::error(fmt::format("Failed to swap uuid for instance {} which is alive, aborting failover",
other_replica_instance.InstanceName()));
return;
}
}
// auto const new_main_uuid = utils::UUID{};
// // If for some replicas swap fails, for others on successful ping we will revert back on next change
// // or we will do failover first again and then it will be consistent again
// for (auto &other_replica_instance : alive_replicas | ranges::views::filter(is_not_new_main)) {
// if (!other_replica_instance.SendSwapAndUpdateUUID(new_main_uuid)) {
// spdlog::error(fmt::format("Failed to swap uuid for instance {} which is alive, aborting failover",
// other_replica_instance.InstanceName()));
// return;
// }
// }
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
ranges::to<ReplicationClientsInfo>();
// auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
// ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
// ranges::to<ReplicationClientsInfo>();
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
&CoordinatorInstance::MainFailCallback)) {
spdlog::warn("Failover failed since promoting replica to main failed!");
return;
}
main_uuid_ = new_main_uuid;
spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName());
// if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info),
// &CoordinatorInstance::MainSuccessCallback,
// &CoordinatorInstance::MainFailCallback)) {
// spdlog::warn("Failover failed since promoting replica to main failed!");
// return;
// }
// main_uuid_ = new_main_uuid;
// spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName());
}
<<<<<<< HEAD
@ -1025,6 +1026,7 @@ void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_nam
repl_instance.OnFailPing();
}
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> b1af5ceeb (Move ReplRole to ClusterState)
@ -1191,6 +1193,78 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name
return SetInstanceToMainCoordinatorStatus::SUCCESS;
}
||||||| parent of ecf7e6839 (Set instance to main shared with commit)
// TODO: (andi) Make sure you cannot put coordinator instance to the main
// Promotes the registered replication instance named `instance_name` to main.
// Holds coord_instance_lock_ for the whole call and returns a SetInstanceToMainCoordinatorStatus
// describing the first step that failed, or SUCCESS once main_uuid_ has been updated.
// Steps, in order: validate -> append to the raft log -> swap UUID on the other instances -> promote.
auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name)
-> SetInstanceToMainCoordinatorStatus {
auto lock = std::lock_guard{coord_instance_lock_};
// Refuse when raft_state_ already reports a main.
if (raft_state_.MainExists()) {
return SetInstanceToMainCoordinatorStatus::MAIN_ALREADY_EXISTS;
}
auto const is_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
};
// The target must already be present in repl_instances_.
auto new_main = std::ranges::find_if(repl_instances_, is_new_main);
if (new_main == repl_instances_.end()) {
spdlog::error("Instance {} not registered. Please register it using REGISTER INSTANCE {}", instance_name,
instance_name);
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
if (!raft_state_.RequestLeadership()) {
return SetInstanceToMainCoordinatorStatus::NOT_LEADER;
}
// The raft append happens before any RPC below, so the later failure returns
// (SWAP_UUID_FAILED, COULD_NOT_PROMOTE_TO_MAIN) occur after the entry was already accepted.
auto const res = raft_state_.AppendSetInstanceAsMain(instance_name);
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for promoting instance {}. Most likely the reason is that the instance is not "
"the leader.",
instance_name);
return SetInstanceToMainCoordinatorStatus::RAFT_COULD_NOT_ACCEPT;
}
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to promote instance {} with error code {}", instance_name, res->get_result_code());
return SetInstanceToMainCoordinatorStatus::RAFT_COULD_NOT_APPEND;
}
// Pause the frequent check on the target; scope_exit resumes it on every return path below.
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() != instance_name;
};
auto const new_main_uuid = utils::UUID{};
// Every other registered instance is contacted here (there is no IsAlive filter);
// the first failed swap aborts the whole operation.
for (auto &other_instance : repl_instances_ | ranges::views::filter(is_not_new_main)) {
if (!other_instance.SendSwapAndUpdateUUID(new_main_uuid)) {
spdlog::error(
fmt::format("Failed to swap uuid for instance {}, aborting failover", other_instance.InstanceName()));
return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED;
}
}
// Replication client info of all other instances, handed to the new main.
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
ranges::to<ReplicationClientsInfo>();
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
&CoordinatorInstance::MainFailCallback)) {
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
// main_uuid_ is only updated after the promotion call succeeded.
main_uuid_ = new_main_uuid;
spdlog::info("Instance {} promoted to main", instance_name);
return SetInstanceToMainCoordinatorStatus::SUCCESS;
}
=======
>>>>>>> ecf7e6839 (Set instance to main shared with commit)
// TODO: (andi) Status of registration, maybe not all needed.
// Incorporate checking of replication socket address.
auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig config)
@ -1272,6 +1346,7 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string instance_nam
instance_name);
return UnregisterInstanceCoordinatorStatus::RAFT_COULD_NOT_ACCEPT;
}
spdlog::info("Request for unregistering instance {} accepted", instance_name);
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to unregister instance {} with error code {}", instance_name, res->get_result_code());
@ -1290,6 +1365,7 @@ auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
// Returns a copy of the main UUID currently stored in this coordinator (main_uuid_).
auto CoordinatorInstance::GetMainUUID() const -> utils::UUID { return main_uuid_; }
// TODO: (andi) Add to the RAFT log.
@ -1389,6 +1465,53 @@ auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_
>>>>>>> 1b150ee92 (Address PR comments)
||||||| parent of fab8d3d76 (Shared (Un)Registration networking part with raft)
=======
||||||| parent of ecf7e6839 (Set instance to main shared with commit)
=======
// TODO: (andi) Make sure you cannot put coordinator instance to the main
// change arg types
/// Asks the raft state to record that the replication instance `instance_name` becomes main.
///
/// The call holds coord_instance_lock_ for its whole duration and only validates the request and
/// appends it through raft_state_; no RPC is sent to the instance from this function.
/// NOTE(review): the promotion work itself appears to live in the SET_INSTANCE_AS_MAIN branch of
/// OnRaftCommitCallback -- confirm.
///
/// @param instance_name name of an already registered replication instance.
/// @return MAIN_ALREADY_EXISTS, NO_INSTANCE_WITH_NAME, NOT_LEADER, RAFT_COULD_NOT_ACCEPT or
///         RAFT_COULD_NOT_APPEND for the first check that fails, otherwise SUCCESS.
auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name)
    -> SetInstanceToMainCoordinatorStatus {
  using Status = SetInstanceToMainCoordinatorStatus;

  auto guard = std::lock_guard{coord_instance_lock_};

  // Only one main may exist at a time.
  if (raft_state_.MainExists()) {
    return Status::MAIN_ALREADY_EXISTS;
  }

  // The requested instance has to be among the registered ones.
  auto const has_requested_name = [&instance_name](ReplicationInstance const &candidate) {
    return candidate.InstanceName() == instance_name;
  };
  if (std::ranges::find_if(repl_instances_, has_requested_name) == repl_instances_.end()) {
    spdlog::error("Instance {} not registered. Please register it using REGISTER INSTANCE {}", instance_name,
                  instance_name);
    return Status::NO_INSTANCE_WITH_NAME;
  }

  if (!raft_state_.RequestLeadership()) {
    return Status::NOT_LEADER;
  }

  // Hand the request over to raft and inspect both stages of the result.
  auto const append_result = raft_state_.AppendSetInstanceAsMain(instance_name);

  bool const was_accepted = append_result->get_accepted();
  if (!was_accepted) {
    spdlog::error(
        "Failed to accept request for promoting instance {}. Most likely the reason is that the instance is not "
        "the leader.",
        instance_name);
    return Status::RAFT_COULD_NOT_ACCEPT;
  }
  spdlog::info("Request for promoting instance {} accepted", instance_name);

  bool const was_appended = append_result->get_result_code() == nuraft::cmd_result_code::OK;
  if (!was_appended) {
    spdlog::error("Failed to promote instance {} with error code {}", instance_name,
                  append_result->get_result_code());
    return Status::RAFT_COULD_NOT_APPEND;
  }

  return Status::SUCCESS;
}
>>>>>>> ecf7e6839 (Set instance to main shared with commit)
auto CoordinatorInstance::OnRaftCommitCallback(TRaftLog const &log_entry, RaftLogAction log_action) -> void {
// TODO: (andi) Solve it locking scheme and write comment.
switch (log_action) {
@ -1416,6 +1539,7 @@ auto CoordinatorInstance::OnRaftCommitCallback(TRaftLog const &log_entry, RaftLo
case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE: {
auto const instance_name = std::get<std::string>(log_entry);
if (raft_state_.IsLeader()) {
auto &inst_to_remove = FindReplicationInstance(instance_name);
inst_to_remove.StopFrequentCheck();
@ -1429,6 +1553,7 @@ auto CoordinatorInstance::OnRaftCommitCallback(TRaftLog const &log_entry, RaftLo
inst_to_remove.StartFrequentCheck();
}
}
}
auto const name_matches = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
@ -1440,6 +1565,47 @@ auto CoordinatorInstance::OnRaftCommitCallback(TRaftLog const &log_entry, RaftLo
break;
}
// Commit handler for a "set instance as main" log entry. The entry carries the instance name.
// On the raft leader the other instances are contacted and the promotion is performed through
// PromoteToMainAsLeader; on any other node only PromoteToMainAsFollower is called.
case RaftLogAction::SET_INSTANCE_AS_MAIN: {
auto const instance_name = std::get<std::string>(log_entry);
auto &new_main = FindReplicationInstance(instance_name);
// Frequent checks on the target stay paused until this case's scope is left.
new_main.PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main.ResumeFrequentCheck(); }};
// NOTE(review): the UUID is constructed locally on every node that runs this callback. If
// utils::UUID{} yields a fresh value per construction, leader and followers end up with
// different main_uuid_ values -- confirm whether the UUID should travel in the log entry instead.
auto const new_main_uuid = utils::UUID{};
if (raft_state_.IsLeader()) {
auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() != instance_name;
};
// Only instances that are alive and are not the new main receive the UUID swap.
auto alive_instances = repl_instances_ | ranges::views::filter(&ReplicationInstance::IsAlive) |
ranges::views::filter(is_not_new_main);
// any_of stops at the first instance whose swap fails, so later instances are not contacted.
if (std::ranges::any_of(alive_instances, [&new_main_uuid](ReplicationInstance &instance) {
return !instance.SendSwapAndUpdateUUID(new_main_uuid);
})) {
// NOTE(review): the message prints the new main's name, not the instance whose swap failed.
spdlog::error("Failed to swap uuid for instance {}.", new_main.InstanceName());
// TODO: (andi) What to do on network failure when appended to raft log?
}
// Client info is collected from all other instances (no IsAlive filter here).
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
ranges::to<ReplicationClientsInfo>();
if (!new_main.PromoteToMainAsLeader(new_main_uuid, std::move(repl_clients_info),
&CoordinatorInstance::MainSuccessCallback,
&CoordinatorInstance::MainFailCallback)) {
spdlog::error("Failed to promote instance {} to main", instance_name);
// TODO: (andi) What to do on network failure when appended to raft log?
}
} else {
new_main.PromoteToMainAsFollower(new_main_uuid, &CoordinatorInstance::MainSuccessCallback,
&CoordinatorInstance::MainFailCallback);
}
// Both failure paths above only log, so main_uuid_ is updated and the success message is
// emitted even when the swap or the promotion failed.
main_uuid_ = new_main_uuid;
spdlog::info("Instance {} promoted to main", instance_name);
break;
}
case RaftLogAction::SET_INSTANCE_AS_REPLICA: {

View File

@ -53,12 +53,16 @@ class ReplicationInstance {
auto InstanceName() const -> std::string;
auto SocketAddress() const -> std::string;
auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info,
HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb) -> bool;
auto PromoteToMainAsLeader(utils::UUID uuid, ReplicationClientsInfo repl_clients_info,
HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb)
-> bool;
auto PromoteToMainAsFollower(utils::UUID uuid, HealthCheckInstanceCallback main_succ_cb,
HealthCheckInstanceCallback main_fail_cb) -> void;
auto SendDemoteToReplicaRpc() -> bool;
auto DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb)
-> bool;
auto SendDemoteToReplicaRpc() -> bool;
auto StartFrequentCheck() -> void;
auto StopFrequentCheck() -> void;

View File

@ -48,7 +48,7 @@ auto ReplicationInstance::InstanceName() const -> std::string { return client_.I
// Returns the socket address reported by the underlying client_ (forwards to client_.SocketAddress()).
auto ReplicationInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); }
// Returns the current value of the is_alive_ flag.
auto ReplicationInstance::IsAlive() const -> bool { return is_alive_; }
auto ReplicationInstance::PromoteToMain(utils::UUID new_uuid, ReplicationClientsInfo repl_clients_info,
auto ReplicationInstance::PromoteToMainAsLeader(utils::UUID new_uuid, ReplicationClientsInfo repl_clients_info,
HealthCheckInstanceCallback main_succ_cb,
HealthCheckInstanceCallback main_fail_cb) -> bool {
if (!client_.SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) {
@ -62,6 +62,13 @@ auto ReplicationInstance::PromoteToMain(utils::UUID new_uuid, ReplicationClients
return true;
}
// Records a promotion to main locally: stores the new main UUID and replaces the success/failure
// callbacks. Unlike the leader variant, this function does not call client_ and sends no RPC.
// NOTE(review): no synchronisation is visible here; the caller in the coordinator pauses the
// frequent check before calling -- confirm that is sufficient for readers of succ_cb_/fail_cb_.
auto ReplicationInstance::PromoteToMainAsFollower(utils::UUID new_uuid, HealthCheckInstanceCallback main_succ_cb,
HealthCheckInstanceCallback main_fail_cb) -> void {
main_uuid_ = new_uuid;
succ_cb_ = main_succ_cb;
fail_cb_ = main_fail_cb;
}
// Forwards to client_.DemoteToReplica() and returns its result unchanged.
auto ReplicationInstance::SendDemoteToReplicaRpc() -> bool { return client_.DemoteToReplica(); }
auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb,