From b561c61b6455ef6a6dee35aa47057ea7871174f2 Mon Sep 17 00:00:00 2001 From: Antonio Filipovic <61245998+antoniofilipovic@users.noreply.github.com> Date: Wed, 28 Feb 2024 10:57:00 +0100 Subject: [PATCH] HA: Add initial logic for choosing new replica (#1729) --- src/coordination/coordinator_client.cpp | 26 +- src/coordination/coordinator_handlers.cpp | 11 + src/coordination/coordinator_instance.cpp | 302 ++++++++++++------ src/coordination/coordinator_rpc.cpp | 38 ++- .../coordination/coordinator_client.hpp | 12 +- .../coordination/coordinator_handlers.hpp | 3 + .../coordination/coordinator_instance.hpp | 24 +- .../include/coordination/coordinator_rpc.hpp | 33 ++ .../include/coordination/coordinator_slk.hpp | 14 + .../coordination/replication_instance.hpp | 24 +- .../include/coordination/rpc_errors.hpp | 1 + src/coordination/replication_instance.cpp | 27 +- src/dbms/dbms_handler.hpp | 4 - .../CMakeLists.txt | 1 + src/replication_coordination_glue/common.hpp | 32 ++ .../replication_handler.hpp | 3 + .../replication_handler.cpp | 21 +- .../v2/inmemory/replication/recovery.cpp | 6 +- .../v2/replication/replication_client.cpp | 2 + src/utils/functional.hpp | 7 +- src/utils/scheduler.hpp | 2 +- src/utils/typeinfo.hpp | 2 + tests/unit/CMakeLists.txt | 8 + tests/unit/coordination_utils.cpp | 246 ++++++++++++++ 24 files changed, 718 insertions(+), 131 deletions(-) create mode 100644 src/replication_coordination_glue/common.hpp create mode 100644 tests/unit/coordination_utils.cpp diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp index 84044b04a..f4d2da838 100644 --- a/src/coordination/coordinator_client.cpp +++ b/src/coordination/coordinator_client.cpp @@ -16,6 +16,7 @@ #include "coordination/coordinator_config.hpp" #include "coordination/coordinator_rpc.hpp" +#include "replication_coordination_glue/common.hpp" #include "replication_coordination_glue/messages.hpp" #include "utils/result.hpp" @@ -30,7 +31,7 @@ auto CreateClientContext(memgraph::coordination::CoordinatorClientConfig const & } // namespace CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config, - HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) + HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb) : rpc_context_{CreateClientContext(config)}, rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port), &rpc_context_}, @@ -68,6 +69,10 @@ void CoordinatorClient::StartFrequentCheck() { auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()}; stream.AwaitResponse(); } + // Subtle race condition: + // acquiring of lock needs to happen before function call, as function callback can be changed + // for instance after lock is already acquired + // (failover case when instance is promoted to MAIN) succ_cb_(coord_instance_, instance_name); } catch (rpc::RpcFailedException const &) { fail_cb_(coord_instance_, instance_name); @@ -79,11 +84,6 @@ void CoordinatorClient::StopFrequentCheck() { instance_checker_.Stop(); } void CoordinatorClient::PauseFrequentCheck() { instance_checker_.Pause(); } void CoordinatorClient::ResumeFrequentCheck() { instance_checker_.Resume(); } -auto CoordinatorClient::SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void { - succ_cb_ = std::move(succ_cb); - fail_cb_ = std::move(fail_cb); -} - auto CoordinatorClient::ReplicationClientInfo() const -> ReplClientInfo { return config_.replication_client_info; } auto CoordinatorClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid, @@ -171,5 +171,19 @@ auto CoordinatorClient::SendEnableWritingOnMainRpc() const -> bool { return false; } +auto CoordinatorClient::SendGetInstanceTimestampsRpc() const + -> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories> { + try { + auto stream{rpc_client_.Stream<coordination::GetDatabaseHistoriesRpc>()}; + auto res = stream.AwaitResponse(); + + return res.database_histories; + + } catch (const rpc::RpcFailedException &) { + spdlog::error("RPC error occured while sending GetInstance UUID RPC"); + return GetInstanceUUIDError::RPC_EXCEPTION; + } +} + } // namespace memgraph::coordination #endif diff --git a/src/coordination/coordinator_handlers.cpp b/src/coordination/coordinator_handlers.cpp index f605069fe..637360267 100644 --- a/src/coordination/coordinator_handlers.cpp +++ b/src/coordination/coordinator_handlers.cpp @@ -57,6 +57,17 @@ void CoordinatorHandlers::Register(memgraph::coordination::CoordinatorServer &se spdlog::info("Received GetInstanceUUIDRpc on coordinator server"); CoordinatorHandlers::GetInstanceUUIDHandler(replication_handler, req_reader, res_builder); }); + + server.Register<coordination::GetDatabaseHistoriesRpc>( + [&replication_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void { + spdlog::info("Received GetDatabasesHistoryRpc on coordinator server"); + CoordinatorHandlers::GetDatabaseHistoriesHandler(replication_handler, req_reader, res_builder); + }); +} + +void CoordinatorHandlers::GetDatabaseHistoriesHandler(replication::ReplicationHandler &replication_handler, + slk::Reader * /*req_reader*/, slk::Builder *res_builder) { + slk::Save(coordination::GetDatabaseHistoriesRes{replication_handler.GetDatabasesHistories()}, res_builder); } void CoordinatorHandlers::SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler, diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index 90674cf3c..ba94d9d5f 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -15,10 +15,12 @@ #include "coordination/coordinator_exceptions.hpp" #include "coordination/fmt.hpp" +#include "dbms/constants.hpp" #include "nuraft/coordinator_state_machine.hpp" #include "nuraft/coordinator_state_manager.hpp" #include "utils/counter.hpp" #include "utils/functional.hpp" +#include "utils/resource_lock.hpp" #include <range/v3/view.hpp> #include <shared_mutex> @@ -32,96 +34,28 @@ CoordinatorInstance::CoordinatorInstance() : raft_state_(RaftState::MakeRaftState( [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StartFrequentCheck); }, [this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); })) { - auto find_repl_instance = [](CoordinatorInstance *self, - std::string_view repl_instance_name) -> ReplicationInstance & { - auto repl_instance = - std::ranges::find_if(self->repl_instances_, [repl_instance_name](ReplicationInstance const &instance) { - return instance.InstanceName() == repl_instance_name; - }); - - MG_ASSERT(repl_instance != self->repl_instances_.end(), "Instance {} not found during callback!", - repl_instance_name); - return *repl_instance; + client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { + auto lock = std::unique_lock{self->coord_instance_lock_}; + auto &repl_instance = self->FindReplicationInstance(repl_instance_name); + std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name); }; - replica_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { - auto lock = std::lock_guard{self->coord_instance_lock_}; - spdlog::trace("Instance {} performing replica successful callback", repl_instance_name); - auto &repl_instance = find_repl_instance(self, repl_instance_name); - - // We need to get replicas UUID from time to time to ensure replica is listening to correct main - // and that it didn't go down for less time than we could notice - // We need to get id of main replica is listening to - // and swap if necessary - if (!repl_instance.EnsureReplicaHasCorrectMainUUID(self->GetMainUUID())) { - spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName()); - return; - } - - repl_instance.OnSuccessPing(); + client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { + auto lock = std::unique_lock{self->coord_instance_lock_}; + auto &repl_instance = self->FindReplicationInstance(repl_instance_name); + std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name); }; +} - replica_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { - auto lock = std::lock_guard{self->coord_instance_lock_}; - spdlog::trace("Instance {} performing replica failure callback", repl_instance_name); - auto &repl_instance = find_repl_instance(self, repl_instance_name); - repl_instance.OnFailPing(); - }; +auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance & { + auto repl_instance = + std::ranges::find_if(repl_instances_, [replication_instance_name](ReplicationInstance const &instance) { + return instance.InstanceName() == replication_instance_name; + }); - main_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { - auto lock = std::lock_guard{self->coord_instance_lock_}; - spdlog::trace("Instance {} performing main successful callback", repl_instance_name); - - auto &repl_instance = find_repl_instance(self, repl_instance_name); - - if (repl_instance.IsAlive()) { - repl_instance.OnSuccessPing(); - return; - } - - const auto &repl_instance_uuid = repl_instance.GetMainUUID(); - MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set."); - - auto const curr_main_uuid = self->GetMainUUID(); - if (curr_main_uuid == repl_instance_uuid.value()) { - if (!repl_instance.EnableWritingOnMain()) { - spdlog::error("Failed to enable writing on main instance {}", repl_instance_name); - return; - } - - repl_instance.OnSuccessPing(); - return; - } - - // TODO(antoniof) make demoteToReplica idempotent since main can be demoted to replica but - // swapUUID can fail - if (repl_instance.DemoteToReplica(self->replica_succ_cb_, self->replica_fail_cb_)) { - repl_instance.OnSuccessPing(); - spdlog::info("Instance {} demoted to replica", repl_instance_name); - } else { - spdlog::error("Instance {} failed to become replica", repl_instance_name); - return; - } - - if (!repl_instance.SendSwapAndUpdateUUID(curr_main_uuid)) { - spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName())); - return; - } - }; - - main_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { - auto lock = std::lock_guard{self->coord_instance_lock_}; - spdlog::trace("Instance {} performing main failure callback", repl_instance_name); - auto &repl_instance = find_repl_instance(self, repl_instance_name); - repl_instance.OnFailPing(); - const auto &repl_instance_uuid = repl_instance.GetMainUUID(); - MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set"); - - if (!repl_instance.IsAlive() && self->GetMainUUID() == repl_instance_uuid.value()) { - spdlog::info("Cluster without main instance, trying automatic failover"); - self->TryFailover(); // TODO: (andi) Initiate failover - } - }; + MG_ASSERT(repl_instance != repl_instances_.end(), "Instance {} not found during callback!", + replication_instance_name); + return *repl_instance; } auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> { @@ -166,8 +100,36 @@ auto CoordinatorInstance::TryFailover() -> void { return; } - // TODO: Smarter choice - auto new_main = ranges::begin(alive_replicas); + // for each DB in instance we get one DatabaseHistory + using DatabaseHistories = replication_coordination_glue::DatabaseHistories; + std::vector<std::pair<std::string, DatabaseHistories>> instance_database_histories; + + bool success{true}; + std::ranges::for_each(alive_replicas, [&success, &instance_database_histories](ReplicationInstance &replica) { + if (!success) { + return; + } + auto res = replica.GetClient().SendGetInstanceTimestampsRpc(); + if (res.HasError()) { + spdlog::error("Could get per db history data for instance {}", replica.InstanceName()); + success = false; + return; + } + instance_database_histories.emplace_back(replica.InstanceName(), std::move(res.GetValue())); + }); + + if (!success) { + spdlog::error("Aborting failover as at least one instance didn't provide per database history."); + return; + } + + auto [most_up_to_date_instance, latest_epoch, latest_commit_timestamp] = + ChooseMostUpToDateInstance(instance_database_histories); + + spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp", + most_up_to_date_instance, *latest_epoch, *latest_commit_timestamp); + + auto *new_main = &FindReplicationInstance(most_up_to_date_instance); new_main->PauseFrequentCheck(); utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }}; @@ -191,7 +153,8 @@ auto CoordinatorInstance::TryFailover() -> void { ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) | ranges::to<ReplicationClientsInfo>(); - if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) { + if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback, + &CoordinatorInstance::MainFailCallback)) { spdlog::warn("Failover failed since promoting replica to main failed!"); return; } @@ -242,7 +205,8 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main), std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo); - if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) { + if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback, + &CoordinatorInstance::MainFailCallback)) { return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN; } @@ -290,7 +254,9 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co spdlog::info("Request for registering instance {} accepted", instance_name); try { - repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_); + repl_instances_.emplace_back(this, std::move(config), client_succ_cb_, client_fail_cb_, + &CoordinatorInstance::ReplicaSuccessCallback, + &CoordinatorInstance::ReplicaFailCallback); } catch (CoordinatorRegisterInstanceException const &) { return RegisterInstanceCoordinatorStatus::RPC_FAILED; } @@ -304,6 +270,85 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co return RegisterInstanceCoordinatorStatus::SUCCESS; } +void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) { + auto &repl_instance = FindReplicationInstance(repl_instance_name); + repl_instance.OnFailPing(); + const auto &repl_instance_uuid = repl_instance.GetMainUUID(); + MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set"); + + if (!repl_instance.IsAlive() && GetMainUUID() == repl_instance_uuid.value()) { + spdlog::info("Cluster without main instance, trying automatic failover"); + TryFailover(); + } +} + +void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name) { + auto &repl_instance = FindReplicationInstance(repl_instance_name); + spdlog::trace("Instance {} performing main successful callback", repl_instance_name); + + if (repl_instance.IsAlive()) { + repl_instance.OnSuccessPing(); + return; + } + + const auto &repl_instance_uuid = repl_instance.GetMainUUID(); + MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set."); + + auto const curr_main_uuid = GetMainUUID(); + if (curr_main_uuid == repl_instance_uuid.value()) { + if (!repl_instance.EnableWritingOnMain()) { + spdlog::error("Failed to enable writing on main instance {}", repl_instance_name); + return; + } + + repl_instance.OnSuccessPing(); + return; + } + + if (repl_instance.DemoteToReplica(&CoordinatorInstance::ReplicaSuccessCallback, + &CoordinatorInstance::ReplicaFailCallback)) { + repl_instance.OnSuccessPing(); + spdlog::info("Instance {} demoted to replica", repl_instance_name); + } else { + spdlog::error("Instance {} failed to become replica", repl_instance_name); + return; + } + + if (!repl_instance.SendSwapAndUpdateUUID(curr_main_uuid)) { + spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName())); + return; + } +} + +void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name) { + auto &repl_instance = FindReplicationInstance(repl_instance_name); + if (!repl_instance.IsReplica()) { + spdlog::error("Aborting replica callback since instance {} is not replica anymore", repl_instance_name); + return; + } + spdlog::trace("Instance {} performing replica successful callback", repl_instance_name); + // We need to get replicas UUID from time to time to ensure replica is listening to correct main + // and that it didn't go down for less time than we could notice + // We need to get id of main replica is listening to + // and swap if necessary + if (!repl_instance.EnsureReplicaHasCorrectMainUUID(GetMainUUID())) { + spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName()); + return; + } + + repl_instance.OnSuccessPing(); +} + +void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) { + auto &repl_instance = FindReplicationInstance(repl_instance_name); + if (!repl_instance.IsReplica()) { + spdlog::error("Aborting replica fail callback since instance {} is not replica anymore", repl_instance_name); + return; + } + spdlog::trace("Instance {} performing replica failure callback", repl_instance_name); + repl_instance.OnFailPing(); +} + auto CoordinatorInstance::UnregisterReplicationInstance(std::string instance_name) -> UnregisterInstanceCoordinatorStatus { auto lock = std::lock_guard{coord_instance_lock_}; @@ -343,5 +388,82 @@ auto CoordinatorInstance::GetMainUUID() const -> utils::UUID { return main_uuid_ // TODO: (andi) Add to the RAFT log. auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_ = new_uuid; } +auto CoordinatorInstance::ChooseMostUpToDateInstance( + const std::vector<std::pair<std::string, replication_coordination_glue::DatabaseHistories>> + &instance_database_histories) -> NewMainRes { + NewMainRes new_main_res; + std::for_each( + instance_database_histories.begin(), instance_database_histories.end(), + [&new_main_res](const InstanceNameDbHistories &instance_res_pair) { + const auto &[instance_name, instance_db_histories] = instance_res_pair; + + // Find default db for instance and its history + auto default_db_history_data = std::ranges::find_if( + instance_db_histories, [default_db = memgraph::dbms::kDefaultDB]( + const replication_coordination_glue::DatabaseHistory &db_timestamps) { + return db_timestamps.name == default_db; + }); + + std::ranges::for_each( + instance_db_histories, + [&instance_name = instance_name](const replication_coordination_glue::DatabaseHistory &db_history) { + spdlog::trace("Instance {}: name {}, default db {}", instance_name, db_history.name, + memgraph::dbms::kDefaultDB); + }); + + MG_ASSERT(default_db_history_data != instance_db_histories.end(), "No history for instance"); + + const auto &instance_default_db_history = default_db_history_data->history; + + std::ranges::for_each(instance_default_db_history | ranges::views::reverse, + [&instance_name = instance_name](const auto &epoch_history_it) { + spdlog::trace("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name, + std::get<0>(epoch_history_it), std::get<1>(epoch_history_it)); + }); + + // get latest epoch + // get latest timestamp + + if (!new_main_res.latest_epoch) { + const auto &[epoch, timestamp] = *instance_default_db_history.crbegin(); + new_main_res = NewMainRes{ + .most_up_to_date_instance = instance_name, + .latest_epoch = epoch, + .latest_commit_timestamp = timestamp, + }; + spdlog::trace("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp", + instance_name, epoch, timestamp); + return; + } + + bool found_same_point{false}; + std::string last_most_up_to_date_epoch{*new_main_res.latest_epoch}; + for (auto [epoch, timestamp] : ranges::reverse_view(instance_default_db_history)) { + if (*new_main_res.latest_commit_timestamp < timestamp) { + new_main_res = NewMainRes{ + .most_up_to_date_instance = instance_name, + .latest_epoch = epoch, + .latest_commit_timestamp = timestamp, + }; + + spdlog::trace("Found the new most up to date instance {} with epoch {} and {} latest commit timestamp", + instance_name, epoch, timestamp); + } + + // we found point at which they were same + if (epoch == last_most_up_to_date_epoch) { + found_same_point = true; + break; + } + } + + if (!found_same_point) { + spdlog::error("Didn't find same history epoch {} for instance {} and instance {}", last_most_up_to_date_epoch, + new_main_res.most_up_to_date_instance, instance_name); + } + }); + + return new_main_res; +} } // namespace memgraph::coordination #endif diff --git a/src/coordination/coordinator_rpc.cpp b/src/coordination/coordinator_rpc.cpp index 4115f1979..815693824 100644 --- a/src/coordination/coordinator_rpc.cpp +++ b/src/coordination/coordinator_rpc.cpp @@ -76,9 +76,9 @@ void EnableWritingOnMainRes::Load(EnableWritingOnMainRes *self, memgraph::slk::R memgraph::slk::Load(self, reader); } -void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const &self, memgraph::slk::Builder *builder) {} +void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const & /*self*/, memgraph::slk::Builder * /*builder*/) {} -void EnableWritingOnMainReq::Load(EnableWritingOnMainReq *self, memgraph::slk::Reader *reader) {} +void EnableWritingOnMainReq::Load(EnableWritingOnMainReq * /*self*/, memgraph::slk::Reader * /*reader*/) {} // GetInstanceUUID void GetInstanceUUIDReq::Save(const GetInstanceUUIDReq &self, memgraph::slk::Builder *builder) { @@ -97,6 +97,24 @@ void GetInstanceUUIDRes::Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *r memgraph::slk::Load(self, reader); } +// GetDatabaseHistoriesRpc + +void GetDatabaseHistoriesReq::Save(const GetDatabaseHistoriesReq & /*self*/, memgraph::slk::Builder * /*builder*/) { + /* nothing to serialize */ +} + +void GetDatabaseHistoriesReq::Load(GetDatabaseHistoriesReq * /*self*/, memgraph::slk::Reader * /*reader*/) { + /* nothing to serialize */ +} + +void GetDatabaseHistoriesRes::Save(const GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self, builder); +} + +void GetDatabaseHistoriesRes::Load(GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(self, reader); +} + } // namespace coordination constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ, @@ -130,6 +148,12 @@ constexpr utils::TypeInfo coordination::GetInstanceUUIDReq::kType{utils::TypeId: constexpr utils::TypeInfo coordination::GetInstanceUUIDRes::kType{utils::TypeId::COORD_GET_UUID_RES, "CoordGetUUIDRes", nullptr}; +constexpr utils::TypeInfo coordination::GetDatabaseHistoriesReq::kType{utils::TypeId::COORD_GET_INSTANCE_DATABASES_REQ, + "GetInstanceDatabasesReq", nullptr}; + +constexpr utils::TypeInfo coordination::GetDatabaseHistoriesRes::kType{utils::TypeId::COORD_GET_INSTANCE_DATABASES_RES, + "GetInstanceDatabasesRes", nullptr}; + namespace slk { // PromoteReplicaToMainRpc @@ -213,6 +237,16 @@ void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reade memgraph::slk::Load(&self->uuid, reader); } +// GetInstanceTimestampsReq + +void Save(const memgraph::coordination::GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self.database_histories, builder); +} + +void Load(memgraph::coordination::GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(&self->database_histories, reader); +} + } // namespace slk } // namespace memgraph diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp index 5e10af89d..994c78d18 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/coordinator_client.hpp @@ -14,6 +14,7 @@ #ifdef MG_ENTERPRISE #include "coordination/coordinator_config.hpp" +#include "replication_coordination_glue/common.hpp" #include "rpc/client.hpp" #include "rpc_errors.hpp" #include "utils/result.hpp" @@ -23,13 +24,13 @@ namespace memgraph::coordination { class CoordinatorInstance; -using HealthCheckCallback = std::function<void(CoordinatorInstance *, std::string_view)>; +using HealthCheckClientCallback = std::function<void(CoordinatorInstance *, std::string_view)>; using ReplicationClientsInfo = std::vector<ReplClientInfo>; class CoordinatorClient { public: explicit CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config, - HealthCheckCallback succ_cb, HealthCheckCallback fail_cb); + HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb); ~CoordinatorClient() = default; @@ -62,7 +63,8 @@ class CoordinatorClient { auto ReplicationClientInfo() const -> ReplClientInfo; - auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void; + auto SendGetInstanceTimestampsRpc() const + -> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories>; auto RpcClient() -> rpc::Client & { return rpc_client_; } @@ -82,8 +84,8 @@ class CoordinatorClient { CoordinatorClientConfig config_; CoordinatorInstance *coord_instance_; - HealthCheckCallback succ_cb_; - HealthCheckCallback fail_cb_; + HealthCheckClientCallback succ_cb_; + HealthCheckClientCallback fail_cb_; }; } // namespace memgraph::coordination diff --git a/src/coordination/include/coordination/coordinator_handlers.hpp b/src/coordination/include/coordination/coordinator_handlers.hpp index b9ed4b519..18aecc9cf 100644 --- a/src/coordination/include/coordination/coordinator_handlers.hpp +++ b/src/coordination/include/coordination/coordinator_handlers.hpp @@ -41,6 +41,9 @@ class CoordinatorHandlers { static void GetInstanceUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader, slk::Builder *res_builder); + + static void GetDatabaseHistoriesHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader, + slk::Builder *res_builder); }; } // namespace memgraph::dbms diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index 15b377ed9..bed202744 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -18,6 +18,7 @@ #include "coordination/raft_state.hpp" #include "coordination/register_main_replica_coordinator_status.hpp" #include "coordination/replication_instance.hpp" +#include "utils/resource_lock.hpp" #include "utils/rw_lock.hpp" #include "utils/thread_pool.hpp" @@ -25,6 +26,13 @@ namespace memgraph::coordination { +struct NewMainRes { + std::string most_up_to_date_instance; + std::optional<std::string> latest_epoch; + std::optional<uint64_t> latest_commit_timestamp; +}; +using InstanceNameDbHistories = std::pair<std::string, replication_coordination_glue::DatabaseHistories>; + class CoordinatorInstance { public: CoordinatorInstance(); @@ -44,12 +52,24 @@ class CoordinatorInstance { auto SetMainUUID(utils::UUID new_uuid) -> void; + auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &; + + void MainFailCallback(std::string_view); + + void MainSuccessCallback(std::string_view); + + void ReplicaSuccessCallback(std::string_view); + + void ReplicaFailCallback(std::string_view); + + static auto ChooseMostUpToDateInstance(const std::vector<InstanceNameDbHistories> &) -> NewMainRes; + private: - HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_; + HealthCheckClientCallback client_succ_cb_, client_fail_cb_; // NOTE: Must be std::list because we rely on pointer stability std::list<ReplicationInstance> repl_instances_; - mutable utils::RWLock coord_instance_lock_{utils::RWLock::Priority::READ}; + mutable utils::ResourceLock coord_instance_lock_{}; utils::UUID main_uuid_; diff --git a/src/coordination/include/coordination/coordinator_rpc.hpp b/src/coordination/include/coordination/coordinator_rpc.hpp index 1578b4577..2bf88fe46 100644 --- a/src/coordination/include/coordination/coordinator_rpc.hpp +++ b/src/coordination/include/coordination/coordinator_rpc.hpp @@ -15,6 +15,7 @@ #ifdef MG_ENTERPRISE #include "coordination/coordinator_config.hpp" +#include "replication_coordination_glue/common.hpp" #include "rpc/messages.hpp" #include "slk/serialization.hpp" @@ -161,6 +162,32 @@ struct GetInstanceUUIDRes { using GetInstanceUUIDRpc = rpc::RequestResponse<GetInstanceUUIDReq, GetInstanceUUIDRes>; +struct GetDatabaseHistoriesReq { + static const utils::TypeInfo kType; + static const utils::TypeInfo &GetTypeInfo() { return kType; } + + static void Load(GetDatabaseHistoriesReq *self, memgraph::slk::Reader *reader); + static void Save(const GetDatabaseHistoriesReq &self, memgraph::slk::Builder *builder); + + GetDatabaseHistoriesReq() = default; +}; + +struct GetDatabaseHistoriesRes { + static const utils::TypeInfo kType; + static const utils::TypeInfo &GetTypeInfo() { return kType; } + + static void Load(GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader); + static void Save(const GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder); + + explicit GetDatabaseHistoriesRes(const replication_coordination_glue::DatabaseHistories &database_histories) + : database_histories(database_histories) {} + GetDatabaseHistoriesRes() = default; + + replication_coordination_glue::DatabaseHistories database_histories; +}; + +using GetDatabaseHistoriesRpc = rpc::RequestResponse<GetDatabaseHistoriesReq, GetDatabaseHistoriesRes>; + } // namespace memgraph::coordination // SLK serialization declarations @@ -183,15 +210,21 @@ void Save(const memgraph::coordination::GetInstanceUUIDReq &self, memgraph::slk: void Load(memgraph::coordination::GetInstanceUUIDReq *self, memgraph::slk::Reader *reader); void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder); void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader); + // UnregisterReplicaRpc void Save(memgraph::coordination::UnregisterReplicaRes const &self, memgraph::slk::Builder *builder); void Load(memgraph::coordination::UnregisterReplicaRes *self, memgraph::slk::Reader *reader); void Save(memgraph::coordination::UnregisterReplicaReq const &self, memgraph::slk::Builder *builder); void Load(memgraph::coordination::UnregisterReplicaReq *self, memgraph::slk::Reader *reader); +// EnableWritingOnMainRpc void Save(memgraph::coordination::EnableWritingOnMainRes const &self, memgraph::slk::Builder *builder); void Load(memgraph::coordination::EnableWritingOnMainRes *self, memgraph::slk::Reader *reader); +// GetDatabaseHistoriesRpc +void Save(const memgraph::coordination::GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder); +void Load(memgraph::coordination::GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader); + } // namespace memgraph::slk #endif diff --git a/src/coordination/include/coordination/coordinator_slk.hpp b/src/coordination/include/coordination/coordinator_slk.hpp index 49834be41..ee393b7b6 100644 --- a/src/coordination/include/coordination/coordinator_slk.hpp +++ b/src/coordination/include/coordination/coordinator_slk.hpp @@ -14,6 +14,7 @@ #ifdef MG_ENTERPRISE #include "coordination/coordinator_config.hpp" +#include "replication_coordination_glue/common.hpp" #include "slk/serialization.hpp" #include "slk/streams.hpp" @@ -34,5 +35,18 @@ inline void Load(ReplicationClientInfo *obj, Reader *reader) { Load(&obj->replication_ip_address, reader); Load(&obj->replication_port, reader); } + +inline void Save(const replication_coordination_glue::DatabaseHistory &obj, Builder *builder) { + Save(obj.db_uuid, builder); + Save(obj.history, builder); + Save(obj.name, builder); +} + +inline void Load(replication_coordination_glue::DatabaseHistory *obj, Reader *reader) { + Load(&obj->db_uuid, reader); + Load(&obj->history, reader); + Load(&obj->name, reader); +} + } // namespace memgraph::slk #endif diff --git a/src/coordination/include/coordination/replication_instance.hpp b/src/coordination/include/coordination/replication_instance.hpp index 8001d0905..e8e00a0a8 100644 --- a/src/coordination/include/coordination/replication_instance.hpp +++ b/src/coordination/include/coordination/replication_instance.hpp @@ -18,17 +18,22 @@ #include "replication_coordination_glue/role.hpp" #include <libnuraft/nuraft.hxx> +#include "utils/resource_lock.hpp" #include "utils/result.hpp" #include "utils/uuid.hpp" namespace memgraph::coordination { class CoordinatorInstance; +class ReplicationInstance; + +using HealthCheckInstanceCallback = void (CoordinatorInstance::*)(std::string_view); class ReplicationInstance { public: - ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config, HealthCheckCallback succ_cb, - HealthCheckCallback fail_cb); + ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config, HealthCheckClientCallback succ_cb, + HealthCheckClientCallback fail_cb, HealthCheckInstanceCallback succ_instance_cb, + HealthCheckInstanceCallback fail_instance_cb); ReplicationInstance(ReplicationInstance const &other) = delete; ReplicationInstance &operator=(ReplicationInstance const &other) = delete; @@ -50,9 +55,10 @@ class ReplicationInstance { auto IsReplica() const -> bool; auto IsMain() const -> bool; - auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb, - HealthCheckCallback main_fail_cb) -> bool; - auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool; + auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info, + HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb) -> bool; + auto DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb) + -> bool; auto StartFrequentCheck() -> void; auto StopFrequentCheck() -> void; @@ -66,16 +72,17 @@ class ReplicationInstance { auto SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool; auto SendUnregisterReplicaRpc(std::string const &instance_name) -> bool; - auto SendGetInstanceUUID() -> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>>; auto GetClient() -> CoordinatorClient &; auto EnableWritingOnMain() -> bool; auto SetNewMainUUID(utils::UUID const &main_uuid) -> void; - auto ResetMainUUID() -> void; auto GetMainUUID() const -> const std::optional<utils::UUID> &; + auto GetSuccessCallback() -> HealthCheckInstanceCallback &; + auto GetFailCallback() -> HealthCheckInstanceCallback &; + private: CoordinatorClient client_; replication_coordination_glue::ReplicationRole replication_role_; @@ -90,6 +97,9 @@ class ReplicationInstance { // so we need to send swap uuid again std::optional<utils::UUID> main_uuid_; + HealthCheckInstanceCallback succ_cb_; + HealthCheckInstanceCallback fail_cb_; + friend bool operator==(ReplicationInstance const &first, ReplicationInstance const &second) { return first.client_ == second.client_ && first.replication_role_ == second.replication_role_; } diff --git a/src/coordination/include/coordination/rpc_errors.hpp b/src/coordination/include/coordination/rpc_errors.hpp index f6bfbf3e0..3829d430a 100644 --- a/src/coordination/include/coordination/rpc_errors.hpp +++ b/src/coordination/include/coordination/rpc_errors.hpp @@ -11,4 +11,5 @@ namespace memgraph::coordination { enum class GetInstanceUUIDError { NO_RESPONSE, RPC_EXCEPTION }; +enum class GetInstanceTimestampsError { NO_RESPONSE, RPC_EXCEPTION }; } // namespace memgraph::coordination diff --git a/src/coordination/replication_instance.cpp b/src/coordination/replication_instance.cpp index 0d16db648..50f1be468 100644 --- a/src/coordination/replication_instance.cpp +++ b/src/coordination/replication_instance.cpp @@ -13,15 +13,21 @@ #include "coordination/replication_instance.hpp" +#include <utility> + #include "replication_coordination_glue/handler.hpp" #include "utils/result.hpp" namespace memgraph::coordination { ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config, - HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) + HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb, + HealthCheckInstanceCallback succ_instance_cb, + HealthCheckInstanceCallback fail_instance_cb) : client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)), - replication_role_(replication_coordination_glue::ReplicationRole::REPLICA) { + replication_role_(replication_coordination_glue::ReplicationRole::REPLICA), + succ_cb_(succ_instance_cb), + fail_cb_(fail_instance_cb) { if (!client_.DemoteToReplica()) { throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName()); } @@ -57,26 +63,29 @@ auto ReplicationInstance::IsMain() const -> bool { } auto ReplicationInstance::PromoteToMain(utils::UUID new_uuid, ReplicationClientsInfo repl_clients_info, - HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> bool { + HealthCheckInstanceCallback main_succ_cb, + HealthCheckInstanceCallback main_fail_cb) -> bool { if (!client_.SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) { return false; } replication_role_ = replication_coordination_glue::ReplicationRole::MAIN; main_uuid_ = new_uuid; - client_.SetCallbacks(std::move(main_succ_cb), std::move(main_fail_cb)); + succ_cb_ = main_succ_cb; + fail_cb_ = main_fail_cb; return true; } -auto ReplicationInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) - -> bool { +auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, + HealthCheckInstanceCallback replica_fail_cb) -> bool { if (!client_.DemoteToReplica()) { return false; } replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA; - client_.SetCallbacks(std::move(replica_succ_cb), std::move(replica_fail_cb)); + succ_cb_ = replica_succ_cb; + fail_cb_ = replica_fail_cb; return true; } @@ -90,10 +99,12 @@ auto ReplicationInstance::ReplicationClientInfo() const -> CoordinatorClientConf return client_.ReplicationClientInfo(); } +auto ReplicationInstance::GetSuccessCallback() -> HealthCheckInstanceCallback & { return succ_cb_; } +auto ReplicationInstance::GetFailCallback() -> HealthCheckInstanceCallback & { return fail_cb_; } + auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; } auto ReplicationInstance::SetNewMainUUID(utils::UUID const &main_uuid) -> void { main_uuid_ = main_uuid; } -auto ReplicationInstance::ResetMainUUID() -> void { main_uuid_ = std::nullopt; } auto ReplicationInstance::GetMainUUID() const -> std::optional<utils::UUID> const & { return main_uuid_; } auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool { diff --git a/src/dbms/dbms_handler.hpp b/src/dbms/dbms_handler.hpp index 7b1d45335..87d1257a6 100644 --- a/src/dbms/dbms_handler.hpp +++ b/src/dbms/dbms_handler.hpp @@ -266,10 +266,6 @@ class DbmsHandler { bool IsMain() const { return repl_state_.IsMain(); } bool IsReplica() const { return repl_state_.IsReplica(); } -#ifdef MG_ENTERPRISE - // coordination::CoordinatorState &CoordinatorState() { return coordinator_state_; } -#endif - /** * @brief Return all active databases. * diff --git a/src/replication_coordination_glue/CMakeLists.txt b/src/replication_coordination_glue/CMakeLists.txt index f81aed4ba..f452e1c1f 100644 --- a/src/replication_coordination_glue/CMakeLists.txt +++ b/src/replication_coordination_glue/CMakeLists.txt @@ -7,6 +7,7 @@ target_sources(mg-repl_coord_glue mode.hpp role.hpp handler.hpp + common.hpp PRIVATE messages.cpp diff --git a/src/replication_coordination_glue/common.hpp b/src/replication_coordination_glue/common.hpp new file mode 100644 index 000000000..439e5cae8 --- /dev/null +++ b/src/replication_coordination_glue/common.hpp @@ -0,0 +1,32 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include "rpc/client.hpp" +#include "utils/uuid.hpp" + +#include <deque> +#include "messages.hpp" +#include "rpc/messages.hpp" +#include "utils/uuid.hpp" + +namespace memgraph::replication_coordination_glue { + +struct DatabaseHistory { + memgraph::utils::UUID db_uuid; + std::vector<std::pair<std::string, uint64_t>> history; + std::string name; +}; + +using DatabaseHistories = std::vector<DatabaseHistory>; + +} // namespace memgraph::replication_coordination_glue diff --git a/src/replication_handler/include/replication_handler/replication_handler.hpp b/src/replication_handler/include/replication_handler/replication_handler.hpp index b110e6015..d5c2bfa71 100644 --- a/src/replication_handler/include/replication_handler/replication_handler.hpp +++ b/src/replication_handler/include/replication_handler/replication_handler.hpp @@ -14,6 +14,7 @@ #include "dbms/dbms_handler.hpp" #include "flags/experimental.hpp" #include "replication/include/replication/state.hpp" +#include "replication_coordination_glue/common.hpp" #include "replication_handler/system_replication.hpp" #include "replication_handler/system_rpc.hpp" #include "utils/result.hpp" @@ -149,6 +150,8 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { auto GetReplicaUUID() -> std::optional<utils::UUID>; + auto GetDatabasesHistories() -> replication_coordination_glue::DatabaseHistories; + private: template <bool SendSwapUUID> auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config) diff --git a/src/replication_handler/replication_handler.cpp b/src/replication_handler/replication_handler.cpp index 5f807779d..ea567eed0 100644 --- a/src/replication_handler/replication_handler.cpp +++ b/src/replication_handler/replication_handler.cpp @@ -14,6 +14,7 @@ #include "dbms/dbms_handler.hpp" #include "replication/replication_client.hpp" #include "replication_handler/system_replication.hpp" +#include "utils/functional.hpp" namespace memgraph::replication { @@ -265,8 +266,26 @@ auto ReplicationHandler::GetRole() const -> replication_coordination_glue::Repli return repl_state_.GetRole(); } +auto ReplicationHandler::GetDatabasesHistories() -> replication_coordination_glue::DatabaseHistories { + replication_coordination_glue::DatabaseHistories results; + dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) { + auto &repl_storage_state = db_acc->storage()->repl_storage_state_; + + std::vector<std::pair<std::string, uint64_t>> history = + utils::fmap([](const auto &elem) { return std::pair<std::string, uint64_t>(elem.first, elem.second); }, + repl_storage_state.history); + + history.emplace_back(std::string(repl_storage_state.epoch_.id()), repl_storage_state.last_commit_timestamp_.load()); + replication_coordination_glue::DatabaseHistory repl{ + .db_uuid = utils::UUID{db_acc->storage()->uuid()}, .history = history, .name = std::string(db_acc->name())}; + results.emplace_back(repl); + }); + + return results; +} + auto ReplicationHandler::GetReplicaUUID() -> std::optional<utils::UUID> { - MG_ASSERT(repl_state_.IsReplica()); + MG_ASSERT(repl_state_.IsReplica(), "Instance is not replica"); return std::get<RoleReplicaData>(repl_state_.ReplicationData()).uuid_; } diff --git a/src/storage/v2/inmemory/replication/recovery.cpp b/src/storage/v2/inmemory/replication/recovery.cpp index 921c1f5c0..fe752bfd1 100644 --- a/src/storage/v2/inmemory/replication/recovery.cpp +++ b/src/storage/v2/inmemory/replication/recovery.cpp @@ -106,8 +106,8 @@ uint64_t ReplicateCurrentWal(const utils::UUID &main_uuid, const InMemoryStorage return response.current_commit_timestamp; } -/// This method tries to find the optimal path for recoverying a single replica. -/// Based on the last commit transfered to replica it tries to update the +/// This method tries to find the optimal path for recovering a single replica. +/// Based on the last commit transferred to replica it tries to update the /// replica using durability files - WALs and Snapshots. WAL files are much /// smaller in size as they contain only the Deltas (changes) made during the /// transactions while Snapshots contain all the data. For that reason we prefer @@ -175,7 +175,7 @@ std::vector<RecoveryStep> GetRecoverySteps(uint64_t replica_commit, utils::FileR auto add_snapshot = [&]() { if (!latest_snapshot) return; const auto lock_success = locker_acc.AddPath(latest_snapshot->path); - MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant snapshot path."); + MG_ASSERT(!lock_success.HasError(), "Tried to lock a non-existent snapshot path."); recovery_steps.emplace_back(std::in_place_type_t<RecoverySnapshot>{}, std::move(latest_snapshot->path)); }; diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp index 16429d11f..1eb06bf10 100644 --- a/src/storage/v2/replication/replication_client.cpp +++ b/src/storage/v2/replication/replication_client.cpp @@ -53,11 +53,13 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce #endif std::optional<uint64_t> branching_point; + // different epoch id, replica was main if (replica.epoch_id != replStorageState.epoch_.id() && replica.current_commit_timestamp != kTimestampInitialId) { auto const &history = replStorageState.history; const auto epoch_info_iter = std::find_if(history.crbegin(), history.crend(), [&](const auto &main_epoch_info) { return main_epoch_info.first == replica.epoch_id; }); + // main didn't have that epoch, but why is here branching point if (epoch_info_iter == history.crend()) { branching_point = 0; } else if (epoch_info_iter->second != replica.current_commit_timestamp) { diff --git a/src/utils/functional.hpp b/src/utils/functional.hpp index e0714de2a..f5242944a 100644 --- a/src/utils/functional.hpp +++ b/src/utils/functional.hpp @@ -18,8 +18,11 @@ namespace memgraph::utils { -template <class F, class T, class R = typename std::invoke_result<F, T>::type> -auto fmap(F &&f, std::vector<T> const &v) -> std::vector<R> { +template <template <typename, typename...> class Container, typename T, typename Allocator = std::allocator<T>, + typename F, typename R = std::invoke_result_t<F, T>> +requires ranges::range<Container<T, Allocator>> && + (!std::same_as<Container<T, Allocator>, std::string>)auto fmap(F &&f, const Container<T, Allocator> &v) + -> std::vector<R> { return v | ranges::views::transform(std::forward<F>(f)) | ranges::to<std::vector<R>>(); } diff --git a/src/utils/scheduler.hpp b/src/utils/scheduler.hpp index 742271a95..45b2c8b04 100644 --- a/src/utils/scheduler.hpp +++ b/src/utils/scheduler.hpp @@ -57,7 +57,7 @@ class Scheduler { // program and there is probably no work to do in scheduled function at // the start of the program. Since Server will log some messages on // the program start we let him log first and we make sure by first - // waiting that funcion f will not log before it. + // waiting that function f will not log before it. // Check for pause also. std::unique_lock<std::mutex> lk(mutex_); auto now = std::chrono::system_clock::now(); diff --git a/src/utils/typeinfo.hpp b/src/utils/typeinfo.hpp index 1ca08a3f7..aadc8a07b 100644 --- a/src/utils/typeinfo.hpp +++ b/src/utils/typeinfo.hpp @@ -114,6 +114,8 @@ enum class TypeId : uint64_t { COORD_GET_UUID_REQ, COORD_GET_UUID_RES, + COORD_GET_INSTANCE_DATABASES_REQ, + COORD_GET_INSTANCE_DATABASES_RES, // AST AST_LABELIX = 3000, diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 6f7b3bbef..b92989f4e 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -430,3 +430,11 @@ target_include_directories(${test_prefix}distributed_lamport_clock PRIVATE ${CMA add_unit_test(query_hint_provider.cpp) target_link_libraries(${test_prefix}query_hint_provider mg-query mg-glue) + + +# Test coordination +if(MG_ENTERPRISE) +add_unit_test(coordination_utils.cpp) +target_link_libraries(${test_prefix}coordination_utils gflags mg-coordination mg-repl_coord_glue) +target_include_directories(${test_prefix}coordination_utils PRIVATE ${CMAKE_SOURCE_DIR}/include) +endif() diff --git a/tests/unit/coordination_utils.cpp b/tests/unit/coordination_utils.cpp new file mode 100644 index 000000000..1346dce2c --- /dev/null +++ b/tests/unit/coordination_utils.cpp @@ -0,0 +1,246 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include <gflags/gflags.h> +#include <gtest/gtest.h> +#include "coordination/coordinator_instance.hpp" +#include "dbms/constants.hpp" +#include "replication_coordination_glue/common.hpp" +#include "utils/functional.hpp" + +class CoordinationUtils : public ::testing::Test { + protected: + void SetUp() override {} + + void TearDown() override {} + + std::filesystem::path test_folder_{std::filesystem::temp_directory_path() / "MG_tests_unit_coordination"}; +}; + +TEST_F(CoordinationUtils, MemgraphDbHistorySimple) { + // Choose any if everything is same + // X = dead + // Main : A(24) B(36) C(48) D(50) E(51) X + // replica 1: A(24) B(36) C(48) D(50) E(51) + // replica 2: A(24) B(36) C(48) D(50) E(51) + // replica 3: A(24) B(36) C(48) D(50) E(51) + std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>> + instance_database_histories; + + std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories; + histories.emplace_back(memgraph::utils::UUID{}, 24); + histories.emplace_back(memgraph::utils::UUID{}, 36); + histories.emplace_back(memgraph::utils::UUID{}, 48); + histories.emplace_back(memgraph::utils::UUID{}, 50); + histories.emplace_back(memgraph::utils::UUID{}, 51); + + memgraph::utils::UUID db_uuid; + std::string default_name = std::string(memgraph::dbms::kDefaultDB); + + auto db_histories = memgraph::utils::fmap( + [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) { + return std::make_pair(std::string(pair.first), pair.second); + }, + histories); + + memgraph::replication_coordination_glue::DatabaseHistory history{ + .db_uuid = db_uuid, .history = db_histories, .name = default_name}; + + memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history}; + instance_database_histories.emplace_back("instance_1", instance_1_db_histories_); + + memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history}; + instance_database_histories.emplace_back("instance_2", instance_2_db_histories_); + + memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history}; + instance_database_histories.emplace_back("instance_3", instance_3_db_histories_); + memgraph::coordination::CoordinatorInstance instance; + + auto [instance_name, latest_epoch, latest_commit_timestamp] = + instance.ChooseMostUpToDateInstance(instance_database_histories); + ASSERT_TRUE(instance_name == "instance_1" || instance_name == "instance_2" || instance_name == "instance_3"); + ASSERT_TRUE(*latest_epoch == db_histories.back().first); + ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second); +} + +TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) { + // Prioritize one with the biggest last commit timestamp on last epoch + // X = dead + // Main : A(24) B(36) C(48) D(50) E(59) X + // replica 1: A(24) B(12) C(15) D(17) E(51) + // replica 2: A(24) B(12) C(15) D(17) E(57) + // replica 3: A(24) B(12) C(15) D(17) E(59) + std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>> + instance_database_histories; + + std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories; + histories.emplace_back(memgraph::utils::UUID{}, 24); + histories.emplace_back(memgraph::utils::UUID{}, 36); + histories.emplace_back(memgraph::utils::UUID{}, 48); + histories.emplace_back(memgraph::utils::UUID{}, 50); + histories.emplace_back(memgraph::utils::UUID{}, 59); + + memgraph::utils::UUID db_uuid; + std::string default_name = std::string(memgraph::dbms::kDefaultDB); + + auto db_histories = memgraph::utils::fmap( + [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) { + return std::make_pair(std::string(pair.first), pair.second); + }, + histories); + + db_histories.back().second = 51; + memgraph::replication_coordination_glue::DatabaseHistory history1{ + .db_uuid = db_uuid, .history = db_histories, .name = default_name}; + + memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history1}; + instance_database_histories.emplace_back("instance_1", instance_1_db_histories_); + + db_histories.back().second = 57; + memgraph::replication_coordination_glue::DatabaseHistory history2{ + .db_uuid = db_uuid, .history = db_histories, .name = default_name}; + memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history2}; + instance_database_histories.emplace_back("instance_2", instance_2_db_histories_); + + db_histories.back().second = 59; + memgraph::replication_coordination_glue::DatabaseHistory history3{ + .db_uuid = db_uuid, .history = db_histories, .name = default_name}; + memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history3}; + instance_database_histories.emplace_back("instance_3", instance_3_db_histories_); + + memgraph::coordination::CoordinatorInstance instance; + auto [instance_name, latest_epoch, latest_commit_timestamp] = + instance.ChooseMostUpToDateInstance(instance_database_histories); + + ASSERT_TRUE(instance_name == "instance_3"); + ASSERT_TRUE(*latest_epoch == db_histories.back().first); + ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second); +} + +TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) { + // Prioritize one biggest commit timestamp + // X = dead + // Main : A(24) B(36) C(48) D(50) E(51) X X X X + // replica 1: A(24) B(36) C(48) D(50) E(51) F(60) G(65) X up + // replica 2: A(24) B(36) C(48) D(50) E(51) X X X up + // replica 3: A(24) B(36) C(48) D(50) E(51) X X X up + std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>> + instance_database_histories; + + std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories; + histories.emplace_back(memgraph::utils::UUID{}, 24); + histories.emplace_back(memgraph::utils::UUID{}, 36); + histories.emplace_back(memgraph::utils::UUID{}, 48); + histories.emplace_back(memgraph::utils::UUID{}, 50); + histories.emplace_back(memgraph::utils::UUID{}, 51); + + memgraph::utils::UUID db_uuid; + std::string default_name = std::string(memgraph::dbms::kDefaultDB); + + auto db_histories = memgraph::utils::fmap( + [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) { + return std::make_pair(std::string(pair.first), pair.second); + }, + histories); + + memgraph::replication_coordination_glue::DatabaseHistory history{ + .db_uuid = db_uuid, .history = db_histories, .name = default_name}; + + memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history}; + instance_database_histories.emplace_back("instance_1", instance_1_db_histories_); + + memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history}; + instance_database_histories.emplace_back("instance_2", instance_2_db_histories_); + + histories.emplace_back(memgraph::utils::UUID{}, 60); + histories.emplace_back(memgraph::utils::UUID{}, 65); + auto db_histories_longest = memgraph::utils::fmap( + [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) { + return std::make_pair(std::string(pair.first), pair.second); + }, + histories); + + memgraph::replication_coordination_glue::DatabaseHistory history_longest{ + .db_uuid = db_uuid, .history = db_histories_longest, .name = default_name}; + + memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history_longest}; + instance_database_histories.emplace_back("instance_3", instance_3_db_histories_); + + memgraph::coordination::CoordinatorInstance instance; + auto [instance_name, latest_epoch, latest_commit_timestamp] = + instance.ChooseMostUpToDateInstance(instance_database_histories); + + ASSERT_TRUE(instance_name == "instance_3"); + ASSERT_TRUE(*latest_epoch == db_histories_longest.back().first); + ASSERT_TRUE(*latest_commit_timestamp == db_histories_longest.back().second); +} + +TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) { + // When history diverged, also prioritize one with biggest last commit timestamp + // Main : A(1) B(2) C(3) X + // replica 1: A(1) B(2) C(3) X X up + // replica 2: A(1) B(2) X D(5) X up + // replica 3: A(1) B(2) X D(4) X up + std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>> + instance_database_histories; + + std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories; + histories.emplace_back(memgraph::utils::UUID{}, 1); + histories.emplace_back(memgraph::utils::UUID{}, 2); + histories.emplace_back(memgraph::utils::UUID{}, 3); + + memgraph::utils::UUID db_uuid; + std::string default_name = std::string(memgraph::dbms::kDefaultDB); + + auto db_histories = memgraph::utils::fmap( + [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) { + return std::make_pair(std::string(pair.first), pair.second); + }, + histories); + + memgraph::replication_coordination_glue::DatabaseHistory history{ + .db_uuid = db_uuid, .history = db_histories, .name = default_name}; + + memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history}; + instance_database_histories.emplace_back("instance_1", instance_1_db_histories_); + + db_histories.pop_back(); + + auto oldest_commit_timestamp{5}; + auto newest_different_epoch = memgraph::utils::UUID{}; + histories.emplace_back(newest_different_epoch, oldest_commit_timestamp); + auto db_histories_different = memgraph::utils::fmap( + [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) { + return std::make_pair(std::string(pair.first), pair.second); + }, + histories); + + memgraph::replication_coordination_glue::DatabaseHistory history_3{ + .db_uuid = db_uuid, .history = db_histories_different, .name = default_name}; + + memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history_3}; + instance_database_histories.emplace_back("instance_3", instance_3_db_histories_); + + db_histories_different.back().second = 4; + memgraph::replication_coordination_glue::DatabaseHistory history_2{ + .db_uuid = db_uuid, .history = db_histories_different, .name = default_name}; + + memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history_2}; + instance_database_histories.emplace_back("instance_2", instance_2_db_histories_); + + memgraph::coordination::CoordinatorInstance instance; + auto [instance_name, latest_epoch, latest_commit_timestamp] = + instance.ChooseMostUpToDateInstance(instance_database_histories); + + ASSERT_TRUE(instance_name == "instance_3"); + ASSERT_TRUE(*latest_epoch == std::string(newest_different_epoch)); + ASSERT_TRUE(*latest_commit_timestamp == oldest_commit_timestamp); +}