Compare commits


3 Commits

Author            SHA1        Message                           Date
antoniofilipovic  742f3a2e4c  fix issue with follower callback  2024-03-12 16:53:10 +01:00
antoniofilipovic  7d9352beb5  add change                        2024-03-05 11:38:12 +01:00
antoniofilipovic  97b0a56b3e  build test case which can fail    2024-03-04 18:03:48 +01:00
6 changed files with 428 additions and 10 deletions

View File

@@ -140,7 +140,7 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(replication::ReplicationHandler
         .port = repl_info_config.replication_port,
     };
   };
+  std::this_thread::sleep_for(std::chrono::milliseconds{1000});
   // registering replicas
   for (auto const &config : req.replication_clients_info | ranges::views::transform(converter)) {
     auto instance_client = replication_handler.RegisterReplica(config);
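The injected one-second sleep is what makes the race reproducible: it holds the promotion RPC open long enough for a leader change to land in the middle of registering replicas. If a delay like this ever needed to outlive the experiment, a guarded failpoint keeps it out of production builds; a minimal sketch, where the helper name and environment variable are hypothetical and not part of this patch:

    #include <chrono>
    #include <cstdlib>
    #include <thread>

    // Hypothetical failpoint: sleeps only when MG_TEST_DELAY_MS is set in the
    // environment, so release binaries are unaffected by the test delay.
    inline void MaybeInjectDelay() {
      if (const char *ms = std::getenv("MG_TEST_DELAY_MS")) {
        std::this_thread::sleep_for(std::chrono::milliseconds{std::atoi(ms)});
      }
    }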

View File

@@ -61,21 +61,39 @@ CoordinatorInstance::CoordinatorInstance()
           std::ranges::for_each(repl_instances_, [this](auto &instance) {
             instance.SetNewMainUUID(raft_state_.GetUUID());  // TODO: (andi) Rename
-            instance.StartFrequentCheck();
+            instance.ResumeFrequentCheck();
           });
+          is_running_.store(true, std::memory_order::acquire);
         },
         [this]() {
-          spdlog::info("Leader changed, stopping all replication instances!");
+          spdlog::info("Leader changed, trying to stop all replication instances, thread id {}!",
+                       std::this_thread::get_id());
+          /// TODO Add to pool
+          // auto lock = std::lock_guard{coord_instance_lock_};  // RAII
+          // std::ranges::for_each(repl_instances_)
           repl_instances_.clear();
+          spdlog::info("Leader changed, stopped all replication instances!");
+          return;
+          is_running_.store(false, std::memory_order::acquire);
+          pool_.AddTask([this]() {
+            auto lock = std::lock_guard{coord_instance_lock_};
+            std::ranges::for_each(repl_instances_, [](auto &instance) { instance.PauseFrequentCheck(); });
+          });
         })) {
   client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
-    auto lock = std::unique_lock{self->coord_instance_lock_};
+    if (!self->is_running_) {
+      return;
+    }
+    auto lock = std::lock_guard{self->coord_instance_lock_};  // RAII
     auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
     std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name);
   };
   client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
-    auto lock = std::unique_lock{self->coord_instance_lock_};
+    if (!self->is_running_) {
+      return;
+    }
+    auto lock = std::lock_guard{self->coord_instance_lock_};
     auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
     std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name);
   };
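The shape being introduced here is a coordinator-wide atomic flag, flipped in the become-leader and become-follower callbacks and checked at the top of every health-check callback, so callbacks racing with a leadership loss bail out instead of touching instances that are being torn down (note the early `return;` above still leaves the pool-based pausing path dead, which these commits appear to be experimenting with). One detail worth flagging: `std::atomic::store` may not take `std::memory_order::acquire` (acquire is a load-side ordering), so a correct version of the flag uses release on the store and acquire on the load. A minimal sketch of the pattern with illustrative names, not the real class:

    #include <atomic>
    #include <mutex>

    class Coordinator {
     public:
      void OnBecomeLeader() { is_running_.store(true, std::memory_order::release); }
      void OnBecomeFollower() { is_running_.store(false, std::memory_order::release); }

      void OnHealthCheck() {
        // Cheap early-out when leadership is gone; the flag is only a hint,
        // the mutex is what actually serializes against instance teardown.
        if (!is_running_.load(std::memory_order::acquire)) return;
        auto lock = std::lock_guard{mu_};
        // ... find the replication instance and invoke its callback ...
      }

     private:
      std::atomic<bool> is_running_{false};
      std::mutex mu_;
    };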
@@ -146,6 +164,9 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus>
 }
 
 auto CoordinatorInstance::TryFailover() -> void {
+  if (!is_running_) {
+    return;
+  }
   auto const is_replica = [this](ReplicationInstance const &instance) { return IsReplica(instance.InstanceName()); };
 
   auto alive_replicas =
@@ -216,22 +237,40 @@ auto CoordinatorInstance::TryFailover() -> void {
     spdlog::warn("Failover failed since promoting replica to main failed!");
     return;
   }
 
+  spdlog::info("Promote to main done, trying to append update uuid, thread id {}, is leader {}",
+               std::this_thread::get_id(), raft_state_.IsLeader());
+  // TODO (antoniofilipovic): This can fail and we don't know we changed the uuid
   if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) {
+    spdlog::trace("Append entry update uuid failed, thread id {}, is leader {}", std::this_thread::get_id(),
+                  raft_state_.IsLeader());
     return;
   }
 
   auto const new_main_instance_name = new_main->InstanceName();
+  spdlog::info("Promote to main done, trying to set instance as main log");
+  // TODO (antoniofilipovic): This can fail and we don't know we have set up the new MAIN
   if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name)) {
+    spdlog::trace("Append set instance as main failed, thread id {}, is leader {}", std::this_thread::get_id(),
+                  raft_state_.IsLeader());
     return;
   }
 
+  // TODO:
+  // Set "trying failover" on the chosen instance.
+  // If everything passes, all good.
+  // If it fails, we need to check whether main was promoted and whether we have the correct new uuid.
+  // PromoteToMain -> split into promote-to-main and enable-writing -> solves the issue of keeping a
+  // consistent state.
   spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName());
 }
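The TODO block above is the heart of what this commit series probes: AppendUpdateUUIDLog and AppendSetInstanceAsMainLog can each fail after the replica has already been promoted, so the Raft log and the real cluster state can disagree. The split the TODO proposes, promote first and open the instance for writes only once both log entries have committed, might look roughly like the following; every name below is an illustrative stand-in, not existing API:

    #include <string>

    struct Replica { std::string name; };
    // Stand-ins for the real calls; declared only, to sketch the control flow.
    bool PromoteWithoutWrites(Replica &r);              // phase 1: catch up, stay read-only
    bool AppendUpdateUUIDLog(const std::string &uuid);  // durable intent in the Raft log
    bool AppendSetInstanceAsMainLog(const std::string &name);
    void EnableWrites(Replica &r);                      // phase 2: open for writes

    bool TryFailoverTwoPhase(Replica &new_main, const std::string &new_uuid) {
      if (!PromoteWithoutWrites(new_main)) return false;
      // Either append can fail here, but the new MAIN is still read-only, so a
      // retry (or another failover) cannot produce a divergent epoch.
      if (!AppendUpdateUUIDLog(new_uuid)) return false;
      if (!AppendSetInstanceAsMainLog(new_main.name)) return false;
      EnableWrites(new_main);  // log and cluster state agree before writes start
      return true;
    }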
 
 auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance_name)
     -> SetInstanceToMainCoordinatorStatus {
+  if (!is_running_) {
+    return SetInstanceToMainCoordinatorStatus::NOT_LEADER;
+  }
   auto lock = std::lock_guard{coord_instance_lock_};
 
   if (raft_state_.MainExists()) {
@@ -295,6 +334,10 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance_name)
 
 auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig const &config)
     -> RegisterInstanceCoordinatorStatus {
+  if (!is_running_) {
+    return RegisterInstanceCoordinatorStatus::NOT_LEADER;
+  }
   auto lock = std::lock_guard{coord_instance_lock_};
 
   if (std::ranges::any_of(repl_instances_, [instance_name = config.instance_name](ReplicationInstance const &instance) {
@@ -341,6 +384,10 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig const &config)
 
 auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instance_name)
     -> UnregisterInstanceCoordinatorStatus {
+  if (!is_running_) {
+    return UnregisterInstanceCoordinatorStatus::NOT_LEADER;
+  }
   auto lock = std::lock_guard{coord_instance_lock_};
 
   if (!raft_state_.RequestLeadership()) {
@@ -390,6 +437,9 @@ auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t
 }
 
 void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) {
+  if (!is_running_) {
+    return;
+  }
   spdlog::trace("Instance {} performing main fail callback", repl_instance_name);
   auto &repl_instance = FindReplicationInstance(repl_instance_name);
   repl_instance.OnFailPing();
@@ -404,6 +454,9 @@ void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name)
 }
 
 void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name) {
+  if (!is_running_) {
+    return;
+  }
   spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
 
   auto &repl_instance = FindReplicationInstance(repl_instance_name);
@@ -452,6 +505,9 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name)
 }
 
 void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name) {
+  if (!is_running_) {
+    return;
+  }
   spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
 
   auto &repl_instance = FindReplicationInstance(repl_instance_name);
@@ -472,6 +528,9 @@ void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name)
 }
 
 void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) {
+  if (!is_running_) {
+    return;
+  }
   spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
   auto &repl_instance = FindReplicationInstance(repl_instance_name);

View File

@@ -59,6 +59,8 @@ auto RaftState::InitRaftServer() -> void {
   raft_server::init_options init_opts;
   init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode {
+    spdlog::info("Received some message, my id {}, leader id {}, event_type {}, thread_id {}", param->myId,
+                 param->leaderId, event_type, std::this_thread::get_id());
     if (event_type == cb_func::BecomeLeader) {
       spdlog::info("Node {} became leader", param->leaderId);
       become_leader_cb_();
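For context, NuRaft invokes raft_callback_ on its internal Raft threads for a range of event types, which is why the added log line records the thread id. A sketch of how the follower side can be handled in the same callback, mirroring the names visible in the diff (BecomeFollower is a NuRaft event type; become_follower_cb_ and the exact return-code spelling are assumed from the surrounding code, not confirmed by this hunk):

    init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode {
      if (event_type == cb_func::BecomeLeader) {
        become_leader_cb_();    // runs on a NuRaft thread, not a coordinator thread
      } else if (event_type == cb_func::BecomeFollower) {
        become_follower_cb_();  // assumed counterpart registered by the coordinator
      }
      return nuraft::CbReturnCode::Ok;
    };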

View File

@@ -265,8 +265,6 @@ void ReplicationStorageClient::RecoverReplica(uint64_t replica_commit, memgraph::storage::Storage *storage)
   spdlog::debug("Starting replica recovery");
   auto *mem_storage = static_cast<InMemoryStorage *>(storage);
-  // TODO(antoniofilipovic): Can we get stuck here in while loop if replica commit timestamp is not updated when using
-  // only snapshot
   while (true) {
     auto file_locker = mem_storage->file_retainer_.AddLocker();
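The deleted TODO asked whether this `while (true)` can spin forever when recovery runs from a snapshot alone and the replica's commit timestamp never advances. One defensive shape, purely illustrative and not what the patch does, is to bound the loop on progress rather than on iterations:

    #include <cstdint>
    #include <functional>

    // Illustrative sketch: give up when a recovery round makes no progress on
    // the replica's commit timestamp, rather than looping indefinitely.
    bool RecoverUntilCaughtUp(uint64_t target_commit,
                              const std::function<uint64_t(uint64_t)> &run_round) {
      uint64_t replica_commit = 0;
      while (replica_commit < target_commit) {
        uint64_t const after = run_round(replica_commit);  // ship next snapshot/WAL chunk
        if (after == replica_commit) return false;         // stuck: bail out and retry later
        replica_commit = after;
      }
      return true;
    }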

View File

@@ -13,6 +13,7 @@ import os
 import shutil
 import sys
 import tempfile
+from time import sleep
 
 import interactive_mg_runner
 import pytest
@@ -122,9 +123,19 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
 def test_distributed_automatic_failover():
+    # Goal of this test is to check that coordination with multiple
+    # coordinators keeps working when one instance fails:
+    # 1. Start everything manually
+    # 2. Everything is set up on main
+    # 3. MAIN dies
+    # 4. A new main is elected
+
+    # 1
     safe_execute(shutil.rmtree, TEMP_DIR)
     interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
 
+    # 2
     main_cursor = connect(host="localhost", port=7689).cursor()
     expected_data_on_main = [
         (
@@ -148,8 +159,10 @@ def test_distributed_automatic_failover():
     mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
 
+    # 3
     interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
 
+    # 4
     coord_cursor = connect(host="localhost", port=7692).cursor()
 
     def retrieve_data_show_repl_cluster():
@@ -210,11 +223,21 @@ def test_distributed_automatic_failover():
 def test_distributed_automatic_failover_after_coord_dies():
+    # Goal of this test is to check that everything keeps working when the
+    # main and a coordinator die at the same time, right at the beginning:
+    # 1. Start everything manually
+    # 2. A coordinator dies
+    # 3. MAIN dies
+    # 4. The remaining coordinators elect a new main
+
     safe_execute(shutil.rmtree, TEMP_DIR)
+    # 1
     interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
 
+    # 2
     interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_3")
 
+    # 3
     interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
 
     coord_cursor_1 = connect(host="localhost", port=7690).cursor()
@@ -243,7 +266,7 @@ def test_distributed_automatic_failover_after_coord_dies():
         ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
         ("instance_1", "", "", "unknown", "main"),
         ("instance_2", "", "", "unknown", "replica"),
-        ("instance_3", "", "", "unknown", "main"),  # TODO: (andi) Will become unknown.
+        ("instance_3", "", "", "unknown", "main"),
     ]
 
     mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
     mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2])
@@ -292,5 +315,332 @@ def test_distributed_automatic_failover_after_coord_dies():
     mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
 
 
+@pytest.mark.parametrize("data_recovery", ["false"])
+def test_distributed_coordinators_work_partial_failover(data_recovery):
+    # Goal of this test is to check that the correct MAIN instance is chosen
+    # when a coordinator dies while failover is in progress:
+    # 1. Start all replicas, main and 4 coordinators manually
+    # 2. Check that main has the correct state
+    # 3. Create initial data on MAIN
+    # 4. Expect the data to be copied to all replicas
+    # 5. Kill MAIN: instance_3
+    # 6. Failover should promote instance_1 (SHOW REPLICATION ROLE -> MAIN)
+    # 7. Kill 2 coordinators (1 and 2) as soon as the instance becomes MAIN
+    # 8. Coordinator 4 becomes a follower (2 coordinators dead, no leader);
+    #    instance_1 (MAIN) still writes successfully
+    # 9. Kill coordinator 4
+    # 10. Connect to coordinator 3
+    # 11. Start coordinator_1 and coordinator_2 (coordinator 3 probably becomes leader)
+    # 12. Failover happens again
+    # 13. MAIN can't write to the replicas anymore, the coordinator can't progress
+    temp_dir = tempfile.TemporaryDirectory().name
+
+    MEMGRAPH_INNER_INSTANCES_DESCRIPTION = {
+        "instance_1": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7688",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10011",
+                "--replication-restore-state-on-startup",
+                "true",
+                f"--data-recovery-on-startup={data_recovery}",
+                "--storage-recover-on-startup=false",
+            ],
+            "log_file": "instance_1.log",
+            "data_directory": f"{temp_dir}/instance_1",
+            "setup_queries": [],
+        },
+        "instance_2": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7689",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10012",
+                "--replication-restore-state-on-startup",
+                "true",
+                f"--data-recovery-on-startup={data_recovery}",
+                "--storage-recover-on-startup=false",
+            ],
+            "log_file": "instance_2.log",
+            "data_directory": f"{temp_dir}/instance_2",
+            "setup_queries": [],
+        },
+        "instance_3": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7687",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10013",
+                "--replication-restore-state-on-startup",
+                "true",
+                "--data-recovery-on-startup",
+                f"{data_recovery}",
+                "--storage-recover-on-startup=false",
+            ],
+            "log_file": "instance_3.log",
+            "data_directory": f"{temp_dir}/instance_3",
+            "setup_queries": [],
+        },
+        "coordinator_1": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7690",
+                "--log-level=TRACE",
+                "--raft-server-id=1",
+                "--raft-server-port=10111",
+            ],
+            "log_file": "coordinator1.log",
+            "data_directory": f"{temp_dir}/coordinator_1",
+            "setup_queries": [],
+        },
+        "coordinator_2": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7691",
+                "--log-level=TRACE",
+                "--raft-server-id=2",
+                "--raft-server-port=10112",
+            ],
+            "log_file": "coordinator2.log",
+            "data_directory": f"{temp_dir}/coordinator_2",
+            "setup_queries": [],
+        },
+        "coordinator_3": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7692",
+                "--log-level=TRACE",
+                "--raft-server-id=3",
+                "--raft-server-port=10113",
+            ],
+            "log_file": "coordinator3.log",
+            "data_directory": f"{temp_dir}/coordinator_3",
+            "setup_queries": [],
+        },
+        "coordinator_4": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7693",
+                "--log-level=TRACE",
+                "--raft-server-id=4",
+                "--raft-server-port=10114",
+            ],
+            "log_file": "coordinator4.log",
+            "data_directory": f"{temp_dir}/coordinator_4",
+            "setup_queries": [],
+        },
+    }
+
+    # 1
+    interactive_mg_runner.start_all_except(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, {"coordinator_4"})
+    interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_4")
+
+    coord_cursor = connect(host="localhost", port=7693).cursor()
+
+    for query in [
+        "ADD COORDINATOR 1 ON '127.0.0.1:10111';",
+        "ADD COORDINATOR 2 ON '127.0.0.1:10112';",
+        "ADD COORDINATOR 3 ON '127.0.0.1:10113';",
+    ]:
+        sleep(1)
+        execute_and_fetch_all(coord_cursor, query)
+
+    for query in [
+        "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';",
+        "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
+        "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
+        "SET INSTANCE instance_3 TO MAIN;",
+    ]:
+        execute_and_fetch_all(coord_cursor, query)
+
+    # 2
+    main_cursor = connect(host="localhost", port=7687).cursor()
+
+    expected_data_on_main = [
+        (
+            "instance_1",
+            "127.0.0.1:10001",
+            "sync",
+            {"behind": None, "status": "ready", "ts": 0},
+            {"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
+        ),
+        (
+            "instance_2",
+            "127.0.0.1:10002",
+            "sync",
+            {"behind": None, "status": "ready", "ts": 0},
+            {"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
+        ),
+    ]
+
+    def retrieve_data_show_replicas():
+        return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
+
+    mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
+
+    coord_cursor = connect(host="localhost", port=7693).cursor()
+
+    def retrieve_data_show_instances_main_coord():
+        return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
+
+    expected_data_on_coord = [
+        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
+        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
+        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
+        ("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
+        ("instance_1", "", "127.0.0.1:10011", "up", "replica"),
+        ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
+        ("instance_3", "", "127.0.0.1:10013", "up", "main"),
+    ]
+
+    mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances_main_coord)
+
+    # 3
+    execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:1});")
+    execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:2});")
+
+    # 4
+    instance_1_cursor = connect(host="localhost", port=7688).cursor()
+    instance_2_cursor = connect(host="localhost", port=7689).cursor()
+
+    assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
+    assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
+
+    # 5
+    interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
+
+    # 6
+    sleep(4.5)
+    max_tries = 100
+    last_role = "replica"
+    while max_tries:
+        max_tries -= 1
+        last_role = execute_and_fetch_all(instance_1_cursor, "SHOW REPLICATION ROLE;")[0][0]
+        if "main" == last_role:
+            interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_1")
+            interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_2")
+            break
+        sleep(0.1)
+
+    assert max_tries > 0 or last_role == "main"
+
+    expected_data_on_coord = [
+        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
+        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
+        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
+        ("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
+        ("instance_1", "", "127.0.0.1:10011", "unknown", "main"),  # TODO: shown as unknown although we saw above it is MAIN
+        ("instance_2", "", "127.0.0.1:10012", "unknown", "replica"),
+        ("instance_3", "", "127.0.0.1:10013", "unknown", "main"),
+    ]
+
+    def retrieve_data_show_instances():
+        return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
+
+    mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
+
+    # 7
+    def retrieve_role():
+        return execute_and_fetch_all(instance_1_cursor, "SHOW REPLICATION ROLE;")[0]
+
+    mg_sleep_and_assert("MAIN", retrieve_role)
+
+    # 8
+    with pytest.raises(Exception) as e:
+        execute_and_fetch_all(instance_1_cursor, "CREATE (:Epoch2Vertex {prop:1});")
+    assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
+
+    def get_vertex_count_instance_2():
+        return execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0]
+
+    mg_sleep_and_assert(3, get_vertex_count_instance_2)
+
+    assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3
+
+    # 9
+    interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_4")
+
+    # 10
+    coord_3_cursor = connect(port=7692, host="localhost").cursor()
+
+    expected_data_on_coord = [
+        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
+        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
+        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
+        ("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
+        ("instance_1", "", "127.0.0.1:10011", "unknown", "replica"),  # TODO: bug, this instance is actually MAIN
+        ("instance_2", "", "127.0.0.1:10012", "unknown", "replica"),
+        ("instance_3", "", "127.0.0.1:10013", "unknown", "replica"),
+    ]
+
+    def retrieve_data_show_instances_new_leader():
+        return sorted(list(execute_and_fetch_all(coord_3_cursor, "SHOW INSTANCES;")))
+
+    mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances_new_leader)
+
+    # 11
+    interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_1")
+    interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_2")
+
+    # 12
+    expected_data_on_coord = [
+        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
+        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
+        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
+        ("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
+        (
+            "instance_1",
+            "",
+            "127.0.0.1:10011",
+            "up",
+            "replica",
+        ),  # TODO: bug, this is MAIN; this might crash the instance because we may execute the replica callback on MAIN, which won't work
+        ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
+        ("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
+    ]
+
+    mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances_new_leader)
+
+    sleep(5)  # give the second failover time to finish
+
+    # 13
+    with pytest.raises(Exception) as e:
+        execute_and_fetch_all(instance_1_cursor, "CREATE (:Epoch2Vertex {prop:2});")
+    assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
+
+    def get_vertex_count_instance_2():
+        return execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0]
+
+    mg_sleep_and_assert(4, get_vertex_count_instance_2)  # MAIN will not be able to write to it anymore
 
 if __name__ == "__main__":
+    sys.exit(pytest.main([__file__, "-k", "test_distributed_coordinators_work_partial_failover", "-vv"]))
     sys.exit(pytest.main([__file__, "-rA"]))
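The test leans on the mg_sleep_and_assert family throughout; conceptually these helpers just poll a callable until it returns the expected value or a timeout expires, which is what keeps the cluster-state assertions robust against failover timing. A minimal version of the idea (the real helpers live in Memgraph's test utilities; the timeout and interval below are arbitrary):

    import time


    def sleep_and_assert(expected, func, timeout_s=10.0, interval_s=0.2):
        """Poll func() until it equals expected, or raise after timeout_s."""
        deadline = time.monotonic() + timeout_s
        last = None
        while time.monotonic() < deadline:
            last = func()
            if last == expected:
                return
            time.sleep(interval_s)
        raise AssertionError(f"expected {expected!r}, last value was {last!r}")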

View File

@@ -39,6 +39,7 @@ import tempfile
 import time
 from argparse import ArgumentParser
 from inspect import signature
+from typing import List, Set
 
 import yaml
@@ -214,6 +215,14 @@ def start_all(context, procdir="", keep_directories=True):
         start_instance(context, key, procdir)
 
 
+def start_all_except(context, exceptions: Set[str], procdir="", keep_directories=True):
+    stop_all(keep_directories)
+    for key, _ in context.items():
+        if key in exceptions:
+            continue
+        start_instance(context, key, procdir)
+
+
 def start_all_keep_others(context, procdir="", keep_directories=True):
     for key, _ in context.items():
         start_instance(context, key, procdir)
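start_all_except mirrors start_all but skips the named instances, which is what lets the partial-failover test above bring coordinator_4 up in a separate, explicitly ordered step:

    # Start everything except coordinator_4, then add it on its own so the test
    # controls exactly when the fourth coordinator joins the cluster.
    interactive_mg_runner.start_all_except(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, {"coordinator_4"})
    interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_4")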