Merge branch 'master' into text-search-integration-poc

commit ddc628c271
@@ -45,6 +45,7 @@ MEMGRAPH_BUILD_DEPS=(
readline-devel # for memgraph console
python3-devel # for query modules
openssl-devel
openssl
libseccomp-devel
python3 python3-pip nmap-ncat # for tests
#

@@ -43,6 +43,7 @@ MEMGRAPH_BUILD_DEPS=(
readline-devel # for memgraph console
python3-devel # for query modules
openssl-devel
openssl
libseccomp-devel
python3 python-virtualenv python3-pip nmap-ncat # for qa, macro_benchmark and stress tests
#
@@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -9,10 +9,11 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

#include <boost/functional/hash.hpp>
#include <mgp.hpp>
#include "utils/string.hpp"

#include <optional>
#include <unordered_set>

namespace Schema {

@@ -37,6 +38,7 @@ constexpr std::string_view kParameterIndices = "indices";
constexpr std::string_view kParameterUniqueConstraints = "unique_constraints";
constexpr std::string_view kParameterExistenceConstraints = "existence_constraints";
constexpr std::string_view kParameterDropExisting = "drop_existing";
constexpr int kInitialNumberOfPropertyOccurances = 1;

std::string TypeOf(const mgp::Type &type);
@@ -108,83 +110,79 @@ void Schema::ProcessPropertiesRel(mgp::Record &record, const std::string_view &t
record.Insert(std::string(kReturnMandatory).c_str(), mandatory);
}

struct Property {
std::string name;
mgp::Value value;
struct PropertyInfo {
std::unordered_set<std::string> property_types; // property types
int64_t number_of_property_occurrences = 0;

Property(const std::string &name, mgp::Value &&value) : name(name), value(std::move(value)) {}
PropertyInfo() = default;
explicit PropertyInfo(std::string &&property_type)
: property_types({std::move(property_type)}),
number_of_property_occurrences(Schema::kInitialNumberOfPropertyOccurances) {}
};

struct LabelsInfo {
std::unordered_map<std::string, PropertyInfo> properties; // key is a property name
int64_t number_of_label_occurrences = 0;
};

struct LabelsHash {
std::size_t operator()(const std::set<std::string> &set) const {
std::size_t seed = set.size();
for (const auto &i : set) {
seed ^= std::hash<std::string>{}(i) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}
return seed;
}
std::size_t operator()(const std::set<std::string> &s) const { return boost::hash_range(s.begin(), s.end()); }
};

struct LabelsComparator {
bool operator()(const std::set<std::string> &lhs, const std::set<std::string> &rhs) const { return lhs == rhs; }
};

struct PropertyComparator {
bool operator()(const Property &lhs, const Property &rhs) const { return lhs.name < rhs.name; }
};

struct PropertyInfo {
std::set<Property, PropertyComparator> properties;
bool mandatory;
};

void Schema::NodeTypeProperties(mgp_list * /*args*/, mgp_graph *memgraph_graph, mgp_result *result,
mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};
const auto record_factory = mgp::RecordFactory(result);
try {
std::unordered_map<std::set<std::string>, PropertyInfo, LabelsHash, LabelsComparator> node_types_properties;
std::unordered_map<std::set<std::string>, LabelsInfo, LabelsHash, LabelsComparator> node_types_properties;

for (auto node : mgp::Graph(memgraph_graph).Nodes()) {
for (const auto node : mgp::Graph(memgraph_graph).Nodes()) {
std::set<std::string> labels_set = {};
for (auto label : node.Labels()) {
for (const auto label : node.Labels()) {
labels_set.emplace(label);
}

if (node_types_properties.find(labels_set) == node_types_properties.end()) {
node_types_properties[labels_set] = PropertyInfo{std::set<Property, PropertyComparator>(), true};
}
node_types_properties[labels_set].number_of_label_occurrences++;

if (node.Properties().empty()) {
node_types_properties[labels_set].mandatory = false; // if there is node with no property, it is not mandatory
continue;
}

auto &property_info = node_types_properties.at(labels_set);
for (auto &[key, prop] : node.Properties()) {
property_info.properties.emplace(key, std::move(prop));
if (property_info.mandatory) {
property_info.mandatory =
property_info.properties.size() == 1; // if there is only one property, it is mandatory
auto &labels_info = node_types_properties.at(labels_set);
for (const auto &[key, prop] : node.Properties()) {
auto prop_type = TypeOf(prop.Type());
if (labels_info.properties.find(key) == labels_info.properties.end()) {
labels_info.properties[key] = PropertyInfo{std::move(prop_type)};
} else {
labels_info.properties[key].property_types.emplace(prop_type);
labels_info.properties[key].number_of_property_occurrences++;
}
}
}

for (auto &[labels, property_info] : node_types_properties) {
for (auto &[node_type, labels_info] : node_types_properties) { // node type is a set of labels
std::string label_type;
mgp::List labels_list = mgp::List();
for (auto const &label : labels) {
auto labels_list = mgp::List();
for (const auto &label : node_type) {
label_type += ":`" + std::string(label) + "`";
labels_list.AppendExtend(mgp::Value(label));
}
for (auto const &prop : property_info.properties) {
for (const auto &prop : labels_info.properties) {
auto prop_types = mgp::List();
for (const auto &prop_type : prop.second.property_types) {
prop_types.AppendExtend(mgp::Value(prop_type));
}
bool mandatory = prop.second.number_of_property_occurrences == labels_info.number_of_label_occurrences;
auto record = record_factory.NewRecord();
ProcessPropertiesNode(record, label_type, labels_list, prop.name, TypeOf(prop.value.Type()),
property_info.mandatory);
ProcessPropertiesNode(record, label_type, labels_list, prop.first, prop_types, mandatory);
}
if (property_info.properties.empty()) {
if (labels_info.properties.empty()) {
auto record = record_factory.NewRecord();
ProcessPropertiesNode<std::string>(record, label_type, labels_list, "", "", false);
ProcessPropertiesNode<mgp::List>(record, label_type, labels_list, "", mgp::List(), false);
}
}

@@ -197,40 +195,45 @@ void Schema::NodeTypeProperties(mgp_list * /*args*/, mgp_graph *memgraph_graph,
void Schema::RelTypeProperties(mgp_list * /*args*/, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) {
mgp::MemoryDispatcherGuard guard{memory};

std::unordered_map<std::string, PropertyInfo> rel_types_properties;
std::unordered_map<std::string, LabelsInfo> rel_types_properties;
const auto record_factory = mgp::RecordFactory(result);
try {
const mgp::Graph graph = mgp::Graph(memgraph_graph);
for (auto rel : graph.Relationships()) {
const auto graph = mgp::Graph(memgraph_graph);
for (const auto rel : graph.Relationships()) {
std::string rel_type = std::string(rel.Type());
if (rel_types_properties.find(rel_type) == rel_types_properties.end()) {
rel_types_properties[rel_type] = PropertyInfo{std::set<Property, PropertyComparator>(), true};
}

rel_types_properties[rel_type].number_of_label_occurrences++;

if (rel.Properties().empty()) {
rel_types_properties[rel_type].mandatory = false; // if there is rel with no property, it is not mandatory
continue;
}

auto &property_info = rel_types_properties.at(rel_type);
auto &labels_info = rel_types_properties.at(rel_type);
for (auto &[key, prop] : rel.Properties()) {
property_info.properties.emplace(key, std::move(prop));
if (property_info.mandatory) {
property_info.mandatory =
property_info.properties.size() == 1; // if there is only one property, it is mandatory
auto prop_type = TypeOf(prop.Type());
if (labels_info.properties.find(key) == labels_info.properties.end()) {
labels_info.properties[key] = PropertyInfo{std::move(prop_type)};
} else {
labels_info.properties[key].property_types.emplace(prop_type);
labels_info.properties[key].number_of_property_occurrences++;
}
}
}

for (auto &[type, property_info] : rel_types_properties) {
std::string type_str = ":`" + std::string(type) + "`";
for (auto const &prop : property_info.properties) {
for (auto &[rel_type, labels_info] : rel_types_properties) {
std::string type_str = ":`" + std::string(rel_type) + "`";
for (const auto &prop : labels_info.properties) {
auto prop_types = mgp::List();
for (const auto &prop_type : prop.second.property_types) {
prop_types.AppendExtend(mgp::Value(prop_type));
}
bool mandatory = prop.second.number_of_property_occurrences == labels_info.number_of_label_occurrences;
auto record = record_factory.NewRecord();
ProcessPropertiesRel(record, type_str, prop.name, TypeOf(prop.value.Type()), property_info.mandatory);
ProcessPropertiesRel(record, type_str, prop.first, prop_types, mandatory);
}
if (property_info.properties.empty()) {
if (labels_info.properties.empty()) {
auto record = record_factory.NewRecord();
ProcessPropertiesRel<std::string>(record, type_str, "", "", false);
ProcessPropertiesRel<mgp::List>(record, type_str, "", mgp::List(), false);
}
}
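For context on the LabelsHash change above: boost::hash_range folds the hash of every element of a range into a single seed, which is what replaces the hand-rolled XOR/shift combine. A minimal standalone sketch (illustrative only, not code from this commit; it only needs the Boost headers):

```cpp
#include <boost/functional/hash.hpp>

#include <iostream>
#include <set>
#include <string>

// Hash functor for a set of labels, in the spirit of the new LabelsHash above.
struct SetOfStringsHash {
  std::size_t operator()(const std::set<std::string> &labels) const {
    // boost::hash_range combines boost::hash of each element, in order, into one value,
    // so two equal sets always hash to the same result.
    return boost::hash_range(labels.begin(), labels.end());
  }
};

int main() {
  const std::set<std::string> a{"Employee", "Person"};
  const std::set<std::string> b{"Person", "Employee"};  // same set; std::set orders elements
  std::cout << (SetOfStringsHash{}(a) == SetOfStringsHash{}(b)) << '\n';  // prints 1
}
```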
@@ -16,6 +16,7 @@

#include "coordination/coordinator_config.hpp"
#include "coordination/coordinator_rpc.hpp"
#include "replication_coordination_glue/common.hpp"
#include "replication_coordination_glue/messages.hpp"
#include "utils/result.hpp"

@@ -30,7 +31,7 @@ auto CreateClientContext(memgraph::coordination::CoordinatorClientConfig const &
} // namespace

CoordinatorClient::CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb)
: rpc_context_{CreateClientContext(config)},
rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port),
&rpc_context_},
@@ -68,6 +69,10 @@ void CoordinatorClient::StartFrequentCheck() {
auto stream{rpc_client_.Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()};
stream.AwaitResponse();
}
// Subtle race condition:
// acquiring of lock needs to happen before function call, as function callback can be changed
// for instance after lock is already acquired
// (failover case when instance is promoted to MAIN)
succ_cb_(coord_instance_, instance_name);
} catch (rpc::RpcFailedException const &) {
fail_cb_(coord_instance_, instance_name);
@@ -79,11 +84,6 @@ void CoordinatorClient::StopFrequentCheck() { instance_checker_.Stop(); }
void CoordinatorClient::PauseFrequentCheck() { instance_checker_.Pause(); }
void CoordinatorClient::ResumeFrequentCheck() { instance_checker_.Resume(); }

auto CoordinatorClient::SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void {
succ_cb_ = std::move(succ_cb);
fail_cb_ = std::move(fail_cb);
}

auto CoordinatorClient::ReplicationClientInfo() const -> ReplClientInfo { return config_.replication_client_info; }

auto CoordinatorClient::SendPromoteReplicaToMainRpc(const utils::UUID &uuid,
@@ -171,5 +171,19 @@ auto CoordinatorClient::SendEnableWritingOnMainRpc() const -> bool {
return false;
}

auto CoordinatorClient::SendGetInstanceTimestampsRpc() const
-> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories> {
try {
auto stream{rpc_client_.Stream<coordination::GetDatabaseHistoriesRpc>()};
auto res = stream.AwaitResponse();

return res.database_histories;

} catch (const rpc::RpcFailedException &) {
spdlog::error("RPC error occured while sending GetInstance UUID RPC");
return GetInstanceUUIDError::RPC_EXCEPTION;
}
}

} // namespace memgraph::coordination
#endif
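The commit renames the std::function-based callback type used by CoordinatorClient to HealthCheckClientCallback and, further down, moves the per-instance callbacks to pointers to CoordinatorInstance member functions. As a hedged, self-contained illustration of the pointer-to-member pattern (the class and function names here are hypothetical, not the real coordination classes):

```cpp
#include <functional>
#include <iostream>
#include <string>
#include <string_view>

// Illustrative stand-in for a coordinator with success/fail handlers.
class Coordinator {
 public:
  void OnSuccess(std::string_view name) { std::cout << name << " is healthy\n"; }
  void OnFail(std::string_view name) { std::cout << name << " is down\n"; }
};

// A health-check callback stored as a pointer to member function, analogous to the
// HealthCheckInstanceCallback alias introduced by this commit.
using InstanceCallback = void (Coordinator::*)(std::string_view);

void RunCheck(Coordinator *coord, InstanceCallback cb, std::string_view instance_name) {
  std::invoke(cb, coord, instance_name);  // equivalent to (coord->*cb)(instance_name)
}

int main() {
  Coordinator coord;
  RunCheck(&coord, &Coordinator::OnSuccess, "instance_1");
  RunCheck(&coord, &Coordinator::OnFail, "instance_2");
}
```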
@@ -57,6 +57,17 @@ void CoordinatorHandlers::Register(memgraph::coordination::CoordinatorServer &se
spdlog::info("Received GetInstanceUUIDRpc on coordinator server");
CoordinatorHandlers::GetInstanceUUIDHandler(replication_handler, req_reader, res_builder);
});

server.Register<coordination::GetDatabaseHistoriesRpc>(
[&replication_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void {
spdlog::info("Received GetDatabasesHistoryRpc on coordinator server");
CoordinatorHandlers::GetDatabaseHistoriesHandler(replication_handler, req_reader, res_builder);
});
}

void CoordinatorHandlers::GetDatabaseHistoriesHandler(replication::ReplicationHandler &replication_handler,
slk::Reader * /*req_reader*/, slk::Builder *res_builder) {
slk::Save(coordination::GetDatabaseHistoriesRes{replication_handler.GetDatabasesHistories()}, res_builder);
}

void CoordinatorHandlers::SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler,
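For readers unfamiliar with the registration pattern used above: each RPC type gets a lambda that forwards to a static handler. A rough, standard-library-only sketch of the same idea (this is not the actual CoordinatorServer API, which dispatches on RPC types rather than on strings):

```cpp
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>

// Illustrative only: a simplified handler registry in the spirit of CoordinatorHandlers::Register.
using Handler = std::function<void(const std::string &request)>;

class HandlerRegistry {
 public:
  void Register(std::string rpc_name, Handler handler) {
    handlers_.emplace(std::move(rpc_name), std::move(handler));
  }
  void Dispatch(const std::string &rpc_name, const std::string &request) const {
    if (auto it = handlers_.find(rpc_name); it != handlers_.end()) it->second(request);
  }

 private:
  std::unordered_map<std::string, Handler> handlers_;
};

int main() {
  HandlerRegistry registry;
  registry.Register("GetDatabaseHistoriesRpc",
                    [](const std::string &req) { std::cout << "handling " << req << '\n'; });
  registry.Dispatch("GetDatabaseHistoriesRpc", "req#1");
}
```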
@ -15,10 +15,12 @@
|
||||
|
||||
#include "coordination/coordinator_exceptions.hpp"
|
||||
#include "coordination/fmt.hpp"
|
||||
#include "dbms/constants.hpp"
|
||||
#include "nuraft/coordinator_state_machine.hpp"
|
||||
#include "nuraft/coordinator_state_manager.hpp"
|
||||
#include "utils/counter.hpp"
|
||||
#include "utils/functional.hpp"
|
||||
#include "utils/resource_lock.hpp"
|
||||
|
||||
#include <range/v3/view.hpp>
|
||||
#include <shared_mutex>
|
||||
@ -32,96 +34,28 @@ CoordinatorInstance::CoordinatorInstance()
|
||||
: raft_state_(RaftState::MakeRaftState(
|
||||
[this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StartFrequentCheck); },
|
||||
[this] { std::ranges::for_each(repl_instances_, &ReplicationInstance::StopFrequentCheck); })) {
|
||||
auto find_repl_instance = [](CoordinatorInstance *self,
|
||||
std::string_view repl_instance_name) -> ReplicationInstance & {
|
||||
auto repl_instance =
|
||||
std::ranges::find_if(self->repl_instances_, [repl_instance_name](ReplicationInstance const &instance) {
|
||||
return instance.InstanceName() == repl_instance_name;
|
||||
});
|
||||
|
||||
MG_ASSERT(repl_instance != self->repl_instances_.end(), "Instance {} not found during callback!",
|
||||
repl_instance_name);
|
||||
return *repl_instance;
|
||||
client_succ_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
|
||||
auto lock = std::unique_lock{self->coord_instance_lock_};
|
||||
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
|
||||
std::invoke(repl_instance.GetSuccessCallback(), self, repl_instance_name);
|
||||
};
|
||||
|
||||
replica_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
|
||||
auto lock = std::lock_guard{self->coord_instance_lock_};
|
||||
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
|
||||
auto &repl_instance = find_repl_instance(self, repl_instance_name);
|
||||
|
||||
// We need to get replicas UUID from time to time to ensure replica is listening to correct main
|
||||
// and that it didn't go down for less time than we could notice
|
||||
// We need to get id of main replica is listening to
|
||||
// and swap if necessary
|
||||
if (!repl_instance.EnsureReplicaHasCorrectMainUUID(self->GetMainUUID())) {
|
||||
spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName());
|
||||
return;
|
||||
}
|
||||
|
||||
repl_instance.OnSuccessPing();
|
||||
client_fail_cb_ = [](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
|
||||
auto lock = std::unique_lock{self->coord_instance_lock_};
|
||||
auto &repl_instance = self->FindReplicationInstance(repl_instance_name);
|
||||
std::invoke(repl_instance.GetFailCallback(), self, repl_instance_name);
|
||||
};
|
||||
}
|
||||
|
||||
replica_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
|
||||
auto lock = std::lock_guard{self->coord_instance_lock_};
|
||||
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
|
||||
auto &repl_instance = find_repl_instance(self, repl_instance_name);
|
||||
repl_instance.OnFailPing();
|
||||
};
|
||||
auto CoordinatorInstance::FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance & {
|
||||
auto repl_instance =
|
||||
std::ranges::find_if(repl_instances_, [replication_instance_name](ReplicationInstance const &instance) {
|
||||
return instance.InstanceName() == replication_instance_name;
|
||||
});
|
||||
|
||||
main_succ_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
|
||||
auto lock = std::lock_guard{self->coord_instance_lock_};
|
||||
spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
|
||||
|
||||
auto &repl_instance = find_repl_instance(self, repl_instance_name);
|
||||
|
||||
if (repl_instance.IsAlive()) {
|
||||
repl_instance.OnSuccessPing();
|
||||
return;
|
||||
}
|
||||
|
||||
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
|
||||
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set.");
|
||||
|
||||
auto const curr_main_uuid = self->GetMainUUID();
|
||||
if (curr_main_uuid == repl_instance_uuid.value()) {
|
||||
if (!repl_instance.EnableWritingOnMain()) {
|
||||
spdlog::error("Failed to enable writing on main instance {}", repl_instance_name);
|
||||
return;
|
||||
}
|
||||
|
||||
repl_instance.OnSuccessPing();
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO(antoniof) make demoteToReplica idempotent since main can be demoted to replica but
|
||||
// swapUUID can fail
|
||||
if (repl_instance.DemoteToReplica(self->replica_succ_cb_, self->replica_fail_cb_)) {
|
||||
repl_instance.OnSuccessPing();
|
||||
spdlog::info("Instance {} demoted to replica", repl_instance_name);
|
||||
} else {
|
||||
spdlog::error("Instance {} failed to become replica", repl_instance_name);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!repl_instance.SendSwapAndUpdateUUID(curr_main_uuid)) {
|
||||
spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName()));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
main_fail_cb_ = [find_repl_instance](CoordinatorInstance *self, std::string_view repl_instance_name) -> void {
|
||||
auto lock = std::lock_guard{self->coord_instance_lock_};
|
||||
spdlog::trace("Instance {} performing main failure callback", repl_instance_name);
|
||||
auto &repl_instance = find_repl_instance(self, repl_instance_name);
|
||||
repl_instance.OnFailPing();
|
||||
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
|
||||
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set");
|
||||
|
||||
if (!repl_instance.IsAlive() && self->GetMainUUID() == repl_instance_uuid.value()) {
|
||||
spdlog::info("Cluster without main instance, trying automatic failover");
|
||||
self->TryFailover(); // TODO: (andi) Initiate failover
|
||||
}
|
||||
};
|
||||
MG_ASSERT(repl_instance != repl_instances_.end(), "Instance {} not found during callback!",
|
||||
replication_instance_name);
|
||||
return *repl_instance;
|
||||
}
|
||||
|
||||
auto CoordinatorInstance::ShowInstances() const -> std::vector<InstanceStatus> {
|
||||
@ -166,8 +100,36 @@ auto CoordinatorInstance::TryFailover() -> void {
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: Smarter choice
|
||||
auto new_main = ranges::begin(alive_replicas);
|
||||
// for each DB in instance we get one DatabaseHistory
|
||||
using DatabaseHistories = replication_coordination_glue::DatabaseHistories;
|
||||
std::vector<std::pair<std::string, DatabaseHistories>> instance_database_histories;
|
||||
|
||||
bool success{true};
|
||||
std::ranges::for_each(alive_replicas, [&success, &instance_database_histories](ReplicationInstance &replica) {
|
||||
if (!success) {
|
||||
return;
|
||||
}
|
||||
auto res = replica.GetClient().SendGetInstanceTimestampsRpc();
|
||||
if (res.HasError()) {
|
||||
spdlog::error("Could get per db history data for instance {}", replica.InstanceName());
|
||||
success = false;
|
||||
return;
|
||||
}
|
||||
instance_database_histories.emplace_back(replica.InstanceName(), std::move(res.GetValue()));
|
||||
});
|
||||
|
||||
if (!success) {
|
||||
spdlog::error("Aborting failover as at least one instance didn't provide per database history.");
|
||||
return;
|
||||
}
|
||||
|
||||
auto [most_up_to_date_instance, latest_epoch, latest_commit_timestamp] =
|
||||
ChooseMostUpToDateInstance(instance_database_histories);
|
||||
|
||||
spdlog::trace("The most up to date instance is {} with epoch {} and {} latest commit timestamp",
|
||||
most_up_to_date_instance, *latest_epoch, *latest_commit_timestamp);
|
||||
|
||||
auto *new_main = &FindReplicationInstance(most_up_to_date_instance);
|
||||
|
||||
new_main->PauseFrequentCheck();
|
||||
utils::OnScopeExit scope_exit{[&new_main] { new_main->ResumeFrequentCheck(); }};
|
||||
@ -191,7 +153,8 @@ auto CoordinatorInstance::TryFailover() -> void {
|
||||
ranges::views::transform(&ReplicationInstance::ReplicationClientInfo) |
|
||||
ranges::to<ReplicationClientsInfo>();
|
||||
|
||||
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
|
||||
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
|
||||
&CoordinatorInstance::MainFailCallback)) {
|
||||
spdlog::warn("Failover failed since promoting replica to main failed!");
|
||||
return;
|
||||
}
|
||||
@ -242,7 +205,8 @@ auto CoordinatorInstance::SetReplicationInstanceToMain(std::string instance_name
|
||||
std::ranges::transform(repl_instances_ | ranges::views::filter(is_not_new_main),
|
||||
std::back_inserter(repl_clients_info), &ReplicationInstance::ReplicationClientInfo);
|
||||
|
||||
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), main_succ_cb_, main_fail_cb_)) {
|
||||
if (!new_main->PromoteToMain(new_main_uuid, std::move(repl_clients_info), &CoordinatorInstance::MainSuccessCallback,
|
||||
&CoordinatorInstance::MainFailCallback)) {
|
||||
return SetInstanceToMainCoordinatorStatus::COULD_NOT_PROMOTE_TO_MAIN;
|
||||
}
|
||||
|
||||
@ -290,7 +254,9 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
|
||||
|
||||
spdlog::info("Request for registering instance {} accepted", instance_name);
|
||||
try {
|
||||
repl_instances_.emplace_back(this, std::move(config), replica_succ_cb_, replica_fail_cb_);
|
||||
repl_instances_.emplace_back(this, std::move(config), client_succ_cb_, client_fail_cb_,
|
||||
&CoordinatorInstance::ReplicaSuccessCallback,
|
||||
&CoordinatorInstance::ReplicaFailCallback);
|
||||
} catch (CoordinatorRegisterInstanceException const &) {
|
||||
return RegisterInstanceCoordinatorStatus::RPC_FAILED;
|
||||
}
|
||||
@ -304,6 +270,85 @@ auto CoordinatorInstance::RegisterReplicationInstance(CoordinatorClientConfig co
|
||||
return RegisterInstanceCoordinatorStatus::SUCCESS;
|
||||
}
|
||||
|
||||
void CoordinatorInstance::MainFailCallback(std::string_view repl_instance_name) {
|
||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||
repl_instance.OnFailPing();
|
||||
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
|
||||
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set");
|
||||
|
||||
if (!repl_instance.IsAlive() && GetMainUUID() == repl_instance_uuid.value()) {
|
||||
spdlog::info("Cluster without main instance, trying automatic failover");
|
||||
TryFailover();
|
||||
}
|
||||
}
|
||||
|
||||
void CoordinatorInstance::MainSuccessCallback(std::string_view repl_instance_name) {
|
||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||
spdlog::trace("Instance {} performing main successful callback", repl_instance_name);
|
||||
|
||||
if (repl_instance.IsAlive()) {
|
||||
repl_instance.OnSuccessPing();
|
||||
return;
|
||||
}
|
||||
|
||||
const auto &repl_instance_uuid = repl_instance.GetMainUUID();
|
||||
MG_ASSERT(repl_instance_uuid.has_value(), "Instance must have uuid set.");
|
||||
|
||||
auto const curr_main_uuid = GetMainUUID();
|
||||
if (curr_main_uuid == repl_instance_uuid.value()) {
|
||||
if (!repl_instance.EnableWritingOnMain()) {
|
||||
spdlog::error("Failed to enable writing on main instance {}", repl_instance_name);
|
||||
return;
|
||||
}
|
||||
|
||||
repl_instance.OnSuccessPing();
|
||||
return;
|
||||
}
|
||||
|
||||
if (repl_instance.DemoteToReplica(&CoordinatorInstance::ReplicaSuccessCallback,
|
||||
&CoordinatorInstance::ReplicaFailCallback)) {
|
||||
repl_instance.OnSuccessPing();
|
||||
spdlog::info("Instance {} demoted to replica", repl_instance_name);
|
||||
} else {
|
||||
spdlog::error("Instance {} failed to become replica", repl_instance_name);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!repl_instance.SendSwapAndUpdateUUID(curr_main_uuid)) {
|
||||
spdlog::error(fmt::format("Failed to swap uuid for demoted main instance {}", repl_instance.InstanceName()));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void CoordinatorInstance::ReplicaSuccessCallback(std::string_view repl_instance_name) {
|
||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||
if (!repl_instance.IsReplica()) {
|
||||
spdlog::error("Aborting replica callback since instance {} is not replica anymore", repl_instance_name);
|
||||
return;
|
||||
}
|
||||
spdlog::trace("Instance {} performing replica successful callback", repl_instance_name);
|
||||
// We need to get replicas UUID from time to time to ensure replica is listening to correct main
|
||||
// and that it didn't go down for less time than we could notice
|
||||
// We need to get id of main replica is listening to
|
||||
// and swap if necessary
|
||||
if (!repl_instance.EnsureReplicaHasCorrectMainUUID(GetMainUUID())) {
|
||||
spdlog::error("Failed to swap uuid for replica instance {} which is alive", repl_instance.InstanceName());
|
||||
return;
|
||||
}
|
||||
|
||||
repl_instance.OnSuccessPing();
|
||||
}
|
||||
|
||||
void CoordinatorInstance::ReplicaFailCallback(std::string_view repl_instance_name) {
|
||||
auto &repl_instance = FindReplicationInstance(repl_instance_name);
|
||||
if (!repl_instance.IsReplica()) {
|
||||
spdlog::error("Aborting replica fail callback since instance {} is not replica anymore", repl_instance_name);
|
||||
return;
|
||||
}
|
||||
spdlog::trace("Instance {} performing replica failure callback", repl_instance_name);
|
||||
repl_instance.OnFailPing();
|
||||
}
|
||||
|
||||
auto CoordinatorInstance::UnregisterReplicationInstance(std::string instance_name)
|
||||
-> UnregisterInstanceCoordinatorStatus {
|
||||
auto lock = std::lock_guard{coord_instance_lock_};
|
||||
@ -343,5 +388,82 @@ auto CoordinatorInstance::GetMainUUID() const -> utils::UUID { return main_uuid_
|
||||
// TODO: (andi) Add to the RAFT log.
|
||||
auto CoordinatorInstance::SetMainUUID(utils::UUID new_uuid) -> void { main_uuid_ = new_uuid; }
|
||||
|
||||
auto CoordinatorInstance::ChooseMostUpToDateInstance(
|
||||
const std::vector<std::pair<std::string, replication_coordination_glue::DatabaseHistories>>
|
||||
&instance_database_histories) -> NewMainRes {
|
||||
NewMainRes new_main_res;
|
||||
std::for_each(
|
||||
instance_database_histories.begin(), instance_database_histories.end(),
|
||||
[&new_main_res](const InstanceNameDbHistories &instance_res_pair) {
|
||||
const auto &[instance_name, instance_db_histories] = instance_res_pair;
|
||||
|
||||
// Find default db for instance and its history
|
||||
auto default_db_history_data = std::ranges::find_if(
|
||||
instance_db_histories, [default_db = memgraph::dbms::kDefaultDB](
|
||||
const replication_coordination_glue::DatabaseHistory &db_timestamps) {
|
||||
return db_timestamps.name == default_db;
|
||||
});
|
||||
|
||||
std::ranges::for_each(
|
||||
instance_db_histories,
|
||||
[&instance_name = instance_name](const replication_coordination_glue::DatabaseHistory &db_history) {
|
||||
spdlog::trace("Instance {}: name {}, default db {}", instance_name, db_history.name,
|
||||
memgraph::dbms::kDefaultDB);
|
||||
});
|
||||
|
||||
MG_ASSERT(default_db_history_data != instance_db_histories.end(), "No history for instance");
|
||||
|
||||
const auto &instance_default_db_history = default_db_history_data->history;
|
||||
|
||||
std::ranges::for_each(instance_default_db_history | ranges::views::reverse,
|
||||
[&instance_name = instance_name](const auto &epoch_history_it) {
|
||||
spdlog::trace("Instance {}: epoch {}, last_commit_timestamp: {}", instance_name,
|
||||
std::get<0>(epoch_history_it), std::get<1>(epoch_history_it));
|
||||
});
|
||||
|
||||
// get latest epoch
|
||||
// get latest timestamp
|
||||
|
||||
if (!new_main_res.latest_epoch) {
|
||||
const auto &[epoch, timestamp] = *instance_default_db_history.crbegin();
|
||||
new_main_res = NewMainRes{
|
||||
.most_up_to_date_instance = instance_name,
|
||||
.latest_epoch = epoch,
|
||||
.latest_commit_timestamp = timestamp,
|
||||
};
|
||||
spdlog::trace("Currently the most up to date instance is {} with epoch {} and {} latest commit timestamp",
|
||||
instance_name, epoch, timestamp);
|
||||
return;
|
||||
}
|
||||
|
||||
bool found_same_point{false};
|
||||
std::string last_most_up_to_date_epoch{*new_main_res.latest_epoch};
|
||||
for (auto [epoch, timestamp] : ranges::reverse_view(instance_default_db_history)) {
|
||||
if (*new_main_res.latest_commit_timestamp < timestamp) {
|
||||
new_main_res = NewMainRes{
|
||||
.most_up_to_date_instance = instance_name,
|
||||
.latest_epoch = epoch,
|
||||
.latest_commit_timestamp = timestamp,
|
||||
};
|
||||
|
||||
spdlog::trace("Found the new most up to date instance {} with epoch {} and {} latest commit timestamp",
|
||||
instance_name, epoch, timestamp);
|
||||
}
|
||||
|
||||
// we found point at which they were same
|
||||
if (epoch == last_most_up_to_date_epoch) {
|
||||
found_same_point = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!found_same_point) {
|
||||
spdlog::error("Didn't find same history epoch {} for instance {} and instance {}", last_most_up_to_date_epoch,
|
||||
new_main_res.most_up_to_date_instance, instance_name);
|
||||
}
|
||||
});
|
||||
|
||||
return new_main_res;
|
||||
}
|
||||
} // namespace memgraph::coordination
|
||||
#endif
|
||||
|
@ -76,9 +76,9 @@ void EnableWritingOnMainRes::Load(EnableWritingOnMainRes *self, memgraph::slk::R
|
||||
memgraph::slk::Load(self, reader);
|
||||
}
|
||||
|
||||
void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const &self, memgraph::slk::Builder *builder) {}
|
||||
void EnableWritingOnMainReq::Save(EnableWritingOnMainReq const & /*self*/, memgraph::slk::Builder * /*builder*/) {}
|
||||
|
||||
void EnableWritingOnMainReq::Load(EnableWritingOnMainReq *self, memgraph::slk::Reader *reader) {}
|
||||
void EnableWritingOnMainReq::Load(EnableWritingOnMainReq * /*self*/, memgraph::slk::Reader * /*reader*/) {}
|
||||
|
||||
// GetInstanceUUID
|
||||
void GetInstanceUUIDReq::Save(const GetInstanceUUIDReq &self, memgraph::slk::Builder *builder) {
|
||||
@ -97,6 +97,24 @@ void GetInstanceUUIDRes::Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *r
|
||||
memgraph::slk::Load(self, reader);
|
||||
}
|
||||
|
||||
// GetDatabaseHistoriesRpc
|
||||
|
||||
void GetDatabaseHistoriesReq::Save(const GetDatabaseHistoriesReq & /*self*/, memgraph::slk::Builder * /*builder*/) {
|
||||
/* nothing to serialize */
|
||||
}
|
||||
|
||||
void GetDatabaseHistoriesReq::Load(GetDatabaseHistoriesReq * /*self*/, memgraph::slk::Reader * /*reader*/) {
|
||||
/* nothing to serialize */
|
||||
}
|
||||
|
||||
void GetDatabaseHistoriesRes::Save(const GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder) {
|
||||
memgraph::slk::Save(self, builder);
|
||||
}
|
||||
|
||||
void GetDatabaseHistoriesRes::Load(GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader) {
|
||||
memgraph::slk::Load(self, reader);
|
||||
}
|
||||
|
||||
} // namespace coordination
|
||||
|
||||
constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ,
|
||||
@ -130,6 +148,12 @@ constexpr utils::TypeInfo coordination::GetInstanceUUIDReq::kType{utils::TypeId:
|
||||
constexpr utils::TypeInfo coordination::GetInstanceUUIDRes::kType{utils::TypeId::COORD_GET_UUID_RES, "CoordGetUUIDRes",
|
||||
nullptr};
|
||||
|
||||
constexpr utils::TypeInfo coordination::GetDatabaseHistoriesReq::kType{utils::TypeId::COORD_GET_INSTANCE_DATABASES_REQ,
|
||||
"GetInstanceDatabasesReq", nullptr};
|
||||
|
||||
constexpr utils::TypeInfo coordination::GetDatabaseHistoriesRes::kType{utils::TypeId::COORD_GET_INSTANCE_DATABASES_RES,
|
||||
"GetInstanceDatabasesRes", nullptr};
|
||||
|
||||
namespace slk {
|
||||
|
||||
// PromoteReplicaToMainRpc
|
||||
@ -213,6 +237,16 @@ void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reade
|
||||
memgraph::slk::Load(&self->uuid, reader);
|
||||
}
|
||||
|
||||
// GetInstanceTimestampsReq
|
||||
|
||||
void Save(const memgraph::coordination::GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder) {
|
||||
memgraph::slk::Save(self.database_histories, builder);
|
||||
}
|
||||
|
||||
void Load(memgraph::coordination::GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader) {
|
||||
memgraph::slk::Load(&self->database_histories, reader);
|
||||
}
|
||||
|
||||
} // namespace slk
|
||||
|
||||
} // namespace memgraph
|
||||
|
@@ -14,6 +14,7 @@
#ifdef MG_ENTERPRISE

#include "coordination/coordinator_config.hpp"
#include "replication_coordination_glue/common.hpp"
#include "rpc/client.hpp"
#include "rpc_errors.hpp"
#include "utils/result.hpp"
@@ -23,13 +24,13 @@
namespace memgraph::coordination {

class CoordinatorInstance;
using HealthCheckCallback = std::function<void(CoordinatorInstance *, std::string_view)>;
using HealthCheckClientCallback = std::function<void(CoordinatorInstance *, std::string_view)>;
using ReplicationClientsInfo = std::vector<ReplClientInfo>;

class CoordinatorClient {
public:
explicit CoordinatorClient(CoordinatorInstance *coord_instance, CoordinatorClientConfig config,
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb);
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb);

~CoordinatorClient() = default;

@@ -62,7 +63,8 @@ class CoordinatorClient {

auto ReplicationClientInfo() const -> ReplClientInfo;

auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void;
auto SendGetInstanceTimestampsRpc() const
-> utils::BasicResult<GetInstanceUUIDError, replication_coordination_glue::DatabaseHistories>;

auto RpcClient() -> rpc::Client & { return rpc_client_; }

@@ -82,8 +84,8 @@ class CoordinatorClient {

CoordinatorClientConfig config_;
CoordinatorInstance *coord_instance_;
HealthCheckCallback succ_cb_;
HealthCheckCallback fail_cb_;
HealthCheckClientCallback succ_cb_;
HealthCheckClientCallback fail_cb_;
};

} // namespace memgraph::coordination
@@ -41,6 +41,9 @@ class CoordinatorHandlers {

static void GetInstanceUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
slk::Builder *res_builder);

static void GetDatabaseHistoriesHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
slk::Builder *res_builder);
};

} // namespace memgraph::dbms
@@ -18,6 +18,7 @@
#include "coordination/raft_state.hpp"
#include "coordination/register_main_replica_coordinator_status.hpp"
#include "coordination/replication_instance.hpp"
#include "utils/resource_lock.hpp"
#include "utils/rw_lock.hpp"
#include "utils/thread_pool.hpp"

@@ -25,6 +26,13 @@

namespace memgraph::coordination {

struct NewMainRes {
std::string most_up_to_date_instance;
std::optional<std::string> latest_epoch;
std::optional<uint64_t> latest_commit_timestamp;
};
using InstanceNameDbHistories = std::pair<std::string, replication_coordination_glue::DatabaseHistories>;

class CoordinatorInstance {
public:
CoordinatorInstance();
@@ -44,12 +52,24 @@ class CoordinatorInstance {

auto SetMainUUID(utils::UUID new_uuid) -> void;

auto FindReplicationInstance(std::string_view replication_instance_name) -> ReplicationInstance &;

void MainFailCallback(std::string_view);

void MainSuccessCallback(std::string_view);

void ReplicaSuccessCallback(std::string_view);

void ReplicaFailCallback(std::string_view);

static auto ChooseMostUpToDateInstance(const std::vector<InstanceNameDbHistories> &) -> NewMainRes;

private:
HealthCheckCallback main_succ_cb_, main_fail_cb_, replica_succ_cb_, replica_fail_cb_;
HealthCheckClientCallback client_succ_cb_, client_fail_cb_;

// NOTE: Must be std::list because we rely on pointer stability
std::list<ReplicationInstance> repl_instances_;
mutable utils::RWLock coord_instance_lock_{utils::RWLock::Priority::READ};
mutable utils::ResourceLock coord_instance_lock_{};

utils::UUID main_uuid_;
@ -15,6 +15,7 @@
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
||||
#include "coordination/coordinator_config.hpp"
|
||||
#include "replication_coordination_glue/common.hpp"
|
||||
#include "rpc/messages.hpp"
|
||||
#include "slk/serialization.hpp"
|
||||
|
||||
@ -161,6 +162,32 @@ struct GetInstanceUUIDRes {
|
||||
|
||||
using GetInstanceUUIDRpc = rpc::RequestResponse<GetInstanceUUIDReq, GetInstanceUUIDRes>;
|
||||
|
||||
struct GetDatabaseHistoriesReq {
|
||||
static const utils::TypeInfo kType;
|
||||
static const utils::TypeInfo &GetTypeInfo() { return kType; }
|
||||
|
||||
static void Load(GetDatabaseHistoriesReq *self, memgraph::slk::Reader *reader);
|
||||
static void Save(const GetDatabaseHistoriesReq &self, memgraph::slk::Builder *builder);
|
||||
|
||||
GetDatabaseHistoriesReq() = default;
|
||||
};
|
||||
|
||||
struct GetDatabaseHistoriesRes {
|
||||
static const utils::TypeInfo kType;
|
||||
static const utils::TypeInfo &GetTypeInfo() { return kType; }
|
||||
|
||||
static void Load(GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader);
|
||||
static void Save(const GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder);
|
||||
|
||||
explicit GetDatabaseHistoriesRes(const replication_coordination_glue::DatabaseHistories &database_histories)
|
||||
: database_histories(database_histories) {}
|
||||
GetDatabaseHistoriesRes() = default;
|
||||
|
||||
replication_coordination_glue::DatabaseHistories database_histories;
|
||||
};
|
||||
|
||||
using GetDatabaseHistoriesRpc = rpc::RequestResponse<GetDatabaseHistoriesReq, GetDatabaseHistoriesRes>;
|
||||
|
||||
} // namespace memgraph::coordination
|
||||
|
||||
// SLK serialization declarations
|
||||
@ -183,15 +210,21 @@ void Save(const memgraph::coordination::GetInstanceUUIDReq &self, memgraph::slk:
|
||||
void Load(memgraph::coordination::GetInstanceUUIDReq *self, memgraph::slk::Reader *reader);
|
||||
void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder);
|
||||
void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader);
|
||||
|
||||
// UnregisterReplicaRpc
|
||||
void Save(memgraph::coordination::UnregisterReplicaRes const &self, memgraph::slk::Builder *builder);
|
||||
void Load(memgraph::coordination::UnregisterReplicaRes *self, memgraph::slk::Reader *reader);
|
||||
void Save(memgraph::coordination::UnregisterReplicaReq const &self, memgraph::slk::Builder *builder);
|
||||
void Load(memgraph::coordination::UnregisterReplicaReq *self, memgraph::slk::Reader *reader);
|
||||
|
||||
// EnableWritingOnMainRpc
|
||||
void Save(memgraph::coordination::EnableWritingOnMainRes const &self, memgraph::slk::Builder *builder);
|
||||
void Load(memgraph::coordination::EnableWritingOnMainRes *self, memgraph::slk::Reader *reader);
|
||||
|
||||
// GetDatabaseHistoriesRpc
|
||||
void Save(const memgraph::coordination::GetDatabaseHistoriesRes &self, memgraph::slk::Builder *builder);
|
||||
void Load(memgraph::coordination::GetDatabaseHistoriesRes *self, memgraph::slk::Reader *reader);
|
||||
|
||||
} // namespace memgraph::slk
|
||||
|
||||
#endif
|
||||
|
@ -14,6 +14,7 @@
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
||||
#include "coordination/coordinator_config.hpp"
|
||||
#include "replication_coordination_glue/common.hpp"
|
||||
#include "slk/serialization.hpp"
|
||||
#include "slk/streams.hpp"
|
||||
|
||||
@ -34,5 +35,18 @@ inline void Load(ReplicationClientInfo *obj, Reader *reader) {
|
||||
Load(&obj->replication_ip_address, reader);
|
||||
Load(&obj->replication_port, reader);
|
||||
}
|
||||
|
||||
inline void Save(const replication_coordination_glue::DatabaseHistory &obj, Builder *builder) {
|
||||
Save(obj.db_uuid, builder);
|
||||
Save(obj.history, builder);
|
||||
Save(obj.name, builder);
|
||||
}
|
||||
|
||||
inline void Load(replication_coordination_glue::DatabaseHistory *obj, Reader *reader) {
|
||||
Load(&obj->db_uuid, reader);
|
||||
Load(&obj->history, reader);
|
||||
Load(&obj->name, reader);
|
||||
}
|
||||
|
||||
} // namespace memgraph::slk
|
||||
#endif
|
||||
|
@ -18,17 +18,22 @@
|
||||
#include "replication_coordination_glue/role.hpp"
|
||||
|
||||
#include <libnuraft/nuraft.hxx>
|
||||
#include "utils/resource_lock.hpp"
|
||||
#include "utils/result.hpp"
|
||||
#include "utils/uuid.hpp"
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
class CoordinatorInstance;
|
||||
class ReplicationInstance;
|
||||
|
||||
using HealthCheckInstanceCallback = void (CoordinatorInstance::*)(std::string_view);
|
||||
|
||||
class ReplicationInstance {
|
||||
public:
|
||||
ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config, HealthCheckCallback succ_cb,
|
||||
HealthCheckCallback fail_cb);
|
||||
ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config, HealthCheckClientCallback succ_cb,
|
||||
HealthCheckClientCallback fail_cb, HealthCheckInstanceCallback succ_instance_cb,
|
||||
HealthCheckInstanceCallback fail_instance_cb);
|
||||
|
||||
ReplicationInstance(ReplicationInstance const &other) = delete;
|
||||
ReplicationInstance &operator=(ReplicationInstance const &other) = delete;
|
||||
@ -50,9 +55,10 @@ class ReplicationInstance {
|
||||
auto IsReplica() const -> bool;
|
||||
auto IsMain() const -> bool;
|
||||
|
||||
auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info, HealthCheckCallback main_succ_cb,
|
||||
HealthCheckCallback main_fail_cb) -> bool;
|
||||
auto DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb) -> bool;
|
||||
auto PromoteToMain(utils::UUID uuid, ReplicationClientsInfo repl_clients_info,
|
||||
HealthCheckInstanceCallback main_succ_cb, HealthCheckInstanceCallback main_fail_cb) -> bool;
|
||||
auto DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb, HealthCheckInstanceCallback replica_fail_cb)
|
||||
-> bool;
|
||||
|
||||
auto StartFrequentCheck() -> void;
|
||||
auto StopFrequentCheck() -> void;
|
||||
@ -66,16 +72,17 @@ class ReplicationInstance {
|
||||
auto SendSwapAndUpdateUUID(const utils::UUID &new_main_uuid) -> bool;
|
||||
auto SendUnregisterReplicaRpc(std::string const &instance_name) -> bool;
|
||||
|
||||
|
||||
auto SendGetInstanceUUID() -> utils::BasicResult<coordination::GetInstanceUUIDError, std::optional<utils::UUID>>;
|
||||
auto GetClient() -> CoordinatorClient &;
|
||||
|
||||
auto EnableWritingOnMain() -> bool;
|
||||
|
||||
auto SetNewMainUUID(utils::UUID const &main_uuid) -> void;
|
||||
auto ResetMainUUID() -> void;
|
||||
auto GetMainUUID() const -> const std::optional<utils::UUID> &;
|
||||
|
||||
auto GetSuccessCallback() -> HealthCheckInstanceCallback &;
|
||||
auto GetFailCallback() -> HealthCheckInstanceCallback &;
|
||||
|
||||
private:
|
||||
CoordinatorClient client_;
|
||||
replication_coordination_glue::ReplicationRole replication_role_;
|
||||
@ -90,6 +97,9 @@ class ReplicationInstance {
|
||||
// so we need to send swap uuid again
|
||||
std::optional<utils::UUID> main_uuid_;
|
||||
|
||||
HealthCheckInstanceCallback succ_cb_;
|
||||
HealthCheckInstanceCallback fail_cb_;
|
||||
|
||||
friend bool operator==(ReplicationInstance const &first, ReplicationInstance const &second) {
|
||||
return first.client_ == second.client_ && first.replication_role_ == second.replication_role_;
|
||||
}
|
||||
|
@@ -11,4 +11,5 @@

namespace memgraph::coordination {
enum class GetInstanceUUIDError { NO_RESPONSE, RPC_EXCEPTION };
enum class GetInstanceTimestampsError { NO_RESPONSE, RPC_EXCEPTION };
} // namespace memgraph::coordination
@ -13,15 +13,21 @@
|
||||
|
||||
#include "coordination/replication_instance.hpp"
|
||||
|
||||
#include <utility>
|
||||
|
||||
#include "replication_coordination_glue/handler.hpp"
|
||||
#include "utils/result.hpp"
|
||||
|
||||
namespace memgraph::coordination {
|
||||
|
||||
ReplicationInstance::ReplicationInstance(CoordinatorInstance *peer, CoordinatorClientConfig config,
|
||||
HealthCheckCallback succ_cb, HealthCheckCallback fail_cb)
|
||||
HealthCheckClientCallback succ_cb, HealthCheckClientCallback fail_cb,
|
||||
HealthCheckInstanceCallback succ_instance_cb,
|
||||
HealthCheckInstanceCallback fail_instance_cb)
|
||||
: client_(peer, std::move(config), std::move(succ_cb), std::move(fail_cb)),
|
||||
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA) {
|
||||
replication_role_(replication_coordination_glue::ReplicationRole::REPLICA),
|
||||
succ_cb_(succ_instance_cb),
|
||||
fail_cb_(fail_instance_cb) {
|
||||
if (!client_.DemoteToReplica()) {
|
||||
throw CoordinatorRegisterInstanceException("Failed to demote instance {} to replica", client_.InstanceName());
|
||||
}
|
||||
@ -57,26 +63,29 @@ auto ReplicationInstance::IsMain() const -> bool {
|
||||
}
|
||||
|
||||
auto ReplicationInstance::PromoteToMain(utils::UUID new_uuid, ReplicationClientsInfo repl_clients_info,
|
||||
HealthCheckCallback main_succ_cb, HealthCheckCallback main_fail_cb) -> bool {
|
||||
HealthCheckInstanceCallback main_succ_cb,
|
||||
HealthCheckInstanceCallback main_fail_cb) -> bool {
|
||||
if (!client_.SendPromoteReplicaToMainRpc(new_uuid, std::move(repl_clients_info))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
replication_role_ = replication_coordination_glue::ReplicationRole::MAIN;
|
||||
main_uuid_ = new_uuid;
|
||||
client_.SetCallbacks(std::move(main_succ_cb), std::move(main_fail_cb));
|
||||
succ_cb_ = main_succ_cb;
|
||||
fail_cb_ = main_fail_cb;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
auto ReplicationInstance::DemoteToReplica(HealthCheckCallback replica_succ_cb, HealthCheckCallback replica_fail_cb)
|
||||
-> bool {
|
||||
auto ReplicationInstance::DemoteToReplica(HealthCheckInstanceCallback replica_succ_cb,
|
||||
HealthCheckInstanceCallback replica_fail_cb) -> bool {
|
||||
if (!client_.DemoteToReplica()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
replication_role_ = replication_coordination_glue::ReplicationRole::REPLICA;
|
||||
client_.SetCallbacks(std::move(replica_succ_cb), std::move(replica_fail_cb));
|
||||
succ_cb_ = replica_succ_cb;
|
||||
fail_cb_ = replica_fail_cb;
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -90,10 +99,12 @@ auto ReplicationInstance::ReplicationClientInfo() const -> CoordinatorClientConf
|
||||
return client_.ReplicationClientInfo();
|
||||
}
|
||||
|
||||
auto ReplicationInstance::GetSuccessCallback() -> HealthCheckInstanceCallback & { return succ_cb_; }
|
||||
auto ReplicationInstance::GetFailCallback() -> HealthCheckInstanceCallback & { return fail_cb_; }
|
||||
|
||||
auto ReplicationInstance::GetClient() -> CoordinatorClient & { return client_; }
|
||||
|
||||
auto ReplicationInstance::SetNewMainUUID(utils::UUID const &main_uuid) -> void { main_uuid_ = main_uuid; }
|
||||
auto ReplicationInstance::ResetMainUUID() -> void { main_uuid_ = std::nullopt; }
|
||||
auto ReplicationInstance::GetMainUUID() const -> std::optional<utils::UUID> const & { return main_uuid_; }
|
||||
|
||||
auto ReplicationInstance::EnsureReplicaHasCorrectMainUUID(utils::UUID const &curr_main_uuid) -> bool {
|
||||
|
@@ -266,10 +266,6 @@ class DbmsHandler {
bool IsMain() const { return repl_state_.IsMain(); }
bool IsReplica() const { return repl_state_.IsReplica(); }

#ifdef MG_ENTERPRISE
// coordination::CoordinatorState &CoordinatorState() { return coordinator_state_; }
#endif

/**
* @brief Return all active databases.
*
@@ -122,11 +122,11 @@ static bool my_commit(extent_hooks_t *extent_hooks, void *addr, size_t size, siz

[[maybe_unused]] auto blocker = memgraph::utils::MemoryTracker::OutOfMemoryExceptionBlocker{};
if (GetQueriesMemoryControl().IsThreadTracked()) [[unlikely]] {
bool ok = GetQueriesMemoryControl().TrackAllocOnCurrentThread(length);
[[maybe_unused]] bool ok = GetQueriesMemoryControl().TrackAllocOnCurrentThread(length);
DMG_ASSERT(ok);
}

auto ok = memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(length));
[[maybe_unused]] auto ok = memgraph::utils::total_memory_tracker.Alloc(static_cast<int64_t>(length));
DMG_ASSERT(ok);

return false;
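The [[maybe_unused]] attributes added above matter because DMG_ASSERT compiles away in release builds, leaving `ok` otherwise unread and triggering unused-variable warnings. A small standard-C++ analogue using plain assert:

```cpp
#include <cassert>
#include <cstdio>

bool TryReserve(int n) { return n > 0; }

void Use() {
  // With NDEBUG defined, assert(ok) expands to nothing, so without the attribute
  // the compiler may warn that 'ok' is set but never used.
  [[maybe_unused]] bool ok = TryReserve(42);
  assert(ok);
}

int main() {
  Use();
  std::puts("done");
}
```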
@@ -416,7 +416,7 @@ memgraph::storage::PropertyValue StringToValue(const std::string &str, const std
std::string GetIdSpace(const std::string &type) {
// The format of this field is as follows:
// [START_|END_]ID[(<id_space>)]
std::regex format(R"(^(START_|END_)?ID(\(([^\(\)]+)\))?$)", std::regex::extended);
static std::regex format(R"(^(START_|END_)?ID(\(([^\(\)]+)\))?$)", std::regex::extended);
std::smatch res;
if (!std::regex_match(type, res, format))
throw LoadException(
@@ -3907,7 +3907,7 @@ void PrintFuncSignature(const mgp_func &func, std::ostream &stream) {

bool IsValidIdentifierName(const char *name) {
if (!name) return false;
std::regex regex("[_[:alpha:]][_[:alnum:]]*");
static std::regex regex("[_[:alpha:]][_[:alnum:]]*");
return std::regex_match(name, regex);
}
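Both regex hunks above apply the same idea: constructing a std::regex compiles the pattern and is comparatively expensive, so making it a function-local static builds it once (initialization is thread-safe since C++11) and reuses it on every call. A minimal sketch of the pattern:

```cpp
#include <iostream>
#include <regex>
#include <string>

// The pattern is compiled once, on first call, instead of on every invocation.
bool IsValidIdentifier(const std::string &name) {
  static const std::regex kIdentifier("[_[:alpha:]][_[:alnum:]]*");
  return std::regex_match(name, kIdentifier);
}

int main() {
  std::cout << IsValidIdentifier("my_field1") << ' ' << IsValidIdentifier("1bad") << '\n';  // 1 0
}
```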
@@ -7,6 +7,7 @@ target_sources(mg-repl_coord_glue
mode.hpp
role.hpp
handler.hpp
common.hpp

PRIVATE
messages.cpp
src/replication_coordination_glue/common.hpp (new file, 32 lines)
@@ -0,0 +1,32 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

#pragma once

#include "rpc/client.hpp"
#include "utils/uuid.hpp"

#include <deque>
#include "messages.hpp"
#include "rpc/messages.hpp"
#include "utils/uuid.hpp"

namespace memgraph::replication_coordination_glue {

struct DatabaseHistory {
memgraph::utils::UUID db_uuid;
std::vector<std::pair<std::string, uint64_t>> history;
std::string name;
};

using DatabaseHistories = std::vector<DatabaseHistory>;

} // namespace memgraph::replication_coordination_glue
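DatabaseHistory records, per database, an ordered list of (epoch id, last commit timestamp) pairs; elsewhere in this commit that history drives the choice of the most up-to-date instance during failover. A simplified, self-contained illustration of that selection (local stand-in types, not the real ChooseMostUpToDateInstance, which additionally reconciles epochs across instances):

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Local stand-ins mirroring DatabaseHistory's shape (UUID omitted for brevity).
struct History {
  std::vector<std::pair<std::string, uint64_t>> entries;  // (epoch id, last commit timestamp)
  std::string name;
};

// Pick the instance whose newest history entry has the largest commit timestamp,
// assuming each history is ordered oldest -> newest.
std::string MostUpToDate(const std::vector<std::pair<std::string, History>> &instances) {
  std::string best;
  uint64_t best_ts = 0;
  for (const auto &[instance, history] : instances) {
    if (history.entries.empty()) continue;
    const auto ts = history.entries.back().second;
    if (ts >= best_ts) {
      best_ts = ts;
      best = instance;
    }
  }
  return best;
}

int main() {
  std::vector<std::pair<std::string, History>> data{
      {"instance_1", {{{"epoch_a", 10}, {"epoch_b", 42}}, "memgraph"}},
      {"instance_2", {{{"epoch_a", 10}, {"epoch_b", 37}}, "memgraph"}},
  };
  std::cout << MostUpToDate(data) << '\n';  // instance_1
}
```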
@@ -14,6 +14,7 @@
#include "dbms/dbms_handler.hpp"
#include "flags/experimental.hpp"
#include "replication/include/replication/state.hpp"
#include "replication_coordination_glue/common.hpp"
#include "replication_handler/system_replication.hpp"
#include "replication_handler/system_rpc.hpp"
#include "utils/result.hpp"
@@ -149,6 +150,8 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {

auto GetReplicaUUID() -> std::optional<utils::UUID>;

auto GetDatabasesHistories() -> replication_coordination_glue::DatabaseHistories;

private:
template <bool SendSwapUUID>
auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config)
@@ -14,6 +14,7 @@
#include "dbms/dbms_handler.hpp"
#include "replication/replication_client.hpp"
#include "replication_handler/system_replication.hpp"
#include "utils/functional.hpp"

namespace memgraph::replication {

@@ -265,8 +266,26 @@ auto ReplicationHandler::GetRole() const -> replication_coordination_glue::Repli
return repl_state_.GetRole();
}

auto ReplicationHandler::GetDatabasesHistories() -> replication_coordination_glue::DatabaseHistories {
replication_coordination_glue::DatabaseHistories results;
dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) {
auto &repl_storage_state = db_acc->storage()->repl_storage_state_;

std::vector<std::pair<std::string, uint64_t>> history =
utils::fmap([](const auto &elem) { return std::pair<std::string, uint64_t>(elem.first, elem.second); },
repl_storage_state.history);

history.emplace_back(std::string(repl_storage_state.epoch_.id()), repl_storage_state.last_commit_timestamp_.load());
replication_coordination_glue::DatabaseHistory repl{
.db_uuid = utils::UUID{db_acc->storage()->uuid()}, .history = history, .name = std::string(db_acc->name())};
results.emplace_back(repl);
});

return results;
}

auto ReplicationHandler::GetReplicaUUID() -> std::optional<utils::UUID> {
MG_ASSERT(repl_state_.IsReplica());
MG_ASSERT(repl_state_.IsReplica(), "Instance is not replica");
return std::get<RoleReplicaData>(repl_state_.ReplicationData()).uuid_;
}
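GetDatabasesHistories above maps the stored epoch history into plain (string, uint64_t) pairs via utils::fmap; in standard C++ the equivalent shape is a std::transform into a new vector. A minimal sketch (the element type here is a hypothetical stand-in, not the real storage type):

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <iterator>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a stored history element (epoch id + timestamp).
struct EpochEntry {
  std::string epoch_id;
  uint64_t last_commit_timestamp;
};

int main() {
  const std::vector<EpochEntry> stored{{"epoch_a", 10}, {"epoch_b", 42}};

  // Equivalent of utils::fmap(lambda, stored): map each element to a pair.
  std::vector<std::pair<std::string, uint64_t>> history;
  history.reserve(stored.size());
  std::transform(stored.begin(), stored.end(), std::back_inserter(history), [](const EpochEntry &e) {
    return std::pair<std::string, uint64_t>(e.epoch_id, e.last_commit_timestamp);
  });

  for (const auto &[epoch, ts] : history) std::cout << epoch << ' ' << ts << '\n';
}
```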
@ -123,6 +123,26 @@ inline bool operator==(const PreviousPtr::Pointer &a, const PreviousPtr::Pointer

inline bool operator!=(const PreviousPtr::Pointer &a, const PreviousPtr::Pointer &b) { return !(a == b); }

struct opt_str {
  opt_str(std::optional<std::string> const &other) : str_{other ? new_cstr(*other) : nullptr} {}

  ~opt_str() { delete[] str_; }

  auto as_opt_str() const -> std::optional<std::string> {
    if (!str_) return std::nullopt;
    return std::optional<std::string>{std::in_place, str_};
  }

 private:
  static auto new_cstr(std::string const &str) -> char const * {
    auto *mem = new char[str.length() + 1];
    strcpy(mem, str.c_str());
    return mem;
  }

  char const *str_ = nullptr;
};

struct Delta {
  enum class Action : std::uint8_t {
    /// Use for Vertex and Edge
@ -160,7 +180,7 @@ struct Delta {
  // Because of this object was created in past txs, we create timestamp by ourselves inside instead of having it from
  // current tx. This timestamp we got from RocksDB timestamp stored in key.
  Delta(DeleteDeserializedObjectTag /*tag*/, uint64_t ts, std::optional<std::string> old_disk_key)
      : timestamp(new std::atomic<uint64_t>(ts)), command_id(0), old_disk_key{.value = std::move(old_disk_key)} {}
      : timestamp(new std::atomic<uint64_t>(ts)), command_id(0), old_disk_key{.value = old_disk_key} {}

  Delta(DeleteObjectTag /*tag*/, std::atomic<uint64_t> *timestamp, uint64_t command_id)
      : timestamp(timestamp), command_id(command_id), action(Action::DELETE_OBJECT) {}
@ -222,7 +242,7 @@ struct Delta {
      case Action::REMOVE_OUT_EDGE:
        break;
      case Action::DELETE_DESERIALIZED_OBJECT:
        old_disk_key.value.reset();
        std::destroy_at(&old_disk_key.value);
        delete timestamp;
        timestamp = nullptr;
        break;
@ -242,7 +262,7 @@ struct Delta {
  Action action;
  struct {
    Action action = Action::DELETE_DESERIALIZED_OBJECT;
    std::optional<std::string> value;
    opt_str value;
  } old_disk_key;
  struct {
    Action action;
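Note: opt_str above keeps the optional disk key as a single owned C-string so Delta stays small. A small round-trip sketch, assuming only the struct as shown; the helper function is illustrative:

#include <cassert>
#include <optional>
#include <string>

// Assumes the opt_str definition from the hunk above is in scope.
inline void OptStrRoundTrip() {
  opt_str present{std::optional<std::string>{"old_disk_key"}};  // copies into an owned char[]
  opt_str absent{std::nullopt};                                 // stores nullptr, no allocation
  assert(present.as_opt_str() == std::optional<std::string>{"old_disk_key"});
  assert(!absent.as_opt_str().has_value());
}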
@ -106,8 +106,8 @@ uint64_t ReplicateCurrentWal(const utils::UUID &main_uuid, const InMemoryStorage
  return response.current_commit_timestamp;
}

/// This method tries to find the optimal path for recoverying a single replica.
/// Based on the last commit transfered to replica it tries to update the
/// This method tries to find the optimal path for recovering a single replica.
/// Based on the last commit transferred to replica it tries to update the
/// replica using durability files - WALs and Snapshots. WAL files are much
/// smaller in size as they contain only the Deltas (changes) made during the
/// transactions while Snapshots contain all the data. For that reason we prefer
@ -175,7 +175,7 @@ std::vector<RecoveryStep> GetRecoverySteps(uint64_t replica_commit, utils::FileR
  auto add_snapshot = [&]() {
    if (!latest_snapshot) return;
    const auto lock_success = locker_acc.AddPath(latest_snapshot->path);
    MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant snapshot path.");
    MG_ASSERT(!lock_success.HasError(), "Tried to lock a non-existent snapshot path.");
    recovery_steps.emplace_back(std::in_place_type_t<RecoverySnapshot>{}, std::move(latest_snapshot->path));
  };
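Note: the comment above describes the WAL-versus-snapshot trade-off when catching a replica up. An illustrative-only sketch of that preference; the function name, parameters, and the timestamp comparison are assumptions, not the actual GetRecoverySteps implementation:

#include <cstdint>

enum class RecoveryChoice { kWalsOnly, kSnapshotThenNewerWals };

// If the retained WAL chain already covers everything after the replica's last
// commit, sending only the (smaller) WAL deltas is enough; otherwise a snapshot
// has to be shipped first and newer WALs applied on top of it.
inline RecoveryChoice ChooseRecoveryPath(uint64_t replica_commit, uint64_t oldest_kept_wal_from_timestamp) {
  return oldest_kept_wal_from_timestamp <= replica_commit ? RecoveryChoice::kWalsOnly
                                                          : RecoveryChoice::kSnapshotThenNewerWals;
}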
@ -53,11 +53,13 @@ void ReplicationStorageClient::UpdateReplicaState(Storage *storage, DatabaseAcce
#endif

  std::optional<uint64_t> branching_point;
  // different epoch id, replica was main
  if (replica.epoch_id != replStorageState.epoch_.id() && replica.current_commit_timestamp != kTimestampInitialId) {
    auto const &history = replStorageState.history;
    const auto epoch_info_iter = std::find_if(history.crbegin(), history.crend(), [&](const auto &main_epoch_info) {
      return main_epoch_info.first == replica.epoch_id;
    });
    // main didn't have that epoch, but why is here branching point
    if (epoch_info_iter == history.crend()) {
      branching_point = 0;
    } else if (epoch_info_iter->second != replica.current_commit_timestamp) {
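Note: the hunk above detects whether a replica's history has branched from main's. A hedged restatement as a free function; kTimestampInitialId and the (epoch id, commit timestamp) history layout follow the code above, while the returned values are assumptions because the hunk is cut off after the last branch:

#include <algorithm>
#include <cstdint>
#include <optional>
#include <string>
#include <utility>
#include <vector>

inline std::optional<uint64_t> FindBranchingPoint(
    const std::vector<std::pair<std::string, uint64_t>> &main_history, const std::string &main_epoch_id,
    const std::string &replica_epoch_id, uint64_t replica_commit, uint64_t initial_commit_timestamp) {
  // Same epoch, or a replica that never committed anything: nothing to diverge from.
  if (replica_epoch_id == main_epoch_id || replica_commit == initial_commit_timestamp) return std::nullopt;
  const auto it = std::find_if(main_history.crbegin(), main_history.crend(),
                               [&](const auto &epoch_info) { return epoch_info.first == replica_epoch_id; });
  if (it == main_history.crend()) return 0;             // main never saw the replica's epoch
  if (it->second != replica_commit) return it->second;  // commits disagree inside the shared epoch (assumed value)
  return std::nullopt;
}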
@ -21,7 +21,7 @@ inline std::optional<std::string> GetOldDiskKeyOrNull(storage::Delta *head) {
    head = head->next;
  }
  if (head->action == storage::Delta::Action::DELETE_DESERIALIZED_OBJECT) {
    return head->old_disk_key.value;
    return head->old_disk_key.value.as_opt_str();
  }
  return std::nullopt;
}
@ -18,8 +18,11 @@

namespace memgraph::utils {

template <class F, class T, class R = typename std::invoke_result<F, T>::type>
auto fmap(F &&f, std::vector<T> const &v) -> std::vector<R> {
template <template <typename, typename...> class Container, typename T, typename Allocator = std::allocator<T>,
          typename F, typename R = std::invoke_result_t<F, T>>
requires ranges::range<Container<T, Allocator>> &&
    (!std::same_as<Container<T, Allocator>, std::string>)auto fmap(F &&f, const Container<T, Allocator> &v)
    -> std::vector<R> {
  return v | ranges::views::transform(std::forward<F>(f)) | ranges::to<std::vector<R>>();
}
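Note: the generalized fmap above accepts any range-like container except std::string and still collects into a std::vector. A minimal usage sketch, assuming utils/functional.hpp is on the include path:

#include <string>
#include <vector>

#include "utils/functional.hpp"

// Maps each integer to its textual form; the deduced result type is std::vector<std::string>.
inline std::vector<std::string> Stringify(const std::vector<int> &nums) {
  return memgraph::utils::fmap([](int x) { return std::to_string(x); }, nums);
}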
@ -57,7 +57,7 @@ class Scheduler {
      // program and there is probably no work to do in scheduled function at
      // the start of the program. Since Server will log some messages on
      // the program start we let him log first and we make sure by first
      // waiting that funcion f will not log before it.
      // waiting that function f will not log before it.
      // Check for pause also.
      std::unique_lock<std::mutex> lk(mutex_);
      auto now = std::chrono::system_clock::now();
@ -242,7 +242,7 @@ std::vector<TString, TAllocator> *Split(std::vector<TString, TAllocator> *out, c
  if (src.empty()) return out;
  // TODO: Investigate how much regex allocate and perhaps replace with custom
  // solution doing no allocations.
  std::regex not_whitespace("[^\\s]+");
  static std::regex not_whitespace("[^\\s]+");
  auto matches_begin = std::cregex_iterator(src.data(), src.data() + src.size(), not_whitespace);
  auto matches_end = std::cregex_iterator();
  out->reserve(std::distance(matches_begin, matches_end));
@ -114,6 +114,8 @@ enum class TypeId : uint64_t {

  COORD_GET_UUID_REQ,
  COORD_GET_UUID_RES,
  COORD_GET_INSTANCE_DATABASES_REQ,
  COORD_GET_INSTANCE_DATABASES_RES,

  // AST
  AST_LABELIX = 3000,
@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -39,9 +39,10 @@ struct UUID {

  UUID() { uuid_generate(uuid.data()); }
  explicit operator std::string() const {
    auto decoded = std::array<char, UUID_STR_LEN>{};
    // Note not using UUID_STR_LEN so we can build with older libuuid
    auto decoded = std::array<char, 37 /*UUID_STR_LEN*/>{};
    uuid_unparse(uuid.data(), decoded.data());
    return std::string{decoded.data(), UUID_STR_LEN - 1};
    return std::string{decoded.data(), 37 /*UUID_STR_LEN*/ - 1};
  }

  explicit operator arr_t() const { return uuid; }
@ -431,7 +431,7 @@ def test_node_type_properties1():
            f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
        )[0]
    )
    assert (result) == [":`Activity`", ["Activity"], "location", "String", False]
    assert (result) == [":`Activity`", ["Activity"], "location", ["String"], True]

    result = list(
        execute_and_fetch_all(
@ -439,7 +439,7 @@ def test_node_type_properties1():
            f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
        )[1]
    )
    assert (result) == [":`Activity`", ["Activity"], "name", "String", False]
    assert (result) == [":`Activity`", ["Activity"], "name", ["String"], True]

    result = list(
        execute_and_fetch_all(
@ -447,7 +447,7 @@ def test_node_type_properties1():
            f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
        )[2]
    )
    assert (result) == [":`Dog`", ["Dog"], "name", "String", False]
    assert (result) == [":`Dog`", ["Dog"], "name", ["String"], True]

    result = list(
        execute_and_fetch_all(
@ -455,7 +455,7 @@ def test_node_type_properties1():
            f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
        )[3]
    )
    assert (result) == [":`Dog`", ["Dog"], "owner", "String", False]
    assert (result) == [":`Dog`", ["Dog"], "owner", ["String"], True]


def test_node_type_properties2():
@ -471,7 +471,8 @@ def test_node_type_properties2():
        cursor,
        f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
    )
    assert (list(result[0])) == [":`MyNode`", ["MyNode"], "", "", False]

    assert (list(result[0])) == [":`MyNode`", ["MyNode"], "", [], False]
    assert (result.__len__()) == 1


@ -489,8 +490,8 @@ def test_node_type_properties3():
        f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
    )

    assert (list(result[0])) == [":`Dog`", ["Dog"], "name", "String", False]
    assert (list(result[1])) == [":`Dog`", ["Dog"], "owner", "String", False]
    assert (list(result[0])) == [":`Dog`", ["Dog"], "name", ["String"], False]
    assert (list(result[1])) == [":`Dog`", ["Dog"], "owner", ["String"], False]
    assert (result.__len__()) == 2


@ -509,9 +510,9 @@ def test_node_type_properties4():
            f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
        )
    )
    assert (list(result[0])) == [":`Label1`:`Label2`", ["Label1", "Label2"], "property1", "String", False]
    assert (list(result[1])) == [":`Label1`:`Label2`", ["Label1", "Label2"], "property2", "String", False]
    assert (list(result[2])) == [":`Label1`:`Label2`", ["Label1", "Label2"], "property3", "String", False]
    assert (list(result[0])) == [":`Label1`:`Label2`", ["Label1", "Label2"], "property1", ["String"], False]
    assert (list(result[1])) == [":`Label1`:`Label2`", ["Label1", "Label2"], "property2", ["String"], False]
    assert (list(result[2])) == [":`Label1`:`Label2`", ["Label1", "Label2"], "property3", ["String"], False]
    assert (result.__len__()) == 3


@ -528,7 +529,49 @@ def test_node_type_properties5():
        f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
    )

    assert (list(result[0])) == [":`Dog`", ["Dog"], "name", "String", True]
    assert (list(result[0])) == [":`Dog`", ["Dog"], "name", ["String"], True]
    assert (result.__len__()) == 1


def test_node_type_properties6():
    cursor = connect().cursor()
    execute_and_fetch_all(
        cursor,
        """
        CREATE (d:Dog {name: 'Rex'})
        CREATE (n:Dog {name: 'Simba', owner: 'Lucy'})
        """,
    )
    result = execute_and_fetch_all(
        cursor,
        f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
    )

    assert (list(result[0])) == [":`Dog`", ["Dog"], "name", ["String"], True]
    assert (list(result[1])) == [":`Dog`", ["Dog"], "owner", ["String"], False]
    assert (result.__len__()) == 2


def test_node_type_properties_multiple_property_types():
    cursor = connect().cursor()
    execute_and_fetch_all(
        cursor,
        """
        CREATE (n:Node {prop1: 1})
        CREATE (m:Node {prop1: '1'})
        """,
    )
    result = execute_and_fetch_all(
        cursor,
        f"CALL libschema.node_type_properties() YIELD nodeType, nodeLabels, propertyName, propertyTypes , mandatory RETURN nodeType, nodeLabels, propertyName, propertyTypes , mandatory ORDER BY propertyName, nodeLabels[0];",
    )
    assert (list(result[0])) == [":`Node`", ["Node"], "prop1", ["Int", "String"], True] or (list(result[0])) == [
        ":`Node`",
        ["Node"],
        "prop1",
        ["String", "Int"],
        True,
    ]
    assert (result.__len__()) == 1


@ -544,7 +587,7 @@ def test_rel_type_properties1():
            f"CALL libschema.rel_type_properties() YIELD relType,propertyName, propertyTypes , mandatory RETURN relType, propertyName, propertyTypes , mandatory;",
        )[0]
    )
    assert (result) == [":`LOVES`", "", "", False]
    assert (result) == [":`LOVES`", "", [], False]


def test_rel_type_properties2():
@ -560,7 +603,7 @@ def test_rel_type_properties2():
        cursor,
        f"CALL libschema.rel_type_properties() YIELD relType,propertyName, propertyTypes , mandatory RETURN relType, propertyName, propertyTypes , mandatory;",
    )
    assert (list(result[0])) == [":`LOVES`", "duration", "Int", False]
    assert (list(result[0])) == [":`LOVES`", "duration", ["Int"], False]
    assert (result.__len__()) == 1


@ -576,7 +619,47 @@ def test_rel_type_properties3():
        cursor,
        f"CALL libschema.rel_type_properties() YIELD relType,propertyName, propertyTypes , mandatory RETURN relType, propertyName, propertyTypes , mandatory;",
    )
    assert (list(result[0])) == [":`LOVES`", "duration", "Int", True]
    assert (list(result[0])) == [":`LOVES`", "duration", ["Int"], True]
    assert (result.__len__()) == 1


def test_rel_type_properties4():
    cursor = connect().cursor()
    execute_and_fetch_all(
        cursor,
        """
        CREATE (n:Dog {name: 'Simba', owner: 'Lucy'})-[j:LOVES {duration: 30}]->(a:Activity {name: 'Running', location: 'Zadar'})
        CREATE (m:Dog {name: 'Rex', owner: 'Lucy'})-[r:LOVES {duration: 30, weather: 'sunny'}]->(b:Activity {name: 'Running', location: 'Zadar'})
        """,
    )
    result = execute_and_fetch_all(
        cursor,
        f"CALL libschema.rel_type_properties() YIELD relType,propertyName, propertyTypes , mandatory RETURN relType, propertyName, propertyTypes , mandatory;",
    )
    assert (list(result[0])) == [":`LOVES`", "weather", ["String"], False]
    assert (list(result[1])) == [":`LOVES`", "duration", ["Int"], True]
    assert (result.__len__()) == 2


def test_rel_type_properties_multiple_property_types():
    cursor = connect().cursor()
    execute_and_fetch_all(
        cursor,
        """
        CREATE (n:Dog {name: 'Simba', owner: 'Lucy'})-[j:LOVES {duration: 30}]->(a:Activity {name: 'Running', location: 'Zadar'})
        CREATE (m:Dog {name: 'Rex', owner: 'Lucy'})-[r:LOVES {duration: "30"}]->(b:Activity {name: 'Running', location: 'Zadar'})
        """,
    )
    result = execute_and_fetch_all(
        cursor,
        f"CALL libschema.rel_type_properties() YIELD relType,propertyName, propertyTypes , mandatory RETURN relType, propertyName, propertyTypes , mandatory;",
    )
    assert (list(result[0])) == [":`LOVES`", "duration", ["Int", "String"], True] or (list(result[0])) == [
        ":`LOVES`",
        "duration",
        ["String", "Int"],
        True,
    ]
    assert (result.__len__()) == 1
@ -430,3 +430,11 @@ target_include_directories(${test_prefix}distributed_lamport_clock PRIVATE ${CMA

add_unit_test(query_hint_provider.cpp)
target_link_libraries(${test_prefix}query_hint_provider mg-query mg-glue)

# Test coordination
if(MG_ENTERPRISE)
  add_unit_test(coordination_utils.cpp)
  target_link_libraries(${test_prefix}coordination_utils gflags mg-coordination mg-repl_coord_glue)
  target_include_directories(${test_prefix}coordination_utils PRIVATE ${CMAKE_SOURCE_DIR}/include)
endif()
246 tests/unit/coordination_utils.cpp Normal file
@ -0,0 +1,246 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include "coordination/coordinator_instance.hpp"
#include "dbms/constants.hpp"
#include "replication_coordination_glue/common.hpp"
#include "utils/functional.hpp"

class CoordinationUtils : public ::testing::Test {
 protected:
  void SetUp() override {}

  void TearDown() override {}

  std::filesystem::path test_folder_{std::filesystem::temp_directory_path() / "MG_tests_unit_coordination"};
};

TEST_F(CoordinationUtils, MemgraphDbHistorySimple) {
  // Choose any if everything is same
  // X = dead
  // Main     : A(24) B(36) C(48) D(50) E(51) X
  // replica 1: A(24) B(36) C(48) D(50) E(51)
  // replica 2: A(24) B(36) C(48) D(50) E(51)
  // replica 3: A(24) B(36) C(48) D(50) E(51)
  std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
      instance_database_histories;

  std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
  histories.emplace_back(memgraph::utils::UUID{}, 24);
  histories.emplace_back(memgraph::utils::UUID{}, 36);
  histories.emplace_back(memgraph::utils::UUID{}, 48);
  histories.emplace_back(memgraph::utils::UUID{}, 50);
  histories.emplace_back(memgraph::utils::UUID{}, 51);

  memgraph::utils::UUID db_uuid;
  std::string default_name = std::string(memgraph::dbms::kDefaultDB);

  auto db_histories = memgraph::utils::fmap(
      [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
        return std::make_pair(std::string(pair.first), pair.second);
      },
      histories);

  memgraph::replication_coordination_glue::DatabaseHistory history{
      .db_uuid = db_uuid, .history = db_histories, .name = default_name};

  memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history};
  instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);

  memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history};
  instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);

  memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history};
  instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);
  memgraph::coordination::CoordinatorInstance instance;

  auto [instance_name, latest_epoch, latest_commit_timestamp] =
      instance.ChooseMostUpToDateInstance(instance_database_histories);
  ASSERT_TRUE(instance_name == "instance_1" || instance_name == "instance_2" || instance_name == "instance_3");
  ASSERT_TRUE(*latest_epoch == db_histories.back().first);
  ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second);
}

TEST_F(CoordinationUtils, MemgraphDbHistoryLastEpochDifferent) {
  // Prioritize one with the biggest last commit timestamp on last epoch
  // X = dead
  // Main     : A(24) B(36) C(48) D(50) E(59) X
  // replica 1: A(24) B(12) C(15) D(17) E(51)
  // replica 2: A(24) B(12) C(15) D(17) E(57)
  // replica 3: A(24) B(12) C(15) D(17) E(59)
  std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
      instance_database_histories;

  std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
  histories.emplace_back(memgraph::utils::UUID{}, 24);
  histories.emplace_back(memgraph::utils::UUID{}, 36);
  histories.emplace_back(memgraph::utils::UUID{}, 48);
  histories.emplace_back(memgraph::utils::UUID{}, 50);
  histories.emplace_back(memgraph::utils::UUID{}, 59);

  memgraph::utils::UUID db_uuid;
  std::string default_name = std::string(memgraph::dbms::kDefaultDB);

  auto db_histories = memgraph::utils::fmap(
      [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
        return std::make_pair(std::string(pair.first), pair.second);
      },
      histories);

  db_histories.back().second = 51;
  memgraph::replication_coordination_glue::DatabaseHistory history1{
      .db_uuid = db_uuid, .history = db_histories, .name = default_name};

  memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history1};
  instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);

  db_histories.back().second = 57;
  memgraph::replication_coordination_glue::DatabaseHistory history2{
      .db_uuid = db_uuid, .history = db_histories, .name = default_name};
  memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history2};
  instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);

  db_histories.back().second = 59;
  memgraph::replication_coordination_glue::DatabaseHistory history3{
      .db_uuid = db_uuid, .history = db_histories, .name = default_name};
  memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history3};
  instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);

  memgraph::coordination::CoordinatorInstance instance;
  auto [instance_name, latest_epoch, latest_commit_timestamp] =
      instance.ChooseMostUpToDateInstance(instance_database_histories);

  ASSERT_TRUE(instance_name == "instance_3");
  ASSERT_TRUE(*latest_epoch == db_histories.back().first);
  ASSERT_TRUE(*latest_commit_timestamp == db_histories.back().second);
}

TEST_F(CoordinationUtils, MemgraphDbHistoryOneInstanceAheadFewEpochs) {
  // Prioritize one biggest commit timestamp
  // X = dead
  // Main     : A(24) B(36) C(48) D(50) E(51) X     X     X  X
  // replica 1: A(24) B(36) C(48) D(50) E(51) F(60) G(65) X  up
  // replica 2: A(24) B(36) C(48) D(50) E(51) X     X     X  up
  // replica 3: A(24) B(36) C(48) D(50) E(51) X     X     X  up
  std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
      instance_database_histories;

  std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
  histories.emplace_back(memgraph::utils::UUID{}, 24);
  histories.emplace_back(memgraph::utils::UUID{}, 36);
  histories.emplace_back(memgraph::utils::UUID{}, 48);
  histories.emplace_back(memgraph::utils::UUID{}, 50);
  histories.emplace_back(memgraph::utils::UUID{}, 51);

  memgraph::utils::UUID db_uuid;
  std::string default_name = std::string(memgraph::dbms::kDefaultDB);

  auto db_histories = memgraph::utils::fmap(
      [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
        return std::make_pair(std::string(pair.first), pair.second);
      },
      histories);

  memgraph::replication_coordination_glue::DatabaseHistory history{
      .db_uuid = db_uuid, .history = db_histories, .name = default_name};

  memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history};
  instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);

  memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history};
  instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);

  histories.emplace_back(memgraph::utils::UUID{}, 60);
  histories.emplace_back(memgraph::utils::UUID{}, 65);
  auto db_histories_longest = memgraph::utils::fmap(
      [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
        return std::make_pair(std::string(pair.first), pair.second);
      },
      histories);

  memgraph::replication_coordination_glue::DatabaseHistory history_longest{
      .db_uuid = db_uuid, .history = db_histories_longest, .name = default_name};

  memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history_longest};
  instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);

  memgraph::coordination::CoordinatorInstance instance;
  auto [instance_name, latest_epoch, latest_commit_timestamp] =
      instance.ChooseMostUpToDateInstance(instance_database_histories);

  ASSERT_TRUE(instance_name == "instance_3");
  ASSERT_TRUE(*latest_epoch == db_histories_longest.back().first);
  ASSERT_TRUE(*latest_commit_timestamp == db_histories_longest.back().second);
}

TEST_F(CoordinationUtils, MemgraphDbHistoryInstancesHistoryDiverged) {
  // When history diverged, also prioritize one with biggest last commit timestamp
  // Main     : A(1) B(2) C(3) X
  // replica 1: A(1) B(2) C(3) X    X up
  // replica 2: A(1) B(2) X    D(5) X up
  // replica 3: A(1) B(2) X    D(4) X up
  std::vector<std::pair<std::string, memgraph::replication_coordination_glue::DatabaseHistories>>
      instance_database_histories;

  std::vector<std::pair<memgraph::utils::UUID, uint64_t>> histories;
  histories.emplace_back(memgraph::utils::UUID{}, 1);
  histories.emplace_back(memgraph::utils::UUID{}, 2);
  histories.emplace_back(memgraph::utils::UUID{}, 3);

  memgraph::utils::UUID db_uuid;
  std::string default_name = std::string(memgraph::dbms::kDefaultDB);

  auto db_histories = memgraph::utils::fmap(
      [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
        return std::make_pair(std::string(pair.first), pair.second);
      },
      histories);

  memgraph::replication_coordination_glue::DatabaseHistory history{
      .db_uuid = db_uuid, .history = db_histories, .name = default_name};

  memgraph::replication_coordination_glue::DatabaseHistories instance_1_db_histories_{history};
  instance_database_histories.emplace_back("instance_1", instance_1_db_histories_);

  db_histories.pop_back();

  auto oldest_commit_timestamp{5};
  auto newest_different_epoch = memgraph::utils::UUID{};
  histories.emplace_back(newest_different_epoch, oldest_commit_timestamp);
  auto db_histories_different = memgraph::utils::fmap(
      [](const std::pair<memgraph::utils::UUID, uint64_t> &pair) {
        return std::make_pair(std::string(pair.first), pair.second);
      },
      histories);

  memgraph::replication_coordination_glue::DatabaseHistory history_3{
      .db_uuid = db_uuid, .history = db_histories_different, .name = default_name};

  memgraph::replication_coordination_glue::DatabaseHistories instance_3_db_histories_{history_3};
  instance_database_histories.emplace_back("instance_3", instance_3_db_histories_);

  db_histories_different.back().second = 4;
  memgraph::replication_coordination_glue::DatabaseHistory history_2{
      .db_uuid = db_uuid, .history = db_histories_different, .name = default_name};

  memgraph::replication_coordination_glue::DatabaseHistories instance_2_db_histories_{history_2};
  instance_database_histories.emplace_back("instance_2", instance_2_db_histories_);

  memgraph::coordination::CoordinatorInstance instance;
  auto [instance_name, latest_epoch, latest_commit_timestamp] =
      instance.ChooseMostUpToDateInstance(instance_database_histories);

  ASSERT_TRUE(instance_name == "instance_3");
  ASSERT_TRUE(*latest_epoch == std::string(newest_different_epoch));
  ASSERT_TRUE(*latest_commit_timestamp == oldest_commit_timestamp);
}
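Note: the four tests above all encode the same rule: pick the instance whose history ends with the largest commit timestamp, breaking ties arbitrarily. An illustrative-only restatement of that rule follows; this is not the real CoordinatorInstance::ChooseMostUpToDateInstance, just the behaviour the tests assert:

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

struct InstanceHistory {
  std::string instance_name;
  std::vector<std::pair<std::string, uint64_t>> history;  // (epoch id, last commit timestamp), oldest first
};

inline std::string PickMostUpToDate(const std::vector<InstanceHistory> &instances) {
  std::string best_name;
  uint64_t best_timestamp = 0;
  for (const auto &inst : instances) {
    if (inst.history.empty()) continue;
    const auto last_commit = inst.history.back().second;
    if (best_name.empty() || last_commit > best_timestamp) {
      best_name = inst.instance_name;
      best_timestamp = last_commit;
    }
  }
  return best_name;  // empty if no instance reported any history
}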