Distributed coordinators (#1693)

Andi 2024-02-13 09:49:28 +01:00 committed by GitHub
parent 7688a1b068
commit 4a7c7f0898
32 changed files with 1231 additions and 920 deletions

View File

@ -9,13 +9,13 @@ target_sources(mg-coordination
 include/coordination/coordinator_config.hpp
 include/coordination/coordinator_exceptions.hpp
 include/coordination/coordinator_slk.hpp
-include/coordination/coordinator_data.hpp
-include/coordination/constants.hpp
-include/coordination/coordinator_instance.hpp
 include/coordination/coordinator_cluster_config.hpp
 include/coordination/coordinator_handlers.hpp
+include/coordination/coordinator_instance.hpp
+include/coordination/constants.hpp
 include/coordination/instance_status.hpp
 include/coordination/replication_instance.hpp
+include/coordination/raft_state.hpp
 include/nuraft/coordinator_log_store.hpp
 include/nuraft/coordinator_state_machine.hpp
@ -26,10 +26,10 @@ target_sources(mg-coordination
 coordinator_state.cpp
 coordinator_rpc.cpp
 coordinator_server.cpp
-coordinator_data.cpp
+coordinator_instance.cpp
 coordinator_handlers.cpp
-coordinator_instance.cpp
 replication_instance.cpp
+raft_state.cpp
 coordinator_log_store.cpp
 coordinator_state_machine.cpp

View File

@ -28,13 +28,13 @@ auto CreateClientContext(memgraph::coordination::CoordinatorClientConfig const &
}
} // namespace
-CoordinatorClient::CoordinatorClient(CoordinatorData *coord_data, CoordinatorClientConfig config,
+CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
: rpc_context_{CreateClientContext(config)},
rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port),
&rpc_context_},
config_{std::move(config)},
-      coord_data_{coord_data},
+      coord_instance_{coord_instance},
succ_cb_{std::move(succ_cb)},
fail_cb_{std::move(fail_cb)} {}
@ -42,6 +42,10 @@ auto CoordinatorClient::InstanceName() const -> std::string { return config_.ins
auto CoordinatorClient::SocketAddress() const -> std::string { return rpc_client_.Endpoint().SocketAddress(); }
void CoordinatorClient::StartFrequentCheck() {
+  if (instance_checker_.IsRunning()) {
+    return;
+  }
MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0),
"Health check frequency must be greater than 0");
@ -54,9 +58,9 @@ void CoordinatorClient::StartFrequentCheck() {
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
stream.AwaitResponse();
}
-      succ_cb_(coord_data_, instance_name);
+      succ_cb_(coord_instance_, instance_name);
     } catch (rpc::RpcFailedException const &) {
-      fail_cb_(coord_data_, instance_name);
+      fail_cb_(coord_instance_, instance_name);
}
});
}
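The frequent-check body above reduces to a simple dispatch: one callback on a successful heartbeat RPC, the other on failure. A minimal self-contained sketch of that shape (the names and the plain std::function stand-ins are illustrative, not the real scheduler or RPC types):

#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>
#include <string_view>

struct Coordinator;  // stand-in for CoordinatorInstance
using HealthCheckCallback = std::function<void(Coordinator *, std::string_view)>;

// One iteration of the frequent check: try the heartbeat, then dispatch.
void CheckOnce(Coordinator *coord, std::string const &instance_name,
               std::function<void()> const &send_heartbeat,  // stand-in for the FrequentHeartbeatRpc stream
               HealthCheckCallback const &succ_cb, HealthCheckCallback const &fail_cb) {
  try {
    send_heartbeat();                 // stream.AwaitResponse() in the real code
    succ_cb(coord, instance_name);    // e.g. leads to OnSuccessPing
  } catch (std::exception const &) {  // rpc::RpcFailedException in the real code
    fail_cb(coord, instance_name);    // e.g. leads to OnFailPing
  }
}

int main() {
  CheckOnce(nullptr, "instance_1", [] { throw std::runtime_error("down"); },
            [](Coordinator *, std::string_view name) { std::cout << name << " alive\n"; },
            [](Coordinator *, std::string_view name) { std::cout << name << " down\n"; });
}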

View File

@ -1,282 +0,0 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_data.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "coordination/replication_instance.hpp"
#include "utils/uuid.hpp"
#include <range/v3/view.hpp>
#include <shared_mutex>
namespace memgraph::coordination {
using nuraft::ptr;
using nuraft::srv_config;
CoordinatorData::CoordinatorData() {
auto find_instance = [](CoordinatorData *coord_data, std::string_view instance_name) -> ReplicationInstance & {
auto instance = std::ranges::find_if(
coord_data->repl_instances_,
[instance_name](ReplicationInstance const &instance) { return instance.InstanceName() == instance_name; });
MG_ASSERT(instance != coord_data->repl_instances_.end(), "Instance {} not found during callback!", instance_name);
return *instance;
};
replica_succ_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing replica successful callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
if (!instance.GetMainUUID().has_value() || main_uuid_ != instance.GetMainUUID().value()) {
if (!instance.SendSwapAndUpdateUUID(main_uuid_)) {
spdlog::error(
fmt::format("Failed to swap uuid for replica instance {} which is alive", instance.InstanceName()));
return;
}
}
instance.OnSuccessPing();
};
replica_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing replica failure callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
instance.OnFailPing();
// We need to restart main uuid from instance since it was "down" at least a second
// There is slight delay, if we choose to use isAlive, instance can be down and back up in less than
// our isAlive time difference, which would lead to instance setting UUID to nullopt and stopping accepting any
// incoming RPCs from valid main
// TODO(antoniofilipovic) this needs here more complex logic
// We need to get id of main replica is listening to on successful ping
// and swap it to correct uuid if it failed
instance.SetNewMainUUID();
};
main_succ_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing main successful callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
if (instance.IsAlive()) {
instance.OnSuccessPing();
return;
}
const auto &instance_uuid = instance.GetMainUUID();
MG_ASSERT(instance_uuid.has_value(), "Instance must have uuid set");
if (main_uuid_ == instance_uuid.value()) {
instance.OnSuccessPing();
return;
}
// TODO(antoniof) make demoteToReplica idempotent since main can be demoted to replica but
// swapUUID can fail
bool const demoted = instance.DemoteToReplica(coord_data->replica_succ_cb_, coord_data->replica_fail_cb_);
if (demoted) {
instance.OnSuccessPing();
spdlog::info("Instance {} demoted to replica", instance_name);
} else {
spdlog::error("Instance {} failed to become replica", instance_name);
return;
}
if (!instance.SendSwapAndUpdateUUID(main_uuid_)) {
spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", instance.InstanceName()));
return;
}
};
main_fail_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing main failure callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
instance.OnFailPing();
const auto &instance_uuid = instance.GetMainUUID();
MG_ASSERT(instance_uuid.has_value(), "Instance must have uuid set");
if (!instance.IsAlive() && main_uuid_ == instance_uuid.value()) {
spdlog::info("Cluster without main instance, trying automatic failover");
coord_data->TryFailover();
}
};
}
auto CoordinatorData::TryFailover() -> void {
auto alive_replicas = repl_instances_ | ranges::views::filter(&ReplicationInstance::IsReplica) |
ranges::views::filter(&ReplicationInstance::IsAlive);
if (ranges::empty(alive_replicas)) {
spdlog::warn("Failover failed since all replicas are down!");
return;
}
// TODO: Smarter choice
auto chosen_replica_instance = ranges::begin(alive_replicas);
chosen_replica_instance->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&chosen_replica_instance] { chosen_replica_instance->ResumeFrequentCheck(); }};
auto const potential_new_main_uuid = utils::UUID{};
auto const is_not_chosen_replica_instance = [&chosen_replica_instance](ReplicationInstance &instance) {
return instance != *chosen_replica_instance;
};
// If for some replicas swap fails, for others on successful ping we will revert back on next change
// or we will do failover first again and then it will be consistent again
for (auto &other_replica_instance : alive_replicas | ranges::views::filter(is_not_chosen_replica_instance)) {
if (!other_replica_instance.SendSwapAndUpdateUUID(potential_new_main_uuid)) {
spdlog::error(fmt::format("Failed to swap uuid for instance {} which is alive, aborting failover",
other_replica_instance.InstanceName()));
return;
}
}
std::vector<ReplClientInfo> repl_clients_info;
repl_clients_info.reserve(repl_instances_.size() - 1);
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_chosen_replica_instance),
std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
if (!chosen_replica_instance->PromoteToMain(potential_new_main_uuid, std::move(repl_clients_info), main_succ_cb_,
main_fail_cb_)) {
spdlog::warn("Failover failed since promoting replica to main failed!");
return;
}
chosen_replica_instance->SetNewMainUUID(potential_new_main_uuid);
main_uuid_ = potential_new_main_uuid;
spdlog::info("Failover successful! Instance {} promoted to main.", chosen_replica_instance->InstanceName());
}
auto CoordinatorData::ShowInstances() const -> std::vector<InstanceStatus> {
auto const coord_instances = self_.GetAllCoordinators();
std::vector<InstanceStatus> instances_status;
instances_status.reserve(repl_instances_.size() + coord_instances.size());
auto const stringify_repl_role = [](ReplicationInstance const &instance) -> std::string {
if (!instance.IsAlive()) return "unknown";
if (instance.IsMain()) return "main";
return "replica";
};
auto const repl_instance_to_status = [&stringify_repl_role](ReplicationInstance const &instance) -> InstanceStatus {
return {.instance_name = instance.InstanceName(),
.coord_socket_address = instance.SocketAddress(),
.cluster_role = stringify_repl_role(instance),
.is_alive = instance.IsAlive()};
};
auto const coord_instance_to_status = [](ptr<srv_config> const &instance) -> InstanceStatus {
return {.instance_name = "coordinator_" + std::to_string(instance->get_id()),
.raft_socket_address = instance->get_endpoint(),
.cluster_role = "coordinator",
.is_alive = true}; // TODO: (andi) Get this info from RAFT and test it or when we will move
// CoordinatorState to every instance, we can be smarter about this using our RPC.
};
std::ranges::transform(coord_instances, std::back_inserter(instances_status), coord_instance_to_status);
{
auto lock = std::shared_lock{coord_data_lock_};
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), repl_instance_to_status);
}
return instances_status;
}
// TODO: (andi) Make sure you cannot put coordinator instance to the main
auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
auto lock = std::lock_guard{coord_data_lock_};
auto const is_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
};
auto new_main = std::ranges::find_if(repl_instances_, is_new_main);
if (new_main == repl_instances_.end()) {
spdlog::error("Instance {} not registered. Please register it using REGISTER INSTANCE {}", instance_name,
instance_name);
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
ReplicationClientsInfo repl_clients_info;
repl_clients_info.reserve(repl_instances_.size() - 1);
auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() != instance_name;
};
auto potential_new_main_uuid = utils::UUID{};
spdlog::trace("Generated potential new main uuid");
for (auto &other_instance : repl_instances_ | ranges::views::filter(is_not_new_main)) {
if (!other_instance.SendSwapAndUpdateUUID(potential_new_main_uuid)) {
spdlog::error(
fmt::format("Failed to swap uuid for instance {}, aborting failover", other_instance.InstanceName()));
return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED;
}
}
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
std::back_inserter(repl_clients_info),
[](const ReplicationInstance &instance) { return instance.ReplicationClientInfo(); });
if (!new_main->PromoteToMain(potential_new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
new_main->SetNewMainUUID(potential_new_main_uuid);
main_uuid_ = potential_new_main_uuid;
spdlog::info("Instance {} promoted to main", instance_name);
return SetInstanceToMainCoordinatorStatus::SUCCESS;
}
auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus {
auto lock = std::lock_guard{coord_data_lock_};
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
return instance.InstanceName() == config.instance_name;
})) {
return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
}
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
return instance.SocketAddress() == config.SocketAddress();
})) {
return RegisterInstanceCoordinatorStatus::ENDPOINT_EXISTS;
}
try {
repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
return RegisterInstanceCoordinatorStatus::SUCCESS;
} catch (CoordinatorRegisterInstanceException const &) {
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}
}
auto CoordinatorData::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
self_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address));
}
} // namespace memgraph::coordination
#endif

View File

@ -18,81 +18,305 @@
#include "nuraft/coordinator_state_manager.hpp"
#include "utils/counter.hpp"
#include <range/v3/view.hpp>
#include <shared_mutex>
namespace memgraph::coordination {
using nuraft::asio_service;
using nuraft::cmd_result;
using nuraft::cs_new;
using nuraft::ptr;
using nuraft::raft_params;
using nuraft::srv_config;
using raft_result = cmd_result<ptr<buffer>>;
 CoordinatorInstance::CoordinatorInstance()
-    : raft_server_id_(FLAGS_raft_server_id), raft_port_(FLAGS_raft_server_port), raft_address_("127.0.0.1") {
-  auto raft_endpoint = raft_address_ + ":" + std::to_string(raft_port_);
-  state_manager_ = cs_new<CoordinatorStateManager>(raft_server_id_, raft_endpoint);
-  state_machine_ = cs_new<CoordinatorStateMachine>();
-  logger_ = nullptr;
-  // ASIO options
-  asio_service::options asio_opts;
-  asio_opts.thread_pool_size_ = 1;  // TODO: (andi) Improve this
-  // RAFT parameters. Heartbeat every 100ms, election timeout between 200ms and 400ms.
-  raft_params params;
-  params.heart_beat_interval_ = 100;
-  params.election_timeout_lower_bound_ = 200;
-  params.election_timeout_upper_bound_ = 400;
-  // 5 logs are preserved before the last snapshot
-  params.reserved_log_items_ = 5;
-  // Create snapshot for every 5 log appends
-  params.snapshot_distance_ = 5;
-  params.client_req_timeout_ = 3000;
-  params.return_method_ = raft_params::blocking;
-  raft_server_ =
-      launcher_.init(state_machine_, state_manager_, logger_, static_cast<int>(raft_port_), asio_opts, params);
-  if (!raft_server_) {
-    throw RaftServerStartException("Failed to launch raft server on {}", raft_endpoint);
-  }
-  auto maybe_stop = utils::ResettableCounter<20>();
-  while (!raft_server_->is_initialized() && !maybe_stop()) {
-    std::this_thread::sleep_for(std::chrono::milliseconds(250));
-  }
-  if (!raft_server_->is_initialized()) {
-    throw RaftServerStartException("Failed to initialize raft server on {}", raft_endpoint);
-  }
-  spdlog::info("Raft server started on {}", raft_endpoint);
+    : raft_state_(RaftState::MakeRaftState(
+          [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StartFrequentCheck); },
+          [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); })) {
+  auto find_repl_instance = [](CoordinatorInstance *self,
+                               std::string_view repl_instance_name) -> ReplicationInstance & {
+    auto repl_instance =
+        std::ranges::find_if(self->repl_instances_, [repl_instance_name](ReplicationInstance const &instance) {
+          return instance.InstanceName() == repl_instance_name;
+        });
+    MG_ASSERT(repl_instance != self->repl_instances_.end(), "Instance {} not found during callback!",
+              repl_instance_name);
+    return *repl_instance;
+  };
+  replica_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
+    auto lock = std::lock_guard{self->coord_instance_lock_};
+    spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
+    auto &repl_instance = find_repl_instance(self, repl_instance_name);
+    if (!repl_instance.EnsureReplicaHasCorrectMainUUID(self->GetMainUUID())) {
+      spdlog::error(
+          fmt::format("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName()));
+      return;
+    }
+    repl_instance.OnSuccessPing();
+  };
+  replica_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
+    auto lock = std::lock_guard{self->coord_instance_lock_};
+    spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
+    auto &repl_instance = find_repl_instance(self, repl_instance_name);
+    repl_instance.OnFailPing();
+    // We need to restart main uuid from instance since it was "down" at least a second
+    // There is slight delay, if we choose to use isAlive, instance can be down and back up in less than
+    // our isAlive time difference, which would lead to instance setting UUID to nullopt and stopping accepting any
+    // incoming RPCs from valid main
+    // TODO(antoniofilipovic) this needs here more complex logic
+    // We need to get id of main replica is listening to on successful ping
+    // and swap it to correct uuid if it failed
+    repl_instance.ResetMainUUID();
+  };
+  main_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
+    auto lock = std::lock_guard{self->coord_instance_lock_};
+    spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
+    auto &repl_instance = find_repl_instance(self, repl_instance_name);
+    if (repl_instance.IsAlive()) {
+      repl_instance.OnSuccessPing();
+      return;
+    }
+    const auto &repl_instance_uuid = repl_instance.GetMainUUID();
+    MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set.");
+    auto const curr_main_uuid = self->GetMainUUID();
+    if (curr_main_uuid == repl_instance_uuid.value()) {
+      repl_instance.OnSuccessPing();
+      return;
+    }
+    // TODO(antoniof) make demoteToReplica idempotent since main can be demoted to replica but
+    // swapUUID can fail
+    if (repl_instance.DemoteToReplica(self->replica_succ_cb_, self->replica_fail_cb_)) {
+      repl_instance.OnSuccessPing();
+      spdlog::info("Instance {} demoted to replica", repl_instance_name);
+    } else {
+      spdlog::error("Instance {} failed to become replica", repl_instance_name);
+      return;
+    }
+    if (!repl_instance.SendSwapAndUpdateUUID(curr_main_uuid)) {
+      spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName()));
+      return;
+    }
+  };
+  main_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
+    auto lock = std::lock_guard{self->coord_instance_lock_};
+    spdlog::trace("Instance {} performing main failure callback", repl_instance_name);
+    auto &repl_instance = find_repl_instance(self, repl_instance_name);
+    repl_instance.OnFailPing();
+    const auto &repl_instance_uuid = repl_instance.GetMainUUID();
+    MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set");
+    if (!repl_instance.IsAlive() && self->GetMainUUID() == repl_instance_uuid.value()) {
+      spdlog::info("Cluster without main instance, trying automatic failover");
+      self->TryFailover();  // TODO: (andi) Initiate failover
+    }
+  };
 }
-auto CoordinatorInstance::InstanceName() const -> std::string {
-  return "coordinator_" + std::to_string(raft_server_id_);
-}
-auto CoordinatorInstance::RaftSocketAddress() const -> std::string {
-  return raft_address_ + ":" + std::to_string(raft_port_);
-}
+auto CoordinatorInstance::ClusterHasAliveMain_() const -> bool {
+  auto const alive_main = [](ReplicationInstance const &instance) { return instance.IsMain() && instance.IsAlive(); };
+  return std::ranges::any_of(repl_instances_, alive_main);
+}
+auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
auto const coord_instances = raft_state_.GetAllCoordinators();
std::vector<InstanceStatus> instances_status;
instances_status.reserve(repl_instances_.size() + coord_instances.size());
auto const stringify_repl_role = [](ReplicationInstance const &instance) -> std::string {
if (!instance.IsAlive()) return "unknown";
if (instance.IsMain()) return "main";
return "replica";
};
auto const repl_instance_to_status = [&stringify_repl_role](ReplicationInstance const &instance) -> InstanceStatus {
return {.instance_name = instance.InstanceName(),
.coord_socket_address = instance.SocketAddress(),
.cluster_role = stringify_repl_role(instance),
.is_alive = instance.IsAlive()};
};
auto const coord_instance_to_status = [](ptr<srv_config> const &instance) -> InstanceStatus {
return {.instance_name = "coordinator_" + std::to_string(instance->get_id()),
.raft_socket_address = instance->get_endpoint(),
.cluster_role = "coordinator",
.is_alive = true}; // TODO: (andi) Get this info from RAFT and test it or when we will move
// CoordinatorState to every instance, we can be smarter about this using our RPC.
};
std::ranges::transform(coord_instances, std::back_inserter(instances_status), coord_instance_to_status);
{
auto lock = std::shared_lock{coord_instance_lock_};
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), repl_instance_to_status);
}
return instances_status;
}
auto CoordinatorInstance::TryFailover() -> void {
auto alive_replicas = repl_instances_ | ranges::views::filter(&ReplicationInstance::IsReplica) |
ranges::views::filter(&ReplicationInstance::IsAlive);
if (ranges::empty(alive_replicas)) {
spdlog::warn("Failover failed since all replicas are down!");
return;
}
// TODO: Smarter choice
auto new_main = ranges::begin(alive_replicas);
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
auto const is_not_new_main = [&new_main](ReplicationInstance &instance) {
return instance.InstanceName() != new_main->InstanceName();
};
auto const new_main_uuid = utils::UUID{};
// If for some replicas swap fails, for others on successful ping we will revert back on next change
// or we will do failover first again and then it will be consistent again
for (auto &other_replica_instance : alive_replicas | ranges::views::filter(is_not_new_main)) {
if (!other_replica_instance.SendSwapAndUpdateUUID(new_main_uuid)) {
spdlog::error(fmt::format("Failed to swap uuid for instance {} which is alive, aborting failover",
other_replica_instance.InstanceName()));
return;
}
}
ReplicationClientsInfo repl_clients_info;
repl_clients_info.reserve(repl_instances_.size() - 1);
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
spdlog::warn("Failover failed since promoting replica to main failed!");
return;
}
// TODO: (andi) This should be replicated across all coordinator instances with Raft log
SetMainUUID(new_main_uuid);
spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName());
}
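TryFailover above picks the first alive replica via chained range-v3 filters over member-function predicates. A self-contained sketch of that selection pattern (Inst is a stand-in; the real code filters ReplicationInstance by IsReplica and IsAlive):

#include <iostream>
#include <vector>
#include <range/v3/all.hpp>

struct Inst {
  bool replica, alive;
  int id;
  bool IsReplica() const { return replica; }
  bool IsAlive() const { return alive; }
};

int main() {
  std::vector<Inst> instances{{false, true, 1}, {true, false, 2}, {true, true, 3}};
  // Same shape as TryFailover: filter on member predicates, then take the first hit.
  auto alive_replicas = instances | ranges::views::filter(&Inst::IsReplica)  //
                                  | ranges::views::filter(&Inst::IsAlive);
  if (ranges::empty(alive_replicas)) {
    std::cout << "no failover candidate\n";  // "Failover failed since all replicas are down!"
    return 0;
  }
  std::cout << "chosen: " << ranges::begin(alive_replicas)->id << '\n';  // prints 3
}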
// TODO: (andi) Make sure you cannot put coordinator instance to the main
auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name)
-> SetInstanceToMainCoordinatorStatus {
auto lock = std::lock_guard{coord_instance_lock_};
auto const is_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
};
auto new_main = std::ranges::find_if(repl_instances_, is_new_main);
if (new_main == repl_instances_.end()) {
spdlog::error("Instance {} not registered. Please register it using REGISTER INSTANCE {}", instance_name,
instance_name);
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() != instance_name;
};
auto const new_main_uuid = utils::UUID{};
for (auto &other_instance : repl_instances_ | ranges::views::filter(is_not_new_main)) {
if (!other_instance.SendSwapAndUpdateUUID(new_main_uuid)) {
spdlog::error(
fmt::format("Failed to swap uuid for instance {}, aborting failover", other_instance.InstanceName()));
return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED;
}
}
ReplicationClientsInfo repl_clients_info;
repl_clients_info.reserve(repl_instances_.size() - 1);
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
// TODO: (andi) This should be replicated across all coordinator instances with Raft log
SetMainUUID(new_main_uuid);
spdlog::info("Instance {} promoted to main", instance_name);
return SetInstanceToMainCoordinatorStatus::SUCCESS;
}
auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig config)
-> RegisterInstanceCoordinatorStatus {
auto lock = std::lock_guard{coord_instance_lock_};
auto instance_name = config.instance_name;
auto const name_matches = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
};
if (std::ranges::any_of(repl_instances_, name_matches)) {
return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
}
auto const socket_address_matches = [&config](ReplicationInstance const &instance) {
return instance.SocketAddress() == config.SocketAddress();
};
if (std::ranges::any_of(repl_instances_, socket_address_matches)) {
return RegisterInstanceCoordinatorStatus::ENDPOINT_EXISTS;
}
if (!raft_state_.RequestLeadership()) {
return RegisterInstanceCoordinatorStatus::NOT_LEADER;
}
auto const res = raft_state_.AppendRegisterReplicationInstance(instance_name);
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for registering instance {}. Most likely the reason is that the instance is not "
"the "
"leader.",
config.instance_name);
return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_ACCEPT;
}
spdlog::info("Request for registering instance {} accepted", instance_name);
try {
repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
} catch (CoordinatorRegisterInstanceException const &) {
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to register instance {} with error code {}", instance_name, res->get_result_code());
return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_APPEND;
}
spdlog::info("Instance {} registered", instance_name);
return RegisterInstanceCoordinatorStatus::SUCCESS;
}
auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
-  auto const endpoint = raft_address + ":" + std::to_string(raft_port);
-  srv_config const srv_config_to_add(static_cast<int>(raft_server_id), endpoint);
-  if (!raft_server_->add_srv(srv_config_to_add)->get_accepted()) {
-    throw RaftAddServerException("Failed to add server {} to the cluster", endpoint);
-  }
-  spdlog::info("Request to add server {} to the cluster accepted", endpoint);
+  raft_state_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address));
 }
-auto CoordinatorInstance::GetAllCoordinators() const -> std::vector<ptr<srv_config>> {
-  std::vector<ptr<srv_config>> all_srv_configs;
-  raft_server_->get_srv_config_all(all_srv_configs);
-  return all_srv_configs;
-}
+auto CoordinatorInstance::GetMainUUID() const -> utils::UUID { return main_uuid_; }
+// TODO: (andi) Add to the RAFT log.
+auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_ = new_uuid; }
} // namespace memgraph::coordination
#endif
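The two lambdas handed to RaftState::MakeRaftState in the constructor above are the become-leader and become-follower hooks: only the Raft leader runs the frequent health checks against replication instances, so only the leader can trigger failover. A toy model of that wiring (Instance and MockRaftState are stand-ins, not the NuRaft-backed RaftState):

#include <functional>
#include <iostream>
#include <list>

// Stand-in for ReplicationInstance: only the check start/stop surface.
struct Instance {
  void StartFrequentCheck() { std::cout << "start frequent check\n"; }
  void StopFrequentCheck() { std::cout << "stop frequent check\n"; }
};

// Stand-in for RaftState: stores the role-change callbacks and fires them.
struct MockRaftState {
  std::function<void()> become_leader_cb, become_follower_cb;
  void OnLeaderElected() { become_leader_cb(); }
  void OnLeadershipLost() { become_follower_cb(); }
};

int main() {
  std::list<Instance> repl_instances(2);
  MockRaftState raft{
      [&] { for (auto &i : repl_instances) i.StartFrequentCheck(); },  // leader: drive health checks
      [&] { for (auto &i : repl_instances) i.StopFrequentCheck(); }};  // follower: stay passive
  raft.OnLeaderElected();
  raft.OnLeadershipLost();
}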

View File

@ -13,214 +13,149 @@
#include "nuraft/coordinator_log_store.hpp"
#include "coordination/coordinator_exceptions.hpp"
#include "utils/logging.hpp"
namespace memgraph::coordination {
using nuraft::cs_new;
using nuraft::timer_helper;
-CoordinatorLogStore::CoordinatorLogStore()
-    : start_idx_(1),
-      raft_server_bwd_pointer_(nullptr),
-      disk_emul_delay(0),
-      disk_emul_thread_(nullptr),
-      disk_emul_thread_stop_signal_(false),
-      disk_emul_last_durable_index_(0) {
-  // Dummy entry for index 0.
-  ptr<buffer> buf = buffer::alloc(sz_ulong);
+namespace {
+ptr<log_entry> MakeClone(const ptr<log_entry> &entry) {
+  return cs_new<log_entry>(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type(),
+                           entry->get_timestamp());
+}
+}  // namespace
+CoordinatorLogStore::CoordinatorLogStore() : start_idx_(1) {
+  ptr<buffer> buf = buffer::alloc(sizeof(uint64_t));
   logs_[0] = cs_new<log_entry>(0, buf);
 }
-CoordinatorLogStore::~CoordinatorLogStore() {
-  if (disk_emul_thread_) {
-    disk_emul_thread_stop_signal_ = true;
-    // disk_emul_ea_.invoke();
-    if (disk_emul_thread_->joinable()) {
-      disk_emul_thread_->join();
-    }
-  }
-}
+CoordinatorLogStore::~CoordinatorLogStore() {}
-ptr<log_entry> CoordinatorLogStore::MakeClone(const ptr<log_entry> &entry) {
-  // NOTE:
-  // Timestamp is used only when `replicate_log_timestamp_` option is on.
-  // Otherwise, log store does not need to store or load it.
-  ptr<log_entry> clone = cs_new<log_entry>(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type(),
-                                           entry->get_timestamp());
-  return clone;
-}
-ulong CoordinatorLogStore::next_slot() const {
-  std::lock_guard<std::mutex> l(logs_lock_);
-  // Exclude the dummy entry.
-  return start_idx_ + logs_.size() - 1;
-}
-ulong CoordinatorLogStore::start_index() const { return start_idx_; }
-ptr<log_entry> CoordinatorLogStore::last_entry() const {
-  ulong next_idx = next_slot();
-  std::lock_guard<std::mutex> l(logs_lock_);
-  auto entry = logs_.find(next_idx - 1);
-  if (entry == logs_.end()) {
-    entry = logs_.find(0);
-  }
-  return MakeClone(entry->second);
-}
-ulong CoordinatorLogStore::append(ptr<log_entry> &entry) {
-  ptr<log_entry> clone = MakeClone(entry);
-  std::lock_guard<std::mutex> l(logs_lock_);
-  size_t idx = start_idx_ + logs_.size() - 1;
-  logs_[idx] = clone;
-  if (disk_emul_delay) {
-    uint64_t cur_time = timer_helper::get_timeofday_us();
-    disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = idx;
-    // disk_emul_ea_.invoke();
-  }
-  return idx;
-}
+auto CoordinatorLogStore::FindOrDefault_(uint64_t index) const -> ptr<log_entry> {
+  auto entry = logs_.find(index);
+  if (entry == logs_.end()) {
+    entry = logs_.find(0);
+  }
+  return entry->second;
+}
+uint64_t CoordinatorLogStore::next_slot() const {
+  auto lock = std::lock_guard{logs_lock_};
+  return start_idx_ + logs_.size() - 1;
+}
+uint64_t CoordinatorLogStore::start_index() const { return start_idx_; }
+ptr<log_entry> CoordinatorLogStore::last_entry() const {
+  auto lock = std::lock_guard{logs_lock_};
+  uint64_t const last_idx = start_idx_ + logs_.size() - 1;
+  auto const last_src = FindOrDefault_(last_idx - 1);
+  return MakeClone(last_src);
+}
+uint64_t CoordinatorLogStore::append(ptr<log_entry> &entry) {
+  ptr<log_entry> clone = MakeClone(entry);
+  uint64_t next_slot{0};
+  {
+    auto lock = std::lock_guard{logs_lock_};
+    next_slot = start_idx_ + logs_.size() - 1;
+    logs_[next_slot] = clone;
+  }
+  return next_slot;
+}
-void CoordinatorLogStore::write_at(ulong index, ptr<log_entry> &entry) {
-  ptr<log_entry> clone = MakeClone(entry);
-  // Discard all logs equal to or greater than `index`.
-  std::lock_guard<std::mutex> l(logs_lock_);
-  auto itr = logs_.lower_bound(index);
-  while (itr != logs_.end()) {
-    itr = logs_.erase(itr);
-  }
-  logs_[index] = clone;
-  if (disk_emul_delay) {
-    uint64_t cur_time = timer_helper::get_timeofday_us();
-    disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = index;
-    // Remove entries greater than `index`.
-    auto entry = disk_emul_logs_being_written_.begin();
-    while (entry != disk_emul_logs_being_written_.end()) {
-      if (entry->second > index) {
-        entry = disk_emul_logs_being_written_.erase(entry);
-      } else {
-        entry++;
-      }
-    }
-    // disk_emul_ea_.invoke();
-  }
-}
+void CoordinatorLogStore::write_at(uint64_t index, ptr<log_entry> &entry) {
+  ptr<log_entry> clone = MakeClone(entry);
+  // Discard all logs equal to or greater than `index`.
+  {
+    auto lock = std::lock_guard{logs_lock_};
+    auto itr = logs_.lower_bound(index);
+    while (itr != logs_.end()) {
+      itr = logs_.erase(itr);
+    }
+    logs_[index] = clone;
+  }
+}
-ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries(ulong start, ulong end) {
-  ptr<std::vector<ptr<log_entry>>> ret = cs_new<std::vector<ptr<log_entry>>>();
-  ret->resize(end - start);
-  ulong cc = 0;
-  for (ulong ii = start; ii < end; ++ii) {
-    ptr<log_entry> src = nullptr;
-    {
-      std::lock_guard<std::mutex> l(logs_lock_);
-      auto entry = logs_.find(ii);
-      if (entry == logs_.end()) {
-        entry = logs_.find(0);
-        assert(0);
-      }
-      src = entry->second;
-    }
-    (*ret)[cc++] = MakeClone(src);
-  }
-  return ret;
-}
+ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries(uint64_t start, uint64_t end) {
+  auto ret = cs_new<std::vector<ptr<log_entry>>>();
+  ret->resize(end - start);
+  for (uint64_t i = start, curr_index = 0; i < end; ++i, ++curr_index) {
+    ptr<log_entry> src = nullptr;
+    {
+      auto lock = std::lock_guard{logs_lock_};
+      if (auto const entry = logs_.find(i); entry != logs_.end()) {
+        src = entry->second;
+      } else {
+        throw RaftCouldNotFindEntryException("Could not find entry at index {}", i);
+      }
+    }
+    (*ret)[curr_index] = MakeClone(src);
+  }
+  return ret;
+}
// NOLINTNEXTLINE(google-default-arguments)
ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries_ext(ulong start, ulong end,
int64 batch_size_hint_in_bytes) {
ptr<std::vector<ptr<log_entry>>> ret = cs_new<std::vector<ptr<log_entry>>>();
if (batch_size_hint_in_bytes < 0) {
return ret;
}
size_t accum_size = 0;
for (ulong ii = start; ii < end; ++ii) {
ptr<log_entry> src = nullptr;
{
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.find(ii);
if (entry == logs_.end()) {
entry = logs_.find(0);
assert(0);
}
src = entry->second;
}
ret->push_back(MakeClone(src));
accum_size += src->get_buf().size();
if (batch_size_hint_in_bytes && accum_size >= (ulong)batch_size_hint_in_bytes) break;
}
return ret;
}
-ptr<log_entry> CoordinatorLogStore::entry_at(ulong index) {
-  ptr<log_entry> src = nullptr;
-  {
-    std::lock_guard<std::mutex> l(logs_lock_);
-    auto entry = logs_.find(index);
-    if (entry == logs_.end()) {
-      entry = logs_.find(0);
-    }
-    src = entry->second;
-  }
-  return MakeClone(src);
-}
+ptr<log_entry> CoordinatorLogStore::entry_at(uint64_t index) {
+  ptr<log_entry> src = nullptr;
+  {
+    auto lock = std::lock_guard{logs_lock_};
+    src = FindOrDefault_(index);
+  }
+  return MakeClone(src);
+}
-ulong CoordinatorLogStore::term_at(ulong index) {
-  ulong term = 0;
-  {
-    std::lock_guard<std::mutex> l(logs_lock_);
-    auto entry = logs_.find(index);
-    if (entry == logs_.end()) {
-      entry = logs_.find(0);
-    }
-    term = entry->second->get_term();
-  }
-  return term;
-}
+uint64_t CoordinatorLogStore::term_at(uint64_t index) {
+  uint64_t term = 0;
+  {
+    auto lock = std::lock_guard{logs_lock_};
+    term = FindOrDefault_(index)->get_term();
+  }
+  return term;
+}
-ptr<buffer> CoordinatorLogStore::pack(ulong index, int32 cnt) {
-  std::vector<ptr<buffer>> logs;
-  size_t size_total = 0;
-  for (ulong ii = index; ii < index + cnt; ++ii) {
-    ptr<log_entry> le = nullptr;
-    {
-      std::lock_guard<std::mutex> l(logs_lock_);
-      le = logs_[ii];
-    }
-    assert(le.get());
-    ptr<buffer> buf = le->serialize();
-    size_total += buf->size();
-    logs.push_back(buf);
-  }
-  ptr<buffer> buf_out = buffer::alloc(sizeof(int32) + cnt * sizeof(int32) + size_total);
-  buf_out->pos(0);
-  buf_out->put((int32)cnt);
-  for (auto &entry : logs) {
-    ptr<buffer> &bb = entry;
-    buf_out->put((int32)bb->size());
-    buf_out->put(*bb);
-  }
-  return buf_out;
-}
+ptr<buffer> CoordinatorLogStore::pack(uint64_t index, int32 cnt) {
+  std::vector<ptr<buffer>> logs;
+  size_t size_total = 0;
+  uint64_t const end_index = index + cnt;
+  for (uint64_t i = index; i < end_index; ++i) {
+    ptr<log_entry> le = nullptr;
+    {
+      auto lock = std::lock_guard{logs_lock_};
+      le = logs_[i];
+    }
+    MG_ASSERT(le.get(), "Could not find log entry at index {}", i);
+    auto buf = le->serialize();
+    size_total += buf->size();
+    logs.push_back(buf);
+  }
+  auto buf_out = buffer::alloc(sizeof(int32) + cnt * sizeof(int32) + size_total);
+  buf_out->pos(0);
+  buf_out->put((int32)cnt);
+  for (auto &entry : logs) {
+    buf_out->put(static_cast<int32>(entry->size()));
+    buf_out->put(*entry);
+  }
+  return buf_out;
+}
-void CoordinatorLogStore::apply_pack(ulong index, buffer &pack) {
-  pack.pos(0);
-  int32 num_logs = pack.get_int();
-  for (int32 ii = 0; ii < num_logs; ++ii) {
-    ulong cur_idx = index + ii;
+void CoordinatorLogStore::apply_pack(uint64_t index, buffer &pack) {
+  pack.pos(0);
+  int32 const num_logs = pack.get_int();
+  for (int32 i = 0; i < num_logs; ++i) {
+    uint64_t cur_idx = index + i;
int32 buf_size = pack.get_int();
ptr<buffer> buf_local = buffer::alloc(buf_size);
@ -228,14 +163,14 @@ void CoordinatorLogStore::apply_pack(ulong index, buffer &pack) {
ptr<log_entry> le = log_entry::deserialize(*buf_local);
{
-      std::lock_guard<std::mutex> l(logs_lock_);
+      auto lock = std::lock_guard{logs_lock_};
logs_[cur_idx] = le;
}
}
{
-    std::lock_guard<std::mutex> l(logs_lock_);
-    auto entry = logs_.upper_bound(0);
+    auto lock = std::lock_guard{logs_lock_};
+    auto const entry = logs_.upper_bound(0);
if (entry != logs_.end()) {
start_idx_ = entry->first;
} else {
@ -244,88 +179,23 @@ void CoordinatorLogStore::apply_pack(ulong index, buffer &pack) {
}
}
-bool CoordinatorLogStore::compact(ulong last_log_index) {
-  std::lock_guard<std::mutex> l(logs_lock_);
-  for (ulong ii = start_idx_; ii <= last_log_index; ++ii) {
-    auto entry = logs_.find(ii);
+// NOTE: Remove all logs up to given 'last_log_index' (inclusive).
+bool CoordinatorLogStore::compact(uint64_t last_log_index) {
+  auto lock = std::lock_guard{logs_lock_};
+  for (uint64_t ii = start_idx_; ii <= last_log_index; ++ii) {
+    auto const entry = logs_.find(ii);
if (entry != logs_.end()) {
logs_.erase(entry);
}
}
// WARNING:
// Even though nothing has been erased,
// we should set `start_idx_` to new index.
if (start_idx_ <= last_log_index) {
start_idx_ = last_log_index + 1;
}
return true;
}
-bool CoordinatorLogStore::flush() {
-  disk_emul_last_durable_index_ = next_slot() - 1;
-  return true;
-}
ulong CoordinatorLogStore::last_durable_index() {
uint64_t last_log = next_slot() - 1;
if (!disk_emul_delay) {
return last_log;
}
return disk_emul_last_durable_index_;
}
void CoordinatorLogStore::DiskEmulLoop() {
// This thread mimics async disk writes.
// uint32_t next_sleep_us = 100 * 1000;
while (!disk_emul_thread_stop_signal_) {
// disk_emul_ea_.wait_us(next_sleep_us);
// disk_emul_ea_.reset();
if (disk_emul_thread_stop_signal_) break;
uint64_t cur_time = timer_helper::get_timeofday_us();
// next_sleep_us = 100 * 1000;
bool call_notification = false;
{
std::lock_guard<std::mutex> l(logs_lock_);
// Remove all timestamps equal to or smaller than `cur_time`,
// and pick the greatest one among them.
auto entry = disk_emul_logs_being_written_.begin();
while (entry != disk_emul_logs_being_written_.end()) {
if (entry->first <= cur_time) {
disk_emul_last_durable_index_ = entry->second;
entry = disk_emul_logs_being_written_.erase(entry);
call_notification = true;
} else {
break;
}
}
entry = disk_emul_logs_being_written_.begin();
if (entry != disk_emul_logs_being_written_.end()) {
// next_sleep_us = entry->first - cur_time;
}
}
if (call_notification) {
raft_server_bwd_pointer_->notify_log_append_completion(true);
}
}
}
void CoordinatorLogStore::Close() {}
void CoordinatorLogStore::SetDiskDelay(raft_server *raft, size_t delay_ms) {
disk_emul_delay = delay_ms;
raft_server_bwd_pointer_ = raft;
if (!disk_emul_thread_) {
disk_emul_thread_ = std::make_unique<std::thread>(&CoordinatorLogStore::DiskEmulLoop, this);
}
}
+bool CoordinatorLogStore::flush() { return true; }
} // namespace memgraph::coordination
#endif
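The index bookkeeping that survives the cleanup above is compact: logs_[0] holds a dummy entry, start_idx_ begins at 1, next_slot is derived from the map size rather than stored separately, and compact advances start_idx_ even when nothing was erased. A toy model of just that arithmetic (plain std::map payloads, not the NuRaft log_entry API):

#include <cstdint>
#include <iostream>
#include <map>

struct ToyLogStore {
  std::map<uint64_t, int> logs{{0, 0}};  // dummy entry at index 0
  uint64_t start_idx{1};
  uint64_t next_slot() const { return start_idx + logs.size() - 1; }
  uint64_t append(int payload) {
    uint64_t const idx = next_slot();
    logs[idx] = payload;
    return idx;
  }
  void compact(uint64_t last) {  // drop everything up to `last`, inclusive
    for (uint64_t i = start_idx; i <= last; ++i) logs.erase(i);
    if (start_idx <= last) start_idx = last + 1;  // advance even if nothing was erased
  }
};

int main() {
  ToyLogStore s;
  s.append(10);  // lands at index 1
  s.append(20);  // lands at index 2
  s.compact(1);
  std::cout << s.start_idx << ' ' << s.next_slot() << '\n';  // prints "2 3"
}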

View File

@ -41,37 +41,39 @@ CoordinatorState::CoordinatorState() {
}
}
-auto CoordinatorState::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus {
-  MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
+auto CoordinatorState::RegisterReplicationInstance(CoordinatorClientConfig config)
+    -> RegisterInstanceCoordinatorStatus {
+  MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
             "Coordinator cannot register replica since variant holds wrong alternative");
   return std::visit(
-      memgraph::utils::Overloaded{
-          [](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
-            return RegisterInstanceCoordinatorStatus::NOT_COORDINATOR;
-          },
-          [config](CoordinatorData &coordinator_data) { return coordinator_data.RegisterInstance(config); }},
+      memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
+                                    return RegisterInstanceCoordinatorStatus::NOT_COORDINATOR;
+                                  },
+                                  [config](CoordinatorInstance &coordinator_instance) {
+                                    return coordinator_instance.RegisterReplicationInstance(config);
+                                  }},
       data_);
 }
-auto CoordinatorState::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
-  MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
+auto CoordinatorState::SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
+  MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
             "Coordinator cannot register replica since variant holds wrong alternative");
   return std::visit(
       memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
                                     return SetInstanceToMainCoordinatorStatus::NOT_COORDINATOR;
                                   },
-                                  [&instance_name](CoordinatorData &coordinator_data) {
-                                    return coordinator_data.SetInstanceToMain(instance_name);
+                                  [&instance_name](CoordinatorInstance &coordinator_instance) {
+                                    return coordinator_instance.SetReplicationInstanceToMain(instance_name);
                                   }},
       data_);
 }
auto CoordinatorState::ShowInstances() const -> std::vector<InstanceStatus> {
-  MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
+  MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
             "Can't call show instances on data_, as variant holds wrong alternative");
-  return std::get<CoordinatorData>(data_).ShowInstances();
+  return std::get<CoordinatorInstance>(data_).ShowInstances();
}
auto CoordinatorState::GetCoordinatorServer() const -> CoordinatorServer & {
@ -82,9 +84,9 @@ auto CoordinatorState::GetCoordinatorServer() const -> CoordinatorServer & {
auto CoordinatorState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
-  MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
+  MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
             "Coordinator cannot register replica since variant holds wrong alternative");
-  return std::get<CoordinatorData>(data_).AddCoordinatorInstance(raft_server_id, raft_port, raft_address);
+  return std::get<CoordinatorInstance>(data_).AddCoordinatorInstance(raft_server_id, raft_port, raft_address);
}
} // namespace memgraph::coordination

View File

@ -15,6 +15,19 @@
namespace memgraph::coordination {
auto CoordinatorStateMachine::EncodeRegisterReplicationInstance(const std::string &name) -> ptr<buffer> {
std::string str_log = name + "_replica";
ptr<buffer> log = buffer::alloc(sizeof(uint32_t) + str_log.size());
buffer_serializer bs(log);
bs.put_str(str_log);
return log;
}
auto CoordinatorStateMachine::DecodeRegisterReplicationInstance(buffer &data) -> std::string {
buffer_serializer bs(data);
return bs.get_str();
}
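A round-trip sketch of the encode/decode pair above, assuming NuRaft's buffer_serializer stores a string as a length prefix followed by the raw bytes (which is what the sizeof(uint32_t) + str_log.size() allocation accounts for):

#include <cassert>
#include <string>
#include <libnuraft/nuraft.hxx>

void RoundTrip() {
  using namespace nuraft;
  std::string const payload = "instance_1_replica";
  ptr<buffer> log = buffer::alloc(sizeof(uint32_t) + payload.size());
  buffer_serializer out(log);
  out.put_str(payload);        // writes the length prefix, then the bytes
  buffer_serializer in(log);   // a fresh serializer reads from offset 0
  assert(in.get_str() == payload);
}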
auto CoordinatorStateMachine::pre_commit(ulong const log_idx, buffer &data) -> ptr<buffer> {
buffer_serializer bs(data);
std::string str = bs.get_str();

View File

@ -20,14 +20,14 @@
namespace memgraph::coordination {
-class CoordinatorData;
-using HealthCheckCallback = std::function<void(CoordinatorData *, std::string_view)>;
+class CoordinatorInstance;
+using HealthCheckCallback = std::function<void(CoordinatorInstance *, std::string_view)>;
using ReplicationClientsInfo = std::vector<ReplClientInfo>;
class CoordinatorClient {
public:
-  explicit CoordinatorClient(CoordinatorData *coord_data_, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
-                             HealthCheckCallback fail_cb);
+  explicit CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config,
+                             HealthCheckCallback succ_cb, HealthCheckCallback fail_cb);
~CoordinatorClient() = default;
@ -69,7 +69,7 @@ class CoordinatorClient {
mutable rpc::Client rpc_client_;
CoordinatorClientConfig config_;
-  CoordinatorData *coord_data_;
+  CoordinatorInstance *coord_instance_;
HealthCheckCallback succ_cb_;
HealthCheckCallback fail_cb_;
};

View File

@ -1,61 +0,0 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_instance.hpp"
#include "coordination/coordinator_server.hpp"
#include "coordination/instance_status.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "coordination/replication_instance.hpp"
#include "replication_coordination_glue/handler.hpp"
#include "utils/rw_lock.hpp"
#include "utils/thread_pool.hpp"
#include "utils/uuid.hpp"
#include <list>
namespace memgraph::coordination {
class CoordinatorData {
public:
CoordinatorData();
// TODO: (andi) Probably rename to RegisterReplicationInstance
[[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<InstanceStatus>;
auto TryFailover() -> void;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
private:
HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
// NOTE: Must be std::list because we rely on pointer stability
std::list<ReplicationInstance> repl_instances_;
mutable utils::RWLock coord_data_lock_{utils::RWLock::Priority::READ};
CoordinatorInstance self_;
utils::UUID main_uuid_;
};
struct CoordinatorMainReplicaData {
std::unique_ptr<CoordinatorServer> coordinator_server_;
};
} // namespace memgraph::coordination
#endif

View File

@ -50,5 +50,38 @@ class RaftAddServerException final : public utils::BasicException {
SPECIALIZE_GET_EXCEPTION_NAME(RaftAddServerException)
};
class RaftBecomeLeaderException final : public utils::BasicException {
public:
explicit RaftBecomeLeaderException(std::string_view what) noexcept : BasicException(what) {}
template <class... Args>
explicit RaftBecomeLeaderException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: RaftBecomeLeaderException(fmt::format(fmt, std::forward<Args>(args)...)) {}
SPECIALIZE_GET_EXCEPTION_NAME(RaftBecomeLeaderException)
};
class RaftCouldNotFindEntryException final : public utils::BasicException {
public:
explicit RaftCouldNotFindEntryException(std::string_view what) noexcept : BasicException(what) {}
template <class... Args>
explicit RaftCouldNotFindEntryException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: RaftCouldNotFindEntryException(fmt::format(fmt, std::forward<Args>(args)...)) {}
SPECIALIZE_GET_EXCEPTION_NAME(RaftCouldNotFindEntryException)
};
class RaftCouldNotParseFlagsException final : public utils::BasicException {
public:
explicit RaftCouldNotParseFlagsException(std::string_view what) noexcept : BasicException(what) {}
template <class... Args>
explicit RaftCouldNotParseFlagsException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: RaftCouldNotParseFlagsException(fmt::format(fmt, std::forward<Args>(args)...)) {}
SPECIALIZE_GET_EXCEPTION_NAME(RaftCouldNotParseFlagsException)
};
} // namespace memgraph::coordination
#endif
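Usage sketch for the variadic constructors added above: call sites format the message inline, as the reworked log store does when log_entries misses an index (the exception name comes from this header; the index value is illustrative):

#include <iostream>
#include "coordination/coordinator_exceptions.hpp"

void Example() {
  try {
    throw memgraph::coordination::RaftCouldNotFindEntryException("Could not find entry at index {}", 42);
  } catch (memgraph::coordination::RaftCouldNotFindEntryException const &e) {
    std::cout << e.what() << '\n';  // "Could not find entry at index 42"
  }
}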

View File

@ -13,45 +13,48 @@
#ifdef MG_ENTERPRISE
#include <flags/replication.hpp>
#include "coordination/coordinator_server.hpp"
#include "coordination/instance_status.hpp"
#include "coordination/raft_state.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "coordination/replication_instance.hpp"
#include "utils/rw_lock.hpp"
#include "utils/thread_pool.hpp"
#include <libnuraft/nuraft.hxx>
#include <list>
namespace memgraph::coordination {
-using nuraft::logger;
 using nuraft::ptr;
-using nuraft::raft_launcher;
-using nuraft::raft_server;
 using nuraft::srv_config;
-using nuraft::state_machine;
-using nuraft::state_mgr;
 class CoordinatorInstance {
  public:
   CoordinatorInstance();
   CoordinatorInstance(CoordinatorInstance const &other) = delete;
   CoordinatorInstance &operator=(CoordinatorInstance const &other) = delete;
   CoordinatorInstance(CoordinatorInstance &&other) noexcept = delete;
   CoordinatorInstance &operator=(CoordinatorInstance &&other) noexcept = delete;
   ~CoordinatorInstance() = default;
-  auto InstanceName() const -> std::string;
-  auto RaftSocketAddress() const -> std::string;
+  [[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
+  [[nodiscard]] auto SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
+  auto ShowInstances() const -> std::vector<InstanceStatus>;
+  auto TryFailover() -> void;
   auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
   auto GetAllCoordinators() const -> std::vector<ptr<srv_config>>;
+  auto GetMainUUID() const -> utils::UUID;
+  auto SetMainUUID(utils::UUID new_uuid) -> void;
  private:
-  ptr<state_machine> state_machine_;
-  ptr<state_mgr> state_manager_;
-  ptr<raft_server> raft_server_;
-  ptr<logger> logger_;
-  raft_launcher launcher_;
-  // TODO: (andi) I think variables below can be abstracted
-  uint32_t raft_server_id_;
-  uint32_t raft_port_;
-  std::string raft_address_;
+  auto ClusterHasAliveMain_() const -> bool;
+  HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
+  // NOTE: Must be std::list because we rely on pointer stability
+  std::list<ReplicationInstance> repl_instances_;
+  mutable utils::RWLock coord_instance_lock_{utils::RWLock::Priority::READ};
+  utils::UUID main_uuid_;
+  RaftState raft_state_;
 };
} // namespace memgraph::coordination
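The NOTE on repl_instances_ is load-bearing: the health-check callbacks keep pointers to individual ReplicationInstance objects, and std::list guarantees node addresses survive insertions, while a vector may reallocate its storage. A minimal demonstration of that difference:

#include <cassert>
#include <list>
#include <vector>

int main() {
  std::vector<int> vec{1};
  int *into_vec = &vec.front();
  for (int i = 0; i < 1000; ++i) vec.push_back(i);
  // `into_vec` may now dangle: push_back can reallocate the whole buffer.
  (void)into_vec;

  std::list<int> lst{1};
  int *into_lst = &lst.front();
  for (int i = 0; i < 1000; ++i) lst.push_back(i);
  assert(into_lst == &lst.front());  // list nodes never move
}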

View File

@ -13,7 +13,7 @@
#ifdef MG_ENTERPRISE
-#include "coordination/coordinator_data.hpp"
+#include "coordination/coordinator_instance.hpp"
#include "coordination/coordinator_server.hpp"
#include "coordination/instance_status.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
@ -33,19 +33,23 @@ class CoordinatorState {
CoordinatorState(CoordinatorState &&) noexcept = delete;
CoordinatorState &operator=(CoordinatorState &&) noexcept = delete;
-  [[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
+  [[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
-  [[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
+  [[nodiscard]] auto SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<InstanceStatus>;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
-  // The client code must check that the server exists before calling this method.
+  // NOTE: The client code must check that the server exists before calling this method.
auto GetCoordinatorServer() const -> CoordinatorServer &;
private:
-  std::variant<CoordinatorData, CoordinatorMainReplicaData> data_;
+  struct CoordinatorMainReplicaData {
+    std::unique_ptr<CoordinatorServer> coordinator_server_;
+  };
+  std::variant<CoordinatorInstance, CoordinatorMainReplicaData> data_;
};
} // namespace memgraph::coordination

View File

@ -0,0 +1,81 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#ifdef MG_ENTERPRISE
#include <flags/replication.hpp>
#include <optional>
#include <libnuraft/nuraft.hxx>
namespace memgraph::coordination {
using BecomeLeaderCb = std::function<void()>;
using BecomeFollowerCb = std::function<void()>;
using nuraft::buffer;
using nuraft::logger;
using nuraft::ptr;
using nuraft::raft_launcher;
using nuraft::raft_server;
using nuraft::srv_config;
using nuraft::state_machine;
using nuraft::state_mgr;
using raft_result = nuraft::cmd_result<ptr<buffer>>;
class RaftState {
private:
explicit RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, uint32_t raft_server_id,
uint32_t raft_port, std::string raft_address);
auto InitRaftServer() -> void;
public:
RaftState() = delete;
RaftState(RaftState const &other) = default;
RaftState &operator=(RaftState const &other) = default;
RaftState(RaftState &&other) noexcept = default;
RaftState &operator=(RaftState &&other) noexcept = default;
~RaftState();
static auto MakeRaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb) -> RaftState;
auto InstanceName() const -> std::string;
auto RaftSocketAddress() const -> std::string;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
auto GetAllCoordinators() const -> std::vector<ptr<srv_config>>;
auto RequestLeadership() -> bool;
auto IsLeader() const -> bool;
auto AppendRegisterReplicationInstance(std::string const &instance) -> ptr<raft_result>;
// TODO: (andi) I think variables below can be abstracted
uint32_t raft_server_id_;
uint32_t raft_port_;
std::string raft_address_;
ptr<state_machine> state_machine_;
ptr<state_mgr> state_manager_;
ptr<raft_server> raft_server_;
ptr<logger> logger_;
raft_launcher launcher_;
BecomeLeaderCb become_leader_cb_;
BecomeFollowerCb become_follower_cb_;
};
} // namespace memgraph::coordination
#endif

View File

@ -22,6 +22,9 @@ enum class RegisterInstanceCoordinatorStatus : uint8_t {
ENDPOINT_EXISTS,
NOT_COORDINATOR,
RPC_FAILED,
+  NOT_LEADER,
+  RAFT_COULD_NOT_ACCEPT,
+  RAFT_COULD_NOT_APPEND,
SUCCESS
};
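A hedged sketch of how a caller might surface the three new Raft-related statuses (the enumerator names come from this header; the messages are illustrative, not the ones the query engine emits):

#include <string_view>
#include "coordination/register_main_replica_coordinator_status.hpp"

using memgraph::coordination::RegisterInstanceCoordinatorStatus;

std::string_view ToMessage(RegisterInstanceCoordinatorStatus status) {
  switch (status) {
    case RegisterInstanceCoordinatorStatus::NOT_LEADER:
      return "this coordinator is not the current Raft leader";
    case RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_ACCEPT:
      return "the Raft log did not accept the registration request";
    case RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_APPEND:
      return "appending the registration to the Raft log failed";
    default:
      return "see RegisterInstanceCoordinatorStatus";
  }
}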

View File

@ -23,11 +23,11 @@
namespace memgraph::coordination {
-class CoordinatorData;
+class CoordinatorInstance;
class ReplicationInstance {
public:
-  ReplicationInstance(CoordinatorData *data, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
+  ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb);
ReplicationInstance(ReplicationInstance const &other) = delete;
@ -51,15 +51,19 @@ class ReplicationInstance {
HealthCheckCallback main_fail_cb) -> bool;
auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool;
   auto StartFrequentCheck() -> void;
+  auto StopFrequentCheck() -> void;
auto PauseFrequentCheck() -> void;
auto ResumeFrequentCheck() -> void;
auto ReplicationClientInfo() const -> ReplClientInfo;
-  auto SendSwapAndUpdateUUID(const utils::UUID &main_uuid) -> bool;
+  auto EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool;
+  auto SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool;
auto GetClient() -> CoordinatorClient &;
-  void SetNewMainUUID(const std::optional<utils::UUID> &main_uuid = std::nullopt);
+  auto SetNewMainUUID(utils::UUID const &main_uuid) -> void;
+  auto ResetMainUUID() -> void;
auto GetMainUUID() -> const std::optional<utils::UUID> &;
private:

View File

@ -46,9 +46,6 @@ class CoordinatorLogStore : public log_store {
ptr<std::vector<ptr<log_entry>>> log_entries(ulong start, ulong end) override;
-  // NOLINTNEXTLINE
-  ptr<std::vector<ptr<log_entry>>> log_entries_ext(ulong start, ulong end, int64 batch_size_hint_in_bytes = 0) override;
ptr<log_entry> entry_at(ulong index) override;
ulong term_at(ulong index) override;
@ -61,67 +58,12 @@ class CoordinatorLogStore : public log_store {
bool flush() override;
ulong last_durable_index() override;
-  void Close();
-  void SetDiskDelay(raft_server *raft, size_t delay_ms);
private:
-  static ptr<log_entry> MakeClone(ptr<log_entry> const &entry);
+  auto FindOrDefault_(ulong index) const -> ptr<log_entry>;
-  void DiskEmulLoop();
/**
* Map of <log index, log data>.
*/
std::map<ulong, ptr<log_entry>> logs_;
/**
* Lock for `logs_`.
*/
mutable std::mutex logs_lock_;
/**
* The index of the first log.
*/
std::atomic<ulong> start_idx_;
/**
* Backward pointer to Raft server.
*/
raft_server *raft_server_bwd_pointer_;
// Testing purpose --------------- BEGIN
/**
* If non-zero, this log store will emulate the disk write delay.
*/
std::atomic<size_t> disk_emul_delay;
/**
* Map of <timestamp, log index>, emulating logs that are being written to disk.
* Log index will be regarded as "durable" after the corresponding timestamp.
*/
std::map<uint64_t, uint64_t> disk_emul_logs_being_written_;
/**
* Thread that will update `last_durable_index_` and call
* `notify_log_append_completion` at proper time.
*/
std::unique_ptr<std::thread> disk_emul_thread_;
/**
* Flag to terminate the thread.
*/
std::atomic<bool> disk_emul_thread_stop_signal_;
/**
* Last written log index.
*/
std::atomic<uint64_t> disk_emul_last_durable_index_;
// Testing purpose --------------- END
};
} // namespace memgraph::coordination

View File

@ -36,6 +36,10 @@ class CoordinatorStateMachine : public state_machine {
CoordinatorStateMachine &operator=(CoordinatorStateMachine &&) = delete;
~CoordinatorStateMachine() override {}
static auto EncodeRegisterReplicationInstance(const std::string &name) -> ptr<buffer>;
static auto DecodeRegisterReplicationInstance(buffer &data) -> std::string;
auto pre_commit(ulong log_idx, buffer &data) -> ptr<buffer> override;
auto commit(ulong log_idx, buffer &data) -> ptr<buffer> override;
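A hedged round-trip sketch for the two new helpers above; the buffer handling follows NuRaft's ptr<buffer> API, and the call site is an assumption:

// Hypothetical encode/decode round trip (not part of this commit):
ptr<buffer> log = CoordinatorStateMachine::EncodeRegisterReplicationInstance("instance_1");
std::string const decoded = CoordinatorStateMachine::DecodeRegisterReplicationInstance(*log);
MG_ASSERT(decoded == "instance_1", "Round trip must preserve the instance name");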

View File

@ -0,0 +1,140 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include "coordination/raft_state.hpp"
#include "coordination/coordinator_exceptions.hpp"
#include "nuraft/coordinator_state_machine.hpp"
#include "nuraft/coordinator_state_manager.hpp"
#include "utils/counter.hpp"
namespace memgraph::coordination {
using nuraft::asio_service;
using nuraft::cb_func;
using nuraft::CbReturnCode;
using nuraft::cmd_result;
using nuraft::cs_new;
using nuraft::ptr;
using nuraft::raft_params;
using nuraft::raft_server;
using nuraft::srv_config;
using raft_result = cmd_result<ptr<buffer>>;
RaftState::RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb, uint32_t raft_server_id,
uint32_t raft_port, std::string raft_address)
: raft_server_id_(raft_server_id),
raft_port_(raft_port),
raft_address_(std::move(raft_address)),
state_machine_(cs_new<CoordinatorStateMachine>()),
state_manager_(
cs_new<CoordinatorStateManager>(raft_server_id_, raft_address_ + ":" + std::to_string(raft_port_))),
logger_(nullptr),
become_leader_cb_(std::move(become_leader_cb)),
become_follower_cb_(std::move(become_follower_cb)) {}
auto RaftState::InitRaftServer() -> void {
asio_service::options asio_opts;
asio_opts.thread_pool_size_ = 1; // TODO: (andi) Improve this
raft_params params;
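// NuRaft expresses the heartbeat, election, and client-request values below in milliseconds.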
params.heart_beat_interval_ = 100;
params.election_timeout_lower_bound_ = 200;
params.election_timeout_upper_bound_ = 400;
// 5 logs are preserved before the last snapshot
params.reserved_log_items_ = 5;
// Create snapshot for every 5 log appends
params.snapshot_distance_ = 5;
params.client_req_timeout_ = 3000;
params.return_method_ = raft_params::blocking;
raft_server::init_options init_opts;
init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode {
if (event_type == cb_func::BecomeLeader) {
spdlog::info("Node {} became leader", param->leaderId);
become_leader_cb_();
} else if (event_type == cb_func::BecomeFollower) {
spdlog::info("Node {} became follower", param->myId);
become_follower_cb_();
}
return CbReturnCode::Ok;
};
// Use the member launcher_ so that ~RaftState() shuts down the same launcher that was initialized here.
raft_server_ = launcher_.init(state_machine_, state_manager_, logger_, static_cast<int>(raft_port_), asio_opts, params,
init_opts);
if (!raft_server_) {
throw RaftServerStartException("Failed to launch raft server on {}:{}", raft_address_, raft_port_);
}
auto maybe_stop = utils::ResettableCounter<20>();
do {
if (raft_server_->is_initialized()) {
return;
}
std::this_thread::sleep_for(std::chrono::milliseconds(250));
} while (!maybe_stop());
throw RaftServerStartException("Failed to initialize raft server on {}:{}", raft_address_, raft_port_);
}
auto RaftState::MakeRaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb) -> RaftState {
uint32_t raft_server_id{0};
uint32_t raft_port{0};
try {
raft_server_id = FLAGS_raft_server_id;
raft_port = FLAGS_raft_server_port;
} catch (std::exception const &e) {
throw RaftCouldNotParseFlagsException("Failed to parse flags: {}", e.what());
}
auto raft_state =
RaftState(std::move(become_leader_cb), std::move(become_follower_cb), raft_server_id, raft_port, "127.0.0.1");
raft_state.InitRaftServer();
return raft_state;
}
RaftState::~RaftState() { launcher_.shutdown(); }
auto RaftState::InstanceName() const -> std::string { return "coordinator_" + std::to_string(raft_server_id_); }
auto RaftState::RaftSocketAddress() const -> std::string { return raft_address_ + ":" + std::to_string(raft_port_); }
auto RaftState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void {
auto const endpoint = raft_address + ":" + std::to_string(raft_port);
srv_config const srv_config_to_add(static_cast<int>(raft_server_id), endpoint);
if (!raft_server_->add_srv(srv_config_to_add)->get_accepted()) {
throw RaftAddServerException("Failed to add server {} to the cluster", endpoint);
}
spdlog::info("Request to add server {} to the cluster accepted", endpoint);
}
auto RaftState::GetAllCoordinators() const -> std::vector<ptr<srv_config>> {
std::vector<ptr<srv_config>> all_srv_configs;
raft_server_->get_srv_config_all(all_srv_configs);
return all_srv_configs;
}
auto RaftState::IsLeader() const -> bool { return raft_server_->is_leader(); }
auto RaftState::RequestLeadership() -> bool { return raft_server_->is_leader() || raft_server_->request_leadership(); }
auto RaftState::AppendRegisterReplicationInstance(std::string const &instance) -> ptr<raft_result> {
auto new_log = CoordinatorStateMachine::EncodeRegisterReplicationInstance(instance);
return raft_server_->append_entries({new_log});
}
} // namespace memgraph::coordination
#endif
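A hedged sketch of how a caller might interpret the raft_result returned above; the result-code check uses NuRaft's cmd_result API, and the mapping onto the new registration statuses is an assumption about the calling code:

// Hypothetical handling of the append result (not code from this commit):
auto res = raft_state.AppendRegisterReplicationInstance("instance_1");
if (!res->get_accepted()) {
  // The local node is most likely not the leader -> maps to RAFT_COULD_NOT_ACCEPT.
} else if (res->get_result_code() != nuraft::cmd_result_code::OK) {
  // The entry was accepted but could not be replicated -> maps to RAFT_COULD_NOT_APPEND.
}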

View File

@ -17,14 +17,14 @@
namespace memgraph::coordination {
ReplicationInstance::ReplicationInstance(CoordinatorData *data, CoordinatorClientConfig config,
ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
: client_(data, std::move(config), std::move(succ_cb), std::move(fail_cb)),
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA),
is_alive_(true) {
: client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)),
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA) {
if (!client_.DemoteToReplica()) {
throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName());
}
client_.StartFrequentCheck();
}
@ -51,13 +51,14 @@ auto ReplicationInstance::IsMain() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN;
}
auto ReplicationInstance::PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info,
auto ReplicationInstance::PromoteToMain(utils::UUID new_uuid, ReplicationClientsInfo repl_clients_info,
HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> bool {
if (!client_.SendPromoteReplicaToMainRpc(uuid, std::move(repl_clients_info))) {
if (!client_.SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) {
return false;
}
replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
main_uuid_ = new_uuid;
client_.SetCallbacks(std::move(main_succ_cb), std::move(main_fail_cb));
return true;
@ -75,6 +76,8 @@ auto ReplicationInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, H
return true;
}
auto ReplicationInstance::StartFrequentCheck() -> void { client_.StartFrequentCheck(); }
auto ReplicationInstance::StopFrequentCheck() -> void { client_.StopFrequentCheck(); }
auto ReplicationInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); }
auto ReplicationInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); }
@ -83,14 +86,22 @@ auto ReplicationInstance::ReplicationClientInfo() const -> CoordinatorClientConf
}
auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; }
void ReplicationInstance::SetNewMainUUID(const std::optional<utils::UUID> &main_uuid) { main_uuid_ = main_uuid; }
auto ReplicationInstance::SetNewMainUUID(utils::UUID const &main_uuid) -> void { main_uuid_ = main_uuid; }
auto ReplicationInstance::ResetMainUUID() -> void { main_uuid_ = std::nullopt; }
auto ReplicationInstance::GetMainUUID() -> const std::optional<utils::UUID> & { return main_uuid_; }
auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &main_uuid) -> bool {
if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), main_uuid)) {
auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {
if (!main_uuid_ || *main_uuid_ != curr_main_uuid) {
return SendSwapAndUpdateUUID(curr_main_uuid);
}
return true;
}
auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool {
if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), new_main_uuid)) {
return false;
}
SetNewMainUUID(main_uuid_);
SetNewMainUUID(new_main_uuid);
return true;
}
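A short sketch of where EnsureReplicaHasCorrectMainUUID could slot in, assuming a hypothetical health-check callback on the coordinator side; the function name and error handling are illustrative only:

// Hypothetical replica success callback (not code from this commit):
void OnReplicaSuccessPing(ReplicationInstance &repl_instance, utils::UUID const &curr_main_uuid) {
  // Swap the replica's cached main UUID only when it diverges from the current main.
  if (!repl_instance.EnsureReplicaHasCorrectMainUUID(curr_main_uuid)) {
    spdlog::error("Failed to swap main UUID on instance {}", repl_instance.GetClient().InstanceName());
  }
}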

View File

@ -20,14 +20,14 @@ namespace memgraph::dbms {
CoordinatorHandler::CoordinatorHandler(coordination::CoordinatorState &coordinator_state)
: coordinator_state_(coordinator_state) {}
auto CoordinatorHandler::RegisterInstance(memgraph::coordination::CoordinatorClientConfig config)
auto CoordinatorHandler::RegisterReplicationInstance(memgraph::coordination::CoordinatorClientConfig config)
-> coordination::RegisterInstanceCoordinatorStatus {
return coordinator_state_.RegisterInstance(config);
return coordinator_state_.RegisterReplicationInstance(config);
}
auto CoordinatorHandler::SetInstanceToMain(std::string instance_name)
auto CoordinatorHandler::SetReplicationInstanceToMain(std::string instance_name)
-> coordination::SetInstanceToMainCoordinatorStatus {
return coordinator_state_.SetInstanceToMain(std::move(instance_name));
return coordinator_state_.SetReplicationInstanceToMain(std::move(instance_name));
}
auto CoordinatorHandler::ShowInstances() const -> std::vector<coordination::InstanceStatus> {

View File

@ -28,10 +28,10 @@ class CoordinatorHandler {
public:
explicit CoordinatorHandler(coordination::CoordinatorState &coordinator_state);
auto RegisterInstance(coordination::CoordinatorClientConfig config)
auto RegisterReplicationInstance(coordination::CoordinatorClientConfig config)
-> coordination::RegisterInstanceCoordinatorStatus;
auto SetInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus;
auto SetReplicationInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<coordination::InstanceStatus>;

View File

@ -458,9 +458,10 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
: coordinator_handler_(coordinator_state) {}
/// @throw QueryRuntimeException if an error occurred.
void RegisterInstance(const std::string &coordinator_socket_address, const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency, const std::string &instance_name,
CoordinatorQuery::SyncMode sync_mode) override {
void RegisterReplicationInstance(const std::string &coordinator_socket_address,
const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name, CoordinatorQuery::SyncMode sync_mode) override {
const auto maybe_replication_ip_port =
io::network::Endpoint::ParseSocketOrAddress(replication_socket_address, std::nullopt);
if (!maybe_replication_ip_port) {
@ -489,7 +490,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
.replication_client_info = repl_config,
.ssl = std::nullopt};
auto status = coordinator_handler_.RegisterInstance(coordinator_client_config);
auto status = coordinator_handler_.RegisterReplicationInstance(coordinator_client_config);
switch (status) {
using enum memgraph::coordination::RegisterInstanceCoordinatorStatus;
case NAME_EXISTS:
@ -499,6 +500,14 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
"Couldn't register replica instance since instance with such endpoint already exists!");
case NOT_COORDINATOR:
throw QueryRuntimeException("REGISTER INSTANCE query can only be run on a coordinator!");
case NOT_LEADER:
throw QueryRuntimeException("Couldn't register replica instance since coordinator is not a leader!");
case RAFT_COULD_NOT_ACCEPT:
throw QueryRuntimeException(
"Couldn't register replica instance since raft server couldn't accept the log! Most likely the raft "
"instance is not a leader!");
case RAFT_COULD_NOT_APPEND:
throw QueryRuntimeException("Couldn't register replica instance since raft server couldn't append the log!");
case RPC_FAILED:
throw QueryRuntimeException(
"Couldn't register replica instance because setting instance to replica failed! Check logs on replica to "
@ -519,8 +528,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
}
}
void SetInstanceToMain(const std::string &instance_name) override {
auto status = coordinator_handler_.SetInstanceToMain(instance_name);
void SetReplicationInstanceToMain(const std::string &instance_name) override {
auto status = coordinator_handler_.SetReplicationInstanceToMain(instance_name);
switch (status) {
using enum memgraph::coordination::SetInstanceToMainCoordinatorStatus;
case NO_INSTANCE_WITH_NAME:
@ -1145,9 +1154,9 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
replication_socket_address_tv, main_check_frequency = config.replication_replica_check_frequency,
instance_name = coordinator_query->instance_name_,
sync_mode = coordinator_query->sync_mode_]() mutable {
handler.RegisterInstance(std::string(coordinator_socket_address_tv.ValueString()),
std::string(replication_socket_address_tv.ValueString()), main_check_frequency,
instance_name, sync_mode);
handler.RegisterReplicationInstance(std::string(coordinator_socket_address_tv.ValueString()),
std::string(replication_socket_address_tv.ValueString()),
main_check_frequency, instance_name, sync_mode);
return std::vector<std::vector<TypedValue>>();
};
@ -1176,7 +1185,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
callback.fn = [handler = CoordQueryHandler{*coordinator_state},
instance_name = coordinator_query->instance_name_]() mutable {
handler.SetInstanceToMain(instance_name);
handler.SetReplicationInstanceToMain(instance_name);
return std::vector<std::vector<TypedValue>>();
};

View File

@ -105,13 +105,13 @@ class CoordinatorQueryHandler {
};
/// @throw QueryRuntimeException if an error occurred.
virtual void RegisterInstance(const std::string &coordinator_socket_address,
const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency, const std::string &instance_name,
CoordinatorQuery::SyncMode sync_mode) = 0;
virtual void RegisterReplicationInstance(const std::string &coordinator_socket_address,
const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name, CoordinatorQuery::SyncMode sync_mode) = 0;
/// @throw QueryRuntimeException if an error occurred.
virtual void SetInstanceToMain(const std::string &instance_name) = 0;
virtual void SetReplicationInstanceToMain(const std::string &instance_name) = 0;
/// @throw QueryRuntimeException if an error occurred.
virtual std::vector<coordination::InstanceStatus> ShowInstances() const = 0;

View File

@ -1,8 +1,9 @@
find_package(gflags REQUIRED)
copy_e2e_python_files(ha_experimental coordinator.py)
copy_e2e_python_files(ha_experimental automatic_failover.py)
copy_e2e_python_files(ha_experimental distributed_coordinators.py)
copy_e2e_python_files(ha_experimental single_coordinator.py)
copy_e2e_python_files(ha_experimental coord_cluster_registration.py)
copy_e2e_python_files(ha_experimental distributed_coords.py)
copy_e2e_python_files(ha_experimental manual_setting_replicas.py)
copy_e2e_python_files(ha_experimental not_replicate_from_old_main.py)
copy_e2e_python_files(ha_experimental common.py)

View File

@ -0,0 +1,284 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import os
import shutil
import sys
import tempfile
import time
import interactive_mg_runner
import pytest
from common import connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..")
)
interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build"))
interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph"))
TEMP_DIR = tempfile.TemporaryDirectory().name
MEMGRAPH_INSTANCES_DESCRIPTION = {
"instance_1": {
"args": [
"--bolt-port",
"7687",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10011",
],
"log_file": "instance_1.log",
"data_directory": f"{TEMP_DIR}/instance_1",
"setup_queries": [],
},
"instance_2": {
"args": [
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10012",
],
"log_file": "instance_2.log",
"data_directory": f"{TEMP_DIR}/instance_2",
"setup_queries": [],
},
"instance_3": {
"args": [
"--bolt-port",
"7689",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10013",
],
"log_file": "instance_3.log",
"data_directory": f"{TEMP_DIR}/instance_3",
"setup_queries": [],
},
"coordinator_1": {
"args": [
"--bolt-port",
"7690",
"--log-level=TRACE",
"--raft-server-id=1",
"--raft-server-port=10111",
],
"log_file": "coordinator1.log",
"setup_queries": [],
},
"coordinator_2": {
"args": [
"--bolt-port",
"7691",
"--log-level=TRACE",
"--raft-server-id=2",
"--raft-server-port=10112",
],
"log_file": "coordinator2.log",
"setup_queries": [],
},
"coordinator_3": {
"args": [
"--bolt-port",
"7692",
"--log-level=TRACE",
"--raft-server-id=3",
"--raft-server-port=10113",
],
"log_file": "coordinator3.log",
"setup_queries": [],
},
}
# NOTE: Retried because the query can fail while the Raft server is still starting up.
def add_coordinator(cursor, query):
    for _ in range(10):
        try:
            execute_and_fetch_all(cursor, query)
            return True
        except Exception:
            time.sleep(0.5)  # Back off briefly so the Raft server has time to come up.
    return False
def test_register_repl_instances_then_coordinators():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'"
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")
def check_coordinator3():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster_coord3 = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3)
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
# TODO: (andi) This should be solved eventually
expected_cluster_not_shared = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
]
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
def test_register_coordinator_then_repl_instances():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'"
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
def check_coordinator3():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster_coord3 = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3)
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
# TODO: (andi) This should be solved eventually
expected_cluster_not_shared = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
]
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
def test_coordinators_communication_with_restarts():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'"
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
expected_cluster_not_shared = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
]
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_2")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_2")
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -1,145 +0,0 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import os
import shutil
import sys
import tempfile
import interactive_mg_runner
import pytest
from common import connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..")
)
interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build"))
interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph"))
TEMP_DIR = tempfile.TemporaryDirectory().name
MEMGRAPH_INSTANCES_DESCRIPTION = {
"coordinator1": {
"args": [
"--bolt-port",
"7687",
"--log-level=TRACE",
"--raft-server-id=1",
"--raft-server-port=10111",
],
"log_file": "coordinator1.log",
"setup_queries": [],
},
"coordinator2": {
"args": [
"--bolt-port",
"7688",
"--log-level=TRACE",
"--raft-server-id=2",
"--raft-server-port=10112",
],
"log_file": "coordinator2.log",
"setup_queries": [],
},
"coordinator3": {
"args": [
"--bolt-port",
"7689",
"--log-level=TRACE",
"--raft-server-id=3",
"--raft-server-port=10113",
],
"log_file": "coordinator3.log",
"setup_queries": [
"ADD COORDINATOR 1 ON '127.0.0.1:10111'",
"ADD COORDINATOR 2 ON '127.0.0.1:10112'",
],
},
}
def test_coordinators_communication():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator3_cursor = connect(host="localhost", port=7689).cursor()
def check_coordinator3():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
]
mg_sleep_and_assert(expected_cluster, check_coordinator3)
coordinator1_cursor = connect(host="localhost", port=7687).cursor()
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7688).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster, check_coordinator2)
def test_coordinators_communication_with_restarts():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
expected_cluster = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
]
coordinator1_cursor = connect(host="localhost", port=7687).cursor()
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7688).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster, check_coordinator2)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1")
coordinator1_cursor = connect(host="localhost", port=7687).cursor()
mg_sleep_and_assert(expected_cluster, check_coordinator1)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator2")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator2")
coordinator1_cursor = connect(host="localhost", port=7687).cursor()
coordinator2_cursor = connect(host="localhost", port=7688).cursor()
mg_sleep_and_assert(expected_cluster, check_coordinator1)
mg_sleep_and_assert(expected_cluster, check_coordinator2)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -0,0 +1,164 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import os
import shutil
import sys
import tempfile
import interactive_mg_runner
import pytest
from common import connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..")
)
interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build"))
interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph"))
TEMP_DIR = tempfile.TemporaryDirectory().name
MEMGRAPH_INSTANCES_DESCRIPTION = {
"instance_1": {
"args": [
"--bolt-port",
"7687",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10011",
],
"log_file": "instance_1.log",
"data_directory": f"{TEMP_DIR}/instance_1",
"setup_queries": [],
},
"instance_2": {
"args": [
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10012",
],
"log_file": "instance_2.log",
"data_directory": f"{TEMP_DIR}/instance_2",
"setup_queries": [],
},
"instance_3": {
"args": [
"--bolt-port",
"7689",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10013",
],
"log_file": "instance_3.log",
"data_directory": f"{TEMP_DIR}/instance_3",
"setup_queries": [],
},
"coordinator_1": {
"args": [
"--bolt-port",
"7690",
"--log-level=TRACE",
"--raft-server-id=1",
"--raft-server-port=10111",
],
"log_file": "coordinator1.log",
"setup_queries": [],
},
"coordinator_2": {
"args": [
"--bolt-port",
"7691",
"--log-level=TRACE",
"--raft-server-id=2",
"--raft-server-port=10112",
],
"log_file": "coordinator2.log",
"setup_queries": [],
},
"coordinator_3": {
"args": [
"--bolt-port",
"7692",
"--log-level=TRACE",
"--raft-server-id=3",
"--raft-server-port=10113",
],
"log_file": "coordinator3.log",
"setup_queries": [
"ADD COORDINATOR 1 ON '127.0.0.1:10111'",
"ADD COORDINATOR 2 ON '127.0.0.1:10112'",
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'",
"REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'",
"SET INSTANCE instance_3 TO MAIN",
],
},
}
def test_distributed_automatic_failover():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
main_cursor = connect(host="localhost", port=7689).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
coord_cursor = connect(host="localhost", port=7692).cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
new_main_cursor = connect(host="localhost", port=7687).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main_old_alive = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"),
]
mg_sleep_and_assert(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -1,5 +1,4 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.

View File

@ -28,18 +28,22 @@ workloads:
args: ["high_availability_experimental/coordinator.py"]
<<: *ha_cluster
- name: "Automatic failover"
- name: "Single coordinator"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/automatic_failover.py"]
args: ["high_availability_experimental/single_coordinator.py"]
- name: "Disabled manual setting of replication cluster"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/manual_setting_replicas.py"]
- name: "Distributed coordinators"
- name: "Coordinator cluster registration"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/distributed_coordinators.py"]
args: ["high_availability_experimental/coord_cluster_registration.py"]
- name: "Not replicate from old main"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/not_replicate_from_old_main.py"]
- name: "Distributed coordinators"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/distributed_coords.py"]