From 0a28fee34b1dd04e9cf6a0f5bd1540bd5255ddf4 Mon Sep 17 00:00:00 2001 From: antoniofilipovic Date: Tue, 23 Jan 2024 16:20:58 +0100 Subject: [PATCH] add register instance and set instance to main queries and logic --- src/coordination/coordinator_client.cpp | 17 ++ src/coordination/coordinator_rpc.cpp | 38 +++++ src/coordination/coordinator_state.cpp | 160 ++++++++++++++++++ .../coordination/coordinator_client.hpp | 3 + .../include/coordination/coordinator_rpc.hpp | 38 +++++ .../coordination/coordinator_state.hpp | 4 + ...gister_main_replica_coordinator_status.hpp | 15 ++ src/dbms/coordinator_handler.cpp | 11 ++ src/dbms/coordinator_handler.hpp | 5 + src/dbms/coordinator_handlers.cpp | 31 ++++ src/dbms/coordinator_handlers.hpp | 1 + src/dbms/utils.hpp | 40 +++++ src/query/frontend/ast/ast.hpp | 11 +- .../frontend/ast/cypher_main_visitor.cpp | 77 ++++----- .../frontend/ast/cypher_main_visitor.hpp | 10 +- .../opencypher/grammar/CypherLexer.g4 | 2 + .../opencypher/grammar/MemgraphCypher.g4 | 11 +- .../opencypher/grammar/MemgraphCypherLexer.g4 | 1 + .../frontend/stripped_lexer_constants.hpp | 5 +- src/query/interpreter.cpp | 149 +++++++++++++--- src/query/interpreter.hpp | 8 + src/utils/scheduler.hpp | 2 + src/utils/typeinfo.hpp | 2 + 23 files changed, 546 insertions(+), 95 deletions(-) diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp index 5bb79b72f..694ee482c 100644 --- a/src/coordination/coordinator_client.cpp +++ b/src/coordination/coordinator_client.cpp @@ -58,6 +58,8 @@ void CoordinatorClient::StartFrequentCheck() { }); } +void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); } + void CoordinatorClient::PauseFrequentCheck() { replica_checker_.Pause(); } void CoordinatorClient::ResumeFrequentCheck() { replica_checker_.Resume(); } @@ -86,5 +88,20 @@ auto CoordinatorClient::SendPromoteReplicaToMainRpc( return false; } +auto CoordinatorClient::SendSetToReplicaRpc(CoordinatorClient::ReplClientInfo replication_client_info) const -> bool { + try { + auto stream{rpc_client_.Stream(std::move(replication_client_info))}; + if (!stream.AwaitResponse().success) { + spdlog::error("Failed to set main to replica!"); + return false; + } + spdlog::info("Sent request RPC from coordinator to instance to set it as replica!"); + return true; + } catch (const rpc::RpcFailedException &) { + spdlog::error("Failed to send failover RPC from coordinator to new main!"); + } + return false; +} + } // namespace memgraph::coordination #endif diff --git a/src/coordination/coordinator_rpc.cpp b/src/coordination/coordinator_rpc.cpp index da0132a38..e8a16f0e2 100644 --- a/src/coordination/coordinator_rpc.cpp +++ b/src/coordination/coordinator_rpc.cpp @@ -36,6 +36,22 @@ void PromoteReplicaToMainRes::Load(PromoteReplicaToMainRes *self, memgraph::slk: memgraph::slk::Load(self, reader); } +void SetMainToReplicaReq::Save(const SetMainToReplicaReq &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self, builder); +} + +void SetMainToReplicaReq::Load(SetMainToReplicaReq *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(self, reader); +} + +void SetMainToReplicaRes::Save(const SetMainToReplicaRes &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self, builder); +} + +void SetMainToReplicaRes::Load(SetMainToReplicaRes *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(self, reader); +} + } // namespace coordination constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ, @@ -44,6 +60,12 @@ constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::Ty constexpr utils::TypeInfo coordination::PromoteReplicaToMainRes::kType{utils::TypeId::COORD_FAILOVER_RES, "CoordPromoteReplicaToMainRes", nullptr}; +constexpr utils::TypeInfo coordination::SetMainToReplicaReq::kType{utils::TypeId::COORD_SET_REPL_MAIN_REQ, + "CoordSetReplMainReq", nullptr}; + +constexpr utils::TypeInfo coordination::SetMainToReplicaRes::kType{utils::TypeId::COORD_SET_REPL_MAIN_RES, + "CoordSetReplMainRes", nullptr}; + namespace slk { void Save(const memgraph::coordination::PromoteReplicaToMainRes &self, memgraph::slk::Builder *builder) { @@ -62,6 +84,22 @@ void Load(memgraph::coordination::PromoteReplicaToMainReq *self, memgraph::slk:: memgraph::slk::Load(&self->replication_clients_info, reader); } +void Save(const memgraph::coordination::SetMainToReplicaReq &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self.replication_client_info, builder); +} + +void Load(memgraph::coordination::SetMainToReplicaReq *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(&self->replication_client_info, reader); +} + +void Save(const memgraph::coordination::SetMainToReplicaRes &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self.success, builder); +} + +void Load(memgraph::coordination::SetMainToReplicaRes *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(&self->success, reader); +} + } // namespace slk } // namespace memgraph diff --git a/src/coordination/coordinator_state.cpp b/src/coordination/coordinator_state.cpp index d5e623672..2039e3e33 100644 --- a/src/coordination/coordinator_state.cpp +++ b/src/coordination/coordinator_state.cpp @@ -10,6 +10,9 @@ // licenses/APL.txt. #include "coordination/coordinator_state.hpp" +#include +#include "coordination/coordinator_client_info.hpp" +#include "coordination/register_main_replica_coordinator_status.hpp" #ifdef MG_ENTERPRISE @@ -38,6 +41,163 @@ CoordinatorState::CoordinatorState() { } } +auto CoordinatorState::RegisterInstanceOnCoordinator(CoordinatorClientConfig config) + -> RegisterInstanceCoordinatorStatus { + MG_ASSERT(std::holds_alternative(data_), + "Coordinator cannot register replica since variant holds wrong alternative"); + + const auto name_endpoint_status = + std::visit(memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) { + return RegisterInstanceCoordinatorStatus::NOT_COORDINATOR; + }, + [&config](const CoordinatorData &coordinator_data) { + if (memgraph::coordination::CheckName( + coordinator_data.registered_replicas_, config)) { + return RegisterInstanceCoordinatorStatus::NAME_EXISTS; + } + return RegisterInstanceCoordinatorStatus::SUCCESS; + }}, + data_); + + if (name_endpoint_status != RegisterInstanceCoordinatorStatus::SUCCESS) { + return name_endpoint_status; + } + + auto find_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo & { + MG_ASSERT(std::holds_alternative(coord_state->data_), + "Can't execute CoordinatorClient's callback since variant holds wrong alternative"); + auto &coord_data = std::get(coord_state->data_); + std::shared_lock lock{coord_data.coord_data_lock_}; + + auto replica_client_info = std::ranges::find_if( + coord_data.registered_replicas_info_, + [instance_name](const CoordinatorClientInfo &replica) { return replica.InstanceName() == instance_name; }); + + if (replica_client_info != coord_data.registered_replicas_info_.end()) { + return *replica_client_info; + } + + MG_ASSERT(coord_data.registered_main_info_->InstanceName() == instance_name, + "Instance is neither a replica nor main..."); + return *coord_data.registered_main_info_; + }; + + // TODO MERGE WITH ANDI's WORK + auto repl_succ_cb = [find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { + auto &client_info = find_client_info(coord_state, instance_name); + client_info.UpdateLastResponseTime(); + }; + + auto repl_fail_cb = [find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { + auto &client_info = find_client_info(coord_state, instance_name); + client_info.UpdateInstanceStatus(); + }; + CoordinatorClientConfig::ReplicationClientInfo replication_client_info = *config.replication_client_info; + auto *coord_client = &std::get(data_).registered_replicas_.emplace_back( + this, std::move(config), std::move(repl_succ_cb), std::move(repl_fail_cb)); + + coord_client->SendSetToReplicaRpc(replication_client_info); + + std::get(data_).registered_replicas_info_.emplace_back(coord_client->InstanceName(), + coord_client->SocketAddress()); + coord_client->StartFrequentCheck(); + + return RegisterInstanceCoordinatorStatus::SUCCESS; +} + +auto CoordinatorState::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus { + MG_ASSERT(std::holds_alternative(data_), + "Coordinator cannot register replica since variant holds wrong alternative"); + + // TODO: (andi) How does the situation change when restoration of main is implemented regarding callbacks? + // We should probably at that point search for main also in instances as for replicas... + auto get_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo & { + MG_ASSERT(std::holds_alternative(coord_state->data_), + "Can't execute CoordinatorClient's callback since variant holds wrong alternative"); + MG_ASSERT(std::get(coord_state->data_).registered_main_info_.has_value(), + "Main info is not set, but callback is called"); + auto &coord_data = std::get(coord_state->data_); + std::shared_lock lock{coord_data.coord_data_lock_}; + + // TODO When we will support restoration of main, we have to assert that the instance is main or replica, not at + // this point.... + auto ®istered_main_info = coord_data.registered_main_info_; + MG_ASSERT(registered_main_info->InstanceName() == instance_name, + "Callback called for wrong instance name: {}, expected: {}", instance_name, + registered_main_info->InstanceName()); + return *registered_main_info; + }; + + auto succ_cb = [get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { + auto ®istered_main_info = get_client_info(coord_state, instance_name); + registered_main_info.UpdateLastResponseTime(); + }; + + auto fail_cb = [this, get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { + auto ®istered_main_info = get_client_info(coord_state, instance_name); + if (bool main_alive = registered_main_info.UpdateInstanceStatus(); !main_alive) { + spdlog::warn("Main is not alive, starting failover"); + switch (auto failover_status = DoFailover(); failover_status) { + using enum DoFailoverStatus; + case ALL_REPLICAS_DOWN: + spdlog::warn("Failover aborted since all replicas are down!"); + case MAIN_ALIVE: + spdlog::warn("Failover aborted since main is alive!"); + case CLUSTER_UNINITIALIZED: + spdlog::warn("Failover aborted since cluster is uninitialized!"); + case SUCCESS: + break; + } + } + }; + + auto ®istered_replicas = std::get(data_).registered_replicas_; + // Find replica we already registered + auto registered_replica = + std::find_if(registered_replicas.begin(), registered_replicas.end(), [instance_name](const auto &replica_client) { + std::cout << "replica name: " << replica_client.InstanceName() << ", instance name: " << instance_name + << std::endl; + return replica_client.InstanceName() == instance_name; + }); + + std::for_each(registered_replicas.begin(), registered_replicas.end(), + [](const auto &client) { std::cout << "replica names: " << client.InstanceName() << std::endl; }); + // if replica not found... + if (registered_replica == registered_replicas.end()) { + spdlog::error("You didn't register instance with given name {}", instance_name); + return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME; + } + registered_replica->StopFrequentCheck(); + // Set instance as MAIN + // THIS WILL SHUT DOWN CLIENT + auto ®istered_main = std::get(data_).registered_main_; + registered_main = + std::make_unique(this, registered_replica->Config(), std::move(succ_cb), std::move(fail_cb)); + + std::get(data_).registered_main_info_.emplace(registered_main->InstanceName(), + registered_main->SocketAddress()); + std::vector repl_clients_info; + repl_clients_info.reserve(registered_replicas.size() - 1); + std::ranges::for_each(registered_replicas, + [registered_replica, &repl_clients_info](const CoordinatorClient &replica) { + if (replica != *registered_replica) { + repl_clients_info.emplace_back(replica.ReplicationClientInfo()); + } + }); + + // PROMOTE REPLICA TO MAIN + // THIS SHOULD FAIL HERE IF IT IS DOWN + if (auto result = registered_main->SendPromoteReplicaToMainRpc(std::move(repl_clients_info)); !result) { + registered_replica->StartFrequentCheck(); + registered_main.reset(); + return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN; + } + + registered_main->StartFrequentCheck(); + registered_replicas.erase(registered_replica); + return SetInstanceToMainCoordinatorStatus::SUCCESS; +} + auto CoordinatorState::RegisterReplica(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus { MG_ASSERT(std::holds_alternative(data_), "Coordinator cannot register replica since variant holds wrong alternative"); diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp index e99e5b048..557d6b3ac 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/coordinator_client.hpp @@ -39,6 +39,7 @@ class CoordinatorClient { CoordinatorClient &operator=(CoordinatorClient &&) noexcept = delete; void StartFrequentCheck(); + void StopFrequentCheck(); void PauseFrequentCheck(); void ResumeFrequentCheck(); @@ -50,6 +51,8 @@ class CoordinatorClient { auto ReplicationClientInfo() const -> const std::optional &; auto ResetReplicationClientInfo() -> void; + auto SendSetToReplicaRpc(ReplClientInfo replication_client_info) const -> bool; + auto SetSuccCallback(HealthCheckCallback succ_cb) -> void; auto SetFailCallback(HealthCheckCallback fail_cb) -> void; diff --git a/src/coordination/include/coordination/coordinator_rpc.hpp b/src/coordination/include/coordination/coordinator_rpc.hpp index 64836a64f..99996ef52 100644 --- a/src/coordination/include/coordination/coordinator_rpc.hpp +++ b/src/coordination/include/coordination/coordinator_rpc.hpp @@ -48,6 +48,36 @@ struct PromoteReplicaToMainRes { using PromoteReplicaToMainRpc = rpc::RequestResponse; +struct SetMainToReplicaReq { + static const utils::TypeInfo kType; + static const utils::TypeInfo &GetTypeInfo() { return kType; } + + static void Load(SetMainToReplicaReq *self, memgraph::slk::Reader *reader); + static void Save(const SetMainToReplicaReq &self, memgraph::slk::Builder *builder); + + explicit SetMainToReplicaReq(CoordinatorClientConfig::ReplicationClientInfo replication_client_info) + : replication_client_info(std::move(replication_client_info)) {} + + SetMainToReplicaReq() = default; + + CoordinatorClientConfig::ReplicationClientInfo replication_client_info; +}; + +struct SetMainToReplicaRes { + static const utils::TypeInfo kType; + static const utils::TypeInfo &GetTypeInfo() { return kType; } + + static void Load(SetMainToReplicaRes *self, memgraph::slk::Reader *reader); + static void Save(const SetMainToReplicaRes &self, memgraph::slk::Builder *builder); + + explicit SetMainToReplicaRes(bool success) : success(success) {} + SetMainToReplicaRes() = default; + + bool success; +}; + +using SetMainToReplicaRpc = rpc::RequestResponse; + } // namespace memgraph::coordination // SLK serialization declarations @@ -61,6 +91,14 @@ void Save(const memgraph::coordination::PromoteReplicaToMainReq &self, memgraph: void Load(memgraph::coordination::PromoteReplicaToMainReq *self, memgraph::slk::Reader *reader); +void Save(const memgraph::coordination::SetMainToReplicaRes &self, memgraph::slk::Builder *builder); + +void Load(memgraph::coordination::SetMainToReplicaRes *self, memgraph::slk::Reader *reader); + +void Save(const memgraph::coordination::SetMainToReplicaReq &self, memgraph::slk::Builder *builder); + +void Load(memgraph::coordination::SetMainToReplicaReq *self, memgraph::slk::Reader *reader); + } // namespace memgraph::slk #endif diff --git a/src/coordination/include/coordination/coordinator_state.hpp b/src/coordination/include/coordination/coordinator_state.hpp index cc6ce6d3f..b9ee05995 100644 --- a/src/coordination/include/coordination/coordinator_state.hpp +++ b/src/coordination/include/coordination/coordinator_state.hpp @@ -38,6 +38,10 @@ class CoordinatorState { [[nodiscard]] auto RegisterMain(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus; + [[nodiscard]] auto RegisterInstanceOnCoordinator(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus; + + [[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus; + auto ShowReplicas() const -> std::vector; auto ShowMain() const -> std::optional; diff --git a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp index de811c0d8..bbf956178 100644 --- a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp +++ b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp @@ -25,5 +25,20 @@ enum class RegisterMainReplicaCoordinatorStatus : uint8_t { SUCCESS }; +enum class RegisterInstanceCoordinatorStatus : uint8_t { + NAME_EXISTS, + END_POINT_EXISTS, + COULD_NOT_BE_PERSISTED, + NOT_COORDINATOR, + SUCCESS +}; + +enum class SetInstanceToMainCoordinatorStatus : uint8_t { + NO_INSTANCE_WITH_NAME, + NOT_COORDINATOR, + SUCCESS, + COULD_NOT_PROMOTE_TO_MAIN, +}; + } // namespace memgraph::coordination #endif diff --git a/src/dbms/coordinator_handler.cpp b/src/dbms/coordinator_handler.cpp index 01abb31a1..4f3c90807 100644 --- a/src/dbms/coordinator_handler.cpp +++ b/src/dbms/coordinator_handler.cpp @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +#include "coordination/register_main_replica_coordinator_status.hpp" #ifdef MG_ENTERPRISE #include "dbms/coordinator_handler.hpp" @@ -29,6 +30,16 @@ auto CoordinatorHandler::RegisterMainOnCoordinator(memgraph::coordination::Coord return dbms_handler_.CoordinatorState().RegisterMain(std::move(config)); } +auto CoordinatorHandler::RegisterInstanceOnCoordinator(memgraph::coordination::CoordinatorClientConfig config) + -> coordination::RegisterInstanceCoordinatorStatus { + return dbms_handler_.CoordinatorState().RegisterInstanceOnCoordinator(std::move(config)); +} + +auto CoordinatorHandler::SetInstanceToMain(std::string instance_name) + -> coordination::SetInstanceToMainCoordinatorStatus { + return dbms_handler_.CoordinatorState().SetInstanceToMain(std::move(instance_name)); +} + auto CoordinatorHandler::ShowReplicasOnCoordinator() const -> std::vector { return dbms_handler_.CoordinatorState().ShowReplicas(); } diff --git a/src/dbms/coordinator_handler.hpp b/src/dbms/coordinator_handler.hpp index 9632c76b5..7dc18e9d6 100644 --- a/src/dbms/coordinator_handler.hpp +++ b/src/dbms/coordinator_handler.hpp @@ -38,6 +38,11 @@ class CoordinatorHandler { auto RegisterMainOnCoordinator(coordination::CoordinatorClientConfig config) -> coordination::RegisterMainReplicaCoordinatorStatus; + auto RegisterInstanceOnCoordinator(coordination::CoordinatorClientConfig config) + -> coordination::RegisterInstanceCoordinatorStatus; + + auto SetInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus; + auto ShowReplicasOnCoordinator() const -> std::vector; auto ShowMainOnCoordinator() const -> std::optional; diff --git a/src/dbms/coordinator_handlers.cpp b/src/dbms/coordinator_handlers.cpp index 233d57d03..918f5c2f2 100644 --- a/src/dbms/coordinator_handlers.cpp +++ b/src/dbms/coordinator_handlers.cpp @@ -31,6 +31,37 @@ void CoordinatorHandlers::Register(DbmsHandler &dbms_handler) { spdlog::info("Received PromoteReplicaToMainRpc"); CoordinatorHandlers::PromoteReplicaToMainHandler(dbms_handler, req_reader, res_builder); }); + + server.Register( + [&dbms_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void { + spdlog::info("Received PromoteReplicaToMainRpc from coordinator server"); + CoordinatorHandlers::SetMainToReplicaHandler(dbms_handler, req_reader, res_builder); + }); +} + +void CoordinatorHandlers::SetMainToReplicaHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader, + slk::Builder *res_builder) { + auto &repl_state = dbms_handler.ReplicationState(); + + if (!repl_state.IsMain()) { + spdlog::error("Setting to replica must be performed on main."); + slk::Save(coordination::SetMainToReplicaRes{false}, res_builder); + return; + } + + coordination::SetMainToReplicaReq req; + slk::Load(&req, req_reader); + + replication::ReplicationServerConfig clients_config{.ip_address = req.replication_client_info.replication_ip_address, + .port = req.replication_client_info.replication_port}; + + if (bool success = memgraph::dbms::SetReplicationRoleReplica(dbms_handler, clients_config); !success) { + spdlog::error("Setting main to replica failed!"); + slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder); + return; + } + + slk::Save(coordination::PromoteReplicaToMainRes{true}, res_builder); } void CoordinatorHandlers::PromoteReplicaToMainHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader, diff --git a/src/dbms/coordinator_handlers.hpp b/src/dbms/coordinator_handlers.hpp index d08fccb6a..ae4c59a0a 100644 --- a/src/dbms/coordinator_handlers.hpp +++ b/src/dbms/coordinator_handlers.hpp @@ -26,6 +26,7 @@ class CoordinatorHandlers { private: static void PromoteReplicaToMainHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader, slk::Builder *res_builder); + static void SetMainToReplicaHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader, slk::Builder *res_builder); }; } // namespace memgraph::dbms diff --git a/src/dbms/utils.hpp b/src/dbms/utils.hpp index 51e9f4f2d..f43a0765f 100644 --- a/src/dbms/utils.hpp +++ b/src/dbms/utils.hpp @@ -44,6 +44,46 @@ inline bool DoReplicaToMainPromotion(dbms::DbmsHandler &dbms_handler) { return true; }; +inline bool SetReplicationRoleReplica(dbms::DbmsHandler &dbms_handler, + const memgraph::replication::ReplicationServerConfig &config) { + // We don't want to restart the server if we're already a REPLICA + if (dbms_handler.ReplicationState().IsReplica()) { + return false; + } + + // TODO StorageState needs to be synched. Could have a dangling reference if someone adds a database as we are + // deleting the replica. + // Remove database specific clients + dbms_handler.ForEach([&](Database *db) { + auto *storage = db->storage(); + storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); }); + }); + // Remove instance level clients + std::get(dbms_handler.ReplicationState().ReplicationData()).registered_replicas_.clear(); + + // Creates the server + dbms_handler.ReplicationState().SetReplicationRoleReplica(config); + + // Start + const auto success = + std::visit(utils::Overloaded{[](replication::RoleMainData const &) { + // ASSERT + return false; + }, + [&dbms_handler](replication::RoleReplicaData const &data) { + // Register handlers + InMemoryReplicationHandlers::Register(&dbms_handler, *data.server); + if (!data.server->Start()) { + spdlog::error("Unable to start the replication server."); + return false; + } + return true; + }}, + dbms_handler.ReplicationState().ReplicationData()); + // TODO Handle error (restore to main?) + return success; +} + inline bool RegisterAllDatabasesClients(dbms::DbmsHandler &dbms_handler, replication::ReplicationClient &instance_client) { if (!allow_mt_repl && dbms_handler.All().size() > 1) { diff --git a/src/query/frontend/ast/ast.hpp b/src/query/frontend/ast/ast.hpp index 6975d9282..c4fbe388c 100644 --- a/src/query/frontend/ast/ast.hpp +++ b/src/query/frontend/ast/ast.hpp @@ -3071,14 +3071,14 @@ class CoordinatorQuery : public memgraph::query::Query { const utils::TypeInfo &GetTypeInfo() const override { return kType; } enum class Action { + REGISTER_INSTANCE_ON_COORDINATOR, REGISTER_MAIN_COORDINATOR_SERVER, REGISTER_REPLICA_COORDINATOR_SERVER, + SET_INSTANCE_TO_MAIN, SHOW_REPLICATION_CLUSTER, DO_FAILOVER }; - enum class ReplicationRole { MAIN, REPLICA }; - enum class SyncMode { SYNC, ASYNC }; CoordinatorQuery() = default; @@ -3086,18 +3086,17 @@ class CoordinatorQuery : public memgraph::query::Query { DEFVISITABLE(QueryVisitor); memgraph::query::CoordinatorQuery::Action action_; - memgraph::query::CoordinatorQuery::ReplicationRole role_; std::string instance_name_; - memgraph::query::Expression *socket_address_{nullptr}; + memgraph::query::Expression *replication_socket_address_{nullptr}; memgraph::query::Expression *coordinator_socket_address_{nullptr}; memgraph::query::CoordinatorQuery::SyncMode sync_mode_; CoordinatorQuery *Clone(AstStorage *storage) const override { auto *object = storage->Create(); object->action_ = action_; - object->role_ = role_; object->instance_name_ = instance_name_; - object->socket_address_ = socket_address_ ? socket_address_->Clone(storage) : nullptr; + object->replication_socket_address_ = + replication_socket_address_ ? replication_socket_address_->Clone(storage) : nullptr; object->sync_mode_ = sync_mode_; object->coordinator_socket_address_ = coordinator_socket_address_ ? coordinator_socket_address_->Clone(storage) : nullptr; diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index 0c4d499c4..abb1db688 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -375,10 +375,28 @@ antlrcpp::Any CypherMainVisitor::visitRegisterReplica(MemgraphCypher::RegisterRe } // License check is done in the interpreter. -antlrcpp::Any CypherMainVisitor::visitRegisterCoordinatorServer(MemgraphCypher::RegisterCoordinatorServerContext *ctx) { - MG_ASSERT(ctx->children.size() == 1, "RegisterCoordinatorServerQuery should have exactly one child!"); - auto *coordinator_query = std::any_cast(ctx->children[0]->accept(this)); - query_ = coordinator_query; +antlrcpp::Any CypherMainVisitor::visitRegisterInstanceOnCoordinator( + MemgraphCypher::RegisterInstanceOnCoordinatorContext *ctx) { + auto *coordinator_query = storage_->Create(); + if (!ctx->replicationSocketAddress()->literal()->StringLiteral()) { + throw SemanticException("Replication socket address should be a string literal!"); + } + + if (!ctx->coordinatorSocketAddress()->literal()->StringLiteral()) { + throw SemanticException("Coordinator socket address should be a string literal!"); + } + coordinator_query->action_ = CoordinatorQuery::Action::REGISTER_INSTANCE_ON_COORDINATOR; + coordinator_query->replication_socket_address_ = + std::any_cast(ctx->replicationSocketAddress()->accept(this)); + coordinator_query->coordinator_socket_address_ = + std::any_cast(ctx->coordinatorSocketAddress()->accept(this)); + coordinator_query->instance_name_ = std::any_cast(ctx->instanceName()->symbolicName()->accept(this)); + if (ctx->ASYNC()) { + coordinator_query->sync_mode_ = memgraph::query::CoordinatorQuery::SyncMode::ASYNC; + } else { + coordinator_query->sync_mode_ = memgraph::query::CoordinatorQuery::SyncMode::SYNC; + } + return coordinator_query; } @@ -389,48 +407,6 @@ antlrcpp::Any CypherMainVisitor::visitShowReplicationCluster(MemgraphCypher::Sho return coordinator_query; } -// License check is done in the interpreter -antlrcpp::Any CypherMainVisitor::visitRegisterReplicaCoordinatorServer( - MemgraphCypher::RegisterReplicaCoordinatorServerContext *ctx) { - auto *coordinator_query = storage_->Create(); - if (!ctx->socketAddress()->literal()->StringLiteral()) { - throw SemanticException("Socket address should be a string literal!"); - } - - if (!ctx->coordinatorSocketAddress()->literal()->StringLiteral()) { - throw SemanticException("Coordinator socket address should be a string literal!"); - } - coordinator_query->action_ = CoordinatorQuery::Action::REGISTER_REPLICA_COORDINATOR_SERVER; - coordinator_query->role_ = CoordinatorQuery::ReplicationRole::REPLICA; - coordinator_query->socket_address_ = std::any_cast(ctx->socketAddress()->accept(this)); - coordinator_query->coordinator_socket_address_ = - std::any_cast(ctx->coordinatorSocketAddress()->accept(this)); - coordinator_query->instance_name_ = std::any_cast(ctx->instanceName()->symbolicName()->accept(this)); - if (ctx->SYNC()) { - coordinator_query->sync_mode_ = memgraph::query::CoordinatorQuery::SyncMode::SYNC; - } else if (ctx->ASYNC()) { - coordinator_query->sync_mode_ = memgraph::query::CoordinatorQuery::SyncMode::ASYNC; - } - - return coordinator_query; -} - -// License check is done in the interpreter -antlrcpp::Any CypherMainVisitor::visitRegisterMainCoordinatorServer( - MemgraphCypher::RegisterMainCoordinatorServerContext *ctx) { - if (!ctx->coordinatorSocketAddress()->literal()->StringLiteral()) { - throw SemanticException("Coordinator socket address should be a string literal!"); - } - auto *coordinator_query = storage_->Create(); - coordinator_query->action_ = CoordinatorQuery::Action::REGISTER_MAIN_COORDINATOR_SERVER; - coordinator_query->role_ = CoordinatorQuery::ReplicationRole::MAIN; - coordinator_query->coordinator_socket_address_ = - std::any_cast(ctx->coordinatorSocketAddress()->accept(this)); - coordinator_query->instance_name_ = std::any_cast(ctx->instanceName()->symbolicName()->accept(this)); - - return coordinator_query; -} - antlrcpp::Any CypherMainVisitor::visitDropReplica(MemgraphCypher::DropReplicaContext *ctx) { auto *replication_query = storage_->Create(); replication_query->action_ = ReplicationQuery::Action::DROP_REPLICA; @@ -452,6 +428,15 @@ antlrcpp::Any CypherMainVisitor::visitDoFailover(MemgraphCypher::DoFailoverConte return coordinator_query; } +// License check is done in the interpreter +antlrcpp::Any CypherMainVisitor::visitSetInstanceToMain(MemgraphCypher::SetInstanceToMainContext *ctx) { + auto *coordinator_query = storage_->Create(); + coordinator_query->action_ = CoordinatorQuery::Action::SET_INSTANCE_TO_MAIN; + coordinator_query->instance_name_ = std::any_cast(ctx->instanceName()->symbolicName()->accept(this)); + query_ = coordinator_query; + return coordinator_query; +} + antlrcpp::Any CypherMainVisitor::visitLockPathQuery(MemgraphCypher::LockPathQueryContext *ctx) { auto *lock_query = storage_->Create(); if (ctx->STATUS()) { diff --git a/src/query/frontend/ast/cypher_main_visitor.hpp b/src/query/frontend/ast/cypher_main_visitor.hpp index 174588bbb..3db4a18cb 100644 --- a/src/query/frontend/ast/cypher_main_visitor.hpp +++ b/src/query/frontend/ast/cypher_main_visitor.hpp @@ -241,18 +241,12 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor { /** * @return CoordinatorQuery* */ - antlrcpp::Any visitRegisterCoordinatorServer(MemgraphCypher::RegisterCoordinatorServerContext *ctx) override; + antlrcpp::Any visitRegisterInstanceOnCoordinator(MemgraphCypher::RegisterInstanceOnCoordinatorContext *ctx) override; /** * @return CoordinatorQuery* */ - antlrcpp::Any visitRegisterMainCoordinatorServer(MemgraphCypher::RegisterMainCoordinatorServerContext *ctx) override; - - /** - * @return CoordinatorQuery* - */ - antlrcpp::Any visitRegisterReplicaCoordinatorServer( - MemgraphCypher::RegisterReplicaCoordinatorServerContext *ctx) override; + antlrcpp::Any visitSetInstanceToMain(MemgraphCypher::SetInstanceToMainContext *ctx) override; /** * @return CoordinatorQuery* diff --git a/src/query/frontend/opencypher/grammar/CypherLexer.g4 b/src/query/frontend/opencypher/grammar/CypherLexer.g4 index 3428a2191..3e3c640d6 100644 --- a/src/query/frontend/opencypher/grammar/CypherLexer.g4 +++ b/src/query/frontend/opencypher/grammar/CypherLexer.g4 @@ -102,6 +102,7 @@ FILTER : F I L T E R ; IN : I N ; INDEX : I N D E X ; INFO : I N F O ; +INSTANCE : I N S T A N C E ; IS : I S ; KB : K B ; KEY : K E Y ; @@ -122,6 +123,7 @@ PROCEDURE : P R O C E D U R E ; PROFILE : P R O F I L E ; QUERY : Q U E R Y ; REDUCE : R E D U C E ; +REGISTER : R E G I S T E R; REMOVE : R E M O V E ; RETURN : R E T U R N ; SET : S E T ; diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 index 6be2aef86..51dc968f3 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 @@ -63,6 +63,7 @@ memgraphCypherKeyword : cypherKeyword | GRANT | HEADER | IDENTIFIED + | INSTANCE | NODE_LABELS | NULLIF | IMPORT @@ -186,7 +187,8 @@ replicationQuery : setReplicationRole | showReplicas ; -coordinatorQuery : registerCoordinatorServer +coordinatorQuery : registerInstanceOnCoordinator + | setInstanceToMain | showReplicationCluster | doFailover ; @@ -382,15 +384,14 @@ instanceName : symbolicName ; socketAddress : literal ; coordinatorSocketAddress : literal ; +replicationSocketAddress : literal ; registerReplica : REGISTER REPLICA instanceName ( SYNC | ASYNC ) TO socketAddress ; -registerReplicaCoordinatorServer: REGISTER REPLICA instanceName ( ASYNC | SYNC ) TO socketAddress WITH COORDINATOR SERVER ON coordinatorSocketAddress ; +registerInstanceOnCoordinator : REGISTER INSTANCE instanceName ON coordinatorSocketAddress ( AS ASYNC ) ? WITH replicationSocketAddress ; -registerMainCoordinatorServer: REGISTER MAIN instanceName WITH COORDINATOR SERVER ON coordinatorSocketAddress ; - -registerCoordinatorServer : registerMainCoordinatorServer | registerReplicaCoordinatorServer ; +setInstanceToMain : SET INSTANCE instanceName TO MAIN ; dropReplica : DROP REPLICA instanceName ; diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 index 5ffd5aadd..b0febc4af 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 @@ -79,6 +79,7 @@ IMPORT : I M P O R T ; INACTIVE : I N A C T I V E ; IN_MEMORY_ANALYTICAL : I N UNDERSCORE M E M O R Y UNDERSCORE A N A L Y T I C A L ; IN_MEMORY_TRANSACTIONAL : I N UNDERSCORE M E M O R Y UNDERSCORE T R A N S A C T I O N A L ; +INSTANCE : I N S T A N C E ; ISOLATION : I S O L A T I O N ; KAFKA : K A F K A ; LABELS : L A B E L S ; diff --git a/src/query/frontend/stripped_lexer_constants.hpp b/src/query/frontend/stripped_lexer_constants.hpp index 21a14ae83..bd6ab7971 100644 --- a/src/query/frontend/stripped_lexer_constants.hpp +++ b/src/query/frontend/stripped_lexer_constants.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -218,7 +218,8 @@ const trie::Trie kKeywords = {"union", "directory", "lock", "unlock", - "build"}; + "build", + "instance"}; // Unicode codepoints that are allowed at the start of the unescaped name. const std::bitset kUnescapedNameAllowedStarts( diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index aa3f083a9..6e5e92315 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -544,6 +544,71 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { } } + void RegisterInstanceOnCoordinator(const std::string &coordinator_socket_address, + const std::string &replication_socket_address, + const std::chrono::seconds instance_check_frequency, + const std::string &instance_name, CoordinatorQuery::SyncMode sync_mode) override { + const auto maybe_replication_ip_port = + io::network::Endpoint::ParseSocketOrAddress(replication_socket_address, std::nullopt); + if (!maybe_replication_ip_port) { + throw QueryRuntimeException("Invalid replication socket address!"); + } + + const auto maybe_coordinator_ip_port = + io::network::Endpoint::ParseSocketOrAddress(coordinator_socket_address, std::nullopt); + if (!maybe_replication_ip_port) { + throw QueryRuntimeException("Invalid replication socket address!"); + } + + const auto [replication_ip, replication_port] = *maybe_replication_ip_port; + const auto [coordinator_server_ip, coordinator_server_port] = *maybe_coordinator_ip_port; + const auto repl_config = coordination::CoordinatorClientConfig::ReplicationClientInfo{ + .instance_name = instance_name, + .replication_mode = convertFromCoordinatorToReplicationMode(sync_mode), + .replication_ip_address = replication_ip, + .replication_port = replication_port}; + + auto coordinator_client_config = + coordination::CoordinatorClientConfig{.instance_name = instance_name, + .ip_address = coordinator_server_ip, + .port = coordinator_server_port, + .health_check_frequency_sec = instance_check_frequency, + .replication_client_info = repl_config, + .ssl = std::nullopt}; + + auto status = coordinator_handler_.RegisterInstanceOnCoordinator(std::move(coordinator_client_config)); + switch (status) { + using enum memgraph::coordination::RegisterInstanceCoordinatorStatus; + case NAME_EXISTS: + throw QueryRuntimeException("Couldn't register replica instance since instance with such name already exists!"); + case END_POINT_EXISTS: + throw QueryRuntimeException( + "Couldn't register replica instance since instance with such endpoint already exists!"); + case COULD_NOT_BE_PERSISTED: + throw QueryRuntimeException("Couldn't register replica instance since it couldn't be persisted!"); + case NOT_COORDINATOR: + throw QueryRuntimeException("Couldn't register replica instance since this instance is not a coordinator!"); + case SUCCESS: + break; + } + } + + void SetInstanceToMain(const std::string &instance_name) override { + auto status = coordinator_handler_.SetInstanceToMain(instance_name); + switch (status) { + using enum memgraph::coordination::SetInstanceToMainCoordinatorStatus; + case NO_INSTANCE_WITH_NAME: + throw QueryRuntimeException("No instance with such name!"); + case NOT_COORDINATOR: + throw QueryRuntimeException("Couldn't set replica instance to main since this instance is not a coordinator!"); + case COULD_NOT_PROMOTE_TO_MAIN: + throw QueryRuntimeException( + "Couldn't set replica instance to main. Check coordinator and replica for more logs"); + case SUCCESS: + break; + } + } + /// @throw QueryRuntimeException if an error ocurred. void DoFailover() const override { if (!FLAGS_coordinator) { @@ -1039,24 +1104,8 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param if (!FLAGS_coordinator) { throw QueryRuntimeException("Only coordinator can register coordinator server!"); } - // TODO: MemoryResource for EvaluationContext, it should probably be passed as - // the argument to Callback. - EvaluationContext evaluation_context{.timestamp = QueryTimestamp(), .parameters = parameters}; - auto evaluator = PrimitiveLiteralExpressionEvaluator{evaluation_context}; - auto coordinator_socket_address_tv = coordinator_query->coordinator_socket_address_->Accept(evaluator); - callback.fn = [handler = CoordQueryHandler{dbms_handler}, coordinator_socket_address_tv, - main_check_frequency = config.replication_replica_check_frequency, - instance_name = coordinator_query->instance_name_]() mutable { - handler.RegisterMainCoordinatorServer(std::string(coordinator_socket_address_tv.ValueString()), - main_check_frequency, instance_name); - return std::vector>(); - }; - - notifications->emplace_back( - SeverityLevel::INFO, NotificationCode::REGISTER_COORDINATOR_SERVER, - fmt::format("Coordinator has registered coordinator server on {} for instance {}.", - coordinator_socket_address_tv.ValueString(), coordinator_query->instance_name_)); + throw QueryRuntimeException("Query is disabled for now"); return callback; #endif } @@ -1075,28 +1124,72 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param throw QueryRuntimeException("Only coordinator can register coordinator server!"); } + throw QueryRuntimeException("Query is disabled for now"); + return callback; +#endif + } + case CoordinatorQuery::Action::REGISTER_INSTANCE_ON_COORDINATOR: { + if (!license::global_license_checker.IsEnterpriseValidFast()) { + throw QueryException("Trying to use enterprise feature without a valid license."); + } +#ifdef MG_ENTERPRISE + if constexpr (!coordination::allow_ha) { + throw QueryRuntimeException( + "High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to " + "be able to use this functionality."); + } + if (!FLAGS_coordinator) { + throw QueryRuntimeException("Only coordinator can register coordinator server!"); + } // TODO: MemoryResource for EvaluationContext, it should probably be passed as // the argument to Callback. EvaluationContext evaluation_context{.timestamp = QueryTimestamp(), .parameters = parameters}; auto evaluator = PrimitiveLiteralExpressionEvaluator{evaluation_context}; - auto coordinator_socket_address_tv = coordinator_query->coordinator_socket_address_->Accept(evaluator); - auto replication_socket_address_tv = coordinator_query->socket_address_->Accept(evaluator); - callback.fn = - [handler = CoordQueryHandler{dbms_handler}, coordinator_socket_address_tv, replication_socket_address_tv, - replica_check_frequency = config.replication_replica_check_frequency, - instance_name = coordinator_query->instance_name_, sync_mode = coordinator_query->sync_mode_]() mutable { - handler.RegisterReplicaCoordinatorServer(std::string(replication_socket_address_tv.ValueString()), - std::string(coordinator_socket_address_tv.ValueString()), - replica_check_frequency, instance_name, sync_mode); - return std::vector>(); - }; + auto coordinator_socket_address_tv = coordinator_query->coordinator_socket_address_->Accept(evaluator); + auto replication_socket_address_tv = coordinator_query->replication_socket_address_->Accept(evaluator); + callback.fn = [handler = CoordQueryHandler{dbms_handler}, coordinator_socket_address_tv, + replication_socket_address_tv, main_check_frequency = config.replication_replica_check_frequency, + instance_name = coordinator_query->instance_name_, + sync_mode = coordinator_query->sync_mode_]() mutable { + handler.RegisterInstanceOnCoordinator(std::string(coordinator_socket_address_tv.ValueString()), + std::string(replication_socket_address_tv.ValueString()), + main_check_frequency, instance_name, sync_mode); + return std::vector>(); + }; notifications->emplace_back( SeverityLevel::INFO, NotificationCode::REGISTER_COORDINATOR_SERVER, fmt::format("Coordinator has registered coordinator server on {} for instance {}.", coordinator_socket_address_tv.ValueString(), coordinator_query->instance_name_)); return callback; +#endif + } + case CoordinatorQuery::Action::SET_INSTANCE_TO_MAIN: { + if (!license::global_license_checker.IsEnterpriseValidFast()) { + throw QueryException("Trying to use enterprise feature without a valid license."); + } +#ifdef MG_ENTERPRISE + if constexpr (!coordination::allow_ha) { + throw QueryRuntimeException( + "High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to " + "be able to use this functionality."); + } + if (!FLAGS_coordinator) { + throw QueryRuntimeException("Only coordinator can register coordinator server!"); + } + // TODO: MemoryResource for EvaluationContext, it should probably be passed as + // the argument to Callback. + EvaluationContext evaluation_context{.timestamp = QueryTimestamp(), .parameters = parameters}; + auto evaluator = PrimitiveLiteralExpressionEvaluator{evaluation_context}; + + callback.fn = [handler = CoordQueryHandler{dbms_handler}, + instance_name = coordinator_query->instance_name_]() mutable { + handler.SetInstanceToMain(instance_name); + return std::vector>(); + }; + + return callback; #endif } case CoordinatorQuery::Action::SHOW_REPLICATION_CLUSTER: { diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 96ecea1c5..6f49c78fe 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -116,6 +116,14 @@ class CoordinatorQueryHandler { const std::chrono::seconds instance_check_frequency, const std::string &instance_name) = 0; + virtual void RegisterInstanceOnCoordinator(const std::string &coordinator_socket_address, + const std::string &replication_socket_address, + const std::chrono::seconds instance_check_frequency, + const std::string &instance_name, + CoordinatorQuery::SyncMode sync_mode) = 0; + + virtual void SetInstanceToMain(const std::string &instance_name) = 0; + /// @throw QueryRuntimeException if an error ocurred. virtual std::vector ShowReplicasOnCoordinator() const = 0; diff --git a/src/utils/scheduler.hpp b/src/utils/scheduler.hpp index 53b890252..742271a95 100644 --- a/src/utils/scheduler.hpp +++ b/src/utils/scheduler.hpp @@ -89,7 +89,9 @@ class Scheduler { * @throw std::system_error */ void Stop() { + is_paused_.store(false); is_working_.store(false); + pause_cv_.notify_one(); condition_variable_.notify_one(); if (thread_.joinable()) thread_.join(); } diff --git a/src/utils/typeinfo.hpp b/src/utils/typeinfo.hpp index aae88c63a..fd0d1fdeb 100644 --- a/src/utils/typeinfo.hpp +++ b/src/utils/typeinfo.hpp @@ -97,6 +97,8 @@ enum class TypeId : uint64_t { // Coordinator COORD_FAILOVER_REQ, COORD_FAILOVER_RES, + COORD_SET_REPL_MAIN_REQ, + COORD_SET_REPL_MAIN_RES, // AST AST_LABELIX = 3000,