Move Tantivy directories inside the storage directory & delete duplicates
This commit is contained in:
parent
4841ee1573
commit
4a64a38511
@ -148,7 +148,8 @@ void RecoverConstraints(const RecoveredIndicesAndConstraints::ConstraintsMetadat
|
||||
|
||||
void RecoverIndicesAndStats(const RecoveredIndicesAndConstraints::IndicesMetadata &indices_metadata, Indices *indices,
|
||||
utils::SkipList<Vertex> *vertices, NameIdMapper *name_id_mapper,
|
||||
const std::optional<ParallelizedSchemaCreationInfo> &parallel_exec_info) {
|
||||
const std::optional<ParallelizedSchemaCreationInfo> &parallel_exec_info,
|
||||
std::optional<std::filesystem::path> storage_dir) {
|
||||
spdlog::info("Recreating indices from metadata.");
|
||||
|
||||
// Recover label indices.
|
||||
@ -203,7 +204,7 @@ void RecoverIndicesAndStats(const RecoveredIndicesAndConstraints::IndicesMetadat
|
||||
auto &mem_text_index = indices->text_index_;
|
||||
for (const auto &[index_name, label] : indices_metadata.text_indices) {
|
||||
try {
|
||||
mem_text_index.RecoverIndex(index_name, label, vertices->access(), name_id_mapper);
|
||||
mem_text_index.RecoverIndex(storage_dir.value(), index_name, label, vertices->access(), name_id_mapper);
|
||||
} catch (...) {
|
||||
throw RecoveryFailure("The text index must be created here!");
|
||||
}
|
||||
@ -335,8 +336,13 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
|
||||
repl_storage_state.epoch_.SetEpoch(std::move(recovered_snapshot->snapshot_info.epoch_id));
|
||||
|
||||
if (!utils::DirExists(wal_directory_)) {
|
||||
std::optional<std::filesystem::path> storage_dir = std::nullopt;
|
||||
if (flags::AreExperimentsEnabled(flags::Experiments::TEXT_SEARCH)) {
|
||||
storage_dir = config.durability.storage_directory;
|
||||
}
|
||||
|
||||
RecoverIndicesAndStats(indices_constraints.indices, indices, vertices, name_id_mapper,
|
||||
GetParallelExecInfoIndices(recovery_info, config));
|
||||
GetParallelExecInfoIndices(recovery_info, config), storage_dir);
|
||||
RecoverConstraints(indices_constraints.constraints, constraints, vertices, name_id_mapper,
|
||||
GetParallelExecInfo(recovery_info, config));
|
||||
return recovered_snapshot->recovery_info;
|
||||
@ -465,8 +471,13 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
|
||||
spdlog::info("All necessary WAL files are loaded successfully.");
|
||||
}
|
||||
|
||||
std::optional<std::filesystem::path> storage_dir = std::nullopt;
|
||||
if (flags::AreExperimentsEnabled(flags::Experiments::TEXT_SEARCH)) {
|
||||
storage_dir = config.durability.storage_directory;
|
||||
}
|
||||
|
||||
RecoverIndicesAndStats(indices_constraints.indices, indices, vertices, name_id_mapper,
|
||||
GetParallelExecInfoIndices(recovery_info, config));
|
||||
GetParallelExecInfoIndices(recovery_info, config), storage_dir);
|
||||
RecoverConstraints(indices_constraints.constraints, constraints, vertices, name_id_mapper,
|
||||
GetParallelExecInfo(recovery_info, config));
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -102,7 +102,8 @@ std::optional<std::vector<WalDurabilityInfo>> GetWalFiles(const std::filesystem:
|
||||
/// @throw RecoveryFailure
|
||||
void RecoverIndicesAndStats(const RecoveredIndicesAndConstraints::IndicesMetadata &indices_metadata, Indices *indices,
|
||||
utils::SkipList<Vertex> *vertices, NameIdMapper *name_id_mapper,
|
||||
const std::optional<ParallelizedSchemaCreationInfo> &parallel_exec_info = std::nullopt);
|
||||
const std::optional<ParallelizedSchemaCreationInfo> &parallel_exec_info = std::nullopt,
|
||||
std::optional<std::filesystem::path> storage_dir = std::nullopt);
|
||||
|
||||
// Helper function used to recover all discovered constraints. The
|
||||
// constraints must be recovered after the data recovery is done
|
||||
|
@ -24,7 +24,12 @@ std::string GetPropertyName(PropertyId prop_id, NameIdMapper *name_id_mapper) {
|
||||
return name_id_mapper->IdToName(prop_id.AsUint());
|
||||
}
|
||||
|
||||
void TextIndex::CreateEmptyIndex(const std::string &index_name, LabelId label) {
|
||||
inline std::string TextIndex::MakeIndexPath(const std::filesystem::path &storage_dir, const std::string &index_name) {
|
||||
return (storage_dir / kTextIndicesDirectory / index_name).string();
|
||||
}
|
||||
|
||||
void TextIndex::CreateEmptyIndex(const std::filesystem::path &storage_dir, const std::string &index_name,
|
||||
LabelId label) {
|
||||
if (!flags::AreExperimentsEnabled(flags::Experiments::TEXT_SEARCH)) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
@ -40,10 +45,10 @@ void TextIndex::CreateEmptyIndex(const std::string &index_name, LabelId label) {
|
||||
mappings["properties"]["data"] = {{"type", "json"}, {"fast", true}, {"stored", true}, {"text", true}};
|
||||
mappings["properties"]["all"] = {{"type", "text"}, {"fast", true}, {"stored", true}, {"text", true}};
|
||||
|
||||
index_.emplace(index_name,
|
||||
TextIndexData{.context_ = mgcxx::text_search::create_index(
|
||||
index_name, mgcxx::text_search::IndexConfig{.mappings = mappings.dump()}),
|
||||
.scope_ = label});
|
||||
index_.emplace(index_name, TextIndexData{.context_ = mgcxx::text_search::create_index(
|
||||
MakeIndexPath(storage_dir, index_name),
|
||||
mgcxx::text_search::IndexConfig{.mappings = mappings.dump()}),
|
||||
.scope_ = label});
|
||||
} catch (const std::exception &e) {
|
||||
throw query::TextSearchException("Tantivy error: {}", e.what());
|
||||
}
|
||||
@ -214,12 +219,13 @@ void TextIndex::RemoveNode(Vertex *vertex_after_update,
|
||||
}
|
||||
}
|
||||
|
||||
void TextIndex::CreateIndex(const std::string &index_name, LabelId label, memgraph::query::DbAccessor *db) {
|
||||
void TextIndex::CreateIndex(const std::filesystem::path &storage_dir, const std::string &index_name, LabelId label,
|
||||
memgraph::query::DbAccessor *db) {
|
||||
if (!flags::AreExperimentsEnabled(flags::Experiments::TEXT_SEARCH)) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
|
||||
CreateEmptyIndex(index_name, label);
|
||||
CreateEmptyIndex(storage_dir, index_name, label);
|
||||
|
||||
for (const auto &v : db->Vertices(View::NEW)) {
|
||||
if (!v.HasLabel(View::NEW, label).GetValue()) {
|
||||
@ -234,13 +240,16 @@ void TextIndex::CreateIndex(const std::string &index_name, LabelId label, memgra
|
||||
CommitLoadedNodes(index_.at(index_name).context_);
|
||||
}
|
||||
|
||||
void TextIndex::RecoverIndex(const std::string &index_name, LabelId label,
|
||||
void TextIndex::RecoverIndex(const std::filesystem::path &storage_dir, const std::string &index_name, LabelId label,
|
||||
memgraph::utils::SkipList<Vertex>::Accessor vertices, NameIdMapper *name_id_mapper) {
|
||||
if (!flags::AreExperimentsEnabled(flags::Experiments::TEXT_SEARCH)) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
|
||||
CreateEmptyIndex(index_name, label);
|
||||
// Clear Tantivy-internal files if they exist from previous sessions
|
||||
std::filesystem::remove_all(storage_dir / kTextIndicesDirectory / index_name);
|
||||
|
||||
CreateEmptyIndex(storage_dir, index_name, label);
|
||||
|
||||
for (const auto &v : vertices) {
|
||||
if (std::find(v.labels.begin(), v.labels.end(), label) == v.labels.end()) {
|
||||
@ -255,7 +264,7 @@ void TextIndex::RecoverIndex(const std::string &index_name, LabelId label,
|
||||
CommitLoadedNodes(index_.at(index_name).context_);
|
||||
}
|
||||
|
||||
LabelId TextIndex::DropIndex(const std::string &index_name) {
|
||||
LabelId TextIndex::DropIndex(const std::filesystem::path &storage_dir, const std::string &index_name) {
|
||||
if (!flags::AreExperimentsEnabled(flags::Experiments::TEXT_SEARCH)) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
@ -265,7 +274,7 @@ LabelId TextIndex::DropIndex(const std::string &index_name) {
|
||||
}
|
||||
|
||||
try {
|
||||
mgcxx::text_search::drop_index(index_name);
|
||||
mgcxx::text_search::drop_index(MakeIndexPath(storage_dir, index_name));
|
||||
} catch (const std::exception &e) {
|
||||
throw query::TextSearchException("Tantivy error: {}", e.what());
|
||||
}
|
||||
|
@ -36,8 +36,11 @@ enum class TextSearchMode : uint8_t {
|
||||
class TextIndex {
|
||||
private:
|
||||
static constexpr bool kDoSkipCommit = true;
|
||||
static constexpr std::string_view kTextIndicesDirectory = "text_indices";
|
||||
|
||||
void CreateEmptyIndex(const std::string &index_name, LabelId label);
|
||||
inline std::string MakeIndexPath(const std::filesystem::path &storage_dir, const std::string &index_name);
|
||||
|
||||
void CreateEmptyIndex(const std::filesystem::path &storage_dir, const std::string &index_name, LabelId label);
|
||||
|
||||
template <typename T>
|
||||
nlohmann::json SerializeProperties(const std::map<PropertyId, PropertyValue> &properties, T *name_resolver);
|
||||
@ -80,12 +83,13 @@ class TextIndex {
|
||||
void RemoveNode(Vertex *vertex,
|
||||
std::optional<std::vector<mgcxx::text_search::Context *>> applicable_text_indices = std::nullopt);
|
||||
|
||||
void CreateIndex(const std::string &index_name, LabelId label, memgraph::query::DbAccessor *db);
|
||||
void CreateIndex(const std::filesystem::path &storage_dir, const std::string &index_name, LabelId label,
|
||||
memgraph::query::DbAccessor *db);
|
||||
|
||||
void RecoverIndex(const std::string &index_name, LabelId label, memgraph::utils::SkipList<Vertex>::Accessor vertices,
|
||||
NameIdMapper *name_id_mapper);
|
||||
void RecoverIndex(const std::filesystem::path &storage_dir, const std::string &index_name, LabelId label,
|
||||
memgraph::utils::SkipList<Vertex>::Accessor vertices, NameIdMapper *name_id_mapper);
|
||||
|
||||
LabelId DropIndex(const std::string &index_name);
|
||||
LabelId DropIndex(const std::filesystem::path &storage_dir, const std::string &index_name);
|
||||
|
||||
bool IndexExists(const std::string &index_name) const;
|
||||
|
||||
|
@ -553,14 +553,15 @@ void Storage::Accessor::MarkEdgeAsDeleted(Edge *edge) {
|
||||
|
||||
// Creates the text index `index_name` for `label` via the storage's text index, then records a
// `text_index_create` metadata delta on the current transaction and increments the
// ActiveTextIndices metric.
// NOTE(review): this is a rendered diff, so two CreateIndex calls appear below; presumably the first
// (without the storage directory) is the removed line and the second is its replacement — confirm
// against the committed file.
void Storage::Accessor::CreateTextIndex(const std::string &index_name, LabelId label, query::DbAccessor *db) {
|
||||
// The accessor must hold the unique guard lock before the index is created.
MG_ASSERT(unique_guard_.owns_lock(), "Creating a text index requires unique access to storage!");
|
||||
storage_->indices_.text_index_.CreateIndex(index_name, label, db);
|
||||
// This variant passes the configured durability storage directory to the text index.
storage_->indices_.text_index_.CreateIndex(storage_->config_.durability.storage_directory, index_name, label, db);
|
||||
transaction_.md_deltas.emplace_back(MetadataDelta::text_index_create, index_name, label);
|
||||
memgraph::metrics::IncrementCounter(memgraph::metrics::ActiveTextIndices);
|
||||
}
|
||||
|
||||
// Drops the text index `index_name` via the storage's text index, then records a `text_index_drop`
// metadata delta (carrying the label returned by DropIndex) on the current transaction and
// decrements the ActiveTextIndices metric.
// NOTE(review): this is a rendered diff, so two declarations of `deleted_index_label` appear below;
// presumably the first (without the storage directory) is the removed line and the second is its
// replacement — confirm against the committed file.
void Storage::Accessor::DropTextIndex(const std::string &index_name) {
|
||||
// The accessor must hold the unique guard lock before the index is dropped.
MG_ASSERT(unique_guard_.owns_lock(), "Dropping a text index requires unique access to storage!");
|
||||
auto deleted_index_label = storage_->indices_.text_index_.DropIndex(index_name);
|
||||
// This variant passes the configured durability storage directory to the text index.
auto deleted_index_label =
|
||||
storage_->indices_.text_index_.DropIndex(storage_->config_.durability.storage_directory, index_name);
|
||||
transaction_.md_deltas.emplace_back(MetadataDelta::text_index_drop, index_name, deleted_index_label);
|
||||
memgraph::metrics::DecrementCounter(memgraph::metrics::ActiveTextIndices);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user