add initial version of force reset
This commit is contained in:
parent
32bb99d919
commit
229e66b8bc
@ -136,10 +136,6 @@ auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_act
|
||||
spdlog::trace("DoAction: update uuid of new main {}", std::string{current_main_uuid_});
|
||||
break;
|
||||
}
|
||||
case RaftLogAction::OPEN_LOCK_FORCE_RESET: {
|
||||
is_healthy_ = false;
|
||||
break;
|
||||
}
|
||||
case RaftLogAction::UPDATE_UUID_FOR_INSTANCE: {
|
||||
auto const instance_uuid_change = std::get<InstanceUUIDUpdate>(log_entry);
|
||||
auto it = repl_instances_.find(instance_uuid_change.instance_name);
|
||||
@ -155,35 +151,10 @@ auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_act
|
||||
spdlog::trace("DoAction: add coordinator instance {}", config.coordinator_server_id);
|
||||
break;
|
||||
}
|
||||
case RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE: {
|
||||
case RaftLogAction::OPEN_LOCK: {
|
||||
is_lock_opened_ = true;
|
||||
spdlog::trace("DoAction: open lock register");
|
||||
spdlog::trace("DoAction: Opened lock");
|
||||
break;
|
||||
// TODO(antoniofilipovic) save what we are doing to be able to undo....
|
||||
}
|
||||
case RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE: {
|
||||
is_lock_opened_ = true;
|
||||
spdlog::trace("DoAction: open lock unregister");
|
||||
break;
|
||||
// TODO(antoniofilipovic) save what we are doing
|
||||
}
|
||||
case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN: {
|
||||
is_lock_opened_ = true;
|
||||
spdlog::trace("DoAction: open lock set instance as main");
|
||||
break;
|
||||
// TODO(antoniofilipovic) save what we are doing
|
||||
}
|
||||
case RaftLogAction::OPEN_LOCK_FAILOVER: {
|
||||
is_lock_opened_ = true;
|
||||
spdlog::trace("DoAction: open lock failover");
|
||||
break;
|
||||
// TODO(antoniofilipovic) save what we are doing
|
||||
}
|
||||
case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA: {
|
||||
is_lock_opened_ = true;
|
||||
spdlog::trace("DoAction: open lock set instance as replica");
|
||||
break;
|
||||
// TODO(antoniofilipovic) save what we need to undo
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,7 +17,6 @@
|
||||
#include "dbms/constants.hpp"
|
||||
#include "nuraft/coordinator_state_machine.hpp"
|
||||
#include "nuraft/coordinator_state_manager.hpp"
|
||||
#include "utils/counter.hpp"
|
||||
#include "utils/functional.hpp"
|
||||
#include "utils/resource_lock.hpp"
|
||||
|
||||
@ -183,18 +182,20 @@ void CoordinatorInstance::ForceResetCluster() {
|
||||
return;
|
||||
}
|
||||
|
||||
utils::OnScopeExit do_another_reset{[this]() {
|
||||
if (raft_state_.IsLockOpened()) {
|
||||
thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
|
||||
}
|
||||
}};
|
||||
|
||||
if (!raft_state_.AppendOpenLockForceReset()) {
|
||||
MG_ASSERT(!raft_state_.IsLeader(), "Coordinator is not follower, something is wrong.");
|
||||
if (!raft_state_.AppendOpenLock()) {
|
||||
spdlog::trace("Appending log force reset failed, aborting force reset");
|
||||
return;
|
||||
}
|
||||
|
||||
utils::OnScopeExit maybe_do_another_reset{[this]() {
|
||||
if (raft_state_.IsLockOpened() && raft_state_.IsLeader()) {
|
||||
spdlog::trace("Adding task to try force reset cluster again.");
|
||||
thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
|
||||
return;
|
||||
}
|
||||
spdlog::trace("Lock is not opened anymore or coordinator is not leader, not doing force reset again.");
|
||||
}};
|
||||
|
||||
auto const instances = raft_state_.GetReplicationInstances();
|
||||
|
||||
// To each instance we send RPC
|
||||
@ -209,44 +210,35 @@ void CoordinatorInstance::ForceResetCluster() {
|
||||
});
|
||||
|
||||
auto const new_uuid = utils::UUID{};
|
||||
auto const append_log_update_uuid_failed = [this, &new_uuid](ReplicationInstance &repl_instance) {
|
||||
if (!repl_instance.SendSwapAndUpdateUUID(new_uuid)) {
|
||||
// We assume this cannot fail as we filtered out instances which are not alive...
|
||||
return true;
|
||||
}
|
||||
if (!raft_state_.AppendUpdateUUIDForInstanceLog(repl_instance.InstanceName(), new_uuid)) {
|
||||
// error is only if we swap uuid but append log doesn't succeed
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
for (auto const &repl_instance : repl_instances_ | ranges::views::filter([&new_uuid](ReplicationInstance &instance) {
|
||||
instance.OnSuccessPing();
|
||||
return instance.SendSwapAndUpdateUUID(new_uuid);
|
||||
})) {
|
||||
if (!raft_state_.AppendUpdateUUIDForInstanceLog(repl_instance.InstanceName(), new_uuid)) {
|
||||
auto const update_on_success_ping_on_swap_uuid = [&new_uuid](ReplicationInstance &instance) {
|
||||
if (!instance.SendSwapAndUpdateUUID(new_uuid)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
instance.OnSuccessPing();
|
||||
};
|
||||
std::ranges::for_each(repl_instances_, update_on_success_ping_on_swap_uuid);
|
||||
|
||||
if (std::ranges::any_of(repl_instances_, append_log_update_uuid_failed)) {
|
||||
spdlog::error("Force reset failed since update log or swap uuid failed, assuming coordinator is now follower.");
|
||||
MG_ASSERT(!raft_state_.IsLeader(), "Coordinator is not follower");
|
||||
spdlog::trace("Adding task to try force reset cluster again.");
|
||||
thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
|
||||
auto const is_swap_uuid_success = [](ReplicationInstance &instance) { return instance.IsAlive(); };
|
||||
|
||||
auto instances_with_swapped_uuid = repl_instances_ | ranges::views::filter(is_swap_uuid_success);
|
||||
|
||||
auto update_uuid_failed = [&new_uuid, this](auto &repl_instance) {
|
||||
return !raft_state_.AppendUpdateUUIDForInstanceLog(repl_instance.InstanceName(), new_uuid);
|
||||
};
|
||||
if (std::ranges::any_of(instances_with_swapped_uuid, update_uuid_failed)) {
|
||||
spdlog::error("Force reset failed since update log swap uuid failed, assuming coordinator is now follower.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!raft_state_.AppendUpdateUUIDForNewMainLog(new_uuid)) {
|
||||
spdlog::error("Update log for new MAIN failed, assuming coordinator is now follower");
|
||||
MG_ASSERT(!raft_state_.IsLeader(), "Coordinator is not follower");
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO(antoniofilipovic) Think how we can isolate this part in common function
|
||||
auto const get_ts = [](ReplicationInstance &replica) { return replica.GetClient().SendGetInstanceTimestampsRpc(); };
|
||||
|
||||
auto maybe_instance_db_histories = repl_instances_ | ranges::views::transform(get_ts) | ranges::to<std::vector>();
|
||||
auto maybe_instance_db_histories =
|
||||
instances_with_swapped_uuid | ranges::views::transform(get_ts) | ranges::to<std::vector>();
|
||||
|
||||
auto const ts_has_value = [](auto const &zipped) -> bool {
|
||||
auto &[replica, res] = zipped;
|
||||
@ -258,7 +250,7 @@ void CoordinatorInstance::ForceResetCluster() {
|
||||
return std::make_pair(replica.InstanceName(), res.GetValue());
|
||||
});
|
||||
|
||||
auto instance_db_histories = ranges::views::zip(repl_instances_, maybe_instance_db_histories) |
|
||||
auto instance_db_histories = ranges::views::zip(instances_with_swapped_uuid, maybe_instance_db_histories) |
|
||||
ranges::views::filter(ts_has_value) | transform_to_pairs | ranges::to<std::vector>();
|
||||
|
||||
auto [most_up_to_date_instance, latest_epoch, latest_commit_timestamp] =
|
||||
@ -269,28 +261,23 @@ void CoordinatorInstance::ForceResetCluster() {
|
||||
auto const is_not_new_main = [&new_main](ReplicationInstance const &repl_instance) {
|
||||
return repl_instance.InstanceName() != new_main.InstanceName();
|
||||
};
|
||||
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
|
||||
auto repl_clients_info = instances_with_swapped_uuid | ranges::views::filter(is_not_new_main) |
|
||||
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
|
||||
ranges::to<ReplicationClientsInfo>();
|
||||
|
||||
if (!new_main.PromoteToMain(new_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
|
||||
&CoordinatorInstance::MainFailCallback)) {
|
||||
spdlog::warn("Force reset failed since promoting replica to main failed, trying again!");
|
||||
thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
|
||||
spdlog::trace("Adding task to try force reset cluster again.");
|
||||
spdlog::warn("Force reset failed since promoting replica to main failed.");
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO(antoniofilipovic) this will set it in healthy state -> check if we need something else
|
||||
// This will set cluster in healthy state again
|
||||
if (!raft_state_.AppendSetInstanceAsMainLog(most_up_to_date_instance, new_uuid)) {
|
||||
spdlog::error("Update log for new MAIN failed, assuming coordinator is now follower");
|
||||
MG_ASSERT(!raft_state_.IsLeader(), "Coordinator is not follower");
|
||||
thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
|
||||
spdlog::trace("Adding task to try force reset cluster again.");
|
||||
spdlog::error("Update log for new MAIN failed");
|
||||
return;
|
||||
}
|
||||
|
||||
MG_ASSERT(raft_state_.IsHealthy(), "After force reset we need to be in healthy state.");
|
||||
MG_ASSERT(!raft_state_.IsLockOpened(), "After force reset we need to be in healthy state.");
|
||||
}
|
||||
|
||||
auto CoordinatorInstance::TryFailover() -> void {
|
||||
@ -333,10 +320,17 @@ auto CoordinatorInstance::TryFailover() -> void {
|
||||
|
||||
auto *new_main = &FindReplicationInstance(most_up_to_date_instance);
|
||||
|
||||
if (!raft_state_.AppendOpenLockFailover(most_up_to_date_instance)) {
|
||||
if (!raft_state_.AppendOpenLock()) {
|
||||
spdlog::error("Aborting failover as instance is not anymore leader.");
|
||||
return;
|
||||
}
|
||||
|
||||
utils::OnScopeExit do_reset{[this]() {
|
||||
if (raft_state_.IsLockOpened() && raft_state_.IsLeader()) {
|
||||
thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
|
||||
}
|
||||
}};
|
||||
|
||||
new_main->PauseFrequentCheck();
|
||||
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
|
||||
|
||||
@ -414,10 +408,16 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
|
||||
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
|
||||
}
|
||||
|
||||
if (!raft_state_.AppendOpenLockSetInstanceToMain(instance_name)) {
|
||||
return SetInstanceToMainCoordinatorStatus::OPEN_LOCK;
|
||||
if (!raft_state_.AppendOpenLock()) {
|
||||
return SetInstanceToMainCoordinatorStatus::FAILED_TO_OPEN_LOCK;
|
||||
}
|
||||
|
||||
utils::OnScopeExit do_reset{[this]() {
|
||||
if (raft_state_.IsLockOpened() && raft_state_.IsLeader()) {
|
||||
thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
|
||||
}
|
||||
}};
|
||||
|
||||
new_main->PauseFrequentCheck();
|
||||
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
|
||||
|
||||
@ -491,10 +491,16 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig
|
||||
return RegisterInstanceCoordinatorStatus::NOT_LEADER;
|
||||
}
|
||||
|
||||
if (!raft_state_.AppendOpenLockRegister(config)) {
|
||||
return RegisterInstanceCoordinatorStatus::OPEN_LOCK;
|
||||
if (!raft_state_.AppendOpenLock()) {
|
||||
return RegisterInstanceCoordinatorStatus::FAILED_TO_OPEN_LOCK;
|
||||
}
|
||||
|
||||
utils::OnScopeExit do_reset{[this]() {
|
||||
if (raft_state_.IsLockOpened() && raft_state_.IsLeader()) {
|
||||
thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
|
||||
}
|
||||
}};
|
||||
|
||||
auto *new_instance = &repl_instances_.emplace_back(this, config, client_succ_cb_, client_fail_cb_,
|
||||
&CoordinatorInstance::ReplicaSuccessCallback,
|
||||
&CoordinatorInstance::ReplicaFailCallback);
|
||||
@ -543,10 +549,16 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instanc
|
||||
return UnregisterInstanceCoordinatorStatus::IS_MAIN;
|
||||
}
|
||||
|
||||
if (!raft_state_.AppendOpenLockUnregister(instance_name)) {
|
||||
return UnregisterInstanceCoordinatorStatus::OPEN_LOCK;
|
||||
if (!raft_state_.AppendOpenLock()) {
|
||||
return UnregisterInstanceCoordinatorStatus::FAILED_TO_OPEN_LOCK;
|
||||
}
|
||||
|
||||
utils::OnScopeExit do_reset{[this]() {
|
||||
if (raft_state_.IsLockOpened() && raft_state_.IsLeader()) {
|
||||
thread_pool_.AddTask([this]() { this->ForceResetCluster(); });
|
||||
}
|
||||
}};
|
||||
|
||||
inst_to_remove->StopFrequentCheck();
|
||||
|
||||
auto curr_main = std::ranges::find_if(repl_instances_, is_current_main);
|
||||
@ -617,7 +629,7 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
|
||||
return;
|
||||
}
|
||||
|
||||
if (!raft_state_.AppendOpenLockSetInstanceToReplica(repl_instance.InstanceName())) {
|
||||
if (!raft_state_.AppendOpenLock()) {
|
||||
spdlog::error("Failed to open lock for demoting OLD MAIN {} to REPLICA", repl_instance_name);
|
||||
return;
|
||||
}
|
||||
|
@ -38,21 +38,8 @@ auto CoordinatorStateMachine::CreateLog(nlohmann::json &&log) -> ptr<buffer> {
|
||||
return log_buf;
|
||||
}
|
||||
|
||||
// Serializes the raft log entry that opens the lock before a replication instance is registered.
// The entry is a JSON object with the OPEN_LOCK_REGISTER_REPLICATION_INSTANCE action and the
// instance configuration stored under "info".
auto CoordinatorStateMachine::SerializeOpenLockRegister(CoordinatorToReplicaConfig const &config) -> ptr<buffer> {
  nlohmann::json entry = {{"action", RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE}, {"info", config}};
  return CreateLog(std::move(entry));
}
|
||||
|
||||
// Serializes the raft log entry that opens the lock before a replication instance is unregistered.
// The instance name is copied into an owning std::string and stored under "info".
auto CoordinatorStateMachine::SerializeOpenLockUnregister(std::string_view instance_name) -> ptr<buffer> {
  nlohmann::json entry = {{"action", RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE},
                          {"info", std::string{instance_name}}};
  return CreateLog(std::move(entry));
}
|
||||
|
||||
// Serializes the raft log entry that opens the lock before a failover.
// The name of the instance is stored as an owning std::string under "info".
auto CoordinatorStateMachine::SerializeOpenLockFailover(std::string_view instance_name) -> ptr<buffer> {
  nlohmann::json entry = {{"action", RaftLogAction::OPEN_LOCK_FAILOVER}, {"info", std::string(instance_name)}};
  return CreateLog(std::move(entry));
}
|
||||
|
||||
auto CoordinatorStateMachine::SerializeOpenLockSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer> {
|
||||
return CreateLog({{"action", RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN}, {"info", std::string(instance_name)}});
|
||||
// Serializes the generic "open lock" raft log entry.
// The entry has no payload: "info" is stored as JSON null.
auto CoordinatorStateMachine::SerializeOpenLock() -> ptr<buffer> {
  nlohmann::json entry = {{"action", RaftLogAction::OPEN_LOCK}, {"info", nullptr}};
  return CreateLog(std::move(entry));
}
|
||||
|
||||
auto CoordinatorStateMachine::SerializeRegisterInstance(CoordinatorToReplicaConfig const &config) -> ptr<buffer> {
|
||||
@ -86,14 +73,6 @@ auto CoordinatorStateMachine::SerializeAddCoordinatorInstance(CoordinatorToCoord
|
||||
return CreateLog({{"action", RaftLogAction::ADD_COORDINATOR_INSTANCE}, {"info", config}});
|
||||
}
|
||||
|
||||
// Serializes the raft log entry that opens the lock before an instance is set as replica.
// The instance name is stored under "info".
auto CoordinatorStateMachine::SerializeOpenLockSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer> {
  nlohmann::json entry = {{"action", RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA}, {"info", instance_name}};
  return CreateLog(std::move(entry));
}
|
||||
|
||||
// Serializes the raft log entry that marks the start of a force reset.
//
// The entry must be tagged with OPEN_LOCK_FORCE_RESET. Tagging it with
// OPEN_LOCK_SET_INSTANCE_AS_REPLICA (as was done previously) makes the state machine decode and
// apply the entry as a "set instance as replica" lock instead of a force reset.
// The payload stays an empty string because the decoder reads "info" as std::string for this action.
auto CoordinatorStateMachine::SerializeOpenLockForceReset() -> ptr<buffer> {
  return CreateLog({{"action", RaftLogAction::OPEN_LOCK_FORCE_RESET}, {"info", std::string{}}});
}
|
||||
|
||||
auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, RaftLogAction> {
|
||||
buffer_serializer bs(data);
|
||||
auto const json = nlohmann::json::parse(bs.get_str());
|
||||
@ -101,19 +80,8 @@ auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, Raf
|
||||
auto const &info = json["info"];
|
||||
|
||||
switch (action) {
|
||||
case RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE: {
|
||||
return {info.get<CoordinatorToReplicaConfig>(), action};
|
||||
}
|
||||
case RaftLogAction::OPEN_LOCK_FORCE_RESET:
|
||||
[[fallthrough]];
|
||||
case RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE:
|
||||
[[fallthrough]];
|
||||
case RaftLogAction::OPEN_LOCK_FAILOVER:
|
||||
[[fallthrough]];
|
||||
case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN:
|
||||
[[fallthrough]];
|
||||
case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA: {
|
||||
return {info.get<std::string>(), action};
|
||||
case RaftLogAction::OPEN_LOCK: {
|
||||
return {std::monostate{}, action};
|
||||
}
|
||||
case RaftLogAction::REGISTER_REPLICATION_INSTANCE:
|
||||
return {info.get<CoordinatorToReplicaConfig>(), action};
|
||||
|
@ -70,13 +70,8 @@ class RaftState {
|
||||
auto AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool;
|
||||
auto AppendUpdateUUIDForNewMainLog(utils::UUID const &uuid) -> bool;
|
||||
auto AppendUpdateUUIDForInstanceLog(std::string_view instance_name, utils::UUID const &uuid) -> bool;
|
||||
auto AppendOpenLockRegister(CoordinatorToReplicaConfig const &) -> bool;
|
||||
auto AppendOpenLockUnregister(std::string_view) -> bool;
|
||||
auto AppendOpenLockFailover(std::string_view instance_name) -> bool;
|
||||
auto AppendOpenLockSetInstanceToMain(std::string_view instance_name) -> bool;
|
||||
auto AppendOpenLockSetInstanceToReplica(std::string_view instance_name) -> bool;
|
||||
auto AppendOpenLock() -> bool;
|
||||
auto AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig const &config) -> bool;
|
||||
auto AppendOpenLockForceReset() -> bool;
|
||||
|
||||
auto GetReplicationInstances() const -> std::vector<ReplicationInstanceState>;
|
||||
// TODO: (andi) Do we need then GetAllCoordinators?
|
||||
|
@ -27,7 +27,7 @@ enum class RegisterInstanceCoordinatorStatus : uint8_t {
|
||||
RAFT_LOG_ERROR,
|
||||
SUCCESS,
|
||||
LOCK_OPENED,
|
||||
OPEN_LOCK
|
||||
FAILED_TO_OPEN_LOCK
|
||||
};
|
||||
|
||||
enum class UnregisterInstanceCoordinatorStatus : uint8_t {
|
||||
@ -39,7 +39,7 @@ enum class UnregisterInstanceCoordinatorStatus : uint8_t {
|
||||
RAFT_LOG_ERROR,
|
||||
SUCCESS,
|
||||
LOCK_OPENED,
|
||||
OPEN_LOCK
|
||||
FAILED_TO_OPEN_LOCK
|
||||
};
|
||||
|
||||
enum class SetInstanceToMainCoordinatorStatus : uint8_t {
|
||||
@ -52,7 +52,7 @@ enum class SetInstanceToMainCoordinatorStatus : uint8_t {
|
||||
SWAP_UUID_FAILED,
|
||||
SUCCESS,
|
||||
LOCK_OPENED,
|
||||
OPEN_LOCK,
|
||||
FAILED_TO_OPEN_LOCK,
|
||||
ENABLE_WRITING_FAILED
|
||||
};
|
||||
|
||||
|
@ -61,8 +61,8 @@ struct CoordinatorInstanceState {
|
||||
void to_json(nlohmann::json &j, ReplicationInstanceState const &instance_state);
|
||||
void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state);
|
||||
|
||||
using TRaftLog = std::variant<CoordinatorToReplicaConfig, std::string, utils::UUID, CoordinatorToCoordinatorConfig,
|
||||
InstanceUUIDUpdate>;
|
||||
using TRaftLog = std::variant<std::monostate, CoordinatorToReplicaConfig, std::string, utils::UUID,
|
||||
CoordinatorToCoordinatorConfig, InstanceUUIDUpdate>;
|
||||
|
||||
using nuraft::buffer;
|
||||
using nuraft::buffer_serializer;
|
||||
|
@ -43,10 +43,7 @@ class CoordinatorStateMachine : public state_machine {
|
||||
~CoordinatorStateMachine() override = default;
|
||||
|
||||
static auto CreateLog(nlohmann::json &&log) -> ptr<buffer>;
|
||||
static auto SerializeOpenLockRegister(CoordinatorToReplicaConfig const &config) -> ptr<buffer>;
|
||||
static auto SerializeOpenLockUnregister(std::string_view instance_name) -> ptr<buffer>;
|
||||
static auto SerializeOpenLockSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer>;
|
||||
static auto SerializeOpenLockFailover(std::string_view instance_name) -> ptr<buffer>;
|
||||
static auto SerializeOpenLock() -> ptr<buffer>;
|
||||
static auto SerializeRegisterInstance(CoordinatorToReplicaConfig const &config) -> ptr<buffer>;
|
||||
static auto SerializeUnregisterInstance(std::string_view instance_name) -> ptr<buffer>;
|
||||
static auto SerializeSetInstanceAsMain(InstanceUUIDUpdate const &instance_uuid_change) -> ptr<buffer>;
|
||||
@ -54,8 +51,6 @@ class CoordinatorStateMachine : public state_machine {
|
||||
static auto SerializeUpdateUUIDForNewMain(utils::UUID const &uuid) -> ptr<buffer>;
|
||||
static auto SerializeUpdateUUIDForInstance(InstanceUUIDUpdate const &instance_uuid_change) -> ptr<buffer>;
|
||||
static auto SerializeAddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config) -> ptr<buffer>;
|
||||
static auto SerializeOpenLockSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer>;
|
||||
static auto SerializeOpenLockForceReset() -> ptr<buffer>;
|
||||
|
||||
static auto DecodeLog(buffer &data) -> std::pair<TRaftLog, RaftLogAction>;
|
||||
|
||||
|
@ -23,11 +23,7 @@
|
||||
namespace memgraph::coordination {
|
||||
|
||||
enum class RaftLogAction : uint8_t {
|
||||
OPEN_LOCK_REGISTER_REPLICATION_INSTANCE,
|
||||
OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE,
|
||||
OPEN_LOCK_FAILOVER,
|
||||
OPEN_LOCK_SET_INSTANCE_AS_MAIN,
|
||||
OPEN_LOCK_SET_INSTANCE_AS_REPLICA,
|
||||
OPEN_LOCK,
|
||||
REGISTER_REPLICATION_INSTANCE,
|
||||
UNREGISTER_REPLICATION_INSTANCE,
|
||||
SET_INSTANCE_AS_MAIN,
|
||||
@ -35,24 +31,16 @@ enum class RaftLogAction : uint8_t {
|
||||
UPDATE_UUID_OF_NEW_MAIN,
|
||||
ADD_COORDINATOR_INSTANCE,
|
||||
UPDATE_UUID_FOR_INSTANCE,
|
||||
OPEN_LOCK_FORCE_RESET,
|
||||
};
|
||||
|
||||
NLOHMANN_JSON_SERIALIZE_ENUM(RaftLogAction,
|
||||
{{RaftLogAction::REGISTER_REPLICATION_INSTANCE, "register"},
|
||||
{RaftLogAction::UNREGISTER_REPLICATION_INSTANCE, "unregister"},
|
||||
{RaftLogAction::SET_INSTANCE_AS_MAIN, "promote"},
|
||||
{RaftLogAction::SET_INSTANCE_AS_REPLICA, "demote"},
|
||||
{RaftLogAction::UPDATE_UUID_OF_NEW_MAIN, "update_uuid_of_new_main"},
|
||||
{RaftLogAction::ADD_COORDINATOR_INSTANCE, "add_coordinator_instance"},
|
||||
{RaftLogAction::UPDATE_UUID_FOR_INSTANCE, "update_uuid_for_instance"},
|
||||
{RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE, "open_lock_register_instance"},
|
||||
{RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE,
|
||||
"open_lock_unregister_instance"},
|
||||
{RaftLogAction::OPEN_LOCK_FAILOVER, "open_lock_failover"},
|
||||
{RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN, "open_lock_set_instance_as_main"},
|
||||
{RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA, "open_lock_set_instance_as_replica"},
|
||||
{RaftLogAction::OPEN_LOCK_FORCE_RESET, "force_reset"}})
|
||||
NLOHMANN_JSON_SERIALIZE_ENUM(RaftLogAction, {{RaftLogAction::REGISTER_REPLICATION_INSTANCE, "register"},
|
||||
{RaftLogAction::UNREGISTER_REPLICATION_INSTANCE, "unregister"},
|
||||
{RaftLogAction::SET_INSTANCE_AS_MAIN, "promote"},
|
||||
{RaftLogAction::SET_INSTANCE_AS_REPLICA, "demote"},
|
||||
{RaftLogAction::UPDATE_UUID_OF_NEW_MAIN, "update_uuid_of_new_main"},
|
||||
{RaftLogAction::ADD_COORDINATOR_INSTANCE, "add_coordinator_instance"},
|
||||
{RaftLogAction::UPDATE_UUID_FOR_INSTANCE, "update_uuid_for_instance"},
|
||||
{RaftLogAction::OPEN_LOCK, "open_lock"}})
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
||||
|
@ -160,72 +160,17 @@ auto RaftState::IsLeader() const -> bool { return raft_server_->is_leader(); }
|
||||
|
||||
auto RaftState::RequestLeadership() -> bool { return raft_server_->is_leader() || raft_server_->request_leadership(); }
|
||||
|
||||
auto RaftState::AppendOpenLockRegister(CoordinatorToReplicaConfig const &config) -> bool {
|
||||
auto new_log = CoordinatorStateMachine::SerializeOpenLockRegister(config);
|
||||
auto RaftState::AppendOpenLock() -> bool {
|
||||
auto new_log = CoordinatorStateMachine::SerializeOpenLock();
|
||||
auto const res = raft_server_->append_entries({new_log});
|
||||
|
||||
if (!res->get_accepted()) {
|
||||
spdlog::error("Failed to accept request to open lock to register instance {}", config.instance_name);
|
||||
spdlog::error("Failed to accept request to open lock");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
|
||||
spdlog::error("Failed to open lock for registering instance {} with error code {}", config.instance_name,
|
||||
int(res->get_result_code()));
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
auto RaftState::AppendOpenLockUnregister(std::string_view instance_name) -> bool {
|
||||
auto new_log = CoordinatorStateMachine::SerializeOpenLockUnregister(instance_name);
|
||||
auto const res = raft_server_->append_entries({new_log});
|
||||
|
||||
if (!res->get_accepted()) {
|
||||
spdlog::error("Failed to accept request to open lock to unregister instance {}.", instance_name);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
|
||||
spdlog::error("Failed to open lock for unregistering instance {} with error code {}", instance_name,
|
||||
int(res->get_result_code()));
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
auto RaftState::AppendOpenLockFailover(std::string_view instance_name) -> bool {
|
||||
auto new_log = CoordinatorStateMachine::SerializeOpenLockFailover(instance_name);
|
||||
auto const res = raft_server_->append_entries({new_log});
|
||||
|
||||
if (!res->get_accepted()) {
|
||||
spdlog::error("Failed to accept request to open lock for failover {}", instance_name);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
|
||||
spdlog::error("Failed to open lock for failover to instance {} with error code {}", instance_name,
|
||||
int(res->get_result_code()));
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
auto RaftState::AppendOpenLockSetInstanceToMain(std::string_view instance_name) -> bool {
|
||||
auto new_log = CoordinatorStateMachine::SerializeOpenLockSetInstanceAsMain(instance_name);
|
||||
auto const res = raft_server_->append_entries({new_log});
|
||||
|
||||
if (!res->get_accepted()) {
|
||||
spdlog::error("Failed to accept request to open lock and set instance {} to MAIN", instance_name);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
|
||||
spdlog::error("Failed to open lock to set instance {} to MAIN with error code {}", instance_name,
|
||||
int(res->get_result_code()));
|
||||
spdlog::error("Failed to open lock with error code {}", int(res->get_result_code()));
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -317,26 +262,6 @@ auto RaftState::AppendSetInstanceAsReplicaLog(std::string_view instance_name) ->
|
||||
return true;
|
||||
}
|
||||
|
||||
auto RaftState::AppendOpenLockSetInstanceToReplica(std::string_view instance_name) -> bool {
|
||||
auto new_log = CoordinatorStateMachine::SerializeOpenLockSetInstanceAsReplica(instance_name);
|
||||
auto const res = raft_server_->append_entries({new_log});
|
||||
if (!res->get_accepted()) {
|
||||
spdlog::error(
|
||||
"Failed to accept request for demoting instance {}. Most likely the reason is that the instance is not "
|
||||
"the leader.",
|
||||
instance_name);
|
||||
return false;
|
||||
}
|
||||
spdlog::info("Request for demoting instance {} accepted", instance_name);
|
||||
|
||||
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
|
||||
spdlog::error("Failed to promote instance {} with error code {}", instance_name, int(res->get_result_code()));
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
auto RaftState::AppendUpdateUUIDForNewMainLog(utils::UUID const &uuid) -> bool {
|
||||
auto new_log = CoordinatorStateMachine::SerializeUpdateUUIDForNewMain(uuid);
|
||||
auto const res = raft_server_->append_entries({new_log});
|
||||
@ -378,26 +303,6 @@ auto RaftState::AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig c
|
||||
return true;
|
||||
}
|
||||
|
||||
auto RaftState::AppendOpenLockForceReset() -> bool {
|
||||
auto new_log = CoordinatorStateMachine::SerializeOpenLockForceReset();
|
||||
auto const res = raft_server_->append_entries({new_log});
|
||||
if (!res->get_accepted()) {
|
||||
spdlog::error(
|
||||
"Failed to accept request for force reset. Most likely the reason is that the instance is "
|
||||
"not the leader.");
|
||||
return false;
|
||||
}
|
||||
|
||||
spdlog::info("Request for force reset accepted");
|
||||
|
||||
if (res->get_result_code() != nuraft::cmd_result_code::OK) {
|
||||
spdlog::error("Failed to add log for force reset");
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
auto RaftState::AppendUpdateUUIDForInstanceLog(std::string_view instance_name, const utils::UUID &uuid) -> bool {
|
||||
auto new_log = CoordinatorStateMachine::SerializeUpdateUUIDForInstance(
|
||||
{.instance_name = std::string{instance_name}, .uuid = uuid});
|
||||
|
@ -409,9 +409,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
|
||||
"Couldn't unregister replica instance because current main instance couldn't unregister replica!");
|
||||
case LOCK_OPENED:
|
||||
throw QueryRuntimeException("Couldn't unregister replica because the last action didn't finish successfully!");
|
||||
case OPEN_LOCK:
|
||||
throw QueryRuntimeException(
|
||||
"Couldn't register instance as cluster didn't accept entering unregistration state!");
|
||||
case FAILED_TO_OPEN_LOCK:
|
||||
throw QueryRuntimeException("Couldn't register instance as cluster didn't accept start of action!");
|
||||
case SUCCESS:
|
||||
break;
|
||||
}
|
||||
@ -477,9 +476,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
|
||||
case LOCK_OPENED:
|
||||
throw QueryRuntimeException(
|
||||
"Couldn't register replica instance because because the last action didn't finish successfully!");
|
||||
case OPEN_LOCK:
|
||||
throw QueryRuntimeException(
|
||||
"Couldn't register replica instance because cluster didn't accept registration query!");
|
||||
case FAILED_TO_OPEN_LOCK:
|
||||
throw QueryRuntimeException("Couldn't register instance as cluster didn't accept start of action!");
|
||||
case SUCCESS:
|
||||
break;
|
||||
}
|
||||
@ -525,9 +523,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
|
||||
"Couldn't set replica instance to main! Check coordinator and replica for more logs");
|
||||
case SWAP_UUID_FAILED:
|
||||
throw QueryRuntimeException("Couldn't set replica instance to main. Replicas didn't swap uuid of new main.");
|
||||
case OPEN_LOCK:
|
||||
throw QueryRuntimeException(
|
||||
"Couldn't set replica instance to main as cluster didn't accept setting instance state.");
|
||||
case FAILED_TO_OPEN_LOCK:
|
||||
throw QueryRuntimeException("Couldn't register instance as cluster didn't accept start of action!");
|
||||
case LOCK_OPENED:
|
||||
throw QueryRuntimeException(
|
||||
"Couldn't register replica instance because because the last action didn't finish successfully!");
|
||||
|
Loading…
Reference in New Issue
Block a user