From c7035f9c8b47613a27e9ce5bc6bdaf01b6f73a6a Mon Sep 17 00:00:00 2001 From: Andi Skrgat Date: Fri, 23 Feb 2024 08:43:21 +0100 Subject: [PATCH] registration backed-up by raft --- .../coordinator_cluster_state.cpp | 17 +++ src/coordination/coordinator_instance.cpp | 124 ++++++++++++++++++ .../coordinator_state_machine.cpp | 1 - .../coordination/coordinator_exceptions.hpp | 11 ++ .../coordination/coordinator_instance.hpp | 3 +- .../include/coordination/raft_state.hpp | 5 +- .../nuraft/coordinator_cluster_state.hpp | 2 + .../include/nuraft/raft_log_action.hpp | 19 +++ src/coordination/raft_state.cpp | 4 + 9 files changed, 183 insertions(+), 3 deletions(-) diff --git a/src/coordination/coordinator_cluster_state.cpp b/src/coordination/coordinator_cluster_state.cpp index 02c880aed..3a5bb13a4 100644 --- a/src/coordination/coordinator_cluster_state.cpp +++ b/src/coordination/coordinator_cluster_state.cpp @@ -89,5 +89,22 @@ auto CoordinatorClusterState::Deserialize(buffer &data) -> CoordinatorClusterSta return cluster_state; } +auto CoordinatorClusterState::GetInstances() const -> std::vector> { + auto const role_to_string = [](auto const &role) -> std::string { + switch (role) { + case replication_coordination_glue::ReplicationRole::MAIN: + return "main"; + case replication_coordination_glue::ReplicationRole::REPLICA: + return "replica"; + } + }; + + auto const entry_to_pair = [&role_to_string](auto const &entry) { + return std::make_pair(entry.first, role_to_string(entry.second)); + }; + + return instance_roles | ranges::views::transform(entry_to_pair) | ranges::to(); +} + } // namespace memgraph::coordination #endif diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index e6e0bff9a..b9e514396 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -32,6 +32,7 @@ CoordinatorInstance::CoordinatorInstance() : raft_state_(RaftState::MakeRaftState( [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StartFrequentCheck); }, [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); })) { +<<<<<<< HEAD <<<<<<< HEAD client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { auto lock = std::unique_lock{self->coord_instance_lock_}; @@ -177,6 +178,129 @@ CoordinatorInstance::CoordinatorInstance() self->TryFailover(); } }; +||||||| parent of 99c53148c (registration backed-up by raft) + auto find_repl_instance = [](CoordinatorInstance *self, + std::string_view repl_instance_name) -> ReplicationInstance & { + auto repl_instance = + std::ranges::find_if(self->repl_instances_, [repl_instance_name](ReplicationInstance const &instance) { + return instance.InstanceName() == repl_instance_name; + }); + + MG_ASSERT(repl_instance != self->repl_instances_.end(), "Instance {} not found during callback!", + repl_instance_name); + return *repl_instance; + }; + + replica_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { + auto lock = std::lock_guard{self->coord_instance_lock_}; + spdlog::trace("Instance {} performing replica successful callback", repl_instance_name); + auto &repl_instance = find_repl_instance(self, repl_instance_name); + + // We need to get replicas UUID from time to time to ensure replica is listening to correct main + // and that it didn't go down for less time than we could notice + // We need to get id of main replica is listening to + // and swap if necessary + if (!repl_instance.EnsureReplicaHasCorrectMainUUID(self->main_uuid_)) { + spdlog::error("Failed to swap uuid for alive replica instance {}.", repl_instance.InstanceName()); + return; + } + + repl_instance.OnSuccessPing(); + }; + + replica_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { + auto lock = std::lock_guard{self->coord_instance_lock_}; + spdlog::trace("Instance {} performing replica failure callback", repl_instance_name); + auto &repl_instance = find_repl_instance(self, repl_instance_name); + repl_instance.OnFailPing(); + // We need to restart main uuid from instance since it was "down" at least a second + // There is slight delay, if we choose to use isAlive, instance can be down and back up in less than + // our isAlive time difference, which would lead to instance + // https://github.com/memgraph/memgraph/pull/1720#discussion_r1493833414 setting UUID to nullopt and stopping + // accepting any incoming RPCs from valid main + // TODO(antoniofilipovic) this needs here more complex logic + // We need to get id of main replica is listening to on successful ping + // and swap it to correct uuid if it failed + repl_instance.ResetMainUUID(); + }; + + main_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { + auto lock = std::lock_guard{self->coord_instance_lock_}; + spdlog::trace("Instance {} performing main successful callback", repl_instance_name); + + auto &repl_instance = find_repl_instance(self, repl_instance_name); + + if (repl_instance.IsAlive()) { + repl_instance.OnSuccessPing(); + return; + } + + const auto &repl_instance_uuid = repl_instance.GetMainUUID(); + MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set."); + + if (self->main_uuid_ == repl_instance_uuid.value()) { + if (!repl_instance.EnableWritingOnMain()) { + spdlog::error("Failed to enable writing on main instance {}", repl_instance_name); + return; + } + + repl_instance.OnSuccessPing(); + return; + } + + if (!self->raft_state_.RequestLeadership()) { + spdlog::error("Failed to request leadership for demoting instance to replica {}.", repl_instance_name); + return; + } + + // TODO: (andi) Improve std::string, appending... + auto const res = self->raft_state_.AppendSetInstanceAsReplica(std::string(repl_instance_name)); + if (!res->get_accepted()) { + spdlog::error( + "Failed to accept request for demoting instance {}. Most likely the reason is that the instance is not the " + "leader.", + repl_instance_name); + return; + } + + if (res->get_result_code() != nuraft::cmd_result_code::OK) { + spdlog::error("Failed to demote instance {} with error code {}", repl_instance_name, res->get_result_code()); + return; + } + + spdlog::info("Request for demoting instance {} accepted", repl_instance_name); + + // TODO(antoniof) make demoteToReplica idempotent since main can be demoted to replica but + // swapUUID can fail + if (!repl_instance.DemoteToReplica(self->replica_succ_cb_, self->replica_fail_cb_)) { + spdlog::error("Instance {} failed to become replica", repl_instance_name); + return; + } + + if (!repl_instance.SendSwapAndUpdateUUID(self->main_uuid_)) { + spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName())); + return; + } + + repl_instance.OnSuccessPing(); + spdlog::info("Instance {} demoted to replica", repl_instance_name); + }; + + main_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { + auto lock = std::lock_guard{self->coord_instance_lock_}; + spdlog::trace("Instance {} performing main failure callback", repl_instance_name); + auto &repl_instance = find_repl_instance(self, repl_instance_name); + repl_instance.OnFailPing(); + const auto &repl_instance_uuid = repl_instance.GetMainUUID(); + MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set"); + + if (!repl_instance.IsAlive() && self->main_uuid_ == repl_instance_uuid.value()) { + spdlog::info("Cluster without main instance, trying automatic failover"); + self->TryFailover(); + } + }; +======= +>>>>>>> 99c53148c (registration backed-up by raft) } auto CoordinatorInstance::ShowInstances() const -> std::vector { diff --git a/src/coordination/coordinator_state_machine.cpp b/src/coordination/coordinator_state_machine.cpp index 76bd9b8c4..2e503fe74 100644 --- a/src/coordination/coordinator_state_machine.cpp +++ b/src/coordination/coordinator_state_machine.cpp @@ -49,7 +49,6 @@ auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair + explicit InvalidRaftLogActionException(fmt::format_string fmt, Args &&...args) noexcept + : InvalidRaftLogActionException(fmt::format(fmt, std::forward(args)...)) {} + + SPECIALIZE_GET_EXCEPTION_NAME(InvalidRaftLogActionException) +}; + } // namespace memgraph::coordination #endif diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index ee5a6fb6e..1b1413260 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -67,7 +67,8 @@ class CoordinatorInstance { private: HealthCheckClientCallback client_succ_cb_, client_fail_cb_; - // NOTE: Must be std::list because we rely on pointer stability + // NOTE: Only leader should have repl_instances_, not followers. + // NOTE: Must be std::list because we rely on pointer stability. std::list repl_instances_; mutable utils::ResourceLock coord_instance_lock_{}; diff --git a/src/coordination/include/coordination/raft_state.hpp b/src/coordination/include/coordination/raft_state.hpp index 4f0743191..dbe8761b2 100644 --- a/src/coordination/include/coordination/raft_state.hpp +++ b/src/coordination/include/coordination/raft_state.hpp @@ -63,13 +63,16 @@ class RaftState { auto IsMain(std::string const &instance_name) const -> bool; auto IsReplica(std::string const &instance_name) const -> bool; + /// TODO: (andi) Add log in the name of methods auto AppendRegisterReplicationInstance(std::string const &instance_name) -> ptr; auto AppendUnregisterReplicationInstance(std::string const &instance_name) -> ptr; auto AppendSetInstanceAsMain(std::string const &instance_name) -> ptr; auto AppendSetInstanceAsReplica(std::string const &instance_name) -> ptr; + auto GetInstances() const -> std::vector>; + private: - // TODO: (andi) I think variables below can be abstracted + // TODO: (andi) I think variables below can be abstracted/clean them. uint32_t raft_server_id_; uint32_t raft_port_; std::string raft_address_; diff --git a/src/coordination/include/nuraft/coordinator_cluster_state.hpp b/src/coordination/include/nuraft/coordinator_cluster_state.hpp index af657e571..5819a00fc 100644 --- a/src/coordination/include/nuraft/coordinator_cluster_state.hpp +++ b/src/coordination/include/nuraft/coordinator_cluster_state.hpp @@ -44,6 +44,8 @@ class CoordinatorClusterState { static auto Deserialize(buffer &data) -> CoordinatorClusterState; + auto GetInstances() const -> std::vector>; + private: std::map instance_roles; }; diff --git a/src/coordination/include/nuraft/raft_log_action.hpp b/src/coordination/include/nuraft/raft_log_action.hpp index f38e99538..4e8cca75c 100644 --- a/src/coordination/include/nuraft/raft_log_action.hpp +++ b/src/coordination/include/nuraft/raft_log_action.hpp @@ -13,7 +13,10 @@ #ifdef MG_ENTERPRISE +#include "coordination/coordinator_exceptions.hpp" + #include +#include namespace memgraph::coordination { @@ -24,5 +27,21 @@ enum class RaftLogAction : uint8_t { SET_INSTANCE_AS_REPLICA }; +inline auto ParseRaftLogAction(std::string const &action) -> RaftLogAction { + if (action == "register") { + return RaftLogAction::REGISTER_REPLICATION_INSTANCE; + } + if (action == "unregister") { + return RaftLogAction::UNREGISTER_REPLICATION_INSTANCE; + } + if (action == "set_main") { + return RaftLogAction::SET_INSTANCE_AS_MAIN; + } + if (action == "set_replica") { + return RaftLogAction::SET_INSTANCE_AS_REPLICA; + } + throw InvalidRaftLogActionException("Invalid Raft log action: " + action); +} + } // namespace memgraph::coordination #endif diff --git a/src/coordination/raft_state.cpp b/src/coordination/raft_state.cpp index 73a93e3b9..a3de6f928 100644 --- a/src/coordination/raft_state.cpp +++ b/src/coordination/raft_state.cpp @@ -156,5 +156,9 @@ auto RaftState::IsReplica(std::string const &instance_name) const -> bool { return state_machine_->IsReplica(instance_name); } +auto RaftState::GetInstances() const -> std::vector> { + return state_machine_->GetInstances(); +} + } // namespace memgraph::coordination #endif