diff --git a/src/coordination/CMakeLists.txt b/src/coordination/CMakeLists.txt
index 60de3f21a..3150a6c02 100644
--- a/src/coordination/CMakeLists.txt
+++ b/src/coordination/CMakeLists.txt
@@ -15,6 +15,7 @@ target_sources(mg-coordination
         include/coordination/instance_status.hpp
         include/coordination/replication_instance.hpp
         include/coordination/raft_state.hpp
+        include/coordination/rpc_errors.hpp
 
         include/nuraft/coordinator_log_store.hpp
         include/nuraft/coordinator_state_machine.hpp
diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp
index 8d494dfb7..84044b04a 100644
--- a/src/coordination/coordinator_client.cpp
+++ b/src/coordination/coordinator_client.cpp
@@ -17,6 +17,7 @@
 #include "coordination/coordinator_config.hpp"
 #include "coordination/coordinator_rpc.hpp"
 #include "replication_coordination_glue/messages.hpp"
+#include "utils/result.hpp"
 
 namespace memgraph::coordination {
 
@@ -45,6 +46,10 @@ auto CoordinatorClient::InstanceDownTimeoutSec() const -> std::chrono::seconds {
   return config_.instance_down_timeout_sec;
 }
 
+auto CoordinatorClient::InstanceGetUUIDFrequencySec() const -> std::chrono::seconds {
+  return config_.instance_get_uuid_frequency_sec;
+}
+
 void CoordinatorClient::StartFrequentCheck() {
   if (instance_checker_.IsRunning()) {
     return;
   }
@@ -140,6 +145,18 @@ auto CoordinatorClient::SendUnregisterReplicaRpc(std::string const &instance_nam
   return false;
 }
 
+auto CoordinatorClient::SendGetInstanceUUIDRpc() const
+    -> utils::BasicResult<GetInstanceUUIDError, std::optional<utils::UUID>> {
+  try {
+    auto stream{rpc_client_.Stream<GetInstanceUUIDRpc>()};
+    auto res = stream.AwaitResponse();
+    return res.uuid;
+  } catch (const rpc::RpcFailedException &) {
+    spdlog::error("RPC error occurred while sending GetInstanceUUID RPC");
+    return GetInstanceUUIDError::RPC_EXCEPTION;
+  }
+}
+
 auto CoordinatorClient::SendEnableWritingOnMainRpc() const -> bool {
   try {
     auto stream{rpc_client_.Stream<EnableWritingOnMainRpc>()};
diff --git a/src/coordination/coordinator_handlers.cpp b/src/coordination/coordinator_handlers.cpp
index 5153eeb4d..ff534b549 100644
--- a/src/coordination/coordinator_handlers.cpp
+++ b/src/coordination/coordinator_handlers.cpp
@@ -51,6 +51,12 @@ void CoordinatorHandlers::Register(memgraph::coordination::CoordinatorServer &se
         spdlog::info("Received EnableWritingOnMainRpc on coordinator server");
         CoordinatorHandlers::EnableWritingOnMainHandler(replication_handler, req_reader, res_builder);
       });
+
+  server.Register<coordination::GetInstanceUUIDRpc>(
+      [&replication_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void {
+        spdlog::info("Received GetInstanceUUIDRpc on coordinator server");
+        CoordinatorHandlers::GetInstanceUUIDHandler(replication_handler, req_reader, res_builder);
+      });
 }
 
 void CoordinatorHandlers::SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler,
@@ -74,12 +80,6 @@ void CoordinatorHandlers::DemoteMainToReplicaHandler(replication::ReplicationHan
                                                      slk::Reader *req_reader, slk::Builder *res_builder) {
   spdlog::info("Executing DemoteMainToReplicaHandler");
 
-  if (!replication_handler.IsMain()) {
-    spdlog::error("Setting to replica must be performed on main.");
-    slk::Save(coordination::DemoteMainToReplicaRes{false}, res_builder);
-    return;
-  }
-
   coordination::DemoteMainToReplicaReq req;
   slk::Load(&req, req_reader);
 
@@ -89,11 +89,18 @@ void CoordinatorHandlers::DemoteMainToReplicaHandler(replication::ReplicationHan
   if (!replication_handler.SetReplicationRoleReplica(clients_config, std::nullopt)) {
     spdlog::error("Demoting main to replica failed!");
-    slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
+    slk::Save(coordination::DemoteMainToReplicaRes{false}, res_builder);
     return;
   }
-  slk::Save(coordination::PromoteReplicaToMainRes{true}, res_builder);
+  slk::Save(coordination::DemoteMainToReplicaRes{true}, res_builder);
+}
+
+void CoordinatorHandlers::GetInstanceUUIDHandler(replication::ReplicationHandler &replication_handler,
+                                                 slk::Reader * /*req_reader*/, slk::Builder *res_builder) {
+  spdlog::info("Executing GetInstanceUUIDHandler");
+
+  slk::Save(coordination::GetInstanceUUIDRes{replication_handler.GetReplicaUUID()}, res_builder);
 }
 
 void CoordinatorHandlers::PromoteReplicaToMainHandler(replication::ReplicationHandler &replication_handler,
diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp
index 24bdf6df3..b8afbb6a3 100644
--- a/src/coordination/coordinator_instance.cpp
+++ b/src/coordination/coordinator_instance.cpp
@@ -48,9 +48,12 @@ CoordinatorInstance::CoordinatorInstance()
         spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
         auto &repl_instance = find_repl_instance(self, repl_instance_name);
 
+        // We periodically ask the replica which main UUID it is listening to. This both ensures the replica
+        // follows the current main and catches the case where the replica went down and came back up faster
+        // than our health checks could notice. If the replica reports a different UUID, we swap it to the
+        // UUID of the current main.
         if (!repl_instance.EnsureReplicaHasCorrectMainUUID(self->GetMainUUID())) {
-          spdlog::error(
-              fmt::format("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName()));
+          spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName());
           return;
         }
 
@@ -62,14 +65,6 @@ CoordinatorInstance::CoordinatorInstance()
         spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
         auto &repl_instance = find_repl_instance(self, repl_instance_name);
         repl_instance.OnFailPing();
-        // We need to restart main uuid from instance since it was "down" at least a second
-        // There is slight delay, if we choose to use isAlive, instance can be down and back up in less than
-        // our isAlive time difference, which would lead to instance setting UUID to nullopt and stopping accepting any
-        // incoming RPCs from valid main
-        // TODO(antoniofilipovic) this needs here more complex logic
-        // We need to get id of main replica is listening to on successful ping
-        // and swap it to correct uuid if it failed
-        repl_instance.ResetMainUUID();
       };
 
   main_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
diff --git a/src/coordination/coordinator_rpc.cpp b/src/coordination/coordinator_rpc.cpp
index 7291c6fb0..4115f1979 100644
--- a/src/coordination/coordinator_rpc.cpp
+++ b/src/coordination/coordinator_rpc.cpp
@@ -80,6 +80,23 @@ void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const &self, memgraph::
 
 void EnableWritingOnMainReq::Load(EnableWritingOnMainReq *self, memgraph::slk::Reader *reader) {}
 
+// GetInstanceUUID
+void GetInstanceUUIDReq::Save(const GetInstanceUUIDReq &self, memgraph::slk::Builder *builder) {
+  memgraph::slk::Save(self, builder);
+}
+
+void GetInstanceUUIDReq::Load(GetInstanceUUIDReq *self, memgraph::slk::Reader *reader) {
+  memgraph::slk::Load(self, reader);
+}
+
+void GetInstanceUUIDRes::Save(const GetInstanceUUIDRes &self, memgraph::slk::Builder *builder) {
+  memgraph::slk::Save(self, builder);
+}
+
+void GetInstanceUUIDRes::Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *reader) {
+  memgraph::slk::Load(self, reader);
+}
+
 }
// namespace coordination constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ, @@ -107,8 +124,16 @@ constexpr utils::TypeInfo coordination::EnableWritingOnMainReq::kType{utils::Typ constexpr utils::TypeInfo coordination::EnableWritingOnMainRes::kType{utils::TypeId::COORD_ENABLE_WRITING_ON_MAIN_RES, "CoordEnableWritingOnMainRes", nullptr}; +constexpr utils::TypeInfo coordination::GetInstanceUUIDReq::kType{utils::TypeId::COORD_GET_UUID_REQ, "CoordGetUUIDReq", + nullptr}; + +constexpr utils::TypeInfo coordination::GetInstanceUUIDRes::kType{utils::TypeId::COORD_GET_UUID_RES, "CoordGetUUIDRes", + nullptr}; + namespace slk { +// PromoteReplicaToMainRpc + void Save(const memgraph::coordination::PromoteReplicaToMainRes &self, memgraph::slk::Builder *builder) { memgraph::slk::Save(self.success, builder); } @@ -127,6 +152,7 @@ void Load(memgraph::coordination::PromoteReplicaToMainReq *self, memgraph::slk:: memgraph::slk::Load(&self->replication_clients_info, reader); } +// DemoteMainToReplicaRpc void Save(const memgraph::coordination::DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder) { memgraph::slk::Save(self.replication_client_info, builder); } @@ -143,6 +169,8 @@ void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::R memgraph::slk::Load(&self->success, reader); } +// UnregisterReplicaRpc + void Save(memgraph::coordination::UnregisterReplicaReq const &self, memgraph::slk::Builder *builder) { memgraph::slk::Save(self.instance_name, builder); } @@ -167,6 +195,24 @@ void Load(memgraph::coordination::EnableWritingOnMainRes *self, memgraph::slk::R memgraph::slk::Load(&self->success, reader); } +// GetInstanceUUIDRpc + +void Save(const memgraph::coordination::GetInstanceUUIDReq & /*self*/, memgraph::slk::Builder * /*builder*/) { + /* nothing to serialize*/ +} + +void Load(memgraph::coordination::GetInstanceUUIDReq * /*self*/, memgraph::slk::Reader * /*reader*/) { + /* nothing to serialize*/ +} + +void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self.uuid, builder); +} + +void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(&self->uuid, reader); +} + } // namespace slk } // namespace memgraph diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp index cb6167540..5e10af89d 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/coordinator_client.hpp @@ -15,6 +15,8 @@ #include "coordination/coordinator_config.hpp" #include "rpc/client.hpp" +#include "rpc_errors.hpp" +#include "utils/result.hpp" #include "utils/scheduler.hpp" #include "utils/uuid.hpp" @@ -46,7 +48,7 @@ class CoordinatorClient { auto SocketAddress() const -> std::string; [[nodiscard]] auto DemoteToReplica() const -> bool; - // TODO: (andi) Consistent naming + auto SendPromoteReplicaToMainRpc(const utils::UUID &uuid, ReplicationClientsInfo replication_clients_info) const -> bool; @@ -56,6 +58,8 @@ class CoordinatorClient { auto SendEnableWritingOnMainRpc() const -> bool; + auto SendGetInstanceUUIDRpc() const -> memgraph::utils::BasicResult>; + auto ReplicationClientInfo() const -> ReplClientInfo; auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void; @@ -64,6 +68,8 @@ class CoordinatorClient { auto InstanceDownTimeoutSec() const -> 
std::chrono::seconds; + auto InstanceGetUUIDFrequencySec() const -> std::chrono::seconds; + friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) { return first.config_ == second.config_; } @@ -71,7 +77,6 @@ class CoordinatorClient { private: utils::Scheduler instance_checker_; - // TODO: (andi) Pimpl? communication::ClientContext rpc_context_; mutable rpc::Client rpc_client_; diff --git a/src/coordination/include/coordination/coordinator_config.hpp b/src/coordination/include/coordination/coordinator_config.hpp index b93a16085..df7a5f94f 100644 --- a/src/coordination/include/coordination/coordinator_config.hpp +++ b/src/coordination/include/coordination/coordinator_config.hpp @@ -30,6 +30,7 @@ struct CoordinatorClientConfig { uint16_t port{}; std::chrono::seconds instance_health_check_frequency_sec{1}; std::chrono::seconds instance_down_timeout_sec{5}; + std::chrono::seconds instance_get_uuid_frequency_sec{10}; auto SocketAddress() const -> std::string { return ip_address + ":" + std::to_string(port); } diff --git a/src/coordination/include/coordination/coordinator_handlers.hpp b/src/coordination/include/coordination/coordinator_handlers.hpp index ac7caba2e..b9ed4b519 100644 --- a/src/coordination/include/coordination/coordinator_handlers.hpp +++ b/src/coordination/include/coordination/coordinator_handlers.hpp @@ -38,6 +38,9 @@ class CoordinatorHandlers { slk::Builder *res_builder); static void EnableWritingOnMainHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader, slk::Builder *res_builder); + + static void GetInstanceUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader, + slk::Builder *res_builder); }; } // namespace memgraph::dbms diff --git a/src/coordination/include/coordination/coordinator_rpc.hpp b/src/coordination/include/coordination/coordinator_rpc.hpp index 137c95e73..1578b4577 100644 --- a/src/coordination/include/coordination/coordinator_rpc.hpp +++ b/src/coordination/include/coordination/coordinator_rpc.hpp @@ -136,6 +136,31 @@ struct EnableWritingOnMainRes { using EnableWritingOnMainRpc = rpc::RequestResponse; +struct GetInstanceUUIDReq { + static const utils::TypeInfo kType; + static const utils::TypeInfo &GetTypeInfo() { return kType; } + + static void Load(GetInstanceUUIDReq *self, memgraph::slk::Reader *reader); + static void Save(const GetInstanceUUIDReq &self, memgraph::slk::Builder *builder); + + GetInstanceUUIDReq() = default; +}; + +struct GetInstanceUUIDRes { + static const utils::TypeInfo kType; + static const utils::TypeInfo &GetTypeInfo() { return kType; } + + static void Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *reader); + static void Save(const GetInstanceUUIDRes &self, memgraph::slk::Builder *builder); + + explicit GetInstanceUUIDRes(std::optional uuid) : uuid(uuid) {} + GetInstanceUUIDRes() = default; + + std::optional uuid; +}; + +using GetInstanceUUIDRpc = rpc::RequestResponse; + } // namespace memgraph::coordination // SLK serialization declarations @@ -153,6 +178,11 @@ void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::R void Save(const memgraph::coordination::DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder); void Load(memgraph::coordination::DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader); +// GetInstanceUUIDRpc +void Save(const memgraph::coordination::GetInstanceUUIDReq &self, memgraph::slk::Builder *builder); +void Load(memgraph::coordination::GetInstanceUUIDReq *self, 
memgraph::slk::Reader *reader); +void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder); +void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader); // UnregisterReplicaRpc void Save(memgraph::coordination::UnregisterReplicaRes const &self, memgraph::slk::Builder *builder); void Load(memgraph::coordination::UnregisterReplicaRes *self, memgraph::slk::Reader *reader); diff --git a/src/coordination/include/coordination/replication_instance.hpp b/src/coordination/include/coordination/replication_instance.hpp index 573b96be6..8001d0905 100644 --- a/src/coordination/include/coordination/replication_instance.hpp +++ b/src/coordination/include/coordination/replication_instance.hpp @@ -18,6 +18,7 @@ #include "replication_coordination_glue/role.hpp" #include +#include "utils/result.hpp" #include "utils/uuid.hpp" namespace memgraph::coordination { @@ -37,6 +38,9 @@ class ReplicationInstance { auto OnSuccessPing() -> void; auto OnFailPing() -> bool; + auto IsReadyForUUIDPing() -> bool; + + void UpdateReplicaLastResponseUUID(); auto IsAlive() const -> bool; @@ -62,7 +66,8 @@ class ReplicationInstance { auto SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool; auto SendUnregisterReplicaRpc(std::string const &instance_name) -> bool; - // TODO: (andi) Inconsistent API + + auto SendGetInstanceUUID() -> utils::BasicResult>; auto GetClient() -> CoordinatorClient &; auto EnableWritingOnMain() -> bool; @@ -76,6 +81,7 @@ class ReplicationInstance { replication_coordination_glue::ReplicationRole replication_role_; std::chrono::system_clock::time_point last_response_time_{}; bool is_alive_{false}; + std::chrono::system_clock::time_point last_check_of_uuid_{}; // for replica this is main uuid of current main // for "main" main this same as in CoordinatorData diff --git a/src/coordination/include/coordination/rpc_errors.hpp b/src/coordination/include/coordination/rpc_errors.hpp new file mode 100644 index 000000000..f6bfbf3e0 --- /dev/null +++ b/src/coordination/include/coordination/rpc_errors.hpp @@ -0,0 +1,14 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
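+
+// Error codes returned by the coordinator's RPC helpers, e.g. CoordinatorClient::SendGetInstanceUUIDRpc().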
+ +namespace memgraph::coordination { +enum class GetInstanceUUIDError { NO_RESPONSE, RPC_EXCEPTION }; +} // namespace memgraph::coordination diff --git a/src/coordination/replication_instance.cpp b/src/coordination/replication_instance.cpp index a550bb844..0d16db648 100644 --- a/src/coordination/replication_instance.cpp +++ b/src/coordination/replication_instance.cpp @@ -14,6 +14,7 @@ #include "coordination/replication_instance.hpp" #include "replication_coordination_glue/handler.hpp" +#include "utils/result.hpp" namespace memgraph::coordination { @@ -39,6 +40,11 @@ auto ReplicationInstance::OnFailPing() -> bool { return is_alive_; } +auto ReplicationInstance::IsReadyForUUIDPing() -> bool { + return std::chrono::duration_cast(std::chrono::system_clock::now() - last_check_of_uuid_) > + client_.InstanceGetUUIDFrequencySec(); +} + auto ReplicationInstance::InstanceName() const -> std::string { return client_.InstanceName(); } auto ReplicationInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); } auto ReplicationInstance::IsAlive() const -> bool { return is_alive_; } @@ -91,10 +97,20 @@ auto ReplicationInstance::ResetMainUUID() -> void { main_uuid_ = std::nullopt; } auto ReplicationInstance::GetMainUUID() const -> std::optional const & { return main_uuid_; } auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool { - if (!main_uuid_ || *main_uuid_ != curr_main_uuid) { - return SendSwapAndUpdateUUID(curr_main_uuid); + if (!IsReadyForUUIDPing()) { + return true; } - return true; + auto res = SendGetInstanceUUID(); + if (res.HasError()) { + return false; + } + UpdateReplicaLastResponseUUID(); + + if (res.GetValue().has_value() && res.GetValue().value() == curr_main_uuid) { + return true; + } + + return SendSwapAndUpdateUUID(curr_main_uuid); } auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool { @@ -111,5 +127,12 @@ auto ReplicationInstance::SendUnregisterReplicaRpc(std::string const &instance_n auto ReplicationInstance::EnableWritingOnMain() -> bool { return client_.SendEnableWritingOnMainRpc(); } +auto ReplicationInstance::SendGetInstanceUUID() + -> utils::BasicResult> { + return client_.SendGetInstanceUUIDRpc(); +} + +void ReplicationInstance::UpdateReplicaLastResponseUUID() { last_check_of_uuid_ = std::chrono::system_clock::now(); } + } // namespace memgraph::coordination #endif diff --git a/src/flags/replication.cpp b/src/flags/replication.cpp index ccecb4a20..e6b71b942 100644 --- a/src/flags/replication.cpp +++ b/src/flags/replication.cpp @@ -22,6 +22,8 @@ DEFINE_uint32(raft_server_id, 0, "Unique ID of the raft server."); DEFINE_uint32(instance_down_timeout_sec, 5, "Time duration after which an instance is considered down."); // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DEFINE_uint32(instance_health_check_frequency_sec, 1, "The time duration between two health checks/pings."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_uint32(instance_get_uuid_frequency_sec, 10, "The time duration between two instance uuid checks."); #endif // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) diff --git a/src/flags/replication.hpp b/src/flags/replication.hpp index 51d50bb09..0a4982f12 100644 --- a/src/flags/replication.hpp +++ b/src/flags/replication.hpp @@ -24,6 +24,8 @@ DECLARE_uint32(raft_server_id); DECLARE_uint32(instance_down_timeout_sec); // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) 
DECLARE_uint32(instance_health_check_frequency_sec); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint32(instance_get_uuid_frequency_sec); #endif // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 1ba23a744..378d55e77 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -359,6 +359,7 @@ int main(int argc, char **argv) { #ifdef MG_ENTERPRISE .instance_down_timeout_sec = std::chrono::seconds(FLAGS_instance_down_timeout_sec), .instance_health_check_frequency_sec = std::chrono::seconds(FLAGS_instance_health_check_frequency_sec), + .instance_get_uuid_frequency_sec = std::chrono::seconds(FLAGS_instance_get_uuid_frequency_sec), #endif .default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers, .default_pulsar_service_url = FLAGS_pulsar_service_url, diff --git a/src/query/config.hpp b/src/query/config.hpp index 04fa272c9..fe8c2ae4c 100644 --- a/src/query/config.hpp +++ b/src/query/config.hpp @@ -24,6 +24,7 @@ struct InterpreterConfig { std::chrono::seconds instance_down_timeout_sec{5}; std::chrono::seconds instance_health_check_frequency_sec{1}; + std::chrono::seconds instance_get_uuid_frequency_sec{10}; std::string default_kafka_bootstrap_servers; std::string default_pulsar_service_url; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 36725cea3..f0fed2357 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -329,7 +329,7 @@ class ReplQueryHandler { .port = static_cast(*port), }; - if (!handler_->SetReplicationRoleReplica(config, std::nullopt)) { + if (!handler_->TrySetReplicationRoleReplica(config, std::nullopt)) { throw QueryRuntimeException("Couldn't set role to replica!"); } } @@ -486,8 +486,9 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { void RegisterReplicationInstance(std::string const &coordinator_socket_address, std::string const &replication_socket_address, std::chrono::seconds const &instance_check_frequency, - std::chrono::seconds const &instance_down_timeout, std::string const &instance_name, - CoordinatorQuery::SyncMode sync_mode) override { + std::chrono::seconds const &instance_down_timeout, + std::chrono::seconds const &instance_get_uuid_frequency, + std::string const &instance_name, CoordinatorQuery::SyncMode sync_mode) override { const auto maybe_replication_ip_port = io::network::Endpoint::ParseSocketOrAddress(replication_socket_address, std::nullopt); if (!maybe_replication_ip_port) { @@ -514,6 +515,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { .port = coordinator_server_port, .instance_health_check_frequency_sec = instance_check_frequency, .instance_down_timeout_sec = instance_down_timeout, + .instance_get_uuid_frequency_sec = instance_get_uuid_frequency, .replication_client_info = repl_config, .ssl = std::nullopt}; @@ -1185,11 +1187,12 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param instance_health_check_frequency_sec = config.instance_health_check_frequency_sec, instance_name = coordinator_query->instance_name_, instance_down_timeout_sec = config.instance_down_timeout_sec, + instance_get_uuid_frequency_sec = config.instance_get_uuid_frequency_sec, sync_mode = coordinator_query->sync_mode_]() mutable { handler.RegisterReplicationInstance(std::string(coordinator_socket_address_tv.ValueString()), std::string(replication_socket_address_tv.ValueString()), instance_health_check_frequency_sec, instance_down_timeout_sec, - 
instance_name, sync_mode); + instance_get_uuid_frequency_sec, instance_name, sync_mode); return std::vector>(); }; diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 642ce302b..b4b130f72 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -109,6 +109,7 @@ class CoordinatorQueryHandler { std::string const &replication_socket_address, std::chrono::seconds const &instance_health_check_frequency, std::chrono::seconds const &instance_down_timeout, + std::chrono::seconds const &instance_get_uuid_frequency, std::string const &instance_name, CoordinatorQuery::SyncMode sync_mode) = 0; /// @throw QueryRuntimeException if an error ocurred. diff --git a/src/query/replication_query_handler.hpp b/src/query/replication_query_handler.hpp index aa0611a43..011548bd4 100644 --- a/src/query/replication_query_handler.hpp +++ b/src/query/replication_query_handler.hpp @@ -49,6 +49,9 @@ struct ReplicationQueryHandler { virtual bool SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config, const std::optional &main_uuid) = 0; + virtual bool TrySetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config, + const std::optional &main_uuid) = 0; + // as MAIN, define and connect to REPLICAs virtual auto TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid) -> utils::BasicResult = 0; diff --git a/src/replication/state.cpp b/src/replication/state.cpp index 1a505f9a4..1155fdb51 100644 --- a/src/replication/state.cpp +++ b/src/replication/state.cpp @@ -62,8 +62,9 @@ ReplicationState::ReplicationState(std::optional durabili } #endif if (std::holds_alternative(replication_data)) { - spdlog::trace("Recovered main's uuid for replica {}", - std::string(std::get(replication_data).uuid_.value())); + auto &replica_uuid = std::get(replication_data).uuid_; + std::string uuid = replica_uuid.has_value() ? 
std::string(replica_uuid.value()) : ""; + spdlog::trace("Recovered main's uuid for replica {}", uuid); } else { spdlog::trace("Recovered uuid for main {}", std::string(std::get(replication_data).uuid_)); } diff --git a/src/replication_handler/include/replication_handler/replication_handler.hpp b/src/replication_handler/include/replication_handler/replication_handler.hpp index 7882fa3c0..3e8e21265 100644 --- a/src/replication_handler/include/replication_handler/replication_handler.hpp +++ b/src/replication_handler/include/replication_handler/replication_handler.hpp @@ -14,6 +14,7 @@ #include "dbms/dbms_handler.hpp" #include "flags/experimental.hpp" #include "replication/include/replication/state.hpp" +#include "replication_handler/system_replication.hpp" #include "replication_handler/system_rpc.hpp" #include "utils/result.hpp" @@ -113,10 +114,14 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { // as REPLICA, become MAIN bool SetReplicationRoleMain() override; - // as MAIN, become REPLICA + // as MAIN, become REPLICA, can be called on MAIN and REPLICA bool SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config, const std::optional &main_uuid) override; + // as MAIN, become REPLICA, can be called only on MAIN + bool TrySetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config, + const std::optional &main_uuid) override; + // as MAIN, define and connect to REPLICAs auto TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid) -> memgraph::utils::BasicResult override; @@ -137,12 +142,13 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { auto GetReplState() const -> const memgraph::replication::ReplicationState &; auto GetReplState() -> memgraph::replication::ReplicationState &; + auto GetReplicaUUID() -> std::optional; + private: - template + template auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid) -> memgraph::utils::BasicResult { MG_ASSERT(repl_state_.IsMain(), "Only main instance can register a replica!"); - auto maybe_client = repl_state_.RegisterReplica(config); if (maybe_client.HasError()) { switch (maybe_client.GetError()) { @@ -159,7 +165,6 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { break; } } - using enum memgraph::flags::Experiments; bool system_replication_enabled = flags::AreExperimentsEnabled(SYSTEM_REPLICATION); if (!system_replication_enabled && dbms_handler_.Count() > 1) { @@ -167,25 +172,21 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { } const auto main_uuid = std::get(dbms_handler_.ReplicationState().ReplicationData()).uuid_; - if (send_swap_uuid) { if (!memgraph::replication_coordination_glue::SendSwapMainUUIDRpc(maybe_client.GetValue()->rpc_client_, main_uuid)) { return memgraph::query::RegisterReplicaError::ERROR_ACCEPTING_MAIN; } } - #ifdef MG_ENTERPRISE // Update system before enabling individual storage <-> replica clients SystemRestore(*maybe_client.GetValue(), system_, dbms_handler_, main_uuid, auth_); #endif - const auto dbms_error = HandleRegisterReplicaStatus(maybe_client); if (dbms_error.has_value()) { return *dbms_error; } auto &instance_client_ptr = maybe_client.GetValue(); - bool all_clients_good = true; // Add database specific clients (NOTE Currently all databases are connected to each replica) dbms_handler_.ForEach([&](dbms::DatabaseAccess db_acc) { @@ -195,7 +196,6 
@@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { } // TODO: ATM only IN_MEMORY_TRANSACTIONAL, fix other modes if (storage->storage_mode_ != storage::StorageMode::IN_MEMORY_TRANSACTIONAL) return; - all_clients_good &= storage->repl_storage_state_.replication_clients_.WithLock( [storage, &instance_client_ptr, db_acc = std::move(db_acc), main_uuid](auto &storage_clients) mutable { // NOLINT @@ -203,9 +203,12 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { client->Start(storage, std::move(db_acc)); bool const success = std::invoke([state = client->State()]() { if (state == storage::replication::ReplicaState::DIVERGED_FROM_MAIN) { - return AllowReplicaToDivergeFromMain; + return false; } - return state != storage::replication::ReplicaState::MAYBE_BEHIND; + if (state == storage::replication::ReplicaState::MAYBE_BEHIND) { + return AllowRPCFailure; + } + return true; }); if (success) { @@ -214,14 +217,12 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { return success; }); }); - // NOTE Currently if any databases fails, we revert back if (!all_clients_good) { spdlog::error("Failed to register all databases on the REPLICA \"{}\"", config.name); UnregisterReplica(config.name); return memgraph::query::RegisterReplicaError::CONNECTION_FAILED; } - // No client error, start instance level client #ifdef MG_ENTERPRISE StartReplicaClient(*instance_client_ptr, system_, dbms_handler_, main_uuid, auth_); @@ -231,6 +232,57 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { return {}; } + template + bool SetReplicationRoleReplica_(const memgraph::replication::ReplicationServerConfig &config, + const std::optional &main_uuid) { + if (repl_state_.IsReplica()) { + if (!AllowIdempotency) { + return false; + } + // We don't want to restart the server if we're already a REPLICA with correct config + auto &replica_data = std::get(repl_state_.ReplicationData()); + if (replica_data.config == config) { + return true; + } + repl_state_.SetReplicationRoleReplica(config, main_uuid); +#ifdef MG_ENTERPRISE + return StartRpcServer(dbms_handler_, replica_data, auth_, system_); +#else + return StartRpcServer(dbms_handler_, replica_data); +#endif + } + + // TODO StorageState needs to be synched. Could have a dangling reference if someone adds a database as we are + // deleting the replica. + // Remove database specific clients + dbms_handler_.ForEach([&](memgraph::dbms::DatabaseAccess db_acc) { + auto *storage = db_acc->storage(); + storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); }); + }); + // Remove instance level clients + std::get(repl_state_.ReplicationData()).registered_replicas_.clear(); + + // Creates the server + repl_state_.SetReplicationRoleReplica(config, main_uuid); + + // Start + const auto success = + std::visit(memgraph::utils::Overloaded{[](memgraph::replication::RoleMainData &) { + // ASSERT + return false; + }, + [this](memgraph::replication::RoleReplicaData &data) { +#ifdef MG_ENTERPRISE + return StartRpcServer(dbms_handler_, data, auth_, system_); +#else + return StartRpcServer(dbms_handler_, data); +#endif + }}, + repl_state_.ReplicationData()); + // TODO Handle error (restore to main?) 
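+    // Note: at this point the role switch to REPLICA has already happened; `success` only reflects whether
+    // the replication RPC server could be started (the RoleMainData branch above is not expected to be hit).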
+ return success; + } + memgraph::replication::ReplicationState &repl_state_; memgraph::dbms::DbmsHandler &dbms_handler_; diff --git a/src/replication_handler/replication_handler.cpp b/src/replication_handler/replication_handler.cpp index 8d07d5af5..747f327e4 100644 --- a/src/replication_handler/replication_handler.cpp +++ b/src/replication_handler/replication_handler.cpp @@ -192,41 +192,12 @@ bool ReplicationHandler::SetReplicationRoleMain() { bool ReplicationHandler::SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config, const std::optional &main_uuid) { - // We don't want to restart the server if we're already a REPLICA - if (repl_state_.IsReplica()) { - spdlog::trace("Instance has already has replica role."); - return false; - } + return SetReplicationRoleReplica_(config, main_uuid); +} - // TODO StorageState needs to be synched. Could have a dangling reference if someone adds a database as we are - // deleting the replica. - // Remove database specific clients - dbms_handler_.ForEach([&](memgraph::dbms::DatabaseAccess db_acc) { - auto *storage = db_acc->storage(); - storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); }); - }); - // Remove instance level clients - std::get(repl_state_.ReplicationData()).registered_replicas_.clear(); - - // Creates the server - repl_state_.SetReplicationRoleReplica(config, main_uuid); - - // Start - const auto success = - std::visit(memgraph::utils::Overloaded{[](memgraph::replication::RoleMainData &) { - // ASSERT - return false; - }, - [this](memgraph::replication::RoleReplicaData &data) { -#ifdef MG_ENTERPRISE - return StartRpcServer(dbms_handler_, data, auth_, system_); -#else - return StartRpcServer(dbms_handler_, data); -#endif - }}, - repl_state_.ReplicationData()); - // TODO Handle error (restore to main?) 
- return success; +bool ReplicationHandler::TrySetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config, + const std::optional &main_uuid) { + return SetReplicationRoleReplica_(config, main_uuid); } bool ReplicationHandler::DoReplicaToMainPromotion(const utils::UUID &main_uuid) { @@ -258,13 +229,13 @@ bool ReplicationHandler::DoReplicaToMainPromotion(const utils::UUID &main_uuid) auto ReplicationHandler::TryRegisterReplica(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid) -> memgraph::utils::BasicResult { - return RegisterReplica_(config, send_swap_uuid); + return RegisterReplica_(config, send_swap_uuid); } auto ReplicationHandler::RegisterReplica(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid) -> memgraph::utils::BasicResult { - return RegisterReplica_(config, send_swap_uuid); + return RegisterReplica_(config, send_swap_uuid); } auto ReplicationHandler::UnregisterReplica(std::string_view name) -> memgraph::query::UnregisterReplicaResult { @@ -297,6 +268,11 @@ auto ReplicationHandler::GetRole() const -> memgraph::replication_coordination_g return repl_state_.GetRole(); } +auto ReplicationHandler::GetReplicaUUID() -> std::optional { + MG_ASSERT(repl_state_.IsReplica()); + return std::get(repl_state_.ReplicationData()).uuid_; +} + auto ReplicationHandler::GetReplState() const -> const memgraph::replication::ReplicationState & { return repl_state_; } auto ReplicationHandler::GetReplState() -> memgraph::replication::ReplicationState & { return repl_state_; } diff --git a/src/utils/typeinfo.hpp b/src/utils/typeinfo.hpp index a4068b023..1ca08a3f7 100644 --- a/src/utils/typeinfo.hpp +++ b/src/utils/typeinfo.hpp @@ -112,6 +112,9 @@ enum class TypeId : uint64_t { COORD_ENABLE_WRITING_ON_MAIN_REQ, COORD_ENABLE_WRITING_ON_MAIN_RES, + COORD_GET_UUID_REQ, + COORD_GET_UUID_RES, + // AST AST_LABELIX = 3000, AST_PROPERTYIX, diff --git a/tests/e2e/configuration/default_config.py b/tests/e2e/configuration/default_config.py index 604946644..de05a5617 100644 --- a/tests/e2e/configuration/default_config.py +++ b/tests/e2e/configuration/default_config.py @@ -71,6 +71,7 @@ startup_config_dict = { "raft_server_id": ("0", "0", "Unique ID of the raft server."), "instance_down_timeout_sec": ("5", "5", "Time duration after which an instance is considered down."), "instance_health_check_frequency_sec": ("1", "1", "The time duration between two health checks/pings."), + "instance_get_uuid_frequency_sec": ("10", "10", "The time duration between two instance uuid checks."), "data_directory": ("mg_data", "mg_data", "Path to directory in which to save all permanent data."), "data_recovery_on_startup": ( "false", diff --git a/tests/e2e/high_availability_experimental/not_replicate_from_old_main.py b/tests/e2e/high_availability_experimental/not_replicate_from_old_main.py index d6f6f7da4..b859cae84 100644 --- a/tests/e2e/high_availability_experimental/not_replicate_from_old_main.py +++ b/tests/e2e/high_availability_experimental/not_replicate_from_old_main.py @@ -10,11 +10,13 @@ # licenses/APL.txt. 
import os +import shutil import sys +import tempfile import interactive_mg_runner import pytest -from common import execute_and_fetch_all +from common import execute_and_fetch_all, safe_execute from mg_utils import mg_sleep_and_assert interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -38,7 +40,7 @@ MEMGRAPH_FIRST_CLUSTER_DESCRIPTION = { } -MEMGRAPH_INSTANCES_DESCRIPTION = { +MEMGRAPH_SECOND_CLUSTER_DESCRIPTION = { "replica": { "args": ["--bolt-port", "7689", "--log-level", "TRACE"], "log_file": "replica.log", @@ -71,7 +73,7 @@ def test_replication_works_on_failover(connection): assert actual_data_on_main == expected_data_on_main # 3 - interactive_mg_runner.start_all_keep_others(MEMGRAPH_INSTANCES_DESCRIPTION) + interactive_mg_runner.start_all_keep_others(MEMGRAPH_SECOND_CLUSTER_DESCRIPTION) # 4 new_main_cursor = connection(7690, "main_2").cursor() @@ -113,5 +115,144 @@ def test_replication_works_on_failover(connection): interactive_mg_runner.stop_all() +def test_not_replicate_old_main_register_new_cluster(connection): + # Goal of this test is to check that although replica is registered in one cluster + # it can be re-registered to new cluster + # This flow checks if Registering replica is idempotent and that old main cannot talk to replica + # 1. We start all replicas and main in one cluster + # 2. Main from first cluster can see all replicas + # 3. We start all replicas and main in second cluster, by reusing one replica from first cluster + # 4. New main should see replica. Registration should pass (idempotent registration) + # 5. Old main should not talk to new replica + # 6. New main should talk to replica + + TEMP_DIR = tempfile.TemporaryDirectory().name + MEMGRAPH_FISRT_COORD_CLUSTER_DESCRIPTION = { + "shared_instance": { + "args": [ + "--bolt-port", + "7688", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10011", + ], + "log_file": "instance_1.log", + "data_directory": f"{TEMP_DIR}/shared_instance", + "setup_queries": [], + }, + "instance_2": { + "args": [ + "--bolt-port", + "7689", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10012", + ], + "log_file": "instance_2.log", + "data_directory": f"{TEMP_DIR}/instance_2", + "setup_queries": [], + }, + "coordinator_1": { + "args": ["--bolt-port", "7690", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10111"], + "log_file": "coordinator.log", + "setup_queries": [ + "REGISTER INSTANCE shared_instance ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", + "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';", + "SET INSTANCE instance_2 TO MAIN", + ], + }, + } + + # 1 + interactive_mg_runner.start_all_keep_others(MEMGRAPH_FISRT_COORD_CLUSTER_DESCRIPTION) + + # 2 + + first_cluster_coord_cursor = connection(7690, "coord_1").cursor() + + def show_repl_cluster(): + return sorted(list(execute_and_fetch_all(first_cluster_coord_cursor, "SHOW INSTANCES;"))) + + expected_data_up_first_cluster = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_2", "", "127.0.0.1:10012", True, "main"), + ("shared_instance", "", "127.0.0.1:10011", True, "replica"), + ] + + mg_sleep_and_assert(expected_data_up_first_cluster, show_repl_cluster) + + # 3 + + MEMGRAPH_SECOND_COORD_CLUSTER_DESCRIPTION = { + "instance_3": { + "args": [ + "--bolt-port", + "7687", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10013", + ], + "log_file": "instance_3.log", + "data_directory": f"{TEMP_DIR}/instance_3", + "setup_queries": [], + }, + 
"coordinator_2": { + "args": ["--bolt-port", "7691", "--log-level=TRACE", "--raft-server-id=1", "--raft-server-port=10112"], + "log_file": "coordinator.log", + "setup_queries": [], + }, + } + + interactive_mg_runner.start_all_keep_others(MEMGRAPH_SECOND_COORD_CLUSTER_DESCRIPTION) + second_cluster_coord_cursor = connection(7691, "coord_2").cursor() + execute_and_fetch_all( + second_cluster_coord_cursor, "REGISTER INSTANCE shared_instance ON '127.0.0.1:10011' WITH '127.0.0.1:10001';" + ) + execute_and_fetch_all( + second_cluster_coord_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';" + ) + execute_and_fetch_all(second_cluster_coord_cursor, "SET INSTANCE instance_3 TO MAIN") + + # 4 + + def show_repl_cluster(): + return sorted(list(execute_and_fetch_all(second_cluster_coord_cursor, "SHOW INSTANCES;"))) + + expected_data_up_second_cluster = [ + ("coordinator_1", "127.0.0.1:10112", "", True, "coordinator"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), + ("shared_instance", "", "127.0.0.1:10011", True, "replica"), + ] + + mg_sleep_and_assert(expected_data_up_second_cluster, show_repl_cluster) + + # 5 + main_1_cursor = connection(7689, "main_1").cursor() + with pytest.raises(Exception) as e: + execute_and_fetch_all(main_1_cursor, "CREATE ();") + assert ( + str(e.value) + == "Replication Exception: At least one SYNC replica has not confirmed committing last transaction. Check the status of the replicas using 'SHOW REPLICAS' query." + ) + + shared_replica_cursor = connection(7688, "shared_replica").cursor() + res = execute_and_fetch_all(shared_replica_cursor, "MATCH (n) RETURN count(n);")[0][0] + assert res == 0, "Old main should not replicate to 'shared' replica" + + # 6 + main_2_cursor = connection(7687, "main_2").cursor() + + execute_and_fetch_all(main_2_cursor, "CREATE ();") + + shared_replica_cursor = connection(7688, "shared_replica").cursor() + res = execute_and_fetch_all(shared_replica_cursor, "MATCH (n) RETURN count(n);")[0][0] + assert res == 1, "New main should replicate to 'shared' replica" + + interactive_mg_runner.stop_all() + + if __name__ == "__main__": sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/high_availability_experimental/single_coordinator.py b/tests/e2e/high_availability_experimental/single_coordinator.py index d490a36ba..d1df05b4f 100644 --- a/tests/e2e/high_availability_experimental/single_coordinator.py +++ b/tests/e2e/high_availability_experimental/single_coordinator.py @@ -147,6 +147,105 @@ def test_replication_works_on_failover(): interactive_mg_runner.stop_all(MEMGRAPH_INSTANCES_DESCRIPTION) +def test_replication_works_on_replica_instance_restart(): + # Goal of this test is to check the replication works after replica goes down and restarts + # 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes. + # 2. We check that main has correct state + # 3. We kill replica + # 4. We check that main cannot replicate to replica + # 5. We bring replica back up + # 6. 
We check that replica gets data + safe_execute(shutil.rmtree, TEMP_DIR) + + # 1 + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + # 2 + main_cursor = connect(host="localhost", port=7687).cursor() + expected_data_on_main = [ + ("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ] + actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;"))) + assert actual_data_on_main == expected_data_on_main + + # 3 + coord_cursor = connect(host="localhost", port=7690).cursor() + + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2") + + def retrieve_data_show_repl_cluster(): + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;"))) + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", False, "unknown"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster) + + def retrieve_data_show_replicas(): + return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;"))) + + expected_data_on_main = [ + ("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "invalid"), + ] + mg_sleep_and_assert(expected_data_on_main, retrieve_data_show_replicas) + + # 4 + instance_1_cursor = connect(host="localhost", port=7688).cursor() + with pytest.raises(Exception) as e: + execute_and_fetch_all(main_cursor, "CREATE ();") + assert ( + str(e.value) + == "Replication Exception: At least one SYNC replica has not confirmed committing last transaction. Check the status of the replicas using 'SHOW REPLICAS' query." + ) + + res_instance_1 = execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n)")[0][0] + assert res_instance_1 == 1 + + def retrieve_data_show_replicas(): + return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;"))) + + expected_data_on_main = [ + ("instance_1", "127.0.0.1:10001", "sync", 2, 0, "ready"), + ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "invalid"), + ] + mg_sleep_and_assert(expected_data_on_main, retrieve_data_show_replicas) + + # 5. + + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2") + + def retrieve_data_show_repl_cluster(): + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;"))) + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster) + + def retrieve_data_show_replicas(): + return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;"))) + + expected_data_on_main = [ + ("instance_1", "127.0.0.1:10001", "sync", 2, 0, "ready"), + ("instance_2", "127.0.0.1:10002", "sync", 2, 0, "ready"), + ] + mg_sleep_and_assert(expected_data_on_main, retrieve_data_show_replicas) + + # 6. 
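+    # The restarted replica should catch up on the write from step 4 and also receive the new write below,
+    # ending up with 2 nodes in total.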
+ instance_2_cursor = connect(port=7689, host="localhost").cursor() + execute_and_fetch_all(main_cursor, "CREATE ();") + res_instance_2 = execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0] + assert res_instance_2 == 2 + + def test_show_instances(): safe_execute(shutil.rmtree, TEMP_DIR) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) diff --git a/tests/unit/storage_v2_replication.cpp b/tests/unit/storage_v2_replication.cpp index c5e1ad543..64366f331 100644 --- a/tests/unit/storage_v2_replication.cpp +++ b/tests/unit/storage_v2_replication.cpp @@ -142,7 +142,7 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { MinMemgraph replica(repl_conf); auto replica_store_handler = replica.repl_handler; - replica_store_handler.SetReplicationRoleReplica( + replica_store_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = ports[0], @@ -439,13 +439,13 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) { MinMemgraph replica1(repl_conf); MinMemgraph replica2(repl2_conf); - replica1.repl_handler.SetReplicationRoleReplica( + replica1.repl_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = ports[0], }, std::nullopt); - replica2.repl_handler.SetReplicationRoleReplica( + replica2.repl_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = ports[1], @@ -597,7 +597,7 @@ TEST_F(ReplicationTest, RecoveryProcess) { MinMemgraph replica(repl_conf); auto replica_store_handler = replica.repl_handler; - replica_store_handler.SetReplicationRoleReplica( + replica_store_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = ports[0], @@ -676,7 +676,7 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) { MinMemgraph replica_async(repl_conf); auto replica_store_handler = replica_async.repl_handler; - replica_store_handler.SetReplicationRoleReplica( + replica_store_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = ports[1], @@ -726,7 +726,7 @@ TEST_F(ReplicationTest, EpochTest) { MinMemgraph main(main_conf); MinMemgraph replica1(repl_conf); - replica1.repl_handler.SetReplicationRoleReplica( + replica1.repl_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = ports[0], @@ -734,7 +734,7 @@ TEST_F(ReplicationTest, EpochTest) { std::nullopt); MinMemgraph replica2(repl2_conf); - replica2.repl_handler.SetReplicationRoleReplica( + replica2.repl_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = 10001, @@ -819,7 +819,7 @@ TEST_F(ReplicationTest, EpochTest) { ASSERT_FALSE(acc->Commit().HasError()); } - replica1.repl_handler.SetReplicationRoleReplica( + replica1.repl_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = ports[0], @@ -858,7 +858,7 @@ TEST_F(ReplicationTest, ReplicationInformation) { MinMemgraph replica1(repl_conf); uint16_t replica1_port = 10001; - replica1.repl_handler.SetReplicationRoleReplica( + replica1.repl_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = replica1_port, @@ -867,7 +867,7 @@ TEST_F(ReplicationTest, ReplicationInformation) { uint16_t replica2_port = 10002; MinMemgraph replica2(repl2_conf); - replica2.repl_handler.SetReplicationRoleReplica( + replica2.repl_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = 
local_host, .port = replica2_port, @@ -923,7 +923,7 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) { MinMemgraph replica1(repl_conf); uint16_t replica1_port = 10001; - replica1.repl_handler.SetReplicationRoleReplica( + replica1.repl_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = replica1_port, @@ -932,7 +932,7 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) { uint16_t replica2_port = 10002; MinMemgraph replica2(repl2_conf); - replica2.repl_handler.SetReplicationRoleReplica( + replica2.repl_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = replica2_port, @@ -966,7 +966,7 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) { MinMemgraph main(main_conf); MinMemgraph replica1(repl_conf); - replica1.repl_handler.SetReplicationRoleReplica( + replica1.repl_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = common_port, @@ -974,7 +974,7 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) { std::nullopt); MinMemgraph replica2(repl2_conf); - replica2.repl_handler.SetReplicationRoleReplica( + replica2.repl_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = common_port, @@ -1023,7 +1023,7 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) { std::optional main(main_config); MinMemgraph replica1(replica1_config); - replica1.repl_handler.SetReplicationRoleReplica( + replica1.repl_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = ports[0], @@ -1031,7 +1031,7 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) { std::nullopt); MinMemgraph replica2(replica2_config); - replica2.repl_handler.SetReplicationRoleReplica( + replica2.repl_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = ports[1], @@ -1088,7 +1088,7 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) { std::optional main(main_config); MinMemgraph replica1(repl_conf); - replica1.repl_handler.SetReplicationRoleReplica( + replica1.repl_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = ports[0], @@ -1097,7 +1097,7 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) { MinMemgraph replica2(repl2_conf); - replica2.repl_handler.SetReplicationRoleReplica( + replica2.repl_handler.TrySetReplicationRoleReplica( ReplicationServerConfig{ .ip_address = local_host, .port = ports[1],