Improve restoration process, WIP

Andi Skrgat 2024-01-29 10:36:30 +01:00
parent d0cb85e642
commit aac169c5b3
12 changed files with 277 additions and 234 deletions

View File

@@ -45,7 +45,7 @@ void CoordinatorClient::StartFrequentCheck() {
"Health check frequency must be greater than 0");
instance_checker_.Run(
"Coord checker", config_.health_check_frequency_sec, [this, instance_name = config_.instance_name] {
config_.instance_name, config_.health_check_frequency_sec, [this, instance_name = config_.instance_name] {
try {
spdlog::trace("Sending frequent heartbeat to machine {} on {}", instance_name,
rpc_client_.Endpoint().SocketAddress());
@@ -63,18 +63,15 @@ void CoordinatorClient::StopFrequentCheck() { instance_checker_.Stop(); }
void CoordinatorClient::PauseFrequentCheck() { instance_checker_.Pause(); }
void CoordinatorClient::ResumeFrequentCheck() { instance_checker_.Resume(); }
auto CoordinatorClient::SetSuccCallback(HealthCheckCallback succ_cb) -> void { succ_cb_ = std::move(succ_cb); }
auto CoordinatorClient::SetFailCallback(HealthCheckCallback fail_cb) -> void { fail_cb_ = std::move(fail_cb); }
auto CoordinatorClient::SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void {
succ_cb_ = std::move(succ_cb);
fail_cb_ = std::move(fail_cb);
}
auto CoordinatorClient::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo {
return config_.replication_client_info;
}
auto CoordinatorClient::ResetReplicationClientInfo() -> void {
// TODO (antoniofilipovic) Sync with Andi on this one
// config_.replication_client_info.reset();
}
auto CoordinatorClient::SendPromoteReplicaToMainRpc(
std::vector<CoordinatorClientConfig::ReplicationClientInfo> replication_clients_info) const -> bool {
try {
@@ -90,17 +87,18 @@ auto CoordinatorClient::SendPromoteReplicaToMainRpc(
return false;
}
auto CoordinatorClient::SendSetToReplicaRpc(ReplClientInfo replication_client_info) const -> bool {
auto CoordinatorClient::DemoteToReplica() const -> bool {
const auto instance_name = config_.instance_name;
try {
auto stream{rpc_client_.Stream<SetMainToReplicaRpc>(std::move(replication_client_info))};
auto stream{rpc_client_.Stream<SetMainToReplicaRpc>(config_.replication_client_info)};
if (!stream.AwaitResponse().success) {
spdlog::error("Failed to set main to replica!");
spdlog::error("Failed to receive successful RPC response for setting instance {} to replica!", instance_name);
return false;
}
spdlog::info("Sent request RPC from coordinator to instance to set it as replica!");
return true;
} catch (const rpc::RpcFailedException &) {
spdlog::error("Failed to send failover RPC from coordinator to new main!");
spdlog::error("Failed to set instance {} to replica!", instance_name);
}
return false;
}
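
For context, the new DemoteToReplica keeps the bool-returning, exception-swallowing shape of the other RPC wrappers: a failed transport and an unsuccessful response are both reported to the caller as false. A self-contained sketch of that shape, where FakeRpc and the helper names are illustrative stand-ins rather than memgraph's API:

#include <iostream>
#include <stdexcept>
#include <string>

// Stand-in for rpc::RpcFailedException.
struct RpcFailed : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// Stand-in for rpc_client_.Stream<SetMainToReplicaRpc>(...).AwaitResponse().
bool FakeRpc(bool transport_ok, bool response_ok) {
  if (!transport_ok) throw RpcFailed("connection refused");
  return response_ok;
}

bool DemoteToReplica(const std::string &instance_name, bool transport_ok, bool response_ok) {
  try {
    if (!FakeRpc(transport_ok, response_ok)) {
      // Server answered, but refused the demotion.
      std::cerr << "Failed to receive successful RPC response for setting instance "
                << instance_name << " to replica!\n";
      return false;
    }
    return true;
  } catch (const RpcFailed &) {
    // Transport-level failure: swallow the exception, report via return value.
    std::cerr << "Failed to set instance " << instance_name << " to replica!\n";
  }
  return false;
}

int main() { return DemoteToReplica("instance_1", true, true) ? 0 : 1; }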

View File

@@ -35,36 +35,42 @@ CoordinatorData::CoordinatorData() {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing replica successful callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
instance.UpdateLastResponseTime();
instance.UpdateAliveStatus();
instance.OnSuccessPing();
};
replica_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing replica failure callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
instance.UpdateAliveStatus();
instance.OnFailPing();
};
main_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing main successful callback", instance_name);
bool const failover_performed = coord_data->ClusterHasAliveMain();
auto const new_role = failover_performed ? replication_coordination_glue::ReplicationRole::REPLICA
: replication_coordination_glue::ReplicationRole::MAIN;
auto &instance = find_instance(coord_data, instance_name);
instance.SetReplicationRole(new_role);
instance.UpdateLastResponseTime();
instance.UpdateAliveStatus();
if (!instance.IsAlive()) {
auto const new_role = coord_data->ClusterHasAliveMain() ? replication_coordination_glue::ReplicationRole::REPLICA
: replication_coordination_glue::ReplicationRole::MAIN;
if (new_role == replication_coordination_glue::ReplicationRole::REPLICA) {
auto const status = instance.DemoteToReplica(coord_data->replica_succ_cb_, coord_data->replica_fail_cb_);
if (!status) {
spdlog::error("Instance {} failed to demote to replica", instance_name);
} else {
spdlog::info("Instance {} demoted to replica", instance_name);
}
}
}
instance.OnSuccessPing();
};
main_fail_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing main failure callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
instance.UpdateAliveStatus();
instance.OnFailPing();
if (!ClusterHasAliveMain()) {
spdlog::info("Cluster without main instance, starting automatic failover");
@@ -92,8 +98,6 @@ auto CoordinatorData::ClusterHasAliveMain() const -> bool {
}
auto CoordinatorData::DoFailover() -> DoFailoverStatus {
using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
auto replica_instances = registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica);
auto chosen_replica_instance = std::ranges::find_if(replica_instances, &CoordinatorInstance::IsAlive);
@@ -101,9 +105,10 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus {
return DoFailoverStatus::ALL_REPLICAS_DOWN;
}
chosen_replica_instance->PrepareForFailover();
chosen_replica_instance->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&chosen_replica_instance] { chosen_replica_instance->ResumeFrequentCheck(); }};
std::vector<ReplicationClientInfo> repl_clients_info;
std::vector<ReplClientInfo> repl_clients_info;
repl_clients_info.reserve(std::ranges::distance(replica_instances));
auto const not_chosen_replica_instance = [&chosen_replica_instance](const CoordinatorInstance &instance) {
@@ -114,13 +119,9 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus {
std::back_inserter(repl_clients_info),
[](const CoordinatorInstance &instance) { return instance.ReplicationClientInfo(); });
if (!chosen_replica_instance->SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) {
chosen_replica_instance->RestoreAfterFailedFailover();
if (!chosen_replica_instance->PromoteToMain(std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
return DoFailoverStatus::RPC_FAILED;
}
chosen_replica_instance->PromoteToMain(main_succ_cb_, main_fail_cb_);
return DoFailoverStatus::SUCCESS;
}
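
The candidate selection in DoFailover leans on C++20 ranges: filter the registered instances down to replicas, then take the first alive one as the failover target. A minimal, compilable sketch of that pattern, with an illustrative Instance struct standing in for CoordinatorInstance:

#include <algorithm>
#include <iostream>
#include <ranges>
#include <string>
#include <vector>

struct Instance {
  std::string name;
  bool is_replica;
  bool is_alive;
};

int main() {
  std::vector<Instance> registered{{"instance_1", true, false},
                                   {"instance_2", true, true},
                                   {"instance_3", false, false}};
  // Keep only replicas, then pick the first alive one as the new main.
  auto replicas = registered | std::views::filter(&Instance::is_replica);
  auto chosen = std::ranges::find_if(replicas, &Instance::is_alive);
  if (chosen == replicas.end()) {
    std::cout << "ALL_REPLICAS_DOWN\n";  // mirrors DoFailoverStatus::ALL_REPLICAS_DOWN
  } else {
    std::cout << "chosen: " << chosen->name << '\n';  // instance_2
  }
}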
@@ -129,7 +130,7 @@ auto CoordinatorData::ShowInstances() const -> std::vector<CoordinatorInstanceSt
instances_status.reserve(registered_instances_.size());
auto const stringify_repl_role = [](const CoordinatorInstance &instance) -> std::string {
if (!instance.IsAlive()) return "";
if (!instance.IsAlive()) return "unknown";
if (instance.IsMain()) return "main";
return "replica";
};
@@ -159,11 +160,13 @@ auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanc
auto new_main = std::ranges::find_if(registered_instances_, is_new_main);
if (new_main == registered_instances_.end()) {
spdlog::error("You didn't register instance with given name {}", instance_name);
spdlog::error("Instance {} not registered. Please register it using REGISTER INSTANCE {}", instance_name,
instance_name);
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
std::vector<CoordinatorClientConfig::ReplicationClientInfo> repl_clients_info;
repl_clients_info.reserve(registered_instances_.size() - 1);
@@ -175,15 +178,11 @@ auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanc
std::back_inserter(repl_clients_info),
[](const CoordinatorInstance &instance) { return instance.ReplicationClientInfo(); });
// PROMOTE REPLICA TO MAIN
// THIS SHOULD FAIL HERE IF IT IS DOWN
if (auto const result = new_main->SendPromoteReplicaToMainRpc(std::move(repl_clients_info)); !result) {
new_main->ResumeFrequentCheck();
if (!new_main->PromoteToMain(std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
new_main->PromoteToMain(main_succ_cb_, main_fail_cb_);
spdlog::info("Instance {} promoted to main", instance_name);
return SetInstanceToMainCoordinatorStatus::SUCCESS;
}
@@ -196,23 +195,18 @@ auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> Regist
}
if (std::ranges::any_of(registered_instances_, [&config](const CoordinatorInstance &instance) {
spdlog::trace("Comparing {} with {}", instance.SocketAddress(), config.SocketAddress());
return instance.SocketAddress() == config.SocketAddress();
})) {
return RegisterInstanceCoordinatorStatus::END_POINT_EXISTS;
}
ReplClientInfo replication_client_info_copy = config.replication_client_info;
try {
registered_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
return RegisterInstanceCoordinatorStatus::SUCCESS;
auto *instance = &registered_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_,
replication_coordination_glue::ReplicationRole::REPLICA);
if (auto const res = instance->SendSetToReplicaRpc(replication_client_info_copy); !res) {
} catch (CoordinatorRegisterInstanceException const &) {
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}
instance->StartFrequentCheck();
return RegisterInstanceCoordinatorStatus::SUCCESS;
}
} // namespace memgraph::coordination

View File

@@ -15,22 +15,33 @@
namespace memgraph::coordination {
auto CoordinatorInstance::UpdateAliveStatus() -> bool {
CoordinatorInstance::CoordinatorInstance(CoordinatorData *data, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
: client_(data, std::move(config), std::move(succ_cb), std::move(fail_cb)),
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA),
is_alive_(true) {
if (!client_.DemoteToReplica()) {
throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName());
}
client_.StartFrequentCheck();
}
auto CoordinatorInstance::OnSuccessPing() -> void {
last_response_time_ = std::chrono::system_clock::now();
is_alive_ = true;
}
auto CoordinatorInstance::OnFailPing() -> bool {
is_alive_ =
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_).count() <
CoordinatorClusterConfig::alive_response_time_difference_sec_;
return is_alive_;
}
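
Note that OnFailPing derives liveness from the age of the last successful response rather than from a single missed ping. A minimal sketch of that window check; the 5-second constant is an assumption standing in for CoordinatorClusterConfig::alive_response_time_difference_sec_:

#include <chrono>
#include <iostream>

// Assumed value for the sketch; the real limit lives in CoordinatorClusterConfig.
constexpr int kAliveWindowSec = 5;

int main() {
  using Clock = std::chrono::system_clock;
  // Pretend the last successful ping was 7 seconds ago.
  const auto last_response_time = Clock::now() - std::chrono::seconds(7);
  // Mirrors OnFailPing: alive only while the last success is within the window.
  const bool is_alive =
      std::chrono::duration_cast<std::chrono::seconds>(Clock::now() - last_response_time).count() <
      kAliveWindowSec;
  std::cout << std::boolalpha << is_alive << '\n';  // false: 7s exceeds the 5s window
}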
auto CoordinatorInstance::UpdateLastResponseTime() -> void { last_response_time_ = std::chrono::system_clock::now(); }
auto CoordinatorInstance::InstanceName() const -> std::string { return client_.InstanceName(); }
auto CoordinatorInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); }
auto CoordinatorInstance::IsAlive() const -> bool { return is_alive_; }
auto CoordinatorInstance::SetReplicationRole(replication_coordination_glue::ReplicationRole role) -> void {
replication_role_ = role;
}
auto CoordinatorInstance::IsReplica() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA;
}
@@ -38,40 +49,36 @@ auto CoordinatorInstance::IsMain() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN;
}
auto CoordinatorInstance::PrepareForFailover() -> void { client_.PauseFrequentCheck(); }
auto CoordinatorInstance::RestoreAfterFailedFailover() -> void { client_.ResumeFrequentCheck(); }
auto CoordinatorInstance::PromoteToMain(ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb,
HealthCheckCallback main_fail_cb) -> bool {
if (!client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) {
return false;
}
auto CoordinatorInstance::PromoteToMain(HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> void {
replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
client_.SetSuccCallback(std::move(main_succ_cb));
client_.SetFailCallback(std::move(main_fail_cb));
client_.ResumeFrequentCheck();
client_.SetCallbacks(std::move(main_succ_cb), std::move(main_fail_cb));
return true;
}
auto CoordinatorInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb)
-> bool {
if (!client_.DemoteToReplica()) {
return false;
}
replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA;
client_.SetCallbacks(std::move(replica_succ_cb), std::move(replica_fail_cb));
return true;
}
auto CoordinatorInstance::StartFrequentCheck() -> void { client_.StartFrequentCheck(); }
auto CoordinatorInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); }
auto CoordinatorInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); }
auto CoordinatorInstance::SetSuccCallback(HealthCheckCallback succ_cb) -> void {
client_.SetSuccCallback(std::move(succ_cb));
}
auto CoordinatorInstance::SetFailCallback(HealthCheckCallback fail_cb) -> void {
client_.SetFailCallback(std::move(fail_cb));
}
auto CoordinatorInstance::ResetReplicationClientInfo() -> void { client_.ResetReplicationClientInfo(); }
auto CoordinatorInstance::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo {
return client_.ReplicationClientInfo();
}
auto CoordinatorInstance::SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool {
return client_.SendPromoteReplicaToMainRpc(std::move(replication_clients_info));
}
auto CoordinatorInstance::SendSetToReplicaRpc(ReplClientInfo replication_client_info) const -> bool {
return client_.SendSetToReplicaRpc(std::move(replication_client_info));
}
} // namespace memgraph::coordination
#endif
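
In both PromoteToMain and DemoteToReplica the ordering matters: the RPC must succeed before the role and callbacks are swapped, so a failed transition leaves the instance in its previous state. A simplified, self-contained model of that invariant (illustrative names, not the actual classes):

#include <functional>
#include <iostream>
#include <string_view>
#include <utility>

enum class Role { MAIN, REPLICA };
using Callback = std::function<void(std::string_view)>;

struct Instance {
  Role role{Role::REPLICA};
  Callback succ_cb, fail_cb;

  // rpc_ok stands in for client_.SendPromoteReplicaToMainRpc(...).
  bool PromoteToMain(bool rpc_ok, Callback succ, Callback fail) {
    if (!rpc_ok) return false;  // RPC failed: keep the old role and callbacks
    role = Role::MAIN;
    succ_cb = std::move(succ);
    fail_cb = std::move(fail);
    return true;
  }
};

int main() {
  Instance inst;
  if (!inst.PromoteToMain(/*rpc_ok=*/false, {}, {})) {
    std::cout << "promotion failed, instance stays replica\n";
  }
}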

View File

@@ -21,7 +21,6 @@ namespace memgraph::coordination {
class CoordinatorData;
using HealthCheckCallback = std::function<void(CoordinatorData *, std::string_view)>;
using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
using ReplicationClientsInfo = std::vector<ReplClientInfo>;
class CoordinatorClient {
@@ -45,15 +44,12 @@ class CoordinatorClient {
auto InstanceName() const -> std::string;
auto SocketAddress() const -> std::string;
auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool;
[[nodiscard]] auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool;
[[nodiscard]] auto DemoteToReplica() const -> bool;
auto ReplicationClientInfo() const -> ReplClientInfo;
auto ResetReplicationClientInfo() -> void;
auto SendSetToReplicaRpc(ReplClientInfo replication_client_info) const -> bool;
auto SetSuccCallback(HealthCheckCallback succ_cb) -> void;
auto SetFailCallback(HealthCheckCallback fail_cb) -> void;
auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void;
friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) {
return first.config_ == second.config_;

View File

@@ -32,9 +32,7 @@ struct CoordinatorClientConfig {
auto SocketAddress() const -> std::string { return ip_address + ":" + std::to_string(port); }
// Info which coordinator will send to new main when performing failover
struct ReplicationClientInfo {
// Must be the same as CoordinatorClientConfig's instance_name
std::string instance_name;
replication_coordination_glue::ReplicationMode replication_mode{};
std::string replication_ip_address;
@@ -43,7 +41,6 @@ struct CoordinatorClientConfig {
friend bool operator==(ReplicationClientInfo const &, ReplicationClientInfo const &) = default;
};
// Each instance has replication config in case it fails
ReplicationClientInfo replication_client_info;
struct SSL {
@@ -58,6 +55,8 @@ struct CoordinatorClientConfig {
friend bool operator==(CoordinatorClientConfig const &, CoordinatorClientConfig const &) = default;
};
using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
struct CoordinatorServerConfig {
std::string ip_address;
uint16_t port{};
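
A hypothetical sketch of building the per-instance ReplicationClientInfo the coordinator ships to a new main during failover; the field set follows the struct above (after instance_name was dropped), while the ReplicationMode enum here is a simplified assumption:

#include <cstdint>
#include <string>

enum class ReplicationMode { SYNC, ASYNC };  // simplified stand-in

struct ReplicationClientInfo {
  ReplicationMode replication_mode{};
  std::string replication_ip_address;
  uint16_t replication_port{};
};

int main() {
  // One entry per non-chosen replica, as collected in DoFailover.
  const ReplicationClientInfo info{.replication_mode = ReplicationMode::SYNC,
                                   .replication_ip_address = "127.0.0.1",
                                   .replication_port = 10001};
  (void)info;
}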

View File

@@ -16,16 +16,16 @@
#include "utils/exceptions.hpp"
namespace memgraph::coordination {
class CoordinatorFailoverException final : public utils::BasicException {
class CoordinatorRegisterInstanceException final : public utils::BasicException {
public:
explicit CoordinatorFailoverException(const std::string_view what) noexcept
: BasicException("Failover didn't complete successfully: " + std::string(what)) {}
explicit CoordinatorRegisterInstanceException(const std::string_view what) noexcept
: BasicException("Failed to create instance: " + std::string(what)) {}
template <class... Args>
explicit CoordinatorFailoverException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: CoordinatorFailoverException(fmt::format(fmt, std::forward<Args>(args)...)) {}
explicit CoordinatorRegisterInstanceException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: CoordinatorRegisterInstanceException(fmt::format(fmt, std::forward<Args>(args)...)) {}
SPECIALIZE_GET_EXCEPTION_NAME(CoordinatorFailoverException)
SPECIALIZE_GET_EXCEPTION_NAME(CoordinatorRegisterInstanceException)
};
} // namespace memgraph::coordination
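
The renamed exception keeps the two-constructor pattern: a string_view overload plus an fmt-forwarding template that delegates to it. A self-contained equivalent, assuming fmt 8+ and using std::runtime_error in place of utils::BasicException:

#include <fmt/format.h>
#include <iostream>
#include <stdexcept>
#include <string>
#include <string_view>
#include <utility>

class RegisterInstanceError final : public std::runtime_error {
 public:
  explicit RegisterInstanceError(std::string_view what)
      : std::runtime_error("Failed to create instance: " + std::string(what)) {}

  // Compile-time-checked format string; formats eagerly, then delegates.
  template <class... Args>
  explicit RegisterInstanceError(fmt::format_string<Args...> fmt, Args &&...args)
      : RegisterInstanceError(fmt::format(fmt, std::forward<Args>(args)...)) {}
};

int main() {
  try {
    throw RegisterInstanceError("Failed to demote instance {} to replica", "instance_1");
  } catch (const RegisterInstanceError &e) {
    std::cout << e.what() << '\n';  // Failed to create instance: Failed to demote ...
  }
}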

View File

@@ -15,6 +15,7 @@
#include "coordination/coordinator_client.hpp"
#include "coordination/coordinator_cluster_config.hpp"
#include "coordination/coordinator_exceptions.hpp"
#include "replication_coordination_glue/role.hpp"
namespace memgraph::coordination {
@@ -24,10 +25,7 @@ class CoordinatorData;
class CoordinatorInstance {
public:
CoordinatorInstance(CoordinatorData *data, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb, replication_coordination_glue::ReplicationRole replication_role)
: client_(data, std::move(config), std::move(succ_cb), std::move(fail_cb)),
replication_role_(replication_role),
is_alive_(true) {}
HealthCheckCallback fail_cb);
CoordinatorInstance(CoordinatorInstance const &other) = delete;
CoordinatorInstance &operator=(CoordinatorInstance const &other) = delete;
@@ -35,34 +33,26 @@ class CoordinatorInstance {
CoordinatorInstance &operator=(CoordinatorInstance &&other) noexcept = delete;
~CoordinatorInstance() = default;
auto UpdateAliveStatus() -> bool;
auto UpdateLastResponseTime() -> void;
auto OnSuccessPing() -> void;
auto OnFailPing() -> bool;
auto IsAlive() const -> bool;
auto InstanceName() const -> std::string;
auto SocketAddress() const -> std::string;
auto SetReplicationRole(replication_coordination_glue::ReplicationRole role) -> void;
auto IsReplica() const -> bool;
auto IsMain() const -> bool;
auto PrepareForFailover() -> void;
auto RestoreAfterFailedFailover() -> void;
auto PromoteToMain(HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> void;
auto PromoteToMain(ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb,
HealthCheckCallback main_fail_cb) -> bool;
auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool;
auto StartFrequentCheck() -> void;
auto PauseFrequentCheck() -> void;
auto ResumeFrequentCheck() -> void;
auto ResetReplicationClientInfo() -> void;
auto ReplicationClientInfo() const -> ReplClientInfo;
auto SetSuccCallback(HealthCheckCallback succ_cb) -> void;
auto SetFailCallback(HealthCheckCallback fail_cb) -> void;
auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool;
auto SendSetToReplicaRpc(ReplClientInfo replication_client_info) const -> bool;
private:
CoordinatorClient client_;
replication_coordination_glue::ReplicationRole replication_role_;

View File

@@ -20,7 +20,6 @@ namespace memgraph::coordination {
enum class RegisterInstanceCoordinatorStatus : uint8_t {
NAME_EXISTS,
END_POINT_EXISTS,
COULD_NOT_BE_PERSISTED,
NOT_COORDINATOR,
RPC_FAILED,
SUCCESS

View File

@@ -43,7 +43,7 @@ void CoordinatorHandlers::SetMainToReplicaHandler(DbmsHandler &dbms_handler, slk
slk::Builder *res_builder) {
auto &repl_state = dbms_handler.ReplicationState();
if (!repl_state.IsMain()) {
if (repl_state.IsReplica()) {
spdlog::error("Setting to replica must be performed on main.");
slk::Save(coordination::SetMainToReplicaRes{false}, res_builder);
return;
@@ -52,8 +52,9 @@ void CoordinatorHandlers::SetMainToReplicaHandler(DbmsHandler &dbms_handler, slk
coordination::SetMainToReplicaReq req;
slk::Load(&req, req_reader);
replication::ReplicationServerConfig clients_config{.ip_address = req.replication_client_info.replication_ip_address,
.port = req.replication_client_info.replication_port};
const replication::ReplicationServerConfig clients_config{
.ip_address = req.replication_client_info.replication_ip_address,
.port = req.replication_client_info.replication_port};
if (bool success = memgraph::dbms::SetReplicationRoleReplica(dbms_handler, clients_config); !success) {
spdlog::error("Setting main to replica failed!");

View File

@@ -500,8 +500,6 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
case END_POINT_EXISTS:
throw QueryRuntimeException(
"Couldn't register replica instance since instance with such endpoint already exists!");
case COULD_NOT_BE_PERSISTED:
throw QueryRuntimeException("Couldn't register replica instance since it couldn't be persisted!");
case NOT_COORDINATOR:
throw QueryRuntimeException("Couldn't register replica instance since this instance is not a coordinator!");
case RPC_FAILED:

View File

@@ -32,7 +32,7 @@ namespace memgraph::utils {
* void long_function() {
* resource.enable();
* OnScopeExit on_exit([&resource] { resource.disable(); });
* // long block of code, might trow an exception
* // long block of code, might throw an exception
* }
*/
template <typename Callable>
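
DoFailover and SetInstanceToMain rely on this guard to pause the frequent health check and guarantee it resumes on every exit path, including early returns and exceptions. A minimal sketch of the pattern:

#include <iostream>
#include <utility>

// Minimal scope guard in the spirit of utils::OnScopeExit.
template <typename Callable>
class ScopeGuard {
 public:
  explicit ScopeGuard(Callable callable) : callable_(std::move(callable)) {}
  ~ScopeGuard() { callable_(); }  // runs on every scope exit
  ScopeGuard(const ScopeGuard &) = delete;
  ScopeGuard &operator=(const ScopeGuard &) = delete;

 private:
  Callable callable_;
};

int main() {
  std::cout << "PauseFrequentCheck\n";
  ScopeGuard resume{[] { std::cout << "ResumeFrequentCheck\n"; }};
  // failover work that might throw or return early happens here;
  // the resume lambda runs regardless.
}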

View File

@@ -26,17 +26,38 @@ interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactiv
MEMGRAPH_INSTANCES_DESCRIPTION = {
"instance_1": {
"args": ["--bolt-port", "7688", "--log-level", "TRACE", "--coordinator-server-port", "10011"],
"args": [
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10011",
],
"log_file": "replica1.log",
"setup_queries": [],
},
"instance_2": {
"args": ["--bolt-port", "7689", "--log-level", "TRACE", "--coordinator-server-port", "10012"],
"args": [
"--bolt-port",
"7689",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10012",
],
"log_file": "replica2.log",
"setup_queries": [],
},
"instance_3": {
"args": ["--bolt-port", "7687", "--log-level", "TRACE", "--coordinator-server-port", "10013"],
"args": [
"--bolt-port",
"7687",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10013",
],
"log_file": "main.log",
"setup_queries": [],
},
@@ -54,49 +75,53 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
def test_show_replication_cluster(connection):
# Goal of this test is to check the SHOW REPLICATION CLUSTER command.
# 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 2. We check that all replicas and main have the correct state: they should all be alive.
# 3. We kill one replica. It should not appear anymore in the SHOW REPLICATION CLUSTER command.
# 4. We kill main. It should not appear anymore in the SHOW REPLICATION CLUSTER command.
# 1.
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
cursor = connection(7690, "coordinator").cursor()
instance1_cursor = connection(7688, "replica").cursor()
instance2_cursor = connection(7689, "replica").cursor()
instance3_cursor = connection(7687, "main").cursor()
coord_cursor = connection(7690, "coordinator").cursor()
# 2.
# We leave some time for the coordinator to realise the replicas are down.
def retrieve_data():
return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
def show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
expected_data = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
mg_sleep_and_assert(expected_data, show_repl_cluster)
def retrieve_data_show_repl_role_instance1():
return sorted(list(execute_and_fetch_all(instance1_cursor, "SHOW REPLICATION ROLE;")))
def retrieve_data_show_repl_role_instance2():
return sorted(list(execute_and_fetch_all(instance2_cursor, "SHOW REPLICATION ROLE;")))
def retrieve_data_show_repl_role_instance3():
return sorted(list(execute_and_fetch_all(instance3_cursor, "SHOW REPLICATION ROLE;")))
mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance1)
mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance2)
mg_sleep_and_assert([("main",)], retrieve_data_show_repl_role_instance3)
# 3.
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
expected_data = [
("instance_1", "127.0.0.1:10011", False, ""),
("instance_1", "127.0.0.1:10011", False, "unknown"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
mg_sleep_and_assert(expected_data, show_repl_cluster)
# 4.
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
expected_data = [
("instance_1", "127.0.0.1:10011", False, ""),
("instance_2", "127.0.0.1:10012", False, ""),
("instance_1", "127.0.0.1:10011", False, "unknown"),
("instance_2", "127.0.0.1:10012", False, "unknown"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
mg_sleep_and_assert(expected_data, show_repl_cluster)
def test_simple_automatic_failover(connection):
@@ -120,11 +145,11 @@ def test_simple_automatic_failover(connection):
expected_data_on_coord = [
("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, ""),
("instance_3", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
new_main_cursor = connection(7688, "instance_1").cursor()
new_main_cursor = connection(7688, "main").cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
@@ -162,100 +187,136 @@ def test_registering_replica_fails_endpoint_exists(connection):
)
def test_replica_instance_restarts(connection):
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
cursor = connection(7690, "coordinator").cursor()
def retrieve_data():
return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
expected_data_up = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_up, retrieve_data)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
expected_data_down = [
("instance_1", "127.0.0.1:10011", False, ""),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_down, retrieve_data)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
mg_sleep_and_assert(expected_data_up, retrieve_data)
# def test_replica_instance_restarts(connection):
# interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
#
# cursor = connection(7690, "coordinator").cursor()
#
# def show_repl_cluster():
# return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
#
# expected_data_up = [
# ("instance_1", "127.0.0.1:10011", True, "replica"),
# ("instance_2", "127.0.0.1:10012", True, "replica"),
# ("instance_3", "127.0.0.1:10013", True, "main"),
# ]
# mg_sleep_and_assert(expected_data_up, show_repl_cluster)
#
# interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
#
# expected_data_down = [
# ("instance_1", "127.0.0.1:10011", False, "unknown"),
# ("instance_2", "127.0.0.1:10012", True, "replica"),
# ("instance_3", "127.0.0.1:10013", True, "main"),
# ]
# mg_sleep_and_assert(expected_data_down, show_repl_cluster)
#
# interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
#
# mg_sleep_and_assert(expected_data_up, show_repl_cluster)
#
# instance1_cursor = connection(7688, "replica").cursor()
#
# def retrieve_data_show_repl_role_instance1():
# return sorted(list(execute_and_fetch_all(instance1_cursor, "SHOW REPLICATION ROLE;")))
#
# expected_data_replica = [("replica",)]
# mg_sleep_and_assert(expected_data_replica, retrieve_data_show_repl_role_instance1)
def test_automatic_failover_main_back_as_replica(connection):
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
coord_cursor = connection(7690, "coordinator").cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
expected_data_after_failover = [
("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, ""),
]
mg_sleep_and_assert(expected_data_after_failover, retrieve_data_show_repl_cluster)
expected_data_after_main_coming_back = [
("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "replica"),
]
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
mg_sleep_and_assert(expected_data_after_main_coming_back, retrieve_data_show_repl_cluster)
# def test_automatic_failover_main_back_as_replica(connection):
# interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
#
# interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
#
# coord_cursor = connection(7690, "coordinator").cursor()
#
# def retrieve_data_show_repl_cluster():
# return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
#
# expected_data_after_failover = [
# ("instance_1", "127.0.0.1:10011", True, "main"),
# ("instance_2", "127.0.0.1:10012", True, "replica"),
# ("instance_3", "127.0.0.1:10013", False, "unknown"),
# ]
# mg_sleep_and_assert(expected_data_after_failover, retrieve_data_show_repl_cluster)
#
# expected_data_after_main_coming_back = [
# ("instance_1", "127.0.0.1:10011", True, "main"),
# ("instance_2", "127.0.0.1:10012", True, "replica"),
# ("instance_3", "127.0.0.1:10013", True, "replica"),
# ]
#
# interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
# mg_sleep_and_assert(expected_data_after_main_coming_back, retrieve_data_show_repl_cluster)
#
# instance3_cursor = connection(7687, "replica").cursor()
#
# def retrieve_data_show_repl_role_instance3():
# return sorted(list(execute_and_fetch_all(instance3_cursor, "SHOW REPLICATION ROLE;")))
#
# mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance3)
def test_automatic_failover_main_back_as_main(connection):
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
# def test_automatic_failover_main_back_as_main(connection):
# interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
#
# interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
# interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
# interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
#
# instance1_cursor = connection(7688, "instance1").cursor()
# instance2_cursor = connection(7689, "instance2").cursor()
# instance3_cursor = connection(7687, "instance3").cursor()
# coord_cursor = connection(7690, "coordinator").cursor()
#
# def retrieve_data_show_repl_cluster():
# return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
#
# def retrieve_data_show_repl_role_instance1():
# return sorted(list(execute_and_fetch_all(instance1_cursor, "SHOW REPLICATION ROLE;")))
#
# def retrieve_data_show_repl_role_instance2():
# return sorted(list(execute_and_fetch_all(instance2_cursor, "SHOW REPLICATION ROLE;")))
#
# def retrieve_data_show_repl_role_instance3():
# return sorted(list(execute_and_fetch_all(instance3_cursor, "SHOW REPLICATION ROLE;")))
#
# expected_data_all_down = [
# ("instance_1", "127.0.0.1:10011", False, "unknown"),
# ("instance_2", "127.0.0.1:10012", False, "unknown"),
# ("instance_3", "127.0.0.1:10013", False, "unknown"),
# ]
#
# mg_sleep_and_assert(expected_data_all_down, retrieve_data_show_repl_cluster)
#
# interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
# expected_data_main_back = [
# ("instance_1", "127.0.0.1:10011", False, "unknown"),
# ("instance_2", "127.0.0.1:10012", False, "unknown"),
# ("instance_3", "127.0.0.1:10013", True, "main"),
# ]
# mg_sleep_and_assert(expected_data_main_back, retrieve_data_show_repl_cluster)
#
# mg_sleep_and_assert([("main",)], retrieve_data_show_repl_role_instance3)
#
# interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
# interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
#
# expected_data_replicas_back = [
# ("instance_1", "127.0.0.1:10011", True, "replica"),
# ("instance_2", "127.0.0.1:10012", True, "replica"),
# ("instance_3", "127.0.0.1:10013", True, "main"),
# ]
#
# mg_sleep_and_assert(expected_data_replicas_back, retrieve_data_show_repl_cluster)
#
# mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance1)
# mg_sleep_and_assert([("replica",)], retrieve_data_show_repl_role_instance2)
# mg_sleep_and_assert([("main",)], retrieve_data_show_repl_role_instance3)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
coord_cursor = connection(7690, "coordinator").cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
expected_data_all_down = [
("instance_1", "127.0.0.1:10011", False, ""),
("instance_2", "127.0.0.1:10012", False, ""),
("instance_3", "127.0.0.1:10013", False, ""),
]
mg_sleep_and_assert(expected_data_all_down, retrieve_data_show_repl_cluster)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_main_back = [
("instance_1", "127.0.0.1:10011", False, ""),
("instance_2", "127.0.0.1:10012", False, ""),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_main_back, retrieve_data_show_repl_cluster)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
expected_data_replicas_back = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_replicas_back, retrieve_data_show_repl_cluster)
# TODO: (andi) Remove connection fixture, doesn't make sense anymore
if __name__ == "__main__":