Implement parallel constraints recovery (#1545)

This commit is contained in:
Antonio Filipovic 2023-12-04 21:56:05 +01:00 committed by GitHub
parent d836b38a8b
commit 74fa6d21f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 628 additions and 202 deletions

View File

@ -111,6 +111,10 @@ modifications:
value: "false"
override: true
- name: "storage_parallel_schema_recovery"
value: "false"
override: true
- name: "storage_enable_schema_metadata"
value: "false"
override: true

View File

@ -10,6 +10,7 @@
// licenses/APL.txt.
#include "dbms/inmemory/replication_handlers.hpp"
#include <optional>
#include "dbms/constants.hpp"
#include "dbms/dbms_handler.hpp"
#include "replication/replication_server.hpp"
@ -187,9 +188,9 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle
storage::replication::Decoder decoder(req_reader);
auto *storage = static_cast<storage::InMemoryStorage *>(db_acc->get()->storage());
utils::EnsureDirOrDie(storage->snapshot_directory_);
utils::EnsureDirOrDie(storage->recovery_.snapshot_directory_);
const auto maybe_snapshot_path = decoder.ReadFile(storage->snapshot_directory_);
const auto maybe_snapshot_path = decoder.ReadFile(storage->recovery_.snapshot_directory_);
MG_ASSERT(maybe_snapshot_path, "Failed to load snapshot!");
spdlog::info("Received snapshot saved to {}", *maybe_snapshot_path);
@ -219,7 +220,10 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle
storage->timestamp_ = std::max(storage->timestamp_, recovery_info.next_timestamp);
spdlog::trace("Recovering indices and constraints from snapshot.");
storage::durability::RecoverIndicesAndConstraints(recovered_snapshot.indices_constraints, &storage->indices_,
memgraph::storage::durability::RecoverIndicesAndStats(recovered_snapshot.indices_constraints.indices,
&storage->indices_, &storage->vertices_,
storage->name_id_mapper_.get());
memgraph::storage::durability::RecoverConstraints(recovered_snapshot.indices_constraints.constraints,
&storage->constraints_, &storage->vertices_,
storage->name_id_mapper_.get());
} catch (const storage::durability::RecoveryFailure &e) {
@ -233,7 +237,7 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle
spdlog::trace("Deleting old snapshot files due to snapshot recovery.");
// Delete other durability files
auto snapshot_files = storage::durability::GetSnapshotFiles(storage->snapshot_directory_, storage->uuid_);
auto snapshot_files = storage::durability::GetSnapshotFiles(storage->recovery_.snapshot_directory_, storage->uuid_);
for (const auto &[path, uuid, _] : snapshot_files) {
if (path != *maybe_snapshot_path) {
spdlog::trace("Deleting snapshot file {}", path);
@ -242,7 +246,7 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle
}
spdlog::trace("Deleting old WAL files due to snapshot recovery.");
auto wal_files = storage::durability::GetWalFiles(storage->wal_directory_, storage->uuid_);
auto wal_files = storage::durability::GetWalFiles(storage->recovery_.wal_directory_, storage->uuid_);
if (wal_files) {
for (const auto &wal_file : *wal_files) {
spdlog::trace("Deleting WAL file {}", wal_file.path);
@ -267,7 +271,7 @@ void InMemoryReplicationHandlers::WalFilesHandler(dbms::DbmsHandler *dbms_handle
storage::replication::Decoder decoder(req_reader);
auto *storage = static_cast<storage::InMemoryStorage *>(db_acc->get()->storage());
utils::EnsureDirOrDie(storage->wal_directory_);
utils::EnsureDirOrDie(storage->recovery_.wal_directory_);
for (auto i = 0; i < wal_file_number; ++i) {
LoadWal(storage, &decoder);
@ -289,7 +293,7 @@ void InMemoryReplicationHandlers::CurrentWalHandler(dbms::DbmsHandler *dbms_hand
storage::replication::Decoder decoder(req_reader);
auto *storage = static_cast<storage::InMemoryStorage *>(db_acc->get()->storage());
utils::EnsureDirOrDie(storage->wal_directory_);
utils::EnsureDirOrDie(storage->recovery_.wal_directory_);
LoadWal(storage, &decoder);

View File

@ -104,9 +104,19 @@ DEFINE_bool(storage_snapshot_on_exit, false, "Controls whether the storage creat
DEFINE_uint64(storage_items_per_batch, memgraph::storage::Config::Durability().items_per_batch,
"The number of edges and vertices stored in a batch in a snapshot file.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables,misc-unused-parameters)
DEFINE_VALIDATED_bool(
storage_parallel_index_recovery, false,
"Controls whether the index creation can be done in a multithreaded fashion.", {
spdlog::warn(
"storage_parallel_index_recovery flag is deprecated. Check storage_mode_parallel_schema_recovery for more "
"details.");
return true;
});
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_bool(storage_parallel_index_recovery, false,
"Controls whether the index creation can be done in a multithreaded fashion.");
DEFINE_bool(storage_parallel_schema_recovery, false,
"Controls whether the indices and constraints creation can be done in a multithreaded fashion.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint64(storage_recovery_thread_count,

View File

@ -73,9 +73,12 @@ DECLARE_uint64(storage_wal_file_flush_every_n_tx);
DECLARE_bool(storage_snapshot_on_exit);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint64(storage_items_per_batch);
// storage_parallel_index_recovery deprecated; use storage_parallel_schema_recovery instead
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_bool(storage_parallel_index_recovery);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_bool(storage_parallel_schema_recovery);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint64(storage_recovery_thread_count);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_bool(storage_enable_schema_metadata);

View File

@ -305,7 +305,9 @@ int main(int argc, char **argv) {
.restore_replication_state_on_startup = FLAGS_replication_restore_state_on_startup,
.items_per_batch = FLAGS_storage_items_per_batch,
.recovery_thread_count = FLAGS_storage_recovery_thread_count,
.allow_parallel_index_creation = FLAGS_storage_parallel_index_recovery},
// deprecated
.allow_parallel_index_creation = FLAGS_storage_parallel_index_recovery,
.allow_parallel_schema_creation = FLAGS_storage_parallel_schema_recovery},
.transaction = {.isolation_level = memgraph::flags::ParseIsolationLevel()},
.disk = {.main_storage_directory = FLAGS_data_directory + "/rocksdb_main_storage",
.label_index_directory = FLAGS_data_directory + "/rocksdb_label_index",

View File

@ -65,7 +65,10 @@ struct Config {
uint64_t items_per_batch{1'000'000};
uint64_t recovery_thread_count{8};
// deprecated
bool allow_parallel_index_creation{false};
bool allow_parallel_schema_creation{false};
friend bool operator==(const Durability &lrh, const Durability &rhs) = default;
} durability;

View File

@ -11,10 +11,11 @@
#include "storage/v2/constraints/existence_constraints.hpp"
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/constraints/utils.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/mvcc.hpp"
#include "utils/logging.hpp"
#include "utils/rw_spin_lock.hpp"
namespace memgraph::storage {
bool ExistenceConstraints::ConstraintExists(LabelId label, PropertyId property) const {
@ -55,4 +56,70 @@ void ExistenceConstraints::LoadExistenceConstraints(const std::vector<std::strin
}
}
[[nodiscard]] std::optional<ConstraintViolation> ExistenceConstraints::ValidateVertexOnConstraint(
const Vertex &vertex, const LabelId &label, const PropertyId &property) {
if (!vertex.deleted && utils::Contains(vertex.labels, label) && !vertex.properties.HasProperty(property)) {
return ConstraintViolation{ConstraintViolation::Type::EXISTENCE, label, std::set<PropertyId>{property}};
}
return std::nullopt;
}
std::variant<ExistenceConstraints::MultipleThreadsConstraintValidation,
ExistenceConstraints::SingleThreadConstraintValidation>
ExistenceConstraints::GetCreationFunction(
const std::optional<durability::ParallelizedSchemaCreationInfo> &par_exec_info) {
if (par_exec_info.has_value()) {
return ExistenceConstraints::MultipleThreadsConstraintValidation{par_exec_info.value()};
}
return ExistenceConstraints::SingleThreadConstraintValidation{};
}
[[nodiscard]] std::optional<ConstraintViolation> ExistenceConstraints::ValidateVerticesOnConstraint(
utils::SkipList<Vertex>::Accessor vertices, LabelId label, PropertyId property,
const std::optional<durability::ParallelizedSchemaCreationInfo> &parallel_exec_info) {
auto calling_existence_validation_function = GetCreationFunction(parallel_exec_info);
return std::visit(
[&vertices, &label, &property](auto &calling_object) { return calling_object(vertices, label, property); },
calling_existence_validation_function);
}
std::optional<ConstraintViolation> ExistenceConstraints::MultipleThreadsConstraintValidation::operator()(
const utils::SkipList<Vertex>::Accessor &vertices, const LabelId &label, const PropertyId &property) {
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
const auto &vertex_batches = parallel_exec_info.vertex_recovery_info;
MG_ASSERT(!vertex_batches.empty(),
"The size of batches should always be greater than zero if you want to use the parallel version of index "
"creation!");
const auto thread_count = std::min(parallel_exec_info.thread_count, vertex_batches.size());
std::atomic<uint64_t> batch_counter = 0;
memgraph::utils::Synchronized<std::optional<ConstraintViolation>, utils::RWSpinLock> maybe_error{};
{
std::vector<std::jthread> threads;
threads.reserve(thread_count);
for (auto i{0U}; i < thread_count; ++i) {
threads.emplace_back([&maybe_error, &vertex_batches, &batch_counter, &vertices, &label, &property]() {
do_per_thread_validation(maybe_error, ValidateVertexOnConstraint, vertex_batches, batch_counter, vertices,
label, property);
});
}
}
if (maybe_error.Lock()->has_value()) {
return maybe_error->value();
}
return std::nullopt;
}
std::optional<ConstraintViolation> ExistenceConstraints::SingleThreadConstraintValidation::operator()(
const utils::SkipList<Vertex>::Accessor &vertices, const LabelId &label, const PropertyId &property) {
for (const Vertex &vertex : vertices) {
if (auto violation = ValidateVertexOnConstraint(vertex, label, property); violation.has_value()) {
return violation;
}
}
return std::nullopt;
}
} // namespace memgraph::storage

View File

@ -11,34 +11,45 @@
#pragma once
#include <atomic>
#include <optional>
#include <thread>
#include <variant>
#include "storage/v2/constraints/constraint_violation.hpp"
#include "storage/v2/durability/recovery_type.hpp"
#include "storage/v2/vertex.hpp"
#include "utils/skip_list.hpp"
#include "utils/synchronized.hpp"
namespace memgraph::storage {
class ExistenceConstraints {
private:
std::vector<std::pair<LabelId, PropertyId>> constraints_;
public:
struct MultipleThreadsConstraintValidation {
std::optional<ConstraintViolation> operator()(const utils::SkipList<Vertex>::Accessor &vertices,
const LabelId &label, const PropertyId &property);
const durability::ParallelizedSchemaCreationInfo &parallel_exec_info;
};
struct SingleThreadConstraintValidation {
std::optional<ConstraintViolation> operator()(const utils::SkipList<Vertex>::Accessor &vertices,
const LabelId &label, const PropertyId &property);
};
[[nodiscard]] static std::optional<ConstraintViolation> ValidateVertexOnConstraint(const Vertex &vertex,
LabelId label,
PropertyId property) {
if (!vertex.deleted && utils::Contains(vertex.labels, label) && !vertex.properties.HasProperty(property)) {
return ConstraintViolation{ConstraintViolation::Type::EXISTENCE, label, std::set<PropertyId>{property}};
}
return std::nullopt;
}
const LabelId &label,
const PropertyId &property);
[[nodiscard]] static std::optional<ConstraintViolation> ValidateVerticesOnConstraint(
utils::SkipList<Vertex>::Accessor vertices, LabelId label, PropertyId property) {
for (const auto &vertex : vertices) {
if (auto violation = ValidateVertexOnConstraint(vertex, label, property); violation.has_value()) {
return violation;
}
}
return std::nullopt;
}
utils::SkipList<Vertex>::Accessor vertices, LabelId label, PropertyId property,
const std::optional<durability::ParallelizedSchemaCreationInfo> &parallel_exec_info = std::nullopt);
static std::variant<MultipleThreadsConstraintValidation, SingleThreadConstraintValidation> GetCreationFunction(
const std::optional<durability::ParallelizedSchemaCreationInfo> &);
bool ConstraintExists(LabelId label, PropertyId property) const;
@ -54,9 +65,6 @@ class ExistenceConstraints {
std::vector<std::pair<LabelId, PropertyId>> ListConstraints() const;
void LoadExistenceConstraints(const std::vector<std::string> &keys);
private:
std::vector<std::pair<LabelId, PropertyId>> constraints_;
};
} // namespace memgraph::storage

View File

@ -0,0 +1,42 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <vector>
#include "storage/v2/vertex.hpp"
#include "utils/skip_list.hpp"
namespace memgraph::storage {
template <typename ErrorType, typename Func, typename... Args>
void do_per_thread_validation(ErrorType &maybe_error, Func &&func,
const std::vector<std::pair<Gid, uint64_t>> &vertex_batches,
std::atomic<uint64_t> &batch_counter,
const memgraph::utils::SkipList<memgraph::storage::Vertex>::Accessor &vertices,
Args &&...args) {
while (!maybe_error.ReadLock()->has_value()) {
const auto batch_index = batch_counter.fetch_add(1, std::memory_order_acquire);
if (batch_index >= vertex_batches.size()) {
return;
}
const auto &[gid_start, batch_size] = vertex_batches[batch_index];
auto vertex_curr = vertices.find(gid_start);
DMG_ASSERT(vertex_curr != vertices.end(), "No vertex was found with given gid");
for (auto i{0U}; i < batch_size; ++i, ++vertex_curr) {
const auto violation = func(*vertex_curr, std::forward<Args>(args)...);
if (!violation.has_value()) [[likely]] {
continue;
}
maybe_error.WithLock([&violation](auto &maybe_error) { maybe_error = *violation; });
break;
}
}
}
} // namespace memgraph::storage

View File

@ -43,8 +43,9 @@ InMemoryLabelPropertyIndex::Iterable EdgeImportModeCache::Vertices(
storage, transaction);
}
bool EdgeImportModeCache::CreateIndex(LabelId label, PropertyId property,
const std::optional<ParallelizedIndexCreationInfo> &parallel_exec_info) {
bool EdgeImportModeCache::CreateIndex(
LabelId label, PropertyId property,
const std::optional<durability::ParallelizedSchemaCreationInfo> &parallel_exec_info) {
auto *mem_label_property_index =
static_cast<InMemoryLabelPropertyIndex *>(in_memory_indices_.label_property_index_.get());
bool res = mem_label_property_index->CreateIndex(label, property, vertices_.access(), parallel_exec_info);
@ -54,8 +55,8 @@ bool EdgeImportModeCache::CreateIndex(LabelId label, PropertyId property,
return res;
}
bool EdgeImportModeCache::CreateIndex(LabelId label,
const std::optional<ParallelizedIndexCreationInfo> &parallel_exec_info) {
bool EdgeImportModeCache::CreateIndex(
LabelId label, const std::optional<durability::ParallelizedSchemaCreationInfo> &parallel_exec_info) {
auto *mem_label_index = static_cast<InMemoryLabelIndex *>(in_memory_indices_.label_index_.get());
bool res = mem_label_index->CreateIndex(label, vertices_.access(), parallel_exec_info);
if (res) {

View File

@ -42,9 +42,10 @@ class EdgeImportModeCache final {
View view, Storage *storage, Transaction *transaction) const;
bool CreateIndex(LabelId label, PropertyId property,
const std::optional<ParallelizedIndexCreationInfo> &parallel_exec_info = {});
const std::optional<durability::ParallelizedSchemaCreationInfo> &parallel_exec_info = {});
bool CreateIndex(LabelId label, const std::optional<ParallelizedIndexCreationInfo> &parallel_exec_info = {});
bool CreateIndex(LabelId label,
const std::optional<durability::ParallelizedSchemaCreationInfo> &parallel_exec_info = {});
bool VerticesWithLabelPropertyScanned(LabelId label, PropertyId property) const;

View File

@ -17,10 +17,6 @@
namespace memgraph::storage {
/// TODO: andi. Too many copies, extract at one place
using ParallelizedIndexCreationInfo =
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
class DiskLabelPropertyIndex : public storage::LabelPropertyIndex {
public:
explicit DiskLabelPropertyIndex(const Config &config);

View File

@ -9,8 +9,6 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v2/durability/durability.hpp"
#include <pwd.h>
#include <sys/stat.h>
#include <sys/types.h>
@ -20,23 +18,29 @@
#include <cstring>
#include <algorithm>
#include <optional>
#include <tuple>
#include <utility>
#include <vector>
#include "flags/all.hpp"
#include "gflags/gflags.h"
#include "replication/epoch.hpp"
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/durability/metadata.hpp"
#include "storage/v2/durability/paths.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/inmemory/label_index.hpp"
#include "storage/v2/inmemory/label_property_index.hpp"
#include "storage/v2/inmemory/unique_constraints.hpp"
#include "storage/v2/name_id_mapper.hpp"
#include "utils/event_histogram.hpp"
#include "utils/flag_validation.hpp"
#include "utils/logging.hpp"
#include "utils/memory_tracker.hpp"
#include "utils/message.hpp"
#include "utils/timer.hpp"
namespace memgraph::metrics {
extern const Event SnapshotRecoveryLatency_us;
} // namespace memgraph::metrics
@ -134,15 +138,23 @@ std::optional<std::vector<WalDurabilityInfo>> GetWalFiles(const std::filesystem:
// indices and constraints must be recovered after the data recovery is done
// to ensure that the indices and constraints are consistent at the end of the
// recovery process.
void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices,
Constraints *constraints, utils::SkipList<Vertex> *vertices,
NameIdMapper *name_id_mapper,
const std::optional<ParallelizedIndexCreationInfo> &parallel_exec_info) {
void RecoverConstraints(const RecoveredIndicesAndConstraints::ConstraintsMetadata &constraints_metadata,
Constraints *constraints, utils::SkipList<Vertex> *vertices, NameIdMapper *name_id_mapper,
const std::optional<ParallelizedSchemaCreationInfo> &parallel_exec_info) {
RecoverExistenceConstraints(constraints_metadata, constraints, vertices, name_id_mapper, parallel_exec_info);
RecoverUniqueConstraints(constraints_metadata, constraints, vertices, name_id_mapper, parallel_exec_info);
}
void RecoverIndicesAndStats(const RecoveredIndicesAndConstraints::IndicesMetadata &indices_metadata, Indices *indices,
utils::SkipList<Vertex> *vertices, NameIdMapper *name_id_mapper,
const std::optional<ParallelizedSchemaCreationInfo> &parallel_exec_info) {
spdlog::info("Recreating indices from metadata.");
// Recover label indices.
spdlog::info("Recreating {} label indices from metadata.", indices_constraints.indices.label.size());
spdlog::info("Recreating {} label indices from metadata.", indices_metadata.label.size());
auto *mem_label_index = static_cast<InMemoryLabelIndex *>(indices->label_index_.get());
for (const auto &item : indices_constraints.indices.label) {
for (const auto &item : indices_metadata.label) {
if (!mem_label_index->CreateIndex(item, vertices->access(), parallel_exec_info)) {
throw RecoveryFailure("The label index must be created here!");
}
@ -151,9 +163,10 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_
spdlog::info("Label indices are recreated.");
spdlog::info("Recreating index statistics from metadata.");
// Recover label indices statistics.
spdlog::info("Recreating {} label index statistics from metadata.", indices_constraints.indices.label_stats.size());
for (const auto &item : indices_constraints.indices.label_stats) {
spdlog::info("Recreating {} label index statistics from metadata.", indices_metadata.label_stats.size());
for (const auto &item : indices_metadata.label_stats) {
mem_label_index->SetIndexStats(item.first, item.second);
spdlog::info("Statistics for index on :{} are recreated from metadata",
name_id_mapper->IdToName(item.first.AsUint()));
@ -161,10 +174,9 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_
spdlog::info("Label indices statistics are recreated.");
// Recover label+property indices.
spdlog::info("Recreating {} label+property indices from metadata.",
indices_constraints.indices.label_property.size());
spdlog::info("Recreating {} label+property indices from metadata.", indices_metadata.label_property.size());
auto *mem_label_property_index = static_cast<InMemoryLabelPropertyIndex *>(indices->label_property_index_.get());
for (const auto &item : indices_constraints.indices.label_property) {
for (const auto &item : indices_metadata.label_property) {
if (!mem_label_property_index->CreateIndex(item.first, item.second, vertices->access(), parallel_exec_info))
throw RecoveryFailure("The label+property index must be created here!");
spdlog::info("Index on :{}({}) is recreated from metadata", name_id_mapper->IdToName(item.first.AsUint()),
@ -174,8 +186,8 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_
// Recover label+property indices statistics.
spdlog::info("Recreating {} label+property indices statistics from metadata.",
indices_constraints.indices.label_property_stats.size());
for (const auto &item : indices_constraints.indices.label_property_stats) {
indices_metadata.label_property_stats.size());
for (const auto &item : indices_metadata.label_property_stats) {
const auto label_id = item.first;
const auto property_id = item.second.first;
const auto &stats = item.second.second;
@ -188,14 +200,20 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_
spdlog::info("Indices are recreated.");
spdlog::info("Recreating constraints from metadata.");
// Recover existence constraints.
spdlog::info("Recreating {} existence constraints from metadata.", indices_constraints.constraints.existence.size());
for (const auto &[label, property] : indices_constraints.constraints.existence) {
}
void RecoverExistenceConstraints(const RecoveredIndicesAndConstraints::ConstraintsMetadata &constraints_metadata,
Constraints *constraints, utils::SkipList<Vertex> *vertices,
NameIdMapper *name_id_mapper,
const std::optional<ParallelizedSchemaCreationInfo> &parallel_exec_info) {
spdlog::info("Recreating {} existence constraints from metadata.", constraints_metadata.existence.size());
for (const auto &[label, property] : constraints_metadata.existence) {
if (constraints->existence_constraints_->ConstraintExists(label, property)) {
throw RecoveryFailure("The existence constraint already exists!");
}
if (auto violation = ExistenceConstraints::ValidateVerticesOnConstraint(vertices->access(), label, property);
if (auto violation =
ExistenceConstraints::ValidateVerticesOnConstraint(vertices->access(), label, property, parallel_exec_info);
violation.has_value()) {
throw RecoveryFailure("The existence constraint failed because it couldn't be validated!");
}
@ -205,38 +223,57 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_
name_id_mapper->IdToName(property.AsUint()));
}
spdlog::info("Existence constraints are recreated from metadata.");
}
// Recover unique constraints.
spdlog::info("Recreating {} unique constraints from metadata.", indices_constraints.constraints.unique.size());
for (const auto &item : indices_constraints.constraints.unique) {
void RecoverUniqueConstraints(const RecoveredIndicesAndConstraints::ConstraintsMetadata &constraints_metadata,
Constraints *constraints, utils::SkipList<Vertex> *vertices, NameIdMapper *name_id_mapper,
const std::optional<ParallelizedSchemaCreationInfo> &parallel_exec_info) {
spdlog::info("Recreating {} unique constraints from metadata.", constraints_metadata.unique.size());
for (const auto &[label, properties] : constraints_metadata.unique) {
auto *mem_unique_constraints = static_cast<InMemoryUniqueConstraints *>(constraints->unique_constraints_.get());
auto ret = mem_unique_constraints->CreateConstraint(item.first, item.second, vertices->access());
auto ret = mem_unique_constraints->CreateConstraint(label, properties, vertices->access(), parallel_exec_info);
if (ret.HasError() || ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS)
throw RecoveryFailure("The unique constraint must be created here!");
std::vector<std::string> property_names;
property_names.reserve(item.second.size());
for (const auto &prop : item.second) {
property_names.reserve(properties.size());
for (const auto &prop : properties) {
property_names.emplace_back(name_id_mapper->IdToName(prop.AsUint()));
}
const auto property_names_joined = utils::Join(property_names, ",");
spdlog::info("Unique constraint on :{}({}) is recreated from metadata",
name_id_mapper->IdToName(item.first.AsUint()), property_names_joined);
spdlog::info("Unique constraint on :{}({}) is recreated from metadata", name_id_mapper->IdToName(label.AsUint()),
property_names_joined);
}
spdlog::info("Unique constraints are recreated from metadata.");
spdlog::info("Constraints are recreated from metadata.");
}
std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_directory,
const std::filesystem::path &wal_directory, std::string *uuid,
ReplicationStorageState &repl_storage_state, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges, std::atomic<uint64_t> *edge_count,
NameIdMapper *name_id_mapper, Indices *indices, Constraints *constraints,
const Config &config, uint64_t *wal_seq_num) {
std::optional<ParallelizedSchemaCreationInfo> GetParallelExecInfo(const RecoveryInfo &recovery_info,
const Config &config) {
return config.durability.allow_parallel_schema_creation
? std::make_optional(ParallelizedSchemaCreationInfo{recovery_info.vertex_batches,
config.durability.recovery_thread_count})
: std::nullopt;
}
std::optional<ParallelizedSchemaCreationInfo> GetParallelExecInfoIndices(const RecoveryInfo &recovery_info,
const Config &config) {
return config.durability.allow_parallel_schema_creation || config.durability.allow_parallel_index_creation
? std::make_optional(ParallelizedSchemaCreationInfo{recovery_info.vertex_batches,
config.durability.recovery_thread_count})
: std::nullopt;
}
std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges,
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints, const Config &config,
uint64_t *wal_seq_num) {
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
spdlog::info("Recovering persisted data using snapshot ({}) and WAL directory ({}).", snapshot_directory,
wal_directory);
if (!utils::DirExists(snapshot_directory) && !utils::DirExists(wal_directory)) {
spdlog::info("Recovering persisted data using snapshot ({}) and WAL directory ({}).", snapshot_directory_,
wal_directory_);
if (!utils::DirExists(snapshot_directory_) && !utils::DirExists(wal_directory_)) {
spdlog::warn(utils::MessageWithLink("Snapshot or WAL directory don't exist, there is nothing to recover.",
"https://memgr.ph/durability"));
return std::nullopt;
@ -245,13 +282,13 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
auto *const epoch_history = &repl_storage_state.history;
utils::Timer timer;
auto snapshot_files = GetSnapshotFiles(snapshot_directory);
auto snapshot_files = GetSnapshotFiles(snapshot_directory_);
RecoveryInfo recovery_info;
RecoveredIndicesAndConstraints indices_constraints;
std::optional<uint64_t> snapshot_timestamp;
if (!snapshot_files.empty()) {
spdlog::info("Try recovering from snapshot directory {}.", snapshot_directory);
spdlog::info("Try recovering from snapshot directory {}.", wal_directory_);
// UUID used for durability is the UUID of the last snapshot file.
*uuid = snapshot_files.back().uuid;
@ -281,18 +318,17 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
snapshot_timestamp = recovered_snapshot->snapshot_info.start_timestamp;
repl_storage_state.epoch_.SetEpoch(std::move(recovered_snapshot->snapshot_info.epoch_id));
if (!utils::DirExists(wal_directory)) {
const auto par_exec_info = config.durability.allow_parallel_index_creation
? std::make_optional(std::make_pair(recovery_info.vertex_batches,
config.durability.recovery_thread_count))
: std::nullopt;
RecoverIndicesAndConstraints(indices_constraints, indices, constraints, vertices, name_id_mapper, par_exec_info);
if (!utils::DirExists(wal_directory_)) {
RecoverIndicesAndStats(indices_constraints.indices, indices, vertices, name_id_mapper,
GetParallelExecInfoIndices(recovery_info, config));
RecoverConstraints(indices_constraints.constraints, constraints, vertices, name_id_mapper,
GetParallelExecInfo(recovery_info, config));
return recovered_snapshot->recovery_info;
}
} else {
spdlog::info("No snapshot file was found, collecting information from WAL directory {}.", wal_directory);
spdlog::info("No snapshot file was found, collecting information from WAL directory {}.", wal_directory_);
std::error_code error_code;
if (!utils::DirExists(wal_directory)) return std::nullopt;
if (!utils::DirExists(wal_directory_)) return std::nullopt;
// We use this smaller struct that contains only a subset of information
// necessary for the rest of the recovery function.
// Also, the struct is sorted primarily on the path it contains.
@ -306,7 +342,7 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
auto operator<=>(const WalFileInfo &) const = default;
};
std::vector<WalFileInfo> wal_files;
for (const auto &item : std::filesystem::directory_iterator(wal_directory, error_code)) {
for (const auto &item : std::filesystem::directory_iterator(wal_directory_, error_code)) {
if (!item.is_regular_file()) continue;
try {
auto info = ReadWalInfo(item.path());
@ -327,7 +363,7 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
repl_storage_state.epoch_.SetEpoch(std::move(wal_files.back().epoch_id));
}
auto maybe_wal_files = GetWalFiles(wal_directory, *uuid);
auto maybe_wal_files = GetWalFiles(wal_directory_, *uuid);
if (!maybe_wal_files) {
spdlog::warn(
utils::MessageWithLink("Couldn't get WAL file info from the WAL directory.", "https://memgr.ph/durability"));
@ -413,12 +449,10 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
spdlog::info("All necessary WAL files are loaded successfully.");
}
const auto par_exec_info =
config.durability.allow_parallel_index_creation && !recovery_info.vertex_batches.empty()
? std::make_optional(std::make_pair(recovery_info.vertex_batches, config.durability.recovery_thread_count))
: std::nullopt;
RecoverIndicesAndConstraints(indices_constraints, indices, constraints, vertices, name_id_mapper, par_exec_info);
RecoverIndicesAndStats(indices_constraints.indices, indices, vertices, name_id_mapper,
GetParallelExecInfoIndices(recovery_info, config));
RecoverConstraints(indices_constraints.constraints, constraints, vertices, name_id_mapper,
GetParallelExecInfo(recovery_info, config));
memgraph::metrics::Measure(memgraph::metrics::SnapshotRecoveryLatency_us,
std::chrono::duration_cast<std::chrono::microseconds>(timer.Elapsed()).count());

View File

@ -23,6 +23,7 @@
#include "storage/v2/config.hpp"
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/durability/metadata.hpp"
#include "storage/v2/durability/recovery_type.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/indices/indices.hpp"
@ -94,27 +95,50 @@ std::optional<std::vector<WalDurabilityInfo>> GetWalFiles(const std::filesystem:
std::string_view uuid = "",
std::optional<size_t> current_seq_num = {});
using ParallelizedIndexCreationInfo =
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
// Helper function used to recover all discovered indices and constraints. The
// indices and constraints must be recovered after the data recovery is done
// to ensure that the indices and constraints are consistent at the end of the
// Helper function used to recover all discovered indices. The
// indices must be recovered after the data recovery is done
// to ensure that the indices consistent at the end of the
// recovery process.
/// @throw RecoveryFailure
void RecoverIndicesAndConstraints(
const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices, Constraints *constraints,
void RecoverIndicesAndStats(const RecoveredIndicesAndConstraints::IndicesMetadata &indices_metadata, Indices *indices,
utils::SkipList<Vertex> *vertices, NameIdMapper *name_id_mapper,
const std::optional<ParallelizedIndexCreationInfo> &parallel_exec_info = std::nullopt);
const std::optional<ParallelizedSchemaCreationInfo> &parallel_exec_info = std::nullopt);
// Helper function used to recover all discovered constraints. The
// constraints must be recovered after the data recovery is done
// to ensure that the constraints are consistent at the end of the
// recovery process.
/// @throw RecoveryFailure
void RecoverConstraints(const RecoveredIndicesAndConstraints::ConstraintsMetadata &constraints_metadata,
Constraints *constraints, utils::SkipList<Vertex> *vertices, NameIdMapper *name_id_mapper,
const std::optional<ParallelizedSchemaCreationInfo> &parallel_exec_info = std::nullopt);
std::optional<ParallelizedSchemaCreationInfo> GetParallelExecInfo(const RecoveryInfo &recovery_info,
const Config &config);
std::optional<ParallelizedSchemaCreationInfo> GetParallelExecInfoIndices(const RecoveryInfo &recovery_info,
const Config &config);
void RecoverExistenceConstraints(const RecoveredIndicesAndConstraints::ConstraintsMetadata &, Constraints *,
utils::SkipList<Vertex> *, NameIdMapper *,
const std::optional<ParallelizedSchemaCreationInfo> &);
void RecoverUniqueConstraints(const RecoveredIndicesAndConstraints::ConstraintsMetadata &, Constraints *,
utils::SkipList<Vertex> *, NameIdMapper *,
const std::optional<ParallelizedSchemaCreationInfo> &);
struct Recovery {
public:
/// Recovers data either from a snapshot and/or WAL files.
/// @throw RecoveryFailure
/// @throw std::bad_alloc
std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_directory,
const std::filesystem::path &wal_directory, std::string *uuid,
ReplicationStorageState &repl_storage_state, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges, std::atomic<uint64_t> *edge_count,
NameIdMapper *name_id_mapper, Indices *indices, Constraints *constraints,
const Config &config, uint64_t *wal_seq_num);
std::optional<RecoveryInfo> RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges,
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints, const Config &config,
uint64_t *wal_seq_num);
const std::filesystem::path snapshot_directory_;
const std::filesystem::path wal_directory_;
};
} // namespace memgraph::storage::durability

View File

@ -38,14 +38,14 @@ struct RecoveryInfo {
/// Structure used to track indices and constraints during recovery.
struct RecoveredIndicesAndConstraints {
struct {
struct IndicesMetadata {
std::vector<LabelId> label;
std::vector<std::pair<LabelId, PropertyId>> label_property;
std::vector<std::pair<LabelId, LabelIndexStats>> label_stats;
std::vector<std::pair<LabelId, std::pair<PropertyId, LabelPropertyIndexStats>>> label_property_stats;
} indices;
struct {
struct ConstraintsMetadata {
std::vector<std::pair<LabelId, PropertyId>> existence;
std::vector<std::pair<LabelId, std::set<PropertyId>>> unique;
} constraints;

View File

@ -0,0 +1,23 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <utility>
#include <vector>
#include "storage/v2/id_types.hpp"
namespace memgraph::storage::durability {
struct ParallelizedSchemaCreationInfo {
std::vector<std::pair<Gid, uint64_t>> vertex_recovery_info;
uint64_t thread_count;
};
} // namespace memgraph::storage::durability

View File

@ -11,6 +11,7 @@
#include <thread>
#include "storage/v2/delta.hpp"
#include "storage/v2/durability/recovery_type.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex.hpp"
@ -20,9 +21,6 @@
namespace memgraph::storage {
using ParallelizedIndexCreationInfo =
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
/// Traverses deltas visible from transaction with start timestamp greater than
/// the provided timestamp, and calls the provided callback function for each
/// delta. If the callback ever returns true, traversal is stopped and the
@ -259,11 +257,12 @@ inline void CreateIndexOnSingleThread(utils::SkipList<Vertex>::Accessor &vertice
template <typename TIndex, typename TIndexKey, typename TSKiplistIter, typename TFunc>
inline void CreateIndexOnMultipleThreads(utils::SkipList<Vertex>::Accessor &vertices, TSKiplistIter skiplist_iter,
TIndex &index, TIndexKey key,
const ParallelizedIndexCreationInfo &parallel_exec_info, const TFunc &func) {
const durability::ParallelizedSchemaCreationInfo &parallel_exec_info,
const TFunc &func) {
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
const auto &vertex_batches = parallel_exec_info.first;
const auto thread_count = std::min(parallel_exec_info.second, vertex_batches.size());
const auto &vertex_batches = parallel_exec_info.vertex_recovery_info;
const auto thread_count = std::min(parallel_exec_info.thread_count, vertex_batches.size());
MG_ASSERT(!vertex_batches.empty(),
"The size of batches should always be greater than zero if you want to use the parallel version of index "

View File

@ -26,8 +26,9 @@ void InMemoryLabelIndex::UpdateOnAddLabel(LabelId added_label, Vertex *vertex_af
acc.insert(Entry{vertex_after_update, tx.start_timestamp});
}
bool InMemoryLabelIndex::CreateIndex(LabelId label, utils::SkipList<Vertex>::Accessor vertices,
const std::optional<ParallelizedIndexCreationInfo> &parallel_exec_info) {
bool InMemoryLabelIndex::CreateIndex(
LabelId label, utils::SkipList<Vertex>::Accessor vertices,
const std::optional<durability::ParallelizedSchemaCreationInfo> &parallel_exec_info) {
const auto create_index_seq = [this](LabelId label, utils::SkipList<Vertex>::Accessor &vertices,
std::map<LabelId, utils::SkipList<Entry>>::iterator it) {
using IndexAccessor = decltype(it->second.access());
@ -42,7 +43,7 @@ bool InMemoryLabelIndex::CreateIndex(LabelId label, utils::SkipList<Vertex>::Acc
const auto create_index_par = [this](LabelId label, utils::SkipList<Vertex>::Accessor &vertices,
std::map<LabelId, utils::SkipList<Entry>>::iterator label_it,
const ParallelizedIndexCreationInfo &parallel_exec_info) {
const durability::ParallelizedSchemaCreationInfo &parallel_exec_info) {
using IndexAccessor = decltype(label_it->second.access());
CreateIndexOnMultipleThreads(vertices, label_it, index_, label, parallel_exec_info,

View File

@ -14,6 +14,7 @@
#include <span>
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/durability/recovery_type.hpp"
#include "storage/v2/indices/label_index.hpp"
#include "storage/v2/indices/label_index_stats.hpp"
#include "storage/v2/vertex.hpp"
@ -22,9 +23,6 @@
namespace memgraph::storage {
using ParallelizedIndexCreationInfo =
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
class InMemoryLabelIndex : public storage::LabelIndex {
private:
struct Entry {
@ -47,7 +45,7 @@ class InMemoryLabelIndex : public storage::LabelIndex {
/// @throw std::bad_alloc
bool CreateIndex(LabelId label, utils::SkipList<Vertex>::Accessor vertices,
const std::optional<ParallelizedIndexCreationInfo> &parallel_exec_info);
const std::optional<durability::ParallelizedSchemaCreationInfo> &parallel_exec_info);
/// Returns false if there was no index to drop
bool DropIndex(LabelId label) override;

View File

@ -35,9 +35,9 @@ bool InMemoryLabelPropertyIndex::Entry::operator<(const PropertyValue &rhs) cons
bool InMemoryLabelPropertyIndex::Entry::operator==(const PropertyValue &rhs) const { return value == rhs; }
bool InMemoryLabelPropertyIndex::CreateIndex(LabelId label, PropertyId property,
utils::SkipList<Vertex>::Accessor vertices,
const std::optional<ParallelizedIndexCreationInfo> &parallel_exec_info) {
bool InMemoryLabelPropertyIndex::CreateIndex(
LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor vertices,
const std::optional<durability::ParallelizedSchemaCreationInfo> &parallel_exec_info) {
spdlog::trace("Vertices size when creating index: {}", vertices.size());
auto create_index_seq = [this](LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor &vertices,
std::map<std::pair<LabelId, PropertyId>, utils::SkipList<Entry>>::iterator it) {
@ -54,7 +54,7 @@ bool InMemoryLabelPropertyIndex::CreateIndex(LabelId label, PropertyId property,
auto create_index_par =
[this](LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor &vertices,
std::map<std::pair<LabelId, PropertyId>, utils::SkipList<Entry>>::iterator label_property_it,
const ParallelizedIndexCreationInfo &parallel_exec_info) {
const durability::ParallelizedSchemaCreationInfo &parallel_exec_info) {
using IndexAccessor = decltype(label_property_it->second.access());
CreateIndexOnMultipleThreads(

View File

@ -14,6 +14,7 @@
#include <span>
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/durability/recovery_type.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/indices/label_property_index.hpp"
#include "storage/v2/indices/label_property_index_stats.hpp"
@ -23,10 +24,6 @@
namespace memgraph::storage {
/// TODO: andi. Too many copies, extract at one place
using ParallelizedIndexCreationInfo =
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex {
private:
struct Entry {
@ -46,7 +43,7 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex {
/// @throw std::bad_alloc
bool CreateIndex(LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor vertices,
const std::optional<ParallelizedIndexCreationInfo> &parallel_exec_info);
const std::optional<durability::ParallelizedSchemaCreationInfo> &parallel_exec_info);
/// @throw std::bad_alloc
void UpdateOnAddLabel(LabelId added_label, Vertex *vertex_after_update, const Transaction &tx) override;

View File

@ -134,7 +134,7 @@ std::vector<RecoveryStep> GetRecoverySteps(uint64_t replica_commit, utils::FileR
std::unique_lock transaction_guard(
storage->engine_lock_); // Hold the storage lock so the current wal file cannot be changed
(void)locker_acc.AddPath(storage->wal_directory_); // Protect all WALs from being deleted
(void)locker_acc.AddPath(storage->recovery_.wal_directory_); // Protect all WALs from being deleted
if (storage->wal_file_) {
current_wal_seq_num.emplace(storage->wal_file_->SequenceNumber());
@ -146,20 +146,22 @@ std::vector<RecoveryStep> GetRecoverySteps(uint64_t replica_commit, utils::FileR
// Read in finalized WAL files (excluding the current/active WAL)
utils::OnScopeExit
release_wal_dir( // Each individually used file will be locked, so at the end, the dir can be released
[&locker_acc, &wal_dir = storage->wal_directory_]() { (void)locker_acc.RemovePath(wal_dir); });
[&locker_acc, &wal_dir = storage->recovery_.wal_directory_]() { (void)locker_acc.RemovePath(wal_dir); });
// Get WAL files, ordered by timestamp, from oldest to newest
auto wal_files = durability::GetWalFiles(storage->wal_directory_, storage->uuid_, current_wal_seq_num);
auto wal_files = durability::GetWalFiles(storage->recovery_.wal_directory_, storage->uuid_, current_wal_seq_num);
MG_ASSERT(wal_files, "Wal files could not be loaded");
if (transaction_guard.owns_lock())
transaction_guard.unlock(); // In case we didn't have a current wal file, we can unlock only now since there is no
// guarantee what we'll see after we add the wal file
// Read in snapshot files
(void)locker_acc.AddPath(storage->snapshot_directory_); // Protect all snapshots from being deleted
(void)locker_acc.AddPath(storage->recovery_.snapshot_directory_); // Protect all snapshots from being deleted
utils::OnScopeExit
release_snapshot_dir( // Each individually used file will be locked, so at the end, the dir can be released
[&locker_acc, &snapshot_dir = storage->snapshot_directory_]() { (void)locker_acc.RemovePath(snapshot_dir); });
auto snapshot_files = durability::GetSnapshotFiles(storage->snapshot_directory_, storage->uuid_);
[&locker_acc, &snapshot_dir = storage->recovery_.snapshot_directory_]() {
(void)locker_acc.RemovePath(snapshot_dir);
});
auto snapshot_files = durability::GetSnapshotFiles(storage->recovery_.snapshot_directory_, storage->uuid_);
std::optional<durability::SnapshotDurabilityInfo> latest_snapshot{};
if (!snapshot_files.empty()) {
latest_snapshot.emplace(std::move(snapshot_files.back()));

View File

@ -12,6 +12,7 @@
#include "storage/v2/inmemory/storage.hpp"
#include <algorithm>
#include <functional>
#include <optional>
#include "dbms/constants.hpp"
#include "memory/global_memory_control.hpp"
#include "storage/v2/durability/durability.hpp"
@ -65,9 +66,9 @@ using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode)
: Storage(config, storage_mode),
snapshot_directory_(config.durability.storage_directory / durability::kSnapshotDirectory),
recovery_{config.durability.storage_directory / durability::kSnapshotDirectory,
config.durability.storage_directory / durability::kWalDirectory},
lock_file_path_(config.durability.storage_directory / durability::kLockFile),
wal_directory_(config.durability.storage_directory / durability::kWalDirectory),
uuid_(utils::GenerateUUID()),
global_locker_(file_retainer_.AddLocker()) {
MG_ASSERT(storage_mode != StorageMode::ON_DISK_TRANSACTIONAL,
@ -78,9 +79,9 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode)
// permission errors. This is done early to crash the database on startup
// instead of crashing the database for the first time during runtime (which
// could be an unpleasant surprise).
utils::EnsureDirOrDie(snapshot_directory_);
utils::EnsureDirOrDie(recovery_.snapshot_directory_);
// Same reasoning as above.
utils::EnsureDirOrDie(wal_directory_);
utils::EnsureDirOrDie(recovery_.wal_directory_);
// Verify that the user that started the process is the same user that is
// the owner of the storage directory.
@ -98,9 +99,8 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode)
config_.durability.storage_directory);
}
if (config_.durability.recover_on_startup) {
auto info =
durability::RecoverData(snapshot_directory_, wal_directory_, &uuid_, repl_storage_state_, &vertices_, &edges_,
&edge_count_, name_id_mapper_.get(), &indices_, &constraints_, config_, &wal_seq_num_);
auto info = recovery_.RecoverData(&uuid_, repl_storage_state_, &vertices_, &edges_, &edge_count_,
name_id_mapper_.get(), &indices_, &constraints_, config_, &wal_seq_num_);
if (info) {
vertex_id_ = info->next_vertex_id;
edge_id_ = info->next_edge_id;
@ -114,8 +114,8 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode)
bool files_moved = false;
auto backup_root = config_.durability.storage_directory / durability::kBackupDirectory;
for (const auto &[path, dirname, what] :
{std::make_tuple(snapshot_directory_, durability::kSnapshotDirectory, "snapshot"),
std::make_tuple(wal_directory_, durability::kWalDirectory, "WAL")}) {
{std::make_tuple(recovery_.snapshot_directory_, durability::kSnapshotDirectory, "snapshot"),
std::make_tuple(recovery_.wal_directory_, durability::kWalDirectory, "WAL")}) {
if (!utils::DirExists(path)) continue;
auto backup_curr = backup_root / dirname;
std::error_code error_code;
@ -1200,8 +1200,8 @@ InMemoryStorage::InMemoryAccessor::CreateExistenceConstraint(LabelId label, Prop
if (existence_constraints->ConstraintExists(label, property)) {
return StorageExistenceConstraintDefinitionError{ConstraintDefinitionError{}};
}
if (auto violation =
ExistenceConstraints::ValidateVerticesOnConstraint(in_memory->vertices_.access(), label, property);
if (auto violation = ExistenceConstraints::ValidateVerticesOnConstraint(in_memory->vertices_.access(), label,
property, std::nullopt);
violation.has_value()) {
return StorageExistenceConstraintDefinitionError{violation.value()};
}
@ -1228,7 +1228,7 @@ InMemoryStorage::InMemoryAccessor::CreateUniqueConstraint(LabelId label, const s
auto *in_memory = static_cast<InMemoryStorage *>(storage_);
auto *mem_unique_constraints =
static_cast<InMemoryUniqueConstraints *>(in_memory->constraints_.unique_constraints_.get());
auto ret = mem_unique_constraints->CreateConstraint(label, properties, in_memory->vertices_.access());
auto ret = mem_unique_constraints->CreateConstraint(label, properties, in_memory->vertices_.access(), std::nullopt);
if (ret.HasError()) {
return StorageUniqueConstraintDefinitionError{ret.GetError()};
}
@ -1707,7 +1707,7 @@ bool InMemoryStorage::InitializeWalFile(memgraph::replication::ReplicationEpoch
if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL)
return false;
if (!wal_file_) {
wal_file_.emplace(wal_directory_, uuid_, epoch.id(), config_.items, name_id_mapper_.get(), wal_seq_num_++,
wal_file_.emplace(recovery_.wal_directory_, uuid_, epoch.id(), config_.items, name_id_mapper_.get(), wal_seq_num_++,
&file_retainer_);
}
return true;
@ -2017,8 +2017,8 @@ utils::BasicResult<InMemoryStorage::CreateSnapshotError> InMemoryStorage::Create
auto snapshot_creator = [this, &epoch]() {
utils::Timer timer;
auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION, storage_mode_);
durability::CreateSnapshot(this, &transaction, snapshot_directory_, wal_directory_, &vertices_, &edges_, uuid_,
epoch, repl_storage_state_.history, &file_retainer_);
durability::CreateSnapshot(this, &transaction, recovery_.snapshot_directory_, recovery_.wal_directory_, &vertices_,
&edges_, uuid_, epoch, repl_storage_state_.history, &file_retainer_);
// Finalize snapshot transaction.
commit_log_->MarkFinished(transaction.start_timestamp);

View File

@ -345,6 +345,8 @@ class InMemoryStorage final : public Storage {
void SetStorageMode(StorageMode storage_mode);
const durability::Recovery &GetRecovery() const noexcept { return recovery_; }
private:
/// The force parameter determines the behaviour of the garbage collector.
/// If it's set to true, it will behave as a global operation, i.e. it can't
@ -397,10 +399,10 @@ class InMemoryStorage final : public Storage {
utils::SkipList<storage::Edge> edges_;
// Durability
std::filesystem::path snapshot_directory_;
durability::Recovery recovery_;
std::filesystem::path lock_file_path_;
utils::OutputFile lock_file_handle_;
std::filesystem::path wal_directory_;
utils::Scheduler snapshot_runner_;
utils::SpinLock snapshot_lock_;

View File

@ -10,7 +10,13 @@
// licenses/APL.txt.
#include "storage/v2/inmemory/unique_constraints.hpp"
#include <memory>
#include "storage/v2/constraints/constraint_violation.hpp"
#include "storage/v2/constraints/utils.hpp"
#include "storage/v2/durability/recovery_type.hpp"
#include "storage/v2/id_types.hpp"
#include "utils/logging.hpp"
#include "utils/skip_list.hpp"
namespace memgraph::storage {
namespace {
@ -274,6 +280,75 @@ void InMemoryUniqueConstraints::UpdateBeforeCommit(const Vertex *vertex, const T
}
}
std::variant<InMemoryUniqueConstraints::MultipleThreadsConstraintValidation,
InMemoryUniqueConstraints::SingleThreadConstraintValidation>
InMemoryUniqueConstraints::GetCreationFunction(
const std::optional<durability::ParallelizedSchemaCreationInfo> &par_exec_info) {
if (par_exec_info.has_value()) {
return InMemoryUniqueConstraints::MultipleThreadsConstraintValidation{par_exec_info.value()};
}
return InMemoryUniqueConstraints::SingleThreadConstraintValidation{};
}
bool InMemoryUniqueConstraints::MultipleThreadsConstraintValidation::operator()(
const utils::SkipList<Vertex>::Accessor &vertex_accessor, utils::SkipList<Entry>::Accessor &constraint_accessor,
const LabelId &label, const std::set<PropertyId> &properties) {
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
const auto &vertex_batches = parallel_exec_info.vertex_recovery_info;
MG_ASSERT(!vertex_batches.empty(),
"The size of batches should always be greater than zero if you want to use the parallel version of index "
"creation!");
const auto thread_count = std::min(parallel_exec_info.thread_count, vertex_batches.size());
std::atomic<uint64_t> batch_counter = 0;
memgraph::utils::Synchronized<std::optional<ConstraintViolation>, utils::RWSpinLock> has_error;
{
std::vector<std::jthread> threads;
threads.reserve(thread_count);
for (auto i{0U}; i < thread_count; ++i) {
threads.emplace_back(
[&has_error, &vertex_batches, &batch_counter, &vertex_accessor, &constraint_accessor, &label, &properties]() {
do_per_thread_validation(has_error, DoValidate, vertex_batches, batch_counter, vertex_accessor,
constraint_accessor, label, properties);
});
}
}
return has_error.Lock()->has_value();
}
bool InMemoryUniqueConstraints::SingleThreadConstraintValidation::operator()(
const utils::SkipList<Vertex>::Accessor &vertex_accessor, utils::SkipList<Entry>::Accessor &constraint_accessor,
const LabelId &label, const std::set<PropertyId> &properties) {
for (const Vertex &vertex : vertex_accessor) {
if (const auto violation = DoValidate(vertex, constraint_accessor, label, properties); violation.has_value()) {
return true;
}
}
return false;
}
std::optional<ConstraintViolation> InMemoryUniqueConstraints::DoValidate(
const Vertex &vertex, utils::SkipList<Entry>::Accessor &constraint_accessor, const LabelId &label,
const std::set<PropertyId> &properties) {
if (vertex.deleted || !utils::Contains(vertex.labels, label)) {
return std::nullopt;
}
auto values = vertex.properties.ExtractPropertyValues(properties);
if (!values) {
return std::nullopt;
}
// Check whether there already is a vertex with the same values for the
// given label and property.
auto it = constraint_accessor.find_equal_or_greater(*values);
if (it != constraint_accessor.end() && it->values == *values) {
return ConstraintViolation{ConstraintViolation::Type::UNIQUE, label, properties};
}
constraint_accessor.insert(Entry{std::move(*values), &vertex, 0});
return std::nullopt;
}
void InMemoryUniqueConstraints::AbortEntries(std::span<Vertex const *const> vertices, uint64_t exact_start_timestamp) {
for (const auto &vertex : vertices) {
for (const auto &label : vertex->labels) {
@ -297,8 +372,9 @@ void InMemoryUniqueConstraints::AbortEntries(std::span<Vertex const *const> vert
}
utils::BasicResult<ConstraintViolation, InMemoryUniqueConstraints::CreationStatus>
InMemoryUniqueConstraints::CreateConstraint(LabelId label, const std::set<PropertyId> &properties,
utils::SkipList<Vertex>::Accessor vertices) {
InMemoryUniqueConstraints::CreateConstraint(
LabelId label, const std::set<PropertyId> &properties, const utils::SkipList<Vertex>::Accessor &vertex_accessor,
const std::optional<durability::ParallelizedSchemaCreationInfo> &par_exec_info) {
if (properties.empty()) {
return CreationStatus::EMPTY_PROPERTIES;
}
@ -306,49 +382,28 @@ InMemoryUniqueConstraints::CreateConstraint(LabelId label, const std::set<Proper
return CreationStatus::PROPERTIES_SIZE_LIMIT_EXCEEDED;
}
auto [constraint, emplaced] =
constraints_.emplace(std::piecewise_construct, std::forward_as_tuple(label, properties), std::forward_as_tuple());
if (!emplaced) {
// Constraint already exists.
if (constraints_.contains({label, properties})) {
return CreationStatus::ALREADY_EXISTS;
}
memgraph::utils::SkipList<Entry> constraints_skip_list;
utils::SkipList<Entry>::Accessor constraint_accessor{constraints_skip_list.access()};
bool violation_found = false;
auto multi_single_thread_processing = GetCreationFunction(par_exec_info);
{
auto acc = constraint->second.access();
for (const Vertex &vertex : vertices) {
if (vertex.deleted || !utils::Contains(vertex.labels, label)) {
continue;
}
auto values = vertex.properties.ExtractPropertyValues(properties);
if (!values) {
continue;
}
// Check whether there already is a vertex with the same values for the
// given label and property.
auto it = acc.find_equal_or_greater(*values);
if (it != acc.end() && it->values == *values) {
violation_found = true;
break;
}
acc.insert(Entry{std::move(*values), &vertex, 0});
}
}
bool violation_found = std::visit(
[&vertex_accessor, &constraint_accessor, &label, &properties](auto &multi_single_thread_processing) {
return multi_single_thread_processing(vertex_accessor, constraint_accessor, label, properties);
},
multi_single_thread_processing);
if (violation_found) {
// In the case of the violation, storage for the current constraint has to
// be removed.
constraints_.erase(constraint);
return ConstraintViolation{ConstraintViolation::Type::UNIQUE, label, properties};
}
auto [it, _] = constraints_.emplace(std::make_pair(label, properties), std::move(constraints_skip_list));
// Add the new constraint to the optimized structure only if there are no violations.
constraints_by_label_[label].insert({properties, &constraints_.at({label, properties})});
constraints_by_label_[label].insert({properties, &it->second});
return CreationStatus::SUCCESS;
}

View File

@ -11,9 +11,17 @@
#pragma once
#include <optional>
#include <span>
#include <thread>
#include <variant>
#include "storage/v2/constraints/constraint_violation.hpp"
#include "storage/v2/constraints/unique_constraints.hpp"
#include "storage/v2/durability/recovery_type.hpp"
#include "storage/v2/id_types.hpp"
#include "utils/logging.hpp"
#include "utils/rw_spin_lock.hpp"
#include "utils/synchronized.hpp"
namespace memgraph::storage {
@ -46,7 +54,24 @@ class InMemoryUniqueConstraints : public UniqueConstraints {
bool operator==(const std::vector<PropertyValue> &rhs) const;
};
static std::optional<ConstraintViolation> DoValidate(const Vertex &vertex,
utils::SkipList<Entry>::Accessor &constraint_accessor,
const LabelId &label, const std::set<PropertyId> &properties);
public:
struct MultipleThreadsConstraintValidation {
bool operator()(const utils::SkipList<Vertex>::Accessor &vertex_accessor,
utils::SkipList<Entry>::Accessor &constraint_accessor, const LabelId &label,
const std::set<PropertyId> &properties);
const durability::ParallelizedSchemaCreationInfo &parallel_exec_info;
};
struct SingleThreadConstraintValidation {
bool operator()(const utils::SkipList<Vertex>::Accessor &vertex_accessor,
utils::SkipList<Entry>::Accessor &constraint_accessor, const LabelId &label,
const std::set<PropertyId> &properties);
};
/// Indexes the given vertex for relevant labels and properties.
/// This method should be called before committing and validating vertices
/// against unique constraints.
@ -67,9 +92,9 @@ class InMemoryUniqueConstraints : public UniqueConstraints {
/// exceeds the maximum allowed number of properties, and
/// `CreationStatus::SUCCESS` on success.
/// @throw std::bad_alloc
utils::BasicResult<ConstraintViolation, CreationStatus> CreateConstraint(LabelId label,
const std::set<PropertyId> &properties,
utils::SkipList<Vertex>::Accessor vertices);
utils::BasicResult<ConstraintViolation, CreationStatus> CreateConstraint(
LabelId label, const std::set<PropertyId> &properties, const utils::SkipList<Vertex>::Accessor &vertex_accessor,
const std::optional<durability::ParallelizedSchemaCreationInfo> &par_exec_info);
/// Deletes the specified constraint. Returns `DeletionStatus::NOT_FOUND` if
/// there is not such constraint in the storage,
@ -101,6 +126,9 @@ class InMemoryUniqueConstraints : public UniqueConstraints {
void Clear() override;
static std::variant<MultipleThreadsConstraintValidation, SingleThreadConstraintValidation> GetCreationFunction(
const std::optional<durability::ParallelizedSchemaCreationInfo> &);
private:
std::map<std::pair<LabelId, std::set<PropertyId>>, utils::SkipList<Entry>> constraints_;
std::map<LabelId, std::map<std::set<PropertyId>, utils::SkipList<Entry> *>> constraints_by_label_;

View File

@ -31,7 +31,6 @@ def test_does_default_config_match():
use the DEFINE_HIDDEN_* macro instead of DEFINE_* to
prevent SHOW CONFIG from returning it.
"""
assert len(config) == len(default_config.startup_config_dict), define_msg
for flag in config:
@ -46,7 +45,6 @@ def test_does_default_config_match():
]
if flag_name in machine_dependent_configurations:
continue
# default_value
assert default_config.startup_config_dict[flag_name][0] == flag[1]
# current_value

View File

@ -115,6 +115,11 @@ startup_config_dict = {
"false",
"Controls whether the index creation can be done in a multithreaded fashion.",
),
"storage_parallel_schema_recovery": (
"false",
"false",
"Controls whether the indices and constraints creation can be done in a multithreaded fashion.",
),
"storage_enable_schema_metadata": (
"false",
"false",

View File

@ -9,6 +9,7 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <gmock/gmock-generated-matchers.h>
#include <gmock/gmock.h>
#include <gtest/gtest-death-test.h>
#include <gtest/gtest.h>
@ -19,13 +20,18 @@
#include <algorithm>
#include <chrono>
#include <csignal>
#include <cstdint>
#include <filesystem>
#include <iostream>
#include <thread>
#include <type_traits>
#include "dbms/database.hpp"
#include "replication/state.hpp"
#include "storage/v2/config.hpp"
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/constraints/existence_constraints.hpp"
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/durability/marker.hpp"
#include "storage/v2/durability/paths.hpp"
#include "storage/v2/durability/snapshot.hpp"
@ -34,10 +40,13 @@
#include "storage/v2/edge_accessor.hpp"
#include "storage/v2/indices/label_index_stats.hpp"
#include "storage/v2/inmemory/storage.hpp"
#include "storage/v2/inmemory/unique_constraints.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "utils/file.hpp"
#include "utils/logging.hpp"
#include "utils/timer.hpp"
#include "utils/uuid.hpp"
using testing::Contains;
using testing::UnorderedElementsAre;
@ -2703,3 +2712,113 @@ TEST_P(DurabilityTest, SnapshotAndWalMixedUUID) {
ASSERT_FALSE(acc->Commit().HasError());
}
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_P(DurabilityTest, ParallelConstraintsRecovery) {
// Create snapshot.
{
memgraph::storage::Config config{
.items = {.properties_on_edges = GetParam()},
.durability = {.storage_directory = storage_directory, .snapshot_on_exit = true, .items_per_batch = 13}};
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
memgraph::dbms::Database db{config, repl_state};
CreateBaseDataset(db.storage(), GetParam());
VerifyDataset(db.storage(), DatasetType::ONLY_BASE, GetParam());
CreateExtendedDataset(db.storage());
VerifyDataset(db.storage(), DatasetType::BASE_WITH_EXTENDED, GetParam());
}
ASSERT_EQ(GetSnapshotsList().size(), 1);
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
ASSERT_EQ(GetWalsList().size(), 0);
ASSERT_EQ(GetBackupWalsList().size(), 0);
// Recover snapshot.
memgraph::storage::Config config{.items = {.properties_on_edges = GetParam()},
.durability = {.storage_directory = storage_directory,
.recover_on_startup = true,
.snapshot_on_exit = false,
.items_per_batch = 13,
.allow_parallel_index_creation = true}};
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
memgraph::dbms::Database db{config, repl_state};
VerifyDataset(db.storage(), DatasetType::BASE_WITH_EXTENDED, GetParam());
{
auto acc = db.storage()->Access();
auto vertex = acc->CreateVertex();
auto edge = acc->CreateEdge(&vertex, &vertex, db.storage()->NameToEdgeType("et"));
ASSERT_TRUE(edge.HasValue());
ASSERT_FALSE(acc->Commit().HasError());
}
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_P(DurabilityTest, ConstraintsRecoveryFunctionSetting) {
memgraph::storage::Config config{.items = {.properties_on_edges = GetParam()},
.durability = {.storage_directory = storage_directory,
.recover_on_startup = true,
.snapshot_on_exit = false,
.items_per_batch = 13,
.allow_parallel_schema_creation = true}};
// Create snapshot.
{
config.durability.recover_on_startup = false;
config.durability.snapshot_on_exit = true;
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
memgraph::dbms::Database db{config, repl_state};
CreateBaseDataset(db.storage(), GetParam());
VerifyDataset(db.storage(), DatasetType::ONLY_BASE, GetParam());
CreateExtendedDataset(db.storage());
VerifyDataset(db.storage(), DatasetType::BASE_WITH_EXTENDED, GetParam());
}
ASSERT_EQ(GetSnapshotsList().size(), 1);
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
ASSERT_EQ(GetWalsList().size(), 0);
ASSERT_EQ(GetBackupWalsList().size(), 0);
config.durability.recover_on_startup = true;
config.durability.snapshot_on_exit = false;
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
memgraph::utils::SkipList<memgraph::storage::Vertex> vertices;
memgraph::utils::SkipList<memgraph::storage::Edge> edges;
std::unique_ptr<memgraph::storage::NameIdMapper> name_id_mapper = std::make_unique<memgraph::storage::NameIdMapper>();
std::atomic<uint64_t> edge_count{0};
uint64_t wal_seq_num{0};
std::string uuid{memgraph::utils::GenerateUUID()};
memgraph::storage::Indices indices{config, memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL};
memgraph::storage::Constraints constraints{config, memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL};
memgraph::storage::ReplicationStorageState repl_storage_state;
memgraph::storage::durability::Recovery recovery{
config.durability.storage_directory / memgraph::storage::durability::kSnapshotDirectory,
config.durability.storage_directory / memgraph::storage::durability::kWalDirectory};
// Recover snapshot.
const auto info = recovery.RecoverData(&uuid, repl_storage_state, &vertices, &edges, &edge_count,
name_id_mapper.get(), &indices, &constraints, config, &wal_seq_num);
MG_ASSERT(info.has_value(), "Info doesn't have value present");
const auto par_exec_info = memgraph::storage::durability::GetParallelExecInfo(*info, config);
MG_ASSERT(par_exec_info.has_value(), "Parallel exec info should have value present");
// Unique constraint choose function
auto *mem_unique_constraints =
static_cast<memgraph::storage::InMemoryUniqueConstraints *>(constraints.unique_constraints_.get());
auto variant_unique_constraint_creation_func = mem_unique_constraints->GetCreationFunction(par_exec_info);
const auto *pval = std::get_if<memgraph::storage::InMemoryUniqueConstraints::MultipleThreadsConstraintValidation>(
&variant_unique_constraint_creation_func);
MG_ASSERT(pval, "Chose wrong function for recovery of data");
// Existence constraint choose function
auto *mem_existence_constraint =
static_cast<memgraph::storage::ExistenceConstraints *>(constraints.existence_constraints_.get());
auto variant_existence_constraint_creation_func = mem_existence_constraint->GetCreationFunction(par_exec_info);
const auto *pval_existence =
std::get_if<memgraph::storage::ExistenceConstraints::MultipleThreadsConstraintValidation>(
&variant_existence_constraint_creation_func);
MG_ASSERT(pval_existence, "Chose wrong type of function for recovery of existence constraint data");
}