Add durability for text indices
This commit is contained in:
parent
ea64f2e906
commit
4b46f1bb54
@ -1,4 +1,4 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -197,6 +197,21 @@ void RecoverIndicesAndStats(const RecoveredIndicesAndConstraints::IndicesMetadat
|
||||
}
|
||||
spdlog::info("Label+property indices statistics are recreated.");
|
||||
|
||||
if (flags::run_time::GetTextSearchEnabled()) {
|
||||
// Recover text indices.
|
||||
spdlog::info("Recreating {} text indices from metadata.", indices_metadata.text.size());
|
||||
auto *mem_text_index = static_cast<TextIndex *>(indices->text_index_.get());
|
||||
for (const auto &item : indices_metadata.text) {
|
||||
const auto index_name = item.first;
|
||||
const auto label_id = item.second;
|
||||
|
||||
if (!mem_text_index->RecoverIndex(item.first, item.second, vertices->access(), name_id_mapper))
|
||||
throw RecoveryFailure("The text index must be created here!");
|
||||
spdlog::info("Text index {} on :{} is recreated from metadata", item.first,
|
||||
name_id_mapper->IdToName(item.second.AsUint()));
|
||||
}
|
||||
}
|
||||
|
||||
spdlog::info("Indices are recreated.");
|
||||
|
||||
spdlog::info("Recreating constraints from metadata.");
|
||||
|
@ -13,6 +13,7 @@
|
||||
|
||||
#include <thread>
|
||||
|
||||
#include "flags/run_time_configurable.hpp"
|
||||
#include "spdlog/spdlog.h"
|
||||
#include "storage/v2/durability/exceptions.hpp"
|
||||
#include "storage/v2/durability/paths.hpp"
|
||||
@ -799,7 +800,6 @@ RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils
|
||||
SPDLOG_TRACE("Recovered property \"{}\" with value \"{}\" for vertex {}.",
|
||||
name_id_mapper->IdToName(snapshot_id_map.at(*key)), *value, *gid);
|
||||
props.SetProperty(get_property_from_id(*key), *value);
|
||||
// TODO antepusic: update text index here or at the end of LoadSnapshot()?
|
||||
}
|
||||
}
|
||||
|
||||
@ -993,6 +993,24 @@ RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils
|
||||
spdlog::info("Metadata of label+property indices are recovered.");
|
||||
}
|
||||
|
||||
// Recover text indices.
|
||||
if (flags::run_time::GetTextSearchEnabled()) {
|
||||
auto size = snapshot.ReadUint();
|
||||
if (!size) throw RecoveryFailure("Couldn't recover the number of text indices!");
|
||||
spdlog::info("Recovering metadata of {} text indices.", *size);
|
||||
for (uint64_t i = 0; i < *size; ++i) {
|
||||
auto index_name = snapshot.ReadString();
|
||||
if (!index_name.has_value()) throw RecoveryFailure("Couldn't read text index name!");
|
||||
auto label = snapshot.ReadUint();
|
||||
if (!label) throw RecoveryFailure("Couldn't read text index label!");
|
||||
AddRecoveredIndexConstraint(&indices_constraints.indices.text, {index_name.value(), get_label_from_id(*label)},
|
||||
"The text index already exists!");
|
||||
SPDLOG_TRACE("Recovered metadata of text index {} for :{}", index_name.value(),
|
||||
name_id_mapper->IdToName(snapshot_id_map.at(*label)));
|
||||
}
|
||||
spdlog::info("Metadata of label+property indices are recovered.");
|
||||
}
|
||||
|
||||
spdlog::info("Metadata of indices are recovered.");
|
||||
}
|
||||
|
||||
@ -2106,6 +2124,16 @@ void CreateSnapshot(Storage *storage, Transaction *transaction, const std::files
|
||||
snapshot.SetPosition(last_pos);
|
||||
}
|
||||
}
|
||||
|
||||
// Write text indices.
|
||||
if (flags::run_time::GetTextSearchEnabled()) {
|
||||
auto text = storage->indices_.text_index_->ListIndices();
|
||||
snapshot.WriteUint(text.size());
|
||||
for (const auto &item : text) {
|
||||
snapshot.WriteString(item.first);
|
||||
write_mapping(item.second);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write constraints.
|
||||
|
@ -802,7 +802,6 @@ RecoveryInfo LoadWal(const std::filesystem::path &path, RecoveredIndicesAndConst
|
||||
auto &property_value = delta.vertex_edge_set_property.value;
|
||||
|
||||
vertex->properties.SetProperty(property_id, property_value);
|
||||
// TODO antepusic: update text index here or at the end of LoadWal()?
|
||||
|
||||
break;
|
||||
}
|
||||
|
@ -292,6 +292,105 @@ bool TextIndex::CreateIndex(std::string index_name, LabelId label, memgraph::que
|
||||
return true;
|
||||
}
|
||||
|
||||
bool TextIndex::RecoverIndex(std::string index_name, LabelId label,
|
||||
memgraph::utils::SkipList<Vertex>::Accessor vertices, NameIdMapper *name_id_mapper) {
|
||||
if (!flags::run_time::GetTextSearchEnabled()) {
|
||||
throw query::QueryException("To use text indices, enable the text search feature.");
|
||||
}
|
||||
|
||||
nlohmann::json mappings = {};
|
||||
mappings["properties"] = {};
|
||||
mappings["properties"]["metadata"] = {{"type", "json"}, {"fast", true}, {"stored", true}, {"text", true}};
|
||||
mappings["properties"]["data"] = {{"type", "json"}, {"fast", true}, {"stored", true}, {"text", true}};
|
||||
|
||||
try {
|
||||
index_.emplace(index_name,
|
||||
TextIndexData{.context_ = mgcxx::text_search::create_index(
|
||||
index_name, mgcxx::text_search::IndexConfig{.mappings = mappings.dump()}),
|
||||
.scope_ = label});
|
||||
} catch (const std::exception &e) {
|
||||
throw query::QueryException(fmt::format("Tantivy error: {}", e.what()));
|
||||
}
|
||||
label_to_index_.emplace(label, index_name);
|
||||
|
||||
bool has_schema = false;
|
||||
std::vector<std::pair<PropertyId, std::string>> indexed_properties{};
|
||||
auto &index_context = index_.at(index_name).context_;
|
||||
for (const auto &v : vertices) {
|
||||
if (std::find(v.labels.begin(), v.labels.end(), label) == v.labels.end()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto vertex_properties = v.properties.Properties();
|
||||
|
||||
if (!has_schema) [[unlikely]] {
|
||||
for (const auto &[prop_id, prop_val] : vertex_properties) {
|
||||
if (prop_val.IsBool() || prop_val.IsInt() || prop_val.IsDouble() || prop_val.IsString()) {
|
||||
indexed_properties.emplace_back(
|
||||
std::pair<PropertyId, std::string>{prop_id, name_id_mapper->IdToName(prop_id.AsUint())});
|
||||
}
|
||||
}
|
||||
has_schema = true;
|
||||
}
|
||||
|
||||
nlohmann::json document = {};
|
||||
nlohmann::json properties = {};
|
||||
for (const auto &[prop_id, prop_name] : indexed_properties) {
|
||||
if (!vertex_properties.contains(prop_id)) {
|
||||
continue;
|
||||
}
|
||||
const auto prop_value = vertex_properties.at(prop_id);
|
||||
switch (prop_value.type()) {
|
||||
case PropertyValue::Type::Bool:
|
||||
properties[prop_name] = prop_value.ValueBool();
|
||||
break;
|
||||
case PropertyValue::Type::Int:
|
||||
properties[prop_name] = prop_value.ValueInt();
|
||||
break;
|
||||
case PropertyValue::Type::Double:
|
||||
properties[prop_name] = prop_value.ValueDouble();
|
||||
break;
|
||||
case PropertyValue::Type::String:
|
||||
properties[prop_name] = prop_value.ValueString();
|
||||
break;
|
||||
case PropertyValue::Type::Null:
|
||||
case PropertyValue::Type::List:
|
||||
case PropertyValue::Type::Map:
|
||||
case PropertyValue::Type::TemporalData:
|
||||
default:
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
document["data"] = properties;
|
||||
document["metadata"] = {};
|
||||
document["metadata"]["gid"] = v.gid.AsInt();
|
||||
document["metadata"]["txid"] = -1;
|
||||
document["metadata"]["deleted"] = false;
|
||||
document["metadata"]["is_node"] = true;
|
||||
|
||||
try {
|
||||
mgcxx::text_search::add_document(
|
||||
index_context,
|
||||
mgcxx::text_search::DocumentInput{
|
||||
.data = document.dump(-1, ' ', false, nlohmann::json::error_handler_t::replace)},
|
||||
kDoSkipCommit);
|
||||
} catch (const std::exception &e) {
|
||||
throw query::QueryException(fmt::format("Tantivy error: {}", e.what()));
|
||||
}
|
||||
}
|
||||
|
||||
// As CREATE TEXT INDEX (...) queries don’t accumulate deltas, db_transactional_accessor_->Commit() does not reach
|
||||
// the code area where changes to indices are committed. To get around that without needing to commit text indices
|
||||
// after every such query, we commit here.
|
||||
try {
|
||||
mgcxx::text_search::commit(index_context);
|
||||
} catch (const std::exception &e) {
|
||||
throw query::QueryException(fmt::format("Tantivy error: {}", e.what()));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool TextIndex::DropIndex(std::string index_name) {
|
||||
if (!flags::run_time::GetTextSearchEnabled()) {
|
||||
throw query::QueryException("To use text indices, enable the text search feature.");
|
||||
|
@ -12,6 +12,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "storage/v2/id_types.hpp"
|
||||
#include "storage/v2/name_id_mapper.hpp"
|
||||
#include "storage/v2/transaction.hpp"
|
||||
#include "storage/v2/vertex.hpp"
|
||||
#include "text_search.hpp"
|
||||
@ -75,6 +76,9 @@ class TextIndex {
|
||||
|
||||
bool CreateIndex(std::string index_name, LabelId label, memgraph::query::DbAccessor *db);
|
||||
|
||||
bool RecoverIndex(std::string index_name, LabelId label, memgraph::utils::SkipList<Vertex>::Accessor vertices,
|
||||
NameIdMapper *name_id_mapper);
|
||||
|
||||
bool DropIndex(std::string index_name);
|
||||
|
||||
bool IndexExists(std::string index_name) const;
|
||||
|
Loading…
Reference in New Issue
Block a user