Fix sequential label-property index recovery (#1135)
The parallel_exec_info should have been passed to this function before, otherwise, the recovery of label-property indices would never have been parallelized.
This commit is contained in:
parent
e5350a011c
commit
260660f1dd
@ -15,7 +15,7 @@
|
|||||||
namespace memgraph::storage {
|
namespace memgraph::storage {
|
||||||
|
|
||||||
/// TODO: andi. Too many copies, extract at one place
|
/// TODO: andi. Too many copies, extract at one place
|
||||||
using ParalellizedIndexCreationInfo =
|
using ParallelizedIndexCreationInfo =
|
||||||
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
|
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
|
||||||
|
|
||||||
class DiskLabelPropertyIndex : public storage::LabelPropertyIndex {
|
class DiskLabelPropertyIndex : public storage::LabelPropertyIndex {
|
||||||
|
@ -127,13 +127,13 @@ std::optional<std::vector<WalDurabilityInfo>> GetWalFiles(const std::filesystem:
|
|||||||
// recovery process.
|
// recovery process.
|
||||||
void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices,
|
void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices,
|
||||||
Constraints *constraints, utils::SkipList<Vertex> *vertices,
|
Constraints *constraints, utils::SkipList<Vertex> *vertices,
|
||||||
const std::optional<ParalellizedIndexCreationInfo> ¶lell_exec_info) {
|
const std::optional<ParallelizedIndexCreationInfo> ¶llel_exec_info) {
|
||||||
spdlog::info("Recreating indices from metadata.");
|
spdlog::info("Recreating indices from metadata.");
|
||||||
// Recover label indices.
|
// Recover label indices.
|
||||||
spdlog::info("Recreating {} label indices from metadata.", indices_constraints.indices.label.size());
|
spdlog::info("Recreating {} label indices from metadata.", indices_constraints.indices.label.size());
|
||||||
for (const auto &item : indices_constraints.indices.label) {
|
for (const auto &item : indices_constraints.indices.label) {
|
||||||
auto *mem_label_index = static_cast<InMemoryLabelIndex *>(indices->label_index_.get());
|
auto *mem_label_index = static_cast<InMemoryLabelIndex *>(indices->label_index_.get());
|
||||||
if (!mem_label_index->CreateIndex(item, vertices->access(), paralell_exec_info))
|
if (!mem_label_index->CreateIndex(item, vertices->access(), parallel_exec_info))
|
||||||
throw RecoveryFailure("The label index must be created here!");
|
throw RecoveryFailure("The label index must be created here!");
|
||||||
|
|
||||||
spdlog::info("A label index is recreated from metadata.");
|
spdlog::info("A label index is recreated from metadata.");
|
||||||
@ -145,7 +145,7 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_
|
|||||||
indices_constraints.indices.label_property.size());
|
indices_constraints.indices.label_property.size());
|
||||||
auto *mem_label_property_index = static_cast<InMemoryLabelPropertyIndex *>(indices->label_property_index_.get());
|
auto *mem_label_property_index = static_cast<InMemoryLabelPropertyIndex *>(indices->label_property_index_.get());
|
||||||
for (const auto &item : indices_constraints.indices.label_property) {
|
for (const auto &item : indices_constraints.indices.label_property) {
|
||||||
if (!mem_label_property_index->CreateIndex(item.first, item.second, vertices->access(), std::nullopt))
|
if (!mem_label_property_index->CreateIndex(item.first, item.second, vertices->access(), parallel_exec_info))
|
||||||
throw RecoveryFailure("The label+property index must be created here!");
|
throw RecoveryFailure("The label+property index must be created here!");
|
||||||
spdlog::info("A label+property index is recreated from metadata.");
|
spdlog::info("A label+property index is recreated from metadata.");
|
||||||
}
|
}
|
||||||
|
@ -91,7 +91,7 @@ std::optional<std::vector<WalDurabilityInfo>> GetWalFiles(const std::filesystem:
|
|||||||
std::string_view uuid = "",
|
std::string_view uuid = "",
|
||||||
std::optional<size_t> current_seq_num = {});
|
std::optional<size_t> current_seq_num = {});
|
||||||
|
|
||||||
using ParalellizedIndexCreationInfo =
|
using ParallelizedIndexCreationInfo =
|
||||||
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
|
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
|
||||||
|
|
||||||
// Helper function used to recover all discovered indices and constraints. The
|
// Helper function used to recover all discovered indices and constraints. The
|
||||||
@ -102,7 +102,7 @@ using ParalellizedIndexCreationInfo =
|
|||||||
void RecoverIndicesAndConstraints(
|
void RecoverIndicesAndConstraints(
|
||||||
const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices, Constraints *constraints,
|
const RecoveredIndicesAndConstraints &indices_constraints, Indices *indices, Constraints *constraints,
|
||||||
utils::SkipList<Vertex> *vertices,
|
utils::SkipList<Vertex> *vertices,
|
||||||
const std::optional<ParalellizedIndexCreationInfo> ¶lell_exec_info = std::nullopt);
|
const std::optional<ParallelizedIndexCreationInfo> ¶llel_exec_info = std::nullopt);
|
||||||
|
|
||||||
/// Recovers data either from a snapshot and/or WAL files.
|
/// Recovers data either from a snapshot and/or WAL files.
|
||||||
/// @throw RecoveryFailure
|
/// @throw RecoveryFailure
|
||||||
|
@ -19,7 +19,7 @@
|
|||||||
|
|
||||||
namespace memgraph::storage {
|
namespace memgraph::storage {
|
||||||
|
|
||||||
using ParalellizedIndexCreationInfo =
|
using ParallelizedIndexCreationInfo =
|
||||||
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
|
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
|
||||||
|
|
||||||
/// Traverses deltas visible from transaction with start timestamp greater than
|
/// Traverses deltas visible from transaction with start timestamp greater than
|
||||||
@ -306,11 +306,11 @@ inline void CreateIndexOnSingleThread(utils::SkipList<Vertex>::Accessor &vertice
|
|||||||
template <typename TIndex, typename TIndexKey, typename TSKiplistIter, typename TFunc>
|
template <typename TIndex, typename TIndexKey, typename TSKiplistIter, typename TFunc>
|
||||||
inline void CreateIndexOnMultipleThreads(utils::SkipList<Vertex>::Accessor &vertices, TSKiplistIter skiplist_iter,
|
inline void CreateIndexOnMultipleThreads(utils::SkipList<Vertex>::Accessor &vertices, TSKiplistIter skiplist_iter,
|
||||||
TIndex &index, TIndexKey key,
|
TIndex &index, TIndexKey key,
|
||||||
const ParalellizedIndexCreationInfo ¶lell_exec_info, const TFunc &func) {
|
const ParallelizedIndexCreationInfo ¶llel_exec_info, const TFunc &func) {
|
||||||
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
|
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
|
||||||
|
|
||||||
const auto &vertex_batches = paralell_exec_info.first;
|
const auto &vertex_batches = parallel_exec_info.first;
|
||||||
const auto thread_count = std::min(paralell_exec_info.second, vertex_batches.size());
|
const auto thread_count = std::min(parallel_exec_info.second, vertex_batches.size());
|
||||||
|
|
||||||
MG_ASSERT(!vertex_batches.empty(),
|
MG_ASSERT(!vertex_batches.empty(),
|
||||||
"The size of batches should always be greater than zero if you want to use the parallel version of index "
|
"The size of batches should always be greater than zero if you want to use the parallel version of index "
|
||||||
|
@ -25,7 +25,7 @@ void InMemoryLabelIndex::UpdateOnAddLabel(LabelId added_label, Vertex *vertex_af
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool InMemoryLabelIndex::CreateIndex(LabelId label, utils::SkipList<Vertex>::Accessor vertices,
|
bool InMemoryLabelIndex::CreateIndex(LabelId label, utils::SkipList<Vertex>::Accessor vertices,
|
||||||
const std::optional<ParalellizedIndexCreationInfo> ¶lell_exec_info) {
|
const std::optional<ParallelizedIndexCreationInfo> ¶llel_exec_info) {
|
||||||
const auto create_index_seq = [this](LabelId label, utils::SkipList<Vertex>::Accessor &vertices,
|
const auto create_index_seq = [this](LabelId label, utils::SkipList<Vertex>::Accessor &vertices,
|
||||||
std::map<LabelId, utils::SkipList<Entry>>::iterator it) {
|
std::map<LabelId, utils::SkipList<Entry>>::iterator it) {
|
||||||
using IndexAccessor = decltype(it->second.access());
|
using IndexAccessor = decltype(it->second.access());
|
||||||
@ -40,10 +40,10 @@ bool InMemoryLabelIndex::CreateIndex(LabelId label, utils::SkipList<Vertex>::Acc
|
|||||||
|
|
||||||
const auto create_index_par = [this](LabelId label, utils::SkipList<Vertex>::Accessor &vertices,
|
const auto create_index_par = [this](LabelId label, utils::SkipList<Vertex>::Accessor &vertices,
|
||||||
std::map<LabelId, utils::SkipList<Entry>>::iterator label_it,
|
std::map<LabelId, utils::SkipList<Entry>>::iterator label_it,
|
||||||
const ParalellizedIndexCreationInfo ¶lell_exec_info) {
|
const ParallelizedIndexCreationInfo ¶llel_exec_info) {
|
||||||
using IndexAccessor = decltype(label_it->second.access());
|
using IndexAccessor = decltype(label_it->second.access());
|
||||||
|
|
||||||
CreateIndexOnMultipleThreads(vertices, label_it, index_, label, paralell_exec_info,
|
CreateIndexOnMultipleThreads(vertices, label_it, index_, label, parallel_exec_info,
|
||||||
[](Vertex &vertex, LabelId label, IndexAccessor &index_accessor) {
|
[](Vertex &vertex, LabelId label, IndexAccessor &index_accessor) {
|
||||||
TryInsertLabelIndex(vertex, label, index_accessor);
|
TryInsertLabelIndex(vertex, label, index_accessor);
|
||||||
});
|
});
|
||||||
@ -57,8 +57,8 @@ bool InMemoryLabelIndex::CreateIndex(LabelId label, utils::SkipList<Vertex>::Acc
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (paralell_exec_info) {
|
if (parallel_exec_info) {
|
||||||
return create_index_par(label, vertices, it, *paralell_exec_info);
|
return create_index_par(label, vertices, it, *parallel_exec_info);
|
||||||
}
|
}
|
||||||
return create_index_seq(label, vertices, it);
|
return create_index_seq(label, vertices, it);
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,7 @@ struct LabelIndexStats {
|
|||||||
double avg_degree;
|
double avg_degree;
|
||||||
};
|
};
|
||||||
|
|
||||||
using ParalellizedIndexCreationInfo =
|
using ParallelizedIndexCreationInfo =
|
||||||
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
|
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
|
||||||
|
|
||||||
class InMemoryLabelIndex : public storage::LabelIndex {
|
class InMemoryLabelIndex : public storage::LabelIndex {
|
||||||
@ -46,7 +46,7 @@ class InMemoryLabelIndex : public storage::LabelIndex {
|
|||||||
|
|
||||||
/// @throw std::bad_alloc
|
/// @throw std::bad_alloc
|
||||||
bool CreateIndex(LabelId label, utils::SkipList<Vertex>::Accessor vertices,
|
bool CreateIndex(LabelId label, utils::SkipList<Vertex>::Accessor vertices,
|
||||||
const std::optional<ParalellizedIndexCreationInfo> ¶lell_exec_info);
|
const std::optional<ParallelizedIndexCreationInfo> ¶llel_exec_info);
|
||||||
|
|
||||||
/// Returns false if there was no index to drop
|
/// Returns false if there was no index to drop
|
||||||
bool DropIndex(LabelId label) override;
|
bool DropIndex(LabelId label) override;
|
||||||
|
@ -37,7 +37,7 @@ InMemoryLabelPropertyIndex::InMemoryLabelPropertyIndex(Indices *indices, Constra
|
|||||||
|
|
||||||
bool InMemoryLabelPropertyIndex::CreateIndex(LabelId label, PropertyId property,
|
bool InMemoryLabelPropertyIndex::CreateIndex(LabelId label, PropertyId property,
|
||||||
utils::SkipList<Vertex>::Accessor vertices,
|
utils::SkipList<Vertex>::Accessor vertices,
|
||||||
const std::optional<ParalellizedIndexCreationInfo> ¶lell_exec_info) {
|
const std::optional<ParallelizedIndexCreationInfo> ¶llel_exec_info) {
|
||||||
auto create_index_seq = [this](LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor &vertices,
|
auto create_index_seq = [this](LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor &vertices,
|
||||||
std::map<std::pair<LabelId, PropertyId>, utils::SkipList<Entry>>::iterator it) {
|
std::map<std::pair<LabelId, PropertyId>, utils::SkipList<Entry>>::iterator it) {
|
||||||
using IndexAccessor = decltype(it->second.access());
|
using IndexAccessor = decltype(it->second.access());
|
||||||
@ -53,11 +53,11 @@ bool InMemoryLabelPropertyIndex::CreateIndex(LabelId label, PropertyId property,
|
|||||||
auto create_index_par =
|
auto create_index_par =
|
||||||
[this](LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor &vertices,
|
[this](LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor &vertices,
|
||||||
std::map<std::pair<LabelId, PropertyId>, utils::SkipList<Entry>>::iterator label_property_it,
|
std::map<std::pair<LabelId, PropertyId>, utils::SkipList<Entry>>::iterator label_property_it,
|
||||||
const ParalellizedIndexCreationInfo ¶lell_exec_info) {
|
const ParallelizedIndexCreationInfo ¶llel_exec_info) {
|
||||||
using IndexAccessor = decltype(label_property_it->second.access());
|
using IndexAccessor = decltype(label_property_it->second.access());
|
||||||
|
|
||||||
CreateIndexOnMultipleThreads(
|
CreateIndexOnMultipleThreads(
|
||||||
vertices, label_property_it, index_, std::make_pair(label, property), paralell_exec_info,
|
vertices, label_property_it, index_, std::make_pair(label, property), parallel_exec_info,
|
||||||
[](Vertex &vertex, std::pair<LabelId, PropertyId> key, IndexAccessor &index_accessor) {
|
[](Vertex &vertex, std::pair<LabelId, PropertyId> key, IndexAccessor &index_accessor) {
|
||||||
TryInsertLabelPropertyIndex(vertex, key, index_accessor);
|
TryInsertLabelPropertyIndex(vertex, key, index_accessor);
|
||||||
});
|
});
|
||||||
@ -72,8 +72,8 @@ bool InMemoryLabelPropertyIndex::CreateIndex(LabelId label, PropertyId property,
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (paralell_exec_info) {
|
if (parallel_exec_info) {
|
||||||
return create_index_par(label, property, vertices, it, *paralell_exec_info);
|
return create_index_par(label, property, vertices, it, *parallel_exec_info);
|
||||||
}
|
}
|
||||||
return create_index_seq(label, property, vertices, it);
|
return create_index_seq(label, property, vertices, it);
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,7 @@ struct LabelPropertyIndexStats {
|
|||||||
};
|
};
|
||||||
|
|
||||||
/// TODO: andi. Too many copies, extract at one place
|
/// TODO: andi. Too many copies, extract at one place
|
||||||
using ParalellizedIndexCreationInfo =
|
using ParallelizedIndexCreationInfo =
|
||||||
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
|
std::pair<std::vector<std::pair<Gid, uint64_t>> /*vertex_recovery_info*/, uint64_t /*thread_count*/>;
|
||||||
|
|
||||||
class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex {
|
class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex {
|
||||||
@ -43,7 +43,7 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex {
|
|||||||
|
|
||||||
/// @throw std::bad_alloc
|
/// @throw std::bad_alloc
|
||||||
bool CreateIndex(LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor vertices,
|
bool CreateIndex(LabelId label, PropertyId property, utils::SkipList<Vertex>::Accessor vertices,
|
||||||
const std::optional<ParalellizedIndexCreationInfo> ¶lell_exec_info);
|
const std::optional<ParallelizedIndexCreationInfo> ¶llel_exec_info);
|
||||||
|
|
||||||
/// @throw std::bad_alloc
|
/// @throw std::bad_alloc
|
||||||
void UpdateOnAddLabel(LabelId added_label, Vertex *vertex_after_update, const Transaction &tx) override;
|
void UpdateOnAddLabel(LabelId added_label, Vertex *vertex_after_update, const Transaction &tx) override;
|
||||||
|
Loading…
Reference in New Issue
Block a user