add get uuid RPC
This commit is contained in:
parent
ca20b78479
commit
6b19923214
@ -16,6 +16,7 @@ target_sources(mg-coordination
|
||||
include/coordination/instance_status.hpp
|
||||
include/coordination/replication_instance.hpp
|
||||
include/coordination/raft_state.hpp
|
||||
include/coordination/rpc_errors.hpp
|
||||
|
||||
include/nuraft/coordinator_log_store.hpp
|
||||
include/nuraft/coordinator_state_machine.hpp
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include "coordination/coordinator_config.hpp"
|
||||
#include "coordination/coordinator_rpc.hpp"
|
||||
#include "replication_coordination_glue/messages.hpp"
|
||||
#include "utils/result.hpp"
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
@ -121,5 +122,23 @@ auto CoordinatorClient::SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bo
|
||||
return false;
|
||||
}
|
||||
|
||||
auto CoordinatorClient::SendGetInstanceUUID() const
|
||||
-> utils::BasicResult<GetInstanceUUIDError, std::optional<utils::UUID>> {
|
||||
std::optional<utils::UUID> instance_uuid;
|
||||
try {
|
||||
auto stream{rpc_client_.Stream<GetInstanceUUIDRpc>()};
|
||||
auto res = stream.AwaitResponse();
|
||||
if (!res.uuid) {
|
||||
return instance_uuid;
|
||||
}
|
||||
instance_uuid = res.uuid;
|
||||
|
||||
} catch (const rpc::RpcFailedException &) {
|
||||
spdlog::error("RPC error occured while sending GetInstance UUID RPC");
|
||||
return GetInstanceUUIDError::RPC_EXCEPTION;
|
||||
}
|
||||
return instance_uuid;
|
||||
}
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
||||
|
@ -39,6 +39,12 @@ void CoordinatorHandlers::Register(memgraph::coordination::CoordinatorServer &se
|
||||
spdlog::info("Received SwapMainUUIDRPC on coordinator server");
|
||||
CoordinatorHandlers::SwapMainUUIDHandler(replication_handler, req_reader, res_builder);
|
||||
});
|
||||
|
||||
server.Register<coordination::GetInstanceUUIDRpc>(
|
||||
[&replication_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void {
|
||||
spdlog::info("Received GetInstanceUUIDRpc on coordinator server");
|
||||
CoordinatorHandlers::GetInstanceUUIDHandler(replication_handler, req_reader, res_builder);
|
||||
});
|
||||
}
|
||||
|
||||
void CoordinatorHandlers::SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler,
|
||||
@ -78,6 +84,16 @@ void CoordinatorHandlers::DemoteMainToReplicaHandler(replication::ReplicationHan
|
||||
slk::Save(coordination::DemoteMainToReplicaRes{true}, res_builder);
|
||||
}
|
||||
|
||||
void CoordinatorHandlers::GetInstanceUUIDHandler(replication::ReplicationHandler &replication_handler,
|
||||
slk::Reader *req_reader, slk::Builder *res_builder) {
|
||||
spdlog::info("Executing GetInstanceUUIDHandler");
|
||||
|
||||
coordination::GetInstanceUUIDReq req;
|
||||
slk::Load(&req, req_reader);
|
||||
|
||||
slk::Save(coordination::GetInstanceUUIDRes{replication_handler.GetReplicaUUID()}, res_builder);
|
||||
}
|
||||
|
||||
void CoordinatorHandlers::PromoteReplicaToMainHandler(replication::ReplicationHandler &replication_handler,
|
||||
slk::Reader *req_reader, slk::Builder *res_builder) {
|
||||
if (!replication_handler.IsReplica()) {
|
||||
|
@ -47,6 +47,10 @@ CoordinatorInstance::CoordinatorInstance()
|
||||
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
|
||||
auto &repl_instance = find_repl_instance(self, repl_instance_name);
|
||||
|
||||
// We need to get replicas UUID from time to time to ensure replica is listening to correct main
|
||||
// and that it didn't go down for less time than we could notice
|
||||
// We need to get id of main replica is listening to
|
||||
// and swap if necessary
|
||||
if (!repl_instance.EnsureReplicaHasCorrectMainUUID(self->GetMainUUID())) {
|
||||
spdlog::error(
|
||||
fmt::format("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName()));
|
||||
@ -61,14 +65,6 @@ CoordinatorInstance::CoordinatorInstance()
|
||||
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
|
||||
auto &repl_instance = find_repl_instance(self, repl_instance_name);
|
||||
repl_instance.OnFailPing();
|
||||
// We need to restart main uuid from instance since it was "down" at least a second
|
||||
// There is slight delay, if we choose to use isAlive, instance can be down and back up in less than
|
||||
// our isAlive time difference, which would lead to instance setting UUID to nullopt and stopping accepting any
|
||||
// incoming RPCs from valid main
|
||||
// TODO(antoniofilipovic) this needs here more complex logic
|
||||
// We need to get id of main replica is listening to on successful ping
|
||||
// and swap it to correct uuid if it failed
|
||||
repl_instance.ResetMainUUID();
|
||||
};
|
||||
|
||||
main_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
|
||||
|
@ -52,6 +52,23 @@ void DemoteMainToReplicaRes::Load(DemoteMainToReplicaRes *self, memgraph::slk::R
|
||||
memgraph::slk::Load(self, reader);
|
||||
}
|
||||
|
||||
// GetInstanceUUID
|
||||
void GetInstanceUUIDReq::Save(const GetInstanceUUIDReq &self, memgraph::slk::Builder *builder) {
|
||||
memgraph::slk::Save(self, builder);
|
||||
}
|
||||
|
||||
void GetInstanceUUIDReq::Load(GetInstanceUUIDReq *self, memgraph::slk::Reader *reader) {
|
||||
memgraph::slk::Load(self, reader);
|
||||
}
|
||||
|
||||
void GetInstanceUUIDRes::Save(const GetInstanceUUIDRes &self, memgraph::slk::Builder *builder) {
|
||||
memgraph::slk::Save(self, builder);
|
||||
}
|
||||
|
||||
void GetInstanceUUIDRes::Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *reader) {
|
||||
memgraph::slk::Load(self, reader);
|
||||
}
|
||||
|
||||
} // namespace coordination
|
||||
|
||||
constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ,
|
||||
@ -66,8 +83,16 @@ constexpr utils::TypeInfo coordination::DemoteMainToReplicaReq::kType{utils::Typ
|
||||
constexpr utils::TypeInfo coordination::DemoteMainToReplicaRes::kType{utils::TypeId::COORD_SET_REPL_MAIN_RES,
|
||||
"CoordDemoteToReplicaRes", nullptr};
|
||||
|
||||
constexpr utils::TypeInfo coordination::GetInstanceUUIDReq::kType{utils::TypeId::COORD_GET_UUID_REQ, "CoordGetUUIDReq",
|
||||
nullptr};
|
||||
|
||||
constexpr utils::TypeInfo coordination::GetInstanceUUIDRes::kType{utils::TypeId::COORD_GET_UUID_RES, "CoordGetUUIDRes",
|
||||
nullptr};
|
||||
|
||||
namespace slk {
|
||||
|
||||
// PromoteReplicaToMainRpc
|
||||
|
||||
void Save(const memgraph::coordination::PromoteReplicaToMainRes &self, memgraph::slk::Builder *builder) {
|
||||
memgraph::slk::Save(self.success, builder);
|
||||
}
|
||||
@ -86,6 +111,7 @@ void Load(memgraph::coordination::PromoteReplicaToMainReq *self, memgraph::slk::
|
||||
memgraph::slk::Load(&self->replication_clients_info, reader);
|
||||
}
|
||||
|
||||
// DemoteMainToReplicaRpc
|
||||
void Save(const memgraph::coordination::DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder) {
|
||||
memgraph::slk::Save(self.replication_client_info, builder);
|
||||
}
|
||||
@ -102,6 +128,24 @@ void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::R
|
||||
memgraph::slk::Load(&self->success, reader);
|
||||
}
|
||||
|
||||
// GetInstanceUUIDRpc
|
||||
|
||||
void Save(const memgraph::coordination::GetInstanceUUIDReq & /*self*/, memgraph::slk::Builder * /*builder*/) {
|
||||
/* nothing to serialize*/
|
||||
}
|
||||
|
||||
void Load(memgraph::coordination::GetInstanceUUIDReq * /*self*/, memgraph::slk::Reader * /*reader*/) {
|
||||
/* nothing to serialize*/
|
||||
}
|
||||
|
||||
void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder) {
|
||||
memgraph::slk::Save(self.uuid, builder);
|
||||
}
|
||||
|
||||
void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader) {
|
||||
memgraph::slk::Load(&self->uuid, reader);
|
||||
}
|
||||
|
||||
} // namespace slk
|
||||
|
||||
} // namespace memgraph
|
||||
|
@ -16,6 +16,8 @@
|
||||
|
||||
#include "coordination/coordinator_config.hpp"
|
||||
#include "rpc/client.hpp"
|
||||
#include "rpc_errors.hpp"
|
||||
#include "utils/result.hpp"
|
||||
#include "utils/scheduler.hpp"
|
||||
|
||||
namespace memgraph::coordination {
|
||||
@ -51,6 +53,8 @@ class CoordinatorClient {
|
||||
|
||||
auto SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bool;
|
||||
|
||||
auto SendGetInstanceUUID() const -> memgraph::utils::BasicResult<GetInstanceUUIDError, std::optional<utils::UUID>>;
|
||||
|
||||
auto ReplicationClientInfo() const -> ReplClientInfo;
|
||||
|
||||
auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void;
|
||||
|
@ -16,6 +16,7 @@ namespace memgraph::coordination {
|
||||
|
||||
struct CoordinatorClusterConfig {
|
||||
static constexpr int alive_response_time_difference_sec_{5};
|
||||
static constexpr int replica_uuid_last_check_time_difference_sec_{10};
|
||||
};
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
|
@ -33,6 +33,9 @@ class CoordinatorHandlers {
|
||||
slk::Builder *res_builder);
|
||||
static void SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
|
||||
slk::Builder *res_builder);
|
||||
|
||||
static void GetInstanceUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
|
||||
slk::Builder *res_builder);
|
||||
};
|
||||
|
||||
} // namespace memgraph::dbms
|
||||
|
@ -82,6 +82,33 @@ struct DemoteMainToReplicaRes {
|
||||
|
||||
using DemoteMainToReplicaRpc = rpc::RequestResponse<DemoteMainToReplicaReq, DemoteMainToReplicaRes>;
|
||||
|
||||
struct GetInstanceUUIDReq {
|
||||
static const utils::TypeInfo kType;
|
||||
static const utils::TypeInfo &GetTypeInfo() { return kType; }
|
||||
|
||||
static void Load(GetInstanceUUIDReq *self, memgraph::slk::Reader *reader);
|
||||
static void Save(const GetInstanceUUIDReq &self, memgraph::slk::Builder *builder);
|
||||
|
||||
GetInstanceUUIDReq() = default;
|
||||
|
||||
bool success;
|
||||
};
|
||||
|
||||
struct GetInstanceUUIDRes {
|
||||
static const utils::TypeInfo kType;
|
||||
static const utils::TypeInfo &GetTypeInfo() { return kType; }
|
||||
|
||||
static void Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *reader);
|
||||
static void Save(const GetInstanceUUIDRes &self, memgraph::slk::Builder *builder);
|
||||
|
||||
explicit GetInstanceUUIDRes(std::optional<utils::UUID> uuid) : uuid(uuid) {}
|
||||
GetInstanceUUIDRes() = default;
|
||||
|
||||
std::optional<utils::UUID> uuid;
|
||||
};
|
||||
|
||||
using GetInstanceUUIDRpc = rpc::RequestResponse<GetInstanceUUIDReq, GetInstanceUUIDRes>;
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
|
||||
// SLK serialization declarations
|
||||
@ -99,6 +126,11 @@ void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::R
|
||||
void Save(const memgraph::coordination::DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder);
|
||||
void Load(memgraph::coordination::DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader);
|
||||
|
||||
// GetInstanceUUIDRpc
|
||||
void Save(const memgraph::coordination::GetInstanceUUIDReq &self, memgraph::slk::Builder *builder);
|
||||
void Load(memgraph::coordination::GetInstanceUUIDReq *self, memgraph::slk::Reader *reader);
|
||||
void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder);
|
||||
void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader);
|
||||
|
||||
} // namespace memgraph::slk
|
||||
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include "replication_coordination_glue/role.hpp"
|
||||
|
||||
#include <libnuraft/nuraft.hxx>
|
||||
#include "utils/result.hpp"
|
||||
#include "utils/uuid.hpp"
|
||||
|
||||
namespace memgraph::coordination {
|
||||
@ -38,6 +39,9 @@ class ReplicationInstance {
|
||||
|
||||
auto OnSuccessPing() -> void;
|
||||
auto OnFailPing() -> bool;
|
||||
auto IsReadyForUUIDPing() -> bool;
|
||||
|
||||
void UpdateReplicaLastResponseUUID();
|
||||
|
||||
auto IsAlive() const -> bool;
|
||||
|
||||
@ -60,6 +64,7 @@ class ReplicationInstance {
|
||||
|
||||
auto EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool;
|
||||
auto SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool;
|
||||
auto SendGetInstanceUUID() -> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>>;
|
||||
auto GetClient() -> CoordinatorClient &;
|
||||
|
||||
auto SetNewMainUUID(utils::UUID const &main_uuid) -> void;
|
||||
@ -71,6 +76,7 @@ class ReplicationInstance {
|
||||
replication_coordination_glue::ReplicationRole replication_role_;
|
||||
std::chrono::system_clock::time_point last_response_time_{};
|
||||
bool is_alive_{false};
|
||||
std::chrono::system_clock::time_point last_check_of_uuid_{};
|
||||
|
||||
// for replica this is main uuid of current main
|
||||
// for "main" main this same as in CoordinatorData
|
||||
|
14
src/coordination/include/coordination/rpc_errors.hpp
Normal file
14
src/coordination/include/coordination/rpc_errors.hpp
Normal file
@ -0,0 +1,14 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
namespace memgraph::coordination {
|
||||
enum class GetInstanceUUIDError { NO_RESPONSE, RPC_EXCEPTION };
|
||||
} // namespace memgraph::coordination
|
@ -14,6 +14,7 @@
|
||||
#include "coordination/replication_instance.hpp"
|
||||
|
||||
#include "replication_coordination_glue/handler.hpp"
|
||||
#include "utils/result.hpp"
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
@ -40,6 +41,11 @@ auto ReplicationInstance::OnFailPing() -> bool {
|
||||
return is_alive_;
|
||||
}
|
||||
|
||||
auto ReplicationInstance::IsReadyForUUIDPing() -> bool {
|
||||
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_check_of_uuid_)
|
||||
.count() > CoordinatorClusterConfig::replica_uuid_last_check_time_difference_sec_;
|
||||
}
|
||||
|
||||
auto ReplicationInstance::InstanceName() const -> std::string { return client_.InstanceName(); }
|
||||
auto ReplicationInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); }
|
||||
auto ReplicationInstance::IsAlive() const -> bool { return is_alive_; }
|
||||
@ -91,10 +97,20 @@ auto ReplicationInstance::ResetMainUUID() -> void { main_uuid_ = std::nullopt; }
|
||||
auto ReplicationInstance::GetMainUUID() -> const std::optional<utils::UUID> & { return main_uuid_; }
|
||||
|
||||
auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {
|
||||
if (!main_uuid_ || *main_uuid_ != curr_main_uuid) {
|
||||
return SendSwapAndUpdateUUID(curr_main_uuid);
|
||||
if (!IsReadyForUUIDPing()) {
|
||||
return true;
|
||||
}
|
||||
return true;
|
||||
auto res = SendGetInstanceUUID();
|
||||
if (res.HasError()) {
|
||||
return false;
|
||||
}
|
||||
UpdateReplicaLastResponseUUID();
|
||||
|
||||
if (res.GetValue().has_value() && res.GetValue().value() == curr_main_uuid) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return SendSwapAndUpdateUUID(curr_main_uuid);
|
||||
}
|
||||
|
||||
auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool {
|
||||
@ -105,5 +121,12 @@ auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid
|
||||
return true;
|
||||
}
|
||||
|
||||
auto ReplicationInstance::SendGetInstanceUUID()
|
||||
-> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>> {
|
||||
return client_.SendGetInstanceUUID();
|
||||
}
|
||||
|
||||
void ReplicationInstance::UpdateReplicaLastResponseUUID() { last_check_of_uuid_ = std::chrono::system_clock::now(); }
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
||||
|
@ -188,6 +188,8 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
|
||||
auto GetReplState() const -> const memgraph::replication::ReplicationState &;
|
||||
auto GetReplState() -> memgraph::replication::ReplicationState &;
|
||||
|
||||
auto GetReplicaUUID() -> std::optional<utils::UUID>;
|
||||
|
||||
private:
|
||||
template <bool HandleFailure>
|
||||
auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid)
|
||||
|
@ -272,6 +272,11 @@ auto ReplicationHandler::GetRole() const -> memgraph::replication_coordination_g
|
||||
return repl_state_.GetRole();
|
||||
}
|
||||
|
||||
auto ReplicationHandler::GetReplicaUUID() -> std::optional<utils::UUID> {
|
||||
MG_ASSERT(repl_state_.IsReplica());
|
||||
return std::get<RoleReplicaData>(repl_state_.ReplicationData()).uuid_;
|
||||
}
|
||||
|
||||
auto ReplicationHandler::GetReplState() const -> const memgraph::replication::ReplicationState & { return repl_state_; }
|
||||
|
||||
auto ReplicationHandler::GetReplState() -> memgraph::replication::ReplicationState & { return repl_state_; }
|
||||
|
@ -108,6 +108,9 @@ enum class TypeId : uint64_t {
|
||||
COORD_SWAP_UUID_REQ,
|
||||
COORD_SWAP_UUID_RES,
|
||||
|
||||
COORD_GET_UUID_REQ,
|
||||
COORD_GET_UUID_RES,
|
||||
|
||||
// AST
|
||||
AST_LABELIX = 3000,
|
||||
AST_PROPERTYIX,
|
||||
|
Loading…
Reference in New Issue
Block a user