Refactor TextIndex
This commit is contained in:
parent
ca27f3c21f
commit
73509269ce
@ -645,14 +645,11 @@ class DbAccessor final {
|
||||
return accessor_->DropIndex(label, property);
|
||||
}
|
||||
|
||||
utils::BasicResult<storage::StorageIndexDefinitionError, void> CreateTextIndex(const std::string &index_name,
|
||||
storage::LabelId label) {
|
||||
return accessor_->CreateTextIndex(index_name, label, this);
|
||||
void CreateTextIndex(const std::string &index_name, storage::LabelId label) {
|
||||
accessor_->CreateTextIndex(index_name, label, this);
|
||||
}
|
||||
|
||||
utils::BasicResult<storage::StorageIndexDefinitionError, void> DropTextIndex(const std::string &index_name) {
|
||||
return accessor_->DropTextIndex(index_name);
|
||||
}
|
||||
void DropTextIndex(const std::string &index_name) { accessor_->DropTextIndex(index_name); }
|
||||
|
||||
utils::BasicResult<storage::StorageExistenceConstraintDefinitionError, void> CreateExistenceConstraint(
|
||||
storage::LabelId label, storage::PropertyId property) {
|
||||
|
@ -2648,7 +2648,7 @@ PreparedQuery PrepareTextIndexQuery(ParsedQuery parsed_query, bool in_explicit_t
|
||||
index_notification.title = fmt::format("Created text index on label {}.", text_index_query->label_.name);
|
||||
|
||||
// TODO: not just storage + invalidate_plan_cache. Need a DB transaction (for replication)
|
||||
handler = [dba, label, index_name, label_name = text_index_query->label_.name,
|
||||
handler = [dba, label, index_name,
|
||||
invalidate_plan_cache = std::move(invalidate_plan_cache)](Notification &index_notification) {
|
||||
if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
|
||||
throw TextSearchDisabledException();
|
||||
@ -2662,7 +2662,7 @@ PreparedQuery PrepareTextIndexQuery(ParsedQuery parsed_query, bool in_explicit_t
|
||||
index_notification.code = NotificationCode::DROP_INDEX;
|
||||
index_notification.title = fmt::format("Dropped text index on label {}.", text_index_query->label_.name);
|
||||
// TODO: not just storage + invalidate_plan_cache. Need a DB transaction (for replication)
|
||||
handler = [dba, label, index_name, label_name = text_index_query->label_.name,
|
||||
handler = [dba, index_name,
|
||||
invalidate_plan_cache = std::move(invalidate_plan_cache)](Notification &index_notification) {
|
||||
if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
|
||||
throw TextSearchDisabledException();
|
||||
|
@ -217,7 +217,7 @@ VertexAccessor &CreateLocalVertex(const NodeCreationInfo &node_info, Frame *fram
|
||||
auto new_node = dba.InsertVertex();
|
||||
context.execution_stats[ExecutionStats::Key::CREATED_NODES] += 1;
|
||||
for (auto label : node_info.labels) {
|
||||
auto maybe_error = new_node.AddLabel(label); // skip updating text indices until all labels are added
|
||||
auto maybe_error = new_node.AddLabel(label);
|
||||
if (maybe_error.HasError()) {
|
||||
switch (maybe_error.GetError()) {
|
||||
case storage::Error::SERIALIZATION_ERROR:
|
||||
|
@ -202,8 +202,11 @@ void RecoverIndicesAndStats(const RecoveredIndicesAndConstraints::IndicesMetadat
|
||||
spdlog::info("Recreating {} text indices from metadata.", indices_metadata.text_indices.size());
|
||||
auto &mem_text_index = indices->text_index_;
|
||||
for (const auto &item : indices_metadata.text_indices) {
|
||||
if (!mem_text_index.RecoverIndex(item.first, item.second, vertices->access(), name_id_mapper))
|
||||
try {
|
||||
mem_text_index.RecoverIndex(item.first, item.second, vertices->access(), name_id_mapper);
|
||||
} catch (...) {
|
||||
throw RecoveryFailure("The text index must be created here!");
|
||||
}
|
||||
spdlog::info("Text index {} on :{} is recreated from metadata", item.first,
|
||||
name_id_mapper->IdToName(item.second.AsUint()));
|
||||
}
|
||||
|
@ -42,25 +42,16 @@ void Indices::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std:
|
||||
void Indices::UpdateOnAddLabel(LabelId label, Vertex *vertex, const Transaction &tx, Storage *storage) const {
|
||||
label_index_->UpdateOnAddLabel(label, vertex, tx);
|
||||
label_property_index_->UpdateOnAddLabel(label, vertex, tx);
|
||||
// if (update_text_index) {
|
||||
// text_index_.UpdateOnAddLabel(label, vertex, storage->name_id_mapper_.get(), tx.start_timestamp);
|
||||
// }
|
||||
}
|
||||
|
||||
void Indices::UpdateOnRemoveLabel(LabelId label, Vertex *vertex, const Transaction &tx) const {
|
||||
label_index_->UpdateOnRemoveLabel(label, vertex, tx);
|
||||
label_property_index_->UpdateOnRemoveLabel(label, vertex, tx);
|
||||
// if (update_text_index) {
|
||||
// text_index_.UpdateOnRemoveLabel(label, vertex, tx.start_timestamp);
|
||||
// }
|
||||
}
|
||||
|
||||
void Indices::UpdateOnSetProperty(PropertyId property, const PropertyValue &value, Vertex *vertex,
|
||||
const Transaction &tx, Storage *storage) const {
|
||||
label_property_index_->UpdateOnSetProperty(property, value, vertex, tx);
|
||||
// if (update_text_index) {
|
||||
// text_index_.UpdateOnSetProperty(vertex, storage->name_id_mapper_.get(), tx.start_timestamp);
|
||||
// }
|
||||
}
|
||||
|
||||
Indices::Indices(const Config &config, StorageMode storage_mode) {
|
||||
|
@ -17,30 +17,49 @@
|
||||
|
||||
namespace memgraph::storage {
|
||||
|
||||
void TextIndex::AddNode(Vertex *vertex_after_update, NameIdMapper *name_id_mapper,
|
||||
const std::uint64_t transaction_start_timestamp,
|
||||
const std::vector<mgcxx::text_search::Context *> &applicable_text_indices) {
|
||||
std::string GetPropertyName(PropertyId prop_id, memgraph::query::DbAccessor *db) { return db->PropertyToName(prop_id); }
|
||||
|
||||
std::string GetPropertyName(PropertyId prop_id, NameIdMapper *name_id_mapper) {
|
||||
return name_id_mapper->IdToName(prop_id.AsUint());
|
||||
}
|
||||
|
||||
void TextIndex::CreateEmptyIndex(const std::string &index_name, LabelId label) {
|
||||
if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
|
||||
// NOTE: Text indexes are presently all-property indices. If we allow text indexes restricted to specific properties,
|
||||
// an indexable document should be created for each applicable index.
|
||||
nlohmann::json document = {};
|
||||
nlohmann::json properties = nlohmann::json::value_t::object;
|
||||
for (const auto &[prop_id, prop_value] : vertex_after_update->properties.Properties()) {
|
||||
try {
|
||||
nlohmann::json mappings = {};
|
||||
mappings["properties"] = {};
|
||||
mappings["properties"]["metadata"] = {{"type", "json"}, {"fast", true}, {"stored", true}, {"text", true}};
|
||||
mappings["properties"]["data"] = {{"type", "json"}, {"fast", true}, {"stored", true}, {"text", true}};
|
||||
|
||||
index_.emplace(index_name,
|
||||
TextIndexData{.context_ = mgcxx::text_search::create_index(
|
||||
index_name, mgcxx::text_search::IndexConfig{.mappings = mappings.dump()}),
|
||||
.scope_ = label});
|
||||
} catch (const std::exception &e) {
|
||||
throw query::TextSearchException("Tantivy error: {}", e.what());
|
||||
}
|
||||
label_to_index_.emplace(label, index_name);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
nlohmann::json TextIndex::SerializeProperties(const std::map<PropertyId, PropertyValue> &properties, T *name_resolver) {
|
||||
nlohmann::json serialized_properties = nlohmann::json::value_t::object;
|
||||
for (const auto &[prop_id, prop_value] : properties) {
|
||||
switch (prop_value.type()) {
|
||||
case PropertyValue::Type::Bool:
|
||||
properties[name_id_mapper->IdToName(prop_id.AsUint())] = prop_value.ValueBool();
|
||||
serialized_properties[GetPropertyName(prop_id, name_resolver)] = prop_value.ValueBool();
|
||||
break;
|
||||
case PropertyValue::Type::Int:
|
||||
properties[name_id_mapper->IdToName(prop_id.AsUint())] = prop_value.ValueInt();
|
||||
serialized_properties[GetPropertyName(prop_id, name_resolver)] = prop_value.ValueInt();
|
||||
break;
|
||||
case PropertyValue::Type::Double:
|
||||
properties[name_id_mapper->IdToName(prop_id.AsUint())] = prop_value.ValueDouble();
|
||||
serialized_properties[GetPropertyName(prop_id, name_resolver)] = prop_value.ValueDouble();
|
||||
break;
|
||||
case PropertyValue::Type::String:
|
||||
properties[name_id_mapper->IdToName(prop_id.AsUint())] = prop_value.ValueString();
|
||||
serialized_properties[GetPropertyName(prop_id, name_resolver)] = prop_value.ValueString();
|
||||
break;
|
||||
case PropertyValue::Type::Null:
|
||||
case PropertyValue::Type::List:
|
||||
@ -51,10 +70,35 @@ void TextIndex::AddNode(Vertex *vertex_after_update, NameIdMapper *name_id_mappe
|
||||
}
|
||||
}
|
||||
|
||||
return serialized_properties;
|
||||
}
|
||||
|
||||
std::vector<mgcxx::text_search::Context *> TextIndex::GetApplicableTextIndices(const std::vector<LabelId> &labels) {
|
||||
if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
|
||||
std::vector<mgcxx::text_search::Context *> applicable_text_indices;
|
||||
for (const auto &label : labels) {
|
||||
if (label_to_index_.contains(label)) {
|
||||
applicable_text_indices.push_back(&index_.at(label_to_index_.at(label)).context_);
|
||||
}
|
||||
}
|
||||
return applicable_text_indices;
|
||||
}
|
||||
|
||||
std::vector<mgcxx::text_search::Context *> TextIndex::GetApplicableTextIndices(Vertex *vertex) {
|
||||
return GetApplicableTextIndices(vertex->labels);
|
||||
}
|
||||
|
||||
void TextIndex::LoadNodeToTextIndices(const std::int64_t gid, const nlohmann::json &properties,
|
||||
const std::vector<mgcxx::text_search::Context *> &applicable_text_indices) {
|
||||
// NOTE: Text indexes are presently all-property indices. If we allow text indexes restricted to specific properties,
|
||||
// an indexable document should be created for each applicable index.
|
||||
nlohmann::json document = {};
|
||||
document["data"] = properties;
|
||||
document["metadata"] = {};
|
||||
document["metadata"]["gid"] = vertex_after_update->gid.AsInt();
|
||||
document["metadata"]["txid"] = transaction_start_timestamp;
|
||||
document["metadata"]["gid"] = gid;
|
||||
document["metadata"]["deleted"] = false;
|
||||
document["metadata"]["is_node"] = true;
|
||||
|
||||
@ -71,43 +115,53 @@ void TextIndex::AddNode(Vertex *vertex_after_update, NameIdMapper *name_id_mappe
|
||||
}
|
||||
}
|
||||
|
||||
void TextIndex::CommitLoadedNodes(mgcxx::text_search::Context &index_context) {
|
||||
// As CREATE TEXT INDEX (...) queries don’t accumulate deltas, db_transactional_accessor_->Commit() does not reach
|
||||
// the code area where changes to indices are committed. To get around that without needing to commit text indices
|
||||
// after every such query, we commit here.
|
||||
try {
|
||||
mgcxx::text_search::commit(index_context);
|
||||
} catch (const std::exception &e) {
|
||||
throw query::TextSearchException("Tantivy error: {}", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
void TextIndex::AddNode(Vertex *vertex_after_update, NameIdMapper *name_id_mapper,
|
||||
const std::uint64_t transaction_start_timestamp) {
|
||||
const std::vector<mgcxx::text_search::Context *> &applicable_text_indices) {
|
||||
if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
|
||||
auto vertex_properties = vertex_after_update->properties.Properties();
|
||||
LoadNodeToTextIndices(vertex_after_update->gid.AsInt(), SerializeProperties(vertex_properties, name_id_mapper),
|
||||
applicable_text_indices);
|
||||
}
|
||||
|
||||
void TextIndex::AddNode(Vertex *vertex_after_update, NameIdMapper *name_id_mapper) {
|
||||
if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
|
||||
auto applicable_text_indices = GetApplicableTextIndices(vertex_after_update);
|
||||
if (applicable_text_indices.empty()) return;
|
||||
AddNode(vertex_after_update, std::move(name_id_mapper), transaction_start_timestamp, applicable_text_indices);
|
||||
AddNode(vertex_after_update, name_id_mapper, applicable_text_indices);
|
||||
}
|
||||
|
||||
void TextIndex::UpdateNode(Vertex *vertex_after_update, NameIdMapper *name_id_mapper,
|
||||
const std::uint64_t transaction_start_timestamp) {
|
||||
if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
|
||||
auto applicable_text_indices = GetApplicableTextIndices(vertex_after_update);
|
||||
if (applicable_text_indices.empty()) return;
|
||||
RemoveNode(vertex_after_update, applicable_text_indices);
|
||||
AddNode(vertex_after_update, std::move(name_id_mapper), transaction_start_timestamp, applicable_text_indices);
|
||||
}
|
||||
|
||||
void TextIndex::UpdateNode(Vertex *vertex_after_update, NameIdMapper *name_id_mapper,
|
||||
const std::uint64_t transaction_start_timestamp,
|
||||
const std::vector<LabelId> &removed_labels) {
|
||||
if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
|
||||
auto indexes_to_remove_node_from = GetApplicableTextIndices(removed_labels);
|
||||
RemoveNode(vertex_after_update, indexes_to_remove_node_from);
|
||||
if (!removed_labels.empty()) {
|
||||
auto indexes_to_remove_node_from = GetApplicableTextIndices(removed_labels);
|
||||
RemoveNode(vertex_after_update, indexes_to_remove_node_from);
|
||||
}
|
||||
|
||||
auto indexes_to_update_node = GetApplicableTextIndices(vertex_after_update);
|
||||
if (indexes_to_update_node.empty()) return;
|
||||
RemoveNode(vertex_after_update, indexes_to_update_node);
|
||||
AddNode(vertex_after_update, std::move(name_id_mapper), transaction_start_timestamp, indexes_to_update_node);
|
||||
auto applicable_text_indices = GetApplicableTextIndices(vertex_after_update);
|
||||
if (applicable_text_indices.empty()) return;
|
||||
RemoveNode(vertex_after_update, applicable_text_indices);
|
||||
AddNode(vertex_after_update, name_id_mapper, applicable_text_indices);
|
||||
}
|
||||
|
||||
void TextIndex::RemoveNode(Vertex *vertex_after_update,
|
||||
@ -138,263 +192,41 @@ void TextIndex::RemoveNode(Vertex *vertex_after_update) {
|
||||
RemoveNode(vertex_after_update, applicable_text_indices);
|
||||
}
|
||||
|
||||
void TextIndex::UpdateOnAddLabel(LabelId added_label, Vertex *vertex_after_update, NameIdMapper *name_id_mapper,
|
||||
const std::uint64_t transaction_start_timestamp) {
|
||||
if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
void TextIndex::CreateIndex(const std::string &index_name, LabelId label, memgraph::query::DbAccessor *db) {
|
||||
CreateEmptyIndex(index_name, label);
|
||||
|
||||
if (!label_to_index_.contains(added_label)) {
|
||||
return;
|
||||
}
|
||||
AddNode(vertex_after_update, std::move(name_id_mapper), transaction_start_timestamp,
|
||||
std::vector<mgcxx::text_search::Context *>{&index_.at(label_to_index_.at(added_label)).context_});
|
||||
}
|
||||
|
||||
void TextIndex::UpdateOnRemoveLabel(LabelId removed_label, Vertex *vertex_after_update,
|
||||
const std::uint64_t transaction_start_timestamp) {
|
||||
if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
|
||||
if (!label_to_index_.contains(removed_label)) {
|
||||
return;
|
||||
}
|
||||
RemoveNode(vertex_after_update, {&index_.at(label_to_index_.at(removed_label)).context_});
|
||||
}
|
||||
|
||||
void TextIndex::UpdateOnSetProperty(Vertex *vertex_after_update, NameIdMapper *name_id_mapper,
|
||||
std::uint64_t transaction_start_timestamp) {
|
||||
if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
|
||||
UpdateNode(vertex_after_update, std::move(name_id_mapper), transaction_start_timestamp);
|
||||
}
|
||||
|
||||
std::vector<mgcxx::text_search::Context *> TextIndex::GetApplicableTextIndices(const std::vector<LabelId> &labels) {
|
||||
if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
|
||||
std::vector<mgcxx::text_search::Context *> applicable_text_indices;
|
||||
for (const auto &label : labels) {
|
||||
if (label_to_index_.contains(label)) {
|
||||
applicable_text_indices.push_back(&index_.at(label_to_index_.at(label)).context_);
|
||||
}
|
||||
}
|
||||
return applicable_text_indices;
|
||||
}
|
||||
|
||||
std::vector<mgcxx::text_search::Context *> TextIndex::GetApplicableTextIndices(Vertex *vertex) {
|
||||
if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
|
||||
std::vector<mgcxx::text_search::Context *> applicable_text_indices;
|
||||
for (const auto &label : vertex->labels) {
|
||||
if (label_to_index_.contains(label)) {
|
||||
applicable_text_indices.push_back(&index_.at(label_to_index_.at(label)).context_);
|
||||
}
|
||||
}
|
||||
return applicable_text_indices;
|
||||
}
|
||||
|
||||
bool TextIndex::CreateIndex(const std::string &index_name, LabelId label, memgraph::query::DbAccessor *db) {
|
||||
if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
|
||||
nlohmann::json mappings = {};
|
||||
mappings["properties"] = {};
|
||||
mappings["properties"]["metadata"] = {{"type", "json"}, {"fast", true}, {"stored", true}, {"text", true}};
|
||||
mappings["properties"]["data"] = {{"type", "json"}, {"fast", true}, {"stored", true}, {"text", true}};
|
||||
|
||||
try {
|
||||
index_.emplace(index_name,
|
||||
TextIndexData{.context_ = mgcxx::text_search::create_index(
|
||||
index_name, mgcxx::text_search::IndexConfig{.mappings = mappings.dump()}),
|
||||
.scope_ = label});
|
||||
} catch (const std::exception &e) {
|
||||
throw query::TextSearchException("Tantivy error: {}", e.what());
|
||||
}
|
||||
label_to_index_.emplace(label, index_name);
|
||||
|
||||
bool has_schema = false;
|
||||
std::vector<std::pair<PropertyId, std::string>> indexed_properties{};
|
||||
auto &index_context = index_.at(index_name).context_;
|
||||
|
||||
// TODO antepusic get nodes with label if there's an adequate label index
|
||||
for (const auto &v : db->Vertices(View::NEW)) {
|
||||
if (!v.HasLabel(View::NEW, label).GetValue()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!has_schema) [[unlikely]] {
|
||||
auto properties = v.Properties(View::NEW).GetValue();
|
||||
for (const auto &[prop_id, prop_val] : properties) {
|
||||
if (prop_val.IsBool() || prop_val.IsInt() || prop_val.IsDouble() || prop_val.IsString()) {
|
||||
indexed_properties.emplace_back(std::pair<PropertyId, std::string>{prop_id, db->PropertyToName(prop_id)});
|
||||
}
|
||||
}
|
||||
has_schema = true;
|
||||
}
|
||||
|
||||
nlohmann::json document = {};
|
||||
nlohmann::json properties = nlohmann::json::value_t::object;
|
||||
for (const auto &[prop_id, prop_name] : indexed_properties) {
|
||||
const auto prop_value = v.GetProperty(View::NEW, prop_id).GetValue();
|
||||
switch (prop_value.type()) {
|
||||
case PropertyValue::Type::Bool:
|
||||
properties[prop_name] = prop_value.ValueBool();
|
||||
break;
|
||||
case PropertyValue::Type::Int:
|
||||
properties[prop_name] = prop_value.ValueInt();
|
||||
break;
|
||||
case PropertyValue::Type::Double:
|
||||
properties[prop_name] = prop_value.ValueDouble();
|
||||
break;
|
||||
case PropertyValue::Type::String:
|
||||
properties[prop_name] = prop_value.ValueString();
|
||||
break;
|
||||
case PropertyValue::Type::Null:
|
||||
case PropertyValue::Type::List:
|
||||
case PropertyValue::Type::Map:
|
||||
case PropertyValue::Type::TemporalData:
|
||||
default:
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
document["data"] = properties;
|
||||
document["metadata"] = {};
|
||||
document["metadata"]["gid"] = v.Gid().AsInt();
|
||||
document["metadata"]["txid"] = v.impl_.transaction_->start_timestamp;
|
||||
document["metadata"]["deleted"] = false;
|
||||
document["metadata"]["is_node"] = true;
|
||||
|
||||
try {
|
||||
mgcxx::text_search::add_document(
|
||||
index_context,
|
||||
mgcxx::text_search::DocumentInput{
|
||||
.data = document.dump(-1, ' ', false, nlohmann::json::error_handler_t::replace)},
|
||||
kDoSkipCommit);
|
||||
} catch (const std::exception &e) {
|
||||
throw query::TextSearchException("Tantivy error: {}", e.what());
|
||||
}
|
||||
auto vertex_properties = v.Properties(View::NEW).GetValue();
|
||||
LoadNodeToTextIndices(v.Gid().AsInt(), SerializeProperties(vertex_properties, db),
|
||||
{&index_.at(index_name).context_});
|
||||
}
|
||||
|
||||
// As CREATE TEXT INDEX (...) queries don’t accumulate deltas, db_transactional_accessor_->Commit() does not reach
|
||||
// the code area where changes to indices are committed. To get around that without needing to commit text indices
|
||||
// after every such query, we commit here.
|
||||
try {
|
||||
mgcxx::text_search::commit(index_context);
|
||||
} catch (const std::exception &e) {
|
||||
throw query::TextSearchException("Tantivy error: {}", e.what());
|
||||
}
|
||||
return true;
|
||||
CommitLoadedNodes(index_.at(index_name).context_);
|
||||
}
|
||||
|
||||
bool TextIndex::RecoverIndex(const std::string &index_name, LabelId label,
|
||||
void TextIndex::RecoverIndex(const std::string &index_name, LabelId label,
|
||||
memgraph::utils::SkipList<Vertex>::Accessor vertices, NameIdMapper *name_id_mapper) {
|
||||
if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
CreateEmptyIndex(index_name, label);
|
||||
|
||||
nlohmann::json mappings = {};
|
||||
mappings["properties"] = {};
|
||||
mappings["properties"]["metadata"] = {{"type", "json"}, {"fast", true}, {"stored", true}, {"text", true}};
|
||||
mappings["properties"]["data"] = {{"type", "json"}, {"fast", true}, {"stored", true}, {"text", true}};
|
||||
|
||||
try {
|
||||
index_.emplace(index_name,
|
||||
TextIndexData{.context_ = mgcxx::text_search::create_index(
|
||||
index_name, mgcxx::text_search::IndexConfig{.mappings = mappings.dump()}),
|
||||
.scope_ = label});
|
||||
} catch (const std::exception &e) {
|
||||
throw query::TextSearchException("Tantivy error: {}", e.what());
|
||||
}
|
||||
label_to_index_.emplace(label, index_name);
|
||||
|
||||
bool has_schema = false;
|
||||
std::vector<std::pair<PropertyId, std::string>> indexed_properties{};
|
||||
auto &index_context = index_.at(index_name).context_;
|
||||
for (const auto &v : vertices) {
|
||||
if (std::find(v.labels.begin(), v.labels.end(), label) == v.labels.end()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto vertex_properties = v.properties.Properties();
|
||||
|
||||
if (!has_schema) [[unlikely]] {
|
||||
for (const auto &[prop_id, prop_val] : vertex_properties) {
|
||||
if (prop_val.IsBool() || prop_val.IsInt() || prop_val.IsDouble() || prop_val.IsString()) {
|
||||
indexed_properties.emplace_back(
|
||||
std::pair<PropertyId, std::string>{prop_id, name_id_mapper->IdToName(prop_id.AsUint())});
|
||||
}
|
||||
}
|
||||
has_schema = true;
|
||||
}
|
||||
|
||||
nlohmann::json document = {};
|
||||
nlohmann::json properties = nlohmann::json::value_t::object;
|
||||
for (const auto &[prop_id, prop_name] : indexed_properties) {
|
||||
if (!vertex_properties.contains(prop_id)) {
|
||||
continue;
|
||||
}
|
||||
const auto prop_value = vertex_properties.at(prop_id);
|
||||
switch (prop_value.type()) {
|
||||
case PropertyValue::Type::Bool:
|
||||
properties[prop_name] = prop_value.ValueBool();
|
||||
break;
|
||||
case PropertyValue::Type::Int:
|
||||
properties[prop_name] = prop_value.ValueInt();
|
||||
break;
|
||||
case PropertyValue::Type::Double:
|
||||
properties[prop_name] = prop_value.ValueDouble();
|
||||
break;
|
||||
case PropertyValue::Type::String:
|
||||
properties[prop_name] = prop_value.ValueString();
|
||||
break;
|
||||
case PropertyValue::Type::Null:
|
||||
case PropertyValue::Type::List:
|
||||
case PropertyValue::Type::Map:
|
||||
case PropertyValue::Type::TemporalData:
|
||||
default:
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
document["data"] = properties;
|
||||
document["metadata"] = {};
|
||||
document["metadata"]["gid"] = v.gid.AsInt();
|
||||
document["metadata"]["txid"] = -1;
|
||||
document["metadata"]["deleted"] = false;
|
||||
document["metadata"]["is_node"] = true;
|
||||
|
||||
try {
|
||||
mgcxx::text_search::add_document(
|
||||
index_context,
|
||||
mgcxx::text_search::DocumentInput{
|
||||
.data = document.dump(-1, ' ', false, nlohmann::json::error_handler_t::replace)},
|
||||
kDoSkipCommit);
|
||||
} catch (const std::exception &e) {
|
||||
throw query::TextSearchException("Tantivy error: {}", e.what());
|
||||
}
|
||||
auto vertex_properties = v.properties.Properties();
|
||||
LoadNodeToTextIndices(v.gid.AsInt(), SerializeProperties(vertex_properties, name_id_mapper),
|
||||
{&index_.at(index_name).context_});
|
||||
}
|
||||
|
||||
// As CREATE TEXT INDEX (...) queries don’t accumulate deltas, db_transactional_accessor_->Commit() does not reach
|
||||
// the code area where changes to indices are committed. To get around that without needing to commit text indices
|
||||
// after every such query, we commit here.
|
||||
try {
|
||||
mgcxx::text_search::commit(index_context);
|
||||
} catch (const std::exception &e) {
|
||||
throw query::TextSearchException("Tantivy error: {}", e.what());
|
||||
}
|
||||
return true;
|
||||
CommitLoadedNodes(index_.at(index_name).context_);
|
||||
}
|
||||
|
||||
bool TextIndex::DropIndex(const std::string &index_name) {
|
||||
void TextIndex::DropIndex(const std::string &index_name) {
|
||||
if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
|
||||
throw query::TextSearchDisabledException();
|
||||
}
|
||||
@ -406,7 +238,6 @@ bool TextIndex::DropIndex(const std::string &index_name) {
|
||||
}
|
||||
index_.erase(index_name);
|
||||
std::erase_if(label_to_index_, [index_name](const auto &item) { return item.second == index_name; });
|
||||
return true;
|
||||
}
|
||||
|
||||
bool TextIndex::IndexExists(const std::string &index_name) const { return index_.contains(index_name); }
|
||||
|
@ -11,6 +11,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <json/json.hpp>
|
||||
#include "storage/v2/id_types.hpp"
|
||||
#include "storage/v2/name_id_mapper.hpp"
|
||||
#include "storage/v2/transaction.hpp"
|
||||
@ -33,13 +34,23 @@ struct TextIndexData {
|
||||
|
||||
class TextIndex {
|
||||
private:
|
||||
void AddNode(Vertex *vertex, NameIdMapper *name_id_mapper, const std::uint64_t transaction_start_timestamp,
|
||||
const std::vector<mgcxx::text_search::Context *> &applicable_text_indices);
|
||||
void CreateEmptyIndex(const std::string &index_name, LabelId label);
|
||||
|
||||
template <typename T>
|
||||
nlohmann::json SerializeProperties(const std::map<PropertyId, PropertyValue> &properties, T *name_resolver);
|
||||
|
||||
std::vector<mgcxx::text_search::Context *> GetApplicableTextIndices(const std::vector<LabelId> &labels);
|
||||
|
||||
std::vector<mgcxx::text_search::Context *> GetApplicableTextIndices(Vertex *vertex);
|
||||
|
||||
void LoadNodeToTextIndices(const std::int64_t gid, const nlohmann::json &properties,
|
||||
const std::vector<mgcxx::text_search::Context *> &applicable_text_indices);
|
||||
|
||||
void CommitLoadedNodes(mgcxx::text_search::Context &index_context);
|
||||
|
||||
void AddNode(Vertex *vertex, NameIdMapper *name_id_mapper,
|
||||
const std::vector<mgcxx::text_search::Context *> &applicable_text_indices);
|
||||
|
||||
void RemoveNode(Vertex *vertex, const std::vector<mgcxx::text_search::Context *> &applicable_text_indices);
|
||||
|
||||
public:
|
||||
@ -55,30 +66,18 @@ class TextIndex {
|
||||
std::map<std::string, TextIndexData> index_;
|
||||
std::map<LabelId, std::string> label_to_index_;
|
||||
|
||||
void AddNode(Vertex *vertex, NameIdMapper *name_id_mapper, const std::uint64_t transaction_start_timestamp);
|
||||
void AddNode(Vertex *vertex, NameIdMapper *name_id_mapper);
|
||||
|
||||
void UpdateNode(Vertex *vertex, NameIdMapper *name_id_mapper, const std::uint64_t transaction_start_timestamp);
|
||||
|
||||
void UpdateNode(Vertex *vertex, NameIdMapper *name_id_mapper, const std::uint64_t transaction_start_timestamp,
|
||||
const std::vector<LabelId> &removed_labels);
|
||||
void UpdateNode(Vertex *vertex, NameIdMapper *name_id_mapper, const std::vector<LabelId> &removed_labels = {});
|
||||
|
||||
void RemoveNode(Vertex *vertex);
|
||||
|
||||
void UpdateOnAddLabel(LabelId added_label, Vertex *vertex_after_update, NameIdMapper *name_id_mapper,
|
||||
const std::uint64_t transaction_start_timestamp);
|
||||
void CreateIndex(const std::string &index_name, LabelId label, memgraph::query::DbAccessor *db);
|
||||
|
||||
void UpdateOnRemoveLabel(LabelId removed_label, Vertex *vertex_after_update,
|
||||
const std::uint64_t transaction_start_timestamp);
|
||||
|
||||
void UpdateOnSetProperty(Vertex *vertex_after_update, NameIdMapper *name_id_mapper,
|
||||
const std::uint64_t transaction_start_timestamp);
|
||||
|
||||
bool CreateIndex(const std::string &index_name, LabelId label, memgraph::query::DbAccessor *db);
|
||||
|
||||
bool RecoverIndex(const std::string &index_name, LabelId label, memgraph::utils::SkipList<Vertex>::Accessor vertices,
|
||||
void RecoverIndex(const std::string &index_name, LabelId label, memgraph::utils::SkipList<Vertex>::Accessor vertices,
|
||||
NameIdMapper *name_id_mapper);
|
||||
|
||||
bool DropIndex(const std::string &index_name);
|
||||
void DropIndex(const std::string &index_name);
|
||||
|
||||
bool IndexExists(const std::string &index_name) const;
|
||||
|
||||
|
@ -232,16 +232,15 @@ class Storage {
|
||||
}
|
||||
|
||||
void TextIndexAddVertex(VertexAccessor *vertex) {
|
||||
storage_->indices_.text_index_.AddNode(vertex->vertex_, storage_->name_id_mapper_.get(), storage_->timestamp_);
|
||||
storage_->indices_.text_index_.AddNode(vertex->vertex_, storage_->name_id_mapper_.get());
|
||||
}
|
||||
|
||||
void TextIndexUpdateVertex(VertexAccessor *vertex) {
|
||||
storage_->indices_.text_index_.UpdateNode(vertex->vertex_, storage_->name_id_mapper_.get(), storage_->timestamp_);
|
||||
storage_->indices_.text_index_.UpdateNode(vertex->vertex_, storage_->name_id_mapper_.get());
|
||||
}
|
||||
|
||||
void TextIndexUpdateVertex(VertexAccessor *vertex, std::vector<LabelId> removed_labels) {
|
||||
storage_->indices_.text_index_.UpdateNode(vertex->vertex_, storage_->name_id_mapper_.get(), storage_->timestamp_,
|
||||
removed_labels);
|
||||
storage_->indices_.text_index_.UpdateNode(vertex->vertex_, storage_->name_id_mapper_.get(), removed_labels);
|
||||
}
|
||||
|
||||
std::vector<Gid> TextIndexSearch(const std::string &index_name, const std::string &search_query) const {
|
||||
@ -292,17 +291,11 @@ class Storage {
|
||||
|
||||
virtual utils::BasicResult<StorageIndexDefinitionError, void> DropIndex(LabelId label, PropertyId property) = 0;
|
||||
|
||||
virtual utils::BasicResult<StorageIndexDefinitionError, void> CreateTextIndex(const std::string &index_name,
|
||||
LabelId label,
|
||||
query::DbAccessor *db) {
|
||||
void CreateTextIndex(const std::string &index_name, LabelId label, query::DbAccessor *db) {
|
||||
storage_->indices_.text_index_.CreateIndex(index_name, label, db);
|
||||
return {};
|
||||
}
|
||||
|
||||
virtual utils::BasicResult<StorageIndexDefinitionError, void> DropTextIndex(const std::string &index_name) {
|
||||
storage_->indices_.text_index_.DropIndex(index_name);
|
||||
return {};
|
||||
}
|
||||
void DropTextIndex(const std::string &index_name) { storage_->indices_.text_index_.DropIndex(index_name); }
|
||||
|
||||
virtual utils::BasicResult<StorageExistenceConstraintDefinitionError, void> CreateExistenceConstraint(
|
||||
LabelId label, PropertyId property) = 0;
|
||||
|
Loading…
Reference in New Issue
Block a user