Compare commits

...

1 Commits

Author SHA1 Message Date
antoniofilipovic
01330fb3b0 add tests for two mains, 1 failover 2024-03-08 18:14:01 +01:00
3 changed files with 183 additions and 7 deletions

View File

@ -46,6 +46,8 @@ CoordinatorInstance::CoordinatorInstance()
&CoordinatorInstance::ReplicaFailCallback);
});
// TODO(antoniofilipovic): is this correct or we have a bug
// Multiple main instances, each will do failover
auto main = instances | ranges::views::filter(
[](auto const &instance) { return instance.status == ReplicationRole::MAIN; });
@ -62,7 +64,9 @@ CoordinatorInstance::CoordinatorInstance()
});
},
[this]() {
// TODO (antoniofilipovic) We can enter here without lock and remove all instances
spdlog::info("Leader changed, stopping all replication instances!");
// we should probably change this to defunct state
repl_instances_.clear();
})) {
client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
@ -122,19 +126,21 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), process_repl_instance_as_leader);
}
} else {
auto const stringify_inst_status = [](ReplicationRole status) -> std::string {
return status == ReplicationRole::MAIN ? "main" : "replica";
auto const stringify_inst_status = [this](const ReplicationInstance &instance) -> std::string {
if (!instance.IsAlive()) return "unknown";
if (raft_state_.IsMain(instance.InstanceName())) return "main";
return "replica";
};
// TODO: (andi) Add capability that followers can also return socket addresses
auto process_repl_instance_as_follower = [&stringify_inst_status](auto const &instance) -> InstanceStatus {
return {.instance_name = instance.config.instance_name,
.cluster_role = stringify_inst_status(instance.status),
auto process_repl_instance_as_follower =
[&stringify_inst_status](const ReplicationInstance &instance) -> InstanceStatus {
return {.instance_name = instance.InstanceName(),
.cluster_role = stringify_inst_status(instance),
.health = "unknown"};
};
std::ranges::transform(raft_state_.GetInstances(), std::back_inserter(instances_status),
process_repl_instance_as_follower);
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), process_repl_instance_as_follower);
}
return instances_status;

View File

@ -35,6 +35,8 @@ using replication_coordination_glue::ReplicationRole;
struct InstanceState {
CoordinatorClientConfig config;
ReplicationRole status;
// TODO(antoniofilipovic) We might need this because we need to track correct instance
// utils::UUID main_uuid_;
friend auto operator==(InstanceState const &lhs, InstanceState const &rhs) -> bool {
return lhs.config == rhs.config && lhs.status == rhs.status;
@ -84,6 +86,7 @@ class CoordinatorClusterState {
private:
std::map<std::string, InstanceState, std::less<>> instances_{};
// Current uuid of MAIN which other replicas listen too
utils::UUID uuid_{};
mutable utils::ResourceLock log_lock_{};
};

View File

@ -13,6 +13,7 @@ import os
import shutil
import sys
import tempfile
import time
import interactive_mg_runner
import pytest
@ -204,6 +205,172 @@ def get_instances_description_no_setup():
}
def test_multiple_old_mains_single_failover():
# Goal of this test is to check when leadership changes
# and we have old MAIN down, that we don't start failover
# 1. Start all instances.
# 2. Kill the main instance
# 3. Do failover
# 4. Kill other main
# 5. Kill leader
# 6. Leave first main down, and start second main
# 7. Second main should write data to new instance all the time
# 1
safe_execute(shutil.rmtree, TEMP_DIR)
inner_instances_description = get_instances_description_no_setup()
interactive_mg_runner.start_all(inner_instances_description)
setup_queries = [
"ADD COORDINATOR 1 ON '127.0.0.1:10111'",
"ADD COORDINATOR 2 ON '127.0.0.1:10112'",
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'",
"REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'",
"SET INSTANCE instance_3 TO MAIN",
]
coord_cursor_3 = connect(host="localhost", port=7692).cursor()
for query in setup_queries:
execute_and_fetch_all(coord_cursor_3, query)
coord_cursor = connect(host="localhost", port=7693).cursor()
def retrieve_data_show_repl_cluster():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
coordinators = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
]
basic_instances = [
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
expected_data_on_coord = []
expected_data_on_coord.extend(coordinators)
expected_data_on_coord.extend(basic_instances)
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
# 2
interactive_mg_runner.kill(inner_instances_description, "instance_3")
# 3
basic_instances = [
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
expected_data_on_coord = []
expected_data_on_coord.extend(coordinators)
expected_data_on_coord.extend(basic_instances)
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
# 4
interactive_mg_runner.kill(inner_instances_description, "instance_1")
# 5
interactive_mg_runner.kill(inner_instances_description, "coordinator_3")
# 6
interactive_mg_runner.start(inner_instances_description, "instance_1")
# 7
instance_1_cursor = connect(host="localhost", port=7687).cursor()
def show_replicas():
return sorted(list(execute_and_fetch_all(instance_1_cursor, "SHOW REPLICAS;")))
replicas = [
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "invalid"}},
),
]
mg_sleep_and_assert_collection(replicas, show_replicas)
def get_vertex_count_func(cursor):
def get_vertex_count():
return execute_and_fetch_all(cursor, "MATCH (n) RETURN count(n)")[0][0]
return get_vertex_count
mg_sleep_and_assert(1, get_vertex_count_func(connect(port=7687, host="localhost").cursor()))
mg_sleep_and_assert(1, get_vertex_count_func(connect(port=7688, host="localhost").cursor()))
failover_length = 6
vertex_count = 0
instance_3_cursor = connect(port=7689, host="localhost").cursor()
while failover_length:
failover_length -= 0.1
with pytest.raises(Exception) as e:
execute_and_fetch_all(instance_1_cursor, "CREATE ();")
assert vertex_count == execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0]
assert vertex_count == execute_and_fetch_all(instance_3_cursor, "MATCH (n) RETURN count(n);")[0][0]
time.sleep(0.1)
coord_cursor_1 = connect(host="localhost", port=7690).cursor()
coord_cursor_2 = connect(host="localhost", port=7690).cursor()
leader_data = []
leader_data.extend(coordinators)
leader_data.extend(
[
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
)
follower_data = []
follower_data.extend(coordinators)
follower_data.extend(
[
("instance_1", "", "", "unknown", "main"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"), # TODO(antoniofilipovic) unknown
]
)
coord_cursor_1 = connect(host="localhost", port=7690).cursor()
def show_instances_coord1():
return sorted(list(execute_and_fetch_all(coord_cursor_1, "SHOW INSTANCES;")))
coord_cursor_2 = connect(host="localhost", port=7691).cursor()
def show_instances_coord2():
return sorted(list(execute_and_fetch_all(coord_cursor_2, "SHOW INSTANCES;")))
mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2])
def test_old_main_comes_back_on_new_leader_as_replica():
# 1. Start all instances.
# 2. Kill the main instance