diff --git a/src/coordination/coordinator_cluster_state.cpp b/src/coordination/coordinator_cluster_state.cpp
index 2213a052f..cf6e1a574 100644
--- a/src/coordination/coordinator_cluster_state.cpp
+++ b/src/coordination/coordinator_cluster_state.cpp
@@ -111,8 +111,7 @@ auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_act
 
 auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
   auto lock = std::shared_lock{log_lock_};
-  // .at(0) is hack to solve the problem with json serialization of map
-  auto const log = nlohmann::json{instances_}.at(0).dump();
+  auto const log = nlohmann::json(instances_).dump();
 
   data = buffer::alloc(sizeof(uint32_t) + log.size());
   buffer_serializer bs(data);
diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp
index 920fea3cb..791ffbc59 100644
--- a/src/coordination/coordinator_instance.cpp
+++ b/src/coordination/coordinator_instance.cpp
@@ -40,7 +40,7 @@ CoordinatorInstance::CoordinatorInstance()
       });
 
   std::ranges::for_each(replicas, [this](auto &replica) {
-    spdlog::info("Starting replication instance {}", replica.config.instance_name);
+    spdlog::info("Started pinging replication instance {}", replica.config.instance_name);
     repl_instances_.emplace_back(this, replica.config, client_succ_cb_, client_fail_cb_,
                                  &CoordinatorInstance::ReplicaSuccessCallback,
                                  &CoordinatorInstance::ReplicaFailCallback);
@@ -50,7 +50,7 @@ CoordinatorInstance::CoordinatorInstance()
       [](auto const &instance) { return instance.status == ReplicationRole::MAIN; });
 
   std::ranges::for_each(main, [this](auto &main_instance) {
-    spdlog::info("Starting main instance {}", main_instance.config.instance_name);
+    spdlog::info("Started pinging main instance {}", main_instance.config.instance_name);
     repl_instances_.emplace_back(this, main_instance.config, client_succ_cb_, client_fail_cb_,
                                  &CoordinatorInstance::MainSuccessCallback,
                                  &CoordinatorInstance::MainFailCallback);
@@ -314,17 +314,20 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
     return RegisterInstanceCoordinatorStatus::NOT_LEADER;
   }
 
+  auto const undo_action_ = [this]() { repl_instances_.pop_back(); };
+
   auto *new_instance = &repl_instances_.emplace_back(this, config, client_succ_cb_, client_fail_cb_,
                                                      &CoordinatorInstance::ReplicaSuccessCallback,
                                                      &CoordinatorInstance::ReplicaFailCallback);
 
   if (!new_instance->SendDemoteToReplicaRpc()) {
     spdlog::error("Failed to send demote to replica rpc for instance {}", config.instance_name);
-    repl_instances_.pop_back();
+    undo_action_();
     return RegisterInstanceCoordinatorStatus::RPC_FAILED;
   }
 
   if (!raft_state_.AppendRegisterReplicationInstanceLog(config)) {
+    undo_action_();
     return RegisterInstanceCoordinatorStatus::RAFT_LOG_ERROR;
   }
 
diff --git a/src/coordination/coordinator_log_store.cpp b/src/coordination/coordinator_log_store.cpp
index 37126b747..d5e134492 100644
--- a/src/coordination/coordinator_log_store.cpp
+++ b/src/coordination/coordinator_log_store.cpp
@@ -62,34 +62,33 @@ ptr<log_entry> CoordinatorLogStore::last_entry() const {
 
 uint64_t CoordinatorLogStore::append(ptr<log_entry> &entry) {
   ptr<log_entry> clone = MakeClone(entry);
-  uint64_t next_slot{0};
-  {
-    auto lock = std::lock_guard{logs_lock_};
-    next_slot = start_idx_ + logs_.size() - 1;
-    logs_[next_slot] = clone;
-  }
+
+  auto lock = std::lock_guard{logs_lock_};
+  uint64_t next_slot = start_idx_ + logs_.size() - 1;
+  logs_[next_slot] = clone;
+
   return next_slot;
 }
 
+// TODO: (andi) I think this is used for resolving conflicts inside NuRaft, check...
+// different compared to in_memory_log_store.cxx
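Aside on the Serialize() change in coordinator_cluster_state.cpp above: nlohmann::json's brace-init constructor treats a single argument as an initializer list and wraps it in a JSON array, which is why the old code needed the `.at(0)` hack, while the parenthesized constructor converts the map directly into a JSON object. A minimal, self-contained sketch of the difference, using a plain std::map as a stand-in for instances_ (assumes only nlohmann/json.hpp):

#include <cassert>
#include <map>
#include <string>
#include <nlohmann/json.hpp>

int main() {
  std::map<std::string, int> instances{{"instance_1", 1}};
  // Brace-init goes through the initializer_list overload: the map becomes the
  // single element of a JSON array, hence the old `.at(0)` workaround.
  assert(nlohmann::json{instances}.dump() == R"([{"instance_1":1}])");
  // Parenthesized construction converts the map directly to a JSON object.
  assert(nlohmann::json(instances).dump() == R"({"instance_1":1})");
}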
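On the TODO above: in Raft, when a follower's log diverges from the leader's, the leader overwrites the follower's entries starting at the first conflicting index, and that is what a log store's write_at() is called for, so truncating everything at and after `index` before storing the new entry is the expected behaviour. A rough stand-alone sketch of that truncate-then-write step over a map keyed by log index — hypothetical types, not the NuRaft API:

#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for a map-backed log store; real entries are NuRaft log_entry objects.
void WriteAt(std::map<uint64_t, std::string> &logs, uint64_t index, std::string entry) {
  // Drop the conflicting suffix: every entry with index >= `index` is stale.
  logs.erase(logs.lower_bound(index), logs.end());
  logs[index] = std::move(entry);
}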
 void CoordinatorLogStore::write_at(uint64_t index, ptr<log_entry> &entry) {
   ptr<log_entry> clone = MakeClone(entry);
 
   // Discard all logs equal to or greater than `index.
-  {
-    auto lock = std::lock_guard{logs_lock_};
-    auto itr = logs_.lower_bound(index);
-    while (itr != logs_.end()) {
-      itr = logs_.erase(itr);
-    }
-    logs_[index] = clone;
+  auto lock = std::lock_guard{logs_lock_};
+  auto itr = logs_.lower_bound(index);
+  while (itr != logs_.end()) {
+    itr = logs_.erase(itr);
   }
+  logs_[index] = clone;
 }
 
 ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries(uint64_t start, uint64_t end) {
   auto ret = cs_new<std::vector<ptr<log_entry>>>();
   ret->resize(end - start);
-  for (uint64_t i = start, curr_index = 0; i < end; ++i, ++curr_index) {
+  for (uint64_t i = start, curr_index = 0; i < end; i++, curr_index++) {
     ptr<log_entry> src = nullptr;
     {
       auto lock = std::lock_guard{logs_lock_};
@@ -105,21 +104,14 @@ ptr<std::vector<ptr<log_entry>>> CoordinatorLogStore::log_entries(uint64_t start
 }
 
 ptr<log_entry> CoordinatorLogStore::entry_at(uint64_t index) {
-  ptr<log_entry> src = nullptr;
-  {
-    auto lock = std::lock_guard{logs_lock_};
-    src = FindOrDefault_(index);
-  }
+  auto lock = std::lock_guard{logs_lock_};
+  ptr<log_entry> src = FindOrDefault_(index);
   return MakeClone(src);
 }
 
 uint64_t CoordinatorLogStore::term_at(uint64_t index) {
-  uint64_t term = 0;
-  {
-    auto lock = std::lock_guard{logs_lock_};
-    term = FindOrDefault_(index)->get_term();
-  }
-  return term;
+  auto lock = std::lock_guard{logs_lock_};
+  return FindOrDefault_(index)->get_term();
 }
 
 ptr<buffer> CoordinatorLogStore::pack(uint64_t index, int32 cnt) {
diff --git a/src/coordination/coordinator_state_machine.cpp b/src/coordination/coordinator_state_machine.cpp
index 564303f22..631c3c4d2 100644
--- a/src/coordination/coordinator_state_machine.cpp
+++ b/src/coordination/coordinator_state_machine.cpp
@@ -14,6 +14,10 @@
 #include "nuraft/coordinator_state_machine.hpp"
 #include "utils/logging.hpp"
 
+namespace {
+constexpr int MAX_SNAPSHOTS = 3;
+}  // namespace
+
 namespace memgraph::coordination {
 
 auto CoordinatorStateMachine::FindCurrentMainInstanceName() const -> std::optional<std::string> {
@@ -82,6 +86,7 @@ auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, Raf
 auto CoordinatorStateMachine::pre_commit(ulong const /*log_idx*/, buffer & /*data*/) -> ptr<buffer> { return nullptr; }
 
 auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr<buffer> {
+  spdlog::debug("Commit: log_idx={}, data.size()={}", log_idx, data.size());
   auto const [parsed_data, log_action] = DecodeLog(data);
   cluster_state_.DoAction(parsed_data, log_action);
   last_committed_idx_ = log_idx;
@@ -95,15 +100,17 @@ auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr<b
 
 auto CoordinatorStateMachine::commit_config(ulong const log_idx, ptr<cluster_config> & /*new_conf*/) -> void {
   last_committed_idx_ = log_idx;
+  spdlog::debug("Commit config: log_idx={}", log_idx);
 }
 
 auto CoordinatorStateMachine::rollback(ulong const log_idx, buffer &data) -> void {
   // NOTE: Nothing since we don't do anything in pre_commit
+  spdlog::debug("Rollback: log_idx={}, data.size()={}", log_idx, data.size());
 }
 
 auto CoordinatorStateMachine::read_logical_snp_obj(snapshot &snapshot, void *& /*user_snp_ctx*/, ulong obj_id,
                                                    ptr<buffer> &data_out, bool &is_last_obj) -> int {
-  spdlog::info("read logical snapshot object, obj_id: {}", obj_id);
+  spdlog::debug("read logical snapshot object, obj_id: {}", obj_id);
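Context for the hunk that follows: NuRaft streams a snapshot as a sequence of numbered objects, so sender and receiver must agree on what each obj_id means. The pattern used here, as in NuRaft's own examples, is a small dummy object 0 followed by the real payload; on the saving side, object 0 is the cue to create the local snapshot entry and the next object carries the serialized cluster state. A condensed sketch of the sender-side shape, reusing only calls that already appear in this file — the is_last_obj handling for the payload object is an assumption about the usual convention, not a quote of the patch:

// Hypothetical condensed helper, not the class method itself.
if (obj_id == 0) {
  // First object: a 4-byte dummy marker so the receiver knows the stream has started.
  data_out = buffer::alloc(sizeof(int32));
  buffer_serializer bs(data_out);
  bs.put_i32(0);
  is_last_obj = false;  // more objects follow
} else {
  // Second object: the actual serialized CoordinatorClusterState.
  ctx->cluster_state_.Serialize(data_out);
  is_last_obj = true;   // conventional end-of-stream marker; the hunk below leaves this to the caller's default
}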
 
   ptr<SnapshotCtx> ctx = nullptr;
   {
@@ -116,20 +123,33 @@ auto CoordinatorStateMachine::read_logical_snp_obj(snapshot &snapshot, void *& /
     }
     ctx = entry->second;
   }
-  ctx->cluster_state_.Serialize(data_out);
-  is_last_obj = true;
+
+  if (obj_id == 0) {
+    // Object ID == 0: first object, put dummy data.
+    data_out = buffer::alloc(sizeof(int32));
+    buffer_serializer bs(data_out);
+    bs.put_i32(0);
+    is_last_obj = false;
+  } else {
+    // Object ID > 0: second object, put actual value.
+    ctx->cluster_state_.Serialize(data_out);
+  }
+
   return 0;
 }
 
 auto CoordinatorStateMachine::save_logical_snp_obj(snapshot &snapshot, ulong &obj_id, buffer &data, bool is_first_obj,
                                                    bool is_last_obj) -> void {
-  spdlog::info("save logical snapshot object, obj_id: {}, is_first_obj: {}, is_last_obj: {}", obj_id, is_first_obj,
-               is_last_obj);
+  spdlog::debug("save logical snapshot object, obj_id: {}, is_first_obj: {}, is_last_obj: {}", obj_id, is_first_obj,
+                is_last_obj);
 
-  buffer_serializer bs(data);
-  auto cluster_state = CoordinatorClusterState::Deserialize(data);
+  if (obj_id == 0) {
+    ptr<buffer> snp_buf = snapshot.serialize();
+    auto ss = snapshot::deserialize(*snp_buf);
+    create_snapshot_internal(ss);
+  } else {
+    auto cluster_state = CoordinatorClusterState::Deserialize(data);
 
-  {
     auto ll = std::lock_guard{snapshots_lock_};
     auto entry = snapshots_.find(snapshot.get_last_log_idx());
     DMG_ASSERT(entry != snapshots_.end());
@@ -139,6 +159,7 @@ auto CoordinatorStateMachine::save_logical_snp_obj(snapshot &snapshot, ulong &ob
 
 auto CoordinatorStateMachine::apply_snapshot(snapshot &s) -> bool {
   auto ll = std::lock_guard{snapshots_lock_};
+  spdlog::debug("apply snapshot, last_log_idx: {}", s.get_last_log_idx());
   auto entry = snapshots_.find(s.get_last_log_idx());
 
   if (entry == snapshots_.end()) return false;
@@ -151,6 +172,7 @@ auto CoordinatorStateMachine::free_user_snp_ctx(void *&user_snp_ctx) -> void {}
 
 auto CoordinatorStateMachine::last_snapshot() -> ptr<snapshot> {
   auto ll = std::lock_guard{snapshots_lock_};
+  spdlog::debug("last_snapshot");
   auto entry = snapshots_.rbegin();
   if (entry == snapshots_.rend()) return nullptr;
 
@@ -161,6 +183,7 @@ auto CoordinatorStateMachine::last_snapshot() -> ptr<snapshot> {
 auto CoordinatorStateMachine::last_commit_index() -> ulong { return last_committed_idx_; }
 
 auto CoordinatorStateMachine::create_snapshot(snapshot &s, async_result<bool>::handler_type &when_done) -> void {
+  spdlog::debug("create_snapshot, last_log_idx: {}", s.get_last_log_idx());
   ptr<buffer> snp_buf = s.serialize();
   ptr<snapshot> ss = snapshot::deserialize(*snp_buf);
   create_snapshot_internal(ss);
@@ -172,11 +195,11 @@ auto CoordinatorStateMachine::create_snapshot(snapshot &s, async_result<bool>::h
 
 auto CoordinatorStateMachine::create_snapshot_internal(ptr<snapshot> snapshot) -> void {
   auto ll = std::lock_guard{snapshots_lock_};
+  spdlog::debug("create_snapshot_internal, last_log_idx: {}", snapshot->get_last_log_idx());
 
   auto ctx = cs_new<SnapshotCtx>(snapshot, cluster_state_);
   snapshots_[snapshot->get_last_log_idx()] = ctx;
 
-  constexpr int MAX_SNAPSHOTS = 3;
   while (snapshots_.size() > MAX_SNAPSHOTS) {
     snapshots_.erase(snapshots_.begin());
   }
diff --git a/src/coordination/include/nuraft/coordinator_state_machine.hpp b/src/coordination/include/nuraft/coordinator_state_machine.hpp
index 516b8efc5..836ac17a6 100644
--- a/src/coordination/include/nuraft/coordinator_state_machine.hpp
+++ b/src/coordination/include/nuraft/coordinator_state_machine.hpp
@@ -95,12 +95,8 @@ class CoordinatorStateMachine : public state_machine {
   auto create_snapshot_internal(ptr<snapshot> snapshot) -> void;
 
   CoordinatorClusterState cluster_state_;
-
-  // mutable utils::RWLock lock{utils::RWLock::Priority::READ};
-
   std::atomic<uint64_t> last_committed_idx_{0};
-  // TODO: (andi) Maybe not needed, remove it
   std::map<uint64_t, ptr<SnapshotCtx>> snapshots_;
   std::mutex snapshots_lock_;
diff --git a/src/coordination/raft_state.cpp b/src/coordination/raft_state.cpp
index dd441db74..38acfd85e 100644
--- a/src/coordination/raft_state.cpp
+++ b/src/coordination/raft_state.cpp
@@ -10,7 +10,6 @@
 // licenses/APL.txt.
 
 #ifdef MG_ENTERPRISE
-
 #include <chrono>
 
 #include "coordination/coordinator_config.hpp"
@@ -43,19 +42,25 @@ RaftState::RaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_fo
 
 auto RaftState::InitRaftServer() -> void {
   asio_service::options asio_opts;
-  asio_opts.thread_pool_size_ = 1;  // TODO: (andi) Improve this
+  asio_opts.thread_pool_size_ = 1;
 
   raft_params params;
   params.heart_beat_interval_ = 100;
   params.election_timeout_lower_bound_ = 200;
   params.election_timeout_upper_bound_ = 400;
-  // 5 logs are preserved before the last snapshot
   params.reserved_log_items_ = 5;
-  // Create snapshot for every 5 log appends
   params.snapshot_distance_ = 5;
   params.client_req_timeout_ = 3000;
   params.return_method_ = raft_params::blocking;
 
+  // If the leader doesn't receive any response from quorum nodes
+  // in 200ms, it will step down.
+  // This allows us to achieve strong consistency even if a network partition
+  // happens between the current leader and followers.
+  // The value must be <= election_timeout_lower_bound_ so that the cluster can never
+  // have multiple leaders.
+  params.leadership_expiry_ = 200;
+
   raft_server::init_options init_opts;
   init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode {
     if (event_type == cb_func::BecomeLeader) {
@@ -117,7 +122,8 @@ auto RaftState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_po
   if (cmd_result->get_result_code() == nuraft::cmd_result_code::OK) {
     spdlog::info("Request to add server {} to the cluster accepted", endpoint);
   } else {
-    throw RaftAddServerException("Failed to accept request to add server {} to the cluster", endpoint);
+    throw RaftAddServerException("Failed to accept request to add server {} to the cluster with error code {}",
+                                 endpoint, cmd_result->get_result_code());
   }
 
   // Waiting for server to join
diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp
index e51620bf6..ecec4fccb 100644
--- a/src/query/interpreter.cpp
+++ b/src/query/interpreter.cpp
@@ -497,7 +497,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
     auto const maybe_ip_and_port = io::network::Endpoint::ParseSocketOrAddress(raft_socket_address);
     if (maybe_ip_and_port) {
       auto const [ip, port] = *maybe_ip_and_port;
-      spdlog::info("Adding instance {} with raft socket address {}:{}.", raft_server_id, port, ip);
+      spdlog::info("Adding instance {} with raft socket address {}:{}.", raft_server_id, ip, port);
       coordinator_handler_.AddCoordinatorInstance(raft_server_id, port, ip);
     } else {
       spdlog::error("Invalid raft socket address {}.", raft_socket_address);
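A note on the leadership_expiry_ setting added in raft_state.cpp above: it only provides the claimed single-leader guarantee while it stays at or below the election timeout lower bound, so the relationship is worth guarding explicitly. A minimal sketch of such a guard, assuming only the nuraft::raft_params fields already used in InitRaftServer() and the standard libnuraft umbrella header:

#include <libnuraft/nuraft.hxx>

#include <cassert>

// Hypothetical helper, not part of the patch: builds the parameters used by
// InitRaftServer() and asserts the invariant spelled out in the comment above.
nuraft::raft_params MakeCoordinatorRaftParams() {
  nuraft::raft_params params;
  params.heart_beat_interval_ = 100;
  params.election_timeout_lower_bound_ = 200;
  params.election_timeout_upper_bound_ = 400;
  params.leadership_expiry_ = 200;
  // A leader must give up leadership no later than followers may start a new
  // election; otherwise two leaders could briefly coexist after a partition.
  assert(params.leadership_expiry_ <= params.election_timeout_lower_bound_);
  return params;
}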
diff --git a/tests/e2e/high_availability/coord_cluster_registration.py b/tests/e2e/high_availability/coord_cluster_registration.py
index a285adcea..774c6dca1 100644
--- a/tests/e2e/high_availability/coord_cluster_registration.py
+++ b/tests/e2e/high_availability/coord_cluster_registration.py
@@ -430,7 +430,6 @@ def test_unregister_main():
     coordinator2_cursor = connect(host="localhost", port=7691).cursor()
     coordinator3_cursor = connect(host="localhost", port=7692).cursor()
     assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 1 ON '127.0.0.1:10111'")
-    assert add_coordinator(coordinator3_cursor, "ADD COORDINATOR 2 ON '127.0.0.1:10112'")
     execute_and_fetch_all(
         coordinator3_cursor, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'"
     )
diff --git a/tests/e2e/high_availability/distributed_coords.py b/tests/e2e/high_availability/distributed_coords.py
index 33901f1d4..3b0964111 100644
--- a/tests/e2e/high_availability/distributed_coords.py
+++ b/tests/e2e/high_availability/distributed_coords.py
@@ -121,6 +121,202 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {
 }
 
 
+def get_instances_description_no_setup():
+    return {
+        "instance_1": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7687",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10011",
+            ],
+            "log_file": "instance_1.log",
+            "data_directory": f"{TEMP_DIR}/instance_1",
+            "setup_queries": [],
+        },
+        "instance_2": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7688",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10012",
+            ],
+            "log_file": "instance_2.log",
+            "data_directory": f"{TEMP_DIR}/instance_2",
+            "setup_queries": [],
+        },
+        "instance_3": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7689",
+                "--log-level",
+                "TRACE",
+                "--coordinator-server-port",
+                "10013",
+            ],
+            "log_file": "instance_3.log",
+            "data_directory": f"{TEMP_DIR}/instance_3",
+            "setup_queries": [],
+        },
+        "coordinator_1": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7690",
+                "--log-level=TRACE",
+                "--raft-server-id=1",
+                "--raft-server-port=10111",
+            ],
+            "log_file": "coordinator1.log",
+            "setup_queries": [],
+        },
+        "coordinator_2": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7691",
+                "--log-level=TRACE",
+                "--raft-server-id=2",
+                "--raft-server-port=10112",
+            ],
+            "log_file": "coordinator2.log",
+            "setup_queries": [],
+        },
+        "coordinator_3": {
+            "args": [
+                "--experimental-enabled=high-availability",
+                "--bolt-port",
+                "7692",
+                "--log-level=TRACE",
+                "--raft-server-id=3",
+                "--raft-server-port=10113",
+            ],
+            "log_file": "coordinator3.log",
+            "setup_queries": [],
+        },
+    }
+
+
+def test_old_main_comes_back_on_new_leader_as_replica():
+    # 1. Start all instances.
+    # 2. Kill the main instance.
+    # 3. Kill the leader.
+    # 4. Start the old main instance.
+    # 5. Run SHOW INSTANCES on the new leader and check that the old main instance is registered as a replica.
+    # 6. Start the previous leader again.
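Most of the checks in the test bodies below go through mg_sleep_and_assert / mg_sleep_and_assert_any_function rather than asserting immediately, because failover and Raft leadership changes settle asynchronously. Roughly, such a helper keeps re-evaluating the supplied probe function(s) until one of them returns the expected value or a timeout expires — a simplified illustrative stand-in, not the project's actual implementation:

import time


def sleep_and_assert_any(expected, functions, timeout_s=30.0, pause_s=0.5):
    """Poll every probe until one returns `expected`; fail after timeout_s."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if any(fn() == expected for fn in functions):
            return
        time.sleep(pause_s)
    raise AssertionError(f"None of the probes returned the expected value: {expected}")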
+
+    safe_execute(shutil.rmtree, TEMP_DIR)
+    inner_instances_description = get_instances_description_no_setup()
+
+    interactive_mg_runner.start_all(inner_instances_description)
+
+    setup_queries = [
+        "ADD COORDINATOR 1 ON '127.0.0.1:10111'",
+        "ADD COORDINATOR 2 ON '127.0.0.1:10112'",
+        "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'",
+        "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'",
+        "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'",
+        "SET INSTANCE instance_3 TO MAIN",
+    ]
+    coord_cursor_3 = connect(host="localhost", port=7692).cursor()
+    for query in setup_queries:
+        execute_and_fetch_all(coord_cursor_3, query)
+
+    interactive_mg_runner.kill(inner_instances_description, "coordinator_3")
+    interactive_mg_runner.kill(inner_instances_description, "instance_3")
+
+    coord_cursor_1 = connect(host="localhost", port=7690).cursor()
+
+    def show_instances_coord1():
+        return sorted(list(execute_and_fetch_all(coord_cursor_1, "SHOW INSTANCES;")))
+
+    coord_cursor_2 = connect(host="localhost", port=7691).cursor()
+
+    def show_instances_coord2():
+        return sorted(list(execute_and_fetch_all(coord_cursor_2, "SHOW INSTANCES;")))
+
+    leader_data = [
+        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
+        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
+        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
+        ("instance_1", "", "127.0.0.1:10011", "up", "main"),
+        ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
+        ("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
+    ]
+    mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
+
+    follower_data = [
+        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
+        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
+        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
+        ("instance_1", "", "", "unknown", "main"),
+        ("instance_2", "", "", "unknown", "replica"),
+        ("instance_3", "", "", "unknown", "main"),  # TODO: (andi) Will become unknown.
+    ]
+    mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
+    mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2])
+
+    interactive_mg_runner.start(inner_instances_description, "instance_3")
+
+    leader_data = [
+        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
+        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
+        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
+        ("instance_1", "", "127.0.0.1:10011", "up", "main"),
+        ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
+        ("instance_3", "", "127.0.0.1:10013", "up", "replica"),
+    ]
+    mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
+
+    new_main_cursor = connect(host="localhost", port=7687).cursor()
+
+    def show_replicas():
+        return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
+
+    replicas = [
+        (
+            "instance_2",
+            "127.0.0.1:10002",
+            "sync",
+            {"ts": 0, "behind": None, "status": "ready"},
+            {"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
+        ),
+        (
+            "instance_3",
+            "127.0.0.1:10003",
+            "sync",
+            {"ts": 0, "behind": None, "status": "ready"},
+            {"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
+        ),
+    ]
+    mg_sleep_and_assert_collection(replicas, show_replicas)
+
+    execute_and_fetch_all(new_main_cursor, "CREATE (n:Node {name: 'node'})")
+
+    replica_2_cursor = connect(host="localhost", port=7688).cursor()
+
+    def get_vertex_count():
+        return execute_and_fetch_all(replica_2_cursor, "MATCH (n) RETURN count(n)")[0][0]
+
+    mg_sleep_and_assert(1, get_vertex_count)
+
+    replica_3_cursor = connect(host="localhost", port=7689).cursor()
+
+    def get_vertex_count():
+        return execute_and_fetch_all(replica_3_cursor, "MATCH (n) RETURN count(n)")[0][0]
+
+    mg_sleep_and_assert(1, get_vertex_count)
+
+    interactive_mg_runner.start(inner_instances_description, "coordinator_3")
+
+
 def test_distributed_automatic_failover():
     safe_execute(shutil.rmtree, TEMP_DIR)
     interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
@@ -163,6 +359,7 @@ def test_distributed_automatic_failover():
         ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
         ("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
     ]
+
     mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
 
     new_main_cursor = connect(host="localhost", port=7687).cursor()
@@ -209,13 +406,26 @@ def test_distributed_automatic_failover():
     mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
 
 
-def test_distributed_automatic_failover_after_coord_dies():
+def test_distributed_automatic_failover_with_leadership_change():
     safe_execute(shutil.rmtree, TEMP_DIR)
-    interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
+    inner_instances_description = get_instances_description_no_setup()
 
-    interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_3")
+    interactive_mg_runner.start_all(inner_instances_description)
 
-    interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
+    setup_queries = [
+        "ADD COORDINATOR 1 ON '127.0.0.1:10111'",
+        "ADD COORDINATOR 2 ON '127.0.0.1:10112'",
+        "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'",
+        "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'",
+        "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'",
+        "SET INSTANCE instance_3 TO MAIN",
+    ]
+    coord_cursor_3 = connect(host="localhost", port=7692).cursor()
+    for query in setup_queries:
+        execute_and_fetch_all(coord_cursor_3, query)
+
+    interactive_mg_runner.kill(inner_instances_description, "coordinator_3")
+    interactive_mg_runner.kill(inner_instances_description, "instance_3")
 
     coord_cursor_1 = connect(host="localhost", port=7690).cursor()
 
@@ -271,7 +481,7 @@ def test_distributed_automatic_failover_after_coord_dies():
     ]
     mg_sleep_and_assert_collection(expected_data_on_new_main, retrieve_data_show_replicas)
 
-    interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
+    interactive_mg_runner.start(inner_instances_description, "instance_3")
 
     expected_data_on_new_main_old_alive = [
         (
             "instance_2",
@@ -291,6 +501,125 @@ def test_distributed_automatic_failover_after_coord_dies():
 
     mg_sleep_and_assert_collection(expected_data_on_new_main_old_alive, retrieve_data_show_replicas)
 
+    interactive_mg_runner.start(inner_instances_description, "coordinator_3")
+
+
+def test_no_leader_after_leader_and_follower_die():
+    # 1. Register all but one replication instance on the first leader.
+    # 2. Kill the leader and a follower.
+    # 3. Check that the remaining follower is not promoted to leader by trying to register the remaining replication instance.
+
+    safe_execute(shutil.rmtree, TEMP_DIR)
+
+    interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
+
+    interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_3")
+    interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "coordinator_2")
+
+    coord_cursor_1 = connect(host="localhost", port=7690).cursor()
+
+    with pytest.raises(Exception) as e:
+        execute_and_fetch_all(coord_cursor_1, "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.10001'")
+    assert str(e) == "Couldn't register replica instance since coordinator is not a leader!"
+
+
+def test_old_main_comes_back_on_new_leader_as_main():
+    # 1. Start all instances.
+    # 2. Kill all replication instances.
+    # 3. Kill the leader.
+    # 4. Start the old main instance.
+    # 5. Run SHOW INSTANCES on the new leader and check that the old main instance is main once again.
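The assertions in the body below compare full SHOW INSTANCES rows; when only a single instance's role matters, a small helper along these lines can make the intent clearer — an illustrative sketch reusing the connect/execute_and_fetch_all helpers already imported in this file, and assuming the 5-column row shape asserted above (name, raft socket, coordinator socket, health, role); it is not an existing utility:

def get_role(cursor, instance_name):
    # Returns the reported role ("main", "replica", "coordinator", ...) for one instance.
    rows = execute_and_fetch_all(cursor, "SHOW INSTANCES;")
    return next(row[4] for row in rows if row[0] == instance_name)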
+    inner_memgraph_instances = get_instances_description_no_setup()
+    interactive_mg_runner.start_all(inner_memgraph_instances)
+
+    coord_cursor_3 = connect(host="localhost", port=7692).cursor()
+
+    setup_queries = [
+        "ADD COORDINATOR 1 ON '127.0.0.1:10111'",
+        "ADD COORDINATOR 2 ON '127.0.0.1:10112'",
+        "REGISTER INSTANCE instance_1 ON '127.0.0.1:10011' WITH '127.0.0.1:10001'",
+        "REGISTER INSTANCE instance_2 ON '127.0.0.1:10012' WITH '127.0.0.1:10002'",
+        "REGISTER INSTANCE instance_3 ON '127.0.0.1:10013' WITH '127.0.0.1:10003'",
+        "SET INSTANCE instance_3 TO MAIN",
+    ]
+
+    for query in setup_queries:
+        execute_and_fetch_all(coord_cursor_3, query)
+
+    interactive_mg_runner.kill(inner_memgraph_instances, "instance_1")
+    interactive_mg_runner.kill(inner_memgraph_instances, "instance_2")
+    interactive_mg_runner.kill(inner_memgraph_instances, "instance_3")
+    interactive_mg_runner.kill(inner_memgraph_instances, "coordinator_3")
+
+    coord_cursor_1 = connect(host="localhost", port=7690).cursor()
+
+    def show_instances_coord1():
+        return sorted(list(execute_and_fetch_all(coord_cursor_1, "SHOW INSTANCES;")))
+
+    coord_cursor_2 = connect(host="localhost", port=7691).cursor()
+
+    def show_instances_coord2():
+        return sorted(list(execute_and_fetch_all(coord_cursor_2, "SHOW INSTANCES;")))
+
+    interactive_mg_runner.start(inner_memgraph_instances, "instance_3")
+
+    leader_data = [
+        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
+        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
+        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
+        ("instance_1", "", "127.0.0.1:10011", "down", "unknown"),
+        ("instance_2", "", "127.0.0.1:10012", "down", "unknown"),
+        ("instance_3", "", "127.0.0.1:10013", "up", "main"),
+    ]
+
+    follower_data = [
+        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
+        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
+        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
+        ("instance_1", "", "", "unknown", "replica"),
+        ("instance_2", "", "", "unknown", "replica"),
+        ("instance_3", "", "", "unknown", "main"),
+    ]
+    mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
+    mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2])
+
+    interactive_mg_runner.start(inner_memgraph_instances, "instance_1")
+    interactive_mg_runner.start(inner_memgraph_instances, "instance_2")
+
+    new_main_cursor = connect(host="localhost", port=7689).cursor()
+
+    def show_replicas():
+        return sorted(list(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")))
+
+    replicas = [
+        (
+            "instance_1",
+            "127.0.0.1:10001",
+            "sync",
+            {"ts": 0, "behind": None, "status": "ready"},
+            {"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
+        ),
+        (
+            "instance_2",
+            "127.0.0.1:10002",
+            "sync",
+            {"ts": 0, "behind": None, "status": "ready"},
+            {"memgraph": {"ts": 0, "behind": 0, "status": "ready"}},
+        ),
+    ]
+    mg_sleep_and_assert_collection(replicas, show_replicas)
+
+    execute_and_fetch_all(new_main_cursor, "CREATE (n:Node {name: 'node'})")
+
+    replica_1_cursor = connect(host="localhost", port=7687).cursor()
+    assert len(execute_and_fetch_all(replica_1_cursor, "MATCH (n) RETURN n;")) == 1
+
+    replica_2_cursor = connect(host="localhost", port=7688).cursor()
+    assert len(execute_and_fetch_all(replica_2_cursor, "MATCH (n) RETURN n;")) == 1
+
+    interactive_mg_runner.start(inner_memgraph_instances, "coordinator_3")
+
+
 def test_registering_4_coords():
     # Goal of this test is to assure registering of multiple coordinators in row works