diff --git a/src/coordination/CMakeLists.txt b/src/coordination/CMakeLists.txt index 25c569f37..b46843639 100644 --- a/src/coordination/CMakeLists.txt +++ b/src/coordination/CMakeLists.txt @@ -12,7 +12,6 @@ target_sources(mg-coordination include/coordination/coordinator_slk.hpp include/coordination/coordinator_data.hpp include/coordination/constants.hpp - include/coordination/failover_status.hpp include/coordination/coordinator_cluster_config.hpp PRIVATE @@ -21,6 +20,7 @@ target_sources(mg-coordination coordinator_rpc.cpp coordinator_server.cpp coordinator_data.cpp + coordinator_instance.cpp ) target_include_directories(mg-coordination PUBLIC include) diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp index 93ef3e3af..db8d692f6 100644 --- a/src/coordination/coordinator_client.cpp +++ b/src/coordination/coordinator_client.cpp @@ -20,7 +20,7 @@ namespace memgraph::coordination { namespace { -auto CreateClientContext(const memgraph::coordination::CoordinatorClientConfig &config) +auto CreateClientContext(memgraph::coordination::CoordinatorClientConfig const &config) -> communication::ClientContext { return (config.ssl) ? communication::ClientContext{config.ssl->key_file, config.ssl->cert_file} : communication::ClientContext{}; @@ -45,38 +45,33 @@ void CoordinatorClient::StartFrequentCheck() { "Health check frequency must be greater than 0"); instance_checker_.Run( - "Coord checker", config_.health_check_frequency_sec, [this, instance_name = config_.instance_name] { + config_.instance_name, config_.health_check_frequency_sec, [this, instance_name = config_.instance_name] { try { spdlog::trace("Sending frequent heartbeat to machine {} on {}", instance_name, rpc_client_.Endpoint().SocketAddress()); - auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()}; - stream.AwaitResponse(); + { // NOTE: This is intentionally scoped so that stream lock could get released. 
+ auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()}; + stream.AwaitResponse(); + } succ_cb_(coord_data_, instance_name); - } catch (const rpc::RpcFailedException &) { + } catch (rpc::RpcFailedException const &) { fail_cb_(coord_data_, instance_name); } }); } void CoordinatorClient::StopFrequentCheck() { instance_checker_.Stop(); } - void CoordinatorClient::PauseFrequentCheck() { instance_checker_.Pause(); } void CoordinatorClient::ResumeFrequentCheck() { instance_checker_.Resume(); } -auto CoordinatorClient::SetSuccCallback(HealthCheckCallback succ_cb) -> void { succ_cb_ = std::move(succ_cb); } -auto CoordinatorClient::SetFailCallback(HealthCheckCallback fail_cb) -> void { fail_cb_ = std::move(fail_cb); } - -auto CoordinatorClient::ReplicationClientInfo() const -> const CoordinatorClientConfig::ReplicationClientInfo & { - return config_.replication_client_info; +auto CoordinatorClient::SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void { + succ_cb_ = std::move(succ_cb); + fail_cb_ = std::move(fail_cb); } -auto CoordinatorClient::ResetReplicationClientInfo() -> void { - // TODO (antoniofilipovic) Sync with Andi on this one - // config_.replication_client_info.reset(); -} +auto CoordinatorClient::ReplicationClientInfo() const -> ReplClientInfo { return config_.replication_client_info; } -auto CoordinatorClient::SendPromoteReplicaToMainRpc( - std::vector<CoordinatorClientConfig::ReplicationClientInfo> replication_clients_info) const -> bool { +auto CoordinatorClient::SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool { try { auto stream{rpc_client_.Stream<PromoteReplicaToMainRpc>(std::move(replication_clients_info))}; if (!stream.AwaitResponse().success) { @@ -84,23 +79,24 @@ auto CoordinatorClient::SendPromoteReplicaToMainRpc( return false; } return true; - } catch (const rpc::RpcFailedException &) { + } catch (rpc::RpcFailedException const &) { 
spdlog::error("RPC error occurred while sending failover RPC!"); } return false; } -auto CoordinatorClient::SendSetToReplicaRpc(CoordinatorClient::ReplClientInfo replication_client_info) const -> bool { +auto CoordinatorClient::DemoteToReplica() const -> bool { + auto const &instance_name = config_.instance_name; try { - auto stream{rpc_client_.Stream<SetMainToReplicaRpc>(std::move(replication_client_info))}; + auto stream{rpc_client_.Stream<DemoteMainToReplicaRpc>(config_.replication_client_info)}; if (!stream.AwaitResponse().success) { - spdlog::error("Failed to set main to replica!"); + spdlog::error("Failed to receive successful RPC response for setting instance {} to replica!", instance_name); return false; } spdlog::info("Sent request RPC from coordinator to instance to set it as replica!"); return true; - } catch (const rpc::RpcFailedException &) { - spdlog::error("Failed to send failover RPC from coordinator to new main!"); + } catch (rpc::RpcFailedException const &) { + spdlog::error("Failed to set instance {} to replica!", instance_name); } return false; } diff --git a/src/coordination/coordinator_data.cpp b/src/coordination/coordinator_data.cpp index 5bc8066de..2af21949c 100644 --- a/src/coordination/coordinator_data.cpp +++ b/src/coordination/coordinator_data.cpp @@ -25,7 +25,7 @@ CoordinatorData::CoordinatorData() { auto find_instance = [](CoordinatorData *coord_data, std::string_view instance_name) -> CoordinatorInstance & { auto instance = std::ranges::find_if( coord_data->registered_instances_, - [instance_name](const CoordinatorInstance &instance) { return instance.InstanceName() == instance_name; }); + [instance_name](CoordinatorInstance const &instance) { return instance.InstanceName() == instance_name; }); MG_ASSERT(instance != coord_data->registered_instances_.end(), "Instance {} not found during callback!", instance_name); @@ -35,105 +35,94 @@ CoordinatorData::CoordinatorData() { replica_succ_cb_ = [find_instance](CoordinatorData *coord_data, 
std::string_view instance_name) -> void { auto lock = std::lock_guard{coord_data->coord_data_lock_}; spdlog::trace("Instance {} performing replica successful callback", instance_name); - auto &instance = find_instance(coord_data, instance_name); - MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name); - instance.UpdateLastResponseTime(); + find_instance(coord_data, instance_name).OnSuccessPing(); }; replica_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void { auto lock = std::lock_guard{coord_data->coord_data_lock_}; spdlog::trace("Instance {} performing replica failure callback", instance_name); - auto &instance = find_instance(coord_data, instance_name); - MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name); - instance.UpdateInstanceStatus(); + find_instance(coord_data, instance_name).OnFailPing(); }; main_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void { auto lock = std::lock_guard{coord_data->coord_data_lock_}; spdlog::trace("Instance {} performing main successful callback", instance_name); + auto &instance = find_instance(coord_data, instance_name); - MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name); - instance.UpdateLastResponseTime(); + + if (instance.IsAlive() || !coord_data->ClusterHasAliveMain_()) { + instance.OnSuccessPing(); + return; + } + + bool const demoted = instance.DemoteToReplica(coord_data->replica_succ_cb_, coord_data->replica_fail_cb_); + if (demoted) { + instance.OnSuccessPing(); + spdlog::info("Instance {} demoted to replica", instance_name); + } else { + spdlog::error("Instance {} failed to become replica", instance_name); + } }; - main_fail_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void { + main_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void { auto lock = 
std::lock_guard{coord_data->coord_data_lock_}; spdlog::trace("Instance {} performing main failure callback", instance_name); - auto &instance = find_instance(coord_data, instance_name); - MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name); - if (bool main_alive = instance.UpdateInstanceStatus(); !main_alive) { - spdlog::info("Main instance {} is not alive, starting automatic failover", instance_name); - switch (auto failover_status = DoFailover(); failover_status) { - using enum DoFailoverStatus; - case ALL_REPLICAS_DOWN: - spdlog::warn("Failover aborted since all replicas are down!"); - break; - case MAIN_ALIVE: - spdlog::warn("Failover aborted since main is alive!"); - break; - case RPC_FAILED: - spdlog::warn("Failover aborted since promoting replica to main failed!"); - break; - case SUCCESS: - break; - } + find_instance(coord_data, instance_name).OnFailPing(); + + if (!coord_data->ClusterHasAliveMain_()) { + spdlog::info("Cluster without main instance, trying automatic failover"); + coord_data->TryFailover(); } }; } -auto CoordinatorData::DoFailover() -> DoFailoverStatus { - using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo; +auto CoordinatorData::ClusterHasAliveMain_() const -> bool { + auto const alive_main = [](CoordinatorInstance const &instance) { return instance.IsMain() && instance.IsAlive(); }; + return std::ranges::any_of(registered_instances_, alive_main); +} +auto CoordinatorData::TryFailover() -> void { auto replica_instances = registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica); auto chosen_replica_instance = std::ranges::find_if(replica_instances, &CoordinatorInstance::IsAlive); if (chosen_replica_instance == replica_instances.end()) { - return DoFailoverStatus::ALL_REPLICAS_DOWN; + spdlog::warn("Failover failed since all replicas are down!"); + return; } - chosen_replica_instance->PrepareForFailover(); + chosen_replica_instance->PauseFrequentCheck(); + utils::OnScopeExit 
scope_exit{[&chosen_replica_instance] { chosen_replica_instance->ResumeFrequentCheck(); }}; - std::vector<ReplicationClientInfo> repl_clients_info; + std::vector<ReplClientInfo> repl_clients_info; repl_clients_info.reserve(std::ranges::distance(replica_instances)); - auto const not_chosen_replica_instance = [&chosen_replica_instance](const CoordinatorInstance &instance) { + auto const not_chosen_replica_instance = [&chosen_replica_instance](CoordinatorInstance const &instance) { return instance != *chosen_replica_instance; }; - auto const not_main = [](const CoordinatorInstance &instance) { return !instance.IsMain(); }; - // TODO (antoniofilipovic): Should we send also data on old MAIN??? - // TODO: (andi) Don't send replicas which aren't alive - for (const auto &unchosen_replica_instance : - replica_instances | ranges::views::filter(not_chosen_replica_instance) | ranges::views::filter(not_main)) { - repl_clients_info.emplace_back(unchosen_replica_instance.client_.ReplicationClientInfo()); + std::ranges::transform(registered_instances_ | ranges::views::filter(not_chosen_replica_instance), + std::back_inserter(repl_clients_info), + [](const CoordinatorInstance &instance) { return instance.ReplicationClientInfo(); }); + + if (!chosen_replica_instance->PromoteToMain(std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) { + spdlog::warn("Failover failed since promoting replica to main failed!"); + return; } - - if (!chosen_replica_instance->client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) { - chosen_replica_instance->RestoreAfterFailedFailover(); - return DoFailoverStatus::RPC_FAILED; - } - - auto old_main = std::ranges::find_if(registered_instances_, &CoordinatorInstance::IsMain); - // TODO: (andi) For performing restoration we will have to improve this - old_main->client_.PauseFrequentCheck(); - - chosen_replica_instance->PostFailover(main_succ_cb_, main_fail_cb_); - - return DoFailoverStatus::SUCCESS; + spdlog::info("Failover successful! 
Instance {} promoted to main.", chosen_replica_instance->InstanceName()); } auto CoordinatorData::ShowInstances() const -> std::vector<CoordinatorInstanceStatus> { std::vector<CoordinatorInstanceStatus> instances_status; instances_status.reserve(registered_instances_.size()); - auto const stringify_repl_role = [](const CoordinatorInstance &instance) -> std::string { - if (!instance.IsAlive()) return ""; + auto const stringify_repl_role = [](CoordinatorInstance const &instance) -> std::string { + if (!instance.IsAlive()) return "unknown"; if (instance.IsMain()) return "main"; return "replica"; }; auto const instance_to_status = - [&stringify_repl_role](const CoordinatorInstance &instance) -> CoordinatorInstanceStatus { + [&stringify_repl_role](CoordinatorInstance const &instance) -> CoordinatorInstanceStatus { return {.instance_name = instance.InstanceName(), .socket_address = instance.SocketAddress(), .replication_role = stringify_repl_role(instance), @@ -151,70 +140,59 @@ auto CoordinatorData::ShowInstances() const -> std::vector<CoordinatorInstanceSt auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus { auto lock = std::lock_guard{coord_data_lock_}; - // Find replica we already registered - auto registered_replica = std::find_if( - registered_instances_.begin(), registered_instances_.end(), - [instance_name](const CoordinatorInstance &instance) { return instance.InstanceName() == instance_name; }); + auto const is_new_main = [&instance_name](CoordinatorInstance const &instance) { + return instance.InstanceName() == instance_name; + }; + auto new_main = std::ranges::find_if(registered_instances_, is_new_main); - // if replica not found... - if (registered_replica == registered_instances_.end()) { - spdlog::error("You didn't register instance with given name {}", instance_name); + if (new_main == registered_instances_.end()) { + spdlog::error("Instance {} not registered. 
Please register it using REGISTER INSTANCE {}", instance_name, + instance_name); return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME; } - registered_replica->client_.PauseFrequentCheck(); + new_main->PauseFrequentCheck(); + utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }}; - std::vector<CoordinatorClientConfig::ReplicationClientInfo> repl_clients_info; + ReplicationClientsInfo repl_clients_info; repl_clients_info.reserve(registered_instances_.size() - 1); - std::ranges::for_each(registered_instances_, - [registered_replica, &repl_clients_info](const CoordinatorInstance &replica) { - if (replica != *registered_replica) { - repl_clients_info.emplace_back(replica.client_.ReplicationClientInfo()); - } - }); - // PROMOTE REPLICA TO MAIN - // THIS SHOULD FAIL HERE IF IT IS DOWN - if (auto result = registered_replica->client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info)); !result) { - registered_replica->client_.ResumeFrequentCheck(); + auto const is_not_new_main = [&instance_name](CoordinatorInstance const &instance) { + return instance.InstanceName() != instance_name; + }; + std::ranges::transform(registered_instances_ | ranges::views::filter(is_not_new_main), + std::back_inserter(repl_clients_info), + [](const CoordinatorInstance &instance) { return instance.ReplicationClientInfo(); }); + + if (!new_main->PromoteToMain(std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) { return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN; } - registered_replica->client_.SetSuccCallback(main_succ_cb_); - registered_replica->client_.SetFailCallback(main_fail_cb_); - registered_replica->replication_role_ = replication_coordination_glue::ReplicationRole::MAIN; - registered_replica->client_.ResumeFrequentCheck(); - + spdlog::info("Instance {} promoted to main", instance_name); return SetInstanceToMainCoordinatorStatus::SUCCESS; } auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> 
RegisterInstanceCoordinatorStatus { auto lock = std::lock_guard{coord_data_lock_}; - if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) { + if (std::ranges::any_of(registered_instances_, [&config](CoordinatorInstance const &instance) { return instance.InstanceName() == config.instance_name; })) { return RegisterInstanceCoordinatorStatus::NAME_EXISTS; } - if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) { - spdlog::trace("Comparing {} with {}", instance.SocketAddress(), config.SocketAddress()); + if (std::ranges::any_of(registered_instances_, [&config](CoordinatorInstance const &instance) { return instance.SocketAddress() == config.SocketAddress(); })) { return RegisterInstanceCoordinatorStatus::END_POINT_EXISTS; } - CoordinatorClientConfig::ReplicationClientInfo replication_client_info_copy = config.replication_client_info; + try { + registered_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_); + return RegisterInstanceCoordinatorStatus::SUCCESS; - // TODO (antoniofilipovic) create and then push back - auto *instance = ®istered_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_, - replication_coordination_glue::ReplicationRole::REPLICA); - if (auto res = instance->client_.SendSetToReplicaRpc(replication_client_info_copy); !res) { + } catch (CoordinatorRegisterInstanceException const &) { return RegisterInstanceCoordinatorStatus::RPC_FAILED; } - - instance->client_.StartFrequentCheck(); - - return RegisterInstanceCoordinatorStatus::SUCCESS; } } // namespace memgraph::coordination diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp new file mode 100644 index 000000000..0180b99c1 --- /dev/null +++ b/src/coordination/coordinator_instance.cpp @@ -0,0 +1,84 @@ +// Copyright 2024 Memgraph Ltd. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#ifdef MG_ENTERPRISE + +#include "coordination/coordinator_instance.hpp" + +namespace memgraph::coordination { + +CoordinatorInstance::CoordinatorInstance(CoordinatorData *data, CoordinatorClientConfig config, + HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) + : client_(data, std::move(config), std::move(succ_cb), std::move(fail_cb)), + replication_role_(replication_coordination_glue::ReplicationRole::REPLICA), + is_alive_(true) { + if (!client_.DemoteToReplica()) { + throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName()); + } + client_.StartFrequentCheck(); +} + +auto CoordinatorInstance::OnSuccessPing() -> void { + last_response_time_ = std::chrono::system_clock::now(); + is_alive_ = true; +} + +auto CoordinatorInstance::OnFailPing() -> bool { + is_alive_ = + std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_).count() < + CoordinatorClusterConfig::alive_response_time_difference_sec_; + return is_alive_; +} + +auto CoordinatorInstance::InstanceName() const -> std::string { return client_.InstanceName(); } +auto CoordinatorInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); } +auto CoordinatorInstance::IsAlive() const -> bool { return is_alive_; } + +auto CoordinatorInstance::IsReplica() const -> bool { + return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA; +} +auto 
CoordinatorInstance::IsMain() const -> bool { + return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN; +} + +auto CoordinatorInstance::PromoteToMain(ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb, + HealthCheckCallback main_fail_cb) -> bool { + if (!client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) { + return false; + } + + replication_role_ = replication_coordination_glue::ReplicationRole::MAIN; + client_.SetCallbacks(std::move(main_succ_cb), std::move(main_fail_cb)); + + return true; +} + +auto CoordinatorInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) + -> bool { + if (!client_.DemoteToReplica()) { + return false; + } + + replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA; + client_.SetCallbacks(std::move(replica_succ_cb), std::move(replica_fail_cb)); + + return true; +} + +auto CoordinatorInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); } +auto CoordinatorInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); } + +auto CoordinatorInstance::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo { + return client_.ReplicationClientInfo(); +} + +} // namespace memgraph::coordination +#endif diff --git a/src/coordination/coordinator_rpc.cpp b/src/coordination/coordinator_rpc.cpp index e8a16f0e2..053e46f13 100644 --- a/src/coordination/coordinator_rpc.cpp +++ b/src/coordination/coordinator_rpc.cpp @@ -36,19 +36,19 @@ void PromoteReplicaToMainRes::Load(PromoteReplicaToMainRes *self, memgraph::slk: memgraph::slk::Load(self, reader); } -void SetMainToReplicaReq::Save(const SetMainToReplicaReq &self, memgraph::slk::Builder *builder) { +void DemoteMainToReplicaReq::Save(const DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder) { memgraph::slk::Save(self, builder); } -void SetMainToReplicaReq::Load(SetMainToReplicaReq *self, memgraph::slk::Reader 
*reader) { +void DemoteMainToReplicaReq::Load(DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader) { memgraph::slk::Load(self, reader); } -void SetMainToReplicaRes::Save(const SetMainToReplicaRes &self, memgraph::slk::Builder *builder) { +void DemoteMainToReplicaRes::Save(const DemoteMainToReplicaRes &self, memgraph::slk::Builder *builder) { memgraph::slk::Save(self, builder); } -void SetMainToReplicaRes::Load(SetMainToReplicaRes *self, memgraph::slk::Reader *reader) { +void DemoteMainToReplicaRes::Load(DemoteMainToReplicaRes *self, memgraph::slk::Reader *reader) { memgraph::slk::Load(self, reader); } @@ -60,11 +60,11 @@ constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::Ty constexpr utils::TypeInfo coordination::PromoteReplicaToMainRes::kType{utils::TypeId::COORD_FAILOVER_RES, "CoordPromoteReplicaToMainRes", nullptr}; -constexpr utils::TypeInfo coordination::SetMainToReplicaReq::kType{utils::TypeId::COORD_SET_REPL_MAIN_REQ, - "CoordSetReplMainReq", nullptr}; +constexpr utils::TypeInfo coordination::DemoteMainToReplicaReq::kType{utils::TypeId::COORD_SET_REPL_MAIN_REQ, + "CoordDemoteToReplicaReq", nullptr}; -constexpr utils::TypeInfo coordination::SetMainToReplicaRes::kType{utils::TypeId::COORD_SET_REPL_MAIN_RES, - "CoordSetReplMainRes", nullptr}; +constexpr utils::TypeInfo coordination::DemoteMainToReplicaRes::kType{utils::TypeId::COORD_SET_REPL_MAIN_RES, + "CoordDemoteToReplicaRes", nullptr}; namespace slk { @@ -84,19 +84,19 @@ void Load(memgraph::coordination::PromoteReplicaToMainReq *self, memgraph::slk:: memgraph::slk::Load(&self->replication_clients_info, reader); } -void Save(const memgraph::coordination::SetMainToReplicaReq &self, memgraph::slk::Builder *builder) { +void Save(const memgraph::coordination::DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder) { memgraph::slk::Save(self.replication_client_info, builder); } -void Load(memgraph::coordination::SetMainToReplicaReq *self, memgraph::slk::Reader *reader) { 
+void Load(memgraph::coordination::DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader) { memgraph::slk::Load(&self->replication_client_info, reader); } -void Save(const memgraph::coordination::SetMainToReplicaRes &self, memgraph::slk::Builder *builder) { +void Save(const memgraph::coordination::DemoteMainToReplicaRes &self, memgraph::slk::Builder *builder) { memgraph::slk::Save(self.success, builder); } -void Load(memgraph::coordination::SetMainToReplicaRes *self, memgraph::slk::Reader *reader) { +void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::Reader *reader) { memgraph::slk::Load(&self->success, reader); } diff --git a/src/coordination/coordinator_state.cpp b/src/coordination/coordinator_state.cpp index 60ec458ac..96ad1902e 100644 --- a/src/coordination/coordinator_state.cpp +++ b/src/coordination/coordinator_state.cpp @@ -74,12 +74,6 @@ auto CoordinatorState::ShowInstances() const -> std::vector<CoordinatorInstanceS return std::get<CoordinatorData>(data_).ShowInstances(); } -[[nodiscard]] auto CoordinatorState::DoFailover() -> DoFailoverStatus { - MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), "Cannot do failover since variant holds wrong alternative"); - auto &coord_state = std::get<CoordinatorData>(data_); - return coord_state.DoFailover(); -} - auto CoordinatorState::GetCoordinatorServer() const -> CoordinatorServer & { MG_ASSERT(std::holds_alternative<CoordinatorMainReplicaData>(data_), "Cannot get coordinator server since variant holds wrong alternative"); diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp index 1bc361a57..a8d49e00e 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/coordinator_client.hpp @@ -21,12 +21,10 @@ namespace memgraph::coordination { class CoordinatorData; using HealthCheckCallback = std::function<void(CoordinatorData *, 
std::string_view)>; +using ReplicationClientsInfo = std::vector<ReplClientInfo>; class CoordinatorClient { public: - using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo; - using ReplicationClientsInfo = std::vector<ReplClientInfo>; - explicit CoordinatorClient(CoordinatorData *coord_data_, CoordinatorClientConfig config, HealthCheckCallback succ_cb, HealthCheckCallback fail_cb); @@ -46,15 +44,12 @@ class CoordinatorClient { auto InstanceName() const -> std::string; auto SocketAddress() const -> std::string; - auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool; + [[nodiscard]] auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool; + [[nodiscard]] auto DemoteToReplica() const -> bool; - auto ReplicationClientInfo() const -> const ReplClientInfo &; - auto ResetReplicationClientInfo() -> void; + auto ReplicationClientInfo() const -> ReplClientInfo; - auto SendSetToReplicaRpc(ReplClientInfo replication_client_info) const -> bool; - - auto SetSuccCallback(HealthCheckCallback succ_cb) -> void; - auto SetFailCallback(HealthCheckCallback fail_cb) -> void; + auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void; friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) { return first.config_ == second.config_; diff --git a/src/coordination/include/coordination/coordinator_config.hpp b/src/coordination/include/coordination/coordinator_config.hpp index bbbed9dd7..f72b3a6ad 100644 --- a/src/coordination/include/coordination/coordinator_config.hpp +++ b/src/coordination/include/coordination/coordinator_config.hpp @@ -32,9 +32,7 @@ struct CoordinatorClientConfig { auto SocketAddress() const -> std::string { return ip_address + ":" + std::to_string(port); } - // Info which coordinator will send to new main when performing failover struct ReplicationClientInfo { - // Must be the same as CoordinatorClientConfig's 
instance_name std::string instance_name; replication_coordination_glue::ReplicationMode replication_mode{}; std::string replication_ip_address; @@ -43,7 +41,6 @@ struct CoordinatorClientConfig { friend bool operator==(ReplicationClientInfo const &, ReplicationClientInfo const &) = default; }; - // Each instance has replication config in case it fails ReplicationClientInfo replication_client_info; struct SSL { @@ -58,6 +55,8 @@ struct CoordinatorClientConfig { friend bool operator==(CoordinatorClientConfig const &, CoordinatorClientConfig const &) = default; }; +using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo; + struct CoordinatorServerConfig { std::string ip_address; uint16_t port{}; diff --git a/src/coordination/include/coordination/coordinator_data.hpp b/src/coordination/include/coordination/coordinator_data.hpp index d14f5e1db..4dff209f0 100644 --- a/src/coordination/include/coordination/coordinator_data.hpp +++ b/src/coordination/include/coordination/coordinator_data.hpp @@ -16,9 +16,9 @@ #include "coordination/coordinator_instance.hpp" #include "coordination/coordinator_instance_status.hpp" #include "coordination/coordinator_server.hpp" -#include "coordination/failover_status.hpp" #include "coordination/register_main_replica_coordinator_status.hpp" #include "utils/rw_lock.hpp" +#include "utils/thread_pool.hpp" #include <list> @@ -27,17 +27,20 @@ class CoordinatorData { public: CoordinatorData(); - [[nodiscard]] auto DoFailover() -> DoFailoverStatus; - [[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus; + [[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus; + auto TryFailover() -> void; + auto ShowInstances() const -> std::vector<CoordinatorInstanceStatus>; private: + auto ClusterHasAliveMain_() const -> bool; + mutable utils::RWLock coord_data_lock_{utils::RWLock::Priority::READ}; HealthCheckCallback main_succ_cb_, main_fail_cb_, 
replica_succ_cb_, replica_fail_cb_; - // Must be std::list because we rely on pointer stability + // NOTE: Must be std::list because we rely on pointer stability std::list<CoordinatorInstance> registered_instances_; }; diff --git a/src/coordination/include/coordination/coordinator_exceptions.hpp b/src/coordination/include/coordination/coordinator_exceptions.hpp index 708fb81f3..c9e2dff81 100644 --- a/src/coordination/include/coordination/coordinator_exceptions.hpp +++ b/src/coordination/include/coordination/coordinator_exceptions.hpp @@ -16,16 +16,16 @@ #include "utils/exceptions.hpp" namespace memgraph::coordination { -class CoordinatorFailoverException final : public utils::BasicException { +class CoordinatorRegisterInstanceException final : public utils::BasicException { public: - explicit CoordinatorFailoverException(const std::string_view what) noexcept - : BasicException("Failover didn't complete successfully: " + std::string(what)) {} + explicit CoordinatorRegisterInstanceException(const std::string_view what) noexcept + : BasicException("Failed to create instance: " + std::string(what)) {} template <class... 
Args> - explicit CoordinatorFailoverException(fmt::format_string<Args...> fmt, Args &&...args) noexcept - : CoordinatorFailoverException(fmt::format(fmt, std::forward<Args>(args)...)) {} + explicit CoordinatorRegisterInstanceException(fmt::format_string<Args...> fmt, Args &&...args) noexcept + : CoordinatorRegisterInstanceException(fmt::format(fmt, std::forward<Args>(args)...)) {} - SPECIALIZE_GET_EXCEPTION_NAME(CoordinatorFailoverException) + SPECIALIZE_GET_EXCEPTION_NAME(CoordinatorRegisterInstanceException) }; } // namespace memgraph::coordination diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index 31a6d8204..e03fa30c7 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -15,6 +15,7 @@ #include "coordination/coordinator_client.hpp" #include "coordination/coordinator_cluster_config.hpp" +#include "coordination/coordinator_exceptions.hpp" #include "replication_coordination_glue/role.hpp" namespace memgraph::coordination { @@ -24,10 +25,7 @@ class CoordinatorData; class CoordinatorInstance { public: CoordinatorInstance(CoordinatorData *data, CoordinatorClientConfig config, HealthCheckCallback succ_cb, - HealthCheckCallback fail_cb, replication_coordination_glue::ReplicationRole replication_role) - : client_(data, std::move(config), std::move(succ_cb), std::move(fail_cb)), - replication_role_(replication_role), - is_alive_(true) {} + HealthCheckCallback fail_cb); CoordinatorInstance(CoordinatorInstance const &other) = delete; CoordinatorInstance &operator=(CoordinatorInstance const &other) = delete; @@ -35,34 +33,27 @@ class CoordinatorInstance { CoordinatorInstance &operator=(CoordinatorInstance &&other) noexcept = delete; ~CoordinatorInstance() = default; - auto UpdateInstanceStatus() -> bool { - is_alive_ = 
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_) - .count() < CoordinatorClusterConfig::alive_response_time_difference_sec_; - return is_alive_; - } - auto UpdateLastResponseTime() -> void { last_response_time_ = std::chrono::system_clock::now(); } + auto OnSuccessPing() -> void; + auto OnFailPing() -> bool; - auto InstanceName() const -> std::string { return client_.InstanceName(); } - auto SocketAddress() const -> std::string { return client_.SocketAddress(); } - auto IsAlive() const -> bool { return is_alive_; } + auto IsAlive() const -> bool; - auto IsReplica() const -> bool { - return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA; - } - auto IsMain() const -> bool { return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN; } + auto InstanceName() const -> std::string; + auto SocketAddress() const -> std::string; - auto PrepareForFailover() -> void { client_.PauseFrequentCheck(); } - auto RestoreAfterFailedFailover() -> void { client_.ResumeFrequentCheck(); } + auto IsReplica() const -> bool; + auto IsMain() const -> bool; - auto PostFailover(HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> void { - replication_role_ = replication_coordination_glue::ReplicationRole::MAIN; - client_.SetSuccCallback(std::move(main_succ_cb)); - client_.SetFailCallback(std::move(main_fail_cb)); - // Comment with Andi but we shouldn't delete this, what if this MAIN FAILS AGAIN - // client_.ResetReplicationClientInfo(); - client_.ResumeFrequentCheck(); - } + auto PromoteToMain(ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb, + HealthCheckCallback main_fail_cb) -> bool; + auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool; + auto PauseFrequentCheck() -> void; + auto ResumeFrequentCheck() -> void; + + auto ReplicationClientInfo() const -> ReplClientInfo; + + private: 
CoordinatorClient client_; replication_coordination_glue::ReplicationRole replication_role_; std::chrono::system_clock::time_point last_response_time_{}; diff --git a/src/coordination/include/coordination/coordinator_rpc.hpp b/src/coordination/include/coordination/coordinator_rpc.hpp index 99996ef52..d2786ef62 100644 --- a/src/coordination/include/coordination/coordinator_rpc.hpp +++ b/src/coordination/include/coordination/coordinator_rpc.hpp @@ -48,35 +48,35 @@ struct PromoteReplicaToMainRes { using PromoteReplicaToMainRpc = rpc::RequestResponse<PromoteReplicaToMainReq, PromoteReplicaToMainRes>; -struct SetMainToReplicaReq { +struct DemoteMainToReplicaReq { static const utils::TypeInfo kType; static const utils::TypeInfo &GetTypeInfo() { return kType; } - static void Load(SetMainToReplicaReq *self, memgraph::slk::Reader *reader); - static void Save(const SetMainToReplicaReq &self, memgraph::slk::Builder *builder); + static void Load(DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader); + static void Save(const DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder); - explicit SetMainToReplicaReq(CoordinatorClientConfig::ReplicationClientInfo replication_client_info) + explicit DemoteMainToReplicaReq(CoordinatorClientConfig::ReplicationClientInfo replication_client_info) : replication_client_info(std::move(replication_client_info)) {} - SetMainToReplicaReq() = default; + DemoteMainToReplicaReq() = default; CoordinatorClientConfig::ReplicationClientInfo replication_client_info; }; -struct SetMainToReplicaRes { +struct DemoteMainToReplicaRes { static const utils::TypeInfo kType; static const utils::TypeInfo &GetTypeInfo() { return kType; } - static void Load(SetMainToReplicaRes *self, memgraph::slk::Reader *reader); - static void Save(const SetMainToReplicaRes &self, memgraph::slk::Builder *builder); + static void Load(DemoteMainToReplicaRes *self, memgraph::slk::Reader *reader); + static void Save(const DemoteMainToReplicaRes &self, 
memgraph::slk::Builder *builder); - explicit SetMainToReplicaRes(bool success) : success(success) {} - SetMainToReplicaRes() = default; + explicit DemoteMainToReplicaRes(bool success) : success(success) {} + DemoteMainToReplicaRes() = default; bool success; }; -using SetMainToReplicaRpc = rpc::RequestResponse<SetMainToReplicaReq, SetMainToReplicaRes>; +using DemoteMainToReplicaRpc = rpc::RequestResponse<DemoteMainToReplicaReq, DemoteMainToReplicaRes>; } // namespace memgraph::coordination @@ -91,13 +91,13 @@ void Save(const memgraph::coordination::PromoteReplicaToMainReq &self, memgraph: void Load(memgraph::coordination::PromoteReplicaToMainReq *self, memgraph::slk::Reader *reader); -void Save(const memgraph::coordination::SetMainToReplicaRes &self, memgraph::slk::Builder *builder); +void Save(const memgraph::coordination::DemoteMainToReplicaRes &self, memgraph::slk::Builder *builder); -void Load(memgraph::coordination::SetMainToReplicaRes *self, memgraph::slk::Reader *reader); +void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::Reader *reader); -void Save(const memgraph::coordination::SetMainToReplicaReq &self, memgraph::slk::Builder *builder); +void Save(const memgraph::coordination::DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder); -void Load(memgraph::coordination::SetMainToReplicaReq *self, memgraph::slk::Reader *reader); +void Load(memgraph::coordination::DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader); } // namespace memgraph::slk diff --git a/src/coordination/include/coordination/coordinator_state.hpp b/src/coordination/include/coordination/coordinator_state.hpp index 9cf2d2471..5f52f85e5 100644 --- a/src/coordination/include/coordination/coordinator_state.hpp +++ b/src/coordination/include/coordination/coordinator_state.hpp @@ -16,7 +16,6 @@ #include "coordination/coordinator_data.hpp" #include "coordination/coordinator_instance_status.hpp" #include "coordination/coordinator_server.hpp" -#include 
"coordination/failover_status.hpp" #include "coordination/register_main_replica_coordinator_status.hpp" #include <variant> @@ -28,8 +27,8 @@ class CoordinatorState { CoordinatorState(); ~CoordinatorState() = default; - CoordinatorState(const CoordinatorState &) = delete; - CoordinatorState &operator=(const CoordinatorState &) = delete; + CoordinatorState(CoordinatorState const &) = delete; + CoordinatorState &operator=(CoordinatorState const &) = delete; CoordinatorState(CoordinatorState &&) noexcept = delete; CoordinatorState &operator=(CoordinatorState &&) noexcept = delete; @@ -43,8 +42,6 @@ class CoordinatorState { // The client code must check that the server exists before calling this method. auto GetCoordinatorServer() const -> CoordinatorServer &; - [[nodiscard]] auto DoFailover() -> DoFailoverStatus; - private: std::variant<CoordinatorData, CoordinatorMainReplicaData> data_; }; diff --git a/src/coordination/include/coordination/failover_status.hpp b/src/coordination/include/coordination/failover_status.hpp deleted file mode 100644 index 9cfa0ffe6..000000000 --- a/src/coordination/include/coordination/failover_status.hpp +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2024 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. 
- -#pragma once - -#ifdef MG_ENTERPRISE - -#include <cstdint> - -namespace memgraph::coordination { -enum class DoFailoverStatus : uint8_t { SUCCESS, ALL_REPLICAS_DOWN, MAIN_ALIVE, RPC_FAILED }; -} // namespace memgraph::coordination -#endif diff --git a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp index acb191bfd..2a8d199af 100644 --- a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp +++ b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp @@ -20,7 +20,6 @@ namespace memgraph::coordination { enum class RegisterInstanceCoordinatorStatus : uint8_t { NAME_EXISTS, END_POINT_EXISTS, - COULD_NOT_BE_PERSISTED, NOT_COORDINATOR, RPC_FAILED, SUCCESS diff --git a/src/dbms/coordinator_handler.hpp b/src/dbms/coordinator_handler.hpp index 233532cbc..6f7ad8ce5 100644 --- a/src/dbms/coordinator_handler.hpp +++ b/src/dbms/coordinator_handler.hpp @@ -13,12 +13,10 @@ #ifdef MG_ENTERPRISE -#include "utils/result.hpp" - #include "coordination/coordinator_config.hpp" #include "coordination/coordinator_instance_status.hpp" -#include "coordination/failover_status.hpp" #include "coordination/register_main_replica_coordinator_status.hpp" +#include "utils/result.hpp" #include <cstdint> #include <optional> diff --git a/src/dbms/coordinator_handlers.cpp b/src/dbms/coordinator_handlers.cpp index 5c051408e..42f3a336b 100644 --- a/src/dbms/coordinator_handlers.cpp +++ b/src/dbms/coordinator_handlers.cpp @@ -12,12 +12,12 @@ #ifdef MG_ENTERPRISE #include "dbms/coordinator_handlers.hpp" -#include "dbms/utils.hpp" #include "coordination/coordinator_exceptions.hpp" #include "coordination/coordinator_rpc.hpp" #include "dbms/dbms_handler.hpp" #include "dbms/replication_client.hpp" +#include "dbms/utils.hpp" #include "range/v3/view.hpp" @@ -32,31 +32,33 @@ void CoordinatorHandlers::Register(DbmsHandler &dbms_handler) 
{ CoordinatorHandlers::PromoteReplicaToMainHandler(dbms_handler, req_reader, res_builder); }); - server.Register<coordination::SetMainToReplicaRpc>( + server.Register<coordination::DemoteMainToReplicaRpc>( [&dbms_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void { - spdlog::info("Received SetMainToReplicaRpc from coordinator server"); - CoordinatorHandlers::SetMainToReplicaHandler(dbms_handler, req_reader, res_builder); + spdlog::info("Received DemoteMainToReplicaRpc from coordinator server"); + CoordinatorHandlers::DemoteMainToReplicaHandler(dbms_handler, req_reader, res_builder); }); } -void CoordinatorHandlers::SetMainToReplicaHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader, - slk::Builder *res_builder) { +void CoordinatorHandlers::DemoteMainToReplicaHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader, + slk::Builder *res_builder) { auto &repl_state = dbms_handler.ReplicationState(); + spdlog::info("Executing DemoteMainToReplicaHandler"); - if (!repl_state.IsMain()) { + if (repl_state.IsReplica()) { spdlog::error("Setting to replica must be performed on main."); - slk::Save(coordination::SetMainToReplicaRes{false}, res_builder); + slk::Save(coordination::DemoteMainToReplicaRes{false}, res_builder); return; } - coordination::SetMainToReplicaReq req; + coordination::DemoteMainToReplicaReq req; slk::Load(&req, req_reader); - replication::ReplicationServerConfig clients_config{.ip_address = req.replication_client_info.replication_ip_address, .port = req.replication_client_info.replication_port}; + const replication::ReplicationServerConfig clients_config{ + .ip_address = req.replication_client_info.replication_ip_address, + .port = req.replication_client_info.replication_port}; - if (bool success = memgraph::dbms::SetReplicationRoleReplica(dbms_handler, clients_config); !success) { - spdlog::error("Setting main to replica failed!"); + if (bool const success = memgraph::dbms::SetReplicationRoleReplica(dbms_handler, clients_config); 
!success) { + spdlog::error("Demoting main to replica failed!"); slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder); return; } @@ -69,16 +71,14 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(DbmsHandler &dbms_handler, auto &repl_state = dbms_handler.ReplicationState(); if (!repl_state.IsReplica()) { - spdlog::error("Failover must be performed on replica!"); + spdlog::error("Only replica can be promoted to main!"); slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder); return; } - auto repl_server_config = std::get<replication::RoleReplicaData>(repl_state.ReplicationData()).config; - // This can fail because of disk. If it does, the cluster state could get inconsistent. // We don't handle disk issues. - if (bool success = memgraph::dbms::DoReplicaToMainPromotion(dbms_handler); !success) { + if (bool const success = memgraph::dbms::DoReplicaToMainPromotion(dbms_handler); !success) { spdlog::error("Promoting replica to main failed!"); slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder); return; @@ -104,28 +104,29 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(DbmsHandler &dbms_handler, for (auto const &config : req.replication_clients_info | ranges::views::transform(converter)) { auto instance_client = repl_state.RegisterReplica(config); if (instance_client.HasError()) { + using enum memgraph::replication::RegisterReplicaError; switch (instance_client.GetError()) { // Can't happen, we are already replica - case memgraph::replication::RegisterReplicaError::NOT_MAIN: - spdlog::error("Failover must be performed to main!"); + case NOT_MAIN: + spdlog::error("Failover must be performed on main!"); slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder); return; // Can't happen, checked on the coordinator side - case memgraph::replication::RegisterReplicaError::NAME_EXISTS: + case NAME_EXISTS: spdlog::error("Replica with the same name already exists!"); 
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder); return; // Can't happen, checked on the coordinator side - case memgraph::replication::RegisterReplicaError::ENDPOINT_EXISTS: + case ENDPOINT_EXISTS: spdlog::error("Replica with the same endpoint already exists!"); slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder); return; // We don't handle disk issues - case memgraph::replication::RegisterReplicaError::COULD_NOT_BE_PERSISTED: + case COULD_NOT_BE_PERSISTED: spdlog::error("Registered replica could not be persisted!"); slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder); return; - case memgraph::replication::RegisterReplicaError::SUCCESS: + case SUCCESS: break; } } @@ -138,9 +139,7 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(DbmsHandler &dbms_handler, // Update system before enabling individual storage <-> replica clients dbms_handler.SystemRestore(instance_client_ref); - // TODO: (andi) Policy for register all databases - // Will be resolved after deciding about choosing new replica - const bool all_clients_good = memgraph::dbms::RegisterAllDatabasesClients(dbms_handler, instance_client_ref); + const bool all_clients_good = memgraph::dbms::RegisterAllDatabasesClients<true>(dbms_handler, instance_client_ref); MG_ASSERT(all_clients_good, "Failed to register one or more databases to the REPLICA \"{}\".", config.name); StartReplicaClient(dbms_handler, instance_client_ref); diff --git a/src/dbms/coordinator_handlers.hpp b/src/dbms/coordinator_handlers.hpp index ae4c59a0a..f41de50a9 100644 --- a/src/dbms/coordinator_handlers.hpp +++ b/src/dbms/coordinator_handlers.hpp @@ -26,7 +26,7 @@ class CoordinatorHandlers { private: static void PromoteReplicaToMainHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader, slk::Builder *res_builder); - static void SetMainToReplicaHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader, slk::Builder *res_builder); + static void 
DemoteMainToReplicaHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader, slk::Builder *res_builder); }; } // namespace memgraph::dbms diff --git a/src/dbms/utils.hpp b/src/dbms/utils.hpp index fd5db9cf1..801ac9be3 100644 --- a/src/dbms/utils.hpp +++ b/src/dbms/utils.hpp @@ -76,6 +76,7 @@ inline bool SetReplicationRoleReplica(dbms::DbmsHandler &dbms_handler, return success; } +template <bool AllowRPCFailure = false> inline bool RegisterAllDatabasesClients(dbms::DbmsHandler &dbms_handler, replication::ReplicationClient &instance_client) { if (!allow_mt_repl && dbms_handler.All().size() > 1) { @@ -84,7 +85,6 @@ inline bool RegisterAllDatabasesClients(dbms::DbmsHandler &dbms_handler, bool all_clients_good = true; - // Add database specific clients (NOTE Currently all databases are connected to each replica) dbms_handler.ForEach([&](DatabaseAccess db_acc) { auto *storage = db_acc->storage(); if (!allow_mt_repl && storage->name() != kDefaultDB) { @@ -93,16 +93,14 @@ inline bool RegisterAllDatabasesClients(dbms::DbmsHandler &dbms_handler, // TODO: ATM only IN_MEMORY_TRANSACTIONAL, fix other modes if (storage->storage_mode_ != storage::StorageMode::IN_MEMORY_TRANSACTIONAL) return; + using enum storage::replication::ReplicaState; + all_clients_good &= storage->repl_storage_state_.replication_clients_.WithLock( [storage, &instance_client, db_acc = std::move(db_acc)](auto &storage_clients) mutable { // NOLINT auto client = std::make_unique<storage::ReplicationStorageClient>(instance_client); - // All good, start replica client client->Start(storage, std::move(db_acc)); - // After start the storage <-> replica state should be READY or RECOVERING (if correctly started) - // MAYBE_BEHIND isn't a statement of the current state, this is the default value - // Failed to start due an error like branching of MAIN and REPLICA - if (client->State() == storage::replication::ReplicaState::MAYBE_BEHIND) { - return false; // TODO: sometimes we need to still add to storage_clients + 
if (client->State() == MAYBE_BEHIND && !AllowRPCFailure) { + return false; } storage_clients.push_back(std::move(client)); return true; diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 index e41184468..a99eda3e9 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 @@ -386,7 +386,7 @@ replicationSocketAddress : literal ; registerReplica : REGISTER REPLICA instanceName ( SYNC | ASYNC ) TO socketAddress ; -registerInstanceOnCoordinator : REGISTER INSTANCE instanceName ON coordinatorSocketAddress ( AS ASYNC ) ? WITH replicationSocketAddress ; +registerInstanceOnCoordinator : REGISTER INSTANCE instanceName ON coordinatorSocketAddress ( AS ASYNC ) ? WITH replicationSocketAddress ; setInstanceToMain : SET INSTANCE instanceName TO MAIN ; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 1576df7c4..af46865db 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -500,14 +500,12 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { case END_POINT_EXISTS: throw QueryRuntimeException( "Couldn't register replica instance since instance with such endpoint already exists!"); - case COULD_NOT_BE_PERSISTED: - throw QueryRuntimeException("Couldn't register replica instance since it couldn't be persisted!"); case NOT_COORDINATOR: - throw QueryRuntimeException("Couldn't register replica instance since this instance is not a coordinator!"); + throw QueryRuntimeException("REGISTER INSTANCE query can only be run on a coordinator!"); case RPC_FAILED: throw QueryRuntimeException( - "Couldn't register replica because promotion on replica failed! Check logs on replica to find out more " - "info!"); + "Couldn't register replica instance because setting instance to replica failed! 
Check logs on replica to " + "find out more info!"); case SUCCESS: break; } } @@ -520,10 +518,10 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { case NO_INSTANCE_WITH_NAME: throw QueryRuntimeException("No instance with such name!"); case NOT_COORDINATOR: - throw QueryRuntimeException("Couldn't set replica instance to main since this instance is not a coordinator!"); + throw QueryRuntimeException("SET INSTANCE TO MAIN query can only be run on a coordinator!"); case COULD_NOT_PROMOTE_TO_MAIN: throw QueryRuntimeException( - "Couldn't set replica instance to main. Check coordinator and replica for more logs"); + "Couldn't set replica instance to main! Check coordinator and replica for more logs"); case SUCCESS: break; } } diff --git a/src/storage/v2/config.hpp b/src/storage/v2/config.hpp index 3533594ce..b2a55a40a 100644 --- a/src/storage/v2/config.hpp +++ b/src/storage/v2/config.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -15,6 +15,7 @@ #include <cstdint> #include <filesystem> +#include "flags/replication.hpp" #include "storage/v2/isolation_level.hpp" #include "storage/v2/storage_mode.hpp" #include "utils/exceptions.hpp" @@ -128,10 +129,15 @@ struct Config { }; inline auto ReplicationStateRootPath(memgraph::storage::Config const &config) -> std::optional<std::filesystem::path> { - if (!config.durability.restore_replication_state_on_startup) { + if (!config.durability.restore_replication_state_on_startup +#ifdef MG_ENTERPRISE + && !FLAGS_coordinator_server_port +#endif + ) { spdlog::warn( "Replication configuration will NOT be stored. 
When the server restarts, replication state will be " "forgotten."); + return std::nullopt; } return {config.durability.storage_directory}; diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp index b68618e04..bbe9a9bb1 100644 --- a/src/storage/v2/replication/replication_client.cpp +++ b/src/storage/v2/replication/replication_client.cpp @@ -35,6 +35,7 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce auto hb_stream{client_.rpc_client_.Stream<replication::HeartbeatRpc>( storage->uuid(), replStorageState.last_commit_timestamp_, std::string{replStorageState.epoch_.id()})}; + const auto replica = hb_stream.AwaitResponse(); #ifdef MG_ENTERPRISE // Multi-tenancy is only supported in enterprise @@ -67,7 +68,6 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce "now hold unique data. Please resolve data conflicts and start the " "replication on a clean instance.", client_.name_, client_.name_, client_.name_); - // TODO: (andi) Talk about renaming MAYBE_BEHIND to branching // State not updated, hence in MAYBE_BEHIND state return; } diff --git a/src/utils/on_scope_exit.hpp b/src/utils/on_scope_exit.hpp index 1c0b34fef..a5398b017 100644 --- a/src/utils/on_scope_exit.hpp +++ b/src/utils/on_scope_exit.hpp @@ -32,7 +32,7 @@ namespace memgraph::utils { * void long_function() { * resource.enable(); * OnScopeExit on_exit([&resource] { resource.disable(); }); - * // long block of code, might trow an exception + * // long block of code, might throw an exception * } */ template <typename Callable> diff --git a/tests/e2e/high_availability_experimental/CMakeLists.txt b/tests/e2e/high_availability_experimental/CMakeLists.txt index 76e1a6956..e587d6fef 100644 --- a/tests/e2e/high_availability_experimental/CMakeLists.txt +++ b/tests/e2e/high_availability_experimental/CMakeLists.txt @@ -4,7 +4,6 @@ copy_e2e_python_files(ha_experimental coordinator.py) 
copy_e2e_python_files(ha_experimental automatic_failover.py) copy_e2e_python_files(ha_experimental manual_setting_replicas.py) copy_e2e_python_files(ha_experimental common.py) -copy_e2e_python_files(ha_experimental conftest.py) copy_e2e_python_files(ha_experimental workloads.yaml) copy_e2e_python_files_from_parent_folder(ha_experimental ".." memgraph.py) diff --git a/tests/e2e/high_availability_experimental/automatic_failover.py b/tests/e2e/high_availability_experimental/automatic_failover.py index f3ffadfe8..5c43ac62d 100644 --- a/tests/e2e/high_availability_experimental/automatic_failover.py +++ b/tests/e2e/high_availability_experimental/automatic_failover.py @@ -1,4 +1,4 @@ -# Copyright 2024 Memgraph Ltd. +# Copyright 2024 Memgraph Ltd. # # Use of this software is governed by the Business Source License # included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -10,11 +10,13 @@ # licenses/APL.txt. import os +import shutil import sys +import tempfile import interactive_mg_runner import pytest -from common import execute_and_fetch_all +from common import connect, execute_and_fetch_all, safe_execute from mg_utils import mg_sleep_and_assert interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -24,25 +26,51 @@ interactive_mg_runner.PROJECT_DIR = os.path.normpath( interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build")) interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph")) +TEMP_DIR = tempfile.TemporaryDirectory().name + MEMGRAPH_INSTANCES_DESCRIPTION = { "instance_1": { - "args": ["--bolt-port", "7688", "--log-level", "TRACE", "--coordinator-server-port", "10011"], - "log_file": "replica1.log", + "args": [ + "--bolt-port", + "7688", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10011", + ], + "log_file": "instance_1.log", + "data_directory": 
f"{TEMP_DIR}/instance_1", "setup_queries": [], }, "instance_2": { - "args": ["--bolt-port", "7689", "--log-level", "TRACE", "--coordinator-server-port", "10012"], - "log_file": "replica2.log", + "args": [ + "--bolt-port", + "7689", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10012", + ], + "log_file": "instance_2.log", + "data_directory": f"{TEMP_DIR}/instance_2", "setup_queries": [], }, "instance_3": { - "args": ["--bolt-port", "7687", "--log-level", "TRACE", "--coordinator-server-port", "10013"], - "log_file": "main.log", + "args": [ + "--bolt-port", + "7687", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10013", + ], + "log_file": "instance_3.log", + "data_directory": f"{TEMP_DIR}/instance_3", "setup_queries": [], }, "coordinator": { "args": ["--bolt-port", "7690", "--log-level=TRACE", "--coordinator"], - "log_file": "replica3.log", + "log_file": "coordinator.log", "setup_queries": [ "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';", @@ -53,7 +81,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { } -def test_replication_works_on_failover(connection): +def test_replication_works_on_failover(): # Goal of this test is to check the replication works after failover command. # 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes. # 2. We check that main has correct state @@ -61,12 +89,13 @@ def test_replication_works_on_failover(connection): # 4. We check that coordinator and new main have correct state # 5. We insert one vertex on new main # 6. 
We check that vertex appears on new replica + safe_execute(shutil.rmtree, TEMP_DIR) # 1 interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) # 2 - main_cursor = connection(7687, "instance_3").cursor() + main_cursor = connect(host="localhost", port=7687).cursor() expected_data_on_main = [ ("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), @@ -78,7 +107,7 @@ def test_replication_works_on_failover(connection): interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") # 4 - coord_cursor = connection(7690, "coordinator").cursor() + coord_cursor = connect(host="localhost", port=7690).cursor() def retrieve_data_show_repl_cluster(): return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;"))) @@ -86,17 +115,25 @@ def test_replication_works_on_failover(connection): expected_data_on_coord = [ ("instance_1", "127.0.0.1:10011", True, "main"), ("instance_2", "127.0.0.1:10012", True, "replica"), - ("instance_3", "127.0.0.1:10013", False, ""), + ("instance_3", "127.0.0.1:10013", False, "unknown"), ] mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster) - new_main_cursor = connection(7688, "instance_1").cursor() + new_main_cursor = connect(host="localhost", port=7688).cursor() def retrieve_data_show_replicas(): return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;"))) expected_data_on_new_main = [ ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"), + ] + mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas) + + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") + expected_data_on_new_main = [ + ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"), ] mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas) @@ -104,62 +141,68 @@ def 
test_replication_works_on_failover(connection): execute_and_fetch_all(new_main_cursor, "CREATE ();") # 6 - alive_replica_cursror = connection(7689, "instance_2").cursor() + alive_replica_cursror = connect(host="localhost", port=7689).cursor() res = execute_and_fetch_all(alive_replica_cursror, "MATCH (n) RETURN count(n) as count;")[0][0] assert res == 1, "Vertex should be replicated" interactive_mg_runner.stop_all(MEMGRAPH_INSTANCES_DESCRIPTION) -def test_show_replication_cluster(connection): - # Goal of this test is to check the SHOW REPLICATION CLUSTER command. - # 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes. - # 2. We check that all replicas and main have the correct state: they should all be alive. - # 3. We kill one replica. It should not appear anymore in the SHOW REPLICATION CLUSTER command. - # 4. We kill main. It should not appear anymore in the SHOW REPLICATION CLUSTER command. - - # 1. +def test_show_replication_cluster(): + safe_execute(shutil.rmtree, TEMP_DIR) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) - cursor = connection(7690, "coordinator").cursor() + instance1_cursor = connect(host="localhost", port=7688).cursor() + instance2_cursor = connect(host="localhost", port=7689).cursor() + instance3_cursor = connect(host="localhost", port=7687).cursor() + coord_cursor = connect(host="localhost", port=7690).cursor() - # 2. - - # We leave some time for the coordinator to realise the replicas are down. 
- def retrieve_data(): - return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;"))) + def show_repl_cluster(): + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;"))) expected_data = [ ("instance_1", "127.0.0.1:10011", True, "replica"), ("instance_2", "127.0.0.1:10012", True, "replica"), ("instance_3", "127.0.0.1:10013", True, "main"), ] - mg_sleep_and_assert(expected_data, retrieve_data) + mg_sleep_and_assert(expected_data, show_repl_cluster) + + def retrieve_data_show_repl_role_instance1(): + return sorted(list(execute_and_fetch_all(instance1_cursor, "SHOW REPLICATION ROLE;"))) + + def retrieve_data_show_repl_role_instance2(): + return sorted(list(execute_and_fetch_all(instance2_cursor, "SHOW REPLICATION ROLE;"))) + + def retrieve_data_show_repl_role_instance3(): + return sorted(list(execute_and_fetch_all(instance3_cursor, "SHOW REPLICATION ROLE;"))) + + mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance1) + mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance2) + mg_sleep_and_assert([("main",)], retrieve_data_show_repl_role_instance3) - # 3. interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1") expected_data = [ - ("instance_1", "127.0.0.1:10011", False, ""), + ("instance_1", "127.0.0.1:10011", False, "unknown"), ("instance_2", "127.0.0.1:10012", True, "replica"), ("instance_3", "127.0.0.1:10013", True, "main"), ] - mg_sleep_and_assert(expected_data, retrieve_data) + mg_sleep_and_assert(expected_data, show_repl_cluster) - # 4. 
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2") expected_data = [ - ("instance_1", "127.0.0.1:10011", False, ""), - ("instance_2", "127.0.0.1:10012", False, ""), + ("instance_1", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "127.0.0.1:10012", False, "unknown"), ("instance_3", "127.0.0.1:10013", True, "main"), ] - mg_sleep_and_assert(expected_data, retrieve_data) + mg_sleep_and_assert(expected_data, show_repl_cluster) -def test_simple_automatic_failover(connection): +def test_simple_automatic_failover(): + safe_execute(shutil.rmtree, TEMP_DIR) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) - main_cursor = connection(7687, "instance_3").cursor() + main_cursor = connect(host="localhost", port=7687).cursor() expected_data_on_main = [ ("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), @@ -169,7 +212,7 @@ def test_simple_automatic_failover(connection): interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") - coord_cursor = connection(7690, "coordinator").cursor() + coord_cursor = connect(host="localhost", port=7690).cursor() def retrieve_data_show_repl_cluster(): return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;"))) @@ -177,46 +220,189 @@ def test_simple_automatic_failover(connection): expected_data_on_coord = [ ("instance_1", "127.0.0.1:10011", True, "main"), ("instance_2", "127.0.0.1:10012", True, "replica"), - ("instance_3", "127.0.0.1:10013", False, ""), + ("instance_3", "127.0.0.1:10013", False, "unknown"), ] mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster) - new_main_cursor = connection(7688, "instance_1").cursor() + new_main_cursor = connect(host="localhost", port=7688).cursor() def retrieve_data_show_replicas(): return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;"))) expected_data_on_new_main = [ ("instance_2", "127.0.0.1:10002", "sync", 0, 0, 
"ready"), + ("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"), ] mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas) + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") + expected_data_on_new_main_old_alive = [ + ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"), + ] -def test_registering_replica_fails_name_exists(connection): + mg_sleep_and_assert(expected_data_on_new_main_old_alive, retrieve_data_show_replicas) + + +def test_registering_replica_fails_name_exists(): + safe_execute(shutil.rmtree, TEMP_DIR) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) - coord_cursor = connection(7690, "coordinator").cursor() + coord_cursor = connect(host="localhost", port=7690).cursor() with pytest.raises(Exception) as e: execute_and_fetch_all( coord_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10051' WITH '127.0.0.1:10111';", ) assert str(e.value) == "Couldn't register replica instance since instance with such name already exists!" + shutil.rmtree(TEMP_DIR) -def test_registering_replica_fails_endpoint_exists(connection): +def test_registering_replica_fails_endpoint_exists(): + safe_execute(shutil.rmtree, TEMP_DIR) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) - coord_cursor = connection(7690, "coordinator").cursor() + coord_cursor = connect(host="localhost", port=7690).cursor() with pytest.raises(Exception) as e: execute_and_fetch_all( coord_cursor, - "REGISTER INSTANCE instance_5 ON '127.0.0.1:10001' WITH '127.0.0.1:10013';", + "REGISTER INSTANCE instance_5 ON '127.0.0.1:10011' WITH '127.0.0.1:10005';", ) - assert ( - str(e.value) - == "Couldn't register replica because promotion on replica failed! Check logs on replica to find out more info!" - ) + assert str(e.value) == "Couldn't register replica instance since instance with such endpoint already exists!" 
+ + +def test_replica_instance_restarts(): + safe_execute(shutil.rmtree, TEMP_DIR) + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + cursor = connect(host="localhost", port=7690).cursor() + + def show_repl_cluster(): + return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;"))) + + expected_data_up = [ + ("instance_1", "127.0.0.1:10011", True, "replica"), + ("instance_2", "127.0.0.1:10012", True, "replica"), + ("instance_3", "127.0.0.1:10013", True, "main"), + ] + mg_sleep_and_assert(expected_data_up, show_repl_cluster) + + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1") + + expected_data_down = [ + ("instance_1", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "127.0.0.1:10012", True, "replica"), + ("instance_3", "127.0.0.1:10013", True, "main"), + ] + mg_sleep_and_assert(expected_data_down, show_repl_cluster) + + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1") + + mg_sleep_and_assert(expected_data_up, show_repl_cluster) + + instance1_cursor = connect(host="localhost", port=7688).cursor() + + def retrieve_data_show_repl_role_instance1(): + return sorted(list(execute_and_fetch_all(instance1_cursor, "SHOW REPLICATION ROLE;"))) + + expected_data_replica = [("replica",)] + mg_sleep_and_assert(expected_data_replica, retrieve_data_show_repl_role_instance1) + + +def test_automatic_failover_main_back_as_replica(): + safe_execute(shutil.rmtree, TEMP_DIR) + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") + + coord_cursor = connect(host="localhost", port=7690).cursor() + + def retrieve_data_show_repl_cluster(): + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;"))) + + expected_data_after_failover = [ + ("instance_1", "127.0.0.1:10011", True, "main"), + ("instance_2", "127.0.0.1:10012", True, "replica"), + ("instance_3", "127.0.0.1:10013", False, 
"unknown"), + ] + mg_sleep_and_assert(expected_data_after_failover, retrieve_data_show_repl_cluster) + + expected_data_after_main_coming_back = [ + ("instance_1", "127.0.0.1:10011", True, "main"), + ("instance_2", "127.0.0.1:10012", True, "replica"), + ("instance_3", "127.0.0.1:10013", True, "replica"), + ] + + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") + mg_sleep_and_assert(expected_data_after_main_coming_back, retrieve_data_show_repl_cluster) + + instance3_cursor = connect(host="localhost", port=7687).cursor() + + def retrieve_data_show_repl_role_instance3(): + return sorted(list(execute_and_fetch_all(instance3_cursor, "SHOW REPLICATION ROLE;"))) + + mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance3) + + +def test_automatic_failover_main_back_as_main(): + safe_execute(shutil.rmtree, TEMP_DIR) + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1") + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2") + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") + + coord_cursor = connect(host="localhost", port=7690).cursor() + + def retrieve_data_show_repl_cluster(): + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;"))) + + expected_data_all_down = [ + ("instance_1", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "127.0.0.1:10012", False, "unknown"), + ("instance_3", "127.0.0.1:10013", False, "unknown"), + ] + + mg_sleep_and_assert(expected_data_all_down, retrieve_data_show_repl_cluster) + + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") + expected_data_main_back = [ + ("instance_1", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "127.0.0.1:10012", False, "unknown"), + ("instance_3", "127.0.0.1:10013", True, "main"), + ] + mg_sleep_and_assert(expected_data_main_back, retrieve_data_show_repl_cluster) + + 
instance3_cursor = connect(host="localhost", port=7687).cursor() + + def retrieve_data_show_repl_role_instance3(): + return sorted(list(execute_and_fetch_all(instance3_cursor, "SHOW REPLICATION ROLE;"))) + + mg_sleep_and_assert([("main",)], retrieve_data_show_repl_role_instance3) + + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1") + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2") + + expected_data_replicas_back = [ + ("instance_1", "127.0.0.1:10011", True, "replica"), + ("instance_2", "127.0.0.1:10012", True, "replica"), + ("instance_3", "127.0.0.1:10013", True, "main"), + ] + + mg_sleep_and_assert(expected_data_replicas_back, retrieve_data_show_repl_cluster) + + instance1_cursor = connect(host="localhost", port=7688).cursor() + instance2_cursor = connect(host="localhost", port=7689).cursor() + + def retrieve_data_show_repl_role_instance1(): + return sorted(list(execute_and_fetch_all(instance1_cursor, "SHOW REPLICATION ROLE;"))) + + def retrieve_data_show_repl_role_instance2(): + return sorted(list(execute_and_fetch_all(instance2_cursor, "SHOW REPLICATION ROLE;"))) + + mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance1) + mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance2) + mg_sleep_and_assert([("main",)], retrieve_data_show_repl_role_instance3) if __name__ == "__main__": diff --git a/tests/e2e/high_availability_experimental/common.py b/tests/e2e/high_availability_experimental/common.py index dc104d628..adfabd87a 100644 --- a/tests/e2e/high_availability_experimental/common.py +++ b/tests/e2e/high_availability_experimental/common.py @@ -23,3 +23,10 @@ def connect(**kwargs) -> mgclient.Connection: connection = mgclient.connect(**kwargs) connection.autocommit = True return connection + + +def safe_execute(function, *args): + try: + function(*args) + except: + pass diff --git a/tests/e2e/high_availability_experimental/conftest.py 
b/tests/e2e/high_availability_experimental/conftest.py deleted file mode 100644 index 9100a63cf..000000000 --- a/tests/e2e/high_availability_experimental/conftest.py +++ /dev/null @@ -1,43 +0,0 @@ -# Copyright 2022 Memgraph Ltd. -# -# Use of this software is governed by the Business Source License -# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -# License, and you may not use this file except in compliance with the Business Source License. -# -# As of the Change Date specified in that file, in accordance with -# the Business Source License, use of this software will be governed -# by the Apache License, Version 2.0, included in the file -# licenses/APL.txt. - -import pytest -from common import connect, execute_and_fetch_all - - -# The fixture here is more complex because the connection has to be -# parameterized based on the test parameters (info has to be available on both -# sides). -# -# https://docs.pytest.org/en/latest/example/parametrize.html#indirect-parametrization -# is not an elegant/feasible solution here. -# -# The solution was independently developed and then I stumbled upon the same -# approach here https://stackoverflow.com/a/68286553/4888809 which I think is -# optimal. -@pytest.fixture(scope="function") -def connection(): - connection_holder = None - role_holder = None - - def inner_connection(port, role): - nonlocal connection_holder, role_holder - connection_holder = connect(host="localhost", port=port) - role_holder = role - return connection_holder - - yield inner_connection - - # Only main instance can be cleaned up because replicas do NOT accept - # writes. 
- if role_holder == "main": - cursor = connection_holder.cursor() - execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n;") diff --git a/tests/e2e/high_availability_experimental/coordinator.py b/tests/e2e/high_availability_experimental/coordinator.py index e34e9f069..9e34a4167 100644 --- a/tests/e2e/high_availability_experimental/coordinator.py +++ b/tests/e2e/high_availability_experimental/coordinator.py @@ -12,33 +12,33 @@ import sys import pytest -from common import execute_and_fetch_all +from common import connect, execute_and_fetch_all from mg_utils import mg_sleep_and_assert -def test_disable_cypher_queries(connection): - cursor = connection(7690, "coordinator").cursor() +def test_disable_cypher_queries(): + cursor = connect(host="localhost", port=7690).cursor() with pytest.raises(Exception) as e: execute_and_fetch_all(cursor, "CREATE (n:TestNode {prop: 'test'})") assert str(e.value) == "Coordinator can run only coordinator queries!" -def test_coordinator_cannot_be_replica_role(connection): - cursor = connection(7690, "coordinator").cursor() +def test_coordinator_cannot_be_replica_role(): + cursor = connect(host="localhost", port=7690).cursor() with pytest.raises(Exception) as e: execute_and_fetch_all(cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;") assert str(e.value) == "Coordinator can run only coordinator queries!" -def test_coordinator_cannot_run_show_repl_role(connection): - cursor = connection(7690, "coordinator").cursor() +def test_coordinator_cannot_run_show_repl_role(): + cursor = connect(host="localhost", port=7690).cursor() with pytest.raises(Exception) as e: execute_and_fetch_all(cursor, "SHOW REPLICATION ROLE;") assert str(e.value) == "Coordinator can run only coordinator queries!" 
-def test_coordinator_show_replication_cluster(connection): - cursor = connection(7690, "coordinator").cursor() +def test_coordinator_show_replication_cluster(): + cursor = connect(host="localhost", port=7690).cursor() def retrieve_data(): return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;"))) @@ -51,30 +51,30 @@ def test_coordinator_show_replication_cluster(connection): mg_sleep_and_assert(expected_data, retrieve_data) -def test_coordinator_cannot_call_show_replicas(connection): - cursor = connection(7690, "coordinator").cursor() +def test_coordinator_cannot_call_show_replicas(): + cursor = connect(host="localhost", port=7690).cursor() with pytest.raises(Exception) as e: execute_and_fetch_all(cursor, "SHOW REPLICAS;") assert str(e.value) == "Coordinator can run only coordinator queries!" @pytest.mark.parametrize( - "port, role", - [(7687, "main"), (7688, "replica"), (7689, "replica")], + "port", + [7687, 7688, 7689], ) -def test_main_and_replicas_cannot_call_show_repl_cluster(port, role, connection): - cursor = connection(port, role).cursor() +def test_main_and_replicas_cannot_call_show_repl_cluster(port): + cursor = connect(host="localhost", port=port).cursor() with pytest.raises(Exception) as e: execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;") assert str(e.value) == "Only coordinator can run SHOW REPLICATION CLUSTER." @pytest.mark.parametrize( - "port, role", - [(7687, "main"), (7688, "replica"), (7689, "replica")], + "port", + [7687, 7688, 7689], ) -def test_main_and_replicas_cannot_register_coord_server(port, role, connection): - cursor = connection(port, role).cursor() +def test_main_and_replicas_cannot_register_coord_server(port): + cursor = connect(host="localhost", port=port).cursor() with pytest.raises(Exception) as e: execute_and_fetch_all( cursor,