diff --git a/src/coordination/CMakeLists.txt b/src/coordination/CMakeLists.txt index 4937c0ad3..60de3f21a 100644 --- a/src/coordination/CMakeLists.txt +++ b/src/coordination/CMakeLists.txt @@ -10,7 +10,6 @@ target_sources(mg-coordination include/coordination/coordinator_exceptions.hpp include/coordination/coordinator_slk.hpp include/coordination/coordinator_instance.hpp - include/coordination/coordinator_cluster_config.hpp include/coordination/coordinator_handlers.hpp include/coordination/constants.hpp include/coordination/instance_status.hpp diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp index a30e504b7..6f9609aaa 100644 --- a/src/coordination/coordinator_client.cpp +++ b/src/coordination/coordinator_client.cpp @@ -41,16 +41,21 @@ CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, Coordi auto CoordinatorClient::InstanceName() const -> std::string { return config_.instance_name; } auto CoordinatorClient::SocketAddress() const -> std::string { return rpc_client_.Endpoint().SocketAddress(); } +auto CoordinatorClient::InstanceDownTimeoutSec() const -> std::chrono::seconds { + return config_.instance_down_timeout_sec; +} + void CoordinatorClient::StartFrequentCheck() { if (instance_checker_.IsRunning()) { return; } - MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0), + MG_ASSERT(config_.instance_health_check_frequency_sec > std::chrono::seconds(0), "Health check frequency must be greater than 0"); instance_checker_.Run( - config_.instance_name, config_.health_check_frequency_sec, [this, instance_name = config_.instance_name] { + config_.instance_name, config_.instance_health_check_frequency_sec, + [this, instance_name = config_.instance_name] { try { spdlog::trace("Sending frequent heartbeat to machine {} on {}", instance_name, rpc_client_.Endpoint().SocketAddress()); @@ -121,5 +126,19 @@ auto CoordinatorClient::SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bo return false; } +auto CoordinatorClient::SendUnregisterReplicaRpc(std::string const &instance_name) const -> bool { + try { + auto stream{rpc_client_.Stream<UnregisterReplicaRpc>(instance_name)}; + if (!stream.AwaitResponse().success) { + spdlog::error("Failed to receive successful RPC response for unregistering replica!"); + return false; + } + return true; + } catch (rpc::RpcFailedException const &) { + spdlog::error("Failed to unregister replica!"); + } + return false; +} + } // namespace memgraph::coordination #endif diff --git a/src/coordination/coordinator_handlers.cpp b/src/coordination/coordinator_handlers.cpp index fb0750935..a82fbea1f 100644 --- a/src/coordination/coordinator_handlers.cpp +++ b/src/coordination/coordinator_handlers.cpp @@ -39,6 +39,12 @@ void CoordinatorHandlers::Register(memgraph::coordination::CoordinatorServer &se spdlog::info("Received SwapMainUUIDRPC on coordinator server"); CoordinatorHandlers::SwapMainUUIDHandler(replication_handler, req_reader, res_builder); }); + + server.Register<coordination::UnregisterReplicaRpc>( + [&replication_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void { + spdlog::info("Received UnregisterReplicaRpc on coordinator server"); + CoordinatorHandlers::UnregisterReplicaHandler(replication_handler, req_reader, res_builder); + }); } void CoordinatorHandlers::SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler, @@ -146,5 +152,37 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(replication::ReplicationHa slk::Save(coordination::PromoteReplicaToMainRes{true}, res_builder); } +void CoordinatorHandlers::UnregisterReplicaHandler(replication::ReplicationHandler &replication_handler, + slk::Reader *req_reader, slk::Builder *res_builder) { + if (!replication_handler.IsMain()) { + spdlog::error("Unregistering replica must be performed on main."); + slk::Save(coordination::UnregisterReplicaRes{false}, res_builder); + return; + } + + coordination::UnregisterReplicaReq req; + slk::Load(&req, req_reader); + + auto res = replication_handler.UnregisterReplica(req.instance_name); + switch (res) { + using enum memgraph::query::UnregisterReplicaResult; + case SUCCESS: + slk::Save(coordination::UnregisterReplicaRes{true}, res_builder); + break; + case NOT_MAIN: + spdlog::error("Unregistering replica must be performed on main."); + slk::Save(coordination::UnregisterReplicaRes{false}, res_builder); + break; + case CAN_NOT_UNREGISTER: + spdlog::error("Could not unregister replica."); + slk::Save(coordination::UnregisterReplicaRes{false}, res_builder); + break; + case COULD_NOT_BE_PERSISTED: + spdlog::error("Could not persist replica unregistration."); + slk::Save(coordination::UnregisterReplicaRes{false}, res_builder); + break; + } +} + } // namespace memgraph::dbms #endif diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index 4c3f3e646..070dcc49f 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -122,11 +122,6 @@ CoordinatorInstance::CoordinatorInstance() }; } -auto CoordinatorInstance::ClusterHasAliveMain_() const -> bool { - auto const alive_main = [](ReplicationInstance const &instance) { return instance.IsMain() && instance.IsAlive(); }; - return std::ranges::any_of(repl_instances_, alive_main); -} - auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> { auto const coord_instances = raft_state_.GetAllCoordinators(); @@ -308,6 +303,35 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co return RegisterInstanceCoordinatorStatus::SUCCESS; } +auto CoordinatorInstance::UnregisterReplicationInstance(std::string instance_name) + -> UnregisterInstanceCoordinatorStatus { + auto lock = std::lock_guard{coord_instance_lock_}; + + auto const name_matches = [&instance_name](ReplicationInstance const &instance) { + return instance.InstanceName() == instance_name; + }; + + auto inst_to_remove = std::ranges::find_if(repl_instances_, name_matches); + if (inst_to_remove == repl_instances_.end()) { + return UnregisterInstanceCoordinatorStatus::NO_INSTANCE_WITH_NAME; + } + + if (inst_to_remove->IsMain() && inst_to_remove->IsAlive()) { + return UnregisterInstanceCoordinatorStatus::IS_MAIN; + } + + inst_to_remove->StopFrequentCheck(); + auto curr_main = std::ranges::find_if(repl_instances_, &ReplicationInstance::IsMain); + MG_ASSERT(curr_main != repl_instances_.end(), "There must be a main instance when unregistering a replica"); + if (!curr_main->SendUnregisterReplicaRpc(instance_name)) { + inst_to_remove->StartFrequentCheck(); + return UnregisterInstanceCoordinatorStatus::RPC_FAILED; + } + std::erase_if(repl_instances_, name_matches); + + return UnregisterInstanceCoordinatorStatus::SUCCESS; +} + auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void { raft_state_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address)); diff --git a/src/coordination/coordinator_rpc.cpp b/src/coordination/coordinator_rpc.cpp index 2b5752a07..884eabfe2 100644 --- a/src/coordination/coordinator_rpc.cpp +++ b/src/coordination/coordinator_rpc.cpp @@ -52,6 +52,22 @@ void DemoteMainToReplicaRes::Load(DemoteMainToReplicaRes *self, memgraph::slk::R memgraph::slk::Load(self, reader); } +void UnregisterReplicaReq::Save(UnregisterReplicaReq const &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self, builder); +} + +void UnregisterReplicaReq::Load(UnregisterReplicaReq *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(self, reader); +} + +void UnregisterReplicaRes::Save(UnregisterReplicaRes const &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self, builder); +} + +void UnregisterReplicaRes::Load(UnregisterReplicaRes *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(self, reader); +} + } // namespace coordination constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ, @@ -64,8 +80,15 @@ constexpr utils::TypeInfo coordination::DemoteMainToReplicaReq::kType{utils::Typ "CoordDemoteToReplicaReq", nullptr}; constexpr utils::TypeInfo coordination::DemoteMainToReplicaRes::kType{utils::TypeId::COORD_SET_REPL_MAIN_RES, + "CoordDemoteToReplicaRes", nullptr}; +constexpr utils::TypeInfo coordination::UnregisterReplicaReq::kType{utils::TypeId::COORD_UNREGISTER_REPLICA_REQ, + "UnregisterReplicaReq", nullptr}; + +constexpr utils::TypeInfo coordination::UnregisterReplicaRes::kType{utils::TypeId::COORD_UNREGISTER_REPLICA_RES, + "UnregisterReplicaRes", nullptr}; + namespace slk { void Save(const memgraph::coordination::PromoteReplicaToMainRes &self, memgraph::slk::Builder *builder) { @@ -102,6 +125,22 @@ void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::R memgraph::slk::Load(&self->success, reader); } +void Save(memgraph::coordination::UnregisterReplicaReq const &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self.instance_name, builder); +} + +void Load(memgraph::coordination::UnregisterReplicaReq *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(&self->instance_name, reader); +} + +void Save(memgraph::coordination::UnregisterReplicaRes const &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self.success, builder); +} + +void Load(memgraph::coordination::UnregisterReplicaRes *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(&self->success, reader); +} + } // namespace slk } // namespace memgraph diff --git a/src/coordination/coordinator_state.cpp b/src/coordination/coordinator_state.cpp index a2f6c9cee..28d6c604e 100644 --- a/src/coordination/coordinator_state.cpp +++ b/src/coordination/coordinator_state.cpp @@ -56,6 +56,20 @@ auto CoordinatorState::RegisterReplicationInstance(CoordinatorClientConfig confi data_); } +auto CoordinatorState::UnregisterReplicationInstance(std::string instance_name) -> UnregisterInstanceCoordinatorStatus { + MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_), + "Coordinator cannot unregister instance since variant holds wrong alternative"); + + return std::visit( + memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) { + return UnregisterInstanceCoordinatorStatus::NOT_COORDINATOR; + }, + [&instance_name](CoordinatorInstance &coordinator_instance) { + return coordinator_instance.UnregisterReplicationInstance(instance_name); + }}, + data_); +} + auto CoordinatorState::SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus { MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_), "Coordinator cannot register replica since variant holds wrong alternative"); diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp index 02bae1c03..bc2bc99c9 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/coordinator_client.hpp @@ -46,17 +46,22 @@ class CoordinatorClient { auto SocketAddress() const -> std::string; [[nodiscard]] auto DemoteToReplica() const -> bool; + // TODO: (andi) Consistent naming auto SendPromoteReplicaToMainRpc(const utils::UUID &uuid, ReplicationClientsInfo replication_clients_info) const -> bool; auto SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bool; + auto SendUnregisterReplicaRpc(std::string const &instance_name) const -> bool; + auto ReplicationClientInfo() const -> ReplClientInfo; auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void; auto RpcClient() -> rpc::Client & { return rpc_client_; } + auto InstanceDownTimeoutSec() const -> std::chrono::seconds; + friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) { return first.config_ == second.config_; } diff --git a/src/coordination/include/coordination/coordinator_cluster_config.hpp b/src/coordination/include/coordination/coordinator_cluster_config.hpp deleted file mode 100644 index e1d91ff7d..000000000 --- a/src/coordination/include/coordination/coordinator_cluster_config.hpp +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2024 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -#pragma once - -#ifdef MG_ENTERPRISE -namespace memgraph::coordination { - -struct CoordinatorClusterConfig { - static constexpr int alive_response_time_difference_sec_{5}; -}; - -} // namespace memgraph::coordination -#endif diff --git a/src/coordination/include/coordination/coordinator_config.hpp b/src/coordination/include/coordination/coordinator_config.hpp index f72b3a6ad..b93a16085 100644 --- a/src/coordination/include/coordination/coordinator_config.hpp +++ b/src/coordination/include/coordination/coordinator_config.hpp @@ -28,7 +28,8 @@ struct CoordinatorClientConfig { std::string instance_name; std::string ip_address; uint16_t port{}; - std::chrono::seconds health_check_frequency_sec{1}; + std::chrono::seconds instance_health_check_frequency_sec{1}; + std::chrono::seconds instance_down_timeout_sec{5}; auto SocketAddress() const -> std::string { return ip_address + ":" + std::to_string(port); } diff --git a/src/coordination/include/coordination/coordinator_handlers.hpp b/src/coordination/include/coordination/coordinator_handlers.hpp index 4aa4656c3..b3c5c1580 100644 --- a/src/coordination/include/coordination/coordinator_handlers.hpp +++ b/src/coordination/include/coordination/coordinator_handlers.hpp @@ -33,6 +33,9 @@ class CoordinatorHandlers { slk::Builder *res_builder); static void SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader, slk::Builder *res_builder); + + static void UnregisterReplicaHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader, + slk::Builder *res_builder); }; } // namespace memgraph::dbms diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index bc6954b37..15b377ed9 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -30,6 +30,7 @@ class CoordinatorInstance { CoordinatorInstance(); [[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus; + [[nodiscard]] auto UnregisterReplicationInstance(std::string instance_name) -> UnregisterInstanceCoordinatorStatus; [[nodiscard]] auto SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus; @@ -44,8 +45,6 @@ class CoordinatorInstance { auto SetMainUUID(utils::UUID new_uuid) -> void; private: - auto ClusterHasAliveMain_() const -> bool; - HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_; // NOTE: Must be std::list because we rely on pointer stability diff --git a/src/coordination/include/coordination/coordinator_rpc.hpp b/src/coordination/include/coordination/coordinator_rpc.hpp index 56cfdb403..a52597dd8 100644 --- a/src/coordination/include/coordination/coordinator_rpc.hpp +++ b/src/coordination/include/coordination/coordinator_rpc.hpp @@ -82,6 +82,35 @@ struct DemoteMainToReplicaRes { using DemoteMainToReplicaRpc = rpc::RequestResponse<DemoteMainToReplicaReq, DemoteMainToReplicaRes>; +struct UnregisterReplicaReq { + static const utils::TypeInfo kType; + static const utils::TypeInfo &GetTypeInfo() { return kType; } + + static void Load(UnregisterReplicaReq *self, memgraph::slk::Reader *reader); + static void Save(UnregisterReplicaReq const &self, memgraph::slk::Builder *builder); + + explicit UnregisterReplicaReq(std::string instance_name) : instance_name(std::move(instance_name)) {} + + UnregisterReplicaReq() = default; + + std::string instance_name; +}; + +struct UnregisterReplicaRes { + static const utils::TypeInfo kType; + static const utils::TypeInfo &GetTypeInfo() { return kType; } + + static void Load(UnregisterReplicaRes *self, memgraph::slk::Reader *reader); + static void Save(const UnregisterReplicaRes &self, memgraph::slk::Builder *builder); + + explicit UnregisterReplicaRes(bool success) : success(success) {} + UnregisterReplicaRes() = default; + + bool success; +}; + +using UnregisterReplicaRpc = rpc::RequestResponse<UnregisterReplicaReq, UnregisterReplicaRes>; + } // namespace memgraph::coordination // SLK serialization declarations @@ -99,6 +128,11 @@ void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::R void Save(const memgraph::coordination::DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder); void Load(memgraph::coordination::DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader); +// UnregisterReplicaRpc +void Save(memgraph::coordination::UnregisterReplicaRes const &self, memgraph::slk::Builder *builder); +void Load(memgraph::coordination::UnregisterReplicaRes *self, memgraph::slk::Reader *reader); +void Save(memgraph::coordination::UnregisterReplicaReq const &self, memgraph::slk::Builder *builder); +void Load(memgraph::coordination::UnregisterReplicaReq *self, memgraph::slk::Reader *reader); } // namespace memgraph::slk diff --git a/src/coordination/include/coordination/coordinator_state.hpp b/src/coordination/include/coordination/coordinator_state.hpp index 8830d1b49..256af66f9 100644 --- a/src/coordination/include/coordination/coordinator_state.hpp +++ b/src/coordination/include/coordination/coordinator_state.hpp @@ -34,6 +34,7 @@ class CoordinatorState { CoordinatorState &operator=(CoordinatorState &&) noexcept = delete; [[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus; + [[nodiscard]] auto UnregisterReplicationInstance(std::string instance_name) -> UnregisterInstanceCoordinatorStatus; [[nodiscard]] auto SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus; diff --git a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp index 3a0df5607..5fcb93921 100644 --- a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp +++ b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp @@ -28,6 +28,15 @@ enum class RegisterInstanceCoordinatorStatus : uint8_t { SUCCESS }; +enum class UnregisterInstanceCoordinatorStatus : uint8_t { + NO_INSTANCE_WITH_NAME, + IS_MAIN, + NOT_COORDINATOR, + NOT_LEADER, + RPC_FAILED, + SUCCESS, +}; + enum class SetInstanceToMainCoordinatorStatus : uint8_t { NO_INSTANCE_WITH_NAME, NOT_COORDINATOR, diff --git a/src/coordination/include/coordination/replication_instance.hpp b/src/coordination/include/coordination/replication_instance.hpp index 713a66fd8..1a0b66402 100644 --- a/src/coordination/include/coordination/replication_instance.hpp +++ b/src/coordination/include/coordination/replication_instance.hpp @@ -14,7 +14,6 @@ #ifdef MG_ENTERPRISE #include "coordination/coordinator_client.hpp" -#include "coordination/coordinator_cluster_config.hpp" #include "coordination/coordinator_exceptions.hpp" #include "replication_coordination_glue/role.hpp" @@ -59,12 +58,16 @@ class ReplicationInstance { auto ReplicationClientInfo() const -> ReplClientInfo; auto EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool; + auto SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool; + auto SendUnregisterReplicaRpc(std::string const &instance_name) -> bool; + + // TODO: (andi) Inconsistent API auto GetClient() -> CoordinatorClient &; auto SetNewMainUUID(utils::UUID const &main_uuid) -> void; auto ResetMainUUID() -> void; - auto GetMainUUID() -> const std::optional<utils::UUID> &; + auto GetMainUUID() const -> const std::optional<utils::UUID> &; private: CoordinatorClient client_; diff --git a/src/coordination/replication_instance.cpp b/src/coordination/replication_instance.cpp index 0fb13998c..6cbd24985 100644 --- a/src/coordination/replication_instance.cpp +++ b/src/coordination/replication_instance.cpp @@ -34,9 +34,8 @@ auto ReplicationInstance::OnSuccessPing() -> void { } auto ReplicationInstance::OnFailPing() -> bool { - is_alive_ = - std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_).count() < - CoordinatorClusterConfig::alive_response_time_difference_sec_; + auto elapsed_time = std::chrono::system_clock::now() - last_response_time_; + is_alive_ = elapsed_time < client_.InstanceDownTimeoutSec(); return is_alive_; } @@ -86,9 +85,10 @@ auto ReplicationInstance::ReplicationClientInfo() const -> CoordinatorClientConf } auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; } + auto ReplicationInstance::SetNewMainUUID(utils::UUID const &main_uuid) -> void { main_uuid_ = main_uuid; } auto ReplicationInstance::ResetMainUUID() -> void { main_uuid_ = std::nullopt; } -auto ReplicationInstance::GetMainUUID() -> const std::optional<utils::UUID> & { return main_uuid_; } +auto ReplicationInstance::GetMainUUID() const -> std::optional<utils::UUID> const & { return main_uuid_; } auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool { if (!main_uuid_ || *main_uuid_ != curr_main_uuid) { @@ -105,5 +105,9 @@ auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid return true; } +auto ReplicationInstance::SendUnregisterReplicaRpc(std::string const &instance_name) -> bool { + return client_.SendUnregisterReplicaRpc(instance_name); +} + } // namespace memgraph::coordination #endif diff --git a/src/dbms/coordinator_handler.cpp b/src/dbms/coordinator_handler.cpp index b623e1db6..f8e14e2a0 100644 --- a/src/dbms/coordinator_handler.cpp +++ b/src/dbms/coordinator_handler.cpp @@ -25,6 +25,11 @@ auto CoordinatorHandler::RegisterReplicationInstance(memgraph::coordination::Coo return coordinator_state_.RegisterReplicationInstance(config); } +auto CoordinatorHandler::UnregisterReplicationInstance(std::string instance_name) + -> coordination::UnregisterInstanceCoordinatorStatus { + return coordinator_state_.UnregisterReplicationInstance(std::move(instance_name)); +} + auto CoordinatorHandler::SetReplicationInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus { return coordinator_state_.SetReplicationInstanceToMain(std::move(instance_name)); diff --git a/src/dbms/coordinator_handler.hpp b/src/dbms/coordinator_handler.hpp index 03d45ee41..d06e70676 100644 --- a/src/dbms/coordinator_handler.hpp +++ b/src/dbms/coordinator_handler.hpp @@ -28,9 +28,13 @@ class CoordinatorHandler { public: explicit CoordinatorHandler(coordination::CoordinatorState &coordinator_state); + // TODO: (andi) When moving coordinator state on same instances, rename from RegisterReplicationInstance to + // RegisterInstance auto RegisterReplicationInstance(coordination::CoordinatorClientConfig config) -> coordination::RegisterInstanceCoordinatorStatus; + auto UnregisterReplicationInstance(std::string instance_name) -> coordination::UnregisterInstanceCoordinatorStatus; + auto SetReplicationInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus; auto ShowInstances() const -> std::vector<coordination::InstanceStatus>; diff --git a/src/flags/replication.cpp b/src/flags/replication.cpp index 29c7bfbda..ccecb4a20 100644 --- a/src/flags/replication.cpp +++ b/src/flags/replication.cpp @@ -18,6 +18,10 @@ DEFINE_uint32(coordinator_server_port, 0, "Port on which coordinator servers wil DEFINE_uint32(raft_server_port, 0, "Port on which raft servers will be started."); // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DEFINE_uint32(raft_server_id, 0, "Unique ID of the raft server."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_uint32(instance_down_timeout_sec, 5, "Time duration after which an instance is considered down."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_uint32(instance_health_check_frequency_sec, 1, "The time duration between two health checks/pings."); #endif // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) diff --git a/src/flags/replication.hpp b/src/flags/replication.hpp index 025079271..51d50bb09 100644 --- a/src/flags/replication.hpp +++ b/src/flags/replication.hpp @@ -20,6 +20,10 @@ DECLARE_uint32(coordinator_server_port); DECLARE_uint32(raft_server_port); // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DECLARE_uint32(raft_server_id); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint32(instance_down_timeout_sec); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint32(instance_health_check_frequency_sec); #endif // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) diff --git a/src/memgraph.cpp b/src/memgraph.cpp index b965b82a9..1ba23a744 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -356,6 +356,10 @@ int main(int argc, char **argv) { memgraph::query::InterpreterConfig interp_config{ .query = {.allow_load_csv = FLAGS_allow_load_csv}, .replication_replica_check_frequency = std::chrono::seconds(FLAGS_replication_replica_check_frequency_sec), +#ifdef MG_ENTERPRISE + .instance_down_timeout_sec = std::chrono::seconds(FLAGS_instance_down_timeout_sec), + .instance_health_check_frequency_sec = std::chrono::seconds(FLAGS_instance_health_check_frequency_sec), +#endif .default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers, .default_pulsar_service_url = FLAGS_pulsar_service_url, .stream_transaction_conflict_retries = FLAGS_stream_transaction_conflict_retries, diff --git a/src/query/config.hpp b/src/query/config.hpp index 88c3dd00e..04fa272c9 100644 --- a/src/query/config.hpp +++ b/src/query/config.hpp @@ -22,6 +22,9 @@ struct InterpreterConfig { // The same as \ref memgraph::replication::ReplicationClientConfig std::chrono::seconds replication_replica_check_frequency{1}; + std::chrono::seconds instance_down_timeout_sec{5}; + std::chrono::seconds instance_health_check_frequency_sec{1}; + std::string default_kafka_bootstrap_servers; std::string default_pulsar_service_url; uint32_t stream_transaction_conflict_retries; diff --git a/src/query/frontend/ast/ast.hpp b/src/query/frontend/ast/ast.hpp index 9b3a14672..17cf2a76b 100644 --- a/src/query/frontend/ast/ast.hpp +++ b/src/query/frontend/ast/ast.hpp @@ -3071,7 +3071,13 @@ class CoordinatorQuery : public memgraph::query::Query { static const utils::TypeInfo kType; const utils::TypeInfo &GetTypeInfo() const override { return kType; } - enum class Action { REGISTER_INSTANCE, SET_INSTANCE_TO_MAIN, SHOW_INSTANCES, ADD_COORDINATOR_INSTANCE }; + enum class Action { + REGISTER_INSTANCE, + UNREGISTER_INSTANCE, + SET_INSTANCE_TO_MAIN, + SHOW_INSTANCES, + ADD_COORDINATOR_INSTANCE + }; enum class SyncMode { SYNC, ASYNC }; diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index d8ce6df22..f7c64502d 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -399,6 +399,14 @@ antlrcpp::Any CypherMainVisitor::visitRegisterInstanceOnCoordinator( return coordinator_query; } +antlrcpp::Any CypherMainVisitor::visitUnregisterInstanceOnCoordinator( + MemgraphCypher::UnregisterInstanceOnCoordinatorContext *ctx) { + auto *coordinator_query = storage_->Create<CoordinatorQuery>(); + coordinator_query->action_ = CoordinatorQuery::Action::UNREGISTER_INSTANCE; + coordinator_query->instance_name_ = std::any_cast<std::string>(ctx->instanceName()->symbolicName()->accept(this)); + return coordinator_query; +} + antlrcpp::Any CypherMainVisitor::visitAddCoordinatorInstance(MemgraphCypher::AddCoordinatorInstanceContext *ctx) { auto *coordinator_query = storage_->Create<CoordinatorQuery>(); diff --git a/src/query/frontend/ast/cypher_main_visitor.hpp b/src/query/frontend/ast/cypher_main_visitor.hpp index 9007ec60a..07b31e92a 100644 --- a/src/query/frontend/ast/cypher_main_visitor.hpp +++ b/src/query/frontend/ast/cypher_main_visitor.hpp @@ -243,6 +243,12 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor { */ antlrcpp::Any visitRegisterInstanceOnCoordinator(MemgraphCypher::RegisterInstanceOnCoordinatorContext *ctx) override; + /** + * @return CoordinatorQuery* + */ + antlrcpp::Any visitUnregisterInstanceOnCoordinator( + MemgraphCypher::UnregisterInstanceOnCoordinatorContext *ctx) override; + /** * @return CoordinatorQuery* */ diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 index 892f1b1e3..95bd14c96 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 @@ -190,6 +190,7 @@ replicationQuery : setReplicationRole ; coordinatorQuery : registerInstanceOnCoordinator + | unregisterInstanceOnCoordinator | setInstanceToMain | showInstances | addCoordinatorInstance @@ -392,6 +393,8 @@ registerReplica : REGISTER REPLICA instanceName ( SYNC | ASYNC ) registerInstanceOnCoordinator : REGISTER INSTANCE instanceName ON coordinatorSocketAddress ( AS ASYNC ) ? WITH replicationSocketAddress ; +unregisterInstanceOnCoordinator : UNREGISTER INSTANCE instanceName ; + setInstanceToMain : SET INSTANCE instanceName TO MAIN ; raftServerId : literal ; diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 index b2d4de661..d40c5d3dc 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 @@ -141,6 +141,7 @@ TRIGGER : T R I G G E R ; TRIGGERS : T R I G G E R S ; UNCOMMITTED : U N C O M M I T T E D ; UNLOCK : U N L O C K ; +UNREGISTER : U N R E G I S T E R ; UPDATE : U P D A T E ; USE : U S E ; USER : U S E R ; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index acfbd6e4c..53f702e08 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -461,11 +461,32 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { : coordinator_handler_(coordinator_state) {} - /// @throw QueryRuntimeException if an error ocurred. - void RegisterReplicationInstance(const std::string &coordinator_socket_address, - const std::string &replication_socket_address, - const std::chrono::seconds instance_check_frequency, - const std::string &instance_name, CoordinatorQuery::SyncMode sync_mode) override { + void UnregisterInstance(std::string const &instance_name) override { + auto status = coordinator_handler_.UnregisterReplicationInstance(instance_name); + switch (status) { + using enum memgraph::coordination::UnregisterInstanceCoordinatorStatus; + case NO_INSTANCE_WITH_NAME: + throw QueryRuntimeException("No instance with such name!"); + case IS_MAIN: + throw QueryRuntimeException( + "Alive main instance can't be unregistered! Shut it down to trigger failover and then unregister it!"); + case NOT_COORDINATOR: + throw QueryRuntimeException("UNREGISTER INSTANCE query can only be run on a coordinator!"); + case NOT_LEADER: + throw QueryRuntimeException("Couldn't unregister replica instance since coordinator is not a leader!"); + case RPC_FAILED: + throw QueryRuntimeException( + "Couldn't unregister replica instance because current main instance couldn't unregister replica!"); + case SUCCESS: + break; + } + } + + void RegisterReplicationInstance(std::string const &coordinator_socket_address, + std::string const &replication_socket_address, + std::chrono::seconds const &instance_check_frequency, + std::chrono::seconds const &instance_down_timeout, std::string const &instance_name, + CoordinatorQuery::SyncMode sync_mode) override { const auto maybe_replication_ip_port = io::network::Endpoint::ParseSocketOrAddress(replication_socket_address, std::nullopt); if (!maybe_replication_ip_port) { @@ -490,7 +511,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { coordination::CoordinatorClientConfig{.instance_name = instance_name, .ip_address = coordinator_server_ip, .port = coordinator_server_port, - .health_check_frequency_sec = instance_check_frequency, + .instance_health_check_frequency_sec = instance_check_frequency, + .instance_down_timeout_sec = instance_down_timeout, .replication_client_info = repl_config, .ssl = std::nullopt}; @@ -1158,12 +1180,15 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param auto coordinator_socket_address_tv = coordinator_query->coordinator_socket_address_->Accept(evaluator); auto replication_socket_address_tv = coordinator_query->replication_socket_address_->Accept(evaluator); callback.fn = [handler = CoordQueryHandler{*coordinator_state}, coordinator_socket_address_tv, - replication_socket_address_tv, main_check_frequency = config.replication_replica_check_frequency, + replication_socket_address_tv, + instance_health_check_frequency_sec = config.instance_health_check_frequency_sec, instance_name = coordinator_query->instance_name_, + instance_down_timeout_sec = config.instance_down_timeout_sec, sync_mode = coordinator_query->sync_mode_]() mutable { handler.RegisterReplicationInstance(std::string(coordinator_socket_address_tv.ValueString()), std::string(replication_socket_address_tv.ValueString()), - main_check_frequency, instance_name, sync_mode); + instance_health_check_frequency_sec, instance_down_timeout_sec, + instance_name, sync_mode); return std::vector<std::vector<TypedValue>>(); }; @@ -1173,6 +1198,30 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param coordinator_socket_address_tv.ValueString(), coordinator_query->instance_name_)); return callback; } + case CoordinatorQuery::Action::UNREGISTER_INSTANCE: + if (!license::global_license_checker.IsEnterpriseValidFast()) { + throw QueryException("Trying to use enterprise feature without a valid license."); + } + + if constexpr (!coordination::allow_ha) { + throw QueryRuntimeException( + "High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to " + "be able to use this functionality."); + } + if (!FLAGS_raft_server_id) { + throw QueryRuntimeException("Only coordinator can register coordinator server!"); + } + callback.fn = [handler = CoordQueryHandler{*coordinator_state}, + instance_name = coordinator_query->instance_name_]() mutable { + handler.UnregisterInstance(instance_name); + return std::vector<std::vector<TypedValue>>(); + }; + notifications->emplace_back( + SeverityLevel::INFO, NotificationCode::UNREGISTER_INSTANCE, + fmt::format("Coordinator has unregistered instance {}.", coordinator_query->instance_name_)); + + return callback; + case CoordinatorQuery::Action::SET_INSTANCE_TO_MAIN: { if (!license::global_license_checker.IsEnterpriseValidFast()) { throw QueryException("Trying to use enterprise feature without a valid license."); diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index da032b8e3..642ce302b 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -105,10 +105,14 @@ class CoordinatorQueryHandler { }; /// @throw QueryRuntimeException if an error ocurred. - virtual void RegisterReplicationInstance(const std::string &coordinator_socket_address, - const std::string &replication_socket_address, - const std::chrono::seconds instance_check_frequency, - const std::string &instance_name, CoordinatorQuery::SyncMode sync_mode) = 0; + virtual void RegisterReplicationInstance(std::string const &coordinator_socket_address, + std::string const &replication_socket_address, + std::chrono::seconds const &instance_health_check_frequency, + std::chrono::seconds const &instance_down_timeout, + std::string const &instance_name, CoordinatorQuery::SyncMode sync_mode) = 0; + + /// @throw QueryRuntimeException if an error ocurred. + virtual void UnregisterInstance(std::string const &instance_name) = 0; /// @throw QueryRuntimeException if an error ocurred. virtual void SetReplicationInstanceToMain(const std::string &instance_name) = 0; diff --git a/src/query/metadata.cpp b/src/query/metadata.cpp index 59d65e077..e339aad57 100644 --- a/src/query/metadata.cpp +++ b/src/query/metadata.cpp @@ -71,6 +71,8 @@ constexpr std::string_view GetCodeString(const NotificationCode code) { return "RegisterCoordinatorServer"sv; case NotificationCode::ADD_COORDINATOR_INSTANCE: return "AddCoordinatorInstance"sv; + case NotificationCode::UNREGISTER_INSTANCE: + return "UnregisterInstance"sv; #endif case NotificationCode::REPLICA_PORT_WARNING: return "ReplicaPortWarning"sv; diff --git a/src/query/metadata.hpp b/src/query/metadata.hpp index 2f357a555..dd8c2db07 100644 --- a/src/query/metadata.hpp +++ b/src/query/metadata.hpp @@ -43,8 +43,9 @@ enum class NotificationCode : uint8_t { REPLICA_PORT_WARNING, REGISTER_REPLICA, #ifdef MG_ENTERPRISE - REGISTER_COORDINATOR_SERVER, + REGISTER_COORDINATOR_SERVER, // TODO: (andi) What is this? ADD_COORDINATOR_INSTANCE, + UNREGISTER_INSTANCE, #endif SET_REPLICA, START_STREAM, diff --git a/src/utils/typeinfo.hpp b/src/utils/typeinfo.hpp index 1640a70f7..2fd36cf41 100644 --- a/src/utils/typeinfo.hpp +++ b/src/utils/typeinfo.hpp @@ -107,6 +107,8 @@ enum class TypeId : uint64_t { COORD_SET_REPL_MAIN_RES, COORD_SWAP_UUID_REQ, COORD_SWAP_UUID_RES, + COORD_UNREGISTER_REPLICA_REQ, + COORD_UNREGISTER_REPLICA_RES, // AST AST_LABELIX = 3000, diff --git a/tests/e2e/configuration/default_config.py b/tests/e2e/configuration/default_config.py index 3c58e2c49..604946644 100644 --- a/tests/e2e/configuration/default_config.py +++ b/tests/e2e/configuration/default_config.py @@ -69,6 +69,8 @@ startup_config_dict = { "coordinator_server_port": ("0", "0", "Port on which coordinator servers will be started."), "raft_server_port": ("0", "0", "Port on which raft servers will be started."), "raft_server_id": ("0", "0", "Unique ID of the raft server."), + "instance_down_timeout_sec": ("5", "5", "Time duration after which an instance is considered down."), + "instance_health_check_frequency_sec": ("1", "1", "The time duration between two health checks/pings."), "data_directory": ("mg_data", "mg_data", "Path to directory in which to save all permanent data."), "data_recovery_on_startup": ( "false", diff --git a/tests/e2e/high_availability_experimental/coord_cluster_registration.py b/tests/e2e/high_availability_experimental/coord_cluster_registration.py index 5feb0bb11..819b3ab2a 100644 --- a/tests/e2e/high_availability_experimental/coord_cluster_registration.py +++ b/tests/e2e/high_availability_experimental/coord_cluster_registration.py @@ -280,5 +280,148 @@ def test_coordinators_communication_with_restarts(): mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2) +# TODO: (andi) Test when dealing with distributed coordinators that you can register on one coordinator and unregister from any other coordinator +@pytest.mark.parametrize( + "kill_instance", + [True, False], +) +def test_unregister_replicas(kill_instance): + safe_execute(shutil.rmtree, TEMP_DIR) + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + coordinator3_cursor = connect(host="localhost", port=7692).cursor() + execute_and_fetch_all( + coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'" + ) + execute_and_fetch_all( + coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'" + ) + execute_and_fetch_all( + coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'" + ) + execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN") + + def check_coordinator3(): + return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES"))) + + main_cursor = connect(host="localhost", port=7689).cursor() + + def check_main(): + return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS"))) + + expected_cluster = [ + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), + ] + + expected_replicas = [ + ("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ] + + mg_sleep_and_assert(expected_cluster, check_coordinator3) + mg_sleep_and_assert(expected_replicas, check_main) + + if kill_instance: + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1") + execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_1") + + expected_cluster = [ + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), + ] + + expected_replicas = [ + ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ] + + mg_sleep_and_assert(expected_cluster, check_coordinator3) + mg_sleep_and_assert(expected_replicas, check_main) + + if kill_instance: + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2") + execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_2") + + expected_cluster = [ + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), + ] + expected_replicas = [] + + mg_sleep_and_assert(expected_cluster, check_coordinator3) + mg_sleep_and_assert(expected_replicas, check_main) + + +def test_unregister_main(): + safe_execute(shutil.rmtree, TEMP_DIR) + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + coordinator3_cursor = connect(host="localhost", port=7692).cursor() + execute_and_fetch_all( + coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'" + ) + execute_and_fetch_all( + coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'" + ) + execute_and_fetch_all( + coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'" + ) + execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN") + + def check_coordinator3(): + return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES"))) + + expected_cluster = [ + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), + ] + + mg_sleep_and_assert(expected_cluster, check_coordinator3) + + try: + execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_3") + except Exception as e: + assert ( + str(e) + == "Alive main instance can't be unregistered! Shut it down to trigger failover and then unregister it!" + ) + + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") + + expected_cluster = [ + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "main"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ] + + mg_sleep_and_assert(expected_cluster, check_coordinator3) + + execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_3") + + expected_cluster = [ + ("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "main"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ] + + expected_replicas = [ + ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ] + + main_cursor = connect(host="localhost", port=7687).cursor() + + def check_main(): + return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS"))) + + mg_sleep_and_assert(expected_cluster, check_coordinator3) + mg_sleep_and_assert(expected_replicas, check_main) + + if __name__ == "__main__": sys.exit(pytest.main([__file__, "-rA"]))