Refactoring of coord state
This commit is contained in:
parent
afe7d47a5c
commit
986ea37ead
@ -7,7 +7,6 @@ target_sources(mg-coordination
|
||||
include/coordination/coordinator_rpc.hpp
|
||||
include/coordination/coordinator_server.hpp
|
||||
include/coordination/coordinator_config.hpp
|
||||
include/coordination/coordinator_entity_info.hpp
|
||||
include/coordination/coordinator_exceptions.hpp
|
||||
include/coordination/coordinator_slk.hpp
|
||||
include/coordination/constants.hpp
|
||||
|
@ -27,11 +27,15 @@ auto CreateClientContext(const memgraph::coordination::CoordinatorClientConfig &
|
||||
}
|
||||
} // namespace
|
||||
|
||||
CoordinatorClient::CoordinatorClient(const CoordinatorClientConfig &config)
|
||||
CoordinatorClient::CoordinatorClient(CoordinatorClientConfig config,
|
||||
std::function<void(std::string_view)> freq_check_cb)
|
||||
: rpc_context_{CreateClientContext(config)},
|
||||
rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port),
|
||||
&rpc_context_},
|
||||
config_{config} {}
|
||||
config_{std::move(config)},
|
||||
freq_check_cb_{std::move(freq_check_cb)} {
|
||||
StartFrequentCheck();
|
||||
}
|
||||
|
||||
CoordinatorClient::~CoordinatorClient() {
|
||||
auto exit_job = utils::OnScopeExit([&] {
|
||||
@ -46,22 +50,27 @@ CoordinatorClient::~CoordinatorClient() {
|
||||
void CoordinatorClient::StartFrequentCheck() {
|
||||
MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0),
|
||||
"Health check frequency must be greater than 0");
|
||||
replica_checker_.Run("Coord checker", config_.health_check_frequency_sec, [rpc_client = &rpc_client_] {
|
||||
try {
|
||||
auto stream{rpc_client->Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
|
||||
stream.AwaitResponse();
|
||||
// last_response_time->store(std::chrono::system_clock::now(), std::memory_order_acq_rel);
|
||||
} catch (const rpc::RpcFailedException &) {
|
||||
// Nothing to do...wait for a reconnect
|
||||
}
|
||||
});
|
||||
|
||||
replica_checker_.Run(
|
||||
"Coord checker", config_.health_check_frequency_sec,
|
||||
[instance_name = config_.instance_name, rpc_client = &rpc_client_, freq_check_cb = freq_check_cb_] {
|
||||
try {
|
||||
auto stream{rpc_client->Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
|
||||
stream.AwaitResponse();
|
||||
freq_check_cb(instance_name);
|
||||
} catch (const rpc::RpcFailedException &) {
|
||||
// Nothing to do...wait for a reconnect
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Stops the periodic check by calling Stop() on replica_checker_.
void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); }
|
||||
|
||||
// Returns a non-owning view of config_.instance_name; the view refers to the string stored inside this client.
auto CoordinatorClient::InstanceName() const -> std::string_view { return config_.instance_name; }
|
||||
auto CoordinatorClient::Endpoint() const -> io::network::Endpoint const & { return rpc_client_.Endpoint(); }
|
||||
// Returns the address of the endpoint object exposed by rpc_client_.Endpoint() (a pointer to it, not a copy).
auto CoordinatorClient::Endpoint() const -> const io::network::Endpoint * { return &rpc_client_.Endpoint(); }
|
||||
// TODO: remove this method and implement copy constructor
|
||||
// Exposes the stored configuration (config_) by const reference.
auto CoordinatorClient::Config() const -> CoordinatorClientConfig const & { return config_; }
|
||||
// Exposes the stored frequent-check callback (freq_check_cb_) by const reference.
auto CoordinatorClient::Callback() const -> std::function<void(std::string_view)> const & { return freq_check_cb_; }
|
||||
|
||||
////// AF design choice
|
||||
auto CoordinatorClient::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo const & {
|
||||
|
@ -16,7 +16,6 @@
|
||||
#include "coordination/coordinator_client.hpp"
|
||||
#include "coordination/coordinator_cluster_config.hpp"
|
||||
#include "coordination/coordinator_config.hpp"
|
||||
#include "coordination/coordinator_entity_info.hpp"
|
||||
#include "flags/replication.hpp"
|
||||
#include "spdlog/spdlog.h"
|
||||
#include "utils/logging.hpp"
|
||||
@ -53,8 +52,10 @@ CoordinatorState::CoordinatorState() {
|
||||
}
|
||||
}
|
||||
|
||||
/// TODO: Don't return client, start here the client
|
||||
auto CoordinatorState::RegisterReplica(const CoordinatorClientConfig &config) -> RegisterMainReplicaCoordinatorStatus {
|
||||
auto CoordinatorState::RegisterReplica(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus {
|
||||
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
|
||||
"Coordinator cannot register replica since variant holds wrong alternative");
|
||||
|
||||
const auto name_endpoint_status =
|
||||
std::visit(memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
|
||||
return RegisterMainReplicaCoordinatorStatus::NOT_COORDINATOR;
|
||||
@ -72,32 +73,42 @@ auto CoordinatorState::RegisterReplica(const CoordinatorClientConfig &config) ->
|
||||
return name_endpoint_status;
|
||||
}
|
||||
|
||||
auto callback = [&]() -> void {
|
||||
auto freq_check_cb = [&](std::string_view instance_name) -> void {
|
||||
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
|
||||
"Can't execute CoordinatorClient's callback since variant holds wrong alternative");
|
||||
auto ®istered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_;
|
||||
|
||||
auto replica_client_info = std::ranges::find_if(
|
||||
registered_replicas_info,
|
||||
[&config](const CoordinatorClientInfo &replica) { return replica.instance_name_ == config.instance_name; });
|
||||
[instance_name](const CoordinatorClientInfo &replica) { return replica.instance_name_ == instance_name; });
|
||||
|
||||
MG_ASSERT(replica_client_info == registered_replicas_info.end(), "Replica {} not found in registered replicas info",
|
||||
config.instance_name);
|
||||
// TODO: Refactor so that we just know about instances, nothing more. Here after failover it can happen that replica
|
||||
// has become main MG_ASSERT(replica_client_info != registered_replicas_info.end(), "Replica {} not found in
|
||||
// registered replicas info",
|
||||
// instance_name);
|
||||
|
||||
auto sec_since_last_response = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
std::chrono::system_clock::now() - replica_client_info->last_response_time_.load(std::memory_order_acquire));
|
||||
|
||||
replica_client_info->is_alive_ =
|
||||
sec_since_last_response.count() <= CoordinatorClusterConfig::alive_response_time_difference_sec_;
|
||||
if (replica_client_info == registered_replicas_info.end()) {
|
||||
auto ®istered_main_info = std::get<CoordinatorData>(data_).registered_main_info_;
|
||||
MG_ASSERT(registered_main_info->instance_name_ == instance_name, "Instance is neither a replica nor main...");
|
||||
registered_main_info->last_response_time_.store(std::chrono::system_clock::now(), std::memory_order_release);
|
||||
} else {
|
||||
replica_client_info->last_response_time_.store(std::chrono::system_clock::now(), std::memory_order_release);
|
||||
}
|
||||
};
|
||||
|
||||
// Maybe no need to return client if you can start replica client here
|
||||
auto *replica_client = &std::get<CoordinatorData>(data_).registered_replicas_.emplace_back(config);
|
||||
replica_client->StartFrequentCheck();
|
||||
auto *coord_client =
|
||||
&std::get<CoordinatorData>(data_).registered_replicas_.emplace_back(std::move(config), std::move(freq_check_cb));
|
||||
|
||||
std::get<CoordinatorData>(data_).registered_replicas_info_.emplace_back(coord_client->InstanceName(),
|
||||
coord_client->Endpoint());
|
||||
|
||||
return RegisterMainReplicaCoordinatorStatus::SUCCESS;
|
||||
}
|
||||
|
||||
auto CoordinatorState::RegisterMain(const CoordinatorClientConfig &config) -> RegisterMainReplicaCoordinatorStatus {
|
||||
auto CoordinatorState::RegisterMain(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus {
|
||||
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
|
||||
"Coordinator cannot register main since variant holds wrong alternative");
|
||||
|
||||
const auto endpoint_status = std::visit(
|
||||
memgraph::utils::Overloaded{
|
||||
[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) {
|
||||
@ -110,87 +121,86 @@ auto CoordinatorState::RegisterMain(const CoordinatorClientConfig &config) -> Re
|
||||
return endpoint_status;
|
||||
}
|
||||
|
||||
auto ®istered_main = std::get<CoordinatorData>(data_).registered_main_;
|
||||
registered_main = std::make_unique<CoordinatorClient>(config);
|
||||
|
||||
auto cb = [&]() -> void {
|
||||
auto freq_check_cb = [&](std::string_view instance_name) -> void {
|
||||
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
|
||||
"Can't execute CoordinatorClient's callback since variant holds wrong alternative");
|
||||
MG_ASSERT(std::get<CoordinatorData>(data_).registered_main_info_.has_value(),
|
||||
"Main info is not set, but callback is called");
|
||||
|
||||
// TODO When we will support restoration of main, we have to assert that the instance is main or replica, not at
|
||||
// this point....
|
||||
auto ®istered_main_info = std::get<CoordinatorData>(data_).registered_main_info_;
|
||||
MG_ASSERT(registered_main_info.instance_name_ == config.instance_name,
|
||||
"Registered main instance name {} does not match config instance name {}",
|
||||
registered_main_info.instance_name_, config.instance_name);
|
||||
MG_ASSERT(registered_main_info->instance_name_ == instance_name,
|
||||
"Callback called for wrong instance name: {}, expected: {}", instance_name,
|
||||
registered_main_info->instance_name_);
|
||||
|
||||
auto sec_since_last_response = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
std::chrono::system_clock::now() - registered_main_info.last_response_time_.load(std::memory_order_acquire));
|
||||
registered_main_info.is_alive_ =
|
||||
sec_since_last_response.count() <= CoordinatorClusterConfig::alive_response_time_difference_sec_;
|
||||
registered_main_info->last_response_time_.store(std::chrono::system_clock::now(), std::memory_order_release);
|
||||
|
||||
if (!registered_main_info.is_alive_) {
|
||||
spdlog::warn("Main is not alive, starting failover");
|
||||
switch (auto failover_status = DoFailover(); failover_status) {
|
||||
using enum DoFailoverStatus;
|
||||
case ALL_REPLICAS_DOWN:
|
||||
spdlog::warn("Failover aborted since all replicas are down!");
|
||||
case MAIN_ALIVE:
|
||||
spdlog::warn("Failover aborted since main is alive!");
|
||||
case CLUSTER_UNINITIALIZED:
|
||||
spdlog::warn("Failover aborted since cluster is uninitialized!");
|
||||
case SUCCESS:
|
||||
break;
|
||||
}
|
||||
}
|
||||
// if (!registered_main_info->is_alive_) {
|
||||
// spdlog::warn("Main is not alive, starting failover");
|
||||
// switch (auto failover_status = DoFailover(); failover_status) {
|
||||
// using enum DoFailoverStatus;
|
||||
// case ALL_REPLICAS_DOWN:
|
||||
// spdlog::warn("Failover aborted since all replicas are down!");
|
||||
// case MAIN_ALIVE:
|
||||
// spdlog::warn("Failover aborted since main is alive!");
|
||||
// case CLUSTER_UNINITIALIZED:
|
||||
// spdlog::warn("Failover aborted since cluster is uninitialized!");
|
||||
// case SUCCESS:
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
};
|
||||
|
||||
registered_main->StartFrequentCheck();
|
||||
auto ®istered_main = std::get<CoordinatorData>(data_).registered_main_;
|
||||
registered_main = std::make_unique<CoordinatorClient>(std::move(config), std::move(freq_check_cb));
|
||||
|
||||
auto ®istered_main_info = std::get<CoordinatorData>(data_).registered_main_info_;
|
||||
registered_main_info.emplace(registered_main->InstanceName(), registered_main->Endpoint());
|
||||
|
||||
return RegisterMainReplicaCoordinatorStatus::SUCCESS;
|
||||
}
|
||||
|
||||
auto CoordinatorState::ShowReplicas() const -> std::vector<CoordinatorEntityInfo> {
|
||||
auto CoordinatorState::ShowReplicas() const -> std::vector<CoordinatorInstanceStatus> {
|
||||
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
|
||||
"Can't call show replicas on data_, as variant holds wrong alternative");
|
||||
std::vector<CoordinatorEntityInfo> result;
|
||||
const auto ®istered_replicas = std::get<CoordinatorData>(data_).registered_replicas_;
|
||||
result.reserve(registered_replicas.size());
|
||||
std::ranges::transform(registered_replicas, std::back_inserter(result), [](const auto &replica) {
|
||||
return CoordinatorEntityInfo{replica.InstanceName(), replica.Endpoint()};
|
||||
});
|
||||
return result;
|
||||
const auto ®istered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_;
|
||||
std::vector<CoordinatorInstanceStatus> instances_status;
|
||||
instances_status.reserve(registered_replicas_info.size());
|
||||
|
||||
std::ranges::transform(
|
||||
registered_replicas_info, std::back_inserter(instances_status),
|
||||
[](const CoordinatorClientInfo &coord_client_info) {
|
||||
const auto sec_since_last_response = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
std::chrono::system_clock::now() -
|
||||
coord_client_info.last_response_time_.load(std::memory_order_acquire))
|
||||
.count();
|
||||
|
||||
return CoordinatorInstanceStatus{
|
||||
.instance_name = coord_client_info.instance_name_,
|
||||
.socket_address = coord_client_info.endpoint->SocketAddress(),
|
||||
.is_alive = sec_since_last_response <= CoordinatorClusterConfig::alive_response_time_difference_sec_};
|
||||
});
|
||||
return instances_status;
|
||||
}
|
||||
|
||||
auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorEntityInfo> {
|
||||
auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStatus> {
|
||||
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
|
||||
"Can't call show main on data_, as variant holds wrong alternative");
|
||||
const auto ®istered_main = std::get<CoordinatorData>(data_).registered_main_;
|
||||
if (registered_main) {
|
||||
return CoordinatorEntityInfo{registered_main->InstanceName(), registered_main->Endpoint()};
|
||||
const auto &main = std::get<CoordinatorData>(data_).registered_main_info_;
|
||||
if (!main.has_value()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
const auto sec_since_last_response =
|
||||
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() -
|
||||
main->last_response_time_.load(std::memory_order_acquire))
|
||||
.count();
|
||||
|
||||
auto CoordinatorState::PingReplicas() const -> std::unordered_map<std::string_view, bool> {
|
||||
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
|
||||
"Can't call ping replicas on data_, as variant holds wrong alternative");
|
||||
std::unordered_map<std::string_view, bool> result;
|
||||
const auto ®istered_replicas = std::get<CoordinatorData>(data_).registered_replicas_;
|
||||
result.reserve(registered_replicas.size());
|
||||
// for (const CoordinatorClient &replica_client : registered_replicas) {
|
||||
// result.emplace(replica_client.InstanceName(), replica_client.DoHealthCheck());
|
||||
// }
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
auto CoordinatorState::PingMain() const -> std::optional<CoordinatorEntityHealthInfo> {
|
||||
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_),
|
||||
"Can't call show main on data_, as variant holds wrong alternative");
|
||||
const auto ®istered_main = std::get<CoordinatorData>(data_).registered_main_;
|
||||
if (registered_main) {
|
||||
// return CoordinatorEntityHealthInfo{registered_main->InstanceName(), registered_main->DoHealthCheck()};
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
return CoordinatorInstanceStatus{
|
||||
.instance_name = main->instance_name_,
|
||||
.socket_address = main->endpoint->SocketAddress(),
|
||||
.is_alive = sec_since_last_response <= CoordinatorClusterConfig::alive_response_time_difference_sec_};
|
||||
};
|
||||
|
||||
[[nodiscard]] auto CoordinatorState::DoFailover() -> DoFailoverStatus {
|
||||
// 1. MAIN is already down, stop sending frequent checks
|
||||
@ -201,32 +211,53 @@ auto CoordinatorState::PingMain() const -> std::optional<CoordinatorEntityHealth
|
||||
// 6. remove replica which was promoted to main from all replicas -> this will shut down RPC frequent check client
|
||||
// (coordinator)
|
||||
// 7. for new main start frequent checks (coordinator)
|
||||
/*
|
||||
|
||||
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), "Cannot do failover since variant holds wrong alternative");
|
||||
using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
|
||||
|
||||
// 1.
|
||||
auto ¤t_main = std::get<CoordinatorData>(data_).registered_main_;
|
||||
auto ¤t_main_info = std::get<CoordinatorData>(data_).registered_main_info_;
|
||||
|
||||
if (!current_main) {
|
||||
if (!current_main_info.has_value()) {
|
||||
return DoFailoverStatus::CLUSTER_UNINITIALIZED;
|
||||
}
|
||||
|
||||
if (current_main->DoHealthCheck()) {
|
||||
auto sec_since_last_response_main =
|
||||
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() -
|
||||
current_main_info->last_response_time_.load())
|
||||
.count();
|
||||
|
||||
if (sec_since_last_response_main <= CoordinatorClusterConfig::alive_response_time_difference_sec_) {
|
||||
return DoFailoverStatus::MAIN_ALIVE;
|
||||
}
|
||||
|
||||
auto ¤t_main = std::get<CoordinatorData>(data_).registered_main_;
|
||||
// TODO: stop pinging as soon as you figure out that failover is needed
|
||||
current_main->StopFrequentCheck();
|
||||
|
||||
// 2.
|
||||
// Get all replicas and find new main
|
||||
auto ®istered_replicas = std::get<CoordinatorData>(data_).registered_replicas_;
|
||||
auto ®istered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_;
|
||||
|
||||
const auto chosen_replica = std::ranges::find_if(
|
||||
registered_replicas, [](const CoordinatorClient &replica) { return replica.DoHealthCheck(); });
|
||||
if (chosen_replica == registered_replicas.end()) {
|
||||
const auto chosen_replica_info =
|
||||
std::ranges::find_if(registered_replicas_info, [](const CoordinatorClientInfo &client_info) {
|
||||
auto sec_since_last_response = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
std::chrono::system_clock::now() - client_info.last_response_time_.load())
|
||||
.count();
|
||||
return sec_since_last_response <= CoordinatorClusterConfig::alive_response_time_difference_sec_;
|
||||
});
|
||||
if (chosen_replica_info == registered_replicas_info.end()) {
|
||||
return DoFailoverStatus::ALL_REPLICAS_DOWN;
|
||||
}
|
||||
|
||||
auto ®istered_replicas = std::get<CoordinatorData>(data_).registered_replicas_;
|
||||
const auto chosen_replica =
|
||||
std::ranges::find_if(registered_replicas, [&chosen_replica_info](const CoordinatorClient &replica) {
|
||||
return replica.InstanceName() == chosen_replica_info->instance_name_;
|
||||
});
|
||||
MG_ASSERT(chosen_replica != registered_replicas.end(), "Chosen replica {} not found in registered replicas",
|
||||
chosen_replica_info->instance_name_);
|
||||
|
||||
std::vector<ReplicationClientInfo> repl_clients_info;
|
||||
repl_clients_info.reserve(registered_replicas.size() - 1);
|
||||
std::ranges::for_each(registered_replicas, [&chosen_replica, &repl_clients_info](const CoordinatorClient &replica) {
|
||||
@ -239,9 +270,9 @@ auto CoordinatorState::PingMain() const -> std::optional<CoordinatorEntityHealth
|
||||
// Set on coordinator data of new main
|
||||
// allocate resources for new main, clear replication info on this replica as main
|
||||
// set last response time
|
||||
auto potential_new_main = std::make_unique<CoordinatorClient>(chosen_replica->Config());
|
||||
auto potential_new_main = std::make_unique<CoordinatorClient>(chosen_replica->Config(), chosen_replica->Callback());
|
||||
potential_new_main->ReplicationClientInfo().reset();
|
||||
potential_new_main->UpdateTimeCheck(chosen_replica->GetLastTimeResponse());
|
||||
auto potential_new_main_info = *chosen_replica_info;
|
||||
|
||||
// 4.
|
||||
if (!chosen_replica->SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) {
|
||||
@ -252,15 +283,14 @@ auto CoordinatorState::PingMain() const -> std::optional<CoordinatorEntityHealth
|
||||
|
||||
// 5.
|
||||
current_main = std::move(potential_new_main);
|
||||
current_main_info.emplace(potential_new_main_info);
|
||||
|
||||
// 6. remove old replica
|
||||
// TODO: Stop pinging chosen_replica before failover.
|
||||
// Check that it doesn't fail when you call StopFrequentCheck if it is already stopped
|
||||
registered_replicas.erase(chosen_replica);
|
||||
registered_replicas_info.erase(chosen_replica_info);
|
||||
|
||||
// 7.
|
||||
current_main->StartFrequentCheck();
|
||||
*/
|
||||
return DoFailoverStatus::SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,7 @@ class CoordinatorClient {
|
||||
using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
|
||||
using ReplicationClientsInfo = std::vector<ReplClientInfo>;
|
||||
|
||||
explicit CoordinatorClient(const CoordinatorClientConfig &config);
|
||||
explicit CoordinatorClient(CoordinatorClientConfig config, std::function<void(std::string_view)> freq_check_cb);
|
||||
|
||||
~CoordinatorClient();
|
||||
|
||||
@ -43,10 +43,12 @@ class CoordinatorClient {
|
||||
auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool;
|
||||
|
||||
auto InstanceName() const -> std::string_view;
|
||||
auto Endpoint() const -> io::network::Endpoint const &;
|
||||
auto Endpoint() const -> const io::network::Endpoint *;
|
||||
auto Config() const -> CoordinatorClientConfig const &;
|
||||
auto ReplicationClientInfo() const -> ReplClientInfo const &;
|
||||
auto ReplicationClientInfo() -> std::optional<ReplClientInfo> &;
|
||||
// TODO: We should add copy constructor and then there won't be need for this
|
||||
auto Callback() const -> std::function<void(std::string_view)> const &;
|
||||
|
||||
friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) {
|
||||
return first.config_ == second.config_;
|
||||
@ -60,6 +62,7 @@ class CoordinatorClient {
|
||||
mutable rpc::Client rpc_client_;
|
||||
|
||||
CoordinatorClientConfig config_;
|
||||
std::function<void(std::string_view)> freq_check_cb_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
|
@ -13,16 +13,52 @@
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
||||
#include "io/network/endpoint.hpp"
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
// TODO: better connect this
|
||||
struct CoordinatorClientInfo {
|
||||
CoordinatorClientInfo(std::string_view instance_name, const io::network::Endpoint *endpoint)
|
||||
: last_response_time_(std::chrono::system_clock::now()), instance_name_(instance_name), endpoint(endpoint) {}
|
||||
|
||||
~CoordinatorClientInfo() = default;
|
||||
|
||||
CoordinatorClientInfo(const CoordinatorClientInfo &other)
|
||||
: last_response_time_(other.last_response_time_.load()),
|
||||
instance_name_(other.instance_name_),
|
||||
endpoint(other.endpoint) {}
|
||||
|
||||
CoordinatorClientInfo &operator=(const CoordinatorClientInfo &other) {
|
||||
if (this != &other) {
|
||||
last_response_time_.store(other.last_response_time_.load());
|
||||
instance_name_ = other.instance_name_;
|
||||
endpoint = other.endpoint;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
CoordinatorClientInfo(CoordinatorClientInfo &&other) noexcept
|
||||
: last_response_time_(other.last_response_time_.load()),
|
||||
instance_name_(other.instance_name_),
|
||||
endpoint(other.endpoint) {}
|
||||
|
||||
CoordinatorClientInfo &operator=(CoordinatorClientInfo &&other) noexcept {
|
||||
if (this != &other) {
|
||||
last_response_time_.store(other.last_response_time_.load());
|
||||
instance_name_ = other.instance_name_;
|
||||
endpoint = other.endpoint;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
/// TODO: Add a method is_alive
|
||||
|
||||
std::atomic<std::chrono::system_clock::time_point> last_response_time_{};
|
||||
bool is_alive_{false};
|
||||
std::string_view instance_name_;
|
||||
const io::network::Endpoint *endpoint;
|
||||
};
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
|
@ -25,16 +25,14 @@ namespace memgraph::coordination {
|
||||
inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0";
|
||||
|
||||
struct CoordinatorClientConfig {
|
||||
const std::string instance_name;
|
||||
const std::string ip_address;
|
||||
const uint16_t port{};
|
||||
|
||||
// Frequency with which coordinator pings main/replicas about it status
|
||||
const std::chrono::seconds health_check_frequency_sec{1};
|
||||
std::string instance_name;
|
||||
std::string ip_address;
|
||||
uint16_t port{};
|
||||
std::chrono::seconds health_check_frequency_sec{1};
|
||||
|
||||
// Info which coordinator will send to new main when performing failover
|
||||
struct ReplicationClientInfo {
|
||||
// Should be the same as CoordinatorClientConfig's instance_name
|
||||
// Must be the same as CoordinatorClientConfig's instance_name
|
||||
std::string instance_name;
|
||||
replication_coordination_glue::ReplicationMode replication_mode{};
|
||||
std::string replication_ip_address;
|
||||
@ -46,13 +44,13 @@ struct CoordinatorClientConfig {
|
||||
std::optional<ReplicationClientInfo> replication_client_info;
|
||||
|
||||
struct SSL {
|
||||
const std::string key_file;
|
||||
const std::string cert_file;
|
||||
std::string key_file;
|
||||
std::string cert_file;
|
||||
|
||||
friend bool operator==(const SSL &, const SSL &) = default;
|
||||
};
|
||||
|
||||
const std::optional<SSL> ssl;
|
||||
std::optional<SSL> ssl;
|
||||
|
||||
friend bool operator==(CoordinatorClientConfig const &, CoordinatorClientConfig const &) = default;
|
||||
};
|
||||
|
@ -19,14 +19,10 @@
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
struct CoordinatorEntityInfo {
|
||||
std::string_view name;
|
||||
const io::network::Endpoint &endpoint;
|
||||
};
|
||||
|
||||
struct CoordinatorEntityHealthInfo {
|
||||
std::string_view name;
|
||||
bool alive;
|
||||
/// Status of a single instance: its name, its socket address as text, and whether it is considered alive.
///
/// Remains an aggregate, so designated initialisation (`{.instance_name = ..., ...}`) keeps working.
struct CoordinatorInstanceStatus {
  // Non-owning view of the instance name; the referenced string must outlive this object.
  std::string_view instance_name;
  // Owned textual socket address.
  std::string socket_address;
  // Defaulted so that a default-initialised object never carries an indeterminate value.
  bool is_alive{false};
};
|
||||
|
||||
} // namespace memgraph::coordination
|
@ -15,7 +15,7 @@
|
||||
|
||||
#include "coordination/coordinator_client.hpp"
|
||||
#include "coordination/coordinator_client_info.hpp"
|
||||
#include "coordination/coordinator_entity_info.hpp"
|
||||
#include "coordination/coordinator_instance_status.hpp"
|
||||
#include "coordination/coordinator_server.hpp"
|
||||
#include "coordination/failover_status.hpp"
|
||||
#include "coordination/register_main_replica_coordinator_status.hpp"
|
||||
@ -40,17 +40,13 @@ class CoordinatorState {
|
||||
CoordinatorState(CoordinatorState &&) noexcept = delete;
|
||||
CoordinatorState &operator=(CoordinatorState &&) noexcept = delete;
|
||||
|
||||
[[nodiscard]] auto RegisterReplica(const CoordinatorClientConfig &config) -> RegisterMainReplicaCoordinatorStatus;
|
||||
[[nodiscard]] auto RegisterReplica(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus;
|
||||
|
||||
[[nodiscard]] auto RegisterMain(const CoordinatorClientConfig &config) -> RegisterMainReplicaCoordinatorStatus;
|
||||
[[nodiscard]] auto RegisterMain(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus;
|
||||
|
||||
auto ShowReplicas() const -> std::vector<CoordinatorEntityInfo>;
|
||||
auto ShowReplicas() const -> std::vector<CoordinatorInstanceStatus>;
|
||||
|
||||
auto PingReplicas() const -> std::unordered_map<std::string_view, bool>;
|
||||
|
||||
auto ShowMain() const -> std::optional<CoordinatorEntityInfo>;
|
||||
|
||||
auto PingMain() const -> std::optional<CoordinatorEntityHealthInfo>;
|
||||
auto ShowMain() const -> std::optional<CoordinatorInstanceStatus>;
|
||||
|
||||
// The client code must check that the server exists before calling this method.
|
||||
auto GetCoordinatorServer() const -> CoordinatorServer &;
|
||||
@ -63,7 +59,7 @@ class CoordinatorState {
|
||||
std::list<CoordinatorClient> registered_replicas_;
|
||||
std::vector<CoordinatorClientInfo> registered_replicas_info_;
|
||||
std::unique_ptr<CoordinatorClient> registered_main_;
|
||||
CoordinatorClientInfo registered_main_info_;
|
||||
std::optional<CoordinatorClientInfo> registered_main_info_;
|
||||
};
|
||||
|
||||
struct CoordinatorMainReplicaData {
|
||||
|
@ -19,32 +19,24 @@ namespace memgraph::dbms {
|
||||
|
||||
// Binds the handler to the given DbmsHandler; every other method forwards through dbms_handler_.
CoordinatorHandler::CoordinatorHandler(DbmsHandler &dbms_handler) : dbms_handler_(dbms_handler) {}
|
||||
|
||||
auto CoordinatorHandler::RegisterReplicaOnCoordinator(const coordination::CoordinatorClientConfig &config)
|
||||
auto CoordinatorHandler::RegisterReplicaOnCoordinator(coordination::CoordinatorClientConfig config)
|
||||
-> coordination::RegisterMainReplicaCoordinatorStatus {
|
||||
return dbms_handler_.CoordinatorState().RegisterReplica(config);
|
||||
return dbms_handler_.CoordinatorState().RegisterReplica(std::move(config));
|
||||
}
|
||||
|
||||
auto CoordinatorHandler::RegisterMainOnCoordinator(const memgraph::coordination::CoordinatorClientConfig &config)
|
||||
auto CoordinatorHandler::RegisterMainOnCoordinator(memgraph::coordination::CoordinatorClientConfig config)
|
||||
-> coordination::RegisterMainReplicaCoordinatorStatus {
|
||||
return dbms_handler_.CoordinatorState().RegisterMain(config);
|
||||
return dbms_handler_.CoordinatorState().RegisterMain(std::move(config));
|
||||
}
|
||||
|
||||
auto CoordinatorHandler::ShowReplicasOnCoordinator() const -> std::vector<coordination::CoordinatorEntityInfo> {
|
||||
// Forwards to ShowReplicas() on the coordinator state obtained from dbms_handler_ and returns its result as-is.
auto CoordinatorHandler::ShowReplicasOnCoordinator() const -> std::vector<coordination::CoordinatorInstanceStatus> {
  auto &&coordinator_state = dbms_handler_.CoordinatorState();
  return coordinator_state.ShowReplicas();
}
|
||||
|
||||
auto CoordinatorHandler::PingReplicasOnCoordinator() const -> std::unordered_map<std::string_view, bool> {
|
||||
return dbms_handler_.CoordinatorState().PingReplicas();
|
||||
}
|
||||
|
||||
auto CoordinatorHandler::ShowMainOnCoordinator() const -> std::optional<coordination::CoordinatorEntityInfo> {
|
||||
// Forwards to ShowMain() on the coordinator state obtained from dbms_handler_ and returns its result as-is.
auto CoordinatorHandler::ShowMainOnCoordinator() const -> std::optional<coordination::CoordinatorInstanceStatus> {
  auto &&coordinator_state = dbms_handler_.CoordinatorState();
  return coordinator_state.ShowMain();
}
|
||||
|
||||
auto CoordinatorHandler::PingMainOnCoordinator() const -> std::optional<coordination::CoordinatorEntityHealthInfo> {
|
||||
return dbms_handler_.CoordinatorState().PingMain();
|
||||
}
|
||||
|
||||
// Forwards to DoFailover() on the coordinator state obtained from dbms_handler_ and returns the resulting status.
auto CoordinatorHandler::DoFailover() const -> coordination::DoFailoverStatus {
  auto &&coordinator_state = dbms_handler_.CoordinatorState();
  return coordinator_state.DoFailover();
}
|
||||
|
@ -16,7 +16,7 @@
|
||||
#include "utils/result.hpp"
|
||||
|
||||
#include "coordination/coordinator_config.hpp"
|
||||
#include "coordination/coordinator_entity_info.hpp"
|
||||
#include "coordination/coordinator_instance_status.hpp"
|
||||
#include "coordination/failover_status.hpp"
|
||||
#include "coordination/register_main_replica_coordinator_status.hpp"
|
||||
|
||||
@ -32,19 +32,15 @@ class CoordinatorHandler {
|
||||
public:
|
||||
explicit CoordinatorHandler(DbmsHandler &dbms_handler);
|
||||
|
||||
auto RegisterReplicaOnCoordinator(const coordination::CoordinatorClientConfig &config)
|
||||
auto RegisterReplicaOnCoordinator(coordination::CoordinatorClientConfig config)
|
||||
-> coordination::RegisterMainReplicaCoordinatorStatus;
|
||||
|
||||
auto RegisterMainOnCoordinator(const coordination::CoordinatorClientConfig &config)
|
||||
auto RegisterMainOnCoordinator(coordination::CoordinatorClientConfig config)
|
||||
-> coordination::RegisterMainReplicaCoordinatorStatus;
|
||||
|
||||
auto ShowReplicasOnCoordinator() const -> std::vector<coordination::CoordinatorEntityInfo>;
|
||||
auto ShowReplicasOnCoordinator() const -> std::vector<coordination::CoordinatorInstanceStatus>;
|
||||
|
||||
auto ShowMainOnCoordinator() const -> std::optional<coordination::CoordinatorEntityInfo>;
|
||||
|
||||
auto PingReplicasOnCoordinator() const -> std::unordered_map<std::string_view, bool>;
|
||||
|
||||
auto PingMainOnCoordinator() const -> std::optional<coordination::CoordinatorEntityHealthInfo>;
|
||||
auto ShowMainOnCoordinator() const -> std::optional<coordination::CoordinatorInstanceStatus>;
|
||||
|
||||
auto DoFailover() const -> coordination::DoFailoverStatus;
|
||||
|
||||
|
@ -110,7 +110,6 @@
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
#include "coordination/constants.hpp"
|
||||
#include "coordination/coordinator_entity_info.hpp"
|
||||
#endif
|
||||
|
||||
namespace memgraph::metrics {
|
||||
@ -487,7 +486,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
|
||||
.replication_ip_address = replication_ip,
|
||||
.replication_port = replication_port};
|
||||
|
||||
const auto coordinator_client_config =
|
||||
auto coordinator_client_config =
|
||||
coordination::CoordinatorClientConfig{.instance_name = instance_name,
|
||||
.ip_address = coordinator_server_ip,
|
||||
.port = coordinator_server_port,
|
||||
@ -495,7 +494,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
|
||||
.replication_client_info = repl_config,
|
||||
.ssl = std::nullopt};
|
||||
|
||||
auto status = coordinator_handler_.RegisterReplicaOnCoordinator(coordinator_client_config);
|
||||
auto status = coordinator_handler_.RegisterReplicaOnCoordinator(std::move(coordinator_client_config));
|
||||
switch (status) {
|
||||
using enum memgraph::coordination::RegisterMainReplicaCoordinatorStatus;
|
||||
case NAME_EXISTS:
|
||||
@ -521,13 +520,14 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
|
||||
throw QueryRuntimeException("Invalid socket address!");
|
||||
}
|
||||
const auto [ip, port] = *maybe_ip_and_port;
|
||||
const auto config = coordination::CoordinatorClientConfig{.instance_name = instance_name,
|
||||
.ip_address = ip,
|
||||
.port = port,
|
||||
.health_check_frequency_sec = instance_check_frequency,
|
||||
.ssl = std::nullopt};
|
||||
auto coordinator_client_config =
|
||||
coordination::CoordinatorClientConfig{.instance_name = instance_name,
|
||||
.ip_address = ip,
|
||||
.port = port,
|
||||
.health_check_frequency_sec = instance_check_frequency,
|
||||
.ssl = std::nullopt};
|
||||
|
||||
auto status = coordinator_handler_.RegisterMainOnCoordinator(config);
|
||||
auto status = coordinator_handler_.RegisterMainOnCoordinator(std::move(coordinator_client_config));
|
||||
switch (status) {
|
||||
using enum memgraph::coordination::RegisterMainReplicaCoordinatorStatus;
|
||||
case NAME_EXISTS:
|
||||
@ -564,20 +564,17 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Remove this method, probably not needed.
|
||||
std::vector<MainReplicaStatus> ShowMainReplicaStatus(
|
||||
const std::vector<coordination::CoordinatorEntityInfo> &replicas,
|
||||
const std::unordered_map<std::string_view, bool> &health_check_replicas,
|
||||
const std::optional<coordination::CoordinatorEntityInfo> &main,
|
||||
const std::optional<coordination::CoordinatorEntityHealthInfo> &health_check_main) const override {
|
||||
const std::vector<coordination::CoordinatorInstanceStatus> &replicas,
|
||||
const std::optional<coordination::CoordinatorInstanceStatus> &main) const override {
|
||||
std::vector<MainReplicaStatus> result{};
|
||||
result.reserve(replicas.size() + 1); // replicas + 1 main
|
||||
std::ranges::transform(
|
||||
replicas, std::back_inserter(result), [&health_check_replicas](const auto &replica) -> MainReplicaStatus {
|
||||
return {replica.name, replica.endpoint.SocketAddress(), health_check_replicas.at(replica.name), false};
|
||||
});
|
||||
std::ranges::transform(replicas, std::back_inserter(result), [](const auto &replica) -> MainReplicaStatus {
|
||||
return {replica.instance_name, replica.socket_address, replica.is_alive, false};
|
||||
});
|
||||
if (main) {
|
||||
bool is_main_alive = health_check_main.has_value() ? health_check_main.value().alive : false;
|
||||
result.emplace_back(main->name, main->endpoint.SocketAddress(), is_main_alive, true);
|
||||
result.emplace_back(main->instance_name, main->socket_address, main->is_alive, true);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
@ -585,21 +582,13 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
|
||||
#endif
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
std::vector<coordination::CoordinatorEntityInfo> ShowReplicasOnCoordinator() const override {
|
||||
std::vector<coordination::CoordinatorInstanceStatus> ShowReplicasOnCoordinator() const override {
|
||||
return coordinator_handler_.ShowReplicasOnCoordinator();
|
||||
}
|
||||
|
||||
std::unordered_map<std::string_view, bool> PingReplicasOnCoordinator() const override {
|
||||
return coordinator_handler_.PingReplicasOnCoordinator();
|
||||
}
|
||||
|
||||
std::optional<coordination::CoordinatorEntityInfo> ShowMainOnCoordinator() const override {
|
||||
std::optional<coordination::CoordinatorInstanceStatus> ShowMainOnCoordinator() const override {
|
||||
return coordinator_handler_.ShowMainOnCoordinator();
|
||||
}
|
||||
|
||||
std::optional<coordination::CoordinatorEntityHealthInfo> PingMainOnCoordinator() const override {
|
||||
return coordinator_handler_.PingMainOnCoordinator();
|
||||
}
|
||||
#endif
|
||||
|
||||
private:
|
||||
@ -1094,11 +1083,18 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
|
||||
"High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to "
|
||||
"be able to use this functionality.");
|
||||
}
|
||||
|
||||
if (!FLAGS_coordinator) {
|
||||
throw QueryRuntimeException("Only coordinator can register coordinator server!");
|
||||
}
|
||||
|
||||
// TODO: MemoryResource for EvaluationContext, it should probably be passed as
|
||||
// the argument to Callback.
|
||||
EvaluationContext evaluation_context{.timestamp = QueryTimestamp(), .parameters = parameters};
|
||||
auto evaluator = PrimitiveLiteralExpressionEvaluator{evaluation_context};
|
||||
auto coordinator_socket_address_tv = coordinator_query->coordinator_socket_address_->Accept(evaluator);
|
||||
auto replication_socket_address_tv = coordinator_query->socket_address_->Accept(evaluator);
|
||||
|
||||
callback.fn =
|
||||
[handler = CoordQueryHandler{dbms_handler}, coordinator_socket_address_tv, replication_socket_address_tv,
|
||||
replica_check_frequency = config.replication_replica_check_frequency,
|
||||
@ -1132,10 +1128,8 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
|
||||
|
||||
callback.header = {"name", "socket_address", "alive", "role"};
|
||||
callback.fn = [handler = CoordQueryHandler{dbms_handler}, replica_nfields = callback.header.size()]() mutable {
|
||||
const auto main = handler.ShowMainOnCoordinator();
|
||||
const auto health_check_main = main ? handler.PingMainOnCoordinator() : std::nullopt;
|
||||
const auto result_status = handler.ShowMainReplicaStatus(
|
||||
handler.ShowReplicasOnCoordinator(), handler.PingReplicasOnCoordinator(), main, health_check_main);
|
||||
const auto result_status =
|
||||
handler.ShowMainReplicaStatus(handler.ShowReplicasOnCoordinator(), handler.ShowMainOnCoordinator());
|
||||
std::vector<std::vector<TypedValue>> result{};
|
||||
result.reserve(result_status.size());
|
||||
|
||||
@ -1166,10 +1160,9 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
|
||||
callback.header = {"name", "socket_address", "alive", "role"};
|
||||
callback.fn = [handler = CoordQueryHandler{dbms_handler}]() mutable {
|
||||
handler.DoFailover();
|
||||
const auto main = handler.ShowMainOnCoordinator();
|
||||
const auto health_check_main = main ? handler.PingMainOnCoordinator() : std::nullopt;
|
||||
const auto result_status = handler.ShowMainReplicaStatus(
|
||||
handler.ShowReplicasOnCoordinator(), handler.PingReplicasOnCoordinator(), main, health_check_main);
|
||||
|
||||
const auto result_status =
|
||||
handler.ShowMainReplicaStatus(handler.ShowReplicasOnCoordinator(), handler.ShowMainOnCoordinator());
|
||||
std::vector<std::vector<TypedValue>> result{};
|
||||
result.reserve(result_status.size());
|
||||
|
||||
@ -1180,6 +1173,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
|
||||
});
|
||||
return result;
|
||||
};
|
||||
|
||||
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::DO_FAILOVER,
|
||||
"DO FAILOVER called on coordinator.");
|
||||
return callback;
|
||||
|
@ -15,7 +15,6 @@
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
|
||||
#include "coordination/coordinator_entity_info.hpp"
|
||||
#include "dbms/database.hpp"
|
||||
#include "dbms/dbms_handler.hpp"
|
||||
#include "memory/query_memory_control.hpp"
|
||||
@ -53,6 +52,10 @@
|
||||
#include "utils/timer.hpp"
|
||||
#include "utils/tsc.hpp"
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
#include "coordination/coordinator_instance_status.hpp"
|
||||
#endif
|
||||
|
||||
namespace memgraph::metrics {
|
||||
extern const Event FailedQuery;
|
||||
extern const Event FailedPrepare;
|
||||
@ -114,26 +117,18 @@ class CoordinatorQueryHandler {
|
||||
const std::string &instance_name) = 0;
|
||||
|
||||
/// @throw QueryRuntimeException if an error occurred.
|
||||
virtual std::vector<coordination::CoordinatorEntityInfo> ShowReplicasOnCoordinator() const = 0;
|
||||
virtual std::vector<coordination::CoordinatorInstanceStatus> ShowReplicasOnCoordinator() const = 0;
|
||||
|
||||
/// @throw QueryRuntimeException if an error occurred.
|
||||
virtual std::optional<coordination::CoordinatorEntityInfo> ShowMainOnCoordinator() const = 0;
|
||||
|
||||
/// @throw QueryRuntimeException if an error occurred.
|
||||
virtual std::unordered_map<std::string_view, bool> PingReplicasOnCoordinator() const = 0;
|
||||
|
||||
/// @throw QueryRuntimeException if an error occurred.
|
||||
virtual std::optional<coordination::CoordinatorEntityHealthInfo> PingMainOnCoordinator() const = 0;
|
||||
virtual std::optional<coordination::CoordinatorInstanceStatus> ShowMainOnCoordinator() const = 0;
|
||||
|
||||
/// @throw QueryRuntimeException if an error occurred.
|
||||
virtual void DoFailover() const = 0;
|
||||
|
||||
/// @throw QueryRuntimeException if an error occurred.
|
||||
virtual std::vector<MainReplicaStatus> ShowMainReplicaStatus(
|
||||
const std::vector<coordination::CoordinatorEntityInfo> &replicas,
|
||||
const std::unordered_map<std::string_view, bool> &health_check_replicas,
|
||||
const std::optional<coordination::CoordinatorEntityInfo> &main,
|
||||
const std::optional<coordination::CoordinatorEntityHealthInfo> &health_check_main) const = 0;
|
||||
const std::vector<coordination::CoordinatorInstanceStatus> &replicas,
|
||||
const std::optional<coordination::CoordinatorInstanceStatus> &main) const = 0;
|
||||
|
||||
#endif
|
||||
};
|
||||
|
@ -155,29 +155,13 @@ def test_simple_client_initiated_failover(connection):
|
||||
|
||||
|
||||
def test_failover_fails_all_replicas_down(connection):
|
||||
# 1. Start all instances
|
||||
# 2. Kill all replicas
|
||||
# 3. Kill main
|
||||
# 4. Run DO FAILOVER on COORDINATOR. Assert exception is being thrown due to all replicas being down
|
||||
# 5. Assert cluster status didn't change
|
||||
|
||||
# 1.
|
||||
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
|
||||
|
||||
# 2.
|
||||
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
|
||||
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
|
||||
|
||||
# 3.
|
||||
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
|
||||
|
||||
coord_cursor = connection(7690, "coordinator").cursor()
|
||||
# 4.
|
||||
with pytest.raises(Exception) as e:
|
||||
execute_and_fetch_all(coord_cursor, "DO FAILOVER;")
|
||||
assert str(e.value) == "Failover aborted since all replicas are down!"
|
||||
|
||||
# 5.
|
||||
|
||||
def retrieve_data():
|
||||
return set(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;"))
|
||||
@ -189,16 +173,17 @@ def test_failover_fails_all_replicas_down(connection):
|
||||
}
|
||||
mg_sleep_and_assert(expected_data_on_coord, retrieve_data)
|
||||
|
||||
# 4.
|
||||
with pytest.raises(Exception) as e:
|
||||
execute_and_fetch_all(coord_cursor, "DO FAILOVER;")
|
||||
assert str(e.value) == "Failover aborted since all replicas are down!"
|
||||
|
||||
mg_sleep_and_assert(expected_data_on_coord, retrieve_data)
|
||||
|
||||
|
||||
def test_failover_fails_main_is_alive(connection):
|
||||
# 1. Start all instances
|
||||
# 2. Run DO FAILOVER on COORDINATOR. Assert that an exception is thrown because main is still alive.
|
||||
# 3. Assert cluster status didn't change
|
||||
|
||||
# 1.
|
||||
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
|
||||
|
||||
# 2.
|
||||
coord_cursor = connection(7690, "coordinator").cursor()
|
||||
|
||||
def retrieve_data():
|
||||
@ -211,12 +196,10 @@ def test_failover_fails_main_is_alive(connection):
|
||||
}
|
||||
mg_sleep_and_assert(expected_data_on_coord, retrieve_data)
|
||||
|
||||
# 4.
|
||||
with pytest.raises(Exception) as e:
|
||||
execute_and_fetch_all(coord_cursor, "DO FAILOVER;")
|
||||
assert str(e.value) == "Failover aborted since main is alive!"
|
||||
|
||||
# 5.
|
||||
mg_sleep_and_assert(expected_data_on_coord, retrieve_data)
|
||||
|
||||
|
||||
|
@ -1,194 +0,0 @@
|
||||
import itertools
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from typing import List
|
||||
|
||||
# Absolute directory of this script (symlinks resolved by realpath).
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))

# Path handed to the long_running test through its "--stats-file" option.
STATS_FILE = os.path.join(SCRIPT_DIR, ".long_running_stats")
|
||||
|
||||
|
||||
class DatasetConstants:
    """Dictionary keys used by the entries of SMALL_DATASET and LARGE_DATASET."""

    # Name of the test script / binary to run.
    TEST = "test"
    # Command-line options passed to that test.
    OPTIONS = "options"
    # Timeout value for the test (unit is defined by the consumer of the dataset).
    TIMEOUT = "timeout"
    # List of DatabaseMode objects the test is associated with.
    MODE = "mode"
    # Optional extra flags for the Memgraph instance itself.
    MEMGRAPH_OPTIONS = "memgraph_options"
|
||||
|
||||
|
||||
@dataclass
class DatabaseMode:
    """Pair of storage mode and isolation level describing one database configuration."""

    # One of the StorageModeConstants values.
    storage_mode: str
    # One of the IsolationLevelConstants values.
    isolation_level: str
|
||||
|
||||
|
||||
class StorageModeConstants:
    """String constants naming the available storage modes."""

    IN_MEMORY_TRANSACTIONAL = "IN_MEMORY_TRANSACTIONAL"
    IN_MEMORY_ANALYTICAL = "IN_MEMORY_ANALYTICAL"

    @classmethod
    def to_list(cls) -> List[str]:
        """Return all storage modes as a new list, transactional first."""
        every_mode = (cls.IN_MEMORY_TRANSACTIONAL, cls.IN_MEMORY_ANALYTICAL)
        return list(every_mode)
|
||||
|
||||
|
||||
class IsolationLevelConstants:
    """String constants naming the available isolation levels."""

    SNAPSHOT_ISOLATION = "SNAPSHOT ISOLATION"
    READ_COMMITED = "READ COMMITED"
    READ_UNCOMMITED = "READ UNCOMMITED"

    @classmethod
    def to_list(cls) -> List[str]:
        """Return all isolation levels as a new list, snapshot isolation first.

        The first element must be ``SNAPSHOT_ISOLATION``: the class defines no
        ``SNAPSHOT_SERIALIZATION`` attribute, so referencing that name raised
        ``AttributeError`` on every call.
        """
        return [cls.SNAPSHOT_ISOLATION, cls.READ_COMMITED, cls.READ_UNCOMMITED]
|
||||
|
||||
|
||||
def get_default_database_mode() -> DatabaseMode:
    """Return a fresh DatabaseMode for in-memory transactional storage with snapshot isolation."""
    return DatabaseMode(
        storage_mode=StorageModeConstants.IN_MEMORY_TRANSACTIONAL,
        isolation_level=IsolationLevelConstants.SNAPSHOT_ISOLATION,
    )
|
||||
|
||||
|
||||
def get_all_database_modes() -> List[DatabaseMode]:
    """Return one DatabaseMode per (storage mode, isolation level) combination.

    Storage modes vary slowest and isolation levels fastest, i.e. the same
    order as the cartesian product of the two ``to_list()`` results.
    """
    return [
        DatabaseMode(storage, isolation)
        for storage in StorageModeConstants.to_list()
        for isolation in IsolationLevelConstants.to_list()
    ]
|
||||
|
||||
|
||||
# dataset calibrated for running on Apollo (total 4min)
|
||||
# bipartite.py runs for approx. 30s
|
||||
# create_match.py runs for approx. 30s
|
||||
# long_running runs for 1min
|
||||
# long_running runs for 2min
|
||||
# Script-based (.py) tests of the small dataset.
SMALL_DATASET = [
    {
        DatasetConstants.TEST: "bipartite.py",
        DatasetConstants.OPTIONS: ["--u-count", "100", "--v-count", "100"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
    {
        DatasetConstants.TEST: "detach_delete.py",
        DatasetConstants.OPTIONS: ["--worker-count", "4", "--repetition-count", "100"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
    {
        DatasetConstants.TEST: "memory_tracker.py",
        DatasetConstants.OPTIONS: ["--worker-count", "5", "--repetition-count", "100"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
        DatasetConstants.MEMGRAPH_OPTIONS: ["--memory-limit=2048"],
    },
    {
        DatasetConstants.TEST: "memory_limit.py",
        DatasetConstants.OPTIONS: ["--worker-count", "5", "--repetition-count", "100"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
        DatasetConstants.MEMGRAPH_OPTIONS: ["--memory-limit=2048"],
    },
    {
        DatasetConstants.TEST: "create_match.py",
        DatasetConstants.OPTIONS: ["--vertex-count", "40000", "--create-pack-size", "100"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
]

# Binary (.cpp) tests of the small dataset, appended in their original order.
SMALL_DATASET += [
    {
        DatasetConstants.TEST: "parser.cpp",
        DatasetConstants.OPTIONS: ["--per-worker-query-count", "1000"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
    {
        DatasetConstants.TEST: "long_running.cpp",
        DatasetConstants.OPTIONS: [
            "--vertex-count", "1000",
            "--edge-count", "5000",
            "--max-time", "1",
            "--verify", "20",
        ],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
    {
        # Only this long_running entry is given the stats file.
        DatasetConstants.TEST: "long_running.cpp",
        DatasetConstants.OPTIONS: [
            "--vertex-count", "10000",
            "--edge-count", "50000",
            "--max-time", "2",
            "--verify", "30",
            "--stats-file", STATS_FILE,
        ],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
]
|
||||
|
||||
# dataset calibrated for running on daily stress instance (total 9h)
|
||||
# bipartite.py and create_match.py run for approx. 15min
|
||||
# long_running runs for 5min x 6 times = 30min
|
||||
# long_running runs for 8h
|
||||
# Script-based tests come first.
LARGE_DATASET = [
    {
        DatasetConstants.TEST: "bipartite.py",
        DatasetConstants.OPTIONS: ["--u-count", "300", "--v-count", "300"],
        DatasetConstants.TIMEOUT: 30,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
    {
        DatasetConstants.TEST: "detach_delete.py",
        DatasetConstants.OPTIONS: ["--worker-count", "4", "--repetition-count", "300"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
    {
        DatasetConstants.TEST: "create_match.py",
        DatasetConstants.OPTIONS: ["--vertex-count", "500000", "--create-pack-size", "500"],
        DatasetConstants.TIMEOUT: 30,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
]

# The short long_running configuration is scheduled six times. List
# multiplication repeats the very same dict object, exactly as before.
LARGE_DATASET += [
    {
        DatasetConstants.TEST: "long_running.cpp",
        DatasetConstants.OPTIONS: [
            "--vertex-count", "10000",
            "--edge-count", "40000",
            "--max-time", "5",
            "--verify", "60",
        ],
        DatasetConstants.TIMEOUT: 16,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
] * 6

# Final, longest long_running run; the only one given the stats file.
LARGE_DATASET.append(
    {
        DatasetConstants.TEST: "long_running.cpp",
        DatasetConstants.OPTIONS: [
            "--vertex-count", "200000",
            "--edge-count", "1000000",
            "--max-time", "480",
            "--verify", "300",
            "--stats-file", STATS_FILE,
        ],
        DatasetConstants.TIMEOUT: 500,
        DatasetConstants.MODE: [get_default_database_mode()],
    }
)
|
Loading…
Reference in New Issue
Block a user