Thread-unsafe automatic failover

This commit is contained in:
Andi Skrgat 2024-01-22 12:57:22 +01:00
parent ab34b060c0
commit ef37c44149
7 changed files with 150 additions and 27 deletions

View File

@ -39,7 +39,7 @@ CoordinatorClient::CoordinatorClient(CoordinatorState *coord_state, CoordinatorC
CoordinatorClient::~CoordinatorClient() {
auto exit_job = utils::OnScopeExit([&] {
StopFrequentCheck();
replica_checker_.Stop();
thread_pool_.Shutdown();
});
const auto endpoint = rpc_client_.Endpoint();
@ -68,7 +68,8 @@ void CoordinatorClient::StartFrequentCheck() {
});
}
// Stops the frequent check by calling Stop() on replica_checker_.
void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); }
// Pauses the frequent check by calling Pause() on replica_checker_; the checker is not stopped.
void CoordinatorClient::PauseFrequentCheck() { replica_checker_.Pause(); }
// Resumes a previously paused frequent check by calling Resume() on replica_checker_.
void CoordinatorClient::ResumeFrequentCheck() { replica_checker_.Resume(); }
// Returns a non-owning view of config_.instance_name; the view refers to storage held by this client's config.
auto CoordinatorClient::InstanceName() const -> std::string_view { return config_.instance_name; }
// Returns the socket address of the RPC client's endpoint as an owning string.
auto CoordinatorClient::SocketAddress() const -> std::string { return rpc_client_.Endpoint().SocketAddress(); }

View File

@ -161,22 +161,21 @@ auto CoordinatorState::RegisterMain(CoordinatorClientConfig config) -> RegisterM
registered_main_info.UpdateLastResponseTime();
};
auto fail_cb = [get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
auto fail_cb = [this, get_client_info](CoordinatorState *coord_state, std::string_view instance_name) -> void {
auto &registered_main_info = get_client_info(coord_state, instance_name);
// TODO: (andi) Take unique lock
if (bool main_alive = registered_main_info.UpdateInstanceStatus(); !main_alive) {
// spdlog::warn("Main is not alive, starting failover");
// switch (auto failover_status = DoFailover(); failover_status) {
// using enum DoFailoverStatus;
// case ALL_REPLICAS_DOWN:
// spdlog::warn("Failover aborted since all replicas are down!");
// case MAIN_ALIVE:
// spdlog::warn("Failover aborted since main is alive!");
// case CLUSTER_UNINITIALIZED:
// spdlog::warn("Failover aborted since cluster is uninitialized!");
// case SUCCESS:
// break;
// }
// Main was reported as not alive: attempt automatic failover and log the reason if it was aborted.
spdlog::warn("Main is not alive, starting failover");
switch (auto failover_status = DoFailover(); failover_status) {
  using enum DoFailoverStatus;
  // Each case ends with an explicit break so that exactly one abort reason is logged;
  // without it, control falls through and the warnings of all following cases are emitted too.
  case ALL_REPLICAS_DOWN:
    spdlog::warn("Failover aborted since all replicas are down!");
    break;
  case MAIN_ALIVE:
    spdlog::warn("Failover aborted since main is alive!");
    break;
  case CLUSTER_UNINITIALIZED:
    spdlog::warn("Failover aborted since cluster is uninitialized!");
    break;
  case SUCCESS:
    break;
}
}
};
@ -228,12 +227,14 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
// 6. remove replica which was promoted to main from all replicas -> this will shut down RPC frequent check client
// (coordinator)
// 7. for new main start frequent checks (coordinator)
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), "Cannot do failover since variant holds wrong alternative");
using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), "Cannot do failover since variant holds wrong alternative");
auto &coord_state = std::get<CoordinatorData>(data_);
// std::lock_guard<utils::RWLock> lock{coord_state.coord_data_lock_};
// 1.
auto &current_main_info = std::get<CoordinatorData>(data_).registered_main_info_;
auto &current_main_info = coord_state.registered_main_info_;
if (!current_main_info.has_value()) {
return DoFailoverStatus::CLUSTER_UNINITIALIZED;
@ -243,13 +244,13 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
return DoFailoverStatus::MAIN_ALIVE;
}
auto &current_main = std::get<CoordinatorData>(data_).registered_main_;
auto &current_main = coord_state.registered_main_;
// TODO: stop pinging as soon as you figure out that failover is needed
current_main->StopFrequentCheck();
current_main->PauseFrequentCheck();
// 2.
// Get all replicas and find new main
auto &registered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_;
auto &registered_replicas_info = coord_state.registered_replicas_info_;
const auto chosen_replica_info = std::ranges::find_if(
registered_replicas_info, [](const CoordinatorClientInfo &client_info) { return client_info.IsAlive(); });
@ -257,7 +258,7 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
return DoFailoverStatus::ALL_REPLICAS_DOWN;
}
auto &registered_replicas = std::get<CoordinatorData>(data_).registered_replicas_;
auto &registered_replicas = coord_state.registered_replicas_;
auto chosen_replica =
std::ranges::find_if(registered_replicas, [&chosen_replica_info](const CoordinatorClient &replica) {
return replica.InstanceName() == chosen_replica_info->InstanceName();
@ -299,7 +300,7 @@ auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStat
registered_replicas.erase(chosen_replica);
registered_replicas_info.erase(chosen_replica_info);
current_main->StartFrequentCheck();
current_main->ResumeFrequentCheck();
return DoFailoverStatus::SUCCESS;
}

View File

@ -43,7 +43,8 @@ class CoordinatorClient {
CoordinatorClient &operator=(CoordinatorClient &&) noexcept = delete;
void StartFrequentCheck();
void StopFrequentCheck();
void PauseFrequentCheck();
void ResumeFrequentCheck();
auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool;

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -58,21 +58,31 @@ class Scheduler {
// the start of the program. Since Server will log some messages on
// the program start we let him log first and we make sure by first
// waiting that function f will not log before it.
// After that wait, also block for as long as the scheduler is paused.
std::unique_lock<std::mutex> lk(mutex_);
auto now = std::chrono::system_clock::now();
start_time += pause;
if (start_time > now) {
condition_variable_.wait_until(lk, start_time, [&] { return is_working_.load() == false; });
condition_variable_.wait_until(lk, start_time, [&] { return !is_working_.load(); });
} else {
start_time = now;
}
pause_cv_.wait(lk, [&] { return !is_paused_.load(); });
if (!is_working_) break;
f();
}
});
}
/**
 * @brief Clears the paused flag and notifies one thread waiting on pause_cv_.
 *
 * NOTE(review): the flag is changed without holding mutex_, while the worker
 * evaluates its pause predicate under mutex_. If Resume() is called from a
 * different thread, the notification could presumably arrive between the
 * predicate check and the actual wait and be missed. Taking mutex_ here needs
 * care: the worker appears to invoke f() with mutex_ still held, so a Resume()
 * issued from inside f() would then self-deadlock -- confirm before changing.
 */
void Resume() {
is_paused_.store(false);
pause_cv_.notify_one();
}
/**
 * @brief Sets the paused flag. The worker checks it before each call to f and
 * blocks on pause_cv_ until Resume() clears it; a call to f that is already
 * running is not interrupted.
 *
 * NOTE(review): verify that Stop() also wakes a worker blocked on pause_cv_,
 * otherwise stopping a paused scheduler may never finish.
 */
void Pause() { is_paused_.store(true); }
/**
* @brief Stops the thread execution. This is a blocking call and may take as
* much time as one call to the function given previously to Run takes.
@ -97,6 +107,16 @@ class Scheduler {
*/
std::atomic<bool> is_working_{false};
/**
* Variable is true when thread is paused.
*/
std::atomic<bool> is_paused_{false};
/**
 * Condition variable the worker thread waits on while paused; notified by Resume().
 */
std::condition_variable pause_cv_;
/**
* Mutex used to synchronize threads using condition variable.
*/

View File

@ -2,6 +2,7 @@ find_package(gflags REQUIRED)
copy_e2e_python_files(ha_experimental coordinator.py)
copy_e2e_python_files(ha_experimental client_initiated_failover.py)
copy_e2e_python_files(ha_experimental automatic_failover.py)
copy_e2e_python_files(ha_experimental uninitialized_cluster.py)
copy_e2e_python_files(ha_experimental common.py)
copy_e2e_python_files(ha_experimental conftest.py)

View File

@ -0,0 +1,95 @@
# Copyright 2024 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import os
import sys
import interactive_mg_runner
import pytest
from common import execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
# Configure interactive_mg_runner paths relative to this file: the script directory,
# the project root (four directory levels up), its "build" directory and the
# "memgraph" binary inside that build directory.
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..")
)
interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build"))
interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph"))
# Cluster layout used by this test: two replicas (instance_1, instance_2), the initial
# main (instance_3) and a coordinator. Every data instance is started with its own
# --coordinator-server-port, which the coordinator's REGISTER queries refer to.
MEMGRAPH_INSTANCES_DESCRIPTION = {
"instance_1": {
"args": ["--bolt-port", "7688", "--log-level", "TRACE", "--coordinator-server-port", "10011"],
"log_file": "replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
},
"instance_2": {
"args": ["--bolt-port", "7689", "--log-level", "TRACE", "--coordinator-server-port", "10012"],
"log_file": "replica2.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
},
# Initial main: registers both replicas over their replication ports.
"instance_3": {
"args": ["--bolt-port", "7687", "--log-level", "TRACE", "--coordinator-server-port", "10013"],
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA instance_1 SYNC TO '127.0.0.1:10001'",
"REGISTER REPLICA instance_2 SYNC TO '127.0.0.1:10002'",
],
},
# Coordinator: registers the two replicas and the main together with their coordinator server addresses.
# NOTE(review): the coordinator logs to "replica3.log" -- possibly a copy-paste leftover; confirm.
"coordinator": {
"args": ["--bolt-port", "7690", "--log-level=TRACE", "--coordinator"],
"log_file": "replica3.log",
"setup_queries": [
"REGISTER REPLICA instance_1 SYNC TO '127.0.0.1:10001' WITH COORDINATOR SERVER ON '127.0.0.1:10011';",
"REGISTER REPLICA instance_2 SYNC TO '127.0.0.1:10002' WITH COORDINATOR SERVER ON '127.0.0.1:10012';",
"REGISTER MAIN instance_3 WITH COORDINATOR SERVER ON '127.0.0.1:10013';",
],
},
}
# End-to-end check of automatic failover: start the cluster, kill the main (instance_3)
# and expect the coordinator to report instance_1 as the new main, with instance_2 as
# its only replica. `connection` is a fixture called as connection(port, role).
def test_simple_automatic_failover(connection):
interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
# Sanity check before the failure: the main sees both replicas as ready.
main_cursor = connection(7687, "instance_3").cursor()
expected_data_on_main = {
("instance_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
}
actual_data_on_main = set(execute_and_fetch_all(main_cursor, "SHOW REPLICAS;"))
assert actual_data_on_main == expected_data_on_main
# Kill the main; no failover command is issued by the test itself.
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3")
coord_cursor = connection(7690, "coordinator").cursor()
def retrieve_data_show_repl_cluster():
return set(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;"))
# Expected coordinator view: instance_1 promoted to main, instance_2 still a replica,
# and the killed instance_3 still listed as a main that is not alive.
expected_data_on_coord = {
("instance_1", "127.0.0.1:10011", True, "main"),
("instance_2", "127.0.0.1:10012", True, "replica"),
("instance_3", "127.0.0.1:10013", False, "main"), # TODO: (andi) Include or exclude dead main from the result?
}
# mg_sleep_and_assert presumably re-runs the callable until it returns the expected value -- confirm in mg_utils.
mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
# The promoted instance (bolt port 7688) should now list instance_2 as its only replica.
new_main_cursor = connection(7688, "instance_1").cursor()
def retrieve_data_show_replicas():
return set(execute_and_fetch_all(new_main_cursor, "SHOW REPLICAS;"))
expected_data_on_new_main = {
("instance_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
}
mg_sleep_and_assert(expected_data_on_new_main, retrieve_data_show_replicas)
# When executed directly, run pytest on this file and exit with its return code.
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -61,3 +61,7 @@ workloads:
- name: "Client initiated failover"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/client_initiated_failover.py"]
- name: "Automatic failover"
binary: "tests/e2e/pytest_runner.sh"
args: ["high_availability_experimental/automatic_failover.py"]