diff --git a/src/coordination/coordinator_state.cpp b/src/coordination/coordinator_state.cpp index 2039e3e33..929dbc58c 100644 --- a/src/coordination/coordinator_state.cpp +++ b/src/coordination/coordinator_state.cpp @@ -11,7 +11,6 @@ #include "coordination/coordinator_state.hpp" #include -#include "coordination/coordinator_client_info.hpp" #include "coordination/register_main_replica_coordinator_status.hpp" #ifdef MG_ENTERPRISE @@ -43,158 +42,163 @@ CoordinatorState::CoordinatorState() { auto CoordinatorState::RegisterInstanceOnCoordinator(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus { - MG_ASSERT(std::holds_alternative(data_), - "Coordinator cannot register replica since variant holds wrong alternative"); + // MG_ASSERT(std::holds_alternative(data_), + // "Coordinator cannot register replica since variant holds wrong alternative"); - const auto name_endpoint_status = - std::visit(memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) { - return RegisterInstanceCoordinatorStatus::NOT_COORDINATOR; - }, - [&config](const CoordinatorData &coordinator_data) { - if (memgraph::coordination::CheckName( - coordinator_data.registered_replicas_, config)) { - return RegisterInstanceCoordinatorStatus::NAME_EXISTS; - } - return RegisterInstanceCoordinatorStatus::SUCCESS; - }}, - data_); + // const auto name_endpoint_status = + // std::visit(memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) + // { + // return RegisterInstanceCoordinatorStatus::NOT_COORDINATOR; + // }, + // [&config](const CoordinatorData &coordinator_data) { + // if (memgraph::coordination::CheckName( + // coordinator_data.registered_replicas_, config)) { + // return RegisterInstanceCoordinatorStatus::NAME_EXISTS; + // } + // return RegisterInstanceCoordinatorStatus::SUCCESS; + // }}, + // data_); - if (name_endpoint_status != RegisterInstanceCoordinatorStatus::SUCCESS) { - return 
name_endpoint_status; - } + // if (name_endpoint_status != RegisterInstanceCoordinatorStatus::SUCCESS) { + // return name_endpoint_status; + // } - auto find_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo & { - MG_ASSERT(std::holds_alternative(coord_state->data_), - "Can't execute CoordinatorClient's callback since variant holds wrong alternative"); - auto &coord_data = std::get(coord_state->data_); - std::shared_lock lock{coord_data.coord_data_lock_}; + // auto find_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo + // & { + // MG_ASSERT(std::holds_alternative(coord_state->data_), + // "Can't execute CoordinatorClient's callback since variant holds wrong alternative"); + // auto &coord_data = std::get(coord_state->data_); + // std::shared_lock lock{coord_data.coord_data_lock_}; - auto replica_client_info = std::ranges::find_if( - coord_data.registered_replicas_info_, - [instance_name](const CoordinatorClientInfo &replica) { return replica.InstanceName() == instance_name; }); + // auto replica_client_info = std::ranges::find_if( + // coord_data.registered_replicas_info_, + // [instance_name](const CoordinatorClientInfo &replica) { return replica.InstanceName() == instance_name; }); - if (replica_client_info != coord_data.registered_replicas_info_.end()) { - return *replica_client_info; - } + // if (replica_client_info != coord_data.registered_replicas_info_.end()) { + // return *replica_client_info; + // } - MG_ASSERT(coord_data.registered_main_info_->InstanceName() == instance_name, - "Instance is neither a replica nor main..."); - return *coord_data.registered_main_info_; - }; + // MG_ASSERT(coord_data.registered_main_info_->InstanceName() == instance_name, + // "Instance is neither a replica nor main..."); + // return *coord_data.registered_main_info_; + // }; - // TODO MERGE WITH ANDI's WORK - auto repl_succ_cb = [find_client_info](CoordinatorState 
*coord_state, std::string_view instance_name) -> void { - auto &client_info = find_client_info(coord_state, instance_name); - client_info.UpdateLastResponseTime(); - }; + // // TODO MERGE WITH ANDI's WORK + // auto repl_succ_cb = [find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { + // auto &client_info = find_client_info(coord_state, instance_name); + // client_info.UpdateLastResponseTime(); + // }; - auto repl_fail_cb = [find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { - auto &client_info = find_client_info(coord_state, instance_name); - client_info.UpdateInstanceStatus(); - }; - CoordinatorClientConfig::ReplicationClientInfo replication_client_info = *config.replication_client_info; - auto *coord_client = &std::get(data_).registered_replicas_.emplace_back( - this, std::move(config), std::move(repl_succ_cb), std::move(repl_fail_cb)); + // auto repl_fail_cb = [find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { + // auto &client_info = find_client_info(coord_state, instance_name); + // client_info.UpdateInstanceStatus(); + // }; + // CoordinatorClientConfig::ReplicationClientInfo replication_client_info = *config.replication_client_info; + // auto *coord_client = &std::get(data_).registered_replicas_.emplace_back( + // this, std::move(config), std::move(repl_succ_cb), std::move(repl_fail_cb)); - coord_client->SendSetToReplicaRpc(replication_client_info); + // coord_client->SendSetToReplicaRpc(replication_client_info); - std::get(data_).registered_replicas_info_.emplace_back(coord_client->InstanceName(), - coord_client->SocketAddress()); - coord_client->StartFrequentCheck(); + // std::get(data_).registered_replicas_info_.emplace_back(coord_client->InstanceName(), + // coord_client->SocketAddress()); + // coord_client->StartFrequentCheck(); return RegisterInstanceCoordinatorStatus::SUCCESS; } auto CoordinatorState::SetInstanceToMain(std::string 
instance_name) -> SetInstanceToMainCoordinatorStatus { - MG_ASSERT(std::holds_alternative(data_), - "Coordinator cannot register replica since variant holds wrong alternative"); + // MG_ASSERT(std::holds_alternative(data_), + // "Coordinator cannot register replica since variant holds wrong alternative"); - // TODO: (andi) How does the situation change when restoration of main is implemented regarding callbacks? - // We should probably at that point search for main also in instances as for replicas... - auto get_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo & { - MG_ASSERT(std::holds_alternative(coord_state->data_), - "Can't execute CoordinatorClient's callback since variant holds wrong alternative"); - MG_ASSERT(std::get(coord_state->data_).registered_main_info_.has_value(), - "Main info is not set, but callback is called"); - auto &coord_data = std::get(coord_state->data_); - std::shared_lock lock{coord_data.coord_data_lock_}; + // // TODO: (andi) How does the situation change when restoration of main is implemented regarding callbacks? + // // We should probably at that point search for main also in instances as for replicas... + // auto get_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo & + // { + // MG_ASSERT(std::holds_alternative(coord_state->data_), + // "Can't execute CoordinatorClient's callback since variant holds wrong alternative"); + // MG_ASSERT(std::get(coord_state->data_).registered_main_info_.has_value(), + // "Main info is not set, but callback is called"); + // auto &coord_data = std::get(coord_state->data_); + // std::shared_lock lock{coord_data.coord_data_lock_}; - // TODO When we will support restoration of main, we have to assert that the instance is main or replica, not at - // this point.... 
- auto &registered_main_info = coord_data.registered_main_info_; - MG_ASSERT(registered_main_info->InstanceName() == instance_name, - "Callback called for wrong instance name: {}, expected: {}", instance_name, - registered_main_info->InstanceName()); - return *registered_main_info; - }; + // // TODO When we will support restoration of main, we have to assert that the instance is main or replica, not at + // // this point.... + // auto &registered_main_info = coord_data.registered_main_info_; + // MG_ASSERT(registered_main_info->InstanceName() == instance_name, + // "Callback called for wrong instance name: {}, expected: {}", instance_name, + // registered_main_info->InstanceName()); + // return *registered_main_info; + // }; - auto succ_cb = [get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { - auto &registered_main_info = get_client_info(coord_state, instance_name); - registered_main_info.UpdateLastResponseTime(); - }; + // auto succ_cb = [get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { + // auto &registered_main_info = get_client_info(coord_state, instance_name); + // registered_main_info.UpdateLastResponseTime(); + // }; - auto fail_cb = [this, get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { - auto &registered_main_info = get_client_info(coord_state, instance_name); - if (bool main_alive = registered_main_info.UpdateInstanceStatus(); !main_alive) { - spdlog::warn("Main is not alive, starting failover"); - switch (auto failover_status = DoFailover(); failover_status) { - using enum DoFailoverStatus; - case ALL_REPLICAS_DOWN: - spdlog::warn("Failover aborted since all replicas are down!"); - case MAIN_ALIVE: - spdlog::warn("Failover aborted since main is alive!"); - case CLUSTER_UNINITIALIZED: - spdlog::warn("Failover aborted since cluster is uninitialized!"); - case SUCCESS: - break; - } - } - }; + // auto fail_cb = [this, get_client_info](CoordinatorState 
*coord_state, std::string_view instance_name) -> void { + // auto &registered_main_info = get_client_info(coord_state, instance_name); + // if (bool main_alive = registered_main_info.UpdateInstanceStatus(); !main_alive) { + // spdlog::warn("Main is not alive, starting failover"); + // switch (auto failover_status = DoFailover(); failover_status) { + // using enum DoFailoverStatus; + // case ALL_REPLICAS_DOWN: + // spdlog::warn("Failover aborted since all replicas are down!"); + // case MAIN_ALIVE: + // spdlog::warn("Failover aborted since main is alive!"); + // case CLUSTER_UNINITIALIZED: + // spdlog::warn("Failover aborted since cluster is uninitialized!"); + // case SUCCESS: + // break; + // } + // } + // }; - auto &registered_replicas = std::get(data_).registered_replicas_; - // Find replica we already registered - auto registered_replica = - std::find_if(registered_replicas.begin(), registered_replicas.end(), [instance_name](const auto &replica_client) { - std::cout << "replica name: " << replica_client.InstanceName() << ", instance name: " << instance_name - << std::endl; - return replica_client.InstanceName() == instance_name; - }); + // auto &registered_replicas = std::get(data_).registered_replicas_; + // // Find replica we already registered + // auto registered_replica = + // std::find_if(registered_replicas.begin(), registered_replicas.end(), [instance_name](const auto + // &replica_client) { + // std::cout << "replica name: " << replica_client.InstanceName() << ", instance name: " << instance_name + // << std::endl; + // return replica_client.InstanceName() == instance_name; + // }); - std::for_each(registered_replicas.begin(), registered_replicas.end(), - [](const auto &client) { std::cout << "replica names: " << client.InstanceName() << std::endl; }); - // if replica not found... 
- if (registered_replica == registered_replicas.end()) { - spdlog::error("You didn't register instance with given name {}", instance_name); - return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME; - } - registered_replica->StopFrequentCheck(); - // Set instance as MAIN - // THIS WILL SHUT DOWN CLIENT - auto &registered_main = std::get(data_).registered_main_; - registered_main = - std::make_unique(this, registered_replica->Config(), std::move(succ_cb), std::move(fail_cb)); + // std::for_each(registered_replicas.begin(), registered_replicas.end(), + // [](const auto &client) { std::cout << "replica names: " << client.InstanceName() << std::endl; }); + // // if replica not found... + // if (registered_replica == registered_replicas.end()) { + // spdlog::error("You didn't register instance with given name {}", instance_name); + // return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME; + // } + // registered_replica->StopFrequentCheck(); + // // Set instance as MAIN + // // THIS WILL SHUT DOWN CLIENT + // auto &registered_main = std::get(data_).registered_main_; + // registered_main = + // std::make_unique(this, registered_replica->Config(), std::move(succ_cb), + // std::move(fail_cb)); - std::get(data_).registered_main_info_.emplace(registered_main->InstanceName(), - registered_main->SocketAddress()); - std::vector repl_clients_info; - repl_clients_info.reserve(registered_replicas.size() - 1); - std::ranges::for_each(registered_replicas, - [registered_replica, &repl_clients_info](const CoordinatorClient &replica) { - if (replica != *registered_replica) { - repl_clients_info.emplace_back(replica.ReplicationClientInfo()); - } - }); + // std::get(data_).registered_main_info_.emplace(registered_main->InstanceName(), + // registered_main->SocketAddress()); + // std::vector repl_clients_info; + // repl_clients_info.reserve(registered_replicas.size() - 1); + // std::ranges::for_each(registered_replicas, + // [registered_replica, &repl_clients_info](const 
CoordinatorClient &replica) { + // if (replica != *registered_replica) { + // repl_clients_info.emplace_back(replica.ReplicationClientInfo()); + // } + // }); - // PROMOTE REPLICA TO MAIN - // THIS SHOULD FAIL HERE IF IT IS DOWN - if (auto result = registered_main->SendPromoteReplicaToMainRpc(std::move(repl_clients_info)); !result) { - registered_replica->StartFrequentCheck(); - registered_main.reset(); - return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN; - } + // // PROMOTE REPLICA TO MAIN + // // THIS SHOULD FAIL HERE IF IT IS DOWN + // if (auto result = registered_main->SendPromoteReplicaToMainRpc(std::move(repl_clients_info)); !result) { + // registered_replica->StartFrequentCheck(); + // registered_main.reset(); + // return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN; + // } - registered_main->StartFrequentCheck(); - registered_replicas.erase(registered_replica); + // registered_main->StartFrequentCheck(); + // registered_replicas.erase(registered_replica); return SetInstanceToMainCoordinatorStatus::SUCCESS; } diff --git a/src/dbms/utils.hpp b/src/dbms/utils.hpp index f43a0765f..f36c4727a 100644 --- a/src/dbms/utils.hpp +++ b/src/dbms/utils.hpp @@ -54,34 +54,36 @@ inline bool SetReplicationRoleReplica(dbms::DbmsHandler &dbms_handler, // TODO StorageState needs to be synched. Could have a dangling reference if someone adds a database as we are // deleting the replica. 
// Remove database specific clients - dbms_handler.ForEach([&](Database *db) { - auto *storage = db->storage(); - storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); }); - }); + + // dbms_handler.ForEach([&](Database *db) { + // auto *storage = db->storage(); + // storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); }); + // }); + // Remove instance level clients - std::get(dbms_handler.ReplicationState().ReplicationData()).registered_replicas_.clear(); + // std::get(dbms_handler.ReplicationState().ReplicationData()).registered_replicas_.clear(); - // Creates the server - dbms_handler.ReplicationState().SetReplicationRoleReplica(config); + // // Creates the server + // dbms_handler.ReplicationState().SetReplicationRoleReplica(config); - // Start - const auto success = - std::visit(utils::Overloaded{[](replication::RoleMainData const &) { - // ASSERT - return false; - }, - [&dbms_handler](replication::RoleReplicaData const &data) { - // Register handlers - InMemoryReplicationHandlers::Register(&dbms_handler, *data.server); - if (!data.server->Start()) { - spdlog::error("Unable to start the replication server."); - return false; - } - return true; - }}, - dbms_handler.ReplicationState().ReplicationData()); + // // Start + // const auto success = + // std::visit(utils::Overloaded{[](replication::RoleMainData const &) { + // // ASSERT + // return false; + // }, + // [&dbms_handler](replication::RoleReplicaData const &data) { + // // Register handlers + // InMemoryReplicationHandlers::Register(&dbms_handler, *data.server); + // if (!data.server->Start()) { + // spdlog::error("Unable to start the replication server."); + // return false; + // } + // return true; + // }}, + // dbms_handler.ReplicationState().ReplicationData()); // TODO Handle error (restore to main?) - return success; + return true; } inline bool RegisterAllDatabasesClients(dbms::DbmsHandler &dbms_handler,