add callback for demote to replica on force reset
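
Instances whose role cannot be confirmed after a failed force reset are now
marked in the raft log with the new INSTANCE_NEEDS_DEMOTE action and given
universal success/fail callbacks that demote them to replica once they
become reachable again.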

antoniofilipovic 2024-03-23 16:40:04 +01:00
parent 71f0f4a4b1
commit ee81a42923
15 changed files with 160 additions and 62 deletions

View File

@ -60,21 +60,16 @@ void CoordinatorClient::StartFrequentCheck() {
MG_ASSERT(config_.instance_health_check_frequency_sec > std::chrono::seconds(0),
"Health check frequency must be greater than 0");
instance_checker_.Run(
config_.instance_name, config_.instance_health_check_frequency_sec,
[this, instance_name = config_.instance_name] {
try {
spdlog::trace("Sending frequent heartbeat to machine {} on {}", instance_name,
config_.CoordinatorSocketAddress());
{ // NOTE: This is intentionally scoped so that stream lock could get released.
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
stream.AwaitResponse();
}
succ_cb_(coord_instance_, instance_name);
} catch (rpc::RpcFailedException const &) {
fail_cb_(coord_instance_, instance_name);
}
});
instance_checker_.Run(config_.instance_name, config_.instance_health_check_frequency_sec,
[this, instance_name = config_.instance_name] {
spdlog::trace("Sending frequent heartbeat to machine {} on {}", instance_name,
config_.CoordinatorSocketAddress());
if (SendFrequentHeartbeat()) {
succ_cb_(coord_instance_, instance_name);
return;
}
fail_cb_(coord_instance_, instance_name);
});
}
void CoordinatorClient::StopFrequentCheck() { instance_checker_.Stop(); }
@ -116,6 +111,16 @@ auto CoordinatorClient::DemoteToReplica() const -> bool {
return false;
}
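// Performs a single FrequentHeartbeatRpc round-trip and reports the outcome as a bool
// so callers don't have to handle rpc::RpcFailedException themselves.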
auto CoordinatorClient::SendFrequentHeartbeat() const -> bool {
try {
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
stream.AwaitResponse();
return true;
} catch (rpc::RpcFailedException const &) {
return false;
}
}
auto CoordinatorClient::SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bool {
try {
auto stream{rpc_client_.Stream<replication_coordination_glue::SwapMainUUIDRpc>(uuid)};

View File

@ -19,14 +19,17 @@
namespace memgraph::coordination {
void to_json(nlohmann::json &j, ReplicationInstanceState const &instance_state) {
j = nlohmann::json{
{"config", instance_state.config}, {"status", instance_state.status}, {"uuid", instance_state.instance_uuid}};
j = nlohmann::json{{"config", instance_state.config},
{"status", instance_state.status},
{"uuid", instance_state.instance_uuid},
{"needs_demote", instance_state.needs_demote}};
}
void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state) {
j.at("config").get_to(instance_state.config);
j.at("status").get_to(instance_state.status);
j.at("uuid").get_to(instance_state.instance_uuid);
j.at("needs_demote").get_to(instance_state.needs_demote);
}
CoordinatorClusterState::CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,
@ -151,6 +154,13 @@ auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_act
spdlog::trace("DoAction: add coordinator instance {}", config.coordinator_server_id);
break;
}
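// NOTE: This only marks the instance in raft state; the demotion itself is performed
// later by the universal callbacks once the instance responds.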
case RaftLogAction::INSTANCE_NEEDS_DEMOTE: {
auto const instance_name = std::get<std::string>(log_entry);
auto it = repl_instances_.find(instance_name);
MG_ASSERT(it != repl_instances_.end(), "Instance does not exist as part of raft state!");
it->second.needs_demote = true;
spdlog::trace("DoAction: instance {} needs demote", instance_name);
break;
}
case RaftLogAction::OPEN_LOCK: {
is_lock_opened_ = true;
spdlog::trace("DoAction: Opened lock");

View File

@ -83,8 +83,6 @@ CoordinatorInstance::CoordinatorInstance()
})) {
client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::unique_lock{self->coord_instance_lock_};
// when coordinator is becoming follower it will want to stop all threads doing frequent checks
// Thread can get stuck here waiting for lock so we need to frequently check if we are in shutdown state
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name);
@ -180,7 +178,7 @@ void CoordinatorInstance::ForceResetCluster() {
// 5. For instances which were down, set correct callback as before
// 6. After instance gets back up, do steps needed to recover
spdlog::info("Force resetting cluster!");
spdlog::trace("Force resetting cluster!");
// Ordering is important here, we must stop frequent check before
// taking lock to avoid deadlock between us stopping thread and thread wanting to take lock but can't because
// we have it
@ -189,10 +187,9 @@ void CoordinatorInstance::ForceResetCluster() {
repl_instance.StopFrequentCheck();
spdlog::trace("Stopped frequent check for instance {}", repl_instance.InstanceName());
});
spdlog::trace("Stopped all replication instance frequent checks.");
auto lock = std::unique_lock{coord_instance_lock_};
repl_instances_.clear();
spdlog::info("Stopped all replication instance frequent checks.");
if (!raft_state_.IsLeader()) {
spdlog::trace("Exiting force reset as coordinator is not any more leader!");
@ -201,12 +198,13 @@ void CoordinatorInstance::ForceResetCluster() {
if (!raft_state_.AppendOpenLock()) {
spdlog::trace("Appending log force reset failed, aborting force reset");
MG_ASSERT(!raft_state_.IsLeader(), "Coordinator is leader but append open lock failed, encountered wrong state.");
return;
}
utils::OnScopeExit maybe_do_another_reset{[this]() {
if (raft_state_.IsLockOpened() && raft_state_.IsLeader()) {
spdlog::trace("Adding task to try force reset cluster again.");
spdlog::trace("Adding task to try force reset cluster again as lock is opened still.");
thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
return;
}
@ -226,21 +224,23 @@ void CoordinatorInstance::ForceResetCluster() {
&CoordinatorInstance::ReplicaFailCallback);
});
auto instances_mapped_to_resp =
repl_instances_ | ranges::views::transform([](ReplicationInstance &instance) {
return std::pair{instance.InstanceName(), instance.DemoteToReplica(&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback)};
}) |
ranges::to<std::unordered_map<std::string, bool>>();
auto instances_mapped_to_resp = repl_instances_ | ranges::views::transform([](auto &instance) {
return std::pair{instance.InstanceName(), instance.SendFrequentHeartbeat()};
}) |
ranges::to<std::unordered_map<std::string, bool>>();
auto alive_instances =
repl_instances_ | ranges::views::filter([&instances_mapped_to_resp](ReplicationInstance &instance) {
return instances_mapped_to_resp[instance.InstanceName()];
});
auto alive_instances = repl_instances_ | ranges::views::filter([&instances_mapped_to_resp](auto const &instance) {
return instances_mapped_to_resp[instance.InstanceName()];
});
if (std::ranges::any_of(alive_instances, [this](ReplicationInstance &instance) {
return !raft_state_.AppendSetInstanceAsReplicaLog(instance.InstanceName());
})) {
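// An alive instance counts as demoted only if both the demote RPC succeeds and the
// corresponding raft log entry is accepted.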
auto demote_to_replica_failed = [this](auto &instance) {
if (!instance.DemoteToReplica(&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback)) {
return true;
}
return !raft_state_.AppendSetInstanceAsReplicaLog(instance.InstanceName());
};
if (std::ranges::any_of(alive_instances, demote_to_replica_failed)) {
spdlog::error("Failed to send log instance demoted to replica.");
return;
}
@ -290,28 +290,44 @@ void CoordinatorInstance::ForceResetCluster() {
return;
}
// Go through instances which were down and update callbacks
// We need to clear repl instances at the beginning because we don't know where exactly the action failed and
// we need to recreate state from the raft log
// If an instance is MAIN in the raft log, it can actually be a REPLICA because the raft append failed when we demoted it
// If an instance is REPLICA in the raft log, it can actually be MAIN because the raft append failed when we promoted it
// Crux of the problem: we need a universal callback which determines the correct state of the instance and only then swaps callbacks
std::ranges::for_each(repl_instances_, [&instances_mapped_to_resp, this](ReplicationInstance &repl_instance) {
// TODO(antoniofilipovic): Summary of the problem is above.
// TODO(antoniofilipovic): Update this part here.
auto needs_demote_setup_failed = [&instances_mapped_to_resp, this](ReplicationInstance &repl_instance) {
if (instances_mapped_to_resp[repl_instance.InstanceName()]) {
if (raft_state_.HasReplicaState(repl_instance.InstanceName())) {
MG_ASSERT(repl_instance.GetSuccessCallback() == &CoordinatorInstance::ReplicaSuccessCallback &&
repl_instance.GetFailCallback() == &CoordinatorInstance::ReplicaFailCallback,
"Callbacks are wrong");
}
if (raft_state_.HasMainState(repl_instance.InstanceName())) {
MG_ASSERT(repl_instance.GetSuccessCallback() == &CoordinatorInstance::MainSuccessCallback &&
repl_instance.GetFailCallback() == &CoordinatorInstance::MainFailCallback);
}
} else {
if (raft_state_.HasReplicaState(repl_instance.InstanceName())) {
repl_instance.SetCallbacks(&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback);
} else {
repl_instance.SetCallbacks(&CoordinatorInstance::MainSuccessCallback, &CoordinatorInstance::MainFailCallback);
}
return false;
}
});
if (!raft_state_.AppendInstanceNeedsDemote(repl_instance.InstanceName())) {
return true;
}
repl_instance.SetCallbacks(&CoordinatorInstance::UniversalSuccessCallback,
&CoordinatorInstance::UniversalFailCallback);
return false;
};
if (std::ranges::any_of(repl_instances_, needs_demote_setup_failed)) {
spdlog::error("Raft log didn't accept that some instances are in unknown state.");
return;
}
auto check_correct_callbacks_set = [this](auto &repl_instance) {
if (raft_state_.HasReplicaState(repl_instance.InstanceName())) {
MG_ASSERT(repl_instance.GetSuccessCallback() == &CoordinatorInstance::ReplicaSuccessCallback &&
repl_instance.GetFailCallback() == &CoordinatorInstance::ReplicaFailCallback,
"Callbacks are wrong");
} else {
MG_ASSERT(repl_instance.GetSuccessCallback() == &CoordinatorInstance::MainSuccessCallback &&
repl_instance.GetFailCallback() == &CoordinatorInstance::MainFailCallback);
}
};
std::ranges::for_each(alive_instances, check_correct_callbacks_set);
std::ranges::for_each(repl_instances_, [](auto &instance) { instance.StartFrequentCheck(); });
@ -525,7 +541,8 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig
&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback);
if (!new_instance->SendDemoteToReplicaRpc()) {
if (!new_instance->DemoteToReplica(&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback)) {
// TODO(antoniofilipovic) We don't need to do a force reset here, only close the lock later on
spdlog::error("Failed to send demote to replica rpc for instance {}", config.instance_name);
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
@ -716,6 +733,27 @@ void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_nam
repl_instance.OnFailPing();
}
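// Universal callbacks are installed for instances whose role was unknown when force
// reset failed partway: demote the instance, record it in the raft log, and only then
// restore the regular replica callbacks.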
void CoordinatorInstance::UniversalSuccessCallback(std::string_view repl_instance_name) {
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
auto &repl_instance = FindReplicationInstance(repl_instance_name);
if (!repl_instance.SendDemoteToReplicaRpc()) {
return;
}
if (!raft_state_.AppendSetInstanceAsReplicaLog(repl_instance_name)) {
return;
}
// TODO(antoniofilipovic) Double check that switching works
repl_instance.SetCallbacks(&CoordinatorInstance::ReplicaSuccessCallback, &CoordinatorInstance::ReplicaFailCallback);
}
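// Intentionally only logs; the universal success callback performs the demotion once
// the instance becomes reachable again.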
void CoordinatorInstance::UniversalFailCallback(std::string_view repl_instance_name) {
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
}
auto CoordinatorInstance::ChooseMostUpToDateInstance(std::span<InstanceNameDbHistories> instance_database_histories)
-> NewMainRes {
std::optional<NewMainRes> new_main_res;

View File

@ -59,6 +59,10 @@ auto CoordinatorStateMachine::SerializeSetInstanceAsReplica(std::string_view ins
return CreateLog({{"action", RaftLogAction::SET_INSTANCE_AS_REPLICA}, {"info", instance_name}});
}
auto CoordinatorStateMachine::SerializeInstanceNeedsDemote(std::string_view instance_name) -> ptr<buffer> {
return CreateLog({{"action", RaftLogAction::INSTANCE_NEEDS_DEMOTE}, {"info", std::string{instance_name}}});
}
auto CoordinatorStateMachine::SerializeUpdateUUIDForNewMain(utils::UUID const &uuid) -> ptr<buffer> {
return CreateLog({{"action", RaftLogAction::UPDATE_UUID_OF_NEW_MAIN}, {"info", uuid}});
}
@ -92,6 +96,8 @@ auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, Raf
return {info.get<InstanceUUIDUpdate>(), action};
case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE:
[[fallthrough]];
case RaftLogAction::INSTANCE_NEEDS_DEMOTE:
[[fallthrough]];
case RaftLogAction::SET_INSTANCE_AS_REPLICA:
return {info.get<std::string>(), action};
case RaftLogAction::ADD_COORDINATOR_INSTANCE:

View File

@ -15,6 +15,7 @@
#include "coordination/coordinator_communication_config.hpp"
#include "replication_coordination_glue/common.hpp"
#include "replication_coordination_glue/role.hpp"
#include "rpc/client.hpp"
#include "rpc_errors.hpp"
#include "utils/result.hpp"
@ -64,6 +65,8 @@ class CoordinatorClient {
auto ReplicationClientInfo() const -> ReplicationClientInfo;
auto SendFrequentHeartbeat() const -> bool;
auto SendGetInstanceTimestampsRpc() const
-> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories>;

View File

@ -112,6 +112,10 @@ class CoordinatorInstance {
void ReplicaFailCallback(std::string_view);
void UniversalSuccessCallback(std::string_view);
void UniversalFailCallback(std::string_view);
void ForceResetCluster();
HealthCheckClientCallback client_succ_cb_, client_fail_cb_;

View File

@ -16,6 +16,7 @@
#include "coordination/coordinator_communication_config.hpp"
#include "replication_coordination_glue/common.hpp"
#include "replication_coordination_glue/role.hpp"
#include "rpc/messages.hpp"
#include "slk/serialization.hpp"

View File

@ -72,6 +72,7 @@ class RaftState {
auto AppendUpdateUUIDForInstanceLog(std::string_view instance_name, utils::UUID const &uuid) -> bool;
auto AppendOpenLock() -> bool;
auto AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig const &config) -> bool;
auto AppendInstanceNeedsDemote(std::string_view) -> bool;
auto GetReplicationInstances() const -> std::vector<ReplicationInstanceState>;
// TODO: (andi) Do we then need GetAllCoordinators?

View File

@ -23,6 +23,7 @@
#include <libnuraft/nuraft.hxx>
#include <functional>
namespace memgraph::coordination {
class CoordinatorInstance;
@ -59,6 +60,8 @@ class ReplicationInstance {
auto SendDemoteToReplicaRpc() -> bool;
auto SendFrequentHeartbeat() const -> bool;
auto DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb)
-> bool;
@ -79,8 +82,8 @@ class ReplicationInstance {
auto EnableWritingOnMain() -> bool;
auto GetSuccessCallback() -> HealthCheckInstanceCallback &;
auto GetFailCallback() -> HealthCheckInstanceCallback &;
auto GetSuccessCallback() -> HealthCheckInstanceCallback;
auto GetFailCallback() -> HealthCheckInstanceCallback;
void SetCallbacks(HealthCheckInstanceCallback succ_cb, HealthCheckInstanceCallback fail_cb);

View File

@ -43,8 +43,11 @@ struct ReplicationInstanceState {
// For MAIN we don't enable writing until cluster is in healthy state
utils::UUID instance_uuid;
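// Set via the INSTANCE_NEEDS_DEMOTE raft log action when force reset could not confirm
// the instance's role.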
bool needs_demote{false};
friend auto operator==(ReplicationInstanceState const &lhs, ReplicationInstanceState const &rhs) -> bool {
return lhs.config == rhs.config && lhs.status == rhs.status && lhs.instance_uuid == rhs.instance_uuid;
return lhs.config == rhs.config && lhs.status == rhs.status && lhs.instance_uuid == rhs.instance_uuid &&
lhs.needs_demote == rhs.needs_demote;
}
};

View File

@ -51,6 +51,7 @@ class CoordinatorStateMachine : public state_machine {
static auto SerializeUpdateUUIDForNewMain(utils::UUID const &uuid) -> ptr<buffer>;
static auto SerializeUpdateUUIDForInstance(InstanceUUIDUpdate const &instance_uuid_change) -> ptr<buffer>;
static auto SerializeAddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config) -> ptr<buffer>;
static auto SerializeInstanceNeedsDemote(std::string_view instance_name) -> ptr<buffer>;
static auto DecodeLog(buffer &data) -> std::pair<TRaftLog, RaftLogAction>;

View File

@ -31,6 +31,7 @@ enum class RaftLogAction : uint8_t {
UPDATE_UUID_OF_NEW_MAIN,
ADD_COORDINATOR_INSTANCE,
UPDATE_UUID_FOR_INSTANCE,
INSTANCE_NEEDS_DEMOTE
};
NLOHMANN_JSON_SERIALIZE_ENUM(RaftLogAction, {{RaftLogAction::REGISTER_REPLICATION_INSTANCE, "register"},
@ -40,6 +41,7 @@ NLOHMANN_JSON_SERIALIZE_ENUM(RaftLogAction, {{RaftLogAction::REGISTER_REPLICATIO
{RaftLogAction::UPDATE_UUID_OF_NEW_MAIN, "update_uuid_of_new_main"},
{RaftLogAction::ADD_COORDINATOR_INSTANCE, "add_coordinator_instance"},
{RaftLogAction::UPDATE_UUID_FOR_INSTANCE, "update_uuid_for_instance"},
{RaftLogAction::INSTANCE_NEEDS_DEMOTE, "instance_needs_demote"},
{RaftLogAction::OPEN_LOCK, "open_lock"}})
} // namespace memgraph::coordination

View File

@ -303,6 +303,25 @@ auto RaftState::AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig c
return true;
}
auto RaftState::AppendInstanceNeedsDemote(std::string_view instance_name) -> bool {
auto new_log = CoordinatorStateMachine::SerializeInstanceNeedsDemote(instance_name);
auto const res = raft_server_->append_entries({new_log});
if (!res->get_accepted()) {
spdlog::error("Failed to accept request that instance {} needs demote", instance_name);
return false;
}
spdlog::trace("Request that instance {} needs demote accepted", instance_name);
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to add instance {} needs demote with error code {}", instance_name,
static_cast<int>(res->get_result_code()));
return false;
}
return true;
}
auto RaftState::AppendUpdateUUIDForInstanceLog(std::string_view instance_name, const utils::UUID &uuid) -> bool {
auto new_log = CoordinatorStateMachine::SerializeUpdateUUIDForInstance(
{.instance_name = std::string{instance_name}, .uuid = uuid});

View File

@ -64,6 +64,8 @@ auto ReplicationInstance::PromoteToMain(utils::UUID const &new_uuid, Replication
auto ReplicationInstance::SendDemoteToReplicaRpc() -> bool { return client_.DemoteToReplica(); }
auto ReplicationInstance::SendFrequentHeartbeat() const -> bool { return client_.SendFrequentHeartbeat(); }
auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb,
HealthCheckInstanceCallback replica_fail_cb) -> bool {
if (!client_.DemoteToReplica()) {
@ -85,8 +87,8 @@ auto ReplicationInstance::ReplicationClientInfo() const -> coordination::Replica
return client_.ReplicationClientInfo();
}
auto ReplicationInstance::GetSuccessCallback() -> HealthCheckInstanceCallback & { return succ_cb_; }
auto ReplicationInstance::GetFailCallback() -> HealthCheckInstanceCallback & { return fail_cb_; }
auto ReplicationInstance::GetSuccessCallback() -> HealthCheckInstanceCallback { return succ_cb_; }
auto ReplicationInstance::GetFailCallback() -> HealthCheckInstanceCallback { return fail_cb_; }
auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; }

View File

@ -17,7 +17,7 @@
namespace memgraph::replication_coordination_glue {
// TODO: figure out a way of ensuring that usage of this type is never uninitialed/defaulted incorrectly to MAIN
// TODO: figure out a way of ensuring that usage of this type is never uninitialized/defaulted incorrectly to MAIN
enum class ReplicationRole : uint8_t { MAIN, REPLICA };
NLOHMANN_JSON_SERIALIZE_ENUM(ReplicationRole, {{ReplicationRole::MAIN, "main"}, {ReplicationRole::REPLICA, "replica"}})