From 97b0a56b3e96ab8d00d965e37f84dd50ca44f58c Mon Sep 17 00:00:00 2001 From: antoniofilipovic Date: Mon, 4 Mar 2024 18:03:48 +0100 Subject: [PATCH] build test case which can fail --- src/coordination/coordinator_handlers.cpp | 2 +- src/coordination/coordinator_instance.cpp | 17 +- src/coordination/raft_state.cpp | 1 + .../high_availability/distributed_coords.py | 352 +++++++++++++++++- tests/e2e/interactive_mg_runner.py | 9 + 5 files changed, 376 insertions(+), 5 deletions(-) diff --git a/src/coordination/coordinator_handlers.cpp b/src/coordination/coordinator_handlers.cpp index 637360267..62fadd94a 100644 --- a/src/coordination/coordinator_handlers.cpp +++ b/src/coordination/coordinator_handlers.cpp @@ -140,7 +140,7 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(replication::ReplicationHa .port = repl_info_config.replication_port, }; }; - + std::this_thread::sleep_for(std::chrono::milliseconds{1000}); // registering replicas for (auto const &config : req.replication_clients_info | ranges::views::transform(converter)) { auto instance_client = replication_handler.RegisterReplica(config); diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index afbcaa7d8..acc39dff5 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -66,16 +66,18 @@ CoordinatorInstance::CoordinatorInstance() }, [this]() { spdlog::info("Leader changed, stopping all replication instances!"); + auto lock = std::lock_guard{coord_instance_lock_}; // RAII repl_instances_.clear(); + spdlog::info("Leader changed, stopped all replication instances!"); })) { client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { - auto lock = std::unique_lock{self->coord_instance_lock_}; + auto lock = std::lock_guard{self->coord_instance_lock_}; // RAII auto &repl_instance = self->FindReplicationInstance(repl_instance_name); std::invoke(repl_instance.GetSuccessCallback(), self, 
repl_instance_name); }; client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { - auto lock = std::unique_lock{self->coord_instance_lock_}; + auto lock = std::lock_guard{self->coord_instance_lock_}; auto &repl_instance = self->FindReplicationInstance(repl_instance_name); std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name); }; @@ -216,17 +218,26 @@ auto CoordinatorInstance::TryFailover() -> void { spdlog::warn("Failover failed since promoting replica to main failed!"); return; } - + spdlog::info("Leader changed, stopping all replication instances!"); + // TODO (antoniofilipovic): This can fail and we don't know we change uuid if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) { return; } auto const new_main_instance_name = new_main->InstanceName(); + // TODO (antoniofilipovic): This can fail and we don't know we have set up new MAIN if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name)) { return; } + // TODO: + // Set trying failover on instance (chosen instance) + // If all passes, all good + // if it fails, we need to check if main was promoted and we have correct new uuid + // PromoteToMain -> split on promote to main and enable writing -> solves issue that we have + // consistent state + spdlog::info("Failover successful! 
Instance {} promoted to main.", new_main->InstanceName()); } diff --git a/src/coordination/raft_state.cpp b/src/coordination/raft_state.cpp index 365388b06..0210447a3 100644 --- a/src/coordination/raft_state.cpp +++ b/src/coordination/raft_state.cpp @@ -59,6 +59,7 @@ auto RaftState::InitRaftServer() -> void { raft_server::init_options init_opts; init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode { + spdlog::info("Received some message, my id {}, leader id {}", param->myId, param->leaderId); if (event_type == cb_func::BecomeLeader) { spdlog::info("Node {} became leader", param->leaderId); become_leader_cb_(); diff --git a/tests/e2e/high_availability/distributed_coords.py b/tests/e2e/high_availability/distributed_coords.py index 9fa654d68..2f482b65b 100644 --- a/tests/e2e/high_availability/distributed_coords.py +++ b/tests/e2e/high_availability/distributed_coords.py @@ -13,6 +13,7 @@ import os import shutil import sys import tempfile +from time import sleep import interactive_mg_runner import pytest @@ -122,9 +123,19 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { def test_distributed_automatic_failover(): + # Goal of this test is to check that coordination works if one instance fails + # with multiple coordinators + + # 1. Start all manually + # 2. Everything is set up on main + # 3. MAIN dies + # 4. 
New main elected + + # 1 safe_execute(shutil.rmtree, TEMP_DIR) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + # 2 main_cursor = connect(host="localhost", port=7689).cursor() expected_data_on_main = [ ( @@ -148,8 +159,10 @@ def test_distributed_automatic_failover(): mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas) + # 3 interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") + # 4 coord_cursor = connect(host="localhost", port=7692).cursor() def retrieve_data_show_repl_cluster(): @@ -210,11 +223,21 @@ def test_distributed_automatic_failover(): def test_distributed_automatic_failover_after_coord_dies(): + # Goal of this test is to check if main and coordinator die at same time in the beginning that + # everything works fine + # 1. Start all manually + # 2. Coordinator dies + # 3. MAIN dies + # 4. safe_execute(shutil.rmtree, TEMP_DIR) + + # 1 interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + # 2 interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_3") + # 3 interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") coord_cursor_1 = connect(host="localhost", port=7690).cursor() @@ -243,7 +266,7 @@ def test_distributed_automatic_failover_after_coord_dies(): ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"), ("instance_1", "", "", "unknown", "main"), ("instance_2", "", "", "unknown", "replica"), - ("instance_3", "", "", "unknown", "main"), # TODO: (andi) Will become unknown. 
+ ("instance_3", "", "", "unknown", "main"), ] mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2]) mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2]) @@ -292,5 +315,332 @@ def test_distributed_automatic_failover_after_coord_dies(): mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas) +@pytest.mark.parametrize("data_recovery", ["false"]) +def test_distributed_coordinators_work_partial_failover(data_recovery): + # Goal of this test is to check that correct MAIN instance is chosen + # if coordinator dies while failover is being done + # 1. We start all replicas, main and 4 coordinators manually + # 2. We check that main has correct state + # 3. Create initial data on MAIN + # 4. Expect data to be copied on all replicas + # 5. Kill MAIN: instance 3 + + # 6. Failover should succeed to instance 1 -> SHOW ROLE -> MAIN + # 7. Kill 2 coordinators (1 and 2) as soon as instance becomes MAIN + # 8. coord 4 becomes follower (2 coords dead, no leader) + # 8. instance_1 writes (MAIN, successfully) + # 9. Kill coordinator 4 + # 10. Connect to coordinator 3 + # 11. Start coordinator_1 and coordinator_2 (coord 3 probably leader) + # 12. Failover again + # 13. 
Main can't write to replicas anymore, coordinator can't progress + + temp_dir = tempfile.TemporaryDirectory().name + + MEMGRAPH_INNER_INSTANCES_DESCRIPTION = { + "instance_1": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7688", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10011", + "--replication-restore-state-on-startup", + "true", + f"--data-recovery-on-startup={data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_1.log", + "data_directory": f"{temp_dir}/instance_1", + "setup_queries": [], + }, + "instance_2": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7689", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10012", + "--replication-restore-state-on-startup", + "true", + f"--data-recovery-on-startup={data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_2.log", + "data_directory": f"{temp_dir}/instance_2", + "setup_queries": [], + }, + "instance_3": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7687", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10013", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + f"{data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_3.log", + "data_directory": f"{temp_dir}/instance_3", + "setup_queries": [], + }, + "coordinator_1": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7690", + "--log-level=TRACE", + "--raft-server-id=1", + "--raft-server-port=10111", + ], + "log_file": "coordinator1.log", + "data_directory": f"{temp_dir}/coordinator_1", + "setup_queries": [], + }, + "coordinator_2": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7691", + "--log-level=TRACE", + "--raft-server-id=2", + "--raft-server-port=10112", + ], + "log_file": "coordinator2.log", + "data_directory": 
f"{temp_dir}/coordinator_2", + "setup_queries": [], + }, + "coordinator_3": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7692", + "--log-level=TRACE", + "--raft-server-id=3", + "--raft-server-port=10113", + ], + "log_file": "coordinator3.log", + "data_directory": f"{temp_dir}/coordinator_3", + "setup_queries": [], + }, + "coordinator_4": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7693", + "--log-level=TRACE", + "--raft-server-id=4", + "--raft-server-port=10114", + ], + "log_file": "coordinator4.log", + "data_directory": f"{temp_dir}/coordinator_4", + "setup_queries": [], + }, + } + + # 1 + + interactive_mg_runner.start_all_except(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, {"coordinator_4"}) + + interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_4") + + coord_cursor = connect(host="localhost", port=7693).cursor() + + for query in [ + "ADD COORDINATOR 1 ON '127.0.0.1:10111';", + "ADD COORDINATOR 2 ON '127.0.0.1:10112';", + "ADD COORDINATOR 3 ON '127.0.0.1:10113';", + ]: + sleep(1) + execute_and_fetch_all(coord_cursor, query) + + for query in [ + "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';", + "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", + "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';", + "SET INSTANCE instance_3 TO MAIN;", + ]: + execute_and_fetch_all(coord_cursor, query) + + # 2 + + main_cursor = connect(host="localhost", port=7687).cursor() + expected_data_on_main = [ + ( + "instance_1", + "127.0.0.1:10001", + "sync", + {"behind": None, "status": "ready", "ts": 0}, + {"memgraph": {"behind": 0, "status": "ready", "ts": 0}}, + ), + ( + "instance_2", + "127.0.0.1:10002", + "sync", + {"behind": None, "status": "ready", "ts": 0}, + {"memgraph": {"behind": 0, "status": "ready", "ts": 0}}, + ), + ] + + main_cursor = connect(host="localhost", port=7687).cursor() + + def 
retrieve_data_show_replicas(): + return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;"))) + + mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas) + + coord_cursor = connect(host="localhost", port=7693).cursor() + + def retrieve_data_show_instances_main_coord(): + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;"))) + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"), + ("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"), + ("instance_1", "", "127.0.0.1:10011", "up", "replica"), + ("instance_2", "", "127.0.0.1:10012", "up", "replica"), + ("instance_3", "", "127.0.0.1:10013", "up", "main"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances_main_coord) + + # 3 + + execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:1});") + execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:2});") + + # 4 + instance_1_cursor = connect(host="localhost", port=7688).cursor() + instance_2_cursor = connect(host="localhost", port=7689).cursor() + + assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2 + assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2 + + # 5 + + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3") + # 6 + + sleep(4.5) + max_tries = 100 + last_role = "replica" + while max_tries: + max_tries -= 1 + last_role = execute_and_fetch_all(instance_1_cursor, "SHOW REPLICATION ROLE;")[0][0] + if "main" == last_role: + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_1") + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_2") + break + sleep(0.1) + assert max_tries > 0 or last_role == "main" + + 
expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"), + ("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"), + ("instance_1", "", "127.0.0.1:10011", "unknown", "main"), # TODO although above we see it is MAIN + ("instance_2", "", "127.0.0.1:10012", "unknown", "replica"), + ("instance_3", "", "127.0.0.1:10013", "unknown", "main"), + ] + + def retrieve_data_show_instances(): + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;"))) + + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 7 + def retrieve_role(): + return execute_and_fetch_all(instance_1_cursor, "SHOW REPLICATION ROLE;")[0] + + mg_sleep_and_assert("MAIN", retrieve_role) + + # 8 + + with pytest.raises(Exception) as e: + execute_and_fetch_all(instance_1_cursor, "CREATE (:Epoch2Vertex {prop:1});") + assert "At least one SYNC replica has not confirmed committing last transaction." 
in str(e.value) + + def get_vertex_count_instance_2(): + return execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(3, get_vertex_count_instance_2) + + assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3 + + # 9 + + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_4") + + # 10 + + coord_3_cursor = connect(port=7692, host="localhost").cursor() + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"), + ("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"), + ("instance_1", "", "127.0.0.1:10011", "unknown", "replica"), # TODO: bug this is main + ("instance_2", "", "127.0.0.1:10012", "unknown", "replica"), + ("instance_3", "", "127.0.0.1:10013", "unknown", "replica"), + ] + + def retrieve_data_show_instances_new_leader(): + return sorted(list(execute_and_fetch_all(coord_3_cursor, "SHOW INSTANCES;"))) + + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances_new_leader) + + # 11 + + interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_1") + interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_2") + + # 12 + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"), + ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"), + ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"), + ("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"), + ( + "instance_1", + "", + "127.0.0.1:10011", + "up", + "replica", + ), # TODO: bug this is main, this might crash instance, because we will maybe execute replica callback on MAIN, which won't work + ("instance_2", "", "127.0.0.1:10012", "up", "replica"), + ("instance_3", "", 
"127.0.0.1:10013", "down", "unknown"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances_new_leader) + + import time + + time.sleep(5) # failover + + with pytest.raises(Exception) as e: + execute_and_fetch_all(instance_1_cursor, "CREATE (:Epoch2Vertex {prop:2});") + assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value) + + def get_vertex_count_instance_2(): + return execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(4, get_vertex_count_instance_2) # MAIN will not be able to write to it anymore + + # 13 + + if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-k", "test_distributed_coordinators_work_partial_failover", "-vv"])) sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/interactive_mg_runner.py b/tests/e2e/interactive_mg_runner.py index efa4dc3d5..ef0ed5555 100755 --- a/tests/e2e/interactive_mg_runner.py +++ b/tests/e2e/interactive_mg_runner.py @@ -39,6 +39,7 @@ import tempfile import time from argparse import ArgumentParser from inspect import signature +from typing import List, Set import yaml @@ -214,6 +215,14 @@ def start_all(context, procdir="", keep_directories=True): start_instance(context, key, procdir) +def start_all_except(context, exceptions: Set[str], procdir="", keep_directories=True): + stop_all(keep_directories) + for key, _ in context.items(): + if key in exceptions: + continue + start_instance(context, key, procdir) + + def start_all_keep_others(context, procdir="", keep_directories=True): for key, _ in context.items(): start_instance(context, key, procdir)