Compare commits
1 Commits
master
...
add-more-c
Author | SHA1 | Date | |
---|---|---|---|
|
01330fb3b0 |
@ -46,6 +46,8 @@ CoordinatorInstance::CoordinatorInstance()
|
||||
&CoordinatorInstance::ReplicaFailCallback);
|
||||
});
|
||||
|
||||
// TODO(antoniofilipovic): is this correct or we have a bug
|
||||
// Multiple main instances, each will do failover
|
||||
auto main = instances | ranges::views::filter(
|
||||
[](auto const &instance) { return instance.status == ReplicationRole::MAIN; });
|
||||
|
||||
@ -62,7 +64,9 @@ CoordinatorInstance::CoordinatorInstance()
|
||||
});
|
||||
},
|
||||
[this]() {
|
||||
// TODO (antoniofilipovic) We can enter here without lock and remove all instances
|
||||
spdlog::info("Leader changed, stopping all replication instances!");
|
||||
// we should probably change this to defunct state
|
||||
repl_instances_.clear();
|
||||
})) {
|
||||
client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
|
||||
@ -122,19 +126,21 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
|
||||
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), process_repl_instance_as_leader);
|
||||
}
|
||||
} else {
|
||||
auto const stringify_inst_status = [](ReplicationRole status) -> std::string {
|
||||
return status == ReplicationRole::MAIN ? "main" : "replica";
|
||||
auto const stringify_inst_status = [this](const ReplicationInstance &instance) -> std::string {
|
||||
if (!instance.IsAlive()) return "unknown";
|
||||
if (raft_state_.IsMain(instance.InstanceName())) return "main";
|
||||
return "replica";
|
||||
};
|
||||
|
||||
// TODO: (andi) Add capability that followers can also return socket addresses
|
||||
auto process_repl_instance_as_follower = [&stringify_inst_status](auto const &instance) -> InstanceStatus {
|
||||
return {.instance_name = instance.config.instance_name,
|
||||
.cluster_role = stringify_inst_status(instance.status),
|
||||
auto process_repl_instance_as_follower =
|
||||
[&stringify_inst_status](const ReplicationInstance &instance) -> InstanceStatus {
|
||||
return {.instance_name = instance.InstanceName(),
|
||||
.cluster_role = stringify_inst_status(instance),
|
||||
.health = "unknown"};
|
||||
};
|
||||
|
||||
std::ranges::transform(raft_state_.GetInstances(), std::back_inserter(instances_status),
|
||||
process_repl_instance_as_follower);
|
||||
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), process_repl_instance_as_follower);
|
||||
}
|
||||
|
||||
return instances_status;
|
||||
|
@ -35,6 +35,8 @@ using replication_coordination_glue::ReplicationRole;
|
||||
struct InstanceState {
|
||||
CoordinatorClientConfig config;
|
||||
ReplicationRole status;
|
||||
// TODO(antoniofilipovic) We might need this because we need to track correct instance
|
||||
// utils::UUID main_uuid_;
|
||||
|
||||
friend auto operator==(InstanceState const &lhs, InstanceState const &rhs) -> bool {
|
||||
return lhs.config == rhs.config && lhs.status == rhs.status;
|
||||
@ -84,6 +86,7 @@ class CoordinatorClusterState {
|
||||
|
||||
private:
|
||||
std::map<std::string, InstanceState, std::less<>> instances_{};
|
||||
// Current uuid of MAIN which other replicas listen too
|
||||
utils::UUID uuid_{};
|
||||
mutable utils::ResourceLock log_lock_{};
|
||||
};
|
||||
|
@ -13,6 +13,7 @@ import os
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
import interactive_mg_runner
|
||||
import pytest
|
||||
@ -204,6 +205,172 @@ def get_instances_description_no_setup():
|
||||
}
|
||||
|
||||
|
||||
def test_multiple_old_mains_single_failover():
|
||||
# Goal of this test is to check when leadership changes
|
||||
# and we have old MAIN down, that we don't start failover
|
||||
# 1. Start all instances.
|
||||
# 2. Kill the main instance
|
||||
# 3. Do failover
|
||||
# 4. Kill other main
|
||||
# 5. Kill leader
|
||||
# 6. Leave first main down, and start second main
|
||||
# 7. Second main should write data to new instance all the time
|
||||
|
||||
# 1
|
||||
safe_execute(shutil.rmtree, TEMP_DIR)
|
||||
inner_instances_description = get_instances_description_no_setup()
|
||||
|
||||
interactive_mg_runner.start_all(inner_instances_description)
|
||||
|
||||
setup_queries = [
|
||||
"ADD COORDINATOR 1 ON '127.0.0.1:10111'",
|
||||
"ADD COORDINATOR 2 ON '127.0.0.1:10112'",
|
||||
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'",
|
||||
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'",
|
||||
"REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'",
|
||||
"SET INSTANCE instance_3 TO MAIN",
|
||||
]
|
||||
coord_cursor_3 = connect(host="localhost", port=7692).cursor()
|
||||
for query in setup_queries:
|
||||
execute_and_fetch_all(coord_cursor_3, query)
|
||||
|
||||
coord_cursor = connect(host="localhost", port=7693).cursor()
|
||||
|
||||
def retrieve_data_show_repl_cluster():
|
||||
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
|
||||
|
||||
coordinators = [
|
||||
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
|
||||
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
|
||||
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
|
||||
]
|
||||
|
||||
basic_instances = [
|
||||
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
|
||||
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
|
||||
("instance_3", "", "127.0.0.1:10013", "up", "main"),
|
||||
]
|
||||
|
||||
expected_data_on_coord = []
|
||||
expected_data_on_coord.extend(coordinators)
|
||||
expected_data_on_coord.extend(basic_instances)
|
||||
|
||||
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
|
||||
|
||||
# 2
|
||||
|
||||
interactive_mg_runner.kill(inner_instances_description, "instance_3")
|
||||
|
||||
# 3
|
||||
|
||||
basic_instances = [
|
||||
("instance_1", "", "127.0.0.1:10011", "up", "main"),
|
||||
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
|
||||
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
|
||||
]
|
||||
|
||||
expected_data_on_coord = []
|
||||
expected_data_on_coord.extend(coordinators)
|
||||
expected_data_on_coord.extend(basic_instances)
|
||||
|
||||
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
|
||||
|
||||
# 4
|
||||
|
||||
interactive_mg_runner.kill(inner_instances_description, "instance_1")
|
||||
|
||||
# 5
|
||||
interactive_mg_runner.kill(inner_instances_description, "coordinator_3")
|
||||
|
||||
# 6
|
||||
|
||||
interactive_mg_runner.start(inner_instances_description, "instance_1")
|
||||
|
||||
# 7
|
||||
|
||||
instance_1_cursor = connect(host="localhost", port=7687).cursor()
|
||||
|
||||
def show_replicas():
|
||||
return sorted(list(execute_and_fetch_all(instance_1_cursor, "SHOW REPLICAS;")))
|
||||
|
||||
replicas = [
|
||||
(
|
||||
"instance_2",
|
||||
"127.0.0.1:10002",
|
||||
"sync",
|
||||
{"ts": 0, "behind": None, "status": "ready"},
|
||||
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
|
||||
),
|
||||
(
|
||||
"instance_3",
|
||||
"127.0.0.1:10003",
|
||||
"sync",
|
||||
{"ts": 0, "behind": None, "status": "ready"},
|
||||
{"memgraph": {"ts": 0, "behind": 0, "status": "invalid"}},
|
||||
),
|
||||
]
|
||||
mg_sleep_and_assert_collection(replicas, show_replicas)
|
||||
|
||||
def get_vertex_count_func(cursor):
|
||||
def get_vertex_count():
|
||||
return execute_and_fetch_all(cursor, "MATCH (n) RETURN count(n)")[0][0]
|
||||
|
||||
return get_vertex_count
|
||||
|
||||
mg_sleep_and_assert(1, get_vertex_count_func(connect(port=7687, host="localhost").cursor()))
|
||||
|
||||
mg_sleep_and_assert(1, get_vertex_count_func(connect(port=7688, host="localhost").cursor()))
|
||||
|
||||
failover_length = 6
|
||||
vertex_count = 0
|
||||
instance_3_cursor = connect(port=7689, host="localhost").cursor()
|
||||
|
||||
while failover_length:
|
||||
failover_length -= 0.1
|
||||
with pytest.raises(Exception) as e:
|
||||
execute_and_fetch_all(instance_1_cursor, "CREATE ();")
|
||||
|
||||
assert vertex_count == execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0]
|
||||
assert vertex_count == execute_and_fetch_all(instance_3_cursor, "MATCH (n) RETURN count(n);")[0][0]
|
||||
time.sleep(0.1)
|
||||
|
||||
coord_cursor_1 = connect(host="localhost", port=7690).cursor()
|
||||
coord_cursor_2 = connect(host="localhost", port=7690).cursor()
|
||||
|
||||
leader_data = []
|
||||
leader_data.extend(coordinators)
|
||||
leader_data.extend(
|
||||
[
|
||||
("instance_1", "", "127.0.0.1:10011", "up", "main"),
|
||||
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
|
||||
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
|
||||
]
|
||||
)
|
||||
|
||||
follower_data = []
|
||||
follower_data.extend(coordinators)
|
||||
follower_data.extend(
|
||||
[
|
||||
("instance_1", "", "", "unknown", "main"),
|
||||
("instance_2", "", "", "unknown", "replica"),
|
||||
("instance_3", "", "", "unknown", "main"), # TODO(antoniofilipovic) unknown
|
||||
]
|
||||
)
|
||||
|
||||
coord_cursor_1 = connect(host="localhost", port=7690).cursor()
|
||||
|
||||
def show_instances_coord1():
|
||||
return sorted(list(execute_and_fetch_all(coord_cursor_1, "SHOW INSTANCES;")))
|
||||
|
||||
coord_cursor_2 = connect(host="localhost", port=7691).cursor()
|
||||
|
||||
def show_instances_coord2():
|
||||
return sorted(list(execute_and_fetch_all(coord_cursor_2, "SHOW INSTANCES;")))
|
||||
|
||||
mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
|
||||
mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2])
|
||||
|
||||
|
||||
def test_old_main_comes_back_on_new_leader_as_replica():
|
||||
# 1. Start all instances.
|
||||
# 2. Kill the main instance
|
||||
|
Loading…
Reference in New Issue
Block a user