Add commit/rollback

This commit is contained in:
Ante Pušić 2024-02-04 18:11:58 +01:00
parent a919429ea5
commit c426f4a4a4
4 changed files with 48 additions and 14 deletions

View File

@ -29,6 +29,7 @@
#include <rocksdb/utilities/transaction.h>
#include <rocksdb/utilities/transaction_db.h>
#include "flags/run_time_configurable.hpp"
#include "kvstore/kvstore.hpp"
#include "spdlog/spdlog.h"
#include "storage/v2/constraints/unique_constraints.hpp"
@ -1763,6 +1764,9 @@ utils::BasicResult<StorageManipulationError, void> DiskStorage::DiskAccessor::Co
return StorageManipulationError{SerializationError{}};
}
spdlog::trace("rocksdb: Commit successful");
if (flags::run_time::GetTextSearchEnabled()) {
disk_storage->indices_.text_index_->Commit();
}
is_transaction_active_ = false;
@ -1881,6 +1885,9 @@ void DiskStorage::DiskAccessor::Abort() {
// query_plan_accumulate_aggregate.cpp
transaction_.disk_transaction_->Rollback();
transaction_.disk_transaction_->ClearSnapshot();
if (flags::run_time::GetTextSearchEnabled()) {
storage_->indices_.text_index_->Rollback();
}
delete transaction_.disk_transaction_;
transaction_.disk_transaction_ = nullptr;
is_transaction_active_ = false;

View File

@ -70,15 +70,15 @@ void TextIndex::AddNode(Vertex *vertex_after_update, Storage *storage, const std
}
}
void TextIndex::AddNode(Vertex *vertex_after_update, Storage *storage, const std::uint64_t transaction_start_timestamp,
bool skip_commit) {
void TextIndex::AddNode(Vertex *vertex_after_update, Storage *storage,
const std::uint64_t transaction_start_timestamp) {
if (!flags::run_time::GetTextSearchEnabled()) {
throw query::QueryException("To use text indices, enable the text search feature.");
}
auto applicable_text_indices = GetApplicableTextIndices(vertex_after_update);
if (applicable_text_indices.empty()) return;
AddNode(vertex_after_update, storage, transaction_start_timestamp, applicable_text_indices, skip_commit);
AddNode(vertex_after_update, storage, transaction_start_timestamp, applicable_text_indices);
}
void TextIndex::UpdateNode(Vertex *vertex_after_update, Storage *storage,
@ -120,7 +120,7 @@ void TextIndex::RemoveNode(Vertex *vertex_after_update,
for (auto *index_context : applicable_text_indices) {
try {
mgcxx::text_search::delete_document(*index_context, search_node_to_be_deleted, KDoSkipCommit);
mgcxx::text_search::delete_document(*index_context, search_node_to_be_deleted, kDoSkipCommit);
} catch (const std::exception &e) {
throw query::QueryException(fmt::format("Tantivy error: {}", e.what()));
}
@ -275,17 +275,20 @@ bool TextIndex::CreateIndex(std::string index_name, LabelId label, memgraph::que
index_context,
mgcxx::text_search::DocumentInput{
.data = document.dump(-1, ' ', false, nlohmann::json::error_handler_t::replace)},
KDoSkipCommit);
kDoSkipCommit);
} catch (const std::exception &e) {
throw query::QueryException(fmt::format("Tantivy error: {}", e.what()));
}
}
// try {
// mgcxx::text_search::commit(index_context);
// } catch (const std::exception &e) {
// throw query::QueryException(fmt::format("Tantivy error: {}", e.what()));
// }
// As CREATE TEXT INDEX (...) queries dont accumulate deltas, db_transactional_accessor_->Commit() does not reach
// the code area where changes to indices are committed. To get around that without needing to commit text indices
// after every such query, we commit here.
try {
mgcxx::text_search::commit(index_context);
} catch (const std::exception &e) {
throw query::QueryException(fmt::format("Tantivy error: {}", e.what()));
}
return true;
}
@ -342,6 +345,18 @@ std::vector<Gid> TextIndex::Search(std::string index_name, std::string search_qu
return found_nodes;
}
void TextIndex::Commit() {
for (auto &[_, index_data] : index_) {
mgcxx::text_search::commit(index_data.context_);
}
}
void TextIndex::Rollback() {
for (auto &[_, index_data] : index_) {
mgcxx::text_search::rollback(index_data.context_);
}
}
std::vector<std::pair<std::string, LabelId>> TextIndex::ListIndices() const {
std::vector<std::pair<std::string, LabelId>> ret;
ret.reserve(index_.size());

View File

@ -23,7 +23,7 @@ class DbAccessor;
namespace memgraph::storage {
class Storage;
constexpr bool KDoSkipCommit = false;
constexpr bool kDoSkipCommit = true;
struct TextIndexData {
mgcxx::text_search::Context context_;
@ -33,7 +33,8 @@ struct TextIndexData {
class TextIndex {
private:
void AddNode(Vertex *vertex, Storage *storage, const std::uint64_t transaction_start_timestamp,
const std::vector<mgcxx::text_search::Context *> &applicable_text_indices, bool skip_commit = false);
const std::vector<mgcxx::text_search::Context *> &applicable_text_indices,
bool skip_commit = kDoSkipCommit);
std::vector<mgcxx::text_search::Context *> GetApplicableTextIndices(const std::vector<LabelId> &labels);
@ -54,8 +55,7 @@ class TextIndex {
std::map<std::string, TextIndexData> index_;
std::map<LabelId, std::string> label_to_index_;
void AddNode(Vertex *vertex, Storage *storage, const std::uint64_t transaction_start_timestamp,
bool skip_commit = false);
void AddNode(Vertex *vertex, Storage *storage, const std::uint64_t transaction_start_timestamp);
void UpdateNode(Vertex *vertex, Storage *storage, const std::uint64_t transaction_start_timestamp);
@ -81,6 +81,10 @@ class TextIndex {
std::vector<Gid> Search(std::string index_name, std::string search_query);
void Commit();
void Rollback();
std::vector<std::pair<std::string, LabelId>> ListIndices() const;
std::uint64_t ApproximateVertexCount(std::string index_name) const;

View File

@ -14,6 +14,7 @@
#include <functional>
#include <optional>
#include "dbms/constants.hpp"
#include "flags/run_time_configurable.hpp"
#include "memory/global_memory_control.hpp"
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/durability/snapshot.hpp"
@ -862,6 +863,10 @@ utils::BasicResult<StorageManipulationError, void> InMemoryStorage::InMemoryAcce
commit_timestamp_.reset(); // We have aborted, hence we have not committed
return StorageManipulationError{*unique_constraint_violation};
}
if (flags::run_time::GetTextSearchEnabled()) {
mem_storage->indices_.text_index_->Commit();
}
}
is_transaction_active_ = false;
@ -1095,6 +1100,9 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
for (auto const &[property, prop_vertices] : property_cleanup) {
storage_->indices_.AbortEntries(property, prop_vertices, transaction_.start_timestamp);
}
if (flags::run_time::GetTextSearchEnabled()) {
storage_->indices_.text_index_->Rollback();
}
// VERTICES
{