Compare commits

...

10 Commits

Author SHA1 Message Date
Andi Skrgat
8f9e044fcd Add unreachable replica state 2024-02-09 07:53:21 +01:00
Andi
efd3257479
Merge branch 'master' into replication-status 2024-02-08 16:45:29 +01:00
Andi Skrgat
189895370b Adapt tests to current state 2024-02-08 11:05:40 +01:00
Andi Skrgat
4f873a6b4d Test for distributed AF 2024-02-08 10:36:31 +01:00
Andi Skrgat
ec6d35ff67 Added InMemoryLogStore 2024-02-08 10:35:55 +01:00
Andi Skrgat
e125c5cd98 Tests for creating cluster 2024-02-08 10:35:54 +01:00
Andi Skrgat
1ecf6ddab2 Request leadership on registering instance 2024-02-08 10:34:38 +01:00
Andi Skrgat
17ad671773 Callbacks for leadership change 2024-02-08 10:34:38 +01:00
Andi Skrgat
7386b786a9 Remove CoordinatorData 2024-02-08 10:34:37 +01:00
Andi Skrgat
6e758d3b5a Only leader performing callbacks 2024-02-08 10:24:21 +01:00
33 changed files with 1189 additions and 948 deletions

View File

@ -9,13 +9,13 @@ target_sources(mg-coordination
include/coordination/coordinator_config.hpp
include/coordination/coordinator_exceptions.hpp
include/coordination/coordinator_slk.hpp
include/coordination/coordinator_data.hpp
include/coordination/constants.hpp
include/coordination/coordinator_instance.hpp
include/coordination/coordinator_cluster_config.hpp
include/coordination/coordinator_handlers.hpp
include/coordination/coordinator_instance.hpp
include/coordination/constants.hpp
include/coordination/instance_status.hpp
include/coordination/replication_instance.hpp
include/coordination/raft_instance.hpp
include/nuraft/coordinator_log_store.hpp
include/nuraft/coordinator_state_machine.hpp
@ -26,10 +26,10 @@ target_sources(mg-coordination
coordinator_state.cpp
coordinator_rpc.cpp
coordinator_server.cpp
coordinator_data.cpp
coordinator_instance.cpp
coordinator_handlers.cpp
coordinator_instance.cpp
replication_instance.cpp
raft_instance.cpp
coordinator_log_store.cpp
coordinator_state_machine.cpp

View File

@ -28,13 +28,13 @@ auto CreateClientContext(memgraph::coordination::CoordinatorClientConfig const &
}
} // namespace
CoordinatorClient::CoordinatorClient(CoordinatorData *coord_data, CoordinatorClientConfig config,
CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
: rpc_context_{CreateClientContext(config)},
rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port),
&rpc_context_},
config_{std::move(config)},
coord_data_{coord_data},
coord_instance_{coord_instance},
succ_cb_{std::move(succ_cb)},
fail_cb_{std::move(fail_cb)} {}
@ -42,6 +42,10 @@ auto CoordinatorClient::InstanceName() const -> std::string { return config_.ins
auto CoordinatorClient::SocketAddress() const -> std::string { return rpc_client_.Endpoint().SocketAddress(); }
void CoordinatorClient::StartFrequentCheck() {
if (instance_checker_.IsRunning()) {
return;
}
MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0),
"Health check frequency must be greater than 0");
@ -54,9 +58,9 @@ void CoordinatorClient::StartFrequentCheck() {
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
stream.AwaitResponse();
}
succ_cb_(coord_data_, instance_name);
succ_cb_(coord_instance_, instance_name);
} catch (rpc::RpcFailedException const &) {
fail_cb_(coord_data_, instance_name);
fail_cb_(coord_instance_, instance_name);
}
});
}

View File

@ -1,282 +0,0 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_data.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "coordination/replication_instance.hpp"
#include "utils/uuid.hpp"
#include <range/v3/view.hpp>
#include <shared_mutex>
namespace memgraph::coordination {
using nuraft::ptr;
using nuraft::srv_config;
CoordinatorData::CoordinatorData() {
auto find_instance = [](CoordinatorData *coord_data, std::string_view instance_name) -> ReplicationInstance & {
auto instance = std::ranges::find_if(
coord_data->repl_instances_,
[instance_name](ReplicationInstance const &instance) { return instance.InstanceName() == instance_name; });
MG_ASSERT(instance != coord_data->repl_instances_.end(), "Instance {} not found during callback!", instance_name);
return *instance;
};
replica_succ_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing replica successful callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
if (!instance.GetMainUUID().has_value() || main_uuid_ != instance.GetMainUUID().value()) {
if (!instance.SendSwapAndUpdateUUID(main_uuid_)) {
spdlog::error(
fmt::format("Failed to swap uuid for replica instance {} which is alive", instance.InstanceName()));
return;
}
}
instance.OnSuccessPing();
};
replica_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing replica failure callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
instance.OnFailPing();
// We need to restart main uuid from instance since it was "down" at least a second
// There is slight delay, if we choose to use isAlive, instance can be down and back up in less than
// our isAlive time difference, which would lead to instance setting UUID to nullopt and stopping accepting any
// incoming RPCs from valid main
// TODO(antoniofilipovic) this needs here more complex logic
// We need to get id of main replica is listening to on successful ping
// and swap it to correct uuid if it failed
instance.SetNewMainUUID();
};
main_succ_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing main successful callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
if (instance.IsAlive()) {
instance.OnSuccessPing();
return;
}
const auto &instance_uuid = instance.GetMainUUID();
MG_ASSERT(instance_uuid.has_value(), "Instance must have uuid set");
if (main_uuid_ == instance_uuid.value()) {
instance.OnSuccessPing();
return;
}
// TODO(antoniof) make demoteToReplica idempotent since main can be demoted to replica but
// swapUUID can fail
bool const demoted = instance.DemoteToReplica(coord_data->replica_succ_cb_, coord_data->replica_fail_cb_);
if (demoted) {
instance.OnSuccessPing();
spdlog::info("Instance {} demoted to replica", instance_name);
} else {
spdlog::error("Instance {} failed to become replica", instance_name);
return;
}
if (!instance.SendSwapAndUpdateUUID(main_uuid_)) {
spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", instance.InstanceName()));
return;
}
};
main_fail_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing main failure callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
instance.OnFailPing();
const auto &instance_uuid = instance.GetMainUUID();
MG_ASSERT(instance_uuid.has_value(), "Instance must have uuid set");
if (!instance.IsAlive() && main_uuid_ == instance_uuid.value()) {
spdlog::info("Cluster without main instance, trying automatic failover");
coord_data->TryFailover();
}
};
}
auto CoordinatorData::TryFailover() -> void {
auto alive_replicas = repl_instances_ | ranges::views::filter(&ReplicationInstance::IsReplica) |
ranges::views::filter(&ReplicationInstance::IsAlive);
if (ranges::empty(alive_replicas)) {
spdlog::warn("Failover failed since all replicas are down!");
return;
}
// TODO: Smarter choice
auto chosen_replica_instance = ranges::begin(alive_replicas);
chosen_replica_instance->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&chosen_replica_instance] { chosen_replica_instance->ResumeFrequentCheck(); }};
auto const potential_new_main_uuid = utils::UUID{};
auto const is_not_chosen_replica_instance = [&chosen_replica_instance](ReplicationInstance &instance) {
return instance != *chosen_replica_instance;
};
// If for some replicas swap fails, for others on successful ping we will revert back on next change
// or we will do failover first again and then it will be consistent again
for (auto &other_replica_instance : alive_replicas | ranges::views::filter(is_not_chosen_replica_instance)) {
if (!other_replica_instance.SendSwapAndUpdateUUID(potential_new_main_uuid)) {
spdlog::error(fmt::format("Failed to swap uuid for instance {} which is alive, aborting failover",
other_replica_instance.InstanceName()));
return;
}
}
std::vector<ReplClientInfo> repl_clients_info;
repl_clients_info.reserve(repl_instances_.size() - 1);
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_chosen_replica_instance),
std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
if (!chosen_replica_instance->PromoteToMain(potential_new_main_uuid, std::move(repl_clients_info), main_succ_cb_,
main_fail_cb_)) {
spdlog::warn("Failover failed since promoting replica to main failed!");
return;
}
chosen_replica_instance->SetNewMainUUID(potential_new_main_uuid);
main_uuid_ = potential_new_main_uuid;
spdlog::info("Failover successful! Instance {} promoted to main.", chosen_replica_instance->InstanceName());
}
auto CoordinatorData::ShowInstances() const -> std::vector<InstanceStatus> {
auto const coord_instances = self_.GetAllCoordinators();
std::vector<InstanceStatus> instances_status;
instances_status.reserve(repl_instances_.size() + coord_instances.size());
auto const stringify_repl_role = [](ReplicationInstance const &instance) -> std::string {
if (!instance.IsAlive()) return "unknown";
if (instance.IsMain()) return "main";
return "replica";
};
auto const repl_instance_to_status = [&stringify_repl_role](ReplicationInstance const &instance) -> InstanceStatus {
return {.instance_name = instance.InstanceName(),
.coord_socket_address = instance.SocketAddress(),
.cluster_role = stringify_repl_role(instance),
.is_alive = instance.IsAlive()};
};
auto const coord_instance_to_status = [](ptr<srv_config> const &instance) -> InstanceStatus {
return {.instance_name = "coordinator_" + std::to_string(instance->get_id()),
.raft_socket_address = instance->get_endpoint(),
.cluster_role = "coordinator",
.is_alive = true}; // TODO: (andi) Get this info from RAFT and test it or when we will move
// CoordinatorState to every instance, we can be smarter about this using our RPC.
};
std::ranges::transform(coord_instances, std::back_inserter(instances_status), coord_instance_to_status);
{
auto lock = std::shared_lock{coord_data_lock_};
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), repl_instance_to_status);
}
return instances_status;
}
// TODO: (andi) Make sure you cannot put coordinator instance to the main
auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
auto lock = std::lock_guard{coord_data_lock_};
auto const is_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
};
auto new_main = std::ranges::find_if(repl_instances_, is_new_main);
if (new_main == repl_instances_.end()) {
spdlog::error("Instance {} not registered. Please register it using REGISTER INSTANCE {}", instance_name,
instance_name);
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
ReplicationClientsInfo repl_clients_info;
repl_clients_info.reserve(repl_instances_.size() - 1);
auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() != instance_name;
};
auto potential_new_main_uuid = utils::UUID{};
spdlog::trace("Generated potential new main uuid");
for (auto &other_instance : repl_instances_ | ranges::views::filter(is_not_new_main)) {
if (!other_instance.SendSwapAndUpdateUUID(potential_new_main_uuid)) {
spdlog::error(
fmt::format("Failed to swap uuid for instance {}, aborting failover", other_instance.InstanceName()));
return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED;
}
}
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
std::back_inserter(repl_clients_info),
[](const ReplicationInstance &instance) { return instance.ReplicationClientInfo(); });
if (!new_main->PromoteToMain(potential_new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
new_main->SetNewMainUUID(potential_new_main_uuid);
main_uuid_ = potential_new_main_uuid;
spdlog::info("Instance {} promoted to main", instance_name);
return SetInstanceToMainCoordinatorStatus::SUCCESS;
}
auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus {
auto lock = std::lock_guard{coord_data_lock_};
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
return instance.InstanceName() == config.instance_name;
})) {
return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
}
if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
return instance.SocketAddress() == config.SocketAddress();
})) {
return RegisterInstanceCoordinatorStatus::ENDPOINT_EXISTS;
}
try {
repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
return RegisterInstanceCoordinatorStatus::SUCCESS;
} catch (CoordinatorRegisterInstanceException const &) {
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}
}
auto CoordinatorData::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
self_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address));
}
} // namespace memgraph::coordination
#endif

View File

@ -18,80 +18,271 @@
#include "nuraft/coordinator_state_manager.hpp"
#include "utils/counter.hpp"
#include <range/v3/view.hpp>
#include <shared_mutex>
namespace memgraph::coordination {
using nuraft::asio_service;
using nuraft::cmd_result;
using nuraft::cs_new;
using nuraft::ptr;
using nuraft::raft_params;
using nuraft::srv_config;
using raft_result = cmd_result<ptr<buffer>>;
CoordinatorInstance::CoordinatorInstance()
: raft_server_id_(FLAGS_raft_server_id), raft_port_(FLAGS_raft_server_port), raft_address_("127.0.0.1") {
auto raft_endpoint = raft_address_ + ":" + std::to_string(raft_port_);
state_manager_ = cs_new<CoordinatorStateManager>(raft_server_id_, raft_endpoint);
state_machine_ = cs_new<CoordinatorStateMachine>();
logger_ = nullptr;
: self_([this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StartFrequentCheck); },
[this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); }) {
auto find_instance = [](CoordinatorInstance *coord_instance,
std::string_view instance_name) -> ReplicationInstance & {
auto instance = std::ranges::find_if(
coord_instance->repl_instances_,
[instance_name](ReplicationInstance const &instance) { return instance.InstanceName() == instance_name; });
// ASIO options
asio_service::options asio_opts;
asio_opts.thread_pool_size_ = 1; // TODO: (andi) Improve this
MG_ASSERT(instance != coord_instance->repl_instances_.end(), "Instance {} not found during callback!",
instance_name);
return *instance;
};
// RAFT parameters. Heartbeat every 100ms, election timeout between 200ms and 400ms.
raft_params params;
params.heart_beat_interval_ = 100;
params.election_timeout_lower_bound_ = 200;
params.election_timeout_upper_bound_ = 400;
// 5 logs are preserved before the last snapshot
params.reserved_log_items_ = 5;
// Create snapshot for every 5 log appends
params.snapshot_distance_ = 5;
params.client_req_timeout_ = 3000;
params.return_method_ = raft_params::blocking;
replica_succ_cb_ = [find_instance](CoordinatorInstance *coord_instance, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_instance->coord_instance_lock_};
spdlog::trace("Instance {} performing replica successful callback", instance_name);
find_instance(coord_instance, instance_name).OnSuccessPing();
};
raft_server_ =
launcher_.init(state_machine_, state_manager_, logger_, static_cast<int>(raft_port_), asio_opts, params);
replica_fail_cb_ = [find_instance](CoordinatorInstance *coord_instance, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_instance->coord_instance_lock_};
spdlog::trace("Instance {} performing replica failure callback", instance_name);
find_instance(coord_instance, instance_name).OnFailPing();
};
if (!raft_server_) {
throw RaftServerStartException("Failed to launch raft server on {}", raft_endpoint);
}
main_succ_cb_ = [find_instance](CoordinatorInstance *coord_instance, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_instance->coord_instance_lock_};
spdlog::trace("Instance {} performing main successful callback", instance_name);
auto maybe_stop = utils::ResettableCounter<20>();
while (!raft_server_->is_initialized() && !maybe_stop()) {
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}
auto &instance = find_instance(coord_instance, instance_name);
if (!raft_server_->is_initialized()) {
throw RaftServerStartException("Failed to initialize raft server on {}", raft_endpoint);
}
if (instance.IsAlive()) {
instance.OnSuccessPing();
return;
}
spdlog::info("Raft server started on {}", raft_endpoint);
bool const is_latest_main = !coord_instance->ClusterHasAliveMain_();
if (is_latest_main) {
spdlog::info("Instance {} is the latest main", instance_name);
instance.OnSuccessPing();
return;
}
bool const demoted = instance.DemoteToReplica(coord_instance->replica_succ_cb_, coord_instance->replica_fail_cb_);
if (demoted) {
instance.OnSuccessPing();
spdlog::info("Instance {} demoted to replica", instance_name);
} else {
spdlog::error("Instance {} failed to become replica", instance_name);
}
};
main_fail_cb_ = [find_instance](CoordinatorInstance *coord_instance, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_instance->coord_instance_lock_};
spdlog::trace("Instance {} performing main failure callback", instance_name);
find_instance(coord_instance, instance_name).OnFailPing();
if (!coord_instance->ClusterHasAliveMain_()) {
spdlog::info("Cluster without main instance, trying automatic failover");
coord_instance->TryFailover();
}
};
}
auto CoordinatorInstance::InstanceName() const -> std::string {
return "coordinator_" + std::to_string(raft_server_id_);
auto CoordinatorInstance::ClusterHasAliveMain_() const -> bool {
auto const alive_main = [](ReplicationInstance const &instance) { return instance.IsMain() && instance.IsAlive(); };
return std::ranges::any_of(repl_instances_, alive_main);
}
auto CoordinatorInstance::RaftSocketAddress() const -> std::string {
return raft_address_ + ":" + std::to_string(raft_port_);
auto CoordinatorInstance::TryFailover() -> void {
auto alive_replicas = repl_instances_ | ranges::views::filter(&ReplicationInstance::IsReplica) |
ranges::views::filter(&ReplicationInstance::IsAlive);
if (ranges::empty(alive_replicas)) {
spdlog::warn("Failover failed since all replicas are down!");
return;
}
// TODO: Smarter choice
auto chosen_replica_instance = ranges::begin(alive_replicas);
chosen_replica_instance->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&chosen_replica_instance] { chosen_replica_instance->ResumeFrequentCheck(); }};
auto const potential_new_main_uuid = utils::UUID{};
auto const is_not_chosen_replica_instance = [&chosen_replica_instance](ReplicationInstance &instance) {
return instance != *chosen_replica_instance;
};
// If for some replicas swap fails, for others on successful ping we will revert back on next change
// or we will do failover first again and then it will be consistent again
for (auto &other_replica_instance : alive_replicas | ranges::views::filter(is_not_chosen_replica_instance)) {
if (!other_replica_instance.SendSwapAndUpdateUUID(potential_new_main_uuid)) {
spdlog::error(fmt::format("Failed to swap uuid for instance {} which is alive, aborting failover",
other_replica_instance.InstanceName()));
return;
}
}
std::vector<ReplClientInfo> repl_clients_info;
repl_clients_info.reserve(repl_instances_.size() - 1);
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_chosen_replica_instance),
std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
if (!chosen_replica_instance->PromoteToMain(potential_new_main_uuid, std::move(repl_clients_info), main_succ_cb_,
main_fail_cb_)) {
spdlog::warn("Failover failed since promoting replica to main failed!");
return;
}
chosen_replica_instance->SetNewMainUUID(potential_new_main_uuid);
main_uuid_ = potential_new_main_uuid;
spdlog::info("Failover successful! Instance {} promoted to main.", chosen_replica_instance->InstanceName());
}
auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
auto const coord_instances = self_.GetAllCoordinators();
std::vector<InstanceStatus> instances_status;
instances_status.reserve(repl_instances_.size() + coord_instances.size());
auto const stringify_repl_role = [](ReplicationInstance const &instance) -> std::string {
if (!instance.IsAlive()) return "unknown";
if (instance.IsMain()) return "main";
return "replica";
};
auto const repl_instance_to_status = [&stringify_repl_role](ReplicationInstance const &instance) -> InstanceStatus {
return {.instance_name = instance.InstanceName(),
.coord_socket_address = instance.SocketAddress(),
.cluster_role = stringify_repl_role(instance),
.is_alive = instance.IsAlive()};
};
auto const coord_instance_to_status = [](ptr<srv_config> const &instance) -> InstanceStatus {
return {.instance_name = "coordinator_" + std::to_string(instance->get_id()),
.raft_socket_address = instance->get_endpoint(),
.cluster_role = "coordinator",
.is_alive = true}; // TODO: (andi) Get this info from RAFT and test it or when we will move
// CoordinatorState to every instance, we can be smarter about this using our RPC.
};
std::ranges::transform(coord_instances, std::back_inserter(instances_status), coord_instance_to_status);
{
auto lock = std::shared_lock{coord_instance_lock_};
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), repl_instance_to_status);
}
return instances_status;
}
// TODO: (andi) Make sure you cannot put coordinator instance to the main
auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name)
-> SetInstanceToMainCoordinatorStatus {
auto lock = std::lock_guard{coord_instance_lock_};
auto const is_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == instance_name;
};
auto new_main = std::ranges::find_if(repl_instances_, is_new_main);
if (new_main == repl_instances_.end()) {
spdlog::error("Instance {} not registered. Please register it using REGISTER INSTANCE {}", instance_name,
instance_name);
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
ReplicationClientsInfo repl_clients_info;
repl_clients_info.reserve(repl_instances_.size() - 1);
auto const is_not_new_main = [&instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() != instance_name;
};
auto potential_new_main_uuid = utils::UUID{};
spdlog::trace("Generated potential new main uuid");
for (auto &other_instance : repl_instances_ | ranges::views::filter(is_not_new_main)) {
if (!other_instance.SendSwapAndUpdateUUID(potential_new_main_uuid)) {
spdlog::error(
fmt::format("Failed to swap uuid for instance {}, aborting failover", other_instance.InstanceName()));
return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED;
}
}
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
std::back_inserter(repl_clients_info),
[](const ReplicationInstance &instance) { return instance.ReplicationClientInfo(); });
if (!new_main->PromoteToMain(potential_new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
new_main->SetNewMainUUID(potential_new_main_uuid);
main_uuid_ = potential_new_main_uuid;
spdlog::info("Instance {} promoted to main", instance_name);
return SetInstanceToMainCoordinatorStatus::SUCCESS;
}
auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig config)
-> RegisterInstanceCoordinatorStatus {
auto lock = std::lock_guard{coord_instance_lock_};
auto const name_matches = [&config](ReplicationInstance const &instance) {
return instance.InstanceName() == config.instance_name;
};
if (std::ranges::any_of(repl_instances_, name_matches)) {
return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
}
auto const socket_address_matches = [&config](ReplicationInstance const &instance) {
return instance.SocketAddress() == config.SocketAddress();
};
if (std::ranges::any_of(repl_instances_, socket_address_matches)) {
return RegisterInstanceCoordinatorStatus::ENDPOINT_EXISTS;
}
if (!self_.RequestLeadership()) {
return RegisterInstanceCoordinatorStatus::NOT_LEADER;
}
auto const res = self_.AppendRegisterReplicationInstance(config.instance_name);
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for registering instance {}. Most likely the reason is that the instance is not the "
"leader.",
config.instance_name);
return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_ACCEPT;
}
spdlog::info("Request for registering instance {} accepted", config.instance_name);
try {
repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
} catch (CoordinatorRegisterInstanceException const &) {
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to register instance {} with error code {}", config.instance_name, res->get_result_code());
return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_APPEND;
}
spdlog::info("Instance {} registered", config.instance_name);
return RegisterInstanceCoordinatorStatus::SUCCESS;
}
auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
auto const endpoint = raft_address + ":" + std::to_string(raft_port);
srv_config const srv_config_to_add(static_cast<int>(raft_server_id), endpoint);
if (!raft_server_->add_srv(srv_config_to_add)->get_accepted()) {
throw RaftAddServerException("Failed to add server {} to the cluster", endpoint);
}
spdlog::info("Request to add server {} to the cluster accepted", endpoint);
}
auto CoordinatorInstance::GetAllCoordinators() const -> std::vector<ptr<srv_config>> {
std::vector<ptr<srv_config>> all_srv_configs;
raft_server_->get_srv_config_all(all_srv_configs);
return all_srv_configs;
self_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address));
}
} // namespace memgraph::coordination

View File

@ -13,214 +13,149 @@
#include "nuraft/coordinator_log_store.hpp"
#include "coordination/coordinator_exceptions.hpp"
namespace memgraph::coordination {
using nuraft::cs_new;
using nuraft::timer_helper;
CoordinatorLogStore::CoordinatorLogStore()
: start_idx_(1),
raft_server_bwd_pointer_(nullptr),
disk_emul_delay(0),
disk_emul_thread_(nullptr),
disk_emul_thread_stop_signal_(false),
disk_emul_last_durable_index_(0) {
// Dummy entry for index 0.
ptr<buffer> buf = buffer::alloc(sz_ulong);
namespace {
ptr<log_entry> MakeClone(const ptr<log_entry> &entry) {
return cs_new<log_entry>(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type(),
entry->get_timestamp());
}
} // namespace
CoordinatorLogStore::CoordinatorLogStore() : start_idx_(1) {
ptr<buffer> buf = buffer::alloc(sizeof(uint64_t));
logs_[0] = cs_new<log_entry>(0, buf);
}
CoordinatorLogStore::~CoordinatorLogStore() {
if (disk_emul_thread_) {
disk_emul_thread_stop_signal_ = true;
// disk_emul_ea_.invoke();
if (disk_emul_thread_->joinable()) {
disk_emul_thread_->join();
}
}
}
CoordinatorLogStore::~CoordinatorLogStore() {}
ptr<log_entry> CoordinatorLogStore::MakeClone(const ptr<log_entry> &entry) {
// NOTE:
// Timestamp is used only when `replicate_log_timestamp_` option is on.
// Otherwise, log store does not need to store or load it.
ptr<log_entry> clone = cs_new<log_entry>(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type(),
entry->get_timestamp());
return clone;
}
ulong CoordinatorLogStore::next_slot() const {
std::lock_guard<std::mutex> l(logs_lock_);
// Exclude the dummy entry.
return start_idx_ + logs_.size() - 1;
}
ulong CoordinatorLogStore::start_index() const { return start_idx_; }
ptr<log_entry> CoordinatorLogStore::last_entry() const {
ulong next_idx = next_slot();
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.find(next_idx - 1);
auto CoordinatorLogStore::FindOrDefault_(uint64_t index) const -> ptr<log_entry> {
auto entry = logs_.find(index);
if (entry == logs_.end()) {
entry = logs_.find(0);
}
return MakeClone(entry->second);
return entry->second;
}
ulong CoordinatorLogStore::append(ptr<log_entry> &entry) {
uint64_t CoordinatorLogStore::next_slot() const {
auto lock = std::lock_guard{logs_lock_};
return start_idx_ + logs_.size() - 1;
}
uint64_t CoordinatorLogStore::start_index() const { return start_idx_; }
ptr<log_entry> CoordinatorLogStore::last_entry() const {
auto lock = std::lock_guard{logs_lock_};
uint64_t const last_idx = start_idx_ + logs_.size() - 1;
auto const last_src = FindOrDefault_(last_idx - 1);
return MakeClone(last_src);
}
uint64_t CoordinatorLogStore::append(ptr<log_entry> &entry) {
ptr<log_entry> clone = MakeClone(entry);
std::lock_guard<std::mutex> l(logs_lock_);
size_t idx = start_idx_ + logs_.size() - 1;
logs_[idx] = clone;
if (disk_emul_delay) {
uint64_t cur_time = timer_helper::get_timeofday_us();
disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = idx;
// disk_emul_ea_.invoke();
uint64_t next_slot{0};
{
auto lock = std::lock_guard{logs_lock_};
next_slot = start_idx_ + logs_.size() - 1;
logs_[next_slot] = clone;
}
return idx;
return next_slot;
}
void CoordinatorLogStore::write_at(ulong index, ptr<log_entry> &entry) {
void CoordinatorLogStore::write_at(uint64_t index, ptr<log_entry> &entry) {
ptr<log_entry> clone = MakeClone(entry);
// Discard all logs equal to or greater than `index.
std::lock_guard<std::mutex> l(logs_lock_);
auto itr = logs_.lower_bound(index);
while (itr != logs_.end()) {
itr = logs_.erase(itr);
}
logs_[index] = clone;
if (disk_emul_delay) {
uint64_t cur_time = timer_helper::get_timeofday_us();
disk_emul_logs_being_written_[cur_time + disk_emul_delay * 1000] = index;
// Remove entries greater than `index`.
auto entry = disk_emul_logs_being_written_.begin();
while (entry != disk_emul_logs_being_written_.end()) {
if (entry->second > index) {
entry = disk_emul_logs_being_written_.erase(entry);
} else {
entry++;
}
{
auto lock = std::lock_guard{logs_lock_};
auto itr = logs_.lower_bound(index);
while (itr != logs_.end()) {
itr = logs_.erase(itr);
}
// disk_emul_ea_.invoke();
logs_[index] = clone;
}
}
ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries(ulong start, ulong end) {
ptr<std::vector<ptr<log_entry>>> ret = cs_new<std::vector<ptr<log_entry>>>();
ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries(uint64_t start, uint64_t end) {
auto ret = cs_new<std::vector<ptr<log_entry>>>();
ret->resize(end - start);
ulong cc = 0;
for (ulong ii = start; ii < end; ++ii) {
for (uint64_t i = start, curr_index = 0; i < end; ++i, ++curr_index) {
ptr<log_entry> src = nullptr;
{
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.find(ii);
if (entry == logs_.end()) {
entry = logs_.find(0);
assert(0);
auto lock = std::lock_guard{logs_lock_};
if (auto const entry = logs_.find(i); entry != logs_.end()) {
src = entry->second;
} else {
throw RaftCouldNotFindEntryException("Could not find entry at index {}", i);
}
src = entry->second;
}
(*ret)[cc++] = MakeClone(src);
(*ret)[curr_index] = MakeClone(src);
}
return ret;
}
// NOLINTNEXTLINE(google-default-arguments)
ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries_ext(ulong start, ulong end,
int64 batch_size_hint_in_bytes) {
ptr<std::vector<ptr<log_entry>>> ret = cs_new<std::vector<ptr<log_entry>>>();
if (batch_size_hint_in_bytes < 0) {
return ret;
}
size_t accum_size = 0;
for (ulong ii = start; ii < end; ++ii) {
ptr<log_entry> src = nullptr;
{
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.find(ii);
if (entry == logs_.end()) {
entry = logs_.find(0);
assert(0);
}
src = entry->second;
}
ret->push_back(MakeClone(src));
accum_size += src->get_buf().size();
if (batch_size_hint_in_bytes && accum_size >= (ulong)batch_size_hint_in_bytes) break;
}
return ret;
}
ptr<log_entry> CoordinatorLogStore::entry_at(ulong index) {
ptr<log_entry> CoordinatorLogStore::entry_at(uint64_t index) {
ptr<log_entry> src = nullptr;
{
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.find(index);
if (entry == logs_.end()) {
entry = logs_.find(0);
}
src = entry->second;
auto lock = std::lock_guard{logs_lock_};
src = FindOrDefault_(index);
}
return MakeClone(src);
}
ulong CoordinatorLogStore::term_at(ulong index) {
ulong term = 0;
uint64_t CoordinatorLogStore::term_at(uint64_t index) {
uint64_t term = 0;
{
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.find(index);
if (entry == logs_.end()) {
entry = logs_.find(0);
}
term = entry->second->get_term();
auto lock = std::lock_guard{logs_lock_};
term = FindOrDefault_(index)->get_term();
}
return term;
}
ptr<buffer> CoordinatorLogStore::pack(ulong index, int32 cnt) {
ptr<buffer> CoordinatorLogStore::pack(uint64_t index, int32 cnt) {
std::vector<ptr<buffer>> logs;
size_t size_total = 0;
for (ulong ii = index; ii < index + cnt; ++ii) {
uint64_t const end_index = index + cnt;
for (uint64_t i = index; i < end_index; ++i) {
ptr<log_entry> le = nullptr;
{
std::lock_guard<std::mutex> l(logs_lock_);
le = logs_[ii];
auto lock = std::lock_guard{logs_lock_};
le = logs_[i];
}
assert(le.get());
ptr<buffer> buf = le->serialize();
auto buf = le->serialize();
size_total += buf->size();
logs.push_back(buf);
}
ptr<buffer> buf_out = buffer::alloc(sizeof(int32) + cnt * sizeof(int32) + size_total);
auto buf_out = buffer::alloc(sizeof(int32) + cnt * sizeof(int32) + size_total);
buf_out->pos(0);
buf_out->put((int32)cnt);
for (auto &entry : logs) {
ptr<buffer> &bb = entry;
buf_out->put((int32)bb->size());
auto &bb = entry; // TODO: (andi) This smells like not needed
buf_out->put(static_cast<int32>(bb->size()));
buf_out->put(*bb);
}
return buf_out;
}
void CoordinatorLogStore::apply_pack(ulong index, buffer &pack) {
void CoordinatorLogStore::apply_pack(uint64_t index, buffer &pack) {
pack.pos(0);
int32 num_logs = pack.get_int();
int32 const num_logs = pack.get_int();
for (int32 ii = 0; ii < num_logs; ++ii) {
ulong cur_idx = index + ii;
for (int32 i = 0; i < num_logs; ++i) {
uint64_t cur_idx = index + i;
int32 buf_size = pack.get_int();
ptr<buffer> buf_local = buffer::alloc(buf_size);
@ -228,14 +163,14 @@ void CoordinatorLogStore::apply_pack(ulong index, buffer &pack) {
ptr<log_entry> le = log_entry::deserialize(*buf_local);
{
std::lock_guard<std::mutex> l(logs_lock_);
auto lock = std::lock_guard{logs_lock_};
logs_[cur_idx] = le;
}
}
{
std::lock_guard<std::mutex> l(logs_lock_);
auto entry = logs_.upper_bound(0);
auto lock = std::lock_guard{logs_lock_};
auto const entry = logs_.upper_bound(0);
if (entry != logs_.end()) {
start_idx_ = entry->first;
} else {
@ -244,88 +179,23 @@ void CoordinatorLogStore::apply_pack(ulong index, buffer &pack) {
}
}
bool CoordinatorLogStore::compact(ulong last_log_index) {
std::lock_guard<std::mutex> l(logs_lock_);
for (ulong ii = start_idx_; ii <= last_log_index; ++ii) {
auto entry = logs_.find(ii);
// NOTE: Remove all logs up to given 'last_log_index' (inclusive).
bool CoordinatorLogStore::compact(uint64_t last_log_index) {
auto lock = std::lock_guard{logs_lock_};
for (uint64_t ii = start_idx_; ii <= last_log_index; ++ii) {
auto const entry = logs_.find(ii);
if (entry != logs_.end()) {
logs_.erase(entry);
}
}
// WARNING:
// Even though nothing has been erased,
// we should set `start_idx_` to new index.
if (start_idx_ <= last_log_index) {
start_idx_ = last_log_index + 1;
}
return true;
}
bool CoordinatorLogStore::flush() {
disk_emul_last_durable_index_ = next_slot() - 1;
return true;
}
ulong CoordinatorLogStore::last_durable_index() {
uint64_t last_log = next_slot() - 1;
if (!disk_emul_delay) {
return last_log;
}
return disk_emul_last_durable_index_;
}
void CoordinatorLogStore::DiskEmulLoop() {
// This thread mimics async disk writes.
// uint32_t next_sleep_us = 100 * 1000;
while (!disk_emul_thread_stop_signal_) {
// disk_emul_ea_.wait_us(next_sleep_us);
// disk_emul_ea_.reset();
if (disk_emul_thread_stop_signal_) break;
uint64_t cur_time = timer_helper::get_timeofday_us();
// next_sleep_us = 100 * 1000;
bool call_notification = false;
{
std::lock_guard<std::mutex> l(logs_lock_);
// Remove all timestamps equal to or smaller than `cur_time`,
// and pick the greatest one among them.
auto entry = disk_emul_logs_being_written_.begin();
while (entry != disk_emul_logs_being_written_.end()) {
if (entry->first <= cur_time) {
disk_emul_last_durable_index_ = entry->second;
entry = disk_emul_logs_being_written_.erase(entry);
call_notification = true;
} else {
break;
}
}
entry = disk_emul_logs_being_written_.begin();
if (entry != disk_emul_logs_being_written_.end()) {
// next_sleep_us = entry->first - cur_time;
}
}
if (call_notification) {
raft_server_bwd_pointer_->notify_log_append_completion(true);
}
}
}
void CoordinatorLogStore::Close() {}
void CoordinatorLogStore::SetDiskDelay(raft_server *raft, size_t delay_ms) {
disk_emul_delay = delay_ms;
raft_server_bwd_pointer_ = raft;
if (!disk_emul_thread_) {
disk_emul_thread_ = std::make_unique<std::thread>(&CoordinatorLogStore::DiskEmulLoop, this);
}
}
bool CoordinatorLogStore::flush() { return true; }
} // namespace memgraph::coordination
#endif

View File

@ -41,37 +41,39 @@ CoordinatorState::CoordinatorState() {
}
}
auto CoordinatorState::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus {
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
auto CoordinatorState::RegisterReplicationInstance(CoordinatorClientConfig config)
-> RegisterInstanceCoordinatorStatus {
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
"Coordinator cannot register replica since variant holds wrong alternative");
return std::visit(
memgraph::utils::Overloaded{
[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
return RegisterInstanceCoordinatorStatus::NOT_COORDINATOR;
},
[config](CoordinatorData &coordinator_data) { return coordinator_data.RegisterInstance(config); }},
memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
return RegisterInstanceCoordinatorStatus::NOT_COORDINATOR;
},
[config](CoordinatorInstance &coordinator_instance) {
return coordinator_instance.RegisterReplicationInstance(config);
}},
data_);
}
auto CoordinatorState::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
auto CoordinatorState::SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
"Coordinator cannot register replica since variant holds wrong alternative");
return std::visit(
memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
return SetInstanceToMainCoordinatorStatus::NOT_COORDINATOR;
},
[&instance_name](CoordinatorData &coordinator_data) {
return coordinator_data.SetInstanceToMain(instance_name);
[&instance_name](CoordinatorInstance &coordinator_instance) {
return coordinator_instance.SetReplicationInstanceToMain(instance_name);
}},
data_);
}
auto CoordinatorState::ShowInstances() const -> std::vector<InstanceStatus> {
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
"Can't call show instances on data_, as variant holds wrong alternative");
return std::get<CoordinatorData>(data_).ShowInstances();
return std::get<CoordinatorInstance>(data_).ShowInstances();
}
auto CoordinatorState::GetCoordinatorServer() const -> CoordinatorServer & {
@ -82,9 +84,9 @@ auto CoordinatorState::GetCoordinatorServer() const -> CoordinatorServer & {
auto CoordinatorState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
"Coordinator cannot register replica since variant holds wrong alternative");
return std::get<CoordinatorData>(data_).AddCoordinatorInstance(raft_server_id, raft_port, raft_address);
return std::get<CoordinatorInstance>(data_).AddCoordinatorInstance(raft_server_id, raft_port, raft_address);
}
} // namespace memgraph::coordination

View File

@ -15,6 +15,19 @@
namespace memgraph::coordination {
auto CoordinatorStateMachine::EncodeRegisterReplicationInstance(const std::string &name) -> ptr<buffer> {
std::string str_log = name + "_replica";
ptr<buffer> log = buffer::alloc(sizeof(uint32_t) + str_log.size());
buffer_serializer bs(log);
bs.put_str(str_log);
return log;
}
auto CoordinatorStateMachine::DecodeRegisterReplicationInstance(buffer &data) -> std::string {
buffer_serializer bs(data);
return bs.get_str();
}
auto CoordinatorStateMachine::pre_commit(ulong const log_idx, buffer &data) -> ptr<buffer> {
buffer_serializer bs(data);
std::string str = bs.get_str();

View File

@ -20,14 +20,14 @@
namespace memgraph::coordination {
class CoordinatorData;
using HealthCheckCallback = std::function<void(CoordinatorData *, std::string_view)>;
class CoordinatorInstance;
using HealthCheckCallback = std::function<void(CoordinatorInstance *, std::string_view)>;
using ReplicationClientsInfo = std::vector<ReplClientInfo>;
class CoordinatorClient {
public:
explicit CoordinatorClient(CoordinatorData *coord_data_, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb);
explicit CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb);
~CoordinatorClient() = default;
@ -69,7 +69,7 @@ class CoordinatorClient {
mutable rpc::Client rpc_client_;
CoordinatorClientConfig config_;
CoordinatorData *coord_data_;
CoordinatorInstance *coord_instance_;
HealthCheckCallback succ_cb_;
HealthCheckCallback fail_cb_;
};

View File

@ -1,61 +0,0 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_instance.hpp"
#include "coordination/coordinator_server.hpp"
#include "coordination/instance_status.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "coordination/replication_instance.hpp"
#include "replication_coordination_glue/handler.hpp"
#include "utils/rw_lock.hpp"
#include "utils/thread_pool.hpp"
#include "utils/uuid.hpp"
#include <list>
namespace memgraph::coordination {
class CoordinatorData {
public:
CoordinatorData();
// TODO: (andi) Probably rename to RegisterReplicationInstance
[[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<InstanceStatus>;
auto TryFailover() -> void;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
private:
HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
// NOTE: Must be std::list because we rely on pointer stability
std::list<ReplicationInstance> repl_instances_;
mutable utils::RWLock coord_data_lock_{utils::RWLock::Priority::READ};
CoordinatorInstance self_;
utils::UUID main_uuid_;
};
struct CoordinatorMainReplicaData {
std::unique_ptr<CoordinatorServer> coordinator_server_;
};
} // namespace memgraph::coordination
#endif

View File

@ -50,5 +50,27 @@ class RaftAddServerException final : public utils::BasicException {
SPECIALIZE_GET_EXCEPTION_NAME(RaftAddServerException)
};
class RaftBecomeLeaderException final : public utils::BasicException {
public:
explicit RaftBecomeLeaderException(std::string_view what) noexcept : BasicException(what) {}
template <class... Args>
explicit RaftBecomeLeaderException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: RaftBecomeLeaderException(fmt::format(fmt, std::forward<Args>(args)...)) {}
SPECIALIZE_GET_EXCEPTION_NAME(RaftBecomeLeaderException)
};
class RaftCouldNotFindEntryException final : public utils::BasicException {
public:
explicit RaftCouldNotFindEntryException(std::string_view what) noexcept : BasicException(what) {}
template <class... Args>
explicit RaftCouldNotFindEntryException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: RaftCouldNotFindEntryException(fmt::format(fmt, std::forward<Args>(args)...)) {}
SPECIALIZE_GET_EXCEPTION_NAME(RaftCouldNotFindEntryException)
};
} // namespace memgraph::coordination
#endif

View File

@ -13,45 +13,44 @@
#ifdef MG_ENTERPRISE
#include <flags/replication.hpp>
#include "coordination/coordinator_server.hpp"
#include "coordination/instance_status.hpp"
#include "coordination/raft_instance.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "coordination/replication_instance.hpp"
#include "utils/rw_lock.hpp"
#include "utils/thread_pool.hpp"
#include <libnuraft/nuraft.hxx>
#include <list>
namespace memgraph::coordination {
using nuraft::logger;
using nuraft::ptr;
using nuraft::raft_launcher;
using nuraft::raft_server;
using nuraft::srv_config;
using nuraft::state_machine;
using nuraft::state_mgr;
class CoordinatorInstance {
public:
CoordinatorInstance();
CoordinatorInstance(CoordinatorInstance const &other) = delete;
CoordinatorInstance &operator=(CoordinatorInstance const &other) = delete;
CoordinatorInstance(CoordinatorInstance &&other) noexcept = delete;
CoordinatorInstance &operator=(CoordinatorInstance &&other) noexcept = delete;
~CoordinatorInstance() = default;
auto InstanceName() const -> std::string;
auto RaftSocketAddress() const -> std::string;
[[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<InstanceStatus>;
auto TryFailover() -> void;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
auto GetAllCoordinators() const -> std::vector<ptr<srv_config>>;
private:
ptr<state_machine> state_machine_;
ptr<state_mgr> state_manager_;
ptr<raft_server> raft_server_;
ptr<logger> logger_;
raft_launcher launcher_;
auto ClusterHasAliveMain_() const -> bool;
// TODO: (andi) I think variables below can be abstracted
uint32_t raft_server_id_;
uint32_t raft_port_;
std::string raft_address_;
HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
// NOTE: Must be std::list because we rely on pointer stability
std::list<ReplicationInstance> repl_instances_;
mutable utils::RWLock coord_instance_lock_{utils::RWLock::Priority::READ};
utils::UUID main_uuid_;
RaftInstance self_;
};
} // namespace memgraph::coordination

View File

@ -13,7 +13,7 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_data.hpp"
#include "coordination/coordinator_instance.hpp"
#include "coordination/coordinator_server.hpp"
#include "coordination/instance_status.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
@ -33,19 +33,23 @@ class CoordinatorState {
CoordinatorState(CoordinatorState &&) noexcept = delete;
CoordinatorState &operator=(CoordinatorState &&) noexcept = delete;
[[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
[[nodiscard]] auto SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<InstanceStatus>;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
// The client code must check that the server exists before calling this method.
// NOTE: The client code must check that the server exists before calling this method.
auto GetCoordinatorServer() const -> CoordinatorServer &;
private:
std::variant<CoordinatorData, CoordinatorMainReplicaData> data_;
struct CoordinatorMainReplicaData {
std::unique_ptr<CoordinatorServer> coordinator_server_;
};
std::variant<CoordinatorInstance, CoordinatorMainReplicaData> data_;
};
} // namespace memgraph::coordination

View File

@ -0,0 +1,73 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#ifdef MG_ENTERPRISE
#include <flags/replication.hpp>
#include <libnuraft/nuraft.hxx>
namespace memgraph::coordination {
using BecomeLeaderCb = std::function<void()>;
using BecomeFollowerCb = std::function<void()>;
using nuraft::buffer;
using nuraft::logger;
using nuraft::ptr;
using nuraft::raft_launcher;
using nuraft::raft_server;
using nuraft::srv_config;
using nuraft::state_machine;
using nuraft::state_mgr;
using raft_result = nuraft::cmd_result<ptr<buffer>>;
class RaftInstance {
public:
RaftInstance(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb);
RaftInstance(RaftInstance const &other) = delete;
RaftInstance &operator=(RaftInstance const &other) = delete;
RaftInstance(RaftInstance &&other) noexcept = delete;
RaftInstance &operator=(RaftInstance &&other) noexcept = delete;
~RaftInstance();
auto InstanceName() const -> std::string;
auto RaftSocketAddress() const -> std::string;
auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
auto GetAllCoordinators() const -> std::vector<ptr<srv_config>>;
auto RequestLeadership() -> bool;
auto IsLeader() const -> bool;
auto AppendRegisterReplicationInstance(std::string const &instance) -> ptr<raft_result>;
private:
ptr<state_machine> state_machine_;
ptr<state_mgr> state_manager_;
ptr<raft_server> raft_server_;
ptr<logger> logger_;
raft_launcher launcher_;
// TODO: (andi) I think variables below can be abstracted
uint32_t raft_server_id_;
uint32_t raft_port_;
std::string raft_address_;
BecomeLeaderCb become_leader_cb_;
BecomeFollowerCb become_follower_cb_;
};
} // namespace memgraph::coordination
#endif

View File

@ -22,6 +22,9 @@ enum class RegisterInstanceCoordinatorStatus : uint8_t {
ENDPOINT_EXISTS,
NOT_COORDINATOR,
RPC_FAILED,
NOT_LEADER,
RAFT_COULD_NOT_ACCEPT,
RAFT_COULD_NOT_APPEND,
SUCCESS
};

View File

@ -23,11 +23,11 @@
namespace memgraph::coordination {
class CoordinatorData;
class CoordinatorInstance;
class ReplicationInstance {
public:
ReplicationInstance(CoordinatorData *data, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb);
ReplicationInstance(ReplicationInstance const &other) = delete;
@ -51,6 +51,8 @@ class ReplicationInstance {
HealthCheckCallback main_fail_cb) -> bool;
auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool;
auto StartFrequentCheck() -> void;
auto StopFrequentCheck() -> void;
auto PauseFrequentCheck() -> void;
auto ResumeFrequentCheck() -> void;

View File

@ -46,9 +46,6 @@ class CoordinatorLogStore : public log_store {
ptr<std::vector<ptr<log_entry>>> log_entries(ulong start, ulong end) override;
// NOLINTNEXTLINE
ptr<std::vector<ptr<log_entry>>> log_entries_ext(ulong start, ulong end, int64 batch_size_hint_in_bytes = 0) override;
ptr<log_entry> entry_at(ulong index) override;
ulong term_at(ulong index) override;
@ -61,67 +58,12 @@ class CoordinatorLogStore : public log_store {
bool flush() override;
ulong last_durable_index() override;
void Close();
void SetDiskDelay(raft_server *raft, size_t delay_ms);
private:
static ptr<log_entry> MakeClone(ptr<log_entry> const &entry);
auto FindOrDefault_(ulong index) const -> ptr<log_entry>;
void DiskEmulLoop();
/**
* Map of <log index, log data>.
*/
std::map<ulong, ptr<log_entry>> logs_;
/**
* Lock for `logs_`.
*/
mutable std::mutex logs_lock_;
/**
* The index of the first log.
*/
std::atomic<ulong> start_idx_;
/**
* Backward pointer to Raft server.
*/
raft_server *raft_server_bwd_pointer_;
// Testing purpose --------------- BEGIN
/**
* If non-zero, this log store will emulate the disk write delay.
*/
std::atomic<size_t> disk_emul_delay;
/**
* Map of <timestamp, log index>, emulating logs that is being written to disk.
* Log index will be regarded as "durable" after the corresponding timestamp.
*/
std::map<uint64_t, uint64_t> disk_emul_logs_being_written_;
/**
* Thread that will update `last_durable_index_` and call
* `notify_log_append_completion` at proper time.
*/
std::unique_ptr<std::thread> disk_emul_thread_;
/**
* Flag to terminate the thread.
*/
std::atomic<bool> disk_emul_thread_stop_signal_;
/**
* Last written log index.
*/
std::atomic<uint64_t> disk_emul_last_durable_index_;
// Testing purpose --------------- END
};
} // namespace memgraph::coordination

View File

@ -36,6 +36,10 @@ class CoordinatorStateMachine : public state_machine {
CoordinatorStateMachine &operator=(CoordinatorStateMachine &&) = delete;
~CoordinatorStateMachine() override {}
static auto EncodeRegisterReplicationInstance(const std::string &name) -> ptr<buffer>;
static auto DecodeRegisterReplicationInstance(buffer &data) -> std::string;
auto pre_commit(ulong log_idx, buffer &data) -> ptr<buffer> override;
auto commit(ulong log_idx, buffer &data) -> ptr<buffer> override;

View File

@ -0,0 +1,126 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include "coordination/raft_instance.hpp"
#include "coordination/coordinator_exceptions.hpp"
#include "nuraft/coordinator_state_machine.hpp"
#include "nuraft/coordinator_state_manager.hpp"
#include "utils/counter.hpp"
namespace memgraph::coordination {
using nuraft::asio_service;
using nuraft::cb_func;
using nuraft::CbReturnCode;
using nuraft::cmd_result;
using nuraft::cs_new;
using nuraft::ptr;
using nuraft::raft_params;
using nuraft::raft_server;
using nuraft::srv_config;
using raft_result = cmd_result<ptr<buffer>>;
RaftInstance::RaftInstance(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb)
: raft_server_id_(FLAGS_raft_server_id),
raft_port_(FLAGS_raft_server_port),
raft_address_("127.0.0.1"),
become_leader_cb_(std::move(become_leader_cb)),
become_follower_cb_(std::move(become_follower_cb)) {
auto raft_endpoint = raft_address_ + ":" + std::to_string(raft_port_);
state_manager_ = cs_new<CoordinatorStateManager>(raft_server_id_, raft_endpoint);
state_machine_ = cs_new<CoordinatorStateMachine>();
logger_ = nullptr;
// TODO: (andi) Maybe params file
asio_service::options asio_opts;
asio_opts.thread_pool_size_ = 1; // TODO: (andi) Improve this
raft_params params;
params.heart_beat_interval_ = 100;
params.election_timeout_lower_bound_ = 200;
params.election_timeout_upper_bound_ = 400;
// 5 logs are preserved before the last snapshot
params.reserved_log_items_ = 5;
// Create snapshot for every 5 log appends
params.snapshot_distance_ = 5;
params.client_req_timeout_ = 3000;
params.return_method_ = raft_params::blocking;
raft_server::init_options init_opts;
init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode {
if (event_type == cb_func::BecomeLeader) {
spdlog::info("Node {} became leader", param->leaderId);
become_leader_cb_();
} else if (event_type == cb_func::BecomeFollower) {
spdlog::info("Node {} became follower", param->myId);
become_follower_cb_();
}
return CbReturnCode::Ok;
};
raft_server_ = launcher_.init(state_machine_, state_manager_, logger_, static_cast<int>(raft_port_), asio_opts,
params, init_opts);
if (!raft_server_) {
throw RaftServerStartException("Failed to launch raft server on {}", raft_endpoint);
}
auto maybe_stop = utils::ResettableCounter<20>();
while (!raft_server_->is_initialized() && !maybe_stop()) {
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}
if (!raft_server_->is_initialized()) {
throw RaftServerStartException("Failed to initialize raft server on {}", raft_endpoint);
}
spdlog::info("Raft server started on {}", raft_endpoint);
}
RaftInstance::~RaftInstance() { launcher_.shutdown(); }
auto RaftInstance::InstanceName() const -> std::string { return "coordinator_" + std::to_string(raft_server_id_); }
auto RaftInstance::RaftSocketAddress() const -> std::string { return raft_address_ + ":" + std::to_string(raft_port_); }
auto RaftInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-> void {
auto const endpoint = raft_address + ":" + std::to_string(raft_port);
srv_config const srv_config_to_add(static_cast<int>(raft_server_id), endpoint);
if (!raft_server_->add_srv(srv_config_to_add)->get_accepted()) {
throw RaftAddServerException("Failed to add server {} to the cluster", endpoint);
}
spdlog::info("Request to add server {} to the cluster accepted", endpoint);
}
auto RaftInstance::GetAllCoordinators() const -> std::vector<ptr<srv_config>> {
std::vector<ptr<srv_config>> all_srv_configs;
raft_server_->get_srv_config_all(all_srv_configs);
return all_srv_configs;
}
auto RaftInstance::IsLeader() const -> bool { return raft_server_->is_leader(); }
auto RaftInstance::RequestLeadership() -> bool {
return raft_server_->is_leader() || raft_server_->request_leadership();
}
auto RaftInstance::AppendRegisterReplicationInstance(std::string const &instance) -> ptr<raft_result> {
auto new_log = CoordinatorStateMachine::EncodeRegisterReplicationInstance(instance);
return raft_server_->append_entries({new_log});
}
} // namespace memgraph::coordination
#endif

View File

@ -17,14 +17,14 @@
namespace memgraph::coordination {
ReplicationInstance::ReplicationInstance(CoordinatorData *data, CoordinatorClientConfig config,
ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
: client_(data, std::move(config), std::move(succ_cb), std::move(fail_cb)),
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA),
is_alive_(true) {
: client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)),
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA) {
if (!client_.DemoteToReplica()) {
throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName());
}
client_.StartFrequentCheck();
}
@ -75,6 +75,8 @@ auto ReplicationInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, H
return true;
}
auto ReplicationInstance::StartFrequentCheck() -> void { client_.StartFrequentCheck(); }
auto ReplicationInstance::StopFrequentCheck() -> void { client_.StopFrequentCheck(); }
auto ReplicationInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); }
auto ReplicationInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); }

View File

@ -20,14 +20,14 @@ namespace memgraph::dbms {
CoordinatorHandler::CoordinatorHandler(coordination::CoordinatorState &coordinator_state)
: coordinator_state_(coordinator_state) {}
auto CoordinatorHandler::RegisterInstance(memgraph::coordination::CoordinatorClientConfig config)
auto CoordinatorHandler::RegisterReplicationInstance(memgraph::coordination::CoordinatorClientConfig config)
-> coordination::RegisterInstanceCoordinatorStatus {
return coordinator_state_.RegisterInstance(config);
return coordinator_state_.RegisterReplicationInstance(config);
}
auto CoordinatorHandler::SetInstanceToMain(std::string instance_name)
auto CoordinatorHandler::SetReplicationInstanceToMain(std::string instance_name)
-> coordination::SetInstanceToMainCoordinatorStatus {
return coordinator_state_.SetInstanceToMain(std::move(instance_name));
return coordinator_state_.SetReplicationInstanceToMain(std::move(instance_name));
}
auto CoordinatorHandler::ShowInstances() const -> std::vector<coordination::InstanceStatus> {

View File

@ -28,10 +28,10 @@ class CoordinatorHandler {
public:
explicit CoordinatorHandler(coordination::CoordinatorState &coordinator_state);
auto RegisterInstance(coordination::CoordinatorClientConfig config)
auto RegisterReplicationInstance(coordination::CoordinatorClientConfig config)
-> coordination::RegisterInstanceCoordinatorStatus;
auto SetInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus;
auto SetReplicationInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus;
auto ShowInstances() const -> std::vector<coordination::InstanceStatus>;

View File

@ -3034,7 +3034,7 @@ class ReplicationQuery : public memgraph::query::Query {
enum class SyncMode { SYNC, ASYNC };
enum class ReplicaState { READY, REPLICATING, RECOVERY, MAYBE_BEHIND };
enum class ReplicaState { READY, REPLICATING, RECOVERY, MAYBE_BEHIND, UNREACHABLE };
ReplicationQuery() = default;

View File

@ -437,6 +437,9 @@ class ReplQueryHandler {
case storage::replication::ReplicaState::MAYBE_BEHIND:
replica.state = ReplicationQuery::ReplicaState::MAYBE_BEHIND;
break;
case storage::replication::ReplicaState::UNREACHABLE:
replica.state = ReplicationQuery::ReplicaState::UNREACHABLE;
break;
}
return replica;
@ -458,9 +461,10 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
: coordinator_handler_(coordinator_state) {}
/// @throw QueryRuntimeException if an error ocurred.
void RegisterInstance(const std::string &coordinator_socket_address, const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency, const std::string &instance_name,
CoordinatorQuery::SyncMode sync_mode) override {
void RegisterReplicationInstance(const std::string &coordinator_socket_address,
const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name, CoordinatorQuery::SyncMode sync_mode) override {
const auto maybe_replication_ip_port =
io::network::Endpoint::ParseSocketOrAddress(replication_socket_address, std::nullopt);
if (!maybe_replication_ip_port) {
@ -489,7 +493,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
.replication_client_info = repl_config,
.ssl = std::nullopt};
auto status = coordinator_handler_.RegisterInstance(coordinator_client_config);
auto status = coordinator_handler_.RegisterReplicationInstance(coordinator_client_config);
switch (status) {
using enum memgraph::coordination::RegisterInstanceCoordinatorStatus;
case NAME_EXISTS:
@ -499,6 +503,14 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
"Couldn't register replica instance since instance with such endpoint already exists!");
case NOT_COORDINATOR:
throw QueryRuntimeException("REGISTER INSTANCE query can only be run on a coordinator!");
case NOT_LEADER:
throw QueryRuntimeException("Couldn't register replica instance since coordinator is not a leader!");
case RAFT_COULD_NOT_ACCEPT:
throw QueryRuntimeException(
"Couldn't register replica instance since raft server couldn't accept the log! Most likely the raft "
"instance is not a leader!");
case RAFT_COULD_NOT_APPEND:
throw QueryRuntimeException("Couldn't register replica instance since raft server couldn't append the log!");
case RPC_FAILED:
throw QueryRuntimeException(
"Couldn't register replica instance because setting instance to replica failed! Check logs on replica to "
@ -519,8 +531,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
}
}
void SetInstanceToMain(const std::string &instance_name) override {
auto status = coordinator_handler_.SetInstanceToMain(instance_name);
void SetReplicationInstanceToMain(const std::string &instance_name) override {
auto status = coordinator_handler_.SetReplicationInstanceToMain(instance_name);
switch (status) {
using enum memgraph::coordination::SetInstanceToMainCoordinatorStatus;
case NO_INSTANCE_WITH_NAME:
@ -1073,6 +1085,9 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
case ReplicationQuery::ReplicaState::MAYBE_BEHIND:
typed_replica.emplace_back("invalid");
break;
case ReplicationQuery::ReplicaState::UNREACHABLE:
typed_replica.emplace_back("unreachable");
break;
}
typed_replicas.emplace_back(std::move(typed_replica));
@ -1145,9 +1160,9 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
replication_socket_address_tv, main_check_frequency = config.replication_replica_check_frequency,
instance_name = coordinator_query->instance_name_,
sync_mode = coordinator_query->sync_mode_]() mutable {
handler.RegisterInstance(std::string(coordinator_socket_address_tv.ValueString()),
std::string(replication_socket_address_tv.ValueString()), main_check_frequency,
instance_name, sync_mode);
handler.RegisterReplicationInstance(std::string(coordinator_socket_address_tv.ValueString()),
std::string(replication_socket_address_tv.ValueString()),
main_check_frequency, instance_name, sync_mode);
return std::vector<std::vector<TypedValue>>();
};
@ -1176,7 +1191,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
callback.fn = [handler = CoordQueryHandler{*coordinator_state},
instance_name = coordinator_query->instance_name_]() mutable {
handler.SetInstanceToMain(instance_name);
handler.SetReplicationInstanceToMain(instance_name);
return std::vector<std::vector<TypedValue>>();
};

View File

@ -105,13 +105,13 @@ class CoordinatorQueryHandler {
};
/// @throw QueryRuntimeException if an error ocurred.
virtual void RegisterInstance(const std::string &coordinator_socket_address,
const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency, const std::string &instance_name,
CoordinatorQuery::SyncMode sync_mode) = 0;
virtual void RegisterReplicationInstance(const std::string &coordinator_socket_address,
const std::string &replication_socket_address,
const std::chrono::seconds instance_check_frequency,
const std::string &instance_name, CoordinatorQuery::SyncMode sync_mode) = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual void SetInstanceToMain(const std::string &instance_name) = 0;
virtual void SetReplicationInstanceToMain(const std::string &instance_name) = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual std::vector<coordination::InstanceStatus> ShowInstances() const = 0;

View File

@ -133,39 +133,38 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
auto GetReplState() -> memgraph::replication::ReplicationState &;
private:
template <bool HandleFailure>
auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid)
-> memgraph::utils::BasicResult<memgraph::query::RegisterReplicaError> {
template <bool AllowReplicaToBeUnreachable>
auto RegisterReplica_(const replication::ReplicationClientConfig &config, bool send_swap_uuid)
-> utils::BasicResult<memgraph::query::RegisterReplicaError> {
MG_ASSERT(repl_state_.IsMain(), "Only main instance can register a replica!");
auto maybe_client = repl_state_.RegisterReplica(config);
if (maybe_client.HasError()) {
switch (maybe_client.GetError()) {
case memgraph::replication::RegisterReplicaError::NOT_MAIN:
case replication::RegisterReplicaError::NOT_MAIN:
MG_ASSERT(false, "Only main instance can register a replica!");
return {};
case memgraph::replication::RegisterReplicaError::NAME_EXISTS:
return memgraph::query::RegisterReplicaError::NAME_EXISTS;
case memgraph::replication::RegisterReplicaError::ENDPOINT_EXISTS:
return memgraph::query::RegisterReplicaError::ENDPOINT_EXISTS;
case memgraph::replication::RegisterReplicaError::COULD_NOT_BE_PERSISTED:
return memgraph::query::RegisterReplicaError::COULD_NOT_BE_PERSISTED;
case memgraph::replication::RegisterReplicaError::SUCCESS:
case replication::RegisterReplicaError::NAME_EXISTS:
return query::RegisterReplicaError::NAME_EXISTS;
case replication::RegisterReplicaError::ENDPOINT_EXISTS:
return query::RegisterReplicaError::ENDPOINT_EXISTS;
case replication::RegisterReplicaError::COULD_NOT_BE_PERSISTED:
return query::RegisterReplicaError::COULD_NOT_BE_PERSISTED;
case replication::RegisterReplicaError::SUCCESS:
break;
}
}
if (!memgraph::dbms::allow_mt_repl && dbms_handler_.All().size() > 1) {
if (!dbms::allow_mt_repl && dbms_handler_.All().size() > 1) {
spdlog::warn("Multi-tenant replication is currently not supported!");
}
const auto main_uuid =
std::get<memgraph::replication::RoleMainData>(dbms_handler_.ReplicationState().ReplicationData()).uuid_;
if (send_swap_uuid) {
if (!memgraph::replication_coordination_glue::SendSwapMainUUIDRpc(maybe_client.GetValue()->rpc_client_,
main_uuid)) {
return memgraph::query::RegisterReplicaError::ERROR_ACCEPTING_MAIN;
}
auto const main_uuid =
std::get<replication::RoleMainData>(dbms_handler_.ReplicationState().ReplicationData()).uuid_;
if (send_swap_uuid &&
!replication_coordination_glue::SendSwapMainUUIDRpc(maybe_client.GetValue()->rpc_client_, main_uuid)) {
return query::RegisterReplicaError::ERROR_ACCEPTING_MAIN;
}
#ifdef MG_ENTERPRISE
@ -193,21 +192,21 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
[storage, &instance_client_ptr, db_acc = std::move(db_acc),
main_uuid](auto &storage_clients) mutable { // NOLINT
auto client = std::make_unique<storage::ReplicationStorageClient>(*instance_client_ptr, main_uuid);
// All good, start replica client
client->Start(storage, std::move(db_acc));
// After start the storage <-> replica state should be READY or RECOVERING (if correctly started)
// MAYBE_BEHIND isn't a statement of the current state, this is the default value
// Failed to start due an error like branching of MAIN and REPLICA
const bool success = client->State() != storage::replication::ReplicaState::MAYBE_BEHIND;
if (HandleFailure || success) {
// After start the storage <-> replica state shouldn't be MAYBE_BEHIND.
// When part of coordinator cluster we allow replica to be UNREACHABLE.
auto state = client->State();
bool const success =
(state != storage::replication::ReplicaState::MAYBE_BEHIND) ||
(state == storage::replication::ReplicaState::UNREACHABLE && AllowReplicaToBeUnreachable);
if (success) {
storage_clients.push_back(std::move(client));
}
return success;
});
});
// NOTE Currently if any databases fails, we revert back
if (!HandleFailure && !all_clients_good) {
if (!all_clients_good) {
spdlog::error("Failed to register all databases on the REPLICA \"{}\"", config.name);
UnregisterReplica(config.name);
return memgraph::query::RegisterReplicaError::CONNECTION_FAILED;

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -14,6 +14,6 @@
namespace memgraph::storage::replication {
enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, MAYBE_BEHIND };
enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, MAYBE_BEHIND, UNREACHABLE };
} // namespace memgraph::storage::replication

View File

@ -46,6 +46,9 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
std::string{storage->uuid()});
state = memgraph::replication::ReplicationClient::State::BEHIND;
});
replica_state_.WithLock([](auto &state) { state = replication::ReplicaState::UNREACHABLE; });
return;
}
#endif
@ -149,6 +152,9 @@ void ReplicationStorageClient::StartTransactionReplication(const uint64_t curren
auto locked_state = replica_state_.Lock();
switch (*locked_state) {
using enum replication::ReplicaState;
case UNREACHABLE:
spdlog::debug("Replica {} is unreachable", client_.name_);
return;
case RECOVERY:
spdlog::debug("Replica {} is behind MAIN instance", client_.name_);
return;

View File

@ -1,8 +1,9 @@
find_package(gflags REQUIRED)
copy_e2e_python_files(ha_experimental coordinator.py)
copy_e2e_python_files(ha_experimental automatic_failover.py)
copy_e2e_python_files(ha_experimental distributed_coordinators.py)
copy_e2e_python_files(ha_experimental single_coordinator.py)
copy_e2e_python_files(ha_experimental coord_cluster_registration.py)
copy_e2e_python_files(ha_experimental distributed_coords.py)
copy_e2e_python_files(ha_experimental manual_setting_replicas.py)
copy_e2e_python_files(ha_experimental not_replicate_from_old_main.py)
copy_e2e_python_files(ha_experimental common.py)

View File

@ -0,0 +1,284 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import os
import shutil
import sys
import tempfile
import interactive_mg_runner
import pytest
from common import connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..")
)
interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build"))
interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph"))
TEMP_DIR = tempfile.TemporaryDirectory().name
MEMGRAPH_INSTANCES_DESCRIPTION = {
"instance_1": {
"args": [
"--bolt-port",
"7687",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10011",
],
"log_file": "instance_1.log",
"data_directory": f"{TEMP_DIR}/instance_1",
"setup_queries": [],
},
"instance_2": {
"args": [
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10012",
],
"log_file": "instance_2.log",
"data_directory": f"{TEMP_DIR}/instance_2",
"setup_queries": [],
},
"instance_3": {
"args": [
"--bolt-port",
"7689",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10013",
],
"log_file": "instance_3.log",
"data_directory": f"{TEMP_DIR}/instance_3",
"setup_queries": [],
},
"coordinator_1": {
"args": [
"--bolt-port",
"7690",
"--log-level=TRACE",
"--raft-server-id=1",
"--raft-server-port=10111",
],
"log_file": "coordinator1.log",
"setup_queries": [],
},
"coordinator_2": {
"args": [
"--bolt-port",
"7691",
"--log-level=TRACE",
"--raft-server-id=2",
"--raft-server-port=10112",
],
"log_file": "coordinator2.log",
"setup_queries": [],
},
"coordinator_3": {
"args": [
"--bolt-port",
"7692",
"--log-level=TRACE",
"--raft-server-id=3",
"--raft-server-port=10113",
],
"log_file": "coordinator3.log",
"setup_queries": [],
},
}
# NOTE: Repeated execution because it can fail if Raft server is not up
def add_coordinator(cursor, query):
for _ in range(10):
try:
execute_and_fetch_all(cursor, query)
return True
except Exception:
pass
return False
def test_register_repl_instances_then_coordinators():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'"
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")
def check_coordinator3():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster_coord3 = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3)
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
# TODO: (andi) This should be solved eventually
expected_cluster_not_shared = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
]
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
def test_register_coordinator_then_repl_instances():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'"
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
def check_coordinator3():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster_coord3 = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3)
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
# TODO: (andi) This should be solved eventually
expected_cluster_not_shared = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
]
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
def test_coordinators_communication_with_restarts():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'"
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
expected_cluster_not_shared = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
]
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_2")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_2")
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -1,145 +0,0 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import os
import shutil
import sys
import tempfile
import interactive_mg_runner
import pytest
from common import connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..")
)
interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build"))
interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph"))
TEMP_DIR = tempfile.TemporaryDirectory().name
MEMGRAPH_INSTANCES_DESCRIPTION = {
"coordinator1": {
"args": [
"--bolt-port",
"7687",
"--log-level=TRACE",
"--raft-server-id=1",
"--raft-server-port=10111",
],
"log_file": "coordinator1.log",
"setup_queries": [],
},
"coordinator2": {
"args": [
"--bolt-port",
"7688",
"--log-level=TRACE",
"--raft-server-id=2",
"--raft-server-port=10112",
],
"log_file": "coordinator2.log",
"setup_queries": [],
},
"coordinator3": {
"args": [
"--bolt-port",
"7689",
"--log-level=TRACE",
"--raft-server-id=3",
"--raft-server-port=10113",
],
"log_file": "coordinator3.log",
"setup_queries": [
"ADD COORDINATOR 1 ON '127.0.0.1:10111'",
"ADD COORDINATOR 2 ON '127.0.0.1:10112'",
],
},
}
def test_coordinators_communication():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator3_cursor = connect(host="localhost", port=7689).cursor()
def check_coordinator3():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
]
mg_sleep_and_assert(expected_cluster, check_coordinator3)
coordinator1_cursor = connect(host="localhost", port=7687).cursor()
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7688).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster, check_coordinator2)
def test_coordinators_communication_with_restarts():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
expected_cluster = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
]
coordinator1_cursor = connect(host="localhost", port=7687).cursor()
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7688).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster, check_coordinator2)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1")
coordinator1_cursor = connect(host="localhost", port=7687).cursor()
mg_sleep_and_assert(expected_cluster, check_coordinator1)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator2")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator2")
coordinator1_cursor = connect(host="localhost", port=7687).cursor()
coordinator2_cursor = connect(host="localhost", port=7688).cursor()
mg_sleep_and_assert(expected_cluster, check_coordinator1)
mg_sleep_and_assert(expected_cluster, check_coordinator2)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -0,0 +1,164 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import os
import shutil
import sys
import tempfile
import interactive_mg_runner
import pytest
from common import connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..")
)
interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build"))
interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph"))
TEMP_DIR = tempfile.TemporaryDirectory().name
MEMGRAPH_INSTANCES_DESCRIPTION = {
"instance_1": {
"args": [
"--bolt-port",
"7687",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10011",
],
"log_file": "instance_1.log",
"data_directory": f"{TEMP_DIR}/instance_1",
"setup_queries": [],
},
"instance_2": {
"args": [
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10012",
],
"log_file": "instance_2.log",
"data_directory": f"{TEMP_DIR}/instance_2",
"setup_queries": [],
},
"instance_3": {
"args": [
"--bolt-port",
"7689",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10013",
],
"log_file": "instance_3.log",
"data_directory": f"{TEMP_DIR}/instance_3",
"setup_queries": [],
},
"coordinator_1": {
"args": [
"--bolt-port",
"7690",
"--log-level=TRACE",
"--raft-server-id=1",
"--raft-server-port=10111",
],
"log_file": "coordinator1.log",
"setup_queries": [],
},
"coordinator_2": {
"args": [
"--bolt-port",
"7691",
"--log-level=TRACE",
"--raft-server-id=2",
"--raft-server-port=10112",
],
"log_file": "coordinator2.log",
"setup_queries": [],
},
"coordinator_3": {
"args": [
"--bolt-port",
"7692",
"--log-level=TRACE",
"--raft-server-id=3",
"--raft-server-port=10113",
],
"log_file": "coordinator3.log",
"setup_queries": [
"ADD COORDINATOR 1 ON '127.0.0.1:10111'",
"ADD COORDINATOR 2 ON '127.0.0.1:10112'",
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'",
"REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'",
"SET INSTANCE instance_3 TO MAIN",
],
},
}
def test_distributed_automatic_failover():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
main_cursor = connect(host="localhost", port=7689).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
coord_cursor = connect(host="localhost", port=7692).cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
new_main_cursor = connect(host="localhost", port=7687).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main_old_alive = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"), # TODO: (andi) Solve it
]
mg_sleep_and_assert(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -1,5 +1,4 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
@ -134,18 +133,18 @@ def test_replication_works_on_failover():
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
# 5
execute_and_fetch_all(new_main_cursor, "CREATE ();")
# execute_and_fetch_all(new_main_cursor, "CREATE ();")
# 6
alive_replica_cursror = connect(host="localhost", port=7689).cursor()
res = execute_and_fetch_all(alive_replica_cursror, "MATCH (n) RETURN count(n) as count;")[0][0]
assert res == 1, "Vertex should be replicated"
interactive_mg_runner.stop_all(MEMGRAPH_INSTANCES_DESCRIPTION)
# alive_replica_cursror = connect(host="localhost", port=7689).cursor()
# res = execute_and_fetch_all(alive_replica_cursror, "MATCH (n) RETURN count(n) as count;")[0][0]
# assert res == 1, "Vertex should be replicated"
# interactive_mg_runner.stop_all(MEMGRAPH_INSTANCES_DESCRIPTION)
def test_show_instances():
@ -243,7 +242,7 @@ def test_simple_automatic_failover():
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main_old_alive = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"),
]
mg_sleep_and_assert(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)

View File

@ -28,18 +28,22 @@ workloads:
args: ["high_availability_experimental/coordinator.py"]
<<: *ha_cluster
- name: "Automatic failover"
- name: "Single coordinator"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/automatic_failover.py"]
args: ["high_availability_experimental/single_coordinator.py"]
- name: "Disabled manual setting of replication cluster"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/manual_setting_replicas.py"]
- name: "Distributed coordinators"
- name: "Coordinator cluster registration"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/distributed_coordinators.py"]
args: ["high_availability_experimental/coord_cluster_registration.py"]
- name: "Not replicate from old main"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/not_replicate_from_old_main.py"]
- name: "Distributed coordinators"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/distributed_coords.py"]