Compare commits
1 Commits
master
...
index_clea
Author | SHA1 | Date | |
---|---|---|---|
|
3dc84eaa5c |
@ -32,10 +32,13 @@ void Indices::AbortEntries(LabelId label, std::span<std::pair<PropertyValue, Ver
|
||||
->AbortEntries(label, vertices, exact_start_timestamp);
|
||||
}
|
||||
|
||||
void Indices::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token) const {
|
||||
static_cast<InMemoryLabelIndex *>(label_index_.get())->RemoveObsoleteEntries(oldest_active_start_timestamp, token);
|
||||
void Indices::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token,
|
||||
std::optional<utils::BloomFilter<Vertex *>> filter) const {
|
||||
auto const *filter_ptr = filter ? &*filter : nullptr;
|
||||
static_cast<InMemoryLabelIndex *>(label_index_.get())
|
||||
->RemoveObsoleteEntries(oldest_active_start_timestamp, token, filter_ptr);
|
||||
static_cast<InMemoryLabelPropertyIndex *>(label_property_index_.get())
|
||||
->RemoveObsoleteEntries(oldest_active_start_timestamp, std::move(token));
|
||||
->RemoveObsoleteEntries(oldest_active_start_timestamp, std::move(token), filter_ptr);
|
||||
}
|
||||
|
||||
void Indices::UpdateOnAddLabel(LabelId label, Vertex *vertex, const Transaction &tx) const {
|
||||
|
@ -33,7 +33,8 @@ struct Indices {
|
||||
/// This function should be called from garbage collection to clean-up the
|
||||
/// index.
|
||||
/// TODO: unused in disk indices
|
||||
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token) const;
|
||||
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token,
|
||||
std::optional<utils::BloomFilter<Vertex *>> filter) const;
|
||||
|
||||
/// Surgical removal of entries that were inserted in this transaction
|
||||
/// TODO: unused in disk indices
|
||||
|
@ -80,31 +80,34 @@ std::vector<LabelId> InMemoryLabelIndex::ListIndices() const {
|
||||
return ret;
|
||||
}
|
||||
|
||||
void InMemoryLabelIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token) {
|
||||
void InMemoryLabelIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token,
|
||||
utils::BloomFilter<Vertex *> const *filter) {
|
||||
auto maybe_stop = utils::ResettableCounter<2048>();
|
||||
|
||||
for (auto &label_storage : index_) {
|
||||
for (auto &[label_id, index] : index_) {
|
||||
// before starting index, check if stop_requested
|
||||
if (token.stop_requested()) return;
|
||||
|
||||
auto vertices_acc = label_storage.second.access();
|
||||
for (auto it = vertices_acc.begin(); it != vertices_acc.end();) {
|
||||
auto index_acc = index.access();
|
||||
auto it = index_acc.begin();
|
||||
auto end_it = index_acc.end();
|
||||
if (it == end_it) continue;
|
||||
while (true) {
|
||||
// Hot loop, don't check stop_requested every time
|
||||
if (maybe_stop() && token.stop_requested()) return;
|
||||
|
||||
auto next_it = it;
|
||||
++next_it;
|
||||
|
||||
if (it->timestamp >= oldest_active_start_timestamp) {
|
||||
it = next_it;
|
||||
continue;
|
||||
bool has_next = next_it != end_it;
|
||||
if (it->timestamp < oldest_active_start_timestamp) {
|
||||
bool redundant_duplicate = has_next && it->vertex == next_it->vertex;
|
||||
if (redundant_duplicate || ((!filter || filter->maybe_contains(it->vertex)) &&
|
||||
!AnyVersionHasLabel(*it->vertex, label_id, oldest_active_start_timestamp))) {
|
||||
index_acc.remove(*it);
|
||||
}
|
||||
}
|
||||
|
||||
if ((next_it != vertices_acc.end() && it->vertex == next_it->vertex) ||
|
||||
!AnyVersionHasLabel(*it->vertex, label_storage.first, oldest_active_start_timestamp)) {
|
||||
vertices_acc.remove(*it);
|
||||
}
|
||||
|
||||
if (!has_next) break;
|
||||
it = next_it;
|
||||
}
|
||||
}
|
||||
|
@ -54,7 +54,8 @@ class InMemoryLabelIndex : public storage::LabelIndex {
|
||||
|
||||
std::vector<LabelId> ListIndices() const override;
|
||||
|
||||
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token);
|
||||
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token,
|
||||
utils::BloomFilter<Vertex *> const *filter);
|
||||
|
||||
/// Surgical removal of entries that were inserted in this transaction
|
||||
void AbortEntries(LabelId labelId, std::span<Vertex *const> vertices, uint64_t exact_start_timestamp);
|
||||
|
@ -140,14 +140,16 @@ std::vector<std::pair<LabelId, PropertyId>> InMemoryLabelPropertyIndex::ListIndi
|
||||
return ret;
|
||||
}
|
||||
|
||||
void InMemoryLabelPropertyIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token) {
|
||||
void InMemoryLabelPropertyIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token,
|
||||
utils::BloomFilter<Vertex *> const *filter) {
|
||||
auto maybe_stop = utils::ResettableCounter<2048>();
|
||||
|
||||
for (auto &[label_property, index] : index_) {
|
||||
auto [label_id, prop_id] = label_property;
|
||||
// before starting index, check if stop_requested
|
||||
if (token.stop_requested()) return;
|
||||
|
||||
auto [label_id, prop_id] = label_property;
|
||||
|
||||
auto index_acc = index.access();
|
||||
auto it = index_acc.begin();
|
||||
auto end_it = index_acc.end();
|
||||
@ -163,7 +165,8 @@ void InMemoryLabelPropertyIndex::RemoveObsoleteEntries(uint64_t oldest_active_st
|
||||
if (it->timestamp < oldest_active_start_timestamp) {
|
||||
bool redundant_duplicate = has_next && it->vertex == next_it->vertex && it->value == next_it->value;
|
||||
if (redundant_duplicate ||
|
||||
!AnyVersionHasLabelProperty(*it->vertex, label_id, prop_id, it->value, oldest_active_start_timestamp)) {
|
||||
((!filter || filter->maybe_contains(it->vertex)) &&
|
||||
!AnyVersionHasLabelProperty(*it->vertex, label_id, prop_id, it->value, oldest_active_start_timestamp))) {
|
||||
index_acc.remove(*it);
|
||||
}
|
||||
}
|
||||
|
@ -60,7 +60,8 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex {
|
||||
|
||||
std::vector<std::pair<LabelId, PropertyId>> ListIndices() const override;
|
||||
|
||||
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token);
|
||||
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token,
|
||||
utils::BloomFilter<Vertex *> const *filter);
|
||||
|
||||
void AbortEntries(PropertyId property, std::span<std::pair<PropertyValue, Vertex *> const> vertices,
|
||||
uint64_t exact_start_timestamp);
|
||||
|
@ -889,6 +889,7 @@ void InMemoryStorage::InMemoryAccessor::FastDiscardOfDeltas(uint64_t oldest_acti
|
||||
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
|
||||
std::list<Gid> current_deleted_edges;
|
||||
std::list<Gid> current_deleted_vertices;
|
||||
auto index_invalidator = utils::BloomFilter<Vertex *>{};
|
||||
|
||||
auto const unlink_remove_clear = [&](std::deque<Delta> &deltas) {
|
||||
for (auto &delta : deltas) {
|
||||
@ -938,22 +939,26 @@ void InMemoryStorage::InMemoryAccessor::FastDiscardOfDeltas(uint64_t oldest_acti
|
||||
// 1.b.1) unlink, gathering the removals
|
||||
for (auto &gc_deltas : linked_undo_buffers) {
|
||||
unlink_remove_clear(gc_deltas.deltas_);
|
||||
index_invalidator.merge(std::move(gc_deltas.index_invalidator));
|
||||
}
|
||||
// 1.b.2) clear the list of deltas deques
|
||||
linked_undo_buffers.clear();
|
||||
|
||||
// STEP 2) this transaction's deltas also need minimal unlinking + remove + clear
|
||||
unlink_remove_clear(transaction_.deltas);
|
||||
index_invalidator.merge(std::move(transaction_.index_invalidator));
|
||||
|
||||
// STEP 3) skip_list removals
|
||||
if (!current_deleted_vertices.empty()) {
|
||||
if (!index_invalidator.empty()) {
|
||||
// 3.a) clear from indexes first
|
||||
std::stop_source dummy;
|
||||
mem_storage->indices_.RemoveObsoleteEntries(oldest_active_timestamp, dummy.get_token());
|
||||
mem_storage->indices_.RemoveObsoleteEntries(oldest_active_timestamp, dummy.get_token(), index_invalidator);
|
||||
auto *mem_unique_constraints =
|
||||
static_cast<InMemoryUniqueConstraints *>(mem_storage->constraints_.unique_constraints_.get());
|
||||
mem_unique_constraints->RemoveObsoleteEntries(oldest_active_timestamp, dummy.get_token());
|
||||
}
|
||||
|
||||
if (!current_deleted_vertices.empty()) {
|
||||
// 3.b) remove from vertex skip_list
|
||||
auto vertex_acc = mem_storage->vertices_.access();
|
||||
for (auto gid : current_deleted_vertices) {
|
||||
@ -1178,7 +1183,7 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
||||
engine_guard.unlock();
|
||||
|
||||
garbage_undo_buffers.emplace_back(mark_timestamp, std::move(transaction_.deltas),
|
||||
std::move(transaction_.commit_timestamp));
|
||||
std::move(transaction_.commit_timestamp), utils::BloomFilter<Vertex *>{});
|
||||
});
|
||||
|
||||
/// We MUST unlink (aka. remove) entries in indexes and constraints
|
||||
@ -1227,8 +1232,8 @@ void InMemoryStorage::InMemoryAccessor::FinalizeTransaction() {
|
||||
// Only hand over delta to be GC'ed if there was any deltas
|
||||
mem_storage->committed_transactions_.WithLock([&](auto &committed_transactions) {
|
||||
// using mark of 0 as GC will assign a mark_timestamp after unlinking
|
||||
committed_transactions.emplace_back(0, std::move(transaction_.deltas),
|
||||
std::move(transaction_.commit_timestamp));
|
||||
committed_transactions.emplace_back(0, std::move(transaction_.deltas), std::move(transaction_.commit_timestamp),
|
||||
std::move(transaction_.index_invalidator));
|
||||
});
|
||||
}
|
||||
commit_timestamp_.reset();
|
||||
@ -1523,12 +1528,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
|
||||
committed_transactions_.WithLock(
|
||||
[&](auto &committed_transactions) { committed_transactions.swap(linked_undo_buffers); });
|
||||
|
||||
// Flag that will be used to determine whether the Index GC should be run. It
|
||||
// should be run when there were any items that were cleaned up (there were
|
||||
// updates between this run of the GC and the previous run of the GC). This
|
||||
// eliminates high CPU usage when the GC doesn't have to clean up anything.
|
||||
bool run_index_cleanup = !linked_undo_buffers.empty() || !garbage_undo_buffers_->empty() || need_full_scan_vertices ||
|
||||
need_full_scan_edges;
|
||||
auto index_invalidator = utils::BloomFilter<Vertex *>{};
|
||||
|
||||
auto const end_linked_undo_buffers = linked_undo_buffers.end();
|
||||
for (auto linked_entry = linked_undo_buffers.begin(); linked_entry != end_linked_undo_buffers;) {
|
||||
@ -1670,7 +1670,8 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
|
||||
|
||||
// Now unlinked, move to unlinked_undo_buffers
|
||||
auto const to_move = linked_entry;
|
||||
++linked_entry; // advanced to next before we move the list node
|
||||
++linked_entry; // advanced to next before we move the list node
|
||||
index_invalidator.merge(std::move(to_move->index_invalidator)); // track potential invalidations
|
||||
unlinked_undo_buffers.splice(unlinked_undo_buffers.end(), linked_undo_buffers, to_move);
|
||||
}
|
||||
|
||||
@ -1681,16 +1682,26 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
|
||||
});
|
||||
}
|
||||
|
||||
// Flag that will be used to determine whether the Index GC should be run. It
|
||||
// should be run when there were any items that were cleaned up (there were
|
||||
// updates between this run of the GC and the previous run of the GC). This
|
||||
// eliminates high CPU usage when the GC doesn't have to clean up anything.
|
||||
bool force_index_cleanup = need_full_scan_vertices || need_full_scan_edges;
|
||||
|
||||
// After unlinking deltas from vertices, we refresh the indices. That way
|
||||
// we're sure that none of the vertices from `current_deleted_vertices`
|
||||
// appears in an index, and we can safely remove them from the main storage
|
||||
// after the last currently active transaction is finished.
|
||||
if (run_index_cleanup) {
|
||||
if (!index_invalidator.empty() || force_index_cleanup) {
|
||||
// This operation is very expensive as it traverses through all of the items
|
||||
// in every index every time.
|
||||
auto token = stop_source.get_token();
|
||||
if (!token.stop_requested()) {
|
||||
indices_.RemoveObsoleteEntries(oldest_active_start_timestamp, token);
|
||||
auto filter = std::optional<utils::BloomFilter<Vertex *>>{};
|
||||
if (!force_index_cleanup) {
|
||||
filter = std::move(index_invalidator);
|
||||
}
|
||||
indices_.RemoveObsoleteEntries(oldest_active_start_timestamp, token, std::move(filter));
|
||||
auto *mem_unique_constraints = static_cast<InMemoryUniqueConstraints *>(constraints_.unique_constraints_.get());
|
||||
mem_unique_constraints->RemoveObsoleteEntries(oldest_active_start_timestamp, std::move(token));
|
||||
}
|
||||
|
@ -432,8 +432,12 @@ class InMemoryStorage final : public Storage {
|
||||
std::mutex gc_lock_;
|
||||
|
||||
struct GCDeltas {
|
||||
GCDeltas(uint64_t mark_timestamp, std::deque<Delta> deltas, std::unique_ptr<std::atomic<uint64_t>> commit_timestamp)
|
||||
: mark_timestamp_{mark_timestamp}, deltas_{std::move(deltas)}, commit_timestamp_{std::move(commit_timestamp)} {}
|
||||
GCDeltas(uint64_t mark_timestamp, std::deque<Delta> deltas, std::unique_ptr<std::atomic<uint64_t>> commit_timestamp,
|
||||
utils::BloomFilter<Vertex *> indexInvalidator)
|
||||
: mark_timestamp_{mark_timestamp},
|
||||
deltas_{std::move(deltas)},
|
||||
commit_timestamp_{std::move(commit_timestamp)},
|
||||
index_invalidator(std::move(indexInvalidator)) {}
|
||||
|
||||
GCDeltas(GCDeltas &&) = default;
|
||||
GCDeltas &operator=(GCDeltas &&) = default;
|
||||
@ -441,6 +445,8 @@ class InMemoryStorage final : public Storage {
|
||||
uint64_t mark_timestamp_{}; //!< a timestamp no active transaction currently has
|
||||
std::deque<Delta> deltas_; //!< the deltas that need cleaning
|
||||
std::unique_ptr<std::atomic<uint64_t>> commit_timestamp_{}; //!< the timestamp the deltas are pointing at
|
||||
utils::BloomFilter<Vertex *>
|
||||
index_invalidator{}; //!< bloom filter of maybe Vertex * that invalidated one or more indexes
|
||||
};
|
||||
|
||||
// Ownership of linked deltas is transferred to committed_transactions_ once the transaction is committed
|
||||
|
@ -275,6 +275,10 @@ Storage::Accessor::DetachDelete(std::vector<VertexAccessor *> nodes, std::vector
|
||||
|
||||
auto deleted_vertices = maybe_deleted_vertices.GetValue();
|
||||
|
||||
for (auto const &vertex : deleted_vertices) {
|
||||
transaction_.index_invalidator.insert(vertex.vertex_);
|
||||
}
|
||||
|
||||
return std::make_optional<ReturnType>(std::move(deleted_vertices), std::move(deleted_edges));
|
||||
}
|
||||
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
|
||||
#include "utils/bloom_filter.hpp"
|
||||
#include "utils/memory.hpp"
|
||||
#include "utils/skip_list.hpp"
|
||||
|
||||
@ -89,6 +90,7 @@ struct Transaction {
|
||||
uint64_t command_id{};
|
||||
|
||||
std::deque<Delta> deltas;
|
||||
utils::BloomFilter<Vertex *> index_invalidator;
|
||||
utils::pmr::list<MetadataDelta> md_deltas;
|
||||
bool must_abort{};
|
||||
IsolationLevel isolation_level{};
|
||||
|
@ -123,6 +123,7 @@ Result<bool> VertexAccessor::AddLabel(LabelId label) {
|
||||
transaction_->constraint_verification_info.AddedLabel(vertex_);
|
||||
storage_->indices_.UpdateOnAddLabel(label, vertex_, *transaction_);
|
||||
transaction_->manyDeltasCache.Invalidate(vertex_, label);
|
||||
transaction_->index_invalidator.insert(vertex_);
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -151,6 +152,7 @@ Result<bool> VertexAccessor::RemoveLabel(LabelId label) {
|
||||
storage_->constraints_.unique_constraints_->UpdateOnRemoveLabel(label, *vertex_, transaction_->start_timestamp);
|
||||
storage_->indices_.UpdateOnRemoveLabel(label, vertex_, *transaction_);
|
||||
transaction_->manyDeltasCache.Invalidate(vertex_, label);
|
||||
transaction_->index_invalidator.insert(vertex_);
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -283,6 +285,7 @@ Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const Pro
|
||||
}
|
||||
storage_->indices_.UpdateOnSetProperty(property, value, vertex_, *transaction_);
|
||||
transaction_->manyDeltasCache.Invalidate(vertex_, property);
|
||||
transaction_->index_invalidator.insert(vertex_);
|
||||
|
||||
return std::move(current_value);
|
||||
}
|
||||
@ -314,6 +317,7 @@ Result<bool> VertexAccessor::InitProperties(const std::map<storage::PropertyId,
|
||||
} else {
|
||||
transaction->constraint_verification_info.RemovedProperty(vertex);
|
||||
}
|
||||
transaction->index_invalidator.insert(vertex);
|
||||
}
|
||||
result = true;
|
||||
}};
|
||||
@ -352,6 +356,7 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> Vertex
|
||||
} else {
|
||||
transaction->constraint_verification_info.RemovedProperty(vertex);
|
||||
}
|
||||
transaction->index_invalidator.insert(vertex);
|
||||
}
|
||||
}};
|
||||
std::invoke(atomic_memory_block);
|
||||
@ -384,6 +389,7 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() {
|
||||
transaction->manyDeltasCache.Invalidate(vertex, property);
|
||||
}
|
||||
vertex->properties.ClearProperties();
|
||||
transaction->index_invalidator.insert(vertex);
|
||||
}};
|
||||
std::invoke(atomic_memory_block);
|
||||
|
||||
|
45
src/utils/bloom_filter.hpp
Normal file
45
src/utils/bloom_filter.hpp
Normal file
@ -0,0 +1,45 @@
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
#include <utility>

#include "absl/container/flat_hash_set.h"
#include "absl/hash/hash.h"
|
||||
|
||||
namespace memgraph::utils {
|
||||
|
||||
template <typename T>
|
||||
struct BloomFilter {
|
||||
void insert(T const &value) {
|
||||
constexpr auto hasher = absl::Hash<T>{};
|
||||
auto hash_1 = hasher(value);
|
||||
auto hash_2 = hasher(value + 1987);
|
||||
store.insert(hash_1);
|
||||
store.insert(hash_2);
|
||||
}
|
||||
|
||||
bool maybe_contains(T const &value) const {
|
||||
constexpr auto hasher = absl::Hash<T>{};
|
||||
auto hash_1 = hasher(value);
|
||||
if (!store.contains(hash_1)) return false;
|
||||
auto hash_2 = hasher(value + 1987);
|
||||
return store.contains(hash_2);
|
||||
}
|
||||
|
||||
void merge(BloomFilter &&other) { store.merge(std::move(other.store)); }
|
||||
|
||||
bool empty() const { return store.empty(); }
|
||||
|
||||
private:
|
||||
// Deliberate truncate to uint32_t
|
||||
absl::flat_hash_set<uint32_t> store; // TODO: replace with roaring bitmap?
|
||||
};
|
||||
|
||||
} // namespace memgraph::utils
|
Loading…
Reference in New Issue
Block a user