add test for get uuid, fix replica recovery bug with coordinator

This commit is contained in:
antoniofilipovic 2024-02-13 12:59:07 +01:00
parent 88e429889e
commit 867c192363
2 changed files with 66 additions and 32 deletions

View File

@ -62,8 +62,10 @@ ReplicationState::ReplicationState(std::optional<std::filesystem::path> durabili
}
#endif
if (std::holds_alternative<RoleReplicaData>(replication_data)) {
spdlog::trace("Recovered main's uuid for replica {}",
std::string(std::get<RoleReplicaData>(replication_data).uuid_.value()));
std::string uuid = std::get<RoleReplicaData>(replication_data).uuid_.has_value()
? std::string(std::get<RoleReplicaData>(replication_data).uuid_.value())
: "";
spdlog::trace("Recovered main's uuid for replica {}", uuid);
} else {
spdlog::trace("Recovered uuid for main {}", std::string(std::get<RoleMainData>(replication_data).uuid_));
}

View File

@ -147,14 +147,14 @@ def test_replication_works_on_failover():
interactive_mg_runner.stop_all(MEMGRAPH_INSTANCES_DESCRIPTION)
def test_replication_works_on_instance_down():
# Goal of this test is to check the replication works after failover command.
def test_replication_works_on_replica_instance_restart():
# Goal of this test is to check the replication works after replica goes down and restarts
# 1. We start all replicas, main and coordinator manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 2. We check that main has correct state
# 3. We kill main
# 4. We check that coordinator and new main have correct state
# 5. We insert one vertex on new main
# 6. We check that vertex appears on new replica
# 3. We kill replica
# 4. We check that main cannot replicate to replica
# 5. We bring replica back up
# 6. We check that replica gets data
safe_execute(shutil.rmtree, TEMP_DIR)
# 1
@ -170,48 +170,80 @@ def test_replication_works_on_instance_down():
assert actual_data_on_main == expected_data_on_main
# 3
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
# 4
coord_cursor = connect(host="localhost", port=7690).cursor()
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", False, "unknown"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
new_main_cursor = connect(host="localhost", port=7688).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "invalid"),
]
mg_sleep_and_assert(expected_data_on_main, retrieve_data_show_replicas)
# 4
instance_1_cursor = connect(host="localhost", port=7688).cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(main_cursor, "CREATE ();")
assert (
str(e.value)
== "Replication Exception: At least one SYNC replica has not confirmed committing last transaction. Check the status of the replicas using 'SHOW REPLICAS' query."
)
res_instance_1 = execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n)")[0][0]
assert res_instance_1 == 1
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "invalid"),
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 2, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "invalid"),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
mg_sleep_and_assert(expected_data_on_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_data_on_new_main = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("instance_3", "127.0.0.1:10003", "sync", 0, 0, "ready"),
# 5.
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
# 5
execute_and_fetch_all(new_main_cursor, "CREATE ();")
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
# 6
alive_replica_cursror = connect(host="localhost", port=7689).cursor()
res = execute_and_fetch_all(alive_replica_cursror, "MATCH (n) RETURN count(n) as count;")[0][0]
assert res == 1, "Vertex should be replicated"
interactive_mg_runner.stop_all(MEMGRAPH_INSTANCES_DESCRIPTION)
expected_data_on_main = [
("instance_1", "127.0.0.1:10001", "sync", 2, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 2, 0, "ready"),
]
mg_sleep_and_assert(expected_data_on_main, retrieve_data_show_replicas)
# 6.
instance_2_cursor = connect(port=7689, host="localhost").cursor()
execute_and_fetch_all(main_cursor, "CREATE ();")
res_instance_2 = execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0]
assert res_instance_2 == 2
def test_show_instances():