Distributed failover

This commit is contained in:
Andi Skrgat 2024-03-01 07:15:47 +01:00
parent 12d17c2707
commit 8805021dd9
12 changed files with 206 additions and 62 deletions

View File

@ -12,59 +12,92 @@
#ifdef MG_ENTERPRISE
#include "nuraft/coordinator_cluster_state.hpp"
#include "utils/logging.hpp"
#include <shared_mutex>
namespace memgraph::coordination {
using replication_coordination_glue::ReplicationRole;
// Copy constructor: duplicates only the instance map of `other`.
// NOTE(review): other.log_lock_ is not acquired while the map is read — confirm that no writer can run
// concurrently with a copy.
CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState const &other)
    : instance_roles_(other.instance_roles_) {}
// Copy assignment: replaces this object's instance map with a copy of `other`'s map.
// Self-assignment is a no-op. Returns *this.
CoordinatorClusterState &CoordinatorClusterState::operator=(CoordinatorClusterState const &other) {
  if (this != &other) {
    instance_roles_ = other.instance_roles_;
  }
  return *this;
}
// Move constructor: takes over the instance map of `other`; nothing else is transferred.
CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState &&other) noexcept
    : instance_roles_(std::move(other.instance_roles_)) {}
// Move assignment: replaces this object's instance map with the one moved out of `other`.
// Self-assignment is a no-op. Returns *this.
CoordinatorClusterState &CoordinatorClusterState::operator=(CoordinatorClusterState &&other) noexcept {
  if (this != &other) {
    instance_roles_ = std::move(other.instance_roles_);
  }
  return *this;
}
// Reports whether any tracked instance currently has role MAIN.
// NOTE(review): this span is a diff rendering with the +/- markers stripped. The first `return` (old
// `instance_roles` name, comparing entry.second directly) looks like the removed pre-commit line; the
// shared_lock + `instance_roles_` version after it looks like its replacement — confirm before compiling.
auto CoordinatorClusterState::MainExists() const -> bool {
return std::ranges::any_of(instance_roles, [](auto const &entry) { return entry.second == ReplicationRole::MAIN; });
auto lock = std::shared_lock{log_lock_};
return std::ranges::any_of(instance_roles_,
[](auto const &entry) { return entry.second.role == ReplicationRole::MAIN; });
}
// Returns true when `instance_name` is present in the instance map and its role is MAIN;
// an unknown name yields false.
// NOTE(review): diff rendering with +/- markers stripped. The two lines using `instance_roles` (no trailing
// underscore) look like the removed pre-commit lines; the shared_lock + `instance_roles_` lines look like
// their replacement — confirm before compiling.
auto CoordinatorClusterState::IsMain(std::string_view instance_name) const -> bool {
auto const it = instance_roles.find(instance_name);
return it != instance_roles.end() && it->second == ReplicationRole::MAIN;
auto lock = std::shared_lock{log_lock_};
auto const it = instance_roles_.find(instance_name);
return it != instance_roles_.end() && it->second.role == ReplicationRole::MAIN;
}
// Returns true when `instance_name` is present in the instance map and its role is REPLICA;
// an unknown name yields false.
// NOTE(review): diff rendering with +/- markers stripped. The two lines using `instance_roles` (no trailing
// underscore) look like the removed pre-commit lines; the shared_lock + `instance_roles_` lines look like
// their replacement — confirm before compiling.
auto CoordinatorClusterState::IsReplica(std::string_view instance_name) const -> bool {
auto const it = instance_roles.find(instance_name);
return it != instance_roles.end() && it->second == ReplicationRole::REPLICA;
auto lock = std::shared_lock{log_lock_};
auto const it = instance_roles_.find(instance_name);
return it != instance_roles_.end() && it->second.role == ReplicationRole::REPLICA;
}
// Sets the role stored for `instance_name` under an exclusive lock. std::map::operator[] creates the entry
// (with a value-initialized InstanceState) when the name is not present yet.
// NOTE(review): diff rendering with +/- markers stripped. The `instance_roles[...] = role;` line looks like the
// removed pre-commit line and is kept verbatim; only the added `instance_roles_` line was changed.
auto CoordinatorClusterState::InsertInstance(std::string_view instance_name, ReplicationRole role) -> void {
instance_roles[instance_name.data()] = role;
auto lock = std::unique_lock{log_lock_};
// Build the key from the view's full extent: string_view::data() is not guaranteed to be NUL-terminated, so
// constructing the key from the raw pointer could read past the end of the view.
instance_roles_[std::string{instance_name}].role = role;
}
// Applies one decoded raft log entry to the instance map while holding an exclusive lock.
//   REGISTER_REPLICATION_INSTANCE   - log_entry holds a CoordinatorClientConfig; the instance is stored under
//                                     config.instance_name with role REPLICA.
//   UNREGISTER_REPLICATION_INSTANCE - log_entry holds the instance name; its entry is erased.
//   SET_INSTANCE_AS_MAIN / _REPLICA - log_entry holds the instance name; MG_ASSERT checks that the instance is
//                                     present, then its role is overwritten.
// std::get throws std::bad_variant_access if log_entry holds the other alternative.
// NOTE(review): diff rendering with +/- markers stripped. Lines that use `instance_roles` (no trailing
// underscore) look like the removed pre-commit lines; the `instance_roles_` lines that follow each of them
// look like their replacements — confirm before compiling.
auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void {
auto lock = std::unique_lock{log_lock_};
switch (log_action) {
case RaftLogAction::REGISTER_REPLICATION_INSTANCE: {
auto const instance_name = std::get<CoordinatorClientConfig>(log_entry).instance_name;
instance_roles[instance_name] = ReplicationRole::REPLICA;
auto const &config = std::get<CoordinatorClientConfig>(log_entry);
instance_roles_[config.instance_name] = InstanceState{config, ReplicationRole::REPLICA};
break;
}
case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE: {
auto const instance_name = std::get<std::string>(log_entry);
instance_roles.erase(instance_name);
instance_roles_.erase(instance_name);
break;
}
case RaftLogAction::SET_INSTANCE_AS_MAIN: {
auto const instance_name = std::get<std::string>(log_entry);
auto it = instance_roles.find(instance_name);
MG_ASSERT(it != instance_roles.end(), "Instance does not exist as part of raft state!");
it->second = ReplicationRole::MAIN;
auto it = instance_roles_.find(instance_name);
MG_ASSERT(it != instance_roles_.end(), "Instance does not exist as part of raft state!");
it->second.role = ReplicationRole::MAIN;
break;
}
case RaftLogAction::SET_INSTANCE_AS_REPLICA: {
auto const instance_name = std::get<std::string>(log_entry);
auto it = instance_roles.find(instance_name);
MG_ASSERT(it != instance_roles.end(), "Instance does not exist as part of raft state!");
it->second = ReplicationRole::REPLICA;
auto it = instance_roles_.find(instance_name);
MG_ASSERT(it != instance_roles_.end(), "Instance does not exist as part of raft state!");
it->second.role = ReplicationRole::REPLICA;
break;
}
}
}
// TODO: (andi) Improve based on Gareth's comments
auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
auto lock = std::shared_lock{log_lock_};
auto const role_to_string = [](auto const &role) -> std::string_view {
switch (role) {
case ReplicationRole::MAIN:
@ -75,10 +108,10 @@ auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
};
auto const entry_to_string = [&role_to_string](auto const &entry) {
return fmt::format("{}_{}", entry.first, role_to_string(entry.second));
return fmt::format("{}_{}", entry.first, role_to_string(entry.second.role));
};
auto instances_str_view = instance_roles | ranges::views::transform(entry_to_string);
auto instances_str_view = instance_roles_ | ranges::views::transform(entry_to_string);
uint32_t size =
std::accumulate(instances_str_view.begin(), instances_str_view.end(), 0,
[](uint32_t acc, auto const &entry) { return acc + sizeof(uint32_t) + entry.size(); });
@ -109,6 +142,8 @@ auto CoordinatorClusterState::Deserialize(buffer &data) -> CoordinatorClusterSta
}
auto CoordinatorClusterState::GetInstances() const -> std::vector<std::pair<std::string, std::string>> {
auto lock = std::shared_lock{log_lock_};
// TODO: (andi) Abstract
auto const role_to_string = [](auto const &role) -> std::string {
switch (role) {
case ReplicationRole::MAIN:
@ -119,10 +154,27 @@ auto CoordinatorClusterState::GetInstances() const -> std::vector<std::pair<std:
};
auto const entry_to_pair = [&role_to_string](auto const &entry) {
return std::make_pair(entry.first, role_to_string(entry.second));
return std::make_pair(entry.first, role_to_string(entry.second.role));
};
return instance_roles | ranges::views::transform(entry_to_pair) | ranges::to<std::vector>();
return instance_roles_ | ranges::views::transform(entry_to_pair) | ranges::to<std::vector>();
}
// Returns a copy of the client config of every tracked instance, in the map's key (instance name) order.
// The instance map is read under a shared lock.
auto CoordinatorClusterState::GetClientConfigs() const -> std::vector<CoordinatorClientConfig> {
  auto guard = std::shared_lock{log_lock_};

  std::vector<CoordinatorClientConfig> configs;
  configs.reserve(instance_roles_.size());
  for (auto const &entry : instance_roles_) {
    configs.push_back(entry.second.config);
  }
  return configs;
}
// Returns the name of the first instance (in map key order) whose role is MAIN, or an empty optional when no
// instance is MAIN. The instance map is read under a shared lock.
auto CoordinatorClusterState::FindCurrentMainInstanceName() const -> std::optional<std::string> {
  auto guard = std::shared_lock{log_lock_};

  for (auto const &[name, state] : instance_roles_) {
    if (state.role == ReplicationRole::MAIN) {
      return name;
    }
  }
  return std::nullopt;
}
} // namespace memgraph::coordination

View File

@ -562,3 +562,4 @@ auto CoordinatorInstance::IsReplica(std::string_view instance_name) const -> boo
} // namespace memgraph::coordination
#endif

View File

@ -12,9 +12,14 @@
#ifdef MG_ENTERPRISE
#include "nuraft/coordinator_state_machine.hpp"
#include "utils/logging.hpp"
namespace memgraph::coordination {
// Forwards to the cluster state: name of the current MAIN instance, or an empty optional if there is none.
auto CoordinatorStateMachine::FindCurrentMainInstanceName() const -> std::optional<std::string> {
  auto main_name = cluster_state_.FindCurrentMainInstanceName();
  return main_name;
}
// Forwards to the cluster state: true when some instance currently has role MAIN.
auto CoordinatorStateMachine::MainExists() const -> bool {
  bool const has_main = cluster_state_.MainExists();
  return has_main;
}
auto CoordinatorStateMachine::IsMain(std::string_view instance_name) const -> bool {
@ -78,7 +83,6 @@ auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, Raf
// Pre-commit hook: both arguments are ignored and a null buffer pointer is always returned.
auto CoordinatorStateMachine::pre_commit(ulong const /*log_idx*/, buffer & /*data*/) -> ptr<buffer> {
  return nullptr;
}
auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr<buffer> {
// TODO: (andi) think about locking scheme
buffer_serializer bs(data);
auto const [parsed_data, log_action] = DecodeLog(data);
@ -183,5 +187,9 @@ auto CoordinatorStateMachine::GetInstances() const -> std::vector<std::pair<std:
return cluster_state_.GetInstances();
}
// Forwards to the cluster state: client configs of all tracked instances.
auto CoordinatorStateMachine::GetClientConfigs() const -> std::vector<CoordinatorClientConfig> {
  auto configs = cluster_state_.GetClientConfigs();
  return configs;
}
} // namespace memgraph::coordination
#endif

View File

@ -67,6 +67,9 @@ class CoordinatorInstance {
void ReplicaFailCallback(std::string_view);
auto IsMain(std::string_view instance_name) const -> bool;
auto IsReplica(std::string_view instance_name) const -> bool;
// NOTE: Must be std::list because we rely on pointer stability.
// Leader and followers should both have same view on repl_instances_
std::list<ReplicationInstance> repl_instances_;

View File

@ -63,18 +63,20 @@ class RaftState {
auto RequestLeadership() -> bool;
auto IsLeader() const -> bool;
auto FindCurrentMainInstanceName() const -> std::optional<std::string>;
auto MainExists() const -> bool;
auto IsMain(std::string_view instance_name) const -> bool;
auto IsReplica(std::string_view instance_name) const -> bool;
/// TODO: (andi) Add log in the name of methods
auto AppendRegisterReplicationInstance(CoordinatorClientConfig const &config) -> ptr<raft_result>;
auto AppendUnregisterReplicationInstance(std::string_view instance_name) -> ptr<raft_result>;
auto AppendSetInstanceAsMain(std::string_view instance_name) -> ptr<raft_result>;
auto AppendSetInstanceAsReplica(std::string_view instance_name) -> ptr<raft_result>;
auto AppendRegisterReplicationInstanceLog(CoordinatorClientConfig const &config) -> bool;
auto AppendUnregisterReplicationInstanceLog(std::string_view instance_name) -> bool;
auto AppendSetInstanceAsMainLog(std::string_view instance_name) -> bool;
auto AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool;
auto GetInstances() const -> std::vector<std::pair<std::string, std::string>>;
auto GetClientConfigs() const -> std::vector<CoordinatorClientConfig>;
private:
// TODO: (andi) I think variables below can be abstracted/clean them.
uint32_t raft_server_id_;

View File

@ -24,8 +24,7 @@ enum class RegisterInstanceCoordinatorStatus : uint8_t {
NOT_COORDINATOR,
NOT_LEADER,
RPC_FAILED,
RAFT_COULD_NOT_ACCEPT,
RAFT_COULD_NOT_APPEND,
RAFT_LOG_ERROR,
SUCCESS
};
@ -35,8 +34,7 @@ enum class UnregisterInstanceCoordinatorStatus : uint8_t {
NOT_COORDINATOR,
RPC_FAILED,
NOT_LEADER,
RAFT_COULD_NOT_ACCEPT,
RAFT_COULD_NOT_APPEND,
RAFT_LOG_ERROR,
SUCCESS,
};
@ -45,8 +43,7 @@ enum class SetInstanceToMainCoordinatorStatus : uint8_t {
MAIN_ALREADY_EXISTS,
NOT_COORDINATOR,
NOT_LEADER,
RAFT_COULD_NOT_ACCEPT,
RAFT_COULD_NOT_APPEND,
RAFT_LOG_ERROR,
COULD_NOT_PROMOTE_TO_MAIN,
SWAP_UUID_FAILED,
SUCCESS,

View File

@ -57,6 +57,8 @@ class ReplicationInstance {
auto PromoteToMain(utils::UUID const &uuid, ReplicationClientsInfo repl_clients_info,
HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb) -> bool;
auto SendDemoteToReplicaRpc() -> bool;
auto DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb)
-> bool;

View File

@ -16,7 +16,7 @@
#include "coordination/coordinator_config.hpp"
#include "nuraft/raft_log_action.hpp"
#include "replication_coordination_glue/role.hpp"
#include "utils/rw_lock.hpp"
#include "utils/resource_lock.hpp"
#include <libnuraft/nuraft.hxx>
#include <range/v3/view.hpp>
@ -28,6 +28,13 @@
namespace memgraph::coordination {
using replication_coordination_glue::ReplicationRole;
// Per-instance record kept in the coordinator cluster state: the client config the instance was registered
// with plus its current replication role. Plain aggregate; the field order matters because it is
// brace-initialized positionally as InstanceState{config, role}.
struct InstanceState {
// Connection/registration settings of the instance.
CoordinatorClientConfig config;
// Current role (e.g. MAIN or REPLICA).
// NOTE(review): no default member initializer — a default-initialized InstanceState leaves `role`
// indeterminate; confirm all creation sites value- or aggregate-initialize.
ReplicationRole role;
};
using TRaftLog = std::variant<CoordinatorClientConfig, std::string>;
using nuraft::buffer;
@ -36,13 +43,23 @@ using nuraft::ptr;
class CoordinatorClusterState {
public:
CoordinatorClusterState() = default;
CoordinatorClusterState(CoordinatorClusterState const &);
CoordinatorClusterState &operator=(CoordinatorClusterState const &);
CoordinatorClusterState(CoordinatorClusterState &&other) noexcept;
CoordinatorClusterState &operator=(CoordinatorClusterState &&other) noexcept;
~CoordinatorClusterState() = default;
auto FindCurrentMainInstanceName() const -> std::optional<std::string>;
auto MainExists() const -> bool;
auto IsMain(std::string_view instance_name) const -> bool;
auto IsReplica(std::string_view instance_name) const -> bool;
auto InsertInstance(std::string_view instance_name, replication_coordination_glue::ReplicationRole role) -> void;
auto InsertInstance(std::string_view instance_name, ReplicationRole role) -> void;
auto DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void;
@ -52,9 +69,11 @@ class CoordinatorClusterState {
auto GetInstances() const -> std::vector<std::pair<std::string, std::string>>;
auto GetClientConfigs() const -> std::vector<CoordinatorClientConfig>;
private:
std::map<std::string, replication_coordination_glue::ReplicationRole, std::less<>> instance_roles;
// TODO: (andi) Good place for separate lock
std::map<std::string, InstanceState, std::less<>> instance_roles_;
mutable utils::ResourceLock log_lock_{};
};
} // namespace memgraph::coordination

View File

@ -42,6 +42,7 @@ class CoordinatorStateMachine : public state_machine {
CoordinatorStateMachine &operator=(CoordinatorStateMachine &&) = delete;
~CoordinatorStateMachine() override {}
auto FindCurrentMainInstanceName() const -> std::optional<std::string>;
auto MainExists() const -> bool;
auto IsMain(std::string_view instance_name) const -> bool;
auto IsReplica(std::string_view instance_name) const -> bool;
@ -80,6 +81,8 @@ class CoordinatorStateMachine : public state_machine {
auto GetInstances() const -> std::vector<std::pair<std::string, std::string>>;
auto GetClientConfigs() const -> std::vector<CoordinatorClientConfig>;
private:
struct SnapshotCtx {
SnapshotCtx(ptr<snapshot> &snapshot, CoordinatorClusterState const &cluster_state)

View File

@ -126,24 +126,91 @@ auto RaftState::IsLeader() const -> bool { return raft_server_->is_leader(); }
// Returns true if this server already is the raft leader; otherwise asks the raft server for leadership
// and returns the result of that request.
auto RaftState::RequestLeadership() -> bool {
  if (raft_server_->is_leader()) {
    return true;
  }
  return raft_server_->request_leadership();
}
// Serializes a "register instance" log entry for `config` and submits it through raft_server_->append_entries.
// Returns true only when the request was accepted and the result code is cmd_result_code::OK; each failure
// path logs an error and returns false.
// NOTE(review): diff rendering with +/- markers stripped. The `-> ptr<raft_result>` signature and the bare
// `return raft_server_->append_entries({new_log});` look like the removed pre-commit lines; the `-> bool`
// signature and the checked variant look like their replacements — confirm before compiling.
auto RaftState::AppendRegisterReplicationInstance(CoordinatorClientConfig const &config) -> ptr<raft_result> {
auto RaftState::AppendRegisterReplicationInstanceLog(CoordinatorClientConfig const &config) -> bool {
auto new_log = CoordinatorStateMachine::SerializeRegisterInstance(config);
return raft_server_->append_entries({new_log});
auto const res = raft_server_->append_entries({new_log});
// Not accepted: the request never entered the log.
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for registering instance {}. Most likely the reason is that the instance is not "
"the "
"leader.",
config.instance_name);
return false;
}
spdlog::info("Request for registering instance {} accepted", config.instance_name);
// Accepted, but the raft server reported a non-OK result code.
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to register instance {} with error code {}", config.instance_name, res->get_result_code());
return false;
}
return true;
}
// Serializes an "unregister instance" log entry for `instance_name` and submits it through
// raft_server_->append_entries. Returns true only when the request was accepted and the result code is
// cmd_result_code::OK; each failure path logs an error and returns false.
// NOTE(review): diff rendering with +/- markers stripped. The `-> ptr<raft_result>` signature and the bare
// `return raft_server_->append_entries({new_log});` look like the removed pre-commit lines; the `-> bool`
// signature and the checked variant look like their replacements — confirm before compiling.
auto RaftState::AppendUnregisterReplicationInstance(std::string_view instance_name) -> ptr<raft_result> {
auto RaftState::AppendUnregisterReplicationInstanceLog(std::string_view instance_name) -> bool {
auto new_log = CoordinatorStateMachine::SerializeUnregisterInstance(instance_name);
return raft_server_->append_entries({new_log});
auto const res = raft_server_->append_entries({new_log});
// Not accepted: the request never entered the log.
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for unregistering instance {}. Most likely the reason is that the instance is not "
"the leader.",
instance_name);
return false;
}
spdlog::info("Request for unregistering instance {} accepted", instance_name);
// Accepted, but the raft server reported a non-OK result code.
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to unregister instance {} with error code {}", instance_name, res->get_result_code());
return false;
}
return true;
}
// Serializes a "set instance as main" log entry for `instance_name` and submits it through
// raft_server_->append_entries. Returns true only when the request was accepted and the result code is
// cmd_result_code::OK; each failure path logs an error and returns false.
// NOTE(review): diff rendering with +/- markers stripped. The `-> ptr<raft_result>` signature and the bare
// `return raft_server_->append_entries({new_log});` look like the removed pre-commit lines; the `-> bool`
// signature and the checked variant look like their replacements — confirm before compiling.
auto RaftState::AppendSetInstanceAsMain(std::string_view instance_name) -> ptr<raft_result> {
auto RaftState::AppendSetInstanceAsMainLog(std::string_view instance_name) -> bool {
auto new_log = CoordinatorStateMachine::SerializeSetInstanceAsMain(instance_name);
return raft_server_->append_entries({new_log});
auto const res = raft_server_->append_entries({new_log});
// Not accepted: the request never entered the log.
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for promoting instance {}. Most likely the reason is that the instance is not "
"the leader.",
instance_name);
return false;
}
spdlog::info("Request for promoting instance {} accepted", instance_name);
// Accepted, but the raft server reported a non-OK result code.
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to promote instance {} with error code {}", instance_name, res->get_result_code());
return false;
}
return true;
}
// Serializes a "set instance as replica" log entry for `instance_name` and submits it through
// raft_server_->append_entries. Returns true only when the request was accepted and the result code is
// cmd_result_code::OK; each failure path logs an error and returns false.
// NOTE(review): diff rendering with +/- markers stripped. The `-> ptr<raft_result>` signature and the bare
// `return raft_server_->append_entries({new_log});` look like the removed pre-commit lines and are kept
// verbatim; only the added lines were reviewed and changed.
auto RaftState::AppendSetInstanceAsReplica(std::string_view instance_name) -> ptr<raft_result> {
auto RaftState::AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool {
auto new_log = CoordinatorStateMachine::SerializeSetInstanceAsReplica(instance_name);
return raft_server_->append_entries({new_log});
auto const res = raft_server_->append_entries({new_log});
// Not accepted: the request never entered the log.
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for demoting instance {}. Most likely the reason is that the instance is not "
"the leader.",
instance_name);
return false;
}
spdlog::info("Request for demoting instance {} accepted", instance_name);
// Accepted, but the raft server reported a non-OK result code.
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
// This is the demote path; the message previously said "promote" (copy-paste from the set-as-main variant).
spdlog::error("Failed to demote instance {} with error code {}", instance_name, res->get_result_code());
return false;
}
return true;
}
// Forwards to the state machine: name of the current MAIN instance, or an empty optional if there is none.
auto RaftState::FindCurrentMainInstanceName() const -> std::optional<std::string> {
  auto main_name = state_machine_->FindCurrentMainInstanceName();
  return main_name;
}
// Forwards to the state machine: true when some instance currently has role MAIN.
auto RaftState::MainExists() const -> bool {
  bool const has_main = state_machine_->MainExists();
  return has_main;
}
@ -158,5 +225,9 @@ auto RaftState::GetInstances() const -> std::vector<std::pair<std::string, std::
return state_machine_->GetInstances();
}
// Forwards to the state machine: client configs of all tracked instances.
auto RaftState::GetClientConfigs() const -> std::vector<CoordinatorClientConfig> {
  auto configs = state_machine_->GetClientConfigs();
  return configs;
}
} // namespace memgraph::coordination
#endif

View File

@ -27,10 +27,6 @@ ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorC
: client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)),
succ_cb_(succ_instance_cb),
fail_cb_(fail_instance_cb) {
if (!client_.DemoteToReplica()) {
throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName());
}
client_.StartFrequentCheck();
}
@ -69,6 +65,8 @@ auto ReplicationInstance::PromoteToMain(utils::UUID const &new_uuid, Replication
return true;
}
// Asks the underlying client to demote the instance to replica and returns the client's result.
// Unlike DemoteToReplica, this takes no callbacks.
auto ReplicationInstance::SendDemoteToReplicaRpc() -> bool {
  bool const demoted = client_.DemoteToReplica();
  return demoted;
}
auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb,
HealthCheckInstanceCallback replica_fail_cb) -> bool {
if (!client_.DemoteToReplica()) {

View File

@ -423,11 +423,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
throw QueryRuntimeException("UNREGISTER INSTANCE query can only be run on a coordinator!");
case NOT_LEADER:
throw QueryRuntimeException("Couldn't unregister replica instance since coordinator is not a leader!");
case RAFT_COULD_NOT_ACCEPT:
throw QueryRuntimeException(
"Couldn't unregister replica instance since raft server couldn't accept the log! Most likely the raft "
"instance is not a leader!");
case RAFT_COULD_NOT_APPEND:
case RAFT_LOG_ERROR:
throw QueryRuntimeException("Couldn't unregister replica instance since raft server couldn't append the log!");
case RPC_FAILED:
throw QueryRuntimeException(
@ -486,11 +482,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
throw QueryRuntimeException("REGISTER INSTANCE query can only be run on a coordinator!");
case NOT_LEADER:
throw QueryRuntimeException("Couldn't register replica instance since coordinator is not a leader!");
case RAFT_COULD_NOT_ACCEPT:
throw QueryRuntimeException(
"Couldn't register replica instance since raft server couldn't accept the log! Most likely the raft "
"instance is not a leader!");
case RAFT_COULD_NOT_APPEND:
case RAFT_LOG_ERROR:
throw QueryRuntimeException("Couldn't register replica instance since raft server couldn't append the log!");
case RPC_FAILED:
throw QueryRuntimeException(
@ -524,11 +516,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
throw QueryRuntimeException("SET INSTANCE TO MAIN query can only be run on a coordinator!");
case NOT_LEADER:
throw QueryRuntimeException("Couldn't set instance to main since coordinator is not a leader!");
case RAFT_COULD_NOT_ACCEPT:
throw QueryRuntimeException(
"Couldn't promote instance since raft server couldn't accept the log! Most likely the raft "
"instance is not a leader!");
case RAFT_COULD_NOT_APPEND:
case RAFT_LOG_ERROR:
throw QueryRuntimeException("Couldn't promote instance since raft server couldn't append the log!");
case COULD_NOT_PROMOTE_TO_MAIN:
throw QueryRuntimeException(