diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp index 694ee482c..93ef3e3af 100644 --- a/src/coordination/coordinator_client.cpp +++ b/src/coordination/coordinator_client.cpp @@ -44,7 +44,7 @@ void CoordinatorClient::StartFrequentCheck() { MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0), "Health check frequency must be greater than 0"); - replica_checker_.Run( + instance_checker_.Run( "Coord checker", config_.health_check_frequency_sec, [this, instance_name = config_.instance_name] { try { spdlog::trace("Sending frequent heartbeat to machine {} on {}", instance_name, @@ -58,20 +58,22 @@ void CoordinatorClient::StartFrequentCheck() { }); } -void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); } +void CoordinatorClient::StopFrequentCheck() { instance_checker_.Stop(); } -void CoordinatorClient::PauseFrequentCheck() { replica_checker_.Pause(); } -void CoordinatorClient::ResumeFrequentCheck() { replica_checker_.Resume(); } +void CoordinatorClient::PauseFrequentCheck() { instance_checker_.Pause(); } +void CoordinatorClient::ResumeFrequentCheck() { instance_checker_.Resume(); } auto CoordinatorClient::SetSuccCallback(HealthCheckCallback succ_cb) -> void { succ_cb_ = std::move(succ_cb); } auto CoordinatorClient::SetFailCallback(HealthCheckCallback fail_cb) -> void { fail_cb_ = std::move(fail_cb); } -auto CoordinatorClient::ReplicationClientInfo() const - -> const std::optional & { +auto CoordinatorClient::ReplicationClientInfo() const -> const CoordinatorClientConfig::ReplicationClientInfo & { return config_.replication_client_info; } -auto CoordinatorClient::ResetReplicationClientInfo() -> void { config_.replication_client_info.reset(); } +auto CoordinatorClient::ResetReplicationClientInfo() -> void { + // TODO (antoniofilipovic) Sync with Andi on this one + // config_.replication_client_info.reset(); +} auto CoordinatorClient::SendPromoteReplicaToMainRpc( std::vector replication_clients_info) const -> bool { diff --git a/src/coordination/coordinator_data.cpp b/src/coordination/coordinator_data.cpp index 1408472cb..ce783e749 100644 --- a/src/coordination/coordinator_data.cpp +++ b/src/coordination/coordinator_data.cpp @@ -9,6 +9,8 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +#include "coordination/coordinator_instance.hpp" +#include "coordination/register_main_replica_coordinator_status.hpp" #ifdef MG_ENTERPRISE #include "coordination/coordinator_data.hpp" @@ -110,12 +112,13 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus { auto not_chosen_replica_instance = [&chosen_replica_instance](const CoordinatorInstance &instance) { return instance != *chosen_replica_instance; }; + auto not_main = [](const CoordinatorInstance &instance) { return !instance.IsMain(); }; - for (const auto &unchosen_replica_instance : replica_instances | ranges::views::filter(not_chosen_replica_instance)) { - if (auto repl_client_info = unchosen_replica_instance.client_.ReplicationClientInfo(); - repl_client_info.has_value()) { - repl_clients_info.emplace_back(std::move(repl_client_info.value())); - } + // Filter not current replicas and not MAIN instance + // TODO (antoniofilipovic): Should we send also data on old MAIN??? + for (const auto &unchosen_replica_instance : + replica_instances | ranges::views::filter(not_chosen_replica_instance) | ranges::views::filter(not_main)) { + repl_clients_info.emplace_back(unchosen_replica_instance.client_.ReplicationClientInfo()); } if (!chosen_replica_instance->client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) { @@ -154,49 +157,74 @@ auto CoordinatorData::ShowReplicas() const -> std::vector RegisterMainReplicaCoordinatorStatus { +auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus { // TODO: (andi) test this std::lock_guard lock{coord_data_lock_}; - if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) { - return instance.InstanceName() == config.instance_name; - })) { - return RegisterMainReplicaCoordinatorStatus::NAME_EXISTS; + // Find replica we already registered + auto registered_replica = std::find_if( + registered_instances_.begin(), registered_instances_.end(), + [instance_name](const CoordinatorInstance &instance) { return instance.InstanceName() == instance_name; }); + + // if replica not found... + if (registered_replica == registered_instances_.end()) { + spdlog::error("You didn't register instance with given name {}", instance_name); + return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME; } - if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) { - return instance.SocketAddress() == config.SocketAddress(); - })) { - return RegisterMainReplicaCoordinatorStatus::ENDPOINT_EXISTS; + // Stop for now because we need to swap success and fail callbacks + registered_replica->client_.StopFrequentCheck(); + + std::vector repl_clients_info; + repl_clients_info.reserve(registered_instances_.size() - 1); + std::ranges::for_each(registered_instances_, + [registered_replica, &repl_clients_info](const CoordinatorInstance &replica) { + if (replica != *registered_replica) { + repl_clients_info.emplace_back(replica.client_.ReplicationClientInfo()); + } + }); + + // PROMOTE REPLICA TO MAIN + // THIS SHOULD FAIL HERE IF IT IS DOWN + if (auto result = registered_replica->client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info)); !result) { + registered_replica->client_.StartFrequentCheck(); + return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN; } - auto *instance = ®istered_instances_.emplace_back(this, std::move(config), main_succ_cb_, main_fail_cb_, - replication_coordination_glue::ReplicationRole::MAIN); - instance->client_.StartFrequentCheck(); + registered_replica->replication_role_ = replication_coordination_glue::ReplicationRole::MAIN; + registered_replica->client_.SetSuccCallback(main_succ_cb_); + registered_replica->client_.SetFailCallback(main_fail_cb_); - return RegisterMainReplicaCoordinatorStatus::SUCCESS; + return SetInstanceToMainCoordinatorStatus::SUCCESS; } -auto CoordinatorData::RegisterReplica(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus { +auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus { std::lock_guard lock{coord_data_lock_}; if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) { return instance.InstanceName() == config.instance_name; })) { - return RegisterMainReplicaCoordinatorStatus::NAME_EXISTS; + return RegisterInstanceCoordinatorStatus::NAME_EXISTS; } if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) { spdlog::trace("Comparing {} with {}", instance.SocketAddress(), config.SocketAddress()); return instance.SocketAddress() == config.SocketAddress(); })) { - return RegisterMainReplicaCoordinatorStatus::ENDPOINT_EXISTS; + return RegisterInstanceCoordinatorStatus::END_POINT_EXISTS; } + CoordinatorClientConfig::ReplicationClientInfo replication_client_info_copy = config.replication_client_info; + + // TODO (antoniofilipovic) create and then push back auto *instance = ®istered_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_, replication_coordination_glue::ReplicationRole::REPLICA); + if (auto res = instance->client_.SendSetToReplicaRpc(replication_client_info_copy); !res) { + return RegisterInstanceCoordinatorStatus::RPC_FAILED; + } + instance->client_.StartFrequentCheck(); - return RegisterMainReplicaCoordinatorStatus::SUCCESS; + return RegisterInstanceCoordinatorStatus::SUCCESS; } } // namespace memgraph::coordination diff --git a/src/coordination/coordinator_state.cpp b/src/coordination/coordinator_state.cpp index 929dbc58c..0b0b87b50 100644 --- a/src/coordination/coordinator_state.cpp +++ b/src/coordination/coordinator_state.cpp @@ -40,191 +40,30 @@ CoordinatorState::CoordinatorState() { } } -auto CoordinatorState::RegisterInstanceOnCoordinator(CoordinatorClientConfig config) - -> RegisterInstanceCoordinatorStatus { - // MG_ASSERT(std::holds_alternative(data_), - // "Coordinator cannot register replica since variant holds wrong alternative"); - - // const auto name_endpoint_status = - // std::visit(memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) - // { - // return RegisterInstanceCoordinatorStatus::NOT_COORDINATOR; - // }, - // [&config](const CoordinatorData &coordinator_data) { - // if (memgraph::coordination::CheckName( - // coordinator_data.registered_replicas_, config)) { - // return RegisterInstanceCoordinatorStatus::NAME_EXISTS; - // } - // return RegisterInstanceCoordinatorStatus::SUCCESS; - // }}, - // data_); - - // if (name_endpoint_status != RegisterInstanceCoordinatorStatus::SUCCESS) { - // return name_endpoint_status; - // } - - // auto find_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo - // & { - // MG_ASSERT(std::holds_alternative(coord_state->data_), - // "Can't execute CoordinatorClient's callback since variant holds wrong alternative"); - // auto &coord_data = std::get(coord_state->data_); - // std::shared_lock lock{coord_data.coord_data_lock_}; - - // auto replica_client_info = std::ranges::find_if( - // coord_data.registered_replicas_info_, - // [instance_name](const CoordinatorClientInfo &replica) { return replica.InstanceName() == instance_name; }); - - // if (replica_client_info != coord_data.registered_replicas_info_.end()) { - // return *replica_client_info; - // } - - // MG_ASSERT(coord_data.registered_main_info_->InstanceName() == instance_name, - // "Instance is neither a replica nor main..."); - // return *coord_data.registered_main_info_; - // }; - - // // TODO MERGE WITH ANDI's WORK - // auto repl_succ_cb = [find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { - // auto &client_info = find_client_info(coord_state, instance_name); - // client_info.UpdateLastResponseTime(); - // }; - - // auto repl_fail_cb = [find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { - // auto &client_info = find_client_info(coord_state, instance_name); - // client_info.UpdateInstanceStatus(); - // }; - // CoordinatorClientConfig::ReplicationClientInfo replication_client_info = *config.replication_client_info; - // auto *coord_client = &std::get(data_).registered_replicas_.emplace_back( - // this, std::move(config), std::move(repl_succ_cb), std::move(repl_fail_cb)); - - // coord_client->SendSetToReplicaRpc(replication_client_info); - - // std::get(data_).registered_replicas_info_.emplace_back(coord_client->InstanceName(), - // coord_client->SocketAddress()); - // coord_client->StartFrequentCheck(); - - return RegisterInstanceCoordinatorStatus::SUCCESS; -} - -auto CoordinatorState::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus { - // MG_ASSERT(std::holds_alternative(data_), - // "Coordinator cannot register replica since variant holds wrong alternative"); - - // // TODO: (andi) How does the situation change when restoration of main is implemented regarding callbacks? - // // We should probably at that point search for main also in instances as for replicas... - // auto get_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo & - // { - // MG_ASSERT(std::holds_alternative(coord_state->data_), - // "Can't execute CoordinatorClient's callback since variant holds wrong alternative"); - // MG_ASSERT(std::get(coord_state->data_).registered_main_info_.has_value(), - // "Main info is not set, but callback is called"); - // auto &coord_data = std::get(coord_state->data_); - // std::shared_lock lock{coord_data.coord_data_lock_}; - - // // TODO When we will support restoration of main, we have to assert that the instance is main or replica, not at - // // this point.... - // auto ®istered_main_info = coord_data.registered_main_info_; - // MG_ASSERT(registered_main_info->InstanceName() == instance_name, - // "Callback called for wrong instance name: {}, expected: {}", instance_name, - // registered_main_info->InstanceName()); - // return *registered_main_info; - // }; - - // auto succ_cb = [get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { - // auto ®istered_main_info = get_client_info(coord_state, instance_name); - // registered_main_info.UpdateLastResponseTime(); - // }; - - // auto fail_cb = [this, get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { - // auto ®istered_main_info = get_client_info(coord_state, instance_name); - // if (bool main_alive = registered_main_info.UpdateInstanceStatus(); !main_alive) { - // spdlog::warn("Main is not alive, starting failover"); - // switch (auto failover_status = DoFailover(); failover_status) { - // using enum DoFailoverStatus; - // case ALL_REPLICAS_DOWN: - // spdlog::warn("Failover aborted since all replicas are down!"); - // case MAIN_ALIVE: - // spdlog::warn("Failover aborted since main is alive!"); - // case CLUSTER_UNINITIALIZED: - // spdlog::warn("Failover aborted since cluster is uninitialized!"); - // case SUCCESS: - // break; - // } - // } - // }; - - // auto ®istered_replicas = std::get(data_).registered_replicas_; - // // Find replica we already registered - // auto registered_replica = - // std::find_if(registered_replicas.begin(), registered_replicas.end(), [instance_name](const auto - // &replica_client) { - // std::cout << "replica name: " << replica_client.InstanceName() << ", instance name: " << instance_name - // << std::endl; - // return replica_client.InstanceName() == instance_name; - // }); - - // std::for_each(registered_replicas.begin(), registered_replicas.end(), - // [](const auto &client) { std::cout << "replica names: " << client.InstanceName() << std::endl; }); - // // if replica not found... - // if (registered_replica == registered_replicas.end()) { - // spdlog::error("You didn't register instance with given name {}", instance_name); - // return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME; - // } - // registered_replica->StopFrequentCheck(); - // // Set instance as MAIN - // // THIS WILL SHUT DOWN CLIENT - // auto ®istered_main = std::get(data_).registered_main_; - // registered_main = - // std::make_unique(this, registered_replica->Config(), std::move(succ_cb), - // std::move(fail_cb)); - - // std::get(data_).registered_main_info_.emplace(registered_main->InstanceName(), - // registered_main->SocketAddress()); - // std::vector repl_clients_info; - // repl_clients_info.reserve(registered_replicas.size() - 1); - // std::ranges::for_each(registered_replicas, - // [registered_replica, &repl_clients_info](const CoordinatorClient &replica) { - // if (replica != *registered_replica) { - // repl_clients_info.emplace_back(replica.ReplicationClientInfo()); - // } - // }); - - // // PROMOTE REPLICA TO MAIN - // // THIS SHOULD FAIL HERE IF IT IS DOWN - // if (auto result = registered_main->SendPromoteReplicaToMainRpc(std::move(repl_clients_info)); !result) { - // registered_replica->StartFrequentCheck(); - // registered_main.reset(); - // return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN; - // } - - // registered_main->StartFrequentCheck(); - // registered_replicas.erase(registered_replica); - return SetInstanceToMainCoordinatorStatus::SUCCESS; -} - -auto CoordinatorState::RegisterReplica(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus { +auto CoordinatorState::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus { MG_ASSERT(std::holds_alternative(data_), "Coordinator cannot register replica since variant holds wrong alternative"); return std::visit( memgraph::utils::Overloaded{ [](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) { - return RegisterMainReplicaCoordinatorStatus::NOT_COORDINATOR; + return RegisterInstanceCoordinatorStatus::NOT_COORDINATOR; }, - [&config](CoordinatorData &coordinator_data) { return coordinator_data.RegisterReplica(std::move(config)); }}, + [config](CoordinatorData &coordinator_data) { return coordinator_data.RegisterInstance(config); }}, data_); } -auto CoordinatorState::RegisterMain(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus { +auto CoordinatorState::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus { MG_ASSERT(std::holds_alternative(data_), - "Coordinator cannot register main since variant holds wrong alternative"); + "Coordinator cannot register replica since variant holds wrong alternative"); return std::visit( - memgraph::utils::Overloaded{ - [](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) { - return RegisterMainReplicaCoordinatorStatus::NOT_COORDINATOR; - }, - [&config](CoordinatorData &coordinator_data) { return coordinator_data.RegisterMain(std::move(config)); }}, + memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) { + return SetInstanceToMainCoordinatorStatus::NOT_COORDINATOR; + }, + [&instance_name](CoordinatorData &coordinator_data) { + return coordinator_data.SetInstanceToMain(instance_name); + }}, data_); } diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp index 557d6b3ac..1bc361a57 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/coordinator_client.hpp @@ -48,7 +48,7 @@ class CoordinatorClient { auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool; - auto ReplicationClientInfo() const -> const std::optional &; + auto ReplicationClientInfo() const -> const ReplClientInfo &; auto ResetReplicationClientInfo() -> void; auto SendSetToReplicaRpc(ReplClientInfo replication_client_info) const -> bool; @@ -61,7 +61,7 @@ class CoordinatorClient { } private: - utils::Scheduler replica_checker_; + utils::Scheduler instance_checker_; // TODO: (andi) Pimpl? communication::ClientContext rpc_context_; diff --git a/src/coordination/include/coordination/coordinator_config.hpp b/src/coordination/include/coordination/coordinator_config.hpp index 45312fefb..bbbed9dd7 100644 --- a/src/coordination/include/coordination/coordinator_config.hpp +++ b/src/coordination/include/coordination/coordinator_config.hpp @@ -43,7 +43,8 @@ struct CoordinatorClientConfig { friend bool operator==(ReplicationClientInfo const &, ReplicationClientInfo const &) = default; }; - std::optional replication_client_info; + // Each instance has replication config in case it fails + ReplicationClientInfo replication_client_info; struct SSL { std::string key_file; diff --git a/src/coordination/include/coordination/coordinator_data.hpp b/src/coordination/include/coordination/coordinator_data.hpp index da4b97b6e..ea5dbc47c 100644 --- a/src/coordination/include/coordination/coordinator_data.hpp +++ b/src/coordination/include/coordination/coordinator_data.hpp @@ -29,8 +29,8 @@ class CoordinatorData { [[nodiscard]] auto DoFailover() -> DoFailoverStatus; - [[nodiscard]] auto RegisterReplica(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus; - [[nodiscard]] auto RegisterMain(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus; + [[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus; + [[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus; auto ShowReplicas() const -> std::vector; auto ShowMain() const -> std::optional; diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index 06ab0a25c..c4779963c 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -59,7 +59,8 @@ class CoordinatorInstance { replication_role_ = replication_coordination_glue::ReplicationRole::MAIN; client_.SetSuccCallback(std::move(main_succ_cb)); client_.SetFailCallback(std::move(main_fail_cb)); - client_.ResetReplicationClientInfo(); + // Comment with Andi but we shouldn't delete this, what if this MAIN FAILS AGAIN + // client_.ResetReplicationClientInfo(); client_.ResumeFrequentCheck(); } diff --git a/src/coordination/include/coordination/coordinator_state.hpp b/src/coordination/include/coordination/coordinator_state.hpp index b9ee05995..a91bed118 100644 --- a/src/coordination/include/coordination/coordinator_state.hpp +++ b/src/coordination/include/coordination/coordinator_state.hpp @@ -34,11 +34,7 @@ class CoordinatorState { CoordinatorState(CoordinatorState &&) noexcept = delete; CoordinatorState &operator=(CoordinatorState &&) noexcept = delete; - [[nodiscard]] auto RegisterReplica(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus; - - [[nodiscard]] auto RegisterMain(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus; - - [[nodiscard]] auto RegisterInstanceOnCoordinator(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus; + [[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus; [[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus; diff --git a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp index bbf956178..acb191bfd 100644 --- a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp +++ b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp @@ -17,19 +17,12 @@ namespace memgraph::coordination { -enum class RegisterMainReplicaCoordinatorStatus : uint8_t { - NAME_EXISTS, - ENDPOINT_EXISTS, - COULD_NOT_BE_PERSISTED, - NOT_COORDINATOR, - SUCCESS -}; - enum class RegisterInstanceCoordinatorStatus : uint8_t { NAME_EXISTS, END_POINT_EXISTS, COULD_NOT_BE_PERSISTED, NOT_COORDINATOR, + RPC_FAILED, SUCCESS }; diff --git a/src/dbms/coordinator_handler.cpp b/src/dbms/coordinator_handler.cpp index 4f3c90807..d0ecb58f0 100644 --- a/src/dbms/coordinator_handler.cpp +++ b/src/dbms/coordinator_handler.cpp @@ -20,19 +20,9 @@ namespace memgraph::dbms { CoordinatorHandler::CoordinatorHandler(DbmsHandler &dbms_handler) : dbms_handler_(dbms_handler) {} -auto CoordinatorHandler::RegisterReplicaOnCoordinator(coordination::CoordinatorClientConfig config) - -> coordination::RegisterMainReplicaCoordinatorStatus { - return dbms_handler_.CoordinatorState().RegisterReplica(std::move(config)); -} - -auto CoordinatorHandler::RegisterMainOnCoordinator(memgraph::coordination::CoordinatorClientConfig config) - -> coordination::RegisterMainReplicaCoordinatorStatus { - return dbms_handler_.CoordinatorState().RegisterMain(std::move(config)); -} - -auto CoordinatorHandler::RegisterInstanceOnCoordinator(memgraph::coordination::CoordinatorClientConfig config) +auto CoordinatorHandler::RegisterInstance(memgraph::coordination::CoordinatorClientConfig config) -> coordination::RegisterInstanceCoordinatorStatus { - return dbms_handler_.CoordinatorState().RegisterInstanceOnCoordinator(std::move(config)); + return dbms_handler_.CoordinatorState().RegisterInstance(config); } auto CoordinatorHandler::SetInstanceToMain(std::string instance_name) @@ -48,10 +38,6 @@ auto CoordinatorHandler::ShowMainOnCoordinator() const -> std::optional coordination::DoFailoverStatus { - return dbms_handler_.CoordinatorState().DoFailover(); -} - } // namespace memgraph::dbms #endif diff --git a/src/dbms/coordinator_handler.hpp b/src/dbms/coordinator_handler.hpp index 7dc18e9d6..f0ca8950a 100644 --- a/src/dbms/coordinator_handler.hpp +++ b/src/dbms/coordinator_handler.hpp @@ -32,13 +32,7 @@ class CoordinatorHandler { public: explicit CoordinatorHandler(DbmsHandler &dbms_handler); - auto RegisterReplicaOnCoordinator(coordination::CoordinatorClientConfig config) - -> coordination::RegisterMainReplicaCoordinatorStatus; - - auto RegisterMainOnCoordinator(coordination::CoordinatorClientConfig config) - -> coordination::RegisterMainReplicaCoordinatorStatus; - - auto RegisterInstanceOnCoordinator(coordination::CoordinatorClientConfig config) + auto RegisterInstance(coordination::CoordinatorClientConfig config) -> coordination::RegisterInstanceCoordinatorStatus; auto SetInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus; @@ -47,8 +41,6 @@ class CoordinatorHandler { auto ShowMainOnCoordinator() const -> std::optional; - auto DoFailover() const -> coordination::DoFailoverStatus; - private: DbmsHandler &dbms_handler_; }; diff --git a/src/dbms/coordinator_handlers.cpp b/src/dbms/coordinator_handlers.cpp index 918f5c2f2..b8cfd898a 100644 --- a/src/dbms/coordinator_handlers.cpp +++ b/src/dbms/coordinator_handlers.cpp @@ -34,7 +34,7 @@ void CoordinatorHandlers::Register(DbmsHandler &dbms_handler) { server.Register( [&dbms_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void { - spdlog::info("Received PromoteReplicaToMainRpc from coordinator server"); + spdlog::info("Received SetMainToReplicaRpc from coordinator server"); CoordinatorHandlers::SetMainToReplicaHandler(dbms_handler, req_reader, res_builder); }); } diff --git a/src/dbms/utils.hpp b/src/dbms/utils.hpp index f36c4727a..fd5db9cf1 100644 --- a/src/dbms/utils.hpp +++ b/src/dbms/utils.hpp @@ -46,7 +46,6 @@ inline bool DoReplicaToMainPromotion(dbms::DbmsHandler &dbms_handler) { inline bool SetReplicationRoleReplica(dbms::DbmsHandler &dbms_handler, const memgraph::replication::ReplicationServerConfig &config) { - // We don't want to restart the server if we're already a REPLICA if (dbms_handler.ReplicationState().IsReplica()) { return false; } @@ -54,36 +53,27 @@ inline bool SetReplicationRoleReplica(dbms::DbmsHandler &dbms_handler, // TODO StorageState needs to be synched. Could have a dangling reference if someone adds a database as we are // deleting the replica. // Remove database specific clients - - // dbms_handler.ForEach([&](Database *db) { - // auto *storage = db->storage(); - // storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); }); - // }); - + dbms_handler.ForEach([&](DatabaseAccess db_acc) { + auto *storage = db_acc->storage(); + storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); }); + }); // Remove instance level clients - // std::get(dbms_handler.ReplicationState().ReplicationData()).registered_replicas_.clear(); + std::get(dbms_handler.ReplicationState().ReplicationData()).registered_replicas_.clear(); - // // Creates the server - // dbms_handler.ReplicationState().SetReplicationRoleReplica(config); + // Creates the server + dbms_handler.ReplicationState().SetReplicationRoleReplica(config); - // // Start - // const auto success = - // std::visit(utils::Overloaded{[](replication::RoleMainData const &) { - // // ASSERT - // return false; - // }, - // [&dbms_handler](replication::RoleReplicaData const &data) { - // // Register handlers - // InMemoryReplicationHandlers::Register(&dbms_handler, *data.server); - // if (!data.server->Start()) { - // spdlog::error("Unable to start the replication server."); - // return false; - // } - // return true; - // }}, - // dbms_handler.ReplicationState().ReplicationData()); + // Start + const auto success = std::visit(utils::Overloaded{[](replication::RoleMainData const &) { + // ASSERT + return false; + }, + [&dbms_handler](replication::RoleReplicaData const &data) { + return StartRpcServer(dbms_handler, data); + }}, + dbms_handler.ReplicationState().ReplicationData()); // TODO Handle error (restore to main?) - return true; + return success; } inline bool RegisterAllDatabasesClients(dbms::DbmsHandler &dbms_handler, diff --git a/src/query/frontend/ast/ast.hpp b/src/query/frontend/ast/ast.hpp index c4fbe388c..3fbfb7162 100644 --- a/src/query/frontend/ast/ast.hpp +++ b/src/query/frontend/ast/ast.hpp @@ -3071,12 +3071,9 @@ class CoordinatorQuery : public memgraph::query::Query { const utils::TypeInfo &GetTypeInfo() const override { return kType; } enum class Action { - REGISTER_INSTANCE_ON_COORDINATOR, - REGISTER_MAIN_COORDINATOR_SERVER, - REGISTER_REPLICA_COORDINATOR_SERVER, + REGISTER_INSTANCE, SET_INSTANCE_TO_MAIN, SHOW_REPLICATION_CLUSTER, - DO_FAILOVER }; enum class SyncMode { SYNC, ASYNC }; diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index abb1db688..5735326ac 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -385,7 +385,7 @@ antlrcpp::Any CypherMainVisitor::visitRegisterInstanceOnCoordinator( if (!ctx->coordinatorSocketAddress()->literal()->StringLiteral()) { throw SemanticException("Coordinator socket address should be a string literal!"); } - coordinator_query->action_ = CoordinatorQuery::Action::REGISTER_INSTANCE_ON_COORDINATOR; + coordinator_query->action_ = CoordinatorQuery::Action::REGISTER_INSTANCE; coordinator_query->replication_socket_address_ = std::any_cast(ctx->replicationSocketAddress()->accept(this)); coordinator_query->coordinator_socket_address_ = @@ -420,14 +420,6 @@ antlrcpp::Any CypherMainVisitor::visitShowReplicas(MemgraphCypher::ShowReplicasC return replication_query; } -// License check is done in the interpreter -antlrcpp::Any CypherMainVisitor::visitDoFailover(MemgraphCypher::DoFailoverContext * /*ctx*/) { - auto *coordinator_query = storage_->Create(); - coordinator_query->action_ = CoordinatorQuery::Action::DO_FAILOVER; - query_ = coordinator_query; - return coordinator_query; -} - // License check is done in the interpreter antlrcpp::Any CypherMainVisitor::visitSetInstanceToMain(MemgraphCypher::SetInstanceToMainContext *ctx) { auto *coordinator_query = storage_->Create(); diff --git a/src/query/frontend/ast/cypher_main_visitor.hpp b/src/query/frontend/ast/cypher_main_visitor.hpp index 3db4a18cb..e9da98f71 100644 --- a/src/query/frontend/ast/cypher_main_visitor.hpp +++ b/src/query/frontend/ast/cypher_main_visitor.hpp @@ -253,11 +253,6 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor { */ antlrcpp::Any visitShowReplicationCluster(MemgraphCypher::ShowReplicationClusterContext *ctx) override; - /** - * @return CoordinatorQuery* - */ - antlrcpp::Any visitDoFailover(MemgraphCypher::DoFailoverContext *ctx) override; - /** * @return LockPathQuery* */ diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 index 51dc968f3..e41184468 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 @@ -190,7 +190,6 @@ replicationQuery : setReplicationRole coordinatorQuery : registerInstanceOnCoordinator | setInstanceToMain | showReplicationCluster - | doFailover ; triggerQuery : createTrigger @@ -254,8 +253,6 @@ transactionQueueQuery : showTransactions showTransactions : SHOW TRANSACTIONS ; -doFailover : DO FAILOVER ; - terminateTransactions : TERMINATE TRANSACTIONS transactionIdList; loadCsv : LOAD CSV FROM csvFile ( WITH | NO ) HEADER diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 6e5e92315..301a0b592 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -461,11 +461,9 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { #ifdef MG_ENTERPRISE /// @throw QueryRuntimeException if an error ocurred. - void RegisterReplicaCoordinatorServer(const std::string &replication_socket_address, - const std::string &coordinator_socket_address, - const std::chrono::seconds instance_check_frequency, - const std::string &instance_name, - CoordinatorQuery::SyncMode sync_mode) override { + void RegisterInstance(const std::string &coordinator_socket_address, const std::string &replication_socket_address, + const std::chrono::seconds instance_check_frequency, const std::string &instance_name, + CoordinatorQuery::SyncMode sync_mode) override { const auto maybe_replication_ip_port = io::network::Endpoint::ParseSocketOrAddress(replication_socket_address, std::nullopt); if (!maybe_replication_ip_port) { @@ -494,89 +492,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { .replication_client_info = repl_config, .ssl = std::nullopt}; - auto status = coordinator_handler_.RegisterReplicaOnCoordinator(std::move(coordinator_client_config)); - switch (status) { - using enum memgraph::coordination::RegisterMainReplicaCoordinatorStatus; - case NAME_EXISTS: - throw QueryRuntimeException("Couldn't register replica instance since instance with such name already exists!"); - case ENDPOINT_EXISTS: - throw QueryRuntimeException( - "Couldn't register replica instance since instance with such endpoint already exists!"); - case COULD_NOT_BE_PERSISTED: - throw QueryRuntimeException("Couldn't register replica instance since it couldn't be persisted!"); - case NOT_COORDINATOR: - throw QueryRuntimeException("Couldn't register replica instance since this instance is not a coordinator!"); - case SUCCESS: - break; - } - } - - void RegisterMainCoordinatorServer(const std::string &coordinator_socket_address, - const std::chrono::seconds instance_check_frequency, - const std::string &instance_name) override { - const auto maybe_ip_and_port = - io::network::Endpoint::ParseSocketOrAddress(coordinator_socket_address, std::nullopt); - if (!maybe_ip_and_port) { - throw QueryRuntimeException("Invalid socket address!"); - } - const auto [ip, port] = *maybe_ip_and_port; - auto coordinator_client_config = - coordination::CoordinatorClientConfig{.instance_name = instance_name, - .ip_address = ip, - .port = port, - .health_check_frequency_sec = instance_check_frequency, - .ssl = std::nullopt}; - - auto status = coordinator_handler_.RegisterMainOnCoordinator(std::move(coordinator_client_config)); - switch (status) { - using enum memgraph::coordination::RegisterMainReplicaCoordinatorStatus; - case NAME_EXISTS: - throw QueryRuntimeException("Couldn't register main instance since instance with such name already exists!"); - case ENDPOINT_EXISTS: - throw QueryRuntimeException( - "Couldn't register main instance since instance with such endpoint already exists!"); - case COULD_NOT_BE_PERSISTED: - throw QueryRuntimeException("Couldn't register main instance since it couldn't be persisted!"); - case NOT_COORDINATOR: - throw QueryRuntimeException("Couldn't register main instance since this instance is not a coordinator!"); - case SUCCESS: - break; - } - } - - void RegisterInstanceOnCoordinator(const std::string &coordinator_socket_address, - const std::string &replication_socket_address, - const std::chrono::seconds instance_check_frequency, - const std::string &instance_name, CoordinatorQuery::SyncMode sync_mode) override { - const auto maybe_replication_ip_port = - io::network::Endpoint::ParseSocketOrAddress(replication_socket_address, std::nullopt); - if (!maybe_replication_ip_port) { - throw QueryRuntimeException("Invalid replication socket address!"); - } - - const auto maybe_coordinator_ip_port = - io::network::Endpoint::ParseSocketOrAddress(coordinator_socket_address, std::nullopt); - if (!maybe_replication_ip_port) { - throw QueryRuntimeException("Invalid replication socket address!"); - } - - const auto [replication_ip, replication_port] = *maybe_replication_ip_port; - const auto [coordinator_server_ip, coordinator_server_port] = *maybe_coordinator_ip_port; - const auto repl_config = coordination::CoordinatorClientConfig::ReplicationClientInfo{ - .instance_name = instance_name, - .replication_mode = convertFromCoordinatorToReplicationMode(sync_mode), - .replication_ip_address = replication_ip, - .replication_port = replication_port}; - - auto coordinator_client_config = - coordination::CoordinatorClientConfig{.instance_name = instance_name, - .ip_address = coordinator_server_ip, - .port = coordinator_server_port, - .health_check_frequency_sec = instance_check_frequency, - .replication_client_info = repl_config, - .ssl = std::nullopt}; - - auto status = coordinator_handler_.RegisterInstanceOnCoordinator(std::move(coordinator_client_config)); + auto status = coordinator_handler_.RegisterInstance(coordinator_client_config); switch (status) { using enum memgraph::coordination::RegisterInstanceCoordinatorStatus; case NAME_EXISTS: @@ -588,6 +504,9 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { throw QueryRuntimeException("Couldn't register replica instance since it couldn't be persisted!"); case NOT_COORDINATOR: throw QueryRuntimeException("Couldn't register replica instance since this instance is not a coordinator!"); + case RPC_FAILED: + throw QueryRuntimeException( + "Couldn't register replica because promotion on replica failed! Check replica for more logs"); case SUCCESS: break; } @@ -609,28 +528,6 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { } } - /// @throw QueryRuntimeException if an error ocurred. - void DoFailover() const override { - if (!FLAGS_coordinator) { - throw QueryRuntimeException("Only coordinator can register coordinator server!"); - } - - auto status = coordinator_handler_.DoFailover(); - switch (status) { - using enum memgraph::coordination::DoFailoverStatus; - case ALL_REPLICAS_DOWN: - throw QueryRuntimeException("Failover aborted since all replicas are down!"); - case MAIN_ALIVE: - throw QueryRuntimeException("Failover aborted since main is alive!"); - case CLUSTER_UNINITIALIZED: - throw QueryRuntimeException("Failover aborted since cluster is uninitialized!"); - case RPC_FAILED: - throw QueryRuntimeException("Failover aborted since promoting replica to main failed!"); - case SUCCESS: - break; - } - } - #endif #ifdef MG_ENTERPRISE @@ -1091,44 +988,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param std::vector *notifications) { Callback callback; switch (coordinator_query->action_) { - case CoordinatorQuery::Action::REGISTER_MAIN_COORDINATOR_SERVER: { - if (!license::global_license_checker.IsEnterpriseValidFast()) { - throw QueryException("Trying to use enterprise feature without a valid license."); - } -#ifdef MG_ENTERPRISE - if constexpr (!coordination::allow_ha) { - throw QueryRuntimeException( - "High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to " - "be able to use this functionality."); - } - if (!FLAGS_coordinator) { - throw QueryRuntimeException("Only coordinator can register coordinator server!"); - } - - throw QueryRuntimeException("Query is disabled for now"); - return callback; -#endif - } - case CoordinatorQuery::Action::REGISTER_REPLICA_COORDINATOR_SERVER: { - if (!license::global_license_checker.IsEnterpriseValidFast()) { - throw QueryException("Trying to use enterprise feature without a valid license."); - } -#ifdef MG_ENTERPRISE - if constexpr (!coordination::allow_ha) { - throw QueryRuntimeException( - "High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to " - "be able to use this functionality."); - } - - if (!FLAGS_coordinator) { - throw QueryRuntimeException("Only coordinator can register coordinator server!"); - } - - throw QueryRuntimeException("Query is disabled for now"); - return callback; -#endif - } - case CoordinatorQuery::Action::REGISTER_INSTANCE_ON_COORDINATOR: { + case CoordinatorQuery::Action::REGISTER_INSTANCE: { if (!license::global_license_checker.IsEnterpriseValidFast()) { throw QueryException("Trying to use enterprise feature without a valid license."); } @@ -1152,9 +1012,9 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param replication_socket_address_tv, main_check_frequency = config.replication_replica_check_frequency, instance_name = coordinator_query->instance_name_, sync_mode = coordinator_query->sync_mode_]() mutable { - handler.RegisterInstanceOnCoordinator(std::string(coordinator_socket_address_tv.ValueString()), - std::string(replication_socket_address_tv.ValueString()), - main_check_frequency, instance_name, sync_mode); + handler.RegisterInstance(std::string(coordinator_socket_address_tv.ValueString()), + std::string(replication_socket_address_tv.ValueString()), main_check_frequency, + instance_name, sync_mode); return std::vector>(); }; @@ -1224,45 +1084,6 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param return result; }; return callback; -#endif - } - case CoordinatorQuery::Action::DO_FAILOVER: { - if (!license::global_license_checker.IsEnterpriseValidFast()) { - throw QueryException("Trying to use enterprise feature without a valid license."); - } -#ifdef MG_ENTERPRISE - if constexpr (!coordination::allow_ha) { - throw QueryRuntimeException( - "High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to " - "be able to use this functionality."); - } - if (!FLAGS_coordinator) { - throw QueryRuntimeException("Only coordinator can run DO FAILOVER!"); - } - - callback.header = {"name", "socket_address", "alive", "role"}; - callback.fn = [handler = CoordQueryHandler{dbms_handler}]() mutable { - handler.DoFailover(); - - const auto replicas = handler.ShowReplicasOnCoordinator(); - const auto main = handler.ShowMainOnCoordinator(); - std::vector> result{}; - result.reserve(replicas.size() + 1); - - std::ranges::transform(replicas, std::back_inserter(result), [](const auto &status) -> std::vector { - return {TypedValue{status.instance_name}, TypedValue{status.socket_address}, TypedValue{status.is_alive}, - TypedValue{"replica"}}; - }); - if (main) { - result.emplace_back(std::vector{TypedValue{main->instance_name}, TypedValue{main->socket_address}, - TypedValue{main->is_alive}, TypedValue{"main"}}); - } - return result; - }; - - notifications->emplace_back(SeverityLevel::INFO, NotificationCode::DO_FAILOVER, - "DO FAILOVER called on coordinator."); - return callback; #endif } return callback; diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 6f49c78fe..2de4f4284 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -107,21 +107,12 @@ class CoordinatorQueryHandler { #ifdef MG_ENTERPRISE /// @throw QueryRuntimeException if an error ocurred. - virtual void RegisterReplicaCoordinatorServer(const std::string &replication_socket_address, - const std::string &coordinator_socket_address, - const std::chrono::seconds instance_check_frequency, - const std::string &instance_name, - CoordinatorQuery::SyncMode sync_mode) = 0; - virtual void RegisterMainCoordinatorServer(const std::string &socket_address, - const std::chrono::seconds instance_check_frequency, - const std::string &instance_name) = 0; - - virtual void RegisterInstanceOnCoordinator(const std::string &coordinator_socket_address, - const std::string &replication_socket_address, - const std::chrono::seconds instance_check_frequency, - const std::string &instance_name, - CoordinatorQuery::SyncMode sync_mode) = 0; + virtual void RegisterInstance(const std::string &coordinator_socket_address, + const std::string &replication_socket_address, + const std::chrono::seconds instance_check_frequency, const std::string &instance_name, + CoordinatorQuery::SyncMode sync_mode) = 0; + /// @throw QueryRuntimeException if an error ocurred. virtual void SetInstanceToMain(const std::string &instance_name) = 0; /// @throw QueryRuntimeException if an error ocurred. @@ -130,9 +121,6 @@ class CoordinatorQueryHandler { /// @throw QueryRuntimeException if an error ocurred. virtual std::optional ShowMainOnCoordinator() const = 0; - /// @throw QueryRuntimeException if an error ocurred. - virtual void DoFailover() const = 0; - #endif }; diff --git a/src/query/metadata.cpp b/src/query/metadata.cpp index 6995285f7..56ef57431 100644 --- a/src/query/metadata.cpp +++ b/src/query/metadata.cpp @@ -69,8 +69,6 @@ constexpr std::string_view GetCodeString(const NotificationCode code) { #ifdef MG_ENTERPRISE case NotificationCode::REGISTER_COORDINATOR_SERVER: return "RegisterCoordinatorServer"sv; - case NotificationCode::DO_FAILOVER: - return "DoFailover"sv; #endif case NotificationCode::REPLICA_PORT_WARNING: return "ReplicaPortWarning"sv; diff --git a/src/query/metadata.hpp b/src/query/metadata.hpp index 8e61280c0..8e82ad1e3 100644 --- a/src/query/metadata.hpp +++ b/src/query/metadata.hpp @@ -44,7 +44,6 @@ enum class NotificationCode : uint8_t { REGISTER_REPLICA, #ifdef MG_ENTERPRISE REGISTER_COORDINATOR_SERVER, - DO_FAILOVER, #endif SET_REPLICA, START_STREAM, diff --git a/tests/unit/cypher_main_visitor.cpp b/tests/unit/cypher_main_visitor.cpp index 6ef902186..b742eb203 100644 --- a/tests/unit/cypher_main_visitor.cpp +++ b/tests/unit/cypher_main_visitor.cpp @@ -2685,22 +2685,6 @@ TEST_P(CypherMainVisitorTest, TestRegisterCoordinatorServer) { EXPECT_EQ(full_query_parsed->sync_mode_, CoordinatorQuery::SyncMode::ASYNC); } } - -TEST_P(CypherMainVisitorTest, TestDoFailover) { - auto &ast_generator = *GetParam(); - - { - std::string invalid_query = "DO FAILO"; - ASSERT_THROW(ast_generator.ParseQuery(invalid_query), SyntaxException); - } - - { - std::string correct_query = "DO FAILOVER"; - auto *correct_query_parsed = dynamic_cast(ast_generator.ParseQuery(correct_query)); - ASSERT_TRUE(correct_query_parsed); - EXPECT_EQ(correct_query_parsed->action_, CoordinatorQuery::Action::DO_FAILOVER); - } -} #endif TEST_P(CypherMainVisitorTest, TestDeleteReplica) {