Support main restart

Andi Skrgat 2024-01-26 12:12:34 +01:00
parent 34a7fed59a
commit d0cb85e642
5 changed files with 163 additions and 159 deletions

View File

@@ -35,34 +35,39 @@ CoordinatorData::CoordinatorData() {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing replica successful callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name);
instance.UpdateLastResponseTime();
instance.UpdateInstanceStatus();
instance.UpdateAliveStatus();
};
replica_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing replica failure callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name);
instance.UpdateInstanceStatus();
instance.UpdateAliveStatus();
};
main_succ_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing main successful callback", instance_name);
bool const failover_performed = coord_data->ClusterHasAliveMain();
auto const new_role = failover_performed ? replication_coordination_glue::ReplicationRole::REPLICA
: replication_coordination_glue::ReplicationRole::MAIN;
auto &instance = find_instance(coord_data, instance_name);
MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name);
instance.SetReplicationRole(new_role);
instance.UpdateLastResponseTime();
instance.UpdateAliveStatus();
};
main_fail_cb_ = [this, find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
auto lock = std::lock_guard{coord_data->coord_data_lock_};
spdlog::trace("Instance {} performing main failure callback", instance_name);
auto &instance = find_instance(coord_data, instance_name);
MG_ASSERT(instance.IsMain(), "Instance {} is not a main!", instance_name);
if (bool main_alive = instance.UpdateInstanceStatus(); !main_alive) {
spdlog::info("Main instance {} is not alive, starting automatic failover", instance_name);
instance.UpdateAliveStatus();
if (!ClusterHasAliveMain()) {
spdlog::info("Cluster without main instance, starting automatic failover");
switch (auto failover_status = DoFailover(); failover_status) {
using enum DoFailoverStatus;
case ALL_REPLICAS_DOWN:
@@ -81,6 +86,11 @@ CoordinatorData::CoordinatorData() {
};
}
auto CoordinatorData::ClusterHasAliveMain() const -> bool {
auto const alive_main = [](const CoordinatorInstance &instance) { return instance.IsMain() && instance.IsAlive(); };
return std::ranges::any_of(registered_instances_, alive_main);
}
auto CoordinatorData::DoFailover() -> DoFailoverStatus {
using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
@@ -99,24 +109,16 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus {
auto const not_chosen_replica_instance = [&chosen_replica_instance](const CoordinatorInstance &instance) {
return instance != *chosen_replica_instance;
};
auto const not_main = [](const CoordinatorInstance &instance) { return !instance.IsMain(); };
// TODO (antoniofilipovic): Should we send also data on old MAIN???
// TODO: (andi) Don't send replicas which aren't alive
for (const auto &unchosen_replica_instance :
replica_instances | ranges::views::filter(not_chosen_replica_instance) | ranges::views::filter(not_main)) {
repl_clients_info.emplace_back(unchosen_replica_instance.ReplicationClientInfo());
}
std::ranges::transform(replica_instances | ranges::views::filter(not_chosen_replica_instance),
std::back_inserter(repl_clients_info),
[](const CoordinatorInstance &instance) { return instance.ReplicationClientInfo(); });
if (!chosen_replica_instance->SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) {
chosen_replica_instance->RestoreAfterFailedFailover();
return DoFailoverStatus::RPC_FAILED;
}
auto old_main = std::ranges::find_if(registered_instances_, &CoordinatorInstance::IsMain);
// TODO: (andi) For performing restoration we will have to improve this
old_main->PauseFrequentCheck();
chosen_replica_instance->PromoteToMain(main_succ_cb_, main_fail_cb_);
return DoFailoverStatus::SUCCESS;
@@ -175,7 +177,7 @@ auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanc
// PROMOTE REPLICA TO MAIN
// THIS SHOULD FAIL HERE IF IT IS DOWN
if (auto result = new_main->SendPromoteReplicaToMainRpc(std::move(repl_clients_info)); !result) {
if (auto const result = new_main->SendPromoteReplicaToMainRpc(std::move(repl_clients_info)); !result) {
new_main->ResumeFrequentCheck();
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
@@ -204,7 +206,7 @@ auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> Regist
auto *instance = &registered_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_,
replication_coordination_glue::ReplicationRole::REPLICA);
if (auto res = instance->SendSetToReplicaRpc(replication_client_info_copy); !res) {
if (auto const res = instance->SendSetToReplicaRpc(replication_client_info_copy); !res) {
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}

View File

@@ -15,7 +15,7 @@
namespace memgraph::coordination {
auto CoordinatorInstance::UpdateInstanceStatus() -> bool {
auto CoordinatorInstance::UpdateAliveStatus() -> bool {
is_alive_ =
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_).count() <
CoordinatorClusterConfig::alive_response_time_difference_sec_;

View File

@@ -35,6 +35,8 @@ class CoordinatorData {
auto ShowInstances() const -> std::vector<CoordinatorInstanceStatus>;
private:
auto ClusterHasAliveMain() const -> bool;
mutable utils::RWLock coord_data_lock_{utils::RWLock::Priority::READ};
HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
// Must be std::list because we rely on pointer stability

View File

@@ -35,7 +35,7 @@ class CoordinatorInstance {
CoordinatorInstance &operator=(CoordinatorInstance &&other) noexcept = delete;
~CoordinatorInstance() = default;
auto UpdateInstanceStatus() -> bool;
auto UpdateAliveStatus() -> bool;
auto UpdateLastResponseTime() -> void;
auto IsAlive() const -> bool;

View File

@@ -53,142 +53,142 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
}
# def test_show_replication_cluster(connection):
# # Goal of this test is to check the SHOW REPLICATION CLUSTER command.
# # 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# # 2. We check that all replicas and main have the correct state: they should all be alive.
# # 3. We kill one replica. It should not appear anymore in the SHOW REPLICATION CLUSTER command.
# # 4. We kill main. It should not appear anymore in the SHOW REPLICATION CLUSTER command.
#
# # 1.
# interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
#
# cursor = connection(7690, "coordinator").cursor()
#
# # 2.
#
# # We leave some time for the coordinator to realise the replicas are down.
# def retrieve_data():
# return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
#
# expected_data = [
# ("instance_1", "127.0.0.1:10011", True, "replica"),
# ("instance_2", "127.0.0.1:10012", True, "replica"),
# ("instance_3", "127.0.0.1:10013", True, "main"),
# ]
# mg_sleep_and_assert(expected_data, retrieve_data)
#
# # 3.
# interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
#
# expected_data = [
# ("instance_1", "127.0.0.1:10011", False, ""),
# ("instance_2", "127.0.0.1:10012", True, "replica"),
# ("instance_3", "127.0.0.1:10013", True, "main"),
# ]
# mg_sleep_and_assert(expected_data, retrieve_data)
#
# # 4.
# interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
#
# expected_data = [
# ("instance_1", "127.0.0.1:10011", False, ""),
# ("instance_2", "127.0.0.1:10012", False, ""),
# ("instance_3", "127.0.0.1:10013", True, "main"),
# ]
# mg_sleep_and_assert(expected_data, retrieve_data)
#
#
# def test_simple_automatic_failover(connection):
# interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
#
# main_cursor = connection(7687, "instance_3").cursor()
# expected_data_on_main = [
# ("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
# ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
# ]
# actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
# assert actual_data_on_main == expected_data_on_main
#
# interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
#
# coord_cursor = connection(7690, "coordinator").cursor()
#
# def retrieve_data_show_repl_cluster():
# return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
#
# expected_data_on_coord = [
# ("instance_1", "127.0.0.1:10011", True, "main"),
# ("instance_2", "127.0.0.1:10012", True, "replica"),
# ("instance_3", "127.0.0.1:10013", False, ""),
# ]
# mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
#
# new_main_cursor = connection(7688, "instance_1").cursor()
#
# def retrieve_data_show_replicas():
# return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
#
# expected_data_on_new_main = [
# ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
# ]
# mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
#
#
# def test_registering_replica_fails_name_exists(connection):
# interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
#
# coord_cursor = connection(7690, "coordinator").cursor()
# with pytest.raises(Exception) as e:
# execute_and_fetch_all(
# coord_cursor,
# "REGISTER INSTANCE instance_1 ON '127.0.0.1:10051' WITH '127.0.0.1:10111';",
# )
# assert str(e.value) == "Couldn't register replica instance since instance with such name already exists!"
#
#
# def test_registering_replica_fails_endpoint_exists(connection):
# interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
#
# coord_cursor = connection(7690, "coordinator").cursor()
# with pytest.raises(Exception) as e:
# execute_and_fetch_all(
# coord_cursor,
# "REGISTER INSTANCE instance_5 ON '127.0.0.1:10001' WITH '127.0.0.1:10013';",
# )
# assert (
# str(e.value)
# == "Couldn't register replica because promotion on replica failed! Check logs on replica to find out more info!"
# )
#
#
# def test_replica_instance_restarts(connection):
# interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
#
# cursor = connection(7690, "coordinator").cursor()
#
# def retrieve_data():
# return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
#
# expected_data_up = [
# ("instance_1", "127.0.0.1:10011", True, "replica"),
# ("instance_2", "127.0.0.1:10012", True, "replica"),
# ("instance_3", "127.0.0.1:10013", True, "main"),
# ]
# mg_sleep_and_assert(expected_data_up, retrieve_data)
#
# interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
#
# expected_data_down = [
# ("instance_1", "127.0.0.1:10011", False, ""),
# ("instance_2", "127.0.0.1:10012", True, "replica"),
# ("instance_3", "127.0.0.1:10013", True, "main"),
# ]
# mg_sleep_and_assert(expected_data_down, retrieve_data)
#
# interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
#
# mg_sleep_and_assert(expected_data_up, retrieve_data)
def test_show_replication_cluster(connection):
# Goal of this test is to check the SHOW REPLICATION CLUSTER command.
# 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 2. We check that all replicas and main have the correct state: they should all be alive.
# 3. We kill one replica. It should not appear anymore in the SHOW REPLICATION CLUSTER command.
# 4. We kill main. It should not appear anymore in the SHOW REPLICATION CLUSTER command.
# 1.
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
cursor = connection(7690, "coordinator").cursor()
# 2.
# We leave some time for the coordinator to contact the instances and report them as alive.
def retrieve_data():
return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
expected_data = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
# 3.
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
expected_data = [
("instance_1", "127.0.0.1:10011", False, ""),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
# 4.
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
expected_data = [
("instance_1", "127.0.0.1:10011", False, ""),
("instance_2", "127.0.0.1:10012", False, ""),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data, retrieve_data)
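These state checks go through mg_sleep_and_assert, presumably imported from the shared e2e test utilities, which keeps re-evaluating the supplied retrieval function until it returns the expected rows, giving the coordinator time to refresh instance health. As a purely illustrative sketch of that polling pattern (the helper name sleep_and_assert and the timeout/interval parameters are hypothetical, not the repository's actual implementation):

import time

def sleep_and_assert(expected, retrieve, timeout_sec=10.0, interval_sec=0.5):
    # Re-run retrieve() until its result equals expected or the timeout expires.
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        if retrieve() == expected:
            return
        time.sleep(interval_sec)
    # One final check so the assertion error shows the last observed value.
    assert retrieve() == expected

The calls mg_sleep_and_assert(expected_data, retrieve_data) in the tests play exactly this role.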
def test_simple_automatic_failover(connection):
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
main_cursor = connection(7687, "instance_3").cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
coord_cursor = connection(7690, "coordinator").cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
expected_data_on_coord = [
("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, ""),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
new_main_cursor = connection(7688, "instance_1").cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)

def test_registering_replica_fails_name_exists(connection):
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coord_cursor = connection(7690, "coordinator").cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(
coord_cursor,
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10051' WITH '127.0.0.1:10111';",
)
assert str(e.value) == "Couldn't register replica instance since instance with such name already exists!"

def test_registering_replica_fails_endpoint_exists(connection):
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coord_cursor = connection(7690, "coordinator").cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(
coord_cursor,
"REGISTER INSTANCE instance_5 ON '127.0.0.1:10001' WITH '127.0.0.1:10013';",
)
assert (
str(e.value)
== "Couldn't register replica because promotion on replica failed! Check logs on replica to find out more info!"
)

def test_replica_instance_restarts(connection):
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
cursor = connection(7690, "coordinator").cursor()
def retrieve_data():
return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))
expected_data_up = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_up, retrieve_data)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
expected_data_down = [
("instance_1", "127.0.0.1:10011", False, ""),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_down, retrieve_data)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
mg_sleep_and_assert(expected_data_up, retrieve_data)

def test_automatic_failover_main_back_as_replica(connection):