Replica instance restored

This commit is contained in:
Andi Skrgat 2024-01-26 11:23:38 +01:00
parent eea21dd73e
commit 8124226ba9
7 changed files with 160 additions and 56 deletions

View File

@@ -21,6 +21,7 @@ target_sources(mg-coordination
coordinator_rpc.cpp
coordinator_server.cpp
coordinator_data.cpp
coordinator_instance.cpp
)
target_include_directories(mg-coordination PUBLIC include)

View File

@@ -66,7 +66,7 @@ void CoordinatorClient::ResumeFrequentCheck() { instance_checker_.Resume(); }
auto CoordinatorClient::SetSuccCallback(HealthCheckCallback succ_cb) -> void { succ_cb_ = std::move(succ_cb); }
auto CoordinatorClient::SetFailCallback(HealthCheckCallback fail_cb) -> void { fail_cb_ = std::move(fail_cb); }
auto CoordinatorClient::ReplicationClientInfo() const -> const CoordinatorClientConfig::ReplicationClientInfo & {
auto CoordinatorClient::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo {
return config_.replication_client_info;
}
@@ -90,7 +90,7 @@ auto CoordinatorClient::SendPromoteReplicaToMainRpc(
return false;
}
auto CoordinatorClient::SendSetToReplicaRpc(CoordinatorClient::ReplClientInfo replication_client_info) const -> bool {
auto CoordinatorClient::SendSetToReplicaRpc(ReplClientInfo replication_client_info) const -> bool {
try {
auto stream{rpc_client_.Stream<SetMainToReplicaRpc>(std::move(replication_client_info))};
if (!stream.AwaitResponse().success) {

View File

@@ -37,6 +37,7 @@ CoordinatorData::CoordinatorData() {
auto &instance = find_instance(coord_data, instance_name);
MG_ASSERT(instance.IsReplica(), "Instance {} is not a replica!", instance_name);
instance.UpdateLastResponseTime();
instance.UpdateInstanceStatus();
};
replica_fail_cb_ = [find_instance](CoordinatorData *coord_data, std::string_view instance_name) -> void {
@@ -104,19 +105,19 @@ auto CoordinatorData::DoFailover() -> DoFailoverStatus {
// TODO: (andi) Don't send replicas which aren't alive
for (const auto &unchosen_replica_instance :
replica_instances | ranges::views::filter(not_chosen_replica_instance) | ranges::views::filter(not_main)) {
repl_clients_info.emplace_back(unchosen_replica_instance.client_.ReplicationClientInfo());
repl_clients_info.emplace_back(unchosen_replica_instance.ReplicationClientInfo());
}
if (!chosen_replica_instance->client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) {
if (!chosen_replica_instance->SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) {
chosen_replica_instance->RestoreAfterFailedFailover();
return DoFailoverStatus::RPC_FAILED;
}
auto old_main = std::ranges::find_if(registered_instances_, &CoordinatorInstance::IsMain);
// TODO: (andi) For performing restoration we will have to improve this
old_main->client_.PauseFrequentCheck();
old_main->PauseFrequentCheck();
chosen_replica_instance->PostFailover(main_succ_cb_, main_fail_cb_);
chosen_replica_instance->PromoteToMain(main_succ_cb_, main_fail_cb_);
return DoFailoverStatus::SUCCESS;
}
@@ -150,39 +151,36 @@ auto CoordinatorData::ShowInstances() const -> std::vector<CoordinatorInstanceSt
auto CoordinatorData::SetInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
auto lock = std::lock_guard{coord_data_lock_};
// Find replica we already registered
auto registered_replica = std::find_if(
registered_instances_.begin(), registered_instances_.end(),
[instance_name](const CoordinatorInstance &instance) { return instance.InstanceName() == instance_name; });
auto const is_new_main = [&instance_name](const CoordinatorInstance &instance) {
return instance.InstanceName() == instance_name;
};
auto new_main = std::ranges::find_if(registered_instances_, is_new_main);
// if replica not found...
if (registered_replica == registered_instances_.end()) {
if (new_main == registered_instances_.end()) {
spdlog::error("You didn't register instance with given name {}", instance_name);
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
}
registered_replica->client_.PauseFrequentCheck();
new_main->PauseFrequentCheck();
std::vector<CoordinatorClientConfig::ReplicationClientInfo> repl_clients_info;
repl_clients_info.reserve(registered_instances_.size() - 1);
std::ranges::for_each(registered_instances_,
[registered_replica, &repl_clients_info](const CoordinatorInstance &replica) {
if (replica != *registered_replica) {
repl_clients_info.emplace_back(replica.client_.ReplicationClientInfo());
}
});
auto const is_not_new_main = [&instance_name](const CoordinatorInstance &instance) {
return instance.InstanceName() != instance_name;
};
std::ranges::transform(registered_instances_ | ranges::views::filter(is_not_new_main),
std::back_inserter(repl_clients_info),
[](const CoordinatorInstance &instance) { return instance.ReplicationClientInfo(); });
// PROMOTE REPLICA TO MAIN
// THIS SHOULD FAIL HERE IF IT IS DOWN
if (auto result = registered_replica->client_.SendPromoteReplicaToMainRpc(std::move(repl_clients_info)); !result) {
registered_replica->client_.ResumeFrequentCheck();
if (auto result = new_main->SendPromoteReplicaToMainRpc(std::move(repl_clients_info)); !result) {
new_main->ResumeFrequentCheck();
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
registered_replica->client_.SetSuccCallback(main_succ_cb_);
registered_replica->client_.SetFailCallback(main_fail_cb_);
registered_replica->replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
registered_replica->client_.ResumeFrequentCheck();
new_main->PromoteToMain(main_succ_cb_, main_fail_cb_);
return SetInstanceToMainCoordinatorStatus::SUCCESS;
}
@@ -202,16 +200,15 @@ auto CoordinatorData::RegisterInstance(CoordinatorClientConfig config) -> Regist
return RegisterInstanceCoordinatorStatus::END_POINT_EXISTS;
}
CoordinatorClientConfig::ReplicationClientInfo replication_client_info_copy = config.replication_client_info;
ReplClientInfo replication_client_info_copy = config.replication_client_info;
// TODO (antoniofilipovic) create and then push back
auto *instance = &registered_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_,
replication_coordination_glue::ReplicationRole::REPLICA);
if (auto res = instance->client_.SendSetToReplicaRpc(replication_client_info_copy); !res) {
if (auto res = instance->SendSetToReplicaRpc(replication_client_info_copy); !res) {
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}
instance->client_.StartFrequentCheck();
instance->StartFrequentCheck();
return RegisterInstanceCoordinatorStatus::SUCCESS;
}

View File

@@ -0,0 +1,77 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#ifdef MG_ENTERPRISE

#include "coordination/coordinator_instance.hpp"

namespace memgraph::coordination {

// Recomputes the liveness flag for this instance: alive iff the time elapsed
// since the last recorded response is strictly below the configured
// threshold. Returns the freshly computed value.
// NOTE(review): this uses system_clock (wall clock), so an NTP/clock jump can
// flip the result; steady_clock would be more robust, but last_response_time_
// is declared as a system_clock::time_point in the header — confirm before
// changing.
auto CoordinatorInstance::UpdateInstanceStatus() -> bool {
  is_alive_ =
      std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_).count() <
      CoordinatorClusterConfig::alive_response_time_difference_sec_;
  return is_alive_;
}

// Records "now" as the moment of the most recent successful health-check response.
auto CoordinatorInstance::UpdateLastResponseTime() -> void { last_response_time_ = std::chrono::system_clock::now(); }

// Name/address are delegated to the underlying RPC client.
auto CoordinatorInstance::InstanceName() const -> std::string { return client_.InstanceName(); }
auto CoordinatorInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); }

// Returns the cached liveness flag; does NOT recompute it (see UpdateInstanceStatus).
auto CoordinatorInstance::IsAlive() const -> bool { return is_alive_; }

auto CoordinatorInstance::SetReplicationRole(replication_coordination_glue::ReplicationRole role) -> void {
  replication_role_ = role;
}

auto CoordinatorInstance::IsReplica() const -> bool {
  return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA;
}
auto CoordinatorInstance::IsMain() const -> bool {
  return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN;
}

// Failover bracketing: suspend health checks while failover runs, and resume
// them if the failover attempt fails and the instance must be restored.
auto CoordinatorInstance::PrepareForFailover() -> void { client_.PauseFrequentCheck(); }
auto CoordinatorInstance::RestoreAfterFailedFailover() -> void { client_.ResumeFrequentCheck(); }

// Switches this instance's tracked role to MAIN, installs the MAIN-specific
// health-check callbacks, and resumes the (previously paused) frequent check.
auto CoordinatorInstance::PromoteToMain(HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> void {
  replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
  client_.SetSuccCallback(std::move(main_succ_cb));
  client_.SetFailCallback(std::move(main_fail_cb));
  client_.ResumeFrequentCheck();
}

// Thin pass-throughs controlling the client's periodic health-check loop.
auto CoordinatorInstance::StartFrequentCheck() -> void { client_.StartFrequentCheck(); }
auto CoordinatorInstance::PauseFrequentCheck() -> void { client_.PauseFrequentCheck(); }
auto CoordinatorInstance::ResumeFrequentCheck() -> void { client_.ResumeFrequentCheck(); }

auto CoordinatorInstance::SetSuccCallback(HealthCheckCallback succ_cb) -> void {
  client_.SetSuccCallback(std::move(succ_cb));
}
auto CoordinatorInstance::SetFailCallback(HealthCheckCallback fail_cb) -> void {
  client_.SetFailCallback(std::move(fail_cb));
}

auto CoordinatorInstance::ResetReplicationClientInfo() -> void { client_.ResetReplicationClientInfo(); }

// Returns a copy of the replication-client config held by the underlying client.
auto CoordinatorInstance::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo {
  return client_.ReplicationClientInfo();
}

// RPC wrappers: forward to the client, which returns false on failure.
auto CoordinatorInstance::SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool {
  return client_.SendPromoteReplicaToMainRpc(std::move(replication_clients_info));
}
auto CoordinatorInstance::SendSetToReplicaRpc(ReplClientInfo replication_client_info) const -> bool {
  return client_.SendSetToReplicaRpc(std::move(replication_client_info));
}

}  // namespace memgraph::coordination
#endif

View File

@@ -21,12 +21,11 @@ namespace memgraph::coordination {
class CoordinatorData;
using HealthCheckCallback = std::function<void(CoordinatorData *, std::string_view)>;
using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
using ReplicationClientsInfo = std::vector<ReplClientInfo>;
class CoordinatorClient {
public:
using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo;
using ReplicationClientsInfo = std::vector<ReplClientInfo>;
explicit CoordinatorClient(CoordinatorData *coord_data_, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb);
@@ -48,7 +47,7 @@ class CoordinatorClient {
auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool;
auto ReplicationClientInfo() const -> const ReplClientInfo &;
auto ReplicationClientInfo() const -> ReplClientInfo;
auto ResetReplicationClientInfo() -> void;
auto SendSetToReplicaRpc(ReplClientInfo replication_client_info) const -> bool;

View File

@@ -35,34 +35,35 @@ class CoordinatorInstance {
CoordinatorInstance &operator=(CoordinatorInstance &&other) noexcept = delete;
~CoordinatorInstance() = default;
auto UpdateInstanceStatus() -> bool {
is_alive_ = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - last_response_time_)
.count() < CoordinatorClusterConfig::alive_response_time_difference_sec_;
return is_alive_;
}
auto UpdateLastResponseTime() -> void { last_response_time_ = std::chrono::system_clock::now(); }
auto UpdateInstanceStatus() -> bool;
auto UpdateLastResponseTime() -> void;
auto IsAlive() const -> bool;
auto InstanceName() const -> std::string { return client_.InstanceName(); }
auto SocketAddress() const -> std::string { return client_.SocketAddress(); }
auto IsAlive() const -> bool { return is_alive_; }
auto InstanceName() const -> std::string;
auto SocketAddress() const -> std::string;
auto IsReplica() const -> bool {
return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA;
}
auto IsMain() const -> bool { return replication_role_ == replication_coordination_glue::ReplicationRole::MAIN; }
auto SetReplicationRole(replication_coordination_glue::ReplicationRole role) -> void;
auto IsReplica() const -> bool;
auto IsMain() const -> bool;
auto PrepareForFailover() -> void { client_.PauseFrequentCheck(); }
auto RestoreAfterFailedFailover() -> void { client_.ResumeFrequentCheck(); }
auto PrepareForFailover() -> void;
auto RestoreAfterFailedFailover() -> void;
auto PromoteToMain(HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> void;
auto PostFailover(HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> void {
replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
client_.SetSuccCallback(std::move(main_succ_cb));
client_.SetFailCallback(std::move(main_fail_cb));
// Comment with Andi but we shouldn't delete this, what if this MAIN FAILS AGAIN
// client_.ResetReplicationClientInfo();
client_.ResumeFrequentCheck();
}
auto StartFrequentCheck() -> void;
auto PauseFrequentCheck() -> void;
auto ResumeFrequentCheck() -> void;
auto ResetReplicationClientInfo() -> void;
auto ReplicationClientInfo() const -> ReplClientInfo;
auto SetSuccCallback(HealthCheckCallback succ_cb) -> void;
auto SetFailCallback(HealthCheckCallback fail_cb) -> void;
auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool;
auto SendSetToReplicaRpc(ReplClientInfo replication_client_info) const -> bool;
private:
CoordinatorClient client_;
replication_coordination_glue::ReplicationRole replication_role_;
std::chrono::system_clock::time_point last_response_time_{};

View File

@@ -162,5 +162,34 @@ def test_registering_replica_fails_endpoint_exists(connection):
)
def test_replica_instance_restarts(connection):
    # End-to-end check: the coordinator must mark a replica as not-alive after
    # it is killed, and mark it alive again once the instance restarts.
    interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)

    cursor = connection(7690, "coordinator").cursor()

    def retrieve_data():
        # Snapshot of the cluster state as the coordinator currently reports it.
        return sorted(list(execute_and_fetch_all(cursor, "SHOW REPLICATION CLUSTER;")))

    # Healthy steady state: two alive replicas plus one alive main.
    expected_data_up = [
        ("instance_1", "127.0.0.1:10011", True, "replica"),
        ("instance_2", "127.0.0.1:10012", True, "replica"),
        ("instance_3", "127.0.0.1:10013", True, "main"),
    ]
    mg_sleep_and_assert(expected_data_up, retrieve_data)

    interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")

    # After the kill, instance_1 is reported not-alive with an empty role.
    expected_data_down = [
        ("instance_1", "127.0.0.1:10011", False, ""),
        ("instance_2", "127.0.0.1:10012", True, "replica"),
        ("instance_3", "127.0.0.1:10013", True, "main"),
    ]
    mg_sleep_and_assert(expected_data_down, retrieve_data)

    # Restart the killed replica; the coordinator should converge back to the
    # original healthy view.
    interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1")
    mg_sleep_and_assert(expected_data_up, retrieve_data)


if __name__ == "__main__":
    sys.exit(pytest.main([__file__, "-rA"]))