Fix deltas and WAL for text indices

This commit is contained in:
Ante Pušić 2024-02-18 16:07:03 +01:00
parent 903ed3f25a
commit c40bf89f63
10 changed files with 80 additions and 37 deletions

View File

@ -2127,9 +2127,9 @@ void CreateSnapshot(Storage *storage, Transaction *transaction, const std::files
// Write text indices. // Write text indices.
if (flags::run_time::GetExperimentalTextSearchEnabled()) { if (flags::run_time::GetExperimentalTextSearchEnabled()) {
auto text = storage->indices_.text_index_.ListIndices(); auto text_indices = storage->indices_.text_index_.ListIndices();
snapshot.WriteUint(text.size()); snapshot.WriteUint(text_indices.size());
for (const auto &item : text) { for (const auto &item : text_indices) {
snapshot.WriteString(item.first); snapshot.WriteString(item.first);
write_mapping(item.second); write_mapping(item.second);
} }

View File

@ -317,8 +317,6 @@ WalDeltaData ReadSkipWalDeltaData(BaseDecoder *decoder) {
} break; } break;
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE: case WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE:
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP: case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP:
case WalDeltaData::Type::TEXT_INDEX_CREATE:
case WalDeltaData::Type::TEXT_INDEX_DROP:
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE: case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE:
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: { case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: {
if constexpr (read_data) { if constexpr (read_data) {
@ -371,6 +369,21 @@ WalDeltaData ReadSkipWalDeltaData(BaseDecoder *decoder) {
if (!decoder->SkipString()) throw RecoveryFailure("Invalid WAL data!"); if (!decoder->SkipString()) throw RecoveryFailure("Invalid WAL data!");
} }
} }
break;
}
case WalDeltaData::Type::TEXT_INDEX_CREATE:
case WalDeltaData::Type::TEXT_INDEX_DROP: {
if constexpr (read_data) {
auto index_name = decoder->ReadString();
if (!index_name) throw RecoveryFailure("Invalid WAL data!");
delta.operation_text.index_name = std::move(*index_name);
auto label = decoder->ReadString();
if (!label) throw RecoveryFailure("Invalid WAL data!");
delta.operation_text.label = std::move(*label);
} else {
if (!decoder->SkipString() || !decoder->SkipString()) throw RecoveryFailure("Invalid WAL data!");
}
break;
} }
} }
@ -519,7 +532,11 @@ bool operator==(const WalDeltaData &a, const WalDeltaData &b) {
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE: case WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE:
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP: case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP:
case WalDeltaData::Type::TEXT_INDEX_CREATE: case WalDeltaData::Type::TEXT_INDEX_CREATE:
return a.operation_text.index_name == b.operation_text.index_name &&
a.operation_text.label == b.operation_text.label;
case WalDeltaData::Type::TEXT_INDEX_DROP: case WalDeltaData::Type::TEXT_INDEX_DROP:
return a.operation_text.index_name == b.operation_text.index_name &&
a.operation_text.label == b.operation_text.label;
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE: case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE:
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP:
return a.operation_label_property.label == b.operation_label_property.label && return a.operation_label_property.label == b.operation_label_property.label &&
@ -663,8 +680,8 @@ void EncodeTransactionEnd(BaseEncoder *encoder, uint64_t timestamp) {
} }
void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper, StorageMetadataOperation operation, void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper, StorageMetadataOperation operation,
LabelId label, const std::set<PropertyId> &properties, const LabelIndexStats &stats, const std::string &text_index_name, LabelId label, const std::set<PropertyId> &properties,
const LabelPropertyIndexStats &property_stats, uint64_t timestamp) { const LabelIndexStats &stats, const LabelPropertyIndexStats &property_stats, uint64_t timestamp) {
encoder->WriteMarker(Marker::SECTION_DELTA); encoder->WriteMarker(Marker::SECTION_DELTA);
encoder->WriteUint(timestamp); encoder->WriteUint(timestamp);
switch (operation) { switch (operation) {
@ -689,8 +706,6 @@ void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper, Storage
} }
case StorageMetadataOperation::LABEL_PROPERTY_INDEX_CREATE: case StorageMetadataOperation::LABEL_PROPERTY_INDEX_CREATE:
case StorageMetadataOperation::LABEL_PROPERTY_INDEX_DROP: case StorageMetadataOperation::LABEL_PROPERTY_INDEX_DROP:
case StorageMetadataOperation::TEXT_INDEX_CREATE:
case StorageMetadataOperation::TEXT_INDEX_DROP:
case StorageMetadataOperation::EXISTENCE_CONSTRAINT_CREATE: case StorageMetadataOperation::EXISTENCE_CONSTRAINT_CREATE:
case StorageMetadataOperation::EXISTENCE_CONSTRAINT_DROP: { case StorageMetadataOperation::EXISTENCE_CONSTRAINT_DROP: {
MG_ASSERT(properties.size() == 1, "Invalid function call!"); MG_ASSERT(properties.size() == 1, "Invalid function call!");
@ -717,6 +732,13 @@ void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper, Storage
} }
break; break;
} }
case StorageMetadataOperation::TEXT_INDEX_CREATE:
case StorageMetadataOperation::TEXT_INDEX_DROP: {
encoder->WriteMarker(OperationToMarker(operation));
encoder->WriteString(text_index_name);
encoder->WriteString(name_id_mapper->IdToName(label.AsUint()));
break;
}
} }
} }
@ -1109,10 +1131,11 @@ void WalFile::AppendTransactionEnd(uint64_t timestamp) {
UpdateStats(timestamp); UpdateStats(timestamp);
} }
void WalFile::AppendOperation(StorageMetadataOperation operation, LabelId label, const std::set<PropertyId> &properties, void WalFile::AppendOperation(StorageMetadataOperation operation, const std::string &text_index_name, LabelId label,
const LabelIndexStats &stats, const LabelPropertyIndexStats &property_stats, const std::set<PropertyId> &properties, const LabelIndexStats &stats,
uint64_t timestamp) { const LabelPropertyIndexStats &property_stats, uint64_t timestamp) {
EncodeOperation(&wal_, name_id_mapper_, operation, label, properties, stats, property_stats, timestamp); EncodeOperation(&wal_, name_id_mapper_, operation, text_index_name, label, properties, stats, property_stats,
timestamp);
UpdateStats(timestamp); UpdateStats(timestamp);
} }

View File

@ -214,8 +214,8 @@ void EncodeTransactionEnd(BaseEncoder *encoder, uint64_t timestamp);
/// Function used to encode non-transactional operation. /// Function used to encode non-transactional operation.
void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper, StorageMetadataOperation operation, void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper, StorageMetadataOperation operation,
LabelId label, const std::set<PropertyId> &properties, const LabelIndexStats &stats, const std::string &text_index_name, LabelId label, const std::set<PropertyId> &properties,
const LabelPropertyIndexStats &property_stats, uint64_t timestamp); const LabelIndexStats &stats, const LabelPropertyIndexStats &property_stats, uint64_t timestamp);
/// Function used to load the WAL data into the storage. /// Function used to load the WAL data into the storage.
/// @throw RecoveryFailure /// @throw RecoveryFailure
@ -246,8 +246,9 @@ class WalFile {
void AppendTransactionEnd(uint64_t timestamp); void AppendTransactionEnd(uint64_t timestamp);
void AppendOperation(StorageMetadataOperation operation, LabelId label, const std::set<PropertyId> &properties, void AppendOperation(StorageMetadataOperation operation, const std::string &text_index_name, LabelId label,
const LabelIndexStats &stats, const LabelPropertyIndexStats &property_stats, uint64_t timestamp); const std::set<PropertyId> &properties, const LabelIndexStats &stats,
const LabelPropertyIndexStats &property_stats, uint64_t timestamp);
void Sync(); void Sync();

View File

@ -226,18 +226,24 @@ void TextIndex::RecoverIndex(const std::string &index_name, LabelId label,
CommitLoadedNodes(index_.at(index_name).context_); CommitLoadedNodes(index_.at(index_name).context_);
} }
void TextIndex::DropIndex(const std::string &index_name) { LabelId TextIndex::DropIndex(const std::string &index_name) {
if (!flags::run_time::GetExperimentalTextSearchEnabled()) { if (!flags::run_time::GetExperimentalTextSearchEnabled()) {
throw query::TextSearchDisabledException(); throw query::TextSearchDisabledException();
} }
// TODO antepusic check if index exists
try { try {
mgcxx::text_search::drop_index(index_name); mgcxx::text_search::drop_index(index_name);
} catch (const std::exception &e) { } catch (const std::exception &e) {
throw query::TextSearchException("Tantivy error: {}", e.what()); throw query::TextSearchException("Tantivy error: {}", e.what());
} }
auto deleted_index_label = index_.at(index_name).scope_;
index_.erase(index_name); index_.erase(index_name);
std::erase_if(label_to_index_, [index_name](const auto &item) { return item.second == index_name; }); std::erase_if(label_to_index_, [index_name](const auto &item) { return item.second == index_name; });
return deleted_index_label;
} }
bool TextIndex::IndexExists(const std::string &index_name) const { return index_.contains(index_name); } bool TextIndex::IndexExists(const std::string &index_name) const { return index_.contains(index_name); }

View File

@ -77,7 +77,7 @@ class TextIndex {
void RecoverIndex(const std::string &index_name, LabelId label, memgraph::utils::SkipList<Vertex>::Accessor vertices, void RecoverIndex(const std::string &index_name, LabelId label, memgraph::utils::SkipList<Vertex>::Accessor vertices,
NameIdMapper *name_id_mapper); NameIdMapper *name_id_mapper);
void DropIndex(const std::string &index_name); LabelId DropIndex(const std::string &index_name);
bool IndexExists(const std::string &index_name) const; bool IndexExists(const std::string &index_name) const;

View File

@ -2058,12 +2058,12 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final
final_commit_timestamp); final_commit_timestamp);
} break; } break;
case MetadataDelta::Action::TEXT_INDEX_CREATE: { case MetadataDelta::Action::TEXT_INDEX_CREATE: {
const auto &info = md_delta.text_indices; const auto &info = md_delta.text_index;
AppendToWalDataDefinition(durability::StorageMetadataOperation::TEXT_INDEX_CREATE, info.index_name, info.label, AppendToWalDataDefinition(durability::StorageMetadataOperation::TEXT_INDEX_CREATE, info.index_name, info.label,
final_commit_timestamp); final_commit_timestamp);
} break; } break;
case MetadataDelta::Action::TEXT_INDEX_DROP: { case MetadataDelta::Action::TEXT_INDEX_DROP: {
const auto &info = md_delta.text_indices; const auto &info = md_delta.text_index;
AppendToWalDataDefinition(durability::StorageMetadataOperation::TEXT_INDEX_DROP, info.index_name, info.label, AppendToWalDataDefinition(durability::StorageMetadataOperation::TEXT_INDEX_DROP, info.index_name, info.label,
final_commit_timestamp); final_commit_timestamp);
} break; } break;
@ -2097,11 +2097,13 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final
return repl_storage_state_.FinalizeTransaction(final_commit_timestamp, this, std::move(db_acc)); return repl_storage_state_.FinalizeTransaction(final_commit_timestamp, this, std::move(db_acc));
} }
void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOperation operation, LabelId label, void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOperation operation,
const std::string &text_index_name, LabelId label,
const std::set<PropertyId> &properties, LabelIndexStats stats, const std::set<PropertyId> &properties, LabelIndexStats stats,
LabelPropertyIndexStats property_stats, LabelPropertyIndexStats property_stats,
uint64_t final_commit_timestamp) { uint64_t final_commit_timestamp) {
wal_file_->AppendOperation(operation, label, properties, stats, property_stats, final_commit_timestamp); wal_file_->AppendOperation(operation, text_index_name, label, properties, stats, property_stats,
final_commit_timestamp);
repl_storage_state_.AppendOperation(operation, label, properties, stats, property_stats, final_commit_timestamp); repl_storage_state_.AppendOperation(operation, label, properties, stats, property_stats, final_commit_timestamp);
} }
@ -2109,12 +2111,12 @@ void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOpera
const std::set<PropertyId> &properties, const std::set<PropertyId> &properties,
LabelPropertyIndexStats property_stats, LabelPropertyIndexStats property_stats,
uint64_t final_commit_timestamp) { uint64_t final_commit_timestamp) {
return AppendToWalDataDefinition(operation, label, properties, {}, property_stats, final_commit_timestamp); return AppendToWalDataDefinition(operation, "", label, properties, {}, property_stats, final_commit_timestamp);
} }
void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOperation operation, LabelId label, void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOperation operation, LabelId label,
LabelIndexStats stats, uint64_t final_commit_timestamp) { LabelIndexStats stats, uint64_t final_commit_timestamp) {
return AppendToWalDataDefinition(operation, label, {}, stats, {}, final_commit_timestamp); return AppendToWalDataDefinition(operation, "", label, {}, stats, {}, final_commit_timestamp);
} }
void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOperation operation, LabelId label, void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOperation operation, LabelId label,
@ -2128,9 +2130,10 @@ void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOpera
return AppendToWalDataDefinition(operation, label, {}, {}, final_commit_timestamp); return AppendToWalDataDefinition(operation, label, {}, {}, final_commit_timestamp);
} }
void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOperation operation, std::string index_name, void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOperation operation,
LabelId label, uint64_t final_commit_timestamp) { const std::string &index_name, LabelId label,
return AppendToWalDataDefinition(operation, label, {}, {}, final_commit_timestamp); uint64_t final_commit_timestamp) {
return AppendToWalDataDefinition(operation, index_name, label, {}, {}, {}, final_commit_timestamp);
} }
utils::BasicResult<InMemoryStorage::CreateSnapshotError> InMemoryStorage::CreateSnapshot( utils::BasicResult<InMemoryStorage::CreateSnapshotError> InMemoryStorage::CreateSnapshot(

View File

@ -389,12 +389,12 @@ class InMemoryStorage final : public Storage {
const std::set<PropertyId> &properties, LabelPropertyIndexStats property_stats, const std::set<PropertyId> &properties, LabelPropertyIndexStats property_stats,
uint64_t final_commit_timestamp); uint64_t final_commit_timestamp);
/// Return true in all cases except if any sync replicas have not sent confirmation. /// Return true in all cases except if any sync replicas have not sent confirmation.
void AppendToWalDataDefinition(durability::StorageMetadataOperation operation, LabelId label, void AppendToWalDataDefinition(durability::StorageMetadataOperation operation, const std::string &text_index_name,
const std::set<PropertyId> &properties, LabelIndexStats stats, LabelId label, const std::set<PropertyId> &properties, LabelIndexStats stats,
LabelPropertyIndexStats property_stats, uint64_t final_commit_timestamp); LabelPropertyIndexStats property_stats, uint64_t final_commit_timestamp);
/// Return true in all cases except if any sync replicas have not sent confirmation. /// Return true in all cases except if any sync replicas have not sent confirmation.
void AppendToWalDataDefinition(durability::StorageMetadataOperation operation, std::string index_name, LabelId label, void AppendToWalDataDefinition(durability::StorageMetadataOperation operation, const std::string &index_name,
uint64_t final_commit_timestamp); LabelId label, uint64_t final_commit_timestamp);
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {}); uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});

View File

@ -94,10 +94,10 @@ struct MetadataDelta {
: action(Action::LABEL_PROPERTY_INDEX_STATS_CLEAR), label{label} {} : action(Action::LABEL_PROPERTY_INDEX_STATS_CLEAR), label{label} {}
MetadataDelta(TextIndexCreate /*tag*/, std::string index_name, LabelId label) MetadataDelta(TextIndexCreate /*tag*/, std::string index_name, LabelId label)
: action(Action::TEXT_INDEX_CREATE), text_indices{index_name, label} {} : action(Action::TEXT_INDEX_CREATE), text_index{index_name, label} {}
MetadataDelta(TextIndexDrop /*tag*/, std::string index_name, LabelId label) MetadataDelta(TextIndexDrop /*tag*/, std::string index_name, LabelId label)
: action(Action::TEXT_INDEX_DROP), text_indices{index_name, label} {} : action(Action::TEXT_INDEX_DROP), text_index{index_name, label} {}
MetadataDelta(ExistenceConstraintCreate /*tag*/, LabelId label, PropertyId property) MetadataDelta(ExistenceConstraintCreate /*tag*/, LabelId label, PropertyId property)
: action(Action::EXISTENCE_CONSTRAINT_CREATE), label_property{label, property} {} : action(Action::EXISTENCE_CONSTRAINT_CREATE), label_property{label, property} {}
@ -167,7 +167,7 @@ struct MetadataDelta {
struct { struct {
std::string index_name; std::string index_name;
LabelId label; LabelId label;
} text_indices; } text_index;
}; };
}; };

View File

@ -52,6 +52,7 @@ extern const Event SnapshotCreationLatency_us;
extern const Event ActiveLabelIndices; extern const Event ActiveLabelIndices;
extern const Event ActiveLabelPropertyIndices; extern const Event ActiveLabelPropertyIndices;
extern const Event ActiveTextIndices;
} // namespace memgraph::metrics } // namespace memgraph::metrics
namespace memgraph::storage { namespace memgraph::storage {
@ -292,10 +293,18 @@ class Storage {
virtual utils::BasicResult<StorageIndexDefinitionError, void> DropIndex(LabelId label, PropertyId property) = 0; virtual utils::BasicResult<StorageIndexDefinitionError, void> DropIndex(LabelId label, PropertyId property) = 0;
void CreateTextIndex(const std::string &index_name, LabelId label, query::DbAccessor *db) { void CreateTextIndex(const std::string &index_name, LabelId label, query::DbAccessor *db) {
MG_ASSERT(unique_guard_.owns_lock(), "Creating a text index requires unique access to storage!");
storage_->indices_.text_index_.CreateIndex(index_name, label, db); storage_->indices_.text_index_.CreateIndex(index_name, label, db);
transaction_.md_deltas.emplace_back(MetadataDelta::text_index_create, index_name, label);
memgraph::metrics::IncrementCounter(memgraph::metrics::ActiveTextIndices);
} }
void DropTextIndex(const std::string &index_name) { storage_->indices_.text_index_.DropIndex(index_name); } void DropTextIndex(const std::string &index_name) {
MG_ASSERT(unique_guard_.owns_lock(), "Dropping a text index requires unique access to storage!");
auto deleted_index_label = storage_->indices_.text_index_.DropIndex(index_name);
transaction_.md_deltas.emplace_back(MetadataDelta::text_index_drop, index_name, deleted_index_label);
memgraph::metrics::DecrementCounter(memgraph::metrics::ActiveTextIndices);
}
virtual utils::BasicResult<StorageExistenceConstraintDefinitionError, void> CreateExistenceConstraint( virtual utils::BasicResult<StorageExistenceConstraintDefinitionError, void> CreateExistenceConstraint(
LabelId label, PropertyId property) = 0; LabelId label, PropertyId property) = 0;

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd. // Copyright 2024 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -59,6 +59,7 @@
\ \
M(ActiveLabelIndices, Index, "Number of active label indices in the system.") \ M(ActiveLabelIndices, Index, "Number of active label indices in the system.") \
M(ActiveLabelPropertyIndices, Index, "Number of active label property indices in the system.") \ M(ActiveLabelPropertyIndices, Index, "Number of active label property indices in the system.") \
M(ActiveTextIndices, Index, "Number of active text indices in the system.") \
\ \
M(StreamsCreated, Stream, "Number of Streams created.") \ M(StreamsCreated, Stream, "Number of Streams created.") \
M(MessagesConsumed, Stream, "Number of consumed streamed messages.") \ M(MessagesConsumed, Stream, "Number of consumed streamed messages.") \