Thread-safe AF working

This commit is contained in:
Andi Skrgat 2024-01-23 13:45:02 +01:00
parent a0ecea7d1c
commit 3f4ac0dd58
12 changed files with 144 additions and 236 deletions

View File

@ -13,7 +13,6 @@ target_sources(mg-coordination
include/coordination/coordinator_data.hpp include/coordination/coordinator_data.hpp
include/coordination/constants.hpp include/coordination/constants.hpp
include/coordination/failover_status.hpp include/coordination/failover_status.hpp
include/coordination/coordinator_client_info.hpp
include/coordination/coordinator_cluster_config.hpp include/coordination/coordinator_cluster_config.hpp
PRIVATE PRIVATE

View File

@ -37,70 +37,54 @@ CoordinatorClient::CoordinatorClient(CoordinatorData *coord_data, CoordinatorCli
succ_cb_{std::move(succ_cb)}, succ_cb_{std::move(succ_cb)},
fail_cb_{std::move(fail_cb)} {} fail_cb_{std::move(fail_cb)} {}
CoordinatorClient::~CoordinatorClient() { auto CoordinatorClient::InstanceName() const -> std::string { return config_.instance_name; }
const auto endpoint = rpc_client_.Endpoint(); auto CoordinatorClient::SocketAddress() const -> std::string { return rpc_client_.Endpoint().SocketAddress(); }
spdlog::trace("Closing coordinator client on {}:{}", endpoint.address, endpoint.port);
}
void CoordinatorClient::StartFrequentCheck() { void CoordinatorClient::StartFrequentCheck() {
MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0), MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0),
"Health check frequency must be greater than 0"); "Health check frequency must be greater than 0");
std::string_view instance_name = config_.instance_name; replica_checker_.Run(
replica_checker_.Run("Coord checker", config_.health_check_frequency_sec, [this, instance_name] { "Coord checker", config_.health_check_frequency_sec, [this, instance_name = config_.instance_name] {
try { try {
spdlog::trace("Sending frequent heartbeat to machine {} on {}:{}", instance_name, rpc_client_.Endpoint().address, spdlog::trace("Sending frequent heartbeat to machine {} on {}", instance_name,
rpc_client_.Endpoint().port); rpc_client_.Endpoint().SocketAddress());
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()}; auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
if (stream.AwaitResponse().success) { if (stream.AwaitResponse().success) {
succ_cb_(coord_data_, instance_name); succ_cb_(coord_data_, instance_name);
} else { } else {
fail_cb_(coord_data_, instance_name); fail_cb_(coord_data_, instance_name);
} }
} catch (const rpc::RpcFailedException &) { } catch (const rpc::RpcFailedException &) {
fail_cb_(coord_data_, instance_name); fail_cb_(coord_data_, instance_name);
} }
}); });
} }
void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); }
void CoordinatorClient::PauseFrequentCheck() { replica_checker_.Pause(); } void CoordinatorClient::PauseFrequentCheck() { replica_checker_.Pause(); }
void CoordinatorClient::ResumeFrequentCheck() { replica_checker_.Resume(); } void CoordinatorClient::ResumeFrequentCheck() { replica_checker_.Resume(); }
auto CoordinatorClient::InstanceName() const -> std::string_view { return config_.instance_name; }
auto CoordinatorClient::SocketAddress() const -> std::string { return rpc_client_.Endpoint().SocketAddress(); }
auto CoordinatorClient::Config() const -> CoordinatorClientConfig const & { return config_; }
auto CoordinatorClient::SuccCallback() const -> HealthCheckCallback const & { return succ_cb_; }
auto CoordinatorClient::FailCallback() const -> HealthCheckCallback const & { return fail_cb_; }
// TODO: (andi) What is better, like this or fetch it by const &
auto CoordinatorClient::SetSuccCallback(HealthCheckCallback succ_cb) -> void { succ_cb_ = std::move(succ_cb); } auto CoordinatorClient::SetSuccCallback(HealthCheckCallback succ_cb) -> void { succ_cb_ = std::move(succ_cb); }
auto CoordinatorClient::SetFailCallback(HealthCheckCallback fail_cb) -> void { fail_cb_ = std::move(fail_cb); } auto CoordinatorClient::SetFailCallback(HealthCheckCallback fail_cb) -> void { fail_cb_ = std::move(fail_cb); }
////// AF design choice auto CoordinatorClient::ReplicationClientInfo() const
auto CoordinatorClient::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo const & { -> const std::optional<CoordinatorClientConfig::ReplicationClientInfo> & {
MG_ASSERT(config_.replication_client_info.has_value(), "No ReplicationClientInfo for MAIN instance!");
return *config_.replication_client_info;
}
////// AF design choice
auto CoordinatorClient::ReplicationClientInfo() -> std::optional<CoordinatorClientConfig::ReplicationClientInfo> & {
MG_ASSERT(config_.replication_client_info.has_value(), "No ReplicationClientInfo for MAIN instance!");
return config_.replication_client_info; return config_.replication_client_info;
} }
auto CoordinatorClient::ResetReplicationClientInfo() -> void { config_.replication_client_info.reset(); }
auto CoordinatorClient::SendPromoteReplicaToMainRpc( auto CoordinatorClient::SendPromoteReplicaToMainRpc(
std::vector<CoordinatorClientConfig::ReplicationClientInfo> replication_clients_info) const -> bool { std::vector<CoordinatorClientConfig::ReplicationClientInfo> replication_clients_info) const -> bool {
try { try {
auto stream{rpc_client_.Stream<PromoteReplicaToMainRpc>(std::move(replication_clients_info))}; auto stream{rpc_client_.Stream<PromoteReplicaToMainRpc>(std::move(replication_clients_info))};
if (!stream.AwaitResponse().success) { if (!stream.AwaitResponse().success) {
spdlog::error("Failed to perform failover!"); spdlog::error("Failed to receive successful RPC failover response!");
return false; return false;
} }
spdlog::info("Sent failover RPC from coordinator to new main!");
return true; return true;
} catch (const rpc::RpcFailedException &) { } catch (const rpc::RpcFailedException &) {
spdlog::error("Failed to send failover RPC from coordinator to new main!"); spdlog::error("RPC error occurred while sending failover RPC!");
} }
return false; return false;
} }

View File

@ -14,6 +14,7 @@
#include "coordination/coordinator_data.hpp" #include "coordination/coordinator_data.hpp"
#include <range/v3/view.hpp> #include <range/v3/view.hpp>
#include <shared_mutex>
namespace memgraph::coordination { namespace memgraph::coordination {
@ -21,10 +22,9 @@ CoordinatorData::CoordinatorData() {
auto find_instance = [](CoordinatorData *coord_data, std::string_view instance_name) -> CoordinatorInstance & { auto find_instance = [](CoordinatorData *coord_data, std::string_view instance_name) -> CoordinatorInstance & {
std::shared_lock<utils::RWLock> lock{coord_data->coord_data_lock_}; std::shared_lock<utils::RWLock> lock{coord_data->coord_data_lock_};
auto instance = auto instance = std::ranges::find_if(
std::ranges::find_if(coord_data->registered_instances_, [instance_name](const CoordinatorInstance &instance) { coord_data->registered_instances_,
return instance.client_info_.InstanceName() == instance_name; [instance_name](const CoordinatorInstance &instance) { return instance.InstanceName() == instance_name; });
});
MG_ASSERT(instance != coord_data->registered_instances_.end(), "Instance {} not found during callback!", MG_ASSERT(instance != coord_data->registered_instances_.end(), "Instance {} not found during callback!",
instance_name); instance_name);
@ -32,64 +32,67 @@ CoordinatorData::CoordinatorData() {
}; };
replica_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void { replica_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
spdlog::trace("Instance {} performing replica successful callback", instance_name);
auto &instance = find_instance(coord_data, instance_name); auto &instance = find_instance(coord_data, instance_name);
MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name); MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name);
instance.client_info_.UpdateLastResponseTime(); instance.UpdateLastResponseTime();
}; };
replica_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void { replica_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
spdlog::trace("Instance {} performing replica failure callback", instance_name);
auto &instance = find_instance(coord_data, instance_name); auto &instance = find_instance(coord_data, instance_name);
MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name); MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name);
instance.client_info_.UpdateInstanceStatus(); instance.UpdateInstanceStatus();
}; };
main_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void { main_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
spdlog::trace("Instance {} performing main successful callback", instance_name);
auto &instance = find_instance(coord_data, instance_name); auto &instance = find_instance(coord_data, instance_name);
MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name); MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name);
instance.client_info_.UpdateLastResponseTime(); instance.UpdateLastResponseTime();
}; };
main_fail_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void { main_fail_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
spdlog::trace("Instance {} performing main failure callback", instance_name);
auto &instance = find_instance(coord_data, instance_name); auto &instance = find_instance(coord_data, instance_name);
if (bool main_alive = instance.client_info_.UpdateInstanceStatus(); !main_alive) { MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name);
// spdlog::warn("Main is not alive, starting failover"); if (bool main_alive = instance.UpdateInstanceStatus(); !main_alive) {
// switch (auto failover_status = DoFailover(); failover_status) { spdlog::warn("Main is not alive, starting automatic failover");
// using enum DoFailoverStatus; switch (auto failover_status = DoFailover(); failover_status) {
// case ALL_REPLICAS_DOWN: using enum DoFailoverStatus;
// spdlog::warn("Failover aborted since all replicas are down!"); case ALL_REPLICAS_DOWN:
// case MAIN_ALIVE: spdlog::warn("Failover aborted since all replicas are down!");
// spdlog::warn("Failover aborted since main is alive!"); case MAIN_ALIVE:
// case CLUSTER_UNINITIALIZED: spdlog::warn("Failover aborted since main is alive!");
// spdlog::warn("Failover aborted since cluster is uninitialized!"); case CLUSTER_UNINITIALIZED:
// case SUCCESS: spdlog::warn("Failover aborted since cluster is uninitialized!");
// break; case SUCCESS:
// } break;
}
} }
}; };
} }
auto CoordinatorData::DoFailover() -> DoFailoverStatus { auto CoordinatorData::DoFailover() -> DoFailoverStatus {
using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo; using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
// std::lock_guard<utils::RWLock> lock{coord_data_lock_}; std::lock_guard<utils::RWLock> lock{coord_data_lock_};
// TODO: (andi) Make const what is possible to make const const auto main_instance = std::ranges::find_if(registered_instances_, &CoordinatorInstance::IsMain);
auto main_instance = std::ranges::find_if(registered_instances_, &CoordinatorInstance::IsMain);
if (main_instance == registered_instances_.end()) { if (main_instance == registered_instances_.end()) {
return DoFailoverStatus::CLUSTER_UNINITIALIZED; return DoFailoverStatus::CLUSTER_UNINITIALIZED;
} }
if (main_instance->client_info_.IsAlive()) { if (main_instance->IsAlive()) {
return DoFailoverStatus::MAIN_ALIVE; return DoFailoverStatus::MAIN_ALIVE;
} }
main_instance->client_.StopFrequentCheck(); main_instance->client_.PauseFrequentCheck();
auto replica_instances = registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica); auto replica_instances = registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica);
auto chosen_replica_instance = std::ranges::find_if( auto chosen_replica_instance =
replica_instances, [](const CoordinatorInstance &instance) { return instance.client_info_.IsAlive(); }); std::ranges::find_if(replica_instances, [](const CoordinatorInstance &instance) { return instance.IsAlive(); });
if (chosen_replica_instance == replica_instances.end()) { if (chosen_replica_instance == replica_instances.end()) {
return DoFailoverStatus::ALL_REPLICAS_DOWN; return DoFailoverStatus::ALL_REPLICAS_DOWN;
@ -105,46 +108,48 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus {
}; };
for (const auto &unchosen_replica_instance : replica_instances | ranges::views::filter(not_chosen_replica_instance)) { for (const auto &unchosen_replica_instance : replica_instances | ranges::views::filter(not_chosen_replica_instance)) {
repl_clients_info.emplace_back(unchosen_replica_instance.client_.ReplicationClientInfo()); if (auto repl_client_info = unchosen_replica_instance.client_.ReplicationClientInfo();
repl_client_info.has_value()) {
repl_clients_info.emplace_back(std::move(repl_client_info.value()));
}
} }
if (!chosen_replica_instance->client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) { if (!chosen_replica_instance->client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) {
spdlog::error("Sent RPC message, but exception was caught, aborting Failover");
// TODO: new status and rollback all changes that were done... // TODO: new status and rollback all changes that were done...
MG_ASSERT(false, "RPC message failed"); MG_ASSERT(false, "Promoting replica to main failed!");
} }
chosen_replica_instance->client_.SetSuccCallback(main_succ_cb_); chosen_replica_instance->client_.SetSuccCallback(main_succ_cb_);
chosen_replica_instance->client_.SetFailCallback(main_fail_cb_); chosen_replica_instance->client_.SetFailCallback(main_fail_cb_);
chosen_replica_instance->replication_role_ = replication_coordination_glue::ReplicationRole::MAIN; chosen_replica_instance->client_.ResetReplicationClientInfo();
// TODO: (andi) Is this correct
chosen_replica_instance->client_.ReplicationClientInfo().reset();
chosen_replica_instance->client_.ResumeFrequentCheck(); chosen_replica_instance->client_.ResumeFrequentCheck();
chosen_replica_instance->replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
registered_instances_.erase(main_instance); main_instance->replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA;
return DoFailoverStatus::SUCCESS; return DoFailoverStatus::SUCCESS;
} }
auto CoordinatorData::ShowMain() const -> std::optional<CoordinatorInstanceStatus> { auto CoordinatorData::ShowMain() const -> std::optional<CoordinatorInstanceStatus> {
std::shared_lock<utils::RWLock> lock{coord_data_lock_};
auto main_instance = std::ranges::find_if(registered_instances_, &CoordinatorInstance::IsMain); auto main_instance = std::ranges::find_if(registered_instances_, &CoordinatorInstance::IsMain);
if (main_instance == registered_instances_.end()) { if (main_instance == registered_instances_.end()) {
return std::nullopt; return std::nullopt;
} }
return CoordinatorInstanceStatus{.instance_name = main_instance->client_info_.InstanceName(), return CoordinatorInstanceStatus{.instance_name = main_instance->InstanceName(),
.socket_address = main_instance->client_info_.SocketAddress(), .socket_address = main_instance->SocketAddress(),
.is_alive = main_instance->client_info_.IsAlive()}; .is_alive = main_instance->IsAlive()};
}; };
auto CoordinatorData::ShowReplicas() const -> std::vector<CoordinatorInstanceStatus> { auto CoordinatorData::ShowReplicas() const -> std::vector<CoordinatorInstanceStatus> {
std::shared_lock<utils::RWLock> lock{coord_data_lock_};
std::vector<CoordinatorInstanceStatus> instances_status; std::vector<CoordinatorInstanceStatus> instances_status;
for (const auto &replica_instance : registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica)) { for (const auto &replica_instance : registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica)) {
instances_status.emplace_back( instances_status.emplace_back(CoordinatorInstanceStatus{.instance_name = replica_instance.InstanceName(),
CoordinatorInstanceStatus{.instance_name = replica_instance.client_info_.InstanceName(), .socket_address = replica_instance.SocketAddress(),
.socket_address = replica_instance.client_info_.SocketAddress(), .is_alive = replica_instance.IsAlive()});
.is_alive = replica_instance.client_info_.IsAlive()});
} }
return instances_status; return instances_status;
@ -152,20 +157,20 @@ auto CoordinatorData::ShowReplicas() const -> std::vector<CoordinatorInstanceSta
auto CoordinatorData::RegisterMain(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus { auto CoordinatorData::RegisterMain(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus {
// TODO: (andi) test this // TODO: (andi) test this
std::lock_guard<utils::RWLock> lock{coord_data_lock_};
if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) { if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) {
return instance.client_info_.InstanceName() == config.instance_name; return instance.InstanceName() == config.instance_name;
})) { })) {
return RegisterMainReplicaCoordinatorStatus::NAME_EXISTS; return RegisterMainReplicaCoordinatorStatus::NAME_EXISTS;
} }
if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) { if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) {
return instance.client_info_.SocketAddress() == config.ip_address + ":" + std::to_string(config.port); return instance.SocketAddress() == config.SocketAddress();
})) { })) {
return RegisterMainReplicaCoordinatorStatus::ENDPOINT_EXISTS; return RegisterMainReplicaCoordinatorStatus::ENDPOINT_EXISTS;
} }
// TODO: (andi) Improve this
auto *instance = &registered_instances_.emplace_back(this, std::move(config), main_succ_cb_, main_fail_cb_, auto *instance = &registered_instances_.emplace_back(this, std::move(config), main_succ_cb_, main_fail_cb_,
replication_coordination_glue::ReplicationRole::MAIN); replication_coordination_glue::ReplicationRole::MAIN);
instance->client_.StartFrequentCheck(); instance->client_.StartFrequentCheck();
@ -174,20 +179,20 @@ auto CoordinatorData::RegisterMain(CoordinatorClientConfig config) -> RegisterMa
} }
auto CoordinatorData::RegisterReplica(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus { auto CoordinatorData::RegisterReplica(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus {
// TODO: (andi) Test it std::lock_guard<utils::RWLock> lock{coord_data_lock_};
if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) { if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) {
return instance.client_info_.InstanceName() == config.instance_name; return instance.InstanceName() == config.instance_name;
})) { })) {
return RegisterMainReplicaCoordinatorStatus::NAME_EXISTS; return RegisterMainReplicaCoordinatorStatus::NAME_EXISTS;
} }
if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) { if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) {
return instance.client_info_.SocketAddress() == config.ip_address + ":" + std::to_string(config.port); spdlog::trace("Comparing {} with {}", instance.SocketAddress(), config.SocketAddress());
return instance.SocketAddress() == config.SocketAddress();
})) { })) {
return RegisterMainReplicaCoordinatorStatus::ENDPOINT_EXISTS; return RegisterMainReplicaCoordinatorStatus::ENDPOINT_EXISTS;
} }
// TODO: (andi) Improve this
auto *instance = &registered_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_, auto *instance = &registered_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_,
replication_coordination_glue::ReplicationRole::REPLICA); replication_coordination_glue::ReplicationRole::REPLICA);
instance->client_.StartFrequentCheck(); instance->client_.StartFrequentCheck();

View File

@ -13,18 +13,12 @@
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE
#include "coordination/coordinator_client.hpp"
#include "coordination/coordinator_cluster_config.hpp"
#include "coordination/coordinator_config.hpp" #include "coordination/coordinator_config.hpp"
#include "flags/replication.hpp" #include "flags/replication.hpp"
#include "spdlog/spdlog.h" #include "spdlog/spdlog.h"
#include "utils/logging.hpp" #include "utils/logging.hpp"
#include "utils/variant_helpers.hpp" #include "utils/variant_helpers.hpp"
#include <atomic>
#include <exception>
#include <optional>
namespace memgraph::coordination { namespace memgraph::coordination {
CoordinatorState::CoordinatorState() { CoordinatorState::CoordinatorState() {

View File

@ -16,25 +16,21 @@
#include "coordination/coordinator_config.hpp" #include "coordination/coordinator_config.hpp"
#include "rpc/client.hpp" #include "rpc/client.hpp"
#include "utils/scheduler.hpp" #include "utils/scheduler.hpp"
#include "utils/thread_pool.hpp"
#include <string_view>
namespace memgraph::coordination { namespace memgraph::coordination {
class CoordinatorData; class CoordinatorData;
using HealthCheckCallback = std::function<void(CoordinatorData *, std::string_view)>;
class CoordinatorClient { class CoordinatorClient {
public: public:
using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo; using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
using ReplicationClientsInfo = std::vector<ReplClientInfo>; using ReplicationClientsInfo = std::vector<ReplClientInfo>;
using HealthCheckCallback = std::function<void(CoordinatorData *, std::string_view)>;
explicit CoordinatorClient(CoordinatorData *coord_data_, CoordinatorClientConfig config, HealthCheckCallback succ_cb, explicit CoordinatorClient(CoordinatorData *coord_data_, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb); HealthCheckCallback fail_cb);
~CoordinatorClient(); ~CoordinatorClient() = default;
CoordinatorClient(CoordinatorClient &) = delete; CoordinatorClient(CoordinatorClient &) = delete;
CoordinatorClient &operator=(CoordinatorClient const &) = delete; CoordinatorClient &operator=(CoordinatorClient const &) = delete;
@ -43,23 +39,17 @@ class CoordinatorClient {
CoordinatorClient &operator=(CoordinatorClient &&) noexcept = delete; CoordinatorClient &operator=(CoordinatorClient &&) noexcept = delete;
void StartFrequentCheck(); void StartFrequentCheck();
void StopFrequentCheck();
void PauseFrequentCheck(); void PauseFrequentCheck();
void ResumeFrequentCheck(); void ResumeFrequentCheck();
auto InstanceName() const -> std::string;
auto SocketAddress() const -> std::string;
auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool; auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool;
// TODO: (andi) These several methods are probably not needed, Instance should own this info auto ReplicationClientInfo() const -> const std::optional<ReplClientInfo> &;
auto InstanceName() const -> std::string_view; auto ResetReplicationClientInfo() -> void;
auto SocketAddress() const -> std::string;
auto Config() const -> CoordinatorClientConfig const &;
auto ReplicationClientInfo() const -> ReplClientInfo const &;
auto ReplicationClientInfo() -> std::optional<ReplClientInfo> &;
auto SuccCallback() const -> HealthCheckCallback const &;
auto FailCallback() const -> HealthCheckCallback const &;
// Const &
auto SetSuccCallback(HealthCheckCallback succ_cb) -> void; auto SetSuccCallback(HealthCheckCallback succ_cb) -> void;
auto SetFailCallback(HealthCheckCallback fail_cb) -> void; auto SetFailCallback(HealthCheckCallback fail_cb) -> void;

View File

@ -1,89 +0,0 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_cluster_config.hpp"
#include "io/network/endpoint.hpp"
#include <atomic>
#include <chrono>
namespace memgraph::coordination {
// TODO: (andi) Fix ownerships with std::string_view
// Liveness bookkeeping for one coordinator-managed instance (MAIN or REPLICA).
//
// Thread-safety contract: last_response_time_ and is_alive_ are std::atomic
// because the health-check scheduler thread updates them while other threads
// (e.g. SHOW REPLICAS / failover logic) read them concurrently.
// instance_name_ and socket_address_ are immutable after construction except
// under copy/move assignment, which callers must externally synchronize.
class CoordinatorClientInfo {
 public:
  // Takes ownership of both strings; they are moved, not copied, into members.
  CoordinatorClientInfo(std::string instance_name, std::string socket_address)
      : last_response_time_(std::chrono::system_clock::now()),
        is_alive_(true),  // TODO: (andi) Maybe it should be false until the first ping
        instance_name_(std::move(instance_name)),
        socket_address_(std::move(socket_address)) {}

  ~CoordinatorClientInfo() = default;

  // std::atomic is neither copyable nor movable, so all four special members
  // must be written by hand, transferring the atomics' values via load/store.
  CoordinatorClientInfo(const CoordinatorClientInfo &other)
      : last_response_time_(other.last_response_time_.load()),
        is_alive_(other.is_alive_.load()),
        instance_name_(other.instance_name_),
        socket_address_(other.socket_address_) {}

  CoordinatorClientInfo &operator=(const CoordinatorClientInfo &other) {
    if (this != &other) {
      last_response_time_ = other.last_response_time_.load();
      is_alive_ = other.is_alive_.load();
      instance_name_ = other.instance_name_;
      socket_address_ = other.socket_address_;
    }
    return *this;
  }

  CoordinatorClientInfo(CoordinatorClientInfo &&other) noexcept
      : last_response_time_(other.last_response_time_.load()),
        is_alive_(other.is_alive_.load()),
        instance_name_(std::move(other.instance_name_)),
        socket_address_(std::move(other.socket_address_)) {}

  CoordinatorClientInfo &operator=(CoordinatorClientInfo &&other) noexcept {
    if (this != &other) {
      last_response_time_.store(other.last_response_time_.load());
      is_alive_ = other.is_alive_.load();
      instance_name_ = std::move(other.instance_name_);
      socket_address_ = std::move(other.socket_address_);
    }
    return *this;
  }

  // Recomputes liveness: the instance is alive iff the last heartbeat response
  // arrived within alive_response_time_difference_sec_. Returns the new state.
  auto UpdateInstanceStatus() -> bool {
    is_alive_ = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() -
                                                                 last_response_time_.load(std::memory_order_acquire))
                    .count() < CoordinatorClusterConfig::alive_response_time_difference_sec_;
    return is_alive_;
  }

  // Records a successful heartbeat. Release store pairs with the acquire load
  // in UpdateInstanceStatus so the new timestamp is published to readers.
  auto UpdateLastResponseTime() -> void {
    last_response_time_.store(std::chrono::system_clock::now(), std::memory_order_release);
  }

  [[nodiscard]] auto InstanceName() const -> std::string_view { return instance_name_; }
  [[nodiscard]] auto IsAlive() const -> bool { return is_alive_; }
  // View into socket_address_; valid only as long as this object lives.
  [[nodiscard]] auto SocketAddress() const -> std::string_view { return socket_address_; }

 private:
  std::atomic<std::chrono::system_clock::time_point> last_response_time_{};
  std::atomic<bool> is_alive_{false};
  // TODO: (andi) Who owns this info?
  std::string instance_name_;
  std::string socket_address_;
};
} // namespace memgraph::coordination
#endif

View File

@ -30,8 +30,9 @@ struct CoordinatorClientConfig {
uint16_t port{}; uint16_t port{};
std::chrono::seconds health_check_frequency_sec{1}; std::chrono::seconds health_check_frequency_sec{1};
auto SocketAddress() const -> std::string { return ip_address + ":" + std::to_string(port); }
// Info which coordinator will send to new main when performing failover // Info which coordinator will send to new main when performing failover
// TODO: (andi) Does this correspond to replication data we receive at the registration beginning?
struct ReplicationClientInfo { struct ReplicationClientInfo {
// Must be the same as CoordinatorClientConfig's instance_name // Must be the same as CoordinatorClientConfig's instance_name
std::string instance_name; std::string instance_name;

View File

@ -13,8 +13,6 @@
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE
#include "coordination/coordinator_client.hpp"
#include "coordination/coordinator_client_info.hpp"
#include "coordination/coordinator_instance.hpp" #include "coordination/coordinator_instance.hpp"
#include "coordination/coordinator_instance_status.hpp" #include "coordination/coordinator_instance_status.hpp"
#include "coordination/coordinator_server.hpp" #include "coordination/coordinator_server.hpp"
@ -23,12 +21,8 @@
#include "utils/rw_lock.hpp" #include "utils/rw_lock.hpp"
#include <list> #include <list>
#include <memory>
#include <optional>
#include <string_view>
namespace memgraph::coordination { namespace memgraph::coordination {
class CoordinatorData { class CoordinatorData {
public: public:
CoordinatorData(); CoordinatorData();
@ -43,12 +37,7 @@ class CoordinatorData {
private: private:
mutable utils::RWLock coord_data_lock_{utils::RWLock::Priority::READ}; mutable utils::RWLock coord_data_lock_{utils::RWLock::Priority::READ};
HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
std::function<void(CoordinatorData *, std::string_view)> main_succ_cb_;
std::function<void(CoordinatorData *, std::string_view)> main_fail_cb_;
std::function<void(CoordinatorData *, std::string_view)> replica_succ_cb_;
std::function<void(CoordinatorData *, std::string_view)> replica_fail_cb_;
std::list<CoordinatorInstance> registered_instances_; std::list<CoordinatorInstance> registered_instances_;
}; };

View File

@ -14,21 +14,20 @@
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE
#include "coordination/coordinator_client.hpp" #include "coordination/coordinator_client.hpp"
#include "coordination/coordinator_client_info.hpp" #include "coordination/coordinator_cluster_config.hpp"
#include "replication_coordination_glue/role.hpp" #include "replication_coordination_glue/role.hpp"
namespace memgraph::coordination { namespace memgraph::coordination {
class CoordinatorData; class CoordinatorData;
using HealthCheckCallback = std::function<void(CoordinatorData *, std::string_view)>;
struct CoordinatorInstance { class CoordinatorInstance {
// TODO: (andi) Capture by const reference functions public:
CoordinatorInstance(CoordinatorData *data, CoordinatorClientConfig config, HealthCheckCallback succ_cb, CoordinatorInstance(CoordinatorData *data, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb, replication_coordination_glue::ReplicationRole replication_role) HealthCheckCallback fail_cb, replication_coordination_glue::ReplicationRole replication_role)
: client_(data, config, succ_cb, fail_cb), : client_(data, std::move(config), std::move(succ_cb), std::move(fail_cb)),
client_info_(config.instance_name, config.ip_address + ":" + std::to_string(config.port)), replication_role_(replication_role),
replication_role_(replication_role) {} is_alive_(true) {}
CoordinatorInstance(CoordinatorInstance const &other) = delete; CoordinatorInstance(CoordinatorInstance const &other) = delete;
CoordinatorInstance &operator=(CoordinatorInstance const &other) = delete; CoordinatorInstance &operator=(CoordinatorInstance const &other) = delete;
@ -36,16 +35,28 @@ struct CoordinatorInstance {
CoordinatorInstance &operator=(CoordinatorInstance &&other) noexcept = delete; CoordinatorInstance &operator=(CoordinatorInstance &&other) noexcept = delete;
~CoordinatorInstance() = default; ~CoordinatorInstance() = default;
auto UpdateInstanceStatus() -> bool {
is_alive_ = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() -
last_response_time_.load(std::memory_order_acquire))
.count() < CoordinatorClusterConfig::alive_response_time_difference_sec_;
return is_alive_;
}
auto UpdateLastResponseTime() -> void { last_response_time_ = std::chrono::system_clock::now(); }
auto InstanceName() const -> std::string { return client_.InstanceName(); }
auto SocketAddress() const -> std::string { return client_.SocketAddress(); }
auto IsAlive() const -> bool { return is_alive_; }
auto IsReplica() const -> bool { auto IsReplica() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA; return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA;
} }
auto IsMain() const -> bool { return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN; } auto IsMain() const -> bool { return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN; }
CoordinatorClient client_; CoordinatorClient client_;
CoordinatorClientInfo client_info_;
replication_coordination_glue::ReplicationRole replication_role_; replication_coordination_glue::ReplicationRole replication_role_;
std::atomic<std::chrono::system_clock::time_point> last_response_time_{};
std::atomic<bool> is_alive_{false};
// TODO: (andi) Make this better
friend bool operator==(CoordinatorInstance const &first, CoordinatorInstance const &second) { friend bool operator==(CoordinatorInstance const &first, CoordinatorInstance const &second) {
return first.client_ == second.client_ && first.replication_role_ == second.replication_role_; return first.client_ == second.client_ && first.replication_role_ == second.replication_role_;
} }

View File

@ -20,8 +20,8 @@
namespace memgraph::coordination { namespace memgraph::coordination {
struct CoordinatorInstanceStatus { struct CoordinatorInstanceStatus {
std::string_view instance_name; std::string instance_name;
std::string_view socket_address; std::string socket_address;
bool is_alive; bool is_alive;
}; };

View File

@ -26,7 +26,7 @@ void CoordinatorHandlers::Register(DbmsHandler &dbms_handler) {
server.Register<coordination::PromoteReplicaToMainRpc>( server.Register<coordination::PromoteReplicaToMainRpc>(
[&dbms_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void { [&dbms_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void {
spdlog::info("Received PromoteReplicaToMainRpc from coordinator server"); spdlog::info("Received PromoteReplicaToMainRpc");
CoordinatorHandlers::PromoteReplicaToMainHandler(dbms_handler, req_reader, res_builder); CoordinatorHandlers::PromoteReplicaToMainHandler(dbms_handler, req_reader, res_builder);
}); });
} }

View File

@ -59,11 +59,11 @@ def test_simple_automatic_failover(connection):
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
main_cursor = connection(7687, "instance_3").cursor() main_cursor = connection(7687, "instance_3").cursor()
expected_data_on_main = { expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), ("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
} ]
actual_data_on_main = set(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")) actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main assert actual_data_on_main == expected_data_on_main
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
@ -71,25 +71,49 @@ def test_simple_automatic_failover(connection):
coord_cursor = connection(7690, "coordinator").cursor() coord_cursor = connection(7690, "coordinator").cursor()
def retrieve_data_show_repl_cluster(): def retrieve_data_show_repl_cluster():
return set(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")) return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
expected_data_on_coord = { expected_data_on_coord = [
("instance_1", "127.0.0.1:10011", True, "main"), ("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"), ("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, "main"), # TODO: (andi) Include or exclude dead main from the result? ("instance_3", "127.0.0.1:10013", False, "replica"),
} ]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster) mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
new_main_cursor = connection(7688, "instance_1").cursor() new_main_cursor = connection(7688, "instance_1").cursor()
def retrieve_data_show_replicas(): def retrieve_data_show_replicas():
return set(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")) return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = { expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
} ]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas) mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
def test_registering_replica_fails_name_exists(connection):
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coord_cursor = connection(7690, "coordinator").cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(
coord_cursor,
"REGISTER REPLICA instance_1 SYNC TO '127.0.0.1:10051' WITH COORDINATOR SERVER ON '127.0.0.1:10111';",
)
assert str(e.value) == "Couldn't register replica instance since instance with such name already exists!"
def test_registering_replica_fails_endpoint_exists(connection):
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coord_cursor = connection(7690, "coordinator").cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(
coord_cursor,
"REGISTER REPLICA instance_5 SYNC TO '127.0.0.1:10001' WITH COORDINATOR SERVER ON '127.0.0.1:10013';",
)
assert str(e.value) == "Couldn't register replica instance since instance with such endpoint already exists!"
if __name__ == "__main__": if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"])) sys.exit(pytest.main([__file__, "-rA"]))