diff --git a/tests/e2e/high_availability/distributed_coords.py b/tests/e2e/high_availability/distributed_coords.py
index 3b0964111..7dc3ef238 100644
--- a/tests/e2e/high_availability/distributed_coords.py
+++ b/tests/e2e/high_availability/distributed_coords.py
@@ -175,6 +175,7 @@ def get_instances_description_no_setup():
                 "--raft-server-port=10111",
             ],
             "log_file": "coordinator1.log",
+            "data_directory": f"{TEMP_DIR}/coordinator_1",
             "setup_queries": [],
         },
         "coordinator_2": {
@@ -187,6 +188,7 @@ def get_instances_description_no_setup():
                 "--raft-server-port=10112",
             ],
             "log_file": "coordinator2.log",
+            "data_directory": f"{TEMP_DIR}/coordinator_2",
             "setup_queries": [],
         },
         "coordinator_3": {
@@ -199,6 +201,7 @@ def get_instances_description_no_setup():
                 "--raft-server-port=10113",
             ],
             "log_file": "coordinator3.log",
+            "data_directory": f"{TEMP_DIR}/coordinator_3",
             "setup_queries": [],
         },
     }
@@ -530,6 +533,8 @@ def test_old_main_comes_back_on_new_leader_as_main():
     # 4. Start the old main instance
     # 5. Run SHOW INSTANCES on the new leader and check that the old main instance is main once again

+    safe_execute(shutil.rmtree, TEMP_DIR)
+
     inner_memgraph_instances = get_instances_description_no_setup()
     interactive_mg_runner.start_all(inner_memgraph_instances)

@@ -623,6 +628,7 @@ def test_registering_4_coords():
     # Goal of this test is to assure registering of multiple coordinators in row works
+    safe_execute(shutil.rmtree, TEMP_DIR)
     INSTANCES_DESCRIPTION = {
         "instance_1": {
             "args": [
@@ -743,5 +749,505 @@ def test_registering_4_coords():
     mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)


+def test_registering_coord_log_store():
+    # The goal of this test is to ensure that registering a bunch of instances and de-registering them
+    # works properly w.r.t. the NuRaft log
+    # 1. Start basic instances # 3 logs
+    # 2. Check all is there
+    # 3. Create 3 additional instances and add them to the cluster # 3 logs -> 1st snapshot
+    # 4. Check everything is there
+    # 5. Set main # 1 log
+    # 6. Check correct state
+    # 7. Drop 2 new instances # 2 logs
+    # 8. Check correct state
+    # 9. Drop 1 new instance # 1 log -> 2nd snapshot
+    # 10. Check correct state
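+    # Note: wiping TEMP_DIR first presumably guarantees that the instances started below get fresh data
+    # directories, so no Raft log or snapshot state left over from a previous test run is reused.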
+    safe_execute(shutil.rmtree, TEMP_DIR)
+
+    INSTANCES_DESCRIPTION = {
+        "instance_1": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7687",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10011",
+            ],
+            "log_file": "instance_1.log",
+            "data_directory": f"{TEMP_DIR}/instance_1",
+            "setup_queries": [],
+        },
+        "instance_2": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7688",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10012",
+            ],
+            "log_file": "instance_2.log",
+            "data_directory": f"{TEMP_DIR}/instance_2",
+            "setup_queries": [],
+        },
+        "instance_3": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7689",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10013",
+            ],
+            "log_file": "instance_3.log",
+            "data_directory": f"{TEMP_DIR}/instance_3",
+            "setup_queries": [],
+        },
+        "coordinator_1": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7690",
+                "--log-level=TRACE",
+                "--raft-server-id=1",
+                "--raft-server-port=10111",
+            ],
+            "log_file": "coordinator1.log",
+            "setup_queries": [],
+        },
+        "coordinator_2": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7691",
+                "--log-level=TRACE",
+                "--raft-server-id=2",
+                "--raft-server-port=10112",
+            ],
+            "log_file": "coordinator2.log",
+            "setup_queries": [],
+        },
+        "coordinator_3": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7692",
+                "--log-level=TRACE",
+                "--raft-server-id=3",
+                "--raft-server-port=10113",
+            ],
+            "log_file": "coordinator3.log",
+            "setup_queries": [],
+        },
+        "coordinator_4": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7693",
+                "--log-level=TRACE",
+                "--raft-server-id=4",
+                "--raft-server-port=10114",
+            ],
+            "log_file": "coordinator4.log",
+            "setup_queries": [
+                "ADD COORDINATOR 1 ON '127.0.0.1:10111';",
+                "ADD COORDINATOR 2 ON '127.0.0.1:10112';",
+                "ADD COORDINATOR 3 ON '127.0.0.1:10113';",
+                "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'",
+                "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'",
+                "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'",
+            ],
+        },
+    }
+    assert "SET INSTANCE instance_3 TO MAIN" not in INSTANCES_DESCRIPTION["coordinator_4"]["setup_queries"]
+
+    # 1
+    interactive_mg_runner.start_all(INSTANCES_DESCRIPTION)
+
+    # 2
+    coord_cursor = connect(host="localhost", port=7693).cursor()
+
+    def retrieve_data_show_repl_cluster():
+        return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
+
+    coordinators = [
+        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
+        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
+        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
+        ("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
+    ]
+
+    basic_instances = [
+        ("instance_1", "", "127.0.0.1:10011", "up", "replica"),
+        ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
+        ("instance_3", "", "127.0.0.1:10013", "up", "replica"),
+    ]
+
+    expected_data_on_coord = []
+    expected_data_on_coord.extend(coordinators)
+    expected_data_on_coord.extend(basic_instances)
+
+    mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
+
+    # 3
+    instances_ports_added = [10011, 10012, 10013]
+    bolt_port_id = 7700
+    coord_port_id = 10014
+
+    additional_instances = []
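+    # Spin up instance_4..instance_6 on consecutive bolt ports (7700-7702) and coordinator server ports
+    # (10014-10016) and register each with the coordinator; the replication port is derived below as
+    # coordinator port - 10, i.e. 10004-10006.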
+    for i in range(4, 7):
+        instance_name = f"instance_{i}"
+        args_desc = [
+            "--experimental-enabled=high-availability",
+            "--log-level=TRACE",
+        ]
+
+        bolt_port = f"--bolt-port={bolt_port_id}"
+
+        coord_server_port = f"--coordinator-server-port={coord_port_id}"
+
+        args_desc.append(bolt_port)
+        args_desc.append(coord_server_port)
+
+        instance_description = {
+            "args": args_desc,
+            "log_file": f"instance_{i}.log",
+            "data_directory": f"{TEMP_DIR}/instance_{i}",
+            "setup_queries": [],
+        }
+
+        full_instance_desc = {instance_name: instance_description}
+        interactive_mg_runner.start(full_instance_desc, instance_name)
+        repl_port_id = coord_port_id - 10
+        assert repl_port_id < 10011, "Wrong test setup, repl port must be smaller than smallest coord port id"
+
+        execute_and_fetch_all(
+            coord_cursor,
+            f"REGISTER INSTANCE {instance_name} ON '127.0.0.1:{coord_port_id}' WITH '127.0.0.1:{repl_port_id}'",
+        )
+
+        additional_instances.append((f"{instance_name}", "", f"127.0.0.1:{coord_port_id}", "up", "replica"))
+        instances_ports_added.append(coord_port_id)
+        coord_port_id += 1
+        bolt_port_id += 1
+
+    # 4
+    expected_data_on_coord.extend(additional_instances)
+
+    mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
+
+    # 5
+    execute_and_fetch_all(coord_cursor, "SET INSTANCE instance_3 TO MAIN")
+
+    # 6
+    basic_instances.pop()
+    basic_instances.append(("instance_3", "", "127.0.0.1:10013", "up", "main"))
+
+    new_expected_data_on_coordinator = []
+    new_expected_data_on_coordinator.extend(coordinators)
+    new_expected_data_on_coordinator.extend(basic_instances)
+    new_expected_data_on_coordinator.extend(additional_instances)
+
+    mg_sleep_and_assert(new_expected_data_on_coordinator, retrieve_data_show_repl_cluster)
+
+    # 7
+    for i in range(6, 4, -1):
+        execute_and_fetch_all(coord_cursor, f"UNREGISTER INSTANCE instance_{i};")
+        additional_instances.pop()
+
+    new_expected_data_on_coordinator = []
+    new_expected_data_on_coordinator.extend(coordinators)
+    new_expected_data_on_coordinator.extend(basic_instances)
+    new_expected_data_on_coordinator.extend(additional_instances)
+
+    # 8
+    mg_sleep_and_assert(new_expected_data_on_coordinator, retrieve_data_show_repl_cluster)
+
+    # 9
+    new_expected_data_on_coordinator = []
+    new_expected_data_on_coordinator.extend(coordinators)
+    new_expected_data_on_coordinator.extend(basic_instances)
+
+    execute_and_fetch_all(coord_cursor, "UNREGISTER INSTANCE instance_4;")
+
+    # 10
+    mg_sleep_and_assert(new_expected_data_on_coordinator, retrieve_data_show_repl_cluster)
+
+
+def test_multiple_failovers_in_row_no_leadership_change():
+    # The goal of this test is to ensure that multiple failovers in a row work without a leadership change
+    # 1. Start basic instances
+    # 2. Check all is there
+    # 3. Kill MAIN (instance_3)
+    # 4. Expect failover (instance_1)
+    # 5. Kill instance_1
+    # 6. Expect failover to instance_2
+    # 7. Start instance_3
+    # 8. Expect instance_3 and instance_2 (MAIN) up
+    # 9. Kill instance_2
+    # 10. Expect instance_3 MAIN
+    # 11. Write some data on instance_3
+    # 12. Start instance_2 and instance_1
+    # 13. Expect instance_1 and instance_2 to be up and cluster to have correct state
+    # 14. Expect data to be replicated
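+    # SHOW INSTANCES is checked on all three coordinators after every step: coordinator_3 is expected to
+    # stay leader throughout, while coordinators 1 and 2 only report the replicated cluster view (hence
+    # the "unknown" statuses in follower_data below).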
+
+    # 1
+    inner_memgraph_instances = get_instances_description_no_setup()
+    interactive_mg_runner.start_all(inner_memgraph_instances, keep_directories=False)
+
+    coord_cursor_3 = connect(host="localhost", port=7692).cursor()
+
+    setup_queries = [
+        "ADD COORDINATOR 1 ON '127.0.0.1:10111'",
+        "ADD COORDINATOR 2 ON '127.0.0.1:10112'",
+        "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'",
+        "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'",
+        "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'",
+        "SET INSTANCE instance_3 TO MAIN",
+    ]
+
+    for query in setup_queries:
+        execute_and_fetch_all(coord_cursor_3, query)
+
+    # 2
+    def get_func_show_instances(cursor):
+        def show_instances_follower_coord():
+            return sorted(list(execute_and_fetch_all(cursor, "SHOW INSTANCES;")))
+
+        return show_instances_follower_coord
+
+    coordinator_data = [
+        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
+        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
+        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
+    ]
+
+    leader_data = []
+    leader_data.extend(coordinator_data)
+    leader_data.extend(
+        [
+            ("instance_1", "", "127.0.0.1:10011", "up", "replica"),
+            ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
+            ("instance_3", "", "127.0.0.1:10013", "up", "main"),
+        ]
+    )
+
+    follower_data = []
+    follower_data.extend(coordinator_data)
+    follower_data.extend(
+        [
+            ("instance_1", "", "", "unknown", "replica"),
+            ("instance_2", "", "", "unknown", "replica"),
+            ("instance_3", "", "", "unknown", "main"),
+        ]
+    )
+
+    coord_cursor_1 = connect(host="localhost", port=7690).cursor()
+    coord_cursor_2 = connect(host="localhost", port=7691).cursor()
+
+    mg_sleep_and_assert_collection(follower_data, get_func_show_instances(coord_cursor_1))
+    mg_sleep_and_assert_collection(follower_data, get_func_show_instances(coord_cursor_2))
+    mg_sleep_and_assert_collection(leader_data, get_func_show_instances(coord_cursor_3))
+
+    # 3
+    interactive_mg_runner.kill(inner_memgraph_instances, "instance_3")
+
+    # 4
+    leader_data = []
+    leader_data.extend(coordinator_data)
+    leader_data.extend(
+        [
+            ("instance_1", "", "127.0.0.1:10011", "up", "main"),
+            ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
+            ("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
+        ]
+    )
+
+    follower_data = []
+    follower_data.extend(coordinator_data)
+    follower_data.extend(
+        [
+            ("instance_1", "", "", "unknown", "main"),
+            ("instance_2", "", "", "unknown", "replica"),
+            (
+                "instance_3",
+                "",
+                "",
+                "unknown",
+                "main",
+            ),  # TODO(antoniofilipovic) change to unknown after PR with transitions
+        ]
+    )
+
+    mg_sleep_and_assert_collection(follower_data, get_func_show_instances(coord_cursor_1))
+    mg_sleep_and_assert_collection(follower_data, get_func_show_instances(coord_cursor_2))
+    mg_sleep_and_assert_collection(leader_data, get_func_show_instances(coord_cursor_3))
+
+    # 5
+    interactive_mg_runner.kill(inner_memgraph_instances, "instance_1")
+
+    # 6
+    leader_data = []
+    leader_data.extend(coordinator_data)
+    leader_data.extend(
+        [
+            ("instance_1", "", "127.0.0.1:10011", "down", "unknown"),
+            ("instance_2", "", "127.0.0.1:10012", "up", "main"),
+            ("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
+        ]
+    )
+
+    follower_data = []
+    follower_data.extend(coordinator_data)
+    follower_data.extend(
+        [
+            ("instance_1", "", "", "unknown", "main"),
+            ("instance_2", "", "", "unknown", "main"),  # TODO(antoniofilipovic) change to unknown
+            ("instance_3", "", "", "unknown", "main"),  # TODO(antoniofilipovic) change to unknown
+        ]
+    )
+
+    mg_sleep_and_assert_collection(follower_data, get_func_show_instances(coord_cursor_1))
+    mg_sleep_and_assert_collection(follower_data, get_func_show_instances(coord_cursor_2))
+    mg_sleep_and_assert_collection(leader_data, get_func_show_instances(coord_cursor_3))
+
+    # 7
+    interactive_mg_runner.start(inner_memgraph_instances, "instance_3")
+
+    # 8
+    leader_data = []
+    leader_data.extend(coordinator_data)
+    leader_data.extend(
+        [
+            ("instance_1", "", "127.0.0.1:10011", "down", "unknown"),
+            ("instance_2", "", "127.0.0.1:10012", "up", "main"),
+            ("instance_3", "", "127.0.0.1:10013", "up", "replica"),
+        ]
+    )
+
+    follower_data = []
+    follower_data.extend(coordinator_data)
+    follower_data.extend(
+        [
+            ("instance_1", "", "", "unknown", "main"),  # TODO(antoniofilipovic) change to unknown
+            ("instance_2", "", "", "unknown", "main"),
+            ("instance_3", "", "", "unknown", "replica"),
+        ]
+    )
+
+    mg_sleep_and_assert_collection(follower_data, get_func_show_instances(coord_cursor_1))
+    mg_sleep_and_assert_collection(follower_data, get_func_show_instances(coord_cursor_2))
+    mg_sleep_and_assert_collection(leader_data, get_func_show_instances(coord_cursor_3))
+
+    # 9
+    interactive_mg_runner.kill(inner_memgraph_instances, "instance_2")
+
+    # 10
+    leader_data = []
+    leader_data.extend(coordinator_data)
+    leader_data.extend(
+        [
+            ("instance_1", "", "127.0.0.1:10011", "down", "unknown"),
+            ("instance_2", "", "127.0.0.1:10012", "down", "unknown"),
+            ("instance_3", "", "127.0.0.1:10013", "up", "main"),
+        ]
+    )
+
+    follower_data = []
+    follower_data.extend(coordinator_data)
+    follower_data.extend(
+        [
+            ("instance_1", "", "", "unknown", "main"),  # TODO(antoniofilipovic) change to unknown
+            ("instance_2", "", "", "unknown", "main"),  # TODO(antoniofilipovic) change to unknown
+            ("instance_3", "", "", "unknown", "main"),
+        ]
+    )
+
+    mg_sleep_and_assert_collection(follower_data, get_func_show_instances(coord_cursor_1))
+    mg_sleep_and_assert_collection(follower_data, get_func_show_instances(coord_cursor_2))
+    mg_sleep_and_assert_collection(leader_data, get_func_show_instances(coord_cursor_3))
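+    # Step 11 below writes while both registered SYNC replicas are down: the CREATE is expected to raise
+    # because no SYNC replica can confirm the commit, but the vertex still ends up on the MAIN, which is
+    # what the vertex-count checks in step 14 rely on.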
+    # 11
+    instance_3_cursor = connect(port=7689, host="localhost").cursor()
+
+    with pytest.raises(Exception) as e:
+        execute_and_fetch_all(instance_3_cursor, "CREATE ();")
+    assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
+
+    # 12
+    interactive_mg_runner.start(inner_memgraph_instances, "instance_1")
+    interactive_mg_runner.start(inner_memgraph_instances, "instance_2")
+
+    # 13
+    leader_data = []
+    leader_data.extend(coordinator_data)
+    leader_data.extend(
+        [
+            ("instance_1", "", "127.0.0.1:10011", "up", "replica"),
+            ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
+            ("instance_3", "", "127.0.0.1:10013", "up", "main"),
+        ]
+    )
+
+    follower_data = []
+    follower_data.extend(coordinator_data)
+    follower_data.extend(
+        [
+            ("instance_1", "", "", "unknown", "replica"),
+            ("instance_2", "", "", "unknown", "replica"),
+            ("instance_3", "", "", "unknown", "main"),
+        ]
+    )
+
+    mg_sleep_and_assert_collection(follower_data, get_func_show_instances(coord_cursor_1))
+    mg_sleep_and_assert_collection(follower_data, get_func_show_instances(coord_cursor_2))
+    mg_sleep_and_assert_collection(leader_data, get_func_show_instances(coord_cursor_3))
+
+    # 14
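+    # Verify on instance_3 (MAIN) that both replicas are registered as SYNC and ready again, and that the
+    # vertex written in step 11 has been replicated to instance_1 and instance_2.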
+    def show_replicas():
+        return sorted(list(execute_and_fetch_all(instance_3_cursor, "SHOW REPLICAS;")))
+
+    replicas = [
+        (
+            "instance_1",
+            "127.0.0.1:10001",
+            "sync",
+            {"ts": 0, "behind": None, "status": "ready"},
+            {"memgraph": {"ts": 2, "behind": 0, "status": "ready"}},
+        ),
+        (
+            "instance_2",
+            "127.0.0.1:10002",
+            "sync",
+            {"ts": 0, "behind": None, "status": "ready"},
+            {"memgraph": {"ts": 2, "behind": 0, "status": "ready"}},
+        ),
+    ]
+    mg_sleep_and_assert_collection(replicas, show_replicas)
+
+    def get_vertex_count_func(cursor):
+        def get_vertex_count():
+            return execute_and_fetch_all(cursor, "MATCH (n) RETURN count(n)")[0][0]
+
+        return get_vertex_count
+
+    mg_sleep_and_assert(1, get_vertex_count_func(connect(port=7687, host="localhost").cursor()))
+
+    mg_sleep_and_assert(1, get_vertex_count_func(connect(port=7688, host="localhost").cursor()))
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main([__file__, "-rA"]))