diff --git a/tests/e2e/high_availability/single_coordinator.py b/tests/e2e/high_availability/single_coordinator.py index ac4b3d890..e35073b7b 100644 --- a/tests/e2e/high_availability/single_coordinator.py +++ b/tests/e2e/high_availability/single_coordinator.py @@ -102,7 +102,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = { @pytest.mark.parametrize("data_recovery", ["false", "true"]) def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recovery): # Goal of this test is to check the replication works after failover command. - # 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes. + # 1. We start all replicas, main and coordinator manually # 2. We check that main has correct state # 3. Create initial data on MAIN # 4. Expect data to be copied on all replicas @@ -111,7 +111,7 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov # 7. Kill main # 8. Instance_2 new MAIN # 9. Create vertex on instance 2 - # 10. Start instance_1 ( it should have one commit on old epoch and new epoch with new commit shouldn't be replicated) + # 10. Start instance_1(it should have one commit on old epoch and new epoch with new commit shouldn't be replicated) # 11. Expect data to be copied on instance_1 # 12. Start old MAIN (instance_3) # 13. Expect data to be copied to instance_3 @@ -271,8 +271,25 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov # 12. Start old MAIN (instance_3) + interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3") + + new_expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", True, "main"), + ("instance_3", "", "127.0.0.1:10013", True, "replica"), + ] + mg_sleep_and_assert(new_expected_data_on_coord, retrieve_data_show_instances) + # 13. Expect data to be copied to instance_3 + instance_3_cursor = connect(host="localhost", port=7687).cursor() + + def get_vertex_count(): + return execute_and_fetch_all(instance_3_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(3, get_vertex_count) + @pytest.mark.parametrize("data_recovery", ["false", "true"]) def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_recovery): @@ -291,7 +308,251 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r # 10. All other instances wake up # 11. Everything is replicated - pass + temp_dir = tempfile.TemporaryDirectory().name + + MEMGRAPH_INNER_INSTANCES_DESCRIPTION = { + "instance_1": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7688", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10011", + "--replication-restore-state-on-startup", + "true", + f"--data-recovery-on-startup={data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_1.log", + "data_directory": f"{temp_dir}/instance_1", + "setup_queries": [], + }, + "instance_2": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7689", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10012", + "--replication-restore-state-on-startup", + "true", + f"--data-recovery-on-startup={data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_2.log", + "data_directory": f"{temp_dir}/instance_2", + "setup_queries": [], + }, + "instance_3": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7687", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10013", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + f"{data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_3.log", + "data_directory": f"{temp_dir}/instance_3", + "setup_queries": [], + }, + "instance_4": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7691", + "--log-level", + "TRACE", + "--coordinator-server-port", + "10014", + "--replication-restore-state-on-startup", + "true", + "--data-recovery-on-startup", + f"{data_recovery}", + "--storage-recover-on-startup=false", + ], + "log_file": "instance_4.log", + "data_directory": f"{temp_dir}/instance_4", + "setup_queries": [], + }, + "coordinator": { + "args": [ + "--experimental-enabled=high-availability", + "--bolt-port", + "7690", + "--log-level=TRACE", + "--raft-server-id=1", + "--raft-server-port=10111", + ], + "log_file": "coordinator.log", + "setup_queries": [ + "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';", + "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';", + "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';", + "REGISTER INSTANCE instance_4 ON '127.0.0.1:10014' WITH '127.0.0.1:10004';", + "SET INSTANCE instance_3 TO MAIN", + ], + }, + } + + # 1. We start all replicas, main and coordinator manually + + interactive_mg_runner.start_all(MEMGRAPH_INNER_INSTANCES_DESCRIPTION) + + main_cursor = connect(host="localhost", port=7687).cursor() + expected_data_on_main = [ + ("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("instance_4", "127.0.0.1:10004", "sync", 0, 0, "ready"), + ] + actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;"))) + assert actual_data_on_main == expected_data_on_main + + # 2. Main does commit + + execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:1});") + execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:2});") + + instance_1_cursor = connect(host="localhost", port=7688).cursor() + instance_2_cursor = connect(host="localhost", port=7689).cursor() + instance_4_cursor = connect(host="localhost", port=7691).cursor() + + assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2 + assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2 + assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2 + + # 3. instance_2 down + + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_2") + + coord_cursor = connect(host="localhost", port=7690).cursor() + + def retrieve_data_show_instances(): + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;"))) + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", False, "unknown"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), + ("instance_4", "", "127.0.0.1:10014", True, "replica"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 4. Main commits more + + with pytest.raises(Exception) as e: + execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:1});") + assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value) + + assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3 + assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3 + + # 5. Main down + + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3") + + # 6. Instance_1 new main + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "main"), + ("instance_2", "", "127.0.0.1:10012", False, "unknown"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ("instance_4", "", "127.0.0.1:10014", True, "replica"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 7. Instance 1 commits + + with pytest.raises(Exception) as e: + execute_and_fetch_all(instance_1_cursor, "CREATE (:Epoch2Vertex {prop:1});") + assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value) + + # 8. Instance 4 gets data + + assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 4 + + # 8. Instance 1 dies + + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1") + + # 9. Instance 4 new main + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "", "127.0.0.1:10012", False, "unknown"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ("instance_4", "", "127.0.0.1:10014", True, "main"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 10 Instance 4 commits + + with pytest.raises(Exception) as e: + execute_and_fetch_all(instance_4_cursor, "CREATE (:Epoch3Vertex {prop:1});") + assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value) + + # 11 Instance 2 wakes up + + interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_2") + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ("instance_4", "", "127.0.0.1:10014", True, "main"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 12 Instance 2 gets data from old epochs + + instance_2_cursor = connect(host="localhost", port=7689).cursor() + + def get_vertex_count(): + return execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(5, get_vertex_count) + + # 12. All other instances wake up + + interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1") + interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3") + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "replica"), + ("instance_4", "", "127.0.0.1:10014", True, "main"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 13. Everything is replicated + instance_1_cursor = connect(host="localhost", port=7688).cursor() + instance_4_cursor = connect(host="localhost", port=7691).cursor() + + def get_vertex_count(): + return execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(5, get_vertex_count) + + def get_vertex_count(): + return execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(5, get_vertex_count) @pytest.mark.parametrize("data_recovery", ["false"]) @@ -903,4 +1164,5 @@ def test_disable_multiple_mains(): if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-k", "test_replication_works_on_failover_replica_2_epochs_more_commits_away"])) sys.exit(pytest.main([__file__, "-rA"]))