merge with Andi's work, register instance works

This commit is contained in:
antoniofilipovic 2024-01-24 17:42:49 +01:00 committed by Andi Skrgat
parent d3168ded5a
commit 5242427686
22 changed files with 119 additions and 520 deletions

View File

@ -44,7 +44,7 @@ void CoordinatorClient::StartFrequentCheck() {
MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0),
"Health check frequency must be greater than 0");
replica_checker_.Run(
instance_checker_.Run(
"Coord checker", config_.health_check_frequency_sec, [this, instance_name = config_.instance_name] {
try {
spdlog::trace("Sending frequent heartbeat to machine {} on {}", instance_name,
@ -58,20 +58,22 @@ void CoordinatorClient::StartFrequentCheck() {
});
}
void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); }
// Stops the scheduler that drives the periodic instance check.
void CoordinatorClient::StopFrequentCheck() {
  instance_checker_.Stop();
}
void CoordinatorClient::PauseFrequentCheck() { replica_checker_.Pause(); }
void CoordinatorClient::ResumeFrequentCheck() { replica_checker_.Resume(); }
// Pauses the scheduler that drives the periodic instance check.
void CoordinatorClient::PauseFrequentCheck() {
  instance_checker_.Pause();
}
// Resumes the scheduler that drives the periodic instance check.
void CoordinatorClient::ResumeFrequentCheck() {
  instance_checker_.Resume();
}
// Replaces the stored success callback; the argument is taken by value and moved into place.
auto CoordinatorClient::SetSuccCallback(HealthCheckCallback succ_cb) -> void {
  succ_cb_ = std::move(succ_cb);
}
// Replaces the stored failure callback; the argument is taken by value and moved into place.
auto CoordinatorClient::SetFailCallback(HealthCheckCallback fail_cb) -> void {
  fail_cb_ = std::move(fail_cb);
}
auto CoordinatorClient::ReplicationClientInfo() const
-> const std::optional<CoordinatorClientConfig::ReplicationClientInfo> & {
// Read-only access to the replication client info stored in this client's config.
auto CoordinatorClient::ReplicationClientInfo() const -> const CoordinatorClientConfig::ReplicationClientInfo & {
  const auto &repl_client_info = config_.replication_client_info;
  return repl_client_info;
}
auto CoordinatorClient::ResetReplicationClientInfo() -> void { config_.replication_client_info.reset(); }
// Currently a no-op: the reset of config_.replication_client_info is commented out
// until the TODO below is resolved.
auto CoordinatorClient::ResetReplicationClientInfo() -> void {
// TODO (antoniofilipovic) Sync with Andi on this one
// config_.replication_client_info.reset();
}
auto CoordinatorClient::SendPromoteReplicaToMainRpc(
std::vector<CoordinatorClientConfig::ReplicationClientInfo> replication_clients_info) const -> bool {

View File

@ -9,6 +9,8 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "coordination/coordinator_instance.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_data.hpp"
@ -110,12 +112,13 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus {
auto not_chosen_replica_instance = [&chosen_replica_instance](const CoordinatorInstance &instance) {
return instance != *chosen_replica_instance;
};
auto not_main = [](const CoordinatorInstance &instance) { return !instance.IsMain(); };
for (const auto &unchosen_replica_instance : replica_instances | ranges::views::filter(not_chosen_replica_instance)) {
if (auto repl_client_info = unchosen_replica_instance.client_.ReplicationClientInfo();
repl_client_info.has_value()) {
repl_clients_info.emplace_back(std::move(repl_client_info.value()));
}
// Skip the chosen replica and the current MAIN instance.
// TODO (antoniofilipovic): Should we also send the old MAIN's replication info?
for (const auto &unchosen_replica_instance :
replica_instances | ranges::views::filter(not_chosen_replica_instance) | ranges::views::filter(not_main)) {
repl_clients_info.emplace_back(unchosen_replica_instance.client_.ReplicationClientInfo());
}
if (!chosen_replica_instance->client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) {
@ -154,49 +157,74 @@ auto CoordinatorData::ShowReplicas() const -> std::vector<CoordinatorInstanceSta
return instances_status;
}
auto CoordinatorData::RegisterMain(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus {
/// Promotes an already registered instance to MAIN.
///
/// Looks the instance up by name, stops its health check, sends it the promote RPC together with the
/// replication client info of every other registered instance, and on success switches its role and
/// health-check callbacks to the MAIN ones before restarting the health check.
///
/// @param instance_name name of a previously registered instance.
/// @return NO_INSTANCE_WITH_NAME if no registered instance has that name,
///         COULD_NOT_PROMOTE_TO_MAIN if the promote RPC fails (the health check is restarted unchanged),
///         SUCCESS otherwise.
auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
  // TODO: (andi) test this
  std::lock_guard<utils::RWLock> lock{coord_data_lock_};

  // Find the instance we already registered.
  auto registered_replica = std::find_if(
      registered_instances_.begin(), registered_instances_.end(),
      [&instance_name](const CoordinatorInstance &instance) { return instance.InstanceName() == instance_name; });

  if (registered_replica == registered_instances_.end()) {
    spdlog::error("You didn't register instance with given name {}", instance_name);
    return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
  }

  // Stop for now because we need to swap success and fail callbacks.
  registered_replica->client_.StopFrequentCheck();

  // Collect the replication info of every instance except the one being promoted.
  std::vector<CoordinatorClientConfig::ReplicationClientInfo> repl_clients_info;
  repl_clients_info.reserve(registered_instances_.size() - 1);
  std::ranges::for_each(registered_instances_,
                        [registered_replica, &repl_clients_info](const CoordinatorInstance &replica) {
                          if (replica != *registered_replica) {
                            repl_clients_info.emplace_back(replica.client_.ReplicationClientInfo());
                          }
                        });

  // PROMOTE REPLICA TO MAIN
  // THIS SHOULD FAIL HERE IF IT IS DOWN
  if (auto result = registered_replica->client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info)); !result) {
    registered_replica->client_.StartFrequentCheck();
    return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
  }

  registered_replica->replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
  registered_replica->client_.SetSuccCallback(main_succ_cb_);
  registered_replica->client_.SetFailCallback(main_fail_cb_);

  // The check was stopped above; restart it so the new MAIN is monitored with the MAIN callbacks.
  registered_replica->client_.StartFrequentCheck();

  return SetInstanceToMainCoordinatorStatus::SUCCESS;
}
auto CoordinatorData::RegisterReplica(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus {
/// Registers a new instance on the coordinator as a REPLICA.
///
/// Rejects the config if an already registered instance has the same name or socket address. Otherwise the
/// instance is stored, told to become a replica over RPC, and its health check is started.
///
/// @param config client config of the instance to register (consumed).
/// @return NAME_EXISTS / END_POINT_EXISTS on a clash with a registered instance,
///         RPC_FAILED if the set-to-replica RPC fails (the instance is not kept registered),
///         SUCCESS otherwise.
auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus {
  std::lock_guard<utils::RWLock> lock{coord_data_lock_};

  if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) {
        return instance.InstanceName() == config.instance_name;
      })) {
    return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
  }

  if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) {
        spdlog::trace("Comparing {} with {}", instance.SocketAddress(), config.SocketAddress());
        return instance.SocketAddress() == config.SocketAddress();
      })) {
    return RegisterInstanceCoordinatorStatus::END_POINT_EXISTS;
  }

  // Copy before `config` is moved into the new instance.
  CoordinatorClientConfig::ReplicationClientInfo replication_client_info_copy = config.replication_client_info;

  // TODO (antoniofilipovic) create and then push back
  auto *instance = &registered_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_,
                                                       replication_coordination_glue::ReplicationRole::REPLICA);
  if (auto res = instance->client_.SendSetToReplicaRpc(replication_client_info_copy); !res) {
    // Roll back: the instance was appended last and the lock is still held, so pop_back removes exactly it.
    // Leaving it registered would make every retry fail with NAME_EXISTS while nothing monitors it.
    registered_instances_.pop_back();
    return RegisterInstanceCoordinatorStatus::RPC_FAILED;
  }

  instance->client_.StartFrequentCheck();

  return RegisterInstanceCoordinatorStatus::SUCCESS;
}
} // namespace memgraph::coordination

View File

@ -40,191 +40,30 @@ CoordinatorState::CoordinatorState() {
}
}
auto CoordinatorState::RegisterInstanceOnCoordinator(CoordinatorClientConfig config)
-> RegisterInstanceCoordinatorStatus {
// MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
// "Coordinator cannot register replica since variant holds wrong alternative");
// const auto name_endpoint_status =
// std::visit(memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/)
// {
// return RegisterInstanceCoordinatorStatus::NOT_COORDINATOR;
// },
// [&config](const CoordinatorData &coordinator_data) {
// if (memgraph::coordination::CheckName(
// coordinator_data.registered_replicas_, config)) {
// return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
// }
// return RegisterInstanceCoordinatorStatus::SUCCESS;
// }},
// data_);
// if (name_endpoint_status != RegisterInstanceCoordinatorStatus::SUCCESS) {
// return name_endpoint_status;
// }
// auto find_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo
// & {
// MG_ASSERT(std::holds_alternative<CoordinatorData>(coord_state->data_),
// "Can't execute CoordinatorClient's callback since variant holds wrong alternative");
// auto &coord_data = std::get<CoordinatorData>(coord_state->data_);
// std::shared_lock<utils::RWLock> lock{coord_data.coord_data_lock_};
// auto replica_client_info = std::ranges::find_if(
// coord_data.registered_replicas_info_,
// [instance_name](const CoordinatorClientInfo &replica) { return replica.InstanceName() == instance_name; });
// if (replica_client_info != coord_data.registered_replicas_info_.end()) {
// return *replica_client_info;
// }
// MG_ASSERT(coord_data.registered_main_info_->InstanceName() == instance_name,
// "Instance is neither a replica nor main...");
// return *coord_data.registered_main_info_;
// };
// // TODO MERGE WITH ANDI's WORK
// auto repl_succ_cb = [find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
// auto &client_info = find_client_info(coord_state, instance_name);
// client_info.UpdateLastResponseTime();
// };
// auto repl_fail_cb = [find_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
// auto &client_info = find_client_info(coord_state, instance_name);
// client_info.UpdateInstanceStatus();
// };
// CoordinatorClientConfig::ReplicationClientInfo replication_client_info = *config.replication_client_info;
// auto *coord_client = &std::get<CoordinatorData>(data_).registered_replicas_.emplace_back(
// this, std::move(config), std::move(repl_succ_cb), std::move(repl_fail_cb));
// coord_client->SendSetToReplicaRpc(replication_client_info);
// std::get<CoordinatorData>(data_).registered_replicas_info_.emplace_back(coord_client->InstanceName(),
// coord_client->SocketAddress());
// coord_client->StartFrequentCheck();
return RegisterInstanceCoordinatorStatus::SUCCESS;
}
auto CoordinatorState::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
// MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
// "Coordinator cannot register replica since variant holds wrong alternative");
// // TODO: (andi) How does the situation change when restoration of main is implemented regarding callbacks?
// // We should probably at that point search for main also in instances as for replicas...
// auto get_client_info = [](CoordinatorState *coord_state, std::string_view instance_name) -> CoordinatorClientInfo &
// {
// MG_ASSERT(std::holds_alternative<CoordinatorData>(coord_state->data_),
// "Can't execute CoordinatorClient's callback since variant holds wrong alternative");
// MG_ASSERT(std::get<CoordinatorData>(coord_state->data_).registered_main_info_.has_value(),
// "Main info is not set, but callback is called");
// auto &coord_data = std::get<CoordinatorData>(coord_state->data_);
// std::shared_lock<utils::RWLock> lock{coord_data.coord_data_lock_};
// // TODO When we will support restoration of main, we have to assert that the instance is main or replica, not at
// // this point....
// auto &registered_main_info = coord_data.registered_main_info_;
// MG_ASSERT(registered_main_info->InstanceName() == instance_name,
// "Callback called for wrong instance name: {}, expected: {}", instance_name,
// registered_main_info->InstanceName());
// return *registered_main_info;
// };
// auto succ_cb = [get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
// auto &registered_main_info = get_client_info(coord_state, instance_name);
// registered_main_info.UpdateLastResponseTime();
// };
// auto fail_cb = [this, get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
// auto &registered_main_info = get_client_info(coord_state, instance_name);
// if (bool main_alive = registered_main_info.UpdateInstanceStatus(); !main_alive) {
// spdlog::warn("Main is not alive, starting failover");
// switch (auto failover_status = DoFailover(); failover_status) {
// using enum DoFailoverStatus;
// case ALL_REPLICAS_DOWN:
// spdlog::warn("Failover aborted since all replicas are down!");
// case MAIN_ALIVE:
// spdlog::warn("Failover aborted since main is alive!");
// case CLUSTER_UNINITIALIZED:
// spdlog::warn("Failover aborted since cluster is uninitialized!");
// case SUCCESS:
// break;
// }
// }
// };
// auto &registered_replicas = std::get<CoordinatorData>(data_).registered_replicas_;
// // Find replica we already registered
// auto registered_replica =
// std::find_if(registered_replicas.begin(), registered_replicas.end(), [instance_name](const auto
// &replica_client) {
// std::cout << "replica name: " << replica_client.InstanceName() << ", instance name: " << instance_name
// << std::endl;
// return replica_client.InstanceName() == instance_name;
// });
// std::for_each(registered_replicas.begin(), registered_replicas.end(),
// [](const auto &client) { std::cout << "replica names: " << client.InstanceName() << std::endl; });
// // if replica not found...
// if (registered_replica == registered_replicas.end()) {
// spdlog::error("You didn't register instance with given name {}", instance_name);
// return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
// }
// registered_replica->StopFrequentCheck();
// // Set instance as MAIN
// // THIS WILL SHUT DOWN CLIENT
// auto &registered_main = std::get<CoordinatorData>(data_).registered_main_;
// registered_main =
// std::make_unique<CoordinatorClient>(this, registered_replica->Config(), std::move(succ_cb),
// std::move(fail_cb));
// std::get<CoordinatorData>(data_).registered_main_info_.emplace(registered_main->InstanceName(),
// registered_main->SocketAddress());
// std::vector<CoordinatorClientConfig::ReplicationClientInfo> repl_clients_info;
// repl_clients_info.reserve(registered_replicas.size() - 1);
// std::ranges::for_each(registered_replicas,
// [registered_replica, &repl_clients_info](const CoordinatorClient &replica) {
// if (replica != *registered_replica) {
// repl_clients_info.emplace_back(replica.ReplicationClientInfo());
// }
// });
// // PROMOTE REPLICA TO MAIN
// // THIS SHOULD FAIL HERE IF IT IS DOWN
// if (auto result = registered_main->SendPromoteReplicaToMainRpc(std::move(repl_clients_info)); !result) {
// registered_replica->StartFrequentCheck();
// registered_main.reset();
// return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
// }
// registered_main->StartFrequentCheck();
// registered_replicas.erase(registered_replica);
return SetInstanceToMainCoordinatorStatus::SUCCESS;
}
auto CoordinatorState::RegisterReplica(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus {
/// Forwards instance registration to CoordinatorData.
///
/// Asserts that `data_` holds CoordinatorData; the CoordinatorMainReplicaData alternative maps to
/// NOT_COORDINATOR.
///
/// @param config client config of the instance to register (consumed).
/// @return the status produced by CoordinatorData::RegisterInstance, or NOT_COORDINATOR.
auto CoordinatorState::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus {
  MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
            "Coordinator cannot register replica since variant holds wrong alternative");

  return std::visit(
      memgraph::utils::Overloaded{
          [](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
            return RegisterInstanceCoordinatorStatus::NOT_COORDINATOR;
          },
          // Capture by reference and move: the visitor runs synchronously, and a by-copy capture would
          // copy the config into the closure and then copy it again into the by-value parameter.
          [&config](CoordinatorData &coordinator_data) { return coordinator_data.RegisterInstance(std::move(config)); }},
      data_);
}
auto CoordinatorState::RegisterMain(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus {
/// Forwards the set-instance-to-main request to CoordinatorData.
///
/// Asserts that `data_` holds CoordinatorData; the CoordinatorMainReplicaData alternative maps to
/// NOT_COORDINATOR.
///
/// @param instance_name name of the registered instance to promote (consumed).
/// @return the status produced by CoordinatorData::SetInstanceToMain, or NOT_COORDINATOR.
auto CoordinatorState::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
  MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
            "Coordinator cannot set instance to main since variant holds wrong alternative");

  return std::visit(
      memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
                                    return SetInstanceToMainCoordinatorStatus::NOT_COORDINATOR;
                                  },
                                  // The visitor runs synchronously, so the name can be moved into the
                                  // by-value parameter of the callee.
                                  [&instance_name](CoordinatorData &coordinator_data) {
                                    return coordinator_data.SetInstanceToMain(std::move(instance_name));
                                  }},
      data_);
}

View File

@ -48,7 +48,7 @@ class CoordinatorClient {
auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool;
auto ReplicationClientInfo() const -> const std::optional<ReplClientInfo> &;
auto ReplicationClientInfo() const -> const ReplClientInfo &;
auto ResetReplicationClientInfo() -> void;
auto SendSetToReplicaRpc(ReplClientInfo replication_client_info) const -> bool;
@ -61,7 +61,7 @@ class CoordinatorClient {
}
private:
utils::Scheduler replica_checker_;
utils::Scheduler instance_checker_;
// TODO: (andi) Pimpl?
communication::ClientContext rpc_context_;

View File

@ -43,7 +43,8 @@ struct CoordinatorClientConfig {
friend bool operator==(ReplicationClientInfo const &, ReplicationClientInfo const &) = default;
};
std::optional<ReplicationClientInfo> replication_client_info;
// Every instance keeps its replication client info (no longer optional) so that it is still available if the instance fails.
ReplicationClientInfo replication_client_info;
struct SSL {
std::string key_file;

View File

@ -29,8 +29,8 @@ class CoordinatorData {
[[nodiscard]] auto DoFailover() -> DoFailoverStatus;
[[nodiscard]] auto RegisterReplica(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus;
[[nodiscard]] auto RegisterMain(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus;
[[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
auto ShowReplicas() const -> std::vector<CoordinatorInstanceStatus>;
auto ShowMain() const -> std::optional<CoordinatorInstanceStatus>;

View File

@ -59,7 +59,8 @@ class CoordinatorInstance {
replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
client_.SetSuccCallback(std::move(main_succ_cb));
client_.SetFailCallback(std::move(main_fail_cb));
client_.ResetReplicationClientInfo();
// TODO: discuss with Andi. We shouldn't reset this: the replication info is still needed if this MAIN fails again.
// client_.ResetReplicationClientInfo();
client_.ResumeFrequentCheck();
}

View File

@ -34,11 +34,7 @@ class CoordinatorState {
CoordinatorState(CoordinatorState &&) noexcept = delete;
CoordinatorState &operator=(CoordinatorState &&) noexcept = delete;
[[nodiscard]] auto RegisterReplica(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus;
[[nodiscard]] auto RegisterMain(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus;
[[nodiscard]] auto RegisterInstanceOnCoordinator(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;

View File

@ -17,19 +17,12 @@
namespace memgraph::coordination {
enum class RegisterMainReplicaCoordinatorStatus : uint8_t {
NAME_EXISTS,
ENDPOINT_EXISTS,
COULD_NOT_BE_PERSISTED,
NOT_COORDINATOR,
SUCCESS
};
// Result of registering an instance on the coordinator.
enum class RegisterInstanceCoordinatorStatus : uint8_t {
// An already registered instance has the same instance name.
NAME_EXISTS,
// An already registered instance has the same socket address.
END_POINT_EXISTS,
// Reported to the user as "couldn't be persisted"; not returned by the registration code visible here.
COULD_NOT_BE_PERSISTED,
// The coordinator state holds CoordinatorMainReplicaData rather than CoordinatorData.
NOT_COORDINATOR,
// The set-to-replica RPC sent to the new instance failed.
RPC_FAILED,
// The instance was registered and its health check was started.
SUCCESS
};

View File

@ -20,19 +20,9 @@ namespace memgraph::dbms {
CoordinatorHandler::CoordinatorHandler(DbmsHandler &dbms_handler) : dbms_handler_(dbms_handler) {}
auto CoordinatorHandler::RegisterReplicaOnCoordinator(coordination::CoordinatorClientConfig config)
-> coordination::RegisterMainReplicaCoordinatorStatus {
return dbms_handler_.CoordinatorState().RegisterReplica(std::move(config));
}
auto CoordinatorHandler::RegisterMainOnCoordinator(memgraph::coordination::CoordinatorClientConfig config)
-> coordination::RegisterMainReplicaCoordinatorStatus {
return dbms_handler_.CoordinatorState().RegisterMain(std::move(config));
}
auto CoordinatorHandler::RegisterInstanceOnCoordinator(memgraph::coordination::CoordinatorClientConfig config)
/// Forwards instance registration to the coordinator state owned by the DBMS handler.
///
/// @param config client config of the instance to register (consumed).
/// @return the status produced by CoordinatorState::RegisterInstance.
auto CoordinatorHandler::RegisterInstance(memgraph::coordination::CoordinatorClientConfig config)
    -> coordination::RegisterInstanceCoordinatorStatus {
  // `config` is a by-value sink parameter; move it on instead of copying it again.
  return dbms_handler_.CoordinatorState().RegisterInstance(std::move(config));
}
auto CoordinatorHandler::SetInstanceToMain(std::string instance_name)
@ -48,10 +38,6 @@ auto CoordinatorHandler::ShowMainOnCoordinator() const -> std::optional<coordina
return dbms_handler_.CoordinatorState().ShowMain();
}
auto CoordinatorHandler::DoFailover() const -> coordination::DoFailoverStatus {
return dbms_handler_.CoordinatorState().DoFailover();
}
} // namespace memgraph::dbms
#endif

View File

@ -32,13 +32,7 @@ class CoordinatorHandler {
public:
explicit CoordinatorHandler(DbmsHandler &dbms_handler);
auto RegisterReplicaOnCoordinator(coordination::CoordinatorClientConfig config)
-> coordination::RegisterMainReplicaCoordinatorStatus;
auto RegisterMainOnCoordinator(coordination::CoordinatorClientConfig config)
-> coordination::RegisterMainReplicaCoordinatorStatus;
auto RegisterInstanceOnCoordinator(coordination::CoordinatorClientConfig config)
auto RegisterInstance(coordination::CoordinatorClientConfig config)
-> coordination::RegisterInstanceCoordinatorStatus;
auto SetInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus;
@ -47,8 +41,6 @@ class CoordinatorHandler {
auto ShowMainOnCoordinator() const -> std::optional<coordination::CoordinatorInstanceStatus>;
auto DoFailover() const -> coordination::DoFailoverStatus;
private:
DbmsHandler &dbms_handler_;
};

View File

@ -34,7 +34,7 @@ void CoordinatorHandlers::Register(DbmsHandler &dbms_handler) {
server.Register<coordination::SetMainToReplicaRpc>(
[&dbms_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void {
spdlog::info("Received PromoteReplicaToMainRpc from coordinator server");
spdlog::info("Received SetMainToReplicaRpc from coordinator server");
CoordinatorHandlers::SetMainToReplicaHandler(dbms_handler, req_reader, res_builder);
});
}

View File

@ -46,7 +46,6 @@ inline bool DoReplicaToMainPromotion(dbms::DbmsHandler &dbms_handler) {
inline bool SetReplicationRoleReplica(dbms::DbmsHandler &dbms_handler,
const memgraph::replication::ReplicationServerConfig &config) {
// We don't want to restart the server if we're already a REPLICA
if (dbms_handler.ReplicationState().IsReplica()) {
return false;
}
@ -54,36 +53,27 @@ inline bool SetReplicationRoleReplica(dbms::DbmsHandler &dbms_handler,
// TODO StorageState needs to be synched. Could have a dangling reference if someone adds a database as we are
// deleting the replica.
// Remove database specific clients
// dbms_handler.ForEach([&](Database *db) {
// auto *storage = db->storage();
// storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); });
// });
dbms_handler.ForEach([&](DatabaseAccess db_acc) {
auto *storage = db_acc->storage();
storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); });
});
// Remove instance level clients
// std::get<replication::RoleMainData>(dbms_handler.ReplicationState().ReplicationData()).registered_replicas_.clear();
std::get<replication::RoleMainData>(dbms_handler.ReplicationState().ReplicationData()).registered_replicas_.clear();
// // Creates the server
// dbms_handler.ReplicationState().SetReplicationRoleReplica(config);
// Creates the server
dbms_handler.ReplicationState().SetReplicationRoleReplica(config);
// // Start
// const auto success =
// std::visit(utils::Overloaded{[](replication::RoleMainData const &) {
// // ASSERT
// return false;
// },
// [&dbms_handler](replication::RoleReplicaData const &data) {
// // Register handlers
// InMemoryReplicationHandlers::Register(&dbms_handler, *data.server);
// if (!data.server->Start()) {
// spdlog::error("Unable to start the replication server.");
// return false;
// }
// return true;
// }},
// dbms_handler.ReplicationState().ReplicationData());
// Start
const auto success = std::visit(utils::Overloaded{[](replication::RoleMainData const &) {
// ASSERT
return false;
},
[&dbms_handler](replication::RoleReplicaData const &data) {
return StartRpcServer(dbms_handler, data);
}},
dbms_handler.ReplicationState().ReplicationData());
// TODO Handle error (restore to main?)
return true;
return success;
}
inline bool RegisterAllDatabasesClients(dbms::DbmsHandler &dbms_handler,

View File

@ -3071,12 +3071,9 @@ class CoordinatorQuery : public memgraph::query::Query {
const utils::TypeInfo &GetTypeInfo() const override { return kType; }
enum class Action {
REGISTER_INSTANCE_ON_COORDINATOR,
REGISTER_MAIN_COORDINATOR_SERVER,
REGISTER_REPLICA_COORDINATOR_SERVER,
REGISTER_INSTANCE,
SET_INSTANCE_TO_MAIN,
SHOW_REPLICATION_CLUSTER,
DO_FAILOVER
};
enum class SyncMode { SYNC, ASYNC };

View File

@ -385,7 +385,7 @@ antlrcpp::Any CypherMainVisitor::visitRegisterInstanceOnCoordinator(
if (!ctx->coordinatorSocketAddress()->literal()->StringLiteral()) {
throw SemanticException("Coordinator socket address should be a string literal!");
}
coordinator_query->action_ = CoordinatorQuery::Action::REGISTER_INSTANCE_ON_COORDINATOR;
coordinator_query->action_ = CoordinatorQuery::Action::REGISTER_INSTANCE;
coordinator_query->replication_socket_address_ =
std::any_cast<Expression *>(ctx->replicationSocketAddress()->accept(this));
coordinator_query->coordinator_socket_address_ =
@ -420,14 +420,6 @@ antlrcpp::Any CypherMainVisitor::visitShowReplicas(MemgraphCypher::ShowReplicasC
return replication_query;
}
// License check is done in the interpreter
antlrcpp::Any CypherMainVisitor::visitDoFailover(MemgraphCypher::DoFailoverContext * /*ctx*/) {
auto *coordinator_query = storage_->Create<CoordinatorQuery>();
coordinator_query->action_ = CoordinatorQuery::Action::DO_FAILOVER;
query_ = coordinator_query;
return coordinator_query;
}
// License check is done in the interpreter
antlrcpp::Any CypherMainVisitor::visitSetInstanceToMain(MemgraphCypher::SetInstanceToMainContext *ctx) {
auto *coordinator_query = storage_->Create<CoordinatorQuery>();

View File

@ -253,11 +253,6 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
*/
antlrcpp::Any visitShowReplicationCluster(MemgraphCypher::ShowReplicationClusterContext *ctx) override;
/**
* @return CoordinatorQuery*
*/
antlrcpp::Any visitDoFailover(MemgraphCypher::DoFailoverContext *ctx) override;
/**
* @return LockPathQuery*
*/

View File

@ -190,7 +190,6 @@ replicationQuery : setReplicationRole
coordinatorQuery : registerInstanceOnCoordinator
| setInstanceToMain
| showReplicationCluster
| doFailover
;
triggerQuery : createTrigger
@ -254,8 +253,6 @@ transactionQueueQuery : showTransactions
showTransactions : SHOW TRANSACTIONS ;
doFailover : DO FAILOVER ;
terminateTransactions : TERMINATE TRANSACTIONS transactionIdList;
loadCsv : LOAD CSV FROM csvFile ( WITH | NO ) HEADER

View File

@ -461,11 +461,9 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
#ifdef MG_ENTERPRISE
/// @throw QueryRuntimeException if an error ocurred.
void RegisterReplicaCoordinatorServer(const std::string &replication_socket_address,
const std::string &coordinator_socket_address,
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name,
CoordinatorQuery::SyncMode sync_mode) override {
void RegisterInstance(const std::string &coordinator_socket_address, const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency, const std::string &instance_name,
CoordinatorQuery::SyncMode sync_mode) override {
const auto maybe_replication_ip_port =
io::network::Endpoint::ParseSocketOrAddress(replication_socket_address, std::nullopt);
if (!maybe_replication_ip_port) {
@ -494,89 +492,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
.replication_client_info = repl_config,
.ssl = std::nullopt};
auto status = coordinator_handler_.RegisterReplicaOnCoordinator(std::move(coordinator_client_config));
switch (status) {
using enum memgraph::coordination::RegisterMainReplicaCoordinatorStatus;
case NAME_EXISTS:
throw QueryRuntimeException("Couldn't register replica instance since instance with such name already exists!");
case ENDPOINT_EXISTS:
throw QueryRuntimeException(
"Couldn't register replica instance since instance with such endpoint already exists!");
case COULD_NOT_BE_PERSISTED:
throw QueryRuntimeException("Couldn't register replica instance since it couldn't be persisted!");
case NOT_COORDINATOR:
throw QueryRuntimeException("Couldn't register replica instance since this instance is not a coordinator!");
case SUCCESS:
break;
}
}
void RegisterMainCoordinatorServer(const std::string &coordinator_socket_address,
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name) override {
const auto maybe_ip_and_port =
io::network::Endpoint::ParseSocketOrAddress(coordinator_socket_address, std::nullopt);
if (!maybe_ip_and_port) {
throw QueryRuntimeException("Invalid socket address!");
}
const auto [ip, port] = *maybe_ip_and_port;
auto coordinator_client_config =
coordination::CoordinatorClientConfig{.instance_name = instance_name,
.ip_address = ip,
.port = port,
.health_check_frequency_sec = instance_check_frequency,
.ssl = std::nullopt};
auto status = coordinator_handler_.RegisterMainOnCoordinator(std::move(coordinator_client_config));
switch (status) {
using enum memgraph::coordination::RegisterMainReplicaCoordinatorStatus;
case NAME_EXISTS:
throw QueryRuntimeException("Couldn't register main instance since instance with such name already exists!");
case ENDPOINT_EXISTS:
throw QueryRuntimeException(
"Couldn't register main instance since instance with such endpoint already exists!");
case COULD_NOT_BE_PERSISTED:
throw QueryRuntimeException("Couldn't register main instance since it couldn't be persisted!");
case NOT_COORDINATOR:
throw QueryRuntimeException("Couldn't register main instance since this instance is not a coordinator!");
case SUCCESS:
break;
}
}
void RegisterInstanceOnCoordinator(const std::string &coordinator_socket_address,
const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name, CoordinatorQuery::SyncMode sync_mode) override {
const auto maybe_replication_ip_port =
io::network::Endpoint::ParseSocketOrAddress(replication_socket_address, std::nullopt);
if (!maybe_replication_ip_port) {
throw QueryRuntimeException("Invalid replication socket address!");
}
const auto maybe_coordinator_ip_port =
io::network::Endpoint::ParseSocketOrAddress(coordinator_socket_address, std::nullopt);
if (!maybe_replication_ip_port) {
throw QueryRuntimeException("Invalid replication socket address!");
}
const auto [replication_ip, replication_port] = *maybe_replication_ip_port;
const auto [coordinator_server_ip, coordinator_server_port] = *maybe_coordinator_ip_port;
const auto repl_config = coordination::CoordinatorClientConfig::ReplicationClientInfo{
.instance_name = instance_name,
.replication_mode = convertFromCoordinatorToReplicationMode(sync_mode),
.replication_ip_address = replication_ip,
.replication_port = replication_port};
auto coordinator_client_config =
coordination::CoordinatorClientConfig{.instance_name = instance_name,
.ip_address = coordinator_server_ip,
.port = coordinator_server_port,
.health_check_frequency_sec = instance_check_frequency,
.replication_client_info = repl_config,
.ssl = std::nullopt};
auto status = coordinator_handler_.RegisterInstanceOnCoordinator(std::move(coordinator_client_config));
auto status = coordinator_handler_.RegisterInstance(coordinator_client_config);
switch (status) {
using enum memgraph::coordination::RegisterInstanceCoordinatorStatus;
case NAME_EXISTS:
@ -588,6 +504,9 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
throw QueryRuntimeException("Couldn't register replica instance since it couldn't be persisted!");
case NOT_COORDINATOR:
throw QueryRuntimeException("Couldn't register replica instance since this instance is not a coordinator!");
case RPC_FAILED:
throw QueryRuntimeException(
"Couldn't register replica because promotion on replica failed! Check replica for more logs");
case SUCCESS:
break;
}
@ -609,28 +528,6 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
}
}
/// Asks the coordinator handler to perform a failover and translates the resulting status.
///
/// Every status other than SUCCESS is reported to the caller as an exception carrying a
/// status-specific message; SUCCESS returns normally.
///
/// @throw QueryRuntimeException if this instance was not started as a coordinator or if the
///        failover finished with any status other than SUCCESS.
void DoFailover() const override {
  // Failover may only be driven from a coordinator instance.
  if (!FLAGS_coordinator) {
    throw QueryRuntimeException("Only coordinator can run DO FAILOVER!");
  }

  const auto status = coordinator_handler_.DoFailover();
  switch (status) {
    using enum memgraph::coordination::DoFailoverStatus;
    case ALL_REPLICAS_DOWN:
      throw QueryRuntimeException("Failover aborted since all replicas are down!");
    case MAIN_ALIVE:
      throw QueryRuntimeException("Failover aborted since main is alive!");
    case CLUSTER_UNINITIALIZED:
      throw QueryRuntimeException("Failover aborted since cluster is uninitialized!");
    case RPC_FAILED:
      throw QueryRuntimeException("Failover aborted since promoting replica to main failed!");
    case SUCCESS:
      break;
  }
}
#endif
#ifdef MG_ENTERPRISE
@ -1091,44 +988,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
std::vector<Notification> *notifications) {
Callback callback;
switch (coordinator_query->action_) {
case CoordinatorQuery::Action::REGISTER_MAIN_COORDINATOR_SERVER: {
if (!license::global_license_checker.IsEnterpriseValidFast()) {
throw QueryException("Trying to use enterprise feature without a valid license.");
}
#ifdef MG_ENTERPRISE
if constexpr (!coordination::allow_ha) {
throw QueryRuntimeException(
"High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to "
"be able to use this functionality.");
}
if (!FLAGS_coordinator) {
throw QueryRuntimeException("Only coordinator can register coordinator server!");
}
throw QueryRuntimeException("Query is disabled for now");
return callback;
#endif
}
case CoordinatorQuery::Action::REGISTER_REPLICA_COORDINATOR_SERVER: {
if (!license::global_license_checker.IsEnterpriseValidFast()) {
throw QueryException("Trying to use enterprise feature without a valid license.");
}
#ifdef MG_ENTERPRISE
if constexpr (!coordination::allow_ha) {
throw QueryRuntimeException(
"High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to "
"be able to use this functionality.");
}
if (!FLAGS_coordinator) {
throw QueryRuntimeException("Only coordinator can register coordinator server!");
}
throw QueryRuntimeException("Query is disabled for now");
return callback;
#endif
}
case CoordinatorQuery::Action::REGISTER_INSTANCE_ON_COORDINATOR: {
case CoordinatorQuery::Action::REGISTER_INSTANCE: {
if (!license::global_license_checker.IsEnterpriseValidFast()) {
throw QueryException("Trying to use enterprise feature without a valid license.");
}
@ -1152,9 +1012,9 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
replication_socket_address_tv, main_check_frequency = config.replication_replica_check_frequency,
instance_name = coordinator_query->instance_name_,
sync_mode = coordinator_query->sync_mode_]() mutable {
handler.RegisterInstanceOnCoordinator(std::string(coordinator_socket_address_tv.ValueString()),
std::string(replication_socket_address_tv.ValueString()),
main_check_frequency, instance_name, sync_mode);
handler.RegisterInstance(std::string(coordinator_socket_address_tv.ValueString()),
std::string(replication_socket_address_tv.ValueString()), main_check_frequency,
instance_name, sync_mode);
return std::vector<std::vector<TypedValue>>();
};
@ -1224,45 +1084,6 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
return result;
};
return callback;
#endif
}
case CoordinatorQuery::Action::DO_FAILOVER: {
if (!license::global_license_checker.IsEnterpriseValidFast()) {
throw QueryException("Trying to use enterprise feature without a valid license.");
}
#ifdef MG_ENTERPRISE
if constexpr (!coordination::allow_ha) {
throw QueryRuntimeException(
"High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to "
"be able to use this functionality.");
}
if (!FLAGS_coordinator) {
throw QueryRuntimeException("Only coordinator can run DO FAILOVER!");
}
callback.header = {"name", "socket_address", "alive", "role"};
callback.fn = [handler = CoordQueryHandler{dbms_handler}]() mutable {
handler.DoFailover();
const auto replicas = handler.ShowReplicasOnCoordinator();
const auto main = handler.ShowMainOnCoordinator();
std::vector<std::vector<TypedValue>> result{};
result.reserve(replicas.size() + 1);
std::ranges::transform(replicas, std::back_inserter(result), [](const auto &status) -> std::vector<TypedValue> {
return {TypedValue{status.instance_name}, TypedValue{status.socket_address}, TypedValue{status.is_alive},
TypedValue{"replica"}};
});
if (main) {
result.emplace_back(std::vector<TypedValue>{TypedValue{main->instance_name}, TypedValue{main->socket_address},
TypedValue{main->is_alive}, TypedValue{"main"}});
}
return result;
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::DO_FAILOVER,
"DO FAILOVER called on coordinator.");
return callback;
#endif
}
return callback;

View File

@ -107,21 +107,12 @@ class CoordinatorQueryHandler {
#ifdef MG_ENTERPRISE
/// @throw QueryRuntimeException if an error occurred.
virtual void RegisterReplicaCoordinatorServer(const std::string &replication_socket_address,
const std::string &coordinator_socket_address,
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name,
CoordinatorQuery::SyncMode sync_mode) = 0;
virtual void RegisterMainCoordinatorServer(const std::string &socket_address,
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name) = 0;
virtual void RegisterInstanceOnCoordinator(const std::string &coordinator_socket_address,
const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name,
CoordinatorQuery::SyncMode sync_mode) = 0;
virtual void RegisterInstance(const std::string &coordinator_socket_address,
const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency, const std::string &instance_name,
CoordinatorQuery::SyncMode sync_mode) = 0;
/// @throw QueryRuntimeException if an error occurred.
virtual void SetInstanceToMain(const std::string &instance_name) = 0;
/// @throw QueryRuntimeException if an error occurred.
@ -130,9 +121,6 @@ class CoordinatorQueryHandler {
/// @throw QueryRuntimeException if an error occurred.
virtual std::optional<coordination::CoordinatorInstanceStatus> ShowMainOnCoordinator() const = 0;
/// @throw QueryRuntimeException if an error occurred.
virtual void DoFailover() const = 0;
#endif
};

View File

@ -69,8 +69,6 @@ constexpr std::string_view GetCodeString(const NotificationCode code) {
#ifdef MG_ENTERPRISE
case NotificationCode::REGISTER_COORDINATOR_SERVER:
return "RegisterCoordinatorServer"sv;
case NotificationCode::DO_FAILOVER:
return "DoFailover"sv;
#endif
case NotificationCode::REPLICA_PORT_WARNING:
return "ReplicaPortWarning"sv;

View File

@ -44,7 +44,6 @@ enum class NotificationCode : uint8_t {
REGISTER_REPLICA,
#ifdef MG_ENTERPRISE
REGISTER_COORDINATOR_SERVER,
DO_FAILOVER,
#endif
SET_REPLICA,
START_STREAM,

View File

@ -2685,22 +2685,6 @@ TEST_P(CypherMainVisitorTest, TestRegisterCoordinatorServer) {
EXPECT_EQ(full_query_parsed->sync_mode_, CoordinatorQuery::SyncMode::ASYNC);
}
}
// Checks parsing of the DO FAILOVER coordinator query: a truncated keyword must be rejected with a
// SyntaxException, and the full query must yield a CoordinatorQuery whose action is DO_FAILOVER.
TEST_P(CypherMainVisitorTest, TestDoFailover) {
  auto &ast_generator = *GetParam();

  // Truncated keyword: the parser is expected to throw.
  {
    std::string truncated_query = "DO FAILO";
    ASSERT_THROW(ast_generator.ParseQuery(truncated_query), SyntaxException);
  }

  // Well-formed query: the parsed node must be a CoordinatorQuery carrying the failover action.
  {
    std::string failover_query = "DO FAILOVER";
    auto *parsed_query = ast_generator.ParseQuery(failover_query);
    auto *failover_query_parsed = dynamic_cast<CoordinatorQuery *>(parsed_query);
    ASSERT_TRUE(failover_query_parsed);
    EXPECT_EQ(failover_query_parsed->action_, CoordinatorQuery::Action::DO_FAILOVER);
  }
}
#endif
TEST_P(CypherMainVisitorTest, TestDeleteReplica) {