Compare commits
3 Commits
master
...
distribute
Author | SHA1 | Date | |
---|---|---|---|
|
742f3a2e4c | ||
|
7d9352beb5 | ||
|
97b0a56b3e |
@ -140,7 +140,7 @@ void CoordinatorHandlers::PromoteReplicaToMainHandler(replication::ReplicationHa
|
||||
.port = repl_info_config.replication_port,
|
||||
};
|
||||
};
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds{1000});
|
||||
// registering replicas
|
||||
for (auto const &config : req.replication_clients_info | ranges::views::transform(converter)) {
|
||||
auto instance_client = replication_handler.RegisterReplica(config);
|
||||
|
@ -61,21 +61,39 @@ CoordinatorInstance::CoordinatorInstance()
|
||||
|
||||
std::ranges::for_each(repl_instances_, [this](auto &instance) {
|
||||
instance.SetNewMainUUID(raft_state_.GetUUID()); // TODO: (andi) Rename
|
||||
instance.StartFrequentCheck();
|
||||
instance.ResumeFrequentCheck();
|
||||
});
|
||||
is_running_.store(true, std::memory_order::acquire);
|
||||
},
|
||||
[this]() {
|
||||
spdlog::info("Leader changed, stopping all replication instances!");
|
||||
spdlog::info("Leader changed, trying to stop all replication instances, thread id {}!",
|
||||
std::this_thread::get_id());
|
||||
/// TODO Add to pool
|
||||
// auto lock = std::lock_guard{coord_instance_lock_}; // RAII
|
||||
// std::ranges::for_each(repl_instances_)
|
||||
repl_instances_.clear();
|
||||
spdlog::info("Leader changed, stopped all replication instances!");
|
||||
return;
|
||||
is_running_.store(false, std::memory_order::acquire);
|
||||
pool_.AddTask([this]() {
|
||||
auto lock = std::lock_guard{coord_instance_lock_};
|
||||
std::ranges::for_each(repl_instances_, [](auto &instance) { instance.PauseFrequentCheck(); });
|
||||
});
|
||||
})) {
|
||||
client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
|
||||
auto lock = std::unique_lock{self->coord_instance_lock_};
|
||||
if (!self->is_running_) {
|
||||
return;
|
||||
}
|
||||
auto lock = std::lock_guard{self->coord_instance_lock_}; // RAII
|
||||
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
|
||||
std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name);
|
||||
};
|
||||
|
||||
client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
|
||||
auto lock = std::unique_lock{self->coord_instance_lock_};
|
||||
if (!self->is_running_) {
|
||||
return;
|
||||
}
|
||||
auto lock = std::lock_guard{self->coord_instance_lock_};
|
||||
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
|
||||
std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name);
|
||||
};
|
||||
@ -146,6 +164,9 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
|
||||
}
|
||||
|
||||
auto CoordinatorInstance::TryFailover() -> void {
|
||||
if (!is_running_) {
|
||||
return;
|
||||
}
|
||||
auto const is_replica = [this](ReplicationInstance const &instance) { return IsReplica(instance.InstanceName()); };
|
||||
|
||||
auto alive_replicas =
|
||||
@ -216,22 +237,40 @@ auto CoordinatorInstance::TryFailover() -> void {
|
||||
spdlog::warn("Failover failed since promoting replica to main failed!");
|
||||
return;
|
||||
}
|
||||
|
||||
spdlog::info("Promote to main done, trying to append update uuid, thread id {}, is leader {}",
|
||||
std::this_thread::get_id(), raft_state_.IsLeader());
|
||||
// TODO (antoniofilipovic) : This can fail and we don't know we change uuid
|
||||
if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) {
|
||||
spdlog::trace("Append entry update uuid failed, thread id {}, is leader {}", std::this_thread::get_id(),
|
||||
raft_state_.IsLeader());
|
||||
return;
|
||||
}
|
||||
|
||||
auto const new_main_instance_name = new_main->InstanceName();
|
||||
|
||||
spdlog::info("Promote to main done, trying to set instance as main log");
|
||||
// TODO (antoniofilipovic): This can fail and we don't know we have set up new MAIN
|
||||
if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name)) {
|
||||
spdlog::trace("Append set instance as main failed, thread id {}, is leader {}", std::this_thread::get_id(),
|
||||
raft_state_.IsLeader());
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO:
|
||||
// Set trying failover on instance (chosen instance)
|
||||
// If all passes, all good
|
||||
// if it fails, we need to check if main was promoted and we have correct new uuid
|
||||
// PromoteToMain -> split on promote to main and enable writting -> solves issue that we have
|
||||
// consistent state
|
||||
|
||||
spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName());
|
||||
}
|
||||
|
||||
auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance_name)
|
||||
-> SetInstanceToMainCoordinatorStatus {
|
||||
if (!is_running_) {
|
||||
return SetInstanceToMainCoordinatorStatus::NOT_LEADER;
|
||||
;
|
||||
}
|
||||
auto lock = std::lock_guard{coord_instance_lock_};
|
||||
|
||||
if (raft_state_.MainExists()) {
|
||||
@ -295,6 +334,10 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
|
||||
|
||||
auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig const &config)
|
||||
-> RegisterInstanceCoordinatorStatus {
|
||||
if (!is_running_) {
|
||||
return RegisterInstanceCoordinatorStatus::NOT_LEADER;
|
||||
;
|
||||
}
|
||||
auto lock = std::lock_guard{coord_instance_lock_};
|
||||
|
||||
if (std::ranges::any_of(repl_instances_, [instance_name = config.instance_name](ReplicationInstance const &instance) {
|
||||
@ -341,6 +384,10 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
|
||||
|
||||
auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instance_name)
|
||||
-> UnregisterInstanceCoordinatorStatus {
|
||||
if (!is_running_) {
|
||||
return UnregisterInstanceCoordinatorStatus::NOT_LEADER;
|
||||
;
|
||||
}
|
||||
auto lock = std::lock_guard{coord_instance_lock_};
|
||||
|
||||
if (!raft_state_.RequestLeadership()) {
|
||||
@ -390,6 +437,9 @@ auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32
|
||||
}
|
||||
|
||||
void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) {
|
||||
if (!is_running_) {
|
||||
return;
|
||||
}
|
||||
spdlog::trace("Instance {} performing main fail callback", repl_instance_name);
|
||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||
repl_instance.OnFailPing();
|
||||
@ -404,6 +454,9 @@ void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name)
|
||||
}
|
||||
|
||||
void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name) {
|
||||
if (!is_running_) {
|
||||
return;
|
||||
}
|
||||
spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
|
||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||
|
||||
@ -452,6 +505,9 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
|
||||
}
|
||||
|
||||
void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name) {
|
||||
if (!is_running_) {
|
||||
return;
|
||||
}
|
||||
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
|
||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||
|
||||
@ -472,6 +528,9 @@ void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_
|
||||
}
|
||||
|
||||
void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) {
|
||||
if (!is_running_) {
|
||||
return;
|
||||
}
|
||||
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
|
||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||
|
||||
|
@ -59,6 +59,8 @@ auto RaftState::InitRaftServer() -> void {
|
||||
|
||||
raft_server::init_options init_opts;
|
||||
init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode {
|
||||
spdlog::info("Received some message, my id {}, leader id {}, event_type {}, thread_id {}", param->myId,
|
||||
param->leaderId, event_type, std::this_thread::get_id());
|
||||
if (event_type == cb_func::BecomeLeader) {
|
||||
spdlog::info("Node {} became leader", param->leaderId);
|
||||
become_leader_cb_();
|
||||
|
@ -265,8 +265,6 @@ void ReplicationStorageClient::RecoverReplica(uint64_t replica_commit, memgraph:
|
||||
spdlog::debug("Starting replica recovery");
|
||||
auto *mem_storage = static_cast<InMemoryStorage *>(storage);
|
||||
|
||||
// TODO(antoniofilipovic): Can we get stuck here in while loop if replica commit timestamp is not updated when using
|
||||
// only snapshot
|
||||
while (true) {
|
||||
auto file_locker = mem_storage->file_retainer_.AddLocker();
|
||||
|
||||
|
@ -13,6 +13,7 @@ import os
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
from time import sleep
|
||||
|
||||
import interactive_mg_runner
|
||||
import pytest
|
||||
@ -122,9 +123,19 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
|
||||
|
||||
|
||||
def test_distributed_automatic_failover():
|
||||
# Goal of this test is to check that coordination works if one instance fails
|
||||
# with multiple coordinators
|
||||
|
||||
# 1. Start all manually
|
||||
# 2. Everything is set up on main
|
||||
# 3. MAIN dies
|
||||
# 4. New main elected
|
||||
|
||||
# 1
|
||||
safe_execute(shutil.rmtree, TEMP_DIR)
|
||||
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
|
||||
|
||||
# 2
|
||||
main_cursor = connect(host="localhost", port=7689).cursor()
|
||||
expected_data_on_main = [
|
||||
(
|
||||
@ -148,8 +159,10 @@ def test_distributed_automatic_failover():
|
||||
|
||||
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
|
||||
|
||||
# 3
|
||||
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
|
||||
|
||||
# 4
|
||||
coord_cursor = connect(host="localhost", port=7692).cursor()
|
||||
|
||||
def retrieve_data_show_repl_cluster():
|
||||
@ -210,11 +223,21 @@ def test_distributed_automatic_failover():
|
||||
|
||||
|
||||
def test_distributed_automatic_failover_after_coord_dies():
|
||||
# Goal of this test is to check if main and coordinator die at same time in the beginning that
|
||||
# everything works fine
|
||||
# 1. Start all manually
|
||||
# 2. Coordinator dies
|
||||
# 3. MAIN dies
|
||||
# 4.
|
||||
safe_execute(shutil.rmtree, TEMP_DIR)
|
||||
|
||||
# 1
|
||||
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
|
||||
|
||||
# 2
|
||||
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_3")
|
||||
|
||||
# 3
|
||||
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
|
||||
|
||||
coord_cursor_1 = connect(host="localhost", port=7690).cursor()
|
||||
@ -243,7 +266,7 @@ def test_distributed_automatic_failover_after_coord_dies():
|
||||
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
|
||||
("instance_1", "", "", "unknown", "main"),
|
||||
("instance_2", "", "", "unknown", "replica"),
|
||||
("instance_3", "", "", "unknown", "main"), # TODO: (andi) Will become unknown.
|
||||
("instance_3", "", "", "unknown", "main"),
|
||||
]
|
||||
mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
|
||||
mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2])
|
||||
@ -292,5 +315,332 @@ def test_distributed_automatic_failover_after_coord_dies():
|
||||
mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("data_recovery", ["false"])
|
||||
def test_distributed_coordinators_work_partial_failover(data_recovery):
|
||||
# Goal of this test is to check that correct MAIN instance is chosen
|
||||
# if coordinator dies while failover is being done
|
||||
# 1. We start all replicas, main and 4 coordinators manually
|
||||
# 2. We check that main has correct state
|
||||
# 3. Create initial data on MAIN
|
||||
# 4. Expect data to be copied on all replicas
|
||||
# 5. Kill MAIN: instance 3
|
||||
|
||||
# 6. Failover should succeed to instance 1 -> SHOW ROLE -> MAIN
|
||||
# 7. Kill 2 coordinators (1 and 2) as soon as instance becomes MAIN
|
||||
# 8. coord 4 become follower (2 coords dead, no leader)
|
||||
# 8. instance_1 writes (MAIN, successfully)
|
||||
# 9. Kill coordinator 4
|
||||
# 10. Connect to coordinator 3
|
||||
# 11. Start coordinator_1 and coordinator_2 (coord 3 probably leader)
|
||||
# 12. Failover again
|
||||
# 13. Main can't write to replicas anymore, coordinator can't progress
|
||||
|
||||
temp_dir = tempfile.TemporaryDirectory().name
|
||||
|
||||
MEMGRAPH_INNER_INSTANCES_DESCRIPTION = {
|
||||
"instance_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7688",
|
||||
"--log-level",
|
||||
"TRACE",
|
||||
"--coordinator-server-port",
|
||||
"10011",
|
||||
"--replication-restore-state-on-startup",
|
||||
"true",
|
||||
f"--data-recovery-on-startup={data_recovery}",
|
||||
"--storage-recover-on-startup=false",
|
||||
],
|
||||
"log_file": "instance_1.log",
|
||||
"data_directory": f"{temp_dir}/instance_1",
|
||||
"setup_queries": [],
|
||||
},
|
||||
"instance_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7689",
|
||||
"--log-level",
|
||||
"TRACE",
|
||||
"--coordinator-server-port",
|
||||
"10012",
|
||||
"--replication-restore-state-on-startup",
|
||||
"true",
|
||||
f"--data-recovery-on-startup={data_recovery}",
|
||||
"--storage-recover-on-startup=false",
|
||||
],
|
||||
"log_file": "instance_2.log",
|
||||
"data_directory": f"{temp_dir}/instance_2",
|
||||
"setup_queries": [],
|
||||
},
|
||||
"instance_3": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7687",
|
||||
"--log-level",
|
||||
"TRACE",
|
||||
"--coordinator-server-port",
|
||||
"10013",
|
||||
"--replication-restore-state-on-startup",
|
||||
"true",
|
||||
"--data-recovery-on-startup",
|
||||
f"{data_recovery}",
|
||||
"--storage-recover-on-startup=false",
|
||||
],
|
||||
"log_file": "instance_3.log",
|
||||
"data_directory": f"{temp_dir}/instance_3",
|
||||
"setup_queries": [],
|
||||
},
|
||||
"coordinator_1": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7690",
|
||||
"--log-level=TRACE",
|
||||
"--raft-server-id=1",
|
||||
"--raft-server-port=10111",
|
||||
],
|
||||
"log_file": "coordinator1.log",
|
||||
"data_directory": f"{temp_dir}/coordinator_1",
|
||||
"setup_queries": [],
|
||||
},
|
||||
"coordinator_2": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7691",
|
||||
"--log-level=TRACE",
|
||||
"--raft-server-id=2",
|
||||
"--raft-server-port=10112",
|
||||
],
|
||||
"log_file": "coordinator2.log",
|
||||
"data_directory": f"{temp_dir}/coordinator_2",
|
||||
"setup_queries": [],
|
||||
},
|
||||
"coordinator_3": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7692",
|
||||
"--log-level=TRACE",
|
||||
"--raft-server-id=3",
|
||||
"--raft-server-port=10113",
|
||||
],
|
||||
"log_file": "coordinator3.log",
|
||||
"data_directory": f"{temp_dir}/coordinator_3",
|
||||
"setup_queries": [],
|
||||
},
|
||||
"coordinator_4": {
|
||||
"args": [
|
||||
"--experimental-enabled=high-availability",
|
||||
"--bolt-port",
|
||||
"7693",
|
||||
"--log-level=TRACE",
|
||||
"--raft-server-id=4",
|
||||
"--raft-server-port=10114",
|
||||
],
|
||||
"log_file": "coordinator4.log",
|
||||
"data_directory": f"{temp_dir}/coordinator_4",
|
||||
"setup_queries": [],
|
||||
},
|
||||
}
|
||||
|
||||
# 1
|
||||
|
||||
interactive_mg_runner.start_all_except(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, {"coordinator_4"})
|
||||
|
||||
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_4")
|
||||
|
||||
coord_cursor = connect(host="localhost", port=7693).cursor()
|
||||
|
||||
for query in [
|
||||
"ADD COORDINATOR 1 ON '127.0.0.1:10111';",
|
||||
"ADD COORDINATOR 2 ON '127.0.0.1:10112';",
|
||||
"ADD COORDINATOR 3 ON '127.0.0.1:10113';",
|
||||
]:
|
||||
sleep(1)
|
||||
execute_and_fetch_all(coord_cursor, query)
|
||||
|
||||
for query in [
|
||||
"REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003';",
|
||||
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001';",
|
||||
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002';",
|
||||
"SET INSTANCE instance_3 TO MAIN;",
|
||||
]:
|
||||
execute_and_fetch_all(coord_cursor, query)
|
||||
|
||||
# 2
|
||||
|
||||
main_cursor = connect(host="localhost", port=7687).cursor()
|
||||
expected_data_on_main = [
|
||||
(
|
||||
"instance_1",
|
||||
"127.0.0.1:10001",
|
||||
"sync",
|
||||
{"behind": None, "status": "ready", "ts": 0},
|
||||
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
|
||||
),
|
||||
(
|
||||
"instance_2",
|
||||
"127.0.0.1:10002",
|
||||
"sync",
|
||||
{"behind": None, "status": "ready", "ts": 0},
|
||||
{"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
|
||||
),
|
||||
]
|
||||
|
||||
main_cursor = connect(host="localhost", port=7687).cursor()
|
||||
|
||||
def retrieve_data_show_replicas():
|
||||
return sorted(list(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")))
|
||||
|
||||
mg_sleep_and_assert_collection(expected_data_on_main, retrieve_data_show_replicas)
|
||||
|
||||
coord_cursor = connect(host="localhost", port=7693).cursor()
|
||||
|
||||
def retrieve_data_show_instances_main_coord():
|
||||
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
|
||||
|
||||
expected_data_on_coord = [
|
||||
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
|
||||
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
|
||||
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
|
||||
("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
|
||||
("instance_1", "", "127.0.0.1:10011", "up", "replica"),
|
||||
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
|
||||
("instance_3", "", "127.0.0.1:10013", "up", "main"),
|
||||
]
|
||||
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances_main_coord)
|
||||
|
||||
# 3
|
||||
|
||||
execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:1});")
|
||||
execute_and_fetch_all(main_cursor, "CREATE (:Epoch1Vertex {prop:2});")
|
||||
|
||||
# 4
|
||||
instance_1_cursor = connect(host="localhost", port=7688).cursor()
|
||||
instance_2_cursor = connect(host="localhost", port=7689).cursor()
|
||||
|
||||
assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
|
||||
assert execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0] == 2
|
||||
|
||||
# 5
|
||||
|
||||
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "instance_3")
|
||||
# 6
|
||||
|
||||
sleep(4.5)
|
||||
max_tries = 100
|
||||
last_role = "replica"
|
||||
while max_tries:
|
||||
max_tries -= 1
|
||||
last_role = execute_and_fetch_all(instance_1_cursor, "SHOW REPLICATION ROLE;")[0][0]
|
||||
if "main" == last_role:
|
||||
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_1")
|
||||
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_2")
|
||||
break
|
||||
sleep(0.1)
|
||||
assert max_tries > 0 or last_role == "main"
|
||||
|
||||
expected_data_on_coord = [
|
||||
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
|
||||
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
|
||||
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
|
||||
("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
|
||||
("instance_1", "", "127.0.0.1:10011", "unknown", "main"), # TODO although above we see it is MAIN
|
||||
("instance_2", "", "127.0.0.1:10012", "unknown", "replica"),
|
||||
("instance_3", "", "127.0.0.1:10013", "unknown", "main"),
|
||||
]
|
||||
|
||||
def retrieve_data_show_instances():
|
||||
return sorted(list(execute_and_fetch_all(coord_cursor, "SHOW INSTANCES;")))
|
||||
|
||||
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances)
|
||||
|
||||
# 7
|
||||
def retrieve_role():
|
||||
return execute_and_fetch_all(instance_1_cursor, "SHOW REPLICATION ROLE;")[0]
|
||||
|
||||
mg_sleep_and_assert("MAIN", retrieve_role)
|
||||
|
||||
# 8
|
||||
|
||||
with pytest.raises(Exception) as e:
|
||||
execute_and_fetch_all(instance_1_cursor, "CREATE (:Epoch2Vertex {prop:1});")
|
||||
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
|
||||
|
||||
def get_vertex_count_instance_2():
|
||||
return execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0]
|
||||
|
||||
mg_sleep_and_assert(3, get_vertex_count_instance_2)
|
||||
|
||||
assert execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0] == 3
|
||||
|
||||
# 9
|
||||
|
||||
interactive_mg_runner.kill(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_4")
|
||||
|
||||
# 10
|
||||
|
||||
coord_3_cursor = connect(port=7692, host="localhost").cursor()
|
||||
|
||||
expected_data_on_coord = [
|
||||
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
|
||||
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
|
||||
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
|
||||
("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
|
||||
("instance_1", "", "127.0.0.1:10011", "unknown", "replica"), # TODO: bug this is main
|
||||
("instance_2", "", "127.0.0.1:10012", "unknown", "replica"),
|
||||
("instance_3", "", "127.0.0.1:10013", "unknown", "replica"),
|
||||
]
|
||||
|
||||
def retrieve_data_show_instances_new_leader():
|
||||
return sorted(list(execute_and_fetch_all(coord_3_cursor, "SHOW INSTANCES;")))
|
||||
|
||||
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances_new_leader)
|
||||
|
||||
# 11
|
||||
|
||||
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_1")
|
||||
interactive_mg_runner.start(MEMGRAPH_INNER_INSTANCES_DESCRIPTION, "coordinator_2")
|
||||
|
||||
# 12
|
||||
|
||||
expected_data_on_coord = [
|
||||
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
|
||||
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
|
||||
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
|
||||
("coordinator_4", "127.0.0.1:10114", "", "unknown", "coordinator"),
|
||||
(
|
||||
"instance_1",
|
||||
"",
|
||||
"127.0.0.1:10011",
|
||||
"up",
|
||||
"replica",
|
||||
), # TODO: bug this is main, this might crash instance, because we will maybe execute replica callback on MAIN, which won't work
|
||||
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
|
||||
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
|
||||
]
|
||||
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_instances_new_leader)
|
||||
|
||||
import time
|
||||
|
||||
time.sleep(5) # failover
|
||||
|
||||
with pytest.raises(Exception) as e:
|
||||
execute_and_fetch_all(instance_1_cursor, "CREATE (:Epoch2Vertex {prop:2});")
|
||||
assert "At least one SYNC replica has not confirmed committing last transaction." in str(e.value)
|
||||
|
||||
def get_vertex_count_instance_2():
|
||||
return execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n)")[0][0]
|
||||
|
||||
mg_sleep_and_assert(4, get_vertex_count_instance_2) # MAIN will not be able to write to it anymore
|
||||
|
||||
# 13
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(pytest.main([__file__, "-k", "test_distributed_coordinators_work_partial_failover", "-vv"]))
|
||||
sys.exit(pytest.main([__file__, "-rA"]))
|
||||
|
@ -39,6 +39,7 @@ import tempfile
|
||||
import time
|
||||
from argparse import ArgumentParser
|
||||
from inspect import signature
|
||||
from typing import List, Set
|
||||
|
||||
import yaml
|
||||
|
||||
@ -214,6 +215,14 @@ def start_all(context, procdir="", keep_directories=True):
|
||||
start_instance(context, key, procdir)
|
||||
|
||||
|
||||
def start_all_except(context, exceptions: Set[str], procdir="", keep_directories=True):
|
||||
stop_all(keep_directories)
|
||||
for key, _ in context.items():
|
||||
if key in exceptions:
|
||||
continue
|
||||
start_instance(context, key, procdir)
|
||||
|
||||
|
||||
def start_all_keep_others(context, procdir="", keep_directories=True):
|
||||
for key, _ in context.items():
|
||||
start_instance(context, key, procdir)
|
||||
|
Loading…
Reference in New Issue
Block a user