HA: Support restart of instances (#1672)

This commit is contained in:
Andi 2024-02-01 11:55:48 +01:00 committed by GitHub
parent b443934b68
commit cb7b88ad92
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 552 additions and 389 deletions

View File

@ -12,7 +12,6 @@ target_sources(mg-coordination
include/coordination/coordinator_slk.hpp
include/coordination/coordinator_data.hpp
include/coordination/constants.hpp
include/coordination/failover_status.hpp
include/coordination/coordinator_cluster_config.hpp
PRIVATE
@ -21,6 +20,7 @@ target_sources(mg-coordination
coordinator_rpc.cpp
coordinator_server.cpp
coordinator_data.cpp
coordinator_instance.cpp
)
target_include_directories(mg-coordination PUBLIC include)

View File

@ -20,7 +20,7 @@
namespace memgraph::coordination {
namespace {
auto CreateClientContext(const memgraph::coordination::CoordinatorClientConfig &config)
auto CreateClientContext(memgraph::coordination::CoordinatorClientConfig const &config)
-> communication::ClientContext {
return (config.ssl) ? communication::ClientContext{config.ssl->key_file, config.ssl->cert_file}
: communication::ClientContext{};
@ -45,38 +45,33 @@ void CoordinatorClient::StartFrequentCheck() {
"Health check frequency must be greater than 0");
instance_checker_.Run(
"Coord checker", config_.health_check_frequency_sec, [this, instance_name = config_.instance_name] {
config_.instance_name, config_.health_check_frequency_sec, [this, instance_name = config_.instance_name] {
try {
spdlog::trace("Sending frequent heartbeat to machine {} on {}", instance_name,
rpc_client_.Endpoint().SocketAddress());
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
stream.AwaitResponse();
{ // NOTE: This is intentionally scoped so that stream lock could get released.
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
stream.AwaitResponse();
}
succ_cb_(coord_data_, instance_name);
} catch (const rpc::RpcFailedException &) {
} catch (rpc::RpcFailedException const &) {
fail_cb_(coord_data_, instance_name);
}
});
}
void CoordinatorClient::StopFrequentCheck() { instance_checker_.Stop(); }
void CoordinatorClient::PauseFrequentCheck() { instance_checker_.Pause(); }
void CoordinatorClient::ResumeFrequentCheck() { instance_checker_.Resume(); }
auto CoordinatorClient::SetSuccCallback(HealthCheckCallback succ_cb) -> void { succ_cb_ = std::move(succ_cb); }
auto CoordinatorClient::SetFailCallback(HealthCheckCallback fail_cb) -> void { fail_cb_ = std::move(fail_cb); }
auto CoordinatorClient::ReplicationClientInfo() const -> const CoordinatorClientConfig::ReplicationClientInfo & {
return config_.replication_client_info;
auto CoordinatorClient::SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void {
succ_cb_ = std::move(succ_cb);
fail_cb_ = std::move(fail_cb);
}
auto CoordinatorClient::ResetReplicationClientInfo() -> void {
// TODO (antoniofilipovic) Sync with Andi on this one
// config_.replication_client_info.reset();
}
auto CoordinatorClient::ReplicationClientInfo() const -> ReplClientInfo { return config_.replication_client_info; }
auto CoordinatorClient::SendPromoteReplicaToMainRpc(
std::vector<CoordinatorClientConfig::ReplicationClientInfo> replication_clients_info) const -> bool {
auto CoordinatorClient::SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool {
try {
auto stream{rpc_client_.Stream<PromoteReplicaToMainRpc>(std::move(replication_clients_info))};
if (!stream.AwaitResponse().success) {
@ -84,23 +79,24 @@ auto CoordinatorClient::SendPromoteReplicaToMainRpc(
return false;
}
return true;
} catch (const rpc::RpcFailedException &) {
} catch (rpc::RpcFailedException const &) {
spdlog::error("RPC error occurred while sending failover RPC!");
}
return false;
}
auto CoordinatorClient::SendSetToReplicaRpc(CoordinatorClient::ReplClientInfo replication_client_info) const -> bool {
auto CoordinatorClient::DemoteToReplica() const -> bool {
auto const &instance_name = config_.instance_name;
try {
auto stream{rpc_client_.Stream<SetMainToReplicaRpc>(std::move(replication_client_info))};
auto stream{rpc_client_.Stream<DemoteMainToReplicaRpc>(config_.replication_client_info)};
if (!stream.AwaitResponse().success) {
spdlog::error("Failed to set main to replica!");
spdlog::error("Failed to receive successful RPC response for setting instance {} to replica!", instance_name);
return false;
}
spdlog::info("Sent request RPC from coordinator to instance to set it as replica!");
return true;
} catch (const rpc::RpcFailedException &) {
spdlog::error("Failed to send failover RPC from coordinator to new main!");
} catch (rpc::RpcFailedException const &) {
spdlog::error("Failed to set instance {} to replica!", instance_name);
}
return false;
}

View File

@ -25,7 +25,7 @@ CoordinatorData::CoordinatorData() {
auto find_instance = [](CoordinatorData *coord_data, std::string_view instance_name) -> CoordinatorInstance & {
auto instance = std::ranges::find_if(
coord_data->registered_instances_,
[instance_name](const CoordinatorInstance &instance) { return instance.InstanceName() == instance_name; });
[instance_name](CoordinatorInstance const &instance) { return instance.InstanceName() == instance_name; });
MG_ASSERT(instance != coord_data->registered_instances_.end(), "Instance {} not found during callback!",
instance_name);
@ -35,105 +35,94 @@ CoordinatorData::CoordinatorData() {
replica_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing replica successful callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name);
instance.UpdateLastResponseTime();
find_instance(coord_data, instance_name).OnSuccessPing();
};
replica_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing replica failure callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name);
instance.UpdateInstanceStatus();
find_instance(coord_data, instance_name).OnFailPing();
};
main_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing main successful callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name);
instance.UpdateLastResponseTime();
if (instance.IsAlive() || !coord_data->ClusterHasAliveMain_()) {
instance.OnSuccessPing();
return;
}
bool const demoted = instance.DemoteToReplica(coord_data->replica_succ_cb_, coord_data->replica_fail_cb_);
if (demoted) {
instance.OnSuccessPing();
spdlog::info("Instance {} demoted to replica", instance_name);
} else {
spdlog::error("Instance {} failed to become replica", instance_name);
}
};
main_fail_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
main_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing main failure callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name);
if (bool main_alive = instance.UpdateInstanceStatus(); !main_alive) {
spdlog::info("Main instance {} is not alive, starting automatic failover", instance_name);
switch (auto failover_status = DoFailover(); failover_status) {
using enum DoFailoverStatus;
case ALL_REPLICAS_DOWN:
spdlog::warn("Failover aborted since all replicas are down!");
break;
case MAIN_ALIVE:
spdlog::warn("Failover aborted since main is alive!");
break;
case RPC_FAILED:
spdlog::warn("Failover aborted since promoting replica to main failed!");
break;
case SUCCESS:
break;
}
find_instance(coord_data, instance_name).OnFailPing();
if (!coord_data->ClusterHasAliveMain_()) {
spdlog::info("Cluster without main instance, trying automatic failover");
coord_data->TryFailover();
}
};
}
auto CoordinatorData::DoFailover() -> DoFailoverStatus {
using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
auto CoordinatorData::ClusterHasAliveMain_() const -> bool {
auto const alive_main = [](CoordinatorInstance const &instance) { return instance.IsMain() && instance.IsAlive(); };
return std::ranges::any_of(registered_instances_, alive_main);
}
auto CoordinatorData::TryFailover() -> void {
auto replica_instances = registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica);
auto chosen_replica_instance = std::ranges::find_if(replica_instances, &CoordinatorInstance::IsAlive);
if (chosen_replica_instance == replica_instances.end()) {
return DoFailoverStatus::ALL_REPLICAS_DOWN;
spdlog::warn("Failover failed since all replicas are down!");
return;
}
chosen_replica_instance->PrepareForFailover();
chosen_replica_instance->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&chosen_replica_instance] { chosen_replica_instance->ResumeFrequentCheck(); }};
std::vector<ReplicationClientInfo> repl_clients_info;
std::vector<ReplClientInfo> repl_clients_info;
repl_clients_info.reserve(std::ranges::distance(replica_instances));
auto const not_chosen_replica_instance = [&chosen_replica_instance](const CoordinatorInstance &instance) {
auto const not_chosen_replica_instance = [&chosen_replica_instance](CoordinatorInstance const &instance) {
return instance != *chosen_replica_instance;
};
auto const not_main = [](const CoordinatorInstance &instance) { return !instance.IsMain(); };
// TODO (antoniofilipovic): Should we also send the replication data of the old MAIN?
// TODO: (andi) Don't send replicas which aren't alive
for (const auto &unchosen_replica_instance :
replica_instances | ranges::views::filter(not_chosen_replica_instance) | ranges::views::filter(not_main)) {
repl_clients_info.emplace_back(unchosen_replica_instance.client_.ReplicationClientInfo());
std::ranges::transform(registered_instances_ | ranges::views::filter(not_chosen_replica_instance),
std::back_inserter(repl_clients_info),
[](const CoordinatorInstance &instance) { return instance.ReplicationClientInfo(); });
if (!chosen_replica_instance->PromoteToMain(std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
spdlog::warn("Failover failed since promoting replica to main failed!");
return;
}
if (!chosen_replica_instance->client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) {
chosen_replica_instance->RestoreAfterFailedFailover();
return DoFailoverStatus::RPC_FAILED;
}
auto old_main = std::ranges::find_if(registered_instances_, &CoordinatorInstance::IsMain);
// TODO: (andi) For performing restoration we will have to improve this
old_main->client_.PauseFrequentCheck();
chosen_replica_instance->PostFailover(main_succ_cb_, main_fail_cb_);
return DoFailoverStatus::SUCCESS;
spdlog::info("Failover successful! Instance {} promoted to main.", chosen_replica_instance->InstanceName());
}
auto CoordinatorData::ShowInstances() const -> std::vector<CoordinatorInstanceStatus> {
std::vector<CoordinatorInstanceStatus> instances_status;
instances_status.reserve(registered_instances_.size());
auto const stringify_repl_role = [](const CoordinatorInstance &instance) -> std::string {
if (!instance.IsAlive()) return "";
auto const stringify_repl_role = [](CoordinatorInstance const &instance) -> std::string {
if (!instance.IsAlive()) return "unknown";
if (instance.IsMain()) return "main";
return "replica";
};
auto const instance_to_status =
[&stringify_repl_role](const CoordinatorInstance &instance) -> CoordinatorInstanceStatus {
[&stringify_repl_role](CoordinatorInstance const &instance) -> CoordinatorInstanceStatus {
return {.instance_name = instance.InstanceName(),
.socket_address = instance.SocketAddress(),
.replication_role = stringify_repl_role(instance),
@ -151,70 +140,59 @@ auto CoordinatorData::ShowInstances() const -> std::vector<CoordinatorInstanceSt
auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
auto lock = std::lock_guard{coord_data_lock_};
// Find replica we already registered
auto registered_replica = std::find_if(
registered_instances_.begin(), registered_instances_.end(),
[instance_name](const CoordinatorInstance &instance) { return instance.InstanceName() == instance_name; });
auto const is_new_main = [&instance_name](CoordinatorInstance const &instance) {
return instance.InstanceName() == instance_name;
};
auto new_main = std::ranges::find_if(registered_instances_, is_new_main);
// if replica not found...
if (registered_replica == registered_instances_.end()) {
spdlog::error("You didn't register instance with given name {}", instance_name);
if (new_main == registered_instances_.end()) {
spdlog::error("Instance {} not registered. Please register it using REGISTER INSTANCE {}", instance_name,
instance_name);
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
registered_replica->client_.PauseFrequentCheck();
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
std::vector<CoordinatorClientConfig::ReplicationClientInfo> repl_clients_info;
ReplicationClientsInfo repl_clients_info;
repl_clients_info.reserve(registered_instances_.size() - 1);
std::ranges::for_each(registered_instances_,
[registered_replica, &repl_clients_info](const CoordinatorInstance &replica) {
if (replica != *registered_replica) {
repl_clients_info.emplace_back(replica.client_.ReplicationClientInfo());
}
});
// PROMOTE REPLICA TO MAIN
// THIS SHOULD FAIL HERE IF IT IS DOWN
if (auto result = registered_replica->client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info)); !result) {
registered_replica->client_.ResumeFrequentCheck();
auto const is_not_new_main = [&instance_name](CoordinatorInstance const &instance) {
return instance.InstanceName() != instance_name;
};
std::ranges::transform(registered_instances_ | ranges::views::filter(is_not_new_main),
std::back_inserter(repl_clients_info),
[](const CoordinatorInstance &instance) { return instance.ReplicationClientInfo(); });
if (!new_main->PromoteToMain(std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
registered_replica->client_.SetSuccCallback(main_succ_cb_);
registered_replica->client_.SetFailCallback(main_fail_cb_);
registered_replica->replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
registered_replica->client_.ResumeFrequentCheck();
spdlog::info("Instance {} promoted to main", instance_name);
return SetInstanceToMainCoordinatorStatus::SUCCESS;
}
auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus {
auto lock = std::lock_guard{coord_data_lock_};
if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) {
if (std::ranges::any_of(registered_instances_, [&config](CoordinatorInstance const &instance) {
return instance.InstanceName() == config.instance_name;
})) {
return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
}
if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) {
spdlog::trace("Comparing {} with {}", instance.SocketAddress(), config.SocketAddress());
if (std::ranges::any_of(registered_instances_, [&config](CoordinatorInstance const &instance) {
return instance.SocketAddress() == config.SocketAddress();
})) {
return RegisterInstanceCoordinatorStatus::END_POINT_EXISTS;
}
CoordinatorClientConfig::ReplicationClientInfo replication_client_info_copy = config.replication_client_info;
try {
registered_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
return RegisterInstanceCoordinatorStatus::SUCCESS;
// TODO (antoniofilipovic) create and then push back
auto *instance = &registered_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_,
replication_coordination_glue::ReplicationRole::REPLICA);
if (auto res = instance->client_.SendSetToReplicaRpc(replication_client_info_copy); !res) {
} catch (CoordinatorRegisterInstanceException const &) {
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}
instance->client_.StartFrequentCheck();
return RegisterInstanceCoordinatorStatus::SUCCESS;
}
} // namespace memgraph::coordination

View File

@ -0,0 +1,84 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_instance.hpp"
namespace memgraph::coordination {
// Registers a new instance under coordinator control: constructs the RPC client with the
// supplied health-check callbacks, demotes the remote instance to REPLICA, and only then
// starts the periodic health check. Throws CoordinatorRegisterInstanceException if the
// demote RPC fails, so a half-registered instance is never left behind.
CoordinatorInstance::CoordinatorInstance(CoordinatorData *data, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
: client_(data, std::move(config), std::move(succ_cb), std::move(fail_cb)),
// Every instance starts life as a replica; promotion happens later via PromoteToMain.
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA),
// Optimistically alive until a failed ping proves otherwise (see OnFailPing).
is_alive_(true) {
if (!client_.DemoteToReplica()) {
throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName());
}
client_.StartFrequentCheck();
}
// A heartbeat round-trip succeeded: mark the instance alive and remember when we last
// heard from it (OnFailPing uses that timestamp to decide liveness on later failures).
auto CoordinatorInstance::OnSuccessPing() -> void {
  is_alive_ = true;
  last_response_time_ = std::chrono::system_clock::now();
}
// A heartbeat failed. The instance is still considered alive while the time since the
// last successful response stays below the configured grace period; once that window
// is exceeded it is flagged dead. Returns the updated liveness flag.
auto CoordinatorInstance::OnFailPing() -> bool {
  auto const elapsed =
      std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_);
  is_alive_ = elapsed.count() < CoordinatorClusterConfig::alive_response_time_difference_sec_;
  return is_alive_;
}
// Name the instance was registered under; delegates to the RPC client.
auto CoordinatorInstance::InstanceName() const -> std::string { return client_.InstanceName(); }
// "ip:port" string of the managed instance; delegates to the RPC client.
auto CoordinatorInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); }
// Liveness as last computed by OnSuccessPing/OnFailPing; no RPC is issued here.
auto CoordinatorInstance::IsAlive() const -> bool { return is_alive_; }
auto CoordinatorInstance::IsReplica() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA;
}
auto CoordinatorInstance::IsMain() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN;
}
// Promotes this instance to MAIN: ships the other replicas' client info via the
// PromoteReplicaToMain RPC and, only on success, flips the locally tracked role and
// swaps the health-check callbacks to the main-specific ones. Returns whether the
// promotion RPC succeeded; on failure no local state is changed.
auto CoordinatorInstance::PromoteToMain(ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb,
                                        HealthCheckCallback main_fail_cb) -> bool {
  bool const rpc_ok = client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info));
  if (rpc_ok) {
    replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
    client_.SetCallbacks(std::move(main_succ_cb), std::move(main_fail_cb));
  }
  return rpc_ok;
}
// Demotes this instance to REPLICA via the demote RPC and, only on success, flips the
// locally tracked role and installs the replica-specific health-check callbacks.
// Returns whether the demote RPC succeeded; on failure no local state is changed.
auto CoordinatorInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb)
    -> bool {
  bool const rpc_ok = client_.DemoteToReplica();
  if (rpc_ok) {
    replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA;
    client_.SetCallbacks(std::move(replica_succ_cb), std::move(replica_fail_cb));
  }
  return rpc_ok;
}
// Temporarily suspends the periodic health check (e.g. around failover); delegates to the client.
auto CoordinatorInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); }
// Resumes a previously paused periodic health check; delegates to the client.
auto CoordinatorInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); }
// Replication connection info the coordinator sends to a new MAIN so it can
// re-register this instance as its replica; returned by value from the client's config.
auto CoordinatorInstance::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo {
return client_.ReplicationClientInfo();
}
} // namespace memgraph::coordination
#endif

View File

@ -36,19 +36,19 @@ void PromoteReplicaToMainRes::Load(PromoteReplicaToMainRes *self, memgraph::slk:
memgraph::slk::Load(self, reader);
}
void SetMainToReplicaReq::Save(const SetMainToReplicaReq &self, memgraph::slk::Builder *builder) {
void DemoteMainToReplicaReq::Save(const DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void SetMainToReplicaReq::Load(SetMainToReplicaReq *self, memgraph::slk::Reader *reader) {
void DemoteMainToReplicaReq::Load(DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
void SetMainToReplicaRes::Save(const SetMainToReplicaRes &self, memgraph::slk::Builder *builder) {
void DemoteMainToReplicaRes::Save(const DemoteMainToReplicaRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void SetMainToReplicaRes::Load(SetMainToReplicaRes *self, memgraph::slk::Reader *reader) {
void DemoteMainToReplicaRes::Load(DemoteMainToReplicaRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
@ -60,11 +60,11 @@ constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::Ty
constexpr utils::TypeInfo coordination::PromoteReplicaToMainRes::kType{utils::TypeId::COORD_FAILOVER_RES,
"CoordPromoteReplicaToMainRes", nullptr};
constexpr utils::TypeInfo coordination::SetMainToReplicaReq::kType{utils::TypeId::COORD_SET_REPL_MAIN_REQ,
"CoordSetReplMainReq", nullptr};
constexpr utils::TypeInfo coordination::DemoteMainToReplicaReq::kType{utils::TypeId::COORD_SET_REPL_MAIN_REQ,
"CoordDemoteToReplicaReq", nullptr};
constexpr utils::TypeInfo coordination::SetMainToReplicaRes::kType{utils::TypeId::COORD_SET_REPL_MAIN_RES,
"CoordSetReplMainRes", nullptr};
constexpr utils::TypeInfo coordination::DemoteMainToReplicaRes::kType{utils::TypeId::COORD_SET_REPL_MAIN_RES,
"CoordDemoteToReplicaRes", nullptr};
namespace slk {
@ -84,19 +84,19 @@ void Load(memgraph::coordination::PromoteReplicaToMainReq *self, memgraph::slk::
memgraph::slk::Load(&self->replication_clients_info, reader);
}
void Save(const memgraph::coordination::SetMainToReplicaReq &self, memgraph::slk::Builder *builder) {
void Save(const memgraph::coordination::DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.replication_client_info, builder);
}
void Load(memgraph::coordination::SetMainToReplicaReq *self, memgraph::slk::Reader *reader) {
void Load(memgraph::coordination::DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->replication_client_info, reader);
}
void Save(const memgraph::coordination::SetMainToReplicaRes &self, memgraph::slk::Builder *builder) {
void Save(const memgraph::coordination::DemoteMainToReplicaRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.success, builder);
}
void Load(memgraph::coordination::SetMainToReplicaRes *self, memgraph::slk::Reader *reader) {
void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->success, reader);
}

View File

@ -74,12 +74,6 @@ auto CoordinatorState::ShowInstances() const -> std::vector<CoordinatorInstanceS
return std::get<CoordinatorData>(data_).ShowInstances();
}
[[nodiscard]] auto CoordinatorState::DoFailover() -> DoFailoverStatus {
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), "Cannot do failover since variant holds wrong alternative");
auto &coord_state = std::get<CoordinatorData>(data_);
return coord_state.DoFailover();
}
auto CoordinatorState::GetCoordinatorServer() const -> CoordinatorServer & {
MG_ASSERT(std::holds_alternative<CoordinatorMainReplicaData>(data_),
"Cannot get coordinator server since variant holds wrong alternative");

View File

@ -21,12 +21,10 @@ namespace memgraph::coordination {
class CoordinatorData;
using HealthCheckCallback = std::function<void(CoordinatorData *, std::string_view)>;
using ReplicationClientsInfo = std::vector<ReplClientInfo>;
class CoordinatorClient {
public:
using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
using ReplicationClientsInfo = std::vector<ReplClientInfo>;
explicit CoordinatorClient(CoordinatorData *coord_data_, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb);
@ -46,15 +44,12 @@ class CoordinatorClient {
auto InstanceName() const -> std::string;
auto SocketAddress() const -> std::string;
auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool;
[[nodiscard]] auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool;
[[nodiscard]] auto DemoteToReplica() const -> bool;
auto ReplicationClientInfo() const -> const ReplClientInfo &;
auto ResetReplicationClientInfo() -> void;
auto ReplicationClientInfo() const -> ReplClientInfo;
auto SendSetToReplicaRpc(ReplClientInfo replication_client_info) const -> bool;
auto SetSuccCallback(HealthCheckCallback succ_cb) -> void;
auto SetFailCallback(HealthCheckCallback fail_cb) -> void;
auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void;
friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) {
return first.config_ == second.config_;

View File

@ -32,9 +32,7 @@ struct CoordinatorClientConfig {
auto SocketAddress() const -> std::string { return ip_address + ":" + std::to_string(port); }
// Info which coordinator will send to new main when performing failover
struct ReplicationClientInfo {
// Must be the same as CoordinatorClientConfig's instance_name
std::string instance_name;
replication_coordination_glue::ReplicationMode replication_mode{};
std::string replication_ip_address;
@ -43,7 +41,6 @@ struct CoordinatorClientConfig {
friend bool operator==(ReplicationClientInfo const &, ReplicationClientInfo const &) = default;
};
// Each instance has replication config in case it fails
ReplicationClientInfo replication_client_info;
struct SSL {
@ -58,6 +55,8 @@ struct CoordinatorClientConfig {
friend bool operator==(CoordinatorClientConfig const &, CoordinatorClientConfig const &) = default;
};
using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
struct CoordinatorServerConfig {
std::string ip_address;
uint16_t port{};

View File

@ -16,9 +16,9 @@
#include "coordination/coordinator_instance.hpp"
#include "coordination/coordinator_instance_status.hpp"
#include "coordination/coordinator_server.hpp"
#include "coordination/failover_status.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "utils/rw_lock.hpp"
#include "utils/thread_pool.hpp"
#include <list>
@ -27,17 +27,20 @@ class CoordinatorData {
public:
CoordinatorData();
[[nodiscard]] auto DoFailover() -> DoFailoverStatus;
[[nodiscard]] auto RegisterInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
[[nodiscard]] auto SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
auto TryFailover() -> void;
auto ShowInstances() const -> std::vector<CoordinatorInstanceStatus>;
private:
auto ClusterHasAliveMain_() const -> bool;
mutable utils::RWLock coord_data_lock_{utils::RWLock::Priority::READ};
HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
// Must be std::list because we rely on pointer stability
// NOTE: Must be std::list because we rely on pointer stability
std::list<CoordinatorInstance> registered_instances_;
};

View File

@ -16,16 +16,16 @@
#include "utils/exceptions.hpp"
namespace memgraph::coordination {
class CoordinatorFailoverException final : public utils::BasicException {
class CoordinatorRegisterInstanceException final : public utils::BasicException {
public:
explicit CoordinatorFailoverException(const std::string_view what) noexcept
: BasicException("Failover didn't complete successfully: " + std::string(what)) {}
explicit CoordinatorRegisterInstanceException(const std::string_view what) noexcept
: BasicException("Failed to create instance: " + std::string(what)) {}
template <class... Args>
explicit CoordinatorFailoverException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: CoordinatorFailoverException(fmt::format(fmt, std::forward<Args>(args)...)) {}
explicit CoordinatorRegisterInstanceException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: CoordinatorRegisterInstanceException(fmt::format(fmt, std::forward<Args>(args)...)) {}
SPECIALIZE_GET_EXCEPTION_NAME(CoordinatorFailoverException)
SPECIALIZE_GET_EXCEPTION_NAME(CoordinatorRegisterInstanceException)
};
} // namespace memgraph::coordination

View File

@ -15,6 +15,7 @@
#include "coordination/coordinator_client.hpp"
#include "coordination/coordinator_cluster_config.hpp"
#include "coordination/coordinator_exceptions.hpp"
#include "replication_coordination_glue/role.hpp"
namespace memgraph::coordination {
@ -24,10 +25,7 @@ class CoordinatorData;
class CoordinatorInstance {
public:
CoordinatorInstance(CoordinatorData *data, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb, replication_coordination_glue::ReplicationRole replication_role)
: client_(data, std::move(config), std::move(succ_cb), std::move(fail_cb)),
replication_role_(replication_role),
is_alive_(true) {}
HealthCheckCallback fail_cb);
CoordinatorInstance(CoordinatorInstance const &other) = delete;
CoordinatorInstance &operator=(CoordinatorInstance const &other) = delete;
@ -35,34 +33,27 @@ class CoordinatorInstance {
CoordinatorInstance &operator=(CoordinatorInstance &&other) noexcept = delete;
~CoordinatorInstance() = default;
auto UpdateInstanceStatus() -> bool {
is_alive_ = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_)
.count() < CoordinatorClusterConfig::alive_response_time_difference_sec_;
return is_alive_;
}
auto UpdateLastResponseTime() -> void { last_response_time_ = std::chrono::system_clock::now(); }
auto OnSuccessPing() -> void;
auto OnFailPing() -> bool;
auto InstanceName() const -> std::string { return client_.InstanceName(); }
auto SocketAddress() const -> std::string { return client_.SocketAddress(); }
auto IsAlive() const -> bool { return is_alive_; }
auto IsAlive() const -> bool;
auto IsReplica() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA;
}
auto IsMain() const -> bool { return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN; }
auto InstanceName() const -> std::string;
auto SocketAddress() const -> std::string;
auto PrepareForFailover() -> void { client_.PauseFrequentCheck(); }
auto RestoreAfterFailedFailover() -> void { client_.ResumeFrequentCheck(); }
auto IsReplica() const -> bool;
auto IsMain() const -> bool;
auto PostFailover(HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> void {
replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
client_.SetSuccCallback(std::move(main_succ_cb));
client_.SetFailCallback(std::move(main_fail_cb));
// TODO (andi): Discuss before removing — if this MAIN fails again, this info may still be needed.
// client_.ResetReplicationClientInfo();
client_.ResumeFrequentCheck();
}
auto PromoteToMain(ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb,
HealthCheckCallback main_fail_cb) -> bool;
auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool;
auto PauseFrequentCheck() -> void;
auto ResumeFrequentCheck() -> void;
auto ReplicationClientInfo() const -> ReplClientInfo;
private:
CoordinatorClient client_;
replication_coordination_glue::ReplicationRole replication_role_;
std::chrono::system_clock::time_point last_response_time_{};

View File

@ -48,35 +48,35 @@ struct PromoteReplicaToMainRes {
using PromoteReplicaToMainRpc = rpc::RequestResponse<PromoteReplicaToMainReq, PromoteReplicaToMainRes>;
struct SetMainToReplicaReq {
struct DemoteMainToReplicaReq {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(SetMainToReplicaReq *self, memgraph::slk::Reader *reader);
static void Save(const SetMainToReplicaReq &self, memgraph::slk::Builder *builder);
static void Load(DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader);
static void Save(const DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder);
explicit SetMainToReplicaReq(CoordinatorClientConfig::ReplicationClientInfo replication_client_info)
explicit DemoteMainToReplicaReq(CoordinatorClientConfig::ReplicationClientInfo replication_client_info)
: replication_client_info(std::move(replication_client_info)) {}
SetMainToReplicaReq() = default;
DemoteMainToReplicaReq() = default;
CoordinatorClientConfig::ReplicationClientInfo replication_client_info;
};
struct SetMainToReplicaRes {
struct DemoteMainToReplicaRes {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(SetMainToReplicaRes *self, memgraph::slk::Reader *reader);
static void Save(const SetMainToReplicaRes &self, memgraph::slk::Builder *builder);
static void Load(DemoteMainToReplicaRes *self, memgraph::slk::Reader *reader);
static void Save(const DemoteMainToReplicaRes &self, memgraph::slk::Builder *builder);
explicit SetMainToReplicaRes(bool success) : success(success) {}
SetMainToReplicaRes() = default;
explicit DemoteMainToReplicaRes(bool success) : success(success) {}
DemoteMainToReplicaRes() = default;
bool success;
};
using SetMainToReplicaRpc = rpc::RequestResponse<SetMainToReplicaReq, SetMainToReplicaRes>;
using DemoteMainToReplicaRpc = rpc::RequestResponse<DemoteMainToReplicaReq, DemoteMainToReplicaRes>;
} // namespace memgraph::coordination
@ -91,13 +91,13 @@ void Save(const memgraph::coordination::PromoteReplicaToMainReq &self, memgraph:
void Load(memgraph::coordination::PromoteReplicaToMainReq *self, memgraph::slk::Reader *reader);
void Save(const memgraph::coordination::SetMainToReplicaRes &self, memgraph::slk::Builder *builder);
void Save(const memgraph::coordination::DemoteMainToReplicaRes &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::SetMainToReplicaRes *self, memgraph::slk::Reader *reader);
void Load(memgraph::coordination::DemoteMainToReplicaRes *self, memgraph::slk::Reader *reader);
void Save(const memgraph::coordination::SetMainToReplicaReq &self, memgraph::slk::Builder *builder);
void Save(const memgraph::coordination::DemoteMainToReplicaReq &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::SetMainToReplicaReq *self, memgraph::slk::Reader *reader);
void Load(memgraph::coordination::DemoteMainToReplicaReq *self, memgraph::slk::Reader *reader);
} // namespace memgraph::slk

View File

@ -16,7 +16,6 @@
#include "coordination/coordinator_data.hpp"
#include "coordination/coordinator_instance_status.hpp"
#include "coordination/coordinator_server.hpp"
#include "coordination/failover_status.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include <variant>
@ -28,8 +27,8 @@ class CoordinatorState {
CoordinatorState();
~CoordinatorState() = default;
CoordinatorState(const CoordinatorState &) = delete;
CoordinatorState &operator=(const CoordinatorState &) = delete;
CoordinatorState(CoordinatorState const &) = delete;
CoordinatorState &operator=(CoordinatorState const &) = delete;
CoordinatorState(CoordinatorState &&) noexcept = delete;
CoordinatorState &operator=(CoordinatorState &&) noexcept = delete;
@ -43,8 +42,6 @@ class CoordinatorState {
// The client code must check that the server exists before calling this method.
auto GetCoordinatorServer() const -> CoordinatorServer &;
[[nodiscard]] auto DoFailover() -> DoFailoverStatus;
private:
std::variant<CoordinatorData, CoordinatorMainReplicaData> data_;
};

View File

@ -1,21 +0,0 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#ifdef MG_ENTERPRISE
#include <cstdint>
namespace memgraph::coordination {
enum class DoFailoverStatus : uint8_t { SUCCESS, ALL_REPLICAS_DOWN, MAIN_ALIVE, RPC_FAILED };
} // namespace memgraph::coordination
#endif

View File

@ -20,7 +20,6 @@ namespace memgraph::coordination {
enum class RegisterInstanceCoordinatorStatus : uint8_t {
NAME_EXISTS,
END_POINT_EXISTS,
COULD_NOT_BE_PERSISTED,
NOT_COORDINATOR,
RPC_FAILED,
SUCCESS

View File

@ -13,12 +13,10 @@
#ifdef MG_ENTERPRISE
#include "utils/result.hpp"
#include "coordination/coordinator_config.hpp"
#include "coordination/coordinator_instance_status.hpp"
#include "coordination/failover_status.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "utils/result.hpp"
#include <cstdint>
#include <optional>

View File

@ -12,12 +12,12 @@
#ifdef MG_ENTERPRISE
#include "dbms/coordinator_handlers.hpp"
#include "dbms/utils.hpp"
#include "coordination/coordinator_exceptions.hpp"
#include "coordination/coordinator_rpc.hpp"
#include "dbms/dbms_handler.hpp"
#include "dbms/replication_client.hpp"
#include "dbms/utils.hpp"
#include "range/v3/view.hpp"
@ -32,31 +32,33 @@ void CoordinatorHandlers::Register(DbmsHandler &dbms_handler) {
CoordinatorHandlers::PromoteReplicaToMainHandler(dbms_handler, req_reader, res_builder);
});
server.Register<coordination::SetMainToReplicaRpc>(
server.Register<coordination::DemoteMainToReplicaRpc>(
[&dbms_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void {
spdlog::info("Received SetMainToReplicaRpc from coordinator server");
CoordinatorHandlers::SetMainToReplicaHandler(dbms_handler, req_reader, res_builder);
spdlog::info("Received DemoteMainToReplicaRpc from coordinator server");
CoordinatorHandlers::DemoteMainToReplicaHandler(dbms_handler, req_reader, res_builder);
});
}
void CoordinatorHandlers::SetMainToReplicaHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader,
slk::Builder *res_builder) {
void CoordinatorHandlers::DemoteMainToReplicaHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader,
slk::Builder *res_builder) {
auto &repl_state = dbms_handler.ReplicationState();
spdlog::info("Executing SetMainToReplicaHandler");
if (!repl_state.IsMain()) {
if (repl_state.IsReplica()) {
spdlog::error("Setting to replica must be performed on main.");
slk::Save(coordination::SetMainToReplicaRes{false}, res_builder);
slk::Save(coordination::DemoteMainToReplicaRes{false}, res_builder);
return;
}
coordination::SetMainToReplicaReq req;
coordination::DemoteMainToReplicaReq req;
slk::Load(&req, req_reader);
replication::ReplicationServerConfig clients_config{.ip_address = req.replication_client_info.replication_ip_address,
.port = req.replication_client_info.replication_port};
const replication::ReplicationServerConfig clients_config{
.ip_address = req.replication_client_info.replication_ip_address,
.port = req.replication_client_info.replication_port};
if (bool success = memgraph::dbms::SetReplicationRoleReplica(dbms_handler, clients_config); !success) {
spdlog::error("Setting main to replica failed!");
if (bool const success = memgraph::dbms::SetReplicationRoleReplica(dbms_handler, clients_config); !success) {
spdlog::error("Demoting main to replica failed!");
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
return;
}
@ -69,16 +71,14 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(DbmsHandler &dbms_handler,
auto &repl_state = dbms_handler.ReplicationState();
if (!repl_state.IsReplica()) {
spdlog::error("Failover must be performed on replica!");
spdlog::error("Only replica can be promoted to main!");
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
return;
}
auto repl_server_config = std::get<replication::RoleReplicaData>(repl_state.ReplicationData()).config;
// This can fail because of disk. If it does, the cluster state could get inconsistent.
// We don't handle disk issues.
if (bool success = memgraph::dbms::DoReplicaToMainPromotion(dbms_handler); !success) {
if (bool const success = memgraph::dbms::DoReplicaToMainPromotion(dbms_handler); !success) {
spdlog::error("Promoting replica to main failed!");
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
return;
@ -104,28 +104,29 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(DbmsHandler &dbms_handler,
for (auto const &config : req.replication_clients_info | ranges::views::transform(converter)) {
auto instance_client = repl_state.RegisterReplica(config);
if (instance_client.HasError()) {
using enum memgraph::replication::RegisterReplicaError;
switch (instance_client.GetError()) {
// Can't happen, we are already replica
case memgraph::replication::RegisterReplicaError::NOT_MAIN:
spdlog::error("Failover must be performed to main!");
case NOT_MAIN:
spdlog::error("Failover must be performed on main!");
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
return;
// Can't happen, checked on the coordinator side
case memgraph::replication::RegisterReplicaError::NAME_EXISTS:
case NAME_EXISTS:
spdlog::error("Replica with the same name already exists!");
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
return;
// Can't happen, checked on the coordinator side
case memgraph::replication::RegisterReplicaError::ENDPOINT_EXISTS:
case ENDPOINT_EXISTS:
spdlog::error("Replica with the same endpoint already exists!");
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
return;
// We don't handle disk issues
case memgraph::replication::RegisterReplicaError::COULD_NOT_BE_PERSISTED:
case COULD_NOT_BE_PERSISTED:
spdlog::error("Registered replica could not be persisted!");
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
return;
case memgraph::replication::RegisterReplicaError::SUCCESS:
case SUCCESS:
break;
}
}
@ -138,9 +139,7 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(DbmsHandler &dbms_handler,
// Update system before enabling individual storage <-> replica clients
dbms_handler.SystemRestore(instance_client_ref);
// TODO: (andi) Policy for register all databases
// Will be resolved after deciding about choosing new replica
const bool all_clients_good = memgraph::dbms::RegisterAllDatabasesClients(dbms_handler, instance_client_ref);
const bool all_clients_good = memgraph::dbms::RegisterAllDatabasesClients<true>(dbms_handler, instance_client_ref);
MG_ASSERT(all_clients_good, "Failed to register one or more databases to the REPLICA \"{}\".", config.name);
StartReplicaClient(dbms_handler, instance_client_ref);

View File

@ -26,7 +26,7 @@ class CoordinatorHandlers {
private:
static void PromoteReplicaToMainHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader,
slk::Builder *res_builder);
static void SetMainToReplicaHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader, slk::Builder *res_builder);
static void DemoteMainToReplicaHandler(DbmsHandler &dbms_handler, slk::Reader *req_reader, slk::Builder *res_builder);
};
} // namespace memgraph::dbms

View File

@ -76,6 +76,7 @@ inline bool SetReplicationRoleReplica(dbms::DbmsHandler &dbms_handler,
return success;
}
template <bool AllowRPCFailure = false>
inline bool RegisterAllDatabasesClients(dbms::DbmsHandler &dbms_handler,
replication::ReplicationClient &instance_client) {
if (!allow_mt_repl && dbms_handler.All().size() > 1) {
@ -84,7 +85,6 @@ inline bool RegisterAllDatabasesClients(dbms::DbmsHandler &dbms_handler,
bool all_clients_good = true;
// Add database specific clients (NOTE Currently all databases are connected to each replica)
dbms_handler.ForEach([&](DatabaseAccess db_acc) {
auto *storage = db_acc->storage();
if (!allow_mt_repl && storage->name() != kDefaultDB) {
@ -93,16 +93,14 @@ inline bool RegisterAllDatabasesClients(dbms::DbmsHandler &dbms_handler,
// TODO: ATM only IN_MEMORY_TRANSACTIONAL, fix other modes
if (storage->storage_mode_ != storage::StorageMode::IN_MEMORY_TRANSACTIONAL) return;
using enum storage::replication::ReplicaState;
all_clients_good &= storage->repl_storage_state_.replication_clients_.WithLock(
[storage, &instance_client, db_acc = std::move(db_acc)](auto &storage_clients) mutable { // NOLINT
auto client = std::make_unique<storage::ReplicationStorageClient>(instance_client);
// All good, start replica client
client->Start(storage, std::move(db_acc));
// After start the storage <-> replica state should be READY or RECOVERING (if correctly started)
// MAYBE_BEHIND isn't a statement of the current state, this is the default value
// Failed to start due an error like branching of MAIN and REPLICA
if (client->State() == storage::replication::ReplicaState::MAYBE_BEHIND) {
return false; // TODO: sometimes we need to still add to storage_clients
if (client->State() == MAYBE_BEHIND && !AllowRPCFailure) {
return false;
}
storage_clients.push_back(std::move(client));
return true;

View File

@ -386,7 +386,7 @@ replicationSocketAddress : literal ;
registerReplica : REGISTER REPLICA instanceName ( SYNC | ASYNC )
TO socketAddress ;
registerInstanceOnCoordinator : REGISTER INSTANCE instanceName ON coordinatorSocketAddress ( AS ASYNC ) ? WITH replicationSocketAddress ;
registerInstanceOnCoordinator : REGISTER INSTANCE instanceName ON coordinatorSocketAddress ( AS ASYNC ) ? WITH replicationSocketAddress ;
setInstanceToMain : SET INSTANCE instanceName TO MAIN ;

View File

@ -500,14 +500,12 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
case END_POINT_EXISTS:
throw QueryRuntimeException(
"Couldn't register replica instance since instance with such endpoint already exists!");
case COULD_NOT_BE_PERSISTED:
throw QueryRuntimeException("Couldn't register replica instance since it couldn't be persisted!");
case NOT_COORDINATOR:
throw QueryRuntimeException("Couldn't register replica instance since this instance is not a coordinator!");
throw QueryRuntimeException("REGISTER INSTANCE query can only be run on a coordinator!");
case RPC_FAILED:
throw QueryRuntimeException(
"Couldn't register replica because promotion on replica failed! Check logs on replica to find out more "
"info!");
"Couldn't register replica instance because setting instance to replica failed! Check logs on replica to "
"find out more info!");
case SUCCESS:
break;
}
@ -520,10 +518,10 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
case NO_INSTANCE_WITH_NAME:
throw QueryRuntimeException("No instance with such name!");
case NOT_COORDINATOR:
throw QueryRuntimeException("Couldn't set replica instance to main since this instance is not a coordinator!");
throw QueryRuntimeException("SET INSTANCE TO MAIN query can only be run on a coordinator!");
case COULD_NOT_PROMOTE_TO_MAIN:
throw QueryRuntimeException(
"Couldn't set replica instance to main. Check coordinator and replica for more logs");
"Couldn't set replica instance to main!. Check coordinator and replica for more logs");
case SUCCESS:
break;
}

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -15,6 +15,7 @@
#include <cstdint>
#include <filesystem>
#include "flags/replication.hpp"
#include "storage/v2/isolation_level.hpp"
#include "storage/v2/storage_mode.hpp"
#include "utils/exceptions.hpp"
@ -128,10 +129,15 @@ struct Config {
};
inline auto ReplicationStateRootPath(memgraph::storage::Config const &config) -> std::optional<std::filesystem::path> {
if (!config.durability.restore_replication_state_on_startup) {
if (!config.durability.restore_replication_state_on_startup
#ifdef MG_ENTERPRISE
&& !FLAGS_coordinator_server_port
#endif
) {
spdlog::warn(
"Replication configuration will NOT be stored. When the server restarts, replication state will be "
"forgotten.");
return std::nullopt;
}
return {config.durability.storage_directory};

View File

@ -35,6 +35,7 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
auto hb_stream{client_.rpc_client_.Stream<replication::HeartbeatRpc>(
storage->uuid(), replStorageState.last_commit_timestamp_, std::string{replStorageState.epoch_.id()})};
const auto replica = hb_stream.AwaitResponse();
#ifdef MG_ENTERPRISE // Multi-tenancy is only supported in enterprise
@ -67,7 +68,6 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
"now hold unique data. Please resolve data conflicts and start the "
"replication on a clean instance.",
client_.name_, client_.name_, client_.name_);
// TODO: (andi) Talk about renaming MAYBE_BEHIND to branching
// State not updated, hence in MAYBE_BEHIND state
return;
}

View File

@ -32,7 +32,7 @@ namespace memgraph::utils {
* void long_function() {
* resource.enable();
* OnScopeExit on_exit([&resource] { resource.disable(); });
* // long block of code, might trow an exception
* // long block of code, might throw an exception
* }
*/
template <typename Callable>

View File

@ -4,7 +4,6 @@ copy_e2e_python_files(ha_experimental coordinator.py)
copy_e2e_python_files(ha_experimental automatic_failover.py)
copy_e2e_python_files(ha_experimental manual_setting_replicas.py)
copy_e2e_python_files(ha_experimental common.py)
copy_e2e_python_files(ha_experimental conftest.py)
copy_e2e_python_files(ha_experimental workloads.yaml)
copy_e2e_python_files_from_parent_folder(ha_experimental ".." memgraph.py)

View File

@ -1,4 +1,4 @@
# Copyright 2024 Memgraph Ltd.
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -10,11 +10,13 @@
# licenses/APL.txt.
import os
import shutil
import sys
import tempfile
import interactive_mg_runner
import pytest
from common import execute_and_fetch_all
from common import connect, execute_and_fetch_all, safe_execute
from mg_utils import mg_sleep_and_assert
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -24,25 +26,51 @@ interactive_mg_runner.PROJECT_DIR = os.path.normpath(
interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build"))
interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph"))
TEMP_DIR = tempfile.TemporaryDirectory().name
MEMGRAPH_INSTANCES_DESCRIPTION = {
"instance_1": {
"args": ["--bolt-port", "7688", "--log-level", "TRACE", "--coordinator-server-port", "10011"],
"log_file": "replica1.log",
"args": [
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10011",
],
"log_file": "instance_1.log",
"data_directory": f"{TEMP_DIR}/instance_1",
"setup_queries": [],
},
"instance_2": {
"args": ["--bolt-port", "7689", "--log-level", "TRACE", "--coordinator-server-port", "10012"],
"log_file": "replica2.log",
"args": [
"--bolt-port",
"7689",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10012",
],
"log_file": "instance_2.log",
"data_directory": f"{TEMP_DIR}/instance_2",
"setup_queries": [],
},
"instance_3": {
"args": ["--bolt-port", "7687", "--log-level", "TRACE", "--coordinator-server-port", "10013"],
"log_file": "main.log",
"args": [
"--bolt-port",
"7687",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10013",
],
"log_file": "instance_3.log",
"data_directory": f"{TEMP_DIR}/instance_3",
"setup_queries": [],
},
"coordinator": {
"args": ["--bolt-port", "7690", "--log-level=TRACE", "--coordinator"],
"log_file": "replica3.log",
"log_file": "coordinator.log",
"setup_queries": [
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
@ -53,7 +81,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
}
def test_replication_works_on_failover(connection):
def test_replication_works_on_failover():
# Goal of this test is to check the replication works after failover command.
# 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 2. We check that main has correct state
@ -61,12 +89,13 @@ def test_replication_works_on_failover(connection):
# 4. We check that coordinator and new main have correct state
# 5. We insert one vertex on new main
# 6. We check that vertex appears on new replica
safe_execute(shutil.rmtree, TEMP_DIR)
# 1
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
# 2
main_cursor = connection(7687, "instance_3").cursor()
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
@ -78,7 +107,7 @@ def test_replication_works_on_failover(connection):
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
# 4
coord_cursor = connection(7690, "coordinator").cursor()
coord_cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
@ -86,17 +115,25 @@ def test_replication_works_on_failover(connection):
expected_data_on_coord = [
("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, ""),
("instance_3", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
new_main_cursor = connection(7688, "instance_1").cursor()
new_main_cursor = connect(host="localhost", port=7688).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
@ -104,62 +141,68 @@ def test_replication_works_on_failover(connection):
execute_and_fetch_all(new_main_cursor, "CREATE ();")
# 6
alive_replica_cursror = connection(7689, "instance_2").cursor()
alive_replica_cursror = connect(host="localhost", port=7689).cursor()
res = execute_and_fetch_all(alive_replica_cursror, "MATCH (n) RETURN count(n) as count;")[0][0]
assert res == 1, "Vertex should be replicated"
interactive_mg_runner.stop_all(MEMGRAPH_INSTANCES_DESCRIPTION)
def test_show_replication_cluster(connection):
# Goal of this test is to check the SHOW REPLICATION CLUSTER command.
# 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 2. We check that all replicas and main have the correct state: they should all be alive.
# 3. We kill one replica. It should not appear anymore in the SHOW REPLICATION CLUSTER command.
# 4. We kill main. It should not appear anymore in the SHOW REPLICATION CLUSTER command.
# 1.
def test_show_replication_cluster():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
cursor = connection(7690, "coordinator").cursor()
instance1_cursor = connect(host="localhost", port=7688).cursor()
instance2_cursor = connect(host="localhost", port=7689).cursor()
instance3_cursor = connect(host="localhost", port=7687).cursor()
coord_cursor = connect(host="localhost", port=7690).cursor()
# 2.
# We leave some time for the coordinator to realise the replicas are down.
def retrieve_data():
return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
def show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
expected_data = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
mg_sleep_and_assert(expected_data, show_repl_cluster)
def retrieve_data_show_repl_role_instance1():
return sorted(list(execute_and_fetch_all(instance1_cursor, "SHOW REPLICATION ROLE;")))
def retrieve_data_show_repl_role_instance2():
return sorted(list(execute_and_fetch_all(instance2_cursor, "SHOW REPLICATION ROLE;")))
def retrieve_data_show_repl_role_instance3():
return sorted(list(execute_and_fetch_all(instance3_cursor, "SHOW REPLICATION ROLE;")))
mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance1)
mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance2)
mg_sleep_and_assert([("main",)], retrieve_data_show_repl_role_instance3)
# 3.
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
expected_data = [
("instance_1", "127.0.0.1:10011", False, ""),
("instance_1", "127.0.0.1:10011", False, "unknown"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
mg_sleep_and_assert(expected_data, show_repl_cluster)
# 4.
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
expected_data = [
("instance_1", "127.0.0.1:10011", False, ""),
("instance_2", "127.0.0.1:10012", False, ""),
("instance_1", "127.0.0.1:10011", False, "unknown"),
("instance_2", "127.0.0.1:10012", False, "unknown"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
mg_sleep_and_assert(expected_data, show_repl_cluster)
def test_simple_automatic_failover(connection):
def test_simple_automatic_failover():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
main_cursor = connection(7687, "instance_3").cursor()
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
@ -169,7 +212,7 @@ def test_simple_automatic_failover(connection):
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
coord_cursor = connection(7690, "coordinator").cursor()
coord_cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
@ -177,46 +220,189 @@ def test_simple_automatic_failover(connection):
expected_data_on_coord = [
("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, ""),
("instance_3", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
new_main_cursor = connection(7688, "instance_1").cursor()
new_main_cursor = connect(host="localhost", port=7688).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main_old_alive = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"),
]
def test_registering_replica_fails_name_exists(connection):
mg_sleep_and_assert(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
def test_registering_replica_fails_name_exists():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coord_cursor = connection(7690, "coordinator").cursor()
coord_cursor = connect(host="localhost", port=7690).cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(
coord_cursor,
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10051' WITH '127.0.0.1:10111';",
)
assert str(e.value) == "Couldn't register replica instance since instance with such name already exists!"
shutil.rmtree(TEMP_DIR)
def test_registering_replica_fails_endpoint_exists(connection):
def test_registering_replica_fails_endpoint_exists():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coord_cursor = connection(7690, "coordinator").cursor()
coord_cursor = connect(host="localhost", port=7690).cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(
coord_cursor,
"REGISTER INSTANCE instance_5 ON '127.0.0.1:10001' WITH '127.0.0.1:10013';",
"REGISTER INSTANCE instance_5 ON '127.0.0.1:10011' WITH '127.0.0.1:10005';",
)
assert (
str(e.value)
== "Couldn't register replica because promotion on replica failed! Check logs on replica to find out more info!"
)
assert str(e.value) == "Couldn't register replica instance since instance with such endpoint already exists!"
def test_replica_instance_restarts():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
cursor = connect(host="localhost", port=7690).cursor()
def show_repl_cluster():
return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
expected_data_up = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_up, show_repl_cluster)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
expected_data_down = [
("instance_1", "127.0.0.1:10011", False, "unknown"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_down, show_repl_cluster)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
mg_sleep_and_assert(expected_data_up, show_repl_cluster)
instance1_cursor = connect(host="localhost", port=7688).cursor()
def retrieve_data_show_repl_role_instance1():
return sorted(list(execute_and_fetch_all(instance1_cursor, "SHOW REPLICATION ROLE;")))
expected_data_replica = [("replica",)]
mg_sleep_and_assert(expected_data_replica, retrieve_data_show_repl_role_instance1)
def test_automatic_failover_main_back_as_replica():
    """Kill main, let the coordinator fail over, then bring the old main
    back and verify it is demoted to a replica."""
    safe_execute(shutil.rmtree, TEMP_DIR)
    interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)

    # instance_3 starts out as main; kill it to trigger failover.
    interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")

    coord_cursor = connect(host="localhost", port=7690).cursor()

    def show_cluster():
        return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))

    # Coordinator promotes instance_1 and marks instance_3 as down.
    after_failover = [
        ("instance_1", "127.0.0.1:10011", True, "main"),
        ("instance_2", "127.0.0.1:10012", True, "replica"),
        ("instance_3", "127.0.0.1:10013", False, "unknown"),
    ]
    mg_sleep_and_assert(after_failover, show_cluster)

    # Restart the old main; it must rejoin the cluster as a replica.
    after_old_main_restart = [
        ("instance_1", "127.0.0.1:10011", True, "main"),
        ("instance_2", "127.0.0.1:10012", True, "replica"),
        ("instance_3", "127.0.0.1:10013", True, "replica"),
    ]
    interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
    mg_sleep_and_assert(after_old_main_restart, show_cluster)

    instance3_cursor = connect(host="localhost", port=7687).cursor()

    def show_role_instance3():
        return sorted(list(execute_and_fetch_all(instance3_cursor, "SHOW REPLICATION ROLE;")))

    # The instance itself must also report the replica role.
    mg_sleep_and_assert([("replica",)], show_role_instance3)
def test_automatic_failover_main_back_as_main():
    """Kill the whole cluster, restart the old main first and verify it
    keeps the main role; the replicas then rejoin as replicas."""
    safe_execute(shutil.rmtree, TEMP_DIR)
    interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)

    # Take down every data instance so no failover can happen.
    for name in ("instance_1", "instance_2", "instance_3"):
        interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, name)

    coord_cursor = connect(host="localhost", port=7690).cursor()

    def show_cluster():
        return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))

    all_down = [
        ("instance_1", "127.0.0.1:10011", False, "unknown"),
        ("instance_2", "127.0.0.1:10012", False, "unknown"),
        ("instance_3", "127.0.0.1:10013", False, "unknown"),
    ]
    mg_sleep_and_assert(all_down, show_cluster)

    # With no live replicas, the returning main must stay main.
    interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
    only_main_up = [
        ("instance_1", "127.0.0.1:10011", False, "unknown"),
        ("instance_2", "127.0.0.1:10012", False, "unknown"),
        ("instance_3", "127.0.0.1:10013", True, "main"),
    ]
    mg_sleep_and_assert(only_main_up, show_cluster)

    instance3_cursor = connect(host="localhost", port=7687).cursor()

    def show_role_instance3():
        return sorted(list(execute_and_fetch_all(instance3_cursor, "SHOW REPLICATION ROLE;")))

    mg_sleep_and_assert([("main",)], show_role_instance3)

    # Bring the replicas back; they must rejoin under the same main.
    interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
    interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
    everyone_up = [
        ("instance_1", "127.0.0.1:10011", True, "replica"),
        ("instance_2", "127.0.0.1:10012", True, "replica"),
        ("instance_3", "127.0.0.1:10013", True, "main"),
    ]
    mg_sleep_and_assert(everyone_up, show_cluster)

    instance1_cursor = connect(host="localhost", port=7688).cursor()
    instance2_cursor = connect(host="localhost", port=7689).cursor()

    def show_role_instance1():
        return sorted(list(execute_and_fetch_all(instance1_cursor, "SHOW REPLICATION ROLE;")))

    def show_role_instance2():
        return sorted(list(execute_and_fetch_all(instance2_cursor, "SHOW REPLICATION ROLE;")))

    # Each instance must self-report the role the coordinator assigned.
    mg_sleep_and_assert([("replica",)], show_role_instance1)
    mg_sleep_and_assert([("replica",)], show_role_instance2)
    mg_sleep_and_assert([("main",)], show_role_instance3)
if __name__ == "__main__":

View File

@ -23,3 +23,10 @@ def connect(**kwargs) -> mgclient.Connection:
connection = mgclient.connect(**kwargs)
connection.autocommit = True
return connection
def safe_execute(function, *args):
    """Invoke ``function(*args)``, swallowing any exception it raises.

    Used for best-effort cleanup (e.g. removing a temp dir that may not
    exist yet) where failure must not abort the test run.

    :param function: callable to invoke.
    :param args: positional arguments forwarded to ``function``.
    :return: None; the callable's return value is discarded.
    """
    try:
        function(*args)
    except Exception:
        # Intentionally best-effort, but catch Exception rather than a bare
        # except so KeyboardInterrupt/SystemExit still propagate.
        pass

View File

@ -1,43 +0,0 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import pytest
from common import connect, execute_and_fetch_all
# The fixture here is more complex because the connection has to be
# parameterized based on the test parameters (info has to be available on both
# sides).
#
# https://docs.pytest.org/en/latest/example/parametrize.html#indirect-parametrization
# is not an elegant/feasible solution here.
#
# The solution was independently developed and then I stumbled upon the same
# approach here https://stackoverflow.com/a/68286553/4888809 which I think is
# optimal.
@pytest.fixture(scope="function")
def connection():
    """Yield a (port, role) -> connection factory and clean up main afterwards.

    A factory is handed out instead of a ready connection so each test can
    choose its own port/role while the fixture still owns the teardown.
    """
    conn = None
    connected_role = None

    def factory(port, role):
        nonlocal conn, connected_role
        conn = connect(host="localhost", port=port)
        connected_role = role
        return conn

    yield factory

    # Only main instance can be cleaned up because replicas do NOT accept
    # writes.
    if connected_role == "main":
        cur = conn.cursor()
        execute_and_fetch_all(cur, "MATCH (n) DETACH DELETE n;")

View File

@ -12,33 +12,33 @@
import sys
import pytest
from common import execute_and_fetch_all
from common import connect, execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
def test_disable_cypher_queries():
    """A coordinator instance must reject plain Cypher (data) queries."""
    # The pre- and post-change versions of this test were fused together by
    # the diff extraction; keep only the final, fixture-less version.
    cursor = connect(host="localhost", port=7690).cursor()
    with pytest.raises(Exception) as e:
        execute_and_fetch_all(cursor, "CREATE (n:TestNode {prop: 'test'})")
    assert str(e.value) == "Coordinator can run only coordinator queries!"
def test_coordinator_cannot_be_replica_role():
    """A coordinator instance must refuse being demoted to a replica."""
    # The pre- and post-change versions of this test were fused together by
    # the diff extraction; keep only the final, fixture-less version.
    cursor = connect(host="localhost", port=7690).cursor()
    with pytest.raises(Exception) as e:
        execute_and_fetch_all(cursor, "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;")
    assert str(e.value) == "Coordinator can run only coordinator queries!"
def test_coordinator_cannot_run_show_repl_role():
    """A coordinator instance has no replication role to show."""
    # The pre- and post-change versions of this test were fused together by
    # the diff extraction; keep only the final, fixture-less version.
    cursor = connect(host="localhost", port=7690).cursor()
    with pytest.raises(Exception) as e:
        execute_and_fetch_all(cursor, "SHOW REPLICATION ROLE;")
    assert str(e.value) == "Coordinator can run only coordinator queries!"
def test_coordinator_show_replication_cluster(connection):
cursor = connection(7690, "coordinator").cursor()
def test_coordinator_show_replication_cluster():
cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data():
return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
@ -51,30 +51,30 @@ def test_coordinator_show_replication_cluster(connection):
mg_sleep_and_assert(expected_data, retrieve_data)
def test_coordinator_cannot_call_show_replicas():
    """SHOW REPLICAS is a data-instance query; the coordinator rejects it."""
    # The pre- and post-change versions of this test were fused together by
    # the diff extraction; keep only the final, fixture-less version.
    cursor = connect(host="localhost", port=7690).cursor()
    with pytest.raises(Exception) as e:
        execute_and_fetch_all(cursor, "SHOW REPLICAS;")
    assert str(e.value) == "Coordinator can run only coordinator queries!"
@pytest.mark.parametrize(
    "port",
    [7687, 7688, 7689],
)
def test_main_and_replicas_cannot_call_show_repl_cluster(port):
    """Data instances (main or replica) must reject SHOW REPLICATION CLUSTER."""
    # The pre- and post-change versions of this test were fused together by
    # the diff extraction; keep only the final, port-only parametrization.
    cursor = connect(host="localhost", port=port).cursor()
    with pytest.raises(Exception) as e:
        execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")
    assert str(e.value) == "Only coordinator can run SHOW REPLICATION CLUSTER."
@pytest.mark.parametrize(
"port, role",
[(7687, "main"), (7688, "replica"), (7689, "replica")],
"port",
[7687, 7688, 7689],
)
def test_main_and_replicas_cannot_register_coord_server(port, role, connection):
cursor = connection(port, role).cursor()
def test_main_and_replicas_cannot_register_coord_server(port):
cursor = connect(host="localhost", port=port).cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(
cursor,