diff --git a/config/flags.yaml b/config/flags.yaml index 6dd50e50c..0b76ebf90 100644 --- a/config/flags.yaml +++ b/config/flags.yaml @@ -103,6 +103,10 @@ modifications: value: "true" override: false + - name: "storage_parallel_index_recovery" + value: "false" + override: true + undocumented: - "flag_file" - "also_log_to_stderr" diff --git a/src/memgraph.cpp b/src/memgraph.cpp index cf8bccd15..36f802e40 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -196,6 +196,10 @@ DEFINE_bool(storage_snapshot_on_exit, false, "Controls whether the storage creat DEFINE_uint64(storage_items_per_batch, memgraph::storage::Config::Durability().items_per_batch, "The number of edges and vertices stored in a batch in a snapshot file."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_bool(storage_parallel_index_recovery, false, + "Controls whether the index creation can be done in a multithreaded fashion."); + // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DEFINE_uint64(storage_recovery_thread_count, std::max(static_cast(std::thread::hardware_concurrency()), @@ -864,7 +868,8 @@ int main(int argc, char **argv) { .snapshot_on_exit = FLAGS_storage_snapshot_on_exit, .restore_replicas_on_startup = true, .items_per_batch = FLAGS_storage_items_per_batch, - .recovery_thread_count = FLAGS_storage_recovery_thread_count}, + .recovery_thread_count = FLAGS_storage_recovery_thread_count, + .allow_parallel_index_creation = FLAGS_storage_parallel_index_recovery}, .transaction = {.isolation_level = ParseIsolationLevel()}}; if (FLAGS_storage_snapshot_interval_sec == 0) { if (FLAGS_storage_wal_enabled) { diff --git a/src/storage/v2/config.hpp b/src/storage/v2/config.hpp index 5048e3d89..126cbc10c 100644 --- a/src/storage/v2/config.hpp +++ b/src/storage/v2/config.hpp @@ -53,6 +53,8 @@ struct Config { uint64_t items_per_batch{1'000'000}; uint64_t recovery_thread_count{8}; + + bool allow_parallel_index_creation{false}; } durability; struct Transaction { diff --git a/src/storage/v2/durability/durability.cpp b/src/storage/v2/durability/durability.cpp index 33236c67c..b5ad8a852 100644 --- a/src/storage/v2/durability/durability.cpp +++ b/src/storage/v2/durability/durability.cpp @@ -113,13 +113,15 @@ std::optional> GetWalFiles(const std::filesystem: // to ensure that the indices and constraints are consistent at the end of the // recovery process. void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices, - Constraints *constraints, utils::SkipList *vertices) { + Constraints *constraints, utils::SkipList *vertices, + const std::optional ¶lell_exec_info) { spdlog::info("Recreating indices from metadata."); // Recover label indices. spdlog::info("Recreating {} label indices from metadata.", indices_constraints.indices.label.size()); for (const auto &item : indices_constraints.indices.label) { - if (!indices->label_index.CreateIndex(item, vertices->access())) + if (!indices->label_index.CreateIndex(item, vertices->access(), paralell_exec_info)) throw RecoveryFailure("The label index must be created here!"); + spdlog::info("A label index is recreated from metadata."); } spdlog::info("Label indices are recreated."); @@ -213,7 +215,11 @@ std::optional RecoverData(const std::filesystem::path &snapshot_di *epoch_id = std::move(recovered_snapshot->snapshot_info.epoch_id); if (!utils::DirExists(wal_directory)) { - RecoverIndicesAndConstraints(indices_constraints, indices, constraints, vertices); + const auto par_exec_info = config.durability.allow_parallel_index_creation + ? std::make_optional(std::make_pair(recovery_info.vertex_batches, + config.durability.recovery_thread_count)) + : std::nullopt; + RecoverIndicesAndConstraints(indices_constraints, indices, constraints, vertices, par_exec_info); return recovered_snapshot->recovery_info; } } else { diff --git a/src/storage/v2/durability/durability.hpp b/src/storage/v2/durability/durability.hpp index 40bff3d7c..1af07136f 100644 --- a/src/storage/v2/durability/durability.hpp +++ b/src/storage/v2/durability/durability.hpp @@ -91,13 +91,18 @@ std::optional> GetWalFiles(const std::filesystem: std::string_view uuid = "", std::optional current_seq_num = {}); +using ParalellizedIndexCreationInfo = + std::pair> /*vertex_recovery_info*/, uint64_t /*thread_count*/>; + // Helper function used to recover all discovered indices and constraints. The // indices and constraints must be recovered after the data recovery is done // to ensure that the indices and constraints are consistent at the end of the // recovery process. /// @throw RecoveryFailure -void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices, - Constraints *constraints, utils::SkipList *vertices); +void RecoverIndicesAndConstraints( + const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices, Constraints *constraints, + utils::SkipList *vertices, + const std::optional ¶lell_exec_info = std::nullopt); /// Recovers data either from a snapshot and/or WAL files. /// @throw RecoveryFailure diff --git a/src/storage/v2/indices.cpp b/src/storage/v2/indices.cpp index e02e0f9fc..ebf022245 100644 --- a/src/storage/v2/indices.cpp +++ b/src/storage/v2/indices.cpp @@ -13,12 +13,14 @@ #include #include #include +#include #include "storage/v2/mvcc.hpp" #include "storage/v2/property_value.hpp" #include "utils/bound.hpp" #include "utils/logging.hpp" #include "utils/memory_tracker.hpp" +#include "utils/synchronized.hpp" namespace memgraph::storage { @@ -263,6 +265,95 @@ bool CurrentVersionHasLabelProperty(const Vertex &vertex, LabelId label, Propert return !deleted && has_label && current_value_equal_to_value; } +template +void TryInsertLabelIndex(Vertex &vertex, LabelId label, TIndexAccessor &index_accessor) { + if (vertex.deleted || !utils::Contains(vertex.labels, label)) { + return; + } + + index_accessor.insert({&vertex, 0}); +} + +template +void TryInsertLabelPropertyIndex(Vertex &vertex, std::pair label_property_pair, + TIndexAccessor &index_accessor) { + if (vertex.deleted || !utils::Contains(vertex.labels, label_property_pair.first)) { + return; + } + auto value = vertex.properties.GetProperty(label_property_pair.second); + if (value.IsNull()) { + return; + } + index_accessor.insert({std::move(value), &vertex, 0}); +} + +template +void CreateIndexOnSingleThread(utils::SkipList::Accessor &vertices, TSkiplistIter it, TIndex &index, + TIndexKey key, const TFunc &func) { + utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; + try { + auto acc = it->second.access(); + for (Vertex &vertex : vertices) { + func(vertex, key, acc); + } + } catch (const utils::OutOfMemoryException &) { + utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_exception_blocker; + index.erase(it); + throw; + } +} + +template +void CreateIndexOnMultipleThreads(utils::SkipList::Accessor &vertices, TSKiplistIter skiplist_iter, + TIndex &index, TIndexKey key, const ParalellizedIndexCreationInfo ¶lell_exec_info, + const TFunc &func) { + utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; + + const auto &vertex_batches = paralell_exec_info.first; + const auto thread_count = std::min(paralell_exec_info.second, vertex_batches.size()); + + MG_ASSERT(!vertex_batches.empty(), + "The size of batches should always be greater than zero if you want to use the parallel version of index " + "creation!"); + + std::atomic batch_counter = 0; + + utils::Synchronized, utils::SpinLock> maybe_error{}; + { + std::vector threads; + threads.reserve(thread_count); + + for (auto i{0U}; i < thread_count; ++i) { + threads.emplace_back( + [&skiplist_iter, &func, &index, &vertex_batches, &maybe_error, &batch_counter, &key, &vertices]() { + while (!maybe_error.Lock()->has_value()) { + const auto batch_index = batch_counter++; + if (batch_index >= vertex_batches.size()) { + return; + } + const auto &batch = vertex_batches[batch_index]; + auto index_accessor = index.at(key).access(); + auto it = vertices.find(batch.first); + + try { + for (auto i{0U}; i < batch.second; ++i, ++it) { + func(*it, key, index_accessor); + } + + } catch (utils::OutOfMemoryException &failure) { + utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_exception_blocker; + index.erase(skiplist_iter); + *maybe_error.Lock() = std::move(failure); + } + } + }); + } + } + if (maybe_error.Lock()->has_value()) { + throw utils::OutOfMemoryException((*maybe_error.Lock())->what()); + } +} + } // namespace void LabelIndex::UpdateOnAddLabel(LabelId label, Vertex *vertex, const Transaction &tx) { @@ -272,27 +363,43 @@ void LabelIndex::UpdateOnAddLabel(LabelId label, Vertex *vertex, const Transacti acc.insert(Entry{vertex, tx.start_timestamp}); } -bool LabelIndex::CreateIndex(LabelId label, utils::SkipList::Accessor vertices) { - utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; +bool LabelIndex::CreateIndex(LabelId label, utils::SkipList::Accessor vertices, + const std::optional ¶lell_exec_info) { + auto create_index_seq = [this](LabelId label, utils::SkipList::Accessor &vertices, + std::map>::iterator it) { + using IndexAccessor = decltype(it->second.access()); + + CreateIndexOnSingleThread(vertices, it, index_, label, + [](Vertex &vertex, LabelId label, IndexAccessor &index_accessor) { + TryInsertLabelIndex(vertex, label, index_accessor); + }); + + return true; + }; + + auto create_index_par = [this](LabelId label, utils::SkipList::Accessor &vertices, + std::map>::iterator label_it, + const ParalellizedIndexCreationInfo ¶lell_exec_info) { + using IndexAccessor = decltype(label_it->second.access()); + + CreateIndexOnMultipleThreads(vertices, label_it, index_, label, paralell_exec_info, + [](Vertex &vertex, LabelId label, IndexAccessor &index_accessor) { + TryInsertLabelIndex(vertex, label, index_accessor); + }); + + return true; + }; + auto [it, emplaced] = index_.emplace(std::piecewise_construct, std::forward_as_tuple(label), std::forward_as_tuple()); if (!emplaced) { // Index already exists. return false; } - try { - auto acc = it->second.access(); - for (Vertex &vertex : vertices) { - if (vertex.deleted || !utils::Contains(vertex.labels, label)) { - continue; - } - acc.insert(Entry{&vertex, 0}); - } - } catch (const utils::OutOfMemoryException &) { - utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_exception_blocker; - index_.erase(it); - throw; + + if (paralell_exec_info) { + return create_index_par(label, vertices, it, *paralell_exec_info); } - return true; + return create_index_seq(label, vertices, it); } std::vector LabelIndex::ListIndices() const { @@ -418,32 +525,46 @@ void LabelPropertyIndex::UpdateOnSetProperty(PropertyId property, const Property } } -bool LabelPropertyIndex::CreateIndex(LabelId label, PropertyId property, utils::SkipList::Accessor vertices) { - utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception; +bool LabelPropertyIndex::CreateIndex(LabelId label, PropertyId property, utils::SkipList::Accessor vertices, + const std::optional ¶lell_exec_info) { + auto create_index_seq = [this](LabelId label, PropertyId property, utils::SkipList::Accessor &vertices, + std::map, utils::SkipList>::iterator it) { + using IndexAccessor = decltype(it->second.access()); + + CreateIndexOnSingleThread(vertices, it, index_, std::make_pair(label, property), + [](Vertex &vertex, std::pair key, IndexAccessor &index_accessor) { + TryInsertLabelPropertyIndex(vertex, key, index_accessor); + }); + + return true; + }; + + auto create_index_par = + [this](LabelId label, PropertyId property, utils::SkipList::Accessor &vertices, + std::map, utils::SkipList>::iterator label_property_it, + const ParalellizedIndexCreationInfo ¶lell_exec_info) { + using IndexAccessor = decltype(label_property_it->second.access()); + + CreateIndexOnMultipleThreads( + vertices, label_property_it, index_, std::make_pair(label, property), paralell_exec_info, + [](Vertex &vertex, std::pair key, IndexAccessor &index_accessor) { + TryInsertLabelPropertyIndex(vertex, key, index_accessor); + }); + + return true; + }; + auto [it, emplaced] = index_.emplace(std::piecewise_construct, std::forward_as_tuple(label, property), std::forward_as_tuple()); if (!emplaced) { // Index already exists. return false; } - try { - auto acc = it->second.access(); - for (Vertex &vertex : vertices) { - if (vertex.deleted || !utils::Contains(vertex.labels, label)) { - continue; - } - auto value = vertex.properties.GetProperty(property); - if (value.IsNull()) { - continue; - } - acc.insert(Entry{std::move(value), &vertex, 0}); - } - } catch (const utils::OutOfMemoryException &) { - utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_exception_blocker; - index_.erase(it); - throw; + + if (paralell_exec_info) { + return create_index_par(label, property, vertices, it, *paralell_exec_info); } - return true; + return create_index_seq(label, property, vertices, it); } std::vector> LabelPropertyIndex::ListIndices() const { diff --git a/src/storage/v2/indices.hpp b/src/storage/v2/indices.hpp index dfb990e82..14b023110 100644 --- a/src/storage/v2/indices.hpp +++ b/src/storage/v2/indices.hpp @@ -28,6 +28,9 @@ namespace memgraph::storage { struct Indices; struct Constraints; +using ParalellizedIndexCreationInfo = + std::pair> /*vertex_recovery_info*/, uint64_t /*thread_count*/>; + class LabelIndex { private: struct Entry { @@ -58,7 +61,8 @@ class LabelIndex { void UpdateOnAddLabel(LabelId label, Vertex *vertex, const Transaction &tx); /// @throw std::bad_alloc - bool CreateIndex(LabelId label, utils::SkipList::Accessor vertices); + bool CreateIndex(LabelId label, utils::SkipList::Accessor vertices, + const std::optional ¶lell_exec_info = std::nullopt); /// Returns false if there was no index to drop bool DropIndex(LabelId label) { return index_.erase(label) > 0; } @@ -160,7 +164,8 @@ class LabelPropertyIndex { void UpdateOnSetProperty(PropertyId property, const PropertyValue &value, Vertex *vertex, const Transaction &tx); /// @throw std::bad_alloc - bool CreateIndex(LabelId label, PropertyId property, utils::SkipList::Accessor vertices); + bool CreateIndex(LabelId label, PropertyId property, utils::SkipList::Accessor vertices, + const std::optional ¶lell_exec_info = std::nullopt); bool DropIndex(LabelId label, PropertyId property) { return index_.erase({label, property}) > 0; } diff --git a/tests/e2e/configuration/default_config.py b/tests/e2e/configuration/default_config.py index db71f4007..1e0ecd862 100644 --- a/tests/e2e/configuration/default_config.py +++ b/tests/e2e/configuration/default_config.py @@ -98,6 +98,11 @@ startup_config_dict = { "IP address on which the websocket server for Memgraph monitoring should listen.", ), "monitoring_port": ("7444", "7444", "Port on which the websocket server for Memgraph monitoring should listen."), + "storage_parallel_index_recovery": ( + "false", + "false", + "Controls whether the index creation can be done in a multithreaded fashion.", + ), "password_encryption_algorithm": ("bcrypt", "bcrypt", "The password encryption algorithm used for authentication."), "pulsar_service_url": ("", "", "Default URL used while connecting to Pulsar brokers."), "query_execution_timeout_sec": (