diff --git a/src/coordination/CMakeLists.txt b/src/coordination/CMakeLists.txt index 4937c0ad3..a203e3652 100644 --- a/src/coordination/CMakeLists.txt +++ b/src/coordination/CMakeLists.txt @@ -16,6 +16,7 @@ target_sources(mg-coordination include/coordination/instance_status.hpp include/coordination/replication_instance.hpp include/coordination/raft_state.hpp + include/coordination/rpc_errors.hpp include/nuraft/coordinator_log_store.hpp include/nuraft/coordinator_state_machine.hpp diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp index a30e504b7..87117a627 100644 --- a/src/coordination/coordinator_client.cpp +++ b/src/coordination/coordinator_client.cpp @@ -17,6 +17,7 @@ #include "coordination/coordinator_config.hpp" #include "coordination/coordinator_rpc.hpp" #include "replication_coordination_glue/messages.hpp" +#include "utils/result.hpp" namespace memgraph::coordination { @@ -121,5 +122,23 @@ auto CoordinatorClient::SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bo return false; } +// Asks the instance for the UUID it reports through GetInstanceUUIDRpc. +// Returns that (possibly empty) UUID, or GetInstanceUUIDError::RPC_EXCEPTION +// when the RPC fails. +auto CoordinatorClient::SendGetInstanceUUID() const + -> utils::BasicResult<GetInstanceUUIDError, std::optional<utils::UUID>> { + std::optional<utils::UUID> instance_uuid; + try { + auto stream{rpc_client_.Stream<GetInstanceUUIDRpc>()}; + auto res = stream.AwaitResponse(); + // Forward the reported uuid as-is; an unset uuid stays an empty optional. + instance_uuid = res.uuid; + } catch (const rpc::RpcFailedException &) { + spdlog::error("RPC error occurred while sending GetInstance UUID RPC"); + return GetInstanceUUIDError::RPC_EXCEPTION; + } + return instance_uuid; +} + } // namespace memgraph::coordination #endif diff --git a/src/coordination/coordinator_handlers.cpp b/src/coordination/coordinator_handlers.cpp index 79d31e393..9d9744f7e 100644 --- a/src/coordination/coordinator_handlers.cpp +++ b/src/coordination/coordinator_handlers.cpp @@ -39,6 +39,12 @@ void CoordinatorHandlers::Register(memgraph::coordination::CoordinatorServer &se spdlog::info("Received SwapMainUUIDRPC on coordinator server");
CoordinatorHandlers::SwapMainUUIDHandler(replication_handler, req_reader, res_builder); }); + + server.Register( + [&replication_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void { + spdlog::info("Received GetInstanceUUIDRpc on coordinator server"); + CoordinatorHandlers::GetInstanceUUIDHandler(replication_handler, req_reader, res_builder); + }); } void CoordinatorHandlers::SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler, @@ -78,6 +84,16 @@ void CoordinatorHandlers::DemoteMainToReplicaHandler(replication::ReplicationHan slk::Save(coordination::DemoteMainToReplicaRes{true}, res_builder); } +void CoordinatorHandlers::GetInstanceUUIDHandler(replication::ReplicationHandler &replication_handler, + slk::Reader *req_reader, slk::Builder *res_builder) { + spdlog::info("Executing GetInstanceUUIDHandler"); + + coordination::GetInstanceUUIDReq req; + slk::Load(&req, req_reader); + + slk::Save(coordination::GetInstanceUUIDRes{replication_handler.GetReplicaUUID()}, res_builder); +} + void CoordinatorHandlers::PromoteReplicaToMainHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader, slk::Builder *res_builder) { if (!replication_handler.IsReplica()) { diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index 4c3f3e646..2ac29fb58 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -47,6 +47,10 @@ CoordinatorInstance::CoordinatorInstance() spdlog::trace("Instance {} performing replica successful callback", repl_instance_name); auto &repl_instance = find_repl_instance(self, repl_instance_name); + // From time to time we ask the replica which main UUID it is listening to: the replica may have gone down + // and come back up faster than we could notice, so a successful ping alone does not prove it follows the correct main. + // We compare the UUID reported by the replica with the current main UUID + // and swap it if necessary. if
(!repl_instance.EnsureReplicaHasCorrectMainUUID(self->GetMainUUID())) { spdlog::error( fmt::format("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName())); @@ -61,14 +65,6 @@ CoordinatorInstance::CoordinatorInstance() spdlog::trace("Instance {} performing replica failure callback", repl_instance_name); auto &repl_instance = find_repl_instance(self, repl_instance_name); repl_instance.OnFailPing(); - // We need to restart main uuid from instance since it was "down" at least a second - // There is slight delay, if we choose to use isAlive, instance can be down and back up in less than - // our isAlive time difference, which would lead to instance setting UUID to nullopt and stopping accepting any - // incoming RPCs from valid main - // TODO(antoniofilipovic) this needs here more complex logic - // We need to get id of main replica is listening to on successful ping - // and swap it to correct uuid if it failed - repl_instance.ResetMainUUID(); }; main_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { diff --git a/src/coordination/coordinator_rpc.cpp b/src/coordination/coordinator_rpc.cpp index 2b5752a07..e61a2b83e 100644 --- a/src/coordination/coordinator_rpc.cpp +++ b/src/coordination/coordinator_rpc.cpp @@ -52,6 +52,23 @@ void DemoteMainToReplicaRes::Load(DemoteMainToReplicaRes *self, memgraph::slk::R memgraph::slk::Load(self, reader); } +// GetInstanceUUID +void GetInstanceUUIDReq::Save(const GetInstanceUUIDReq &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self, builder); +} + +void GetInstanceUUIDReq::Load(GetInstanceUUIDReq *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(self, reader); +} + +void GetInstanceUUIDRes::Save(const GetInstanceUUIDRes &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self, builder); +} + +void GetInstanceUUIDRes::Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *reader) { + 
memgraph::slk::Load(self, reader); +} + } // namespace coordination constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ, @@ -66,8 +83,16 @@ constexpr utils::TypeInfo coordination::DemoteMainToReplicaReq::kType{utils::Typ constexpr utils::TypeInfo coordination::DemoteMainToReplicaRes::kType{utils::TypeId::COORD_SET_REPL_MAIN_RES, "CoordDemoteToReplicaRes", nullptr}; +constexpr utils::TypeInfo coordination::GetInstanceUUIDReq::kType{utils::TypeId::COORD_GET_UUID_REQ, "CoordGetUUIDReq", + nullptr}; + +constexpr utils::TypeInfo coordination::GetInstanceUUIDRes::kType{utils::TypeId::COORD_GET_UUID_RES, "CoordGetUUIDRes", + nullptr}; + namespace slk { +// PromoteReplicaToMainRpc + void Save(const memgraph::coordination::PromoteReplicaToMainRes &self, memgraph::slk::Builder *builder) { memgraph::slk::Save(self.success, builder); } @@ -86,6 +111,7 @@ void Load(memgraph::coordination::PromoteReplicaToMainReq *self, memgraph::slk:: memgraph::slk::Load(&self->replication_clients_info, reader); } +// DemoteMainToReplicaRpc void Save(const memgraph::coordination::DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder) { memgraph::slk::Save(self.replication_client_info, builder); } @@ -102,6 +128,24 @@ void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::R memgraph::slk::Load(&self->success, reader); } +// GetInstanceUUIDRpc + +void Save(const memgraph::coordination::GetInstanceUUIDReq & /*self*/, memgraph::slk::Builder * /*builder*/) { + /* nothing to serialize*/ +} + +void Load(memgraph::coordination::GetInstanceUUIDReq * /*self*/, memgraph::slk::Reader * /*reader*/) { + /* nothing to serialize*/ +} + +void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self.uuid, builder); +} + +void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(&self->uuid, reader); +} + } 
// namespace slk } // namespace memgraph diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp index 02bae1c03..85905de23 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/coordinator_client.hpp @@ -16,6 +16,8 @@ #include "coordination/coordinator_config.hpp" #include "rpc/client.hpp" +#include "rpc_errors.hpp" +#include "utils/result.hpp" #include "utils/scheduler.hpp" namespace memgraph::coordination { @@ -51,6 +53,8 @@ class CoordinatorClient { auto SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bool; + auto SendGetInstanceUUID() const -> memgraph::utils::BasicResult>; + auto ReplicationClientInfo() const -> ReplClientInfo; auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void; diff --git a/src/coordination/include/coordination/coordinator_cluster_config.hpp b/src/coordination/include/coordination/coordinator_cluster_config.hpp index e1d91ff7d..bec67c3b4 100644 --- a/src/coordination/include/coordination/coordinator_cluster_config.hpp +++ b/src/coordination/include/coordination/coordinator_cluster_config.hpp @@ -16,6 +16,7 @@ namespace memgraph::coordination { struct CoordinatorClusterConfig { static constexpr int alive_response_time_difference_sec_{5}; + static constexpr int replica_uuid_last_check_time_difference_sec_{10}; }; } // namespace memgraph::coordination diff --git a/src/coordination/include/coordination/coordinator_handlers.hpp b/src/coordination/include/coordination/coordinator_handlers.hpp index 4aa4656c3..b5638d9e9 100644 --- a/src/coordination/include/coordination/coordinator_handlers.hpp +++ b/src/coordination/include/coordination/coordinator_handlers.hpp @@ -33,6 +33,9 @@ class CoordinatorHandlers { slk::Builder *res_builder); static void SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader, slk::Builder 
*res_builder); + + static void GetInstanceUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader, + slk::Builder *res_builder); }; } // namespace memgraph::dbms diff --git a/src/coordination/include/coordination/coordinator_rpc.hpp b/src/coordination/include/coordination/coordinator_rpc.hpp index 56cfdb403..eaf9ebfa6 100644 --- a/src/coordination/include/coordination/coordinator_rpc.hpp +++ b/src/coordination/include/coordination/coordinator_rpc.hpp @@ -82,6 +82,35 @@ struct DemoteMainToReplicaRes { using DemoteMainToReplicaRpc = rpc::RequestResponse<DemoteMainToReplicaReq, DemoteMainToReplicaRes>; +// Request of GetInstanceUUIDRpc; it carries no serialized payload. +struct GetInstanceUUIDReq { + static const utils::TypeInfo kType; + static const utils::TypeInfo &GetTypeInfo() { return kType; } + + static void Load(GetInstanceUUIDReq *self, memgraph::slk::Reader *reader); + static void Save(const GetInstanceUUIDReq &self, memgraph::slk::Builder *builder); + + GetInstanceUUIDReq() = default; + // Never written by Save/Load; value-initialized so a default-constructed request holds no indeterminate value. + bool success{false}; +}; + +// Response of GetInstanceUUIDRpc; carries the UUID reported by the instance, if it has one. +struct GetInstanceUUIDRes { + static const utils::TypeInfo kType; + static const utils::TypeInfo &GetTypeInfo() { return kType; } + + static void Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *reader); + static void Save(const GetInstanceUUIDRes &self, memgraph::slk::Builder *builder); + + explicit GetInstanceUUIDRes(std::optional<utils::UUID> uuid) : uuid(uuid) {} + GetInstanceUUIDRes() = default; + + std::optional<utils::UUID> uuid; +}; + +using GetInstanceUUIDRpc = rpc::RequestResponse<GetInstanceUUIDReq, GetInstanceUUIDRes>; + } // namespace memgraph::coordination // SLK serialization declarations @@ -99,6 +128,11 @@ void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::R void Save(const memgraph::coordination::DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder); void Load(memgraph::coordination::DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader); +// GetInstanceUUIDRpc +void Save(const memgraph::coordination::GetInstanceUUIDReq &self, memgraph::slk::Builder *builder); +void Load(memgraph::coordination::GetInstanceUUIDReq *self,
memgraph::slk::Reader *reader); +void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder); +void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader); } // namespace memgraph::slk diff --git a/src/coordination/include/coordination/replication_instance.hpp b/src/coordination/include/coordination/replication_instance.hpp index 713a66fd8..94276484a 100644 --- a/src/coordination/include/coordination/replication_instance.hpp +++ b/src/coordination/include/coordination/replication_instance.hpp @@ -19,6 +19,7 @@ #include "replication_coordination_glue/role.hpp" #include +#include "utils/result.hpp" #include "utils/uuid.hpp" namespace memgraph::coordination { @@ -38,6 +39,9 @@ class ReplicationInstance { auto OnSuccessPing() -> void; auto OnFailPing() -> bool; + auto IsReadyForUUIDPing() -> bool; + + void UpdateReplicaLastResponseUUID(); auto IsAlive() const -> bool; @@ -60,6 +64,7 @@ class ReplicationInstance { auto EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool; auto SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool; + auto SendGetInstanceUUID() -> utils::BasicResult>; auto GetClient() -> CoordinatorClient &; auto SetNewMainUUID(utils::UUID const &main_uuid) -> void; @@ -71,6 +76,7 @@ class ReplicationInstance { replication_coordination_glue::ReplicationRole replication_role_; std::chrono::system_clock::time_point last_response_time_{}; bool is_alive_{false}; + std::chrono::system_clock::time_point last_check_of_uuid_{}; // for replica this is main uuid of current main // for "main" main this same as in CoordinatorData diff --git a/src/coordination/include/coordination/rpc_errors.hpp b/src/coordination/include/coordination/rpc_errors.hpp new file mode 100644 index 000000000..f6bfbf3e0 --- /dev/null +++ b/src/coordination/include/coordination/rpc_errors.hpp @@ -0,0 +1,14 @@ +// Copyright 2024 Memgraph Ltd. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +namespace memgraph::coordination { +enum class GetInstanceUUIDError { NO_RESPONSE, RPC_EXCEPTION }; +} // namespace memgraph::coordination diff --git a/src/coordination/replication_instance.cpp b/src/coordination/replication_instance.cpp index 0fb13998c..ee1047e51 100644 --- a/src/coordination/replication_instance.cpp +++ b/src/coordination/replication_instance.cpp @@ -14,6 +14,7 @@ #include "coordination/replication_instance.hpp" #include "replication_coordination_glue/handler.hpp" +#include "utils/result.hpp" namespace memgraph::coordination { @@ -40,6 +41,11 @@ auto ReplicationInstance::OnFailPing() -> bool { return is_alive_; } +auto ReplicationInstance::IsReadyForUUIDPing() -> bool { + return std::chrono::duration_cast(std::chrono::system_clock::now() - last_check_of_uuid_) + .count() > CoordinatorClusterConfig::replica_uuid_last_check_time_difference_sec_; +} + auto ReplicationInstance::InstanceName() const -> std::string { return client_.InstanceName(); } auto ReplicationInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); } auto ReplicationInstance::IsAlive() const -> bool { return is_alive_; } @@ -91,10 +97,20 @@ auto ReplicationInstance::ResetMainUUID() -> void { main_uuid_ = std::nullopt; } auto ReplicationInstance::GetMainUUID() -> const std::optional & { return main_uuid_; } auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool { - if (!main_uuid_ || *main_uuid_ != 
curr_main_uuid) { - return SendSwapAndUpdateUUID(curr_main_uuid); + if (!IsReadyForUUIDPing()) { + return true; } - return true; + auto res = SendGetInstanceUUID(); + if (res.HasError()) { + return false; + } + UpdateReplicaLastResponseUUID(); + + if (res.GetValue().has_value() && res.GetValue().value() == curr_main_uuid) { + return true; + } + + return SendSwapAndUpdateUUID(curr_main_uuid); } auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool { @@ -105,5 +121,12 @@ auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid return true; } +auto ReplicationInstance::SendGetInstanceUUID() + -> utils::BasicResult> { + return client_.SendGetInstanceUUID(); +} + +void ReplicationInstance::UpdateReplicaLastResponseUUID() { last_check_of_uuid_ = std::chrono::system_clock::now(); } + } // namespace memgraph::coordination #endif diff --git a/src/replication_handler/include/replication_handler/replication_handler.hpp b/src/replication_handler/include/replication_handler/replication_handler.hpp index 3df618b03..db6c2d658 100644 --- a/src/replication_handler/include/replication_handler/replication_handler.hpp +++ b/src/replication_handler/include/replication_handler/replication_handler.hpp @@ -188,6 +188,8 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { auto GetReplState() const -> const memgraph::replication::ReplicationState &; auto GetReplState() -> memgraph::replication::ReplicationState &; + auto GetReplicaUUID() -> std::optional; + private: template auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid) diff --git a/src/replication_handler/replication_handler.cpp b/src/replication_handler/replication_handler.cpp index 6380dd866..b09a68a19 100644 --- a/src/replication_handler/replication_handler.cpp +++ b/src/replication_handler/replication_handler.cpp @@ -272,6 +272,11 @@ auto ReplicationHandler::GetRole() const -> 
memgraph::replication_coordination_g return repl_state_.GetRole(); } +auto ReplicationHandler::GetReplicaUUID() -> std::optional { + MG_ASSERT(repl_state_.IsReplica()); + return std::get(repl_state_.ReplicationData()).uuid_; +} + auto ReplicationHandler::GetReplState() const -> const memgraph::replication::ReplicationState & { return repl_state_; } auto ReplicationHandler::GetReplState() -> memgraph::replication::ReplicationState & { return repl_state_; } diff --git a/src/utils/typeinfo.hpp b/src/utils/typeinfo.hpp index 1640a70f7..5f7f126ee 100644 --- a/src/utils/typeinfo.hpp +++ b/src/utils/typeinfo.hpp @@ -108,6 +108,9 @@ enum class TypeId : uint64_t { COORD_SWAP_UUID_REQ, COORD_SWAP_UUID_RES, + COORD_GET_UUID_REQ, + COORD_GET_UUID_RES, + // AST AST_LABELIX = 3000, AST_PROPERTYIX,