From d734cb67c032050637630767f24a1e36a73e1fa7 Mon Sep 17 00:00:00 2001
From: antoniofilipovic
Date: Fri, 23 Feb 2024 10:51:27 +0100
Subject: [PATCH] add more tests on replication with high availability

---
 src/dbms/inmemory/replication_handlers.cpp  |   5 +
 .../v2/replication/replication_client.cpp   |   2 +
 .../high_availability/single_coordinator.py | 461 +++++++++++++++++-
 3 files changed, 455 insertions(+), 13 deletions(-)

diff --git a/src/dbms/inmemory/replication_handlers.cpp b/src/dbms/inmemory/replication_handlers.cpp
index 3fc174d3c..49b720e86 100644
--- a/src/dbms/inmemory/replication_handlers.cpp
+++ b/src/dbms/inmemory/replication_handlers.cpp
@@ -291,6 +291,10 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle
     storage->vertex_id_ = recovery_info.next_vertex_id;
     storage->edge_id_ = recovery_info.next_edge_id;
     storage->timestamp_ = std::max(storage->timestamp_, recovery_info.next_timestamp);
+    // TODO(antoniofilipovic): Is this correct?
+    // storage->repl_storage_state_.last_commit_timestamp_ =
+    //     std::max(storage->repl_storage_state_.last_commit_timestamp_.load(),
+    //              recovered_snapshot.snapshot_info.start_timestamp);

     spdlog::trace("Recovering indices and constraints from snapshot.");
     memgraph::storage::durability::RecoverIndicesAndStats(recovered_snapshot.indices_constraints.indices,
@@ -304,6 +308,7 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle
   }
   storage_guard.unlock();
+  // TODO(antoniofilipovic): Do we update last_commit_timestamp_ here?
   storage::replication::SnapshotRes res{true, storage->repl_storage_state_.last_commit_timestamp_.load()};
   slk::Save(res, res_builder);

diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp
index ff254e23d..7acbd9a8d 100644
--- a/src/storage/v2/replication/replication_client.cpp
+++ b/src/storage/v2/replication/replication_client.cpp
@@ -258,6 +258,8 @@ void ReplicationStorageClient::RecoverReplica(uint64_t replica_commit, memgraph:
   spdlog::debug("Starting replica recovery");
   auto *mem_storage = static_cast<InMemoryStorage *>(storage);

+  // TODO(antoniofilipovic): Can we get stuck in this while loop if the replica's commit timestamp is not
+  // updated when recovering from a snapshot only?
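+  // Sketch of the concern, using this file's names: recovery keeps iterating until the replica
+  // reports a commit timestamp that has caught up with MAIN. If the replica was rebuilt from a
+  // snapshot alone and SnapshotHandler never advanced that timestamp, a termination check along
+  // the lines of
+  //   if (replica_commit >= mem_storage->repl_storage_state_.last_commit_timestamp_.load()) break;
+  // would never fire, and every iteration would re-send the same recovery steps.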
   while (true) {
     auto file_locker = mem_storage->file_retainer_.AddLocker();

diff --git a/tests/e2e/high_availability/single_coordinator.py b/tests/e2e/high_availability/single_coordinator.py
index e35073b7b..61661facf 100644
--- a/tests/e2e/high_availability/single_coordinator.py
+++ b/tests/e2e/high_availability/single_coordinator.py
@@ -293,20 +293,24 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov

 @pytest.mark.parametrize("data_recovery", ["false", "true"])
 def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_recovery):
-    # Goal of this test is to check the replication works after failover command if one instance missed
-    # couple of epochs but data is still available on one of the instances
+    # Goal of this test is to check that replication works after the failover command if one
+    # instance missed a couple of epochs but the data is still available on one of the instances
    # 1. We start all replicas, main and coordinator manually
    # 2. Main does commit
    # 3. instance_2 down
-    # 4. Instance_1 new main
-    # 5. Instance 1 commits
-    # 6. Instance 1 dies
-    # 7. Instance 4 new main
-    # 8 Instance 4 commits
-    # 9. Instance 2 wakes up
-    # 10. All other instances wake up
-    # 11. Everything is replicated
+    # 4. Main commits more
+    # 5. Main down
+    # 6. Instance_1 new main
+    # 7. Instance 1 commits
+    # 8. Instance 4 gets data
+    # 9. Instance 1 dies
+    # 10. Instance 4 new main
+    # 11. Instance 4 commits
+    # 12. Instance 2 wakes up
+    # 13. Instance 2 gets data from old epochs
+    # 14. All other instances wake up
+    # 15. Everything is replicated

     temp_dir = tempfile.TemporaryDirectory().name

@@ -557,6 +561,7 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r

 @pytest.mark.parametrize("data_recovery", ["false"])
 def test_replication_forcefully_works_on_failover_replica_misses_epoch(data_recovery):
+    # TODO(antoniofilipovic): Test should pass when logic is added
     # Goal of this test is to check the replication works forcefully if replica misses epoch
     # 1. We start all replicas, main and coordinator manually
     # 2. We check that main has correct state
@@ -571,16 +576,446 @@ def test_replication_forcefully_works_on_failover_replica_misses_epoch(data_reco
     # 11. Instance 1 up (missed epoch)
     # 12 Instance 1 new main
     # 13 instance 2 up
-    # 14 Force data from instance 1 to instance 2 (TODO) next PR
+    # 14 Force data from instance 1 to instance 2

     temp_dir = tempfile.TemporaryDirectory().name

     pass


+@pytest.mark.parametrize("data_recovery", ["false"])
+def test_replication_works_with_snapshot(data_recovery):
+    # TODO(antoniofilipovic): Test should pass
+    # Goal of this test is to check that replication works when recovering only from a snapshot
+    # 1. We start all replicas, main and coordinator manually
+    # 2. We check that main has correct state
+    # 3. Create initial data on MAIN
+    # 4. Expect data to be copied on all replicas
+    # 5. Kill instance_1
+    # 6. Create more data on MAIN
+    # 7. Create snapshot
+    # 8. Run SHOW REPLICAS to get the last commit timestamp
+    # 9. Delete WAL files from the folder of instance_3 (MAIN)
+    # 10. Start instance_1
+    # 11. Check data is replicated
+    # 12. Call SHOW REPLICAS on MAIN to get correct state
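+    # Deleting MAIN's WAL files between the snapshot (step 7) and instance_1's restart (step 10)
+    # forces the recovery in step 11 to ship the snapshot alone, which is exactly the path
+    # questioned in the RecoverReplica TODO in replication_client.cpp above.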
+
+    temp_dir = tempfile.TemporaryDirectory().name
+
+    CONFIGURATION = {
+        "instance_1": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7688",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10011",
+                "--replication-restore-state-on-startup",
+                "true",
+                f"--data-recovery-on-startup={data_recovery}",
+                "--storage-recover-on-startup=false",
+            ],
+            "log_file": "instance_1.log",
+            "data_directory": f"{temp_dir}/instance_1",
+            "setup_queries": [],
+        },
+        "instance_2": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7689",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10012",
+                "--replication-restore-state-on-startup",
+                "true",
+                f"--data-recovery-on-startup={data_recovery}",
+                "--storage-recover-on-startup=false",
+            ],
+            "log_file": "instance_2.log",
+            "data_directory": f"{temp_dir}/instance_2",
+            "setup_queries": [],
+        },
+        "instance_3": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7687",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10013",
+                "--replication-restore-state-on-startup",
+                "true",
+                "--data-recovery-on-startup",
+                f"{data_recovery}",
+                "--storage-recover-on-startup=false",
+            ],
+            "log_file": "instance_3.log",
+            "data_directory": f"{temp_dir}/instance_3",
+            "setup_queries": [],
+        },
+        "coordinator": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7690",
+                "--log-level=TRACE",
+                "--raft-server-id=1",
+                "--raft-server-port=10111",
+            ],
+            "log_file": "coordinator.log",
+            "setup_queries": [
+                "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
+                "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
+                "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';",
+                "SET INSTANCE instance_3 TO MAIN",
+            ],
+        },
+    }
+
+    # 1
+
+    interactive_mg_runner.start_all(CONFIGURATION)
+
+    # 2
+    main_cursor = connect(host="localhost", port=7687).cursor()
+    expected_data_on_main = [
+        ("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
+        ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
+    ]
+    actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
+    assert actual_data_on_main == expected_data_on_main
+
+    # 3
+
+    execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:1});")
+    execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:2});")
+
+    # 4
+    instance_1_cursor = connect(host="localhost", port=7688).cursor()
+    instance_2_cursor = connect(host="localhost", port=7689).cursor()
+
+    assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
+    assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
+
+    # 5
+
+    interactive_mg_runner.kill(CONFIGURATION, "instance_1")
+
+    coord_cursor = connect(host="localhost", port=7690).cursor()
+
+    def retrieve_data_show_instances():
+        return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
+
+    expected_data_on_coord = [
+        ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
+        ("instance_1", "", "127.0.0.1:10011", False, "unknown"),
+        ("instance_2", "", "127.0.0.1:10012", True, "replica"),
+        ("instance_3", "", "127.0.0.1:10013", True, "main"),
+    ]
+    mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
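+    # NOTE: instance_1 is a SYNC replica, so with it down MAIN still applies writes locally and
+    # replicates them to the live instance_2; only the commit acknowledgement fails, hence the
+    # expected exception in step 6 below.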
f"FOREACH (i in range(1, {num_vertices}) | CREATE (:Vertex));") + assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value) + + assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == num_vertices + 2 + + # 7 + + execute_and_fetch_all(main_cursor, "CREATE SNAPSHOT;") + + # 8 + + main_cursor = connect(host="localhost", port=7687).cursor() + expected_data_on_main = [ + ("instance_1", "127.0.0.1:10001", "sync", 0, 0, "invalid"), + ("instance_2", "127.0.0.1:10002", "sync", 6, 0, "ready"), + ] + actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;"))) + assert actual_data_on_main == expected_data_on_main + + # 9 + + folder = f"{temp_dir}/instance_2/wal" + for filename in os.listdir(folder): + file_path = os.path.join(folder, filename) + print(file_path) + try: + if os.path.isfile(file_path) or os.path.islink(file_path): + os.unlink(file_path) + elif os.path.isdir(file_path): + print("removing file") + shutil.rmtree(file_path) + except Exception as e: + print("Failed to delete %s. Reason: %s" % (file_path, e)) + + # 10 + + interactive_mg_runner.start(CONFIGURATION, "instance_1") + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", True, "replica"), + ("instance_3", "", "127.0.0.1:10013", True, "main"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 11 + + instance_1_cursor = connect(host="localhost", port=7688).cursor() + + def get_vertex_count(): + return execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(num_vertices + 2, get_vertex_count) + + # 12 + + main_cursor = connect(host="localhost", port=7687).cursor() + expected_data_on_main = [ + ("instance_1", "127.0.0.1:10001", "sync", 6, 0, "ready"), + ("instance_2", "127.0.0.1:10002", "sync", 6, 0, "ready"), + ] + actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;"))) + assert actual_data_on_main == expected_data_on_main + + +@pytest.mark.parametrize("data_recovery", ["false", "true"]) +def test_replication_correct_replica_chosen_up_to_date_data(data_recovery): + # TODO(antoniofilipovic): Test should pass when base branch is updated + # Goal of this test is to check that correct replica instance as new MAIN is chosen + # 1. We start all replicas, main and coordinator manually + # 2. We check that main has correct state + # 3. Create initial data on MAIN + # 4. Expect data to be copied on all replicas + # 5. Kill instance_1 ( this one will miss complete epoch) + # 6. Kill main (instance_3) + # 7. Instance_2 new MAIN + # 8. Instance_2 commits and replicates data + # 9. Instance_4 down (not main) + # 10. 
+
+    temp_dir = tempfile.TemporaryDirectory().name
+
+    MEMGRAPH_INNER_INSTANCES_DESCRIPTION = {
+        "instance_1": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7688",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10011",
+                "--replication-restore-state-on-startup",
+                "true",
+                f"--data-recovery-on-startup={data_recovery}",
+                "--storage-recover-on-startup=false",
+            ],
+            "log_file": "instance_1.log",
+            "data_directory": f"{temp_dir}/instance_1",
+            "setup_queries": [],
+        },
+        "instance_2": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7689",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10012",
+                "--replication-restore-state-on-startup",
+                "true",
+                f"--data-recovery-on-startup={data_recovery}",
+                "--storage-recover-on-startup=false",
+            ],
+            "log_file": "instance_2.log",
+            "data_directory": f"{temp_dir}/instance_2",
+            "setup_queries": [],
+        },
+        "instance_3": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7687",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10013",
+                "--replication-restore-state-on-startup",
+                "true",
+                "--data-recovery-on-startup",
+                f"{data_recovery}",
+                "--storage-recover-on-startup=false",
+            ],
+            "log_file": "instance_3.log",
+            "data_directory": f"{temp_dir}/instance_3",
+            "setup_queries": [],
+        },
+        "instance_4": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7691",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10014",
+                "--replication-restore-state-on-startup",
+                "true",
+                "--data-recovery-on-startup",
+                f"{data_recovery}",
+                "--storage-recover-on-startup=false",
+            ],
+            "log_file": "instance_4.log",
+            "data_directory": f"{temp_dir}/instance_4",
+            "setup_queries": [],
+        },
+        "coordinator": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7690",
+                "--log-level=TRACE",
+                "--raft-server-id=1",
+                "--raft-server-port=10111",
+                "--instance-down-timeout-sec=10",
+            ],
+            "log_file": "coordinator.log",
+            "setup_queries": [
+                "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
+                "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
+                "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';",
+                "REGISTER INSTANCE instance_4 ON '127.0.0.1:10014' WITH '127.0.0.1:10004';",
+                "SET INSTANCE instance_3 TO MAIN",
+            ],
+        },
+    }
+
+    # 1
+
+    interactive_mg_runner.start_all(MEMGRAPH_INNER_INSTANCES_DESCRIPTION)
+
+    # 2
+
+    main_cursor = connect(host="localhost", port=7687).cursor()
+    expected_data_on_main = [
+        ("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
+        ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
+        ("instance_4", "127.0.0.1:10004", "sync", 0, 0, "ready"),
+    ]
+    actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
+    assert actual_data_on_main == expected_data_on_main
+
+    # 3
+
+    execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:1});")
+    execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:2});")
+
+    # 4
+    instance_1_cursor = connect(host="localhost", port=7688).cursor()
+    instance_2_cursor = connect(host="localhost", port=7689).cursor()
+    instance_4_cursor = connect(host="localhost", port=7691).cursor()
+
+    assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
+    assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
+    assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
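+    # Killing instance_1 before the failover (steps 5-7) means it never sees the second epoch:
+    # once instance_2 becomes MAIN, every new commit belongs to an epoch that instance_1 knows
+    # nothing about.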
connect(host="localhost", port=7691).cursor() + + assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2 + assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2 + assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2 + + # 5 + + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1") + + # 6 + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3") + + # 7 + coord_cursor = connect(host="localhost", port=7690).cursor() + + def retrieve_data_show_instances(): + return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;"))) + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "", "127.0.0.1:10012", True, "main"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ("instance_4", "", "127.0.0.1:10014", True, "replica"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 8 + + with pytest.raises(Exception) as e: + execute_and_fetch_all(instance_2_cursor, "CREATE (:Epoch2Vertex {prop:1});") + assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value) + + def get_vertex_count(): + return execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(3, get_vertex_count) + + assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3 + + # 9 + + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_4") + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", False, "unknown"), + ("instance_2", "", "127.0.0.1:10012", True, "main"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ("instance_4", "", "127.0.0.1:10014", False, "unknown"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 10 + + interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_2") + interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1") + interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_4") + + # 11 + + expected_data_on_coord = [ + ("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"), + ("instance_1", "", "127.0.0.1:10011", True, "replica"), + ("instance_2", "", "127.0.0.1:10012", False, "unknown"), + ("instance_3", "", "127.0.0.1:10013", False, "unknown"), + ("instance_4", "", "127.0.0.1:10014", True, "main"), + ] + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances) + + # 12 + instance_1_cursor = connect(host="localhost", port=7688).cursor() + instance_4_cursor = connect(host="localhost", port=7691).cursor() + + def get_vertex_count(): + return execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(3, get_vertex_count) + + def get_vertex_count(): + return execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n)")[0][0] + + mg_sleep_and_assert(3, get_vertex_count) + + def test_replication_works_on_failover_simple(): # Goal of this test is to check the replication works after failover command. - # 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes. + # 1. 
+
+
 def test_replication_works_on_failover_simple():
     # Goal of this test is to check the replication works after failover command.
-    # 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
+    # 1. We start all replicas, main and coordinator manually
     # 2. We check that main has correct state
     # 3. We kill main
     # 4. We check that coordinator and new main have correct state
@@ -1164,5 +1599,5 @@ def test_disable_multiple_mains():


 if __name__ == "__main__":
-    sys.exit(pytest.main([__file__, "-k", "test_replication_works_on_failover_replica_2_epochs_more_commits_away"]))
+    sys.exit(pytest.main([__file__, "-k", "test_replication_works_with_snapshot"]))
     sys.exit(pytest.main([__file__, "-rA"]))