From 74fa6d21f61b89a68f6d1daf3e1303e76faa1224 Mon Sep 17 00:00:00 2001 From: Antonio Filipovic <61245998+antoniofilipovic@users.noreply.github.com> Date: Mon, 4 Dec 2023 21:56:05 +0100 Subject: [PATCH] Implement parallel constraints recovery (#1545) --- config/flags.yaml | 4 + src/dbms/inmemory/replication_handlers.cpp | 18 ++- src/flags/general.cpp | 14 +- src/flags/general.hpp | 3 + src/memgraph.cpp | 4 +- src/storage/v2/config.hpp | 3 + .../v2/constraints/existence_constraints.cpp | 69 ++++++++- .../v2/constraints/existence_constraints.hpp | 44 +++--- src/storage/v2/constraints/utils.hpp | 42 +++++ .../v2/disk/edge_import_mode_cache.cpp | 9 +- .../v2/disk/edge_import_mode_cache.hpp | 5 +- src/storage/v2/disk/label_property_index.hpp | 4 - src/storage/v2/durability/durability.cpp | 144 +++++++++++------- src/storage/v2/durability/durability.hpp | 60 +++++--- src/storage/v2/durability/metadata.hpp | 4 +- src/storage/v2/durability/recovery_type.hpp | 23 +++ src/storage/v2/indices/indices_utils.hpp | 11 +- src/storage/v2/inmemory/label_index.cpp | 7 +- src/storage/v2/inmemory/label_index.hpp | 6 +- .../v2/inmemory/label_property_index.cpp | 8 +- .../v2/inmemory/label_property_index.hpp | 7 +- .../v2/inmemory/replication/recovery.cpp | 16 +- src/storage/v2/inmemory/storage.cpp | 30 ++-- src/storage/v2/inmemory/storage.hpp | 6 +- .../v2/inmemory/unique_constraints.cpp | 127 ++++++++++----- .../v2/inmemory/unique_constraints.hpp | 36 ++++- .../e2e/configuration/configuration_check.py | 2 - tests/e2e/configuration/default_config.py | 5 + tests/unit/storage_v2_durability_inmemory.cpp | 119 +++++++++++++++ 29 files changed, 628 insertions(+), 202 deletions(-) create mode 100644 src/storage/v2/constraints/utils.hpp create mode 100644 src/storage/v2/durability/recovery_type.hpp diff --git a/config/flags.yaml b/config/flags.yaml index cd3eee160..b551f90e4 100644 --- a/config/flags.yaml +++ b/config/flags.yaml @@ -111,6 +111,10 @@ modifications: value: "false" override: true + - name: "storage_parallel_schema_recovery" + value: "false" + override: true + - name: "storage_enable_schema_metadata" value: "false" override: true diff --git a/src/dbms/inmemory/replication_handlers.cpp b/src/dbms/inmemory/replication_handlers.cpp index a5f56ee3d..5eba61878 100644 --- a/src/dbms/inmemory/replication_handlers.cpp +++ b/src/dbms/inmemory/replication_handlers.cpp @@ -10,6 +10,7 @@ // licenses/APL.txt. #include "dbms/inmemory/replication_handlers.hpp" +#include #include "dbms/constants.hpp" #include "dbms/dbms_handler.hpp" #include "replication/replication_server.hpp" @@ -187,9 +188,9 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle storage::replication::Decoder decoder(req_reader); auto *storage = static_cast(db_acc->get()->storage()); - utils::EnsureDirOrDie(storage->snapshot_directory_); + utils::EnsureDirOrDie(storage->recovery_.snapshot_directory_); - const auto maybe_snapshot_path = decoder.ReadFile(storage->snapshot_directory_); + const auto maybe_snapshot_path = decoder.ReadFile(storage->recovery_.snapshot_directory_); MG_ASSERT(maybe_snapshot_path, "Failed to load snapshot!"); spdlog::info("Received snapshot saved to {}", *maybe_snapshot_path); @@ -219,7 +220,10 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle storage->timestamp_ = std::max(storage->timestamp_, recovery_info.next_timestamp); spdlog::trace("Recovering indices and constraints from snapshot."); - storage::durability::RecoverIndicesAndConstraints(recovered_snapshot.indices_constraints, &storage->indices_, + memgraph::storage::durability::RecoverIndicesAndStats(recovered_snapshot.indices_constraints.indices, + &storage->indices_, &storage->vertices_, + storage->name_id_mapper_.get()); + memgraph::storage::durability::RecoverConstraints(recovered_snapshot.indices_constraints.constraints, &storage->constraints_, &storage->vertices_, storage->name_id_mapper_.get()); } catch (const storage::durability::RecoveryFailure &e) { @@ -233,7 +237,7 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle spdlog::trace("Deleting old snapshot files due to snapshot recovery."); // Delete other durability files - auto snapshot_files = storage::durability::GetSnapshotFiles(storage->snapshot_directory_, storage->uuid_); + auto snapshot_files = storage::durability::GetSnapshotFiles(storage->recovery_.snapshot_directory_, storage->uuid_); for (const auto &[path, uuid, _] : snapshot_files) { if (path != *maybe_snapshot_path) { spdlog::trace("Deleting snapshot file {}", path); @@ -242,7 +246,7 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle } spdlog::trace("Deleting old WAL files due to snapshot recovery."); - auto wal_files = storage::durability::GetWalFiles(storage->wal_directory_, storage->uuid_); + auto wal_files = storage::durability::GetWalFiles(storage->recovery_.wal_directory_, storage->uuid_); if (wal_files) { for (const auto &wal_file : *wal_files) { spdlog::trace("Deleting WAL file {}", wal_file.path); @@ -267,7 +271,7 @@ void InMemoryReplicationHandlers::WalFilesHandler(dbms::DbmsHandler *dbms_handle storage::replication::Decoder decoder(req_reader); auto *storage = static_cast(db_acc->get()->storage()); - utils::EnsureDirOrDie(storage->wal_directory_); + utils::EnsureDirOrDie(storage->recovery_.wal_directory_); for (auto i = 0; i < wal_file_number; ++i) { LoadWal(storage, &decoder); @@ -289,7 +293,7 @@ void InMemoryReplicationHandlers::CurrentWalHandler(dbms::DbmsHandler *dbms_hand storage::replication::Decoder decoder(req_reader); auto *storage = static_cast(db_acc->get()->storage()); - utils::EnsureDirOrDie(storage->wal_directory_); + utils::EnsureDirOrDie(storage->recovery_.wal_directory_); LoadWal(storage, &decoder); diff --git a/src/flags/general.cpp b/src/flags/general.cpp index 5a28bf16d..6bee2e5b3 100644 --- a/src/flags/general.cpp +++ b/src/flags/general.cpp @@ -104,9 +104,19 @@ DEFINE_bool(storage_snapshot_on_exit, false, "Controls whether the storage creat DEFINE_uint64(storage_items_per_batch, memgraph::storage::Config::Durability().items_per_batch, "The number of edges and vertices stored in a batch in a snapshot file."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables,misc-unused-parameters) +DEFINE_VALIDATED_bool( + storage_parallel_index_recovery, false, + "Controls whether the index creation can be done in a multithreaded fashion.", { + spdlog::warn( + "storage_parallel_index_recovery flag is deprecated. Check storage_mode_parallel_schema_recovery for more " + "details."); + return true; + }); + // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_bool(storage_parallel_index_recovery, false, - "Controls whether the index creation can be done in a multithreaded fashion."); +DEFINE_bool(storage_parallel_schema_recovery, false, + "Controls whether the indices and constraints creation can be done in a multithreaded fashion."); // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DEFINE_uint64(storage_recovery_thread_count, diff --git a/src/flags/general.hpp b/src/flags/general.hpp index eba59228b..890f32cd6 100644 --- a/src/flags/general.hpp +++ b/src/flags/general.hpp @@ -73,9 +73,12 @@ DECLARE_uint64(storage_wal_file_flush_every_n_tx); DECLARE_bool(storage_snapshot_on_exit); // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DECLARE_uint64(storage_items_per_batch); +// storage_parallel_index_recovery deprecated; use storage_parallel_schema_recovery instead // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DECLARE_bool(storage_parallel_index_recovery); // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_bool(storage_parallel_schema_recovery); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DECLARE_uint64(storage_recovery_thread_count); // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DECLARE_bool(storage_enable_schema_metadata); diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 00a159aa5..057b30982 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -305,7 +305,9 @@ int main(int argc, char **argv) { .restore_replication_state_on_startup = FLAGS_replication_restore_state_on_startup, .items_per_batch = FLAGS_storage_items_per_batch, .recovery_thread_count = FLAGS_storage_recovery_thread_count, - .allow_parallel_index_creation = FLAGS_storage_parallel_index_recovery}, + // deprecated + .allow_parallel_index_creation = FLAGS_storage_parallel_index_recovery, + .allow_parallel_schema_creation = FLAGS_storage_parallel_schema_recovery}, .transaction = {.isolation_level = memgraph::flags::ParseIsolationLevel()}, .disk = {.main_storage_directory = FLAGS_data_directory + "/rocksdb_main_storage", .label_index_directory = FLAGS_data_directory + "/rocksdb_label_index", diff --git a/src/storage/v2/config.hpp b/src/storage/v2/config.hpp index 4f333b5d9..dee2afe87 100644 --- a/src/storage/v2/config.hpp +++ b/src/storage/v2/config.hpp @@ -65,7 +65,10 @@ struct Config { uint64_t items_per_batch{1'000'000}; uint64_t recovery_thread_count{8}; + // deprecated bool allow_parallel_index_creation{false}; + + bool allow_parallel_schema_creation{false}; friend bool operator==(const Durability &lrh, const Durability &rhs) = default; } durability; diff --git a/src/storage/v2/constraints/existence_constraints.cpp b/src/storage/v2/constraints/existence_constraints.cpp index a0d303c03..956e0a208 100644 --- a/src/storage/v2/constraints/existence_constraints.cpp +++ b/src/storage/v2/constraints/existence_constraints.cpp @@ -11,10 +11,11 @@ #include "storage/v2/constraints/existence_constraints.hpp" #include "storage/v2/constraints/constraints.hpp" +#include "storage/v2/constraints/utils.hpp" #include "storage/v2/id_types.hpp" #include "storage/v2/mvcc.hpp" #include "utils/logging.hpp" - +#include "utils/rw_spin_lock.hpp" namespace memgraph::storage { bool ExistenceConstraints::ConstraintExists(LabelId label, PropertyId property) const { @@ -55,4 +56,70 @@ void ExistenceConstraints::LoadExistenceConstraints(const std::vector ExistenceConstraints::ValidateVertexOnConstraint( + const Vertex &vertex, const LabelId &label, const PropertyId &property) { + if (!vertex.deleted && utils::Contains(vertex.labels, label) && !vertex.properties.HasProperty(property)) { + return ConstraintViolation{ConstraintViolation::Type::EXISTENCE, label, std::set{property}}; + } + return std::nullopt; +} + +std::variant +ExistenceConstraints::GetCreationFunction( + const std::optional &par_exec_info) { + if (par_exec_info.has_value()) { + return ExistenceConstraints::MultipleThreadsConstraintValidation{par_exec_info.value()}; + } + return ExistenceConstraints::SingleThreadConstraintValidation{}; +} + +[[nodiscard]] std::optional ExistenceConstraints::ValidateVerticesOnConstraint( + utils::SkipList::Accessor vertices, LabelId label, PropertyId property, + const std::optional ¶llel_exec_info) { + auto calling_existence_validation_function = GetCreationFunction(parallel_exec_info); + return std::visit( + [&vertices, &label, &property](auto &calling_object) { return calling_object(vertices, label, property); }, + calling_existence_validation_function); +} + +std::optional ExistenceConstraints::MultipleThreadsConstraintValidation::operator()( + const utils::SkipList::Accessor &vertices, const LabelId &label, const PropertyId &property) { + utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; + + const auto &vertex_batches = parallel_exec_info.vertex_recovery_info; + MG_ASSERT(!vertex_batches.empty(), + "The size of batches should always be greater than zero if you want to use the parallel version of index " + "creation!"); + const auto thread_count = std::min(parallel_exec_info.thread_count, vertex_batches.size()); + + std::atomic batch_counter = 0; + memgraph::utils::Synchronized, utils::RWSpinLock> maybe_error{}; + { + std::vector threads; + threads.reserve(thread_count); + + for (auto i{0U}; i < thread_count; ++i) { + threads.emplace_back([&maybe_error, &vertex_batches, &batch_counter, &vertices, &label, &property]() { + do_per_thread_validation(maybe_error, ValidateVertexOnConstraint, vertex_batches, batch_counter, vertices, + label, property); + }); + } + } + if (maybe_error.Lock()->has_value()) { + return maybe_error->value(); + } + return std::nullopt; +} + +std::optional ExistenceConstraints::SingleThreadConstraintValidation::operator()( + const utils::SkipList::Accessor &vertices, const LabelId &label, const PropertyId &property) { + for (const Vertex &vertex : vertices) { + if (auto violation = ValidateVertexOnConstraint(vertex, label, property); violation.has_value()) { + return violation; + } + } + return std::nullopt; +} + } // namespace memgraph::storage diff --git a/src/storage/v2/constraints/existence_constraints.hpp b/src/storage/v2/constraints/existence_constraints.hpp index 77f7bc43a..c3b68828a 100644 --- a/src/storage/v2/constraints/existence_constraints.hpp +++ b/src/storage/v2/constraints/existence_constraints.hpp @@ -11,34 +11,45 @@ #pragma once +#include #include +#include +#include #include "storage/v2/constraints/constraint_violation.hpp" +#include "storage/v2/durability/recovery_type.hpp" #include "storage/v2/vertex.hpp" #include "utils/skip_list.hpp" +#include "utils/synchronized.hpp" namespace memgraph::storage { class ExistenceConstraints { + private: + std::vector> constraints_; + public: + struct MultipleThreadsConstraintValidation { + std::optional operator()(const utils::SkipList::Accessor &vertices, + const LabelId &label, const PropertyId &property); + + const durability::ParallelizedSchemaCreationInfo ¶llel_exec_info; + }; + struct SingleThreadConstraintValidation { + std::optional operator()(const utils::SkipList::Accessor &vertices, + const LabelId &label, const PropertyId &property); + }; + [[nodiscard]] static std::optional ValidateVertexOnConstraint(const Vertex &vertex, - LabelId label, - PropertyId property) { - if (!vertex.deleted && utils::Contains(vertex.labels, label) && !vertex.properties.HasProperty(property)) { - return ConstraintViolation{ConstraintViolation::Type::EXISTENCE, label, std::set{property}}; - } - return std::nullopt; - } + const LabelId &label, + const PropertyId &property); [[nodiscard]] static std::optional ValidateVerticesOnConstraint( - utils::SkipList::Accessor vertices, LabelId label, PropertyId property) { - for (const auto &vertex : vertices) { - if (auto violation = ValidateVertexOnConstraint(vertex, label, property); violation.has_value()) { - return violation; - } - } - return std::nullopt; - } + utils::SkipList::Accessor vertices, LabelId label, PropertyId property, + const std::optional ¶llel_exec_info = std::nullopt); + + static std::variant GetCreationFunction( + const std::optional &); bool ConstraintExists(LabelId label, PropertyId property) const; @@ -54,9 +65,6 @@ class ExistenceConstraints { std::vector> ListConstraints() const; void LoadExistenceConstraints(const std::vector &keys); - - private: - std::vector> constraints_; }; } // namespace memgraph::storage diff --git a/src/storage/v2/constraints/utils.hpp b/src/storage/v2/constraints/utils.hpp new file mode 100644 index 000000000..ca48708ff --- /dev/null +++ b/src/storage/v2/constraints/utils.hpp @@ -0,0 +1,42 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include +#include "storage/v2/vertex.hpp" +#include "utils/skip_list.hpp" + +namespace memgraph::storage { +template +void do_per_thread_validation(ErrorType &maybe_error, Func &&func, + const std::vector> &vertex_batches, + std::atomic &batch_counter, + const memgraph::utils::SkipList::Accessor &vertices, + Args &&...args) { + while (!maybe_error.ReadLock()->has_value()) { + const auto batch_index = batch_counter.fetch_add(1, std::memory_order_acquire); + if (batch_index >= vertex_batches.size()) { + return; + } + const auto &[gid_start, batch_size] = vertex_batches[batch_index]; + + auto vertex_curr = vertices.find(gid_start); + DMG_ASSERT(vertex_curr != vertices.end(), "No vertex was found with given gid"); + for (auto i{0U}; i < batch_size; ++i, ++vertex_curr) { + const auto violation = func(*vertex_curr, std::forward(args)...); + if (!violation.has_value()) [[likely]] { + continue; + } + maybe_error.WithLock([&violation](auto &maybe_error) { maybe_error = *violation; }); + break; + } + } +} +} // namespace memgraph::storage diff --git a/src/storage/v2/disk/edge_import_mode_cache.cpp b/src/storage/v2/disk/edge_import_mode_cache.cpp index cd1ca0dc2..b6621281f 100644 --- a/src/storage/v2/disk/edge_import_mode_cache.cpp +++ b/src/storage/v2/disk/edge_import_mode_cache.cpp @@ -43,8 +43,9 @@ InMemoryLabelPropertyIndex::Iterable EdgeImportModeCache::Vertices( storage, transaction); } -bool EdgeImportModeCache::CreateIndex(LabelId label, PropertyId property, - const std::optional ¶llel_exec_info) { +bool EdgeImportModeCache::CreateIndex( + LabelId label, PropertyId property, + const std::optional ¶llel_exec_info) { auto *mem_label_property_index = static_cast(in_memory_indices_.label_property_index_.get()); bool res = mem_label_property_index->CreateIndex(label, property, vertices_.access(), parallel_exec_info); @@ -54,8 +55,8 @@ bool EdgeImportModeCache::CreateIndex(LabelId label, PropertyId property, return res; } -bool EdgeImportModeCache::CreateIndex(LabelId label, - const std::optional ¶llel_exec_info) { +bool EdgeImportModeCache::CreateIndex( + LabelId label, const std::optional ¶llel_exec_info) { auto *mem_label_index = static_cast(in_memory_indices_.label_index_.get()); bool res = mem_label_index->CreateIndex(label, vertices_.access(), parallel_exec_info); if (res) { diff --git a/src/storage/v2/disk/edge_import_mode_cache.hpp b/src/storage/v2/disk/edge_import_mode_cache.hpp index 78ffcff59..02af960d5 100644 --- a/src/storage/v2/disk/edge_import_mode_cache.hpp +++ b/src/storage/v2/disk/edge_import_mode_cache.hpp @@ -42,9 +42,10 @@ class EdgeImportModeCache final { View view, Storage *storage, Transaction *transaction) const; bool CreateIndex(LabelId label, PropertyId property, - const std::optional ¶llel_exec_info = {}); + const std::optional ¶llel_exec_info = {}); - bool CreateIndex(LabelId label, const std::optional ¶llel_exec_info = {}); + bool CreateIndex(LabelId label, + const std::optional ¶llel_exec_info = {}); bool VerticesWithLabelPropertyScanned(LabelId label, PropertyId property) const; diff --git a/src/storage/v2/disk/label_property_index.hpp b/src/storage/v2/disk/label_property_index.hpp index 26f972d79..a6842200f 100644 --- a/src/storage/v2/disk/label_property_index.hpp +++ b/src/storage/v2/disk/label_property_index.hpp @@ -17,10 +17,6 @@ namespace memgraph::storage { -/// TODO: andi. Too many copies, extract at one place -using ParallelizedIndexCreationInfo = - std::pair> /*vertex_recovery_info*/, uint64_t /*thread_count*/>; - class DiskLabelPropertyIndex : public storage::LabelPropertyIndex { public: explicit DiskLabelPropertyIndex(const Config &config); diff --git a/src/storage/v2/durability/durability.cpp b/src/storage/v2/durability/durability.cpp index 9a43a2876..6a89b7b5a 100644 --- a/src/storage/v2/durability/durability.cpp +++ b/src/storage/v2/durability/durability.cpp @@ -9,8 +9,6 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -#include "storage/v2/durability/durability.hpp" - #include #include #include @@ -20,23 +18,29 @@ #include #include +#include #include #include #include +#include "flags/all.hpp" +#include "gflags/gflags.h" #include "replication/epoch.hpp" +#include "storage/v2/durability/durability.hpp" +#include "storage/v2/durability/metadata.hpp" #include "storage/v2/durability/paths.hpp" #include "storage/v2/durability/snapshot.hpp" #include "storage/v2/durability/wal.hpp" #include "storage/v2/inmemory/label_index.hpp" #include "storage/v2/inmemory/label_property_index.hpp" #include "storage/v2/inmemory/unique_constraints.hpp" +#include "storage/v2/name_id_mapper.hpp" #include "utils/event_histogram.hpp" +#include "utils/flag_validation.hpp" #include "utils/logging.hpp" #include "utils/memory_tracker.hpp" #include "utils/message.hpp" #include "utils/timer.hpp" - namespace memgraph::metrics { extern const Event SnapshotRecoveryLatency_us; } // namespace memgraph::metrics @@ -134,15 +138,23 @@ std::optional> GetWalFiles(const std::filesystem: // indices and constraints must be recovered after the data recovery is done // to ensure that the indices and constraints are consistent at the end of the // recovery process. -void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices, - Constraints *constraints, utils::SkipList *vertices, - NameIdMapper *name_id_mapper, - const std::optional ¶llel_exec_info) { + +void RecoverConstraints(const RecoveredIndicesAndConstraints::ConstraintsMetadata &constraints_metadata, + Constraints *constraints, utils::SkipList *vertices, NameIdMapper *name_id_mapper, + const std::optional ¶llel_exec_info) { + RecoverExistenceConstraints(constraints_metadata, constraints, vertices, name_id_mapper, parallel_exec_info); + RecoverUniqueConstraints(constraints_metadata, constraints, vertices, name_id_mapper, parallel_exec_info); +} + +void RecoverIndicesAndStats(const RecoveredIndicesAndConstraints::IndicesMetadata &indices_metadata, Indices *indices, + utils::SkipList *vertices, NameIdMapper *name_id_mapper, + const std::optional ¶llel_exec_info) { spdlog::info("Recreating indices from metadata."); + // Recover label indices. - spdlog::info("Recreating {} label indices from metadata.", indices_constraints.indices.label.size()); + spdlog::info("Recreating {} label indices from metadata.", indices_metadata.label.size()); auto *mem_label_index = static_cast(indices->label_index_.get()); - for (const auto &item : indices_constraints.indices.label) { + for (const auto &item : indices_metadata.label) { if (!mem_label_index->CreateIndex(item, vertices->access(), parallel_exec_info)) { throw RecoveryFailure("The label index must be created here!"); } @@ -151,9 +163,10 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_ spdlog::info("Label indices are recreated."); spdlog::info("Recreating index statistics from metadata."); + // Recover label indices statistics. - spdlog::info("Recreating {} label index statistics from metadata.", indices_constraints.indices.label_stats.size()); - for (const auto &item : indices_constraints.indices.label_stats) { + spdlog::info("Recreating {} label index statistics from metadata.", indices_metadata.label_stats.size()); + for (const auto &item : indices_metadata.label_stats) { mem_label_index->SetIndexStats(item.first, item.second); spdlog::info("Statistics for index on :{} are recreated from metadata", name_id_mapper->IdToName(item.first.AsUint())); @@ -161,10 +174,9 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_ spdlog::info("Label indices statistics are recreated."); // Recover label+property indices. - spdlog::info("Recreating {} label+property indices from metadata.", - indices_constraints.indices.label_property.size()); + spdlog::info("Recreating {} label+property indices from metadata.", indices_metadata.label_property.size()); auto *mem_label_property_index = static_cast(indices->label_property_index_.get()); - for (const auto &item : indices_constraints.indices.label_property) { + for (const auto &item : indices_metadata.label_property) { if (!mem_label_property_index->CreateIndex(item.first, item.second, vertices->access(), parallel_exec_info)) throw RecoveryFailure("The label+property index must be created here!"); spdlog::info("Index on :{}({}) is recreated from metadata", name_id_mapper->IdToName(item.first.AsUint()), @@ -174,8 +186,8 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_ // Recover label+property indices statistics. spdlog::info("Recreating {} label+property indices statistics from metadata.", - indices_constraints.indices.label_property_stats.size()); - for (const auto &item : indices_constraints.indices.label_property_stats) { + indices_metadata.label_property_stats.size()); + for (const auto &item : indices_metadata.label_property_stats) { const auto label_id = item.first; const auto property_id = item.second.first; const auto &stats = item.second.second; @@ -188,14 +200,20 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_ spdlog::info("Indices are recreated."); spdlog::info("Recreating constraints from metadata."); - // Recover existence constraints. - spdlog::info("Recreating {} existence constraints from metadata.", indices_constraints.constraints.existence.size()); - for (const auto &[label, property] : indices_constraints.constraints.existence) { +} + +void RecoverExistenceConstraints(const RecoveredIndicesAndConstraints::ConstraintsMetadata &constraints_metadata, + Constraints *constraints, utils::SkipList *vertices, + NameIdMapper *name_id_mapper, + const std::optional ¶llel_exec_info) { + spdlog::info("Recreating {} existence constraints from metadata.", constraints_metadata.existence.size()); + for (const auto &[label, property] : constraints_metadata.existence) { if (constraints->existence_constraints_->ConstraintExists(label, property)) { throw RecoveryFailure("The existence constraint already exists!"); } - if (auto violation = ExistenceConstraints::ValidateVerticesOnConstraint(vertices->access(), label, property); + if (auto violation = + ExistenceConstraints::ValidateVerticesOnConstraint(vertices->access(), label, property, parallel_exec_info); violation.has_value()) { throw RecoveryFailure("The existence constraint failed because it couldn't be validated!"); } @@ -205,38 +223,57 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_ name_id_mapper->IdToName(property.AsUint())); } spdlog::info("Existence constraints are recreated from metadata."); +} - // Recover unique constraints. - spdlog::info("Recreating {} unique constraints from metadata.", indices_constraints.constraints.unique.size()); - for (const auto &item : indices_constraints.constraints.unique) { +void RecoverUniqueConstraints(const RecoveredIndicesAndConstraints::ConstraintsMetadata &constraints_metadata, + Constraints *constraints, utils::SkipList *vertices, NameIdMapper *name_id_mapper, + const std::optional ¶llel_exec_info) { + spdlog::info("Recreating {} unique constraints from metadata.", constraints_metadata.unique.size()); + + for (const auto &[label, properties] : constraints_metadata.unique) { auto *mem_unique_constraints = static_cast(constraints->unique_constraints_.get()); - auto ret = mem_unique_constraints->CreateConstraint(item.first, item.second, vertices->access()); + auto ret = mem_unique_constraints->CreateConstraint(label, properties, vertices->access(), parallel_exec_info); if (ret.HasError() || ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS) throw RecoveryFailure("The unique constraint must be created here!"); std::vector property_names; - property_names.reserve(item.second.size()); - for (const auto &prop : item.second) { + property_names.reserve(properties.size()); + for (const auto &prop : properties) { property_names.emplace_back(name_id_mapper->IdToName(prop.AsUint())); } const auto property_names_joined = utils::Join(property_names, ","); - spdlog::info("Unique constraint on :{}({}) is recreated from metadata", - name_id_mapper->IdToName(item.first.AsUint()), property_names_joined); + spdlog::info("Unique constraint on :{}({}) is recreated from metadata", name_id_mapper->IdToName(label.AsUint()), + property_names_joined); } spdlog::info("Unique constraints are recreated from metadata."); spdlog::info("Constraints are recreated from metadata."); } -std::optional RecoverData(const std::filesystem::path &snapshot_directory, - const std::filesystem::path &wal_directory, std::string *uuid, - ReplicationStorageState &repl_storage_state, utils::SkipList *vertices, - utils::SkipList *edges, std::atomic *edge_count, - NameIdMapper *name_id_mapper, Indices *indices, Constraints *constraints, - const Config &config, uint64_t *wal_seq_num) { +std::optional GetParallelExecInfo(const RecoveryInfo &recovery_info, + const Config &config) { + return config.durability.allow_parallel_schema_creation + ? std::make_optional(ParallelizedSchemaCreationInfo{recovery_info.vertex_batches, + config.durability.recovery_thread_count}) + : std::nullopt; +} + +std::optional GetParallelExecInfoIndices(const RecoveryInfo &recovery_info, + const Config &config) { + return config.durability.allow_parallel_schema_creation || config.durability.allow_parallel_index_creation + ? std::make_optional(ParallelizedSchemaCreationInfo{recovery_info.vertex_batches, + config.durability.recovery_thread_count}) + : std::nullopt; +} + +std::optional Recovery::RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state, + utils::SkipList *vertices, utils::SkipList *edges, + std::atomic *edge_count, NameIdMapper *name_id_mapper, + Indices *indices, Constraints *constraints, const Config &config, + uint64_t *wal_seq_num) { utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; - spdlog::info("Recovering persisted data using snapshot ({}) and WAL directory ({}).", snapshot_directory, - wal_directory); - if (!utils::DirExists(snapshot_directory) && !utils::DirExists(wal_directory)) { + spdlog::info("Recovering persisted data using snapshot ({}) and WAL directory ({}).", snapshot_directory_, + wal_directory_); + if (!utils::DirExists(snapshot_directory_) && !utils::DirExists(wal_directory_)) { spdlog::warn(utils::MessageWithLink("Snapshot or WAL directory don't exist, there is nothing to recover.", "https://memgr.ph/durability")); return std::nullopt; @@ -245,13 +282,13 @@ std::optional RecoverData(const std::filesystem::path &snapshot_di auto *const epoch_history = &repl_storage_state.history; utils::Timer timer; - auto snapshot_files = GetSnapshotFiles(snapshot_directory); + auto snapshot_files = GetSnapshotFiles(snapshot_directory_); RecoveryInfo recovery_info; RecoveredIndicesAndConstraints indices_constraints; std::optional snapshot_timestamp; if (!snapshot_files.empty()) { - spdlog::info("Try recovering from snapshot directory {}.", snapshot_directory); + spdlog::info("Try recovering from snapshot directory {}.", wal_directory_); // UUID used for durability is the UUID of the last snapshot file. *uuid = snapshot_files.back().uuid; @@ -281,18 +318,17 @@ std::optional RecoverData(const std::filesystem::path &snapshot_di snapshot_timestamp = recovered_snapshot->snapshot_info.start_timestamp; repl_storage_state.epoch_.SetEpoch(std::move(recovered_snapshot->snapshot_info.epoch_id)); - if (!utils::DirExists(wal_directory)) { - const auto par_exec_info = config.durability.allow_parallel_index_creation - ? std::make_optional(std::make_pair(recovery_info.vertex_batches, - config.durability.recovery_thread_count)) - : std::nullopt; - RecoverIndicesAndConstraints(indices_constraints, indices, constraints, vertices, name_id_mapper, par_exec_info); + if (!utils::DirExists(wal_directory_)) { + RecoverIndicesAndStats(indices_constraints.indices, indices, vertices, name_id_mapper, + GetParallelExecInfoIndices(recovery_info, config)); + RecoverConstraints(indices_constraints.constraints, constraints, vertices, name_id_mapper, + GetParallelExecInfo(recovery_info, config)); return recovered_snapshot->recovery_info; } } else { - spdlog::info("No snapshot file was found, collecting information from WAL directory {}.", wal_directory); + spdlog::info("No snapshot file was found, collecting information from WAL directory {}.", wal_directory_); std::error_code error_code; - if (!utils::DirExists(wal_directory)) return std::nullopt; + if (!utils::DirExists(wal_directory_)) return std::nullopt; // We use this smaller struct that contains only a subset of information // necessary for the rest of the recovery function. // Also, the struct is sorted primarily on the path it contains. @@ -306,7 +342,7 @@ std::optional RecoverData(const std::filesystem::path &snapshot_di auto operator<=>(const WalFileInfo &) const = default; }; std::vector wal_files; - for (const auto &item : std::filesystem::directory_iterator(wal_directory, error_code)) { + for (const auto &item : std::filesystem::directory_iterator(wal_directory_, error_code)) { if (!item.is_regular_file()) continue; try { auto info = ReadWalInfo(item.path()); @@ -327,7 +363,7 @@ std::optional RecoverData(const std::filesystem::path &snapshot_di repl_storage_state.epoch_.SetEpoch(std::move(wal_files.back().epoch_id)); } - auto maybe_wal_files = GetWalFiles(wal_directory, *uuid); + auto maybe_wal_files = GetWalFiles(wal_directory_, *uuid); if (!maybe_wal_files) { spdlog::warn( utils::MessageWithLink("Couldn't get WAL file info from the WAL directory.", "https://memgr.ph/durability")); @@ -413,12 +449,10 @@ std::optional RecoverData(const std::filesystem::path &snapshot_di spdlog::info("All necessary WAL files are loaded successfully."); } - const auto par_exec_info = - config.durability.allow_parallel_index_creation && !recovery_info.vertex_batches.empty() - ? std::make_optional(std::make_pair(recovery_info.vertex_batches, config.durability.recovery_thread_count)) - : std::nullopt; - - RecoverIndicesAndConstraints(indices_constraints, indices, constraints, vertices, name_id_mapper, par_exec_info); + RecoverIndicesAndStats(indices_constraints.indices, indices, vertices, name_id_mapper, + GetParallelExecInfoIndices(recovery_info, config)); + RecoverConstraints(indices_constraints.constraints, constraints, vertices, name_id_mapper, + GetParallelExecInfo(recovery_info, config)); memgraph::metrics::Measure(memgraph::metrics::SnapshotRecoveryLatency_us, std::chrono::duration_cast(timer.Elapsed()).count()); diff --git a/src/storage/v2/durability/durability.hpp b/src/storage/v2/durability/durability.hpp index 8bb1223c4..97e2c7efc 100644 --- a/src/storage/v2/durability/durability.hpp +++ b/src/storage/v2/durability/durability.hpp @@ -23,6 +23,7 @@ #include "storage/v2/config.hpp" #include "storage/v2/constraints/constraints.hpp" #include "storage/v2/durability/metadata.hpp" +#include "storage/v2/durability/recovery_type.hpp" #include "storage/v2/durability/wal.hpp" #include "storage/v2/edge.hpp" #include "storage/v2/indices/indices.hpp" @@ -94,27 +95,50 @@ std::optional> GetWalFiles(const std::filesystem: std::string_view uuid = "", std::optional current_seq_num = {}); -using ParallelizedIndexCreationInfo = - std::pair> /*vertex_recovery_info*/, uint64_t /*thread_count*/>; - -// Helper function used to recover all discovered indices and constraints. The -// indices and constraints must be recovered after the data recovery is done -// to ensure that the indices and constraints are consistent at the end of the +// Helper function used to recover all discovered indices. The +// indices must be recovered after the data recovery is done +// to ensure that the indices consistent at the end of the // recovery process. /// @throw RecoveryFailure -void RecoverIndicesAndConstraints( - const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices, Constraints *constraints, - utils::SkipList *vertices, NameIdMapper *name_id_mapper, - const std::optional ¶llel_exec_info = std::nullopt); +void RecoverIndicesAndStats(const RecoveredIndicesAndConstraints::IndicesMetadata &indices_metadata, Indices *indices, + utils::SkipList *vertices, NameIdMapper *name_id_mapper, + const std::optional ¶llel_exec_info = std::nullopt); -/// Recovers data either from a snapshot and/or WAL files. +// Helper function used to recover all discovered constraints. The +// constraints must be recovered after the data recovery is done +// to ensure that the constraints are consistent at the end of the +// recovery process. /// @throw RecoveryFailure -/// @throw std::bad_alloc -std::optional RecoverData(const std::filesystem::path &snapshot_directory, - const std::filesystem::path &wal_directory, std::string *uuid, - ReplicationStorageState &repl_storage_state, utils::SkipList *vertices, - utils::SkipList *edges, std::atomic *edge_count, - NameIdMapper *name_id_mapper, Indices *indices, Constraints *constraints, - const Config &config, uint64_t *wal_seq_num); +void RecoverConstraints(const RecoveredIndicesAndConstraints::ConstraintsMetadata &constraints_metadata, + Constraints *constraints, utils::SkipList *vertices, NameIdMapper *name_id_mapper, + const std::optional ¶llel_exec_info = std::nullopt); + +std::optional GetParallelExecInfo(const RecoveryInfo &recovery_info, + const Config &config); + +std::optional GetParallelExecInfoIndices(const RecoveryInfo &recovery_info, + const Config &config); + +void RecoverExistenceConstraints(const RecoveredIndicesAndConstraints::ConstraintsMetadata &, Constraints *, + utils::SkipList *, NameIdMapper *, + const std::optional &); + +void RecoverUniqueConstraints(const RecoveredIndicesAndConstraints::ConstraintsMetadata &, Constraints *, + utils::SkipList *, NameIdMapper *, + const std::optional &); +struct Recovery { + public: + /// Recovers data either from a snapshot and/or WAL files. + /// @throw RecoveryFailure + /// @throw std::bad_alloc + std::optional RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state, + utils::SkipList *vertices, utils::SkipList *edges, + std::atomic *edge_count, NameIdMapper *name_id_mapper, + Indices *indices, Constraints *constraints, const Config &config, + uint64_t *wal_seq_num); + + const std::filesystem::path snapshot_directory_; + const std::filesystem::path wal_directory_; +}; } // namespace memgraph::storage::durability diff --git a/src/storage/v2/durability/metadata.hpp b/src/storage/v2/durability/metadata.hpp index 1045d4f97..42e24e723 100644 --- a/src/storage/v2/durability/metadata.hpp +++ b/src/storage/v2/durability/metadata.hpp @@ -38,14 +38,14 @@ struct RecoveryInfo { /// Structure used to track indices and constraints during recovery. struct RecoveredIndicesAndConstraints { - struct { + struct IndicesMetadata { std::vector label; std::vector> label_property; std::vector> label_stats; std::vector>> label_property_stats; } indices; - struct { + struct ConstraintsMetadata { std::vector> existence; std::vector>> unique; } constraints; diff --git a/src/storage/v2/durability/recovery_type.hpp b/src/storage/v2/durability/recovery_type.hpp new file mode 100644 index 000000000..972cd53f2 --- /dev/null +++ b/src/storage/v2/durability/recovery_type.hpp @@ -0,0 +1,23 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +#include +#include "storage/v2/id_types.hpp" + +namespace memgraph::storage::durability { +struct ParallelizedSchemaCreationInfo { + std::vector> vertex_recovery_info; + uint64_t thread_count; +}; +} // namespace memgraph::storage::durability diff --git a/src/storage/v2/indices/indices_utils.hpp b/src/storage/v2/indices/indices_utils.hpp index 0caad6686..59b492ba3 100644 --- a/src/storage/v2/indices/indices_utils.hpp +++ b/src/storage/v2/indices/indices_utils.hpp @@ -11,6 +11,7 @@ #include #include "storage/v2/delta.hpp" +#include "storage/v2/durability/recovery_type.hpp" #include "storage/v2/mvcc.hpp" #include "storage/v2/transaction.hpp" #include "storage/v2/vertex.hpp" @@ -20,9 +21,6 @@ namespace memgraph::storage { -using ParallelizedIndexCreationInfo = - std::pair> /*vertex_recovery_info*/, uint64_t /*thread_count*/>; - /// Traverses deltas visible from transaction with start timestamp greater than /// the provided timestamp, and calls the provided callback function for each /// delta. If the callback ever returns true, traversal is stopped and the @@ -259,11 +257,12 @@ inline void CreateIndexOnSingleThread(utils::SkipList::Accessor &vertice template inline void CreateIndexOnMultipleThreads(utils::SkipList::Accessor &vertices, TSKiplistIter skiplist_iter, TIndex &index, TIndexKey key, - const ParallelizedIndexCreationInfo ¶llel_exec_info, const TFunc &func) { + const durability::ParallelizedSchemaCreationInfo ¶llel_exec_info, + const TFunc &func) { utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; - const auto &vertex_batches = parallel_exec_info.first; - const auto thread_count = std::min(parallel_exec_info.second, vertex_batches.size()); + const auto &vertex_batches = parallel_exec_info.vertex_recovery_info; + const auto thread_count = std::min(parallel_exec_info.thread_count, vertex_batches.size()); MG_ASSERT(!vertex_batches.empty(), "The size of batches should always be greater than zero if you want to use the parallel version of index " diff --git a/src/storage/v2/inmemory/label_index.cpp b/src/storage/v2/inmemory/label_index.cpp index 82ead4b44..b833c97ff 100644 --- a/src/storage/v2/inmemory/label_index.cpp +++ b/src/storage/v2/inmemory/label_index.cpp @@ -26,8 +26,9 @@ void InMemoryLabelIndex::UpdateOnAddLabel(LabelId added_label, Vertex *vertex_af acc.insert(Entry{vertex_after_update, tx.start_timestamp}); } -bool InMemoryLabelIndex::CreateIndex(LabelId label, utils::SkipList::Accessor vertices, - const std::optional ¶llel_exec_info) { +bool InMemoryLabelIndex::CreateIndex( + LabelId label, utils::SkipList::Accessor vertices, + const std::optional ¶llel_exec_info) { const auto create_index_seq = [this](LabelId label, utils::SkipList::Accessor &vertices, std::map>::iterator it) { using IndexAccessor = decltype(it->second.access()); @@ -42,7 +43,7 @@ bool InMemoryLabelIndex::CreateIndex(LabelId label, utils::SkipList::Acc const auto create_index_par = [this](LabelId label, utils::SkipList::Accessor &vertices, std::map>::iterator label_it, - const ParallelizedIndexCreationInfo ¶llel_exec_info) { + const durability::ParallelizedSchemaCreationInfo ¶llel_exec_info) { using IndexAccessor = decltype(label_it->second.access()); CreateIndexOnMultipleThreads(vertices, label_it, index_, label, parallel_exec_info, diff --git a/src/storage/v2/inmemory/label_index.hpp b/src/storage/v2/inmemory/label_index.hpp index 21df32deb..2411f0ba1 100644 --- a/src/storage/v2/inmemory/label_index.hpp +++ b/src/storage/v2/inmemory/label_index.hpp @@ -14,6 +14,7 @@ #include #include "storage/v2/constraints/constraints.hpp" +#include "storage/v2/durability/recovery_type.hpp" #include "storage/v2/indices/label_index.hpp" #include "storage/v2/indices/label_index_stats.hpp" #include "storage/v2/vertex.hpp" @@ -22,9 +23,6 @@ namespace memgraph::storage { -using ParallelizedIndexCreationInfo = - std::pair> /*vertex_recovery_info*/, uint64_t /*thread_count*/>; - class InMemoryLabelIndex : public storage::LabelIndex { private: struct Entry { @@ -47,7 +45,7 @@ class InMemoryLabelIndex : public storage::LabelIndex { /// @throw std::bad_alloc bool CreateIndex(LabelId label, utils::SkipList::Accessor vertices, - const std::optional ¶llel_exec_info); + const std::optional ¶llel_exec_info); /// Returns false if there was no index to drop bool DropIndex(LabelId label) override; diff --git a/src/storage/v2/inmemory/label_property_index.cpp b/src/storage/v2/inmemory/label_property_index.cpp index f61b9dd11..c8333fb95 100644 --- a/src/storage/v2/inmemory/label_property_index.cpp +++ b/src/storage/v2/inmemory/label_property_index.cpp @@ -35,9 +35,9 @@ bool InMemoryLabelPropertyIndex::Entry::operator<(const PropertyValue &rhs) cons bool InMemoryLabelPropertyIndex::Entry::operator==(const PropertyValue &rhs) const { return value == rhs; } -bool InMemoryLabelPropertyIndex::CreateIndex(LabelId label, PropertyId property, - utils::SkipList::Accessor vertices, - const std::optional ¶llel_exec_info) { +bool InMemoryLabelPropertyIndex::CreateIndex( + LabelId label, PropertyId property, utils::SkipList::Accessor vertices, + const std::optional ¶llel_exec_info) { spdlog::trace("Vertices size when creating index: {}", vertices.size()); auto create_index_seq = [this](LabelId label, PropertyId property, utils::SkipList::Accessor &vertices, std::map, utils::SkipList>::iterator it) { @@ -54,7 +54,7 @@ bool InMemoryLabelPropertyIndex::CreateIndex(LabelId label, PropertyId property, auto create_index_par = [this](LabelId label, PropertyId property, utils::SkipList::Accessor &vertices, std::map, utils::SkipList>::iterator label_property_it, - const ParallelizedIndexCreationInfo ¶llel_exec_info) { + const durability::ParallelizedSchemaCreationInfo ¶llel_exec_info) { using IndexAccessor = decltype(label_property_it->second.access()); CreateIndexOnMultipleThreads( diff --git a/src/storage/v2/inmemory/label_property_index.hpp b/src/storage/v2/inmemory/label_property_index.hpp index ae96a37f8..8bc4148bb 100644 --- a/src/storage/v2/inmemory/label_property_index.hpp +++ b/src/storage/v2/inmemory/label_property_index.hpp @@ -14,6 +14,7 @@ #include #include "storage/v2/constraints/constraints.hpp" +#include "storage/v2/durability/recovery_type.hpp" #include "storage/v2/id_types.hpp" #include "storage/v2/indices/label_property_index.hpp" #include "storage/v2/indices/label_property_index_stats.hpp" @@ -23,10 +24,6 @@ namespace memgraph::storage { -/// TODO: andi. Too many copies, extract at one place -using ParallelizedIndexCreationInfo = - std::pair> /*vertex_recovery_info*/, uint64_t /*thread_count*/>; - class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex { private: struct Entry { @@ -46,7 +43,7 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex { /// @throw std::bad_alloc bool CreateIndex(LabelId label, PropertyId property, utils::SkipList::Accessor vertices, - const std::optional ¶llel_exec_info); + const std::optional ¶llel_exec_info); /// @throw std::bad_alloc void UpdateOnAddLabel(LabelId added_label, Vertex *vertex_after_update, const Transaction &tx) override; diff --git a/src/storage/v2/inmemory/replication/recovery.cpp b/src/storage/v2/inmemory/replication/recovery.cpp index b62fdc2f9..536c7c8fc 100644 --- a/src/storage/v2/inmemory/replication/recovery.cpp +++ b/src/storage/v2/inmemory/replication/recovery.cpp @@ -133,8 +133,8 @@ std::vector GetRecoverySteps(uint64_t replica_commit, utils::FileR std::optional current_wal_from_timestamp; std::unique_lock transaction_guard( - storage->engine_lock_); // Hold the storage lock so the current wal file cannot be changed - (void)locker_acc.AddPath(storage->wal_directory_); // Protect all WALs from being deleted + storage->engine_lock_); // Hold the storage lock so the current wal file cannot be changed + (void)locker_acc.AddPath(storage->recovery_.wal_directory_); // Protect all WALs from being deleted if (storage->wal_file_) { current_wal_seq_num.emplace(storage->wal_file_->SequenceNumber()); @@ -146,20 +146,22 @@ std::vector GetRecoverySteps(uint64_t replica_commit, utils::FileR // Read in finalized WAL files (excluding the current/active WAL) utils::OnScopeExit release_wal_dir( // Each individually used file will be locked, so at the end, the dir can be released - [&locker_acc, &wal_dir = storage->wal_directory_]() { (void)locker_acc.RemovePath(wal_dir); }); + [&locker_acc, &wal_dir = storage->recovery_.wal_directory_]() { (void)locker_acc.RemovePath(wal_dir); }); // Get WAL files, ordered by timestamp, from oldest to newest - auto wal_files = durability::GetWalFiles(storage->wal_directory_, storage->uuid_, current_wal_seq_num); + auto wal_files = durability::GetWalFiles(storage->recovery_.wal_directory_, storage->uuid_, current_wal_seq_num); MG_ASSERT(wal_files, "Wal files could not be loaded"); if (transaction_guard.owns_lock()) transaction_guard.unlock(); // In case we didn't have a current wal file, we can unlock only now since there is no // guarantee what we'll see after we add the wal file // Read in snapshot files - (void)locker_acc.AddPath(storage->snapshot_directory_); // Protect all snapshots from being deleted + (void)locker_acc.AddPath(storage->recovery_.snapshot_directory_); // Protect all snapshots from being deleted utils::OnScopeExit release_snapshot_dir( // Each individually used file will be locked, so at the end, the dir can be released - [&locker_acc, &snapshot_dir = storage->snapshot_directory_]() { (void)locker_acc.RemovePath(snapshot_dir); }); - auto snapshot_files = durability::GetSnapshotFiles(storage->snapshot_directory_, storage->uuid_); + [&locker_acc, &snapshot_dir = storage->recovery_.snapshot_directory_]() { + (void)locker_acc.RemovePath(snapshot_dir); + }); + auto snapshot_files = durability::GetSnapshotFiles(storage->recovery_.snapshot_directory_, storage->uuid_); std::optional latest_snapshot{}; if (!snapshot_files.empty()) { latest_snapshot.emplace(std::move(snapshot_files.back())); diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index 5b7b676b4..08aa896bf 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -12,6 +12,7 @@ #include "storage/v2/inmemory/storage.hpp" #include #include +#include #include "dbms/constants.hpp" #include "memory/global_memory_control.hpp" #include "storage/v2/durability/durability.hpp" @@ -65,9 +66,9 @@ using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler; InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode) : Storage(config, storage_mode), - snapshot_directory_(config.durability.storage_directory / durability::kSnapshotDirectory), + recovery_{config.durability.storage_directory / durability::kSnapshotDirectory, + config.durability.storage_directory / durability::kWalDirectory}, lock_file_path_(config.durability.storage_directory / durability::kLockFile), - wal_directory_(config.durability.storage_directory / durability::kWalDirectory), uuid_(utils::GenerateUUID()), global_locker_(file_retainer_.AddLocker()) { MG_ASSERT(storage_mode != StorageMode::ON_DISK_TRANSACTIONAL, @@ -78,9 +79,9 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode) // permission errors. This is done early to crash the database on startup // instead of crashing the database for the first time during runtime (which // could be an unpleasant surprise). - utils::EnsureDirOrDie(snapshot_directory_); + utils::EnsureDirOrDie(recovery_.snapshot_directory_); // Same reasoning as above. - utils::EnsureDirOrDie(wal_directory_); + utils::EnsureDirOrDie(recovery_.wal_directory_); // Verify that the user that started the process is the same user that is // the owner of the storage directory. @@ -98,9 +99,8 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode) config_.durability.storage_directory); } if (config_.durability.recover_on_startup) { - auto info = - durability::RecoverData(snapshot_directory_, wal_directory_, &uuid_, repl_storage_state_, &vertices_, &edges_, - &edge_count_, name_id_mapper_.get(), &indices_, &constraints_, config_, &wal_seq_num_); + auto info = recovery_.RecoverData(&uuid_, repl_storage_state_, &vertices_, &edges_, &edge_count_, + name_id_mapper_.get(), &indices_, &constraints_, config_, &wal_seq_num_); if (info) { vertex_id_ = info->next_vertex_id; edge_id_ = info->next_edge_id; @@ -114,8 +114,8 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode) bool files_moved = false; auto backup_root = config_.durability.storage_directory / durability::kBackupDirectory; for (const auto &[path, dirname, what] : - {std::make_tuple(snapshot_directory_, durability::kSnapshotDirectory, "snapshot"), - std::make_tuple(wal_directory_, durability::kWalDirectory, "WAL")}) { + {std::make_tuple(recovery_.snapshot_directory_, durability::kSnapshotDirectory, "snapshot"), + std::make_tuple(recovery_.wal_directory_, durability::kWalDirectory, "WAL")}) { if (!utils::DirExists(path)) continue; auto backup_curr = backup_root / dirname; std::error_code error_code; @@ -1200,8 +1200,8 @@ InMemoryStorage::InMemoryAccessor::CreateExistenceConstraint(LabelId label, Prop if (existence_constraints->ConstraintExists(label, property)) { return StorageExistenceConstraintDefinitionError{ConstraintDefinitionError{}}; } - if (auto violation = - ExistenceConstraints::ValidateVerticesOnConstraint(in_memory->vertices_.access(), label, property); + if (auto violation = ExistenceConstraints::ValidateVerticesOnConstraint(in_memory->vertices_.access(), label, + property, std::nullopt); violation.has_value()) { return StorageExistenceConstraintDefinitionError{violation.value()}; } @@ -1228,7 +1228,7 @@ InMemoryStorage::InMemoryAccessor::CreateUniqueConstraint(LabelId label, const s auto *in_memory = static_cast(storage_); auto *mem_unique_constraints = static_cast(in_memory->constraints_.unique_constraints_.get()); - auto ret = mem_unique_constraints->CreateConstraint(label, properties, in_memory->vertices_.access()); + auto ret = mem_unique_constraints->CreateConstraint(label, properties, in_memory->vertices_.access(), std::nullopt); if (ret.HasError()) { return StorageUniqueConstraintDefinitionError{ret.GetError()}; } @@ -1707,7 +1707,7 @@ bool InMemoryStorage::InitializeWalFile(memgraph::replication::ReplicationEpoch if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL) return false; if (!wal_file_) { - wal_file_.emplace(wal_directory_, uuid_, epoch.id(), config_.items, name_id_mapper_.get(), wal_seq_num_++, + wal_file_.emplace(recovery_.wal_directory_, uuid_, epoch.id(), config_.items, name_id_mapper_.get(), wal_seq_num_++, &file_retainer_); } return true; @@ -2017,8 +2017,8 @@ utils::BasicResult InMemoryStorage::Create auto snapshot_creator = [this, &epoch]() { utils::Timer timer; auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION, storage_mode_); - durability::CreateSnapshot(this, &transaction, snapshot_directory_, wal_directory_, &vertices_, &edges_, uuid_, - epoch, repl_storage_state_.history, &file_retainer_); + durability::CreateSnapshot(this, &transaction, recovery_.snapshot_directory_, recovery_.wal_directory_, &vertices_, + &edges_, uuid_, epoch, repl_storage_state_.history, &file_retainer_); // Finalize snapshot transaction. commit_log_->MarkFinished(transaction.start_timestamp); diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp index 48d6f0cb7..2d2837467 100644 --- a/src/storage/v2/inmemory/storage.hpp +++ b/src/storage/v2/inmemory/storage.hpp @@ -345,6 +345,8 @@ class InMemoryStorage final : public Storage { void SetStorageMode(StorageMode storage_mode); + const durability::Recovery &GetRecovery() const noexcept { return recovery_; } + private: /// The force parameter determines the behaviour of the garbage collector. /// If it's set to true, it will behave as a global operation, i.e. it can't @@ -397,10 +399,10 @@ class InMemoryStorage final : public Storage { utils::SkipList edges_; // Durability - std::filesystem::path snapshot_directory_; + durability::Recovery recovery_; + std::filesystem::path lock_file_path_; utils::OutputFile lock_file_handle_; - std::filesystem::path wal_directory_; utils::Scheduler snapshot_runner_; utils::SpinLock snapshot_lock_; diff --git a/src/storage/v2/inmemory/unique_constraints.cpp b/src/storage/v2/inmemory/unique_constraints.cpp index 6a2945883..76cda1730 100644 --- a/src/storage/v2/inmemory/unique_constraints.cpp +++ b/src/storage/v2/inmemory/unique_constraints.cpp @@ -10,7 +10,13 @@ // licenses/APL.txt. #include "storage/v2/inmemory/unique_constraints.hpp" - +#include +#include "storage/v2/constraints/constraint_violation.hpp" +#include "storage/v2/constraints/utils.hpp" +#include "storage/v2/durability/recovery_type.hpp" +#include "storage/v2/id_types.hpp" +#include "utils/logging.hpp" +#include "utils/skip_list.hpp" namespace memgraph::storage { namespace { @@ -274,6 +280,75 @@ void InMemoryUniqueConstraints::UpdateBeforeCommit(const Vertex *vertex, const T } } +std::variant +InMemoryUniqueConstraints::GetCreationFunction( + const std::optional &par_exec_info) { + if (par_exec_info.has_value()) { + return InMemoryUniqueConstraints::MultipleThreadsConstraintValidation{par_exec_info.value()}; + } + return InMemoryUniqueConstraints::SingleThreadConstraintValidation{}; +} + +bool InMemoryUniqueConstraints::MultipleThreadsConstraintValidation::operator()( + const utils::SkipList::Accessor &vertex_accessor, utils::SkipList::Accessor &constraint_accessor, + const LabelId &label, const std::set &properties) { + utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; + const auto &vertex_batches = parallel_exec_info.vertex_recovery_info; + MG_ASSERT(!vertex_batches.empty(), + "The size of batches should always be greater than zero if you want to use the parallel version of index " + "creation!"); + const auto thread_count = std::min(parallel_exec_info.thread_count, vertex_batches.size()); + + std::atomic batch_counter = 0; + memgraph::utils::Synchronized, utils::RWSpinLock> has_error; + { + std::vector threads; + threads.reserve(thread_count); + for (auto i{0U}; i < thread_count; ++i) { + threads.emplace_back( + [&has_error, &vertex_batches, &batch_counter, &vertex_accessor, &constraint_accessor, &label, &properties]() { + do_per_thread_validation(has_error, DoValidate, vertex_batches, batch_counter, vertex_accessor, + constraint_accessor, label, properties); + }); + } + } + return has_error.Lock()->has_value(); +} + +bool InMemoryUniqueConstraints::SingleThreadConstraintValidation::operator()( + const utils::SkipList::Accessor &vertex_accessor, utils::SkipList::Accessor &constraint_accessor, + const LabelId &label, const std::set &properties) { + for (const Vertex &vertex : vertex_accessor) { + if (const auto violation = DoValidate(vertex, constraint_accessor, label, properties); violation.has_value()) { + return true; + } + } + return false; +} + +std::optional InMemoryUniqueConstraints::DoValidate( + const Vertex &vertex, utils::SkipList::Accessor &constraint_accessor, const LabelId &label, + const std::set &properties) { + if (vertex.deleted || !utils::Contains(vertex.labels, label)) { + return std::nullopt; + } + auto values = vertex.properties.ExtractPropertyValues(properties); + if (!values) { + return std::nullopt; + } + + // Check whether there already is a vertex with the same values for the + // given label and property. + auto it = constraint_accessor.find_equal_or_greater(*values); + if (it != constraint_accessor.end() && it->values == *values) { + return ConstraintViolation{ConstraintViolation::Type::UNIQUE, label, properties}; + } + + constraint_accessor.insert(Entry{std::move(*values), &vertex, 0}); + return std::nullopt; +} + void InMemoryUniqueConstraints::AbortEntries(std::span vertices, uint64_t exact_start_timestamp) { for (const auto &vertex : vertices) { for (const auto &label : vertex->labels) { @@ -297,8 +372,9 @@ void InMemoryUniqueConstraints::AbortEntries(std::span vert } utils::BasicResult -InMemoryUniqueConstraints::CreateConstraint(LabelId label, const std::set &properties, - utils::SkipList::Accessor vertices) { +InMemoryUniqueConstraints::CreateConstraint( + LabelId label, const std::set &properties, const utils::SkipList::Accessor &vertex_accessor, + const std::optional &par_exec_info) { if (properties.empty()) { return CreationStatus::EMPTY_PROPERTIES; } @@ -306,49 +382,28 @@ InMemoryUniqueConstraints::CreateConstraint(LabelId label, const std::set constraints_skip_list; + utils::SkipList::Accessor constraint_accessor{constraints_skip_list.access()}; - bool violation_found = false; + auto multi_single_thread_processing = GetCreationFunction(par_exec_info); - { - auto acc = constraint->second.access(); - - for (const Vertex &vertex : vertices) { - if (vertex.deleted || !utils::Contains(vertex.labels, label)) { - continue; - } - auto values = vertex.properties.ExtractPropertyValues(properties); - if (!values) { - continue; - } - - // Check whether there already is a vertex with the same values for the - // given label and property. - auto it = acc.find_equal_or_greater(*values); - if (it != acc.end() && it->values == *values) { - violation_found = true; - break; - } - - acc.insert(Entry{std::move(*values), &vertex, 0}); - } - } + bool violation_found = std::visit( + [&vertex_accessor, &constraint_accessor, &label, &properties](auto &multi_single_thread_processing) { + return multi_single_thread_processing(vertex_accessor, constraint_accessor, label, properties); + }, + multi_single_thread_processing); if (violation_found) { - // In the case of the violation, storage for the current constraint has to - // be removed. - constraints_.erase(constraint); return ConstraintViolation{ConstraintViolation::Type::UNIQUE, label, properties}; } + auto [it, _] = constraints_.emplace(std::make_pair(label, properties), std::move(constraints_skip_list)); + // Add the new constraint to the optimized structure only if there are no violations. - constraints_by_label_[label].insert({properties, &constraints_.at({label, properties})}); + constraints_by_label_[label].insert({properties, &it->second}); return CreationStatus::SUCCESS; } diff --git a/src/storage/v2/inmemory/unique_constraints.hpp b/src/storage/v2/inmemory/unique_constraints.hpp index d1e590357..15107f131 100644 --- a/src/storage/v2/inmemory/unique_constraints.hpp +++ b/src/storage/v2/inmemory/unique_constraints.hpp @@ -11,9 +11,17 @@ #pragma once +#include #include - +#include +#include +#include "storage/v2/constraints/constraint_violation.hpp" #include "storage/v2/constraints/unique_constraints.hpp" +#include "storage/v2/durability/recovery_type.hpp" +#include "storage/v2/id_types.hpp" +#include "utils/logging.hpp" +#include "utils/rw_spin_lock.hpp" +#include "utils/synchronized.hpp" namespace memgraph::storage { @@ -46,7 +54,24 @@ class InMemoryUniqueConstraints : public UniqueConstraints { bool operator==(const std::vector &rhs) const; }; + static std::optional DoValidate(const Vertex &vertex, + utils::SkipList::Accessor &constraint_accessor, + const LabelId &label, const std::set &properties); + public: + struct MultipleThreadsConstraintValidation { + bool operator()(const utils::SkipList::Accessor &vertex_accessor, + utils::SkipList::Accessor &constraint_accessor, const LabelId &label, + const std::set &properties); + + const durability::ParallelizedSchemaCreationInfo ¶llel_exec_info; + }; + struct SingleThreadConstraintValidation { + bool operator()(const utils::SkipList::Accessor &vertex_accessor, + utils::SkipList::Accessor &constraint_accessor, const LabelId &label, + const std::set &properties); + }; + /// Indexes the given vertex for relevant labels and properties. /// This method should be called before committing and validating vertices /// against unique constraints. @@ -67,9 +92,9 @@ class InMemoryUniqueConstraints : public UniqueConstraints { /// exceeds the maximum allowed number of properties, and /// `CreationStatus::SUCCESS` on success. /// @throw std::bad_alloc - utils::BasicResult CreateConstraint(LabelId label, - const std::set &properties, - utils::SkipList::Accessor vertices); + utils::BasicResult CreateConstraint( + LabelId label, const std::set &properties, const utils::SkipList::Accessor &vertex_accessor, + const std::optional &par_exec_info); /// Deletes the specified constraint. Returns `DeletionStatus::NOT_FOUND` if /// there is not such constraint in the storage, @@ -101,6 +126,9 @@ class InMemoryUniqueConstraints : public UniqueConstraints { void Clear() override; + static std::variant GetCreationFunction( + const std::optional &); + private: std::map>, utils::SkipList> constraints_; std::map, utils::SkipList *>> constraints_by_label_; diff --git a/tests/e2e/configuration/configuration_check.py b/tests/e2e/configuration/configuration_check.py index 3cdc919f4..8684b24d6 100644 --- a/tests/e2e/configuration/configuration_check.py +++ b/tests/e2e/configuration/configuration_check.py @@ -31,7 +31,6 @@ def test_does_default_config_match(): use the DEFINE_HIDDEN_* macro instead of DEFINE_* to prevent SHOW CONFIG from returning it. """ - assert len(config) == len(default_config.startup_config_dict), define_msg for flag in config: @@ -46,7 +45,6 @@ def test_does_default_config_match(): ] if flag_name in machine_dependent_configurations: continue - # default_value assert default_config.startup_config_dict[flag_name][0] == flag[1] # current_value diff --git a/tests/e2e/configuration/default_config.py b/tests/e2e/configuration/default_config.py index 025ba4d0a..e1d42b443 100644 --- a/tests/e2e/configuration/default_config.py +++ b/tests/e2e/configuration/default_config.py @@ -115,6 +115,11 @@ startup_config_dict = { "false", "Controls whether the index creation can be done in a multithreaded fashion.", ), + "storage_parallel_schema_recovery": ( + "false", + "false", + "Controls whether the indices and constraints creation can be done in a multithreaded fashion.", + ), "storage_enable_schema_metadata": ( "false", "false", diff --git a/tests/unit/storage_v2_durability_inmemory.cpp b/tests/unit/storage_v2_durability_inmemory.cpp index 725db9283..8a6d26fd1 100644 --- a/tests/unit/storage_v2_durability_inmemory.cpp +++ b/tests/unit/storage_v2_durability_inmemory.cpp @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +#include #include #include #include @@ -19,13 +20,18 @@ #include #include #include +#include #include #include #include +#include #include "dbms/database.hpp" #include "replication/state.hpp" #include "storage/v2/config.hpp" +#include "storage/v2/constraints/constraints.hpp" +#include "storage/v2/constraints/existence_constraints.hpp" +#include "storage/v2/durability/durability.hpp" #include "storage/v2/durability/marker.hpp" #include "storage/v2/durability/paths.hpp" #include "storage/v2/durability/snapshot.hpp" @@ -34,10 +40,13 @@ #include "storage/v2/edge_accessor.hpp" #include "storage/v2/indices/label_index_stats.hpp" #include "storage/v2/inmemory/storage.hpp" +#include "storage/v2/inmemory/unique_constraints.hpp" +#include "storage/v2/storage_mode.hpp" #include "storage/v2/vertex_accessor.hpp" #include "utils/file.hpp" #include "utils/logging.hpp" #include "utils/timer.hpp" +#include "utils/uuid.hpp" using testing::Contains; using testing::UnorderedElementsAre; @@ -2703,3 +2712,113 @@ TEST_P(DurabilityTest, SnapshotAndWalMixedUUID) { ASSERT_FALSE(acc->Commit().HasError()); } } + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_P(DurabilityTest, ParallelConstraintsRecovery) { + // Create snapshot. + { + memgraph::storage::Config config{ + .items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, .snapshot_on_exit = true, .items_per_batch = 13}}; + memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)}; + memgraph::dbms::Database db{config, repl_state}; + CreateBaseDataset(db.storage(), GetParam()); + VerifyDataset(db.storage(), DatasetType::ONLY_BASE, GetParam()); + CreateExtendedDataset(db.storage()); + VerifyDataset(db.storage(), DatasetType::BASE_WITH_EXTENDED, GetParam()); + } + + ASSERT_EQ(GetSnapshotsList().size(), 1); + ASSERT_EQ(GetBackupSnapshotsList().size(), 0); + ASSERT_EQ(GetWalsList().size(), 0); + ASSERT_EQ(GetBackupWalsList().size(), 0); + + // Recover snapshot. + memgraph::storage::Config config{.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true, + .snapshot_on_exit = false, + .items_per_batch = 13, + .allow_parallel_index_creation = true}}; + memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)}; + memgraph::dbms::Database db{config, repl_state}; + VerifyDataset(db.storage(), DatasetType::BASE_WITH_EXTENDED, GetParam()); + { + auto acc = db.storage()->Access(); + auto vertex = acc->CreateVertex(); + auto edge = acc->CreateEdge(&vertex, &vertex, db.storage()->NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc->Commit().HasError()); + } +} + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_P(DurabilityTest, ConstraintsRecoveryFunctionSetting) { + memgraph::storage::Config config{.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true, + .snapshot_on_exit = false, + .items_per_batch = 13, + .allow_parallel_schema_creation = true}}; + // Create snapshot. + { + config.durability.recover_on_startup = false; + config.durability.snapshot_on_exit = true; + memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)}; + memgraph::dbms::Database db{config, repl_state}; + CreateBaseDataset(db.storage(), GetParam()); + VerifyDataset(db.storage(), DatasetType::ONLY_BASE, GetParam()); + CreateExtendedDataset(db.storage()); + VerifyDataset(db.storage(), DatasetType::BASE_WITH_EXTENDED, GetParam()); + } + + ASSERT_EQ(GetSnapshotsList().size(), 1); + ASSERT_EQ(GetBackupSnapshotsList().size(), 0); + ASSERT_EQ(GetWalsList().size(), 0); + ASSERT_EQ(GetBackupWalsList().size(), 0); + + config.durability.recover_on_startup = true; + config.durability.snapshot_on_exit = false; + memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)}; + memgraph::utils::SkipList vertices; + memgraph::utils::SkipList edges; + std::unique_ptr name_id_mapper = std::make_unique(); + std::atomic edge_count{0}; + uint64_t wal_seq_num{0}; + std::string uuid{memgraph::utils::GenerateUUID()}; + memgraph::storage::Indices indices{config, memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL}; + memgraph::storage::Constraints constraints{config, memgraph::storage::StorageMode::IN_MEMORY_TRANSACTIONAL}; + memgraph::storage::ReplicationStorageState repl_storage_state; + + memgraph::storage::durability::Recovery recovery{ + config.durability.storage_directory / memgraph::storage::durability::kSnapshotDirectory, + config.durability.storage_directory / memgraph::storage::durability::kWalDirectory}; + + // Recover snapshot. + const auto info = recovery.RecoverData(&uuid, repl_storage_state, &vertices, &edges, &edge_count, + name_id_mapper.get(), &indices, &constraints, config, &wal_seq_num); + + MG_ASSERT(info.has_value(), "Info doesn't have value present"); + const auto par_exec_info = memgraph::storage::durability::GetParallelExecInfo(*info, config); + + MG_ASSERT(par_exec_info.has_value(), "Parallel exec info should have value present"); + + // Unique constraint choose function + auto *mem_unique_constraints = + static_cast(constraints.unique_constraints_.get()); + auto variant_unique_constraint_creation_func = mem_unique_constraints->GetCreationFunction(par_exec_info); + + const auto *pval = std::get_if( + &variant_unique_constraint_creation_func); + MG_ASSERT(pval, "Chose wrong function for recovery of data"); + + // Existence constraint choose function + auto *mem_existence_constraint = + static_cast(constraints.existence_constraints_.get()); + auto variant_existence_constraint_creation_func = mem_existence_constraint->GetCreationFunction(par_exec_info); + + const auto *pval_existence = + std::get_if( + &variant_existence_constraint_creation_func); + MG_ASSERT(pval_existence, "Chose wrong type of function for recovery of existence constraint data"); +}