diff --git a/src/coordination/coordinator_cluster_state.cpp b/src/coordination/coordinator_cluster_state.cpp index 197164886..cb3070c45 100644 --- a/src/coordination/coordinator_cluster_state.cpp +++ b/src/coordination/coordinator_cluster_state.cpp @@ -12,59 +12,92 @@ #ifdef MG_ENTERPRISE #include "nuraft/coordinator_cluster_state.hpp" +#include "utils/logging.hpp" + +#include namespace memgraph::coordination { using replication_coordination_glue::ReplicationRole; +CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState const &other) + : instance_roles_{other.instance_roles_} {} + +CoordinatorClusterState &CoordinatorClusterState::operator=(CoordinatorClusterState const &other) { + if (this == &other) { + return *this; + } + instance_roles_ = other.instance_roles_; + return *this; +} + +CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState &&other) noexcept + : instance_roles_{std::move(other.instance_roles_)} {} + +CoordinatorClusterState &CoordinatorClusterState::operator=(CoordinatorClusterState &&other) noexcept { + if (this == &other) { + return *this; + } + instance_roles_ = std::move(other.instance_roles_); + return *this; +} + auto CoordinatorClusterState::MainExists() const -> bool { - return std::ranges::any_of(instance_roles, [](auto const &entry) { return entry.second == ReplicationRole::MAIN; }); + auto lock = std::shared_lock{log_lock_}; + return std::ranges::any_of(instance_roles_, + [](auto const &entry) { return entry.second.role == ReplicationRole::MAIN; }); } auto CoordinatorClusterState::IsMain(std::string_view instance_name) const -> bool { - auto const it = instance_roles.find(instance_name); - return it != instance_roles.end() && it->second == ReplicationRole::MAIN; + auto lock = std::shared_lock{log_lock_}; + auto const it = instance_roles_.find(instance_name); + return it != instance_roles_.end() && it->second.role == ReplicationRole::MAIN; } auto CoordinatorClusterState::IsReplica(std::string_view 
instance_name) const -> bool { - auto const it = instance_roles.find(instance_name); - return it != instance_roles.end() && it->second == ReplicationRole::REPLICA; + auto lock = std::shared_lock{log_lock_}; + auto const it = instance_roles_.find(instance_name); + return it != instance_roles_.end() && it->second.role == ReplicationRole::REPLICA; } auto CoordinatorClusterState::InsertInstance(std::string_view instance_name, ReplicationRole role) -> void { - instance_roles[instance_name.data()] = role; + auto lock = std::unique_lock{log_lock_}; + instance_roles_[std::string{instance_name}].role = role; } auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void { + auto lock = std::unique_lock{log_lock_}; switch (log_action) { case RaftLogAction::REGISTER_REPLICATION_INSTANCE: { - auto const instance_name = std::get(log_entry).instance_name; - instance_roles[instance_name] = ReplicationRole::REPLICA; + auto const &config = std::get(log_entry); + instance_roles_[config.instance_name] = InstanceState{config, ReplicationRole::REPLICA}; break; } case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE: { auto const instance_name = std::get(log_entry); - instance_roles.erase(instance_name); + instance_roles_.erase(instance_name); break; } case RaftLogAction::SET_INSTANCE_AS_MAIN: { auto const instance_name = std::get(log_entry); - auto it = instance_roles.find(instance_name); - MG_ASSERT(it != instance_roles.end(), "Instance does not exist as part of raft state!"); - it->second = ReplicationRole::MAIN; + auto it = instance_roles_.find(instance_name); + MG_ASSERT(it != instance_roles_.end(), "Instance does not exist as part of raft state!"); + it->second.role = ReplicationRole::MAIN; break; } case RaftLogAction::SET_INSTANCE_AS_REPLICA: { auto const instance_name = std::get(log_entry); - auto it = instance_roles.find(instance_name); - MG_ASSERT(it != instance_roles.end(), "Instance does not exist as part of raft state!"); - it->second = 
ReplicationRole::REPLICA; + auto it = instance_roles_.find(instance_name); + MG_ASSERT(it != instance_roles_.end(), "Instance does not exist as part of raft state!"); + it->second.role = ReplicationRole::REPLICA; break; } } } +// TODO: (andi) Improve based on Gareth's comments auto CoordinatorClusterState::Serialize(ptr &data) -> void { + auto lock = std::shared_lock{log_lock_}; auto const role_to_string = [](auto const &role) -> std::string_view { switch (role) { case ReplicationRole::MAIN: @@ -75,10 +108,10 @@ auto CoordinatorClusterState::Serialize(ptr &data) -> void { }; auto const entry_to_string = [&role_to_string](auto const &entry) { - return fmt::format("{}_{}", entry.first, role_to_string(entry.second)); + return fmt::format("{}_{}", entry.first, role_to_string(entry.second.role)); }; - auto instances_str_view = instance_roles | ranges::views::transform(entry_to_string); + auto instances_str_view = instance_roles_ | ranges::views::transform(entry_to_string); uint32_t size = std::accumulate(instances_str_view.begin(), instances_str_view.end(), 0, [](uint32_t acc, auto const &entry) { return acc + sizeof(uint32_t) + entry.size(); }); @@ -109,6 +142,8 @@ auto CoordinatorClusterState::Deserialize(buffer &data) -> CoordinatorClusterSta } auto CoordinatorClusterState::GetInstances() const -> std::vector> { + auto lock = std::shared_lock{log_lock_}; + // TODO: (andi) Abstract auto const role_to_string = [](auto const &role) -> std::string { switch (role) { case ReplicationRole::MAIN: @@ -119,10 +154,27 @@ auto CoordinatorClusterState::GetInstances() const -> std::vector(); + return instance_roles_ | ranges::views::transform(entry_to_pair) | ranges::to(); +} + +auto CoordinatorClusterState::GetClientConfigs() const -> std::vector { + auto lock = std::shared_lock{log_lock_}; + return instance_roles_ | ranges::views::values | + ranges::views::transform([](auto const &instance_state) { return instance_state.config; }) | + ranges::to(); +} + +auto 
CoordinatorClusterState::FindCurrentMainInstanceName() const -> std::optional { + auto lock = std::shared_lock{log_lock_}; + auto const it = std::ranges::find_if(instance_roles_, + [](auto const &entry) { return entry.second.role == ReplicationRole::MAIN; }); + if (it == instance_roles_.end()) { + return {}; + } + return it->first; } } // namespace memgraph::coordination diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index a8a5d0c59..ffd330816 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -562,3 +562,4 @@ auto CoordinatorInstance::IsReplica(std::string_view instance_name) const -> boo } // namespace memgraph::coordination #endif + diff --git a/src/coordination/coordinator_state_machine.cpp b/src/coordination/coordinator_state_machine.cpp index cc27dd6c7..4eaf41107 100644 --- a/src/coordination/coordinator_state_machine.cpp +++ b/src/coordination/coordinator_state_machine.cpp @@ -12,9 +12,14 @@ #ifdef MG_ENTERPRISE #include "nuraft/coordinator_state_machine.hpp" +#include "utils/logging.hpp" namespace memgraph::coordination { +auto CoordinatorStateMachine::FindCurrentMainInstanceName() const -> std::optional { + return cluster_state_.FindCurrentMainInstanceName(); +} + auto CoordinatorStateMachine::MainExists() const -> bool { return cluster_state_.MainExists(); } auto CoordinatorStateMachine::IsMain(std::string_view instance_name) const -> bool { @@ -78,7 +83,6 @@ auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair ptr { return nullptr; } auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr { - // TODO: (andi) think about locking scheme buffer_serializer bs(data); auto const [parsed_data, log_action] = DecodeLog(data); @@ -183,5 +187,9 @@ auto CoordinatorStateMachine::GetInstances() const -> std::vector std::vector { + return cluster_state_.GetClientConfigs(); +} + } // namespace memgraph::coordination #endif diff 
--git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index dfeed8676..620d9ba7c 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -67,6 +67,9 @@ class CoordinatorInstance { void ReplicaFailCallback(std::string_view); + auto IsMain(std::string_view instance_name) const -> bool; + auto IsReplica(std::string_view instance_name) const -> bool; + // NOTE: Must be std::list because we rely on pointer stability. // Leader and followers should both have same view on repl_instances_ std::list repl_instances_; diff --git a/src/coordination/include/coordination/raft_state.hpp b/src/coordination/include/coordination/raft_state.hpp index 2d60b2a8f..e61ae0e2c 100644 --- a/src/coordination/include/coordination/raft_state.hpp +++ b/src/coordination/include/coordination/raft_state.hpp @@ -63,18 +63,20 @@ class RaftState { auto RequestLeadership() -> bool; auto IsLeader() const -> bool; + auto FindCurrentMainInstanceName() const -> std::optional; auto MainExists() const -> bool; auto IsMain(std::string_view instance_name) const -> bool; auto IsReplica(std::string_view instance_name) const -> bool; - /// TODO: (andi) Add log in the name of methods - auto AppendRegisterReplicationInstance(CoordinatorClientConfig const &config) -> ptr; - auto AppendUnregisterReplicationInstance(std::string_view instance_name) -> ptr; - auto AppendSetInstanceAsMain(std::string_view instance_name) -> ptr; - auto AppendSetInstanceAsReplica(std::string_view instance_name) -> ptr; + auto AppendRegisterReplicationInstanceLog(CoordinatorClientConfig const &config) -> bool; + auto AppendUnregisterReplicationInstanceLog(std::string_view instance_name) -> bool; + auto AppendSetInstanceAsMainLog(std::string_view instance_name) -> bool; + auto AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool; auto GetInstances() 
const -> std::vector>; + auto GetClientConfigs() const -> std::vector; + private: // TODO: (andi) I think variables below can be abstracted/clean them. uint32_t raft_server_id_; diff --git a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp index 83548313e..13b58ff9f 100644 --- a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp +++ b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp @@ -24,8 +24,7 @@ enum class RegisterInstanceCoordinatorStatus : uint8_t { NOT_COORDINATOR, NOT_LEADER, RPC_FAILED, - RAFT_COULD_NOT_ACCEPT, - RAFT_COULD_NOT_APPEND, + RAFT_LOG_ERROR, SUCCESS }; @@ -35,8 +34,7 @@ enum class UnregisterInstanceCoordinatorStatus : uint8_t { NOT_COORDINATOR, RPC_FAILED, NOT_LEADER, - RAFT_COULD_NOT_ACCEPT, - RAFT_COULD_NOT_APPEND, + RAFT_LOG_ERROR, SUCCESS, }; @@ -45,8 +43,7 @@ enum class SetInstanceToMainCoordinatorStatus : uint8_t { MAIN_ALREADY_EXISTS, NOT_COORDINATOR, NOT_LEADER, - RAFT_COULD_NOT_ACCEPT, - RAFT_COULD_NOT_APPEND, + RAFT_LOG_ERROR, COULD_NOT_PROMOTE_TO_MAIN, SWAP_UUID_FAILED, SUCCESS, diff --git a/src/coordination/include/coordination/replication_instance.hpp b/src/coordination/include/coordination/replication_instance.hpp index fe61474c2..7b5d73b81 100644 --- a/src/coordination/include/coordination/replication_instance.hpp +++ b/src/coordination/include/coordination/replication_instance.hpp @@ -57,6 +57,8 @@ class ReplicationInstance { auto PromoteToMain(utils::UUID const &uuid, ReplicationClientsInfo repl_clients_info, HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb) -> bool; + auto SendDemoteToReplicaRpc() -> bool; + auto DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb) -> bool; diff --git a/src/coordination/include/nuraft/coordinator_cluster_state.hpp 
b/src/coordination/include/nuraft/coordinator_cluster_state.hpp index ab6c86100..88263d803 100644 --- a/src/coordination/include/nuraft/coordinator_cluster_state.hpp +++ b/src/coordination/include/nuraft/coordinator_cluster_state.hpp @@ -16,7 +16,7 @@ #include "coordination/coordinator_config.hpp" #include "nuraft/raft_log_action.hpp" #include "replication_coordination_glue/role.hpp" -#include "utils/rw_lock.hpp" +#include "utils/resource_lock.hpp" #include #include @@ -28,6 +28,13 @@ namespace memgraph::coordination { +using replication_coordination_glue::ReplicationRole; + +struct InstanceState { + CoordinatorClientConfig config; + ReplicationRole role; +}; + using TRaftLog = std::variant; using nuraft::buffer; @@ -36,13 +43,23 @@ using nuraft::ptr; class CoordinatorClusterState { public: + CoordinatorClusterState() = default; + CoordinatorClusterState(CoordinatorClusterState const &); + CoordinatorClusterState &operator=(CoordinatorClusterState const &); + + CoordinatorClusterState(CoordinatorClusterState &&other) noexcept; + CoordinatorClusterState &operator=(CoordinatorClusterState &&other) noexcept; + ~CoordinatorClusterState() = default; + + auto FindCurrentMainInstanceName() const -> std::optional; + auto MainExists() const -> bool; auto IsMain(std::string_view instance_name) const -> bool; auto IsReplica(std::string_view instance_name) const -> bool; - auto InsertInstance(std::string_view instance_name, replication_coordination_glue::ReplicationRole role) -> void; + auto InsertInstance(std::string_view instance_name, ReplicationRole role) -> void; auto DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void; @@ -52,9 +69,11 @@ class CoordinatorClusterState { auto GetInstances() const -> std::vector>; + auto GetClientConfigs() const -> std::vector; + private: - std::map> instance_roles; - // TODO: (andi) Good place for separate lock + std::map> instance_roles_; + mutable utils::ResourceLock log_lock_{}; }; } // namespace memgraph::coordination diff 
--git a/src/coordination/include/nuraft/coordinator_state_machine.hpp b/src/coordination/include/nuraft/coordinator_state_machine.hpp index 5fcf5cd65..9ecbf62b7 100644 --- a/src/coordination/include/nuraft/coordinator_state_machine.hpp +++ b/src/coordination/include/nuraft/coordinator_state_machine.hpp @@ -42,6 +42,7 @@ class CoordinatorStateMachine : public state_machine { CoordinatorStateMachine &operator=(CoordinatorStateMachine &&) = delete; ~CoordinatorStateMachine() override {} + auto FindCurrentMainInstanceName() const -> std::optional; auto MainExists() const -> bool; auto IsMain(std::string_view instance_name) const -> bool; auto IsReplica(std::string_view instance_name) const -> bool; @@ -80,6 +81,8 @@ class CoordinatorStateMachine : public state_machine { auto GetInstances() const -> std::vector>; + auto GetClientConfigs() const -> std::vector; + private: struct SnapshotCtx { SnapshotCtx(ptr &snapshot, CoordinatorClusterState const &cluster_state) diff --git a/src/coordination/raft_state.cpp b/src/coordination/raft_state.cpp index d0496cdb0..14d71a91e 100644 --- a/src/coordination/raft_state.cpp +++ b/src/coordination/raft_state.cpp @@ -126,24 +126,91 @@ auto RaftState::IsLeader() const -> bool { return raft_server_->is_leader(); } auto RaftState::RequestLeadership() -> bool { return raft_server_->is_leader() || raft_server_->request_leadership(); } -auto RaftState::AppendRegisterReplicationInstance(CoordinatorClientConfig const &config) -> ptr { +auto RaftState::AppendRegisterReplicationInstanceLog(CoordinatorClientConfig const &config) -> bool { auto new_log = CoordinatorStateMachine::SerializeRegisterInstance(config); - return raft_server_->append_entries({new_log}); + auto const res = raft_server_->append_entries({new_log}); + + if (!res->get_accepted()) { + spdlog::error( + "Failed to accept request for registering instance {}. 
Most likely the reason is that the instance is not " + "the " + "leader.", + config.instance_name); + return false; + } + + spdlog::info("Request for registering instance {} accepted", config.instance_name); + + if (res->get_result_code() != nuraft::cmd_result_code::OK) { + spdlog::error("Failed to register instance {} with error code {}", config.instance_name, res->get_result_code()); + return false; + } + + return true; } -auto RaftState::AppendUnregisterReplicationInstance(std::string_view instance_name) -> ptr { +auto RaftState::AppendUnregisterReplicationInstanceLog(std::string_view instance_name) -> bool { auto new_log = CoordinatorStateMachine::SerializeUnregisterInstance(instance_name); - return raft_server_->append_entries({new_log}); + auto const res = raft_server_->append_entries({new_log}); + if (!res->get_accepted()) { + spdlog::error( + "Failed to accept request for unregistering instance {}. Most likely the reason is that the instance is not " + "the leader.", + instance_name); + return false; + } + + spdlog::info("Request for unregistering instance {} accepted", instance_name); + + if (res->get_result_code() != nuraft::cmd_result_code::OK) { + spdlog::error("Failed to unregister instance {} with error code {}", instance_name, res->get_result_code()); + return false; + } + return true; } -auto RaftState::AppendSetInstanceAsMain(std::string_view instance_name) -> ptr { +auto RaftState::AppendSetInstanceAsMainLog(std::string_view instance_name) -> bool { auto new_log = CoordinatorStateMachine::SerializeSetInstanceAsMain(instance_name); - return raft_server_->append_entries({new_log}); + auto const res = raft_server_->append_entries({new_log}); + if (!res->get_accepted()) { + spdlog::error( + "Failed to accept request for promoting instance {}. 
Most likely the reason is that the instance is not " + "the leader.", + instance_name); + return false; + } + + spdlog::info("Request for promoting instance {} accepted", instance_name); + + if (res->get_result_code() != nuraft::cmd_result_code::OK) { + spdlog::error("Failed to promote instance {} with error code {}", instance_name, res->get_result_code()); + return false; + } + return true; } -auto RaftState::AppendSetInstanceAsReplica(std::string_view instance_name) -> ptr { +auto RaftState::AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool { auto new_log = CoordinatorStateMachine::SerializeSetInstanceAsReplica(instance_name); - return raft_server_->append_entries({new_log}); + auto const res = raft_server_->append_entries({new_log}); + if (!res->get_accepted()) { + spdlog::error( + "Failed to accept request for demoting instance {}. Most likely the reason is that the instance is not " + "the leader.", + instance_name); + return false; + } + spdlog::info("Request for demoting instance {} accepted", instance_name); + + if (res->get_result_code() != nuraft::cmd_result_code::OK) { + spdlog::error("Failed to promote instance {} with error code {}", instance_name, res->get_result_code()); + return false; + } + + return true; +} + +auto RaftState::FindCurrentMainInstanceName() const -> std::optional { + return state_machine_->FindCurrentMainInstanceName(); } auto RaftState::MainExists() const -> bool { return state_machine_->MainExists(); } @@ -158,5 +225,9 @@ auto RaftState::GetInstances() const -> std::vectorGetInstances(); } +auto RaftState::GetClientConfigs() const -> std::vector { + return state_machine_->GetClientConfigs(); +} + } // namespace memgraph::coordination #endif diff --git a/src/coordination/replication_instance.cpp b/src/coordination/replication_instance.cpp index 03ebe6605..b5c633978 100644 --- a/src/coordination/replication_instance.cpp +++ b/src/coordination/replication_instance.cpp @@ -27,10 +27,6 @@ 
ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorC : client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)), succ_cb_(succ_instance_cb), fail_cb_(fail_instance_cb) { - if (!client_.DemoteToReplica()) { - throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName()); - } - client_.StartFrequentCheck(); } @@ -69,6 +65,8 @@ auto ReplicationInstance::PromoteToMain(utils::UUID const &new_uuid, Replication return true; } +auto ReplicationInstance::SendDemoteToReplicaRpc() -> bool { return client_.DemoteToReplica(); } + auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb) -> bool { if (!client_.DemoteToReplica()) { diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 900d12dc7..f7213bed1 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -423,11 +423,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { throw QueryRuntimeException("UNREGISTER INSTANCE query can only be run on a coordinator!"); case NOT_LEADER: throw QueryRuntimeException("Couldn't unregister replica instance since coordinator is not a leader!"); - case RAFT_COULD_NOT_ACCEPT: - throw QueryRuntimeException( - "Couldn't unregister replica instance since raft server couldn't accept the log! 
Most likely the raft " - "instance is not a leader!"); - case RAFT_COULD_NOT_APPEND: + case RAFT_LOG_ERROR: throw QueryRuntimeException("Couldn't unregister replica instance since raft server couldn't append the log!"); case RPC_FAILED: throw QueryRuntimeException( @@ -486,11 +482,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { throw QueryRuntimeException("REGISTER INSTANCE query can only be run on a coordinator!"); case NOT_LEADER: throw QueryRuntimeException("Couldn't register replica instance since coordinator is not a leader!"); - case RAFT_COULD_NOT_ACCEPT: - throw QueryRuntimeException( - "Couldn't register replica instance since raft server couldn't accept the log! Most likely the raft " - "instance is not a leader!"); - case RAFT_COULD_NOT_APPEND: + case RAFT_LOG_ERROR: throw QueryRuntimeException("Couldn't register replica instance since raft server couldn't append the log!"); case RPC_FAILED: throw QueryRuntimeException( @@ -524,11 +516,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { throw QueryRuntimeException("SET INSTANCE TO MAIN query can only be run on a coordinator!"); case NOT_LEADER: throw QueryRuntimeException("Couldn't set instance to main since coordinator is not a leader!"); - case RAFT_COULD_NOT_ACCEPT: - throw QueryRuntimeException( - "Couldn't promote instance since raft server couldn't accept the log! Most likely the raft " - "instance is not a leader!"); - case RAFT_COULD_NOT_APPEND: + case RAFT_LOG_ERROR: throw QueryRuntimeException("Couldn't promote instance since raft server couldn't append the log!"); case COULD_NOT_PROMOTE_TO_MAIN: throw QueryRuntimeException(