Remove CoordinatorData

This commit is contained in:
Andi Skrgat 2024-02-06 10:55:48 +01:00
parent 6e758d3b5a
commit 7386b786a9
17 changed files with 453 additions and 497 deletions

View File

@ -9,13 +9,13 @@ target_sources(mg-coordination
include/coordination/coordinator_config.hpp
include/coordination/coordinator_exceptions.hpp
include/coordination/coordinator_slk.hpp
include/coordination/coordinator_data.hpp
include/coordination/constants.hpp
include/coordination/coordinator_instance.hpp
include/coordination/coordinator_cluster_config.hpp
include/coordination/coordinator_handlers.hpp
include/coordination/coordinator_instance.hpp
include/coordination/constants.hpp
include/coordination/instance_status.hpp
include/coordination/replication_instance.hpp
include/coordination/raft_instance.hpp
include/nuraft/coordinator_log_store.hpp
include/nuraft/coordinator_state_machine.hpp
@ -26,10 +26,10 @@ target_sources(mg-coordination
coordinator_state.cpp
coordinator_rpc.cpp
coordinator_server.cpp
coordinator_data.cpp
coordinator_instance.cpp
coordinator_handlers.cpp
coordinator_instance.cpp
replication_instance.cpp
raft_instance.cpp
coordinator_log_store.cpp
coordinator_state_machine.cpp

View File

@ -28,13 +28,13 @@ auto CreateClientContext(memgraph::coordination::CoordinatorClientConfig const &
}
} // namespace
CoordinatorClient::CoordinatorClient(CoordinatorData *coord_data, CoordinatorClientConfig config,
CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
: rpc_context_{CreateClientContext(config)},
rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port),
&rpc_context_},
config_{std::move(config)},
coord_data_{coord_data},
coord_instance_{coord_instance},
succ_cb_{std::move(succ_cb)},
fail_cb_{std::move(fail_cb)} {}
@ -54,9 +54,9 @@ void CoordinatorClient::StartFrequentCheck() {
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
stream.AwaitResponse();
}
succ_cb_(coord_data_, instance_name);
succ_cb_(coord_instance_, instance_name);
} catch (rpc::RpcFailedException const &) {
fail_cb_(coord_data_, instance_name);
fail_cb_(coord_instance_, instance_name);
}
});
}

View File

@ -1,290 +0,0 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_data.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "coordination/replication_instance.hpp"
#include "utils/uuid.hpp"
#include <range/v3/view.hpp>
#include <shared_mutex>
namespace memgraph::coordination {
using nuraft::ptr;
using nuraft::srv_config;
CoordinatorData::CoordinatorData() {
auto find_instance = [](CoordinatorData *coord_data, std::string_view instance_name) -> ReplicationInstance & {
auto instance = std::ranges::find_if(
coord_data->repl_instances_,
[instance_name](ReplicationInstance const &instance) { return instance.InstanceName() == instance_name; });
MG_ASSERT(instance != coord_data->repl_instances_.end(), "Instance {} not found during callback!", instance_name);
return *instance;
};
replica_succ_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing replica successful callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
if (!instance.GetMainUUID().has_value() || main_uuid_ != instance.GetMainUUID().value()) {
if (!instance.SendSwapAndUpdateUUID(main_uuid_)) {
spdlog::error(
fmt::format("Failed to swap uuid for replica instance {} which is alive", instance.InstanceName()));
return;
}
}
instance.OnSuccessPing();
};
replica_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing replica failure callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
instance.OnFailPing();
// We need to restart main uuid from instance since it was "down" at least a second
// There is slight delay, if we choose to use isAlive, instance can be down and back up in less than
// our isAlive time difference, which would lead to instance setting UUID to nullopt and stopping accepting any
// incoming RPCs from valid main
// TODO(antoniofilipovic) this needs here more complex logic
// We need to get id of main replica is listening to on successful ping
// and swap it to correct uuid if it failed
instance.SetNewMainUUID();
};
main_succ_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing main successful callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
if (instance.IsAlive()) {
instance.OnSuccessPing();
return;
}
const auto &instance_uuid = instance.GetMainUUID();
MG_ASSERT(instance_uuid.has_value(), "Instance must have uuid set");
if (main_uuid_ == instance_uuid.value()) {
instance.OnSuccessPing();
return;
}
// TODO(antoniof) make demoteToReplica idempotent since main can be demoted to replica but
// swapUUID can fail
bool const demoted = instance.DemoteToReplica(coord_data->replica_succ_cb_, coord_data->replica_fail_cb_);
if (demoted) {
instance.OnSuccessPing();
spdlog::info("Instance {} demoted to replica", instance_name);
} else {
spdlog::error("Instance {} failed to become replica", instance_name);
return;
}
if (!instance.SendSwapAndUpdateUUID(main_uuid_)) {
spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", instance.InstanceName()));
return;
}
};
main_fail_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing main failure callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
instance.OnFailPing();
const auto &instance_uuid = instance.GetMainUUID();
MG_ASSERT(instance_uuid.has_value(), "Instance must have uuid set");
if (!instance.IsAlive() && main_uuid_ == instance_uuid.value()) {
spdlog::info("Cluster without main instance, trying automatic failover");
coord_data->TryFailover();
}
};
}
auto CoordinatorData::TryFailover() -> void {
auto alive_replicas = repl_instances_ | ranges::views::filter(&ReplicationInstance::IsReplica) |
ranges::views::filter(&ReplicationInstance::IsAlive);
if (ranges::empty(alive_replicas)) {
spdlog::warn("Failover failed since all replicas are down!");
return;
}
// TODO: Smarter choice
auto chosen_replica_instance = ranges::begin(alive_replicas);
chosen_replica_instance->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&chosen_replica_instance] { chosen_replica_instance->ResumeFrequentCheck(); }};
auto const potential_new_main_uuid = utils::UUID{};
auto const is_not_chosen_replica_instance = [&chosen_replica_instance](ReplicationInstance &instance) {
return instance != *chosen_replica_instance;
};
// If for some replicas swap fails, for others on successful ping we will revert back on next change
// or we will do failover first again and then it will be consistent again
for (auto &other_replica_instance : alive_replicas | ranges::views::filter(is_not_chosen_replica_instance)) {
if (!other_replica_instance.SendSwapAndUpdateUUID(potential_new_main_uuid)) {
spdlog::error(fmt::format("Failed to swap uuid for instance {} which is alive, aborting failover",
other_replica_instance.InstanceName()));
return;
}
}
std::vector<ReplClientInfo> repl_clients_info;
repl_clients_info.reserve(repl_instances_.size() - 1);
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_chosen_replica_instance),
std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
if (!chosen_replica_instance->PromoteToMain(potential_new_main_uuid, std::move(repl_clients_info), main_succ_cb_,
main_fail_cb_)) {
spdlog::warn("Failover failed since promoting replica to main failed!");
return;
}
chosen_replica_instance->SetNewMainUUID(potential_new_main_uuid);
main_uuid_ = potential_new_main_uuid;
spdlog::info("Failover successful! Instance {} promoted to main.", chosen_replica_instance->InstanceName());
}
auto CoordinatorData::ShowInstances() const -> std::vector<InstanceStatus> {
auto const coord_instances = self_.GetAllCoordinators();
std::vector<InstanceStatus> instances_status;
instances_status.reserve(repl_instances_.size() + coord_instances.size());
auto const stringify_repl_role = [](ReplicationInstance const &instance) -> std::string {
if (!instance.IsAlive()) return "unknown";
if (instance.IsMain()) return "main";
return "replica";
};
auto const repl_instance_to_status = [&stringify_repl_role](ReplicationInstance const &instance) -> InstanceStatus {
return {.instance_name = instance.InstanceName(),
.coord_socket_address = instance.SocketAddress(),
.cluster_role = stringify_repl_role(instance),
.is_alive = instance.IsAlive()};
};
auto const coord_instance_to_status = [](ptr<srv_config> const &instance) -> InstanceStatus {
return {.instance_name = "coordinator_" + std::to_string(instance->get_id()),
.raft_socket_address = instance->get_endpoint(),
.cluster_role = "coordinator",
.is_alive = true}; // TODO: (andi) Get this info from RAFT and test it or when we will move
// CoordinatorState to every instance, we can be smarter about this using our RPC.
};
std::ranges::transform(coord_instances, std::back_inserter(instances_status), coord_instance_to_status);
{
auto lock = std::shared_lock{coord_data_lock_};
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), repl_instance_to_status);
}
return instances_status;
}
// TODO: (andi) Make sure you cannot put coordinator instance to the main
auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
auto lock = std::lock_guard{coord_data_lock_};
auto const is_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
};
auto new_main = std::ranges::find_if(repl_instances_, is_new_main);
if (new_main == repl_instances_.end()) {
spdlog::error("Instance {} not registered. Please register it using REGISTER INSTANCE {}", instance_name,
instance_name);
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
ReplicationClientsInfo repl_clients_info;
repl_clients_info.reserve(repl_instances_.size() - 1);
auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() != instance_name;
};
auto potential_new_main_uuid = utils::UUID{};
spdlog::trace("Generated potential new main uuid");
for (auto &other_instance : repl_instances_ | ranges::views::filter(is_not_new_main)) {
if (!other_instance.SendSwapAndUpdateUUID(potential_new_main_uuid)) {
spdlog::error(
fmt::format("Failed to swap uuid for instance {}, aborting failover", other_instance.InstanceName()));
return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED;
}
}
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
std::back_inserter(repl_clients_info),
[](const ReplicationInstance &instance) { return instance.ReplicationClientInfo(); });
if (!new_main->PromoteToMain(potential_new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
new_main->SetNewMainUUID(potential_new_main_uuid);
main_uuid_ = potential_new_main_uuid;
spdlog::info("Instance {} promoted to main", instance_name);
return SetInstanceToMainCoordinatorStatus::SUCCESS;
}
auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus {
auto lock = std::lock_guard{coord_data_lock_};
auto const name_matches = [&config](ReplicationInstance const &instance) {
return instance.InstanceName() == config.instance_name;
};
if (std::ranges::any_of(repl_instances_, name_matches)) {
return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
}
auto const socket_address_matches = [&config](ReplicationInstance const &instance) {
return instance.SocketAddress() == config.SocketAddress();
};
if (std::ranges::any_of(repl_instances_, socket_address_matches)) {
return RegisterInstanceCoordinatorStatus::ENDPOINT_EXISTS;
}
try {
auto *repl_instance = &repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
if (self_.IsLeader()) {
repl_instance->StartFrequentCheck();
}
return RegisterInstanceCoordinatorStatus::SUCCESS;
} catch (CoordinatorRegisterInstanceException const &) {
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}
}
auto CoordinatorData::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
self_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address));
}
} // namespace memgraph::coordination
#endif

View File

@ -18,85 +18,222 @@
#include "nuraft/coordinator_state_manager.hpp"
#include "utils/counter.hpp"
#include <range/v3/view.hpp>
#include <shared_mutex>
namespace memgraph::coordination {
using nuraft::asio_service;
using nuraft::cmd_result;
using nuraft::cs_new;
using nuraft::ptr;
using nuraft::raft_params;
using nuraft::srv_config;
using raft_result = cmd_result<ptr<buffer>>;
CoordinatorInstance::CoordinatorInstance()
: raft_server_id_(FLAGS_raft_server_id), raft_port_(FLAGS_raft_server_port), raft_address_("127.0.0.1") {
auto raft_endpoint = raft_address_ + ":" + std::to_string(raft_port_);
state_manager_ = cs_new<CoordinatorStateManager>(raft_server_id_, raft_endpoint);
state_machine_ = cs_new<CoordinatorStateMachine>();
logger_ = nullptr;
CoordinatorInstance::CoordinatorInstance() {
auto find_instance = [](CoordinatorInstance *coord_instance,
std::string_view instance_name) -> ReplicationInstance & {
auto instance = std::ranges::find_if(
coord_instance->repl_instances_,
[instance_name](ReplicationInstance const &instance) { return instance.InstanceName() == instance_name; });
// TODO: (andi) Maybe params file
MG_ASSERT(instance != coord_instance->repl_instances_.end(), "Instance {} not found during callback!",
instance_name);
return *instance;
};
// ASIO options
asio_service::options asio_opts;
asio_opts.thread_pool_size_ = 1; // TODO: (andi) Improve this
replica_succ_cb_ = [find_instance](CoordinatorInstance *coord_instance, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_instance->coord_instance_lock_};
spdlog::trace("Instance {} performing replica successful callback", instance_name);
find_instance(coord_instance, instance_name).OnSuccessPing();
};
// RAFT parameters. Heartbeat every 100ms, election timeout between 200ms and 400ms.
raft_params params;
params.heart_beat_interval_ = 100;
params.election_timeout_lower_bound_ = 200;
params.election_timeout_upper_bound_ = 400;
// 5 logs are preserved before the last snapshot
params.reserved_log_items_ = 5;
// Create snapshot for every 5 log appends
params.snapshot_distance_ = 5;
params.client_req_timeout_ = 3000;
params.return_method_ = raft_params::blocking;
replica_fail_cb_ = [find_instance](CoordinatorInstance *coord_instance, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_instance->coord_instance_lock_};
spdlog::trace("Instance {} performing replica failure callback", instance_name);
find_instance(coord_instance, instance_name).OnFailPing();
};
raft_server_ =
launcher_.init(state_machine_, state_manager_, logger_, static_cast<int>(raft_port_), asio_opts, params);
main_succ_cb_ = [find_instance](CoordinatorInstance *coord_instance, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_instance->coord_instance_lock_};
spdlog::trace("Instance {} performing main successful callback", instance_name);
if (!raft_server_) {
throw RaftServerStartException("Failed to launch raft server on {}", raft_endpoint);
}
auto &instance = find_instance(coord_instance, instance_name);
auto maybe_stop = utils::ResettableCounter<20>();
while (!raft_server_->is_initialized() && !maybe_stop()) {
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}
if (instance.IsAlive()) {
instance.OnSuccessPing();
return;
}
if (!raft_server_->is_initialized()) {
throw RaftServerStartException("Failed to initialize raft server on {}", raft_endpoint);
}
bool const is_latest_main = !coord_instance->ClusterHasAliveMain_();
if (is_latest_main) {
spdlog::info("Instance {} is the latest main", instance_name);
instance.OnSuccessPing();
return;
}
spdlog::info("Raft server started on {}", raft_endpoint);
bool const demoted = instance.DemoteToReplica(coord_instance->replica_succ_cb_, coord_instance->replica_fail_cb_);
if (demoted) {
instance.OnSuccessPing();
spdlog::info("Instance {} demoted to replica", instance_name);
} else {
spdlog::error("Instance {} failed to become replica", instance_name);
}
};
main_fail_cb_ = [find_instance](CoordinatorInstance *coord_instance, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_instance->coord_instance_lock_};
spdlog::trace("Instance {} performing main failure callback", instance_name);
find_instance(coord_instance, instance_name).OnFailPing();
if (!coord_instance->ClusterHasAliveMain_()) {
spdlog::info("Cluster without main instance, trying automatic failover");
coord_instance->TryFailover();
}
};
}
auto CoordinatorInstance::InstanceName() const -> std::string {
return "coordinator_" + std::to_string(raft_server_id_);
auto CoordinatorInstance::ClusterHasAliveMain_() const -> bool {
auto const alive_main = [](ReplicationInstance const &instance) { return instance.IsMain() && instance.IsAlive(); };
return std::ranges::any_of(repl_instances_, alive_main);
}
auto CoordinatorInstance::RaftSocketAddress() const -> std::string {
return raft_address_ + ":" + std::to_string(raft_port_);
auto CoordinatorInstance::TryFailover() -> void {
auto replica_instances = repl_instances_ | ranges::views::filter(&ReplicationInstance::IsReplica);
auto chosen_replica_instance = std::ranges::find_if(replica_instances, &ReplicationInstance::IsAlive);
if (chosen_replica_instance == replica_instances.end()) {
spdlog::warn("Failover failed since all replicas are down!");
return;
}
chosen_replica_instance->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&chosen_replica_instance] { chosen_replica_instance->ResumeFrequentCheck(); }};
std::vector<ReplClientInfo> repl_clients_info;
repl_clients_info.reserve(std::ranges::distance(replica_instances));
auto const not_chosen_replica_instance = [&chosen_replica_instance](ReplicationInstance const &instance) {
return instance != *chosen_replica_instance;
};
std::ranges::transform(repl_instances_ | ranges::views::filter(not_chosen_replica_instance),
std::back_inserter(repl_clients_info),
[](ReplicationInstance const &instance) { return instance.ReplicationClientInfo(); });
if (!chosen_replica_instance->PromoteToMain(std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
spdlog::warn("Failover failed since promoting replica to main failed!");
return;
}
spdlog::info("Failover successful! Instance {} promoted to main.", chosen_replica_instance->InstanceName());
}
auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
auto const coord_instances = self_.GetAllCoordinators();
std::vector<InstanceStatus> instances_status;
instances_status.reserve(repl_instances_.size() + coord_instances.size());
auto const stringify_repl_role = [](ReplicationInstance const &instance) -> std::string {
if (!instance.IsAlive()) return "unknown";
if (instance.IsMain()) return "main";
return "replica";
};
auto const repl_instance_to_status = [&stringify_repl_role](ReplicationInstance const &instance) -> InstanceStatus {
return {.instance_name = instance.InstanceName(),
.coord_socket_address = instance.SocketAddress(),
.cluster_role = stringify_repl_role(instance),
.is_alive = instance.IsAlive()};
};
auto const coord_instance_to_status = [](ptr<srv_config> const &instance) -> InstanceStatus {
return {.instance_name = "coordinator_" + std::to_string(instance->get_id()),
.raft_socket_address = instance->get_endpoint(),
.cluster_role = "coordinator",
.is_alive = true}; // TODO: (andi) Get this info from RAFT and test it or when we will move
// CoordinatorState to every instance, we can be smarter about this using our RPC.
};
std::ranges::transform(coord_instances, std::back_inserter(instances_status), coord_instance_to_status);
{
auto lock = std::shared_lock{coord_instance_lock_};
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), repl_instance_to_status);
}
return instances_status;
}
// TODO: (andi) Make sure you cannot put coordinator instance to the main
auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name)
-> SetInstanceToMainCoordinatorStatus {
auto lock = std::lock_guard{coord_instance_lock_};
auto const is_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
};
auto new_main = std::ranges::find_if(repl_instances_, is_new_main);
if (new_main == repl_instances_.end()) {
spdlog::error("Instance {} not registered. Please register it using REGISTER INSTANCE {}", instance_name,
instance_name);
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
ReplicationClientsInfo repl_clients_info;
repl_clients_info.reserve(repl_instances_.size() - 1);
auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() != instance_name;
};
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
std::back_inserter(repl_clients_info),
[](const ReplicationInstance &instance) { return instance.ReplicationClientInfo(); });
if (!new_main->PromoteToMain(std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
spdlog::info("Instance {} promoted to main", instance_name);
return SetInstanceToMainCoordinatorStatus::SUCCESS;
}
auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig config)
-> RegisterInstanceCoordinatorStatus {
auto lock = std::lock_guard{coord_instance_lock_};
auto const name_matches = [&config](ReplicationInstance const &instance) {
return instance.InstanceName() == config.instance_name;
};
if (std::ranges::any_of(repl_instances_, name_matches)) {
return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
}
auto const socket_address_matches = [&config](ReplicationInstance const &instance) {
return instance.SocketAddress() == config.SocketAddress();
};
if (std::ranges::any_of(repl_instances_, socket_address_matches)) {
return RegisterInstanceCoordinatorStatus::ENDPOINT_EXISTS;
}
try {
auto *repl_instance = &repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
if (self_.IsLeader()) {
repl_instance->StartFrequentCheck();
}
return RegisterInstanceCoordinatorStatus::SUCCESS;
} catch (CoordinatorRegisterInstanceException const &) {
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}
}
auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
auto const endpoint = raft_address + ":" + std::to_string(raft_port);
srv_config const srv_config_to_add(static_cast<int>(raft_server_id), endpoint);
if (!raft_server_->add_srv(srv_config_to_add)->get_accepted()) {
throw RaftAddServerException("Failed to add server {} to the cluster", endpoint);
}
spdlog::info("Request to add server {} to the cluster accepted", endpoint);
self_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address));
}
auto CoordinatorInstance::GetAllCoordinators() const -> std::vector<ptr<srv_config>> {
std::vector<ptr<srv_config>> all_srv_configs;
raft_server_->get_srv_config_all(all_srv_configs);
return all_srv_configs;
}
auto CoordinatorInstance::IsLeader() const -> bool { return raft_server_->is_leader(); }
} // namespace memgraph::coordination
#endif

View File

@ -41,37 +41,39 @@ CoordinatorState::CoordinatorState() {
}
}
auto CoordinatorState::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus {
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
auto CoordinatorState::RegisterReplicationInstance(CoordinatorClientConfig config)
-> RegisterInstanceCoordinatorStatus {
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
"Coordinator cannot register replica since variant holds wrong alternative");
return std::visit(
memgraph::utils::Overloaded{
[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
return RegisterInstanceCoordinatorStatus::NOT_COORDINATOR;
},
[config](CoordinatorData &coordinator_data) { return coordinator_data.RegisterInstance(config); }},
memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
return RegisterInstanceCoordinatorStatus::NOT_COORDINATOR;
},
[config](CoordinatorInstance &coordinator_instance) {
return coordinator_instance.RegisterReplicationInstance(config);
}},
data_);
}
auto CoordinatorState::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
auto CoordinatorState::SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
"Coordinator cannot register replica since variant holds wrong alternative");
return std::visit(
memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
return SetInstanceToMainCoordinatorStatus::NOT_COORDINATOR;
},
[&instance_name](CoordinatorData &coordinator_data) {
return coordinator_data.SetInstanceToMain(instance_name);
[&instance_name](CoordinatorInstance &coordinator_instance) {
return coordinator_instance.SetReplicationInstanceToMain(instance_name);
}},
data_);
}
auto CoordinatorState::ShowInstances() const -> std::vector<InstanceStatus> {
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
"Can't call show instances on data_, as variant holds wrong alternative");
return std::get<CoordinatorData>(data_).ShowInstances();
return std::get<CoordinatorInstance>(data_).ShowInstances();
}
auto CoordinatorState::GetCoordinatorServer() const -> CoordinatorServer & {
@ -82,9 +84,9 @@ auto CoordinatorState::GetCoordinatorServer() const -> CoordinatorServer & {
auto CoordinatorState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
"Coordinator cannot register replica since variant holds wrong alternative");
return std::get<CoordinatorData>(data_).AddCoordinatorInstance(raft_server_id, raft_port, raft_address);
return std::get<CoordinatorInstance>(data_).AddCoordinatorInstance(raft_server_id, raft_port, raft_address);
}
} // namespace memgraph::coordination

View File

@ -20,14 +20,14 @@
namespace memgraph::coordination {
class CoordinatorData;
using HealthCheckCallback = std::function<void(CoordinatorData *, std::string_view)>;
class CoordinatorInstance;
using HealthCheckCallback = std::function<void(CoordinatorInstance *, std::string_view)>;
using ReplicationClientsInfo = std::vector<ReplClientInfo>;
class CoordinatorClient {
public:
explicit CoordinatorClient(CoordinatorData *coord_data_, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb);
explicit CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb);
~CoordinatorClient() = default;
@ -69,7 +69,7 @@ class CoordinatorClient {
mutable rpc::Client rpc_client_;
CoordinatorClientConfig config_;
CoordinatorData *coord_data_;
CoordinatorInstance *coord_instance_;
HealthCheckCallback succ_cb_;
HealthCheckCallback fail_cb_;
};

View File

@ -1,61 +0,0 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_instance.hpp"
#include "coordination/coordinator_server.hpp"
#include "coordination/instance_status.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "coordination/replication_instance.hpp"
#include "replication_coordination_glue/handler.hpp"
#include "utils/rw_lock.hpp"
#include "utils/thread_pool.hpp"
#include "utils/uuid.hpp"
#include <list>
namespace memgraph::coordination {
class CoordinatorData {
public:
CoordinatorData();
// TODO: (andi) Probably rename to RegisterReplicationInstance
[[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<InstanceStatus>;
auto TryFailover() -> void;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
private:
HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
// NOTE: Must be std::list because we rely on pointer stability
std::list<ReplicationInstance> repl_instances_;
mutable utils::RWLock coord_data_lock_{utils::RWLock::Priority::READ};
CoordinatorInstance self_;
utils::UUID main_uuid_;
};
struct CoordinatorMainReplicaData {
std::unique_ptr<CoordinatorServer> coordinator_server_;
};
} // namespace memgraph::coordination
#endif

View File

@ -13,48 +13,41 @@
#ifdef MG_ENTERPRISE
#include <flags/replication.hpp>
#include "coordination/coordinator_server.hpp"
#include "coordination/instance_status.hpp"
#include "coordination/raft_instance.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "coordination/replication_instance.hpp"
#include "utils/rw_lock.hpp"
#include "utils/thread_pool.hpp"
#include <libnuraft/nuraft.hxx>
#include <list>
namespace memgraph::coordination {
using nuraft::logger;
using nuraft::ptr;
using nuraft::raft_launcher;
using nuraft::raft_server;
using nuraft::srv_config;
using nuraft::state_machine;
using nuraft::state_mgr;
class CoordinatorInstance {
public:
CoordinatorInstance();
CoordinatorInstance(CoordinatorInstance const &other) = delete;
CoordinatorInstance &operator=(CoordinatorInstance const &other) = delete;
CoordinatorInstance(CoordinatorInstance &&other) noexcept = delete;
CoordinatorInstance &operator=(CoordinatorInstance &&other) noexcept = delete;
~CoordinatorInstance() = default;
auto InstanceName() const -> std::string;
auto RaftSocketAddress() const -> std::string;
[[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<InstanceStatus>;
auto TryFailover() -> void;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
auto GetAllCoordinators() const -> std::vector<ptr<srv_config>>;
auto IsLeader() const -> bool;
private:
ptr<state_machine> state_machine_;
ptr<state_mgr> state_manager_;
ptr<raft_server> raft_server_;
ptr<logger> logger_;
raft_launcher launcher_;
auto ClusterHasAliveMain_() const -> bool;
// TODO: (andi) I think variables below can be abstracted
uint32_t raft_server_id_;
uint32_t raft_port_;
std::string raft_address_;
HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
// NOTE: Must be std::list because we rely on pointer stability
std::list<ReplicationInstance> repl_instances_;
mutable utils::RWLock coord_instance_lock_{utils::RWLock::Priority::READ};
RaftInstance self_;
};
} // namespace memgraph::coordination

View File

@ -13,7 +13,7 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_data.hpp"
#include "coordination/coordinator_instance.hpp"
#include "coordination/coordinator_server.hpp"
#include "coordination/instance_status.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
@ -33,19 +33,23 @@ class CoordinatorState {
CoordinatorState(CoordinatorState &&) noexcept = delete;
CoordinatorState &operator=(CoordinatorState &&) noexcept = delete;
[[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
[[nodiscard]] auto SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<InstanceStatus>;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
// The client code must check that the server exists before calling this method.
// NOTE: The client code must check that the server exists before calling this method.
auto GetCoordinatorServer() const -> CoordinatorServer &;
private:
std::variant<CoordinatorData, CoordinatorMainReplicaData> data_;
struct CoordinatorMainReplicaData {
std::unique_ptr<CoordinatorServer> coordinator_server_;
};
std::variant<CoordinatorInstance, CoordinatorMainReplicaData> data_;
};
} // namespace memgraph::coordination

View File

@ -0,0 +1,61 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#ifdef MG_ENTERPRISE
#include <flags/replication.hpp>
#include <libnuraft/nuraft.hxx>
namespace memgraph::coordination {
using nuraft::logger;
using nuraft::ptr;
using nuraft::raft_launcher;
using nuraft::raft_server;
using nuraft::srv_config;
using nuraft::state_machine;
using nuraft::state_mgr;
class RaftInstance {
public:
RaftInstance();
RaftInstance(RaftInstance const &other) = delete;
RaftInstance &operator=(RaftInstance const &other) = delete;
RaftInstance(RaftInstance &&other) noexcept = delete;
RaftInstance &operator=(RaftInstance &&other) noexcept = delete;
~RaftInstance() = default;
auto InstanceName() const -> std::string;
auto RaftSocketAddress() const -> std::string;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
auto GetAllCoordinators() const -> std::vector<ptr<srv_config>>;
auto IsLeader() const -> bool;
private:
ptr<state_machine> state_machine_;
ptr<state_mgr> state_manager_;
ptr<raft_server> raft_server_;
ptr<logger> logger_;
raft_launcher launcher_;
// TODO: (andi) I think variables below can be abstracted
uint32_t raft_server_id_;
uint32_t raft_port_;
std::string raft_address_;
};
} // namespace memgraph::coordination
#endif

View File

@ -23,11 +23,11 @@
namespace memgraph::coordination {
class CoordinatorData;
class CoordinatorInstance;
class ReplicationInstance {
public:
ReplicationInstance(CoordinatorData *data, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb);
ReplicationInstance(ReplicationInstance const &other) = delete;

View File

@ -0,0 +1,109 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include "coordination/raft_instance.hpp"
#include "coordination/coordinator_exceptions.hpp"
#include "nuraft/coordinator_state_machine.hpp"
#include "nuraft/coordinator_state_manager.hpp"
#include "utils/counter.hpp"
namespace memgraph::coordination {
using nuraft::asio_service;
using nuraft::cb_func;
using nuraft::CbReturnCode;
using nuraft::cmd_result;
using nuraft::cs_new;
using nuraft::ptr;
using nuraft::raft_params;
using nuraft::raft_server;
using nuraft::srv_config;
using raft_result = cmd_result<ptr<buffer>>;
RaftInstance::RaftInstance()
: raft_server_id_(FLAGS_raft_server_id), raft_port_(FLAGS_raft_server_port), raft_address_("127.0.0.1") {
auto raft_endpoint = raft_address_ + ":" + std::to_string(raft_port_);
state_manager_ = cs_new<CoordinatorStateManager>(raft_server_id_, raft_endpoint);
state_machine_ = cs_new<CoordinatorStateMachine>();
logger_ = nullptr;
// TODO: (andi) Maybe params file
asio_service::options asio_opts;
asio_opts.thread_pool_size_ = 1; // TODO: (andi) Improve this
raft_params params;
params.heart_beat_interval_ = 100;
params.election_timeout_lower_bound_ = 200;
params.election_timeout_upper_bound_ = 400;
// 5 logs are preserved before the last snapshot
params.reserved_log_items_ = 5;
// Create snapshot for every 5 log appends
params.snapshot_distance_ = 5;
params.client_req_timeout_ = 3000;
params.return_method_ = raft_params::blocking;
raft_server::init_options init_opts;
init_opts.raft_callback_ = [](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode {
if (event_type == cb_func::BecomeLeader) {
spdlog::info("Node {} became leader", param->leaderId);
} else if (event_type == cb_func::BecomeFollower) {
spdlog::info("Node {} became follower", param->myId);
}
return CbReturnCode::Ok;
};
raft_server_ = launcher_.init(state_machine_, state_manager_, logger_, static_cast<int>(raft_port_), asio_opts,
params, init_opts);
if (!raft_server_) {
throw RaftServerStartException("Failed to launch raft server on {}", raft_endpoint);
}
auto maybe_stop = utils::ResettableCounter<20>();
while (!raft_server_->is_initialized() && !maybe_stop()) {
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}
if (!raft_server_->is_initialized()) {
throw RaftServerStartException("Failed to initialize raft server on {}", raft_endpoint);
}
spdlog::info("Raft server started on {}", raft_endpoint);
}
auto RaftInstance::InstanceName() const -> std::string { return "coordinator_" + std::to_string(raft_server_id_); }
auto RaftInstance::RaftSocketAddress() const -> std::string { return raft_address_ + ":" + std::to_string(raft_port_); }
auto RaftInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
auto const endpoint = raft_address + ":" + std::to_string(raft_port);
srv_config const srv_config_to_add(static_cast<int>(raft_server_id), endpoint);
if (!raft_server_->add_srv(srv_config_to_add)->get_accepted()) {
throw RaftAddServerException("Failed to add server {} to the cluster", endpoint);
}
spdlog::info("Request to add server {} to the cluster accepted", endpoint);
}
auto RaftInstance::GetAllCoordinators() const -> std::vector<ptr<srv_config>> {
std::vector<ptr<srv_config>> all_srv_configs;
raft_server_->get_srv_config_all(all_srv_configs);
return all_srv_configs;
}
auto RaftInstance::IsLeader() const -> bool { return raft_server_->is_leader(); }
} // namespace memgraph::coordination
#endif

View File

@ -17,9 +17,9 @@
namespace memgraph::coordination {
ReplicationInstance::ReplicationInstance(CoordinatorData *data, CoordinatorClientConfig config,
ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
: client_(data, std::move(config), std::move(succ_cb), std::move(fail_cb)),
: client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)),
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA),
is_alive_(true) {
if (!client_.DemoteToReplica()) {

View File

@ -20,14 +20,14 @@ namespace memgraph::dbms {
CoordinatorHandler::CoordinatorHandler(coordination::CoordinatorState &coordinator_state)
: coordinator_state_(coordinator_state) {}
auto CoordinatorHandler::RegisterInstance(memgraph::coordination::CoordinatorClientConfig config)
auto CoordinatorHandler::RegisterReplicationInstance(memgraph::coordination::CoordinatorClientConfig config)
-> coordination::RegisterInstanceCoordinatorStatus {
return coordinator_state_.RegisterInstance(config);
return coordinator_state_.RegisterReplicationInstance(config);
}
auto CoordinatorHandler::SetInstanceToMain(std::string instance_name)
auto CoordinatorHandler::SetReplicationInstanceToMain(std::string instance_name)
-> coordination::SetInstanceToMainCoordinatorStatus {
return coordinator_state_.SetInstanceToMain(std::move(instance_name));
return coordinator_state_.SetReplicationInstanceToMain(std::move(instance_name));
}
auto CoordinatorHandler::ShowInstances() const -> std::vector<coordination::InstanceStatus> {

View File

@ -28,10 +28,10 @@ class CoordinatorHandler {
public:
explicit CoordinatorHandler(coordination::CoordinatorState &coordinator_state);
auto RegisterInstance(coordination::CoordinatorClientConfig config)
auto RegisterReplicationInstance(coordination::CoordinatorClientConfig config)
-> coordination::RegisterInstanceCoordinatorStatus;
auto SetInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus;
auto SetReplicationInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<coordination::InstanceStatus>;

View File

@ -458,9 +458,10 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
: coordinator_handler_(coordinator_state) {}
/// @throw QueryRuntimeException if an error ocurred.
void RegisterInstance(const std::string &coordinator_socket_address, const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency, const std::string &instance_name,
CoordinatorQuery::SyncMode sync_mode) override {
void RegisterReplicationInstance(const std::string &coordinator_socket_address,
const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name, CoordinatorQuery::SyncMode sync_mode) override {
const auto maybe_replication_ip_port =
io::network::Endpoint::ParseSocketOrAddress(replication_socket_address, std::nullopt);
if (!maybe_replication_ip_port) {
@ -489,7 +490,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
.replication_client_info = repl_config,
.ssl = std::nullopt};
auto status = coordinator_handler_.RegisterInstance(coordinator_client_config);
auto status = coordinator_handler_.RegisterReplicationInstance(coordinator_client_config);
switch (status) {
using enum memgraph::coordination::RegisterInstanceCoordinatorStatus;
case NAME_EXISTS:
@ -519,8 +520,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
}
}
void SetInstanceToMain(const std::string &instance_name) override {
auto status = coordinator_handler_.SetInstanceToMain(instance_name);
void SetReplicationInstanceToMain(const std::string &instance_name) override {
auto status = coordinator_handler_.SetReplicationInstanceToMain(instance_name);
switch (status) {
using enum memgraph::coordination::SetInstanceToMainCoordinatorStatus;
case NO_INSTANCE_WITH_NAME:
@ -1145,9 +1146,9 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
replication_socket_address_tv, main_check_frequency = config.replication_replica_check_frequency,
instance_name = coordinator_query->instance_name_,
sync_mode = coordinator_query->sync_mode_]() mutable {
handler.RegisterInstance(std::string(coordinator_socket_address_tv.ValueString()),
std::string(replication_socket_address_tv.ValueString()), main_check_frequency,
instance_name, sync_mode);
handler.RegisterReplicationInstance(std::string(coordinator_socket_address_tv.ValueString()),
std::string(replication_socket_address_tv.ValueString()),
main_check_frequency, instance_name, sync_mode);
return std::vector<std::vector<TypedValue>>();
};
@ -1176,7 +1177,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
callback.fn = [handler = CoordQueryHandler{*coordinator_state},
instance_name = coordinator_query->instance_name_]() mutable {
handler.SetInstanceToMain(instance_name);
handler.SetReplicationInstanceToMain(instance_name);
return std::vector<std::vector<TypedValue>>();
};

View File

@ -105,13 +105,13 @@ class CoordinatorQueryHandler {
};
/// @throw QueryRuntimeException if an error ocurred.
virtual void RegisterInstance(const std::string &coordinator_socket_address,
const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency, const std::string &instance_name,
CoordinatorQuery::SyncMode sync_mode) = 0;
virtual void RegisterReplicationInstance(const std::string &coordinator_socket_address,
const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name, CoordinatorQuery::SyncMode sync_mode) = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual void SetInstanceToMain(const std::string &instance_name) = 0;
virtual void SetReplicationInstanceToMain(const std::string &instance_name) = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual std::vector<coordination::InstanceStatus> ShowInstances() const = 0;