build test case which can fail

This commit is contained in:
antoniofilipovic 2024-03-04 18:03:48 +01:00
parent 822183b62d
commit 97b0a56b3e
5 changed files with 376 additions and 5 deletions

View File

@ -140,7 +140,7 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(replication::ReplicationHa
.port = repl_info_config.replication_port, .port = repl_info_config.replication_port,
}; };
}; };
std::this_thread::sleep_for(std::chrono::milliseconds{1000});
// registering replicas // registering replicas
for (auto const &config : req.replication_clients_info | ranges::views::transform(converter)) { for (auto const &config : req.replication_clients_info | ranges::views::transform(converter)) {
auto instance_client = replication_handler.RegisterReplica(config); auto instance_client = replication_handler.RegisterReplica(config);

View File

@ -66,16 +66,18 @@ CoordinatorInstance::CoordinatorInstance()
}, },
[this]() { [this]() {
spdlog::info("Leader changed, stopping all replication instances!"); spdlog::info("Leader changed, stopping all replication instances!");
auto lock = std::lock_guard{coord_instance_lock_}; // RAII
repl_instances_.clear(); repl_instances_.clear();
spdlog::info("Leader changed, stopped all replication instances!");
})) { })) {
client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::unique_lock{self->coord_instance_lock_}; auto lock = std::lock_guard{self->coord_instance_lock_}; // RAII
auto &repl_instance = self->FindReplicationInstance(repl_instance_name); auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name); std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name);
}; };
client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::unique_lock{self->coord_instance_lock_}; auto lock = std::lock_guard{self->coord_instance_lock_};
auto &repl_instance = self->FindReplicationInstance(repl_instance_name); auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name); std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name);
}; };
@ -216,17 +218,26 @@ auto CoordinatorInstance::TryFailover() -> void {
spdlog::warn("Failover failed since promoting replica to main failed!"); spdlog::warn("Failover failed since promoting replica to main failed!");
return; return;
} }
spdlog::info("Leader changed, stopping all replication instances!");
// TODO (antoniofilipovic): This can fail, and then we don't know whether the UUID was changed
if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) { if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) {
return; return;
} }
auto const new_main_instance_name = new_main->InstanceName(); auto const new_main_instance_name = new_main->InstanceName();
// TODO (antoniofilipovic): This can fail, and then we don't know whether the new MAIN has been set up
if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name)) { if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name)) {
return; return;
} }
// TODO:
// Mark the chosen instance as "trying failover".
// If everything passes, all is good.
// If it fails, we need to check whether main was promoted and whether we have the correct new uuid.
// PromoteToMain -> split into "promote to main" and "enable writing" -> solves the issue so that we have
// a consistent state
spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName()); spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName());
} }

View File

@ -59,6 +59,7 @@ auto RaftState::InitRaftServer() -> void {
raft_server::init_options init_opts; raft_server::init_options init_opts;
init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode { init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode {
spdlog::info("Received some message, my id {}, leader id {}", param->myId, param->leaderId);
if (event_type == cb_func::BecomeLeader) { if (event_type == cb_func::BecomeLeader) {
spdlog::info("Node {} became leader", param->leaderId); spdlog::info("Node {} became leader", param->leaderId);
become_leader_cb_(); become_leader_cb_();

View File

@ -13,6 +13,7 @@ import os
import shutil import shutil
import sys import sys
import tempfile import tempfile
from time import sleep
import interactive_mg_runner import interactive_mg_runner
import pytest import pytest
@ -122,9 +123,19 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
def test_distributed_automatic_failover(): def test_distributed_automatic_failover():
# Goal of this test is to check that coordination works if one instance fails
# with multiple coordinators
# 1. Start all manually
# 2. Everything is set up on main
# 3. MAIN dies
# 4. New main elected
# 1
safe_execute(shutil.rmtree, TEMP_DIR) safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
# 2
main_cursor = connect(host="localhost", port=7689).cursor() main_cursor = connect(host="localhost", port=7689).cursor()
expected_data_on_main = [ expected_data_on_main = [
( (
@ -148,8 +159,10 @@ def test_distributed_automatic_failover():
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas) mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
# 3
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
# 4
coord_cursor = connect(host="localhost", port=7692).cursor() coord_cursor = connect(host="localhost", port=7692).cursor()
def retrieve_data_show_repl_cluster(): def retrieve_data_show_repl_cluster():
@ -210,11 +223,21 @@ def test_distributed_automatic_failover():
def test_distributed_automatic_failover_after_coord_dies(): def test_distributed_automatic_failover_after_coord_dies():
# Goal of this test is to check if main and coordinator die at same time in the beginning that
# everything works fine
# 1. Start all manually
# 2. Coordinator dies
# 3. MAIN dies
# 4.
safe_execute(shutil.rmtree, TEMP_DIR) safe_execute(shutil.rmtree, TEMP_DIR)
# 1
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
# 2
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_3") interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_3")
# 3
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
coord_cursor_1 = connect(host="localhost", port=7690).cursor() coord_cursor_1 = connect(host="localhost", port=7690).cursor()
@ -243,7 +266,7 @@ def test_distributed_automatic_failover_after_coord_dies():
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"), ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "main"), ("instance_1", "", "", "unknown", "main"),
("instance_2", "", "", "unknown", "replica"), ("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"), # TODO: (andi) Will become unknown. ("instance_3", "", "", "unknown", "main"),
] ]
mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2]) mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2]) mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2])
@ -292,5 +315,332 @@ def test_distributed_automatic_failover_after_coord_dies():
mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas) mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
@pytest.mark.parametrize("data_recovery", ["false"])
def test_distributed_coordinators_work_partial_failover(data_recovery):
    # Goal of this test is to check that the correct MAIN instance is chosen
    # if coordinators die while failover is being done.
    # 1. Start all replicas, main and 4 coordinators manually
    # 2. Check that main has the correct state
    # 3. Create initial data on MAIN
    # 4. Expect data to be copied to all replicas
    # 5. Kill MAIN: instance_3
    # 6. Failover should succeed to instance_1 -> SHOW REPLICATION ROLE -> main.
    #    Kill 2 coordinators (1 and 2) as soon as instance_1 becomes MAIN;
    #    coord 4 becomes follower (2 coords dead, no leader)
    # 7. instance_1 is expected to still report itself as MAIN
    # 8. instance_1 writes as MAIN (the write is expected to raise the
    #    "SYNC replica has not confirmed" error, yet the data is expected on
    #    instance_1 and instance_2)
    # 9. Kill coordinator 4
    # 10. Connect to coordinator 3
    # 11. Start coordinator_1 and coordinator_2 (coord 3 probably leader)
    # 12. Failover again
    # 13. Main can't write to replicas anymore, coordinator can't progress

    # NOTE(review): the TemporaryDirectory object itself is not kept, only its
    # path string is used below — confirm the directory lifetime is as intended.
    temp_dir = tempfile.TemporaryDirectory().name

    # Cluster layout used only by this test: three data instances and four
    # coordinators, each with its own bolt port, log file and data directory.
    MEMGRAPH_INNER_INSTANCES_DESCRIPTION = {
        "instance_1": {
            "args": [
                "--experimental-enabled=high-availability",
                "--bolt-port",
                "7688",
                "--log-level",
                "TRACE",
                "--coordinator-server-port",
                "10011",
                "--replication-restore-state-on-startup",
                "true",
                f"--data-recovery-on-startup={data_recovery}",
                "--storage-recover-on-startup=false",
            ],
            "log_file": "instance_1.log",
            "data_directory": f"{temp_dir}/instance_1",
            "setup_queries": [],
        },
        "instance_2": {
            "args": [
                "--experimental-enabled=high-availability",
                "--bolt-port",
                "7689",
                "--log-level",
                "TRACE",
                "--coordinator-server-port",
                "10012",
                "--replication-restore-state-on-startup",
                "true",
                f"--data-recovery-on-startup={data_recovery}",
                "--storage-recover-on-startup=false",
            ],
            "log_file": "instance_2.log",
            "data_directory": f"{temp_dir}/instance_2",
            "setup_queries": [],
        },
        "instance_3": {
            "args": [
                "--experimental-enabled=high-availability",
                "--bolt-port",
                "7687",
                "--log-level",
                "TRACE",
                "--coordinator-server-port",
                "10013",
                "--replication-restore-state-on-startup",
                "true",
                "--data-recovery-on-startup",
                f"{data_recovery}",
                "--storage-recover-on-startup=false",
            ],
            "log_file": "instance_3.log",
            "data_directory": f"{temp_dir}/instance_3",
            "setup_queries": [],
        },
        "coordinator_1": {
            "args": [
                "--experimental-enabled=high-availability",
                "--bolt-port",
                "7690",
                "--log-level=TRACE",
                "--raft-server-id=1",
                "--raft-server-port=10111",
            ],
            "log_file": "coordinator1.log",
            "data_directory": f"{temp_dir}/coordinator_1",
            "setup_queries": [],
        },
        "coordinator_2": {
            "args": [
                "--experimental-enabled=high-availability",
                "--bolt-port",
                "7691",
                "--log-level=TRACE",
                "--raft-server-id=2",
                "--raft-server-port=10112",
            ],
            "log_file": "coordinator2.log",
            "data_directory": f"{temp_dir}/coordinator_2",
            "setup_queries": [],
        },
        "coordinator_3": {
            "args": [
                "--experimental-enabled=high-availability",
                "--bolt-port",
                "7692",
                "--log-level=TRACE",
                "--raft-server-id=3",
                "--raft-server-port=10113",
            ],
            "log_file": "coordinator3.log",
            "data_directory": f"{temp_dir}/coordinator_3",
            "setup_queries": [],
        },
        "coordinator_4": {
            "args": [
                "--experimental-enabled=high-availability",
                "--bolt-port",
                "7693",
                "--log-level=TRACE",
                "--raft-server-id=4",
                "--raft-server-port=10114",
            ],
            "log_file": "coordinator4.log",
            "data_directory": f"{temp_dir}/coordinator_4",
            "setup_queries": [],
        },
    }

    # 1
    # coordinator_4 is started separately, after everything else, and is the
    # coordinator all setup queries below are sent to.
    interactive_mg_runner.start_all_except(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, {"coordinator_4"})

    interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_4")

    coord_cursor = connect(host="localhost", port=7693).cursor()

    # Add the other three coordinators, pausing one second before each query.
    for query in [
        "ADD COORDINATOR 1 ON '127.0.0.1:10111';",
        "ADD COORDINATOR 2 ON '127.0.0.1:10112';",
        "ADD COORDINATOR 3 ON '127.0.0.1:10113';",
    ]:
        sleep(1)
        execute_and_fetch_all(coord_cursor, query)

    # Register the data instances and make instance_3 the initial MAIN.
    for query in [
        "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';",
        "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
        "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
        "SET INSTANCE instance_3 TO MAIN;",
    ]:
        execute_and_fetch_all(coord_cursor, query)

    # 2
    main_cursor = connect(host="localhost", port=7687).cursor()
    expected_data_on_main = [
        (
            "instance_1",
            "127.0.0.1:10001",
            "sync",
            {"behind": None, "status": "ready", "ts": 0},
            {"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
        ),
        (
            "instance_2",
            "127.0.0.1:10002",
            "sync",
            {"behind": None, "status": "ready", "ts": 0},
            {"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
        ),
    ]

    # NOTE(review): this opens a second connection to the same port and rebinds
    # main_cursor; the cursor created a few lines above is no longer used.
    main_cursor = connect(host="localhost", port=7687).cursor()

    def retrieve_data_show_replicas():
        return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))

    mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)

    coord_cursor = connect(host="localhost", port=7693).cursor()

    def retrieve_data_show_instances_main_coord():
        return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))

    expected_data_on_coord = [
        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
        ("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
        ("instance_1", "", "127.0.0.1:10011", "up", "replica"),
        ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
        ("instance_3", "", "127.0.0.1:10013", "up", "main"),
    ]

    mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances_main_coord)

    # 3
    execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:1});")
    execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:2});")

    # 4
    instance_1_cursor = connect(host="localhost", port=7688).cursor()
    instance_2_cursor = connect(host="localhost", port=7689).cursor()

    assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
    assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2

    # 5
    interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")

    # 6
    # Wait, then poll instance_1's role every 0.1 s (at most 100 times). The
    # moment it reports "main", kill coordinator_1 and coordinator_2.
    sleep(4.5)
    max_tries = 100
    last_role = "replica"
    while max_tries:
        max_tries -= 1
        last_role = execute_and_fetch_all(instance_1_cursor, "SHOW REPLICATION ROLE;")[0][0]
        if "main" == last_role:
            interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_1")
            interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_2")
            break
        sleep(0.1)

    # Fails only when all tries were used up without ever observing "main".
    assert max_tries > 0 or last_role == "main"

    expected_data_on_coord = [
        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
        ("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
        ("instance_1", "", "127.0.0.1:10011", "unknown", "main"),  # TODO although above we see it is MAIN
        ("instance_2", "", "127.0.0.1:10012", "unknown", "replica"),
        ("instance_3", "", "127.0.0.1:10013", "unknown", "main"),
    ]

    def retrieve_data_show_instances():
        return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))

    mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)

    # 7
    # NOTE(review): retrieve_role returns the whole first row, whereas step 6
    # compares element [0][0] against lowercase "main" — confirm that "MAIN" is
    # the value mg_sleep_and_assert is meant to match here.
    def retrieve_role():
        return execute_and_fetch_all(instance_1_cursor, "SHOW REPLICATION ROLE;")[0]

    mg_sleep_and_assert("MAIN", retrieve_role)

    # 8
    # The write on instance_1 is expected to raise, while the vertex count is
    # still expected to reach 3 on both instance_2 and instance_1.
    with pytest.raises(Exception) as e:
        execute_and_fetch_all(instance_1_cursor, "CREATE (:Epoch2Vertex {prop:1});")
    assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)

    def get_vertex_count_instance_2():
        return execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0]

    mg_sleep_and_assert(3, get_vertex_count_instance_2)

    assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3

    # 9
    interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_4")

    # 10
    coord_3_cursor = connect(port=7692, host="localhost").cursor()

    expected_data_on_coord = [
        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
        ("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
        ("instance_1", "", "127.0.0.1:10011", "unknown", "replica"),  # TODO: bug this is main
        ("instance_2", "", "127.0.0.1:10012", "unknown", "replica"),
        ("instance_3", "", "127.0.0.1:10013", "unknown", "replica"),
    ]

    def retrieve_data_show_instances_new_leader():
        return sorted(list(execute_and_fetch_all(coord_3_cursor, "SHOW INSTANCES;")))

    mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances_new_leader)

    # 11
    interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_1")
    interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_2")

    # 12
    expected_data_on_coord = [
        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
        ("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
        (
            "instance_1",
            "",
            "127.0.0.1:10011",
            "up",
            "replica",
        ),  # TODO: bug this is main, this might crash instance, because we will maybe execute replica callback on MAIN, which won't work
        ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
        ("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
    ]

    mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances_new_leader)

    # NOTE(review): this function already calls bare `sleep(...)` above, so the
    # local `import time` looks redundant — confirm and consider `sleep(5)`.
    import time

    time.sleep(5)  # failover

    with pytest.raises(Exception) as e:
        execute_and_fetch_all(instance_1_cursor, "CREATE (:Epoch2Vertex {prop:2});")
    assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)

    # Redefines the helper from step 8 with an identical body.
    def get_vertex_count_instance_2():
        return execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0]

    mg_sleep_and_assert(4, get_vertex_count_instance_2)  # MAIN will not be able to write to it anymore

    # 13
# Script entry point: run every test in this file and print the full "-rA" summary.
# A leftover debug invocation that ran only
# test_distributed_coordinators_work_partial_failover (via "-k") exited first and made
# this full run unreachable; select a single test from the command line instead,
# e.g. `pytest <this file> -k test_distributed_coordinators_work_partial_failover -vv`.
if __name__ == "__main__":
    sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -39,6 +39,7 @@ import tempfile
import time import time
from argparse import ArgumentParser from argparse import ArgumentParser
from inspect import signature from inspect import signature
from typing import List, Set
import yaml import yaml
@ -214,6 +215,14 @@ def start_all(context, procdir="", keep_directories=True):
start_instance(context, key, procdir) start_instance(context, key, procdir)
def start_all_except(context, exceptions: Set[str], procdir="", keep_directories=True):
    """Stop everything, then start each instance in `context` not named in `exceptions`.

    Args:
        context: mapping of instance name to its description; every key is a start candidate.
        exceptions: instance names to leave unstarted.
        procdir: forwarded unchanged to `start_instance`.
        keep_directories: forwarded unchanged to `stop_all`.
    """
    stop_all(keep_directories)
    to_start = (name for name in context.keys() if name not in exceptions)
    for name in to_start:
        start_instance(context, name, procdir)
def start_all_keep_others(context, procdir="", keep_directories=True): def start_all_keep_others(context, procdir="", keep_directories=True):
for key, _ in context.items(): for key, _ in context.items():
start_instance(context, key, procdir) start_instance(context, key, procdir)