Compare commits

...

3 Commits

Author SHA1 Message Date
antoniofilipovic
742f3a2e4c fix issue with follower callback 2024-03-12 16:53:10 +01:00
antoniofilipovic
7d9352beb5 add change 2024-03-05 11:38:12 +01:00
antoniofilipovic
97b0a56b3e build test case which can fail 2024-03-04 18:03:48 +01:00
6 changed files with 428 additions and 10 deletions

View File

@ -140,7 +140,7 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(replication::ReplicationHa
.port = repl_info_config.replication_port,
};
};
std::this_thread::sleep_for(std::chrono::milliseconds{1000});
// registering replicas
for (auto const &config : req.replication_clients_info | ranges::views::transform(converter)) {
auto instance_client = replication_handler.RegisterReplica(config);

View File

@ -61,21 +61,39 @@ CoordinatorInstance::CoordinatorInstance()
std::ranges::for_each(repl_instances_, [this](auto &instance) {
instance.SetNewMainUUID(raft_state_.GetUUID()); // TODO: (andi) Rename
instance.StartFrequentCheck();
instance.ResumeFrequentCheck();
});
is_running_.store(true, std::memory_order::acquire);
},
[this]() {
spdlog::info("Leader changed, stopping all replication instances!");
spdlog::info("Leader changed, trying to stop all replication instances, thread id {}!",
std::this_thread::get_id());
/// TODO Add to pool
// auto lock = std::lock_guard{coord_instance_lock_}; // RAII
// std::ranges::for_each(repl_instances_)
repl_instances_.clear();
spdlog::info("Leader changed, stopped all replication instances!");
return;
is_running_.store(false, std::memory_order::acquire);
pool_.AddTask([this]() {
auto lock = std::lock_guard{coord_instance_lock_};
std::ranges::for_each(repl_instances_, [](auto &instance) { instance.PauseFrequentCheck(); });
});
})) {
client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::unique_lock{self->coord_instance_lock_};
if (!self->is_running_) {
return;
}
auto lock = std::lock_guard{self->coord_instance_lock_}; // RAII
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name);
};
client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::unique_lock{self->coord_instance_lock_};
if (!self->is_running_) {
return;
}
auto lock = std::lock_guard{self->coord_instance_lock_};
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name);
};
@ -146,6 +164,9 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
}
auto CoordinatorInstance::TryFailover() -> void {
if (!is_running_) {
return;
}
auto const is_replica = [this](ReplicationInstance const &instance) { return IsReplica(instance.InstanceName()); };
auto alive_replicas =
@ -216,22 +237,40 @@ auto CoordinatorInstance::TryFailover() -> void {
spdlog::warn("Failover failed since promoting replica to main failed!");
return;
}
spdlog::info("Promote to main done, trying to append update uuid, thread id {}, is leader {}",
std::this_thread::get_id(), raft_state_.IsLeader());
// TODO (antoniofilipovic) : This can fail and we don't know we change uuid
if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) {
spdlog::trace("Append entry update uuid failed, thread id {}, is leader {}", std::this_thread::get_id(),
raft_state_.IsLeader());
return;
}
auto const new_main_instance_name = new_main->InstanceName();
spdlog::info("Promote to main done, trying to set instance as main log");
// TODO (antoniofilipovic): This can fail and we don't know we have set up new MAIN
if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name)) {
spdlog::trace("Append set instance as main failed, thread id {}, is leader {}", std::this_thread::get_id(),
raft_state_.IsLeader());
return;
}
// TODO:
// Set trying failover on instance (chosen instance)
// If all passes, all good
// if it fails, we need to check if main was promoted and we have correct new uuid
// PromoteToMain -> split on promote to main and enable writting -> solves issue that we have
// consistent state
spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName());
}
auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance_name)
-> SetInstanceToMainCoordinatorStatus {
if (!is_running_) {
return SetInstanceToMainCoordinatorStatus::NOT_LEADER;
;
}
auto lock = std::lock_guard{coord_instance_lock_};
if (raft_state_.MainExists()) {
@ -295,6 +334,10 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig const &config)
-> RegisterInstanceCoordinatorStatus {
if (!is_running_) {
return RegisterInstanceCoordinatorStatus::NOT_LEADER;
;
}
auto lock = std::lock_guard{coord_instance_lock_};
if (std::ranges::any_of(repl_instances_, [instance_name = config.instance_name](ReplicationInstance const &instance) {
@ -341,6 +384,10 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instance_name)
-> UnregisterInstanceCoordinatorStatus {
if (!is_running_) {
return UnregisterInstanceCoordinatorStatus::NOT_LEADER;
;
}
auto lock = std::lock_guard{coord_instance_lock_};
if (!raft_state_.RequestLeadership()) {
@ -390,6 +437,9 @@ auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32
}
void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) {
if (!is_running_) {
return;
}
spdlog::trace("Instance {} performing main fail callback", repl_instance_name);
auto &repl_instance = FindReplicationInstance(repl_instance_name);
repl_instance.OnFailPing();
@ -404,6 +454,9 @@ void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name)
}
void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name) {
if (!is_running_) {
return;
}
spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
auto &repl_instance = FindReplicationInstance(repl_instance_name);
@ -452,6 +505,9 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
}
void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name) {
if (!is_running_) {
return;
}
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
auto &repl_instance = FindReplicationInstance(repl_instance_name);
@ -472,6 +528,9 @@ void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_
}
void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) {
if (!is_running_) {
return;
}
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
auto &repl_instance = FindReplicationInstance(repl_instance_name);

View File

@ -59,6 +59,8 @@ auto RaftState::InitRaftServer() -> void {
raft_server::init_options init_opts;
init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode {
spdlog::info("Received some message, my id {}, leader id {}, event_type {}, thread_id {}", param->myId,
param->leaderId, event_type, std::this_thread::get_id());
if (event_type == cb_func::BecomeLeader) {
spdlog::info("Node {} became leader", param->leaderId);
become_leader_cb_();

View File

@ -265,8 +265,6 @@ void ReplicationStorageClient::RecoverReplica(uint64_t replica_commit, memgraph:
spdlog::debug("Starting replica recovery");
auto *mem_storage = static_cast<InMemoryStorage *>(storage);
// TODO(antoniofilipovic): Can we get stuck here in while loop if replica commit timestamp is not updated when using
// only snapshot
while (true) {
auto file_locker = mem_storage->file_retainer_.AddLocker();

View File

@ -13,6 +13,7 @@ import os
import shutil
import sys
import tempfile
from time import sleep
import interactive_mg_runner
import pytest
@ -122,9 +123,19 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
def test_distributed_automatic_failover():
# Goal of this test is to check that coordination works if one instance fails
# with multiple coordinators
# 1. Start all manually
# 2. Everything is set up on main
# 3. MAIN dies
# 4. New main elected
# 1
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
# 2
main_cursor = connect(host="localhost", port=7689).cursor()
expected_data_on_main = [
(
@ -148,8 +159,10 @@ def test_distributed_automatic_failover():
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
# 3
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
# 4
coord_cursor = connect(host="localhost", port=7692).cursor()
def retrieve_data_show_repl_cluster():
@ -210,11 +223,21 @@ def test_distributed_automatic_failover():
def test_distributed_automatic_failover_after_coord_dies():
# Goal of this test is to check if main and coordinator die at same time in the beginning that
# everything works fine
# 1. Start all manually
# 2. Coordinator dies
# 3. MAIN dies
# 4.
safe_execute(shutil.rmtree, TEMP_DIR)
# 1
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
# 2
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_3")
# 3
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
coord_cursor_1 = connect(host="localhost", port=7690).cursor()
@ -243,7 +266,7 @@ def test_distributed_automatic_failover_after_coord_dies():
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "main"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"), # TODO: (andi) Will become unknown.
("instance_3", "", "", "unknown", "main"),
]
mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2])
@ -292,5 +315,332 @@ def test_distributed_automatic_failover_after_coord_dies():
mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
@pytest.mark.parametrize("data_recovery", ["false"])
def test_distributed_coordinators_work_partial_failover(data_recovery):
# Goal of this test is to check that correct MAIN instance is chosen
# if coordinator dies while failover is being done
# 1. We start all replicas, main and 4 coordinators manually
# 2. We check that main has correct state
# 3. Create initial data on MAIN
# 4. Expect data to be copied on all replicas
# 5. Kill MAIN: instance 3
# 6. Failover should succeed to instance 1 -> SHOW ROLE -> MAIN
# 7. Kill 2 coordinators (1 and 2) as soon as instance becomes MAIN
# 8. coord 4 become follower (2 coords dead, no leader)
# 8. instance_1 writes (MAIN, successfully)
# 9. Kill coordinator 4
# 10. Connect to coordinator 3
# 11. Start coordinator_1 and coordinator_2 (coord 3 probably leader)
# 12. Failover again
# 13. Main can't write to replicas anymore, coordinator can't progress
temp_dir = tempfile.TemporaryDirectory().name
MEMGRAPH_INNER_INSTANCES_DESCRIPTION = {
"instance_1": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10011",
"--replication-restore-state-on-startup",
"true",
f"--data-recovery-on-startup={data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_1.log",
"data_directory": f"{temp_dir}/instance_1",
"setup_queries": [],
},
"instance_2": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7689",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10012",
"--replication-restore-state-on-startup",
"true",
f"--data-recovery-on-startup={data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_2.log",
"data_directory": f"{temp_dir}/instance_2",
"setup_queries": [],
},
"instance_3": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7687",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10013",
"--replication-restore-state-on-startup",
"true",
"--data-recovery-on-startup",
f"{data_recovery}",
"--storage-recover-on-startup=false",
],
"log_file": "instance_3.log",
"data_directory": f"{temp_dir}/instance_3",
"setup_queries": [],
},
"coordinator_1": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7690",
"--log-level=TRACE",
"--raft-server-id=1",
"--raft-server-port=10111",
],
"log_file": "coordinator1.log",
"data_directory": f"{temp_dir}/coordinator_1",
"setup_queries": [],
},
"coordinator_2": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7691",
"--log-level=TRACE",
"--raft-server-id=2",
"--raft-server-port=10112",
],
"log_file": "coordinator2.log",
"data_directory": f"{temp_dir}/coordinator_2",
"setup_queries": [],
},
"coordinator_3": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7692",
"--log-level=TRACE",
"--raft-server-id=3",
"--raft-server-port=10113",
],
"log_file": "coordinator3.log",
"data_directory": f"{temp_dir}/coordinator_3",
"setup_queries": [],
},
"coordinator_4": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7693",
"--log-level=TRACE",
"--raft-server-id=4",
"--raft-server-port=10114",
],
"log_file": "coordinator4.log",
"data_directory": f"{temp_dir}/coordinator_4",
"setup_queries": [],
},
}
# 1
interactive_mg_runner.start_all_except(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, {"coordinator_4"})
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_4")
coord_cursor = connect(host="localhost", port=7693).cursor()
for query in [
"ADD COORDINATOR 1 ON '127.0.0.1:10111';",
"ADD COORDINATOR 2 ON '127.0.0.1:10112';",
"ADD COORDINATOR 3 ON '127.0.0.1:10113';",
]:
sleep(1)
execute_and_fetch_all(coord_cursor, query)
for query in [
"REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';",
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
"SET INSTANCE instance_3 TO MAIN;",
]:
execute_and_fetch_all(coord_cursor, query)
# 2
main_cursor = connect(host="localhost", port=7687).cursor()
expected_data_on_main = [
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"behind": None, "status": "ready", "ts": 0},
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
),
]
main_cursor = connect(host="localhost", port=7687).cursor()
def retrieve_data_show_replicas():
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
coord_cursor = connect(host="localhost", port=7693).cursor()
def retrieve_data_show_instances_main_coord():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances_main_coord)
# 3
execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:1});")
execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:2});")
# 4
instance_1_cursor = connect(host="localhost", port=7688).cursor()
instance_2_cursor = connect(host="localhost", port=7689).cursor()
assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
# 5
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
# 6
sleep(4.5)
max_tries = 100
last_role = "replica"
while max_tries:
max_tries -= 1
last_role = execute_and_fetch_all(instance_1_cursor, "SHOW REPLICATION ROLE;")[0][0]
if "main" == last_role:
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_1")
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_2")
break
sleep(0.1)
assert max_tries > 0 or last_role == "main"
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "unknown", "main"), # TODO although above we see it is MAIN
("instance_2", "", "127.0.0.1:10012", "unknown", "replica"),
("instance_3", "", "127.0.0.1:10013", "unknown", "main"),
]
def retrieve_data_show_instances():
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
# 7
def retrieve_role():
return execute_and_fetch_all(instance_1_cursor, "SHOW REPLICATION ROLE;")[0]
mg_sleep_and_assert("MAIN", retrieve_role)
# 8
with pytest.raises(Exception) as e:
execute_and_fetch_all(instance_1_cursor, "CREATE (:Epoch2Vertex {prop:1});")
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
def get_vertex_count_instance_2():
return execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(3, get_vertex_count_instance_2)
assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3
# 9
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_4")
# 10
coord_3_cursor = connect(port=7692, host="localhost").cursor()
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "unknown", "replica"), # TODO: bug this is main
("instance_2", "", "127.0.0.1:10012", "unknown", "replica"),
("instance_3", "", "127.0.0.1:10013", "unknown", "replica"),
]
def retrieve_data_show_instances_new_leader():
return sorted(list(execute_and_fetch_all(coord_3_cursor, "SHOW INSTANCES;")))
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances_new_leader)
# 11
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_1")
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_2")
# 12
expected_data_on_coord = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
(
"instance_1",
"",
"127.0.0.1:10011",
"up",
"replica",
), # TODO: bug this is main, this might crash instance, because we will maybe execute replica callback on MAIN, which won't work
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances_new_leader)
import time
time.sleep(5) # failover
with pytest.raises(Exception) as e:
execute_and_fetch_all(instance_1_cursor, "CREATE (:Epoch2Vertex {prop:2});")
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
def get_vertex_count_instance_2():
return execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(4, get_vertex_count_instance_2) # MAIN will not be able to write to it anymore
# 13
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-k", "test_distributed_coordinators_work_partial_failover", "-vv"]))
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -39,6 +39,7 @@ import tempfile
import time
from argparse import ArgumentParser
from inspect import signature
from typing import List, Set
import yaml
@ -214,6 +215,14 @@ def start_all(context, procdir="", keep_directories=True):
start_instance(context, key, procdir)
def start_all_except(context, exceptions: Set[str], procdir="", keep_directories=True):
stop_all(keep_directories)
for key, _ in context.items():
if key in exceptions:
continue
start_instance(context, key, procdir)
def start_all_keep_others(context, procdir="", keep_directories=True):
for key, _ in context.items():
start_instance(context, key, procdir)