From 9317eb216ece5f08f7850e8e69b58b60874f9be4 Mon Sep 17 00:00:00 2001 From: antoniofilipovic Date: Tue, 27 Feb 2024 14:04:55 +0100 Subject: [PATCH] fix PR comments --- src/coordination/CMakeLists.txt | 1 - src/coordination/coordinator_client.cpp | 7 +- src/coordination/coordinator_instance.cpp | 155 ++++++++++++------ src/coordination/coordinator_rpc.cpp | 23 +-- .../coordination/coordinator_instance.hpp | 17 +- .../include/coordination/coordinator_rpc.hpp | 6 +- .../coordination/replication_instance.hpp | 4 +- .../include/coordination/utils.hpp | 101 ------------ .../replication_handler.cpp | 3 +- src/utils/functional.hpp | 15 +- tests/unit/coordination_utils.cpp | 34 ++-- 11 files changed, 159 insertions(+), 207 deletions(-) delete mode 100644 src/coordination/include/coordination/utils.hpp diff --git a/src/coordination/CMakeLists.txt b/src/coordination/CMakeLists.txt index f0820a907..936d7a5c2 100644 --- a/src/coordination/CMakeLists.txt +++ b/src/coordination/CMakeLists.txt @@ -15,7 +15,6 @@ target_sources(mg-coordination include/coordination/replication_instance.hpp include/coordination/raft_state.hpp include/coordination/rpc_errors.hpp - include/coordination/utils.hpp include/nuraft/coordinator_log_store.hpp include/nuraft/coordinator_state_machine.hpp diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp index 57b15f742..f4d2da838 100644 --- a/src/coordination/coordinator_client.cpp +++ b/src/coordination/coordinator_client.cpp @@ -70,10 +70,9 @@ void CoordinatorClient::StartFrequentCheck() { stream.AwaitResponse(); } // Subtle race condition: - // lock is acquired only in callback, - // but we might have changed which callback needs to be called - // (imagine case of failover where instance is promoted to MAIN) - // which means this instance will execute REPLICA callback instead of MAIN callback + // acquiring of lock needs to happen before function call, as function callback can be changed + // for instance after lock is already acquired + // (failover case when instance is promoted to MAIN) succ_cb_(coord_instance_, instance_name); } catch (rpc::RpcFailedException const &) { fail_cb_(coord_instance_, instance_name); diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index 66955146a..853b44e3b 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -14,7 +14,6 @@ #include "coordination/coordinator_instance.hpp" #include "coordination/coordinator_exceptions.hpp" -#include "coordination/utils.hpp" #include "coordination/fmt.hpp" #include "dbms/constants.hpp" #include "nuraft/coordinator_state_machine.hpp" @@ -39,13 +38,13 @@ CoordinatorInstance::CoordinatorInstance() auto lock = std::unique_lock{self->coord_instance_lock_}; auto &repl_instance = self->FindReplicationInstance(repl_instance_name); - std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name, std::move(lock)); + std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance); }; client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void { auto lock = std::unique_lock{self->coord_instance_lock_}; auto &repl_instance = self->FindReplicationInstance(repl_instance_name); - std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name, std::move(lock)); + std::invoke(repl_instance.GetFailCallback(), self, repl_instance); }; } @@ -102,42 +101,36 @@ auto CoordinatorInstance::TryFailover() -> void { return; } - std::string most_up_to_date_instance; - std::optional latest_commit_timestamp; - std::optional latest_epoch; - { - // for each DB in instance we get one DatabaseHistory - using DatabaseHistories = replication_coordination_glue::DatabaseHistories; - std::vector> instance_database_histories; - - bool success{true}; - std::for_each(alive_replicas.begin(), alive_replicas.end(), - [&success, &instance_database_histories](ReplicationInstance &replica) { - if (!success) { - return; - } - auto res = replica.GetClient().SendGetInstanceTimestampsRpc(); - if (res.HasError()) { - spdlog::error("Could get per db history data for instance {}", replica.InstanceName()); - success = false; - return; - } - instance_database_histories.emplace_back(replica.InstanceName(), std::move(res.GetValue())); - }); + // for each DB in instance we get one DatabaseHistory + using DatabaseHistories = replication_coordination_glue::DatabaseHistories; + std::vector> instance_database_histories; + bool success{true}; + std::ranges::for_each(alive_replicas, [&success, &instance_database_histories](ReplicationInstance &replica) { if (!success) { - spdlog::error("Aborting failover as at least one instance didn't provide per database history."); return; } + auto res = replica.GetClient().SendGetInstanceTimestampsRpc(); + if (res.HasError()) { + spdlog::error("Could get per db history data for instance {}", replica.InstanceName()); + success = false; + return; + } + instance_database_histories.emplace_back(replica.InstanceName(), std::move(res.GetValue())); + }); - most_up_to_date_instance = - coordination::ChooseMostUpToDateInstance(instance_database_histories, latest_epoch, latest_commit_timestamp); + if (!success) { + spdlog::error("Aborting failover as at least one instance didn't provide per database history."); + return; } + + auto [most_up_to_date_instance, latest_epoch, latest_commit_timestamp] = + ChooseMostUpToDateInstance(instance_database_histories); + spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp", most_up_to_date_instance, *latest_epoch, *latest_commit_timestamp); - auto &new_repl_instance = FindReplicationInstance(most_up_to_date_instance); - auto *new_main = &new_repl_instance; + auto *new_main = &FindReplicationInstance(most_up_to_date_instance); new_main->PauseFrequentCheck(); utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }}; @@ -278,11 +271,7 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co return RegisterInstanceCoordinatorStatus::SUCCESS; } -void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name, - std::unique_lock lock) { - MG_ASSERT(lock.owns_lock(), "Callback doesn't own lock"); - spdlog::trace("Instance {} performing main failure callback", repl_instance_name); - auto &repl_instance = FindReplicationInstance(repl_instance_name); +void CoordinatorInstance::MainFailCallback(ReplicationInstance &repl_instance) { repl_instance.OnFailPing(); const auto &repl_instance_uuid = repl_instance.GetMainUUID(); MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set"); @@ -293,13 +282,10 @@ void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name, } } -void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name, - std::unique_lock lock) { - MG_ASSERT(lock.owns_lock(), "Callback doesn't own lock"); +void CoordinatorInstance::MainSuccessCallback(ReplicationInstance &repl_instance) { + const auto &repl_instance_name = repl_instance.InstanceName(); spdlog::trace("Instance {} performing main successful callback", repl_instance_name); - auto &repl_instance = FindReplicationInstance(repl_instance_name); - if (repl_instance.IsAlive()) { repl_instance.OnSuccessPing(); return; @@ -334,10 +320,8 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam } } -void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name, - std::unique_lock lock) { - MG_ASSERT(lock.owns_lock(), "Callback doesn't own lock"); - auto &repl_instance = FindReplicationInstance(repl_instance_name); +void CoordinatorInstance::ReplicaSuccessCallback(ReplicationInstance &repl_instance) { + const auto &repl_instance_name = repl_instance.InstanceName(); if (!repl_instance.IsReplica()) { spdlog::error("Aborting replica callback since instance {} is not replica anymore", repl_instance_name); return; @@ -355,10 +339,8 @@ void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_ repl_instance.OnSuccessPing(); } -void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name, - std::unique_lock lock) { - MG_ASSERT(lock.owns_lock(), "Callback doesn't own lock"); - auto &repl_instance = FindReplicationInstance(repl_instance_name); +void CoordinatorInstance::ReplicaFailCallback(ReplicationInstance &repl_instance) { + const auto &repl_instance_name = repl_instance.InstanceName(); if (!repl_instance.IsReplica()) { spdlog::error("Aborting replica fail callback since instance {} is not replica anymore", repl_instance_name); return; @@ -406,5 +388,82 @@ auto CoordinatorInstance::GetMainUUID() const -> utils::UUID { return main_uuid_ // TODO: (andi) Add to the RAFT log. auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_ = new_uuid; } +auto CoordinatorInstance::ChooseMostUpToDateInstance( + const std::vector> + &instance_database_histories) -> NewMainRes { + NewMainRes new_main_res; + std::for_each( + instance_database_histories.begin(), instance_database_histories.end(), + [&new_main_res](const InstanceNameDbHistories &instance_res_pair) { + const auto &[instance_name, instance_db_histories] = instance_res_pair; + + // Find default db for instance and its history + auto default_db_history_data = std::ranges::find_if( + instance_db_histories, [default_db = memgraph::dbms::kDefaultDB]( + const replication_coordination_glue::DatabaseHistory &db_timestamps) { + return db_timestamps.name == default_db; + }); + + std::ranges::for_each( + instance_db_histories, + [&instance_name = instance_name](const replication_coordination_glue::DatabaseHistory &db_history) { + spdlog::trace("Instance {}: name {}, default db {}", instance_name, db_history.name, + memgraph::dbms::kDefaultDB); + }); + + MG_ASSERT(default_db_history_data != instance_db_histories.end(), "No history for instance"); + + const auto &instance_default_db_history = default_db_history_data->history; + + std::ranges::for_each(instance_default_db_history | ranges::views::reverse, + [&instance_name = instance_name](const auto &epoch_history_it) { + spdlog::trace("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name, + std::get<0>(epoch_history_it), std::get<1>(epoch_history_it)); + }); + + // get latest epoch + // get latest timestamp + + if (!new_main_res.latest_epoch) { + const auto &[epoch, timestamp] = *instance_default_db_history.crbegin(); + new_main_res = NewMainRes{ + .most_up_to_date_instance = instance_name, + .latest_epoch = epoch, + .latest_commit_timestamp = timestamp, + }; + spdlog::trace("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp", + instance_name, epoch, timestamp); + return; + } + + bool found_same_point{false}; + std::string last_most_up_to_date_epoch{*new_main_res.latest_epoch}; + for (auto [epoch, timestamp] : ranges::reverse_view(instance_default_db_history)) { + if (*new_main_res.latest_commit_timestamp < timestamp) { + new_main_res = NewMainRes{ + .most_up_to_date_instance = instance_name, + .latest_epoch = epoch, + .latest_commit_timestamp = timestamp, + }; + + spdlog::trace("Found the new most up to date instance {} with epoch {} and {} latest commit timestamp", + instance_name, epoch, timestamp); + } + + // we found point at which they were same + if (epoch == last_most_up_to_date_epoch) { + found_same_point = true; + break; + } + } + + if (!found_same_point) { + spdlog::error("Didn't find same history epoch {} for instance {} and instance {}", last_most_up_to_date_epoch, + new_main_res.most_up_to_date_instance, instance_name); + } + }); + + return new_main_res; +} } // namespace memgraph::coordination #endif diff --git a/src/coordination/coordinator_rpc.cpp b/src/coordination/coordinator_rpc.cpp index 439692506..815693824 100644 --- a/src/coordination/coordinator_rpc.cpp +++ b/src/coordination/coordinator_rpc.cpp @@ -76,9 +76,9 @@ void EnableWritingOnMainRes::Load(EnableWritingOnMainRes *self, memgraph::slk::R memgraph::slk::Load(self, reader); } -void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const &self, memgraph::slk::Builder *builder) {} +void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const & /*self*/, memgraph::slk::Builder * /*builder*/) {} -void EnableWritingOnMainReq::Load(EnableWritingOnMainReq *self, memgraph::slk::Reader *reader) {} +void EnableWritingOnMainReq::Load(EnableWritingOnMainReq * /*self*/, memgraph::slk::Reader * /*reader*/) {} // GetInstanceUUID void GetInstanceUUIDReq::Save(const GetInstanceUUIDReq &self, memgraph::slk::Builder *builder) { @@ -97,13 +97,14 @@ void GetInstanceUUIDRes::Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *r memgraph::slk::Load(self, reader); } -// GetInstanceUUID -void GetDatabaseHistoriesReq::Save(const GetDatabaseHistoriesReq &self, memgraph::slk::Builder *builder) { - memgraph::slk::Save(self, builder); +// GetDatabaseHistoriesRpc + +void GetDatabaseHistoriesReq::Save(const GetDatabaseHistoriesReq & /*self*/, memgraph::slk::Builder * /*builder*/) { + /* nothing to serialize */ } -void GetDatabaseHistoriesReq::Load(GetDatabaseHistoriesReq *self, memgraph::slk::Reader *reader) { - memgraph::slk::Load(self, reader); +void GetDatabaseHistoriesReq::Load(GetDatabaseHistoriesReq * /*self*/, memgraph::slk::Reader * /*reader*/) { + /* nothing to serialize */ } void GetDatabaseHistoriesRes::Save(const GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder) { @@ -238,14 +239,6 @@ void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reade // GetInstanceTimestampsReq -void Save(const memgraph::coordination::GetDatabaseHistoriesReq & /*self*/, memgraph::slk::Builder * /*builder*/) { - /* nothing to serialize*/ -} - -void Load(memgraph::coordination::GetDatabaseHistoriesReq * /*self*/, memgraph::slk::Reader * /*reader*/) { - /* nothing to serialize*/ -} - void Save(const memgraph::coordination::GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder) { memgraph::slk::Save(self.database_histories, builder); } diff --git a/src/coordination/include/coordination/coordinator_instance.hpp b/src/coordination/include/coordination/coordinator_instance.hpp index 87f7be111..643629a91 100644 --- a/src/coordination/include/coordination/coordinator_instance.hpp +++ b/src/coordination/include/coordination/coordinator_instance.hpp @@ -26,6 +26,13 @@ namespace memgraph::coordination { +struct NewMainRes { + std::string most_up_to_date_instance; + std::optional latest_epoch; + std::optional latest_commit_timestamp; +}; +using InstanceNameDbHistories = std::pair; + class CoordinatorInstance { public: CoordinatorInstance(); @@ -47,13 +54,15 @@ class CoordinatorInstance { auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &; - void MainFailCallback(std::string_view repl_instance_name, std::unique_lock lock); + void MainFailCallback(ReplicationInstance &); - void MainSuccessCallback(std::string_view repl_instance_name, std::unique_lock lock); + void MainSuccessCallback(ReplicationInstance &); - void ReplicaSuccessCallback(std::string_view repl_instance_name, std::unique_lock lock); + void ReplicaSuccessCallback(ReplicationInstance &); - void ReplicaFailCallback(std::string_view repl_instance_name, std::unique_lock lock); + void ReplicaFailCallback(ReplicationInstance &); + + static auto ChooseMostUpToDateInstance(const std::vector &) -> NewMainRes; private: HealthCheckClientCallback client_succ_cb_, client_fail_cb_; diff --git a/src/coordination/include/coordination/coordinator_rpc.hpp b/src/coordination/include/coordination/coordinator_rpc.hpp index 5eadcb568..2bf88fe46 100644 --- a/src/coordination/include/coordination/coordinator_rpc.hpp +++ b/src/coordination/include/coordination/coordinator_rpc.hpp @@ -210,18 +210,18 @@ void Save(const memgraph::coordination::GetInstanceUUIDReq &self, memgraph::slk: void Load(memgraph::coordination::GetInstanceUUIDReq *self, memgraph::slk::Reader *reader); void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder); void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader); + // UnregisterReplicaRpc void Save(memgraph::coordination::UnregisterReplicaRes const &self, memgraph::slk::Builder *builder); void Load(memgraph::coordination::UnregisterReplicaRes *self, memgraph::slk::Reader *reader); void Save(memgraph::coordination::UnregisterReplicaReq const &self, memgraph::slk::Builder *builder); void Load(memgraph::coordination::UnregisterReplicaReq *self, memgraph::slk::Reader *reader); +// EnableWritingOnMainRpc void Save(memgraph::coordination::EnableWritingOnMainRes const &self, memgraph::slk::Builder *builder); void Load(memgraph::coordination::EnableWritingOnMainRes *self, memgraph::slk::Reader *reader); -// GetInstanceTimestampsRpc -void Save(const memgraph::coordination::GetDatabaseHistoriesReq &self, memgraph::slk::Builder *builder); -void Load(memgraph::coordination::GetDatabaseHistoriesReq *self, memgraph::slk::Reader *reader); +// GetDatabaseHistoriesRpc void Save(const memgraph::coordination::GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder); void Load(memgraph::coordination::GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader); diff --git a/src/coordination/include/coordination/replication_instance.hpp b/src/coordination/include/coordination/replication_instance.hpp index 153546087..a790baa18 100644 --- a/src/coordination/include/coordination/replication_instance.hpp +++ b/src/coordination/include/coordination/replication_instance.hpp @@ -25,9 +25,9 @@ namespace memgraph::coordination { class CoordinatorInstance; +class ReplicationInstance; -using HealthCheckInstanceCallback = void (CoordinatorInstance::*)(std::string_view, - std::unique_lock); +using HealthCheckInstanceCallback = void (CoordinatorInstance::*)(ReplicationInstance &); class ReplicationInstance { public: diff --git a/src/coordination/include/coordination/utils.hpp b/src/coordination/include/coordination/utils.hpp deleted file mode 100644 index 4893fd58a..000000000 --- a/src/coordination/include/coordination/utils.hpp +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright 2024 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. -#pragma once - -#include -#include -#include -#include -#include -#include "dbms/constants.hpp" -#include "replication_coordination_glue/common.hpp" - -namespace memgraph::coordination { - -inline std::string ChooseMostUpToDateInstance( - const std::vector> - &instance_database_histories, - std::optional &latest_epoch, std::optional &latest_commit_timestamp) { - std::string most_up_to_date_instance; - std::for_each( - instance_database_histories.begin(), instance_database_histories.end(), - [&latest_epoch, &latest_commit_timestamp, &most_up_to_date_instance]( - const std::pair &instance_res_pair) { - const auto &[instance_name, instance_db_histories] = instance_res_pair; - - // Find default db for instance and its history - auto default_db_history_data = - std::find_if(instance_db_histories.begin(), instance_db_histories.end(), - [default_db = memgraph::dbms::kDefaultDB]( - const replication_coordination_glue::DatabaseHistory &db_timestamps) { - return db_timestamps.name == default_db; - }); - - std::for_each( - instance_db_histories.begin(), instance_db_histories.end(), - [instance_name = instance_name](const replication_coordination_glue::DatabaseHistory &db_history) { - spdlog::trace("Instance {}: name {}, default db {}", instance_name, db_history.name, - memgraph::dbms::kDefaultDB); - }); - - MG_ASSERT(default_db_history_data != instance_db_histories.end(), "No history for instance"); - - const auto &instance_default_db_history = default_db_history_data->history; - - std::for_each(instance_default_db_history.rbegin(), instance_default_db_history.rend(), - [instance_name = instance_name](const auto &instance_default_db_history_it) { - spdlog::trace("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name, - std::get<1>(instance_default_db_history_it), - std::get<0>(instance_default_db_history_it)); - }); - - // get latest epoch - // get latest timestamp - - if (!latest_epoch) { - const auto it = instance_default_db_history.crbegin(); - const auto &[epoch, timestamp] = *it; - latest_epoch.emplace(epoch); - latest_commit_timestamp.emplace(timestamp); - most_up_to_date_instance = instance_name; - spdlog::trace("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp", - instance_name, epoch, timestamp); - return; - } - - bool found_same_point{false}; - std::string last_most_up_to_date_epoch{*latest_epoch}; - for (auto [epoch, timestamp] : ranges::reverse_view(instance_default_db_history)) { - if (*latest_commit_timestamp < timestamp) { - latest_commit_timestamp.emplace(timestamp); - latest_epoch.emplace(epoch); - most_up_to_date_instance = instance_name; - spdlog::trace("Found the new most up to date instance {} with epoch {} and {} latest commit timestamp", - instance_name, epoch, timestamp); - } - - // we found point at which they were same - if (epoch == last_most_up_to_date_epoch) { - found_same_point = true; - break; - } - } - - if (!found_same_point) { - spdlog::error("Didn't find same history epoch {} for instance {} and instance {}", last_most_up_to_date_epoch, - most_up_to_date_instance, instance_name); - } - }); - - return most_up_to_date_instance; -} - -} // namespace memgraph::coordination diff --git a/src/replication_handler/replication_handler.cpp b/src/replication_handler/replication_handler.cpp index b6387f693..ea567eed0 100644 --- a/src/replication_handler/replication_handler.cpp +++ b/src/replication_handler/replication_handler.cpp @@ -269,8 +269,7 @@ auto ReplicationHandler::GetRole() const -> replication_coordination_glue::Repli auto ReplicationHandler::GetDatabasesHistories() -> replication_coordination_glue::DatabaseHistories { replication_coordination_glue::DatabaseHistories results; dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) { - auto *storage = db_acc->storage(); - auto &repl_storage_state = storage->repl_storage_state_; + auto &repl_storage_state = db_acc->storage()->repl_storage_state_; std::vector> history = utils::fmap([](const auto &elem) { return std::pair(elem.first, elem.second); }, diff --git a/src/utils/functional.hpp b/src/utils/functional.hpp index e3395809d..f5242944a 100644 --- a/src/utils/functional.hpp +++ b/src/utils/functional.hpp @@ -18,17 +18,12 @@ namespace memgraph::utils { -template ::type> -auto fmap(F &&f, std::vector const &v) -> std::vector { +template