registration backed-up by raft

This commit is contained in:
Andi Skrgat 2024-02-23 08:43:21 +01:00
parent 9c43983675
commit c7035f9c8b
9 changed files with 183 additions and 3 deletions

View File

@ -89,5 +89,22 @@ auto CoordinatorClusterState::Deserialize(buffer &data) -> CoordinatorClusterSta
return cluster_state; return cluster_state;
} }
auto CoordinatorClusterState::GetInstances() const -> std::vector<std::pair<std::string, std::string>> {
auto const role_to_string = [](auto const &role) -> std::string {
switch (role) {
case replication_coordination_glue::ReplicationRole::MAIN:
return "main";
case replication_coordination_glue::ReplicationRole::REPLICA:
return "replica";
}
};
auto const entry_to_pair = [&role_to_string](auto const &entry) {
return std::make_pair(entry.first, role_to_string(entry.second));
};
return instance_roles | ranges::views::transform(entry_to_pair) | ranges::to<std::vector>();
}
} // namespace memgraph::coordination } // namespace memgraph::coordination
#endif #endif

View File

@ -32,6 +32,7 @@ CoordinatorInstance::CoordinatorInstance()
: raft_state_(RaftState::MakeRaftState( : raft_state_(RaftState::MakeRaftState(
[this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StartFrequentCheck); }, [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StartFrequentCheck); },
[this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); })) { [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); })) {
<<<<<<< HEAD
<<<<<<< HEAD <<<<<<< HEAD
client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::unique_lock{self->coord_instance_lock_}; auto lock = std::unique_lock{self->coord_instance_lock_};
@ -177,6 +178,129 @@ CoordinatorInstance::CoordinatorInstance()
self->TryFailover(); self->TryFailover();
} }
}; };
||||||| parent of 99c53148c (registration backed-up by raft)
auto find_repl_instance = [](CoordinatorInstance *self,
std::string_view repl_instance_name) -> ReplicationInstance & {
auto repl_instance =
std::ranges::find_if(self->repl_instances_, [repl_instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == repl_instance_name;
});
MG_ASSERT(repl_instance != self->repl_instances_.end(), "Instance {} not found during callback!",
repl_instance_name);
return *repl_instance;
};
replica_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::lock_guard{self->coord_instance_lock_};
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
auto &repl_instance = find_repl_instance(self, repl_instance_name);
// We need to get replicas UUID from time to time to ensure replica is listening to correct main
// and that it didn't go down for less time than we could notice
// We need to get id of main replica is listening to
// and swap if necessary
if (!repl_instance.EnsureReplicaHasCorrectMainUUID(self->main_uuid_)) {
spdlog::error("Failed to swap uuid for alive replica instance {}.", repl_instance.InstanceName());
return;
}
repl_instance.OnSuccessPing();
};
replica_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::lock_guard{self->coord_instance_lock_};
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
auto &repl_instance = find_repl_instance(self, repl_instance_name);
repl_instance.OnFailPing();
// We need to restart main uuid from instance since it was "down" at least a second
// There is slight delay, if we choose to use isAlive, instance can be down and back up in less than
// our isAlive time difference, which would lead to instance
// https://github.com/memgraph/memgraph/pull/1720#discussion_r1493833414 setting UUID to nullopt and stopping
// accepting any incoming RPCs from valid main
// TODO(antoniofilipovic) this needs here more complex logic
// We need to get id of main replica is listening to on successful ping
// and swap it to correct uuid if it failed
repl_instance.ResetMainUUID();
};
main_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::lock_guard{self->coord_instance_lock_};
spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
auto &repl_instance = find_repl_instance(self, repl_instance_name);
if (repl_instance.IsAlive()) {
repl_instance.OnSuccessPing();
return;
}
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set.");
if (self->main_uuid_ == repl_instance_uuid.value()) {
if (!repl_instance.EnableWritingOnMain()) {
spdlog::error("Failed to enable writing on main instance {}", repl_instance_name);
return;
}
repl_instance.OnSuccessPing();
return;
}
if (!self->raft_state_.RequestLeadership()) {
spdlog::error("Failed to request leadership for demoting instance to replica {}.", repl_instance_name);
return;
}
// TODO: (andi) Improve std::string, appending...
auto const res = self->raft_state_.AppendSetInstanceAsReplica(std::string(repl_instance_name));
if (!res->get_accepted()) {
spdlog::error(
"Failed to accept request for demoting instance {}. Most likely the reason is that the instance is not the "
"leader.",
repl_instance_name);
return;
}
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
spdlog::error("Failed to demote instance {} with error code {}", repl_instance_name, res->get_result_code());
return;
}
spdlog::info("Request for demoting instance {} accepted", repl_instance_name);
// TODO(antoniof) make demoteToReplica idempotent since main can be demoted to replica but
// swapUUID can fail
if (!repl_instance.DemoteToReplica(self->replica_succ_cb_, self->replica_fail_cb_)) {
spdlog::error("Instance {} failed to become replica", repl_instance_name);
return;
}
if (!repl_instance.SendSwapAndUpdateUUID(self->main_uuid_)) {
spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName()));
return;
}
repl_instance.OnSuccessPing();
spdlog::info("Instance {} demoted to replica", repl_instance_name);
};
main_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::lock_guard{self->coord_instance_lock_};
spdlog::trace("Instance {} performing main failure callback", repl_instance_name);
auto &repl_instance = find_repl_instance(self, repl_instance_name);
repl_instance.OnFailPing();
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set");
if (!repl_instance.IsAlive() && self->main_uuid_ == repl_instance_uuid.value()) {
spdlog::info("Cluster without main instance, trying automatic failover");
self->TryFailover();
}
};
=======
>>>>>>> 99c53148c (registration backed-up by raft)
} }
auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> { auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {

View File

@ -49,7 +49,6 @@ auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<std::string,
buffer_serializer bs(data); buffer_serializer bs(data);
auto const log_str = bs.get_str(); auto const log_str = bs.get_str();
auto const sep = log_str.find('_'); auto const sep = log_str.find('_');
auto const action = log_str.substr(0, sep); auto const action = log_str.substr(0, sep);
auto const name = log_str.substr(sep + 1); auto const name = log_str.substr(sep + 1);

View File

@ -83,5 +83,16 @@ class RaftCouldNotParseFlagsException final : public utils::BasicException {
SPECIALIZE_GET_EXCEPTION_NAME(RaftCouldNotParseFlagsException) SPECIALIZE_GET_EXCEPTION_NAME(RaftCouldNotParseFlagsException)
}; };
class InvalidRaftLogActionException final : public utils::BasicException {
public:
explicit InvalidRaftLogActionException(std::string_view what) noexcept : BasicException(what) {}
template <class... Args>
explicit InvalidRaftLogActionException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
: InvalidRaftLogActionException(fmt::format(fmt, std::forward<Args>(args)...)) {}
SPECIALIZE_GET_EXCEPTION_NAME(InvalidRaftLogActionException)
};
} // namespace memgraph::coordination } // namespace memgraph::coordination
#endif #endif

View File

@ -67,7 +67,8 @@ class CoordinatorInstance {
private: private:
HealthCheckClientCallback client_succ_cb_, client_fail_cb_; HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
// NOTE: Must be std::list because we rely on pointer stability // NOTE: Only leader should have repl_instances_, not followers.
// NOTE: Must be std::list because we rely on pointer stability.
std::list<ReplicationInstance> repl_instances_; std::list<ReplicationInstance> repl_instances_;
mutable utils::ResourceLock coord_instance_lock_{}; mutable utils::ResourceLock coord_instance_lock_{};

View File

@ -63,13 +63,16 @@ class RaftState {
auto IsMain(std::string const &instance_name) const -> bool; auto IsMain(std::string const &instance_name) const -> bool;
auto IsReplica(std::string const &instance_name) const -> bool; auto IsReplica(std::string const &instance_name) const -> bool;
/// TODO: (andi) Add log in the name of methods
auto AppendRegisterReplicationInstance(std::string const &instance_name) -> ptr<raft_result>; auto AppendRegisterReplicationInstance(std::string const &instance_name) -> ptr<raft_result>;
auto AppendUnregisterReplicationInstance(std::string const &instance_name) -> ptr<raft_result>; auto AppendUnregisterReplicationInstance(std::string const &instance_name) -> ptr<raft_result>;
auto AppendSetInstanceAsMain(std::string const &instance_name) -> ptr<raft_result>; auto AppendSetInstanceAsMain(std::string const &instance_name) -> ptr<raft_result>;
auto AppendSetInstanceAsReplica(std::string const &instance_name) -> ptr<raft_result>; auto AppendSetInstanceAsReplica(std::string const &instance_name) -> ptr<raft_result>;
auto GetInstances() const -> std::vector<std::pair<std::string, std::string>>;
private: private:
// TODO: (andi) I think variables below can be abstracted // TODO: (andi) I think variables below can be abstracted/clean them.
uint32_t raft_server_id_; uint32_t raft_server_id_;
uint32_t raft_port_; uint32_t raft_port_;
std::string raft_address_; std::string raft_address_;

View File

@ -44,6 +44,8 @@ class CoordinatorClusterState {
static auto Deserialize(buffer &data) -> CoordinatorClusterState; static auto Deserialize(buffer &data) -> CoordinatorClusterState;
auto GetInstances() const -> std::vector<std::pair<std::string, std::string>>;
private: private:
std::map<std::string, replication_coordination_glue::ReplicationRole> instance_roles; std::map<std::string, replication_coordination_glue::ReplicationRole> instance_roles;
}; };

View File

@ -13,7 +13,10 @@
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE
#include "coordination/coordinator_exceptions.hpp"
#include <cstdint> #include <cstdint>
#include <string>
namespace memgraph::coordination { namespace memgraph::coordination {
@ -24,5 +27,21 @@ enum class RaftLogAction : uint8_t {
SET_INSTANCE_AS_REPLICA SET_INSTANCE_AS_REPLICA
}; };
inline auto ParseRaftLogAction(std::string const &action) -> RaftLogAction {
if (action == "register") {
return RaftLogAction::REGISTER_REPLICATION_INSTANCE;
}
if (action == "unregister") {
return RaftLogAction::UNREGISTER_REPLICATION_INSTANCE;
}
if (action == "set_main") {
return RaftLogAction::SET_INSTANCE_AS_MAIN;
}
if (action == "set_replica") {
return RaftLogAction::SET_INSTANCE_AS_REPLICA;
}
throw InvalidRaftLogActionException("Invalid Raft log action: " + action);
}
} // namespace memgraph::coordination } // namespace memgraph::coordination
#endif #endif

View File

@ -156,5 +156,9 @@ auto RaftState::IsReplica(std::string const &instance_name) const -> bool {
return state_machine_->IsReplica(instance_name); return state_machine_->IsReplica(instance_name);
} }
auto RaftState::GetInstances() const -> std::vector<std::pair<std::string, std::string>> {
return state_machine_->GetInstances();
}
} // namespace memgraph::coordination } // namespace memgraph::coordination
#endif #endif