extend with one missing epochs test, append old test

This commit is contained in:
antoniofilipovic 2024-02-21 17:58:56 +01:00 committed by Antonio Filipovic
parent a13bc5e2a6
commit 576de61a50

View File

@ -102,7 +102,7 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
@pytest.mark.parametrize("data_recovery", ["false", "true"])
def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recovery):
# Goal of this test is to check the replication works after failover command.
# 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 1. We start all replicas, main and coordinator manually
# 2. We check that main has correct state
# 3. Create initial data on MAIN
# 4. Expect data to be copied on all replicas
@ -111,7 +111,7 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
# 7. Kill main
# 8. Instance_2 new MAIN
# 9. Create vertex on instance 2
# 10. Start instance_1 ( it should have one commit on old epoch and new epoch with new commit shouldn't be replicated)
# 10. Start instance_1(it should have one commit on old epoch and new epoch with new commit shouldn't be replicated)
# 11. Expect data to be copied on instance_1
# 12. Start old MAIN (instance_3)
# 13. Expect data to be copied to instance_3
@ -271,8 +271,25 @@ def test_replication_works_on_failover_replica_1_epoch_2_commits_away(data_recov
# 12. Start old MAIN (instance_3)
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
new_expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "main"),
("instance_3", "", "127.0.0.1:10013", True, "replica"),
]
mg_sleep_and_assert(new_expected_data_on_coord, retrieve_data_show_instances)
# 13. Expect data to be copied to instance_3
instance_3_cursor = connect(host="localhost", port=7687).cursor()
def get_vertex_count():
return execute_and_fetch_all(instance_3_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(3, get_vertex_count)
@pytest.mark.parametrize("data_recovery", ["false", "true"])
def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_recovery):
@ -291,7 +308,251 @@ def test_replication_works_on_failover_replica_2_epochs_more_commits_away(data_r
# 10. All other instances wake up
# 11. Everything is replicated
pass
temp_dir = tempfile.TemporaryDirectory().name
MEMGRAPH_INNER_INSTANCES_DESCRIPTION = {
"instance_1": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10011",
"--replication-restore-state-on-startup",
"true",
f"--data-recovery-on-startup={data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_1.log",
"data_directory": f"{temp_dir}/instance_1",
"setup_queries": [],
},
"instance_2": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7689",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10012",
"--replication-restore-state-on-startup",
"true",
f"--data-recovery-on-startup={data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_2.log",
"data_directory": f"{temp_dir}/instance_2",
"setup_queries": [],
},
"instance_3": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7687",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10013",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
f"{data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_3.log",
"data_directory": f"{temp_dir}/instance_3",
"setup_queries": [],
},
"instance_4": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7691",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10014",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
f"{data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_4.log",
"data_directory": f"{temp_dir}/instance_4",
"setup_queries": [],
},
"coordinator": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7690",
"--log-level=TRACE",
"--raft-server-id=1",
"--raft-server-port=10111",
],
"log_file": "coordinator.log",
"setup_queries": [
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
"REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';",
"REGISTER INSTANCE instance_4 ON '127.0.0.1:10014' WITH '127.0.0.1:10004';",
"SET INSTANCE instance_3 TO MAIN",
],
},
}
# 1. We start all replicas, main and coordinator manually
interactive_mg_runner.start_all(MEMGRAPH_INNER_INSTANCES_DESCRIPTION)
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_4", "127.0.0.1:10004", "sync", 0, 0, "ready"),
]
actual_data_on_main = sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
assert actual_data_on_main == expected_data_on_main
# 2. Main does commit
execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:1});")
execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:2});")
instance_1_cursor = connect(host="localhost", port=7688).cursor()
instance_2_cursor = connect(host="localhost", port=7689).cursor()
instance_4_cursor = connect(host="localhost", port=7691).cursor()
assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
# 3. instance_2 down
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_2")
coord_cursor = connect(host="localhost", port=7690).cursor()
def retrieve_data_show_instances():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("instance_4", "", "127.0.0.1:10014", True, "replica"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 4. Main commits more
with pytest.raises(Exception) as e:
execute_and_fetch_all(main_cursor, "CREATE (:EpochVertex1 {prop:1});")
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3
assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3
# 5. Main down
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
# 6. Instance_1 new main
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("instance_4", "", "127.0.0.1:10014", True, "replica"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 7. Instance 1 commits
with pytest.raises(Exception) as e:
execute_and_fetch_all(instance_1_cursor, "CREATE (:Epoch2Vertex {prop:1});")
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
# 8. Instance 4 gets data
assert execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n);")[0][0] == 4
# 8. Instance 1 dies
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")
# 9. Instance 4 new main
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("instance_4", "", "127.0.0.1:10014", True, "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 10 Instance 4 commits
with pytest.raises(Exception) as e:
execute_and_fetch_all(instance_4_cursor, "CREATE (:Epoch3Vertex {prop:1});")
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
# 11 Instance 2 wakes up
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_2")
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", False, "unknown"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("instance_4", "", "127.0.0.1:10014", True, "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 12 Instance 2 gets data from old epochs
instance_2_cursor = connect(host="localhost", port=7689).cursor()
def get_vertex_count():
return execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(5, get_vertex_count)
# 12. All other instances wake up
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_1")
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "replica"),
("instance_4", "", "127.0.0.1:10014", True, "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 13. Everything is replicated
instance_1_cursor = connect(host="localhost", port=7688).cursor()
instance_4_cursor = connect(host="localhost", port=7691).cursor()
def get_vertex_count():
return execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(5, get_vertex_count)
def get_vertex_count():
return execute_and_fetch_all(instance_4_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(5, get_vertex_count)
@pytest.mark.parametrize("data_recovery", ["false"])
@ -903,4 +1164,5 @@ def test_disable_multiple_mains():
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-k", "test_replication_works_on_failover_replica_2_epochs_more_commits_away"]))
sys.exit(pytest.main([__file__, "-rA"]))