Address PR comments
This commit is contained in:
parent
a3870300e4
commit
56da8dbb7d
@ -15,62 +15,67 @@
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
using replication_coordination_glue::ReplicationRole;
|
||||
|
||||
auto CoordinatorClusterState::MainExists() const -> bool {
|
||||
return std::any_of(instance_roles.begin(), instance_roles.end(), [](auto const &entry) {
|
||||
return entry.second == replication_coordination_glue::ReplicationRole::MAIN;
|
||||
});
|
||||
return std::ranges::any_of(instance_roles, [](auto const &entry) { return entry.second == ReplicationRole::MAIN; });
|
||||
}
|
||||
|
||||
auto CoordinatorClusterState::IsMain(std::string const &instance_name) const -> bool {
|
||||
auto const it = instance_roles.find(instance_name);
|
||||
return it != instance_roles.end() && it->second == replication_coordination_glue::ReplicationRole::MAIN;
|
||||
return it != instance_roles.end() && it->second == ReplicationRole::MAIN;
|
||||
}
|
||||
|
||||
auto CoordinatorClusterState::IsReplica(std::string const &instance_name) const -> bool {
|
||||
auto const it = instance_roles.find(instance_name);
|
||||
return it != instance_roles.end() && it->second == replication_coordination_glue::ReplicationRole::REPLICA;
|
||||
return it != instance_roles.end() && it->second == ReplicationRole::REPLICA;
|
||||
}
|
||||
|
||||
auto CoordinatorClusterState::InsertInstance(std::string const &instance_name,
|
||||
replication_coordination_glue::ReplicationRole role) -> void {
|
||||
auto CoordinatorClusterState::InsertInstance(std::string const &instance_name, ReplicationRole role) -> void {
|
||||
instance_roles[instance_name] = role;
|
||||
}
|
||||
|
||||
auto CoordinatorClusterState::DoAction(std::string const &instance_name, RaftLogAction log_action) -> void {
|
||||
switch (log_action) {
|
||||
case RaftLogAction::REGISTER_REPLICATION_INSTANCE:
|
||||
instance_roles[instance_name] = replication_coordination_glue::ReplicationRole::REPLICA;
|
||||
case RaftLogAction::REGISTER_REPLICATION_INSTANCE: {
|
||||
instance_roles[instance_name] = ReplicationRole::REPLICA;
|
||||
spdlog::info("Instance {} registered", instance_name);
|
||||
break;
|
||||
case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE:
|
||||
spdlog::info("Unregistering instance: {}", instance_name);
|
||||
}
|
||||
case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE: {
|
||||
instance_roles.erase(instance_name);
|
||||
spdlog::info("Instance {} unregistered", instance_name);
|
||||
break;
|
||||
case RaftLogAction::SET_INSTANCE_AS_MAIN:
|
||||
MG_ASSERT(instance_roles.find(instance_name) != instance_roles.end(),
|
||||
"Instance does not exist as part of raft state!");
|
||||
instance_roles[instance_name] = replication_coordination_glue::ReplicationRole::MAIN;
|
||||
}
|
||||
case RaftLogAction::SET_INSTANCE_AS_MAIN: {
|
||||
auto it = instance_roles.find(instance_name);
|
||||
MG_ASSERT(it != instance_roles.end(), "Instance does not exist as part of raft state!");
|
||||
it->second = ReplicationRole::MAIN;
|
||||
spdlog::info("Instance {} set as main", instance_name);
|
||||
break;
|
||||
case RaftLogAction::SET_INSTANCE_AS_REPLICA:
|
||||
MG_ASSERT(instance_roles.find(instance_name) != instance_roles.end(),
|
||||
"Instance does not exist as part of raft state!");
|
||||
instance_roles[instance_name] = replication_coordination_glue::ReplicationRole::REPLICA;
|
||||
}
|
||||
case RaftLogAction::SET_INSTANCE_AS_REPLICA: {
|
||||
auto it = instance_roles.find(instance_name);
|
||||
MG_ASSERT(it != instance_roles.end(), "Instance does not exist as part of raft state!");
|
||||
it->second = ReplicationRole::REPLICA;
|
||||
spdlog::info("Instance {} set as replica", instance_name);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
|
||||
auto const role_to_string = [](auto const &role) {
|
||||
auto const role_to_string = [](auto const &role) -> std::string_view {
|
||||
switch (role) {
|
||||
case replication_coordination_glue::ReplicationRole::MAIN:
|
||||
case ReplicationRole::MAIN:
|
||||
return "main";
|
||||
case replication_coordination_glue::ReplicationRole::REPLICA:
|
||||
case ReplicationRole::REPLICA:
|
||||
return "replica";
|
||||
}
|
||||
};
|
||||
|
||||
auto const entry_to_string = [&role_to_string](auto const &entry) {
|
||||
return entry.first + "_" + role_to_string(entry.second);
|
||||
return fmt::format("{}_{}", entry.first, role_to_string(entry.second));
|
||||
};
|
||||
|
||||
auto instances_str_view = instance_roles | ranges::views::transform(entry_to_string);
|
||||
@ -84,11 +89,11 @@ auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
|
||||
}
|
||||
|
||||
auto CoordinatorClusterState::Deserialize(buffer &data) -> CoordinatorClusterState {
|
||||
auto const str_to_role = [](auto const &str) {
|
||||
auto const str_to_role = [](auto const &str) -> ReplicationRole {
|
||||
if (str == "main") {
|
||||
return replication_coordination_glue::ReplicationRole::MAIN;
|
||||
return ReplicationRole::MAIN;
|
||||
}
|
||||
return replication_coordination_glue::ReplicationRole::REPLICA;
|
||||
return ReplicationRole::REPLICA;
|
||||
};
|
||||
|
||||
CoordinatorClusterState cluster_state;
|
||||
@ -106,9 +111,9 @@ auto CoordinatorClusterState::Deserialize(buffer &data) -> CoordinatorClusterSta
|
||||
auto CoordinatorClusterState::GetInstances() const -> std::vector<std::pair<std::string, std::string>> {
|
||||
auto const role_to_string = [](auto const &role) -> std::string {
|
||||
switch (role) {
|
||||
case replication_coordination_glue::ReplicationRole::MAIN:
|
||||
case ReplicationRole::MAIN:
|
||||
return "main";
|
||||
case replication_coordination_glue::ReplicationRole::REPLICA:
|
||||
case ReplicationRole::REPLICA:
|
||||
return "replica";
|
||||
}
|
||||
};
|
||||
|
@ -14,8 +14,8 @@
|
||||
#include "coordination/coordinator_instance.hpp"
|
||||
|
||||
#include "coordination/coordinator_exceptions.hpp"
|
||||
#include "coordination/utils.hpp"
|
||||
#include "coordination/fmt.hpp"
|
||||
#include "coordination/utils.hpp"
|
||||
#include "dbms/constants.hpp"
|
||||
#include "nuraft/coordinator_state_machine.hpp"
|
||||
#include "nuraft/coordinator_state_manager.hpp"
|
||||
@ -336,39 +336,58 @@ auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_i
|
||||
auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
|
||||
auto const coord_instances = raft_state_.GetAllCoordinators();
|
||||
|
||||
auto const stringify_repl_role = [](ReplicationInstance const &instance) -> std::string {
|
||||
if (!instance.IsAlive()) return "unknown";
|
||||
if (instance.IsMain()) return "main";
|
||||
return "replica";
|
||||
};
|
||||
|
||||
auto const repl_instance_to_status = [&stringify_repl_role](ReplicationInstance const &instance) -> InstanceStatus {
|
||||
return {.instance_name = instance.InstanceName(),
|
||||
.coord_socket_address = instance.SocketAddress(),
|
||||
.cluster_role = stringify_repl_role(instance),
|
||||
.is_alive = instance.IsAlive()};
|
||||
};
|
||||
|
||||
auto const coord_instance_to_status = [](ptr<srv_config> const &instance) -> InstanceStatus {
|
||||
return {.instance_name = "coordinator_" + std::to_string(instance->get_id()),
|
||||
.raft_socket_address = instance->get_endpoint(),
|
||||
.cluster_role = "coordinator",
|
||||
.is_alive = true}; // TODO: (andi) Get this info from RAFT and test it or when we will move
|
||||
// CoordinatorState to every instance, we can be smarter about this using our RPC.
|
||||
.health = "unknown"}; // TODO: (andi) Improve
|
||||
};
|
||||
|
||||
auto instances_status = utils::fmap(coord_instance_to_status, coord_instances);
|
||||
{
|
||||
auto lock = std::shared_lock{coord_instance_lock_};
|
||||
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), repl_instance_to_status);
|
||||
|
||||
if (raft_state_.IsLeader()) {
|
||||
auto const stringify_repl_role = [this](ReplicationInstance const &instance) -> std::string {
|
||||
if (!instance.IsAlive()) return "unknown";
|
||||
if (raft_state_.IsMain(instance.InstanceName())) return "main";
|
||||
return "replica";
|
||||
};
|
||||
|
||||
auto const stringify_repl_health = [](ReplicationInstance const &instance) -> std::string {
|
||||
return instance.IsAlive() ? "up" : "down";
|
||||
};
|
||||
|
||||
auto process_repl_instance_as_leader =
|
||||
[&stringify_repl_role, &stringify_repl_health](ReplicationInstance const &instance) -> InstanceStatus {
|
||||
return {.instance_name = instance.InstanceName(),
|
||||
.coord_socket_address = instance.SocketAddress(),
|
||||
.cluster_role = stringify_repl_role(instance),
|
||||
.health = stringify_repl_health(instance)};
|
||||
};
|
||||
{
|
||||
auto lock = std::shared_lock{coord_instance_lock_};
|
||||
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), process_repl_instance_as_leader);
|
||||
}
|
||||
} else {
|
||||
auto process_repl_instance_as_follower = [](auto const &instance) -> InstanceStatus {
|
||||
return {.instance_name = instance.first, .cluster_role = instance.second, .health = "unknown"};
|
||||
};
|
||||
{
|
||||
auto lock = std::shared_lock{coord_instance_lock_};
|
||||
std::ranges::transform(raft_state_.GetInstances(), std::back_inserter(instances_status),
|
||||
process_repl_instance_as_follower);
|
||||
}
|
||||
}
|
||||
|
||||
return instances_status;
|
||||
}
|
||||
|
||||
auto CoordinatorInstance::TryFailover() -> void {
|
||||
auto alive_replicas = repl_instances_ | ranges::views::filter(&ReplicationInstance::IsReplica) |
|
||||
ranges::views::filter(&ReplicationInstance::IsAlive);
|
||||
auto const is_replica = [this](ReplicationInstance const &instance) {
|
||||
return raft_state_.IsReplica(instance.InstanceName());
|
||||
};
|
||||
|
||||
auto alive_replicas =
|
||||
repl_instances_ | ranges::views::filter(is_replica) | ranges::views::filter(&ReplicationInstance::IsAlive);
|
||||
|
||||
if (ranges::empty(alive_replicas)) {
|
||||
spdlog::warn("Failover failed since all replicas are down!");
|
||||
@ -439,8 +458,9 @@ auto CoordinatorInstance::TryFailover() -> void {
|
||||
most_up_to_date_instance =
|
||||
coordination::ChooseMostUpToDateInstance(instance_database_histories, latest_epoch, latest_commit_timestamp);
|
||||
}
|
||||
|
||||
spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp",
|
||||
most_up_to_date_instance, *latest_epoch, *latest_commit_timestamp);
|
||||
most_up_to_date_instance, *latest_epoch, *latest_commit_timestamp); // NOLINT
|
||||
|
||||
auto &new_repl_instance = FindReplicationInstance(most_up_to_date_instance);
|
||||
auto *new_main = &new_repl_instance;
|
||||
@ -490,6 +510,28 @@ auto CoordinatorInstance::TryFailover() -> void {
|
||||
auto *new_main = &new_repl_instance;
|
||||
>>>>>>> 9081c5c24 (Optional main on unregistering)
|
||||
|
||||
if (!raft_state_.RequestLeadership()) {
|
||||
spdlog::error("Failed to request leadership for promoting instance to main {}.", new_main->InstanceName());
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: (andi) Improve std::string, appending...
|
||||
auto const res = raft_state_.AppendSetInstanceAsMain(new_main->InstanceName());
|
||||
if (!res->get_accepted()) {
|
||||
spdlog::error(
|
||||
"Failed to accept request for promoting instance {}. Most likely the reason is that the instance is not the "
|
||||
"leader.",
|
||||
new_main->InstanceName());
|
||||
return;
|
||||
}
|
||||
|
||||
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
|
||||
spdlog::error("Failed to promote instance {} with error code {}", new_main->InstanceName(), res->get_result_code());
|
||||
return;
|
||||
}
|
||||
|
||||
spdlog::info("Request for promoting instance {} accepted", new_main->InstanceName());
|
||||
|
||||
new_main->PauseFrequentCheck();
|
||||
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
|
||||
|
||||
@ -517,8 +559,7 @@ auto CoordinatorInstance::TryFailover() -> void {
|
||||
spdlog::warn("Failover failed since promoting replica to main failed!");
|
||||
return;
|
||||
}
|
||||
// TODO: (andi) This should be replicated across all coordinator instances with Raft log
|
||||
SetMainUUID(new_main_uuid);
|
||||
main_uuid_ = new_main_uuid;
|
||||
spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName());
|
||||
}
|
||||
|
||||
@ -527,7 +568,7 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name
|
||||
-> SetInstanceToMainCoordinatorStatus {
|
||||
auto lock = std::lock_guard{coord_instance_lock_};
|
||||
|
||||
if (std::ranges::any_of(repl_instances_, &ReplicationInstance::IsMain)) {
|
||||
if (raft_state_.MainExists()) {
|
||||
return SetInstanceToMainCoordinatorStatus::MAIN_ALREADY_EXISTS;
|
||||
}
|
||||
|
||||
@ -542,6 +583,24 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name
|
||||
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
|
||||
}
|
||||
|
||||
if (!raft_state_.RequestLeadership()) {
|
||||
return SetInstanceToMainCoordinatorStatus::NOT_LEADER;
|
||||
}
|
||||
|
||||
auto const res = raft_state_.AppendSetInstanceAsMain(instance_name);
|
||||
if (!res->get_accepted()) {
|
||||
spdlog::error(
|
||||
"Failed to accept request for promoting instance {}. Most likely the reason is that the instance is not "
|
||||
"the leader.",
|
||||
instance_name);
|
||||
return SetInstanceToMainCoordinatorStatus::RAFT_COULD_NOT_ACCEPT;
|
||||
}
|
||||
|
||||
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
|
||||
spdlog::error("Failed to promote instance {} with error code {}", instance_name, res->get_result_code());
|
||||
return SetInstanceToMainCoordinatorStatus::RAFT_COULD_NOT_APPEND;
|
||||
}
|
||||
|
||||
new_main->PauseFrequentCheck();
|
||||
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
|
||||
|
||||
@ -559,18 +618,16 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name
|
||||
}
|
||||
}
|
||||
|
||||
ReplicationClientsInfo repl_clients_info;
|
||||
repl_clients_info.reserve(repl_instances_.size() - 1);
|
||||
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
|
||||
std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
|
||||
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
|
||||
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
|
||||
ranges::to<ReplicationClientsInfo>();
|
||||
|
||||
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
|
||||
&CoordinatorInstance::MainFailCallback)) {
|
||||
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
|
||||
}
|
||||
|
||||
// TODO: (andi) This should be replicated across all coordinator instances with Raft log
|
||||
SetMainUUID(new_main_uuid);
|
||||
main_uuid_ = new_main_uuid;
|
||||
spdlog::info("Instance {} promoted to main", instance_name);
|
||||
return SetInstanceToMainCoordinatorStatus::SUCCESS;
|
||||
}
|
||||
@ -720,7 +777,8 @@ void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name,
|
||||
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
|
||||
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set");
|
||||
|
||||
if (!repl_instance.IsAlive() && GetMainUUID() == repl_instance_uuid.value()) {
|
||||
// NOLINTNEXTLINE
|
||||
if (!repl_instance.IsAlive() && main_uuid_ == repl_instance_uuid.value()) {
|
||||
spdlog::info("Cluster without main instance, trying automatic failover");
|
||||
TryFailover();
|
||||
}
|
||||
@ -741,8 +799,8 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
|
||||
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
|
||||
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set.");
|
||||
|
||||
auto const curr_main_uuid = GetMainUUID();
|
||||
if (curr_main_uuid == repl_instance_uuid.value()) {
|
||||
// NOLINTNEXTLINE
|
||||
if (main_uuid_ == repl_instance_uuid.value()) {
|
||||
if (!repl_instance.EnableWritingOnMain()) {
|
||||
spdlog::error("Failed to enable writing on main instance {}", repl_instance_name);
|
||||
return;
|
||||
@ -752,6 +810,28 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
|
||||
return;
|
||||
}
|
||||
|
||||
if (!raft_state_.RequestLeadership()) {
|
||||
spdlog::error("Failed to request leadership for demoting instance to replica {}.", repl_instance_name);
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: (andi) Improve std::string, appending...
|
||||
auto const res = raft_state_.AppendSetInstanceAsReplica(std::string(repl_instance_name));
|
||||
if (!res->get_accepted()) {
|
||||
spdlog::error(
|
||||
"Failed to accept request for demoting instance {}. Most likely the reason is that the instance is not the "
|
||||
"leader.",
|
||||
repl_instance_name);
|
||||
return;
|
||||
}
|
||||
|
||||
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
|
||||
spdlog::error("Failed to demote instance {} with error code {}", repl_instance_name, res->get_result_code());
|
||||
return;
|
||||
}
|
||||
|
||||
spdlog::info("Request for demoting instance {} accepted", repl_instance_name);
|
||||
|
||||
if (repl_instance.DemoteToReplica(&CoordinatorInstance::ReplicaSuccessCallback,
|
||||
&CoordinatorInstance::ReplicaFailCallback)) {
|
||||
repl_instance.OnSuccessPing();
|
||||
@ -761,7 +841,7 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
|
||||
return;
|
||||
}
|
||||
|
||||
if (!repl_instance.SendSwapAndUpdateUUID(curr_main_uuid)) {
|
||||
if (!repl_instance.SendSwapAndUpdateUUID(main_uuid_)) {
|
||||
spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName()));
|
||||
return;
|
||||
}
|
||||
@ -771,7 +851,12 @@ void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_
|
||||
std::unique_lock<utils::ResourceLock> lock) {
|
||||
MG_ASSERT(lock.owns_lock(), "Callback doesn't own lock");
|
||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||
if (!repl_instance.IsReplica()) {
|
||||
|
||||
auto const is_replica = [this](ReplicationInstance const &instance) {
|
||||
return raft_state_.IsReplica(instance.InstanceName());
|
||||
};
|
||||
|
||||
if (!is_replica(repl_instance)) {
|
||||
spdlog::error("Aborting replica callback since instance {} is not replica anymore", repl_instance_name);
|
||||
return;
|
||||
}
|
||||
@ -780,7 +865,7 @@ void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_
|
||||
// and that it didn't go down for less time than we could notice
|
||||
// We need to get id of main replica is listening to
|
||||
// and swap if necessary
|
||||
if (!repl_instance.EnsureReplicaHasCorrectMainUUID(GetMainUUID())) {
|
||||
if (!repl_instance.EnsureReplicaHasCorrectMainUUID(main_uuid_)) {
|
||||
spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName());
|
||||
return;
|
||||
}
|
||||
@ -792,10 +877,15 @@ void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_nam
|
||||
std::unique_lock<utils::ResourceLock> lock) {
|
||||
MG_ASSERT(lock.owns_lock(), "Callback doesn't own lock");
|
||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||
if (!repl_instance.IsReplica()) {
|
||||
auto const is_replica = [this](ReplicationInstance const &instance) {
|
||||
return raft_state_.IsReplica(instance.InstanceName());
|
||||
};
|
||||
|
||||
if (!is_replica(repl_instance)) {
|
||||
spdlog::error("Aborting replica fail callback since instance {} is not replica anymore", repl_instance_name);
|
||||
return;
|
||||
}
|
||||
|
||||
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
|
||||
repl_instance.OnFailPing();
|
||||
}
|
||||
@ -907,16 +997,40 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string instance_nam
|
||||
return UnregisterInstanceCoordinatorStatus::NO_INSTANCE_WITH_NAME;
|
||||
}
|
||||
|
||||
if (inst_to_remove->IsMain() && inst_to_remove->IsAlive()) {
|
||||
auto const is_main = [this](ReplicationInstance const &instance) {
|
||||
return raft_state_.IsMain(instance.InstanceName());
|
||||
};
|
||||
|
||||
// TODO: (andi) Better thing would be to have role UNKNOWN and then no need to check if it is alive. Or optional role.
|
||||
if (std::invoke(is_main, *inst_to_remove) && inst_to_remove->IsAlive()) {
|
||||
return UnregisterInstanceCoordinatorStatus::IS_MAIN;
|
||||
}
|
||||
|
||||
if (!raft_state_.RequestLeadership()) {
|
||||
return UnregisterInstanceCoordinatorStatus::NOT_LEADER;
|
||||
}
|
||||
|
||||
auto const res = raft_state_.AppendUnregisterReplicationInstance(instance_name);
|
||||
if (!res->get_accepted()) {
|
||||
spdlog::error(
|
||||
"Failed to accept request for unregistering instance {}. Most likely the reason is that the instance is not "
|
||||
"the leader.",
|
||||
instance_name);
|
||||
return UnregisterInstanceCoordinatorStatus::RAFT_COULD_NOT_ACCEPT;
|
||||
}
|
||||
|
||||
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
|
||||
spdlog::error("Failed to unregister instance {} with error code {}", instance_name, res->get_result_code());
|
||||
return UnregisterInstanceCoordinatorStatus::RAFT_COULD_NOT_APPEND;
|
||||
}
|
||||
|
||||
inst_to_remove->StopFrequentCheck();
|
||||
auto curr_main = std::ranges::find_if(repl_instances_, &ReplicationInstance::IsMain);
|
||||
MG_ASSERT(curr_main != repl_instances_.end(), "There must be a main instance when unregistering a replica");
|
||||
if (!curr_main->SendUnregisterReplicaRpc(instance_name)) {
|
||||
inst_to_remove->StartFrequentCheck();
|
||||
return UnregisterInstanceCoordinatorStatus::RPC_FAILED;
|
||||
if (auto curr_main = std::ranges::find_if(repl_instances_, is_main); curr_main != repl_instances_.end()) {
|
||||
if (!curr_main->SendUnregisterReplicaRpc(instance_name)) {
|
||||
// TODO: (andi) Restore state in the RAFT log if needed.
|
||||
inst_to_remove->StartFrequentCheck();
|
||||
return UnregisterInstanceCoordinatorStatus::RPC_FAILED;
|
||||
}
|
||||
}
|
||||
std::erase_if(repl_instances_, name_matches);
|
||||
|
||||
@ -928,6 +1042,7 @@ auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32
|
||||
raft_state_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address));
|
||||
}
|
||||
|
||||
<<<<<<< HEAD
|
||||
<<<<<<< HEAD
|
||||
<<<<<<< HEAD
|
||||
auto CoordinatorInstance::GetMainUUID() const -> utils::UUID { return main_uuid_; }
|
||||
@ -1019,5 +1134,13 @@ auto CoordinatorInstance::GetMainUUID() const -> utils::UUID { return main_uuid_
|
||||
auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_ = new_uuid; }
|
||||
|
||||
>>>>>>> 9081c5c24 (Optional main on unregistering)
|
||||
||||||| parent of 1b150ee92 (Address PR comments)
|
||||
auto CoordinatorInstance::GetMainUUID() const -> utils::UUID { return main_uuid_; }
|
||||
|
||||
// TODO: (andi) Add to the RAFT log.
|
||||
auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_ = new_uuid; }
|
||||
|
||||
=======
|
||||
>>>>>>> 1b150ee92 (Address PR comments)
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
||||
|
@ -52,9 +52,6 @@ class ReplicationInstance {
|
||||
auto InstanceName() const -> std::string;
|
||||
auto SocketAddress() const -> std::string;
|
||||
|
||||
auto IsReplica() const -> bool;
|
||||
auto IsMain() const -> bool;
|
||||
|
||||
auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info,
|
||||
HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb) -> bool;
|
||||
auto DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb)
|
||||
|
@ -27,7 +27,7 @@ enum class RaftLogAction : uint8_t {
|
||||
SET_INSTANCE_AS_REPLICA
|
||||
};
|
||||
|
||||
inline auto ParseRaftLogAction(std::string const &action) -> RaftLogAction {
|
||||
inline auto ParseRaftLogAction(std::string_view action) -> RaftLogAction {
|
||||
if (action == "register") {
|
||||
return RaftLogAction::REGISTER_REPLICATION_INSTANCE;
|
||||
}
|
||||
@ -40,7 +40,7 @@ inline auto ParseRaftLogAction(std::string const &action) -> RaftLogAction {
|
||||
if (action == "demote") {
|
||||
return RaftLogAction::SET_INSTANCE_AS_REPLICA;
|
||||
}
|
||||
throw InvalidRaftLogActionException("Invalid Raft log action: " + action);
|
||||
throw InvalidRaftLogActionException("Invalid Raft log action: {}.", action);
|
||||
}
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
|
@ -200,7 +200,7 @@ def test_not_replicate_old_main_register_new_cluster():
|
||||
return sorted(list(execute_and_fetch_all(first_cluster_coord_cursor, "SHOW INSTANCES;")))
|
||||
|
||||
expected_data_up_first_cluster = [
|
||||
("coordinator_1", "127.0.0.1:10111", "", "up", "coordinator"),
|
||||
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
|
||||
("instance_2", "", "127.0.0.1:10012", "up", "main"),
|
||||
("shared_instance", "", "127.0.0.1:10011", "up", "replica"),
|
||||
]
|
||||
|
Loading…
Reference in New Issue
Block a user