diff --git a/src/storage/v2/disk/storage.cpp b/src/storage/v2/disk/storage.cpp index dda3ca9ea..f44ad2120 100644 --- a/src/storage/v2/disk/storage.cpp +++ b/src/storage/v2/disk/storage.cpp @@ -29,6 +29,7 @@ #include #include +#include "flags/run_time_configurable.hpp" #include "kvstore/kvstore.hpp" #include "spdlog/spdlog.h" #include "storage/v2/constraints/unique_constraints.hpp" @@ -1763,6 +1764,9 @@ utils::BasicResult DiskStorage::DiskAccessor::Co return StorageManipulationError{SerializationError{}}; } spdlog::trace("rocksdb: Commit successful"); + if (flags::run_time::GetTextSearchEnabled()) { + disk_storage->indices_.text_index_->Commit(); + } is_transaction_active_ = false; @@ -1881,6 +1885,9 @@ void DiskStorage::DiskAccessor::Abort() { // query_plan_accumulate_aggregate.cpp transaction_.disk_transaction_->Rollback(); transaction_.disk_transaction_->ClearSnapshot(); + if (flags::run_time::GetTextSearchEnabled()) { + storage_->indices_.text_index_->Rollback(); + } delete transaction_.disk_transaction_; transaction_.disk_transaction_ = nullptr; is_transaction_active_ = false; diff --git a/src/storage/v2/indices/text_index.cpp b/src/storage/v2/indices/text_index.cpp index e10856a6f..f69fd91d4 100644 --- a/src/storage/v2/indices/text_index.cpp +++ b/src/storage/v2/indices/text_index.cpp @@ -70,15 +70,15 @@ void TextIndex::AddNode(Vertex *vertex_after_update, Storage *storage, const std } } -void TextIndex::AddNode(Vertex *vertex_after_update, Storage *storage, const std::uint64_t transaction_start_timestamp, - bool skip_commit) { +void TextIndex::AddNode(Vertex *vertex_after_update, Storage *storage, + const std::uint64_t transaction_start_timestamp) { if (!flags::run_time::GetTextSearchEnabled()) { throw query::QueryException("To use text indices, enable the text search feature."); } auto applicable_text_indices = GetApplicableTextIndices(vertex_after_update); if (applicable_text_indices.empty()) return; - AddNode(vertex_after_update, storage, transaction_start_timestamp, applicable_text_indices, skip_commit); + AddNode(vertex_after_update, storage, transaction_start_timestamp, applicable_text_indices); } void TextIndex::UpdateNode(Vertex *vertex_after_update, Storage *storage, @@ -120,7 +120,7 @@ void TextIndex::RemoveNode(Vertex *vertex_after_update, for (auto *index_context : applicable_text_indices) { try { - mgcxx::text_search::delete_document(*index_context, search_node_to_be_deleted, KDoSkipCommit); + mgcxx::text_search::delete_document(*index_context, search_node_to_be_deleted, kDoSkipCommit); } catch (const std::exception &e) { throw query::QueryException(fmt::format("Tantivy error: {}", e.what())); } @@ -275,17 +275,20 @@ bool TextIndex::CreateIndex(std::string index_name, LabelId label, memgraph::que index_context, mgcxx::text_search::DocumentInput{ .data = document.dump(-1, ' ', false, nlohmann::json::error_handler_t::replace)}, - KDoSkipCommit); + kDoSkipCommit); } catch (const std::exception &e) { throw query::QueryException(fmt::format("Tantivy error: {}", e.what())); } } - // try { - // mgcxx::text_search::commit(index_context); - // } catch (const std::exception &e) { - // throw query::QueryException(fmt::format("Tantivy error: {}", e.what())); - // } + // As CREATE TEXT INDEX (...) queries don’t accumulate deltas, db_transactional_accessor_->Commit() does not reach + // the code area where changes to indices are committed. To get around that without needing to commit text indices + // after every such query, we commit here. + try { + mgcxx::text_search::commit(index_context); + } catch (const std::exception &e) { + throw query::QueryException(fmt::format("Tantivy error: {}", e.what())); + } return true; } @@ -342,6 +345,18 @@ std::vector TextIndex::Search(std::string index_name, std::string search_qu return found_nodes; } +void TextIndex::Commit() { + for (auto &[_, index_data] : index_) { + mgcxx::text_search::commit(index_data.context_); + } +} + +void TextIndex::Rollback() { + for (auto &[_, index_data] : index_) { + mgcxx::text_search::rollback(index_data.context_); + } +} + std::vector> TextIndex::ListIndices() const { std::vector> ret; ret.reserve(index_.size()); diff --git a/src/storage/v2/indices/text_index.hpp b/src/storage/v2/indices/text_index.hpp index 39cac0d12..463c234db 100644 --- a/src/storage/v2/indices/text_index.hpp +++ b/src/storage/v2/indices/text_index.hpp @@ -23,7 +23,7 @@ class DbAccessor; namespace memgraph::storage { class Storage; -constexpr bool KDoSkipCommit = false; +constexpr bool kDoSkipCommit = true; struct TextIndexData { mgcxx::text_search::Context context_; @@ -33,7 +33,8 @@ struct TextIndexData { class TextIndex { private: void AddNode(Vertex *vertex, Storage *storage, const std::uint64_t transaction_start_timestamp, - const std::vector &applicable_text_indices, bool skip_commit = false); + const std::vector &applicable_text_indices, + bool skip_commit = kDoSkipCommit); std::vector GetApplicableTextIndices(const std::vector &labels); @@ -54,8 +55,7 @@ class TextIndex { std::map index_; std::map label_to_index_; - void AddNode(Vertex *vertex, Storage *storage, const std::uint64_t transaction_start_timestamp, - bool skip_commit = false); + void AddNode(Vertex *vertex, Storage *storage, const std::uint64_t transaction_start_timestamp); void UpdateNode(Vertex *vertex, Storage *storage, const std::uint64_t transaction_start_timestamp); @@ -81,6 +81,10 @@ class TextIndex { std::vector Search(std::string index_name, std::string search_query); + void Commit(); + + void Rollback(); + std::vector> ListIndices() const; std::uint64_t ApproximateVertexCount(std::string index_name) const; diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index 24cb0fbb2..9fede3173 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -14,6 +14,7 @@ #include #include #include "dbms/constants.hpp" +#include "flags/run_time_configurable.hpp" #include "memory/global_memory_control.hpp" #include "storage/v2/durability/durability.hpp" #include "storage/v2/durability/snapshot.hpp" @@ -862,6 +863,10 @@ utils::BasicResult InMemoryStorage::InMemoryAcce commit_timestamp_.reset(); // We have aborted, hence we have not committed return StorageManipulationError{*unique_constraint_violation}; } + + if (flags::run_time::GetTextSearchEnabled()) { + mem_storage->indices_.text_index_->Commit(); + } } is_transaction_active_ = false; @@ -1095,6 +1100,9 @@ void InMemoryStorage::InMemoryAccessor::Abort() { for (auto const &[property, prop_vertices] : property_cleanup) { storage_->indices_.AbortEntries(property, prop_vertices, transaction_.start_timestamp); } + if (flags::run_time::GetTextSearchEnabled()) { + storage_->indices_.text_index_->Rollback(); + } // VERTICES {