add final logic

This commit is contained in:
antoniofilipovic 2024-02-23 14:04:43 +01:00 committed by Antonio Filipovic
parent 7a8f2dc782
commit 3c2d6863c4
2 changed files with 107 additions and 78 deletions

View File

@ -11,9 +11,10 @@
#pragma once
#include <optional>
#include <range/v3/view.hpp>
#include <ranges>
#include <string>
#include <unordered_map>
#include "dbms/constants.hpp"
#include "replication_coordination_glue/common.hpp"
@ -38,7 +39,6 @@ inline std::string ChooseMostUpToDateInstance(
return db_timestamps.name == default_db;
});
// TODO remove only for logging purposes
std::for_each(
instance_db_histories.begin(), instance_db_histories.end(),
[instance_name = instance_name](const replication_coordination_glue::DatabaseHistory &db_history) {
@ -46,12 +46,10 @@ inline std::string ChooseMostUpToDateInstance(
memgraph::dbms::kDefaultDB);
});
// auto error_msg = std::string(fmt::format("No history for instance {}", instance_name));
MG_ASSERT(default_db_history_data != instance_db_histories.end(), "No history for instance");
const auto &instance_default_db_history = default_db_history_data->history;
// TODO remove only for logging purposes
std::for_each(instance_default_db_history.rbegin(), instance_default_db_history.rend(),
[instance_name = instance_name](const auto &instance_default_db_history_it) {
spdlog::trace("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name,
@ -62,8 +60,6 @@ inline std::string ChooseMostUpToDateInstance(
// get latest epoch
// get latest timestamp
// if current history is none, I am first one
// if there is some kind history recorded, check that I am older
if (!latest_epoch) {
const auto it = instance_default_db_history.crbegin();
const auto &[epoch, timestamp] = *it;
@ -75,30 +71,27 @@ inline std::string ChooseMostUpToDateInstance(
return;
}
for (auto it = instance_default_db_history.rbegin(); it != instance_default_db_history.rend(); ++it) {
const auto &[epoch, timestamp] = *it;
bool found_same_point{false};
std::string last_most_up_to_date_epoch{*latest_epoch};
for (auto [epoch, timestamp] : ranges::reverse_view(instance_default_db_history)) {
if (*latest_commit_timestamp < timestamp) {
latest_commit_timestamp.emplace(timestamp);
latest_epoch.emplace(epoch);
most_up_to_date_instance = instance_name;
spdlog::trace("Found the new most up to date instance {} with epoch {} and {} latest commit timestamp",
instance_name, epoch, timestamp);
}
// we found point at which they were same
if (epoch == *latest_epoch) {
// if this is the latest history, compare timestamps
if (it == instance_default_db_history.rbegin()) {
if (*latest_commit_timestamp < timestamp) {
latest_commit_timestamp.emplace(timestamp);
most_up_to_date_instance = instance_name;
spdlog::trace("Found new the most up to date instance {} with epoch {} and {} latest commit timestamp",
instance_name, epoch, timestamp);
}
} else {
latest_epoch.emplace(instance_default_db_history.rbegin()->first);
latest_commit_timestamp.emplace(instance_default_db_history.rbegin()->second);
most_up_to_date_instance = instance_name;
spdlog::trace("Found new the most up to date instance {} with epoch {} and {} latest commit timestamp",
instance_name, epoch, timestamp);
}
if (epoch == last_most_up_to_date_epoch) {
found_same_point = true;
break;
}
// else if we don't find epoch which is same, instance with current most_up_to_date_instance
// is ahead
}
if (!found_same_point) {
spdlog::error("Didn't find same history epoch {} for instance {} and instance {}", last_most_up_to_date_epoch,
most_up_to_date_instance, instance_name);
}
});

View File

@ -27,10 +27,10 @@ class CoordinationUtils : public ::testing::Test {
TEST_F(CoordinationUtils, MemgraphDbHistorySimple) {
// Choose any if everything is same
// X = dead
// Main : A(24) B(12) C(15) D(17) E(1) X
// replica 1: A(24) B(12) C(15) D(17) E(1)
// replica 2: A(24) B(12) C(15) D(17) E(1)
// replica 3: A(24) B(12) C(15) D(17) E(1)
// Main : A(24) B(36) C(48) D(50) E(51) X
// replica 1: A(24) B(36) C(48) D(50) E(51)
// replica 2: A(24) B(36) C(48) D(50) E(51)
// replica 3: A(24) B(36) C(48) D(50) E(51)
std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
instance_database_histories;
std::optional<std::string> latest_epoch;
@ -38,10 +38,10 @@ TEST_F(CoordinationUtils, MemgraphDbHistorySimple) {
std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
histories.emplace_back(memgraph::utils::UUID{}, 24);
histories.emplace_back(memgraph::utils::UUID{}, 12);
histories.emplace_back(memgraph::utils::UUID{}, 15);
histories.emplace_back(memgraph::utils::UUID{}, 17);
histories.emplace_back(memgraph::utils::UUID{}, 1);
histories.emplace_back(memgraph::utils::UUID{}, 36);
histories.emplace_back(memgraph::utils::UUID{}, 48);
histories.emplace_back(memgraph::utils::UUID{}, 50);
histories.emplace_back(memgraph::utils::UUID{}, 51);
memgraph::utils::UUID db_uuid;
std::string default_name = std::string(memgraph::dbms::kDefaultDB);
@ -74,10 +74,10 @@ TEST_F(CoordinationUtils, MemgraphDbHistorySimple) {
TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
// Prioritize one with the biggest last commit timestamp on last epoch
// X = dead
// Main : A(24) B(12) C(15) D(17) E(1) X
// replica 1: A(24) B(12) C(15) D(17) E(10)
// replica 2: A(24) B(12) C(15) D(17) E(15)
// replica 3: A(24) B(12) C(15) D(17) E(20)
// Main : A(24) B(36) C(48) D(50) E(59) X
// replica 1: A(24) B(12) C(15) D(17) E(51)
// replica 2: A(24) B(12) C(15) D(17) E(57)
// replica 3: A(24) B(12) C(15) D(17) E(59)
std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
instance_database_histories;
std::optional<std::string> latest_epoch;
@ -85,10 +85,10 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
histories.emplace_back(memgraph::utils::UUID{}, 24);
histories.emplace_back(memgraph::utils::UUID{}, 12);
histories.emplace_back(memgraph::utils::UUID{}, 15);
histories.emplace_back(memgraph::utils::UUID{}, 17);
histories.emplace_back(memgraph::utils::UUID{}, 1);
histories.emplace_back(memgraph::utils::UUID{}, 36);
histories.emplace_back(memgraph::utils::UUID{}, 48);
histories.emplace_back(memgraph::utils::UUID{}, 50);
histories.emplace_back(memgraph::utils::UUID{}, 59);
memgraph::utils::UUID db_uuid;
std::string default_name = std::string(memgraph::dbms::kDefaultDB);
@ -99,19 +99,20 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
},
histories);
db_histories.back().second = 51;
memgraph::replication_coordination_glue::DatabaseHistory history1{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history1};
instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);
db_histories.back().second = 10;
db_histories.back().second = 57;
memgraph::replication_coordination_glue::DatabaseHistory history2{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history2};
instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
db_histories.back().second = 15;
db_histories.back().second = 59;
memgraph::replication_coordination_glue::DatabaseHistory history3{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history3};
@ -125,12 +126,12 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
}
TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
// Prioritize one with most epochs if same history
// Prioritize one biggest commit timestamp
// X = dead
// Main : A(24) B(12) C(15) D(17) E(1) X X X X
// replica 1: A(24) B(12) C(15) D(17) E(1) F(17) G(20) X up
// replica 2: A(24) B(12) C(15) D(17) E(1) X X X up
// replica 3: A(24) B(12) C(15) D(17) E(1) X X X up
// Main : A(24) B(36) C(48) D(50) E(51) X X X X
// replica 1: A(24) B(36) C(48) D(50) E(51) F(60) G(65) X up
// replica 2: A(24) B(36) C(48) D(50) E(51) X X X up
// replica 3: A(24) B(36) C(48) D(50) E(51) X X X up
std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
instance_database_histories;
std::optional<std::string> latest_epoch;
@ -138,10 +139,10 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
histories.emplace_back(memgraph::utils::UUID{}, 24);
histories.emplace_back(memgraph::utils::UUID{}, 12);
histories.emplace_back(memgraph::utils::UUID{}, 15);
histories.emplace_back(memgraph::utils::UUID{}, 17);
histories.emplace_back(memgraph::utils::UUID{}, 1);
histories.emplace_back(memgraph::utils::UUID{}, 36);
histories.emplace_back(memgraph::utils::UUID{}, 48);
histories.emplace_back(memgraph::utils::UUID{}, 50);
histories.emplace_back(memgraph::utils::UUID{}, 51);
memgraph::utils::UUID db_uuid;
std::string default_name = std::string(memgraph::dbms::kDefaultDB);
@ -161,8 +162,8 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history};
instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
histories.emplace_back(memgraph::utils::UUID{}, 17);
histories.emplace_back(memgraph::utils::UUID{}, 20);
histories.emplace_back(memgraph::utils::UUID{}, 60);
histories.emplace_back(memgraph::utils::UUID{}, 65);
auto db_histories_longest = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
@ -182,29 +183,64 @@ TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
ASSERT_TRUE(*latest_commit_timestamp == db_histories_longest.back().second);
}
TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesChooseAnyDifferentEpochs) {
// What to do in this case -> chose any in this case?
// Main : A B C X
// replica 1: A B C X X up
// replica 2: A B X D X up
// replica 3: A B X D X up
ASSERT_TRUE(true);
}
TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) {
// When history diverged, also prioritize one with biggest last commit timestamp
// Main : A(1) B(2) C(3) X
// replica 1: A(1) B(2) C(3) X X up
// replica 2: A(1) B(2) X D(5) X up
// replica 3: A(1) B(2) X D(4) X up
std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
instance_database_histories;
std::optional<std::string> latest_epoch;
std::optional<uint64_t> latest_commit_timestamp;
TEST_F(CoordinationUtils, MemgraphDbHistoryMostEpochs) {
// Prioritize one with most epochs?
// Main : A B C X X X X
// replica 1: A B C X X X X up
// replica 2: A B X D E F X up
// replica 3: A B X D E F G up
ASSERT_TRUE(true);
}
std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
histories.emplace_back(memgraph::utils::UUID{}, 1);
histories.emplace_back(memgraph::utils::UUID{}, 2);
histories.emplace_back(memgraph::utils::UUID{}, 3);
TEST_F(CoordinationUtils, MemgraphDbHistoryBiggestLastCommitTimestampsDifferentEpochs) {
// Prioritize one with biggest latest commit timestamp on last epoch if same number of epochs ?
// Main : A(0) B(15) C(17) X X X X
// replica 1: A(0) B(15) C(17) D(0) X X up
// replica 2: A(0) B(15) C(17) X E(1) X up
// replica 3: A(0) B(15) C(17) X X F(15) up
ASSERT_TRUE(true);
memgraph::utils::UUID db_uuid;
std::string default_name = std::string(memgraph::dbms::kDefaultDB);
auto db_histories = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
memgraph::replication_coordination_glue::DatabaseHistory history{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history};
instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);
db_histories.pop_back();
auto oldest_commit_timestamp{5};
auto newest_different_epoch = memgraph::utils::UUID{};
histories.emplace_back(newest_different_epoch, oldest_commit_timestamp);
auto db_histories_different = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
memgraph::replication_coordination_glue::DatabaseHistory history_3{
.db_uuid = db_uuid, .history = db_histories_different, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history_3};
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
db_histories_different.back().second = 4;
memgraph::replication_coordination_glue::DatabaseHistory history_2{
.db_uuid = db_uuid, .history = db_histories_different, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history_2};
instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
auto instance_name = memgraph::coordination::ChooseMostUpToDateInstance(instance_database_histories, latest_epoch,
latest_commit_timestamp);
ASSERT_TRUE(instance_name == "instance_3");
ASSERT_TRUE(*latest_epoch == std::string(newest_different_epoch));
ASSERT_TRUE(*latest_commit_timestamp == oldest_commit_timestamp);
}