Parallelize index creation (#882)

This commit is contained in:
gvolfing 2023-04-26 16:28:02 +02:00 committed by GitHub
parent 4fcdd52f88
commit 00f8d54249
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 195 additions and 42 deletions

View File

@ -103,6 +103,10 @@ modifications:
value: "true"
override: false
- name: "storage_parallel_index_recovery"
value: "false"
override: true
undocumented:
- "flag_file"
- "also_log_to_stderr"

View File

@ -196,6 +196,10 @@ DEFINE_bool(storage_snapshot_on_exit, false, "Controls whether the storage creat
DEFINE_uint64(storage_items_per_batch, memgraph::storage::Config::Durability().items_per_batch,
"The number of edges and vertices stored in a batch in a snapshot file.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_bool(storage_parallel_index_recovery, false,
"Controls whether the index creation can be done in a multithreaded fashion.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint64(storage_recovery_thread_count,
std::max(static_cast<uint64_t>(std::thread::hardware_concurrency()),
@ -864,7 +868,8 @@ int main(int argc, char **argv) {
.snapshot_on_exit = FLAGS_storage_snapshot_on_exit,
.restore_replicas_on_startup = true,
.items_per_batch = FLAGS_storage_items_per_batch,
.recovery_thread_count = FLAGS_storage_recovery_thread_count},
.recovery_thread_count = FLAGS_storage_recovery_thread_count,
.allow_parallel_index_creation = FLAGS_storage_parallel_index_recovery},
.transaction = {.isolation_level = ParseIsolationLevel()}};
if (FLAGS_storage_snapshot_interval_sec == 0) {
if (FLAGS_storage_wal_enabled) {

View File

@ -53,6 +53,8 @@ struct Config {
uint64_t items_per_batch{1'000'000};
uint64_t recovery_thread_count{8};
bool allow_parallel_index_creation{false};
} durability;
struct Transaction {

View File

@ -113,13 +113,15 @@ std::optional<std::vector<WalDurabilityInfo>> GetWalFiles(const std::filesystem:
// to ensure that the indices and constraints are consistent at the end of the
// recovery process.
void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices,
Constraints *constraints, utils::SkipList<Vertex> *vertices) {
Constraints *constraints, utils::SkipList<Vertex> *vertices,
const std::optional<ParalellizedIndexCreationInfo> &paralell_exec_info) {
spdlog::info("Recreating indices from metadata.");
// Recover label indices.
spdlog::info("Recreating {} label indices from metadata.", indices_constraints.indices.label.size());
for (const auto &item : indices_constraints.indices.label) {
if (!indices->label_index.CreateIndex(item, vertices->access()))
if (!indices->label_index.CreateIndex(item, vertices->access(), paralell_exec_info))
throw RecoveryFailure("The label index must be created here!");
spdlog::info("A label index is recreated from metadata.");
}
spdlog::info("Label indices are recreated.");
@ -213,7 +215,11 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
*epoch_id = std::move(recovered_snapshot->snapshot_info.epoch_id);
if (!utils::DirExists(wal_directory)) {
RecoverIndicesAndConstraints(indices_constraints, indices, constraints, vertices);
const auto par_exec_info = config.durability.allow_parallel_index_creation
? std::make_optional(std::make_pair(recovery_info.vertex_batches,
config.durability.recovery_thread_count))
: std::nullopt;
RecoverIndicesAndConstraints(indices_constraints, indices, constraints, vertices, par_exec_info);
return recovered_snapshot->recovery_info;
}
} else {

View File

@ -91,13 +91,18 @@ std::optional<std::vector<WalDurabilityInfo>> GetWalFiles(const std::filesystem:
std::string_view uuid = "",
std::optional<size_t> current_seq_num = {});
// Inputs handed to the index-creation routines when indices are built by several threads.
//  - first:  the vertex batches; each batch is (Gid used to look up the batch's starting vertex,
//            number of consecutive vertices to visit from there).
//  - second: requested number of worker threads (the creation routine caps it at the batch count).
using ParalellizedIndexCreationInfo =
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
// Helper function used to recover all discovered indices and constraints. The
// indices and constraints must be recovered after the data recovery is done
// to ensure that the indices and constraints are consistent at the end of the
// recovery process.
/// @throw RecoveryFailure
void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices,
Constraints *constraints, utils::SkipList<Vertex> *vertices);
void RecoverIndicesAndConstraints(
const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices, Constraints *constraints,
utils::SkipList<Vertex> *vertices,
const std::optional<ParalellizedIndexCreationInfo> &paralell_exec_info = std::nullopt);
/// Recovers data either from a snapshot and/or WAL files.
/// @throw RecoveryFailure

View File

@ -13,12 +13,14 @@
#include <algorithm>
#include <iterator>
#include <limits>
#include <thread>
#include "storage/v2/mvcc.hpp"
#include "storage/v2/property_value.hpp"
#include "utils/bound.hpp"
#include "utils/logging.hpp"
#include "utils/memory_tracker.hpp"
#include "utils/synchronized.hpp"
namespace memgraph::storage {
@ -263,6 +265,95 @@ bool CurrentVersionHasLabelProperty(const Vertex &vertex, LabelId label, Propert
return !deleted && has_label && current_value_equal_to_value;
}
/// Inserts `vertex` into a label index if it belongs there.
///
/// The vertex is indexed only when it is not flagged as deleted and `label` is among its labels;
/// otherwise the call is a no-op. The inserted entry is `{&vertex, 0}`.
///
/// @param vertex          candidate vertex; its address is stored in the index entry
/// @param label           label the index is built for
/// @param index_accessor  accessor of the index skip list that receives the entry
template <typename TIndexAccessor>
void TryInsertLabelIndex(Vertex &vertex, LabelId label, TIndexAccessor &index_accessor) {
  const bool belongs_to_index = !vertex.deleted && utils::Contains(vertex.labels, label);
  if (belongs_to_index) {
    index_accessor.insert({&vertex, 0});
  }
}
/// Inserts `vertex` into a label+property index if it belongs there.
///
/// The vertex is indexed only when it is not flagged as deleted, carries the label from
/// `label_property_pair.first`, and has a non-null value for the property from
/// `label_property_pair.second`. The inserted entry is `{value, &vertex, 0}`.
///
/// @param vertex               candidate vertex; its address is stored in the index entry
/// @param label_property_pair  (label, property) key the index is built for
/// @param index_accessor       accessor of the index skip list that receives the entry
template <typename TIndexAccessor>
void TryInsertLabelPropertyIndex(Vertex &vertex, std::pair<LabelId, PropertyId> label_property_pair,
                                 TIndexAccessor &index_accessor) {
  const auto &[label, property] = label_property_pair;

  if (vertex.deleted) {
    return;
  }
  if (!utils::Contains(vertex.labels, label)) {
    return;
  }

  // Vertices without a value for the property are not part of the index.
  auto property_value = vertex.properties.GetProperty(property);
  if (!property_value.IsNull()) {
    index_accessor.insert({std::move(property_value), &vertex, 0});
  }
}
/// Fills a freshly emplaced index entry by visiting every vertex on the calling thread.
///
/// `func(vertex, key, index_accessor)` is invoked for each vertex in `vertices`. If an
/// OutOfMemoryException is raised while filling, the index entry referenced by `it` is erased
/// from `index` and the exception is rethrown.
///
/// @param vertices  accessor over all vertices to scan
/// @param it        iterator to the index entry (its `second` is the skip list to fill)
/// @param index     container that owns the entry referenced by `it`
/// @param key       index key forwarded to `func`
/// @param func      callable deciding whether/how a vertex is inserted
/// @throw utils::OutOfMemoryException
template <typename TSkiplistIter, typename TIndex, typename TIndexKey, typename TFunc>
void CreateIndexOnSingleThread(utils::SkipList<Vertex>::Accessor &vertices, TSkiplistIter it, TIndex &index,
                               TIndexKey key, const TFunc &func) {
  utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
  try {
    auto index_accessor = it->second.access();
    const auto vertices_end = vertices.end();
    for (auto vertex_it = vertices.begin(); vertex_it != vertices_end; ++vertex_it) {
      func(*vertex_it, key, index_accessor);
    }
  } catch (const utils::OutOfMemoryException &) {
    // Roll back the partially built index; block OOM exceptions while cleaning up.
    utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_exception_blocker;
    index.erase(it);
    throw;
  }
}
/// Fills a freshly emplaced index entry using several worker threads.
///
/// The vertex batches from `paralell_exec_info.first` are handed out to the workers through a
/// shared atomic counter. For every batch a worker looks up the batch's starting vertex by Gid
/// and calls `func(vertex, key, index_accessor)` for `batch.second` consecutive vertices.
///
/// If any worker hits an OutOfMemoryException, the remaining workers stop after their current
/// batch, the index entry referenced by `skiplist_iter` is erased, and the exception is re-raised
/// on the calling thread.
///
/// @param vertices            accessor over all vertices
/// @param skiplist_iter       iterator to the index entry being filled (erased on failure)
/// @param index               container that owns the entry referenced by `skiplist_iter`
/// @param key                 index key; used to look up the entry and forwarded to `func`
/// @param paralell_exec_info  (vertex batches, requested thread count)
/// @param func                callable deciding whether/how a vertex is inserted
/// @throw utils::OutOfMemoryException
template <typename TIndex, typename TIndexKey, typename TSKiplistIter, typename TFunc>
void CreateIndexOnMultipleThreads(utils::SkipList<Vertex>::Accessor &vertices, TSKiplistIter skiplist_iter,
                                  TIndex &index, TIndexKey key, const ParalellizedIndexCreationInfo &paralell_exec_info,
                                  const TFunc &func) {
  // NOTE(review): this enabler is constructed on the calling thread only; if it is thread-scoped
  // the workers below may need their own instance — confirm against MemoryTracker.
  utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;

  const auto &vertex_batches = paralell_exec_info.first;
  MG_ASSERT(!vertex_batches.empty(),
            "The size of batches should always be greater than zero if you want to use the parallel version of index "
            "creation!");

  // Never use more threads than batches, but always use at least one: with a requested thread
  // count of zero no batch would be processed and an empty index would be reported as created.
  const auto thread_count = std::max<uint64_t>(
      uint64_t{1}, std::min<uint64_t>(paralell_exec_info.second, vertex_batches.size()));

  std::atomic<uint64_t> batch_counter = 0;
  utils::Synchronized<std::optional<utils::OutOfMemoryException>, utils::SpinLock> maybe_error{};
  {
    std::vector<std::jthread> threads;
    threads.reserve(thread_count);
    for (uint64_t thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
      threads.emplace_back([&func, &index, &vertex_batches, &maybe_error, &batch_counter, &key, &vertices]() {
        // Stop picking up new batches as soon as any worker has reported a failure.
        while (!maybe_error.Lock()->has_value()) {
          const auto batch_index = batch_counter++;
          if (batch_index >= vertex_batches.size()) {
            return;
          }
          const auto &batch = vertex_batches[batch_index];
          auto index_accessor = index.at(key).access();
          auto it = vertices.find(batch.first);
          try {
            for (uint64_t visited = 0; visited < batch.second; ++visited, ++it) {
              func(*it, key, index_accessor);
            }
          } catch (utils::OutOfMemoryException &failure) {
            // Only record the failure here. The index entry must NOT be erased from a worker:
            // sibling threads may still hold accessors into it and keep calling index.at(key).
            utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_exception_blocker;
            *maybe_error.Lock() = std::move(failure);
          }
        }
      });
    }
    // Leaving this scope joins every worker (std::jthread joins in its destructor).
  }

  if (maybe_error.Lock()->has_value()) {
    {
      // All workers have finished, so the partially built index can be rolled back safely,
      // exactly once.
      utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_exception_blocker;
      index.erase(skiplist_iter);
    }
    throw utils::OutOfMemoryException((*maybe_error.Lock())->what());
  }
}
} // namespace
void LabelIndex::UpdateOnAddLabel(LabelId label, Vertex *vertex, const Transaction &tx) {
@ -272,27 +363,43 @@ void LabelIndex::UpdateOnAddLabel(LabelId label, Vertex *vertex, const Transacti
acc.insert(Entry{vertex, tx.start_timestamp});
}
bool LabelIndex::CreateIndex(LabelId label, utils::SkipList<Vertex>::Accessor vertices) {
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
bool LabelIndex::CreateIndex(LabelId label, utils::SkipList<Vertex>::Accessor vertices,
const std::optional<ParalellizedIndexCreationInfo> &paralell_exec_info) {
auto create_index_seq = [this](LabelId label, utils::SkipList<Vertex>::Accessor &vertices,
std::map<LabelId, utils::SkipList<Entry>>::iterator it) {
using IndexAccessor = decltype(it->second.access());
CreateIndexOnSingleThread(vertices, it, index_, label,
[](Vertex &vertex, LabelId label, IndexAccessor &index_accessor) {
TryInsertLabelIndex(vertex, label, index_accessor);
});
return true;
};
auto create_index_par = [this](LabelId label, utils::SkipList<Vertex>::Accessor &vertices,
std::map<LabelId, utils::SkipList<Entry>>::iterator label_it,
const ParalellizedIndexCreationInfo &paralell_exec_info) {
using IndexAccessor = decltype(label_it->second.access());
CreateIndexOnMultipleThreads(vertices, label_it, index_, label, paralell_exec_info,
[](Vertex &vertex, LabelId label, IndexAccessor &index_accessor) {
TryInsertLabelIndex(vertex, label, index_accessor);
});
return true;
};
auto [it, emplaced] = index_.emplace(std::piecewise_construct, std::forward_as_tuple(label), std::forward_as_tuple());
if (!emplaced) {
// Index already exists.
return false;
}
try {
auto acc = it->second.access();
for (Vertex &vertex : vertices) {
if (vertex.deleted || !utils::Contains(vertex.labels, label)) {
continue;
}
acc.insert(Entry{&vertex, 0});
}
} catch (const utils::OutOfMemoryException &) {
utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_exception_blocker;
index_.erase(it);
throw;
if (paralell_exec_info) {
return create_index_par(label, vertices, it, *paralell_exec_info);
}
return true;
return create_index_seq(label, vertices, it);
}
std::vector<LabelId> LabelIndex::ListIndices() const {
@ -418,32 +525,46 @@ void LabelPropertyIndex::UpdateOnSetProperty(PropertyId property, const Property
}
}
bool LabelPropertyIndex::CreateIndex(LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor vertices) {
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
bool LabelPropertyIndex::CreateIndex(LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor vertices,
const std::optional<ParalellizedIndexCreationInfo> &paralell_exec_info) {
auto create_index_seq = [this](LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor &vertices,
std::map<std::pair<LabelId, PropertyId>, utils::SkipList<Entry>>::iterator it) {
using IndexAccessor = decltype(it->second.access());
CreateIndexOnSingleThread(vertices, it, index_, std::make_pair(label, property),
[](Vertex &vertex, std::pair<LabelId, PropertyId> key, IndexAccessor &index_accessor) {
TryInsertLabelPropertyIndex(vertex, key, index_accessor);
});
return true;
};
auto create_index_par =
[this](LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor &vertices,
std::map<std::pair<LabelId, PropertyId>, utils::SkipList<Entry>>::iterator label_property_it,
const ParalellizedIndexCreationInfo &paralell_exec_info) {
using IndexAccessor = decltype(label_property_it->second.access());
CreateIndexOnMultipleThreads(
vertices, label_property_it, index_, std::make_pair(label, property), paralell_exec_info,
[](Vertex &vertex, std::pair<LabelId, PropertyId> key, IndexAccessor &index_accessor) {
TryInsertLabelPropertyIndex(vertex, key, index_accessor);
});
return true;
};
auto [it, emplaced] =
index_.emplace(std::piecewise_construct, std::forward_as_tuple(label, property), std::forward_as_tuple());
if (!emplaced) {
// Index already exists.
return false;
}
try {
auto acc = it->second.access();
for (Vertex &vertex : vertices) {
if (vertex.deleted || !utils::Contains(vertex.labels, label)) {
continue;
}
auto value = vertex.properties.GetProperty(property);
if (value.IsNull()) {
continue;
}
acc.insert(Entry{std::move(value), &vertex, 0});
}
} catch (const utils::OutOfMemoryException &) {
utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_exception_blocker;
index_.erase(it);
throw;
if (paralell_exec_info) {
return create_index_par(label, property, vertices, it, *paralell_exec_info);
}
return true;
return create_index_seq(label, property, vertices, it);
}
std::vector<std::pair<LabelId, PropertyId>> LabelPropertyIndex::ListIndices() const {

View File

@ -28,6 +28,9 @@ namespace memgraph::storage {
struct Indices;
struct Constraints;
// Inputs for building an index with several threads.
//  - first:  the vertex batches; each batch is (Gid used to look up the batch's starting vertex,
//            number of consecutive vertices to visit from there).
//  - second: requested number of worker threads (index creation caps it at the batch count).
using ParalellizedIndexCreationInfo =
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
class LabelIndex {
private:
struct Entry {
@ -58,7 +61,8 @@ class LabelIndex {
void UpdateOnAddLabel(LabelId label, Vertex *vertex, const Transaction &tx);
/// @throw std::bad_alloc
bool CreateIndex(LabelId label, utils::SkipList<Vertex>::Accessor vertices);
bool CreateIndex(LabelId label, utils::SkipList<Vertex>::Accessor vertices,
const std::optional<ParalellizedIndexCreationInfo> &paralell_exec_info = std::nullopt);
/// Returns false if there was no index to drop
bool DropIndex(LabelId label) { return index_.erase(label) > 0; }
@ -160,7 +164,8 @@ class LabelPropertyIndex {
void UpdateOnSetProperty(PropertyId property, const PropertyValue &value, Vertex *vertex, const Transaction &tx);
/// @throw std::bad_alloc
bool CreateIndex(LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor vertices);
bool CreateIndex(LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor vertices,
const std::optional<ParalellizedIndexCreationInfo> &paralell_exec_info = std::nullopt);
bool DropIndex(LabelId label, PropertyId property) { return index_.erase({label, property}) > 0; }

View File

@ -98,6 +98,11 @@ startup_config_dict = {
"IP address on which the websocket server for Memgraph monitoring should listen.",
),
"monitoring_port": ("7444", "7444", "Port on which the websocket server for Memgraph monitoring should listen."),
"storage_parallel_index_recovery": (
"false",
"false",
"Controls whether the index creation can be done in a multithreaded fashion.",
),
"password_encryption_algorithm": ("bcrypt", "bcrypt", "The password encryption algorithm used for authentication."),
"pulsar_service_url": ("", "", "Default URL used while connecting to Pulsar brokers."),
"query_execution_timeout_sec": (