diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp index ff005f217..56b1cb4c6 100644 --- a/src/coordination/coordinator_client.cpp +++ b/src/coordination/coordinator_client.cpp @@ -39,7 +39,7 @@ CoordinatorClient::CoordinatorClient(CoordinatorState *coord_state, CoordinatorC CoordinatorClient::~CoordinatorClient() { auto exit_job = utils::OnScopeExit([&] { - StopFrequentCheck(); + replica_checker_.Stop(); thread_pool_.Shutdown(); }); const auto endpoint = rpc_client_.Endpoint(); @@ -68,7 +68,8 @@ void CoordinatorClient::StartFrequentCheck() { }); } -void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); } +void CoordinatorClient::PauseFrequentCheck() { replica_checker_.Pause(); } +void CoordinatorClient::ResumeFrequentCheck() { replica_checker_.Resume(); } auto CoordinatorClient::InstanceName() const -> std::string_view { return config_.instance_name; } auto CoordinatorClient::SocketAddress() const -> std::string { return rpc_client_.Endpoint().SocketAddress(); } diff --git a/src/coordination/coordinator_state.cpp b/src/coordination/coordinator_state.cpp index eed28885b..27503b3c7 100644 --- a/src/coordination/coordinator_state.cpp +++ b/src/coordination/coordinator_state.cpp @@ -161,22 +161,21 @@ auto CoordinatorState::RegisterMain(CoordinatorClientConfig config) -> RegisterM registered_main_info.UpdateLastResponseTime(); }; - auto fail_cb = [get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { + auto fail_cb = [this, get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void { auto ®istered_main_info = get_client_info(coord_state, instance_name); - // TODO: (andi) Take unique lock if (bool main_alive = registered_main_info.UpdateInstanceStatus(); !main_alive) { - // spdlog::warn("Main is not alive, starting failover"); - // switch (auto failover_status = DoFailover(); failover_status) { - // using enum DoFailoverStatus; - // case ALL_REPLICAS_DOWN: - // spdlog::warn("Failover aborted since all replicas are down!"); - // case MAIN_ALIVE: - // spdlog::warn("Failover aborted since main is alive!"); - // case CLUSTER_UNINITIALIZED: - // spdlog::warn("Failover aborted since cluster is uninitialized!"); - // case SUCCESS: - // break; - // } + spdlog::warn("Main is not alive, starting failover"); + switch (auto failover_status = DoFailover(); failover_status) { + using enum DoFailoverStatus; + case ALL_REPLICAS_DOWN: + spdlog::warn("Failover aborted since all replicas are down!"); + case MAIN_ALIVE: + spdlog::warn("Failover aborted since main is alive!"); + case CLUSTER_UNINITIALIZED: + spdlog::warn("Failover aborted since cluster is uninitialized!"); + case SUCCESS: + break; + } } }; @@ -228,12 +227,14 @@ auto CoordinatorState::ShowMain() const -> std::optional this will shut down RPC frequent check client // (coordinator) // 7. for new main start frequent checks (coordinator) - - MG_ASSERT(std::holds_alternative(data_), "Cannot do failover since variant holds wrong alternative"); using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo; + MG_ASSERT(std::holds_alternative(data_), "Cannot do failover since variant holds wrong alternative"); + auto &coord_state = std::get(data_); + + // std::lock_guard lock{coord_state.coord_data_lock_}; // 1. - auto ¤t_main_info = std::get(data_).registered_main_info_; + auto ¤t_main_info = coord_state.registered_main_info_; if (!current_main_info.has_value()) { return DoFailoverStatus::CLUSTER_UNINITIALIZED; @@ -243,13 +244,13 @@ auto CoordinatorState::ShowMain() const -> std::optional(data_).registered_main_; + auto ¤t_main = coord_state.registered_main_; // TODO: stop pinging as soon as you figure out that failover is needed - current_main->StopFrequentCheck(); + current_main->PauseFrequentCheck(); // 2. // Get all replicas and find new main - auto ®istered_replicas_info = std::get(data_).registered_replicas_info_; + auto ®istered_replicas_info = coord_state.registered_replicas_info_; const auto chosen_replica_info = std::ranges::find_if( registered_replicas_info, [](const CoordinatorClientInfo &client_info) { return client_info.IsAlive(); }); @@ -257,7 +258,7 @@ auto CoordinatorState::ShowMain() const -> std::optional(data_).registered_replicas_; + auto ®istered_replicas = coord_state.registered_replicas_; auto chosen_replica = std::ranges::find_if(registered_replicas, [&chosen_replica_info](const CoordinatorClient &replica) { return replica.InstanceName() == chosen_replica_info->InstanceName(); @@ -299,7 +300,7 @@ auto CoordinatorState::ShowMain() const -> std::optionalStartFrequentCheck(); + current_main->ResumeFrequentCheck(); return DoFailoverStatus::SUCCESS; } diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp index 461251337..281f892f9 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/coordinator_client.hpp @@ -43,7 +43,8 @@ class CoordinatorClient { CoordinatorClient &operator=(CoordinatorClient &&) noexcept = delete; void StartFrequentCheck(); - void StopFrequentCheck(); + void PauseFrequentCheck(); + void ResumeFrequentCheck(); auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool; diff --git a/src/utils/scheduler.hpp b/src/utils/scheduler.hpp index d96178598..53b890252 100644 --- a/src/utils/scheduler.hpp +++ b/src/utils/scheduler.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -58,21 +58,31 @@ class Scheduler { // the start of the program. Since Server will log some messages on // the program start we let him log first and we make sure by first // waiting that funcion f will not log before it. + // Check for pause also. std::unique_lock lk(mutex_); auto now = std::chrono::system_clock::now(); start_time += pause; if (start_time > now) { - condition_variable_.wait_until(lk, start_time, [&] { return is_working_.load() == false; }); + condition_variable_.wait_until(lk, start_time, [&] { return !is_working_.load(); }); } else { start_time = now; } + pause_cv_.wait(lk, [&] { return !is_paused_.load(); }); + if (!is_working_) break; f(); } }); } + void Resume() { + is_paused_.store(false); + pause_cv_.notify_one(); + } + + void Pause() { is_paused_.store(true); } + /** * @brief Stops the thread execution. This is a blocking call and may take as * much time as one call to the function given previously to Run takes. @@ -97,6 +107,16 @@ class Scheduler { */ std::atomic is_working_{false}; + /** + * Variable is true when thread is paused. + */ + std::atomic is_paused_{false}; + + /* + * Wait until the thread is resumed. + */ + std::condition_variable pause_cv_; + /** * Mutex used to synchronize threads using condition variable. */ diff --git a/tests/e2e/high_availability_experimental/CMakeLists.txt b/tests/e2e/high_availability_experimental/CMakeLists.txt index 2975ce588..be7ff294f 100644 --- a/tests/e2e/high_availability_experimental/CMakeLists.txt +++ b/tests/e2e/high_availability_experimental/CMakeLists.txt @@ -2,6 +2,7 @@ find_package(gflags REQUIRED) copy_e2e_python_files(ha_experimental coordinator.py) copy_e2e_python_files(ha_experimental client_initiated_failover.py) +copy_e2e_python_files(ha_experimental automatic_failover.py) copy_e2e_python_files(ha_experimental uninitialized_cluster.py) copy_e2e_python_files(ha_experimental common.py) copy_e2e_python_files(ha_experimental conftest.py) diff --git a/tests/e2e/high_availability_experimental/automatic_failover.py b/tests/e2e/high_availability_experimental/automatic_failover.py new file mode 100644 index 000000000..bae243b79 --- /dev/null +++ b/tests/e2e/high_availability_experimental/automatic_failover.py @@ -0,0 +1,95 @@ +# Copyright 2022 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import os +import sys + +import interactive_mg_runner +import pytest +from common import execute_and_fetch_all +from mg_utils import mg_sleep_and_assert + +interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +interactive_mg_runner.PROJECT_DIR = os.path.normpath( + os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..") +) +interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build")) +interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph")) + +MEMGRAPH_INSTANCES_DESCRIPTION = { + "instance_1": { + "args": ["--bolt-port", "7688", "--log-level", "TRACE", "--coordinator-server-port", "10011"], + "log_file": "replica1.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + }, + "instance_2": { + "args": ["--bolt-port", "7689", "--log-level", "TRACE", "--coordinator-server-port", "10012"], + "log_file": "replica2.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"], + }, + "instance_3": { + "args": ["--bolt-port", "7687", "--log-level", "TRACE", "--coordinator-server-port", "10013"], + "log_file": "main.log", + "setup_queries": [ + "REGISTER REPLICA instance_1 SYNC TO '127.0.0.1:10001'", + "REGISTER REPLICA instance_2 SYNC TO '127.0.0.1:10002'", + ], + }, + "coordinator": { + "args": ["--bolt-port", "7690", "--log-level=TRACE", "--coordinator"], + "log_file": "replica3.log", + "setup_queries": [ + "REGISTER REPLICA instance_1 SYNC TO '127.0.0.1:10001' WITH COORDINATOR SERVER ON '127.0.0.1:10011';", + "REGISTER REPLICA instance_2 SYNC TO '127.0.0.1:10002' WITH COORDINATOR SERVER ON '127.0.0.1:10012';", + "REGISTER MAIN instance_3 WITH COORDINATOR SERVER ON '127.0.0.1:10013';", + ], + }, +} + + +def test_simple_automatic_failover(connection): + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + main_cursor = connection(7687, "instance_3").cursor() + expected_data_on_main = { + ("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + } + actual_data_on_main = set(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;")) + assert actual_data_on_main == expected_data_on_main + + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") + + coord_cursor = connection(7690, "coordinator").cursor() + + def retrieve_data_show_repl_cluster(): + return set(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")) + + expected_data_on_coord = { + ("instance_1", "127.0.0.1:10011", True, "main"), + ("instance_2", "127.0.0.1:10012", True, "replica"), + ("instance_3", "127.0.0.1:10013", False, "main"), # TODO: (andi) Include or exclude dead main from the result? + } + mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster) + + new_main_cursor = connection(7688, "instance_1").cursor() + + def retrieve_data_show_replicas(): + return set(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;")) + + expected_data_on_new_main = { + ("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + } + mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas) + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/high_availability_experimental/workloads.yaml b/tests/e2e/high_availability_experimental/workloads.yaml index dbb6457ff..11d5d75d9 100644 --- a/tests/e2e/high_availability_experimental/workloads.yaml +++ b/tests/e2e/high_availability_experimental/workloads.yaml @@ -61,3 +61,7 @@ workloads: - name: "Client initiated failover" binary: "tests/e2e/pytest_runner.sh" args: ["high_availability_experimental/client_initiated_failover.py"] + + - name: "Automatic failover" + binary: "tests/e2e/pytest_runner.sh" + args: ["high_availability_experimental/automatic_failover.py"]