HA: Add initial logic for choosing new replica (#1729)

This commit is contained in:
Antonio Filipovic 2024-02-28 10:57:00 +01:00 committed by GitHub
parent b7de79d5a0
commit b561c61b64
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 718 additions and 131 deletions

View File

@ -16,6 +16,7 @@
#include "coordination/coordinator_config.hpp"
#include "coordination/coordinator_rpc.hpp"
#include "replication_coordination_glue/common.hpp"
#include "replication_coordination_glue/messages.hpp"
#include "utils/result.hpp"
@ -30,7 +31,7 @@ auto CreateClientContext(memgraph::coordination::CoordinatorClientConfig const &
} // namespace
CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb)
: rpc_context_{CreateClientContext(config)},
rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port),
&rpc_context_},
@ -68,6 +69,10 @@ void CoordinatorClient::StartFrequentCheck() {
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
stream.AwaitResponse();
}
// Subtle race condition:
// acquiring of lock needs to happen before function call, as function callback can be changed
// for instance after lock is already acquired
// (failover case when instance is promoted to MAIN)
succ_cb_(coord_instance_, instance_name);
} catch (rpc::RpcFailedException const &) {
fail_cb_(coord_instance_, instance_name);
@ -79,11 +84,6 @@ void CoordinatorClient::StopFrequentCheck() { instance_checker_.Stop(); }
void CoordinatorClient::PauseFrequentCheck() { instance_checker_.Pause(); }
void CoordinatorClient::ResumeFrequentCheck() { instance_checker_.Resume(); }
auto CoordinatorClient::SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void {
succ_cb_ = std::move(succ_cb);
fail_cb_ = std::move(fail_cb);
}
auto CoordinatorClient::ReplicationClientInfo() const -> ReplClientInfo { return config_.replication_client_info; }
auto CoordinatorClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid,
@ -171,5 +171,19 @@ auto CoordinatorClient::SendEnableWritingOnMainRpc() const -> bool {
return false;
}
auto CoordinatorClient::SendGetInstanceTimestampsRpc() const
-> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories> {
try {
auto stream{rpc_client_.Stream<coordination::GetDatabaseHistoriesRpc>()};
auto res = stream.AwaitResponse();
return res.database_histories;
} catch (const rpc::RpcFailedException &) {
spdlog::error("RPC error occured while sending GetInstance UUID RPC");
return GetInstanceUUIDError::RPC_EXCEPTION;
}
}
} // namespace memgraph::coordination
#endif

View File

@ -57,6 +57,17 @@ void CoordinatorHandlers::Register(memgraph::coordination::CoordinatorServer &se
spdlog::info("Received GetInstanceUUIDRpc on coordinator server");
CoordinatorHandlers::GetInstanceUUIDHandler(replication_handler, req_reader, res_builder);
});
server.Register<coordination::GetDatabaseHistoriesRpc>(
[&replication_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void {
spdlog::info("Received GetDatabasesHistoryRpc on coordinator server");
CoordinatorHandlers::GetDatabaseHistoriesHandler(replication_handler, req_reader, res_builder);
});
}
void CoordinatorHandlers::GetDatabaseHistoriesHandler(replication::ReplicationHandler &replication_handler,
slk::Reader * /*req_reader*/, slk::Builder *res_builder) {
slk::Save(coordination::GetDatabaseHistoriesRes{replication_handler.GetDatabasesHistories()}, res_builder);
}
void CoordinatorHandlers::SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler,

View File

@ -15,10 +15,12 @@
#include "coordination/coordinator_exceptions.hpp"
#include "coordination/fmt.hpp"
#include "dbms/constants.hpp"
#include "nuraft/coordinator_state_machine.hpp"
#include "nuraft/coordinator_state_manager.hpp"
#include "utils/counter.hpp"
#include "utils/functional.hpp"
#include "utils/resource_lock.hpp"
#include <range/v3/view.hpp>
#include <shared_mutex>
@ -32,96 +34,28 @@ CoordinatorInstance::CoordinatorInstance()
: raft_state_(RaftState::MakeRaftState(
[this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StartFrequentCheck); },
[this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); })) {
auto find_repl_instance = [](CoordinatorInstance *self,
std::string_view repl_instance_name) -> ReplicationInstance & {
auto repl_instance =
std::ranges::find_if(self->repl_instances_, [repl_instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == repl_instance_name;
});
MG_ASSERT(repl_instance != self->repl_instances_.end(), "Instance {} not found during callback!",
repl_instance_name);
return *repl_instance;
client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::unique_lock{self->coord_instance_lock_};
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name);
};
replica_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::lock_guard{self->coord_instance_lock_};
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
auto &repl_instance = find_repl_instance(self, repl_instance_name);
// We need to get replicas UUID from time to time to ensure replica is listening to correct main
// and that it didn't go down for less time than we could notice
// We need to get id of main replica is listening to
// and swap if necessary
if (!repl_instance.EnsureReplicaHasCorrectMainUUID(self->GetMainUUID())) {
spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName());
return;
}
repl_instance.OnSuccessPing();
client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::unique_lock{self->coord_instance_lock_};
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name);
};
}
replica_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::lock_guard{self->coord_instance_lock_};
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
auto &repl_instance = find_repl_instance(self, repl_instance_name);
repl_instance.OnFailPing();
};
auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance & {
auto repl_instance =
std::ranges::find_if(repl_instances_, [replication_instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == replication_instance_name;
});
main_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::lock_guard{self->coord_instance_lock_};
spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
auto &repl_instance = find_repl_instance(self, repl_instance_name);
if (repl_instance.IsAlive()) {
repl_instance.OnSuccessPing();
return;
}
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set.");
auto const curr_main_uuid = self->GetMainUUID();
if (curr_main_uuid == repl_instance_uuid.value()) {
if (!repl_instance.EnableWritingOnMain()) {
spdlog::error("Failed to enable writing on main instance {}", repl_instance_name);
return;
}
repl_instance.OnSuccessPing();
return;
}
// TODO(antoniof) make demoteToReplica idempotent since main can be demoted to replica but
// swapUUID can fail
if (repl_instance.DemoteToReplica(self->replica_succ_cb_, self->replica_fail_cb_)) {
repl_instance.OnSuccessPing();
spdlog::info("Instance {} demoted to replica", repl_instance_name);
} else {
spdlog::error("Instance {} failed to become replica", repl_instance_name);
return;
}
if (!repl_instance.SendSwapAndUpdateUUID(curr_main_uuid)) {
spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName()));
return;
}
};
main_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::lock_guard{self->coord_instance_lock_};
spdlog::trace("Instance {} performing main failure callback", repl_instance_name);
auto &repl_instance = find_repl_instance(self, repl_instance_name);
repl_instance.OnFailPing();
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set");
if (!repl_instance.IsAlive() && self->GetMainUUID() == repl_instance_uuid.value()) {
spdlog::info("Cluster without main instance, trying automatic failover");
self->TryFailover(); // TODO: (andi) Initiate failover
}
};
MG_ASSERT(repl_instance != repl_instances_.end(), "Instance {} not found during callback!",
replication_instance_name);
return *repl_instance;
}
auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
@ -166,8 +100,36 @@ auto CoordinatorInstance::TryFailover() -> void {
return;
}
// TODO: Smarter choice
auto new_main = ranges::begin(alive_replicas);
// for each DB in instance we get one DatabaseHistory
using DatabaseHistories = replication_coordination_glue::DatabaseHistories;
std::vector<std::pair<std::string, DatabaseHistories>> instance_database_histories;
bool success{true};
std::ranges::for_each(alive_replicas, [&success, &instance_database_histories](ReplicationInstance &replica) {
if (!success) {
return;
}
auto res = replica.GetClient().SendGetInstanceTimestampsRpc();
if (res.HasError()) {
spdlog::error("Could get per db history data for instance {}", replica.InstanceName());
success = false;
return;
}
instance_database_histories.emplace_back(replica.InstanceName(), std::move(res.GetValue()));
});
if (!success) {
spdlog::error("Aborting failover as at least one instance didn't provide per database history.");
return;
}
auto [most_up_to_date_instance, latest_epoch, latest_commit_timestamp] =
ChooseMostUpToDateInstance(instance_database_histories);
spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp",
most_up_to_date_instance, *latest_epoch, *latest_commit_timestamp);
auto *new_main = &FindReplicationInstance(most_up_to_date_instance);
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
@ -191,7 +153,8 @@ auto CoordinatorInstance::TryFailover() -> void {
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
ranges::to<ReplicationClientsInfo>();
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
&CoordinatorInstance::MainFailCallback)) {
spdlog::warn("Failover failed since promoting replica to main failed!");
return;
}
@ -242,7 +205,8 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
&CoordinatorInstance::MainFailCallback)) {
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
@ -290,7 +254,9 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
spdlog::info("Request for registering instance {} accepted", instance_name);
try {
repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
repl_instances_.emplace_back(this, std::move(config), client_succ_cb_, client_fail_cb_,
&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback);
} catch (CoordinatorRegisterInstanceException const &) {
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}
@ -304,6 +270,85 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
return RegisterInstanceCoordinatorStatus::SUCCESS;
}
void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) {
auto &repl_instance = FindReplicationInstance(repl_instance_name);
repl_instance.OnFailPing();
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set");
if (!repl_instance.IsAlive() && GetMainUUID() == repl_instance_uuid.value()) {
spdlog::info("Cluster without main instance, trying automatic failover");
TryFailover();
}
}
void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name) {
auto &repl_instance = FindReplicationInstance(repl_instance_name);
spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
if (repl_instance.IsAlive()) {
repl_instance.OnSuccessPing();
return;
}
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set.");
auto const curr_main_uuid = GetMainUUID();
if (curr_main_uuid == repl_instance_uuid.value()) {
if (!repl_instance.EnableWritingOnMain()) {
spdlog::error("Failed to enable writing on main instance {}", repl_instance_name);
return;
}
repl_instance.OnSuccessPing();
return;
}
if (repl_instance.DemoteToReplica(&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback)) {
repl_instance.OnSuccessPing();
spdlog::info("Instance {} demoted to replica", repl_instance_name);
} else {
spdlog::error("Instance {} failed to become replica", repl_instance_name);
return;
}
if (!repl_instance.SendSwapAndUpdateUUID(curr_main_uuid)) {
spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName()));
return;
}
}
void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name) {
auto &repl_instance = FindReplicationInstance(repl_instance_name);
if (!repl_instance.IsReplica()) {
spdlog::error("Aborting replica callback since instance {} is not replica anymore", repl_instance_name);
return;
}
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
// We need to get replicas UUID from time to time to ensure replica is listening to correct main
// and that it didn't go down for less time than we could notice
// We need to get id of main replica is listening to
// and swap if necessary
if (!repl_instance.EnsureReplicaHasCorrectMainUUID(GetMainUUID())) {
spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName());
return;
}
repl_instance.OnSuccessPing();
}
void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) {
auto &repl_instance = FindReplicationInstance(repl_instance_name);
if (!repl_instance.IsReplica()) {
spdlog::error("Aborting replica fail callback since instance {} is not replica anymore", repl_instance_name);
return;
}
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
repl_instance.OnFailPing();
}
auto CoordinatorInstance::UnregisterReplicationInstance(std::string instance_name)
-> UnregisterInstanceCoordinatorStatus {
auto lock = std::lock_guard{coord_instance_lock_};
@ -343,5 +388,82 @@ auto CoordinatorInstance::GetMainUUID() const -> utils::UUID { return main_uuid_
// TODO: (andi) Add to the RAFT log.
auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_ = new_uuid; }
auto CoordinatorInstance::ChooseMostUpToDateInstance(
const std::vector<std::pair<std::string, replication_coordination_glue::DatabaseHistories>>
&instance_database_histories) -> NewMainRes {
NewMainRes new_main_res;
std::for_each(
instance_database_histories.begin(), instance_database_histories.end(),
[&new_main_res](const InstanceNameDbHistories &instance_res_pair) {
const auto &[instance_name, instance_db_histories] = instance_res_pair;
// Find default db for instance and its history
auto default_db_history_data = std::ranges::find_if(
instance_db_histories, [default_db = memgraph::dbms::kDefaultDB](
const replication_coordination_glue::DatabaseHistory &db_timestamps) {
return db_timestamps.name == default_db;
});
std::ranges::for_each(
instance_db_histories,
[&instance_name = instance_name](const replication_coordination_glue::DatabaseHistory &db_history) {
spdlog::trace("Instance {}: name {}, default db {}", instance_name, db_history.name,
memgraph::dbms::kDefaultDB);
});
MG_ASSERT(default_db_history_data != instance_db_histories.end(), "No history for instance");
const auto &instance_default_db_history = default_db_history_data->history;
std::ranges::for_each(instance_default_db_history | ranges::views::reverse,
[&instance_name = instance_name](const auto &epoch_history_it) {
spdlog::trace("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name,
std::get<0>(epoch_history_it), std::get<1>(epoch_history_it));
});
// get latest epoch
// get latest timestamp
if (!new_main_res.latest_epoch) {
const auto &[epoch, timestamp] = *instance_default_db_history.crbegin();
new_main_res = NewMainRes{
.most_up_to_date_instance = instance_name,
.latest_epoch = epoch,
.latest_commit_timestamp = timestamp,
};
spdlog::trace("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp",
instance_name, epoch, timestamp);
return;
}
bool found_same_point{false};
std::string last_most_up_to_date_epoch{*new_main_res.latest_epoch};
for (auto [epoch, timestamp] : ranges::reverse_view(instance_default_db_history)) {
if (*new_main_res.latest_commit_timestamp < timestamp) {
new_main_res = NewMainRes{
.most_up_to_date_instance = instance_name,
.latest_epoch = epoch,
.latest_commit_timestamp = timestamp,
};
spdlog::trace("Found the new most up to date instance {} with epoch {} and {} latest commit timestamp",
instance_name, epoch, timestamp);
}
// we found point at which they were same
if (epoch == last_most_up_to_date_epoch) {
found_same_point = true;
break;
}
}
if (!found_same_point) {
spdlog::error("Didn't find same history epoch {} for instance {} and instance {}", last_most_up_to_date_epoch,
new_main_res.most_up_to_date_instance, instance_name);
}
});
return new_main_res;
}
} // namespace memgraph::coordination
#endif

View File

@ -76,9 +76,9 @@ void EnableWritingOnMainRes::Load(EnableWritingOnMainRes *self, memgraph::slk::R
memgraph::slk::Load(self, reader);
}
void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const &self, memgraph::slk::Builder *builder) {}
void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const & /*self*/, memgraph::slk::Builder * /*builder*/) {}
void EnableWritingOnMainReq::Load(EnableWritingOnMainReq *self, memgraph::slk::Reader *reader) {}
void EnableWritingOnMainReq::Load(EnableWritingOnMainReq * /*self*/, memgraph::slk::Reader * /*reader*/) {}
// GetInstanceUUID
void GetInstanceUUIDReq::Save(const GetInstanceUUIDReq &self, memgraph::slk::Builder *builder) {
@ -97,6 +97,24 @@ void GetInstanceUUIDRes::Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *r
memgraph::slk::Load(self, reader);
}
// GetDatabaseHistoriesRpc
void GetDatabaseHistoriesReq::Save(const GetDatabaseHistoriesReq & /*self*/, memgraph::slk::Builder * /*builder*/) {
/* nothing to serialize */
}
void GetDatabaseHistoriesReq::Load(GetDatabaseHistoriesReq * /*self*/, memgraph::slk::Reader * /*reader*/) {
/* nothing to serialize */
}
void GetDatabaseHistoriesRes::Save(const GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void GetDatabaseHistoriesRes::Load(GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
} // namespace coordination
constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ,
@ -130,6 +148,12 @@ constexpr utils::TypeInfo coordination::GetInstanceUUIDReq::kType{utils::TypeId:
constexpr utils::TypeInfo coordination::GetInstanceUUIDRes::kType{utils::TypeId::COORD_GET_UUID_RES, "CoordGetUUIDRes",
nullptr};
constexpr utils::TypeInfo coordination::GetDatabaseHistoriesReq::kType{utils::TypeId::COORD_GET_INSTANCE_DATABASES_REQ,
"GetInstanceDatabasesReq", nullptr};
constexpr utils::TypeInfo coordination::GetDatabaseHistoriesRes::kType{utils::TypeId::COORD_GET_INSTANCE_DATABASES_RES,
"GetInstanceDatabasesRes", nullptr};
namespace slk {
// PromoteReplicaToMainRpc
@ -213,6 +237,16 @@ void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reade
memgraph::slk::Load(&self->uuid, reader);
}
// GetInstanceTimestampsReq
void Save(const memgraph::coordination::GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.database_histories, builder);
}
void Load(memgraph::coordination::GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->database_histories, reader);
}
} // namespace slk
} // namespace memgraph

View File

@ -14,6 +14,7 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_config.hpp"
#include "replication_coordination_glue/common.hpp"
#include "rpc/client.hpp"
#include "rpc_errors.hpp"
#include "utils/result.hpp"
@ -23,13 +24,13 @@
namespace memgraph::coordination {
class CoordinatorInstance;
using HealthCheckCallback = std::function<void(CoordinatorInstance *, std::string_view)>;
using HealthCheckClientCallback = std::function<void(CoordinatorInstance *, std::string_view)>;
using ReplicationClientsInfo = std::vector<ReplClientInfo>;
class CoordinatorClient {
public:
explicit CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb);
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb);
~CoordinatorClient() = default;
@ -62,7 +63,8 @@ class CoordinatorClient {
auto ReplicationClientInfo() const -> ReplClientInfo;
auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void;
auto SendGetInstanceTimestampsRpc() const
-> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories>;
auto RpcClient() -> rpc::Client & { return rpc_client_; }
@ -82,8 +84,8 @@ class CoordinatorClient {
CoordinatorClientConfig config_;
CoordinatorInstance *coord_instance_;
HealthCheckCallback succ_cb_;
HealthCheckCallback fail_cb_;
HealthCheckClientCallback succ_cb_;
HealthCheckClientCallback fail_cb_;
};
} // namespace memgraph::coordination

View File

@ -41,6 +41,9 @@ class CoordinatorHandlers {
static void GetInstanceUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
slk::Builder *res_builder);
static void GetDatabaseHistoriesHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
slk::Builder *res_builder);
};
} // namespace memgraph::dbms

View File

@ -18,6 +18,7 @@
#include "coordination/raft_state.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "coordination/replication_instance.hpp"
#include "utils/resource_lock.hpp"
#include "utils/rw_lock.hpp"
#include "utils/thread_pool.hpp"
@ -25,6 +26,13 @@
namespace memgraph::coordination {
struct NewMainRes {
std::string most_up_to_date_instance;
std::optional<std::string> latest_epoch;
std::optional<uint64_t> latest_commit_timestamp;
};
using InstanceNameDbHistories = std::pair<std::string, replication_coordination_glue::DatabaseHistories>;
class CoordinatorInstance {
public:
CoordinatorInstance();
@ -44,12 +52,24 @@ class CoordinatorInstance {
auto SetMainUUID(utils::UUID new_uuid) -> void;
auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &;
void MainFailCallback(std::string_view);
void MainSuccessCallback(std::string_view);
void ReplicaSuccessCallback(std::string_view);
void ReplicaFailCallback(std::string_view);
static auto ChooseMostUpToDateInstance(const std::vector<InstanceNameDbHistories> &) -> NewMainRes;
private:
HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
// NOTE: Must be std::list because we rely on pointer stability
std::list<ReplicationInstance> repl_instances_;
mutable utils::RWLock coord_instance_lock_{utils::RWLock::Priority::READ};
mutable utils::ResourceLock coord_instance_lock_{};
utils::UUID main_uuid_;

View File

@ -15,6 +15,7 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_config.hpp"
#include "replication_coordination_glue/common.hpp"
#include "rpc/messages.hpp"
#include "slk/serialization.hpp"
@ -161,6 +162,32 @@ struct GetInstanceUUIDRes {
using GetInstanceUUIDRpc = rpc::RequestResponse<GetInstanceUUIDReq, GetInstanceUUIDRes>;
struct GetDatabaseHistoriesReq {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(GetDatabaseHistoriesReq *self, memgraph::slk::Reader *reader);
static void Save(const GetDatabaseHistoriesReq &self, memgraph::slk::Builder *builder);
GetDatabaseHistoriesReq() = default;
};
struct GetDatabaseHistoriesRes {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader);
static void Save(const GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder);
explicit GetDatabaseHistoriesRes(const replication_coordination_glue::DatabaseHistories &database_histories)
: database_histories(database_histories) {}
GetDatabaseHistoriesRes() = default;
replication_coordination_glue::DatabaseHistories database_histories;
};
using GetDatabaseHistoriesRpc = rpc::RequestResponse<GetDatabaseHistoriesReq, GetDatabaseHistoriesRes>;
} // namespace memgraph::coordination
// SLK serialization declarations
@ -183,15 +210,21 @@ void Save(const memgraph::coordination::GetInstanceUUIDReq &self, memgraph::slk:
void Load(memgraph::coordination::GetInstanceUUIDReq *self, memgraph::slk::Reader *reader);
void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader);
// UnregisterReplicaRpc
void Save(memgraph::coordination::UnregisterReplicaRes const &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::UnregisterReplicaRes *self, memgraph::slk::Reader *reader);
void Save(memgraph::coordination::UnregisterReplicaReq const &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::UnregisterReplicaReq *self, memgraph::slk::Reader *reader);
// EnableWritingOnMainRpc
void Save(memgraph::coordination::EnableWritingOnMainRes const &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::EnableWritingOnMainRes *self, memgraph::slk::Reader *reader);
// GetDatabaseHistoriesRpc
void Save(const memgraph::coordination::GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader);
} // namespace memgraph::slk
#endif

View File

@ -14,6 +14,7 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_config.hpp"
#include "replication_coordination_glue/common.hpp"
#include "slk/serialization.hpp"
#include "slk/streams.hpp"
@ -34,5 +35,18 @@ inline void Load(ReplicationClientInfo *obj, Reader *reader) {
Load(&obj->replication_ip_address, reader);
Load(&obj->replication_port, reader);
}
inline void Save(const replication_coordination_glue::DatabaseHistory &obj, Builder *builder) {
Save(obj.db_uuid, builder);
Save(obj.history, builder);
Save(obj.name, builder);
}
inline void Load(replication_coordination_glue::DatabaseHistory *obj, Reader *reader) {
Load(&obj->db_uuid, reader);
Load(&obj->history, reader);
Load(&obj->name, reader);
}
} // namespace memgraph::slk
#endif

View File

@ -18,17 +18,22 @@
#include "replication_coordination_glue/role.hpp"
#include <libnuraft/nuraft.hxx>
#include "utils/resource_lock.hpp"
#include "utils/result.hpp"
#include "utils/uuid.hpp"
namespace memgraph::coordination {
class CoordinatorInstance;
class ReplicationInstance;
using HealthCheckInstanceCallback = void (CoordinatorInstance::*)(std::string_view);
class ReplicationInstance {
public:
ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb);
ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config, HealthCheckClientCallback succ_cb,
HealthCheckClientCallback fail_cb, HealthCheckInstanceCallback succ_instance_cb,
HealthCheckInstanceCallback fail_instance_cb);
ReplicationInstance(ReplicationInstance const &other) = delete;
ReplicationInstance &operator=(ReplicationInstance const &other) = delete;
@ -50,9 +55,10 @@ class ReplicationInstance {
auto IsReplica() const -> bool;
auto IsMain() const -> bool;
auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb,
HealthCheckCallback main_fail_cb) -> bool;
auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool;
auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info,
HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb) -> bool;
auto DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb)
-> bool;
auto StartFrequentCheck() -> void;
auto StopFrequentCheck() -> void;
@ -66,16 +72,17 @@ class ReplicationInstance {
auto SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool;
auto SendUnregisterReplicaRpc(std::string const &instance_name) -> bool;
auto SendGetInstanceUUID() -> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>>;
auto GetClient() -> CoordinatorClient &;
auto EnableWritingOnMain() -> bool;
auto SetNewMainUUID(utils::UUID const &main_uuid) -> void;
auto ResetMainUUID() -> void;
auto GetMainUUID() const -> const std::optional<utils::UUID> &;
auto GetSuccessCallback() -> HealthCheckInstanceCallback &;
auto GetFailCallback() -> HealthCheckInstanceCallback &;
private:
CoordinatorClient client_;
replication_coordination_glue::ReplicationRole replication_role_;
@ -90,6 +97,9 @@ class ReplicationInstance {
// so we need to send swap uuid again
std::optional<utils::UUID> main_uuid_;
HealthCheckInstanceCallback succ_cb_;
HealthCheckInstanceCallback fail_cb_;
friend bool operator==(ReplicationInstance const &first, ReplicationInstance const &second) {
return first.client_ == second.client_ && first.replication_role_ == second.replication_role_;
}

View File

@ -11,4 +11,5 @@
namespace memgraph::coordination {
enum class GetInstanceUUIDError { NO_RESPONSE, RPC_EXCEPTION };
enum class GetInstanceTimestampsError { NO_RESPONSE, RPC_EXCEPTION };
} // namespace memgraph::coordination

View File

@ -13,15 +13,21 @@
#include "coordination/replication_instance.hpp"
#include <utility>
#include "replication_coordination_glue/handler.hpp"
#include "utils/result.hpp"
namespace memgraph::coordination {
ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb,
HealthCheckInstanceCallback succ_instance_cb,
HealthCheckInstanceCallback fail_instance_cb)
: client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)),
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA) {
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA),
succ_cb_(succ_instance_cb),
fail_cb_(fail_instance_cb) {
if (!client_.DemoteToReplica()) {
throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName());
}
@ -57,26 +63,29 @@ auto ReplicationInstance::IsMain() const -> bool {
}
auto ReplicationInstance::PromoteToMain(utils::UUID new_uuid, ReplicationClientsInfo repl_clients_info,
HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> bool {
HealthCheckInstanceCallback main_succ_cb,
HealthCheckInstanceCallback main_fail_cb) -> bool {
if (!client_.SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) {
return false;
}
replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
main_uuid_ = new_uuid;
client_.SetCallbacks(std::move(main_succ_cb), std::move(main_fail_cb));
succ_cb_ = main_succ_cb;
fail_cb_ = main_fail_cb;
return true;
}
auto ReplicationInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb)
-> bool {
auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb,
HealthCheckInstanceCallback replica_fail_cb) -> bool {
if (!client_.DemoteToReplica()) {
return false;
}
replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA;
client_.SetCallbacks(std::move(replica_succ_cb), std::move(replica_fail_cb));
succ_cb_ = replica_succ_cb;
fail_cb_ = replica_fail_cb;
return true;
}
@ -90,10 +99,12 @@ auto ReplicationInstance::ReplicationClientInfo() const -> CoordinatorClientConf
return client_.ReplicationClientInfo();
}
auto ReplicationInstance::GetSuccessCallback() -> HealthCheckInstanceCallback & { return succ_cb_; }
auto ReplicationInstance::GetFailCallback() -> HealthCheckInstanceCallback & { return fail_cb_; }
auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; }
auto ReplicationInstance::SetNewMainUUID(utils::UUID const &main_uuid) -> void { main_uuid_ = main_uuid; }
auto ReplicationInstance::ResetMainUUID() -> void { main_uuid_ = std::nullopt; }
auto ReplicationInstance::GetMainUUID() const -> std::optional<utils::UUID> const & { return main_uuid_; }
auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {

View File

@ -266,10 +266,6 @@ class DbmsHandler {
bool IsMain() const { return repl_state_.IsMain(); }
bool IsReplica() const { return repl_state_.IsReplica(); }
#ifdef MG_ENTERPRISE
// coordination::CoordinatorState &CoordinatorState() { return coordinator_state_; }
#endif
/**
* @brief Return all active databases.
*

View File

@ -7,6 +7,7 @@ target_sources(mg-repl_coord_glue
mode.hpp
role.hpp
handler.hpp
common.hpp
PRIVATE
messages.cpp

View File

@ -0,0 +1,32 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include "rpc/client.hpp"
#include "utils/uuid.hpp"
#include <deque>
#include "messages.hpp"
#include "rpc/messages.hpp"
#include "utils/uuid.hpp"
namespace memgraph::replication_coordination_glue {
struct DatabaseHistory {
memgraph::utils::UUID db_uuid;
std::vector<std::pair<std::string, uint64_t>> history;
std::string name;
};
using DatabaseHistories = std::vector<DatabaseHistory>;
} // namespace memgraph::replication_coordination_glue

View File

@ -14,6 +14,7 @@
#include "dbms/dbms_handler.hpp"
#include "flags/experimental.hpp"
#include "replication/include/replication/state.hpp"
#include "replication_coordination_glue/common.hpp"
#include "replication_handler/system_replication.hpp"
#include "replication_handler/system_rpc.hpp"
#include "utils/result.hpp"
@ -149,6 +150,8 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
auto GetReplicaUUID() -> std::optional<utils::UUID>;
auto GetDatabasesHistories() -> replication_coordination_glue::DatabaseHistories;
private:
template <bool SendSwapUUID>
auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config)

View File

@ -14,6 +14,7 @@
#include "dbms/dbms_handler.hpp"
#include "replication/replication_client.hpp"
#include "replication_handler/system_replication.hpp"
#include "utils/functional.hpp"
namespace memgraph::replication {
@ -265,8 +266,26 @@ auto ReplicationHandler::GetRole() const -> replication_coordination_glue::Repli
return repl_state_.GetRole();
}
auto ReplicationHandler::GetDatabasesHistories() -> replication_coordination_glue::DatabaseHistories {
replication_coordination_glue::DatabaseHistories results;
dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) {
auto &repl_storage_state = db_acc->storage()->repl_storage_state_;
std::vector<std::pair<std::string, uint64_t>> history =
utils::fmap([](const auto &elem) { return std::pair<std::string, uint64_t>(elem.first, elem.second); },
repl_storage_state.history);
history.emplace_back(std::string(repl_storage_state.epoch_.id()), repl_storage_state.last_commit_timestamp_.load());
replication_coordination_glue::DatabaseHistory repl{
.db_uuid = utils::UUID{db_acc->storage()->uuid()}, .history = history, .name = std::string(db_acc->name())};
results.emplace_back(repl);
});
return results;
}
auto ReplicationHandler::GetReplicaUUID() -> std::optional<utils::UUID> {
MG_ASSERT(repl_state_.IsReplica());
MG_ASSERT(repl_state_.IsReplica(), "Instance is not replica");
return std::get<RoleReplicaData>(repl_state_.ReplicationData()).uuid_;
}

View File

@ -106,8 +106,8 @@ uint64_t ReplicateCurrentWal(const utils::UUID &main_uuid, const InMemoryStorage
return response.current_commit_timestamp;
}
/// This method tries to find the optimal path for recoverying a single replica.
/// Based on the last commit transfered to replica it tries to update the
/// This method tries to find the optimal path for recovering a single replica.
/// Based on the last commit transferred to replica it tries to update the
/// replica using durability files - WALs and Snapshots. WAL files are much
/// smaller in size as they contain only the Deltas (changes) made during the
/// transactions while Snapshots contain all the data. For that reason we prefer
@ -175,7 +175,7 @@ std::vector<RecoveryStep> GetRecoverySteps(uint64_t replica_commit, utils::FileR
auto add_snapshot = [&]() {
if (!latest_snapshot) return;
const auto lock_success = locker_acc.AddPath(latest_snapshot->path);
MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant snapshot path.");
MG_ASSERT(!lock_success.HasError(), "Tried to lock a non-existent snapshot path.");
recovery_steps.emplace_back(std::in_place_type_t<RecoverySnapshot>{}, std::move(latest_snapshot->path));
};

View File

@ -53,11 +53,13 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
#endif
std::optional<uint64_t> branching_point;
// different epoch id, replica was main
if (replica.epoch_id != replStorageState.epoch_.id() && replica.current_commit_timestamp != kTimestampInitialId) {
auto const &history = replStorageState.history;
const auto epoch_info_iter = std::find_if(history.crbegin(), history.crend(), [&](const auto &main_epoch_info) {
return main_epoch_info.first == replica.epoch_id;
});
// main didn't have that epoch, but why is here branching point
if (epoch_info_iter == history.crend()) {
branching_point = 0;
} else if (epoch_info_iter->second != replica.current_commit_timestamp) {

View File

@ -18,8 +18,11 @@
namespace memgraph::utils {
template <class F, class T, class R = typename std::invoke_result<F, T>::type>
auto fmap(F &&f, std::vector<T> const &v) -> std::vector<R> {
template <template <typename, typename...> class Container, typename T, typename Allocator = std::allocator<T>,
typename F, typename R = std::invoke_result_t<F, T>>
requires ranges::range<Container<T, Allocator>> &&
(!std::same_as<Container<T, Allocator>, std::string>)auto fmap(F &&f, const Container<T, Allocator> &v)
-> std::vector<R> {
return v | ranges::views::transform(std::forward<F>(f)) | ranges::to<std::vector<R>>();
}

View File

@ -57,7 +57,7 @@ class Scheduler {
// program and there is probably no work to do in scheduled function at
// the start of the program. Since Server will log some messages on
// the program start we let him log first and we make sure by first
// waiting that funcion f will not log before it.
// waiting that function f will not log before it.
// Check for pause also.
std::unique_lock<std::mutex> lk(mutex_);
auto now = std::chrono::system_clock::now();

View File

@ -114,6 +114,8 @@ enum class TypeId : uint64_t {
COORD_GET_UUID_REQ,
COORD_GET_UUID_RES,
COORD_GET_INSTANCE_DATABASES_REQ,
COORD_GET_INSTANCE_DATABASES_RES,
// AST
AST_LABELIX = 3000,

View File

@ -430,3 +430,11 @@ target_include_directories(${test_prefix}distributed_lamport_clock PRIVATE ${CMA
add_unit_test(query_hint_provider.cpp)
target_link_libraries(${test_prefix}query_hint_provider mg-query mg-glue)
# Test coordination
if(MG_ENTERPRISE)
add_unit_test(coordination_utils.cpp)
target_link_libraries(${test_prefix}coordination_utils gflags mg-coordination mg-repl_coord_glue)
target_include_directories(${test_prefix}coordination_utils PRIVATE ${CMAKE_SOURCE_DIR}/include)
endif()

View File

@ -0,0 +1,246 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include "coordination/coordinator_instance.hpp"
#include "dbms/constants.hpp"
#include "replication_coordination_glue/common.hpp"
#include "utils/functional.hpp"
class CoordinationUtils : public ::testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
std::filesystem::path test_folder_{std::filesystem::temp_directory_path() / "MG_tests_unit_coordination"};
};
TEST_F(CoordinationUtils, MemgraphDbHistorySimple) {
// Choose any if everything is same
// X = dead
// Main : A(24) B(36) C(48) D(50) E(51) X
// replica 1: A(24) B(36) C(48) D(50) E(51)
// replica 2: A(24) B(36) C(48) D(50) E(51)
// replica 3: A(24) B(36) C(48) D(50) E(51)
std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
instance_database_histories;
std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
histories.emplace_back(memgraph::utils::UUID{}, 24);
histories.emplace_back(memgraph::utils::UUID{}, 36);
histories.emplace_back(memgraph::utils::UUID{}, 48);
histories.emplace_back(memgraph::utils::UUID{}, 50);
histories.emplace_back(memgraph::utils::UUID{}, 51);
memgraph::utils::UUID db_uuid;
std::string default_name = std::string(memgraph::dbms::kDefaultDB);
auto db_histories = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
memgraph::replication_coordination_glue::DatabaseHistory history{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history};
instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);
memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history};
instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history};
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
memgraph::coordination::CoordinatorInstance instance;
auto [instance_name, latest_epoch, latest_commit_timestamp] =
instance.ChooseMostUpToDateInstance(instance_database_histories);
ASSERT_TRUE(instance_name == "instance_1" || instance_name == "instance_2" || instance_name == "instance_3");
ASSERT_TRUE(*latest_epoch == db_histories.back().first);
ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second);
}
TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
// Prioritize one with the biggest last commit timestamp on last epoch
// X = dead
// Main : A(24) B(36) C(48) D(50) E(59) X
// replica 1: A(24) B(12) C(15) D(17) E(51)
// replica 2: A(24) B(12) C(15) D(17) E(57)
// replica 3: A(24) B(12) C(15) D(17) E(59)
std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
instance_database_histories;
std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
histories.emplace_back(memgraph::utils::UUID{}, 24);
histories.emplace_back(memgraph::utils::UUID{}, 36);
histories.emplace_back(memgraph::utils::UUID{}, 48);
histories.emplace_back(memgraph::utils::UUID{}, 50);
histories.emplace_back(memgraph::utils::UUID{}, 59);
memgraph::utils::UUID db_uuid;
std::string default_name = std::string(memgraph::dbms::kDefaultDB);
auto db_histories = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
db_histories.back().second = 51;
memgraph::replication_coordination_glue::DatabaseHistory history1{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history1};
instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);
db_histories.back().second = 57;
memgraph::replication_coordination_glue::DatabaseHistory history2{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history2};
instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
db_histories.back().second = 59;
memgraph::replication_coordination_glue::DatabaseHistory history3{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history3};
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
memgraph::coordination::CoordinatorInstance instance;
auto [instance_name, latest_epoch, latest_commit_timestamp] =
instance.ChooseMostUpToDateInstance(instance_database_histories);
ASSERT_TRUE(instance_name == "instance_3");
ASSERT_TRUE(*latest_epoch == db_histories.back().first);
ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second);
}
TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
// Prioritize one biggest commit timestamp
// X = dead
// Main : A(24) B(36) C(48) D(50) E(51) X X X X
// replica 1: A(24) B(36) C(48) D(50) E(51) F(60) G(65) X up
// replica 2: A(24) B(36) C(48) D(50) E(51) X X X up
// replica 3: A(24) B(36) C(48) D(50) E(51) X X X up
std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
instance_database_histories;
std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
histories.emplace_back(memgraph::utils::UUID{}, 24);
histories.emplace_back(memgraph::utils::UUID{}, 36);
histories.emplace_back(memgraph::utils::UUID{}, 48);
histories.emplace_back(memgraph::utils::UUID{}, 50);
histories.emplace_back(memgraph::utils::UUID{}, 51);
memgraph::utils::UUID db_uuid;
std::string default_name = std::string(memgraph::dbms::kDefaultDB);
auto db_histories = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
memgraph::replication_coordination_glue::DatabaseHistory history{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history};
instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);
memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history};
instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
histories.emplace_back(memgraph::utils::UUID{}, 60);
histories.emplace_back(memgraph::utils::UUID{}, 65);
auto db_histories_longest = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
memgraph::replication_coordination_glue::DatabaseHistory history_longest{
.db_uuid = db_uuid, .history = db_histories_longest, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history_longest};
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
memgraph::coordination::CoordinatorInstance instance;
auto [instance_name, latest_epoch, latest_commit_timestamp] =
instance.ChooseMostUpToDateInstance(instance_database_histories);
ASSERT_TRUE(instance_name == "instance_3");
ASSERT_TRUE(*latest_epoch == db_histories_longest.back().first);
ASSERT_TRUE(*latest_commit_timestamp == db_histories_longest.back().second);
}
TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) {
// When history diverged, also prioritize one with biggest last commit timestamp
// Main : A(1) B(2) C(3) X
// replica 1: A(1) B(2) C(3) X X up
// replica 2: A(1) B(2) X D(5) X up
// replica 3: A(1) B(2) X D(4) X up
std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
instance_database_histories;
std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
histories.emplace_back(memgraph::utils::UUID{}, 1);
histories.emplace_back(memgraph::utils::UUID{}, 2);
histories.emplace_back(memgraph::utils::UUID{}, 3);
memgraph::utils::UUID db_uuid;
std::string default_name = std::string(memgraph::dbms::kDefaultDB);
auto db_histories = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
memgraph::replication_coordination_glue::DatabaseHistory history{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history};
instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);
db_histories.pop_back();
auto oldest_commit_timestamp{5};
auto newest_different_epoch = memgraph::utils::UUID{};
histories.emplace_back(newest_different_epoch, oldest_commit_timestamp);
auto db_histories_different = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
memgraph::replication_coordination_glue::DatabaseHistory history_3{
.db_uuid = db_uuid, .history = db_histories_different, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history_3};
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
db_histories_different.back().second = 4;
memgraph::replication_coordination_glue::DatabaseHistory history_2{
.db_uuid = db_uuid, .history = db_histories_different, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history_2};
instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
memgraph::coordination::CoordinatorInstance instance;
auto [instance_name, latest_epoch, latest_commit_timestamp] =
instance.ChooseMostUpToDateInstance(instance_database_histories);
ASSERT_TRUE(instance_name == "instance_3");
ASSERT_TRUE(*latest_epoch == std::string(newest_different_epoch));
ASSERT_TRUE(*latest_commit_timestamp == oldest_commit_timestamp);
}