Set instance to main backed by log

This commit is contained in:
Andi Skrgat 2024-02-23 12:29:58 +01:00
parent 8f70d1ce28
commit 1177a46dc3
7 changed files with 196 additions and 317 deletions

View File

@@ -47,9 +47,13 @@ auto CoordinatorClusterState::DoAction(std::string const &instance_name, RaftLog
spdlog::info("Instance {} unregistered", instance_name);
break;
case RaftLogAction::SET_INSTANCE_AS_MAIN:
MG_ASSERT(instance_roles.find(instance_name) != instance_roles.end(),
"Instance does not exist as part of raft state!");
instance_roles[instance_name] = replication_coordination_glue::ReplicationRole::MAIN;
break;
case RaftLogAction::SET_INSTANCE_AS_REPLICA:
MG_ASSERT(instance_roles.find(instance_name) != instance_roles.end(),
"Instance does not exist as part of raft state!");
instance_roles[instance_name] = replication_coordination_glue::ReplicationRole::REPLICA;
break;
}
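
Both new cases follow one apply pattern: a promote/demote entry taken from the Raft log may only touch an instance that is already registered in the replicated state. A minimal sketch of that pattern, using hypothetical Role/RoleStore names rather than Memgraph's actual types:

#include <map>
#include <stdexcept>
#include <string>

enum class Role { MAIN, REPLICA };
enum class Action { SET_AS_MAIN, SET_AS_REPLICA };

class RoleStore {
 public:
  void Register(std::string const &name) { roles_[name] = Role::REPLICA; }

  // Applying a promote/demote log entry requires the instance to exist,
  // mirroring the MG_ASSERT guards in DoAction above.
  void Apply(std::string const &name, Action action) {
    auto it = roles_.find(name);
    if (it == roles_.end()) throw std::logic_error("Instance does not exist as part of raft state!");
    it->second = (action == Action::SET_AS_MAIN) ? Role::MAIN : Role::REPLICA;
  }

 private:
  std::map<std::string, Role> roles_;
};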

View File

@@ -542,6 +542,24 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
if (!raft_state_.RequestLeadership()) {
return SetInstanceToMainCoordinatorStatus::NOT_LEADER;
}
auto const res = raft_state_.AppendSetInstanceAsMain(instance_name);
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for promoting instance {}. Most likely the reason is that the instance is not "
"the leader.",
instance_name);
return SetInstanceToMainCoordinatorStatus::RAFT_COULD_NOT_ACCEPT;
}
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to promote instance {} with error code {}", instance_name, res->get_result_code());
return SetInstanceToMainCoordinatorStatus::RAFT_COULD_NOT_APPEND;
}
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
@@ -569,7 +587,6 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
// TODO: (andi) This should be replicated across all coordinator instances with Raft log
SetMainUUID(new_main_uuid);
spdlog::info("Instance {} promoted to main", instance_name);
return SetInstanceToMainCoordinatorStatus::SUCCESS;
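
The added block is NuRaft's standard two-stage result check: get_accepted() reports whether the leader admitted the entry at all, while get_result_code() reports whether replication and commit then succeeded. A hedged sketch of that pattern in isolation; the AppendPromote helper and its payload layout are illustrative, and the default blocking commit mode is assumed:

#include <libnuraft/nuraft.hxx>

#include <string>

// Illustrative helper, not Memgraph's API: append one log entry carrying an
// instance name and report whether it was both accepted and committed.
bool AppendPromote(nuraft::raft_server &server, std::string const &instance_name) {
  auto entry = nuraft::buffer::alloc(sizeof(uint32_t) + instance_name.size());
  nuraft::buffer_serializer bs(entry);
  bs.put_str(instance_name);  // 4-byte length prefix + bytes

  auto res = server.append_entries({entry});
  if (!res->get_accepted()) return false;  // rejected up front, e.g. this node is not the leader
  return res->get_result_code() == nuraft::cmd_result_code::OK;  // committed, not just accepted
}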

View File

@@ -43,9 +43,12 @@ enum class SetInstanceToMainCoordinatorStatus : uint8_t {
NO_INSTANCE_WITH_NAME,
MAIN_ALREADY_EXISTS,
NOT_COORDINATOR,
SUCCESS,
NOT_LEADER,
RAFT_COULD_NOT_ACCEPT,
RAFT_COULD_NOT_APPEND,
COULD_NOT_PROMOTE_TO_MAIN,
SWAP_UUID_FAILED
SWAP_UUID_FAILED,
SUCCESS,
};
} // namespace memgraph::coordination

View File

@@ -34,10 +34,10 @@ inline auto ParseRaftLogAction(std::string const &action) -> RaftLogAction {
if (action == "unregister") {
return RaftLogAction::UNREGISTER_REPLICATION_INSTANCE;
}
if (action == "set_main") {
if (action == "promote") {
return RaftLogAction::SET_INSTANCE_AS_MAIN;
}
if (action == "set_replica") {
if (action == "demote") {
return RaftLogAction::SET_INSTANCE_AS_REPLICA;
}
throw InvalidRaftLogActionException("Invalid Raft log action: " + action);
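
Renaming the wire strings to "promote"/"demote" implies the matching serializer changed as well; a hypothetical inverse mapping, shown only to make the new encoding explicit (Memgraph's actual encoder may differ, and the register enumerator/string are assumed from the parser's register/unregister branches):

inline auto RaftLogActionToString(RaftLogAction action) -> std::string {
  switch (action) {
    case RaftLogAction::REGISTER_REPLICATION_INSTANCE:  // assumed counterpart of "unregister"
      return "register";
    case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE:
      return "unregister";
    case RaftLogAction::SET_INSTANCE_AS_MAIN:
      return "promote";
    case RaftLogAction::SET_INSTANCE_AS_REPLICA:
      return "demote";
  }
  throw InvalidRaftLogActionException("Unhandled Raft log action");
}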

View File

@@ -107,6 +107,7 @@ auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &cur
}
UpdateReplicaLastResponseUUID();
// NOLINTNEXTLINE
if (res.GetValue().has_value() && res.GetValue().value() == curr_main_uuid) {
return true;
}
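
The adjusted condition is an optional-equality guard: the replica only counts as tracking the right main when it actually reported a UUID and that UUID equals the current main's. The same guard in isolation, with a hypothetical helper name:

#include <optional>

// Hypothetical helper: true only if the replica reported a UUID and it
// matches the UUID of the current main.
template <typename UUID>
bool TracksCurrentMain(std::optional<UUID> const &reported, UUID const &curr_main_uuid) {
  return reported.has_value() && *reported == curr_main_uuid;
}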

View File

@@ -521,6 +521,14 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
throw QueryRuntimeException("Couldn't set instance to main since there is already a main instance in cluster!");
case NOT_COORDINATOR:
throw QueryRuntimeException("SET INSTANCE TO MAIN query can only be run on a coordinator!");
case NOT_LEADER:
throw QueryRuntimeException("Couldn't set instance to main since coordinator is not a leader!");
case RAFT_COULD_NOT_ACCEPT:
throw QueryRuntimeException(
"Couldn't promote instance since raft server couldn't accept the log! Most likely the raft "
"instance is not a leader!");
case RAFT_COULD_NOT_APPEND:
throw QueryRuntimeException("Couldn't promote instance since raft server couldn't append the log!");
case COULD_NOT_PROMOTE_TO_MAIN:
throw QueryRuntimeException(
"Couldn't set replica instance to main! Check coordinator and replica for more logs");

View File

@@ -133,12 +133,12 @@ def test_register_repl_instances_then_coordinators():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster_coord3 = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3)
@@ -147,24 +147,23 @@ def test_register_repl_instances_then_coordinators():
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
# TODO: (andi) Waiting for product decision about this
expected_cluster_not_shared = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "", False, "replica"),
("instance_2", "", "", False, "replica"),
("instance_3", "", "", False, "main"),
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "replica"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"),
]
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
def test_register_coordinator_then_repl_instances():
@@ -190,12 +189,12 @@ def test_register_coordinator_then_repl_instances():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster_coord3 = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3)
@@ -204,21 +203,23 @@ def test_register_coordinator_then_repl_instances():
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
# TODO: (andi) This should be solved eventually
expected_cluster_not_shared = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "replica"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"),
]
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
def test_coordinators_communication_with_restarts():
@@ -240,10 +241,13 @@ def test_coordinators_communication_with_restarts():
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
expected_cluster_not_shared = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "replica"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"),
]
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
@@ -251,20 +255,20 @@ def test_coordinators_communication_with_restarts():
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_2")
@@ -274,11 +278,11 @@ def test_coordinators_communication_with_restarts():
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
# TODO: (andi) Test when dealing with distributed coordinators that you can register on one coordinator and unregister from any other coordinator
# # TODO: (andi) Test when dealing with distributed coordinators that you can register on one coordinator and unregister from any other coordinator
@pytest.mark.parametrize(
"kill_instance",
[True, False],
@@ -287,7 +291,12 @@ def test_unregister_replicas(kill_instance):
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
)
@@ -299,6 +308,12 @@ def test_unregister_replicas(kill_instance):
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
def check_coordinator3():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
@@ -308,10 +323,21 @@ def test_unregister_replicas(kill_instance):
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS")))
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "replica"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"),
]
expected_replicas = [
@@ -331,6 +357,8 @@ def test_unregister_replicas(kill_instance):
),
]
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster, check_coordinator3)
mg_sleep_and_assert(expected_replicas, check_main)
@@ -339,9 +367,19 @@ def test_unregister_replicas(kill_instance):
execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_1")
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"),
]
expected_replicas = [
@@ -354,6 +392,8 @@ def test_unregister_replicas(kill_instance):
),
]
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster, check_coordinator3)
mg_sleep_and_assert(expected_replicas, check_main)
@@ -362,11 +402,22 @@ def test_unregister_replicas(kill_instance):
execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_2")
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_3", "", "", "unknown", "main"),
]
expected_replicas = []
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster, check_coordinator3)
mg_sleep_and_assert(expected_replicas, check_main)
@@ -375,7 +426,11 @@ def test_unregister_main():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
)
@@ -387,16 +442,35 @@ def test_unregister_main():
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
def check_coordinator3():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "replica"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"),
]
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster, check_coordinator3)
try:
@@ -410,20 +484,43 @@ def test_unregister_main():
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "main"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"),
]
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster, check_coordinator3)
execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_3")
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
]
expected_cluster_shared = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "main"),
("instance_2", "", "", "unknown", "replica"),
]
expected_replicas = [
@@ -441,259 +538,8 @@ def test_unregister_main():
def check_main():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS")))
mg_sleep_and_assert(expected_cluster, check_coordinator3)
mg_sleep_and_assert(expected_replicas, check_main)
def test_register_coordinator_then_repl_instances():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'"
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
def check_coordinator3():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster_coord3 = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_cluster_coord3, check_coordinator3)
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
# TODO: (andi) This should be solved eventually
expected_cluster_not_shared = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
]
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
def test_coordinators_communication_with_restarts():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'"
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
expected_cluster_not_shared = [
("coordinator_1", "127.0.0.1:10111", "", True, "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", True, "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
]
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
def check_coordinator1():
return sorted(list(execute_and_fetch_all(coordinator1_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
def check_coordinator2():
return sorted(list(execute_and_fetch_all(coordinator2_cursor, "SHOW INSTANCES")))
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_2")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_1")
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_2")
coordinator1_cursor = connect(host="localhost", port=7690).cursor()
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_not_shared, check_coordinator2)
# TODO: (andi) Test when dealing with distributed coordinators that you can register on one coordinator and unregister from any other coordinator
@pytest.mark.parametrize(
"kill_instance",
[True, False],
)
def test_unregister_replicas(kill_instance):
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'"
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
def check_coordinator3():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
main_cursor = connect(host="localhost", port=7689).cursor()
def check_main():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS")))
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
expected_replicas = [
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
]
mg_sleep_and_assert(expected_cluster, check_coordinator3)
mg_sleep_and_assert(expected_replicas, check_main)
if kill_instance:
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_1")
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
expected_replicas = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
]
mg_sleep_and_assert(expected_cluster, check_coordinator3)
mg_sleep_and_assert(expected_replicas, check_main)
if kill_instance:
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2")
execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_2")
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
expected_replicas = []
mg_sleep_and_assert(expected_cluster, check_coordinator3)
mg_sleep_and_assert(expected_replicas, check_main)
def test_unregister_main():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'"
)
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'"
)
execute_and_fetch_all(coordinator3_cursor, "SET INSTANCE instance_3 TO MAIN")
def check_coordinator3():
return sorted(list(execute_and_fetch_all(coordinator3_cursor, "SHOW INSTANCES")))
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "replica"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", True, "main"),
]
mg_sleep_and_assert(expected_cluster, check_coordinator3)
try:
execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_3")
except Exception as e:
assert (
str(e)
== "Alive main instance can't be unregistered! Shut it down to trigger failover and then unregister it!"
)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
("instance_3", "", "127.0.0.1:10013", False, "unknown"),
]
mg_sleep_and_assert(expected_cluster, check_coordinator3)
execute_and_fetch_all(coordinator3_cursor, "UNREGISTER INSTANCE instance_3")
expected_cluster = [
("coordinator_3", "127.0.0.1:10113", "", True, "coordinator"),
("instance_1", "", "127.0.0.1:10011", True, "main"),
("instance_2", "", "127.0.0.1:10012", True, "replica"),
]
expected_replicas = [
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
]
main_cursor = connect(host="localhost", port=7687).cursor()
def check_main():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS")))
mg_sleep_and_assert(expected_cluster_shared, check_coordinator1)
mg_sleep_and_assert(expected_cluster_shared, check_coordinator2)
mg_sleep_and_assert(expected_cluster, check_coordinator3)
mg_sleep_and_assert(expected_replicas, check_main)