diff --git a/src/coordination/coordinator_cluster_state.cpp b/src/coordination/coordinator_cluster_state.cpp index 24545cac6..443e1203e 100644 --- a/src/coordination/coordinator_cluster_state.cpp +++ b/src/coordination/coordinator_cluster_state.cpp @@ -35,30 +35,30 @@ auto CoordinatorClusterState::InsertInstance(std::string const &instance_name, R instance_roles[instance_name] = role; } -auto CoordinatorClusterState::DoAction(std::string const &instance_name, RaftLogAction log_action) -> void { +auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void { switch (log_action) { case RaftLogAction::REGISTER_REPLICATION_INSTANCE: { + auto const instance_name = std::get(log_entry).instance_name; instance_roles[instance_name] = ReplicationRole::REPLICA; - spdlog::info("Instance {} registered", instance_name); break; } case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE: { + auto const instance_name = std::get(log_entry); instance_roles.erase(instance_name); - spdlog::info("Instance {} unregistered", instance_name); break; } case RaftLogAction::SET_INSTANCE_AS_MAIN: { + auto const instance_name = std::get(log_entry); auto it = instance_roles.find(instance_name); MG_ASSERT(it != instance_roles.end(), "Instance does not exist as part of raft state!"); it->second = ReplicationRole::MAIN; - spdlog::info("Instance {} set as main", instance_name); break; } case RaftLogAction::SET_INSTANCE_AS_REPLICA: { + auto const instance_name = std::get(log_entry); auto it = instance_roles.find(instance_name); MG_ASSERT(it != instance_roles.end(), "Instance does not exist as part of raft state!"); it->second = ReplicationRole::REPLICA; - spdlog::info("Instance {} set as replica", instance_name); break; } } diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index 84ebf51e8..b6122aa94 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -34,6 +34,7 @@ using nuraft::srv_config; CoordinatorInstance::CoordinatorInstance() : raft_state_(RaftState::MakeRaftState( [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StartFrequentCheck); }, +<<<<<<< HEAD [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); })) { <<<<<<< HEAD <<<<<<< HEAD @@ -307,10 +308,17 @@ CoordinatorInstance::CoordinatorInstance() >>>>>>> 99c53148c (registration backed-up by raft) ||||||| parent of 9081c5c24 (Optional main on unregistering) ======= +||||||| parent of fab8d3d76 (Shared (Un)Registration networking part with raft) + [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); })) { +======= + [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); }, + [this](TRaftLog const &log_entry, RaftLogAction log_action) { + OnRaftCommitCallback(log_entry, log_action); + })) { +>>>>>>> fab8d3d76 (Shared (Un)Registration networking part with raft) client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { auto lock = std::unique_lock{self->coord_instance_lock_}; auto &repl_instance = self->FindReplicationInstance(repl_instance_name); - std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name, std::move(lock)); }; @@ -563,6 +571,7 @@ auto CoordinatorInstance::TryFailover() -> void { spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName()); } +<<<<<<< HEAD // TODO: (andi) Make sure you cannot put coordinator instance to the main auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus { @@ -768,6 +777,132 @@ void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_nam } ||||||| parent of b1af5ceeb (Move ReplRole to ClusterState) +||||||| parent of fab8d3d76 (Shared (Un)Registration networking part with raft) +// TODO: (andi) Make sure you cannot put coordinator instance to the main +auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name) + -> SetInstanceToMainCoordinatorStatus { + auto lock = std::lock_guard{coord_instance_lock_}; + + if (raft_state_.MainExists()) { + return SetInstanceToMainCoordinatorStatus::MAIN_ALREADY_EXISTS; + } + + auto const is_new_main = [&instance_name](ReplicationInstance const &instance) { + return instance.InstanceName() == instance_name; + }; + auto new_main = std::ranges::find_if(repl_instances_, is_new_main); + + if (new_main == repl_instances_.end()) { + spdlog::error("Instance {} not registered. Please register it using REGISTER INSTANCE {}", instance_name, + instance_name); + return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME; + } + + if (!raft_state_.RequestLeadership()) { + return SetInstanceToMainCoordinatorStatus::NOT_LEADER; + } + + auto const res = raft_state_.AppendSetInstanceAsMain(instance_name); + if (!res->get_accepted()) { + spdlog::error( + "Failed to accept request for promoting instance {}. Most likely the reason is that the instance is not " + "the leader.", + instance_name); + return SetInstanceToMainCoordinatorStatus::RAFT_COULD_NOT_ACCEPT; + } + + if (res->get_result_code() != nuraft::cmd_result_code::OK) { + spdlog::error("Failed to promote instance {} with error code {}", instance_name, res->get_result_code()); + return SetInstanceToMainCoordinatorStatus::RAFT_COULD_NOT_APPEND; + } + + new_main->PauseFrequentCheck(); + utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }}; + + auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) { + return instance.InstanceName() != instance_name; + }; + + auto const new_main_uuid = utils::UUID{}; + + for (auto &other_instance : repl_instances_ | ranges::views::filter(is_not_new_main)) { + if (!other_instance.SendSwapAndUpdateUUID(new_main_uuid)) { + spdlog::error( + fmt::format("Failed to swap uuid for instance {}, aborting failover", other_instance.InstanceName())); + return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED; + } + } + + auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) | + ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) | + ranges::to(); + + if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback, + &CoordinatorInstance::MainFailCallback)) { + return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN; + } + + main_uuid_ = new_main_uuid; + spdlog::info("Instance {} promoted to main", instance_name); + return SetInstanceToMainCoordinatorStatus::SUCCESS; +} + +auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig config) + -> RegisterInstanceCoordinatorStatus { + auto lock = std::lock_guard{coord_instance_lock_}; + + auto instance_name = config.instance_name; + + auto const name_matches = [&instance_name](ReplicationInstance const &instance) { + return instance.InstanceName() == instance_name; + }; + + if (std::ranges::any_of(repl_instances_, name_matches)) { + return RegisterInstanceCoordinatorStatus::NAME_EXISTS; + } + + auto const socket_address_matches = [&config](ReplicationInstance const &instance) { + return instance.SocketAddress() == config.SocketAddress(); + }; + + if (std::ranges::any_of(repl_instances_, socket_address_matches)) { + return RegisterInstanceCoordinatorStatus::ENDPOINT_EXISTS; + } + + if (!raft_state_.RequestLeadership()) { + return RegisterInstanceCoordinatorStatus::NOT_LEADER; + } + + auto const res = raft_state_.AppendRegisterReplicationInstance(instance_name); + if (!res->get_accepted()) { + spdlog::error( + "Failed to accept request for registering instance {}. Most likely the reason is that the instance is not " + "the " + "leader.", + config.instance_name); + return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_ACCEPT; + } + + spdlog::info("Request for registering instance {} accepted", instance_name); + try { + repl_instances_.emplace_back(this, std::move(config), client_succ_cb_, client_fail_cb_, + &CoordinatorInstance::ReplicaSuccessCallback, + &CoordinatorInstance::ReplicaFailCallback); + } catch (CoordinatorRegisterInstanceException const &) { + return RegisterInstanceCoordinatorStatus::RPC_FAILED; + } + + if (res->get_result_code() != nuraft::cmd_result_code::OK) { + spdlog::error("Failed to register instance {} with error code {}", instance_name, res->get_result_code()); + return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_APPEND; + } + + spdlog::info("Instance {} registered", instance_name); + return RegisterInstanceCoordinatorStatus::SUCCESS; +} + +======= +>>>>>>> fab8d3d76 (Shared (Un)Registration networking part with raft) void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name, std::unique_lock lock) { MG_ASSERT(lock.owns_lock(), "Callback doesn't own lock"); @@ -890,6 +1025,7 @@ void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_nam repl_instance.OnFailPing(); } +<<<<<<< HEAD ======= >>>>>>> b1af5ceeb (Move ReplRole to ClusterState) ||||||| parent of 9081c5c24 (Optional main on unregistering) @@ -984,6 +1120,124 @@ void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_nam } >>>>>>> 9081c5c24 (Optional main on unregistering) +||||||| parent of fab8d3d76 (Shared (Un)Registration networking part with raft) +======= +// TODO: (andi) Make sure you cannot put coordinator instance to the main +auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name) + -> SetInstanceToMainCoordinatorStatus { + auto lock = std::lock_guard{coord_instance_lock_}; + + if (raft_state_.MainExists()) { + return SetInstanceToMainCoordinatorStatus::MAIN_ALREADY_EXISTS; + } + + auto const is_new_main = [&instance_name](ReplicationInstance const &instance) { + return instance.InstanceName() == instance_name; + }; + auto new_main = std::ranges::find_if(repl_instances_, is_new_main); + + if (new_main == repl_instances_.end()) { + spdlog::error("Instance {} not registered. Please register it using REGISTER INSTANCE {}", instance_name, + instance_name); + return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME; + } + + if (!raft_state_.RequestLeadership()) { + return SetInstanceToMainCoordinatorStatus::NOT_LEADER; + } + + auto const res = raft_state_.AppendSetInstanceAsMain(instance_name); + if (!res->get_accepted()) { + spdlog::error( + "Failed to accept request for promoting instance {}. Most likely the reason is that the instance is not " + "the leader.", + instance_name); + return SetInstanceToMainCoordinatorStatus::RAFT_COULD_NOT_ACCEPT; + } + + if (res->get_result_code() != nuraft::cmd_result_code::OK) { + spdlog::error("Failed to promote instance {} with error code {}", instance_name, res->get_result_code()); + return SetInstanceToMainCoordinatorStatus::RAFT_COULD_NOT_APPEND; + } + + new_main->PauseFrequentCheck(); + utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }}; + + auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) { + return instance.InstanceName() != instance_name; + }; + + auto const new_main_uuid = utils::UUID{}; + + for (auto &other_instance : repl_instances_ | ranges::views::filter(is_not_new_main)) { + if (!other_instance.SendSwapAndUpdateUUID(new_main_uuid)) { + spdlog::error( + fmt::format("Failed to swap uuid for instance {}, aborting failover", other_instance.InstanceName())); + return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED; + } + } + + auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) | + ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) | + ranges::to(); + + if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback, + &CoordinatorInstance::MainFailCallback)) { + return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN; + } + + main_uuid_ = new_main_uuid; + spdlog::info("Instance {} promoted to main", instance_name); + return SetInstanceToMainCoordinatorStatus::SUCCESS; +} + +// TODO: (andi) Status of registration, maybe not all needed. +// Incorporate checking of replication socket address. +auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig config) + -> RegisterInstanceCoordinatorStatus { + auto lock = std::lock_guard{coord_instance_lock_}; + + auto const name_matches = [&config](ReplicationInstance const &instance) { + return instance.InstanceName() == config.instance_name; + }; + + if (std::ranges::any_of(repl_instances_, name_matches)) { + return RegisterInstanceCoordinatorStatus::NAME_EXISTS; + } + + auto const socket_address_matches = [&config](ReplicationInstance const &instance) { + return instance.SocketAddress() == config.SocketAddress(); + }; + + if (std::ranges::any_of(repl_instances_, socket_address_matches)) { + return RegisterInstanceCoordinatorStatus::ENDPOINT_EXISTS; + } + + if (!raft_state_.RequestLeadership()) { + return RegisterInstanceCoordinatorStatus::NOT_LEADER; + } + + auto const res = raft_state_.AppendRegisterReplicationInstance(config); + if (!res->get_accepted()) { + spdlog::error( + "Failed to accept request for registering instance {}. Most likely the reason is that the instance is not " + "the " + "leader.", + config.instance_name); + return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_ACCEPT; + } + + spdlog::info("Request for registering instance {} accepted", config.instance_name); + + if (res->get_result_code() != nuraft::cmd_result_code::OK) { + spdlog::error("Failed to register instance {} with error code {}", config.instance_name, res->get_result_code()); + return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_APPEND; + } + + return RegisterInstanceCoordinatorStatus::SUCCESS; +} + +>>>>>>> fab8d3d76 (Shared (Un)Registration networking part with raft) auto CoordinatorInstance::UnregisterReplicationInstance(std::string instance_name) -> UnregisterInstanceCoordinatorStatus { auto lock = std::lock_guard{coord_instance_lock_}; @@ -1024,16 +1278,6 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string instance_nam return UnregisterInstanceCoordinatorStatus::RAFT_COULD_NOT_APPEND; } - inst_to_remove->StopFrequentCheck(); - if (auto curr_main = std::ranges::find_if(repl_instances_, is_main); curr_main != repl_instances_.end()) { - if (!curr_main->SendUnregisterReplicaRpc(instance_name)) { - // TODO: (andi) Restore state in the RAFT log if needed. - inst_to_remove->StartFrequentCheck(); - return UnregisterInstanceCoordinatorStatus::RPC_FAILED; - } - } - std::erase_if(repl_instances_, name_matches); - return UnregisterInstanceCoordinatorStatus::SUCCESS; } @@ -1045,6 +1289,7 @@ auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32 <<<<<<< HEAD <<<<<<< HEAD <<<<<<< HEAD +<<<<<<< HEAD auto CoordinatorInstance::GetMainUUID() const -> utils::UUID { return main_uuid_; } // TODO: (andi) Add to the RAFT log. @@ -1142,5 +1387,67 @@ auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_ ======= >>>>>>> 1b150ee92 (Address PR comments) +||||||| parent of fab8d3d76 (Shared (Un)Registration networking part with raft) +======= +auto CoordinatorInstance::OnRaftCommitCallback(TRaftLog const &log_entry, RaftLogAction log_action) -> void { + // TODO: (andi) Solve it locking scheme and write comment. + switch (log_action) { + case RaftLogAction::REGISTER_REPLICATION_INSTANCE: { + auto config = std::get(log_entry); + auto *new_instance = &repl_instances_.emplace_back(this, std::move(config), client_succ_cb_, client_fail_cb_, + &CoordinatorInstance::ReplicaSuccessCallback, + &CoordinatorInstance::ReplicaFailCallback); + + if (raft_state_.IsLeader()) { + if (!new_instance->SendDemoteToReplicaRpc()) { + throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica on registration.", + new_instance->InstanceName()); + } + + new_instance->StartFrequentCheck(); + // TODO: (andi) Pinging for InstanceName() raft_state? + spdlog::info("Leader instance {} started frequent check on ", raft_state_.InstanceName(), + new_instance->InstanceName()); + } + + spdlog::info("Instance {} registered", new_instance->InstanceName()); + break; + } + case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE: { + auto const instance_name = std::get(log_entry); + + auto &inst_to_remove = FindReplicationInstance(instance_name); + inst_to_remove.StopFrequentCheck(); + + auto const is_main = [this](ReplicationInstance const &instance) { + return raft_state_.IsMain(instance.InstanceName()); + }; + + if (auto curr_main = std::ranges::find_if(repl_instances_, is_main); curr_main != repl_instances_.end()) { + if (!curr_main->SendUnregisterReplicaRpc(instance_name)) { + // TODO: (andi) Restore state in the RAFT log if needed. + inst_to_remove.StartFrequentCheck(); + } + } + + auto const name_matches = [&instance_name](ReplicationInstance const &instance) { + return instance.InstanceName() == instance_name; + }; + + std::erase_if(repl_instances_, name_matches); + + spdlog::info("Instance {} unregistered", instance_name); + break; + } + case RaftLogAction::SET_INSTANCE_AS_MAIN: { + break; + } + case RaftLogAction::SET_INSTANCE_AS_REPLICA: { + break; + } + }; +} + +>>>>>>> fab8d3d76 (Shared (Un)Registration networking part with raft) } // namespace memgraph::coordination #endif diff --git a/src/coordination/coordinator_state_machine.cpp b/src/coordination/coordinator_state_machine.cpp index 2e503fe74..ec0dba09b 100644 --- a/src/coordination/coordinator_state_machine.cpp +++ b/src/coordination/coordinator_state_machine.cpp @@ -15,6 +15,8 @@ namespace memgraph::coordination { +CoordinatorStateMachine::CoordinatorStateMachine(OnRaftCommitCb raft_commit_cb) : raft_commit_cb_(raft_commit_cb) {} + auto CoordinatorStateMachine::MainExists() const -> bool { return cluster_state_.MainExists(); } auto CoordinatorStateMachine::IsMain(std::string const &instance_name) const -> bool { @@ -25,37 +27,54 @@ auto CoordinatorStateMachine::IsReplica(std::string const &instance_name) const return cluster_state_.IsReplica(instance_name); } -auto CoordinatorStateMachine::EncodeLogAction(std::string const &name, RaftLogAction log_action) -> ptr { - auto const str_log = [&name, log_action] { - switch (log_action) { - case RaftLogAction::REGISTER_REPLICATION_INSTANCE: - return "register_" + name; - case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE: - return "unregister_" + name; - case RaftLogAction::SET_INSTANCE_AS_MAIN: - return "promote_" + name; - case RaftLogAction::SET_INSTANCE_AS_REPLICA: - return "demote_" + name; - } - }(); - - ptr log = buffer::alloc(sizeof(uint32_t) + str_log.size()); - buffer_serializer bs(log); - bs.put_str(str_log); - return log; +auto CoordinatorStateMachine::CreateLog(std::string const &log) -> ptr { + ptr log_buf = buffer::alloc(sizeof(uint32_t) + log.size()); + buffer_serializer bs(log_buf); + bs.put_str(log); + return log_buf; } -auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair { +auto CoordinatorStateMachine::SerializeRegisterInstance(CoordinatorClientConfig const &config) -> ptr { + auto const str_log = fmt::format("{}*register", config.ToString()); + return CreateLog(str_log); +} + +auto CoordinatorStateMachine::SerializeUnregisterInstance(std::string_view instance_name) -> ptr { + auto const str_log = fmt::format("{}*unregister", instance_name); + return CreateLog(str_log); +} + +auto CoordinatorStateMachine::SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr { + auto const str_log = fmt::format("{}*promote", instance_name); + return CreateLog(str_log); +} + +auto CoordinatorStateMachine::SerializeSetInstanceAsReplica(std::string_view instance_name) -> ptr { + auto const str_log = fmt::format("{}*demote", instance_name); + return CreateLog(str_log); +} + +auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair { buffer_serializer bs(data); auto const log_str = bs.get_str(); - auto const sep = log_str.find('_'); - auto const action = log_str.substr(0, sep); - auto const name = log_str.substr(sep + 1); + auto const sep = log_str.find('*'); + auto const action = log_str.substr(sep + 1); + auto const info = log_str.substr(0, sep); - spdlog::info("Decoding log: {} {}", name, action); - - return {name, ParseRaftLogAction(action)}; + if (action == "register") { + return {CoordinatorClientConfig::FromString(info), RaftLogAction::REGISTER_REPLICATION_INSTANCE}; + } + if (action == "unregister") { + return {info, RaftLogAction::UNREGISTER_REPLICATION_INSTANCE}; + } + if (action == "promote") { + return {info, RaftLogAction::SET_INSTANCE_AS_MAIN}; + } + if (action == "demote") { + return {info, RaftLogAction::SET_INSTANCE_AS_REPLICA}; + } + throw std::runtime_error("Unknown action"); } auto CoordinatorStateMachine::pre_commit(ulong const /*log_idx*/, buffer & /*data*/) -> ptr { return nullptr; } @@ -64,12 +83,12 @@ auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr #include #include #include +#include + namespace memgraph::coordination { inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0"; @@ -32,14 +35,31 @@ struct CoordinatorClientConfig { std::chrono::seconds instance_down_timeout_sec{5}; std::chrono::seconds instance_get_uuid_frequency_sec{10}; - auto SocketAddress() const -> std::string { return ip_address + ":" + std::to_string(port); } + auto SocketAddress() const -> std::string { return fmt::format("{}:{}", ip_address, port); } struct ReplicationClientInfo { + // TODO: (andi) Do we even need here instance_name for this struct? std::string instance_name; replication_coordination_glue::ReplicationMode replication_mode{}; std::string replication_ip_address; uint16_t replication_port{}; + auto ToString() const -> std::string { + return fmt::format("{}#{}#{}#{}", instance_name, replication_ip_address, replication_port, + replication_coordination_glue::ReplicationModeToString(replication_mode)); + } + + // TODO: (andi) How can I make use of monadic parsers here? + static auto FromString(std::string_view log) -> ReplicationClientInfo { + ReplicationClientInfo replication_client_info; + auto splitted = utils::Split(log, "#"); + replication_client_info.instance_name = splitted[0]; + replication_client_info.replication_ip_address = splitted[1]; + replication_client_info.replication_port = std::stoi(splitted[2]); + replication_client_info.replication_mode = replication_coordination_glue::ReplicationModeFromString(splitted[3]); + return replication_client_info; + } + friend bool operator==(ReplicationClientInfo const &, ReplicationClientInfo const &) = default; }; @@ -54,6 +74,25 @@ struct CoordinatorClientConfig { std::optional ssl; + auto ToString() const -> std::string { + return fmt::format("{}|{}|{}|{}|{}|{}|{}", instance_name, ip_address, port, + instance_health_check_frequency_sec.count(), instance_down_timeout_sec.count(), + instance_get_uuid_frequency_sec.count(), replication_client_info.ToString()); + } + + static auto FromString(std::string_view log) -> CoordinatorClientConfig { + CoordinatorClientConfig config; + auto splitted = utils::Split(log, "|"); + config.instance_name = splitted[0]; + config.ip_address = splitted[1]; + config.port = std::stoi(splitted[2]); + config.instance_health_check_frequency_sec = std::chrono::seconds(std::stoi(splitted[3])); + config.instance_down_timeout_sec = std::chrono::seconds(std::stoi(splitted[4])); + config.instance_get_uuid_frequency_sec = std::chrono::seconds(std::stoi(splitted[5])); + config.replication_client_info = ReplicationClientInfo::FromString(splitted[6]); + return config; + } + friend bool operator==(CoordinatorClientConfig const &, CoordinatorClientConfig const &) = default; }; diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index 08586747a..f83f62841 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -48,6 +48,11 @@ class CoordinatorInstance { auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void; + private: + HealthCheckClientCallback client_succ_cb_, client_fail_cb_; + + auto OnRaftCommitCallback(TRaftLog const &log_entry, RaftLogAction log_action) -> void; + auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &; void MainFailCallback(std::string_view); @@ -60,9 +65,6 @@ class CoordinatorInstance { static auto ChooseMostUpToDateInstance(const std::vector &) -> NewMainRes; - private: - HealthCheckClientCallback client_succ_cb_, client_fail_cb_; - // NOTE: Only leader should have repl_instances_, not followers. // NOTE: Must be std::list because we rely on pointer stability. std::list repl_instances_; diff --git a/src/coordination/include/coordination/raft_state.hpp b/src/coordination/include/coordination/raft_state.hpp index 0f1b018de..d9f2376bf 100644 --- a/src/coordination/include/coordination/raft_state.hpp +++ b/src/coordination/include/coordination/raft_state.hpp @@ -21,6 +21,9 @@ namespace memgraph::coordination { +class CoordinatorInstance; +struct CoordinatorClientConfig; + using BecomeLeaderCb = std::function; using BecomeFollowerCb = std::function; @@ -36,8 +39,9 @@ using raft_result = nuraft::cmd_result>; class RaftState { private: - explicit RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, uint32_t raft_server_id, - uint32_t raft_port, std::string raft_address); + explicit RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, + OnRaftCommitCb raft_commit_cb, uint32_t raft_server_id, uint32_t raft_port, + std::string raft_address); auto InitRaftServer() -> void; @@ -49,7 +53,8 @@ class RaftState { RaftState &operator=(RaftState &&other) noexcept = default; ~RaftState(); - static auto MakeRaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb) -> RaftState; + static auto MakeRaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, + OnRaftCommitCb raft_commit_cb) -> RaftState; auto InstanceName() const -> std::string; auto RaftSocketAddress() const -> std::string; @@ -65,10 +70,10 @@ class RaftState { auto IsReplica(std::string const &instance_name) const -> bool; /// TODO: (andi) Add log in the name of methods - auto AppendRegisterReplicationInstance(std::string const &instance_name) -> ptr; - auto AppendUnregisterReplicationInstance(std::string const &instance_name) -> ptr; - auto AppendSetInstanceAsMain(std::string const &instance_name) -> ptr; - auto AppendSetInstanceAsReplica(std::string const &instance_name) -> ptr; + auto AppendRegisterReplicationInstance(CoordinatorClientConfig const &config) -> ptr; + auto AppendUnregisterReplicationInstance(std::string_view instance_name) -> ptr; + auto AppendSetInstanceAsMain(std::string_view instance_name) -> ptr; + auto AppendSetInstanceAsReplica(std::string_view instance_name) -> ptr; auto GetInstances() const -> std::vector>; diff --git a/src/coordination/include/coordination/replication_instance.hpp b/src/coordination/include/coordination/replication_instance.hpp index 4143064fc..b5fa4d834 100644 --- a/src/coordination/include/coordination/replication_instance.hpp +++ b/src/coordination/include/coordination/replication_instance.hpp @@ -17,11 +17,12 @@ #include "coordination/coordinator_exceptions.hpp" #include "replication_coordination_glue/role.hpp" -#include #include "utils/resource_lock.hpp" #include "utils/result.hpp" #include "utils/uuid.hpp" +#include + namespace memgraph::coordination { class CoordinatorInstance; @@ -54,6 +55,8 @@ class ReplicationInstance { auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info, HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb) -> bool; + + auto SendDemoteToReplicaRpc() -> bool; auto DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb) -> bool; diff --git a/src/coordination/include/nuraft/coordinator_cluster_state.hpp b/src/coordination/include/nuraft/coordinator_cluster_state.hpp index 373847a59..700ca8601 100644 --- a/src/coordination/include/nuraft/coordinator_cluster_state.hpp +++ b/src/coordination/include/nuraft/coordinator_cluster_state.hpp @@ -13,6 +13,7 @@ #ifdef MG_ENTERPRISE +#include "coordination/coordinator_config.hpp" #include "nuraft/raft_log_action.hpp" #include "replication_coordination_glue/role.hpp" #include "utils/rw_lock.hpp" @@ -23,9 +24,12 @@ #include #include #include +#include namespace memgraph::coordination { +using TRaftLog = std::variant; + using nuraft::buffer; using nuraft::buffer_serializer; using nuraft::ptr; @@ -40,7 +44,7 @@ class CoordinatorClusterState { auto InsertInstance(std::string const &instance_name, replication_coordination_glue::ReplicationRole role) -> void; - auto DoAction(std::string const &instance_name, RaftLogAction log_action) -> void; + auto DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void; auto Serialize(ptr &data) -> void; diff --git a/src/coordination/include/nuraft/coordinator_state_machine.hpp b/src/coordination/include/nuraft/coordinator_state_machine.hpp index 3e3110d86..9b73a0ee2 100644 --- a/src/coordination/include/nuraft/coordinator_state_machine.hpp +++ b/src/coordination/include/nuraft/coordinator_state_machine.hpp @@ -13,14 +13,19 @@ #ifdef MG_ENTERPRISE +#include "coordination/coordinator_config.hpp" #include "nuraft/coordinator_cluster_state.hpp" #include "nuraft/raft_log_action.hpp" #include #include +#include + namespace memgraph::coordination { +using OnRaftCommitCb = std::function; + using nuraft::async_result; using nuraft::buffer; using nuraft::buffer_serializer; @@ -32,7 +37,7 @@ using nuraft::state_machine; class CoordinatorStateMachine : public state_machine { public: - CoordinatorStateMachine() = default; + explicit CoordinatorStateMachine(OnRaftCommitCb raft_commit_cb); CoordinatorStateMachine(CoordinatorStateMachine const &) = delete; CoordinatorStateMachine &operator=(CoordinatorStateMachine const &) = delete; CoordinatorStateMachine(CoordinatorStateMachine &&) = delete; @@ -43,9 +48,13 @@ class CoordinatorStateMachine : public state_machine { auto IsMain(std::string const &instance_name) const -> bool; auto IsReplica(std::string const &instance_name) const -> bool; - static auto EncodeLogAction(std::string const &instance_name, RaftLogAction log_action) -> ptr; + static auto CreateLog(std::string const &log) -> ptr; + static auto SerializeRegisterInstance(CoordinatorClientConfig const &config) -> ptr; + static auto SerializeUnregisterInstance(std::string_view instance_name) -> ptr; + static auto SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr; + static auto SerializeSetInstanceAsReplica(std::string_view instance_name) -> ptr; - static auto DecodeLog(buffer &data) -> std::pair; + static auto DecodeLog(buffer &data) -> std::pair; auto pre_commit(ulong log_idx, buffer &data) -> ptr override; @@ -86,8 +95,7 @@ class CoordinatorStateMachine : public state_machine { CoordinatorClusterState cluster_state_; - - //mutable utils::RWLock lock{utils::RWLock::Priority::READ}; + // mutable utils::RWLock lock{utils::RWLock::Priority::READ}; std::atomic last_committed_idx_{0}; @@ -95,6 +103,8 @@ class CoordinatorStateMachine : public state_machine { std::map> snapshots_; std::mutex snapshots_lock_; + OnRaftCommitCb raft_commit_cb_; + ptr last_snapshot_; std::mutex last_snapshot_lock_; }; diff --git a/src/coordination/raft_state.cpp b/src/coordination/raft_state.cpp index de4694eae..bf7c99cee 100644 --- a/src/coordination/raft_state.cpp +++ b/src/coordination/raft_state.cpp @@ -13,6 +13,7 @@ #include "coordination/raft_state.hpp" +#include "coordination/coordinator_config.hpp" #include "coordination/coordinator_exceptions.hpp" #include "utils/counter.hpp" @@ -29,12 +30,13 @@ using nuraft::raft_server; using nuraft::srv_config; using raft_result = cmd_result>; -RaftState::RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, uint32_t raft_server_id, - uint32_t raft_port, std::string raft_address) +RaftState::RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, + OnRaftCommitCb raft_commit_cb, uint32_t raft_server_id, uint32_t raft_port, + std::string raft_address) : raft_server_id_(raft_server_id), raft_port_(raft_port), raft_address_(std::move(raft_address)), - state_machine_(cs_new()), + state_machine_(cs_new(raft_commit_cb)), state_manager_( cs_new(raft_server_id_, raft_address_ + ":" + std::to_string(raft_port_))), logger_(nullptr), @@ -88,7 +90,8 @@ auto RaftState::InitRaftServer() -> void { throw RaftServerStartException("Failed to initialize raft server on {}:{}", raft_address_, raft_port_); } -auto RaftState::MakeRaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb) -> RaftState { +auto RaftState::MakeRaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, + OnRaftCommitCb raft_commit_cb) -> RaftState { uint32_t raft_server_id{0}; uint32_t raft_port{0}; try { @@ -98,8 +101,8 @@ auto RaftState::MakeRaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb throw RaftCouldNotParseFlagsException("Failed to parse flags: {}", e.what()); } - auto raft_state = - RaftState(std::move(become_leader_cb), std::move(become_follower_cb), raft_server_id, raft_port, "127.0.0.1"); + auto raft_state = RaftState(std::move(become_leader_cb), std::move(become_follower_cb), raft_commit_cb, + raft_server_id, raft_port, "127.0.0.1"); raft_state.InitRaftServer(); return raft_state; } @@ -129,24 +132,23 @@ auto RaftState::IsLeader() const -> bool { return raft_server_->is_leader(); } auto RaftState::RequestLeadership() -> bool { return raft_server_->is_leader() || raft_server_->request_leadership(); } -auto RaftState::AppendRegisterReplicationInstance(std::string const &instance_name) -> ptr { - auto new_log = CoordinatorStateMachine::EncodeLogAction(instance_name, RaftLogAction::REGISTER_REPLICATION_INSTANCE); +auto RaftState::AppendRegisterReplicationInstance(CoordinatorClientConfig const &config) -> ptr { + auto new_log = CoordinatorStateMachine::SerializeRegisterInstance(config); return raft_server_->append_entries({new_log}); } -auto RaftState::AppendUnregisterReplicationInstance(std::string const &instance_name) -> ptr { - auto new_log = - CoordinatorStateMachine::EncodeLogAction(instance_name, RaftLogAction::UNREGISTER_REPLICATION_INSTANCE); +auto RaftState::AppendUnregisterReplicationInstance(std::string_view instance_name) -> ptr { + auto new_log = CoordinatorStateMachine::SerializeUnregisterInstance(instance_name); return raft_server_->append_entries({new_log}); } -auto RaftState::AppendSetInstanceAsMain(std::string const &instance_name) -> ptr { - auto new_log = CoordinatorStateMachine::EncodeLogAction(instance_name, RaftLogAction::SET_INSTANCE_AS_MAIN); +auto RaftState::AppendSetInstanceAsMain(std::string_view instance_name) -> ptr { + auto new_log = CoordinatorStateMachine::SerializeSetInstanceAsMain(instance_name); return raft_server_->append_entries({new_log}); } -auto RaftState::AppendSetInstanceAsReplica(std::string const &instance_name) -> ptr { - auto new_log = CoordinatorStateMachine::EncodeLogAction(instance_name, RaftLogAction::SET_INSTANCE_AS_REPLICA); +auto RaftState::AppendSetInstanceAsReplica(std::string_view instance_name) -> ptr { + auto new_log = CoordinatorStateMachine::SerializeSetInstanceAsReplica(instance_name); return raft_server_->append_entries({new_log}); } diff --git a/src/coordination/replication_instance.cpp b/src/coordination/replication_instance.cpp index 7f54787aa..600765095 100644 --- a/src/coordination/replication_instance.cpp +++ b/src/coordination/replication_instance.cpp @@ -26,13 +26,7 @@ ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorC HealthCheckInstanceCallback fail_instance_cb) : client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)), succ_cb_(succ_instance_cb), - fail_cb_(fail_instance_cb) { - if (!client_.DemoteToReplica()) { - throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName()); - } - - client_.StartFrequentCheck(); -} + fail_cb_(fail_instance_cb) {} auto ReplicationInstance::OnSuccessPing() -> void { last_response_time_ = std::chrono::system_clock::now(); @@ -68,6 +62,8 @@ auto ReplicationInstance::PromoteToMain(utils::UUID new_uuid, ReplicationClients return true; } +auto ReplicationInstance::SendDemoteToReplicaRpc() -> bool { return client_.DemoteToReplica(); } + auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb) -> bool { if (!client_.DemoteToReplica()) { diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 22252edb6..01aed4c11 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -473,7 +473,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { .replication_client_info = repl_config, .ssl = std::nullopt}; - auto status = coordinator_handler_.RegisterReplicationInstance(coordinator_client_config); + auto status = coordinator_handler_.RegisterReplicationInstance(std::move(coordinator_client_config)); switch (status) { using enum memgraph::coordination::RegisterInstanceCoordinatorStatus; case NAME_EXISTS: diff --git a/src/replication_coordination_glue/mode.hpp b/src/replication_coordination_glue/mode.hpp index d0b415733..3f27afb05 100644 --- a/src/replication_coordination_glue/mode.hpp +++ b/src/replication_coordination_glue/mode.hpp @@ -12,7 +12,32 @@ #pragma once #include +#include +#include +#include namespace memgraph::replication_coordination_glue { + enum class ReplicationMode : std::uint8_t { SYNC, ASYNC }; + +inline auto ReplicationModeToString(ReplicationMode mode) -> std::string { + switch (mode) { + case ReplicationMode::SYNC: + return "SYNC"; + case ReplicationMode::ASYNC: + return "ASYNC"; + } + throw std::invalid_argument("Invalid replication mode"); +} + +inline auto ReplicationModeFromString(std::string_view mode) -> ReplicationMode { + if (mode == "SYNC") { + return ReplicationMode::SYNC; + } + if (mode == "ASYNC") { + return ReplicationMode::ASYNC; + } + throw std::invalid_argument("Invalid replication mode"); +} + } // namespace memgraph::replication_coordination_glue