Add support for unregistering replication instances (#1712)

This commit is contained in:
Andi 2024-02-14 15:24:59 +01:00 committed by GitHub
parent 3a7e62f72c
commit fb281459b9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 474 additions and 53 deletions

View File

@ -10,7 +10,6 @@ target_sources(mg-coordination
include/coordination/coordinator_exceptions.hpp
include/coordination/coordinator_slk.hpp
include/coordination/coordinator_instance.hpp
include/coordination/coordinator_cluster_config.hpp
include/coordination/coordinator_handlers.hpp
include/coordination/constants.hpp
include/coordination/instance_status.hpp

View File

@ -41,16 +41,21 @@ CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, Coordi
auto CoordinatorClient::InstanceName() const -> std::string { return config_.instance_name; }
auto CoordinatorClient::SocketAddress() const -> std::string { return rpc_client_.Endpoint().SocketAddress(); }
auto CoordinatorClient::InstanceDownTimeoutSec() const -> std::chrono::seconds {
return config_.instance_down_timeout_sec;
}
void CoordinatorClient::StartFrequentCheck() {
if (instance_checker_.IsRunning()) {
return;
}
MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0),
MG_ASSERT(config_.instance_health_check_frequency_sec > std::chrono::seconds(0),
"Health check frequency must be greater than 0");
instance_checker_.Run(
config_.instance_name, config_.health_check_frequency_sec, [this, instance_name = config_.instance_name] {
config_.instance_name, config_.instance_health_check_frequency_sec,
[this, instance_name = config_.instance_name] {
try {
spdlog::trace("Sending frequent heartbeat to machine {} on {}", instance_name,
rpc_client_.Endpoint().SocketAddress());
@ -121,5 +126,19 @@ auto CoordinatorClient::SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bo
return false;
}
auto CoordinatorClient::SendUnregisterReplicaRpc(std::string const &instance_name) const -> bool {
try {
auto stream{rpc_client_.Stream<UnregisterReplicaRpc>(instance_name)};
if (!stream.AwaitResponse().success) {
spdlog::error("Failed to receive successful RPC response for unregistering replica!");
return false;
}
return true;
} catch (rpc::RpcFailedException const &) {
spdlog::error("Failed to unregister replica!");
}
return false;
}
} // namespace memgraph::coordination
#endif

View File

@ -39,6 +39,12 @@ void CoordinatorHandlers::Register(memgraph::coordination::CoordinatorServer &se
spdlog::info("Received SwapMainUUIDRPC on coordinator server");
CoordinatorHandlers::SwapMainUUIDHandler(replication_handler, req_reader, res_builder);
});
server.Register<coordination::UnregisterReplicaRpc>(
[&replication_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void {
spdlog::info("Received UnregisterReplicaRpc on coordinator server");
CoordinatorHandlers::UnregisterReplicaHandler(replication_handler, req_reader, res_builder);
});
}
void CoordinatorHandlers::SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler,
@ -146,5 +152,37 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(replication::ReplicationHa
slk::Save(coordination::PromoteReplicaToMainRes{true}, res_builder);
}
void CoordinatorHandlers::UnregisterReplicaHandler(replication::ReplicationHandler &replication_handler,
slk::Reader *req_reader, slk::Builder *res_builder) {
if (!replication_handler.IsMain()) {
spdlog::error("Unregistering replica must be performed on main.");
slk::Save(coordination::UnregisterReplicaRes{false}, res_builder);
return;
}
coordination::UnregisterReplicaReq req;
slk::Load(&req, req_reader);
auto res = replication_handler.UnregisterReplica(req.instance_name);
switch (res) {
using enum memgraph::query::UnregisterReplicaResult;
case SUCCESS:
slk::Save(coordination::UnregisterReplicaRes{true}, res_builder);
break;
case NOT_MAIN:
spdlog::error("Unregistering replica must be performed on main.");
slk::Save(coordination::UnregisterReplicaRes{false}, res_builder);
break;
case CAN_NOT_UNREGISTER:
spdlog::error("Could not unregister replica.");
slk::Save(coordination::UnregisterReplicaRes{false}, res_builder);
break;
case COULD_NOT_BE_PERSISTED:
spdlog::error("Could not persist replica unregistration.");
slk::Save(coordination::UnregisterReplicaRes{false}, res_builder);
break;
}
}
} // namespace memgraph::dbms
#endif

View File

@ -122,11 +122,6 @@ CoordinatorInstance::CoordinatorInstance()
};
}
auto CoordinatorInstance::ClusterHasAliveMain_() const -> bool {
auto const alive_main = [](ReplicationInstance const &instance) { return instance.IsMain() && instance.IsAlive(); };
return std::ranges::any_of(repl_instances_, alive_main);
}
auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
auto const coord_instances = raft_state_.GetAllCoordinators();
@ -308,6 +303,35 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
return RegisterInstanceCoordinatorStatus::SUCCESS;
}
auto CoordinatorInstance::UnregisterReplicationInstance(std::string instance_name)
-> UnregisterInstanceCoordinatorStatus {
auto lock = std::lock_guard{coord_instance_lock_};
auto const name_matches = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
};
auto inst_to_remove = std::ranges::find_if(repl_instances_, name_matches);
if (inst_to_remove == repl_instances_.end()) {
return UnregisterInstanceCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
if (inst_to_remove->IsMain() && inst_to_remove->IsAlive()) {
return UnregisterInstanceCoordinatorStatus::IS_MAIN;
}
inst_to_remove->StopFrequentCheck();
auto curr_main = std::ranges::find_if(repl_instances_, &ReplicationInstance::IsMain);
MG_ASSERT(curr_main != repl_instances_.end(), "There must be a main instance when unregistering a replica");
if (!curr_main->SendUnregisterReplicaRpc(instance_name)) {
inst_to_remove->StartFrequentCheck();
return UnregisterInstanceCoordinatorStatus::RPC_FAILED;
}
std::erase_if(repl_instances_, name_matches);
return UnregisterInstanceCoordinatorStatus::SUCCESS;
}
auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
raft_state_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address));

View File

@ -52,6 +52,22 @@ void DemoteMainToReplicaRes::Load(DemoteMainToReplicaRes *self, memgraph::slk::R
memgraph::slk::Load(self, reader);
}
void UnregisterReplicaReq::Save(UnregisterReplicaReq const &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void UnregisterReplicaReq::Load(UnregisterReplicaReq *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
void UnregisterReplicaRes::Save(UnregisterReplicaRes const &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void UnregisterReplicaRes::Load(UnregisterReplicaRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
} // namespace coordination
constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ,
@ -64,8 +80,15 @@ constexpr utils::TypeInfo coordination::DemoteMainToReplicaReq::kType{utils::Typ
"CoordDemoteToReplicaReq", nullptr};
constexpr utils::TypeInfo coordination::DemoteMainToReplicaRes::kType{utils::TypeId::COORD_SET_REPL_MAIN_RES,
"CoordDemoteToReplicaRes", nullptr};
constexpr utils::TypeInfo coordination::UnregisterReplicaReq::kType{utils::TypeId::COORD_UNREGISTER_REPLICA_REQ,
"UnregisterReplicaReq", nullptr};
constexpr utils::TypeInfo coordination::UnregisterReplicaRes::kType{utils::TypeId::COORD_UNREGISTER_REPLICA_RES,
"UnregisterReplicaRes", nullptr};
namespace slk {
void Save(const memgraph::coordination::PromoteReplicaToMainRes &self, memgraph::slk::Builder *builder) {
@ -102,6 +125,22 @@ void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::R
memgraph::slk::Load(&self->success, reader);
}
void Save(memgraph::coordination::UnregisterReplicaReq const &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.instance_name, builder);
}
void Load(memgraph::coordination::UnregisterReplicaReq *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->instance_name, reader);
}
void Save(memgraph::coordination::UnregisterReplicaRes const &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.success, builder);
}
void Load(memgraph::coordination::UnregisterReplicaRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->success, reader);
}
} // namespace slk
} // namespace memgraph

View File

@ -56,6 +56,20 @@ auto CoordinatorState::RegisterReplicationInstance(CoordinatorClientConfig confi
data_);
}
auto CoordinatorState::UnregisterReplicationInstance(std::string instance_name) -> UnregisterInstanceCoordinatorStatus {
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
"Coordinator cannot unregister instance since variant holds wrong alternative");
return std::visit(
memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
return UnregisterInstanceCoordinatorStatus::NOT_COORDINATOR;
},
[&instance_name](CoordinatorInstance &coordinator_instance) {
return coordinator_instance.UnregisterReplicationInstance(instance_name);
}},
data_);
}
auto CoordinatorState::SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
"Coordinator cannot register replica since variant holds wrong alternative");

View File

@ -46,17 +46,22 @@ class CoordinatorClient {
auto SocketAddress() const -> std::string;
[[nodiscard]] auto DemoteToReplica() const -> bool;
// TODO: (andi) Consistent naming
auto SendPromoteReplicaToMainRpc(const utils::UUID &uuid, ReplicationClientsInfo replication_clients_info) const
-> bool;
auto SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bool;
auto SendUnregisterReplicaRpc(std::string const &instance_name) const -> bool;
auto ReplicationClientInfo() const -> ReplClientInfo;
auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void;
auto RpcClient() -> rpc::Client & { return rpc_client_; }
auto InstanceDownTimeoutSec() const -> std::chrono::seconds;
friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) {
return first.config_ == second.config_;
}

View File

@ -1,22 +0,0 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#ifdef MG_ENTERPRISE
namespace memgraph::coordination {
struct CoordinatorClusterConfig {
static constexpr int alive_response_time_difference_sec_{5};
};
} // namespace memgraph::coordination
#endif

View File

@ -28,7 +28,8 @@ struct CoordinatorClientConfig {
std::string instance_name;
std::string ip_address;
uint16_t port{};
std::chrono::seconds health_check_frequency_sec{1};
std::chrono::seconds instance_health_check_frequency_sec{1};
std::chrono::seconds instance_down_timeout_sec{5};
auto SocketAddress() const -> std::string { return ip_address + ":" + std::to_string(port); }

View File

@ -33,6 +33,9 @@ class CoordinatorHandlers {
slk::Builder *res_builder);
static void SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
slk::Builder *res_builder);
static void UnregisterReplicaHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
slk::Builder *res_builder);
};
} // namespace memgraph::dbms

View File

@ -30,6 +30,7 @@ class CoordinatorInstance {
CoordinatorInstance();
[[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto UnregisterReplicationInstance(std::string instance_name) -> UnregisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
@ -44,8 +45,6 @@ class CoordinatorInstance {
auto SetMainUUID(utils::UUID new_uuid) -> void;
private:
auto ClusterHasAliveMain_() const -> bool;
HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
// NOTE: Must be std::list because we rely on pointer stability

View File

@ -82,6 +82,35 @@ struct DemoteMainToReplicaRes {
using DemoteMainToReplicaRpc = rpc::RequestResponse<DemoteMainToReplicaReq, DemoteMainToReplicaRes>;
struct UnregisterReplicaReq {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(UnregisterReplicaReq *self, memgraph::slk::Reader *reader);
static void Save(UnregisterReplicaReq const &self, memgraph::slk::Builder *builder);
explicit UnregisterReplicaReq(std::string instance_name) : instance_name(std::move(instance_name)) {}
UnregisterReplicaReq() = default;
std::string instance_name;
};
struct UnregisterReplicaRes {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(UnregisterReplicaRes *self, memgraph::slk::Reader *reader);
static void Save(const UnregisterReplicaRes &self, memgraph::slk::Builder *builder);
explicit UnregisterReplicaRes(bool success) : success(success) {}
UnregisterReplicaRes() = default;
bool success;
};
using UnregisterReplicaRpc = rpc::RequestResponse<UnregisterReplicaReq, UnregisterReplicaRes>;
} // namespace memgraph::coordination
// SLK serialization declarations
@ -99,6 +128,11 @@ void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::R
void Save(const memgraph::coordination::DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader);
// UnregisterReplicaRpc
void Save(memgraph::coordination::UnregisterReplicaRes const &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::UnregisterReplicaRes *self, memgraph::slk::Reader *reader);
void Save(memgraph::coordination::UnregisterReplicaReq const &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::UnregisterReplicaReq *self, memgraph::slk::Reader *reader);
} // namespace memgraph::slk

View File

@ -34,6 +34,7 @@ class CoordinatorState {
CoordinatorState &operator=(CoordinatorState &&) noexcept = delete;
[[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto UnregisterReplicationInstance(std::string instance_name) -> UnregisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;

View File

@ -28,6 +28,15 @@ enum class RegisterInstanceCoordinatorStatus : uint8_t {
SUCCESS
};
enum class UnregisterInstanceCoordinatorStatus : uint8_t {
NO_INSTANCE_WITH_NAME,
IS_MAIN,
NOT_COORDINATOR,
NOT_LEADER,
RPC_FAILED,
SUCCESS,
};
enum class SetInstanceToMainCoordinatorStatus : uint8_t {
NO_INSTANCE_WITH_NAME,
NOT_COORDINATOR,

View File

@ -14,7 +14,6 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_client.hpp"
#include "coordination/coordinator_cluster_config.hpp"
#include "coordination/coordinator_exceptions.hpp"
#include "replication_coordination_glue/role.hpp"
@ -59,12 +58,16 @@ class ReplicationInstance {
auto ReplicationClientInfo() const -> ReplClientInfo;
auto EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool;
auto SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool;
auto SendUnregisterReplicaRpc(std::string const &instance_name) -> bool;
// TODO: (andi) Inconsistent API
auto GetClient() -> CoordinatorClient &;
auto SetNewMainUUID(utils::UUID const &main_uuid) -> void;
auto ResetMainUUID() -> void;
auto GetMainUUID() -> const std::optional<utils::UUID> &;
auto GetMainUUID() const -> const std::optional<utils::UUID> &;
private:
CoordinatorClient client_;

View File

@ -34,9 +34,8 @@ auto ReplicationInstance::OnSuccessPing() -> void {
}
auto ReplicationInstance::OnFailPing() -> bool {
is_alive_ =
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_).count() <
CoordinatorClusterConfig::alive_response_time_difference_sec_;
auto elapsed_time = std::chrono::system_clock::now() - last_response_time_;
is_alive_ = elapsed_time < client_.InstanceDownTimeoutSec();
return is_alive_;
}
@ -86,9 +85,10 @@ auto ReplicationInstance::ReplicationClientInfo() const -> CoordinatorClientConf
}
auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; }
auto ReplicationInstance::SetNewMainUUID(utils::UUID const &main_uuid) -> void { main_uuid_ = main_uuid; }
auto ReplicationInstance::ResetMainUUID() -> void { main_uuid_ = std::nullopt; }
auto ReplicationInstance::GetMainUUID() -> const std::optional<utils::UUID> & { return main_uuid_; }
auto ReplicationInstance::GetMainUUID() const -> std::optional<utils::UUID> const & { return main_uuid_; }
auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {
if (!main_uuid_ || *main_uuid_ != curr_main_uuid) {
@ -105,5 +105,9 @@ auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid
return true;
}
auto ReplicationInstance::SendUnregisterReplicaRpc(std::string const &instance_name) -> bool {
return client_.SendUnregisterReplicaRpc(instance_name);
}
} // namespace memgraph::coordination
#endif

View File

@ -25,6 +25,11 @@ auto CoordinatorHandler::RegisterReplicationInstance(memgraph::coordination::Coo
return coordinator_state_.RegisterReplicationInstance(config);
}
auto CoordinatorHandler::UnregisterReplicationInstance(std::string instance_name)
-> coordination::UnregisterInstanceCoordinatorStatus {
return coordinator_state_.UnregisterReplicationInstance(std::move(instance_name));
}
auto CoordinatorHandler::SetReplicationInstanceToMain(std::string instance_name)
-> coordination::SetInstanceToMainCoordinatorStatus {
return coordinator_state_.SetReplicationInstanceToMain(std::move(instance_name));

View File

@ -28,9 +28,13 @@ class CoordinatorHandler {
public:
explicit CoordinatorHandler(coordination::CoordinatorState &coordinator_state);
// TODO: (andi) When moving coordinator state on same instances, rename from RegisterReplicationInstance to
// RegisterInstance
auto RegisterReplicationInstance(coordination::CoordinatorClientConfig config)
-> coordination::RegisterInstanceCoordinatorStatus;
auto UnregisterReplicationInstance(std::string instance_name) -> coordination::UnregisterInstanceCoordinatorStatus;
auto SetReplicationInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<coordination::InstanceStatus>;

View File

@ -18,6 +18,10 @@ DEFINE_uint32(coordinator_server_port, 0, "Port on which coordinator servers wil
DEFINE_uint32(raft_server_port, 0, "Port on which raft servers will be started.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint32(raft_server_id, 0, "Unique ID of the raft server.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint32(instance_down_timeout_sec, 5, "Time duration after which an instance is considered down.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint32(instance_health_check_frequency_sec, 1, "The time duration between two health checks/pings.");
#endif
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

View File

@ -20,6 +20,10 @@ DECLARE_uint32(coordinator_server_port);
DECLARE_uint32(raft_server_port);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint32(raft_server_id);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint32(instance_down_timeout_sec);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint32(instance_health_check_frequency_sec);
#endif
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

View File

@ -356,6 +356,10 @@ int main(int argc, char **argv) {
memgraph::query::InterpreterConfig interp_config{
.query = {.allow_load_csv = FLAGS_allow_load_csv},
.replication_replica_check_frequency = std::chrono::seconds(FLAGS_replication_replica_check_frequency_sec),
#ifdef MG_ENTERPRISE
.instance_down_timeout_sec = std::chrono::seconds(FLAGS_instance_down_timeout_sec),
.instance_health_check_frequency_sec = std::chrono::seconds(FLAGS_instance_health_check_frequency_sec),
#endif
.default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers,
.default_pulsar_service_url = FLAGS_pulsar_service_url,
.stream_transaction_conflict_retries = FLAGS_stream_transaction_conflict_retries,

View File

@ -22,6 +22,9 @@ struct InterpreterConfig {
// The same as \ref memgraph::replication::ReplicationClientConfig
std::chrono::seconds replication_replica_check_frequency{1};
std::chrono::seconds instance_down_timeout_sec{5};
std::chrono::seconds instance_health_check_frequency_sec{1};
std::string default_kafka_bootstrap_servers;
std::string default_pulsar_service_url;
uint32_t stream_transaction_conflict_retries;

View File

@ -3071,7 +3071,13 @@ class CoordinatorQuery : public memgraph::query::Query {
static const utils::TypeInfo kType;
const utils::TypeInfo &GetTypeInfo() const override { return kType; }
enum class Action { REGISTER_INSTANCE, SET_INSTANCE_TO_MAIN, SHOW_INSTANCES, ADD_COORDINATOR_INSTANCE };
enum class Action {
REGISTER_INSTANCE,
UNREGISTER_INSTANCE,
SET_INSTANCE_TO_MAIN,
SHOW_INSTANCES,
ADD_COORDINATOR_INSTANCE
};
enum class SyncMode { SYNC, ASYNC };

View File

@ -399,6 +399,14 @@ antlrcpp::Any CypherMainVisitor::visitRegisterInstanceOnCoordinator(
return coordinator_query;
}
antlrcpp::Any CypherMainVisitor::visitUnregisterInstanceOnCoordinator(
MemgraphCypher::UnregisterInstanceOnCoordinatorContext *ctx) {
auto *coordinator_query = storage_->Create<CoordinatorQuery>();
coordinator_query->action_ = CoordinatorQuery::Action::UNREGISTER_INSTANCE;
coordinator_query->instance_name_ = std::any_cast<std::string>(ctx->instanceName()->symbolicName()->accept(this));
return coordinator_query;
}
antlrcpp::Any CypherMainVisitor::visitAddCoordinatorInstance(MemgraphCypher::AddCoordinatorInstanceContext *ctx) {
auto *coordinator_query = storage_->Create<CoordinatorQuery>();

View File

@ -243,6 +243,12 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
*/
antlrcpp::Any visitRegisterInstanceOnCoordinator(MemgraphCypher::RegisterInstanceOnCoordinatorContext *ctx) override;
/**
* @return CoordinatorQuery*
*/
antlrcpp::Any visitUnregisterInstanceOnCoordinator(
MemgraphCypher::UnregisterInstanceOnCoordinatorContext *ctx) override;
/**
* @return CoordinatorQuery*
*/

View File

@ -190,6 +190,7 @@ replicationQuery : setReplicationRole
;
coordinatorQuery : registerInstanceOnCoordinator
| unregisterInstanceOnCoordinator
| setInstanceToMain
| showInstances
| addCoordinatorInstance
@ -392,6 +393,8 @@ registerReplica : REGISTER REPLICA instanceName ( SYNC | ASYNC )
registerInstanceOnCoordinator : REGISTER INSTANCE instanceName ON coordinatorSocketAddress ( AS ASYNC ) ? WITH replicationSocketAddress ;
unregisterInstanceOnCoordinator : UNREGISTER INSTANCE instanceName ;
setInstanceToMain : SET INSTANCE instanceName TO MAIN ;
raftServerId : literal ;

View File

@ -141,6 +141,7 @@ TRIGGER : T R I G G E R ;
TRIGGERS : T R I G G E R S ;
UNCOMMITTED : U N C O M M I T T E D ;
UNLOCK : U N L O C K ;
UNREGISTER : U N R E G I S T E R ;
UPDATE : U P D A T E ;
USE : U S E ;
USER : U S E R ;

View File

@ -461,11 +461,32 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
: coordinator_handler_(coordinator_state) {}
/// @throw QueryRuntimeException if an error ocurred.
void RegisterReplicationInstance(const std::string &coordinator_socket_address,
const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name, CoordinatorQuery::SyncMode sync_mode) override {
void UnregisterInstance(std::string const &instance_name) override {
auto status = coordinator_handler_.UnregisterReplicationInstance(instance_name);
switch (status) {
using enum memgraph::coordination::UnregisterInstanceCoordinatorStatus;
case NO_INSTANCE_WITH_NAME:
throw QueryRuntimeException("No instance with such name!");
case IS_MAIN:
throw QueryRuntimeException(
"Alive main instance can't be unregistered! Shut it down to trigger failover and then unregister it!");
case NOT_COORDINATOR:
throw QueryRuntimeException("UNREGISTER INSTANCE query can only be run on a coordinator!");
case NOT_LEADER:
throw QueryRuntimeException("Couldn't unregister replica instance since coordinator is not a leader!");
case RPC_FAILED:
throw QueryRuntimeException(
"Couldn't unregister replica instance because current main instance couldn't unregister replica!");
case SUCCESS:
break;
}
}
void RegisterReplicationInstance(std::string const &coordinator_socket_address,
std::string const &replication_socket_address,
std::chrono::seconds const &instance_check_frequency,
std::chrono::seconds const &instance_down_timeout, std::string const &instance_name,
CoordinatorQuery::SyncMode sync_mode) override {
const auto maybe_replication_ip_port =
io::network::Endpoint::ParseSocketOrAddress(replication_socket_address, std::nullopt);
if (!maybe_replication_ip_port) {
@ -490,7 +511,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
coordination::CoordinatorClientConfig{.instance_name = instance_name,
.ip_address = coordinator_server_ip,
.port = coordinator_server_port,
.health_check_frequency_sec = instance_check_frequency,
.instance_health_check_frequency_sec = instance_check_frequency,
.instance_down_timeout_sec = instance_down_timeout,
.replication_client_info = repl_config,
.ssl = std::nullopt};
@ -1158,12 +1180,15 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
auto coordinator_socket_address_tv = coordinator_query->coordinator_socket_address_->Accept(evaluator);
auto replication_socket_address_tv = coordinator_query->replication_socket_address_->Accept(evaluator);
callback.fn = [handler = CoordQueryHandler{*coordinator_state}, coordinator_socket_address_tv,
replication_socket_address_tv, main_check_frequency = config.replication_replica_check_frequency,
replication_socket_address_tv,
instance_health_check_frequency_sec = config.instance_health_check_frequency_sec,
instance_name = coordinator_query->instance_name_,
instance_down_timeout_sec = config.instance_down_timeout_sec,
sync_mode = coordinator_query->sync_mode_]() mutable {
handler.RegisterReplicationInstance(std::string(coordinator_socket_address_tv.ValueString()),
std::string(replication_socket_address_tv.ValueString()),
main_check_frequency, instance_name, sync_mode);
instance_health_check_frequency_sec, instance_down_timeout_sec,
instance_name, sync_mode);
return std::vector<std::vector<TypedValue>>();
};
@ -1173,6 +1198,30 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
coordinator_socket_address_tv.ValueString(), coordinator_query->instance_name_));
return callback;
}
case CoordinatorQuery::Action::UNREGISTER_INSTANCE:
if (!license::global_license_checker.IsEnterpriseValidFast()) {
throw QueryException("Trying to use enterprise feature without a valid license.");
}
if constexpr (!coordination::allow_ha) {
throw QueryRuntimeException(
"High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to "
"be able to use this functionality.");
}
if (!FLAGS_raft_server_id) {
throw QueryRuntimeException("Only coordinator can register coordinator server!");
}
callback.fn = [handler = CoordQueryHandler{*coordinator_state},
instance_name = coordinator_query->instance_name_]() mutable {
handler.UnregisterInstance(instance_name);
return std::vector<std::vector<TypedValue>>();
};
notifications->emplace_back(
SeverityLevel::INFO, NotificationCode::UNREGISTER_INSTANCE,
fmt::format("Coordinator has unregistered instance {}.", coordinator_query->instance_name_));
return callback;
case CoordinatorQuery::Action::SET_INSTANCE_TO_MAIN: {
if (!license::global_license_checker.IsEnterpriseValidFast()) {
throw QueryException("Trying to use enterprise feature without a valid license.");

View File

@ -105,10 +105,14 @@ class CoordinatorQueryHandler {
};
/// @throw QueryRuntimeException if an error ocurred.
virtual void RegisterReplicationInstance(const std::string &coordinator_socket_address,
const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name, CoordinatorQuery::SyncMode sync_mode) = 0;
virtual void RegisterReplicationInstance(std::string const &coordinator_socket_address,
std::string const &replication_socket_address,
std::chrono::seconds const &instance_health_check_frequency,
std::chrono::seconds const &instance_down_timeout,
std::string const &instance_name, CoordinatorQuery::SyncMode sync_mode) = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual void UnregisterInstance(std::string const &instance_name) = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual void SetReplicationInstanceToMain(const std::string &instance_name) = 0;

View File

@ -71,6 +71,8 @@ constexpr std::string_view GetCodeString(const NotificationCode code) {
return "RegisterCoordinatorServer"sv;
case NotificationCode::ADD_COORDINATOR_INSTANCE:
return "AddCoordinatorInstance"sv;
case NotificationCode::UNREGISTER_INSTANCE:
return "UnregisterInstance"sv;
#endif
case NotificationCode::REPLICA_PORT_WARNING:
return "ReplicaPortWarning"sv;

View File

@ -43,8 +43,9 @@ enum class NotificationCode : uint8_t {
REPLICA_PORT_WARNING,
REGISTER_REPLICA,
#ifdef MG_ENTERPRISE
REGISTER_COORDINATOR_SERVER,
REGISTER_COORDINATOR_SERVER, // TODO: (andi) What is this?
ADD_COORDINATOR_INSTANCE,
UNREGISTER_INSTANCE,
#endif
SET_REPLICA,
START_STREAM,

View File

@ -107,6 +107,8 @@ enum class TypeId : uint64_t {
COORD_SET_REPL_MAIN_RES,
COORD_SWAP_UUID_REQ,
COORD_SWAP_UUID_RES,
COORD_UNREGISTER_REPLICA_REQ,
COORD_UNREGISTER_REPLICA_RES,
// AST
AST_LABELIX = 3000,

View File

@ -69,6 +69,8 @@ startup_config_dict = {
"coordinator_server_port": ("0", "0", "Port on which coordinator servers will be started."),
"raft_server_port": ("0", "0", "Port on which raft servers will be started."),
"raft_server_id": ("0", "0", "Unique ID of the raft server."),
"instance_down_timeout_sec": ("5", "5", "Time duration after which an instance is considered down."),
"instance_health_check_frequency_sec": ("1", "1", "The time duration between two health checks/pings."),
"data_directory": ("mg_data", "mg_data", "Path to directory in which to save all permanent data."),
"data_recovery_on_startup": (
"false",

View File

@ -280,5 +280,148 @@ def test_coordinators_communication_with_restarts():
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
# TODO: (andi) Test when dealing with distributed coordinators that you can register on one coordinator and unregister from any other coordinator
@pytest.mark.parametrize(
"kill_instance",
[True, False],
)
def test_unregister_replicas(kill_instance):
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'"
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
def check_coordinator3():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
main_cursor = connect(host="localhost", port=7689).cursor()
def check_main():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS")))
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
expected_replicas = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
]
mg_sleep_and_assert(expected_cluster, check_coordinator3)
mg_sleep_and_assert(expected_replicas, check_main)
if kill_instance:
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_1")
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
expected_replicas = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
]
mg_sleep_and_assert(expected_cluster, check_coordinator3)
mg_sleep_and_assert(expected_replicas, check_main)
if kill_instance:
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_2")
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
expected_replicas = []
mg_sleep_and_assert(expected_cluster, check_coordinator3)
mg_sleep_and_assert(expected_replicas, check_main)
def test_unregister_main():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'"
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
def check_coordinator3():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_cluster, check_coordinator3)
try:
execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_3")
except Exception as e:
assert (
str(e)
== "Alive main instance can't be unregistered! Shut it down to trigger failover and then unregister it!"
)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(expected_cluster, check_coordinator3)
execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_3")
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
]
expected_replicas = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
]
main_cursor = connect(host="localhost", port=7687).cursor()
def check_main():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS")))
mg_sleep_and_assert(expected_cluster, check_coordinator3)
mg_sleep_and_assert(expected_replicas, check_main)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))