Add distributed locks in HA (#1819)
- Add distributed locks - Fix the wrong MAIN state on the follower coordinator - Fix wrong main doing failover
This commit is contained in:
parent
89e13109d7
commit
13e3a1d0f7
@ -19,36 +19,47 @@
|
|||||||
namespace memgraph::coordination {
|
namespace memgraph::coordination {
|
||||||
|
|
||||||
void to_json(nlohmann::json &j, ReplicationInstanceState const &instance_state) {
|
void to_json(nlohmann::json &j, ReplicationInstanceState const &instance_state) {
|
||||||
j = nlohmann::json{{"config", instance_state.config}, {"status", instance_state.status}};
|
j = nlohmann::json{
|
||||||
|
{"config", instance_state.config}, {"status", instance_state.status}, {"uuid", instance_state.instance_uuid}};
|
||||||
}
|
}
|
||||||
|
|
||||||
void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state) {
|
void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state) {
|
||||||
j.at("config").get_to(instance_state.config);
|
j.at("config").get_to(instance_state.config);
|
||||||
j.at("status").get_to(instance_state.status);
|
j.at("status").get_to(instance_state.status);
|
||||||
|
j.at("uuid").get_to(instance_state.instance_uuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
CoordinatorClusterState::CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances)
|
CoordinatorClusterState::CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,
|
||||||
: repl_instances_{std::move(instances)} {}
|
utils::UUID const ¤t_main_uuid, bool is_lock_opened)
|
||||||
|
: repl_instances_{std::move(instances)}, current_main_uuid_(current_main_uuid), is_lock_opened_(is_lock_opened) {}
|
||||||
|
|
||||||
CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState const &other)
|
CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState const &other)
|
||||||
: repl_instances_{other.repl_instances_} {}
|
: repl_instances_{other.repl_instances_},
|
||||||
|
current_main_uuid_(other.current_main_uuid_),
|
||||||
|
is_lock_opened_(other.is_lock_opened_) {}
|
||||||
|
|
||||||
CoordinatorClusterState &CoordinatorClusterState::operator=(CoordinatorClusterState const &other) {
|
CoordinatorClusterState &CoordinatorClusterState::operator=(CoordinatorClusterState const &other) {
|
||||||
if (this == &other) {
|
if (this == &other) {
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
repl_instances_ = other.repl_instances_;
|
repl_instances_ = other.repl_instances_;
|
||||||
|
current_main_uuid_ = other.current_main_uuid_;
|
||||||
|
is_lock_opened_ = other.is_lock_opened_;
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState &&other) noexcept
|
CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState &&other) noexcept
|
||||||
: repl_instances_{std::move(other.repl_instances_)} {}
|
: repl_instances_{std::move(other.repl_instances_)},
|
||||||
|
current_main_uuid_(other.current_main_uuid_),
|
||||||
|
is_lock_opened_(other.is_lock_opened_) {}
|
||||||
|
|
||||||
CoordinatorClusterState &CoordinatorClusterState::operator=(CoordinatorClusterState &&other) noexcept {
|
CoordinatorClusterState &CoordinatorClusterState::operator=(CoordinatorClusterState &&other) noexcept {
|
||||||
if (this == &other) {
|
if (this == &other) {
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
repl_instances_ = std::move(other.repl_instances_);
|
repl_instances_ = std::move(other.repl_instances_);
|
||||||
|
current_main_uuid_ = other.current_main_uuid_;
|
||||||
|
is_lock_opened_ = other.is_lock_opened_;
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -58,68 +69,127 @@ auto CoordinatorClusterState::MainExists() const -> bool {
|
|||||||
[](auto const &entry) { return entry.second.status == ReplicationRole::MAIN; });
|
[](auto const &entry) { return entry.second.status == ReplicationRole::MAIN; });
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorClusterState::IsMain(std::string_view instance_name) const -> bool {
|
auto CoordinatorClusterState::HasMainState(std::string_view instance_name) const -> bool {
|
||||||
auto lock = std::shared_lock{log_lock_};
|
auto lock = std::shared_lock{log_lock_};
|
||||||
auto const it = repl_instances_.find(instance_name);
|
auto const it = repl_instances_.find(instance_name);
|
||||||
return it != repl_instances_.end() && it->second.status == ReplicationRole::MAIN;
|
return it != repl_instances_.end() && it->second.status == ReplicationRole::MAIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorClusterState::IsReplica(std::string_view instance_name) const -> bool {
|
auto CoordinatorClusterState::HasReplicaState(std::string_view instance_name) const -> bool {
|
||||||
auto lock = std::shared_lock{log_lock_};
|
auto lock = std::shared_lock{log_lock_};
|
||||||
auto const it = repl_instances_.find(instance_name);
|
auto const it = repl_instances_.find(instance_name);
|
||||||
return it != repl_instances_.end() && it->second.status == ReplicationRole::REPLICA;
|
return it != repl_instances_.end() && it->second.status == ReplicationRole::REPLICA;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorClusterState::InsertInstance(std::string instance_name, ReplicationInstanceState instance_state)
|
auto CoordinatorClusterState::IsCurrentMain(std::string_view instance_name) const -> bool {
|
||||||
-> void {
|
auto lock = std::shared_lock{log_lock_};
|
||||||
auto lock = std::lock_guard{log_lock_};
|
auto const it = repl_instances_.find(instance_name);
|
||||||
repl_instances_.insert_or_assign(std::move(instance_name), std::move(instance_state));
|
return it != repl_instances_.end() && it->second.status == ReplicationRole::MAIN &&
|
||||||
|
it->second.instance_uuid == current_main_uuid_;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void {
|
auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void {
|
||||||
auto lock = std::lock_guard{log_lock_};
|
auto lock = std::lock_guard{log_lock_};
|
||||||
switch (log_action) {
|
switch (log_action) {
|
||||||
|
// end of OPEN_LOCK_REGISTER_REPLICATION_INSTANCE
|
||||||
case RaftLogAction::REGISTER_REPLICATION_INSTANCE: {
|
case RaftLogAction::REGISTER_REPLICATION_INSTANCE: {
|
||||||
auto const &config = std::get<CoordinatorToReplicaConfig>(log_entry);
|
auto const &config = std::get<CoordinatorToReplicaConfig>(log_entry);
|
||||||
repl_instances_[config.instance_name] = ReplicationInstanceState{config, ReplicationRole::REPLICA};
|
spdlog::trace("DoAction: register replication instance {}", config.instance_name);
|
||||||
|
// Setting instance uuid to random, if registration fails, we are still in random state
|
||||||
|
repl_instances_.emplace(config.instance_name,
|
||||||
|
ReplicationInstanceState{config, ReplicationRole::REPLICA, utils::UUID{}});
|
||||||
|
is_lock_opened_ = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
// end of OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE
|
||||||
case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE: {
|
case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE: {
|
||||||
auto const instance_name = std::get<std::string>(log_entry);
|
auto const instance_name = std::get<std::string>(log_entry);
|
||||||
|
spdlog::trace("DoAction: unregister replication instance {}", instance_name);
|
||||||
repl_instances_.erase(instance_name);
|
repl_instances_.erase(instance_name);
|
||||||
|
is_lock_opened_ = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
// end of OPEN_LOCK_SET_INSTANCE_AS_MAIN and OPEN_LOCK_FAILOVER
|
||||||
case RaftLogAction::SET_INSTANCE_AS_MAIN: {
|
case RaftLogAction::SET_INSTANCE_AS_MAIN: {
|
||||||
auto const instance_name = std::get<std::string>(log_entry);
|
auto const instance_uuid_change = std::get<InstanceUUIDUpdate>(log_entry);
|
||||||
auto it = repl_instances_.find(instance_name);
|
auto it = repl_instances_.find(instance_uuid_change.instance_name);
|
||||||
MG_ASSERT(it != repl_instances_.end(), "Instance does not exist as part of raft state!");
|
MG_ASSERT(it != repl_instances_.end(), "Instance does not exist as part of raft state!");
|
||||||
it->second.status = ReplicationRole::MAIN;
|
it->second.status = ReplicationRole::MAIN;
|
||||||
|
it->second.instance_uuid = instance_uuid_change.uuid;
|
||||||
|
is_lock_opened_ = false;
|
||||||
|
spdlog::trace("DoAction: set replication instance {} as main with uuid {}", instance_uuid_change.instance_name,
|
||||||
|
std::string{instance_uuid_change.uuid});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
// end of OPEN_LOCK_SET_INSTANCE_AS_REPLICA
|
||||||
case RaftLogAction::SET_INSTANCE_AS_REPLICA: {
|
case RaftLogAction::SET_INSTANCE_AS_REPLICA: {
|
||||||
auto const instance_name = std::get<std::string>(log_entry);
|
auto const instance_name = std::get<std::string>(log_entry);
|
||||||
auto it = repl_instances_.find(instance_name);
|
auto it = repl_instances_.find(instance_name);
|
||||||
MG_ASSERT(it != repl_instances_.end(), "Instance does not exist as part of raft state!");
|
MG_ASSERT(it != repl_instances_.end(), "Instance does not exist as part of raft state!");
|
||||||
it->second.status = ReplicationRole::REPLICA;
|
it->second.status = ReplicationRole::REPLICA;
|
||||||
|
is_lock_opened_ = false;
|
||||||
|
spdlog::trace("DoAction: set replication instance {} as replica", instance_name);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case RaftLogAction::UPDATE_UUID: {
|
case RaftLogAction::UPDATE_UUID_OF_NEW_MAIN: {
|
||||||
uuid_ = std::get<utils::UUID>(log_entry);
|
current_main_uuid_ = std::get<utils::UUID>(log_entry);
|
||||||
|
spdlog::trace("DoAction: update uuid of new main {}", std::string{current_main_uuid_});
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case RaftLogAction::UPDATE_UUID_FOR_INSTANCE: {
|
||||||
|
auto const instance_uuid_change = std::get<InstanceUUIDUpdate>(log_entry);
|
||||||
|
auto it = repl_instances_.find(instance_uuid_change.instance_name);
|
||||||
|
MG_ASSERT(it != repl_instances_.end(), "Instance doesn't exist as part of RAFT state");
|
||||||
|
it->second.instance_uuid = instance_uuid_change.uuid;
|
||||||
|
spdlog::trace("DoAction: update uuid for instance {} to {}", instance_uuid_change.instance_name,
|
||||||
|
std::string{instance_uuid_change.uuid});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case RaftLogAction::ADD_COORDINATOR_INSTANCE: {
|
case RaftLogAction::ADD_COORDINATOR_INSTANCE: {
|
||||||
auto const &config = std::get<CoordinatorToCoordinatorConfig>(log_entry);
|
auto const &config = std::get<CoordinatorToCoordinatorConfig>(log_entry);
|
||||||
coordinators_.emplace_back(CoordinatorInstanceState{config});
|
coordinators_.emplace_back(CoordinatorInstanceState{config});
|
||||||
|
spdlog::trace("DoAction: add coordinator instance {}", config.coordinator_server_id);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE: {
|
||||||
|
is_lock_opened_ = true;
|
||||||
|
spdlog::trace("DoAction: open lock register");
|
||||||
|
break;
|
||||||
|
// TODO(antoniofilipovic) save what we are doing to be able to undo....
|
||||||
|
}
|
||||||
|
case RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE: {
|
||||||
|
is_lock_opened_ = true;
|
||||||
|
spdlog::trace("DoAction: open lock unregister");
|
||||||
|
break;
|
||||||
|
// TODO(antoniofilipovic) save what we are doing
|
||||||
|
}
|
||||||
|
case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN: {
|
||||||
|
is_lock_opened_ = true;
|
||||||
|
spdlog::trace("DoAction: open lock set instance as main");
|
||||||
|
break;
|
||||||
|
// TODO(antoniofilipovic) save what we are doing
|
||||||
|
}
|
||||||
|
case RaftLogAction::OPEN_LOCK_FAILOVER: {
|
||||||
|
is_lock_opened_ = true;
|
||||||
|
spdlog::trace("DoAction: open lock failover");
|
||||||
|
break;
|
||||||
|
// TODO(antoniofilipovic) save what we are doing
|
||||||
|
}
|
||||||
|
case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA: {
|
||||||
|
is_lock_opened_ = true;
|
||||||
|
spdlog::trace("DoAction: open lock set instance as replica");
|
||||||
|
break;
|
||||||
|
// TODO(antoniofilipovic) save what we need to undo
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
|
auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
|
||||||
auto lock = std::shared_lock{log_lock_};
|
auto lock = std::shared_lock{log_lock_};
|
||||||
|
nlohmann::json j = {{"repl_instances", repl_instances_},
|
||||||
auto const log = nlohmann::json(repl_instances_).dump();
|
{"is_lock_opened", is_lock_opened_},
|
||||||
|
{"current_main_uuid", current_main_uuid_}};
|
||||||
|
auto const log = j.dump();
|
||||||
data = buffer::alloc(sizeof(uint32_t) + log.size());
|
data = buffer::alloc(sizeof(uint32_t) + log.size());
|
||||||
buffer_serializer bs(data);
|
buffer_serializer bs(data);
|
||||||
bs.put_str(log);
|
bs.put_str(log);
|
||||||
@ -128,9 +198,10 @@ auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
|
|||||||
auto CoordinatorClusterState::Deserialize(buffer &data) -> CoordinatorClusterState {
|
auto CoordinatorClusterState::Deserialize(buffer &data) -> CoordinatorClusterState {
|
||||||
buffer_serializer bs(data);
|
buffer_serializer bs(data);
|
||||||
auto const j = nlohmann::json::parse(bs.get_str());
|
auto const j = nlohmann::json::parse(bs.get_str());
|
||||||
auto instances = j.get<std::map<std::string, ReplicationInstanceState, std::less<>>>();
|
auto instances = j["repl_instances"].get<std::map<std::string, ReplicationInstanceState, std::less<>>>();
|
||||||
|
auto current_main_uuid = j["current_main_uuid"].get<utils::UUID>();
|
||||||
return CoordinatorClusterState{std::move(instances)};
|
bool is_lock_opened = j["is_lock_opened"].get<int>();
|
||||||
|
return CoordinatorClusterState{std::move(instances), current_main_uuid, is_lock_opened};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorClusterState::GetReplicationInstances() const -> std::vector<ReplicationInstanceState> {
|
auto CoordinatorClusterState::GetReplicationInstances() const -> std::vector<ReplicationInstanceState> {
|
||||||
@ -138,12 +209,24 @@ auto CoordinatorClusterState::GetReplicationInstances() const -> std::vector<Rep
|
|||||||
return repl_instances_ | ranges::views::values | ranges::to<std::vector<ReplicationInstanceState>>;
|
return repl_instances_ | ranges::views::values | ranges::to<std::vector<ReplicationInstanceState>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto CoordinatorClusterState::GetCurrentMainUUID() const -> utils::UUID { return current_main_uuid_; }
|
||||||
|
|
||||||
|
auto CoordinatorClusterState::GetInstanceUUID(std::string_view instance_name) const -> utils::UUID {
|
||||||
|
auto lock = std::shared_lock{log_lock_};
|
||||||
|
auto const it = repl_instances_.find(instance_name);
|
||||||
|
MG_ASSERT(it != repl_instances_.end(), "Instance with that name doesn't exist.");
|
||||||
|
return it->second.instance_uuid;
|
||||||
|
}
|
||||||
|
|
||||||
auto CoordinatorClusterState::GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState> {
|
auto CoordinatorClusterState::GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState> {
|
||||||
auto lock = std::shared_lock{log_lock_};
|
auto lock = std::shared_lock{log_lock_};
|
||||||
return coordinators_;
|
return coordinators_;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorClusterState::GetUUID() const -> utils::UUID { return uuid_; }
|
auto CoordinatorClusterState::IsLockOpened() const -> bool {
|
||||||
|
auto lock = std::shared_lock{log_lock_};
|
||||||
|
return is_lock_opened_;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace memgraph::coordination
|
} // namespace memgraph::coordination
|
||||||
#endif
|
#endif
|
||||||
|
@ -60,5 +60,14 @@ void from_json(nlohmann::json const &j, CoordinatorToReplicaConfig &config) {
|
|||||||
config.replication_client_info = j.at("replication_client_info").get<ReplicationClientInfo>();
|
config.replication_client_info = j.at("replication_client_info").get<ReplicationClientInfo>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void from_json(nlohmann::json const &j, InstanceUUIDUpdate &instance_uuid_change) {
|
||||||
|
instance_uuid_change.uuid = j.at("uuid").get<utils::UUID>();
|
||||||
|
instance_uuid_change.instance_name = j.at("instance_name").get<std::string>();
|
||||||
|
}
|
||||||
|
|
||||||
|
void to_json(nlohmann::json &j, InstanceUUIDUpdate const &instance_uuid_change) {
|
||||||
|
j = nlohmann::json{{"instance_name", instance_uuid_change.instance_name}, {"uuid", instance_uuid_change.uuid}};
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace memgraph::coordination
|
} // namespace memgraph::coordination
|
||||||
#endif
|
#endif
|
||||||
|
@ -30,7 +30,8 @@ using nuraft::ptr;
|
|||||||
using nuraft::srv_config;
|
using nuraft::srv_config;
|
||||||
|
|
||||||
CoordinatorInstance::CoordinatorInstance()
|
CoordinatorInstance::CoordinatorInstance()
|
||||||
: raft_state_(RaftState::MakeRaftState(
|
: thread_pool_{1},
|
||||||
|
raft_state_(RaftState::MakeRaftState(
|
||||||
[this]() {
|
[this]() {
|
||||||
spdlog::info("Leader changed, starting all replication instances!");
|
spdlog::info("Leader changed, starting all replication instances!");
|
||||||
auto const instances = raft_state_.GetReplicationInstances();
|
auto const instances = raft_state_.GetReplicationInstances();
|
||||||
@ -55,23 +56,34 @@ CoordinatorInstance::CoordinatorInstance()
|
|||||||
&CoordinatorInstance::MainFailCallback);
|
&CoordinatorInstance::MainFailCallback);
|
||||||
});
|
});
|
||||||
|
|
||||||
std::ranges::for_each(repl_instances_, [this](auto &instance) {
|
std::ranges::for_each(repl_instances_, [](auto &instance) { instance.StartFrequentCheck(); });
|
||||||
instance.SetNewMainUUID(raft_state_.GetUUID());
|
|
||||||
instance.StartFrequentCheck();
|
|
||||||
});
|
|
||||||
},
|
},
|
||||||
[this]() {
|
[this]() {
|
||||||
spdlog::info("Leader changed, stopping all replication instances!");
|
thread_pool_.AddTask([this]() {
|
||||||
|
spdlog::info("Leader changed, trying to stop all replication instances frequent checks!");
|
||||||
|
// We need to stop checks before taking a lock because deadlock can happen if instances waits
|
||||||
|
// to take a lock in frequent check, and this thread already has a lock and waits for instance to
|
||||||
|
// be done with frequent check
|
||||||
|
for (auto &repl_instance : repl_instances_) {
|
||||||
|
repl_instance.StopFrequentCheck();
|
||||||
|
}
|
||||||
|
auto lock = std::unique_lock{coord_instance_lock_};
|
||||||
repl_instances_.clear();
|
repl_instances_.clear();
|
||||||
|
spdlog::info("Stopped all replication instance frequent checks.");
|
||||||
|
});
|
||||||
})) {
|
})) {
|
||||||
client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
|
client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
|
||||||
auto lock = std::lock_guard{self->coord_instance_lock_};
|
auto lock = std::unique_lock{self->coord_instance_lock_};
|
||||||
|
// when coordinator is becoming follower it will want to stop all threads doing frequent checks
|
||||||
|
// Thread can get stuck here waiting for lock so we need to frequently check if we are in shutdown state
|
||||||
|
|
||||||
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
|
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
|
||||||
std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name);
|
std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name);
|
||||||
};
|
};
|
||||||
|
|
||||||
client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
|
client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
|
||||||
auto lock = std::lock_guard{self->coord_instance_lock_};
|
auto lock = std::unique_lock{self->coord_instance_lock_};
|
||||||
|
|
||||||
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
|
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
|
||||||
std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name);
|
std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name);
|
||||||
};
|
};
|
||||||
@ -100,7 +112,7 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
|
|||||||
if (raft_state_.IsLeader()) {
|
if (raft_state_.IsLeader()) {
|
||||||
auto const stringify_repl_role = [this](ReplicationInstance const &instance) -> std::string {
|
auto const stringify_repl_role = [this](ReplicationInstance const &instance) -> std::string {
|
||||||
if (!instance.IsAlive()) return "unknown";
|
if (!instance.IsAlive()) return "unknown";
|
||||||
if (raft_state_.IsMain(instance.InstanceName())) return "main";
|
if (raft_state_.IsCurrentMain(instance.InstanceName())) return "main";
|
||||||
return "replica";
|
return "replica";
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -121,26 +133,36 @@ auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
|
|||||||
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), process_repl_instance_as_leader);
|
std::ranges::transform(repl_instances_, std::back_inserter(instances_status), process_repl_instance_as_leader);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
auto const stringify_inst_status = [](ReplicationRole status) -> std::string {
|
auto const stringify_inst_status = [raft_state_ptr = &raft_state_](
|
||||||
return status == ReplicationRole::MAIN ? "main" : "replica";
|
utils::UUID const &main_uuid,
|
||||||
|
ReplicationInstanceState const &instance) -> std::string {
|
||||||
|
if (raft_state_ptr->IsCurrentMain(instance.config.instance_name)) {
|
||||||
|
return "main";
|
||||||
|
}
|
||||||
|
if (raft_state_ptr->HasMainState(instance.config.instance_name)) {
|
||||||
|
return "unknown";
|
||||||
|
}
|
||||||
|
return "replica";
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: (andi) Add capability that followers can also return socket addresses
|
// TODO: (andi) Add capability that followers can also return socket addresses
|
||||||
auto process_repl_instance_as_follower = [&stringify_inst_status](auto const &instance) -> InstanceStatus {
|
auto process_repl_instance_as_follower =
|
||||||
|
[this, &stringify_inst_status](ReplicationInstanceState const &instance) -> InstanceStatus {
|
||||||
return {.instance_name = instance.config.instance_name,
|
return {.instance_name = instance.config.instance_name,
|
||||||
.cluster_role = stringify_inst_status(instance.status),
|
.cluster_role = stringify_inst_status(raft_state_.GetCurrentMainUUID(), instance),
|
||||||
.health = "unknown"};
|
.health = "unknown"};
|
||||||
};
|
};
|
||||||
|
|
||||||
std::ranges::transform(raft_state_.GetReplicationInstances(), std::back_inserter(instances_status),
|
std::ranges::transform(raft_state_.GetReplicationInstances(), std::back_inserter(instances_status),
|
||||||
process_repl_instance_as_follower);
|
process_repl_instance_as_follower);
|
||||||
}
|
}
|
||||||
|
|
||||||
return instances_status;
|
return instances_status;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorInstance::TryFailover() -> void {
|
auto CoordinatorInstance::TryFailover() -> void {
|
||||||
auto const is_replica = [this](ReplicationInstance const &instance) { return IsReplica(instance.InstanceName()); };
|
auto const is_replica = [this](ReplicationInstance const &instance) {
|
||||||
|
return HasReplicaState(instance.InstanceName());
|
||||||
|
};
|
||||||
|
|
||||||
auto alive_replicas =
|
auto alive_replicas =
|
||||||
repl_instances_ | ranges::views::filter(is_replica) | ranges::views::filter(&ReplicationInstance::IsAlive);
|
repl_instances_ | ranges::views::filter(is_replica) | ranges::views::filter(&ReplicationInstance::IsAlive);
|
||||||
@ -150,11 +172,6 @@ auto CoordinatorInstance::TryFailover() -> void {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!raft_state_.RequestLeadership()) {
|
|
||||||
spdlog::error("Failover failed since the instance is not the leader!");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto const get_ts = [](ReplicationInstance &replica) { return replica.GetClient().SendGetInstanceTimestampsRpc(); };
|
auto const get_ts = [](ReplicationInstance &replica) { return replica.GetClient().SendGetInstanceTimestampsRpc(); };
|
||||||
|
|
||||||
auto maybe_instance_db_histories = alive_replicas | ranges::views::transform(get_ts) | ranges::to<std::vector>();
|
auto maybe_instance_db_histories = alive_replicas | ranges::views::transform(get_ts) | ranges::to<std::vector>();
|
||||||
@ -182,6 +199,10 @@ auto CoordinatorInstance::TryFailover() -> void {
|
|||||||
|
|
||||||
auto *new_main = &FindReplicationInstance(most_up_to_date_instance);
|
auto *new_main = &FindReplicationInstance(most_up_to_date_instance);
|
||||||
|
|
||||||
|
if (!raft_state_.AppendOpenLockFailover(most_up_to_date_instance)) {
|
||||||
|
spdlog::error("Aborting failover as instance is not anymore leader.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
new_main->PauseFrequentCheck();
|
new_main->PauseFrequentCheck();
|
||||||
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
|
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
|
||||||
|
|
||||||
@ -191,16 +212,18 @@ auto CoordinatorInstance::TryFailover() -> void {
|
|||||||
|
|
||||||
auto const new_main_uuid = utils::UUID{};
|
auto const new_main_uuid = utils::UUID{};
|
||||||
|
|
||||||
auto const failed_to_swap = [&new_main_uuid](ReplicationInstance &instance) {
|
auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstance &instance) {
|
||||||
return !instance.SendSwapAndUpdateUUID(new_main_uuid);
|
return !instance.SendSwapAndUpdateUUID(new_main_uuid) ||
|
||||||
|
!raft_state_.AppendUpdateUUIDForInstanceLog(instance.InstanceName(), new_main_uuid);
|
||||||
};
|
};
|
||||||
|
|
||||||
// If for some replicas swap fails, for others on successful ping we will revert back on next change
|
// If for some replicas swap fails, for others on successful ping we will revert back on next change
|
||||||
// or we will do failover first again and then it will be consistent again
|
// or we will do failover first again and then it will be consistent again
|
||||||
if (std::ranges::any_of(alive_replicas | ranges::views::filter(is_not_new_main), failed_to_swap)) {
|
if (std::ranges::any_of(alive_replicas | ranges::views::filter(is_not_new_main), failed_to_swap)) {
|
||||||
spdlog::error("Failed to swap uuid for all instances");
|
spdlog::error("Aborting failover. Failed to swap uuid for all alive instances.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
|
auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
|
||||||
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
|
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
|
||||||
ranges::to<ReplicationClientsInfo>();
|
ranges::to<ReplicationClientsInfo>();
|
||||||
@ -211,27 +234,36 @@ auto CoordinatorInstance::TryFailover() -> void {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) {
|
if (!raft_state_.AppendUpdateUUIDForNewMainLog(new_main_uuid)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto const new_main_instance_name = new_main->InstanceName();
|
auto const new_main_instance_name = new_main->InstanceName();
|
||||||
|
|
||||||
if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name)) {
|
if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name, new_main_uuid)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!new_main->EnableWritingOnMain()) {
|
||||||
|
spdlog::error("Failover successful but couldn't enable writing on instance.");
|
||||||
|
}
|
||||||
|
|
||||||
spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName());
|
spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName());
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance_name)
|
auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance_name)
|
||||||
-> SetInstanceToMainCoordinatorStatus {
|
-> SetInstanceToMainCoordinatorStatus {
|
||||||
auto lock = std::lock_guard{coord_instance_lock_};
|
auto lock = std::lock_guard{coord_instance_lock_};
|
||||||
|
if (raft_state_.IsLockOpened()) {
|
||||||
|
return SetInstanceToMainCoordinatorStatus::LOCK_OPENED;
|
||||||
|
}
|
||||||
|
|
||||||
if (raft_state_.MainExists()) {
|
if (raft_state_.MainExists()) {
|
||||||
return SetInstanceToMainCoordinatorStatus::MAIN_ALREADY_EXISTS;
|
return SetInstanceToMainCoordinatorStatus::MAIN_ALREADY_EXISTS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(antoniofilipovic) Check if request leadership can cause problems due to changing of leadership while other
|
||||||
|
// doing failover
|
||||||
if (!raft_state_.RequestLeadership()) {
|
if (!raft_state_.RequestLeadership()) {
|
||||||
return SetInstanceToMainCoordinatorStatus::NOT_LEADER;
|
return SetInstanceToMainCoordinatorStatus::NOT_LEADER;
|
||||||
}
|
}
|
||||||
@ -248,6 +280,10 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
|
|||||||
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
|
return SetInstanceToMainCoordinatorStatus::NO_INSTANCE_WITH_NAME;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!raft_state_.AppendOpenLockSetInstanceToMain(instance_name)) {
|
||||||
|
return SetInstanceToMainCoordinatorStatus::OPEN_LOCK;
|
||||||
|
}
|
||||||
|
|
||||||
new_main->PauseFrequentCheck();
|
new_main->PauseFrequentCheck();
|
||||||
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
|
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
|
||||||
|
|
||||||
@ -257,12 +293,13 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
|
|||||||
|
|
||||||
auto const new_main_uuid = utils::UUID{};
|
auto const new_main_uuid = utils::UUID{};
|
||||||
|
|
||||||
auto const failed_to_swap = [&new_main_uuid](ReplicationInstance &instance) {
|
auto const failed_to_swap = [this, &new_main_uuid](ReplicationInstance &instance) {
|
||||||
return !instance.SendSwapAndUpdateUUID(new_main_uuid);
|
return !instance.SendSwapAndUpdateUUID(new_main_uuid) ||
|
||||||
|
!raft_state_.AppendUpdateUUIDForInstanceLog(instance.InstanceName(), new_main_uuid);
|
||||||
};
|
};
|
||||||
|
|
||||||
if (std::ranges::any_of(repl_instances_ | ranges::views::filter(is_not_new_main), failed_to_swap)) {
|
if (std::ranges::any_of(repl_instances_ | ranges::views::filter(is_not_new_main), failed_to_swap)) {
|
||||||
spdlog::error("Failed to swap uuid for all instances");
|
spdlog::error("Failed to swap uuid for all currently alive instances.");
|
||||||
return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED;
|
return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -274,22 +311,28 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance
|
|||||||
&CoordinatorInstance::MainFailCallback)) {
|
&CoordinatorInstance::MainFailCallback)) {
|
||||||
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
|
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
|
||||||
}
|
}
|
||||||
|
if (!raft_state_.AppendUpdateUUIDForNewMainLog(new_main_uuid)) {
|
||||||
if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) {
|
|
||||||
return SetInstanceToMainCoordinatorStatus::RAFT_LOG_ERROR;
|
return SetInstanceToMainCoordinatorStatus::RAFT_LOG_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!raft_state_.AppendSetInstanceAsMainLog(instance_name)) {
|
if (!raft_state_.AppendSetInstanceAsMainLog(instance_name, new_main_uuid)) {
|
||||||
return SetInstanceToMainCoordinatorStatus::RAFT_LOG_ERROR;
|
return SetInstanceToMainCoordinatorStatus::RAFT_LOG_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
spdlog::info("Instance {} promoted to main on leader", instance_name);
|
spdlog::info("Instance {} promoted to main on leader", instance_name);
|
||||||
|
|
||||||
|
if (!new_main->EnableWritingOnMain()) {
|
||||||
|
return SetInstanceToMainCoordinatorStatus::ENABLE_WRITING_FAILED;
|
||||||
|
}
|
||||||
return SetInstanceToMainCoordinatorStatus::SUCCESS;
|
return SetInstanceToMainCoordinatorStatus::SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig const &config)
|
auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig const &config)
|
||||||
-> RegisterInstanceCoordinatorStatus {
|
-> RegisterInstanceCoordinatorStatus {
|
||||||
auto lock = std::lock_guard{coord_instance_lock_};
|
auto lock = std::lock_guard{coord_instance_lock_};
|
||||||
|
if (raft_state_.IsLockOpened()) {
|
||||||
|
return RegisterInstanceCoordinatorStatus::LOCK_OPENED;
|
||||||
|
}
|
||||||
|
|
||||||
if (std::ranges::any_of(repl_instances_, [instance_name = config.instance_name](ReplicationInstance const &instance) {
|
if (std::ranges::any_of(repl_instances_, [instance_name = config.instance_name](ReplicationInstance const &instance) {
|
||||||
return instance.InstanceName() == instance_name;
|
return instance.InstanceName() == instance_name;
|
||||||
@ -309,11 +352,14 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig
|
|||||||
return RegisterInstanceCoordinatorStatus::REPL_ENDPOINT_EXISTS;
|
return RegisterInstanceCoordinatorStatus::REPL_ENDPOINT_EXISTS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(antoniofilipovic) Check if this is an issue
|
||||||
if (!raft_state_.RequestLeadership()) {
|
if (!raft_state_.RequestLeadership()) {
|
||||||
return RegisterInstanceCoordinatorStatus::NOT_LEADER;
|
return RegisterInstanceCoordinatorStatus::NOT_LEADER;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto const undo_action_ = [this]() { repl_instances_.pop_back(); };
|
if (!raft_state_.AppendOpenLockRegister(config)) {
|
||||||
|
return RegisterInstanceCoordinatorStatus::OPEN_LOCK;
|
||||||
|
}
|
||||||
|
|
||||||
auto *new_instance = &repl_instances_.emplace_back(this, config, client_succ_cb_, client_fail_cb_,
|
auto *new_instance = &repl_instances_.emplace_back(this, config, client_succ_cb_, client_fail_cb_,
|
||||||
&CoordinatorInstance::ReplicaSuccessCallback,
|
&CoordinatorInstance::ReplicaSuccessCallback,
|
||||||
@ -321,15 +367,12 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorToReplicaConfig
|
|||||||
|
|
||||||
if (!new_instance->SendDemoteToReplicaRpc()) {
|
if (!new_instance->SendDemoteToReplicaRpc()) {
|
||||||
spdlog::error("Failed to send demote to replica rpc for instance {}", config.instance_name);
|
spdlog::error("Failed to send demote to replica rpc for instance {}", config.instance_name);
|
||||||
undo_action_();
|
|
||||||
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
|
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!raft_state_.AppendRegisterReplicationInstanceLog(config)) {
|
if (!raft_state_.AppendRegisterReplicationInstanceLog(config)) {
|
||||||
undo_action_();
|
|
||||||
return RegisterInstanceCoordinatorStatus::RAFT_LOG_ERROR;
|
return RegisterInstanceCoordinatorStatus::RAFT_LOG_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
new_instance->StartFrequentCheck();
|
new_instance->StartFrequentCheck();
|
||||||
|
|
||||||
spdlog::info("Instance {} registered", config.instance_name);
|
spdlog::info("Instance {} registered", config.instance_name);
|
||||||
@ -340,6 +383,11 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instanc
|
|||||||
-> UnregisterInstanceCoordinatorStatus {
|
-> UnregisterInstanceCoordinatorStatus {
|
||||||
auto lock = std::lock_guard{coord_instance_lock_};
|
auto lock = std::lock_guard{coord_instance_lock_};
|
||||||
|
|
||||||
|
if (raft_state_.IsLockOpened()) {
|
||||||
|
return UnregisterInstanceCoordinatorStatus::LOCK_OPENED;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(antoniofilipovic) Check if this is an issue
|
||||||
if (!raft_state_.RequestLeadership()) {
|
if (!raft_state_.RequestLeadership()) {
|
||||||
return UnregisterInstanceCoordinatorStatus::NOT_LEADER;
|
return UnregisterInstanceCoordinatorStatus::NOT_LEADER;
|
||||||
}
|
}
|
||||||
@ -353,19 +401,23 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instanc
|
|||||||
return UnregisterInstanceCoordinatorStatus::NO_INSTANCE_WITH_NAME;
|
return UnregisterInstanceCoordinatorStatus::NO_INSTANCE_WITH_NAME;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto const is_main = [this](ReplicationInstance const &instance) {
|
auto const is_current_main = [this](ReplicationInstance const &instance) {
|
||||||
return IsMain(instance.InstanceName()) && instance.GetMainUUID() == raft_state_.GetUUID() && instance.IsAlive();
|
return raft_state_.IsCurrentMain(instance.InstanceName()) && instance.IsAlive();
|
||||||
};
|
};
|
||||||
|
|
||||||
if (is_main(*inst_to_remove)) {
|
if (is_current_main(*inst_to_remove)) {
|
||||||
return UnregisterInstanceCoordinatorStatus::IS_MAIN;
|
return UnregisterInstanceCoordinatorStatus::IS_MAIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!raft_state_.AppendOpenLockUnregister(instance_name)) {
|
||||||
|
return UnregisterInstanceCoordinatorStatus::OPEN_LOCK;
|
||||||
|
}
|
||||||
|
|
||||||
inst_to_remove->StopFrequentCheck();
|
inst_to_remove->StopFrequentCheck();
|
||||||
|
|
||||||
auto curr_main = std::ranges::find_if(repl_instances_, is_main);
|
auto curr_main = std::ranges::find_if(repl_instances_, is_current_main);
|
||||||
|
|
||||||
if (curr_main != repl_instances_.end() && curr_main->IsAlive()) {
|
if (curr_main != repl_instances_.end()) {
|
||||||
if (!curr_main->SendUnregisterReplicaRpc(instance_name)) {
|
if (!curr_main->SendUnregisterReplicaRpc(instance_name)) {
|
||||||
inst_to_remove->StartFrequentCheck();
|
inst_to_remove->StartFrequentCheck();
|
||||||
return UnregisterInstanceCoordinatorStatus::RPC_FAILED;
|
return UnregisterInstanceCoordinatorStatus::RPC_FAILED;
|
||||||
@ -383,7 +435,7 @@ auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instanc
|
|||||||
|
|
||||||
auto CoordinatorInstance::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void {
|
auto CoordinatorInstance::AddCoordinatorInstance(coordination::CoordinatorToCoordinatorConfig const &config) -> void {
|
||||||
raft_state_.AddCoordinatorInstance(config);
|
raft_state_.AddCoordinatorInstance(config);
|
||||||
// NOTE: We ignore error we added coordinator instance to networkign stuff but not in raft log.
|
// NOTE: We ignore error we added coordinator instance to networking stuff but not in raft log.
|
||||||
if (!raft_state_.AppendAddCoordinatorInstanceLog(config)) {
|
if (!raft_state_.AppendAddCoordinatorInstanceLog(config)) {
|
||||||
spdlog::error("Failed to append add coordinator instance log");
|
spdlog::error("Failed to append add coordinator instance log");
|
||||||
}
|
}
|
||||||
@ -391,13 +443,15 @@ auto CoordinatorInstance::AddCoordinatorInstance(coordination::CoordinatorToCoor
|
|||||||
|
|
||||||
void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) {
|
void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) {
|
||||||
spdlog::trace("Instance {} performing main fail callback", repl_instance_name);
|
spdlog::trace("Instance {} performing main fail callback", repl_instance_name);
|
||||||
|
if (raft_state_.IsLockOpened()) {
|
||||||
|
spdlog::error("Returning from main fail callback as the last action didn't successfully finish");
|
||||||
|
}
|
||||||
|
|
||||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||||
repl_instance.OnFailPing();
|
repl_instance.OnFailPing();
|
||||||
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
|
|
||||||
MG_ASSERT(repl_instance_uuid.has_value(), "Replication instance must have uuid set");
|
|
||||||
|
|
||||||
// NOLINTNEXTLINE
|
// NOLINTNEXTLINE
|
||||||
if (!repl_instance.IsAlive() && raft_state_.GetUUID() == repl_instance_uuid.value()) {
|
if (!repl_instance.IsAlive() && raft_state_.IsCurrentMain(repl_instance_name)) {
|
||||||
spdlog::info("Cluster without main instance, trying automatic failover");
|
spdlog::info("Cluster without main instance, trying automatic failover");
|
||||||
TryFailover();
|
TryFailover();
|
||||||
}
|
}
|
||||||
@ -405,6 +459,12 @@ void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name)
|
|||||||
|
|
||||||
void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name) {
|
void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name) {
|
||||||
spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
|
spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
|
||||||
|
|
||||||
|
if (raft_state_.IsLockOpened()) {
|
||||||
|
spdlog::error("Stopping main successful callback as the last action didn't successfully finish");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||||
|
|
||||||
if (repl_instance.IsAlive()) {
|
if (repl_instance.IsAlive()) {
|
||||||
@ -412,11 +472,8 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
|
|
||||||
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set.");
|
|
||||||
|
|
||||||
// NOLINTNEXTLINE
|
// NOLINTNEXTLINE
|
||||||
if (raft_state_.GetUUID() == repl_instance_uuid.value()) {
|
if (raft_state_.IsCurrentMain(repl_instance.InstanceName())) {
|
||||||
if (!repl_instance.EnableWritingOnMain()) {
|
if (!repl_instance.EnableWritingOnMain()) {
|
||||||
spdlog::error("Failed to enable writing on main instance {}", repl_instance_name);
|
spdlog::error("Failed to enable writing on main instance {}", repl_instance_name);
|
||||||
return;
|
return;
|
||||||
@ -426,9 +483,8 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!raft_state_.RequestLeadership()) {
|
if (!raft_state_.AppendOpenLockSetInstanceToReplica(repl_instance.InstanceName())) {
|
||||||
spdlog::error("Demoting main instance {} to replica failed since the instance is not the leader!",
|
spdlog::error("Failed to open lock for demoting OLD MAIN {} to REPLICA", repl_instance_name);
|
||||||
repl_instance_name);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -441,29 +497,38 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!repl_instance.SendSwapAndUpdateUUID(raft_state_.GetUUID())) {
|
if (!repl_instance.SendSwapAndUpdateUUID(raft_state_.GetCurrentMainUUID())) {
|
||||||
spdlog::error("Failed to swap uuid for demoted main instance {}", repl_instance_name);
|
spdlog::error("Failed to swap uuid for demoted main instance {}", repl_instance_name);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!raft_state_.AppendUpdateUUIDForInstanceLog(repl_instance_name, raft_state_.GetCurrentMainUUID())) {
|
||||||
|
spdlog::error("Failed to update log of changing instance uuid {} to {}", repl_instance_name,
|
||||||
|
std::string{raft_state_.GetCurrentMainUUID()});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (!raft_state_.AppendSetInstanceAsReplicaLog(repl_instance_name)) {
|
if (!raft_state_.AppendSetInstanceAsReplicaLog(repl_instance_name)) {
|
||||||
|
spdlog::error("Failed to append log that OLD MAIN was demoted to REPLICA {}", repl_instance_name);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name) {
|
void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name) {
|
||||||
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
|
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
|
||||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
|
||||||
|
|
||||||
if (!IsReplica(repl_instance_name)) {
|
if (raft_state_.IsLockOpened()) {
|
||||||
spdlog::error("Aborting replica callback since instance {} is not replica anymore", repl_instance_name);
|
spdlog::error("Stopping main successful callback as the last action didn't successfully finish");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||||
|
|
||||||
// We need to get replicas UUID from time to time to ensure replica is listening to correct main
|
// We need to get replicas UUID from time to time to ensure replica is listening to correct main
|
||||||
// and that it didn't go down for less time than we could notice
|
// and that it didn't go down for less time than we could notice
|
||||||
// We need to get id of main replica is listening to
|
// We need to get id of main replica is listening to
|
||||||
// and swap if necessary
|
// and swap if necessary
|
||||||
if (!repl_instance.EnsureReplicaHasCorrectMainUUID(raft_state_.GetUUID())) {
|
if (!repl_instance.EnsureReplicaHasCorrectMainUUID(raft_state_.GetCurrentMainUUID())) {
|
||||||
spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName());
|
spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -473,13 +538,14 @@ void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_
|
|||||||
|
|
||||||
void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) {
|
void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) {
|
||||||
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
|
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
|
||||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
|
||||||
|
|
||||||
if (!IsReplica(repl_instance_name)) {
|
if (raft_state_.IsLockOpened()) {
|
||||||
spdlog::error("Aborting replica fail callback since instance {} is not replica anymore", repl_instance_name);
|
spdlog::error("Stopping main successful callback as the last action didn't successfully finish.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||||
|
|
||||||
repl_instance.OnFailPing();
|
repl_instance.OnFailPing();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -551,12 +617,12 @@ auto CoordinatorInstance::ChooseMostUpToDateInstance(std::span<InstanceNameDbHis
|
|||||||
return std::move(*new_main_res);
|
return std::move(*new_main_res);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorInstance::IsMain(std::string_view instance_name) const -> bool {
|
auto CoordinatorInstance::HasMainState(std::string_view instance_name) const -> bool {
|
||||||
return raft_state_.IsMain(instance_name);
|
return raft_state_.HasMainState(instance_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorInstance::IsReplica(std::string_view instance_name) const -> bool {
|
auto CoordinatorInstance::HasReplicaState(std::string_view instance_name) const -> bool {
|
||||||
return raft_state_.IsReplica(instance_name);
|
return raft_state_.HasReplicaState(instance_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorInstance::GetRoutingTable(std::map<std::string, std::string> const &routing) -> RoutingTable {
|
auto CoordinatorInstance::GetRoutingTable(std::map<std::string, std::string> const &routing) -> RoutingTable {
|
||||||
|
@ -22,12 +22,12 @@ namespace memgraph::coordination {
|
|||||||
|
|
||||||
auto CoordinatorStateMachine::MainExists() const -> bool { return cluster_state_.MainExists(); }
|
auto CoordinatorStateMachine::MainExists() const -> bool { return cluster_state_.MainExists(); }
|
||||||
|
|
||||||
auto CoordinatorStateMachine::IsMain(std::string_view instance_name) const -> bool {
|
auto CoordinatorStateMachine::HasMainState(std::string_view instance_name) const -> bool {
|
||||||
return cluster_state_.IsMain(instance_name);
|
return cluster_state_.HasMainState(instance_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorStateMachine::IsReplica(std::string_view instance_name) const -> bool {
|
auto CoordinatorStateMachine::HasReplicaState(std::string_view instance_name) const -> bool {
|
||||||
return cluster_state_.IsReplica(instance_name);
|
return cluster_state_.HasReplicaState(instance_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorStateMachine::CreateLog(nlohmann::json &&log) -> ptr<buffer> {
|
auto CoordinatorStateMachine::CreateLog(nlohmann::json &&log) -> ptr<buffer> {
|
||||||
@ -38,6 +38,23 @@ auto CoordinatorStateMachine::CreateLog(nlohmann::json &&log) -> ptr<buffer> {
|
|||||||
return log_buf;
|
return log_buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto CoordinatorStateMachine::SerializeOpenLockRegister(CoordinatorToReplicaConfig const &config) -> ptr<buffer> {
|
||||||
|
return CreateLog({{"action", RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE}, {"info", config}});
|
||||||
|
}
|
||||||
|
|
||||||
|
auto CoordinatorStateMachine::SerializeOpenLockUnregister(std::string_view instance_name) -> ptr<buffer> {
|
||||||
|
return CreateLog(
|
||||||
|
{{"action", RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE}, {"info", std::string{instance_name}}});
|
||||||
|
}
|
||||||
|
|
||||||
|
auto CoordinatorStateMachine::SerializeOpenLockFailover(std::string_view instance_name) -> ptr<buffer> {
|
||||||
|
return CreateLog({{"action", RaftLogAction::OPEN_LOCK_FAILOVER}, {"info", std::string(instance_name)}});
|
||||||
|
}
|
||||||
|
|
||||||
|
auto CoordinatorStateMachine::SerializeOpenLockSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer> {
|
||||||
|
return CreateLog({{"action", RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN}, {"info", std::string(instance_name)}});
|
||||||
|
}
|
||||||
|
|
||||||
auto CoordinatorStateMachine::SerializeRegisterInstance(CoordinatorToReplicaConfig const &config) -> ptr<buffer> {
|
auto CoordinatorStateMachine::SerializeRegisterInstance(CoordinatorToReplicaConfig const &config) -> ptr<buffer> {
|
||||||
return CreateLog({{"action", RaftLogAction::REGISTER_REPLICATION_INSTANCE}, {"info", config}});
|
return CreateLog({{"action", RaftLogAction::REGISTER_REPLICATION_INSTANCE}, {"info", config}});
|
||||||
}
|
}
|
||||||
@ -46,16 +63,22 @@ auto CoordinatorStateMachine::SerializeUnregisterInstance(std::string_view insta
|
|||||||
return CreateLog({{"action", RaftLogAction::UNREGISTER_REPLICATION_INSTANCE}, {"info", instance_name}});
|
return CreateLog({{"action", RaftLogAction::UNREGISTER_REPLICATION_INSTANCE}, {"info", instance_name}});
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorStateMachine::SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer> {
|
auto CoordinatorStateMachine::SerializeSetInstanceAsMain(InstanceUUIDUpdate const &instance_uuid_change)
|
||||||
return CreateLog({{"action", RaftLogAction::SET_INSTANCE_AS_MAIN}, {"info", instance_name}});
|
-> ptr<buffer> {
|
||||||
|
return CreateLog({{"action", RaftLogAction::SET_INSTANCE_AS_MAIN}, {"info", instance_uuid_change}});
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorStateMachine::SerializeSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer> {
|
auto CoordinatorStateMachine::SerializeSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer> {
|
||||||
return CreateLog({{"action", RaftLogAction::SET_INSTANCE_AS_REPLICA}, {"info", instance_name}});
|
return CreateLog({{"action", RaftLogAction::SET_INSTANCE_AS_REPLICA}, {"info", instance_name}});
|
||||||
}
|
}
|
||||||
|
|
||||||
auto CoordinatorStateMachine::SerializeUpdateUUID(utils::UUID const &uuid) -> ptr<buffer> {
|
auto CoordinatorStateMachine::SerializeUpdateUUIDForNewMain(utils::UUID const &uuid) -> ptr<buffer> {
|
||||||
return CreateLog({{"action", RaftLogAction::UPDATE_UUID}, {"info", uuid}});
|
return CreateLog({{"action", RaftLogAction::UPDATE_UUID_OF_NEW_MAIN}, {"info", uuid}});
|
||||||
|
}
|
||||||
|
|
||||||
|
auto CoordinatorStateMachine::SerializeUpdateUUIDForInstance(InstanceUUIDUpdate const &instance_uuid_change)
|
||||||
|
-> ptr<buffer> {
|
+  return CreateLog({{"action", RaftLogAction::UPDATE_UUID_FOR_INSTANCE}, {"info", instance_uuid_change}});
 }

 auto CoordinatorStateMachine::SerializeAddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config)

@@ -63,20 +86,37 @@ auto CoordinatorStateMachine::SerializeAddCoordinatorInstance(CoordinatorToCoord
   return CreateLog({{"action", RaftLogAction::ADD_COORDINATOR_INSTANCE}, {"info", config}});
 }

+auto CoordinatorStateMachine::SerializeOpenLockSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer> {
+  return CreateLog({{"action", RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA}, {"info", instance_name}});
+}
+
 auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, RaftLogAction> {
   buffer_serializer bs(data);
   auto const json = nlohmann::json::parse(bs.get_str());

   auto const action = json["action"].get<RaftLogAction>();
   auto const &info = json["info"];

   switch (action) {
+    case RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE: {
+      return {info.get<CoordinatorToReplicaConfig>(), action};
+    }
+    case RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE:
+      [[fallthrough]];
+    case RaftLogAction::OPEN_LOCK_FAILOVER:
+      [[fallthrough]];
+    case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN:
+      [[fallthrough]];
+    case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA: {
+      return {info.get<std::string>(), action};
+    }
     case RaftLogAction::REGISTER_REPLICATION_INSTANCE:
       return {info.get<CoordinatorToReplicaConfig>(), action};
-    case RaftLogAction::UPDATE_UUID:
+    case RaftLogAction::UPDATE_UUID_OF_NEW_MAIN:
       return {info.get<utils::UUID>(), action};
-    case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE:
+    case RaftLogAction::UPDATE_UUID_FOR_INSTANCE:
     case RaftLogAction::SET_INSTANCE_AS_MAIN:
+      return {info.get<InstanceUUIDUpdate>(), action};
+    case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE:
       [[fallthrough]];
     case RaftLogAction::SET_INSTANCE_AS_REPLICA:
       return {info.get<std::string>(), action};

@@ -214,11 +254,20 @@ auto CoordinatorStateMachine::GetReplicationInstances() const -> std::vector<Rep
   return cluster_state_.GetReplicationInstances();
 }

+auto CoordinatorStateMachine::GetCurrentMainUUID() const -> utils::UUID { return cluster_state_.GetCurrentMainUUID(); }
+
+auto CoordinatorStateMachine::IsCurrentMain(std::string_view instance_name) const -> bool {
+  return cluster_state_.IsCurrentMain(instance_name);
+}
+
 auto CoordinatorStateMachine::GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState> {
   return cluster_state_.GetCoordinatorInstances();
 }

-auto CoordinatorStateMachine::GetUUID() const -> utils::UUID { return cluster_state_.GetUUID(); }
+auto CoordinatorStateMachine::GetInstanceUUID(std::string_view instance_name) const -> utils::UUID {
+  return cluster_state_.GetInstanceUUID(instance_name);
+}
+
+auto CoordinatorStateMachine::IsLockOpened() const -> bool { return cluster_state_.IsLockOpened(); }

 } // namespace memgraph::coordination

 #endif
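Side note on the log format above: every entry CreateLog produces is a single JSON object with an "action" tag and an "info" payload, and DecodeLog dispatches on the tag to pick the payload type. A minimal standalone sketch of that round-trip, with NuRaft buffers replaced by plain strings and an illustrative two-value enum (the names here are not the real API):

#include <cassert>
#include <string>
#include <utility>
#include <nlohmann/json.hpp>

enum class Action { OPEN_LOCK_FAILOVER, SET_INSTANCE_AS_MAIN };
NLOHMANN_JSON_SERIALIZE_ENUM(Action, {{Action::OPEN_LOCK_FAILOVER, "open_lock_failover"},
                                      {Action::SET_INSTANCE_AS_MAIN, "promote"}})

// Mirrors CreateLog: build the {"action", "info"} envelope and dump it.
std::string Encode(Action action, std::string info) {
  return nlohmann::json{{"action", action}, {"info", std::move(info)}}.dump();
}

// Mirrors DecodeLog, minus the buffer_serializer step.
std::pair<std::string, Action> Decode(std::string const &raw) {
  auto const json = nlohmann::json::parse(raw);
  return {json["info"].get<std::string>(), json["action"].get<Action>()};
}

int main() {
  auto const raw = Encode(Action::OPEN_LOCK_FAILOVER, "instance_3");
  auto const [info, action] = Decode(raw);
  assert(action == Action::OPEN_LOCK_FAILOVER && info == "instance_3");
}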
@@ -24,6 +24,7 @@
 #include <fmt/format.h>
 #include "json/json.hpp"
+#include "utils/uuid.hpp"

 namespace memgraph::coordination {

@@ -88,6 +89,11 @@ struct ManagementServerConfig {
   friend bool operator==(ManagementServerConfig const &, ManagementServerConfig const &) = default;
 };

+struct InstanceUUIDUpdate {
+  std::string instance_name;
+  memgraph::utils::UUID uuid;
+};
+
 void to_json(nlohmann::json &j, CoordinatorToReplicaConfig const &config);
 void from_json(nlohmann::json const &j, CoordinatorToReplicaConfig &config);

@@ -97,5 +103,8 @@ void from_json(nlohmann::json const &j, CoordinatorToCoordinatorConfig &config);
 void to_json(nlohmann::json &j, ReplicationClientInfo const &config);
 void from_json(nlohmann::json const &j, ReplicationClientInfo &config);

+void to_json(nlohmann::json &j, InstanceUUIDUpdate const &config);
+void from_json(nlohmann::json const &j, InstanceUUIDUpdate &config);
+
 } // namespace memgraph::coordination

 #endif
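The header above only declares the JSON hooks for InstanceUUIDUpdate; the definitions are not part of this hunk. A plausible shape, following the same ADL pattern the commit uses for ReplicationInstanceState and assuming utils::UUID already provides its own to_json/from_json:

// Sketch only; the committed definitions live in the corresponding .cpp file.
void to_json(nlohmann::json &j, InstanceUUIDUpdate const &instance_uuid_change) {
  j = nlohmann::json{{"instance_name", instance_uuid_change.instance_name}, {"uuid", instance_uuid_change.uuid}};
}

void from_json(nlohmann::json const &j, InstanceUUIDUpdate &instance_uuid_change) {
  j.at("instance_name").get_to(instance_uuid_change.instance_name);
  j.at("uuid").get_to(instance_uuid_change.uuid);
}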
@@ -62,9 +62,11 @@ class CoordinatorInstance {
   static auto ChooseMostUpToDateInstance(std::span<InstanceNameDbHistories> histories) -> NewMainRes;

- private:
-  HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
+  auto HasMainState(std::string_view instance_name) const -> bool;
+
+  auto HasReplicaState(std::string_view instance_name) const -> bool;
+
+ private:
   auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &;

   void MainFailCallback(std::string_view);

@@ -75,13 +77,14 @@ class CoordinatorInstance {
   void ReplicaFailCallback(std::string_view);

-  auto IsMain(std::string_view instance_name) const -> bool;
-  auto IsReplica(std::string_view instance_name) const -> bool;
+  HealthCheckClientCallback client_succ_cb_, client_fail_cb_;

   // NOTE: Must be std::list because we rely on pointer stability.
   std::list<ReplicationInstance> repl_instances_;
   mutable utils::ResourceLock coord_instance_lock_{};

+  // Thread pool needs to be constructed before raft state as raft state can call thread pool
+  utils::ThreadPool thread_pool_;
+
   RaftState raft_state_;
 };
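The "must be std::list" note above is load-bearing: the health-check callbacks hold pointers to individual ReplicationInstance objects, and a std::vector would invalidate them whenever it reallocates. A tiny self-contained illustration of the difference:

#include <iostream>
#include <list>
#include <vector>

int main() {
  std::vector<int> vec{1};
  int *vec_elem = &vec.front();
  vec.reserve(vec.capacity() + 1);  // reallocation moves all elements
  // vec_elem now dangles; dereferencing it would be undefined behavior.

  std::list<int> lst{1};
  int *lst_elem = &lst.front();
  for (int i = 0; i < 1000; ++i) lst.push_back(i);  // list nodes never move
  std::cout << *lst_elem << '\n';  // still valid: prints 1
}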
@@ -64,22 +64,32 @@ class RaftState {
   auto RequestLeadership() -> bool;
   auto IsLeader() const -> bool;

-  auto MainExists() const -> bool;
-  auto IsMain(std::string_view instance_name) const -> bool;
-  auto IsReplica(std::string_view instance_name) const -> bool;
-
   auto AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig const &config) -> bool;
   auto AppendUnregisterReplicationInstanceLog(std::string_view instance_name) -> bool;
-  auto AppendSetInstanceAsMainLog(std::string_view instance_name) -> bool;
+  auto AppendSetInstanceAsMainLog(std::string_view instance_name, utils::UUID const &uuid) -> bool;
   auto AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool;
-  auto AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool;
+  auto AppendUpdateUUIDForNewMainLog(utils::UUID const &uuid) -> bool;
+  auto AppendUpdateUUIDForInstanceLog(std::string_view instance_name, utils::UUID const &uuid) -> bool;
+  auto AppendOpenLockRegister(CoordinatorToReplicaConfig const &) -> bool;
+  auto AppendOpenLockUnregister(std::string_view) -> bool;
+  auto AppendOpenLockFailover(std::string_view instance_name) -> bool;
+  auto AppendOpenLockSetInstanceToMain(std::string_view instance_name) -> bool;
+  auto AppendOpenLockSetInstanceToReplica(std::string_view instance_name) -> bool;
   auto AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig const &config) -> bool;

   auto GetReplicationInstances() const -> std::vector<ReplicationInstanceState>;
   // TODO: (andi) Do we need then GetAllCoordinators?
   auto GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState>;

-  auto GetUUID() const -> utils::UUID;
+  auto MainExists() const -> bool;
+  auto HasMainState(std::string_view instance_name) const -> bool;
+  auto HasReplicaState(std::string_view instance_name) const -> bool;
+  auto IsCurrentMain(std::string_view instance_name) const -> bool;
+
+  auto GetCurrentMainUUID() const -> utils::UUID;
+  auto GetInstanceUUID(std::string_view) const -> utils::UUID;
+
+  auto IsLockOpened() const -> bool;

  private:
   // TODO: (andi) I think variables below can be abstracted/clean them.
@@ -25,7 +25,9 @@ enum class RegisterInstanceCoordinatorStatus : uint8_t {
   NOT_LEADER,
   RPC_FAILED,
   RAFT_LOG_ERROR,
-  SUCCESS
+  SUCCESS,
+  LOCK_OPENED,
+  OPEN_LOCK
 };

 enum class UnregisterInstanceCoordinatorStatus : uint8_t {

@@ -36,6 +38,8 @@ enum class UnregisterInstanceCoordinatorStatus : uint8_t {
   NOT_LEADER,
   RAFT_LOG_ERROR,
   SUCCESS,
+  LOCK_OPENED,
+  OPEN_LOCK
 };

 enum class SetInstanceToMainCoordinatorStatus : uint8_t {

@@ -47,6 +51,9 @@ enum class SetInstanceToMainCoordinatorStatus : uint8_t {
   COULD_NOT_PROMOTE_TO_MAIN,
   SWAP_UUID_FAILED,
   SUCCESS,
+  LOCK_OPENED,
+  OPEN_LOCK,
+  ENABLE_WRITING_FAILED
 };

 } // namespace memgraph::coordination
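The two new values split the lock-related failures: OPEN_LOCK means the cluster refused to commit the log entry that opens the lock (the action never started), while LOCK_OPENED means a previous action opened the lock and never closed it, so new actions are rejected. A hedged sketch of how an entry point might map them; the member and function composition shown here is illustrative, not the actual CoordinatorInstance code:

// Illustrative only.
auto RegisterReplicationInstance(CoordinatorToReplicaConfig const &config)
    -> RegisterInstanceCoordinatorStatus {
  if (raft_state_.IsLockOpened()) {
    // An earlier register/unregister/failover left the lock open.
    return RegisterInstanceCoordinatorStatus::LOCK_OPENED;
  }
  if (!raft_state_.AppendOpenLockRegister(config)) {
    // The cluster did not accept entering the locked state.
    return RegisterInstanceCoordinatorStatus::OPEN_LOCK;
  }
  // ... perform RPCs, then AppendRegisterReplicationInstanceLog closes the lock ...
  return RegisterInstanceCoordinatorStatus::SUCCESS;
}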
@@ -79,10 +79,6 @@ class ReplicationInstance {
   auto EnableWritingOnMain() -> bool;

-  auto SetNewMainUUID(utils::UUID const &main_uuid) -> void;
-  auto ResetMainUUID() -> void;
-  auto GetMainUUID() const -> std::optional<utils::UUID> const &;
-
   auto GetSuccessCallback() -> HealthCheckInstanceCallback &;
   auto GetFailCallback() -> HealthCheckInstanceCallback &;

@@ -92,19 +88,12 @@ class ReplicationInstance {
   bool is_alive_{false};
   std::chrono::system_clock::time_point last_check_of_uuid_{};

-  // for replica this is main uuid of current main
-  // for "main" main this same as in CoordinatorData
-  // it is set to nullopt when replica is down
-  // TLDR; when replica is down and comes back up we reset uuid of main replica is listening to
-  // so we need to send swap uuid again
-  std::optional<utils::UUID> main_uuid_;
-
   HealthCheckInstanceCallback succ_cb_;
   HealthCheckInstanceCallback fail_cb_;

   friend bool operator==(ReplicationInstance const &first, ReplicationInstance const &second) {
     return first.client_ == second.client_ && first.last_response_time_ == second.last_response_time_ &&
-           first.is_alive_ == second.is_alive_ && first.main_uuid_ == second.main_uuid_;
+           first.is_alive_ == second.is_alive_;
   }
 };
@@ -36,8 +36,15 @@ struct ReplicationInstanceState {
   CoordinatorToReplicaConfig config;
   ReplicationRole status;

+  // for replica this is main uuid of current main
+  // for "main" main this same as current_main_id_
+  // when replica is down and comes back up we reset uuid of main replica is listening to
+  // so we need to send swap uuid again
+
+  // For MAIN we don't enable writing until cluster is in healthy state
+  utils::UUID instance_uuid;
+
   friend auto operator==(ReplicationInstanceState const &lhs, ReplicationInstanceState const &rhs) -> bool {
-    return lhs.config == rhs.config && lhs.status == rhs.status;
+    return lhs.config == rhs.config && lhs.status == rhs.status && lhs.instance_uuid == rhs.instance_uuid;
   }
 };

@@ -54,7 +61,8 @@ struct CoordinatorInstanceState {
 void to_json(nlohmann::json &j, ReplicationInstanceState const &instance_state);
 void from_json(nlohmann::json const &j, ReplicationInstanceState &instance_state);

-using TRaftLog = std::variant<CoordinatorToReplicaConfig, CoordinatorToCoordinatorConfig, std::string, utils::UUID>;
+using TRaftLog = std::variant<CoordinatorToReplicaConfig, std::string, utils::UUID, CoordinatorToCoordinatorConfig,
+                              InstanceUUIDUpdate>;

 using nuraft::buffer;
 using nuraft::buffer_serializer;

@@ -63,7 +71,8 @@ using nuraft::ptr;
 class CoordinatorClusterState {
  public:
   CoordinatorClusterState() = default;
-  explicit CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances);
+  explicit CoordinatorClusterState(std::map<std::string, ReplicationInstanceState, std::less<>> instances,
+                                   utils::UUID const &current_main_uuid, bool is_lock_opened);

   CoordinatorClusterState(CoordinatorClusterState const &);
   CoordinatorClusterState &operator=(CoordinatorClusterState const &);

@@ -74,11 +83,11 @@ class CoordinatorClusterState {
   auto MainExists() const -> bool;

-  auto IsMain(std::string_view instance_name) const -> bool;
+  auto HasMainState(std::string_view instance_name) const -> bool;

-  auto IsReplica(std::string_view instance_name) const -> bool;
+  auto HasReplicaState(std::string_view instance_name) const -> bool;

-  auto InsertInstance(std::string instance_name, ReplicationInstanceState instance_state) -> void;
+  auto IsCurrentMain(std::string_view instance_name) const -> bool;

   auto DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void;

@@ -88,15 +97,20 @@ class CoordinatorClusterState {
   auto GetReplicationInstances() const -> std::vector<ReplicationInstanceState>;

-  auto GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState>;
+  auto GetCurrentMainUUID() const -> utils::UUID;

-  auto GetUUID() const -> utils::UUID;
+  auto GetInstanceUUID(std::string_view) const -> utils::UUID;
+
+  auto IsLockOpened() const -> bool;
+
+  auto GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState>;

  private:
   std::vector<CoordinatorInstanceState> coordinators_{};
   std::map<std::string, ReplicationInstanceState, std::less<>> repl_instances_{};
-  utils::UUID uuid_{};
+  utils::UUID current_main_uuid_{};
   mutable utils::ResourceLock log_lock_{};
+  bool is_lock_opened_{false};
 };

 } // namespace memgraph::coordination
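For orientation, DoAction consumes the decoded payload together with the action tag: the OPEN_LOCK_* actions flip is_lock_opened_ before any state mutation, and the completing actions clear it while mutating repl_instances_ or current_main_uuid_. A simplified, abridged sketch of that dispatch, assuming the member names above and a ReplicationRole::MAIN enumerator; this is not the verbatim implementation:

// Simplified sketch, illustrative only.
auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void {
  std::lock_guard lock{log_lock_};
  switch (log_action) {
    // Any open-lock entry marks the start of a multi-step action.
    case RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE:
    case RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE:
    case RaftLogAction::OPEN_LOCK_FAILOVER:
    case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN:
    case RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA:
      is_lock_opened_ = true;
      break;
    // Completing entries mutate state and close the lock again.
    case RaftLogAction::SET_INSTANCE_AS_MAIN: {
      auto const &info = std::get<InstanceUUIDUpdate>(log_entry);
      auto &instance = repl_instances_.at(info.instance_name);
      instance.status = ReplicationRole::MAIN;
      instance.instance_uuid = info.uuid;
      is_lock_opened_ = false;
      break;
    }
    case RaftLogAction::UPDATE_UUID_OF_NEW_MAIN:
      current_main_uuid_ = std::get<utils::UUID>(log_entry);
      break;
    default:
      break;  // remaining actions elided for brevity
  }
}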
@@ -40,20 +40,21 @@ class CoordinatorStateMachine : public state_machine {
   CoordinatorStateMachine &operator=(CoordinatorStateMachine const &) = delete;
   CoordinatorStateMachine(CoordinatorStateMachine &&) = delete;
   CoordinatorStateMachine &operator=(CoordinatorStateMachine &&) = delete;
-  ~CoordinatorStateMachine() override {}
+  ~CoordinatorStateMachine() override = default;

-  // TODO: (andi) Check API of this class.
-  auto MainExists() const -> bool;
-  auto IsMain(std::string_view instance_name) const -> bool;
-  auto IsReplica(std::string_view instance_name) const -> bool;
-
   static auto CreateLog(nlohmann::json &&log) -> ptr<buffer>;
+  static auto SerializeOpenLockRegister(CoordinatorToReplicaConfig const &config) -> ptr<buffer>;
+  static auto SerializeOpenLockUnregister(std::string_view instance_name) -> ptr<buffer>;
+  static auto SerializeOpenLockSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer>;
+  static auto SerializeOpenLockFailover(std::string_view instance_name) -> ptr<buffer>;
   static auto SerializeRegisterInstance(CoordinatorToReplicaConfig const &config) -> ptr<buffer>;
   static auto SerializeUnregisterInstance(std::string_view instance_name) -> ptr<buffer>;
-  static auto SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer>;
+  static auto SerializeSetInstanceAsMain(InstanceUUIDUpdate const &instance_uuid_change) -> ptr<buffer>;
   static auto SerializeSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer>;
-  static auto SerializeUpdateUUID(utils::UUID const &uuid) -> ptr<buffer>;
+  static auto SerializeUpdateUUIDForNewMain(utils::UUID const &uuid) -> ptr<buffer>;
+  static auto SerializeUpdateUUIDForInstance(InstanceUUIDUpdate const &instance_uuid_change) -> ptr<buffer>;
   static auto SerializeAddCoordinatorInstance(CoordinatorToCoordinatorConfig const &config) -> ptr<buffer>;
+  static auto SerializeOpenLockSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer>;

   static auto DecodeLog(buffer &data) -> std::pair<TRaftLog, RaftLogAction>;

@@ -85,7 +86,15 @@ class CoordinatorStateMachine : public state_machine {
   auto GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState>;

-  auto GetUUID() const -> utils::UUID;
+  // Getters
+  auto MainExists() const -> bool;
+  auto HasMainState(std::string_view instance_name) const -> bool;
+  auto HasReplicaState(std::string_view instance_name) const -> bool;
+  auto IsCurrentMain(std::string_view instance_name) const -> bool;
+
+  auto GetCurrentMainUUID() const -> utils::UUID;
+  auto GetInstanceUUID(std::string_view instance_name) const -> utils::UUID;
+  auto IsLockOpened() const -> bool;

  private:
   struct SnapshotCtx {
@@ -23,20 +23,34 @@
 namespace memgraph::coordination {

 enum class RaftLogAction : uint8_t {
+  OPEN_LOCK_REGISTER_REPLICATION_INSTANCE,
+  OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE,
+  OPEN_LOCK_FAILOVER,
+  OPEN_LOCK_SET_INSTANCE_AS_MAIN,
+  OPEN_LOCK_SET_INSTANCE_AS_REPLICA,
   REGISTER_REPLICATION_INSTANCE,
   UNREGISTER_REPLICATION_INSTANCE,
   SET_INSTANCE_AS_MAIN,
   SET_INSTANCE_AS_REPLICA,
-  UPDATE_UUID,
-  ADD_COORDINATOR_INSTANCE
+  UPDATE_UUID_OF_NEW_MAIN,
+  ADD_COORDINATOR_INSTANCE,
+  UPDATE_UUID_FOR_INSTANCE,
 };

-NLOHMANN_JSON_SERIALIZE_ENUM(RaftLogAction, {{RaftLogAction::REGISTER_REPLICATION_INSTANCE, "register"},
+NLOHMANN_JSON_SERIALIZE_ENUM(RaftLogAction,
+                             {{RaftLogAction::REGISTER_REPLICATION_INSTANCE, "register"},
                               {RaftLogAction::UNREGISTER_REPLICATION_INSTANCE, "unregister"},
                               {RaftLogAction::SET_INSTANCE_AS_MAIN, "promote"},
                               {RaftLogAction::SET_INSTANCE_AS_REPLICA, "demote"},
-                              {RaftLogAction::UPDATE_UUID, "update_uuid"},
-                              {RaftLogAction::ADD_COORDINATOR_INSTANCE, "add_coordinator_instance"}})
+                              {RaftLogAction::UPDATE_UUID_OF_NEW_MAIN, "update_uuid_of_new_main"},
+                              {RaftLogAction::ADD_COORDINATOR_INSTANCE, "add_coordinator_instance"},
+                              {RaftLogAction::UPDATE_UUID_FOR_INSTANCE, "update_uuid_for_instance"},
+                              {RaftLogAction::OPEN_LOCK_REGISTER_REPLICATION_INSTANCE, "open_lock_register_instance"},
+                              {RaftLogAction::OPEN_LOCK_UNREGISTER_REPLICATION_INSTANCE,
+                               "open_lock_unregister_instance"},
+                              {RaftLogAction::OPEN_LOCK_FAILOVER, "open_lock_failover"},
+                              {RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_MAIN, "open_lock_set_instance_as_main"},
+                              {RaftLogAction::OPEN_LOCK_SET_INSTANCE_AS_REPLICA, "open_lock_set_instance_as_replica"}})

 } // namespace memgraph::coordination

 #endif
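One subtlety of NLOHMANN_JSON_SERIALIZE_ENUM worth keeping in mind as this map grows: deserializing a string that is missing from the map does not throw, it silently yields the first enum value listed in the map ("register" here), so every new RaftLogAction has to be added to the map in the same change. A quick self-contained demonstration:

#include <cassert>
#include <nlohmann/json.hpp>

enum class Demo { A, B };
NLOHMANN_JSON_SERIALIZE_ENUM(Demo, {{Demo::A, "a"}, {Demo::B, "b"}})

int main() {
  assert(nlohmann::json("b").get<Demo>() == Demo::B);     // known value round-trips
  assert(nlohmann::json("oops").get<Demo>() == Demo::A);  // unknown value falls back to the first mapped entry
}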
@@ -12,7 +12,6 @@
 #ifdef MG_ENTERPRISE
 #include <chrono>

-#include <spdlog/spdlog.h>
 #include "coordination/coordinator_communication_config.hpp"
 #include "coordination/coordinator_exceptions.hpp"
 #include "coordination/raft_state.hpp"

@@ -63,13 +62,18 @@ auto RaftState::InitRaftServer() -> void {
   params.leadership_expiry_ = 200;

   raft_server::init_options init_opts;

   init_opts.raft_callback_ = [this](cb_func::Type event_type, cb_func::Param *param) -> nuraft::CbReturnCode {
     if (event_type == cb_func::BecomeLeader) {
       spdlog::info("Node {} became leader", param->leaderId);
       become_leader_cb_();
     } else if (event_type == cb_func::BecomeFollower) {
-      spdlog::info("Node {} became follower", param->myId);
+      // TODO(antoniofilipovic) Check what happens when becoming follower while doing failover
+      // There is no way to stop becoming a follower:
+      // https://github.com/eBay/NuRaft/blob/188947bcc73ce38ab1c3cf9d01015ca8a29decd9/src/raft_server.cxx#L1334-L1335
+      spdlog::trace("Got request to become follower");
       become_follower_cb_();
+      spdlog::trace("Node {} became follower", param->myId);
     }
     return CbReturnCode::Ok;
   };

@@ -82,7 +86,6 @@ auto RaftState::InitRaftServer() -> void {
   if (!raft_server_) {
     throw RaftServerStartException("Failed to launch raft server on {}", raft_endpoint_.SocketAddress());
   }
-
   auto maybe_stop = utils::ResettableCounter<20>();
   do {
     if (raft_server_->is_initialized()) {

@@ -157,6 +160,78 @@ auto RaftState::IsLeader() const -> bool { return raft_server_->is_leader(); }

 auto RaftState::RequestLeadership() -> bool { return raft_server_->is_leader() || raft_server_->request_leadership(); }

+auto RaftState::AppendOpenLockRegister(CoordinatorToReplicaConfig const &config) -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeOpenLockRegister(config);
+  auto const res = raft_server_->append_entries({new_log});
+
+  if (!res->get_accepted()) {
+    spdlog::error("Failed to accept request to open lock to register instance {}", config.instance_name);
+    return false;
+  }
+
+  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
+    spdlog::error("Failed to open lock for registering instance {} with error code {}", config.instance_name,
+                  int(res->get_result_code()));
+    return false;
+  }
+
+  return true;
+}
+
+auto RaftState::AppendOpenLockUnregister(std::string_view instance_name) -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeOpenLockUnregister(instance_name);
+  auto const res = raft_server_->append_entries({new_log});
+
+  if (!res->get_accepted()) {
+    spdlog::error("Failed to accept request to open lock to unregister instance {}.", instance_name);
+    return false;
+  }
+
+  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
+    spdlog::error("Failed to open lock for unregistering instance {} with error code {}", instance_name,
+                  int(res->get_result_code()));
+    return false;
+  }
+
+  return true;
+}
+
+auto RaftState::AppendOpenLockFailover(std::string_view instance_name) -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeOpenLockFailover(instance_name);
+  auto const res = raft_server_->append_entries({new_log});
+
+  if (!res->get_accepted()) {
+    spdlog::error("Failed to accept request to open lock for failover {}", instance_name);
+    return false;
+  }
+
+  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
+    spdlog::error("Failed to open lock for failover to instance {} with error code {}", instance_name,
+                  int(res->get_result_code()));
+    return false;
+  }
+
+  return true;
+}
+
+auto RaftState::AppendOpenLockSetInstanceToMain(std::string_view instance_name) -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeOpenLockSetInstanceAsMain(instance_name);
+  auto const res = raft_server_->append_entries({new_log});
+
+  if (!res->get_accepted()) {
+    spdlog::error("Failed to accept request to open lock and set instance {} to MAIN", instance_name);
+    return false;
+  }
+
+  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
+    spdlog::error("Failed to open lock to set instance {} to MAIN with error code {}", instance_name,
+                  int(res->get_result_code()));
+    return false;
+  }
+
+  return true;
+}
+
 auto RaftState::AppendRegisterReplicationInstanceLog(CoordinatorToReplicaConfig const &config) -> bool {
   auto new_log = CoordinatorStateMachine::SerializeRegisterInstance(config);
   auto const res = raft_server_->append_entries({new_log});

@@ -201,8 +276,9 @@ auto RaftState::AppendUnregisterReplicationInstanceLog(std::string_view instance
   return true;
 }

-auto RaftState::AppendSetInstanceAsMainLog(std::string_view instance_name) -> bool {
-  auto new_log = CoordinatorStateMachine::SerializeSetInstanceAsMain(instance_name);
+auto RaftState::AppendSetInstanceAsMainLog(std::string_view instance_name, utils::UUID const &uuid) -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeSetInstanceAsMain(
+      InstanceUUIDUpdate{.instance_name = std::string{instance_name}, .uuid = uuid});
   auto const res = raft_server_->append_entries({new_log});
   if (!res->get_accepted()) {
     spdlog::error(

@@ -241,8 +317,28 @@ auto RaftState::AppendSetInstanceAsReplicaLog(std::string_view instance_name) ->
   return true;
 }

-auto RaftState::AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool {
-  auto new_log = CoordinatorStateMachine::SerializeUpdateUUID(uuid);
+auto RaftState::AppendOpenLockSetInstanceToReplica(std::string_view instance_name) -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeOpenLockSetInstanceAsReplica(instance_name);
+  auto const res = raft_server_->append_entries({new_log});
+  if (!res->get_accepted()) {
+    spdlog::error(
+        "Failed to accept request for demoting instance {}. Most likely the reason is that the instance is not "
+        "the leader.",
+        instance_name);
+    return false;
+  }
+  spdlog::info("Request for demoting instance {} accepted", instance_name);
+
+  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
+    spdlog::error("Failed to demote instance {} with error code {}", instance_name, int(res->get_result_code()));
+    return false;
+  }
+
+  return true;
+}
+
+auto RaftState::AppendUpdateUUIDForNewMainLog(utils::UUID const &uuid) -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeUpdateUUIDForNewMain(uuid);
   auto const res = raft_server_->append_entries({new_log});
   if (!res->get_accepted()) {
     spdlog::error(

@@ -250,7 +346,7 @@ auto RaftState::AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool {
         "the leader.");
     return false;
   }
-  spdlog::info("Request for updating UUID accepted");
+  spdlog::trace("Request for updating UUID accepted");

   if (res->get_result_code() != nuraft::cmd_result_code::OK) {
     spdlog::error("Failed to update UUID with error code {}", int(res->get_result_code()));

@@ -282,23 +378,53 @@ auto RaftState::AppendAddCoordinatorInstanceLog(CoordinatorToCoordinatorConfig c
   return true;
 }

+auto RaftState::AppendUpdateUUIDForInstanceLog(std::string_view instance_name, const utils::UUID &uuid) -> bool {
+  auto new_log = CoordinatorStateMachine::SerializeUpdateUUIDForInstance(
+      {.instance_name = std::string{instance_name}, .uuid = uuid});
+  auto const res = raft_server_->append_entries({new_log});
+  if (!res->get_accepted()) {
+    spdlog::error("Failed to accept request for updating UUID of instance.");
+    return false;
+  }
+  spdlog::trace("Request for updating UUID of instance accepted");
+
+  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
+    spdlog::error("Failed to update UUID of instance with error code {}", int(res->get_result_code()));
+    return false;
+  }
+
+  return true;
+}
+
 auto RaftState::MainExists() const -> bool { return state_machine_->MainExists(); }

-auto RaftState::IsMain(std::string_view instance_name) const -> bool { return state_machine_->IsMain(instance_name); }
+auto RaftState::HasMainState(std::string_view instance_name) const -> bool {
+  return state_machine_->HasMainState(instance_name);
+}

-auto RaftState::IsReplica(std::string_view instance_name) const -> bool {
-  return state_machine_->IsReplica(instance_name);
+auto RaftState::HasReplicaState(std::string_view instance_name) const -> bool {
+  return state_machine_->HasReplicaState(instance_name);
 }

 auto RaftState::GetReplicationInstances() const -> std::vector<ReplicationInstanceState> {
   return state_machine_->GetReplicationInstances();
 }

+auto RaftState::GetCurrentMainUUID() const -> utils::UUID { return state_machine_->GetCurrentMainUUID(); }
+
+auto RaftState::IsCurrentMain(std::string_view instance_name) const -> bool {
+  return state_machine_->IsCurrentMain(instance_name);
+}
+
+auto RaftState::IsLockOpened() const -> bool { return state_machine_->IsLockOpened(); }
+
+auto RaftState::GetInstanceUUID(std::string_view instance_name) const -> utils::UUID {
+  return state_machine_->GetInstanceUUID(instance_name);
+}
+
 auto RaftState::GetCoordinatorInstances() const -> std::vector<CoordinatorInstanceState> {
   return state_machine_->GetCoordinatorInstances();
 }

-auto RaftState::GetUUID() const -> utils::UUID { return state_machine_->GetUUID(); }
-
 } // namespace memgraph::coordination

 #endif
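Every Append* method above repeats the same two-phase check: append_entries must be accepted (only the leader can append) and the result code must be OK (the entry was actually committed). A hypothetical helper that the repetition suggests, sketched here for illustration only; the commit itself keeps the methods separate:

// Hypothetical refactor sketch; AppendAndCheck is not part of the commit.
auto RaftState::AppendAndCheck(nuraft::ptr<nuraft::buffer> new_log, std::string_view description) -> bool {
  auto const res = raft_server_->append_entries({new_log});
  if (!res->get_accepted()) {
    // Rejected outright, most likely because this node is not the leader.
    spdlog::error("Failed to accept request: {}", description);
    return false;
  }
  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
    // Accepted but not committed, e.g. quorum was lost mid-replication.
    spdlog::error("Failed to commit {} with error code {}", description, int(res->get_result_code()));
    return false;
  }
  return true;
}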
@@ -56,7 +56,6 @@ auto ReplicationInstance::PromoteToMain(utils::UUID const &new_uuid, Replication
     return false;
   }

-  main_uuid_ = new_uuid;
   succ_cb_ = main_succ_cb;
   fail_cb_ = main_fail_cb;

@@ -91,9 +90,6 @@ auto ReplicationInstance::GetFailCallback() -> HealthCheckInstanceCallback & { r
 auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; }

-auto ReplicationInstance::SetNewMainUUID(utils::UUID const &main_uuid) -> void { main_uuid_ = main_uuid; }
-auto ReplicationInstance::GetMainUUID() const -> std::optional<utils::UUID> const & { return main_uuid_; }
-
 auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {
   if (!IsReadyForUUIDPing()) {
     return true;

@@ -116,7 +112,6 @@ auto ReplicationInstance::SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid
   if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), new_main_uuid)) {
     return false;
   }
-  SetNewMainUUID(new_main_uuid);
   return true;
 }
@@ -407,6 +407,11 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
       case RPC_FAILED:
         throw QueryRuntimeException(
            "Couldn't unregister replica instance because current main instance couldn't unregister replica!");
+      case LOCK_OPENED:
+        throw QueryRuntimeException("Couldn't unregister replica because the last action didn't finish successfully!");
+      case OPEN_LOCK:
+        throw QueryRuntimeException(
+            "Couldn't unregister instance as cluster didn't accept entering unregistration state!");
       case SUCCESS:
         break;
     }

@@ -469,6 +474,12 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
         throw QueryRuntimeException(
             "Couldn't register replica instance because setting instance to replica failed! Check logs on replica to "
             "find out more info!");
+      case LOCK_OPENED:
+        throw QueryRuntimeException(
+            "Couldn't register replica instance because the last action didn't finish successfully!");
+      case OPEN_LOCK:
+        throw QueryRuntimeException(
+            "Couldn't register replica instance because cluster didn't accept registration query!");
       case SUCCESS:
         break;
     }

@@ -514,6 +525,14 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
             "Couldn't set replica instance to main! Check coordinator and replica for more logs");
       case SWAP_UUID_FAILED:
         throw QueryRuntimeException("Couldn't set replica instance to main. Replicas didn't swap uuid of new main.");
+      case OPEN_LOCK:
+        throw QueryRuntimeException(
+            "Couldn't set replica instance to main as cluster didn't accept setting instance state.");
+      case LOCK_OPENED:
+        throw QueryRuntimeException(
+            "Couldn't set replica instance to main because the last action didn't finish successfully!");
+      case ENABLE_WRITING_FAILED:
+        throw QueryRuntimeException("Instance promoted to MAIN, but couldn't enable writing to instance.");
       case SUCCESS:
         break;
     }

@@ -529,7 +548,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler {
 #endif

 /// returns false if the replication role can't be set
-/// @throw QueryRuntimeException if an error ocurred.
+/// @throw QueryRuntimeException if an error occurred.

 Callback HandleAuthQuery(AuthQuery *auth_query, InterpreterContext *interpreter_context, const Parameters &parameters,
                          Interpreter &interpreter) {
@@ -254,7 +254,8 @@ bool ReplicationState::SetReplicationRoleMain(const utils::UUID &main_uuid) {
     return false;
   }

-  replication_data_ = RoleMainData{ReplicationEpoch{new_epoch}, true, main_uuid};
+  // By default, writing on MAIN is disabled until cluster is in healthy state
+  replication_data_ = RoleMainData{ReplicationEpoch{new_epoch}, /*is_writing_enabled*/ false, main_uuid};
   return true;
 }
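With the flag flipped, a freshly promoted MAIN starts with writing disabled; the coordinator re-enables writes only once the cluster state has settled, which is exactly what the new ENABLE_WRITING_FAILED status reports. A condensed sketch of the resulting two-phase promotion; the free function and the PromoteToMain argument list are assumed for illustration, not taken from the commit:

// Illustrative two-phase promotion, not verbatim coordinator code.
bool PromoteAndEnableWrites(ReplicationInstance &instance, utils::UUID const &new_main_uuid,
                            ReplicationClientsInfo repl_clients_info,
                            HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb) {
  // Phase 1: promote. SetReplicationRoleMain now constructs RoleMainData
  // with writing disabled, so the new MAIN accepts no writes yet.
  if (!instance.PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb, main_fail_cb)) {
    return false;
  }
  // Phase 2: once the Raft log records the new MAIN, enable writing;
  // a failure here surfaces as ENABLE_WRITING_FAILED.
  return instance.EnableWritingOnMain();
}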
@@ -1,4 +1,4 @@
-// Copyright 2022 Memgraph Ltd.
+// Copyright 2024 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source

@@ -10,7 +10,6 @@
 // licenses/APL.txt.

 #include "utils/thread_pool.hpp"
-
 namespace memgraph::utils {

 ThreadPool::ThreadPool(const size_t pool_size) {
@@ -241,7 +241,10 @@ MEMGRAPH_INSTANCES_DESCRIPTION = {


 def test_coordinators_communication_with_restarts():
+    # 1 Start all instances
     safe_execute(shutil.rmtree, TEMP_DIR)
+
+    # 1
     interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)

     coordinator3_cursor = connect(host="localhost", port=7692).cursor()
@@ -13,6 +13,7 @@ import os
 import shutil
 import sys
 import tempfile
+import time

 import interactive_mg_runner
 import pytest

@@ -261,7 +262,7 @@ def test_old_main_comes_back_on_new_leader_as_replica():
         ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
         ("instance_1", "", "", "unknown", "main"),
         ("instance_2", "", "", "unknown", "replica"),
-        ("instance_3", "", "", "unknown", "main"),  # TODO: (andi) Will become unknown.
+        ("instance_3", "", "", "unknown", "unknown"),
     ]
     mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
     mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2])

@@ -456,7 +457,7 @@ def test_distributed_automatic_failover_with_leadership_change():
         ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
         ("instance_1", "", "", "unknown", "main"),
         ("instance_2", "", "", "unknown", "replica"),
-        ("instance_3", "", "", "unknown", "main"),  # TODO: (andi) Will become unknown.
+        ("instance_3", "", "", "unknown", "unknown"),
     ]
     mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
     mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2])

@@ -1092,8 +1093,8 @@ def test_multiple_failovers_in_row_no_leadership_change():
                 "",
                 "",
                 "unknown",
-                "main",
-            ),  # TODO(antoniofilipovic) change to unknown after PR with transitions
+                "unknown",
+            ),
         ]
     )

@@ -1119,9 +1120,9 @@ def test_multiple_failovers_in_row_no_leadership_change():
     follower_data.extend(coordinator_data)
     follower_data.extend(
         [
-            ("instance_1", "", "", "unknown", "main"),
-            ("instance_2", "", "", "unknown", "main"),  # TODO(antoniofilipovic) change to unknown
-            ("instance_3", "", "", "unknown", "main"),  # TODO(antoniofilipovic) change to unknown
+            ("instance_1", "", "", "unknown", "unknown"),
+            ("instance_2", "", "", "unknown", "main"),
+            ("instance_3", "", "", "unknown", "unknown"),
         ]
     )

@@ -1149,7 +1150,7 @@ def test_multiple_failovers_in_row_no_leadership_change():
     follower_data.extend(coordinator_data)
     follower_data.extend(
         [
-            ("instance_1", "", "", "unknown", "main"),  # TODO(antoniofilipovic) change to unknown
+            ("instance_1", "", "", "unknown", "unknown"),
             ("instance_2", "", "", "unknown", "main"),
             ("instance_3", "", "", "unknown", "replica"),
         ]

@@ -1177,8 +1178,8 @@ def test_multiple_failovers_in_row_no_leadership_change():
     follower_data.extend(coordinator_data)
     follower_data.extend(
         [
-            ("instance_1", "", "", "unknown", "main"),  # TODO(antoniofilipovic) change to unknown
-            ("instance_2", "", "", "unknown", "main"),  # TODO(antoniofilipovic) change to unknown
+            ("instance_1", "", "", "unknown", "unknown"),
+            ("instance_2", "", "", "unknown", "unknown"),
             ("instance_3", "", "", "unknown", "main"),
         ]
     )

@@ -1258,5 +1259,166 @@ def test_multiple_failovers_in_row_no_leadership_change():
     mg_sleep_and_assert(1, get_vertex_count_func(connect(port=7688, host="localhost").cursor()))


+def test_multiple_old_mains_single_failover():
+    # Goal of this test is to check that when leadership changes
+    # and we have the old MAIN down, we don't start another failover
+    # 1. Start all instances.
+    # 2. Kill the main instance
+    # 3. Do failover
+    # 4. Kill other main
+    # 5. Kill leader
+    # 6. Leave first main down, and start second main
+    # 7. Second main should write data to new instance all the time
+
+    # 1
+    safe_execute(shutil.rmtree, TEMP_DIR)
+    inner_instances_description = get_instances_description_no_setup()
+
+    interactive_mg_runner.start_all(inner_instances_description)
+
+    setup_queries = [
+        "ADD COORDINATOR 1 WITH CONFIG {'bolt_server': '127.0.0.1:7690', 'coordinator_server': '127.0.0.1:10111'}",
+        "ADD COORDINATOR 2 WITH CONFIG {'bolt_server': '127.0.0.1:7691', 'coordinator_server': '127.0.0.1:10112'}",
+        "REGISTER INSTANCE instance_1 WITH CONFIG {'bolt_server': '127.0.0.1:7687', 'management_server': '127.0.0.1:10011', 'replication_server': '127.0.0.1:10001'};",
+        "REGISTER INSTANCE instance_2 WITH CONFIG {'bolt_server': '127.0.0.1:7688', 'management_server': '127.0.0.1:10012', 'replication_server': '127.0.0.1:10002'};",
+        "REGISTER INSTANCE instance_3 WITH CONFIG {'bolt_server': '127.0.0.1:7689', 'management_server': '127.0.0.1:10013', 'replication_server': '127.0.0.1:10003'};",
+        "SET INSTANCE instance_3 TO MAIN",
+    ]
+    coord_cursor_3 = connect(host="localhost", port=7692).cursor()
+    for query in setup_queries:
+        execute_and_fetch_all(coord_cursor_3, query)
+
+    def retrieve_data_show_repl_cluster():
+        return sorted(list(execute_and_fetch_all(coord_cursor_3, "SHOW INSTANCES;")))
+
+    coordinators = [
+        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
+        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
+        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
+    ]
+
+    basic_instances = [
+        ("instance_1", "", "127.0.0.1:10011", "up", "replica"),
+        ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
+        ("instance_3", "", "127.0.0.1:10013", "up", "main"),
+    ]
+
+    expected_data_on_coord = []
+    expected_data_on_coord.extend(coordinators)
+    expected_data_on_coord.extend(basic_instances)
+
+    mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
+
+    # 2
+    interactive_mg_runner.kill(inner_instances_description, "instance_3")
+
+    # 3
+    basic_instances = [
+        ("instance_1", "", "127.0.0.1:10011", "up", "main"),
+        ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
+        ("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
+    ]
+
+    expected_data_on_coord = []
+    expected_data_on_coord.extend(coordinators)
+    expected_data_on_coord.extend(basic_instances)
+
+    mg_sleep_and_assert(expected_data_on_coord, retrieve_data_show_repl_cluster)
+
+    # 4
+    interactive_mg_runner.kill(inner_instances_description, "instance_1")
+
+    # 5
+    interactive_mg_runner.kill(inner_instances_description, "coordinator_3")
+
+    # 6
+    interactive_mg_runner.start(inner_instances_description, "instance_1")
+
+    # 7
+    coord_cursor_1 = connect(host="localhost", port=7690).cursor()
+
+    def show_instances_coord1():
+        return sorted(list(execute_and_fetch_all(coord_cursor_1, "SHOW INSTANCES;")))
+
+    coord_cursor_2 = connect(host="localhost", port=7691).cursor()
+
+    def show_instances_coord2():
+        return sorted(list(execute_and_fetch_all(coord_cursor_2, "SHOW INSTANCES;")))
+
+    leader_data = [
+        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
+        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
+        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
+        ("instance_1", "", "127.0.0.1:10011", "up", "main"),
+        ("instance_2", "", "127.0.0.1:10012", "up", "replica"),
+        ("instance_3", "", "127.0.0.1:10013", "down", "unknown"),
+    ]
+    mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
+
+    follower_data = [
+        ("coordinator_1", "127.0.0.1:10111", "", "unknown", "coordinator"),
+        ("coordinator_2", "127.0.0.1:10112", "", "unknown", "coordinator"),
+        ("coordinator_3", "127.0.0.1:10113", "", "unknown", "coordinator"),
+        ("instance_1", "", "", "unknown", "main"),
+        ("instance_2", "", "", "unknown", "replica"),
+        ("instance_3", "", "", "unknown", "unknown"),
+    ]
+    mg_sleep_and_assert_any_function(leader_data, [show_instances_coord1, show_instances_coord2])
+    mg_sleep_and_assert_any_function(follower_data, [show_instances_coord1, show_instances_coord2])
+
+    instance_1_cursor = connect(host="localhost", port=7687).cursor()
+
+    def show_replicas():
+        return sorted(list(execute_and_fetch_all(instance_1_cursor, "SHOW REPLICAS;")))
+
+    replicas = [
+        (
+            "instance_2",
+            "127.0.0.1:10002",
+            "sync",
+            {"behind": None, "status": "ready", "ts": 0},
+            {"memgraph": {"behind": 0, "status": "ready", "ts": 0}},
+        ),
+        (
+            "instance_3",
+            "127.0.0.1:10003",
+            "sync",
+            {"behind": None, "status": "invalid", "ts": 0},
+            {"memgraph": {"behind": 0, "status": "invalid", "ts": 0}},
+        ),
+    ]
+    mg_sleep_and_assert_collection(replicas, show_replicas)
+
+    def get_vertex_count_func(cursor):
+        def get_vertex_count():
+            return execute_and_fetch_all(cursor, "MATCH (n) RETURN count(n)")[0][0]
+
+        return get_vertex_count
+
+    vertex_count = 0
+    instance_1_cursor = connect(port=7687, host="localhost").cursor()
+    instance_2_cursor = connect(port=7688, host="localhost").cursor()
+
+    mg_sleep_and_assert(vertex_count, get_vertex_count_func(instance_1_cursor))
+    mg_sleep_and_assert(vertex_count, get_vertex_count_func(instance_2_cursor))
+
+    time_slept = 0
+    failover_time = 5
+    while time_slept < failover_time:
+        with pytest.raises(Exception) as e:
+            execute_and_fetch_all(instance_1_cursor, "CREATE ();")
+            vertex_count += 1
+
+        assert vertex_count == execute_and_fetch_all(instance_1_cursor, "MATCH (n) RETURN count(n);")[0][0]
+        assert vertex_count == execute_and_fetch_all(instance_2_cursor, "MATCH (n) RETURN count(n);")[0][0]
+        time.sleep(0.1)
+        time_slept += 0.1
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main([__file__, "-rA"]))
@@ -101,8 +101,8 @@ TEST_F(RaftLogSerialization, RaftLogActionDemote) {
   ASSERT_EQ(action, action2);
 }

-TEST_F(RaftLogSerialization, RaftLogActionUpdateUUID) {
-  auto action = RaftLogAction::UPDATE_UUID;
+TEST_F(RaftLogSerialization, RaftLogActionUpdateUUIDForInstance) {
+  auto action = RaftLogAction::UPDATE_UUID_FOR_INSTANCE;

   nlohmann::json j = action;
   RaftLogAction action2 = j.get<memgraph::coordination::RaftLogAction>();

@@ -135,10 +135,14 @@ TEST_F(RaftLogSerialization, UnregisterInstance) {
 }

 TEST_F(RaftLogSerialization, SetInstanceAsMain) {
-  auto buffer = CoordinatorStateMachine::SerializeSetInstanceAsMain("instance3");
+  auto instance_uuid_update =
+      memgraph::coordination::InstanceUUIDUpdate{.instance_name = "instance3", .uuid = memgraph::utils::UUID{}};
+  auto buffer = CoordinatorStateMachine::SerializeSetInstanceAsMain(instance_uuid_update);
   auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
   ASSERT_EQ(action, RaftLogAction::SET_INSTANCE_AS_MAIN);
-  ASSERT_EQ("instance3", std::get<std::string>(payload));
+  ASSERT_EQ(instance_uuid_update.instance_name,
+            std::get<memgraph::coordination::InstanceUUIDUpdate>(payload).instance_name);
+  ASSERT_EQ(instance_uuid_update.uuid, std::get<memgraph::coordination::InstanceUUIDUpdate>(payload).uuid);
 }

 TEST_F(RaftLogSerialization, SetInstanceAsReplica) {

@@ -148,10 +152,10 @@ TEST_F(RaftLogSerialization, SetInstanceAsReplica) {
   ASSERT_EQ("instance3", std::get<std::string>(payload));
 }

-TEST_F(RaftLogSerialization, UpdateUUID) {
+TEST_F(RaftLogSerialization, UpdateUUIDForNewMain) {
   UUID uuid;
-  auto buffer = CoordinatorStateMachine::SerializeUpdateUUID(uuid);
+  auto buffer = CoordinatorStateMachine::SerializeUpdateUUIDForNewMain(uuid);
   auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
-  ASSERT_EQ(action, RaftLogAction::UPDATE_UUID);
+  ASSERT_EQ(action, RaftLogAction::UPDATE_UUID_OF_NEW_MAIN);
   ASSERT_EQ(uuid, std::get<UUID>(payload));
 }
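The serialization tests above cover the renamed actions; the new OPEN_LOCK_* entries round-trip the same way, since DecodeLog returns the instance name as a plain string for them. A sketch of one such test, reusing the fixture above (this exact test is not part of the commit):

TEST_F(RaftLogSerialization, OpenLockFailover) {
  auto buffer = CoordinatorStateMachine::SerializeOpenLockFailover("instance3");
  auto [payload, action] = CoordinatorStateMachine::DecodeLog(*buffer);
  ASSERT_EQ(action, RaftLogAction::OPEN_LOCK_FAILOVER);
  ASSERT_EQ("instance3", std::get<std::string>(payload));
}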