Merge branch 'master' into support-label-manipulation-via-variables

This commit is contained in:
DavIvek 2024-02-29 17:17:22 +01:00 committed by GitHub
commit fc84243cc9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 1052 additions and 312 deletions

View File

@ -45,6 +45,7 @@ MEMGRAPH_BUILD_DEPS=(
readline-devel # for memgraph console
python3-devel # for query modules
openssl-devel
openssl
libseccomp-devel
python3 python3-pip nmap-ncat # for tests
#

View File

@ -43,6 +43,7 @@ MEMGRAPH_BUILD_DEPS=(
readline-devel # for memgraph console
python3-devel # for query modules
openssl-devel
openssl
libseccomp-devel
python3 python-virtualenv python3-pip nmap-ncat # for qa, macro_benchmark and stress tests
#

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -9,10 +9,11 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <boost/functional/hash.hpp>
#include <mgp.hpp>
#include "utils/string.hpp"
#include <optional>
#include <unordered_set>
namespace Schema {
@ -37,6 +38,7 @@ constexpr std::string_view kParameterIndices = "indices";
constexpr std::string_view kParameterUniqueConstraints = "unique_constraints";
constexpr std::string_view kParameterExistenceConstraints = "existence_constraints";
constexpr std::string_view kParameterDropExisting = "drop_existing";
constexpr int kInitialNumberOfPropertyOccurances = 1;
std::string TypeOf(const mgp::Type &type);
@ -108,83 +110,79 @@ void Schema::ProcessPropertiesRel(mgp::Record &record, const std::string_view &t
record.Insert(std::string(kReturnMandatory).c_str(), mandatory);
}
struct Property {
std::string name;
mgp::Value value;
struct PropertyInfo {
std::unordered_set<std::string> property_types; // property types
int64_t number_of_property_occurrences = 0;
Property(const std::string &name, mgp::Value &&value) : name(name), value(std::move(value)) {}
PropertyInfo() = default;
explicit PropertyInfo(std::string &&property_type)
: property_types({std::move(property_type)}),
number_of_property_occurrences(Schema::kInitialNumberOfPropertyOccurances) {}
};
struct LabelsInfo {
std::unordered_map<std::string, PropertyInfo> properties; // key is a property name
int64_t number_of_label_occurrences = 0;
};
struct LabelsHash {
std::size_t operator()(const std::set<std::string> &set) const {
std::size_t seed = set.size();
for (const auto &i : set) {
seed ^= std::hash<std::string>{}(i) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}
return seed;
}
std::size_t operator()(const std::set<std::string> &s) const { return boost::hash_range(s.begin(), s.end()); }
};
struct LabelsComparator {
bool operator()(const std::set<std::string> &lhs, const std::set<std::string> &rhs) const { return lhs == rhs; }
};
struct PropertyComparator {
bool operator()(const Property &lhs, const Property &rhs) const { return lhs.name < rhs.name; }
};
struct PropertyInfo {
std::set<Property, PropertyComparator> properties;
bool mandatory;
};
void Schema::NodeTypeProperties(mgp_list * /*args*/, mgp_graph *memgraph_graph, mgp_result *result,
mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};
const auto record_factory = mgp::RecordFactory(result);
try {
std::unordered_map<std::set<std::string>, PropertyInfo, LabelsHash, LabelsComparator> node_types_properties;
std::unordered_map<std::set<std::string>, LabelsInfo, LabelsHash, LabelsComparator> node_types_properties;
for (auto node : mgp::Graph(memgraph_graph).Nodes()) {
for (const auto node : mgp::Graph(memgraph_graph).Nodes()) {
std::set<std::string> labels_set = {};
for (auto label : node.Labels()) {
for (const auto label : node.Labels()) {
labels_set.emplace(label);
}
if (node_types_properties.find(labels_set) == node_types_properties.end()) {
node_types_properties[labels_set] = PropertyInfo{std::set<Property, PropertyComparator>(), true};
}
node_types_properties[labels_set].number_of_label_occurrences++;
if (node.Properties().empty()) {
node_types_properties[labels_set].mandatory = false; // if there is node with no property, it is not mandatory
continue;
}
auto &property_info = node_types_properties.at(labels_set);
for (auto &[key, prop] : node.Properties()) {
property_info.properties.emplace(key, std::move(prop));
if (property_info.mandatory) {
property_info.mandatory =
property_info.properties.size() == 1; // if there is only one property, it is mandatory
auto &labels_info = node_types_properties.at(labels_set);
for (const auto &[key, prop] : node.Properties()) {
auto prop_type = TypeOf(prop.Type());
if (labels_info.properties.find(key) == labels_info.properties.end()) {
labels_info.properties[key] = PropertyInfo{std::move(prop_type)};
} else {
labels_info.properties[key].property_types.emplace(prop_type);
labels_info.properties[key].number_of_property_occurrences++;
}
}
}
for (auto &[labels, property_info] : node_types_properties) {
for (auto &[node_type, labels_info] : node_types_properties) { // node type is a set of labels
std::string label_type;
mgp::List labels_list = mgp::List();
for (auto const &label : labels) {
auto labels_list = mgp::List();
for (const auto &label : node_type) {
label_type += ":`" + std::string(label) + "`";
labels_list.AppendExtend(mgp::Value(label));
}
for (auto const &prop : property_info.properties) {
for (const auto &prop : labels_info.properties) {
auto prop_types = mgp::List();
for (const auto &prop_type : prop.second.property_types) {
prop_types.AppendExtend(mgp::Value(prop_type));
}
bool mandatory = prop.second.number_of_property_occurrences == labels_info.number_of_label_occurrences;
auto record = record_factory.NewRecord();
ProcessPropertiesNode(record, label_type, labels_list, prop.name, TypeOf(prop.value.Type()),
property_info.mandatory);
ProcessPropertiesNode(record, label_type, labels_list, prop.first, prop_types, mandatory);
}
if (property_info.properties.empty()) {
if (labels_info.properties.empty()) {
auto record = record_factory.NewRecord();
ProcessPropertiesNode<std::string>(record, label_type, labels_list, "", "", false);
ProcessPropertiesNode<mgp::List>(record, label_type, labels_list, "", mgp::List(), false);
}
}
@ -197,40 +195,45 @@ void Schema::NodeTypeProperties(mgp_list * /*args*/, mgp_graph *memgraph_graph,
void Schema::RelTypeProperties(mgp_list * /*args*/, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};
std::unordered_map<std::string, PropertyInfo> rel_types_properties;
std::unordered_map<std::string, LabelsInfo> rel_types_properties;
const auto record_factory = mgp::RecordFactory(result);
try {
const mgp::Graph graph = mgp::Graph(memgraph_graph);
for (auto rel : graph.Relationships()) {
const auto graph = mgp::Graph(memgraph_graph);
for (const auto rel : graph.Relationships()) {
std::string rel_type = std::string(rel.Type());
if (rel_types_properties.find(rel_type) == rel_types_properties.end()) {
rel_types_properties[rel_type] = PropertyInfo{std::set<Property, PropertyComparator>(), true};
}
rel_types_properties[rel_type].number_of_label_occurrences++;
if (rel.Properties().empty()) {
rel_types_properties[rel_type].mandatory = false; // if there is rel with no property, it is not mandatory
continue;
}
auto &property_info = rel_types_properties.at(rel_type);
auto &labels_info = rel_types_properties.at(rel_type);
for (auto &[key, prop] : rel.Properties()) {
property_info.properties.emplace(key, std::move(prop));
if (property_info.mandatory) {
property_info.mandatory =
property_info.properties.size() == 1; // if there is only one property, it is mandatory
auto prop_type = TypeOf(prop.Type());
if (labels_info.properties.find(key) == labels_info.properties.end()) {
labels_info.properties[key] = PropertyInfo{std::move(prop_type)};
} else {
labels_info.properties[key].property_types.emplace(prop_type);
labels_info.properties[key].number_of_property_occurrences++;
}
}
}
for (auto &[type, property_info] : rel_types_properties) {
std::string type_str = ":`" + std::string(type) + "`";
for (auto const &prop : property_info.properties) {
for (auto &[rel_type, labels_info] : rel_types_properties) {
std::string type_str = ":`" + std::string(rel_type) + "`";
for (const auto &prop : labels_info.properties) {
auto prop_types = mgp::List();
for (const auto &prop_type : prop.second.property_types) {
prop_types.AppendExtend(mgp::Value(prop_type));
}
bool mandatory = prop.second.number_of_property_occurrences == labels_info.number_of_label_occurrences;
auto record = record_factory.NewRecord();
ProcessPropertiesRel(record, type_str, prop.name, TypeOf(prop.value.Type()), property_info.mandatory);
ProcessPropertiesRel(record, type_str, prop.first, prop_types, mandatory);
}
if (property_info.properties.empty()) {
if (labels_info.properties.empty()) {
auto record = record_factory.NewRecord();
ProcessPropertiesRel<std::string>(record, type_str, "", "", false);
ProcessPropertiesRel<mgp::List>(record, type_str, "", mgp::List(), false);
}
}

View File

@ -16,6 +16,7 @@
#include "coordination/coordinator_config.hpp"
#include "coordination/coordinator_rpc.hpp"
#include "replication_coordination_glue/common.hpp"
#include "replication_coordination_glue/messages.hpp"
#include "utils/result.hpp"
@ -30,7 +31,7 @@ auto CreateClientContext(memgraph::coordination::CoordinatorClientConfig const &
} // namespace
CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb)
: rpc_context_{CreateClientContext(config)},
rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port),
&rpc_context_},
@ -68,6 +69,10 @@ void CoordinatorClient::StartFrequentCheck() {
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
stream.AwaitResponse();
}
// Subtle race condition:
// acquiring of lock needs to happen before function call, as function callback can be changed
// for instance after lock is already acquired
// (failover case when instance is promoted to MAIN)
succ_cb_(coord_instance_, instance_name);
} catch (rpc::RpcFailedException const &) {
fail_cb_(coord_instance_, instance_name);
@ -79,11 +84,6 @@ void CoordinatorClient::StopFrequentCheck() { instance_checker_.Stop(); }
void CoordinatorClient::PauseFrequentCheck() { instance_checker_.Pause(); }
void CoordinatorClient::ResumeFrequentCheck() { instance_checker_.Resume(); }
auto CoordinatorClient::SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void {
succ_cb_ = std::move(succ_cb);
fail_cb_ = std::move(fail_cb);
}
auto CoordinatorClient::ReplicationClientInfo() const -> ReplClientInfo { return config_.replication_client_info; }
auto CoordinatorClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid,
@ -171,5 +171,19 @@ auto CoordinatorClient::SendEnableWritingOnMainRpc() const -> bool {
return false;
}
auto CoordinatorClient::SendGetInstanceTimestampsRpc() const
-> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories> {
try {
auto stream{rpc_client_.Stream<coordination::GetDatabaseHistoriesRpc>()};
auto res = stream.AwaitResponse();
return res.database_histories;
} catch (const rpc::RpcFailedException &) {
spdlog::error("RPC error occured while sending GetInstance UUID RPC");
return GetInstanceUUIDError::RPC_EXCEPTION;
}
}
} // namespace memgraph::coordination
#endif

View File

@ -57,6 +57,17 @@ void CoordinatorHandlers::Register(memgraph::coordination::CoordinatorServer &se
spdlog::info("Received GetInstanceUUIDRpc on coordinator server");
CoordinatorHandlers::GetInstanceUUIDHandler(replication_handler, req_reader, res_builder);
});
server.Register<coordination::GetDatabaseHistoriesRpc>(
[&replication_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void {
spdlog::info("Received GetDatabasesHistoryRpc on coordinator server");
CoordinatorHandlers::GetDatabaseHistoriesHandler(replication_handler, req_reader, res_builder);
});
}
void CoordinatorHandlers::GetDatabaseHistoriesHandler(replication::ReplicationHandler &replication_handler,
slk::Reader * /*req_reader*/, slk::Builder *res_builder) {
slk::Save(coordination::GetDatabaseHistoriesRes{replication_handler.GetDatabasesHistories()}, res_builder);
}
void CoordinatorHandlers::SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler,

View File

@ -15,10 +15,12 @@
#include "coordination/coordinator_exceptions.hpp"
#include "coordination/fmt.hpp"
#include "dbms/constants.hpp"
#include "nuraft/coordinator_state_machine.hpp"
#include "nuraft/coordinator_state_manager.hpp"
#include "utils/counter.hpp"
#include "utils/functional.hpp"
#include "utils/resource_lock.hpp"
#include <range/v3/view.hpp>
#include <shared_mutex>
@ -32,96 +34,28 @@ CoordinatorInstance::CoordinatorInstance()
: raft_state_(RaftState::MakeRaftState(
[this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StartFrequentCheck); },
[this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); })) {
auto find_repl_instance = [](CoordinatorInstance *self,
std::string_view repl_instance_name) -> ReplicationInstance & {
auto repl_instance =
std::ranges::find_if(self->repl_instances_, [repl_instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == repl_instance_name;
});
MG_ASSERT(repl_instance != self->repl_instances_.end(), "Instance {} not found during callback!",
repl_instance_name);
return *repl_instance;
client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::unique_lock{self->coord_instance_lock_};
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name);
};
replica_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::lock_guard{self->coord_instance_lock_};
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
auto &repl_instance = find_repl_instance(self, repl_instance_name);
// We need to get replicas UUID from time to time to ensure replica is listening to correct main
// and that it didn't go down for less time than we could notice
// We need to get id of main replica is listening to
// and swap if necessary
if (!repl_instance.EnsureReplicaHasCorrectMainUUID(self->GetMainUUID())) {
spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName());
return;
}
repl_instance.OnSuccessPing();
client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::unique_lock{self->coord_instance_lock_};
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name);
};
}
replica_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::lock_guard{self->coord_instance_lock_};
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
auto &repl_instance = find_repl_instance(self, repl_instance_name);
repl_instance.OnFailPing();
};
auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance & {
auto repl_instance =
std::ranges::find_if(repl_instances_, [replication_instance_name](ReplicationInstance const &instance) {
return instance.InstanceName() == replication_instance_name;
});
main_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::lock_guard{self->coord_instance_lock_};
spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
auto &repl_instance = find_repl_instance(self, repl_instance_name);
if (repl_instance.IsAlive()) {
repl_instance.OnSuccessPing();
return;
}
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set.");
auto const curr_main_uuid = self->GetMainUUID();
if (curr_main_uuid == repl_instance_uuid.value()) {
if (!repl_instance.EnableWritingOnMain()) {
spdlog::error("Failed to enable writing on main instance {}", repl_instance_name);
return;
}
repl_instance.OnSuccessPing();
return;
}
// TODO(antoniof) make demoteToReplica idempotent since main can be demoted to replica but
// swapUUID can fail
if (repl_instance.DemoteToReplica(self->replica_succ_cb_, self->replica_fail_cb_)) {
repl_instance.OnSuccessPing();
spdlog::info("Instance {} demoted to replica", repl_instance_name);
} else {
spdlog::error("Instance {} failed to become replica", repl_instance_name);
return;
}
if (!repl_instance.SendSwapAndUpdateUUID(curr_main_uuid)) {
spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName()));
return;
}
};
main_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
auto lock = std::lock_guard{self->coord_instance_lock_};
spdlog::trace("Instance {} performing main failure callback", repl_instance_name);
auto &repl_instance = find_repl_instance(self, repl_instance_name);
repl_instance.OnFailPing();
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set");
if (!repl_instance.IsAlive() && self->GetMainUUID() == repl_instance_uuid.value()) {
spdlog::info("Cluster without main instance, trying automatic failover");
self->TryFailover(); // TODO: (andi) Initiate failover
}
};
MG_ASSERT(repl_instance != repl_instances_.end(), "Instance {} not found during callback!",
replication_instance_name);
return *repl_instance;
}
auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
@ -166,8 +100,36 @@ auto CoordinatorInstance::TryFailover() -> void {
return;
}
// TODO: Smarter choice
auto new_main = ranges::begin(alive_replicas);
// for each DB in instance we get one DatabaseHistory
using DatabaseHistories = replication_coordination_glue::DatabaseHistories;
std::vector<std::pair<std::string, DatabaseHistories>> instance_database_histories;
bool success{true};
std::ranges::for_each(alive_replicas, [&success, &instance_database_histories](ReplicationInstance &replica) {
if (!success) {
return;
}
auto res = replica.GetClient().SendGetInstanceTimestampsRpc();
if (res.HasError()) {
spdlog::error("Could get per db history data for instance {}", replica.InstanceName());
success = false;
return;
}
instance_database_histories.emplace_back(replica.InstanceName(), std::move(res.GetValue()));
});
if (!success) {
spdlog::error("Aborting failover as at least one instance didn't provide per database history.");
return;
}
auto [most_up_to_date_instance, latest_epoch, latest_commit_timestamp] =
ChooseMostUpToDateInstance(instance_database_histories);
spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp",
most_up_to_date_instance, *latest_epoch, *latest_commit_timestamp);
auto *new_main = &FindReplicationInstance(most_up_to_date_instance);
new_main->PauseFrequentCheck();
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
@ -191,7 +153,8 @@ auto CoordinatorInstance::TryFailover() -> void {
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
ranges::to<ReplicationClientsInfo>();
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
&CoordinatorInstance::MainFailCallback)) {
spdlog::warn("Failover failed since promoting replica to main failed!");
return;
}
@ -242,7 +205,8 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
&CoordinatorInstance::MainFailCallback)) {
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
}
@ -290,7 +254,9 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
spdlog::info("Request for registering instance {} accepted", instance_name);
try {
repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
repl_instances_.emplace_back(this, std::move(config), client_succ_cb_, client_fail_cb_,
&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback);
} catch (CoordinatorRegisterInstanceException const &) {
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
}
@ -304,6 +270,85 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
return RegisterInstanceCoordinatorStatus::SUCCESS;
}
void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) {
auto &repl_instance = FindReplicationInstance(repl_instance_name);
repl_instance.OnFailPing();
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set");
if (!repl_instance.IsAlive() && GetMainUUID() == repl_instance_uuid.value()) {
spdlog::info("Cluster without main instance, trying automatic failover");
TryFailover();
}
}
void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name) {
auto &repl_instance = FindReplicationInstance(repl_instance_name);
spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
if (repl_instance.IsAlive()) {
repl_instance.OnSuccessPing();
return;
}
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set.");
auto const curr_main_uuid = GetMainUUID();
if (curr_main_uuid == repl_instance_uuid.value()) {
if (!repl_instance.EnableWritingOnMain()) {
spdlog::error("Failed to enable writing on main instance {}", repl_instance_name);
return;
}
repl_instance.OnSuccessPing();
return;
}
if (repl_instance.DemoteToReplica(&CoordinatorInstance::ReplicaSuccessCallback,
&CoordinatorInstance::ReplicaFailCallback)) {
repl_instance.OnSuccessPing();
spdlog::info("Instance {} demoted to replica", repl_instance_name);
} else {
spdlog::error("Instance {} failed to become replica", repl_instance_name);
return;
}
if (!repl_instance.SendSwapAndUpdateUUID(curr_main_uuid)) {
spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName()));
return;
}
}
void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name) {
auto &repl_instance = FindReplicationInstance(repl_instance_name);
if (!repl_instance.IsReplica()) {
spdlog::error("Aborting replica callback since instance {} is not replica anymore", repl_instance_name);
return;
}
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
// We need to get replicas UUID from time to time to ensure replica is listening to correct main
// and that it didn't go down for less time than we could notice
// We need to get id of main replica is listening to
// and swap if necessary
if (!repl_instance.EnsureReplicaHasCorrectMainUUID(GetMainUUID())) {
spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName());
return;
}
repl_instance.OnSuccessPing();
}
void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) {
auto &repl_instance = FindReplicationInstance(repl_instance_name);
if (!repl_instance.IsReplica()) {
spdlog::error("Aborting replica fail callback since instance {} is not replica anymore", repl_instance_name);
return;
}
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
repl_instance.OnFailPing();
}
auto CoordinatorInstance::UnregisterReplicationInstance(std::string instance_name)
-> UnregisterInstanceCoordinatorStatus {
auto lock = std::lock_guard{coord_instance_lock_};
@ -343,5 +388,82 @@ auto CoordinatorInstance::GetMainUUID() const -> utils::UUID { return main_uuid_
// TODO: (andi) Add to the RAFT log.
auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_ = new_uuid; }
auto CoordinatorInstance::ChooseMostUpToDateInstance(
const std::vector<std::pair<std::string, replication_coordination_glue::DatabaseHistories>>
&instance_database_histories) -> NewMainRes {
NewMainRes new_main_res;
std::for_each(
instance_database_histories.begin(), instance_database_histories.end(),
[&new_main_res](const InstanceNameDbHistories &instance_res_pair) {
const auto &[instance_name, instance_db_histories] = instance_res_pair;
// Find default db for instance and its history
auto default_db_history_data = std::ranges::find_if(
instance_db_histories, [default_db = memgraph::dbms::kDefaultDB](
const replication_coordination_glue::DatabaseHistory &db_timestamps) {
return db_timestamps.name == default_db;
});
std::ranges::for_each(
instance_db_histories,
[&instance_name = instance_name](const replication_coordination_glue::DatabaseHistory &db_history) {
spdlog::trace("Instance {}: name {}, default db {}", instance_name, db_history.name,
memgraph::dbms::kDefaultDB);
});
MG_ASSERT(default_db_history_data != instance_db_histories.end(), "No history for instance");
const auto &instance_default_db_history = default_db_history_data->history;
std::ranges::for_each(instance_default_db_history | ranges::views::reverse,
[&instance_name = instance_name](const auto &epoch_history_it) {
spdlog::trace("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name,
std::get<0>(epoch_history_it), std::get<1>(epoch_history_it));
});
// get latest epoch
// get latest timestamp
if (!new_main_res.latest_epoch) {
const auto &[epoch, timestamp] = *instance_default_db_history.crbegin();
new_main_res = NewMainRes{
.most_up_to_date_instance = instance_name,
.latest_epoch = epoch,
.latest_commit_timestamp = timestamp,
};
spdlog::trace("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp",
instance_name, epoch, timestamp);
return;
}
bool found_same_point{false};
std::string last_most_up_to_date_epoch{*new_main_res.latest_epoch};
for (auto [epoch, timestamp] : ranges::reverse_view(instance_default_db_history)) {
if (*new_main_res.latest_commit_timestamp < timestamp) {
new_main_res = NewMainRes{
.most_up_to_date_instance = instance_name,
.latest_epoch = epoch,
.latest_commit_timestamp = timestamp,
};
spdlog::trace("Found the new most up to date instance {} with epoch {} and {} latest commit timestamp",
instance_name, epoch, timestamp);
}
// we found point at which they were same
if (epoch == last_most_up_to_date_epoch) {
found_same_point = true;
break;
}
}
if (!found_same_point) {
spdlog::error("Didn't find same history epoch {} for instance {} and instance {}", last_most_up_to_date_epoch,
new_main_res.most_up_to_date_instance, instance_name);
}
});
return new_main_res;
}
} // namespace memgraph::coordination
#endif

View File

@ -76,9 +76,9 @@ void EnableWritingOnMainRes::Load(EnableWritingOnMainRes *self, memgraph::slk::R
memgraph::slk::Load(self, reader);
}
void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const &self, memgraph::slk::Builder *builder) {}
void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const & /*self*/, memgraph::slk::Builder * /*builder*/) {}
void EnableWritingOnMainReq::Load(EnableWritingOnMainReq *self, memgraph::slk::Reader *reader) {}
void EnableWritingOnMainReq::Load(EnableWritingOnMainReq * /*self*/, memgraph::slk::Reader * /*reader*/) {}
// GetInstanceUUID
void GetInstanceUUIDReq::Save(const GetInstanceUUIDReq &self, memgraph::slk::Builder *builder) {
@ -97,6 +97,24 @@ void GetInstanceUUIDRes::Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *r
memgraph::slk::Load(self, reader);
}
// GetDatabaseHistoriesRpc
void GetDatabaseHistoriesReq::Save(const GetDatabaseHistoriesReq & /*self*/, memgraph::slk::Builder * /*builder*/) {
/* nothing to serialize */
}
void GetDatabaseHistoriesReq::Load(GetDatabaseHistoriesReq * /*self*/, memgraph::slk::Reader * /*reader*/) {
/* nothing to serialize */
}
void GetDatabaseHistoriesRes::Save(const GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void GetDatabaseHistoriesRes::Load(GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
} // namespace coordination
constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ,
@ -130,6 +148,12 @@ constexpr utils::TypeInfo coordination::GetInstanceUUIDReq::kType{utils::TypeId:
constexpr utils::TypeInfo coordination::GetInstanceUUIDRes::kType{utils::TypeId::COORD_GET_UUID_RES, "CoordGetUUIDRes",
nullptr};
constexpr utils::TypeInfo coordination::GetDatabaseHistoriesReq::kType{utils::TypeId::COORD_GET_INSTANCE_DATABASES_REQ,
"GetInstanceDatabasesReq", nullptr};
constexpr utils::TypeInfo coordination::GetDatabaseHistoriesRes::kType{utils::TypeId::COORD_GET_INSTANCE_DATABASES_RES,
"GetInstanceDatabasesRes", nullptr};
namespace slk {
// PromoteReplicaToMainRpc
@ -213,6 +237,16 @@ void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reade
memgraph::slk::Load(&self->uuid, reader);
}
// GetInstanceTimestampsReq
void Save(const memgraph::coordination::GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.database_histories, builder);
}
void Load(memgraph::coordination::GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->database_histories, reader);
}
} // namespace slk
} // namespace memgraph

View File

@ -14,6 +14,7 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_config.hpp"
#include "replication_coordination_glue/common.hpp"
#include "rpc/client.hpp"
#include "rpc_errors.hpp"
#include "utils/result.hpp"
@ -23,13 +24,13 @@
namespace memgraph::coordination {
class CoordinatorInstance;
using HealthCheckCallback = std::function<void(CoordinatorInstance *, std::string_view)>;
using HealthCheckClientCallback = std::function<void(CoordinatorInstance *, std::string_view)>;
using ReplicationClientsInfo = std::vector<ReplClientInfo>;
class CoordinatorClient {
public:
explicit CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb);
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb);
~CoordinatorClient() = default;
@ -62,7 +63,8 @@ class CoordinatorClient {
auto ReplicationClientInfo() const -> ReplClientInfo;
auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void;
auto SendGetInstanceTimestampsRpc() const
-> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories>;
auto RpcClient() -> rpc::Client & { return rpc_client_; }
@ -82,8 +84,8 @@ class CoordinatorClient {
CoordinatorClientConfig config_;
CoordinatorInstance *coord_instance_;
HealthCheckCallback succ_cb_;
HealthCheckCallback fail_cb_;
HealthCheckClientCallback succ_cb_;
HealthCheckClientCallback fail_cb_;
};
} // namespace memgraph::coordination

View File

@ -41,6 +41,9 @@ class CoordinatorHandlers {
static void GetInstanceUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
slk::Builder *res_builder);
static void GetDatabaseHistoriesHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
slk::Builder *res_builder);
};
} // namespace memgraph::dbms

View File

@ -18,6 +18,7 @@
#include "coordination/raft_state.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "coordination/replication_instance.hpp"
#include "utils/resource_lock.hpp"
#include "utils/rw_lock.hpp"
#include "utils/thread_pool.hpp"
@ -25,6 +26,13 @@
namespace memgraph::coordination {
struct NewMainRes {
std::string most_up_to_date_instance;
std::optional<std::string> latest_epoch;
std::optional<uint64_t> latest_commit_timestamp;
};
using InstanceNameDbHistories = std::pair<std::string, replication_coordination_glue::DatabaseHistories>;
class CoordinatorInstance {
public:
CoordinatorInstance();
@ -44,12 +52,24 @@ class CoordinatorInstance {
auto SetMainUUID(utils::UUID new_uuid) -> void;
auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &;
void MainFailCallback(std::string_view);
void MainSuccessCallback(std::string_view);
void ReplicaSuccessCallback(std::string_view);
void ReplicaFailCallback(std::string_view);
static auto ChooseMostUpToDateInstance(const std::vector<InstanceNameDbHistories> &) -> NewMainRes;
private:
HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
HealthCheckClientCallback client_succ_cb_, client_fail_cb_;
// NOTE: Must be std::list because we rely on pointer stability
std::list<ReplicationInstance> repl_instances_;
mutable utils::RWLock coord_instance_lock_{utils::RWLock::Priority::READ};
mutable utils::ResourceLock coord_instance_lock_{};
utils::UUID main_uuid_;

View File

@ -15,6 +15,7 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_config.hpp"
#include "replication_coordination_glue/common.hpp"
#include "rpc/messages.hpp"
#include "slk/serialization.hpp"
@ -161,6 +162,32 @@ struct GetInstanceUUIDRes {
using GetInstanceUUIDRpc = rpc::RequestResponse<GetInstanceUUIDReq, GetInstanceUUIDRes>;
struct GetDatabaseHistoriesReq {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(GetDatabaseHistoriesReq *self, memgraph::slk::Reader *reader);
static void Save(const GetDatabaseHistoriesReq &self, memgraph::slk::Builder *builder);
GetDatabaseHistoriesReq() = default;
};
struct GetDatabaseHistoriesRes {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader);
static void Save(const GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder);
explicit GetDatabaseHistoriesRes(const replication_coordination_glue::DatabaseHistories &database_histories)
: database_histories(database_histories) {}
GetDatabaseHistoriesRes() = default;
replication_coordination_glue::DatabaseHistories database_histories;
};
using GetDatabaseHistoriesRpc = rpc::RequestResponse<GetDatabaseHistoriesReq, GetDatabaseHistoriesRes>;
} // namespace memgraph::coordination
// SLK serialization declarations
@ -183,15 +210,21 @@ void Save(const memgraph::coordination::GetInstanceUUIDReq &self, memgraph::slk:
void Load(memgraph::coordination::GetInstanceUUIDReq *self, memgraph::slk::Reader *reader);
void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader);
// UnregisterReplicaRpc
void Save(memgraph::coordination::UnregisterReplicaRes const &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::UnregisterReplicaRes *self, memgraph::slk::Reader *reader);
void Save(memgraph::coordination::UnregisterReplicaReq const &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::UnregisterReplicaReq *self, memgraph::slk::Reader *reader);
// EnableWritingOnMainRpc
void Save(memgraph::coordination::EnableWritingOnMainRes const &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::EnableWritingOnMainRes *self, memgraph::slk::Reader *reader);
// GetDatabaseHistoriesRpc
void Save(const memgraph::coordination::GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader);
} // namespace memgraph::slk
#endif

View File

@ -14,6 +14,7 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_config.hpp"
#include "replication_coordination_glue/common.hpp"
#include "slk/serialization.hpp"
#include "slk/streams.hpp"
@ -34,5 +35,18 @@ inline void Load(ReplicationClientInfo *obj, Reader *reader) {
Load(&obj->replication_ip_address, reader);
Load(&obj->replication_port, reader);
}
inline void Save(const replication_coordination_glue::DatabaseHistory &obj, Builder *builder) {
Save(obj.db_uuid, builder);
Save(obj.history, builder);
Save(obj.name, builder);
}
inline void Load(replication_coordination_glue::DatabaseHistory *obj, Reader *reader) {
Load(&obj->db_uuid, reader);
Load(&obj->history, reader);
Load(&obj->name, reader);
}
} // namespace memgraph::slk
#endif

View File

@ -18,17 +18,22 @@
#include "replication_coordination_glue/role.hpp"
#include <libnuraft/nuraft.hxx>
#include "utils/resource_lock.hpp"
#include "utils/result.hpp"
#include "utils/uuid.hpp"
namespace memgraph::coordination {
class CoordinatorInstance;
class ReplicationInstance;
using HealthCheckInstanceCallback = void (CoordinatorInstance::*)(std::string_view);
class ReplicationInstance {
public:
ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
HealthCheckCallback fail_cb);
ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config, HealthCheckClientCallback succ_cb,
HealthCheckClientCallback fail_cb, HealthCheckInstanceCallback succ_instance_cb,
HealthCheckInstanceCallback fail_instance_cb);
ReplicationInstance(ReplicationInstance const &other) = delete;
ReplicationInstance &operator=(ReplicationInstance const &other) = delete;
@ -50,9 +55,10 @@ class ReplicationInstance {
auto IsReplica() const -> bool;
auto IsMain() const -> bool;
auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb,
HealthCheckCallback main_fail_cb) -> bool;
auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool;
auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info,
HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb) -> bool;
auto DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb)
-> bool;
auto StartFrequentCheck() -> void;
auto StopFrequentCheck() -> void;
@ -66,16 +72,17 @@ class ReplicationInstance {
auto SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool;
auto SendUnregisterReplicaRpc(std::string const &instance_name) -> bool;
auto SendGetInstanceUUID() -> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>>;
auto GetClient() -> CoordinatorClient &;
auto EnableWritingOnMain() -> bool;
auto SetNewMainUUID(utils::UUID const &main_uuid) -> void;
auto ResetMainUUID() -> void;
auto GetMainUUID() const -> const std::optional<utils::UUID> &;
auto GetSuccessCallback() -> HealthCheckInstanceCallback &;
auto GetFailCallback() -> HealthCheckInstanceCallback &;
private:
CoordinatorClient client_;
replication_coordination_glue::ReplicationRole replication_role_;
@ -90,6 +97,9 @@ class ReplicationInstance {
// so we need to send swap uuid again
std::optional<utils::UUID> main_uuid_;
HealthCheckInstanceCallback succ_cb_;
HealthCheckInstanceCallback fail_cb_;
friend bool operator==(ReplicationInstance const &first, ReplicationInstance const &second) {
return first.client_ == second.client_ && first.replication_role_ == second.replication_role_;
}

View File

@ -11,4 +11,5 @@
namespace memgraph::coordination {
enum class GetInstanceUUIDError { NO_RESPONSE, RPC_EXCEPTION };
enum class GetInstanceTimestampsError { NO_RESPONSE, RPC_EXCEPTION };
} // namespace memgraph::coordination

View File

@ -13,15 +13,21 @@
#include "coordination/replication_instance.hpp"
#include <utility>
#include "replication_coordination_glue/handler.hpp"
#include "utils/result.hpp"
namespace memgraph::coordination {
ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb,
HealthCheckInstanceCallback succ_instance_cb,
HealthCheckInstanceCallback fail_instance_cb)
: client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)),
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA) {
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA),
succ_cb_(succ_instance_cb),
fail_cb_(fail_instance_cb) {
if (!client_.DemoteToReplica()) {
throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName());
}
@ -57,26 +63,29 @@ auto ReplicationInstance::IsMain() const -> bool {
}
auto ReplicationInstance::PromoteToMain(utils::UUID new_uuid, ReplicationClientsInfo repl_clients_info,
HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> bool {
HealthCheckInstanceCallback main_succ_cb,
HealthCheckInstanceCallback main_fail_cb) -> bool {
if (!client_.SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) {
return false;
}
replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
main_uuid_ = new_uuid;
client_.SetCallbacks(std::move(main_succ_cb), std::move(main_fail_cb));
succ_cb_ = main_succ_cb;
fail_cb_ = main_fail_cb;
return true;
}
auto ReplicationInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb)
-> bool {
auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb,
HealthCheckInstanceCallback replica_fail_cb) -> bool {
if (!client_.DemoteToReplica()) {
return false;
}
replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA;
client_.SetCallbacks(std::move(replica_succ_cb), std::move(replica_fail_cb));
succ_cb_ = replica_succ_cb;
fail_cb_ = replica_fail_cb;
return true;
}
@ -90,10 +99,12 @@ auto ReplicationInstance::ReplicationClientInfo() const -> CoordinatorClientConf
return client_.ReplicationClientInfo();
}
auto ReplicationInstance::GetSuccessCallback() -> HealthCheckInstanceCallback & { return succ_cb_; }
auto ReplicationInstance::GetFailCallback() -> HealthCheckInstanceCallback & { return fail_cb_; }
auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; }
auto ReplicationInstance::SetNewMainUUID(utils::UUID const &main_uuid) -> void { main_uuid_ = main_uuid; }
auto ReplicationInstance::ResetMainUUID() -> void { main_uuid_ = std::nullopt; }
auto ReplicationInstance::GetMainUUID() const -> std::optional<utils::UUID> const & { return main_uuid_; }
auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {

View File

@ -266,10 +266,6 @@ class DbmsHandler {
bool IsMain() const { return repl_state_.IsMain(); }
bool IsReplica() const { return repl_state_.IsReplica(); }
#ifdef MG_ENTERPRISE
// coordination::CoordinatorState &CoordinatorState() { return coordinator_state_; }
#endif
/**
* @brief Return all active databases.
*

View File

@ -122,11 +122,11 @@ static bool my_commit(extent_hooks_t *extent_hooks, void *addr, size_t size, siz
[[maybe_unused]] auto blocker = memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker{};
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
bool ok = GetQueriesMemoryControl().TrackAllocOnCurrentThread(length);
[[maybe_unused]] bool ok = GetQueriesMemoryControl().TrackAllocOnCurrentThread(length);
DMG_ASSERT(ok);
}
auto ok = memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(length));
[[maybe_unused]] auto ok = memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(length));
DMG_ASSERT(ok);
return false;

View File

@ -416,7 +416,7 @@ memgraph::storage::PropertyValue StringToValue(const std::string &str, const std
std::string GetIdSpace(const std::string &type) {
// The format of this field is as follows:
// [START_|END_]ID[(<id_space>)]
std::regex format(R"(^(START_|END_)?ID(\(([^\(\)]+)\))?$)", std::regex::extended);
static std::regex format(R"(^(START_|END_)?ID(\(([^\(\)]+)\))?$)", std::regex::extended);
std::smatch res;
if (!std::regex_match(type, res, format))
throw LoadException(

View File

@ -3798,7 +3798,7 @@ void PrintFuncSignature(const mgp_func &func, std::ostream &stream) {
bool IsValidIdentifierName(const char *name) {
if (!name) return false;
std::regex regex("[_[:alpha:]][_[:alnum:]]*");
static std::regex regex("[_[:alpha:]][_[:alnum:]]*");
return std::regex_match(name, regex);
}

View File

@ -7,6 +7,7 @@ target_sources(mg-repl_coord_glue
mode.hpp
role.hpp
handler.hpp
common.hpp
PRIVATE
messages.cpp

View File

@ -0,0 +1,32 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include "rpc/client.hpp"
#include "utils/uuid.hpp"
#include <deque>
#include "messages.hpp"
#include "rpc/messages.hpp"
#include "utils/uuid.hpp"
namespace memgraph::replication_coordination_glue {
struct DatabaseHistory {
memgraph::utils::UUID db_uuid;
std::vector<std::pair<std::string, uint64_t>> history;
std::string name;
};
using DatabaseHistories = std::vector<DatabaseHistory>;
} // namespace memgraph::replication_coordination_glue

View File

@ -14,6 +14,7 @@
#include "dbms/dbms_handler.hpp"
#include "flags/experimental.hpp"
#include "replication/include/replication/state.hpp"
#include "replication_coordination_glue/common.hpp"
#include "replication_handler/system_replication.hpp"
#include "replication_handler/system_rpc.hpp"
#include "utils/result.hpp"
@ -149,6 +150,8 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
auto GetReplicaUUID() -> std::optional<utils::UUID>;
auto GetDatabasesHistories() -> replication_coordination_glue::DatabaseHistories;
private:
template <bool SendSwapUUID>
auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config)

View File

@ -14,6 +14,7 @@
#include "dbms/dbms_handler.hpp"
#include "replication/replication_client.hpp"
#include "replication_handler/system_replication.hpp"
#include "utils/functional.hpp"
namespace memgraph::replication {
@ -265,8 +266,26 @@ auto ReplicationHandler::GetRole() const -> replication_coordination_glue::Repli
return repl_state_.GetRole();
}
auto ReplicationHandler::GetDatabasesHistories() -> replication_coordination_glue::DatabaseHistories {
replication_coordination_glue::DatabaseHistories results;
dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) {
auto &repl_storage_state = db_acc->storage()->repl_storage_state_;
std::vector<std::pair<std::string, uint64_t>> history =
utils::fmap([](const auto &elem) { return std::pair<std::string, uint64_t>(elem.first, elem.second); },
repl_storage_state.history);
history.emplace_back(std::string(repl_storage_state.epoch_.id()), repl_storage_state.last_commit_timestamp_.load());
replication_coordination_glue::DatabaseHistory repl{
.db_uuid = utils::UUID{db_acc->storage()->uuid()}, .history = history, .name = std::string(db_acc->name())};
results.emplace_back(repl);
});
return results;
}
auto ReplicationHandler::GetReplicaUUID() -> std::optional<utils::UUID> {
MG_ASSERT(repl_state_.IsReplica());
MG_ASSERT(repl_state_.IsReplica(), "Instance is not replica");
return std::get<RoleReplicaData>(repl_state_.ReplicationData()).uuid_;
}

View File

@ -123,6 +123,26 @@ inline bool operator==(const PreviousPtr::Pointer &a, const PreviousPtr::Pointer
inline bool operator!=(const PreviousPtr::Pointer &a, const PreviousPtr::Pointer &b) { return !(a == b); }
struct opt_str {
opt_str(std::optional<std::string> const &other) : str_{other ? new_cstr(*other) : nullptr} {}
~opt_str() { delete[] str_; }
auto as_opt_str() const -> std::optional<std::string> {
if (!str_) return std::nullopt;
return std::optional<std::string>{std::in_place, str_};
}
private:
static auto new_cstr(std::string const &str) -> char const * {
auto *mem = new char[str.length() + 1];
strcpy(mem, str.c_str());
return mem;
}
char const *str_ = nullptr;
};
struct Delta {
enum class Action : std::uint8_t {
/// Use for Vertex and Edge
@ -160,7 +180,7 @@ struct Delta {
// Because of this object was created in past txs, we create timestamp by ourselves inside instead of having it from
// current tx. This timestamp we got from RocksDB timestamp stored in key.
Delta(DeleteDeserializedObjectTag /*tag*/, uint64_t ts, std::optional<std::string> old_disk_key)
: timestamp(new std::atomic<uint64_t>(ts)), command_id(0), old_disk_key{.value = std::move(old_disk_key)} {}
: timestamp(new std::atomic<uint64_t>(ts)), command_id(0), old_disk_key{.value = old_disk_key} {}
Delta(DeleteObjectTag /*tag*/, std::atomic<uint64_t> *timestamp, uint64_t command_id)
: timestamp(timestamp), command_id(command_id), action(Action::DELETE_OBJECT) {}
@ -222,7 +242,7 @@ struct Delta {
case Action::REMOVE_OUT_EDGE:
break;
case Action::DELETE_DESERIALIZED_OBJECT:
old_disk_key.value.reset();
std::destroy_at(&old_disk_key.value);
delete timestamp;
timestamp = nullptr;
break;
@ -242,7 +262,7 @@ struct Delta {
Action action;
struct {
Action action = Action::DELETE_DESERIALIZED_OBJECT;
std::optional<std::string> value;
opt_str value;
} old_disk_key;
struct {
Action action;

View File

@ -310,7 +310,7 @@ class DiskStorage final : public Storage {
StorageInfo GetBaseInfo() override;
StorageInfo GetInfo(memgraph::replication_coordination_glue::ReplicationRole replication_role) override;
void FreeMemory(std::unique_lock<utils::ResourceLock> /*lock*/) override {}
void FreeMemory(std::unique_lock<utils::ResourceLock> /*lock*/, bool /*periodic*/) override {}
void PrepareForNewEpoch() override { throw utils::BasicException("Disk storage mode does not support replication."); }

View File

@ -144,27 +144,30 @@ void InMemoryLabelPropertyIndex::RemoveObsoleteEntries(uint64_t oldest_active_st
auto maybe_stop = utils::ResettableCounter<2048>();
for (auto &[label_property, index] : index_) {
auto [label_id, prop_id] = label_property;
// before starting index, check if stop_requested
if (token.stop_requested()) return;
auto index_acc = index.access();
for (auto it = index_acc.begin(); it != index_acc.end();) {
auto it = index_acc.begin();
auto end_it = index_acc.end();
if (it == end_it) continue;
while (true) {
// Hot loop, don't check stop_requested every time
if (maybe_stop() && token.stop_requested()) return;
auto next_it = it;
++next_it;
if (it->timestamp >= oldest_active_start_timestamp) {
it = next_it;
continue;
}
if ((next_it != index_acc.end() && it->vertex == next_it->vertex && it->value == next_it->value) ||
!AnyVersionHasLabelProperty(*it->vertex, label_property.first, label_property.second, it->value,
oldest_active_start_timestamp)) {
index_acc.remove(*it);
bool has_next = next_it != end_it;
if (it->timestamp < oldest_active_start_timestamp) {
bool redundant_duplicate = has_next && it->vertex == next_it->vertex && it->value == next_it->value;
if (redundant_duplicate ||
!AnyVersionHasLabelProperty(*it->vertex, label_id, prop_id, it->value, oldest_active_start_timestamp)) {
index_acc.remove(*it);
}
}
if (!has_next) break;
it = next_it;
}
}

View File

@ -106,8 +106,8 @@ uint64_t ReplicateCurrentWal(const utils::UUID &main_uuid, const InMemoryStorage
return response.current_commit_timestamp;
}
/// This method tries to find the optimal path for recoverying a single replica.
/// Based on the last commit transfered to replica it tries to update the
/// This method tries to find the optimal path for recovering a single replica.
/// Based on the last commit transferred to replica it tries to update the
/// replica using durability files - WALs and Snapshots. WAL files are much
/// smaller in size as they contain only the Deltas (changes) made during the
/// transactions while Snapshots contain all the data. For that reason we prefer
@ -175,7 +175,7 @@ std::vector<RecoveryStep> GetRecoverySteps(uint64_t replica_commit, utils::FileR
auto add_snapshot = [&]() {
if (!latest_snapshot) return;
const auto lock_success = locker_acc.AddPath(latest_snapshot->path);
MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant snapshot path.");
MG_ASSERT(!lock_success.HasError(), "Tried to lock a non-existent snapshot path.");
recovery_steps.emplace_back(std::in_place_type_t<RecoverySnapshot>{}, std::move(latest_snapshot->path));
};

View File

@ -143,9 +143,7 @@ InMemoryStorage::InMemoryStorage(Config config)
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
// TODO: move out of storage have one global gc_runner_
gc_runner_.Run("Storage GC", config_.gc.interval, [this] {
this->FreeMemory(std::unique_lock<utils::ResourceLock>{main_lock_, std::defer_lock});
});
gc_runner_.Run("Storage GC", config_.gc.interval, [this] { this->FreeMemory({}, true); });
}
if (timestamp_ == kTimestampInitialId) {
commit_log_.emplace();
@ -1425,28 +1423,27 @@ void InMemoryStorage::SetStorageMode(StorageMode new_storage_mode) {
}
storage_mode_ = new_storage_mode;
FreeMemory(std::move(main_guard));
FreeMemory(std::move(main_guard), false);
}
}
template <bool force>
void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_guard) {
template <bool aggressive = true>
void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_guard, bool periodic) {
// NOTE: You do not need to consider cleanup of deleted object that occurred in
// different storage modes within the same CollectGarbage call. This is because
// SetStorageMode will ensure CollectGarbage is called before any new transactions
// with the new storage mode can start.
// SetStorageMode will pass its unique_lock of main_lock_. We will use that lock,
// as reacquiring the lock would cause deadlock. Otherwise, we need to get our own
// as reacquiring the lock would cause deadlock. Otherwise, we need to get our own
// lock.
if (!main_guard.owns_lock()) {
if constexpr (force) {
// We take the unique lock on the main storage lock, so we can forcefully clean
// everything we can
if (!main_lock_.try_lock()) {
CollectGarbage<false>();
return;
}
if constexpr (aggressive) {
// We tried to be aggressive but we do not already have main lock continue as not aggressive
// Perf note: Do not try to get unique lock if it was not already passed in. GC maybe expensive,
// do not assume it is fast, unique lock will blocks all new storage transactions.
CollectGarbage<false>({}, periodic);
return;
} else {
// Because the garbage collector iterates through the indices and constraints
// to clean them up, it must take the main lock for reading to make sure that
@ -1458,17 +1455,24 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
}
utils::OnScopeExit lock_releaser{[&] {
if (!main_guard.owns_lock()) {
if constexpr (force) {
main_lock_.unlock();
} else {
main_lock_.unlock_shared();
}
} else {
if (main_guard.owns_lock()) {
main_guard.unlock();
} else {
main_lock_.unlock_shared();
}
}};
// Only one gc run at a time
std::unique_lock<std::mutex> gc_guard(gc_lock_, std::try_to_lock);
if (!gc_guard.owns_lock()) {
return;
}
// Diagnostic trace
spdlog::trace("Storage GC on '{}' started [{}]", name(), periodic ? "periodic" : "forced");
auto trace_on_exit = utils::OnScopeExit{
[&] { spdlog::trace("Storage GC on '{}' finished [{}]", name(), periodic ? "periodic" : "forced"); }};
// Garbage collection must be performed in two phases. In the first phase,
// deltas that won't be applied by any transaction anymore are unlinked from
// the version chains. They cannot be deleted immediately, because there
@ -1476,27 +1480,29 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
// chain traversal. They are instead marked for deletion and will be deleted
// in the second GC phase in this GC iteration or some of the following
// ones.
std::unique_lock<std::mutex> gc_guard(gc_lock_, std::try_to_lock);
if (!gc_guard.owns_lock()) {
return;
}
uint64_t oldest_active_start_timestamp = commit_log_->OldestActive();
// Deltas from previous GC runs or from aborts can be cleaned up here
garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
if constexpr (force) {
// if force is set to true we can simply delete all the leftover undos because
// no transaction is active
garbage_undo_buffers.clear();
} else {
// garbage_undo_buffers is ordered, pop until we can't
while (!garbage_undo_buffers.empty() &&
garbage_undo_buffers.front().mark_timestamp_ <= oldest_active_start_timestamp) {
garbage_undo_buffers.pop_front();
{
std::unique_lock<utils::SpinLock> guard(engine_lock_);
uint64_t mark_timestamp = timestamp_; // a timestamp no active transaction can currently have
// Deltas from previous GC runs or from aborts can be cleaned up here
garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) {
guard.unlock();
if (aggressive or mark_timestamp == oldest_active_start_timestamp) {
// We know no transaction is active, it is safe to simply delete all the garbage undos
// Nothing can be reading them
garbage_undo_buffers.clear();
} else {
// garbage_undo_buffers is ordered, pop until we can't
while (!garbage_undo_buffers.empty() &&
garbage_undo_buffers.front().mark_timestamp_ <= oldest_active_start_timestamp) {
garbage_undo_buffers.pop_front();
}
}
}
});
});
}
// We don't move undo buffers of unlinked transactions to garbage_undo_buffers
// list immediately, because we would have to repeatedly take
@ -1694,7 +1700,8 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
std::unique_lock<utils::SpinLock> guard(engine_lock_);
uint64_t mark_timestamp = timestamp_; // a timestamp no active transaction can currently have
if (force or mark_timestamp == oldest_active_start_timestamp) {
if (aggressive or mark_timestamp == oldest_active_start_timestamp) {
guard.unlock();
// if lucky, there are no active transactions, hence nothing looking at the deltas
// remove them now
unlinked_undo_buffers.clear();
@ -1756,8 +1763,8 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
}
// tell the linker he can find the CollectGarbage definitions here
template void InMemoryStorage::CollectGarbage<true>(std::unique_lock<utils::ResourceLock>);
template void InMemoryStorage::CollectGarbage<false>(std::unique_lock<utils::ResourceLock>);
template void InMemoryStorage::CollectGarbage<true>(std::unique_lock<utils::ResourceLock> main_guard, bool periodic);
template void InMemoryStorage::CollectGarbage<false>(std::unique_lock<utils::ResourceLock> main_guard, bool periodic);
StorageInfo InMemoryStorage::GetBaseInfo() {
StorageInfo info{};
@ -2108,50 +2115,35 @@ void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOpera
utils::BasicResult<InMemoryStorage::CreateSnapshotError> InMemoryStorage::CreateSnapshot(
memgraph::replication_coordination_glue::ReplicationRole replication_role) {
if (replication_role == memgraph::replication_coordination_glue::ReplicationRole::REPLICA) {
using memgraph::replication_coordination_glue::ReplicationRole;
if (replication_role == ReplicationRole::REPLICA) {
return InMemoryStorage::CreateSnapshotError::DisabledForReplica;
}
auto const &epoch = repl_storage_state_.epoch_;
auto snapshot_creator = [this, &epoch]() {
utils::Timer timer;
auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION, storage_mode_,
memgraph::replication_coordination_glue::ReplicationRole::MAIN);
durability::CreateSnapshot(this, &transaction, recovery_.snapshot_directory_, recovery_.wal_directory_, &vertices_,
&edges_, uuid_, epoch, repl_storage_state_.history, &file_retainer_);
// Finalize snapshot transaction.
commit_log_->MarkFinished(transaction.start_timestamp);
memgraph::metrics::Measure(memgraph::metrics::SnapshotCreationLatency_us,
std::chrono::duration_cast<std::chrono::microseconds>(timer.Elapsed()).count());
};
std::lock_guard snapshot_guard(snapshot_lock_);
auto should_try_shared{true};
auto max_num_tries{10};
while (max_num_tries) {
if (should_try_shared) {
std::shared_lock storage_guard(main_lock_);
if (storage_mode_ == memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL) {
snapshot_creator();
return {};
}
auto accessor = std::invoke([&]() {
if (storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL) {
// For analytical no other txn can be in play
return UniqueAccess(ReplicationRole::MAIN, IsolationLevel::SNAPSHOT_ISOLATION);
} else {
std::unique_lock main_guard{main_lock_};
if (storage_mode_ == memgraph::storage::StorageMode::IN_MEMORY_ANALYTICAL) {
snapshot_creator();
return {};
}
return Access(ReplicationRole::MAIN, IsolationLevel::SNAPSHOT_ISOLATION);
}
should_try_shared = !should_try_shared;
max_num_tries--;
}
});
return CreateSnapshotError::ReachedMaxNumTries;
utils::Timer timer;
Transaction *transaction = accessor->GetTransaction();
auto const &epoch = repl_storage_state_.epoch_;
durability::CreateSnapshot(this, transaction, recovery_.snapshot_directory_, recovery_.wal_directory_, &vertices_,
&edges_, uuid_, epoch, repl_storage_state_.history, &file_retainer_);
memgraph::metrics::Measure(memgraph::metrics::SnapshotCreationLatency_us,
std::chrono::duration_cast<std::chrono::microseconds>(timer.Elapsed()).count());
return {};
}
void InMemoryStorage::FreeMemory(std::unique_lock<utils::ResourceLock> main_guard) {
CollectGarbage<true>(std::move(main_guard));
void InMemoryStorage::FreeMemory(std::unique_lock<utils::ResourceLock> main_guard, bool periodic) {
CollectGarbage(std::move(main_guard), periodic);
static_cast<InMemoryLabelIndex *>(indices_.label_index_.get())->RunGC();
static_cast<InMemoryLabelPropertyIndex *>(indices_.label_property_index_.get())->RunGC();

View File

@ -332,7 +332,7 @@ class InMemoryStorage final : public Storage {
std::unique_ptr<Accessor> UniqueAccess(memgraph::replication_coordination_glue::ReplicationRole replication_role,
std::optional<IsolationLevel> override_isolation_level) override;
void FreeMemory(std::unique_lock<utils::ResourceLock> main_guard) override;
void FreeMemory(std::unique_lock<utils::ResourceLock> main_guard, bool periodic) override;
utils::FileRetainer::FileLockerAccessor::ret_type IsPathLocked();
utils::FileRetainer::FileLockerAccessor::ret_type LockPath();
@ -363,7 +363,7 @@ class InMemoryStorage final : public Storage {
/// @throw std::system_error
/// @throw std::bad_alloc
template <bool force>
void CollectGarbage(std::unique_lock<utils::ResourceLock> main_guard = {});
void CollectGarbage(std::unique_lock<utils::ResourceLock> main_guard, bool periodic);
bool InitializeWalFile(memgraph::replication::ReplicationEpoch &epoch);
void FinalizeWalFile();

View File

@ -1051,6 +1051,14 @@ struct SpecificPropertyAndBufferInfo {
uint64_t all_size;
};
// Struct used to return info about the property position
struct SpecificPropertyAndBufferInfoMinimal {
uint64_t property_begin;
uint64_t property_end;
auto property_size() const { return property_end - property_begin; }
};
// Function used to find the position where the property should be in the data
// buffer. It keeps the properties in the buffer sorted by `PropertyId` and
// returns the positions in the buffer where the seeked property starts and
@ -1083,6 +1091,27 @@ SpecificPropertyAndBufferInfo FindSpecificPropertyAndBufferInfo(Reader *reader,
return {property_begin, property_end, property_end - property_begin, all_begin, all_end, all_end - all_begin};
}
// Like FindSpecificPropertyAndBufferInfo, but will early exit. No need to find the "all" information
SpecificPropertyAndBufferInfoMinimal FindSpecificPropertyAndBufferInfoMinimal(Reader *reader, PropertyId property) {
uint64_t property_begin = reader->GetPosition();
while (true) {
switch (HasExpectedProperty(reader, property)) {
case ExpectedPropertyStatus::MISSING_DATA:
[[fallthrough]];
case ExpectedPropertyStatus::GREATER: {
return {0, 0};
}
case ExpectedPropertyStatus::EQUAL: {
return {property_begin, reader->GetPosition()};
}
case ExpectedPropertyStatus::SMALLER: {
property_begin = reader->GetPosition();
break;
}
}
}
}
// All data buffers will be allocated to a power of 8 size.
uint64_t ToPowerOf8(uint64_t size) {
uint64_t mod = size % 8;
@ -1254,11 +1283,12 @@ bool PropertyStore::IsPropertyEqual(PropertyId property, const PropertyValue &va
BufferInfo buffer_info = GetBufferInfo(buffer_);
Reader reader(buffer_info.data, buffer_info.size);
auto info = FindSpecificPropertyAndBufferInfo(&reader, property);
if (info.property_size == 0) return value.IsNull();
Reader prop_reader(buffer_info.data + info.property_begin, info.property_size);
auto info = FindSpecificPropertyAndBufferInfoMinimal(&reader, property);
auto property_size = info.property_size();
if (property_size == 0) return value.IsNull();
Reader prop_reader(buffer_info.data + info.property_begin, property_size);
if (!CompareExpectedProperty(&prop_reader, property, value)) return false;
return prop_reader.GetPosition() == info.property_size;
return prop_reader.GetPosition() == property_size;
}
std::map<PropertyId, PropertyValue> PropertyStore::Properties() const {

View File

@ -53,11 +53,13 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
#endif
std::optional<uint64_t> branching_point;
// different epoch id, replica was main
if (replica.epoch_id != replStorageState.epoch_.id() && replica.current_commit_timestamp != kTimestampInitialId) {
auto const &history = replStorageState.history;
const auto epoch_info_iter = std::find_if(history.crbegin(), history.crend(), [&](const auto &main_epoch_info) {
return main_epoch_info.first == replica.epoch_id;
});
// main didn't have that epoch, but why is here branching point
if (epoch_info_iter == history.crend()) {
branching_point = 0;
} else if (epoch_info_iter->second != replica.current_commit_timestamp) {

View File

@ -284,6 +284,8 @@ class Storage {
virtual UniqueConstraints::DeletionStatus DropUniqueConstraint(LabelId label,
const std::set<PropertyId> &properties) = 0;
auto GetTransaction() -> Transaction * { return std::addressof(transaction_); }
protected:
Storage *storage_;
std::shared_lock<utils::ResourceLock> storage_guard_;
@ -336,9 +338,15 @@ class Storage {
StorageMode GetStorageMode() const noexcept;
virtual void FreeMemory(std::unique_lock<utils::ResourceLock> main_guard) = 0;
virtual void FreeMemory(std::unique_lock<utils::ResourceLock> main_guard, bool periodic) = 0;
void FreeMemory() { FreeMemory({}); }
void FreeMemory() {
if (storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL) {
FreeMemory(std::unique_lock{main_lock_}, false);
} else {
FreeMemory({}, false);
}
}
virtual std::unique_ptr<Accessor> Access(memgraph::replication_coordination_glue::ReplicationRole replication_role,
std::optional<IsolationLevel> override_isolation_level) = 0;

View File

@ -21,7 +21,7 @@ inline std::optional<std::string> GetOldDiskKeyOrNull(storage::Delta *head) {
head = head->next;
}
if (head->action == storage::Delta::Action::DELETE_DESERIALIZED_OBJECT) {
return head->old_disk_key.value;
return head->old_disk_key.value.as_opt_str();
}
return std::nullopt;
}

View File

@ -18,8 +18,11 @@
namespace memgraph::utils {
template <class F, class T, class R = typename std::invoke_result<F, T>::type>
auto fmap(F &&f, std::vector<T> const &v) -> std::vector<R> {
template <template <typename, typename...> class Container, typename T, typename Allocator = std::allocator<T>,
typename F, typename R = std::invoke_result_t<F, T>>
requires ranges::range<Container<T, Allocator>> &&
(!std::same_as<Container<T, Allocator>, std::string>)auto fmap(F &&f, const Container<T, Allocator> &v)
-> std::vector<R> {
return v | ranges::views::transform(std::forward<F>(f)) | ranges::to<std::vector<R>>();
}

View File

@ -66,6 +66,17 @@ struct ResourceLock {
}
return false;
}
template <typename Rep, typename Period>
bool try_lock_shared_for(std::chrono::duration<Rep, Period> const &time) {
auto lock = std::unique_lock{mtx};
// block until available
if (!cv.wait_for(lock, time, [this] { return state != UNIQUE; })) return false;
state = SHARED;
++count;
return true;
}
void unlock() {
auto lock = std::unique_lock{mtx};
state = UNLOCKED;

View File

@ -57,7 +57,7 @@ class Scheduler {
// program and there is probably no work to do in scheduled function at
// the start of the program. Since Server will log some messages on
// the program start we let him log first and we make sure by first
// waiting that funcion f will not log before it.
// waiting that function f will not log before it.
// Check for pause also.
std::unique_lock<std::mutex> lk(mutex_);
auto now = std::chrono::system_clock::now();

View File

@ -242,7 +242,7 @@ std::vector<TString, TAllocator> *Split(std::vector<TString, TAllocator> *out, c
if (src.empty()) return out;
// TODO: Investigate how much regex allocate and perhaps replace with custom
// solution doing no allocations.
std::regex not_whitespace("[^\\s]+");
static std::regex not_whitespace("[^\\s]+");
auto matches_begin = std::cregex_iterator(src.data(), src.data() + src.size(), not_whitespace);
auto matches_end = std::cregex_iterator();
out->reserve(std::distance(matches_begin, matches_end));

View File

@ -114,6 +114,8 @@ enum class TypeId : uint64_t {
COORD_GET_UUID_REQ,
COORD_GET_UUID_RES,
COORD_GET_INSTANCE_DATABASES_REQ,
COORD_GET_INSTANCE_DATABASES_RES,
// AST
AST_LABELIX = 3000,

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -39,9 +39,10 @@ struct UUID {
UUID() { uuid_generate(uuid.data()); }
explicit operator std::string() const {
auto decoded = std::array<char, UUID_STR_LEN>{};
// Note not using UUID_STR_LEN so we can build with older libuuid
auto decoded = std::array<char, 37 /*UUID_STR_LEN*/>{};
uuid_unparse(uuid.data(), decoded.data());
return std::string{decoded.data(), UUID_STR_LEN - 1};
return std::string{decoded.data(), 37 /*UUID_STR_LEN*/ - 1};
}
explicit operator arr_t() const { return uuid; }

View File

@ -431,7 +431,7 @@ def test_node_type_properties1():
f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
)[0]
)
assert (result) == [":`Activity`", ["Activity"], "location", "String", False]
assert (result) == [":`Activity`", ["Activity"], "location", ["String"], True]
result = list(
execute_and_fetch_all(
@ -439,7 +439,7 @@ def test_node_type_properties1():
f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
)[1]
)
assert (result) == [":`Activity`", ["Activity"], "name", "String", False]
assert (result) == [":`Activity`", ["Activity"], "name", ["String"], True]
result = list(
execute_and_fetch_all(
@ -447,7 +447,7 @@ def test_node_type_properties1():
f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
)[2]
)
assert (result) == [":`Dog`", ["Dog"], "name", "String", False]
assert (result) == [":`Dog`", ["Dog"], "name", ["String"], True]
result = list(
execute_and_fetch_all(
@ -455,7 +455,7 @@ def test_node_type_properties1():
f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
)[3]
)
assert (result) == [":`Dog`", ["Dog"], "owner", "String", False]
assert (result) == [":`Dog`", ["Dog"], "owner", ["String"], True]
def test_node_type_properties2():
@ -471,7 +471,8 @@ def test_node_type_properties2():
cursor,
f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
)
assert (list(result[0])) == [":`MyNode`", ["MyNode"], "", "", False]
assert (list(result[0])) == [":`MyNode`", ["MyNode"], "", [], False]
assert (result.__len__()) == 1
@ -489,8 +490,8 @@ def test_node_type_properties3():
f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
)
assert (list(result[0])) == [":`Dog`", ["Dog"], "name", "String", False]
assert (list(result[1])) == [":`Dog`", ["Dog"], "owner", "String", False]
assert (list(result[0])) == [":`Dog`", ["Dog"], "name", ["String"], False]
assert (list(result[1])) == [":`Dog`", ["Dog"], "owner", ["String"], False]
assert (result.__len__()) == 2
@ -509,9 +510,9 @@ def test_node_type_properties4():
f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
)
)
assert (list(result[0])) == [":`Label1`:`Label2`", ["Label1", "Label2"], "property1", "String", False]
assert (list(result[1])) == [":`Label1`:`Label2`", ["Label1", "Label2"], "property2", "String", False]
assert (list(result[2])) == [":`Label1`:`Label2`", ["Label1", "Label2"], "property3", "String", False]
assert (list(result[0])) == [":`Label1`:`Label2`", ["Label1", "Label2"], "property1", ["String"], False]
assert (list(result[1])) == [":`Label1`:`Label2`", ["Label1", "Label2"], "property2", ["String"], False]
assert (list(result[2])) == [":`Label1`:`Label2`", ["Label1", "Label2"], "property3", ["String"], False]
assert (result.__len__()) == 3
@ -528,7 +529,49 @@ def test_node_type_properties5():
f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
)
assert (list(result[0])) == [":`Dog`", ["Dog"], "name", "String", True]
assert (list(result[0])) == [":`Dog`", ["Dog"], "name", ["String"], True]
assert (result.__len__()) == 1
def test_node_type_properties6():
cursor = connect().cursor()
execute_and_fetch_all(
cursor,
"""
CREATE (d:Dog {name: 'Rex'})
CREATE (n:Dog {name: 'Simba', owner: 'Lucy'})
""",
)
result = execute_and_fetch_all(
cursor,
f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
)
assert (list(result[0])) == [":`Dog`", ["Dog"], "name", ["String"], True]
assert (list(result[1])) == [":`Dog`", ["Dog"], "owner", ["String"], False]
assert (result.__len__()) == 2
def test_node_type_properties_multiple_property_types():
cursor = connect().cursor()
execute_and_fetch_all(
cursor,
"""
CREATE (n:Node {prop1: 1})
CREATE (m:Node {prop1: '1'})
""",
)
result = execute_and_fetch_all(
cursor,
f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
)
assert (list(result[0])) == [":`Node`", ["Node"], "prop1", ["Int", "String"], True] or (list(result[0])) == [
":`Node`",
["Node"],
"prop1",
["String", "Int"],
True,
]
assert (result.__len__()) == 1
@ -544,7 +587,7 @@ def test_rel_type_properties1():
f"CALL libschema.rel_type_properties() YIELD relType,propertyName, propertyTypes , mandatory RETURN relType, propertyName, propertyTypes , mandatory;",
)[0]
)
assert (result) == [":`LOVES`", "", "", False]
assert (result) == [":`LOVES`", "", [], False]
def test_rel_type_properties2():
@ -560,7 +603,7 @@ def test_rel_type_properties2():
cursor,
f"CALL libschema.rel_type_properties() YIELD relType,propertyName, propertyTypes , mandatory RETURN relType, propertyName, propertyTypes , mandatory;",
)
assert (list(result[0])) == [":`LOVES`", "duration", "Int", False]
assert (list(result[0])) == [":`LOVES`", "duration", ["Int"], False]
assert (result.__len__()) == 1
@ -576,7 +619,47 @@ def test_rel_type_properties3():
cursor,
f"CALL libschema.rel_type_properties() YIELD relType,propertyName, propertyTypes , mandatory RETURN relType, propertyName, propertyTypes , mandatory;",
)
assert (list(result[0])) == [":`LOVES`", "duration", "Int", True]
assert (list(result[0])) == [":`LOVES`", "duration", ["Int"], True]
assert (result.__len__()) == 1
def test_rel_type_properties4():
cursor = connect().cursor()
execute_and_fetch_all(
cursor,
"""
CREATE (n:Dog {name: 'Simba', owner: 'Lucy'})-[j:LOVES {duration: 30}]->(a:Activity {name: 'Running', location: 'Zadar'})
CREATE (m:Dog {name: 'Rex', owner: 'Lucy'})-[r:LOVES {duration: 30, weather: 'sunny'}]->(b:Activity {name: 'Running', location: 'Zadar'})
""",
)
result = execute_and_fetch_all(
cursor,
f"CALL libschema.rel_type_properties() YIELD relType,propertyName, propertyTypes , mandatory RETURN relType, propertyName, propertyTypes , mandatory;",
)
assert (list(result[0])) == [":`LOVES`", "weather", ["String"], False]
assert (list(result[1])) == [":`LOVES`", "duration", ["Int"], True]
assert (result.__len__()) == 2
def test_rel_type_properties_multiple_property_types():
cursor = connect().cursor()
execute_and_fetch_all(
cursor,
"""
CREATE (n:Dog {name: 'Simba', owner: 'Lucy'})-[j:LOVES {duration: 30}]->(a:Activity {name: 'Running', location: 'Zadar'})
CREATE (m:Dog {name: 'Rex', owner: 'Lucy'})-[r:LOVES {duration: "30"}]->(b:Activity {name: 'Running', location: 'Zadar'})
""",
)
result = execute_and_fetch_all(
cursor,
f"CALL libschema.rel_type_properties() YIELD relType,propertyName, propertyTypes , mandatory RETURN relType, propertyName, propertyTypes , mandatory;",
)
assert (list(result[0])) == [":`LOVES`", "duration", ["Int", "String"], True] or (list(result[0])) == [
":`LOVES`",
"duration",
["String", "Int"],
True,
]
assert (result.__len__()) == 1

View File

@ -430,3 +430,11 @@ target_include_directories(${test_prefix}distributed_lamport_clock PRIVATE ${CMA
add_unit_test(query_hint_provider.cpp)
target_link_libraries(${test_prefix}query_hint_provider mg-query mg-glue)
# Test coordination
if(MG_ENTERPRISE)
add_unit_test(coordination_utils.cpp)
target_link_libraries(${test_prefix}coordination_utils gflags mg-coordination mg-repl_coord_glue)
target_include_directories(${test_prefix}coordination_utils PRIVATE ${CMAKE_SOURCE_DIR}/include)
endif()

View File

@ -0,0 +1,246 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include "coordination/coordinator_instance.hpp"
#include "dbms/constants.hpp"
#include "replication_coordination_glue/common.hpp"
#include "utils/functional.hpp"
class CoordinationUtils : public ::testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
std::filesystem::path test_folder_{std::filesystem::temp_directory_path() / "MG_tests_unit_coordination"};
};
TEST_F(CoordinationUtils, MemgraphDbHistorySimple) {
// Choose any if everything is same
// X = dead
// Main : A(24) B(36) C(48) D(50) E(51) X
// replica 1: A(24) B(36) C(48) D(50) E(51)
// replica 2: A(24) B(36) C(48) D(50) E(51)
// replica 3: A(24) B(36) C(48) D(50) E(51)
std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
instance_database_histories;
std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
histories.emplace_back(memgraph::utils::UUID{}, 24);
histories.emplace_back(memgraph::utils::UUID{}, 36);
histories.emplace_back(memgraph::utils::UUID{}, 48);
histories.emplace_back(memgraph::utils::UUID{}, 50);
histories.emplace_back(memgraph::utils::UUID{}, 51);
memgraph::utils::UUID db_uuid;
std::string default_name = std::string(memgraph::dbms::kDefaultDB);
auto db_histories = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
memgraph::replication_coordination_glue::DatabaseHistory history{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history};
instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);
memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history};
instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history};
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
memgraph::coordination::CoordinatorInstance instance;
auto [instance_name, latest_epoch, latest_commit_timestamp] =
instance.ChooseMostUpToDateInstance(instance_database_histories);
ASSERT_TRUE(instance_name == "instance_1" || instance_name == "instance_2" || instance_name == "instance_3");
ASSERT_TRUE(*latest_epoch == db_histories.back().first);
ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second);
}
TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
// Prioritize one with the biggest last commit timestamp on last epoch
// X = dead
// Main : A(24) B(36) C(48) D(50) E(59) X
// replica 1: A(24) B(12) C(15) D(17) E(51)
// replica 2: A(24) B(12) C(15) D(17) E(57)
// replica 3: A(24) B(12) C(15) D(17) E(59)
std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
instance_database_histories;
std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
histories.emplace_back(memgraph::utils::UUID{}, 24);
histories.emplace_back(memgraph::utils::UUID{}, 36);
histories.emplace_back(memgraph::utils::UUID{}, 48);
histories.emplace_back(memgraph::utils::UUID{}, 50);
histories.emplace_back(memgraph::utils::UUID{}, 59);
memgraph::utils::UUID db_uuid;
std::string default_name = std::string(memgraph::dbms::kDefaultDB);
auto db_histories = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
db_histories.back().second = 51;
memgraph::replication_coordination_glue::DatabaseHistory history1{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history1};
instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);
db_histories.back().second = 57;
memgraph::replication_coordination_glue::DatabaseHistory history2{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history2};
instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
db_histories.back().second = 59;
memgraph::replication_coordination_glue::DatabaseHistory history3{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history3};
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
memgraph::coordination::CoordinatorInstance instance;
auto [instance_name, latest_epoch, latest_commit_timestamp] =
instance.ChooseMostUpToDateInstance(instance_database_histories);
ASSERT_TRUE(instance_name == "instance_3");
ASSERT_TRUE(*latest_epoch == db_histories.back().first);
ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second);
}
TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
// Prioritize one biggest commit timestamp
// X = dead
// Main : A(24) B(36) C(48) D(50) E(51) X X X X
// replica 1: A(24) B(36) C(48) D(50) E(51) F(60) G(65) X up
// replica 2: A(24) B(36) C(48) D(50) E(51) X X X up
// replica 3: A(24) B(36) C(48) D(50) E(51) X X X up
std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
instance_database_histories;
std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
histories.emplace_back(memgraph::utils::UUID{}, 24);
histories.emplace_back(memgraph::utils::UUID{}, 36);
histories.emplace_back(memgraph::utils::UUID{}, 48);
histories.emplace_back(memgraph::utils::UUID{}, 50);
histories.emplace_back(memgraph::utils::UUID{}, 51);
memgraph::utils::UUID db_uuid;
std::string default_name = std::string(memgraph::dbms::kDefaultDB);
auto db_histories = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
memgraph::replication_coordination_glue::DatabaseHistory history{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history};
instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);
memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history};
instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
histories.emplace_back(memgraph::utils::UUID{}, 60);
histories.emplace_back(memgraph::utils::UUID{}, 65);
auto db_histories_longest = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
memgraph::replication_coordination_glue::DatabaseHistory history_longest{
.db_uuid = db_uuid, .history = db_histories_longest, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history_longest};
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
memgraph::coordination::CoordinatorInstance instance;
auto [instance_name, latest_epoch, latest_commit_timestamp] =
instance.ChooseMostUpToDateInstance(instance_database_histories);
ASSERT_TRUE(instance_name == "instance_3");
ASSERT_TRUE(*latest_epoch == db_histories_longest.back().first);
ASSERT_TRUE(*latest_commit_timestamp == db_histories_longest.back().second);
}
TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) {
// When history diverged, also prioritize one with biggest last commit timestamp
// Main : A(1) B(2) C(3) X
// replica 1: A(1) B(2) C(3) X X up
// replica 2: A(1) B(2) X D(5) X up
// replica 3: A(1) B(2) X D(4) X up
std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
instance_database_histories;
std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
histories.emplace_back(memgraph::utils::UUID{}, 1);
histories.emplace_back(memgraph::utils::UUID{}, 2);
histories.emplace_back(memgraph::utils::UUID{}, 3);
memgraph::utils::UUID db_uuid;
std::string default_name = std::string(memgraph::dbms::kDefaultDB);
auto db_histories = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
memgraph::replication_coordination_glue::DatabaseHistory history{
.db_uuid = db_uuid, .history = db_histories, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history};
instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);
db_histories.pop_back();
auto oldest_commit_timestamp{5};
auto newest_different_epoch = memgraph::utils::UUID{};
histories.emplace_back(newest_different_epoch, oldest_commit_timestamp);
auto db_histories_different = memgraph::utils::fmap(
[](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
return std::make_pair(std::string(pair.first), pair.second);
},
histories);
memgraph::replication_coordination_glue::DatabaseHistory history_3{
.db_uuid = db_uuid, .history = db_histories_different, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history_3};
instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
db_histories_different.back().second = 4;
memgraph::replication_coordination_glue::DatabaseHistory history_2{
.db_uuid = db_uuid, .history = db_histories_different, .name = default_name};
memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history_2};
instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);
memgraph::coordination::CoordinatorInstance instance;
auto [instance_name, latest_epoch, latest_commit_timestamp] =
instance.ChooseMostUpToDateInstance(instance_database_histories);
ASSERT_TRUE(instance_name == "instance_3");
ASSERT_TRUE(*latest_epoch == std::string(newest_different_epoch));
ASSERT_TRUE(*latest_commit_timestamp == oldest_commit_timestamp);
}