Fix snapshot creation in RSM and forbid multiple leaders (#1788)

Andi 2024-03-07 18:40:32 +01:00 committed by GitHub
parent a099417c56
commit 5ca98f9543
9 changed files with 402 additions and 55 deletions

View File

@ -111,8 +111,7 @@ auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_act
auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
auto lock = std::shared_lock{log_lock_};
// .at(0) is hack to solve the problem with json serialization of map
auto const log = nlohmann::json{instances_}.at(0).dump();
auto const log = nlohmann::json(instances_).dump();
data = buffer::alloc(sizeof(uint32_t) + log.size());
buffer_serializer bs(data);
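
The parenthesis form above is what removes the need for the .at(0) hack: constructing nlohmann::json from a map with braces goes through the initializer-list constructor and wraps the serialized object in a one-element array, while the parenthesis form uses the converting constructor and yields the object directly. A minimal standalone sketch of the difference, assuming nlohmann/json is available (the map below is a stand-in, not the actual instances_ type):

#include <cassert>
#include <map>
#include <string>
#include <nlohmann/json.hpp>

int main() {
  std::map<std::string, std::string> instances{{"instance_1", "replica"}};
  auto const braced = nlohmann::json{instances};  // array: [{"instance_1":"replica"}], hence the old .at(0)
  auto const parens = nlohmann::json(instances);  // object: {"instance_1":"replica"}
  assert(braced.is_array() && braced.at(0) == parens);
  assert(parens.is_object());
  return 0;
}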

View File

@ -40,7 +40,7 @@ CoordinatorInstance::CoordinatorInstance()
});
std::ranges::for_each(replicas, [this](auto &replica) {
spdlog::info("Starting replication instance {}", replica.config.instance_name);
spdlog::info("Started pinging replication instance {}", replica.config.instance_name);
repl_instances_.emplace_back(this, replica.config, client_succ_cb_, client_fail_cb_,
&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback);
@ -50,7 +50,7 @@ CoordinatorInstance::CoordinatorInstance()
[](auto const &instance) { return instance.status == ReplicationRole::MAIN; });
std::ranges::for_each(main, [this](auto &main_instance) {
spdlog::info("Starting main instance {}", main_instance.config.instance_name);
spdlog::info("Started pinging main instance {}", main_instance.config.instance_name);
repl_instances_.emplace_back(this, main_instance.config, client_succ_cb_, client_fail_cb_,
&CoordinatorInstance::MainSuccessCallback,
&CoordinatorInstance::MainFailCallback);
@ -314,17 +314,20 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
return RegisterInstanceCoordinatorStatus::NOT_LEADER;
}
auto const undo_action_ = [this]() { repl_instances_.pop_back(); };
auto *new_instance = &repl_instances_.emplace_back(this, config, client_succ_cb_, client_fail_cb_,
&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback);
if (!new_instance->SendDemoteToReplicaRpc()) {
spdlog::error("Failed to send demote to replica rpc for instance {}", config.instance_name);
repl_instances_.pop_back();
undo_action_();
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}
if (!raft_state_.AppendRegisterReplicationInstanceLog(config)) {
undo_action_();
return RegisterInstanceCoordinatorStatus::RAFT_LOG_ERROR;
}

View File

@ -62,34 +62,33 @@ ptr<log_entry> CoordinatorLogStore::last_entry() const {
uint64_t CoordinatorLogStore::append(ptr<log_entry> &entry) {
ptr<log_entry> clone = MakeClone(entry);
uint64_t next_slot{0};
{
auto lock = std::lock_guard{logs_lock_};
next_slot = start_idx_ + logs_.size() - 1;
logs_[next_slot] = clone;
}
auto lock = std::lock_guard{logs_lock_};
uint64_t next_slot = start_idx_ + logs_.size() - 1;
logs_[next_slot] = clone;
return next_slot;
}
// TODO: (andi) I think this is used for resolving conflicts inside NuRaft, check...
// different compared to in_memory_log_store.cxx
void CoordinatorLogStore::write_at(uint64_t index, ptr<log_entry> &entry) {
ptr<log_entry> clone = MakeClone(entry);
// Discard all logs equal to or greater than `index`.
{
auto lock = std::lock_guard{logs_lock_};
auto itr = logs_.lower_bound(index);
while (itr != logs_.end()) {
itr = logs_.erase(itr);
}
logs_[index] = clone;
auto lock = std::lock_guard{logs_lock_};
auto itr = logs_.lower_bound(index);
while (itr != logs_.end()) {
itr = logs_.erase(itr);
}
logs_[index] = clone;
}
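
The rewritten write_at keeps the behaviour the TODO above suspects is NuRaft conflict resolution: every stored entry at or after the given index is erased before the new entry is written, so a conflicting log suffix cannot survive being overwritten by the leader. A rough standalone sketch of that truncate-then-write step over a plain std::map (stand-in types, not the actual log store):

#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// Drop every entry with index >= idx, then store the new entry at idx.
void write_at(std::map<uint64_t, std::string> &logs, uint64_t idx, std::string entry) {
  logs.erase(logs.lower_bound(idx), logs.end());
  logs[idx] = std::move(entry);
}

int main() {
  std::map<uint64_t, std::string> logs{{1, "a"}, {2, "b"}, {3, "c"}};
  write_at(logs, 2, "b2");  // the old entries at 2 and 3 are discarded first
  for (auto const &[idx, entry] : logs) std::cout << idx << ":" << entry << "\n";  // prints 1:a and 2:b2
  return 0;
}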
ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries(uint64_t start, uint64_t end) {
auto ret = cs_new<std::vector<ptr<log_entry>>>();
ret->resize(end - start);
for (uint64_t i = start, curr_index = 0; i < end; ++i, ++curr_index) {
for (uint64_t i = start, curr_index = 0; i < end; i++, curr_index++) {
ptr<log_entry> src = nullptr;
{
auto lock = std::lock_guard{logs_lock_};
@ -105,21 +104,14 @@ ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries(uint64_t start
}
ptr<log_entry> CoordinatorLogStore::entry_at(uint64_t index) {
ptr<log_entry> src = nullptr;
{
auto lock = std::lock_guard{logs_lock_};
src = FindOrDefault_(index);
}
auto lock = std::lock_guard{logs_lock_};
ptr<log_entry> src = FindOrDefault_(index);
return MakeClone(src);
}
uint64_t CoordinatorLogStore::term_at(uint64_t index) {
uint64_t term = 0;
{
auto lock = std::lock_guard{logs_lock_};
term = FindOrDefault_(index)->get_term();
}
return term;
auto lock = std::lock_guard{logs_lock_};
return FindOrDefault_(index)->get_term();
}
ptr<buffer> CoordinatorLogStore::pack(uint64_t index, int32 cnt) {

View File

@ -14,6 +14,10 @@
#include "nuraft/coordinator_state_machine.hpp"
#include "utils/logging.hpp"
namespace {
constexpr int MAX_SNAPSHOTS = 3;
} // namespace
namespace memgraph::coordination {
auto CoordinatorStateMachine::FindCurrentMainInstanceName() const -> std::optional<std::string> {
@ -82,6 +86,7 @@ auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, Raf
auto CoordinatorStateMachine::pre_commit(ulong const /*log_idx*/, buffer & /*data*/) -> ptr<buffer> { return nullptr; }
auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr<buffer> {
spdlog::debug("Commit: log_idx={}, data.size()={}", log_idx, data.size());
auto const [parsed_data, log_action] = DecodeLog(data);
cluster_state_.DoAction(parsed_data, log_action);
last_committed_idx_ = log_idx;
@ -95,15 +100,17 @@ auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr<b
auto CoordinatorStateMachine::commit_config(ulong const log_idx, ptr<cluster_config> & /*new_conf*/) -> void {
last_committed_idx_ = log_idx;
spdlog::debug("Commit config: log_idx={}", log_idx);
}
auto CoordinatorStateMachine::rollback(ulong const log_idx, buffer &data) -> void {
// NOTE: Nothing since we don't do anything in pre_commit
spdlog::debug("Rollback: log_idx={}, data.size()={}", log_idx, data.size());
}
auto CoordinatorStateMachine::read_logical_snp_obj(snapshot &snapshot, void *& /*user_snp_ctx*/, ulong obj_id,
ptr<buffer> &data_out, bool &is_last_obj) -> int {
spdlog::info("read logical snapshot object, obj_id: {}", obj_id);
spdlog::debug("read logical snapshot object, obj_id: {}", obj_id);
ptr<SnapshotCtx> ctx = nullptr;
{
@ -116,20 +123,33 @@ auto CoordinatorStateMachine::read_logical_snp_obj(snapshot &snapshot, void *& /
}
ctx = entry->second;
}
ctx->cluster_state_.Serialize(data_out);
is_last_obj = true;
if (obj_id == 0) {
// Object ID == 0: first object, put dummy data.
data_out = buffer::alloc(sizeof(int32));
buffer_serializer bs(data_out);
bs.put_i32(0);
is_last_obj = false;
} else {
// Object ID > 0: second object, put actual value.
ctx->cluster_state_.Serialize(data_out);
}
return 0;
}
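
With this change the sender follows NuRaft's multi-object snapshot convention: object 0 is a small dummy marker sent with is_last_obj=false, and the serialized cluster state only goes out for obj_id > 0 with is_last_obj=true, which is the split that save_logical_snp_obj below relies on. A simplified, self-contained sketch of that framing using plain byte vectors (stand-in types, not the NuRaft buffer API):

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct SnapshotObject {
  std::vector<uint8_t> data;
  bool is_last_obj;
};

// Sender side: called with obj_id = 0, 1, ... until it reports is_last_obj = true.
SnapshotObject read_object(uint64_t obj_id, std::string const &serialized_state) {
  if (obj_id == 0) {
    return {{0, 0, 0, 0}, false};  // dummy 4-byte marker, more objects follow
  }
  return {{serialized_state.begin(), serialized_state.end()}, true};  // actual payload, last object
}

int main() {
  std::string const state = "{\"instances\":{}}";
  for (uint64_t obj_id = 0;; ++obj_id) {
    auto const obj = read_object(obj_id, state);
    std::cout << "obj " << obj_id << ": " << obj.data.size() << " bytes\n";
    if (obj.is_last_obj) break;
  }
  return 0;
}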
auto CoordinatorStateMachine::save_logical_snp_obj(snapshot &snapshot, ulong &obj_id, buffer &data, bool is_first_obj,
bool is_last_obj) -> void {
spdlog::info("save logical snapshot object, obj_id: {}, is_first_obj: {}, is_last_obj: {}", obj_id, is_first_obj,
is_last_obj);
spdlog::debug("save logical snapshot object, obj_id: {}, is_first_obj: {}, is_last_obj: {}", obj_id, is_first_obj,
is_last_obj);
buffer_serializer bs(data);
auto cluster_state = CoordinatorClusterState::Deserialize(data);
if (obj_id == 0) {
ptr<buffer> snp_buf = snapshot.serialize();
auto ss = snapshot::deserialize(*snp_buf);
create_snapshot_internal(ss);
} else {
auto cluster_state = CoordinatorClusterState::Deserialize(data);
{
auto ll = std::lock_guard{snapshots_lock_};
auto entry = snapshots_.find(snapshot.get_last_log_idx());
DMG_ASSERT(entry != snapshots_.end());
@ -139,6 +159,7 @@ auto CoordinatorStateMachine::save_logical_snp_obj(snapshot &snapshot, ulong &ob
auto CoordinatorStateMachine::apply_snapshot(snapshot &s) -> bool {
auto ll = std::lock_guard{snapshots_lock_};
spdlog::debug("apply snapshot, last_log_idx: {}", s.get_last_log_idx());
auto entry = snapshots_.find(s.get_last_log_idx());
if (entry == snapshots_.end()) return false;
@ -151,6 +172,7 @@ auto CoordinatorStateMachine::free_user_snp_ctx(void *&user_snp_ctx) -> void {}
auto CoordinatorStateMachine::last_snapshot() -> ptr<snapshot> {
auto ll = std::lock_guard{snapshots_lock_};
spdlog::debug("last_snapshot");
auto entry = snapshots_.rbegin();
if (entry == snapshots_.rend()) return nullptr;
@ -161,6 +183,7 @@ auto CoordinatorStateMachine::last_snapshot() -> ptr<snapshot> {
auto CoordinatorStateMachine::last_commit_index() -> ulong { return last_committed_idx_; }
auto CoordinatorStateMachine::create_snapshot(snapshot &s, async_result<bool>::handler_type &when_done) -> void {
spdlog::debug("create_snapshot, last_log_idx: {}", s.get_last_log_idx());
ptr<buffer> snp_buf = s.serialize();
ptr<snapshot> ss = snapshot::deserialize(*snp_buf);
create_snapshot_internal(ss);
@ -172,11 +195,11 @@ auto CoordinatorStateMachine::create_snapshot(snapshot &s, async_result<bool>::h
auto CoordinatorStateMachine::create_snapshot_internal(ptr<snapshot> snapshot) -> void {
auto ll = std::lock_guard{snapshots_lock_};
spdlog::debug("create_snapshot_internal, last_log_idx: {}", snapshot->get_last_log_idx());
auto ctx = cs_new<SnapshotCtx>(snapshot, cluster_state_);
snapshots_[snapshot->get_last_log_idx()] = ctx;
constexpr int MAX_SNAPSHOTS = 3;
while (snapshots_.size() > MAX_SNAPSHOTS) {
snapshots_.erase(snapshots_.begin());
}
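
Because snapshots_ is a std::map keyed by the snapshot's last_log_idx, erasing begin() always drops the oldest snapshot, so the loop above retains only the MAX_SNAPSHOTS most recent ones. A small standalone sketch of that retention policy (stand-in payload type, not the actual SnapshotCtx):

#include <cstdint>
#include <iostream>
#include <map>
#include <string>

constexpr int MAX_SNAPSHOTS = 3;

int main() {
  std::map<uint64_t, std::string> snapshots;  // keyed by last_log_idx, ordered ascending
  for (uint64_t last_log_idx : {5, 10, 15, 20, 25}) {
    snapshots[last_log_idx] = "snapshot@" + std::to_string(last_log_idx);
    while (snapshots.size() > MAX_SNAPSHOTS) {
      snapshots.erase(snapshots.begin());  // smallest key, i.e. the oldest snapshot
    }
  }
  for (auto const &[idx, name] : snapshots) std::cout << idx << " " << name << "\n";  // keeps 15, 20, 25
  return 0;
}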

View File

@ -95,12 +95,8 @@ class CoordinatorStateMachine : public state_machine {
auto create_snapshot_internal(ptr<snapshot> snapshot) -> void;
CoordinatorClusterState cluster_state_;
// mutable utils::RWLock lock{utils::RWLock::Priority::READ};
std::atomic<uint64_t> last_committed_idx_{0};
// TODO: (andi) Maybe not needed, remove it
std::map<uint64_t, ptr<SnapshotCtx>> snapshots_;
std::mutex snapshots_lock_;

View File

@ -10,7 +10,6 @@
// licenses/APL.txt.
#ifdef MG_ENTERPRISE
#include <chrono>
#include "coordination/coordinator_config.hpp"
@ -43,19 +42,25 @@ RaftState::RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_fo
auto RaftState::InitRaftServer() -> void {
asio_service::options asio_opts;
asio_opts.thread_pool_size_ = 1; // TODO: (andi) Improve this
asio_opts.thread_pool_size_ = 1;
raft_params params;
params.heart_beat_interval_ = 100;
params.election_timeout_lower_bound_ = 200;
params.election_timeout_upper_bound_ = 400;
// 5 logs are preserved before the last snapshot
params.reserved_log_items_ = 5;
// Create snapshot for every 5 log appends
params.snapshot_distance_ = 5;
params.client_req_timeout_ = 3000;
params.return_method_ = raft_params::blocking;
// If the leader doesn't receive any response from quorum nodes
// in 200ms, it will step down.
// This allows us to achieve strong consistency even if network partition
// happens between the current leader and followers.
// The value must be <= election_timeout_lower_bound_ so that the cluster can never
// have multiple leaders.
params.leadership_expiry_ = 200;
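
The timing values above are what enforce the single-leader part of this commit: a leader that loses contact with a quorum relinquishes leadership after leadership_expiry_ milliseconds, which is no later than the earliest moment (election_timeout_lower_bound_) any follower may start a new election, so two servers cannot both consider themselves leader at the same time. A minimal standalone check of that invariant with the same constants (not NuRaft code):

#include <cassert>

int main() {
  constexpr int heart_beat_interval = 100;
  constexpr int election_timeout_lower_bound = 200;
  constexpr int election_timeout_upper_bound = 400;
  constexpr int leadership_expiry = 200;

  // A cut-off leader must step down before any follower can win a new election.
  static_assert(leadership_expiry <= election_timeout_lower_bound);
  // Heartbeats have to arrive well before followers time out and call an election.
  static_assert(heart_beat_interval < election_timeout_lower_bound);
  assert(election_timeout_lower_bound < election_timeout_upper_bound);
  return 0;
}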
raft_server::init_options init_opts;
init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode {
if (event_type == cb_func::BecomeLeader) {
@ -117,7 +122,8 @@ auto RaftState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_po
if (cmd_result->get_result_code() == nuraft::cmd_result_code::OK) {
spdlog::info("Request to add server {} to the cluster accepted", endpoint);
} else {
throw RaftAddServerException("Failed to accept request to add server {} to the cluster", endpoint);
throw RaftAddServerException("Failed to accept request to add server {} to the cluster with error code {}",
endpoint, cmd_result->get_result_code());
}
// Waiting for server to join

View File

@ -497,7 +497,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
auto const maybe_ip_and_port = io::network::Endpoint::ParseSocketOrAddress(raft_socket_address);
if (maybe_ip_and_port) {
auto const [ip, port] = *maybe_ip_and_port;
spdlog::info("Adding instance {} with raft socket address {}:{}.", raft_server_id, port, ip);
spdlog::info("Adding instance {} with raft socket address {}:{}.", raft_server_id, ip, port);
coordinator_handler_.AddCoordinatorInstance(raft_server_id, port, ip);
} else {
spdlog::error("Invalid raft socket address {}.", raft_socket_address);

View File

@ -430,7 +430,6 @@ def test_unregister_main():
coordinator2_cursor = connect(host="localhost", port=7691).cursor()
coordinator3_cursor = connect(host="localhost", port=7692).cursor()
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")
execute_and_fetch_all(
coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"

View File

@ -121,6 +121,202 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
}
def get_instances_description_no_setup():
return {
"instance_1": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7687",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10011",
],
"log_file": "instance_1.log",
"data_directory": f"{TEMP_DIR}/instance_1",
"setup_queries": [],
},
"instance_2": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7688",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10012",
],
"log_file": "instance_2.log",
"data_directory": f"{TEMP_DIR}/instance_2",
"setup_queries": [],
},
"instance_3": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7689",
"--log-level",
"TRACE",
"--coordinator-server-port",
"10013",
],
"log_file": "instance_3.log",
"data_directory": f"{TEMP_DIR}/instance_3",
"setup_queries": [],
},
"coordinator_1": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7690",
"--log-level=TRACE",
"--raft-server-id=1",
"--raft-server-port=10111",
],
"log_file": "coordinator1.log",
"setup_queries": [],
},
"coordinator_2": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7691",
"--log-level=TRACE",
"--raft-server-id=2",
"--raft-server-port=10112",
],
"log_file": "coordinator2.log",
"setup_queries": [],
},
"coordinator_3": {
"args": [
"--experimental-enabled=high-availability",
"--bolt-port",
"7692",
"--log-level=TRACE",
"--raft-server-id=3",
"--raft-server-port=10113",
],
"log_file": "coordinator3.log",
"setup_queries": [],
},
}
def test_old_main_comes_back_on_new_leader_as_replica():
# 1. Start all instances.
# 2. Kill the main instance
# 3. Kill the leader
# 4. Start the old main instance
# 5. Run SHOW INSTANCES on the new leader and check that the old main instance is registered as a replica
# 6. Start the previous leader again
safe_execute(shutil.rmtree, TEMP_DIR)
inner_instances_description = get_instances_description_no_setup()
interactive_mg_runner.start_all(inner_instances_description)
setup_queries = [
"ADD COORDINATOR 1 ON '127.0.0.1:10111'",
"ADD COORDINATOR 2 ON '127.0.0.1:10112'",
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'",
"REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'",
"SET INSTANCE instance_3 TO MAIN",
]
coord_cursor_3 = connect(host="localhost", port=7692).cursor()
for query in setup_queries:
execute_and_fetch_all(coord_cursor_3, query)
interactive_mg_runner.kill(inner_instances_description, "coordinator_3")
interactive_mg_runner.kill(inner_instances_description, "instance_3")
coord_cursor_1 = connect(host="localhost", port=7690).cursor()
def show_instances_coord1():
return sorted(list(execute_and_fetch_all(coord_cursor_1, "SHOW INSTANCES;")))
coord_cursor_2 = connect(host="localhost", port=7691).cursor()
def show_instances_coord2():
return sorted(list(execute_and_fetch_all(coord_cursor_2, "SHOW INSTANCES;")))
leader_data = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
follower_data = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "main"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"), # TODO: (andi) Will become unknown.
]
mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2])
interactive_mg_runner.start(inner_instances_description, "instance_3")
leader_data = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "up", "main"),
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "up", "replica"),
]
mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
new_main_cursor = connect(host="localhost", port=7687).cursor()
def show_replicas():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
replicas = [
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_3",
"127.0.0.1:10003",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert_collection(replicas, show_replicas)
execute_and_fetch_all(new_main_cursor, "CREATE (n:Node {name: 'node'})")
replica_2_cursor = connect(host="localhost", port=7688).cursor()
def get_vertex_count():
return execute_and_fetch_all(replica_2_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(1, get_vertex_count)
replica_3_cursor = connect(host="localhost", port=7689).cursor()
def get_vertex_count():
return execute_and_fetch_all(replica_3_cursor, "MATCH (n) RETURN count(n)")[0][0]
mg_sleep_and_assert(1, get_vertex_count)
interactive_mg_runner.start(inner_instances_description, "coordinator_3")
def test_distributed_automatic_failover():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
@ -163,6 +359,7 @@ def test_distributed_automatic_failover():
("instance_2", "", "127.0.0.1:10012", "up", "replica"),
("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
]
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
new_main_cursor = connect(host="localhost", port=7687).cursor()
@ -209,13 +406,26 @@ def test_distributed_automatic_failover():
mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
def test_distributed_automatic_failover_after_coord_dies():
def test_distributed_automatic_failover_with_leadership_change():
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
inner_instances_description = get_instances_description_no_setup()
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_3")
interactive_mg_runner.start_all(inner_instances_description)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
setup_queries = [
"ADD COORDINATOR 1 ON '127.0.0.1:10111'",
"ADD COORDINATOR 2 ON '127.0.0.1:10112'",
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'",
"REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'",
"SET INSTANCE instance_3 TO MAIN",
]
coord_cursor_3 = connect(host="localhost", port=7692).cursor()
for query in setup_queries:
execute_and_fetch_all(coord_cursor_3, query)
interactive_mg_runner.kill(inner_instances_description, "coordinator_3")
interactive_mg_runner.kill(inner_instances_description, "instance_3")
coord_cursor_1 = connect(host="localhost", port=7690).cursor()
@ -271,7 +481,7 @@ def test_distributed_automatic_failover_after_coord_dies():
]
mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas)
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
interactive_mg_runner.start(inner_instances_description, "instance_3")
expected_data_on_new_main_old_alive = [
(
"instance_2",
@ -291,6 +501,125 @@ def test_distributed_automatic_failover_after_coord_dies():
mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
interactive_mg_runner.start(inner_instances_description, "coordinator_3")
def test_no_leader_after_leader_and_follower_die():
# 1. Register all but one replication instance on the first leader.
# 2. Kill the leader and a follower.
# 3. Check that the remaining follower is not promoted to leader by trying to register the remaining replication instance.
safe_execute(shutil.rmtree, TEMP_DIR)
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_3")
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_2")
coord_cursor_1 = connect(host="localhost", port=7690).cursor()
with pytest.raises(Exception) as e:
execute_and_fetch_all(coord_cursor_1, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'")
assert str(e.value) == "Couldn't register replica instance since coordinator is not a leader!"
def test_old_main_comes_back_on_new_leader_as_main():
# 1. Start all instances.
# 2. Kill all instances
# 3. Kill the leader
# 4. Start the old main instance
# 5. Run SHOW INSTANCES on the new leader and check that the old main instance is main once again
inner_memgraph_instances = get_instances_description_no_setup()
interactive_mg_runner.start_all(inner_memgraph_instances)
coord_cursor_3 = connect(host="localhost", port=7692).cursor()
setup_queries = [
"ADD COORDINATOR 1 ON '127.0.0.1:10111'",
"ADD COORDINATOR 2 ON '127.0.0.1:10112'",
"REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'",
"REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'",
"REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'",
"SET INSTANCE instance_3 TO MAIN",
]
for query in setup_queries:
execute_and_fetch_all(coord_cursor_3, query)
interactive_mg_runner.kill(inner_memgraph_instances, "instance_1")
interactive_mg_runner.kill(inner_memgraph_instances, "instance_2")
interactive_mg_runner.kill(inner_memgraph_instances, "instance_3")
interactive_mg_runner.kill(inner_memgraph_instances, "coordinator_3")
coord_cursor_1 = connect(host="localhost", port=7690).cursor()
def show_instances_coord1():
return sorted(list(execute_and_fetch_all(coord_cursor_1, "SHOW INSTANCES;")))
coord_cursor_2 = connect(host="localhost", port=7691).cursor()
def show_instances_coord2():
return sorted(list(execute_and_fetch_all(coord_cursor_2, "SHOW INSTANCES;")))
interactive_mg_runner.start(inner_memgraph_instances, "instance_3")
leader_data = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "127.0.0.1:10011", "down", "unknown"),
("instance_2", "", "127.0.0.1:10012", "down", "unknown"),
("instance_3", "", "127.0.0.1:10013", "up", "main"),
]
follower_data = [
("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
("instance_1", "", "", "unknown", "replica"),
("instance_2", "", "", "unknown", "replica"),
("instance_3", "", "", "unknown", "main"),
]
mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2])
interactive_mg_runner.start(inner_memgraph_instances, "instance_1")
interactive_mg_runner.start(inner_memgraph_instances, "instance_2")
new_main_cursor = connect(host="localhost", port=7689).cursor()
def show_replicas():
return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
replicas = [
(
"instance_1",
"127.0.0.1:10001",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
(
"instance_2",
"127.0.0.1:10002",
"sync",
{"ts": 0, "behind": None, "status": "ready"},
{"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
),
]
mg_sleep_and_assert_collection(replicas, show_replicas)
execute_and_fetch_all(new_main_cursor, "CREATE (n:Node {name: 'node'})")
replica_1_cursor = connect(host="localhost", port=7687).cursor()
assert len(execute_and_fetch_all(replica_1_cursor, "MATCH (n) RETURN n;")) == 1
replica_2_cursor = connect(host="localhost", port=7688).cursor()
assert len(execute_and_fetch_all(replica_2_cursor, "MATCH (n) RETURN n;")) == 1
interactive_mg_runner.start(inner_memgraph_instances, "coordinator_3")
def test_registering_4_coords():
# Goal of this test is to ensure that registering multiple coordinators in a row works