fix PR comments
This commit is contained in:
parent
3c2d6863c4
commit
9317eb216e
@ -15,7 +15,6 @@ target_sources(mg-coordination
|
||||
include/coordination/replication_instance.hpp
|
||||
include/coordination/raft_state.hpp
|
||||
include/coordination/rpc_errors.hpp
|
||||
include/coordination/utils.hpp
|
||||
|
||||
include/nuraft/coordinator_log_store.hpp
|
||||
include/nuraft/coordinator_state_machine.hpp
|
||||
|
@ -70,10 +70,9 @@ void CoordinatorClient::StartFrequentCheck() {
|
||||
stream.AwaitResponse();
|
||||
}
|
||||
// Subtle race condition:
|
||||
// lock is acquired only in callback,
|
||||
// but we might have changed which callback needs to be called
|
||||
// (imagine case of failover where instance is promoted to MAIN)
|
||||
// which means this instance will execute REPLICA callback instead of MAIN callback
|
||||
// acquiring of lock needs to happen before function call, as function callback can be changed
|
||||
// for instance after lock is already acquired
|
||||
// (failover case when instance is promoted to MAIN)
|
||||
succ_cb_(coord_instance_, instance_name);
|
||||
} catch (rpc::RpcFailedException const &) {
|
||||
fail_cb_(coord_instance_, instance_name);
|
||||
|
@ -14,7 +14,6 @@
|
||||
#include "coordination/coordinator_instance.hpp"
|
||||
|
||||
#include "coordination/coordinator_exceptions.hpp"
|
||||
#include "coordination/utils.hpp"
|
||||
#include "coordination/fmt.hpp"
|
||||
#include "dbms/constants.hpp"
|
||||
#include "nuraft/coordinator_state_machine.hpp"
|
||||
@ -39,13 +38,13 @@ CoordinatorInstance::CoordinatorInstance()
|
||||
auto lock = std::unique_lock{self->coord_instance_lock_};
|
||||
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
|
||||
|
||||
std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name, std::move(lock));
|
||||
std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance);
|
||||
};
|
||||
|
||||
client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
|
||||
auto lock = std::unique_lock{self->coord_instance_lock_};
|
||||
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
|
||||
std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name, std::move(lock));
|
||||
std::invoke(repl_instance.GetFailCallback(), self, repl_instance);
|
||||
};
|
||||
}
|
||||
|
||||
@ -102,42 +101,36 @@ auto CoordinatorInstance::TryFailover() -> void {
|
||||
return;
|
||||
}
|
||||
|
||||
std::string most_up_to_date_instance;
|
||||
std::optional<uint64_t> latest_commit_timestamp;
|
||||
std::optional<std::string> latest_epoch;
|
||||
{
|
||||
// for each DB in instance we get one DatabaseHistory
|
||||
using DatabaseHistories = replication_coordination_glue::DatabaseHistories;
|
||||
std::vector<std::pair<std::string, DatabaseHistories>> instance_database_histories;
|
||||
|
||||
bool success{true};
|
||||
std::for_each(alive_replicas.begin(), alive_replicas.end(),
|
||||
[&success, &instance_database_histories](ReplicationInstance &replica) {
|
||||
if (!success) {
|
||||
return;
|
||||
}
|
||||
auto res = replica.GetClient().SendGetInstanceTimestampsRpc();
|
||||
if (res.HasError()) {
|
||||
spdlog::error("Could get per db history data for instance {}", replica.InstanceName());
|
||||
success = false;
|
||||
return;
|
||||
}
|
||||
instance_database_histories.emplace_back(replica.InstanceName(), std::move(res.GetValue()));
|
||||
});
|
||||
// for each DB in instance we get one DatabaseHistory
|
||||
using DatabaseHistories = replication_coordination_glue::DatabaseHistories;
|
||||
std::vector<std::pair<std::string, DatabaseHistories>> instance_database_histories;
|
||||
|
||||
bool success{true};
|
||||
std::ranges::for_each(alive_replicas, [&success, &instance_database_histories](ReplicationInstance &replica) {
|
||||
if (!success) {
|
||||
spdlog::error("Aborting failover as at least one instance didn't provide per database history.");
|
||||
return;
|
||||
}
|
||||
auto res = replica.GetClient().SendGetInstanceTimestampsRpc();
|
||||
if (res.HasError()) {
|
||||
spdlog::error("Could get per db history data for instance {}", replica.InstanceName());
|
||||
success = false;
|
||||
return;
|
||||
}
|
||||
instance_database_histories.emplace_back(replica.InstanceName(), std::move(res.GetValue()));
|
||||
});
|
||||
|
||||
most_up_to_date_instance =
|
||||
coordination::ChooseMostUpToDateInstance(instance_database_histories, latest_epoch, latest_commit_timestamp);
|
||||
if (!success) {
|
||||
spdlog::error("Aborting failover as at least one instance didn't provide per database history.");
|
||||
return;
|
||||
}
|
||||
|
||||
auto [most_up_to_date_instance, latest_epoch, latest_commit_timestamp] =
|
||||
ChooseMostUpToDateInstance(instance_database_histories);
|
||||
|
||||
spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp",
|
||||
most_up_to_date_instance, *latest_epoch, *latest_commit_timestamp);
|
||||
|
||||
auto &new_repl_instance = FindReplicationInstance(most_up_to_date_instance);
|
||||
auto *new_main = &new_repl_instance;
|
||||
auto *new_main = &FindReplicationInstance(most_up_to_date_instance);
|
||||
|
||||
new_main->PauseFrequentCheck();
|
||||
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
|
||||
@ -278,11 +271,7 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
|
||||
return RegisterInstanceCoordinatorStatus::SUCCESS;
|
||||
}
|
||||
|
||||
void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name,
|
||||
std::unique_lock<utils::ResourceLock> lock) {
|
||||
MG_ASSERT(lock.owns_lock(), "Callback doesn't own lock");
|
||||
spdlog::trace("Instance {} performing main failure callback", repl_instance_name);
|
||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||
void CoordinatorInstance::MainFailCallback(ReplicationInstance &repl_instance) {
|
||||
repl_instance.OnFailPing();
|
||||
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
|
||||
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set");
|
||||
@ -293,13 +282,10 @@ void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name,
|
||||
}
|
||||
}
|
||||
|
||||
void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name,
|
||||
std::unique_lock<utils::ResourceLock> lock) {
|
||||
MG_ASSERT(lock.owns_lock(), "Callback doesn't own lock");
|
||||
void CoordinatorInstance::MainSuccessCallback(ReplicationInstance &repl_instance) {
|
||||
const auto &repl_instance_name = repl_instance.InstanceName();
|
||||
spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
|
||||
|
||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||
|
||||
if (repl_instance.IsAlive()) {
|
||||
repl_instance.OnSuccessPing();
|
||||
return;
|
||||
@ -334,10 +320,8 @@ void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_nam
|
||||
}
|
||||
}
|
||||
|
||||
void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name,
|
||||
std::unique_lock<utils::ResourceLock> lock) {
|
||||
MG_ASSERT(lock.owns_lock(), "Callback doesn't own lock");
|
||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||
void CoordinatorInstance::ReplicaSuccessCallback(ReplicationInstance &repl_instance) {
|
||||
const auto &repl_instance_name = repl_instance.InstanceName();
|
||||
if (!repl_instance.IsReplica()) {
|
||||
spdlog::error("Aborting replica callback since instance {} is not replica anymore", repl_instance_name);
|
||||
return;
|
||||
@ -355,10 +339,8 @@ void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_
|
||||
repl_instance.OnSuccessPing();
|
||||
}
|
||||
|
||||
void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name,
|
||||
std::unique_lock<utils::ResourceLock> lock) {
|
||||
MG_ASSERT(lock.owns_lock(), "Callback doesn't own lock");
|
||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||
void CoordinatorInstance::ReplicaFailCallback(ReplicationInstance &repl_instance) {
|
||||
const auto &repl_instance_name = repl_instance.InstanceName();
|
||||
if (!repl_instance.IsReplica()) {
|
||||
spdlog::error("Aborting replica fail callback since instance {} is not replica anymore", repl_instance_name);
|
||||
return;
|
||||
@ -406,5 +388,82 @@ auto CoordinatorInstance::GetMainUUID() const -> utils::UUID { return main_uuid_
|
||||
// TODO: (andi) Add to the RAFT log.
|
||||
auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_ = new_uuid; }
|
||||
|
||||
auto CoordinatorInstance::ChooseMostUpToDateInstance(
|
||||
const std::vector<std::pair<std::string, replication_coordination_glue::DatabaseHistories>>
|
||||
&instance_database_histories) -> NewMainRes {
|
||||
NewMainRes new_main_res;
|
||||
std::for_each(
|
||||
instance_database_histories.begin(), instance_database_histories.end(),
|
||||
[&new_main_res](const InstanceNameDbHistories &instance_res_pair) {
|
||||
const auto &[instance_name, instance_db_histories] = instance_res_pair;
|
||||
|
||||
// Find default db for instance and its history
|
||||
auto default_db_history_data = std::ranges::find_if(
|
||||
instance_db_histories, [default_db = memgraph::dbms::kDefaultDB](
|
||||
const replication_coordination_glue::DatabaseHistory &db_timestamps) {
|
||||
return db_timestamps.name == default_db;
|
||||
});
|
||||
|
||||
std::ranges::for_each(
|
||||
instance_db_histories,
|
||||
[&instance_name = instance_name](const replication_coordination_glue::DatabaseHistory &db_history) {
|
||||
spdlog::trace("Instance {}: name {}, default db {}", instance_name, db_history.name,
|
||||
memgraph::dbms::kDefaultDB);
|
||||
});
|
||||
|
||||
MG_ASSERT(default_db_history_data != instance_db_histories.end(), "No history for instance");
|
||||
|
||||
const auto &instance_default_db_history = default_db_history_data->history;
|
||||
|
||||
std::ranges::for_each(instance_default_db_history | ranges::views::reverse,
|
||||
[&instance_name = instance_name](const auto &epoch_history_it) {
|
||||
spdlog::trace("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name,
|
||||
std::get<0>(epoch_history_it), std::get<1>(epoch_history_it));
|
||||
});
|
||||
|
||||
// get latest epoch
|
||||
// get latest timestamp
|
||||
|
||||
if (!new_main_res.latest_epoch) {
|
||||
const auto &[epoch, timestamp] = *instance_default_db_history.crbegin();
|
||||
new_main_res = NewMainRes{
|
||||
.most_up_to_date_instance = instance_name,
|
||||
.latest_epoch = epoch,
|
||||
.latest_commit_timestamp = timestamp,
|
||||
};
|
||||
spdlog::trace("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp",
|
||||
instance_name, epoch, timestamp);
|
||||
return;
|
||||
}
|
||||
|
||||
bool found_same_point{false};
|
||||
std::string last_most_up_to_date_epoch{*new_main_res.latest_epoch};
|
||||
for (auto [epoch, timestamp] : ranges::reverse_view(instance_default_db_history)) {
|
||||
if (*new_main_res.latest_commit_timestamp < timestamp) {
|
||||
new_main_res = NewMainRes{
|
||||
.most_up_to_date_instance = instance_name,
|
||||
.latest_epoch = epoch,
|
||||
.latest_commit_timestamp = timestamp,
|
||||
};
|
||||
|
||||
spdlog::trace("Found the new most up to date instance {} with epoch {} and {} latest commit timestamp",
|
||||
instance_name, epoch, timestamp);
|
||||
}
|
||||
|
||||
// we found point at which they were same
|
||||
if (epoch == last_most_up_to_date_epoch) {
|
||||
found_same_point = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!found_same_point) {
|
||||
spdlog::error("Didn't find same history epoch {} for instance {} and instance {}", last_most_up_to_date_epoch,
|
||||
new_main_res.most_up_to_date_instance, instance_name);
|
||||
}
|
||||
});
|
||||
|
||||
return new_main_res;
|
||||
}
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
||||
|
@ -76,9 +76,9 @@ void EnableWritingOnMainRes::Load(EnableWritingOnMainRes *self, memgraph::slk::R
|
||||
memgraph::slk::Load(self, reader);
|
||||
}
|
||||
|
||||
void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const &self, memgraph::slk::Builder *builder) {}
|
||||
void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const & /*self*/, memgraph::slk::Builder * /*builder*/) {}
|
||||
|
||||
void EnableWritingOnMainReq::Load(EnableWritingOnMainReq *self, memgraph::slk::Reader *reader) {}
|
||||
void EnableWritingOnMainReq::Load(EnableWritingOnMainReq * /*self*/, memgraph::slk::Reader * /*reader*/) {}
|
||||
|
||||
// GetInstanceUUID
|
||||
void GetInstanceUUIDReq::Save(const GetInstanceUUIDReq &self, memgraph::slk::Builder *builder) {
|
||||
@ -97,13 +97,14 @@ void GetInstanceUUIDRes::Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *r
|
||||
memgraph::slk::Load(self, reader);
|
||||
}
|
||||
|
||||
// GetInstanceUUID
|
||||
void GetDatabaseHistoriesReq::Save(const GetDatabaseHistoriesReq &self, memgraph::slk::Builder *builder) {
|
||||
memgraph::slk::Save(self, builder);
|
||||
// GetDatabaseHistoriesRpc
|
||||
|
||||
void GetDatabaseHistoriesReq::Save(const GetDatabaseHistoriesReq & /*self*/, memgraph::slk::Builder * /*builder*/) {
|
||||
/* nothing to serialize */
|
||||
}
|
||||
|
||||
void GetDatabaseHistoriesReq::Load(GetDatabaseHistoriesReq *self, memgraph::slk::Reader *reader) {
|
||||
memgraph::slk::Load(self, reader);
|
||||
void GetDatabaseHistoriesReq::Load(GetDatabaseHistoriesReq * /*self*/, memgraph::slk::Reader * /*reader*/) {
|
||||
/* nothing to serialize */
|
||||
}
|
||||
|
||||
void GetDatabaseHistoriesRes::Save(const GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder) {
|
||||
@ -238,14 +239,6 @@ void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reade
|
||||
|
||||
// GetInstanceTimestampsReq
|
||||
|
||||
void Save(const memgraph::coordination::GetDatabaseHistoriesReq & /*self*/, memgraph::slk::Builder * /*builder*/) {
|
||||
/* nothing to serialize*/
|
||||
}
|
||||
|
||||
void Load(memgraph::coordination::GetDatabaseHistoriesReq * /*self*/, memgraph::slk::Reader * /*reader*/) {
|
||||
/* nothing to serialize*/
|
||||
}
|
||||
|
||||
void Save(const memgraph::coordination::GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder) {
|
||||
memgraph::slk::Save(self.database_histories, builder);
|
||||
}
|
||||
|
@ -26,6 +26,13 @@
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
struct NewMainRes {
|
||||
std::string most_up_to_date_instance;
|
||||
std::optional<std::string> latest_epoch;
|
||||
std::optional<uint64_t> latest_commit_timestamp;
|
||||
};
|
||||
using InstanceNameDbHistories = std::pair<std::string, replication_coordination_glue::DatabaseHistories>;
|
||||
|
||||
class CoordinatorInstance {
|
||||
public:
|
||||
CoordinatorInstance();
|
||||
@ -47,13 +54,15 @@ class CoordinatorInstance {
|
||||
|
||||
auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &;
|
||||
|
||||
void MainFailCallback(std::string_view repl_instance_name, std::unique_lock<utils::ResourceLock> lock);
|
||||
void MainFailCallback(ReplicationInstance &);
|
||||
|
||||
void MainSuccessCallback(std::string_view repl_instance_name, std::unique_lock<utils::ResourceLock> lock);
|
||||
void MainSuccessCallback(ReplicationInstance &);
|
||||
|
||||
void ReplicaSuccessCallback(std::string_view repl_instance_name, std::unique_lock<utils::ResourceLock> lock);
|
||||
void ReplicaSuccessCallback(ReplicationInstance &);
|
||||
|
||||
void ReplicaFailCallback(std::string_view repl_instance_name, std::unique_lock<utils::ResourceLock> lock);
|
||||
void ReplicaFailCallback(ReplicationInstance &);
|
||||
|
||||
static auto ChooseMostUpToDateInstance(const std::vector<InstanceNameDbHistories> &) -> NewMainRes;
|
||||
|
||||
private:
|
||||
HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
|
||||
|
@ -210,18 +210,18 @@ void Save(const memgraph::coordination::GetInstanceUUIDReq &self, memgraph::slk:
|
||||
void Load(memgraph::coordination::GetInstanceUUIDReq *self, memgraph::slk::Reader *reader);
|
||||
void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder);
|
||||
void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader);
|
||||
|
||||
// UnregisterReplicaRpc
|
||||
void Save(memgraph::coordination::UnregisterReplicaRes const &self, memgraph::slk::Builder *builder);
|
||||
void Load(memgraph::coordination::UnregisterReplicaRes *self, memgraph::slk::Reader *reader);
|
||||
void Save(memgraph::coordination::UnregisterReplicaReq const &self, memgraph::slk::Builder *builder);
|
||||
void Load(memgraph::coordination::UnregisterReplicaReq *self, memgraph::slk::Reader *reader);
|
||||
|
||||
// EnableWritingOnMainRpc
|
||||
void Save(memgraph::coordination::EnableWritingOnMainRes const &self, memgraph::slk::Builder *builder);
|
||||
void Load(memgraph::coordination::EnableWritingOnMainRes *self, memgraph::slk::Reader *reader);
|
||||
|
||||
// GetInstanceTimestampsRpc
|
||||
void Save(const memgraph::coordination::GetDatabaseHistoriesReq &self, memgraph::slk::Builder *builder);
|
||||
void Load(memgraph::coordination::GetDatabaseHistoriesReq *self, memgraph::slk::Reader *reader);
|
||||
// GetDatabaseHistoriesRpc
|
||||
void Save(const memgraph::coordination::GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder);
|
||||
void Load(memgraph::coordination::GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader);
|
||||
|
||||
|
@ -25,9 +25,9 @@
|
||||
namespace memgraph::coordination {
|
||||
|
||||
class CoordinatorInstance;
|
||||
class ReplicationInstance;
|
||||
|
||||
using HealthCheckInstanceCallback = void (CoordinatorInstance::*)(std::string_view,
|
||||
std::unique_lock<utils::ResourceLock>);
|
||||
using HealthCheckInstanceCallback = void (CoordinatorInstance::*)(ReplicationInstance &);
|
||||
|
||||
class ReplicationInstance {
|
||||
public:
|
||||
|
@ -1,101 +0,0 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
#pragma once
|
||||
|
||||
#include <optional>
|
||||
#include <range/v3/view.hpp>
|
||||
#include <ranges>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include "dbms/constants.hpp"
|
||||
#include "replication_coordination_glue/common.hpp"
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
inline std::string ChooseMostUpToDateInstance(
|
||||
const std::vector<std::pair<std::string, replication_coordination_glue::DatabaseHistories>>
|
||||
&instance_database_histories,
|
||||
std::optional<std::string> &latest_epoch, std::optional<uint64_t> &latest_commit_timestamp) {
|
||||
std::string most_up_to_date_instance;
|
||||
std::for_each(
|
||||
instance_database_histories.begin(), instance_database_histories.end(),
|
||||
[&latest_epoch, &latest_commit_timestamp, &most_up_to_date_instance](
|
||||
const std::pair<const std::string, replication_coordination_glue::DatabaseHistories> &instance_res_pair) {
|
||||
const auto &[instance_name, instance_db_histories] = instance_res_pair;
|
||||
|
||||
// Find default db for instance and its history
|
||||
auto default_db_history_data =
|
||||
std::find_if(instance_db_histories.begin(), instance_db_histories.end(),
|
||||
[default_db = memgraph::dbms::kDefaultDB](
|
||||
const replication_coordination_glue::DatabaseHistory &db_timestamps) {
|
||||
return db_timestamps.name == default_db;
|
||||
});
|
||||
|
||||
std::for_each(
|
||||
instance_db_histories.begin(), instance_db_histories.end(),
|
||||
[instance_name = instance_name](const replication_coordination_glue::DatabaseHistory &db_history) {
|
||||
spdlog::trace("Instance {}: name {}, default db {}", instance_name, db_history.name,
|
||||
memgraph::dbms::kDefaultDB);
|
||||
});
|
||||
|
||||
MG_ASSERT(default_db_history_data != instance_db_histories.end(), "No history for instance");
|
||||
|
||||
const auto &instance_default_db_history = default_db_history_data->history;
|
||||
|
||||
std::for_each(instance_default_db_history.rbegin(), instance_default_db_history.rend(),
|
||||
[instance_name = instance_name](const auto &instance_default_db_history_it) {
|
||||
spdlog::trace("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name,
|
||||
std::get<1>(instance_default_db_history_it),
|
||||
std::get<0>(instance_default_db_history_it));
|
||||
});
|
||||
|
||||
// get latest epoch
|
||||
// get latest timestamp
|
||||
|
||||
if (!latest_epoch) {
|
||||
const auto it = instance_default_db_history.crbegin();
|
||||
const auto &[epoch, timestamp] = *it;
|
||||
latest_epoch.emplace(epoch);
|
||||
latest_commit_timestamp.emplace(timestamp);
|
||||
most_up_to_date_instance = instance_name;
|
||||
spdlog::trace("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp",
|
||||
instance_name, epoch, timestamp);
|
||||
return;
|
||||
}
|
||||
|
||||
bool found_same_point{false};
|
||||
std::string last_most_up_to_date_epoch{*latest_epoch};
|
||||
for (auto [epoch, timestamp] : ranges::reverse_view(instance_default_db_history)) {
|
||||
if (*latest_commit_timestamp < timestamp) {
|
||||
latest_commit_timestamp.emplace(timestamp);
|
||||
latest_epoch.emplace(epoch);
|
||||
most_up_to_date_instance = instance_name;
|
||||
spdlog::trace("Found the new most up to date instance {} with epoch {} and {} latest commit timestamp",
|
||||
instance_name, epoch, timestamp);
|
||||
}
|
||||
|
||||
// we found point at which they were same
|
||||
if (epoch == last_most_up_to_date_epoch) {
|
||||
found_same_point = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!found_same_point) {
|
||||
spdlog::error("Didn't find same history epoch {} for instance {} and instance {}", last_most_up_to_date_epoch,
|
||||
most_up_to_date_instance, instance_name);
|
||||
}
|
||||
});
|
||||
|
||||
return most_up_to_date_instance;
|
||||
}
|
||||
|
||||
} // namespace memgraph::coordination
|
@ -269,8 +269,7 @@ auto ReplicationHandler::GetRole() const -> replication_coordination_glue::Repli
|
||||
auto ReplicationHandler::GetDatabasesHistories() -> replication_coordination_glue::DatabaseHistories {
|
||||
replication_coordination_glue::DatabaseHistories results;
|
||||
dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) {
|
||||
auto *storage = db_acc->storage();
|
||||
auto &repl_storage_state = storage->repl_storage_state_;
|
||||
auto &repl_storage_state = db_acc->storage()->repl_storage_state_;
|
||||
|
||||
std::vector<std::pair<std::string, uint64_t>> history =
|
||||
utils::fmap([](const auto &elem) { return std::pair<std::string, uint64_t>(elem.first, elem.second); },
|
||||
|
@ -18,17 +18,12 @@
|
||||
|
||||
namespace memgraph::utils {
|
||||
|
||||
template <class F, class T, class R = typename std::invoke_result<F, T>::type>
|
||||
auto fmap(F &&f, std::vector<T> const &v) -> std::vector<R> {
|
||||
template <template <typename, typename...> class Container, typename T, typename Allocator = std::allocator<T>,
|
||||
typename F, typename R = std::invoke_result_t<F, T>>
|
||||
requires ranges::range<Container<T, Allocator>> &&
|
||||
(!std::same_as<Container<T, Allocator>, std::string>)auto fmap(F &&f, const Container<T, Allocator> &v)
|
||||
-> std::vector<R> {
|
||||
return v | ranges::views::transform(std::forward<F>(f)) | ranges::to<std::vector<R>>();
|
||||
}
|
||||
|
||||
template <class F, class T, class R = typename std::result_of<F(T)>::type, class V = std::vector<R>>
|
||||
V fmap(F &&f, const std::deque<T> &v) {
|
||||
V r;
|
||||
r.reserve(v.size());
|
||||
std::ranges::transform(v, std::back_inserter(r), std::forward<F>(f));
|
||||
return r;
|
||||
}
|
||||
|
||||
} // namespace memgraph::utils
|
||||
|
@ -11,7 +11,8 @@
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include "coordination/utils.hpp"
|
||||
#include "coordination/coordinator_instance.hpp"
|
||||
#include "dbms/constants.hpp"
|
||||
#include "replication_coordination_glue/common.hpp"
|
||||
#include "utils/functional.hpp"
|
||||
|
||||
@ -33,8 +34,6 @@ TEST_F(CoordinationUtils, MemgraphDbHistorySimple) {
|
||||
// replica 3: A(24) B(36) C(48) D(50) E(51)
|
||||
std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
|
||||
instance_database_histories;
|
||||
std::optional<std::string> latest_epoch;
|
||||
std::optional<uint64_t> latest_commit_timestamp;
|
||||
|
||||
std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
|
||||
histories.emplace_back(memgraph::utils::UUID{}, 24);
|
||||
@ -63,9 +62,10 @@ TEST_F(CoordinationUtils, MemgraphDbHistorySimple) {
|
||||
|
||||
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history};
|
||||
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
|
||||
memgraph::coordination::CoordinatorInstance instance;
|
||||
|
||||
auto instance_name = memgraph::coordination::ChooseMostUpToDateInstance(instance_database_histories, latest_epoch,
|
||||
latest_commit_timestamp);
|
||||
auto [instance_name, latest_epoch, latest_commit_timestamp] =
|
||||
instance.ChooseMostUpToDateInstance(instance_database_histories);
|
||||
ASSERT_TRUE(instance_name == "instance_1" || instance_name == "instance_2" || instance_name == "instance_3");
|
||||
ASSERT_TRUE(*latest_epoch == db_histories.back().first);
|
||||
ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second);
|
||||
@ -80,8 +80,6 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
|
||||
// replica 3: A(24) B(12) C(15) D(17) E(59)
|
||||
std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
|
||||
instance_database_histories;
|
||||
std::optional<std::string> latest_epoch;
|
||||
std::optional<uint64_t> latest_commit_timestamp;
|
||||
|
||||
std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
|
||||
histories.emplace_back(memgraph::utils::UUID{}, 24);
|
||||
@ -118,8 +116,10 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
|
||||
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history3};
|
||||
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
|
||||
|
||||
auto instance_name = memgraph::coordination::ChooseMostUpToDateInstance(instance_database_histories, latest_epoch,
|
||||
latest_commit_timestamp);
|
||||
memgraph::coordination::CoordinatorInstance instance;
|
||||
auto [instance_name, latest_epoch, latest_commit_timestamp] =
|
||||
instance.ChooseMostUpToDateInstance(instance_database_histories);
|
||||
|
||||
ASSERT_TRUE(instance_name == "instance_3");
|
||||
ASSERT_TRUE(*latest_epoch == db_histories.back().first);
|
||||
ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second);
|
||||
@ -134,8 +134,6 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
|
||||
// replica 3: A(24) B(36) C(48) D(50) E(51) X X X up
|
||||
std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
|
||||
instance_database_histories;
|
||||
std::optional<std::string> latest_epoch;
|
||||
std::optional<uint64_t> latest_commit_timestamp;
|
||||
|
||||
std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
|
||||
histories.emplace_back(memgraph::utils::UUID{}, 24);
|
||||
@ -176,8 +174,10 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
|
||||
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history_longest};
|
||||
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
|
||||
|
||||
auto instance_name = memgraph::coordination::ChooseMostUpToDateInstance(instance_database_histories, latest_epoch,
|
||||
latest_commit_timestamp);
|
||||
memgraph::coordination::CoordinatorInstance instance;
|
||||
auto [instance_name, latest_epoch, latest_commit_timestamp] =
|
||||
instance.ChooseMostUpToDateInstance(instance_database_histories);
|
||||
|
||||
ASSERT_TRUE(instance_name == "instance_3");
|
||||
ASSERT_TRUE(*latest_epoch == db_histories_longest.back().first);
|
||||
ASSERT_TRUE(*latest_commit_timestamp == db_histories_longest.back().second);
|
||||
@ -191,8 +191,6 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) {
|
||||
// replica 3: A(1) B(2) X D(4) X up
|
||||
std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
|
||||
instance_database_histories;
|
||||
std::optional<std::string> latest_epoch;
|
||||
std::optional<uint64_t> latest_commit_timestamp;
|
||||
|
||||
std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
|
||||
histories.emplace_back(memgraph::utils::UUID{}, 1);
|
||||
@ -238,8 +236,10 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) {
|
||||
memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history_2};
|
||||
instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
|
||||
|
||||
auto instance_name = memgraph::coordination::ChooseMostUpToDateInstance(instance_database_histories, latest_epoch,
|
||||
latest_commit_timestamp);
|
||||
memgraph::coordination::CoordinatorInstance instance;
|
||||
auto [instance_name, latest_epoch, latest_commit_timestamp] =
|
||||
instance.ChooseMostUpToDateInstance(instance_database_histories);
|
||||
|
||||
ASSERT_TRUE(instance_name == "instance_3");
|
||||
ASSERT_TRUE(*latest_epoch == std::string(newest_different_epoch));
|
||||
ASSERT_TRUE(*latest_commit_timestamp == oldest_commit_timestamp);
|
||||
|
Loading…
Reference in New Issue
Block a user