diff --git a/src/coordination/CMakeLists.txt b/src/coordination/CMakeLists.txt
index 936d7a5c2..ef9376a70 100644
--- a/src/coordination/CMakeLists.txt
+++ b/src/coordination/CMakeLists.txt
@@ -16,11 +16,14 @@ target_sources(mg-coordination
         include/coordination/raft_state.hpp
         include/coordination/rpc_errors.hpp
+        include/nuraft/raft_log_action.hpp
+        include/nuraft/coordinator_cluster_state.hpp
         include/nuraft/coordinator_log_store.hpp
         include/nuraft/coordinator_state_machine.hpp
         include/nuraft/coordinator_state_manager.hpp
 
   PRIVATE
+  coordinator_config.cpp
   coordinator_client.cpp
   coordinator_state.cpp
   coordinator_rpc.cpp
@@ -33,6 +36,7 @@ target_sources(mg-coordination
   coordinator_log_store.cpp
   coordinator_state_machine.cpp
   coordinator_state_manager.cpp
+  coordinator_cluster_state.cpp
 )
 
 target_include_directories(mg-coordination PUBLIC include)
diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp
index f4d2da838..bc7f42eaa 100644
--- a/src/coordination/coordinator_client.cpp
+++ b/src/coordination/coordinator_client.cpp
@@ -41,7 +41,9 @@ CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, Coordi
       fail_cb_{std::move(fail_cb)} {}
 
 auto CoordinatorClient::InstanceName() const -> std::string { return config_.instance_name; }
-auto CoordinatorClient::SocketAddress() const -> std::string { return rpc_client_.Endpoint().SocketAddress(); }
+
+auto CoordinatorClient::CoordinatorSocketAddress() const -> std::string { return config_.CoordinatorSocketAddress(); }
+auto CoordinatorClient::ReplicationSocketAddress() const -> std::string { return config_.ReplicationSocketAddress(); }
 
 auto CoordinatorClient::InstanceDownTimeoutSec() const -> std::chrono::seconds {
   return config_.instance_down_timeout_sec;
@@ -64,7 +66,7 @@ void CoordinatorClient::StartFrequentCheck() {
       [this, instance_name = config_.instance_name] {
         try {
           spdlog::trace("Sending frequent heartbeat to machine {} on {}", instance_name,
-                        rpc_client_.Endpoint().SocketAddress());
+                        config_.CoordinatorSocketAddress());
           {  // NOTE: This is intentionally scoped so that stream lock could get released.
            auto stream{rpc_client_.Stream()};
            stream.AwaitResponse();
@@ -117,7 +119,7 @@ auto CoordinatorClient::DemoteToReplica() const -> bool {
   return false;
 }
 
-auto CoordinatorClient::SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bool {
+auto CoordinatorClient::SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bool {
   try {
     auto stream{rpc_client_.Stream(uuid)};
     if (!stream.AwaitResponse().success) {
@@ -131,9 +133,10 @@ auto CoordinatorClient::SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bo
   return false;
 }
 
-auto CoordinatorClient::SendUnregisterReplicaRpc(std::string const &instance_name) const -> bool {
+auto CoordinatorClient::SendUnregisterReplicaRpc(std::string_view instance_name) const -> bool {
   try {
-    auto stream{rpc_client_.Stream(instance_name)};
+    auto stream{rpc_client_.Stream(
+        std::string(instance_name))};  // TODO: (andi) Try to change to stream string_view and do just one copy later
     if (!stream.AwaitResponse().success) {
       spdlog::error("Failed to receive successful RPC response for unregistering replica!");
       return false;
@@ -175,9 +178,7 @@ auto CoordinatorClient::SendGetInstanceTimestampsRpc() const
     -> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories> {
   try {
     auto stream{rpc_client_.Stream()};
-    auto res = stream.AwaitResponse();
-
-    return res.database_histories;
+    return stream.AwaitResponse().database_histories;
 
   } catch (const rpc::RpcFailedException &) {
     spdlog::error("RPC error occured while sending GetInstance UUID RPC");
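The client half of this change splits the old `SocketAddress()` into a coordinator-facing and a replication-facing address. A minimal standalone sketch of that split; only the field and method names mirror the diff, the struct name and values below are illustrative:

```cpp
// Sketch only: mirrors CoordinatorClientConfig's two address helpers with
// hypothetical values. Not part of the PR.
#include <cstdint>
#include <iostream>
#include <string>

#include <fmt/format.h>

struct ReplicationClientInfoSketch {
  std::string replication_ip_address;
  uint16_t replication_port;
};

struct ClientConfigSketch {
  std::string ip_address;  // coordinator-facing endpoint
  uint16_t port;
  ReplicationClientInfoSketch replication_client_info;  // replication endpoint

  // Same split as the diff: one address for coordinator RPCs, one for replication traffic.
  std::string CoordinatorSocketAddress() const { return fmt::format("{}:{}", ip_address, port); }
  std::string ReplicationSocketAddress() const {
    return fmt::format("{}:{}", replication_client_info.replication_ip_address,
                       replication_client_info.replication_port);
  }
};

int main() {
  ClientConfigSketch cfg{"127.0.0.1", 10011, {"127.0.0.1", 10001}};
  std::cout << cfg.CoordinatorSocketAddress() << '\n';  // 127.0.0.1:10011
  std::cout << cfg.ReplicationSocketAddress() << '\n';  // 127.0.0.1:10001
}
```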
diff --git a/src/coordination/coordinator_cluster_state.cpp b/src/coordination/coordinator_cluster_state.cpp
new file mode 100644
index 000000000..60f0ca622
--- /dev/null
+++ b/src/coordination/coordinator_cluster_state.cpp
@@ -0,0 +1,166 @@
+// Copyright 2024 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#ifdef MG_ENTERPRISE
+
+#include "nuraft/coordinator_cluster_state.hpp"
+#include "utils/logging.hpp"
+
+#include <shared_mutex>
+
+namespace memgraph::coordination {
+
+using replication_coordination_glue::ReplicationRole;
+
+CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState const &other)
+    : instance_roles_{other.instance_roles_} {}
+
+CoordinatorClusterState &CoordinatorClusterState::operator=(CoordinatorClusterState const &other) {
+  if (this == &other) {
+    return *this;
+  }
+  instance_roles_ = other.instance_roles_;
+  return *this;
+}
+
+CoordinatorClusterState::CoordinatorClusterState(CoordinatorClusterState &&other) noexcept
+    : instance_roles_{std::move(other.instance_roles_)} {}
+
+CoordinatorClusterState &CoordinatorClusterState::operator=(CoordinatorClusterState &&other) noexcept {
+  if (this == &other) {
+    return *this;
+  }
+  instance_roles_ = std::move(other.instance_roles_);
+  return *this;
+}
+
+auto CoordinatorClusterState::MainExists() const -> bool {
+  auto lock = std::shared_lock{log_lock_};
+  return std::ranges::any_of(instance_roles_,
+                             [](auto const &entry) { return entry.second.role == ReplicationRole::MAIN; });
+}
+
+auto CoordinatorClusterState::IsMain(std::string_view instance_name) const -> bool {
+  auto lock = std::shared_lock{log_lock_};
+  auto const it = instance_roles_.find(instance_name);
+  return it != instance_roles_.end() && it->second.role == ReplicationRole::MAIN;
+}
+
+auto CoordinatorClusterState::IsReplica(std::string_view instance_name) const -> bool {
+  auto lock = std::shared_lock{log_lock_};
+  auto const it = instance_roles_.find(instance_name);
+  return it != instance_roles_.end() && it->second.role == ReplicationRole::REPLICA;
+}
+
+auto CoordinatorClusterState::InsertInstance(std::string_view instance_name, ReplicationRole role) -> void {
+  auto lock = std::unique_lock{log_lock_};
+  instance_roles_[std::string{instance_name}].role = role;
+}
+
+auto CoordinatorClusterState::DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void {
+  auto lock = std::unique_lock{log_lock_};
+  switch (log_action) {
+    case RaftLogAction::REGISTER_REPLICATION_INSTANCE: {
+      auto const &config = std::get<CoordinatorClientConfig>(log_entry);
+      instance_roles_[config.instance_name] = InstanceState{config, ReplicationRole::REPLICA};
+      break;
+    }
+    case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE: {
+      auto const instance_name = std::get<std::string>(log_entry);
+      instance_roles_.erase(instance_name);
+      break;
+    }
+    case RaftLogAction::SET_INSTANCE_AS_MAIN: {
+      auto const instance_name = std::get<std::string>(log_entry);
+      auto it = instance_roles_.find(instance_name);
+      MG_ASSERT(it != instance_roles_.end(), "Instance does not exist as part of raft state!");
+      it->second.role = ReplicationRole::MAIN;
+      break;
+    }
+    case RaftLogAction::SET_INSTANCE_AS_REPLICA: {
+      auto const instance_name = std::get<std::string>(log_entry);
+      auto it = instance_roles_.find(instance_name);
+      MG_ASSERT(it != instance_roles_.end(), "Instance does not exist as part of raft state!");
+      it->second.role = ReplicationRole::REPLICA;
+      break;
+    }
+    case RaftLogAction::UPDATE_UUID: {
+      uuid_ = std::get<utils::UUID>(log_entry);
+      break;
+    }
+  }
+}
+
+// TODO: (andi) Improve based on Gareth's comments
+auto CoordinatorClusterState::Serialize(ptr<buffer> &data) -> void {
+  auto lock = std::shared_lock{log_lock_};
+
+  auto const role_to_string = [](auto const &role) -> std::string_view {
+    switch (role) {
+      case ReplicationRole::MAIN:
+        return "main";
+      case ReplicationRole::REPLICA:
+        return "replica";
+    }
+  };
+
+  auto const entry_to_string = [&role_to_string](auto const &entry) {
+    return fmt::format("{}_{}", entry.first, role_to_string(entry.second.role));
+  };
+
+  auto instances_str_view = instance_roles_ | ranges::views::transform(entry_to_string);
+  uint32_t size =
+      std::accumulate(instances_str_view.begin(), instances_str_view.end(), 0,
+                      [](uint32_t acc, auto const &entry) { return acc + sizeof(uint32_t) + entry.size(); });
+
+  data = buffer::alloc(size);
+  buffer_serializer bs(data);
+  std::for_each(instances_str_view.begin(), instances_str_view.end(), [&bs](auto const &entry) { bs.put_str(entry); });
+}
+
+auto CoordinatorClusterState::Deserialize(buffer &data) -> CoordinatorClusterState {
+  auto const str_to_role = [](auto const &str) -> ReplicationRole {
+    if (str == "main") {
+      return ReplicationRole::MAIN;
+    }
+    return ReplicationRole::REPLICA;
+  };
+
+  CoordinatorClusterState cluster_state;
+  buffer_serializer bs(data);
+  while (bs.pos() < data.size()) {
+    auto const entry = bs.get_str();
+    // Split on the last underscore: instance names themselves may contain underscores.
+    auto const last_underscore = entry.rfind('_');
+    auto const instance_name = entry.substr(0, last_underscore);
+    auto const role_str = entry.substr(last_underscore + 1);
+    cluster_state.InsertInstance(instance_name, str_to_role(role_str));
+  }
+  return cluster_state;
+}
+
+auto CoordinatorClusterState::GetInstances() const -> std::vector<InstanceState> {
+  auto lock = std::shared_lock{log_lock_};
+  return instance_roles_ | ranges::views::values | ranges::to<std::vector<InstanceState>>();
+}
+
+auto CoordinatorClusterState::GetUUID() const -> utils::UUID { return uuid_; }
+
+auto CoordinatorClusterState::FindCurrentMainInstanceName() const -> std::optional<std::string> {
+  auto lock = std::shared_lock{log_lock_};
+  auto const it = std::ranges::find_if(instance_roles_,
+                                       [](auto const &entry) { return entry.second.role == ReplicationRole::MAIN; });
+  if (it == instance_roles_.end()) {
+    return {};
+  }
+  return it->first;
+}
+
+}  // namespace memgraph::coordination
+#endif
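The cluster state above travels through NuRaft snapshots as length-prefixed `<instance-name>_<role>` strings. A self-contained sketch of that encoding with the nuraft buffer plumbing stripped out; the `Encode`/`Decode` helpers are hypothetical, not part of the PR. Note that the split has to be on the last underscore, since instance names such as `instance_1` contain underscores themselves:

```cpp
// Minimal round-trip sketch of the "<instance-name>_<role>" wire format.
#include <cassert>
#include <string>
#include <utility>

enum class ReplicationRole { MAIN, REPLICA };

std::string Encode(const std::string &name, ReplicationRole role) {
  return name + '_' + (role == ReplicationRole::MAIN ? "main" : "replica");
}

std::pair<std::string, ReplicationRole> Decode(const std::string &entry) {
  // rfind: the role suffix never contains '_', the instance name might.
  auto const last_underscore = entry.rfind('_');
  auto const name = entry.substr(0, last_underscore);
  auto const role_str = entry.substr(last_underscore + 1);
  return {name, role_str == "main" ? ReplicationRole::MAIN : ReplicationRole::REPLICA};
}

int main() {
  auto const [name, role] = Decode(Encode("instance_1", ReplicationRole::MAIN));
  assert(name == "instance_1" && role == ReplicationRole::MAIN);
}
```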
diff --git a/src/coordination/coordinator_config.cpp b/src/coordination/coordinator_config.cpp
new file mode 100644
index 000000000..a1147d3b6
--- /dev/null
+++ b/src/coordination/coordinator_config.cpp
@@ -0,0 +1,54 @@
+// Copyright 2024 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#ifdef MG_ENTERPRISE
+
+#include "coordination/coordinator_config.hpp"
+
+namespace memgraph::coordination {
+
+void to_json(nlohmann::json &j, ReplClientInfo const &config) {
+  j = nlohmann::json{{"instance_name", config.instance_name},
+                     {"replication_mode", config.replication_mode},
+                     {"replication_ip_address", config.replication_ip_address},
+                     {"replication_port", config.replication_port}};
+}
+
+void from_json(nlohmann::json const &j, ReplClientInfo &config) {
+  config.instance_name = j.at("instance_name").get<std::string>();
+  config.replication_mode = j.at("replication_mode").get<replication_coordination_glue::ReplicationMode>();
+  config.replication_ip_address = j.at("replication_ip_address").get<std::string>();
+  config.replication_port = j.at("replication_port").get<uint16_t>();
+}
+
+void to_json(nlohmann::json &j, CoordinatorClientConfig const &config) {
+  j = nlohmann::json{{"instance_name", config.instance_name},
+                     {"ip_address", config.ip_address},
+                     {"port", config.port},
+                     {"instance_health_check_frequency_sec", config.instance_health_check_frequency_sec.count()},
+                     {"instance_down_timeout_sec", config.instance_down_timeout_sec.count()},
+                     {"instance_get_uuid_frequency_sec", config.instance_get_uuid_frequency_sec.count()},
+                     {"replication_client_info", config.replication_client_info}};
+}
+
+void from_json(nlohmann::json const &j, CoordinatorClientConfig &config) {
+  config.instance_name = j.at("instance_name").get<std::string>();
+  config.ip_address = j.at("ip_address").get<std::string>();
+  config.port = j.at("port").get<uint16_t>();
+  config.instance_health_check_frequency_sec =
+      std::chrono::seconds{j.at("instance_health_check_frequency_sec").get<int>()};
+  config.instance_down_timeout_sec = std::chrono::seconds{j.at("instance_down_timeout_sec").get<int>()};
+  config.instance_get_uuid_frequency_sec = std::chrono::seconds{j.at("instance_get_uuid_frequency_sec").get<int>()};
+  config.replication_client_info = j.at("replication_client_info").get<ReplClientInfo>();
+}
+
+}  // namespace memgraph::coordination
+#endif
diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp
index ba94d9d5f..9a00ca87c 100644
--- a/src/coordination/coordinator_instance.cpp
+++ b/src/coordination/coordinator_instance.cpp
@@ -32,8 +32,42 @@ using nuraft::srv_config;
 
 CoordinatorInstance::CoordinatorInstance()
     : raft_state_(RaftState::MakeRaftState(
-          [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StartFrequentCheck); },
-          [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); })) {
+          [this]() {
+            spdlog::info("Leader changed, starting all replication instances!");
+            auto const instances = raft_state_.GetInstances();
+            auto replicas = instances | ranges::views::filter([](auto const &instance) {
+                              return instance.role == ReplicationRole::REPLICA;
+                            });
+
+            std::ranges::for_each(replicas, [this](auto &replica) {
+              spdlog::info("Starting replication instance {}", replica.config.instance_name);
+              repl_instances_.emplace_back(this, replica.config, client_succ_cb_, client_fail_cb_,
+                                           &CoordinatorInstance::ReplicaSuccessCallback,
+                                           &CoordinatorInstance::ReplicaFailCallback);
+            });
+
+            auto main = instances | ranges::views::filter(
+                                        [](auto const &instance) { return instance.role == ReplicationRole::MAIN; });
+
+            // TODO: (andi) Add support for this
+            // MG_ASSERT(std::ranges::distance(main) == 1, "There should be exactly one main instance");
+
+            std::ranges::for_each(main, [this](auto &main_instance) {
+              spdlog::info("Starting main instance {}", main_instance.config.instance_name);
+              repl_instances_.emplace_back(this, main_instance.config, client_succ_cb_, client_fail_cb_,
+                                           &CoordinatorInstance::MainSuccessCallback,
+                                           &CoordinatorInstance::MainFailCallback);
+            });
+
+            std::ranges::for_each(repl_instances_, [this](auto &instance) {
+              instance.SetNewMainUUID(raft_state_.GetUUID());  // TODO: (andi) Rename
+              instance.StartFrequentCheck();
+            });
+          },
+          [this]() {
+            spdlog::info("Leader changed, stopping all replication instances!");
+            repl_instances_.clear();
+          })) {
   client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
     auto lock = std::unique_lock{self->coord_instance_lock_};
     auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
@@ -59,75 +93,97 @@ auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_i
 }
 
 auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
-  auto const coord_instances = raft_state_.GetAllCoordinators();
-
-  auto const stringify_repl_role = [](ReplicationInstance const &instance) -> std::string {
-    if (!instance.IsAlive()) return "unknown";
-    if (instance.IsMain()) return "main";
-    return "replica";
-  };
-
-  auto const repl_instance_to_status = [&stringify_repl_role](ReplicationInstance const &instance) -> InstanceStatus {
-    return {.instance_name = instance.InstanceName(),
-            .coord_socket_address = instance.SocketAddress(),
-            .cluster_role = stringify_repl_role(instance),
-            .is_alive = instance.IsAlive()};
-  };
-
   auto const coord_instance_to_status = [](ptr<srv_config> const &instance) -> InstanceStatus {
     return {.instance_name = "coordinator_" + std::to_string(instance->get_id()),
             .raft_socket_address = instance->get_endpoint(),
             .cluster_role = "coordinator",
-            .is_alive = true};  // TODO: (andi) Get this info from RAFT and test it or when we will move
-                                // CoordinatorState to every instance, we can be smarter about this using our RPC.
+            .health = "unknown"};  // TODO: (andi) Get this info from RAFT and test it or when we will move
+                                   // CoordinatorState to every instance, we can be smarter about this using our RPC.
   };
+  auto instances_status = utils::fmap(raft_state_.GetAllCoordinators(), coord_instance_to_status);
 
-  auto instances_status = utils::fmap(coord_instance_to_status, coord_instances);
-  {
-    auto lock = std::shared_lock{coord_instance_lock_};
-    std::ranges::transform(repl_instances_, std::back_inserter(instances_status), repl_instance_to_status);
+  if (raft_state_.IsLeader()) {
+    auto const stringify_repl_role = [this](ReplicationInstance const &instance) -> std::string {
+      if (!instance.IsAlive()) return "unknown";
+      if (raft_state_.IsMain(instance.InstanceName())) return "main";
+      return "replica";
+    };
+
+    auto const stringify_repl_health = [](ReplicationInstance const &instance) -> std::string {
+      return instance.IsAlive() ? "up" : "down";
+    };
+
+    auto process_repl_instance_as_leader =
+        [&stringify_repl_role, &stringify_repl_health](ReplicationInstance const &instance) -> InstanceStatus {
+      return {.instance_name = instance.InstanceName(),
+              .coord_socket_address = instance.CoordinatorSocketAddress(),
+              .cluster_role = stringify_repl_role(instance),
+              .health = stringify_repl_health(instance)};
+    };
+
+    {
+      auto lock = std::shared_lock{coord_instance_lock_};
+      std::ranges::transform(repl_instances_, std::back_inserter(instances_status), process_repl_instance_as_leader);
+    }
+  } else {
+    auto const stringify_repl_role = [](ReplicationRole role) -> std::string {
+      return role == ReplicationRole::MAIN ? "main" : "replica";
+    };
+
+    // TODO: (andi) Add capability that followers can also return socket addresses
+    auto process_repl_instance_as_follower = [&stringify_repl_role](auto const &instance) -> InstanceStatus {
+      return {.instance_name = instance.config.instance_name,
+              .cluster_role = stringify_repl_role(instance.role),
+              .health = "unknown"};
+    };
+
+    std::ranges::transform(raft_state_.GetInstances(), std::back_inserter(instances_status),
+                           process_repl_instance_as_follower);
   }
 
   return instances_status;
 }
 
 auto CoordinatorInstance::TryFailover() -> void {
-  auto alive_replicas = repl_instances_ | ranges::views::filter(&ReplicationInstance::IsReplica) |
-                        ranges::views::filter(&ReplicationInstance::IsAlive);
+  auto const is_replica = [this](ReplicationInstance const &instance) { return IsReplica(instance.InstanceName()); };
+
+  auto alive_replicas =
+      repl_instances_ | ranges::views::filter(is_replica) | ranges::views::filter(&ReplicationInstance::IsAlive);
 
   if (ranges::empty(alive_replicas)) {
     spdlog::warn("Failover failed since all replicas are down!");
     return;
   }
 
-  // for each DB in instance we get one DatabaseHistory
-  using DatabaseHistories = replication_coordination_glue::DatabaseHistories;
-  std::vector<std::pair<std::string, DatabaseHistories>> instance_database_histories;
+  if (!raft_state_.RequestLeadership()) {
+    spdlog::error("Failover failed since the instance is not the leader!");
+    return;
+  }
 
-  bool success{true};
-  std::ranges::for_each(alive_replicas, [&success, &instance_database_histories](ReplicationInstance &replica) {
-    if (!success) {
-      return;
-    }
-    auto res = replica.GetClient().SendGetInstanceTimestampsRpc();
-    if (res.HasError()) {
-      spdlog::error("Could get per db history data for instance {}", replica.InstanceName());
-      success = false;
-      return;
-    }
-    instance_database_histories.emplace_back(replica.InstanceName(), std::move(res.GetValue()));
-  });
+  auto const get_ts = [](ReplicationInstance &replica) { return replica.GetClient().SendGetInstanceTimestampsRpc(); };
 
-  if (!success) {
+  auto maybe_instance_db_histories = alive_replicas | ranges::views::transform(get_ts) | ranges::to<std::vector>();
+
+  auto const ts_has_error = [](auto const &res) -> bool { return res.HasError(); };
+
+  if (std::ranges::any_of(maybe_instance_db_histories, ts_has_error)) {
     spdlog::error("Aborting failover as at least one instance didn't provide per database history.");
     return;
   }
 
+  auto transform_to_pairs = ranges::views::transform([](auto const &zipped) {
+    auto &[replica, res] = zipped;
+    return std::make_pair(replica.InstanceName(), res.GetValue());
+  });
+
+  auto instance_db_histories =
+      ranges::views::zip(alive_replicas, maybe_instance_db_histories) | transform_to_pairs | ranges::to<std::vector>();
+
   auto [most_up_to_date_instance, latest_epoch, latest_commit_timestamp] =
-      ChooseMostUpToDateInstance(instance_database_histories);
+      ChooseMostUpToDateInstance(instance_db_histories);
 
   spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp",
-                most_up_to_date_instance, *latest_epoch, *latest_commit_timestamp);
+                most_up_to_date_instance, latest_epoch, latest_commit_timestamp);  // NOLINT
 
   auto *new_main = &FindReplicationInstance(most_up_to_date_instance);
 
@@ -139,16 +195,17 @@ auto CoordinatorInstance::TryFailover() -> void {
   };
 
   auto const new_main_uuid = utils::UUID{};
+
+  auto const failed_to_swap = [&new_main_uuid](ReplicationInstance &instance) {
+    return !instance.SendSwapAndUpdateUUID(new_main_uuid);
+  };
+
   // If for some replicas swap fails, for others on successful ping we will revert back on next change
   // or we will do failover first again and then it will be consistent again
-  for (auto &other_replica_instance : alive_replicas | ranges::views::filter(is_not_new_main)) {
-    if (!other_replica_instance.SendSwapAndUpdateUUID(new_main_uuid)) {
-      spdlog::error(fmt::format("Failed to swap uuid for instance {} which is alive, aborting failover",
-                                other_replica_instance.InstanceName()));
-      return;
-    }
+  if (std::ranges::any_of(alive_replicas | ranges::views::filter(is_not_new_main), failed_to_swap)) {
+    spdlog::error("Failed to swap uuid for all instances");
+    return;
   }
-
   auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
                            ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
                            ranges::to<ReplicationClientsInfo>();
@@ -158,23 +215,36 @@ auto CoordinatorInstance::TryFailover() -> void {
     spdlog::warn("Failover failed since promoting replica to main failed!");
     return;
   }
-  // TODO: (andi) This should be replicated across all coordinator instances with Raft log
-  SetMainUUID(new_main_uuid);
+
+  if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) {
+    return;
+  }
+
+  auto const new_main_instance_name = new_main->InstanceName();
+
+  if (!raft_state_.AppendSetInstanceAsMainLog(new_main_instance_name)) {
+    return;
+  }
+
   spdlog::info("Failover successful! Instance {} promoted to main.", new_main->InstanceName());
 }
 
-// TODO: (andi) Make sure you cannot put coordinator instance to the main
-auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name)
+auto CoordinatorInstance::SetReplicationInstanceToMain(std::string_view instance_name)
     -> SetInstanceToMainCoordinatorStatus {
   auto lock = std::lock_guard{coord_instance_lock_};
 
-  if (std::ranges::any_of(repl_instances_, &ReplicationInstance::IsMain)) {
+  if (raft_state_.MainExists()) {
    return SetInstanceToMainCoordinatorStatus::MAIN_ALREADY_EXISTS;
   }
 
+  if (!raft_state_.RequestLeadership()) {
+    return SetInstanceToMainCoordinatorStatus::NOT_LEADER;
+  }
+
   auto const is_new_main = [&instance_name](ReplicationInstance const &instance) {
     return instance.InstanceName() == instance_name;
   };
+
   auto new_main = std::ranges::find_if(repl_instances_, is_new_main);
 
   if (new_main == repl_instances_.end()) {
@@ -192,99 +262,149 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name
   auto const new_main_uuid = utils::UUID{};
 
-  for (auto &other_instance : repl_instances_ | ranges::views::filter(is_not_new_main)) {
-    if (!other_instance.SendSwapAndUpdateUUID(new_main_uuid)) {
-      spdlog::error(
-          fmt::format("Failed to swap uuid for instance {}, aborting failover", other_instance.InstanceName()));
-      return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED;
-    }
+  auto const failed_to_swap = [&new_main_uuid](ReplicationInstance &instance) {
+    return !instance.SendSwapAndUpdateUUID(new_main_uuid);
+  };
+
+  if (std::ranges::any_of(repl_instances_ | ranges::views::filter(is_not_new_main), failed_to_swap)) {
+    spdlog::error("Failed to swap uuid for all instances");
+    return SetInstanceToMainCoordinatorStatus::SWAP_UUID_FAILED;
   }
 
-  ReplicationClientsInfo repl_clients_info;
-  repl_clients_info.reserve(repl_instances_.size() - 1);
-  std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
-                         std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
+  auto repl_clients_info = repl_instances_ | ranges::views::filter(is_not_new_main) |
+                           ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
+                           ranges::to<ReplicationClientsInfo>();
 
   if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
                                &CoordinatorInstance::MainFailCallback)) {
     return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
   }
 
-  // TODO: (andi) This should be replicated across all coordinator instances with Raft log
-  SetMainUUID(new_main_uuid);
-  spdlog::info("Instance {} promoted to main", instance_name);
+  if (!raft_state_.AppendUpdateUUIDLog(new_main_uuid)) {
+    return SetInstanceToMainCoordinatorStatus::RAFT_LOG_ERROR;
+  }
+
+  if (!raft_state_.AppendSetInstanceAsMainLog(instance_name)) {
+    return SetInstanceToMainCoordinatorStatus::RAFT_LOG_ERROR;
+  }
+
+  spdlog::info("Instance {} promoted to main on leader", instance_name);
   return SetInstanceToMainCoordinatorStatus::SUCCESS;
 }
 
-auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig config)
+auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig const &config)
     -> RegisterInstanceCoordinatorStatus {
   auto lock = std::lock_guard{coord_instance_lock_};
 
-  auto instance_name = config.instance_name;
-
-  auto const name_matches = [&instance_name](ReplicationInstance const &instance) {
-    return instance.InstanceName() == instance_name;
-  };
-
-  if (std::ranges::any_of(repl_instances_, name_matches)) {
+  if (std::ranges::any_of(repl_instances_,
                          [instance_name = config.instance_name](ReplicationInstance const &instance) {
+                            return instance.InstanceName() == instance_name;
+                          })) {
     return RegisterInstanceCoordinatorStatus::NAME_EXISTS;
   }
 
-  auto const socket_address_matches = [&config](ReplicationInstance const &instance) {
-    return instance.SocketAddress() == config.SocketAddress();
-  };
+  if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
+        return instance.CoordinatorSocketAddress() == config.CoordinatorSocketAddress();
+      })) {
+    return RegisterInstanceCoordinatorStatus::COORD_ENDPOINT_EXISTS;
+  }
 
-  if (std::ranges::any_of(repl_instances_, socket_address_matches)) {
-    return RegisterInstanceCoordinatorStatus::ENDPOINT_EXISTS;
+  if (std::ranges::any_of(repl_instances_, [&config](ReplicationInstance const &instance) {
+        return instance.ReplicationSocketAddress() == config.ReplicationSocketAddress();
+      })) {
+    return RegisterInstanceCoordinatorStatus::REPL_ENDPOINT_EXISTS;
   }
 
   if (!raft_state_.RequestLeadership()) {
     return RegisterInstanceCoordinatorStatus::NOT_LEADER;
   }
 
-  auto const res = raft_state_.AppendRegisterReplicationInstance(instance_name);
-  if (!res->get_accepted()) {
-    spdlog::error(
-        "Failed to accept request for registering instance {}. Most likely the reason is that the instance is not "
-        "the "
-        "leader.",
-        config.instance_name);
-    return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_ACCEPT;
-  }
+  auto *new_instance = &repl_instances_.emplace_back(this, config, client_succ_cb_, client_fail_cb_,
+                                                     &CoordinatorInstance::ReplicaSuccessCallback,
+                                                     &CoordinatorInstance::ReplicaFailCallback);
 
-  spdlog::info("Request for registering instance {} accepted", instance_name);
-  try {
-    repl_instances_.emplace_back(this, std::move(config), client_succ_cb_, client_fail_cb_,
-                                 &CoordinatorInstance::ReplicaSuccessCallback,
-                                 &CoordinatorInstance::ReplicaFailCallback);
-  } catch (CoordinatorRegisterInstanceException const &) {
+  if (!new_instance->SendDemoteToReplicaRpc()) {
+    spdlog::error("Failed to send demote to replica rpc for instance {}", config.instance_name);
+    repl_instances_.pop_back();
     return RegisterInstanceCoordinatorStatus::RPC_FAILED;
   }
 
-  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
-    spdlog::error("Failed to register instance {} with error code {}", instance_name, res->get_result_code());
-    return RegisterInstanceCoordinatorStatus::RAFT_COULD_NOT_APPEND;
+  if (!raft_state_.AppendRegisterReplicationInstanceLog(config)) {
+    return RegisterInstanceCoordinatorStatus::RAFT_LOG_ERROR;
   }
 
-  spdlog::info("Instance {} registered", instance_name);
+  new_instance->StartFrequentCheck();
+
+  spdlog::info("Instance {} registered", config.instance_name);
   return RegisterInstanceCoordinatorStatus::SUCCESS;
 }
 
+auto CoordinatorInstance::UnregisterReplicationInstance(std::string_view instance_name)
+    -> UnregisterInstanceCoordinatorStatus {
+  auto lock = std::lock_guard{coord_instance_lock_};
+
+  if (!raft_state_.RequestLeadership()) {
+    return UnregisterInstanceCoordinatorStatus::NOT_LEADER;
+  }
+
+  auto const name_matches = [&instance_name](ReplicationInstance const &instance) {
+    return instance.InstanceName() == instance_name;
+  };
+
+  auto inst_to_remove = std::ranges::find_if(repl_instances_, name_matches);
+  if (inst_to_remove == repl_instances_.end()) {
+    return UnregisterInstanceCoordinatorStatus::NO_INSTANCE_WITH_NAME;
+  }
+
+  // TODO: (andi) Change so that RaftLogState is the central place for asking who is main...
+
+  auto const is_main = [this](ReplicationInstance const &instance) { return IsMain(instance.InstanceName()); };
+
+  if (is_main(*inst_to_remove) && inst_to_remove->IsAlive()) {
+    return UnregisterInstanceCoordinatorStatus::IS_MAIN;
+  }
+
+  inst_to_remove->StopFrequentCheck();
+
+  auto curr_main = std::ranges::find_if(repl_instances_, is_main);
+
+  if (curr_main != repl_instances_.end() && curr_main->IsAlive()) {
+    if (!curr_main->SendUnregisterReplicaRpc(instance_name)) {
+      inst_to_remove->StartFrequentCheck();
+      return UnregisterInstanceCoordinatorStatus::RPC_FAILED;
+    }
+  }
+
+  std::erase_if(repl_instances_, name_matches);
+
+  if (!raft_state_.AppendUnregisterReplicationInstanceLog(instance_name)) {
+    return UnregisterInstanceCoordinatorStatus::RAFT_LOG_ERROR;
+  }
+
+  return UnregisterInstanceCoordinatorStatus::SUCCESS;
+}
+
+auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port,
+                                                 std::string_view raft_address) -> void {
+  raft_state_.AddCoordinatorInstance(raft_server_id, raft_port, raft_address);
+}
+
 void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) {
+  spdlog::trace("Instance {} performing main fail callback", repl_instance_name);
   auto &repl_instance = FindReplicationInstance(repl_instance_name);
   repl_instance.OnFailPing();
   const auto &repl_instance_uuid = repl_instance.GetMainUUID();
-  MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set");
+  MG_ASSERT(repl_instance_uuid.has_value(), "Replication instance must have uuid set");
 
-  if (!repl_instance.IsAlive() && GetMainUUID() == repl_instance_uuid.value()) {
+  // NOLINTNEXTLINE
+  if (!repl_instance.IsAlive() && raft_state_.GetUUID() == repl_instance_uuid.value()) {
     spdlog::info("Cluster without main instance, trying automatic failover");
     TryFailover();
   }
 }
 
 void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name) {
-  auto &repl_instance = FindReplicationInstance(repl_instance_name);
   spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
+  auto &repl_instance = FindReplicationInstance(repl_instance_name);
 
   if (repl_instance.IsAlive()) {
     repl_instance.OnSuccessPing();
@@ -294,8 +414,8 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
   const auto &repl_instance_uuid = repl_instance.GetMainUUID();
   MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set.");
 
-  auto const curr_main_uuid = GetMainUUID();
-  if (curr_main_uuid == repl_instance_uuid.value()) {
+  // NOLINTNEXTLINE
+  if (raft_state_.GetUUID() == repl_instance_uuid.value()) {
     if (!repl_instance.EnableWritingOnMain()) {
       spdlog::error("Failed to enable writing on main instance {}", repl_instance_name);
       return;
@@ -305,6 +425,12 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
     return;
   }
 
+  if (!raft_state_.RequestLeadership()) {
+    spdlog::error("Demoting main instance {} to replica failed since the instance is not the leader!",
+                  repl_instance_name);
+    return;
+  }
+
   if (repl_instance.DemoteToReplica(&CoordinatorInstance::ReplicaSuccessCallback,
                                     &CoordinatorInstance::ReplicaFailCallback)) {
     repl_instance.OnSuccessPing();
@@ -314,24 +440,29 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
     return;
   }
 
-  if (!repl_instance.SendSwapAndUpdateUUID(curr_main_uuid)) {
-    spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName()));
+  if (!repl_instance.SendSwapAndUpdateUUID(raft_state_.GetUUID())) {
+    spdlog::error("Failed to swap uuid for demoted main instance {}", repl_instance_name);
+    return;
+  }
+
+  if (!raft_state_.AppendSetInstanceAsReplicaLog(repl_instance_name)) {
     return;
   }
 }
 
 void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name) {
+  spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
   auto &repl_instance = FindReplicationInstance(repl_instance_name);
-  if (!repl_instance.IsReplica()) {
+
+  if (!IsReplica(repl_instance_name)) {
     spdlog::error("Aborting replica callback since instance {} is not replica anymore", repl_instance_name);
     return;
   }
-  spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
+
   // We need to get replicas UUID from time to time to ensure replica is listening to correct main
   // and that it didn't go down for less time than we could notice
   // We need to get id of main replica is listening to
   // and swap if necessary
-  if (!repl_instance.EnsureReplicaHasCorrectMainUUID(GetMainUUID())) {
+  if (!repl_instance.EnsureReplicaHasCorrectMainUUID(raft_state_.GetUUID())) {
    spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName());
     return;
   }
@@ -340,58 +471,20 @@ void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_
 }
 
 void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) {
+  spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
   auto &repl_instance = FindReplicationInstance(repl_instance_name);
-  if (!repl_instance.IsReplica()) {
+
+  if (!IsReplica(repl_instance_name)) {
     spdlog::error("Aborting replica fail callback since instance {} is not replica anymore", repl_instance_name);
     return;
   }
-  spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
+
   repl_instance.OnFailPing();
 }
 
-auto CoordinatorInstance::UnregisterReplicationInstance(std::string instance_name)
-    -> UnregisterInstanceCoordinatorStatus {
-  auto lock = std::lock_guard{coord_instance_lock_};
-
-  auto const name_matches = [&instance_name](ReplicationInstance const &instance) {
-    return instance.InstanceName() == instance_name;
-  };
-
-  auto inst_to_remove = std::ranges::find_if(repl_instances_, name_matches);
-  if (inst_to_remove == repl_instances_.end()) {
-    return UnregisterInstanceCoordinatorStatus::NO_INSTANCE_WITH_NAME;
-  }
-
-  if (inst_to_remove->IsMain() && inst_to_remove->IsAlive()) {
-    return UnregisterInstanceCoordinatorStatus::IS_MAIN;
-  }
-
-  inst_to_remove->StopFrequentCheck();
-  auto curr_main = std::ranges::find_if(repl_instances_, &ReplicationInstance::IsMain);
-  MG_ASSERT(curr_main != repl_instances_.end(), "There must be a main instance when unregistering a replica");
-  if (!curr_main->SendUnregisterReplicaRpc(instance_name)) {
-    inst_to_remove->StartFrequentCheck();
-    return UnregisterInstanceCoordinatorStatus::RPC_FAILED;
-  }
-  std::erase_if(repl_instances_, name_matches);
-
-  return UnregisterInstanceCoordinatorStatus::SUCCESS;
-}
-
-auto CoordinatorInstance::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-    -> void {
-  raft_state_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address));
-}
-
-auto CoordinatorInstance::GetMainUUID() const -> utils::UUID { return main_uuid_; }
-
-// TODO: (andi) Add to the RAFT log.
-auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_ = new_uuid; }
-
-auto CoordinatorInstance::ChooseMostUpToDateInstance(
-    const std::vector<std::pair<std::string, replication_coordination_glue::DatabaseHistories>>
-        &instance_database_histories) -> NewMainRes {
-  NewMainRes new_main_res;
+auto CoordinatorInstance::ChooseMostUpToDateInstance(std::span<InstanceNameDbHistories> instance_database_histories)
+    -> NewMainRes {
+  std::optional<NewMainRes> new_main_res;
   std::for_each(
       instance_database_histories.begin(), instance_database_histories.end(),
       [&new_main_res](const InstanceNameDbHistories &instance_res_pair) {
@@ -407,7 +500,7 @@ auto CoordinatorInstance::ChooseMostUpToDateInstance(
         std::ranges::for_each(
             instance_db_histories,
             [&instance_name = instance_name](const replication_coordination_glue::DatabaseHistory &db_history) {
-              spdlog::trace("Instance {}: name {}, default db {}", instance_name, db_history.name,
+              spdlog::debug("Instance {}: name {}, default db {}", instance_name, db_history.name,
                             memgraph::dbms::kDefaultDB);
             });
 
@@ -417,35 +510,26 @@ auto CoordinatorInstance::ChooseMostUpToDateInstance(
 
         std::ranges::for_each(instance_default_db_history | ranges::views::reverse,
                               [&instance_name = instance_name](const auto &epoch_history_it) {
-                                spdlog::trace("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name,
+                                spdlog::debug("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name,
                                               std::get<0>(epoch_history_it), std::get<1>(epoch_history_it));
                               });
 
         // get latest epoch
         // get latest timestamp
 
-        if (!new_main_res.latest_epoch) {
+        if (!new_main_res) {
           const auto &[epoch, timestamp] = *instance_default_db_history.crbegin();
-          new_main_res = NewMainRes{
-              .most_up_to_date_instance = instance_name,
-              .latest_epoch = epoch,
-              .latest_commit_timestamp = timestamp,
-          };
-          spdlog::trace("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp",
+          new_main_res = std::make_optional<NewMainRes>(instance_name, epoch, timestamp);
+          spdlog::debug("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp",
                        instance_name, epoch, timestamp);
          return;
        }
 
         bool found_same_point{false};
-        std::string last_most_up_to_date_epoch{*new_main_res.latest_epoch};
+        std::string last_most_up_to_date_epoch{new_main_res->latest_epoch};
         for (auto [epoch, timestamp] : ranges::reverse_view(instance_default_db_history)) {
-          if (*new_main_res.latest_commit_timestamp < timestamp) {
-            new_main_res = NewMainRes{
-                .most_up_to_date_instance = instance_name,
-                .latest_epoch = epoch,
-                .latest_commit_timestamp = timestamp,
-            };
-
+          if (new_main_res->latest_commit_timestamp < timestamp) {
+            new_main_res = std::make_optional<NewMainRes>(instance_name, epoch, timestamp);
             spdlog::trace("Found the new most up to date instance {} with epoch {} and {} latest commit timestamp",
                           instance_name, epoch, timestamp);
           }
@@ -459,11 +543,20 @@ auto CoordinatorInstance::ChooseMostUpToDateInstance(
 
         if (!found_same_point) {
           spdlog::error("Didn't find same history epoch {} for instance {} and instance {}",
-                        last_most_up_to_date_epoch, new_main_res.most_up_to_date_instance, instance_name);
+                        last_most_up_to_date_epoch, new_main_res->most_up_to_date_instance, instance_name);
         }
       });
 
-  return new_main_res;
+  return std::move(*new_main_res);
 }
+
+auto CoordinatorInstance::IsMain(std::string_view instance_name) const -> bool {
+  return raft_state_.IsMain(instance_name);
+}
+
+auto CoordinatorInstance::IsReplica(std::string_view instance_name) const -> bool {
+  return raft_state_.IsReplica(instance_name);
+}
+
 }  // namespace memgraph::coordination
 #endif
diff --git a/src/coordination/coordinator_state.cpp b/src/coordination/coordinator_state.cpp
index 28d6c604e..f429cd5a7 100644
--- a/src/coordination/coordinator_state.cpp
+++ b/src/coordination/coordinator_state.cpp
@@ -41,7 +41,7 @@ CoordinatorState::CoordinatorState() {
   }
 }
 
-auto CoordinatorState::RegisterReplicationInstance(CoordinatorClientConfig config)
+auto CoordinatorState::RegisterReplicationInstance(CoordinatorClientConfig const &config)
     -> RegisterInstanceCoordinatorStatus {
   MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
             "Coordinator cannot register replica since variant holds wrong alternative");
@@ -56,7 +56,8 @@ auto CoordinatorState::RegisterReplicationInstance(CoordinatorClientConfig confi
       data_);
 }
 
-auto CoordinatorState::UnregisterReplicationInstance(std::string instance_name) -> UnregisterInstanceCoordinatorStatus {
+auto CoordinatorState::UnregisterReplicationInstance(std::string_view instance_name)
+    -> UnregisterInstanceCoordinatorStatus {
   MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
             "Coordinator cannot unregister instance since variant holds wrong alternative");
@@ -70,7 +71,8 @@ auto CoordinatorState::UnregisterReplicationInstance(std::string instance_name)
       data_);
 }
 
-auto CoordinatorState::SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus {
+auto CoordinatorState::SetReplicationInstanceToMain(std::string_view instance_name)
+    -> SetInstanceToMainCoordinatorStatus {
   MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
             "Coordinator cannot register replica since variant holds wrong alternative");
@@ -96,8 +98,8 @@ auto CoordinatorState::GetCoordinatorServer() const -> CoordinatorServer & {
   return *std::get<CoordinatorMainReplicaData>(data_).coordinator_server_;
 }
 
-auto CoordinatorState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address)
-    -> void {
+auto CoordinatorState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port,
+                                              std::string_view raft_address) -> void {
   MG_ASSERT(std::holds_alternative<CoordinatorInstance>(data_),
             "Coordinator cannot register replica since variant holds wrong alternative");
   return std::get<CoordinatorInstance>(data_).AddCoordinatorInstance(raft_server_id, raft_port, raft_address);
diff --git a/src/coordination/coordinator_state_machine.cpp b/src/coordination/coordinator_state_machine.cpp
index b939bd304..564303f22 100644
--- a/src/coordination/coordinator_state_machine.cpp
+++ b/src/coordination/coordinator_state_machine.cpp
@@ -12,38 +12,85 @@
 #ifdef MG_ENTERPRISE
 
 #include "nuraft/coordinator_state_machine.hpp"
+#include "utils/logging.hpp"
 
 namespace memgraph::coordination {
 
-auto CoordinatorStateMachine::EncodeRegisterReplicationInstance(const std::string &name) -> ptr<buffer> {
-  std::string str_log = name + "_replica";
-  ptr<buffer> log = buffer::alloc(sizeof(uint32_t) + str_log.size());
-  buffer_serializer bs(log);
-  bs.put_str(str_log);
-  return log;
+auto CoordinatorStateMachine::FindCurrentMainInstanceName() const -> std::optional<std::string> {
+  return cluster_state_.FindCurrentMainInstanceName();
 }
 
-auto CoordinatorStateMachine::DecodeRegisterReplicationInstance(buffer &data) -> std::string {
+auto CoordinatorStateMachine::MainExists() const -> bool { return cluster_state_.MainExists(); }
+
+auto CoordinatorStateMachine::IsMain(std::string_view instance_name) const -> bool {
+  return cluster_state_.IsMain(instance_name);
+}
+
+auto CoordinatorStateMachine::IsReplica(std::string_view instance_name) const -> bool {
+  return cluster_state_.IsReplica(instance_name);
+}
+
+auto CoordinatorStateMachine::CreateLog(nlohmann::json &&log) -> ptr<buffer> {
+  auto const log_dump = log.dump();
+  ptr<buffer> log_buf = buffer::alloc(sizeof(uint32_t) + log_dump.size());
+  buffer_serializer bs(log_buf);
+  bs.put_str(log_dump);
+  return log_buf;
+}
+
+auto CoordinatorStateMachine::SerializeRegisterInstance(CoordinatorClientConfig const &config) -> ptr<buffer> {
+  return CreateLog({{"action", RaftLogAction::REGISTER_REPLICATION_INSTANCE}, {"info", config}});
+}
+
+auto CoordinatorStateMachine::SerializeUnregisterInstance(std::string_view instance_name) -> ptr<buffer> {
+  return CreateLog({{"action", RaftLogAction::UNREGISTER_REPLICATION_INSTANCE}, {"info", instance_name}});
+}
+
+auto CoordinatorStateMachine::SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr<buffer> {
+  return CreateLog({{"action", RaftLogAction::SET_INSTANCE_AS_MAIN}, {"info", instance_name}});
+}
+
+auto CoordinatorStateMachine::SerializeSetInstanceAsReplica(std::string_view instance_name) -> ptr<buffer> {
+  return CreateLog({{"action", RaftLogAction::SET_INSTANCE_AS_REPLICA}, {"info", instance_name}});
+}
+
+auto CoordinatorStateMachine::SerializeUpdateUUID(utils::UUID const &uuid) -> ptr<buffer> {
+  return CreateLog({{"action", RaftLogAction::UPDATE_UUID}, {"info", uuid}});
+}
+
+auto CoordinatorStateMachine::DecodeLog(buffer &data) -> std::pair<TRaftLog, RaftLogAction> {
   buffer_serializer bs(data);
-  return bs.get_str();
+  auto const json = nlohmann::json::parse(bs.get_str());
+
+  auto const action = json["action"].get<RaftLogAction>();
+  auto const &info = json["info"];
+
+  switch (action) {
+    case RaftLogAction::REGISTER_REPLICATION_INSTANCE:
+      return {info.get<CoordinatorClientConfig>(), action};
+    case RaftLogAction::UPDATE_UUID:
+      return {info.get<utils::UUID>(), action};
+    case RaftLogAction::UNREGISTER_REPLICATION_INSTANCE:
+    case RaftLogAction::SET_INSTANCE_AS_MAIN:
+      [[fallthrough]];
+    case RaftLogAction::SET_INSTANCE_AS_REPLICA:
+      return {info.get<std::string>(), action};
+  }
+  throw std::runtime_error("Unknown action");
 }
 
-auto CoordinatorStateMachine::pre_commit(ulong const log_idx, buffer &data) -> ptr<buffer> {
-  buffer_serializer bs(data);
-  std::string str = bs.get_str();
-
-  spdlog::info("pre_commit {} : {}", log_idx, str);
-  return nullptr;
-}
+auto CoordinatorStateMachine::pre_commit(ulong const /*log_idx*/, buffer & /*data*/) -> ptr<buffer> { return nullptr; }
 
 auto CoordinatorStateMachine::commit(ulong const log_idx, buffer &data) -> ptr<buffer> {
-  buffer_serializer bs(data);
-  std::string str = bs.get_str();
-
-  spdlog::info("commit {} : {}", log_idx, str);
-
+  auto const [parsed_data, log_action] = DecodeLog(data);
+  cluster_state_.DoAction(parsed_data, log_action);
   last_committed_idx_ = log_idx;
-  return nullptr;
+
+  // Return raft log number
+  ptr<buffer> ret = buffer::alloc(sizeof(log_idx));
+  buffer_serializer bs_ret(ret);
+  bs_ret.put_u64(log_idx);
+  return ret;
 }
 
 auto CoordinatorStateMachine::commit_config(ulong const log_idx, ptr<cluster_config> & /*new_conf*/) -> void {
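Every Raft log entry above is a two-field JSON document, `{"action": ..., "info": ...}`, dumped to a string and length-prefixed into a nuraft buffer by `CreateLog`, then parsed back and dispatched on the action tag by `DecodeLog`. A minimal round-trip sketch of just the JSON layer; it assumes an `NLOHMANN_JSON_SERIALIZE_ENUM` mapping for `RaftLogAction` (which the real `raft_log_action.hpp` must provide in some form for `CreateLog` to compile), and the enum string values below are made up:

```cpp
// Sketch of the {"action": ..., "info": ...} Raft-log payload, minus the
// nuraft buffer layer. Uses the standalone nlohmann include path rather than
// memgraph's vendored "json/json.hpp".
#include <cassert>
#include <string>

#include <nlohmann/json.hpp>

enum class RaftLogAction { SET_INSTANCE_AS_MAIN, UPDATE_UUID };

// Hypothetical string mapping; the PR's actual values are not shown in the diff.
NLOHMANN_JSON_SERIALIZE_ENUM(RaftLogAction, {
                                                {RaftLogAction::SET_INSTANCE_AS_MAIN, "set_instance_as_main"},
                                                {RaftLogAction::UPDATE_UUID, "update_uuid"},
                                            })

int main() {
  // Encode: the same shape CreateLog receives.
  nlohmann::json log = {{"action", RaftLogAction::SET_INSTANCE_AS_MAIN}, {"info", "instance_1"}};
  std::string wire = log.dump();  // this string is what gets put into the nuraft buffer

  // Decode: read the action tag first, then interpret "info" accordingly.
  auto parsed = nlohmann::json::parse(wire);
  assert(parsed["action"].get<RaftLogAction>() == RaftLogAction::SET_INSTANCE_AS_MAIN);
  assert(parsed["info"].get<std::string>() == "instance_1");
}
```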
@@ -51,61 +98,95 @@ auto CoordinatorStateMachine::commit_config(ulong const log_idx, ptr<cluster_con
 
 auto CoordinatorStateMachine::rollback(ulong const log_idx, buffer &data) -> void {
-  buffer_serializer bs(data);
-  std::string str = bs.get_str();
-
-  spdlog::info("rollback {} : {}", log_idx, str);
+  // NOTE: Nothing since we don't do anything in pre_commit
 }
 
-auto CoordinatorStateMachine::read_logical_snp_obj(snapshot & /*snapshot*/, void *& /*user_snp_ctx*/, ulong /*obj_id*/,
+auto CoordinatorStateMachine::read_logical_snp_obj(snapshot &snapshot, void *& /*user_snp_ctx*/, ulong obj_id,
                                                    ptr<buffer> &data_out, bool &is_last_obj) -> int {
-  // Put dummy data.
-  data_out = buffer::alloc(sizeof(int32));
-  buffer_serializer bs(data_out);
-  bs.put_i32(0);
+  spdlog::info("read logical snapshot object, obj_id: {}", obj_id);
 
+  ptr<SnapshotCtx> ctx = nullptr;
+  {
+    auto ll = std::lock_guard{snapshots_lock_};
+    auto entry = snapshots_.find(snapshot.get_last_log_idx());
+    if (entry == snapshots_.end()) {
+      data_out = nullptr;
+      is_last_obj = true;
+      return 0;
+    }
+    ctx = entry->second;
+  }
+  ctx->cluster_state_.Serialize(data_out);
   is_last_obj = true;
   return 0;
 }
 
-auto CoordinatorStateMachine::save_logical_snp_obj(snapshot &s, ulong &obj_id, buffer & /*data*/, bool /*is_first_obj*/,
-                                                   bool /*is_last_obj*/) -> void {
-  spdlog::info("save snapshot {} term {} object ID", s.get_last_log_idx(), s.get_last_log_term(), obj_id);
-  // Request next object.
-  obj_id++;
+auto CoordinatorStateMachine::save_logical_snp_obj(snapshot &snapshot, ulong &obj_id, buffer &data, bool is_first_obj,
+                                                   bool is_last_obj) -> void {
+  spdlog::info("save logical snapshot object, obj_id: {}, is_first_obj: {}, is_last_obj: {}", obj_id, is_first_obj,
+               is_last_obj);
+
+  buffer_serializer bs(data);
+  auto cluster_state = CoordinatorClusterState::Deserialize(data);
+
+  {
+    auto ll = std::lock_guard{snapshots_lock_};
+    auto entry = snapshots_.find(snapshot.get_last_log_idx());
+    DMG_ASSERT(entry != snapshots_.end());
+    entry->second->cluster_state_ = cluster_state;
+  }
 }
 
 auto CoordinatorStateMachine::apply_snapshot(snapshot &s) -> bool {
-  spdlog::info("apply snapshot {} term {}", s.get_last_log_idx(), s.get_last_log_term());
-  {
-    auto lock = std::lock_guard{last_snapshot_lock_};
-    ptr<buffer> snp_buf = s.serialize();
-    last_snapshot_ = snapshot::deserialize(*snp_buf);
-  }
+  auto ll = std::lock_guard{snapshots_lock_};
+
+  auto entry = snapshots_.find(s.get_last_log_idx());
+  if (entry == snapshots_.end()) return false;
+
+  cluster_state_ = entry->second->cluster_state_;
   return true;
 }
 
 auto CoordinatorStateMachine::free_user_snp_ctx(void *&user_snp_ctx) -> void {}
 
 auto CoordinatorStateMachine::last_snapshot() -> ptr<snapshot> {
-  auto lock = std::lock_guard{last_snapshot_lock_};
-  return last_snapshot_;
+  auto ll = std::lock_guard{snapshots_lock_};
+  auto entry = snapshots_.rbegin();
+  if (entry == snapshots_.rend()) return nullptr;
+
+  ptr<SnapshotCtx> ctx = entry->second;
+  return ctx->snapshot_;
 }
 
 auto CoordinatorStateMachine::last_commit_index() -> ulong { return last_committed_idx_; }
 
 auto CoordinatorStateMachine::create_snapshot(snapshot &s, async_result<bool>::handler_type &when_done) -> void {
-  spdlog::info("create snapshot {} term {}", s.get_last_log_idx(), s.get_last_log_term());
-  // Clone snapshot from `s`.
-  {
-    auto lock = std::lock_guard{last_snapshot_lock_};
-    ptr<buffer> snp_buf = s.serialize();
-    last_snapshot_ = snapshot::deserialize(*snp_buf);
-  }
+  ptr<buffer> snp_buf = s.serialize();
+  ptr<snapshot> ss = snapshot::deserialize(*snp_buf);
+  create_snapshot_internal(ss);
+
   ptr<std::exception> except(nullptr);
   bool ret = true;
   when_done(ret, except);
 }
 
+auto CoordinatorStateMachine::create_snapshot_internal(ptr<snapshot> snapshot) -> void {
+  auto ll = std::lock_guard{snapshots_lock_};
+
+  auto ctx = cs_new<SnapshotCtx>(snapshot, cluster_state_);
+  snapshots_[snapshot->get_last_log_idx()] = ctx;
+
+  constexpr int MAX_SNAPSHOTS = 3;
+  while (snapshots_.size() > MAX_SNAPSHOTS) {
+    snapshots_.erase(snapshots_.begin());
+  }
+}
+
+auto CoordinatorStateMachine::GetInstances() const -> std::vector<InstanceState> {
+  return cluster_state_.GetInstances();
+}
+
+auto CoordinatorStateMachine::GetUUID() const -> utils::UUID { return cluster_state_.GetUUID(); }
+
 }  // namespace memgraph::coordination
 #endif
diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp
index 994c78d18..5d4795f81 100644
--- a/src/coordination/include/coordination/coordinator_client.hpp
+++ b/src/coordination/include/coordination/coordinator_client.hpp
@@ -46,16 +46,17 @@ class CoordinatorClient {
   void ResumeFrequentCheck();
 
   auto InstanceName() const -> std::string;
-  auto SocketAddress() const -> std::string;
+
+  auto CoordinatorSocketAddress() const -> std::string;
+  auto ReplicationSocketAddress() const -> std::string;
 
   [[nodiscard]] auto DemoteToReplica() const -> bool;
 
-  auto SendPromoteReplicaToMainRpc(const utils::UUID &uuid, ReplicationClientsInfo replication_clients_info) const
+  auto SendPromoteReplicaToMainRpc(utils::UUID const &uuid, ReplicationClientsInfo replication_clients_info) const
       -> bool;
 
-  auto SendSwapMainUUIDRpc(const utils::UUID &uuid) const -> bool;
+  auto SendSwapMainUUIDRpc(utils::UUID const &uuid) const -> bool;
 
-  auto SendUnregisterReplicaRpc(std::string const &instance_name) const -> bool;
+  auto SendUnregisterReplicaRpc(std::string_view instance_name) const -> bool;
 
   auto SendEnableWritingOnMainRpc() const -> bool;
diff --git a/src/coordination/include/coordination/coordinator_config.hpp b/src/coordination/include/coordination/coordinator_config.hpp
index df7a5f94f..127a365eb 100644
--- a/src/coordination/include/coordination/coordinator_config.hpp
+++ b/src/coordination/include/coordination/coordinator_config.hpp
@@ -14,12 +14,16 @@
 #ifdef MG_ENTERPRISE
 
 #include "replication_coordination_glue/mode.hpp"
+#include "utils/string.hpp"
 
 #include <chrono>
 #include <cstdint>
 #include <optional>
 #include <string>
 
+#include <fmt/format.h>
+#include "json/json.hpp"
+
 namespace memgraph::coordination {
 
 inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0";
@@ -32,7 +36,11 @@ struct CoordinatorClientConfig {
   std::chrono::seconds instance_down_timeout_sec{5};
   std::chrono::seconds instance_get_uuid_frequency_sec{10};
 
-  auto SocketAddress() const -> std::string { return ip_address + ":" + std::to_string(port); }
+  auto CoordinatorSocketAddress() const -> std::string { return fmt::format("{}:{}", ip_address, port); }
+  auto ReplicationSocketAddress() const -> std::string {
+    return fmt::format("{}:{}", replication_client_info.replication_ip_address,
+                       replication_client_info.replication_port);
+  }
 
   struct ReplicationClientInfo {
     std::string instance_name;
@@ -75,5 +83,11 @@ struct CoordinatorServerConfig {
   friend bool operator==(CoordinatorServerConfig const &, CoordinatorServerConfig const &) = default;
 };
 
+void to_json(nlohmann::json &j, CoordinatorClientConfig const &config);
+void from_json(nlohmann::json const &j, CoordinatorClientConfig &config);
+
+void to_json(nlohmann::json &j, ReplClientInfo const &config);
+void from_json(nlohmann::json const &j, ReplClientInfo &config);
+
 }  // namespace memgraph::coordination
 #endif
diff --git a/src/coordination/include/coordination/coordinator_exceptions.hpp b/src/coordination/include/coordination/coordinator_exceptions.hpp
index 59a2e89d8..7a967f80b 100644
--- a/src/coordination/include/coordination/coordinator_exceptions.hpp
+++ b/src/coordination/include/coordination/coordinator_exceptions.hpp
@@ -83,5 +83,16 @@ class RaftCouldNotParseFlagsException final : public utils::BasicException {
   SPECIALIZE_GET_EXCEPTION_NAME(RaftCouldNotParseFlagsException)
 };
 
+class InvalidRaftLogActionException final : public utils::BasicException {
+ public:
+  explicit InvalidRaftLogActionException(std::string_view what) noexcept : BasicException(what) {}
+
+  template <class... Args>
+  explicit InvalidRaftLogActionException(fmt::format_string<Args...> fmt, Args &&...args) noexcept
+      : InvalidRaftLogActionException(fmt::format(fmt, std::forward<Args>(args)...)) {}
+
+  SPECIALIZE_GET_EXCEPTION_NAME(InvalidRaftLogActionException)
+};
+
 }  // namespace memgraph::coordination
 #endif
diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp
index bed202744..10549f468 100644
--- a/src/coordination/include/coordination/coordinator_instance.hpp
+++ b/src/coordination/include/coordination/coordinator_instance.hpp
@@ -28,8 +28,8 @@ namespace memgraph::coordination {
 
 struct NewMainRes {
   std::string most_up_to_date_instance;
-  std::optional<std::string> latest_epoch;
-  std::optional<uint64_t> latest_commit_timestamp;
+  std::string latest_epoch;
+  uint64_t latest_commit_timestamp;
 };
 using InstanceNameDbHistories = std::pair<std::string, replication_coordination_glue::DatabaseHistories>;
 
@@ -37,20 +37,25 @@ class CoordinatorInstance {
  public:
   CoordinatorInstance();
 
-  [[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
-  [[nodiscard]] auto UnregisterReplicationInstance(std::string instance_name) -> UnregisterInstanceCoordinatorStatus;
+  [[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig const &config)
+      -> RegisterInstanceCoordinatorStatus;
+  [[nodiscard]] auto UnregisterReplicationInstance(std::string_view instance_name)
+      -> UnregisterInstanceCoordinatorStatus;
 
-  [[nodiscard]] auto SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
+  [[nodiscard]] auto SetReplicationInstanceToMain(std::string_view instance_name) -> SetInstanceToMainCoordinatorStatus;
 
   auto ShowInstances() const -> std::vector<InstanceStatus>;
 
   auto TryFailover() -> void;
 
-  auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
+  auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string_view raft_address) -> void;
 
-  auto GetMainUUID() const -> utils::UUID;
+  static auto ChooseMostUpToDateInstance(std::span<InstanceNameDbHistories> histories) -> NewMainRes;
 
-  auto SetMainUUID(utils::UUID new_uuid) -> void;
+ private:
+  HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
+
+  auto OnRaftCommitCallback(TRaftLog const &log_entry, RaftLogAction log_action) -> void;
 
   auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &;
 
@@ -62,17 +67,14 @@ class CoordinatorInstance {
 
   void ReplicaFailCallback(std::string_view);
 
-  static auto ChooseMostUpToDateInstance(const std::vector<InstanceNameDbHistories> &) -> NewMainRes;
+  auto IsMain(std::string_view instance_name) const -> bool;
+  auto IsReplica(std::string_view instance_name) const -> bool;
 
- private:
-  HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
-
-  // NOTE: Must be std::list because we rely on pointer stability
+  // NOTE: Must be std::list because we rely on pointer stability.
+  // Leader and followers should both have same view on repl_instances_
   std::list<ReplicationInstance> repl_instances_;
   mutable utils::ResourceLock coord_instance_lock_{};
 
-  utils::UUID main_uuid_;
-
   RaftState raft_state_;
 };
diff --git a/src/coordination/include/coordination/coordinator_state.hpp b/src/coordination/include/coordination/coordinator_state.hpp
index 256af66f9..400c36940 100644
--- a/src/coordination/include/coordination/coordinator_state.hpp
+++ b/src/coordination/include/coordination/coordinator_state.hpp
@@ -33,14 +33,16 @@ class CoordinatorState {
   CoordinatorState(CoordinatorState &&) noexcept = delete;
   CoordinatorState &operator=(CoordinatorState &&) noexcept = delete;
 
-  [[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig config) -> RegisterInstanceCoordinatorStatus;
-  [[nodiscard]] auto UnregisterReplicationInstance(std::string instance_name) -> UnregisterInstanceCoordinatorStatus;
+  [[nodiscard]] auto RegisterReplicationInstance(CoordinatorClientConfig const &config)
+      -> RegisterInstanceCoordinatorStatus;
+  [[nodiscard]] auto UnregisterReplicationInstance(std::string_view instance_name)
+      -> UnregisterInstanceCoordinatorStatus;
 
-  [[nodiscard]] auto SetReplicationInstanceToMain(std::string instance_name) -> SetInstanceToMainCoordinatorStatus;
+  [[nodiscard]] auto SetReplicationInstanceToMain(std::string_view instance_name) -> SetInstanceToMainCoordinatorStatus;
 
   auto ShowInstances() const -> std::vector<InstanceStatus>;
 
-  auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void;
+  auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string_view raft_address) -> void;
 
   // NOTE: The client code must check that the server exists before calling this method.
auto GetCoordinatorServer() const -> CoordinatorServer &; diff --git a/src/coordination/include/coordination/instance_status.hpp b/src/coordination/include/coordination/instance_status.hpp index 492410061..da6fd8828 100644 --- a/src/coordination/include/coordination/instance_status.hpp +++ b/src/coordination/include/coordination/instance_status.hpp @@ -26,7 +26,7 @@ struct InstanceStatus { std::string raft_socket_address; std::string coord_socket_address; std::string cluster_role; - bool is_alive; + std::string health; }; } // namespace memgraph::coordination diff --git a/src/coordination/include/coordination/raft_state.hpp b/src/coordination/include/coordination/raft_state.hpp index b6ef06008..d702697f1 100644 --- a/src/coordination/include/coordination/raft_state.hpp +++ b/src/coordination/include/coordination/raft_state.hpp @@ -14,11 +14,16 @@ #ifdef MG_ENTERPRISE #include +#include "nuraft/coordinator_state_machine.hpp" +#include "nuraft/coordinator_state_manager.hpp" #include namespace memgraph::coordination { +class CoordinatorInstance; +struct CoordinatorClientConfig; + using BecomeLeaderCb = std::function; using BecomeFollowerCb = std::function; @@ -47,26 +52,39 @@ class RaftState { RaftState &operator=(RaftState &&other) noexcept = default; ~RaftState(); - static auto MakeRaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb) -> RaftState; + static auto MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerCb &&become_follower_cb) -> RaftState; auto InstanceName() const -> std::string; auto RaftSocketAddress() const -> std::string; - auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void; + auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string_view raft_address) -> void; auto GetAllCoordinators() const -> std::vector>; auto RequestLeadership() -> bool; auto IsLeader() const -> bool; - auto AppendRegisterReplicationInstance(std::string const &instance) -> ptr; + auto FindCurrentMainInstanceName() const -> std::optional; + auto MainExists() const -> bool; + auto IsMain(std::string_view instance_name) const -> bool; + auto IsReplica(std::string_view instance_name) const -> bool; - // TODO: (andi) I think variables below can be abstracted + auto AppendRegisterReplicationInstanceLog(CoordinatorClientConfig const &config) -> bool; + auto AppendUnregisterReplicationInstanceLog(std::string_view instance_name) -> bool; + auto AppendSetInstanceAsMainLog(std::string_view instance_name) -> bool; + auto AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool; + auto AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool; + + auto GetInstances() const -> std::vector; + auto GetUUID() const -> utils::UUID; + + private: + // TODO: (andi) I think variables below can be abstracted/clean them. 
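// [Editor's note] The five Append*Log methods added in raft_state.cpp further below all repeat the same
// accept/result-code boilerplate around raft_server_->append_entries. A minimal sketch of one way the TODO
// above could extend to those methods, factoring the boilerplate into a private in-class helper (hypothetical
// AppendLog_ helper, not part of this patch; it reuses only calls that already appear in the diff):

auto AppendLog_(nuraft::ptr<nuraft::buffer> new_log, std::string_view action) -> bool {
  auto const res = raft_server_->append_entries({new_log});
  if (!res->get_accepted()) {
    // Same diagnostic the individual methods log today.
    spdlog::error("Failed to accept request for {}. Most likely the reason is that the instance is not the leader.",
                  action);
    return false;
  }
  spdlog::info("Request for {} accepted", action);
  if (res->get_result_code() != nuraft::cmd_result_code::OK) {
    spdlog::error("Failed to {} with error code {}", action, res->get_result_code());
    return false;
  }
  return true;
}

// Each public method would then reduce to one line, e.g.:
//   return AppendLog_(CoordinatorStateMachine::SerializeSetInstanceAsMain(instance_name), "promoting instance");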
uint32_t raft_server_id_; uint32_t raft_port_; std::string raft_address_; - ptr state_machine_; - ptr state_manager_; + ptr state_machine_; + ptr state_manager_; ptr raft_server_; ptr logger_; raft_launcher launcher_; diff --git a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp index 3aa7e3ca1..13b58ff9f 100644 --- a/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp +++ b/src/coordination/include/coordination/register_main_replica_coordinator_status.hpp @@ -19,12 +19,12 @@ namespace memgraph::coordination { enum class RegisterInstanceCoordinatorStatus : uint8_t { NAME_EXISTS, - ENDPOINT_EXISTS, + COORD_ENDPOINT_EXISTS, + REPL_ENDPOINT_EXISTS, NOT_COORDINATOR, - RPC_FAILED, NOT_LEADER, - RAFT_COULD_NOT_ACCEPT, - RAFT_COULD_NOT_APPEND, + RPC_FAILED, + RAFT_LOG_ERROR, SUCCESS }; @@ -32,8 +32,9 @@ enum class UnregisterInstanceCoordinatorStatus : uint8_t { NO_INSTANCE_WITH_NAME, IS_MAIN, NOT_COORDINATOR, - NOT_LEADER, RPC_FAILED, + NOT_LEADER, + RAFT_LOG_ERROR, SUCCESS, }; @@ -41,9 +42,11 @@ enum class SetInstanceToMainCoordinatorStatus : uint8_t { NO_INSTANCE_WITH_NAME, MAIN_ALREADY_EXISTS, NOT_COORDINATOR, - SUCCESS, + NOT_LEADER, + RAFT_LOG_ERROR, COULD_NOT_PROMOTE_TO_MAIN, - SWAP_UUID_FAILED + SWAP_UUID_FAILED, + SUCCESS, }; } // namespace memgraph::coordination diff --git a/src/coordination/include/coordination/replication_instance.hpp b/src/coordination/include/coordination/replication_instance.hpp index e8e00a0a8..7b5d73b81 100644 --- a/src/coordination/include/coordination/replication_instance.hpp +++ b/src/coordination/include/coordination/replication_instance.hpp @@ -17,11 +17,12 @@ #include "coordination/coordinator_exceptions.hpp" #include "replication_coordination_glue/role.hpp" -#include #include "utils/resource_lock.hpp" #include "utils/result.hpp" #include "utils/uuid.hpp" +#include + namespace memgraph::coordination { class CoordinatorInstance; @@ -50,13 +51,14 @@ class ReplicationInstance { auto IsAlive() const -> bool; auto InstanceName() const -> std::string; - auto SocketAddress() const -> std::string; + auto CoordinatorSocketAddress() const -> std::string; + auto ReplicationSocketAddress() const -> std::string; - auto IsReplica() const -> bool; - auto IsMain() const -> bool; - - auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info, + auto PromoteToMain(utils::UUID const &uuid, ReplicationClientsInfo repl_clients_info, HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb) -> bool; + + auto SendDemoteToReplicaRpc() -> bool; + auto DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb) -> bool; @@ -69,8 +71,8 @@ class ReplicationInstance { auto EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool; - auto SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool; - auto SendUnregisterReplicaRpc(std::string const &instance_name) -> bool; + auto SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid) -> bool; + auto SendUnregisterReplicaRpc(std::string_view instance_name) -> bool; auto SendGetInstanceUUID() -> utils::BasicResult>; auto GetClient() -> CoordinatorClient &; @@ -78,14 +80,14 @@ class ReplicationInstance { auto EnableWritingOnMain() -> bool; auto SetNewMainUUID(utils::UUID const &main_uuid) -> void; - auto GetMainUUID() const -> const std::optional &; + auto ResetMainUUID() -> void; + 
auto GetMainUUID() const -> std::optional const &; auto GetSuccessCallback() -> HealthCheckInstanceCallback &; auto GetFailCallback() -> HealthCheckInstanceCallback &; private: CoordinatorClient client_; - replication_coordination_glue::ReplicationRole replication_role_; std::chrono::system_clock::time_point last_response_time_{}; bool is_alive_{false}; std::chrono::system_clock::time_point last_check_of_uuid_{}; @@ -101,7 +103,8 @@ class ReplicationInstance { HealthCheckInstanceCallback fail_cb_; friend bool operator==(ReplicationInstance const &first, ReplicationInstance const &second) { - return first.client_ == second.client_ && first.replication_role_ == second.replication_role_; + return first.client_ == second.client_ && first.last_response_time_ == second.last_response_time_ && + first.is_alive_ == second.is_alive_ && first.main_uuid_ == second.main_uuid_; } }; diff --git a/src/coordination/include/nuraft/coordinator_cluster_state.hpp b/src/coordination/include/nuraft/coordinator_cluster_state.hpp new file mode 100644 index 000000000..f38d00073 --- /dev/null +++ b/src/coordination/include/nuraft/coordinator_cluster_state.hpp @@ -0,0 +1,82 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#ifdef MG_ENTERPRISE + +#include "coordination/coordinator_config.hpp" +#include "nuraft/raft_log_action.hpp" +#include "replication_coordination_glue/role.hpp" +#include "utils/resource_lock.hpp" +#include "utils/uuid.hpp" + +#include +#include + +#include +#include +#include +#include + +namespace memgraph::coordination { + +using replication_coordination_glue::ReplicationRole; + +struct InstanceState { + CoordinatorClientConfig config; + ReplicationRole role; +}; + +using TRaftLog = std::variant; + +using nuraft::buffer; +using nuraft::buffer_serializer; +using nuraft::ptr; + +class CoordinatorClusterState { + public: + CoordinatorClusterState() = default; + CoordinatorClusterState(CoordinatorClusterState const &); + CoordinatorClusterState &operator=(CoordinatorClusterState const &); + + CoordinatorClusterState(CoordinatorClusterState &&other) noexcept; + CoordinatorClusterState &operator=(CoordinatorClusterState &&other) noexcept; + ~CoordinatorClusterState() = default; + + auto FindCurrentMainInstanceName() const -> std::optional; + + auto MainExists() const -> bool; + + auto IsMain(std::string_view instance_name) const -> bool; + + auto IsReplica(std::string_view instance_name) const -> bool; + + auto InsertInstance(std::string_view instance_name, ReplicationRole role) -> void; + + auto DoAction(TRaftLog log_entry, RaftLogAction log_action) -> void; + + auto Serialize(ptr &data) -> void; + + static auto Deserialize(buffer &data) -> CoordinatorClusterState; + + auto GetInstances() const -> std::vector; + + auto GetUUID() const -> utils::UUID; + + private: + std::map> instance_roles_; + utils::UUID uuid_{}; + mutable utils::ResourceLock log_lock_{}; +}; + +} // namespace memgraph::coordination +#endif diff --git a/src/coordination/include/nuraft/coordinator_state_machine.hpp 
b/src/coordination/include/nuraft/coordinator_state_machine.hpp index 5b5f37b48..516b8efc5 100644 --- a/src/coordination/include/nuraft/coordinator_state_machine.hpp +++ b/src/coordination/include/nuraft/coordinator_state_machine.hpp @@ -13,9 +13,15 @@ #ifdef MG_ENTERPRISE +#include "coordination/coordinator_config.hpp" +#include "nuraft/coordinator_cluster_state.hpp" +#include "nuraft/raft_log_action.hpp" + #include #include +#include + namespace memgraph::coordination { using nuraft::async_result; @@ -36,9 +42,19 @@ class CoordinatorStateMachine : public state_machine { CoordinatorStateMachine &operator=(CoordinatorStateMachine &&) = delete; ~CoordinatorStateMachine() override {} - static auto EncodeRegisterReplicationInstance(const std::string &name) -> ptr; + auto FindCurrentMainInstanceName() const -> std::optional; + auto MainExists() const -> bool; + auto IsMain(std::string_view instance_name) const -> bool; + auto IsReplica(std::string_view instance_name) const -> bool; - static auto DecodeRegisterReplicationInstance(buffer &data) -> std::string; + static auto CreateLog(nlohmann::json &&log) -> ptr; + static auto SerializeRegisterInstance(CoordinatorClientConfig const &config) -> ptr; + static auto SerializeUnregisterInstance(std::string_view instance_name) -> ptr; + static auto SerializeSetInstanceAsMain(std::string_view instance_name) -> ptr; + static auto SerializeSetInstanceAsReplica(std::string_view instance_name) -> ptr; + static auto SerializeUpdateUUID(utils::UUID const &uuid) -> ptr; + + static auto DecodeLog(buffer &data) -> std::pair; auto pre_commit(ulong log_idx, buffer &data) -> ptr override; @@ -64,11 +80,31 @@ class CoordinatorStateMachine : public state_machine { auto create_snapshot(snapshot &s, async_result::handler_type &when_done) -> void override; + auto GetInstances() const -> std::vector; + auto GetUUID() const -> utils::UUID; + private: + struct SnapshotCtx { + SnapshotCtx(ptr &snapshot, CoordinatorClusterState const &cluster_state) + : snapshot_(snapshot), cluster_state_(cluster_state) {} + + ptr snapshot_; + CoordinatorClusterState cluster_state_; + }; + + auto create_snapshot_internal(ptr snapshot) -> void; + + CoordinatorClusterState cluster_state_; + + // mutable utils::RWLock lock{utils::RWLock::Priority::READ}; + std::atomic last_committed_idx_{0}; - ptr last_snapshot_; + // TODO: (andi) Maybe not needed, remove it + std::map> snapshots_; + std::mutex snapshots_lock_; + ptr last_snapshot_; std::mutex last_snapshot_lock_; }; diff --git a/src/coordination/include/nuraft/raft_log_action.hpp b/src/coordination/include/nuraft/raft_log_action.hpp new file mode 100644 index 000000000..953049038 --- /dev/null +++ b/src/coordination/include/nuraft/raft_log_action.hpp @@ -0,0 +1,63 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
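// [Editor's note] The new header below defines RaftLogAction together with a JSON string mapping
// (NLOHMANN_JSON_SERIALIZE_ENUM) and then restates the same string table by hand in ParseRaftLogAction.
// A minimal sketch of how the macro's mapping could be reused instead, so the table lives in one place
// (assumes nlohmann::json as used elsewhere in this patch; the referenced enum and exception are the ones
// defined just below; note the macro maps unknown strings to the first listed enum value, so a round-trip
// check is still needed to reject bad input):

inline auto ParseRaftLogAction(std::string_view action) -> RaftLogAction {
  auto const parsed = nlohmann::json(std::string(action)).get<RaftLogAction>();
  if (nlohmann::json(parsed).get<std::string>() != action) {
    throw InvalidRaftLogActionException("Invalid Raft log action: {}.", action);
  }
  return parsed;
}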
+ +#pragma once + +#ifdef MG_ENTERPRISE + +#include "coordination/coordinator_exceptions.hpp" + +#include +#include + +#include "json/json.hpp" + +namespace memgraph::coordination { + +enum class RaftLogAction : uint8_t { + REGISTER_REPLICATION_INSTANCE, + UNREGISTER_REPLICATION_INSTANCE, + SET_INSTANCE_AS_MAIN, + SET_INSTANCE_AS_REPLICA, + UPDATE_UUID +}; + +NLOHMANN_JSON_SERIALIZE_ENUM(RaftLogAction, { + {RaftLogAction::REGISTER_REPLICATION_INSTANCE, "register"}, + {RaftLogAction::UNREGISTER_REPLICATION_INSTANCE, "unregister"}, + {RaftLogAction::SET_INSTANCE_AS_MAIN, "promote"}, + {RaftLogAction::SET_INSTANCE_AS_REPLICA, "demote"}, + {RaftLogAction::UPDATE_UUID, "update_uuid"}, + }) + +inline auto ParseRaftLogAction(std::string_view action) -> RaftLogAction { + if (action == "register") { + return RaftLogAction::REGISTER_REPLICATION_INSTANCE; + } + if (action == "unregister") { + return RaftLogAction::UNREGISTER_REPLICATION_INSTANCE; + } + if (action == "promote") { + return RaftLogAction::SET_INSTANCE_AS_MAIN; + } + if (action == "demote") { + return RaftLogAction::SET_INSTANCE_AS_REPLICA; + } + + if (action == "update_uuid") { + return RaftLogAction::UPDATE_UUID; + } + + throw InvalidRaftLogActionException("Invalid Raft log action: {}.", action); +} + +} // namespace memgraph::coordination +#endif diff --git a/src/coordination/raft_state.cpp b/src/coordination/raft_state.cpp index d171a6b3d..365388b06 100644 --- a/src/coordination/raft_state.cpp +++ b/src/coordination/raft_state.cpp @@ -13,9 +13,8 @@ #include "coordination/raft_state.hpp" +#include "coordination/coordinator_config.hpp" #include "coordination/coordinator_exceptions.hpp" -#include "nuraft/coordinator_state_machine.hpp" -#include "nuraft/coordinator_state_manager.hpp" #include "utils/counter.hpp" namespace memgraph::coordination { @@ -90,18 +89,13 @@ auto RaftState::InitRaftServer() -> void { throw RaftServerStartException("Failed to initialize raft server on {}:{}", raft_address_, raft_port_); } -auto RaftState::MakeRaftState(BecomeLeaderCb become_leader_cb, BecomeFollowerCb become_follower_cb) -> RaftState { - uint32_t raft_server_id{0}; - uint32_t raft_port{0}; - try { - raft_server_id = FLAGS_raft_server_id; - raft_port = FLAGS_raft_server_port; - } catch (std::exception const &e) { - throw RaftCouldNotParseFlagsException("Failed to parse flags: {}", e.what()); - } +auto RaftState::MakeRaftState(BecomeLeaderCb &&become_leader_cb, BecomeFollowerCb &&become_follower_cb) -> RaftState { + uint32_t raft_server_id = FLAGS_raft_server_id; + uint32_t raft_port = FLAGS_raft_server_port; auto raft_state = RaftState(std::move(become_leader_cb), std::move(become_follower_cb), raft_server_id, raft_port, "127.0.0.1"); + raft_state.InitRaftServer(); return raft_state; } @@ -112,8 +106,9 @@ auto RaftState::InstanceName() const -> std::string { return "coordinator_" + st auto RaftState::RaftSocketAddress() const -> std::string { return raft_address_ + ":" + std::to_string(raft_port_); } -auto RaftState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void { - auto const endpoint = raft_address + ":" + std::to_string(raft_port); +auto RaftState::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string_view raft_address) + -> void { + auto const endpoint = fmt::format("{}:{}", raft_address, raft_port); srv_config const srv_config_to_add(static_cast(raft_server_id), endpoint); if (!raft_server_->add_srv(srv_config_to_add)->get_accepted()) { throw 
RaftAddServerException("Failed to add server {} to the cluster", endpoint); @@ -131,10 +126,123 @@ auto RaftState::IsLeader() const -> bool { return raft_server_->is_leader(); } auto RaftState::RequestLeadership() -> bool { return raft_server_->is_leader() || raft_server_->request_leadership(); } -auto RaftState::AppendRegisterReplicationInstance(std::string const &instance) -> ptr { - auto new_log = CoordinatorStateMachine::EncodeRegisterReplicationInstance(instance); - return raft_server_->append_entries({new_log}); +auto RaftState::AppendRegisterReplicationInstanceLog(CoordinatorClientConfig const &config) -> bool { + auto new_log = CoordinatorStateMachine::SerializeRegisterInstance(config); + auto const res = raft_server_->append_entries({new_log}); + + if (!res->get_accepted()) { + spdlog::error( + "Failed to accept request for registering instance {}. Most likely the reason is that the instance is not " + "the " + "leader.", + config.instance_name); + return false; + } + + spdlog::info("Request for registering instance {} accepted", config.instance_name); + + if (res->get_result_code() != nuraft::cmd_result_code::OK) { + spdlog::error("Failed to register instance {} with error code {}", config.instance_name, res->get_result_code()); + return false; + } + + return true; } +auto RaftState::AppendUnregisterReplicationInstanceLog(std::string_view instance_name) -> bool { + auto new_log = CoordinatorStateMachine::SerializeUnregisterInstance(instance_name); + auto const res = raft_server_->append_entries({new_log}); + if (!res->get_accepted()) { + spdlog::error( + "Failed to accept request for unregistering instance {}. Most likely the reason is that the instance is not " + "the leader.", + instance_name); + return false; + } + + spdlog::info("Request for unregistering instance {} accepted", instance_name); + + if (res->get_result_code() != nuraft::cmd_result_code::OK) { + spdlog::error("Failed to unregister instance {} with error code {}", instance_name, res->get_result_code()); + return false; + } + return true; +} + +auto RaftState::AppendSetInstanceAsMainLog(std::string_view instance_name) -> bool { + auto new_log = CoordinatorStateMachine::SerializeSetInstanceAsMain(instance_name); + auto const res = raft_server_->append_entries({new_log}); + if (!res->get_accepted()) { + spdlog::error( + "Failed to accept request for promoting instance {}. Most likely the reason is that the instance is not " + "the leader.", + instance_name); + return false; + } + + spdlog::info("Request for promoting instance {} accepted", instance_name); + + if (res->get_result_code() != nuraft::cmd_result_code::OK) { + spdlog::error("Failed to promote instance {} with error code {}", instance_name, res->get_result_code()); + return false; + } + return true; +} + +auto RaftState::AppendSetInstanceAsReplicaLog(std::string_view instance_name) -> bool { + auto new_log = CoordinatorStateMachine::SerializeSetInstanceAsReplica(instance_name); + auto const res = raft_server_->append_entries({new_log}); + if (!res->get_accepted()) { + spdlog::error( + "Failed to accept request for demoting instance {}. 
Most likely the reason is that the instance is not " + "the leader.", + instance_name); + return false; + } + spdlog::info("Request for demoting instance {} accepted", instance_name); + + if (res->get_result_code() != nuraft::cmd_result_code::OK) { + spdlog::error("Failed to demote instance {} with error code {}", instance_name, res->get_result_code()); + return false; + } + + return true; +} + +auto RaftState::AppendUpdateUUIDLog(utils::UUID const &uuid) -> bool { + auto new_log = CoordinatorStateMachine::SerializeUpdateUUID(uuid); + auto const res = raft_server_->append_entries({new_log}); + if (!res->get_accepted()) { + spdlog::error( + "Failed to accept request for updating UUID. Most likely the reason is that the instance is not " + "the leader."); + return false; + } + spdlog::info("Request for updating UUID accepted"); + + if (res->get_result_code() != nuraft::cmd_result_code::OK) { + spdlog::error("Failed to update UUID with error code {}", res->get_result_code()); + return false; + } + + return true; +} + +auto RaftState::FindCurrentMainInstanceName() const -> std::optional { + return state_machine_->FindCurrentMainInstanceName(); +} + +auto RaftState::MainExists() const -> bool { return state_machine_->MainExists(); } + +auto RaftState::IsMain(std::string_view instance_name) const -> bool { return state_machine_->IsMain(instance_name); } + +auto RaftState::IsReplica(std::string_view instance_name) const -> bool { + return state_machine_->IsReplica(instance_name); +} + +auto RaftState::GetInstances() const -> std::vector { return state_machine_->GetInstances(); } + +auto RaftState::GetUUID() const -> utils::UUID { return state_machine_->GetUUID(); } + } // namespace memgraph::coordination #endif diff --git a/src/coordination/replication_instance.cpp b/src/coordination/replication_instance.cpp index 50f1be468..ca7572ea7 100644 --- a/src/coordination/replication_instance.cpp +++ b/src/coordination/replication_instance.cpp @@ -25,15 +25,8 @@ ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorC HealthCheckInstanceCallback succ_instance_cb, HealthCheckInstanceCallback fail_instance_cb) : client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)), - replication_role_(replication_coordination_glue::ReplicationRole::REPLICA), succ_cb_(succ_instance_cb), - fail_cb_(fail_instance_cb) { - if (!client_.DemoteToReplica()) { - throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName()); - } - - client_.StartFrequentCheck(); -} + fail_cb_(fail_instance_cb) {} auto ReplicationInstance::OnSuccessPing() -> void { last_response_time_ = std::chrono::system_clock::now(); @@ -52,24 +45,17 @@ auto ReplicationInstance::IsReadyForUUIDPing() -> bool { } auto ReplicationInstance::InstanceName() const -> std::string { return client_.InstanceName(); } -auto ReplicationInstance::SocketAddress() const -> std::string { return client_.SocketAddress(); } +auto ReplicationInstance::CoordinatorSocketAddress() const -> std::string { return client_.CoordinatorSocketAddress(); } +auto ReplicationInstance::ReplicationSocketAddress() const -> std::string { return client_.ReplicationSocketAddress(); } auto ReplicationInstance::IsAlive() const -> bool { return is_alive_; } -auto ReplicationInstance::IsReplica() const -> bool { - return replication_role_ == replication_coordination_glue::ReplicationRole::REPLICA; -} -auto ReplicationInstance::IsMain() const -> bool { - return replication_role_ == 
replication_coordination_glue::ReplicationRole::MAIN; -} - -auto ReplicationInstance::PromoteToMain(utils::UUID new_uuid, ReplicationClientsInfo repl_clients_info, +auto ReplicationInstance::PromoteToMain(utils::UUID const &new_uuid, ReplicationClientsInfo repl_clients_info, HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb) -> bool { if (!client_.SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) { return false; } - replication_role_ = replication_coordination_glue::ReplicationRole::MAIN; main_uuid_ = new_uuid; succ_cb_ = main_succ_cb; fail_cb_ = main_fail_cb; @@ -77,13 +63,14 @@ auto ReplicationInstance::PromoteToMain(utils::UUID new_uuid, ReplicationClients return true; } +auto ReplicationInstance::SendDemoteToReplicaRpc() -> bool { return client_.DemoteToReplica(); } + auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb) -> bool { if (!client_.DemoteToReplica()) { return false; } - replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA; succ_cb_ = replica_succ_cb; fail_cb_ = replica_fail_cb; @@ -117,6 +104,7 @@ auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &cur } UpdateReplicaLastResponseUUID(); + // NOLINTNEXTLINE if (res.GetValue().has_value() && res.GetValue().value() == curr_main_uuid) { return true; } @@ -124,7 +112,7 @@ auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &cur return SendSwapAndUpdateUUID(curr_main_uuid); } -auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool { +auto ReplicationInstance::SendSwapAndUpdateUUID(utils::UUID const &new_main_uuid) -> bool { if (!replication_coordination_glue::SendSwapMainUUIDRpc(client_.RpcClient(), new_main_uuid)) { return false; } @@ -132,7 +120,7 @@ auto ReplicationInstance::SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid return true; } -auto ReplicationInstance::SendUnregisterReplicaRpc(std::string const &instance_name) -> bool { +auto ReplicationInstance::SendUnregisterReplicaRpc(std::string_view instance_name) -> bool { return client_.SendUnregisterReplicaRpc(instance_name); } diff --git a/src/dbms/coordinator_handler.cpp b/src/dbms/coordinator_handler.cpp index f8e14e2a0..292d50d3d 100644 --- a/src/dbms/coordinator_handler.cpp +++ b/src/dbms/coordinator_handler.cpp @@ -20,28 +20,28 @@ namespace memgraph::dbms { CoordinatorHandler::CoordinatorHandler(coordination::CoordinatorState &coordinator_state) : coordinator_state_(coordinator_state) {} -auto CoordinatorHandler::RegisterReplicationInstance(memgraph::coordination::CoordinatorClientConfig config) +auto CoordinatorHandler::RegisterReplicationInstance(coordination::CoordinatorClientConfig const &config) -> coordination::RegisterInstanceCoordinatorStatus { return coordinator_state_.RegisterReplicationInstance(config); } -auto CoordinatorHandler::UnregisterReplicationInstance(std::string instance_name) +auto CoordinatorHandler::UnregisterReplicationInstance(std::string_view instance_name) -> coordination::UnregisterInstanceCoordinatorStatus { - return coordinator_state_.UnregisterReplicationInstance(std::move(instance_name)); + return coordinator_state_.UnregisterReplicationInstance(instance_name); } -auto CoordinatorHandler::SetReplicationInstanceToMain(std::string instance_name) +auto CoordinatorHandler::SetReplicationInstanceToMain(std::string_view instance_name) -> coordination::SetInstanceToMainCoordinatorStatus { - return 
coordinator_state_.SetReplicationInstanceToMain(std::move(instance_name)); + return coordinator_state_.SetReplicationInstanceToMain(instance_name); } auto CoordinatorHandler::ShowInstances() const -> std::vector { return coordinator_state_.ShowInstances(); } -auto CoordinatorHandler::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) - -> void { - coordinator_state_.AddCoordinatorInstance(raft_server_id, raft_port, std::move(raft_address)); +auto CoordinatorHandler::AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, + std::string_view raft_address) -> void { + coordinator_state_.AddCoordinatorInstance(raft_server_id, raft_port, raft_address); } } // namespace memgraph::dbms diff --git a/src/dbms/coordinator_handler.hpp b/src/dbms/coordinator_handler.hpp index d06e70676..1c456134d 100644 --- a/src/dbms/coordinator_handler.hpp +++ b/src/dbms/coordinator_handler.hpp @@ -30,16 +30,17 @@ class CoordinatorHandler { // TODO: (andi) When moving coordinator state on same instances, rename from RegisterReplicationInstance to // RegisterInstance - auto RegisterReplicationInstance(coordination::CoordinatorClientConfig config) + auto RegisterReplicationInstance(coordination::CoordinatorClientConfig const &config) -> coordination::RegisterInstanceCoordinatorStatus; - auto UnregisterReplicationInstance(std::string instance_name) -> coordination::UnregisterInstanceCoordinatorStatus; + auto UnregisterReplicationInstance(std::string_view instance_name) + -> coordination::UnregisterInstanceCoordinatorStatus; - auto SetReplicationInstanceToMain(std::string instance_name) -> coordination::SetInstanceToMainCoordinatorStatus; + auto SetReplicationInstanceToMain(std::string_view instance_name) -> coordination::SetInstanceToMainCoordinatorStatus; auto ShowInstances() const -> std::vector; - auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string raft_address) -> void; + auto AddCoordinatorInstance(uint32_t raft_server_id, uint32_t raft_port, std::string_view raft_address) -> void; private: coordination::CoordinatorState &coordinator_state_; diff --git a/src/dbms/dbms_handler.cpp b/src/dbms/dbms_handler.cpp index 1c38106db..16927d7e2 100644 --- a/src/dbms/dbms_handler.cpp +++ b/src/dbms/dbms_handler.cpp @@ -185,6 +185,16 @@ DbmsHandler::DbmsHandler(storage::Config config, replication::ReplicationState & auto directories = std::set{std::string{kDefaultDB}}; // Recover previous databases + if (flags::AreExperimentsEnabled(flags::Experiments::SYSTEM_REPLICATION) && !recovery_on_startup) { + // This will result in dropping databases on SystemRecoveryHandler in the MT case; in the single-DB case we + // might not even set up replication, since the commit timestamp is checked + spdlog::warn( + "Data recovery on startup is not set; with multi-tenancy enabled this will result in dropped databases."); + } + + // TODO: The problem is that if the user doesn't set this up, the "database" name won't be recovered, + // but if storage-recover-on-startup is true the storage will still be recovered, which is an issue + spdlog::info("Data recovery on startup set to {}", recovery_on_startup); if (recovery_on_startup) { auto it = durability_->begin(std::string(kDBPrefix)); auto end = durability_->end(std::string(kDBPrefix)); @@ -410,9 +420,10 @@ void DbmsHandler::UpdateDurability(const storage::Config &config, std::optional< if (rel_dir == std::nullopt) + 
if (rel_dir == std::nullopt) { rel_dir = std::filesystem::relative(config.durability.storage_directory, default_config_.durability.storage_directory); + } const auto &val = Durability::GenVal(config.salient.uuid, *rel_dir); durability_->Put(key, val); } diff --git a/src/dbms/dbms_handler.hpp b/src/dbms/dbms_handler.hpp index 87d1257a6..b0bbd5758 100644 --- a/src/dbms/dbms_handler.hpp +++ b/src/dbms/dbms_handler.hpp @@ -155,6 +155,8 @@ class DbmsHandler { spdlog::debug("Trying to create db '{}' on replica which already exists.", config.name); auto db = Get_(config.name); + spdlog::debug("Aligning database with name {} which has UUID {}, where config UUID is {}", config.name, + std::string(db->uuid()), std::string(config.uuid)); if (db->uuid() == config.uuid) { // Same db return db; } @@ -163,18 +165,22 @@ class DbmsHandler { // TODO: Fix this hack if (config.name == kDefaultDB) { + spdlog::debug("Last commit timestamp for DB {} is {}", kDefaultDB, + db->storage()->repl_storage_state_.last_commit_timestamp_); + // This seems correct, if database made progress if (db->storage()->repl_storage_state_.last_commit_timestamp_ != storage::kTimestampInitialId) { spdlog::debug("Default storage is not clean, cannot update UUID..."); return NewError::GENERIC; // Update error } - spdlog::debug("Update default db's UUID"); + spdlog::debug("Updated default db's UUID"); // Default db cannot be deleted and remade, have to just update the UUID db->storage()->config_.salient.uuid = config.uuid; UpdateDurability(db->storage()->config_, "."); return db; } - spdlog::debug("Drop database and recreate with the correct UUID"); + spdlog::debug("Dropping database {} with UUID: {} and recreating with the correct UUID: {}", config.name, + std::string(db->uuid()), std::string(config.uuid)); // Defer drop (void)Delete_(db->name()); // Second attempt diff --git a/src/dbms/inmemory/replication_handlers.cpp b/src/dbms/inmemory/replication_handlers.cpp index 3fc174d3c..6a78977bb 100644 --- a/src/dbms/inmemory/replication_handlers.cpp +++ b/src/dbms/inmemory/replication_handlers.cpp @@ -19,7 +19,6 @@ #include "storage/v2/durability/durability.hpp" #include "storage/v2/durability/snapshot.hpp" #include "storage/v2/durability/version.hpp" -#include "storage/v2/fmt.hpp" #include "storage/v2/indices/label_index_stats.hpp" #include "storage/v2/inmemory/storage.hpp" #include "storage/v2/inmemory/unique_constraints.hpp" @@ -119,9 +118,14 @@ void InMemoryReplicationHandlers::Register(dbms::DbmsHandler *dbms_handler, repl }); server.rpc_server_.Register( [&data, dbms_handler](auto *req_reader, auto *res_builder) { - spdlog::debug("Received SwapMainUUIDHandler"); + spdlog::debug("Received SwapMainUUIDRpc"); InMemoryReplicationHandlers::SwapMainUUIDHandler(dbms_handler, data, req_reader, res_builder); }); + server.rpc_server_.Register( + [&data, dbms_handler](auto *req_reader, auto *res_builder) { + spdlog::debug("Received ForceResetStorageRpc"); + InMemoryReplicationHandlers::ForceResetStorageHandler(dbms_handler, data.uuid_, req_reader, res_builder); + }); } void InMemoryReplicationHandlers::SwapMainUUIDHandler(dbms::DbmsHandler *dbms_handler, @@ -135,7 +139,7 @@ void InMemoryReplicationHandlers::SwapMainUUIDHandler(dbms::DbmsHandler *dbms_ha replication_coordination_glue::SwapMainUUIDReq req; slk::Load(&req, req_reader); - spdlog::info(fmt::format("Set replica data UUID to main uuid {}", std::string(req.uuid))); + spdlog::info("Set replica data UUID to main uuid {}", std::string(req.uuid)); 
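// [Editor's note] ForceResetStorageRpc, registered above, gets its server-side handler just below. For
// reference, a client-side invocation would presumably mirror the other RPC helpers in this patch
// (hypothetical sketch, not part of this diff; the request fields and their order are assumed from the
// handler's use of req.db_uuid and req.main_uuid, and the Stream/AwaitResponse pattern is the one used in
// coordinator_client.cpp):

auto SendForceResetStorageRpc(rpc::Client &rpc_client, utils::UUID const &main_uuid, utils::UUID const &db_uuid)
    -> bool {
  try {
    auto stream{rpc_client.Stream<storage::replication::ForceResetStorageRpc>(main_uuid, db_uuid)};
    // The response carries success plus the replica's last commit timestamp after the reset.
    return stream.AwaitResponse().success;
  } catch (rpc::RpcFailedException const &) {
    spdlog::error("RPC error occurred while sending ForceResetStorageRpc!");
    return false;
  }
}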
dbms_handler->ReplicationState().TryPersistRoleReplica(role_replica_data.config, req.uuid); role_replica_data.uuid_ = req.uuid; @@ -330,6 +334,78 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle spdlog::debug("Replication recovery from snapshot finished!"); } +void InMemoryReplicationHandlers::ForceResetStorageHandler(dbms::DbmsHandler *dbms_handler, + const std::optional &current_main_uuid, + slk::Reader *req_reader, slk::Builder *res_builder) { + storage::replication::ForceResetStorageReq req; + slk::Load(&req, req_reader); + auto db_acc = GetDatabaseAccessor(dbms_handler, req.db_uuid); + if (!db_acc) { + storage::replication::ForceResetStorageRes res{false, 0}; + slk::Save(res, res_builder); + return; + } + if (!current_main_uuid.has_value() || req.main_uuid != current_main_uuid) [[unlikely]] { + LogWrongMain(current_main_uuid, req.main_uuid, storage::replication::ForceResetStorageReq::kType.name); + storage::replication::ForceResetStorageRes res{false, 0}; + slk::Save(res, res_builder); + return; + } + + storage::replication::Decoder decoder(req_reader); + + auto *storage = static_cast(db_acc->get()->storage()); + + auto storage_guard = std::unique_lock{storage->main_lock_}; + + // Clear the database + storage->vertices_.clear(); + storage->edges_.clear(); + storage->commit_log_.reset(); + storage->commit_log_.emplace(); + + storage->constraints_.existence_constraints_ = std::make_unique(); + storage->constraints_.unique_constraints_ = std::make_unique(); + storage->indices_.label_index_ = std::make_unique(); + storage->indices_.label_property_index_ = std::make_unique(); + + // Fine to set just a random epoch with timestamp 0 here: it will be overwritten by the force push when reading + // from the WAL, as it should be if this instance acted as MAIN before + storage->repl_storage_state_.epoch_.SetEpoch(std::string(utils::UUID{})); + storage->repl_storage_state_.last_commit_timestamp_ = 0; + + storage->repl_storage_state_.history.clear(); + storage->vertex_id_ = 0; + storage->edge_id_ = 0; + storage->timestamp_ = storage::kTimestampInitialId; + + storage->CollectGarbage(std::move(storage_guard), false); + storage->vertices_.run_gc(); + storage->edges_.run_gc(); + + storage::replication::ForceResetStorageRes res{true, storage->repl_storage_state_.last_commit_timestamp_.load()}; + slk::Save(res, res_builder); + + spdlog::trace("Deleting old snapshot files."); + // Delete other durability files + auto snapshot_files = storage::durability::GetSnapshotFiles(storage->recovery_.snapshot_directory_, storage->uuid_); + for (const auto &[path, uuid, _] : snapshot_files) { + spdlog::trace("Deleting snapshot file {}", path); + storage->file_retainer_.DeleteFile(path); + } + + spdlog::trace("Deleting old WAL files."); + auto wal_files = storage::durability::GetWalFiles(storage->recovery_.wal_directory_, storage->uuid_); + if (wal_files) { + for (const auto &wal_file : *wal_files) { + spdlog::trace("Deleting WAL file {}", wal_file.path); + storage->file_retainer_.DeleteFile(wal_file.path); + } + + storage->wal_file_.reset(); + } +} + void InMemoryReplicationHandlers::WalFilesHandler(dbms::DbmsHandler *dbms_handler, const std::optional &current_main_uuid, slk::Reader *req_reader, slk::Builder *res_builder) { diff --git a/src/dbms/inmemory/replication_handlers.hpp b/src/dbms/inmemory/replication_handlers.hpp index 4406b8338..aaa2d0755 100644 --- a/src/dbms/inmemory/replication_handlers.hpp +++ b/src/dbms/inmemory/replication_handlers.hpp @@ -48,6 +48,9 @@ class InMemoryReplicationHandlers { static void
SwapMainUUIDHandler(dbms::DbmsHandler *dbms_handler, replication::RoleReplicaData &role_replica_data, slk::Reader *req_reader, slk::Builder *res_builder); + static void ForceResetStorageHandler(dbms::DbmsHandler *dbms_handler, + const std::optional &current_main_uuid, slk::Reader *req_reader, + slk::Builder *res_builder); static void LoadWal(storage::InMemoryStorage *storage, storage::replication::Decoder *decoder); diff --git a/src/io/network/endpoint.cpp b/src/io/network/endpoint.cpp index 44123db6b..bb6dcfd10 100644 --- a/src/io/network/endpoint.cpp +++ b/src/io/network/endpoint.cpp @@ -24,22 +24,22 @@ namespace memgraph::io::network { -Endpoint::IpFamily Endpoint::GetIpFamily(const std::string &address) { +Endpoint::IpFamily Endpoint::GetIpFamily(std::string_view address) { in_addr addr4; in6_addr addr6; - int ipv4_result = inet_pton(AF_INET, address.c_str(), &addr4); - int ipv6_result = inet_pton(AF_INET6, address.c_str(), &addr6); + int ipv4_result = inet_pton(AF_INET, address.data(), &addr4); + int ipv6_result = inet_pton(AF_INET6, address.data(), &addr6); if (ipv4_result == 1) { return IpFamily::IP4; - } else if (ipv6_result == 1) { - return IpFamily::IP6; - } else { - return IpFamily::NONE; } + if (ipv6_result == 1) { + return IpFamily::IP6; + } + return IpFamily::NONE; } std::optional> Endpoint::ParseSocketOrIpAddress( - const std::string &address, const std::optional default_port) { + std::string_view address, const std::optional default_port) { /// expected address format: /// - "ip_address:port_number" /// - "ip_address" @@ -56,7 +56,7 @@ std::optional> Endpoint::ParseSocketOrIpAddress if (GetIpFamily(address) == IpFamily::NONE) { return std::nullopt; } - return std::pair{address, *default_port}; + return std::pair{std::string(address), *default_port}; // TODO: (andi) Optimize throughout the code } } else if (parts.size() == 2) { ip_address = std::move(parts[0]); @@ -88,7 +88,7 @@ std::optional> Endpoint::ParseSocketOrIpAddress } std::optional> Endpoint::ParseHostname( - const std::string &address, const std::optional default_port = {}) { + std::string_view address, const std::optional default_port = {}) { const std::string delimiter = ":"; std::string ip_address; std::vector parts = utils::Split(address, delimiter); @@ -97,7 +97,7 @@ std::optional> Endpoint::ParseHostname( if (!IsResolvableAddress(address, *default_port)) { return std::nullopt; } - return std::pair{address, *default_port}; + return std::pair{std::string(address), *default_port}; // TODO: (andi) Optimize throughout the code } } else if (parts.size() == 2) { int64_t int_port{0}; @@ -153,20 +153,20 @@ std::ostream &operator<<(std::ostream &os, const Endpoint &endpoint) { return os << endpoint.address << ":" << endpoint.port; } -bool Endpoint::IsResolvableAddress(const std::string &address, uint16_t port) { +bool Endpoint::IsResolvableAddress(std::string_view address, uint16_t port) { addrinfo hints{ .ai_flags = AI_PASSIVE, .ai_family = AF_UNSPEC, // IPv4 and IPv6 .ai_socktype = SOCK_STREAM // TCP socket }; addrinfo *info = nullptr; - auto status = getaddrinfo(address.c_str(), std::to_string(port).c_str(), &hints, &info); + auto status = getaddrinfo(address.data(), std::to_string(port).c_str(), &hints, &info); if (info) freeaddrinfo(info); return status == 0; } std::optional> Endpoint::ParseSocketOrAddress( - const std::string &address, const std::optional default_port) { + std::string_view address, const std::optional default_port) { const std::string delimiter = ":"; std::vector parts = utils::Split(address, 
delimiter); if (parts.size() == 1) { diff --git a/src/io/network/endpoint.hpp b/src/io/network/endpoint.hpp index 16d70e080..b0201240b 100644 --- a/src/io/network/endpoint.hpp +++ b/src/io/network/endpoint.hpp @@ -48,8 +48,8 @@ struct Endpoint { uint16_t port{0}; IpFamily family{IpFamily::NONE}; - static std::optional> ParseSocketOrAddress(const std::string &address, - std::optional default_port); + static std::optional> ParseSocketOrAddress( + std::string_view address, std::optional default_port = {}); /** * Tries to parse the given string as either a socket address or ip address. @@ -62,7 +62,7 @@ struct Endpoint { * it won't be used, as we expect that it is given in the address string. */ static std::optional> ParseSocketOrIpAddress( - const std::string &address, std::optional default_port = {}); + std::string_view address, std::optional default_port = {}); /** * Tries to parse given string as either socket address or hostname. @@ -71,12 +71,12 @@ struct Endpoint { * - "hostname" * After we parse hostname and port we try to resolve the hostname into an ip_address. */ - static std::optional> ParseHostname(const std::string &address, + static std::optional> ParseHostname(std::string_view address, std::optional default_port); - static IpFamily GetIpFamily(const std::string &address); + static IpFamily GetIpFamily(std::string_view address); - static bool IsResolvableAddress(const std::string &address, uint16_t port); + static bool IsResolvableAddress(std::string_view address, uint16_t port); /** * Tries to resolve hostname to its corresponding IP address. diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 34d64f434..d896bcc4c 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -334,7 +334,8 @@ int main(int argc, char **argv) { .salient.items = {.properties_on_edges = FLAGS_storage_properties_on_edges, .enable_schema_metadata = FLAGS_storage_enable_schema_metadata}, .salient.storage_mode = memgraph::flags::ParseStorageMode()}; - + spdlog::info("config recover on startup {}, flags {} {}", db_config.durability.recover_on_startup, + FLAGS_storage_recover_on_startup, FLAGS_data_recovery_on_startup); memgraph::utils::Scheduler jemalloc_purge_scheduler; jemalloc_purge_scheduler.Run("Jemalloc purge", std::chrono::seconds(FLAGS_storage_gc_cycle_sec), [] { memgraph::memory::PurgeUnusedMemory(); }); diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 0cc0f182a..e6d39ab9a 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -410,7 +410,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { : coordinator_handler_(coordinator_state) {} - void UnregisterInstance(std::string const &instance_name) override { + void UnregisterInstance(std::string_view instance_name) override { auto status = coordinator_handler_.UnregisterReplicationInstance(instance_name); switch (status) { using enum memgraph::coordination::UnregisterInstanceCoordinatorStatus; @@ -423,6 +423,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { throw QueryRuntimeException("UNREGISTER INSTANCE query can only be run on a coordinator!"); case NOT_LEADER: throw QueryRuntimeException("Couldn't unregister replica instance since coordinator is not a leader!"); + case RAFT_LOG_ERROR: + throw QueryRuntimeException("Couldn't unregister replica instance since raft server couldn't append the log!"); case RPC_FAILED: throw QueryRuntimeException( "Couldn't unregister replica instance because current main instance couldn't unregister replica!"); @@ -431,20 
+433,18 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { } } - void RegisterReplicationInstance(std::string const &coordinator_socket_address, - std::string const &replication_socket_address, + void RegisterReplicationInstance(std::string_view coordinator_socket_address, + std::string_view replication_socket_address, std::chrono::seconds const &instance_check_frequency, std::chrono::seconds const &instance_down_timeout, std::chrono::seconds const &instance_get_uuid_frequency, - std::string const &instance_name, CoordinatorQuery::SyncMode sync_mode) override { - const auto maybe_replication_ip_port = - io::network::Endpoint::ParseSocketOrAddress(replication_socket_address, std::nullopt); + std::string_view instance_name, CoordinatorQuery::SyncMode sync_mode) override { + const auto maybe_replication_ip_port = io::network::Endpoint::ParseSocketOrAddress(replication_socket_address); if (!maybe_replication_ip_port) { throw QueryRuntimeException("Invalid replication socket address!"); } - const auto maybe_coordinator_ip_port = - io::network::Endpoint::ParseSocketOrAddress(coordinator_socket_address, std::nullopt); + const auto maybe_coordinator_ip_port = io::network::Endpoint::ParseSocketOrAddress(coordinator_socket_address); if (!maybe_replication_ip_port) { throw QueryRuntimeException("Invalid replication socket address!"); } @@ -452,13 +452,13 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { const auto [replication_ip, replication_port] = *maybe_replication_ip_port; const auto [coordinator_server_ip, coordinator_server_port] = *maybe_coordinator_ip_port; const auto repl_config = coordination::CoordinatorClientConfig::ReplicationClientInfo{ - .instance_name = instance_name, + .instance_name = std::string(instance_name), .replication_mode = convertFromCoordinatorToReplicationMode(sync_mode), .replication_ip_address = replication_ip, .replication_port = replication_port}; auto coordinator_client_config = - coordination::CoordinatorClientConfig{.instance_name = instance_name, + coordination::CoordinatorClientConfig{.instance_name = std::string(instance_name), .ip_address = coordinator_server_ip, .port = coordinator_server_port, .instance_health_check_frequency_sec = instance_check_frequency, @@ -472,18 +472,17 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { using enum memgraph::coordination::RegisterInstanceCoordinatorStatus; case NAME_EXISTS: throw QueryRuntimeException("Couldn't register replica instance since instance with such name already exists!"); - case ENDPOINT_EXISTS: + case COORD_ENDPOINT_EXISTS: throw QueryRuntimeException( - "Couldn't register replica instance since instance with such endpoint already exists!"); + "Couldn't register replica instance since instance with such coordinator endpoint already exists!"); + case REPL_ENDPOINT_EXISTS: + throw QueryRuntimeException( + "Couldn't register replica instance since instance with such replication endpoint already exists!"); case NOT_COORDINATOR: throw QueryRuntimeException("REGISTER INSTANCE query can only be run on a coordinator!"); case NOT_LEADER: throw QueryRuntimeException("Couldn't register replica instance since coordinator is not a leader!"); - case RAFT_COULD_NOT_ACCEPT: - throw QueryRuntimeException( - "Couldn't register replica instance since raft server couldn't accept the log! 
Most likely the raft " - "instance is not a leader!"); - case RAFT_COULD_NOT_APPEND: + case RAFT_LOG_ERROR: throw QueryRuntimeException("Couldn't register replica instance since raft server couldn't append the log!"); case RPC_FAILED: throw QueryRuntimeException( @@ -494,8 +493,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { } } - auto AddCoordinatorInstance(uint32_t raft_server_id, std::string const &raft_socket_address) -> void override { - auto const maybe_ip_and_port = io::network::Endpoint::ParseSocketOrIpAddress(raft_socket_address); + auto AddCoordinatorInstance(uint32_t raft_server_id, std::string_view raft_socket_address) -> void override { + auto const maybe_ip_and_port = io::network::Endpoint::ParseSocketOrAddress(raft_socket_address); if (maybe_ip_and_port) { auto const [ip, port] = *maybe_ip_and_port; spdlog::info("Adding instance {} with raft socket address {}:{}.", raft_server_id, port, ip); @@ -505,8 +504,8 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { } } - void SetReplicationInstanceToMain(const std::string &instance_name) override { - auto status = coordinator_handler_.SetReplicationInstanceToMain(instance_name); + void SetReplicationInstanceToMain(std::string_view instance_name) override { + auto const status = coordinator_handler_.SetReplicationInstanceToMain(instance_name); switch (status) { using enum memgraph::coordination::SetInstanceToMainCoordinatorStatus; case NO_INSTANCE_WITH_NAME: @@ -515,6 +514,10 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { throw QueryRuntimeException("Couldn't set instance to main since there is already a main instance in cluster!"); case NOT_COORDINATOR: throw QueryRuntimeException("SET INSTANCE TO MAIN query can only be run on a coordinator!"); + case NOT_LEADER: + throw QueryRuntimeException("Couldn't set instance to main since coordinator is not a leader!"); + case RAFT_LOG_ERROR: + throw QueryRuntimeException("Couldn't promote instance since raft server couldn't append the log!"); case COULD_NOT_PROMOTE_TO_MAIN: throw QueryRuntimeException( "Couldn't set replica instance to main! Check coordinator and replica for more logs"); @@ -1251,17 +1254,16 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param throw QueryRuntimeException("Only coordinator can run SHOW INSTANCES."); } - callback.header = {"name", "raft_socket_address", "coordinator_socket_address", "alive", "role"}; + callback.header = {"name", "raft_socket_address", "coordinator_socket_address", "health", "role"}; callback.fn = [handler = CoordQueryHandler{*coordinator_state}, replica_nfields = callback.header.size()]() mutable { auto const instances = handler.ShowInstances(); auto const converter = [](const auto &status) -> std::vector { return {TypedValue{status.instance_name}, TypedValue{status.raft_socket_address}, - TypedValue{status.coord_socket_address}, TypedValue{status.is_alive}, - TypedValue{status.cluster_role}}; + TypedValue{status.coord_socket_address}, TypedValue{status.health}, TypedValue{status.cluster_role}}; }; - return utils::fmap(converter, instances); + return utils::fmap(instances, converter); }; return callback; } diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 01a443d6d..f18bd6721 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -95,25 +95,24 @@ class CoordinatorQueryHandler { }; /// @throw QueryRuntimeException if an error ocurred. 
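// [Editor's note] The interface below switches instance names and socket addresses from std::string const&
// to std::string_view. A string_view does not own its characters, so an implementation that keeps the name
// beyond the call must copy it explicitly, exactly as interpreter.cpp above does with std::string(instance_name).
// A minimal illustration of the pattern (hypothetical example, not part of this patch):

#include <string>
#include <string_view>
#include <vector>

void KeepInstanceName(std::string_view instance_name, std::vector<std::string> &registry) {
  // Copy before storing: the buffer behind the view need not outlive this call.
  registry.emplace_back(instance_name);
}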
- virtual void RegisterReplicationInstance(std::string const &coordinator_socket_address, - std::string const &replication_socket_address, + virtual void RegisterReplicationInstance(std::string_view coordinator_socket_address, + std::string_view replication_socket_address, std::chrono::seconds const &instance_health_check_frequency, std::chrono::seconds const &instance_down_timeout, std::chrono::seconds const &instance_get_uuid_frequency, - std::string const &instance_name, CoordinatorQuery::SyncMode sync_mode) = 0; + std::string_view instance_name, CoordinatorQuery::SyncMode sync_mode) = 0; /// @throw QueryRuntimeException if an error ocurred. - virtual void UnregisterInstance(std::string const &instance_name) = 0; + virtual void UnregisterInstance(std::string_view instance_name) = 0; /// @throw QueryRuntimeException if an error ocurred. - virtual void SetReplicationInstanceToMain(const std::string &instance_name) = 0; + virtual void SetReplicationInstanceToMain(std::string_view instance_name) = 0; /// @throw QueryRuntimeException if an error ocurred. virtual std::vector ShowInstances() const = 0; /// @throw QueryRuntimeException if an error ocurred. - virtual auto AddCoordinatorInstance(uint32_t raft_server_id, std::string const &coordinator_socket_address) - -> void = 0; + virtual auto AddCoordinatorInstance(uint32_t raft_server_id, std::string_view coordinator_socket_address) -> void = 0; }; #endif diff --git a/src/replication_coordination_glue/mode.hpp b/src/replication_coordination_glue/mode.hpp index d0b415733..4ca98b3a0 100644 --- a/src/replication_coordination_glue/mode.hpp +++ b/src/replication_coordination_glue/mode.hpp @@ -12,7 +12,19 @@ #pragma once #include +#include +#include +#include + +#include "json/json.hpp" namespace memgraph::replication_coordination_glue { + enum class ReplicationMode : std::uint8_t { SYNC, ASYNC }; + +NLOHMANN_JSON_SERIALIZE_ENUM(ReplicationMode, { + {ReplicationMode::SYNC, "sync"}, + {ReplicationMode::ASYNC, "async"}, + }) + } // namespace memgraph::replication_coordination_glue diff --git a/src/replication_handler/include/replication_handler/replication_handler.hpp b/src/replication_handler/include/replication_handler/replication_handler.hpp index d5c2bfa71..e1da19bfa 100644 --- a/src/replication_handler/include/replication_handler/replication_handler.hpp +++ b/src/replication_handler/include/replication_handler/replication_handler.hpp @@ -210,8 +210,13 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { auto client = std::make_unique(*instance_client_ptr, main_uuid); client->Start(storage, std::move(db_acc)); bool const success = std::invoke([state = client->State()]() { + // We force sync replicas in other situation if (state == storage::replication::ReplicaState::DIVERGED_FROM_MAIN) { +#ifdef MG_ENTERPRISE + return FLAGS_coordinator_server_port != 0; +#else return false; +#endif } return true; }); diff --git a/src/replication_handler/replication_handler.cpp b/src/replication_handler/replication_handler.cpp index ea567eed0..34ccdfc99 100644 --- a/src/replication_handler/replication_handler.cpp +++ b/src/replication_handler/replication_handler.cpp @@ -271,9 +271,7 @@ auto ReplicationHandler::GetDatabasesHistories() -> replication_coordination_glu dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) { auto &repl_storage_state = db_acc->storage()->repl_storage_state_; - std::vector> history = - utils::fmap([](const auto &elem) { return std::pair(elem.first, elem.second); }, - 
repl_storage_state.history); + std::vector> history = utils::fmap(repl_storage_state.history); history.emplace_back(std::string(repl_storage_state.epoch_.id()), repl_storage_state.last_commit_timestamp_.load()); replication_coordination_glue::DatabaseHistory repl{ diff --git a/src/storage/v2/durability/durability.cpp b/src/storage/v2/durability/durability.cpp index 92c4d11e8..a83313820 100644 --- a/src/storage/v2/durability/durability.cpp +++ b/src/storage/v2/durability/durability.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -118,6 +118,8 @@ std::optional> GetWalFiles(const std::filesystem: if (!item.is_regular_file()) continue; try { auto info = ReadWalInfo(item.path()); + spdlog::trace("Getting wal file with following info: uuid: {}, epoch id: {}, from timestamp {}, to_timestamp {} ", + info.uuid, info.epoch_id, info.from_timestamp, info.to_timestamp); if ((uuid.empty() || info.uuid == uuid) && (!current_seq_num || info.seq_num < *current_seq_num)) { wal_files.emplace_back(info.seq_num, info.from_timestamp, info.to_timestamp, std::move(info.uuid), std::move(info.epoch_id), item.path()); @@ -410,22 +412,17 @@ std::optional Recovery::RecoverData(std::string *uuid, Replication std::optional previous_seq_num; auto last_loaded_timestamp = snapshot_timestamp; spdlog::info("Trying to load WAL files."); + + if (last_loaded_timestamp) { + epoch_history->emplace_back(repl_storage_state.epoch_.id(), *last_loaded_timestamp); + } + for (auto &wal_file : wal_files) { if (previous_seq_num && (wal_file.seq_num - *previous_seq_num) > 1) { LOG_FATAL("You are missing a WAL file with the sequence number {}!", *previous_seq_num + 1); } previous_seq_num = wal_file.seq_num; - if (wal_file.epoch_id != repl_storage_state.epoch_.id()) { - // This way we skip WALs finalized only because of role change. - // We can also set the last timestamp to 0 if last loaded timestamp - // is nullopt as this can only happen if the WAL file with seq = 0 - // does not contain any deltas and we didn't find any snapshots. 
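// [Editor's note] The recovery change below replaces the removed epoch-skipping logic with incremental
// bookkeeping: epoch_history holds (epoch_id, last_commit_timestamp) pairs, a new entry is appended when a
// WAL file opens a new epoch, and the last entry's timestamp is raised as newer commits are replayed. A
// self-contained model of that update rule (illustrative only; it mirrors the code added further down):

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

void UpdateEpochHistory(std::vector<std::pair<std::string, uint64_t>> &epoch_history,
                        std::string const &wal_epoch_id, uint64_t last_loaded_timestamp) {
  if (epoch_history.empty() || epoch_history.back().first != wal_epoch_id) {
    // No history yet, or this WAL file starts a new epoch: record it.
    epoch_history.emplace_back(wal_epoch_id, last_loaded_timestamp);
  } else if (epoch_history.back().second < last_loaded_timestamp) {
    // Same epoch as the previous WAL file: keep the newest commit timestamp.
    epoch_history.back().second = last_loaded_timestamp;
  }
}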
diff --git a/src/replication_handler/replication_handler.cpp b/src/replication_handler/replication_handler.cpp
index ea567eed0..34ccdfc99 100644
--- a/src/replication_handler/replication_handler.cpp
+++ b/src/replication_handler/replication_handler.cpp
@@ -271,9 +271,7 @@ auto ReplicationHandler::GetDatabasesHistories() -> replication_coordination_glu
   dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) {
     auto &repl_storage_state = db_acc->storage()->repl_storage_state_;

-    std::vector<std::pair<std::string, uint64_t>> history =
-        utils::fmap([](const auto &elem) { return std::pair(elem.first, elem.second); },
-                    repl_storage_state.history);
+    std::vector<std::pair<std::string, uint64_t>> history = utils::fmap(repl_storage_state.history);

     history.emplace_back(std::string(repl_storage_state.epoch_.id()),
                         repl_storage_state.last_commit_timestamp_.load());
     replication_coordination_glue::DatabaseHistory repl{
diff --git a/src/storage/v2/durability/durability.cpp b/src/storage/v2/durability/durability.cpp
index 92c4d11e8..a83313820 100644
--- a/src/storage/v2/durability/durability.cpp
+++ b/src/storage/v2/durability/durability.cpp
@@ -1,4 +1,4 @@
-// Copyright 2023 Memgraph Ltd.
+// Copyright 2024 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -118,6 +118,8 @@ std::optional> GetWalFiles(const std::filesystem:
     if (!item.is_regular_file()) continue;
     try {
       auto info = ReadWalInfo(item.path());
+      spdlog::trace("Got WAL file with uuid: {}, epoch id: {}, from_timestamp: {}, to_timestamp: {}",
+                    info.uuid, info.epoch_id, info.from_timestamp, info.to_timestamp);
       if ((uuid.empty() || info.uuid == uuid) && (!current_seq_num || info.seq_num < *current_seq_num)) {
         wal_files.emplace_back(info.seq_num, info.from_timestamp, info.to_timestamp, std::move(info.uuid),
                                std::move(info.epoch_id), item.path());
@@ -410,22 +412,17 @@ std::optional Recovery::RecoverData(std::string *uuid, Replication
   std::optional previous_seq_num;
   auto last_loaded_timestamp = snapshot_timestamp;
   spdlog::info("Trying to load WAL files.");
+
+  if (last_loaded_timestamp) {
+    epoch_history->emplace_back(repl_storage_state.epoch_.id(), *last_loaded_timestamp);
+  }
+
   for (auto &wal_file : wal_files) {
     if (previous_seq_num && (wal_file.seq_num - *previous_seq_num) > 1) {
       LOG_FATAL("You are missing a WAL file with the sequence number {}!", *previous_seq_num + 1);
     }
     previous_seq_num = wal_file.seq_num;

-    if (wal_file.epoch_id != repl_storage_state.epoch_.id()) {
-      // This way we skip WALs finalized only because of role change.
-      // We can also set the last timestamp to 0 if last loaded timestamp
-      // is nullopt as this can only happen if the WAL file with seq = 0
-      // does not contain any deltas and we didn't find any snapshots.
-      if (last_loaded_timestamp) {
-        epoch_history->emplace_back(wal_file.epoch_id, *last_loaded_timestamp);
-      }
-      repl_storage_state.epoch_.SetEpoch(std::move(wal_file.epoch_id));
-    }
     try {
       auto info = LoadWal(wal_file.path, &indices_constraints, last_loaded_timestamp, vertices, edges, name_id_mapper,
                           edge_count, config.salient.items);
@@ -434,13 +431,24 @@ std::optional Recovery::RecoverData(std::string *uuid, Replication
       recovery_info.next_timestamp = std::max(recovery_info.next_timestamp, info.next_timestamp);

       recovery_info.last_commit_timestamp = info.last_commit_timestamp;
+
+      if (recovery_info.next_timestamp != 0) {
+        last_loaded_timestamp.emplace(recovery_info.next_timestamp - 1);
+      }
+
+      auto last_loaded_timestamp_value = last_loaded_timestamp.value_or(0);
+      if (epoch_history->empty() || epoch_history->back().first != wal_file.epoch_id) {
+        // no history yet or a new epoch: add an entry
+        epoch_history->emplace_back(wal_file.epoch_id, last_loaded_timestamp_value);
+        repl_storage_state.epoch_.SetEpoch(wal_file.epoch_id);
+      } else if (epoch_history->back().second < last_loaded_timestamp_value) {
+        // existing epoch: update it with the newer timestamp
+        epoch_history->back().second = last_loaded_timestamp_value;
+      }
+
     } catch (const RecoveryFailure &e) {
       LOG_FATAL("Couldn't recover WAL deltas from {} because of: {}", wal_file.path, e.what());
     }
-
-    if (recovery_info.next_timestamp != 0) {
-      last_loaded_timestamp.emplace(recovery_info.next_timestamp - 1);
-    }
   }
   // The sequence number needs to be recovered even though `LoadWal` didn't
   // load any deltas from that file.
@@ -456,7 +464,12 @@ std::optional Recovery::RecoverData(std::string *uuid, Replication

   memgraph::metrics::Measure(memgraph::metrics::SnapshotRecoveryLatency_us,
                              std::chrono::duration_cast<std::chrono::microseconds>(timer.Elapsed()).count());
+
+  spdlog::trace("Set epoch id: {} with commit timestamp {}", std::string(repl_storage_state.epoch_.id()),
+                repl_storage_state.last_commit_timestamp_);
+  std::for_each(repl_storage_state.history.begin(), repl_storage_state.history.end(), [](auto &history) {
+    spdlog::trace("epoch id: {} with commit timestamp {}", std::string(history.first), history.second);
+  });
   return recovery_info;
 }
diff --git a/src/storage/v2/inmemory/replication/recovery.cpp b/src/storage/v2/inmemory/replication/recovery.cpp
index fe752bfd1..5f1182c75 100644
--- a/src/storage/v2/inmemory/replication/recovery.cpp
+++ b/src/storage/v2/inmemory/replication/recovery.cpp
@@ -233,7 +233,7 @@ std::vector GetRecoverySteps(uint64_t replica_commit, utils::FileR
     }
   }

-  // In all cases, if we have a current wal file we need to use itW
+  // In all cases, if we have a current wal file we need to use it
   if (current_wal_seq_num) {
     // NOTE: File not handled directly, so no need to lock it
     recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num});
diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp
index bd8534673..3a4fa9b91 100644
--- a/src/storage/v2/inmemory/storage.cpp
+++ b/src/storage/v2/inmemory/storage.cpp
@@ -109,6 +109,7 @@ InMemoryStorage::InMemoryStorage(Config config)
       timestamp_ = std::max(timestamp_, info->next_timestamp);
       if (info->last_commit_timestamp) {
         repl_storage_state_.last_commit_timestamp_ = *info->last_commit_timestamp;
+        spdlog::trace("Recovering last commit timestamp {}", *info->last_commit_timestamp);
       }
     }
   } else if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED ||
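The reworked RecoverData loop maintains one history entry per epoch rather than one per WAL file: a WAL from a previously seen epoch only bumps that epoch's timestamp. A distilled sketch of the invariant (names are illustrative, not the actual Memgraph types):

```cpp
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Each entry pairs an epoch id with the last commit timestamp seen in it.
using EpochHistory = std::vector<std::pair<std::string, uint64_t>>;

void RecordWal(EpochHistory &history, const std::string &epoch_id, uint64_t last_timestamp) {
  if (history.empty() || history.back().first != epoch_id) {
    history.emplace_back(epoch_id, last_timestamp);  // first WAL of a new epoch
  } else if (history.back().second < last_timestamp) {
    history.back().second = last_timestamp;  // same epoch, newer timestamp
  }
}
```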
diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp
index 1eb06bf10..fb332672a 100644
--- a/src/storage/v2/replication/replication_client.cpp
+++ b/src/storage/v2/replication/replication_client.cpp
@@ -54,26 +54,59 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
   std::optional branching_point;
   // different epoch id, replica was main
+  // If there was no epoch transfer and MAIN doesn't hold all the epochs (it could have been down and missed them),
+  // it is enough to check the commit timestamp
   if (replica.epoch_id != replStorageState.epoch_.id() && replica.current_commit_timestamp != kTimestampInitialId) {
+    spdlog::trace(
+        "REPLICA: epoch UUID: {} and last_commit_timestamp: {}; MAIN: epoch UUID {} and last_commit_timestamp {}",
+        std::string(replica.epoch_id), replica.current_commit_timestamp, std::string(replStorageState.epoch_.id()),
+        replStorageState.last_commit_timestamp_);
     auto const &history = replStorageState.history;
     const auto epoch_info_iter = std::find_if(history.crbegin(), history.crend(), [&](const auto &main_epoch_info) {
       return main_epoch_info.first == replica.epoch_id;
     });
-    // MAIN didn't have that epoch at all, so the histories diverged from the very start
+    // MAIN didn't have that epoch at all, so the histories diverged from the very start
     if (epoch_info_iter == history.crend()) {
+      spdlog::info("Couldn't find epoch {} in MAIN, setting branching point", std::string(replica.epoch_id));
       branching_point = 0;
-    } else if (epoch_info_iter->second != replica.current_commit_timestamp) {
+    } else if (epoch_info_iter->second < replica.current_commit_timestamp) {
+      spdlog::info("Found epoch {} on MAIN with last_commit_timestamp {}, REPLICA's last_commit_timestamp {}",
+                   std::string(epoch_info_iter->first), epoch_info_iter->second, replica.current_commit_timestamp);
       branching_point = epoch_info_iter->second;
     }
   }
   if (branching_point) {
-    spdlog::error(
-        "You cannot register Replica {} to this Main because at one point "
-        "Replica {} acted as the Main instance. Both the Main and Replica {} "
-        "now hold unique data. Please resolve data conflicts and start the "
-        "replication on a clean instance.",
-        client_.name_, client_.name_, client_.name_);
-    replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::DIVERGED_FROM_MAIN; });
+    auto replica_state = replica_state_.Lock();
+    if (*replica_state == replication::ReplicaState::DIVERGED_FROM_MAIN) {
+      return;
+    }
+    *replica_state = replication::ReplicaState::DIVERGED_FROM_MAIN;
+
+    auto log_error = [client_name = client_.name_]() {
+      spdlog::error(
+          "You cannot register Replica {} to this Main because at one point "
+          "Replica {} acted as the Main instance. Both the Main and Replica {} "
+          "now hold unique data. Please resolve data conflicts and start the "
+          "replication on a clean instance.",
+          client_name, client_name, client_name);
+    };
+#ifdef MG_ENTERPRISE
+    if (!FLAGS_coordinator_server_port) {
+      log_error();
+      return;
+    }
+    client_.thread_pool_.AddTask([storage, gk = std::move(db_acc), this] {
+      const auto [success, timestamp] = this->ForceResetStorage(storage);
+      if (success) {
+        spdlog::info("Successfully reset storage of REPLICA {} to timestamp {}.", client_.name_, timestamp);
+        return;
+      }
+      spdlog::error("You cannot register REPLICA {} to this MAIN because MAIN couldn't reset REPLICA's storage.",
+                    client_.name_);
+    });
+#else
+    log_error();
+#endif
     return;
   }
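Detection itself reduces to a reverse lookup in MAIN's epoch history. Note the comparison changes from `!=` to `<`, so a replica that is merely behind within an epoch MAIN knows about is no longer treated as diverged. A simplified sketch:

```cpp
#include <algorithm>
#include <cstdint>
#include <optional>
#include <string>
#include <utility>
#include <vector>

using EpochHistory = std::vector<std::pair<std::string, uint64_t>>;

// Returns the timestamp at which MAIN's and the replica's histories split,
// or std::nullopt if the histories are compatible.
std::optional<uint64_t> FindBranchingPoint(const EpochHistory &main_history,
                                           const std::string &replica_epoch,
                                           uint64_t replica_commit_timestamp) {
  auto it = std::find_if(main_history.crbegin(), main_history.crend(),
                         [&](const auto &entry) { return entry.first == replica_epoch; });
  if (it == main_history.crend()) return 0;  // MAIN never saw this epoch: diverged from the start
  if (it->second < replica_commit_timestamp) return it->second;  // replica committed past MAIN in that epoch
  return std::nullopt;
}
```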
@@ -192,9 +225,6 @@ void ReplicationStorageClient::StartTransactionReplication(const uint64_t curren
     }
   }

-//////// AF: you can't finialize transaction replication if you are not replicating
-/////// AF: if there is no stream or it is Defunct than we need to set replica in MAYBE_BEHIND -> is that even used
-/////// AF:
 bool ReplicationStorageClient::FinalizeTransactionReplication(Storage *storage, DatabaseAccessProtector db_acc) {
   // We can only check the state because it guarantees to be only
   // valid during a single transaction replication (if the assumption
@@ -327,6 +357,21 @@ void ReplicationStorageClient::RecoverReplica(uint64_t replica_commit, memgraph:
   }
 }

+std::pair<bool, uint64_t> ReplicationStorageClient::ForceResetStorage(memgraph::storage::Storage *storage) {
+  utils::OnScopeExit set_to_maybe_behind{
+      [this]() { replica_state_.WithLock([](auto &state) { state = replication::ReplicaState::MAYBE_BEHIND; }); }};
+  try {
+    auto stream{client_.rpc_client_.Stream<replication::ForceResetStorageRpc>(main_uuid_, storage->uuid())};
+    const auto res = stream.AwaitResponse();
+    return std::pair{res.success, res.current_commit_timestamp};
+  } catch (const rpc::RpcFailedException &) {
+    spdlog::error(
+        utils::MessageWithLink("Couldn't ForceReset data to {}.", client_.name_, "https://memgr.ph/replication"));
+  }
+
+  return {false, 0};
+}
+
 ////// ReplicaStream //////
 ReplicaStream::ReplicaStream(Storage *storage, rpc::Client &rpc_client, const uint64_t current_seq_num,
                              utils::UUID main_uuid)
diff --git a/src/storage/v2/replication/replication_client.hpp b/src/storage/v2/replication/replication_client.hpp
index 3352bab65..063501111 100644
--- a/src/storage/v2/replication/replication_client.hpp
+++ b/src/storage/v2/replication/replication_client.hpp
@@ -188,6 +188,13 @@ class ReplicationStorageClient {
    */
   void UpdateReplicaState(Storage *storage, DatabaseAccessProtector db_acc);

+  /**
+   * @brief Forcefully resets the storage to the state it was in when started from scratch.
+   *
+   * @param storage pointer to the storage associated with the client
+   */
+  std::pair<bool, uint64_t> ForceResetStorage(Storage *storage);
+
   void LogRpcFailure();

   /**
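ForceResetStorage relies on a scope guard so the replica always lands in MAYBE_BEHIND, whatever exit path is taken (success, failure, or a thrown RpcFailedException), which lets the frequent check re-drive recovery afterwards. A self-contained sketch of the pattern (a stand-in for utils::OnScopeExit, not its actual implementation):

```cpp
#include <functional>
#include <utility>

class OnScopeExit {
 public:
  explicit OnScopeExit(std::function<void()> fn) : fn_(std::move(fn)) {}
  ~OnScopeExit() { fn_(); }  // runs on every exit path, including exceptions
  OnScopeExit(const OnScopeExit &) = delete;
  OnScopeExit &operator=(const OnScopeExit &) = delete;

 private:
  std::function<void()> fn_;
};

void ForceResetSketch() {
  OnScopeExit set_to_maybe_behind([] { /* state = MAYBE_BEHIND */ });
  // ... RPC round-trip that may return early or throw; the guard still fires.
}
```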
diff --git a/src/storage/v2/replication/rpc.cpp b/src/storage/v2/replication/rpc.cpp
index f523bb5d7..71a9ca65c 100644
--- a/src/storage/v2/replication/rpc.cpp
+++ b/src/storage/v2/replication/rpc.cpp
@@ -59,6 +59,19 @@ void TimestampRes::Save(const TimestampRes &self, memgraph::slk::Builder *builde
   memgraph::slk::Save(self, builder);
 }
 void TimestampRes::Load(TimestampRes *self, memgraph::slk::Reader *reader) { memgraph::slk::Load(self, reader); }
+
+void ForceResetStorageReq::Save(const ForceResetStorageReq &self, memgraph::slk::Builder *builder) {
+  memgraph::slk::Save(self, builder);
+}
+void ForceResetStorageReq::Load(ForceResetStorageReq *self, memgraph::slk::Reader *reader) {
+  memgraph::slk::Load(self, reader);
+}
+void ForceResetStorageRes::Save(const ForceResetStorageRes &self, memgraph::slk::Builder *builder) {
+  memgraph::slk::Save(self, builder);
+}
+void ForceResetStorageRes::Load(ForceResetStorageRes *self, memgraph::slk::Reader *reader) {
+  memgraph::slk::Load(self, reader);
+}
 }  // namespace storage::replication

 constexpr utils::TypeInfo storage::replication::AppendDeltasReq::kType{utils::TypeId::REP_APPEND_DELTAS_REQ,
@@ -97,6 +110,12 @@ constexpr utils::TypeInfo storage::replication::TimestampReq::kType{utils::TypeI
 constexpr utils::TypeInfo storage::replication::TimestampRes::kType{utils::TypeId::REP_TIMESTAMP_RES, "TimestampRes",
                                                                     nullptr};

+constexpr utils::TypeInfo storage::replication::ForceResetStorageReq::kType{utils::TypeId::REP_FORCE_RESET_STORAGE_REQ,
+                                                                            "ForceResetStorageReq", nullptr};
+
+constexpr utils::TypeInfo storage::replication::ForceResetStorageRes::kType{utils::TypeId::REP_FORCE_RESET_STORAGE_RES,
+                                                                            "ForceResetStorageRes", nullptr};
+
 // Autogenerated SLK serialization code
 namespace slk {
 // Serialize code for TimestampRes
@@ -255,6 +274,30 @@ void Load(memgraph::storage::replication::AppendDeltasReq *self, memgraph::slk::
   memgraph::slk::Load(&self->seq_num, reader);
 }

+// Serialize code for ForceResetStorageReq
+
+void Save(const memgraph::storage::replication::ForceResetStorageReq &self, memgraph::slk::Builder *builder) {
+  memgraph::slk::Save(self.main_uuid, builder);
+  memgraph::slk::Save(self.db_uuid, builder);
+}
+
+void Load(memgraph::storage::replication::ForceResetStorageReq *self, memgraph::slk::Reader *reader) {
+  memgraph::slk::Load(&self->main_uuid, reader);
+  memgraph::slk::Load(&self->db_uuid, reader);
+}
+
+// Serialize code for ForceResetStorageRes
+
+void Save(const memgraph::storage::replication::ForceResetStorageRes &self, memgraph::slk::Builder *builder) {
+  memgraph::slk::Save(self.success, builder);
+  memgraph::slk::Save(self.current_commit_timestamp, builder);
+}
+
+void Load(memgraph::storage::replication::ForceResetStorageRes *self, memgraph::slk::Reader *reader) {
+  memgraph::slk::Load(&self->success, reader);
+  memgraph::slk::Load(&self->current_commit_timestamp, reader);
+}
+
 // Serialize SalientConfig

 void Save(const memgraph::storage::SalientConfig &self, memgraph::slk::Builder *builder) {
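The hand-written Save/Load pairs above must write and read fields in exactly the same order with the same types; the wire format is just the fields back to back. An illustrative sketch of that invariant using a toy buffer (this is not Memgraph's slk API):

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

struct Buffer {  // toy stand-in for slk::Builder / slk::Reader
  std::vector<char> bytes;
  size_t read_pos = 0;
  void Write(const void *data, size_t n) {
    const char *p = static_cast<const char *>(data);
    bytes.insert(bytes.end(), p, p + n);
  }
  void Read(void *data, size_t n) {
    std::memcpy(data, bytes.data() + read_pos, n);
    read_pos += n;
  }
};

struct ResLike {  // mirrors the shape of ForceResetStorageRes
  bool success;
  uint64_t current_commit_timestamp;
};

void Save(const ResLike &self, Buffer *b) {
  b->Write(&self.success, sizeof self.success);
  b->Write(&self.current_commit_timestamp, sizeof self.current_commit_timestamp);
}

void Load(ResLike *self, Buffer *b) {  // exact mirror of Save, field by field
  b->Read(&self->success, sizeof self->success);
  b->Read(&self->current_commit_timestamp, sizeof self->current_commit_timestamp);
}
```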
diff --git a/src/storage/v2/replication/rpc.hpp b/src/storage/v2/replication/rpc.hpp
index 67f98d0ae..fb19d82f2 100644
--- a/src/storage/v2/replication/rpc.hpp
+++ b/src/storage/v2/replication/rpc.hpp
@@ -210,6 +210,36 @@ struct TimestampRes {

 using TimestampRpc = rpc::RequestResponse<TimestampReq, TimestampRes>;

+struct ForceResetStorageReq {
+  static const utils::TypeInfo kType;
+  static const utils::TypeInfo &GetTypeInfo() { return kType; }
+
+  static void Load(ForceResetStorageReq *self, memgraph::slk::Reader *reader);
+  static void Save(const ForceResetStorageReq &self, memgraph::slk::Builder *builder);
+  ForceResetStorageReq() = default;
+  explicit ForceResetStorageReq(const utils::UUID &main_uuid, const utils::UUID &db_uuid)
+      : main_uuid{main_uuid}, db_uuid{db_uuid} {}
+
+  utils::UUID main_uuid;
+  utils::UUID db_uuid;
+};
+
+struct ForceResetStorageRes {
+  static const utils::TypeInfo kType;
+  static const utils::TypeInfo &GetTypeInfo() { return kType; }
+
+  static void Load(ForceResetStorageRes *self, memgraph::slk::Reader *reader);
+  static void Save(const ForceResetStorageRes &self, memgraph::slk::Builder *builder);
+  ForceResetStorageRes() = default;
+  ForceResetStorageRes(bool success, uint64_t current_commit_timestamp)
+      : success(success), current_commit_timestamp(current_commit_timestamp) {}
+
+  bool success;
+  uint64_t current_commit_timestamp;
+};
+
+using ForceResetStorageRpc = rpc::RequestResponse<ForceResetStorageReq, ForceResetStorageRes>;
+
 }  // namespace memgraph::storage::replication

 // SLK serialization declarations
@@ -267,4 +297,12 @@ void Save(const memgraph::storage::SalientConfig &self, memgraph::slk::Builder *

 void Load(memgraph::storage::SalientConfig *self, memgraph::slk::Reader *reader);

+void Save(const memgraph::storage::replication::ForceResetStorageReq &self, memgraph::slk::Builder *builder);
+
+void Load(memgraph::storage::replication::ForceResetStorageReq *self, memgraph::slk::Reader *reader);
+
+void Save(const memgraph::storage::replication::ForceResetStorageRes &self, memgraph::slk::Builder *builder);
+
+void Load(memgraph::storage::replication::ForceResetStorageRes *self, memgraph::slk::Reader *reader);
+
 }  // namespace memgraph::slk
diff --git a/src/utils/CMakeLists.txt b/src/utils/CMakeLists.txt
index bac3e78f3..802b8ff6f 100644
--- a/src/utils/CMakeLists.txt
+++ b/src/utils/CMakeLists.txt
@@ -24,6 +24,7 @@ find_package(Threads REQUIRED)
 add_library(mg-utils STATIC ${utils_src_files})
 add_library(mg::utils ALIAS mg-utils)
+target_link_libraries(mg-utils PUBLIC Boost::headers fmt::fmt spdlog::spdlog json)
 target_link_libraries(mg-utils PRIVATE librdtsc stdc++fs Threads::Threads gflags uuid rt)
diff --git a/src/utils/functional.hpp b/src/utils/functional.hpp
index f5242944a..fe60edc5c 100644
--- a/src/utils/functional.hpp
+++ b/src/utils/functional.hpp
@@ -19,9 +19,9 @@ namespace memgraph::utils {

 template
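The functional.hpp diff is truncated here, so the new `utils::fmap` signature isn't visible. As an assumption, the call-site change in GetDatabasesHistories (`utils::fmap(repl_storage_state.history)` with no lambda) suggests an overload that defaults to an element-wise identity mapping. A hypothetical sketch; names and details are guesses, not the actual implementation:

```cpp
#include <type_traits>
#include <vector>

// Map a function over a vector, collecting results into a new vector.
template <typename F, typename T>
auto fmap(F &&f, const std::vector<T> &v) {
  std::vector<std::invoke_result_t<F &, const T &>> result;
  result.reserve(v.size());
  for (const auto &elem : v) result.push_back(f(elem));
  return result;
}

// Identity overload: fmap(v) is a plain element-wise copy, which is all the
// GetDatabasesHistories call site needed once the explicit lambda was dropped.
template <typename T>
std::vector<T> fmap(const std::vector<T> &v) {
  return v;
}
```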