Remove cluster_initialized and client_initiated failover tests

This commit is contained in:
Andi Skrgat 2024-01-25 09:04:05 +01:00
parent c0f979fdcb
commit 303608982a
12 changed files with 36 additions and 214 deletions

View File

@ -68,9 +68,6 @@ CoordinatorData::CoordinatorData() {
case MAIN_ALIVE: case MAIN_ALIVE:
spdlog::warn("Failover aborted since main is alive!"); spdlog::warn("Failover aborted since main is alive!");
break; break;
case CLUSTER_UNINITIALIZED:
spdlog::warn("Failover aborted since cluster is uninitialized!");
break;
case RPC_FAILED: case RPC_FAILED:
spdlog::warn("Failover aborted since promoting replica to main failed!"); spdlog::warn("Failover aborted since promoting replica to main failed!");
break; break;
@ -85,18 +82,6 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus {
using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo; using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
std::lock_guard<utils::RWLock> lock{coord_data_lock_}; std::lock_guard<utils::RWLock> lock{coord_data_lock_};
const auto main_instance = std::ranges::find_if(registered_instances_, &CoordinatorInstance::IsMain);
if (main_instance == registered_instances_.end()) {
return DoFailoverStatus::CLUSTER_UNINITIALIZED;
}
if (main_instance->IsAlive()) {
return DoFailoverStatus::MAIN_ALIVE;
}
main_instance->client_.PauseFrequentCheck();
auto replica_instances = registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica); auto replica_instances = registered_instances_ | ranges::views::filter(&CoordinatorInstance::IsReplica);
auto chosen_replica_instance = std::ranges::find_if(replica_instances, &CoordinatorInstance::IsAlive); auto chosen_replica_instance = std::ranges::find_if(replica_instances, &CoordinatorInstance::IsAlive);
@ -114,8 +99,8 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus {
}; };
auto not_main = [](const CoordinatorInstance &instance) { return !instance.IsMain(); }; auto not_main = [](const CoordinatorInstance &instance) { return !instance.IsMain(); };
// Filter not current replicas and not MAIN instance
// TODO (antoniofilipovic): Should we send also data on old MAIN??? // TODO (antoniofilipovic): Should we send also data on old MAIN???
// TODO: (andi) Don't send replicas which aren't alive
for (const auto &unchosen_replica_instance : for (const auto &unchosen_replica_instance :
replica_instances | ranges::views::filter(not_chosen_replica_instance) | ranges::views::filter(not_main)) { replica_instances | ranges::views::filter(not_chosen_replica_instance) | ranges::views::filter(not_main)) {
repl_clients_info.emplace_back(unchosen_replica_instance.client_.ReplicationClientInfo()); repl_clients_info.emplace_back(unchosen_replica_instance.client_.ReplicationClientInfo());
@ -127,7 +112,6 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus {
} }
chosen_replica_instance->PostFailover(main_succ_cb_, main_fail_cb_); chosen_replica_instance->PostFailover(main_succ_cb_, main_fail_cb_);
main_instance->replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA;
return DoFailoverStatus::SUCCESS; return DoFailoverStatus::SUCCESS;
} }

View File

@ -9,18 +9,19 @@
// by the Apache License, Version 2.0, included in the file // by the Apache License, Version 2.0, included in the file
// licenses/APL.txt. // licenses/APL.txt.
#include "coordination/coordinator_state.hpp"
#include <algorithm>
#include "coordination/register_main_replica_coordinator_status.hpp"
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE
#include "coordination/coordinator_state.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "coordination/coordinator_config.hpp" #include "coordination/coordinator_config.hpp"
#include "flags/replication.hpp" #include "flags/replication.hpp"
#include "spdlog/spdlog.h" #include "spdlog/spdlog.h"
#include "utils/logging.hpp" #include "utils/logging.hpp"
#include "utils/variant_helpers.hpp" #include "utils/variant_helpers.hpp"
#include <algorithm>
namespace memgraph::coordination { namespace memgraph::coordination {
CoordinatorState::CoordinatorState() { CoordinatorState::CoordinatorState() {

View File

@ -16,6 +16,6 @@
#include <cstdint> #include <cstdint>
namespace memgraph::coordination { namespace memgraph::coordination {
enum class DoFailoverStatus : uint8_t { SUCCESS, ALL_REPLICAS_DOWN, MAIN_ALIVE, CLUSTER_UNINITIALIZED, RPC_FAILED }; enum class DoFailoverStatus : uint8_t { SUCCESS, ALL_REPLICAS_DOWN, MAIN_ALIVE, RPC_FAILED };
} // namespace memgraph::coordination } // namespace memgraph::coordination
#endif #endif

View File

@ -130,30 +130,13 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(DbmsHandler &dbms_handler,
} }
auto &instance_client_ref = *instance_client.GetValue(); auto &instance_client_ref = *instance_client.GetValue();
// TODO: (andi) Policy for register all databases
// Will be resolved after deciding about choosing new replica
const bool all_clients_good = memgraph::dbms::RegisterAllDatabasesClients(dbms_handler, instance_client_ref); const bool all_clients_good = memgraph::dbms::RegisterAllDatabasesClients(dbms_handler, instance_client_ref);
MG_ASSERT(all_clients_good, "Failed to register one or more databases to the REPLICA \"{}\".", config.name);
if (!all_clients_good) {
spdlog::error(
"Failed to register one or more databases to the REPLICA \"{}\". Failover aborted, instance will continue to "
"operate as replica.",
config.name);
dbms_handler.ForEach([&](DatabaseAccess db_acc) {
auto *storage = db_acc->storage();
storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); });
});
std::get<replication::RoleMainData>(dbms_handler.ReplicationState().ReplicationData())
.registered_replicas_.clear();
repl_state.SetReplicationRoleReplica(repl_server_config);
slk::Save(coordination::PromoteReplicaToMainRes{false}, res_builder);
return;
}
StartReplicaClient(dbms_handler, instance_client_ref); StartReplicaClient(dbms_handler, instance_client_ref);
}; }
slk::Save(coordination::PromoteReplicaToMainRes{true}, res_builder); slk::Save(coordination::PromoteReplicaToMainRes{true}, res_builder);
} }

View File

@ -528,6 +528,26 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
} }
} }
/// @throw QueryRuntimeException if an error ocurred.
void DoFailover() const override {
if (!FLAGS_coordinator) {
throw QueryRuntimeException("Only coordinator can register coordinator server!");
}
auto status = coordinator_handler_.DoFailover();
switch (status) {
using enum memgraph::coordination::DoFailoverStatus;
case ALL_REPLICAS_DOWN:
throw QueryRuntimeException("Failover aborted since all replicas are down!");
case MAIN_ALIVE:
throw QueryRuntimeException("Failover aborted since main is alive!");
case RPC_FAILED:
throw QueryRuntimeException("Failover aborted since promoting replica to main failed!");
case SUCCESS:
break;
}
}
#endif #endif
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE

View File

@ -67,6 +67,7 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
"now hold unique data. Please resolve data conflicts and start the " "now hold unique data. Please resolve data conflicts and start the "
"replication on a clean instance.", "replication on a clean instance.",
client_.name_, client_.name_, client_.name_); client_.name_, client_.name_, client_.name_);
// TODO: (andi) Talk about renaming MAYBE_BEHIND to branching
// State not updated, hence in MAYBE_BEHIND state // State not updated, hence in MAYBE_BEHIND state
return; return;
} }

View File

@ -1,9 +1,8 @@
find_package(gflags REQUIRED) find_package(gflags REQUIRED)
copy_e2e_python_files(ha_experimental coordinator.py) copy_e2e_python_files(ha_experimental coordinator.py)
copy_e2e_python_files(ha_experimental client_initiated_failover.py) copy_e2e_python_files(ha_experimental show_replication_cluster.py)
copy_e2e_python_files(ha_experimental automatic_failover.py) copy_e2e_python_files(ha_experimental automatic_failover.py)
copy_e2e_python_files(ha_experimental uninitialized_cluster.py)
copy_e2e_python_files(ha_experimental common.py) copy_e2e_python_files(ha_experimental common.py)
copy_e2e_python_files(ha_experimental conftest.py) copy_e2e_python_files(ha_experimental conftest.py)
copy_e2e_python_files(ha_experimental workloads.yaml) copy_e2e_python_files(ha_experimental workloads.yaml)

View File

@ -76,7 +76,7 @@ def test_simple_automatic_failover(connection):
expected_data_on_coord = [ expected_data_on_coord = [
("instance_1", "127.0.0.1:10011", True, "main"), ("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"), ("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, "replica"), ("instance_3", "127.0.0.1:10013", False, "main"),
] ]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster) mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)

View File

@ -83,16 +83,5 @@ def test_main_and_replicas_cannot_register_coord_server(port, role, connection):
assert str(e.value) == "Only coordinator can register coordinator server!" assert str(e.value) == "Only coordinator can register coordinator server!"
@pytest.mark.parametrize(
"port, role",
[(7687, "main"), (7688, "replica"), (7689, "replica")],
)
def test_main_and_replicas_cannot_run_do_failover(port, role, connection):
cursor = connection(port, role).cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(cursor, "DO FAILOVER;")
assert str(e.value) == "Only coordinator can run DO FAILOVER!"
if __name__ == "__main__": if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"])) sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -101,107 +101,5 @@ def test_show_replication_cluster(connection):
mg_sleep_and_assert(expected_data, retrieve_data) mg_sleep_and_assert(expected_data, retrieve_data)
def test_simple_client_initiated_failover(connection):
# 1. Start all instances
# 2. Kill main
# 3. Run DO FAILOVER on COORDINATOR
# 4. Assert new config on coordinator by running show replication cluster
# 5. Assert replicas on new main
# 1.
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
# 2.
main_cursor = connection(7687, "instance_3").cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
coord_cursor = connection(7690, "coordinator").cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
expected_data_on_coord = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
# 3.
execute_and_fetch_all(coord_cursor, "DO FAILOVER")
expected_data_on_coord = [
("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
# 4.
new_main_cursor = connection(7688, "instance_1").cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
def test_failover_fails_all_replicas_down(connection):
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
coord_cursor = connection(7690, "coordinator").cursor()
def retrieve_data():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
expected_data_on_coord = [
("instance_1", "127.0.0.1:10011", False, "replica"),
("instance_2", "127.0.0.1:10012", False, "replica"),
("instance_3", "127.0.0.1:10013", False, "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data)
# 4.
with pytest.raises(Exception) as e:
execute_and_fetch_all(coord_cursor, "DO FAILOVER;")
assert str(e.value) == "Failover aborted since all replicas are down!"
mg_sleep_and_assert(expected_data_on_coord, retrieve_data)
def test_failover_fails_main_is_alive(connection):
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coord_cursor = connection(7690, "coordinator").cursor()
def retrieve_data():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")))
expected_data_on_coord = [
("instance_1", "127.0.0.1:10011", True, "replica"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data)
with pytest.raises(Exception) as e:
execute_and_fetch_all(coord_cursor, "DO FAILOVER;")
assert str(e.value) == "Failover aborted since main is alive!"
mg_sleep_and_assert(expected_data_on_coord, retrieve_data)
if __name__ == "__main__": if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"])) sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -1,26 +0,0 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import sys
import pytest
from common import execute_and_fetch_all
def test_failover_on_non_setup_cluster(connection):
cursor = connection(7690, "coordinator").cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(cursor, "DO FAILOVER;")
assert str(e.value) == "Failover aborted since cluster is uninitialized!"
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -24,28 +24,6 @@ ha_cluster: &ha_cluster
"REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002' WITH COORDINATOR SERVER ON '127.0.0.1:10012'", "REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002' WITH COORDINATOR SERVER ON '127.0.0.1:10012'",
] ]
noninitialized_cluster: &noninitialized_cluster
cluster:
replica_1:
args: ["--bolt-port", "7688", "--log-level=TRACE", "--coordinator-server-port=10011"]
log_file: "replication-e2e-replica1.log"
setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"]
replica_2:
args: ["--bolt-port", "7689", "--log-level=TRACE", "--coordinator-server-port=10012"]
log_file: "replication-e2e-replica2.log"
setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"]
main:
args: ["--bolt-port", "7687", "--log-level=TRACE", "--coordinator-server-port=10013"]
log_file: "replication-e2e-main.log"
setup_queries: [
"REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001'",
"REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002'",
]
coordinator:
args: ["--bolt-port", "7690", "--log-level=TRACE", "--coordinator"]
log_file: "replication-e2e-coordinator.log"
setup_queries: []
workloads: workloads:
- name: "Coordinator" - name: "Coordinator"
@ -53,14 +31,9 @@ workloads:
args: ["high_availability_experimental/coordinator.py"] args: ["high_availability_experimental/coordinator.py"]
<<: *ha_cluster <<: *ha_cluster
- name: "Uninitialized cluster" - name: "Show replication cluster"
binary: "tests/e2e/pytest_runner.sh" binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/uninitialized_cluster.py"] args: ["high_availability_experimental/show_replication_cluster.py"]
<<: *noninitialized_cluster
- name: "Client initiated failover"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/client_initiated_failover.py"]
- name: "Automatic failover" - name: "Automatic failover"
binary: "tests/e2e/pytest_runner.sh" binary: "tests/e2e/pytest_runner.sh"