Compare commits

...

1 Commits

Author SHA1 Message Date
Gareth Lloyd
3dc84eaa5c Bloom filter for GC index clean speedup 2024-02-27 17:29:25 +00:00
12 changed files with 124 additions and 38 deletions

View File

@ -32,10 +32,13 @@ void Indices::AbortEntries(LabelId label, std::span<std::pair<PropertyValue, Ver
->AbortEntries(label, vertices, exact_start_timestamp);
}
void Indices::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token) const {
static_cast<InMemoryLabelIndex *>(label_index_.get())->RemoveObsoleteEntries(oldest_active_start_timestamp, token);
void Indices::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token,
std::optional<utils::BloomFilter<Vertex *>> filter) const {
auto const *filter_ptr = filter ? &*filter : nullptr;
static_cast<InMemoryLabelIndex *>(label_index_.get())
->RemoveObsoleteEntries(oldest_active_start_timestamp, token, filter_ptr);
static_cast<InMemoryLabelPropertyIndex *>(label_property_index_.get())
->RemoveObsoleteEntries(oldest_active_start_timestamp, std::move(token));
->RemoveObsoleteEntries(oldest_active_start_timestamp, std::move(token), filter_ptr);
}
void Indices::UpdateOnAddLabel(LabelId label, Vertex *vertex, const Transaction &tx) const {

View File

@ -33,7 +33,8 @@ struct Indices {
/// This function should be called from garbage collection to clean-up the
/// index.
/// TODO: unused in disk indices
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token) const;
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token,
std::optional<utils::BloomFilter<Vertex *>> filter) const;
/// Surgical removal of entries that were inserted in this transaction
/// TODO: unused in disk indices

View File

@ -80,31 +80,34 @@ std::vector<LabelId> InMemoryLabelIndex::ListIndices() const {
return ret;
}
void InMemoryLabelIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token) {
void InMemoryLabelIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token,
utils::BloomFilter<Vertex *> const *filter) {
auto maybe_stop = utils::ResettableCounter<2048>();
for (auto &label_storage : index_) {
for (auto &[label_id, index] : index_) {
// before starting index, check if stop_requested
if (token.stop_requested()) return;
auto vertices_acc = label_storage.second.access();
for (auto it = vertices_acc.begin(); it != vertices_acc.end();) {
auto index_acc = index.access();
auto it = index_acc.begin();
auto end_it = index_acc.end();
if (it == end_it) continue;
while (true) {
// Hot loop, don't check stop_requested every time
if (maybe_stop() && token.stop_requested()) return;
auto next_it = it;
++next_it;
if (it->timestamp >= oldest_active_start_timestamp) {
it = next_it;
continue;
bool has_next = next_it != end_it;
if (it->timestamp < oldest_active_start_timestamp) {
bool redundant_duplicate = has_next && it->vertex == next_it->vertex;
if (redundant_duplicate || ((!filter || filter->maybe_contains(it->vertex)) &&
!AnyVersionHasLabel(*it->vertex, label_id, oldest_active_start_timestamp))) {
index_acc.remove(*it);
}
}
if ((next_it != vertices_acc.end() && it->vertex == next_it->vertex) ||
!AnyVersionHasLabel(*it->vertex, label_storage.first, oldest_active_start_timestamp)) {
vertices_acc.remove(*it);
}
if (!has_next) break;
it = next_it;
}
}

View File

@ -54,7 +54,8 @@ class InMemoryLabelIndex : public storage::LabelIndex {
std::vector<LabelId> ListIndices() const override;
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token);
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token,
utils::BloomFilter<Vertex *> const *filter);
/// Surgical removal of entries that were inserted in this transaction
void AbortEntries(LabelId labelId, std::span<Vertex *const> vertices, uint64_t exact_start_timestamp);

View File

@ -140,14 +140,16 @@ std::vector<std::pair<LabelId, PropertyId>> InMemoryLabelPropertyIndex::ListIndi
return ret;
}
void InMemoryLabelPropertyIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token) {
void InMemoryLabelPropertyIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token,
utils::BloomFilter<Vertex *> const *filter) {
auto maybe_stop = utils::ResettableCounter<2048>();
for (auto &[label_property, index] : index_) {
auto [label_id, prop_id] = label_property;
// before starting index, check if stop_requested
if (token.stop_requested()) return;
auto [label_id, prop_id] = label_property;
auto index_acc = index.access();
auto it = index_acc.begin();
auto end_it = index_acc.end();
@ -163,7 +165,8 @@ void InMemoryLabelPropertyIndex::RemoveObsoleteEntries(uint64_t oldest_active_st
if (it->timestamp < oldest_active_start_timestamp) {
bool redundant_duplicate = has_next && it->vertex == next_it->vertex && it->value == next_it->value;
if (redundant_duplicate ||
!AnyVersionHasLabelProperty(*it->vertex, label_id, prop_id, it->value, oldest_active_start_timestamp)) {
((!filter || filter->maybe_contains(it->vertex)) &&
!AnyVersionHasLabelProperty(*it->vertex, label_id, prop_id, it->value, oldest_active_start_timestamp))) {
index_acc.remove(*it);
}
}

View File

@ -60,7 +60,8 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex {
std::vector<std::pair<LabelId, PropertyId>> ListIndices() const override;
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token);
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token,
utils::BloomFilter<Vertex *> const *filter);
void AbortEntries(PropertyId property, std::span<std::pair<PropertyValue, Vertex *> const> vertices,
uint64_t exact_start_timestamp);

View File

@ -889,6 +889,7 @@ void InMemoryStorage::InMemoryAccessor::FastDiscardOfDeltas(uint64_t oldest_acti
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
std::list<Gid> current_deleted_edges;
std::list<Gid> current_deleted_vertices;
auto index_invalidator = utils::BloomFilter<Vertex *>{};
auto const unlink_remove_clear = [&](std::deque<Delta> &deltas) {
for (auto &delta : deltas) {
@ -938,22 +939,26 @@ void InMemoryStorage::InMemoryAccessor::FastDiscardOfDeltas(uint64_t oldest_acti
// 1.b.1) unlink, gathering the removals
for (auto &gc_deltas : linked_undo_buffers) {
unlink_remove_clear(gc_deltas.deltas_);
index_invalidator.merge(std::move(gc_deltas.index_invalidator));
}
// 1.b.2) clear the list of deltas deques
linked_undo_buffers.clear();
// STEP 2) this transaction's deltas also need minimal unlinking + remove + clear
unlink_remove_clear(transaction_.deltas);
index_invalidator.merge(std::move(transaction_.index_invalidator));
// STEP 3) skip_list removals
if (!current_deleted_vertices.empty()) {
if (!index_invalidator.empty()) {
// 3.a) clear from indexes first
std::stop_source dummy;
mem_storage->indices_.RemoveObsoleteEntries(oldest_active_timestamp, dummy.get_token());
mem_storage->indices_.RemoveObsoleteEntries(oldest_active_timestamp, dummy.get_token(), index_invalidator);
auto *mem_unique_constraints =
static_cast<InMemoryUniqueConstraints *>(mem_storage->constraints_.unique_constraints_.get());
mem_unique_constraints->RemoveObsoleteEntries(oldest_active_timestamp, dummy.get_token());
}
if (!current_deleted_vertices.empty()) {
// 3.b) remove from vertex skip_list
auto vertex_acc = mem_storage->vertices_.access();
for (auto gid : current_deleted_vertices) {
@ -1178,7 +1183,7 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
engine_guard.unlock();
garbage_undo_buffers.emplace_back(mark_timestamp, std::move(transaction_.deltas),
std::move(transaction_.commit_timestamp));
std::move(transaction_.commit_timestamp), utils::BloomFilter<Vertex *>{});
});
/// We MUST unlink (aka. remove) entries in indexes and constraints
@ -1227,8 +1232,8 @@ void InMemoryStorage::InMemoryAccessor::FinalizeTransaction() {
// Only hand over delta to be GC'ed if there was any deltas
mem_storage->committed_transactions_.WithLock([&](auto &committed_transactions) {
// using mark of 0 as GC will assign a mark_timestamp after unlinking
committed_transactions.emplace_back(0, std::move(transaction_.deltas),
std::move(transaction_.commit_timestamp));
committed_transactions.emplace_back(0, std::move(transaction_.deltas), std::move(transaction_.commit_timestamp),
std::move(transaction_.index_invalidator));
});
}
commit_timestamp_.reset();
@ -1523,12 +1528,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
committed_transactions_.WithLock(
[&](auto &committed_transactions) { committed_transactions.swap(linked_undo_buffers); });
// Flag that will be used to determine whether the Index GC should be run. It
// should be run when there were any items that were cleaned up (there were
// updates between this run of the GC and the previous run of the GC). This
// eliminates high CPU usage when the GC doesn't have to clean up anything.
bool run_index_cleanup = !linked_undo_buffers.empty() || !garbage_undo_buffers_->empty() || need_full_scan_vertices ||
need_full_scan_edges;
auto index_invalidator = utils::BloomFilter<Vertex *>{};
auto const end_linked_undo_buffers = linked_undo_buffers.end();
for (auto linked_entry = linked_undo_buffers.begin(); linked_entry != end_linked_undo_buffers;) {
@ -1670,7 +1670,8 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
// Now unlinked, move to unlinked_undo_buffers
auto const to_move = linked_entry;
++linked_entry; // advanced to next before we move the list node
++linked_entry; // advanced to next before we move the list node
index_invalidator.merge(std::move(to_move->index_invalidator)); // track potential invalidations
unlinked_undo_buffers.splice(unlinked_undo_buffers.end(), linked_undo_buffers, to_move);
}
@ -1681,16 +1682,26 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
});
}
// Flag that will be used to determine whether the Index GC should be run. It
// should be run when there were any items that were cleaned up (there were
// updates between this run of the GC and the previous run of the GC). This
// eliminates high CPU usage when the GC doesn't have to clean up anything.
bool force_index_cleanup = need_full_scan_vertices || need_full_scan_edges;
// After unlinking deltas from vertices, we refresh the indices. That way
// we're sure that none of the vertices from `current_deleted_vertices`
// appears in an index, and we can safely remove them from the main storage
// after the last currently active transaction is finished.
if (run_index_cleanup) {
if (!index_invalidator.empty() || force_index_cleanup) {
// This operation is very expensive as it traverses through all of the items
// in every index every time.
auto token = stop_source.get_token();
if (!token.stop_requested()) {
indices_.RemoveObsoleteEntries(oldest_active_start_timestamp, token);
auto filter = std::optional<utils::BloomFilter<Vertex *>>{};
if (!force_index_cleanup) {
filter = std::move(index_invalidator);
}
indices_.RemoveObsoleteEntries(oldest_active_start_timestamp, token, std::move(filter));
auto *mem_unique_constraints = static_cast<InMemoryUniqueConstraints *>(constraints_.unique_constraints_.get());
mem_unique_constraints->RemoveObsoleteEntries(oldest_active_start_timestamp, std::move(token));
}

View File

@ -432,8 +432,12 @@ class InMemoryStorage final : public Storage {
std::mutex gc_lock_;
struct GCDeltas {
GCDeltas(uint64_t mark_timestamp, std::deque<Delta> deltas, std::unique_ptr<std::atomic<uint64_t>> commit_timestamp)
: mark_timestamp_{mark_timestamp}, deltas_{std::move(deltas)}, commit_timestamp_{std::move(commit_timestamp)} {}
GCDeltas(uint64_t mark_timestamp, std::deque<Delta> deltas, std::unique_ptr<std::atomic<uint64_t>> commit_timestamp,
utils::BloomFilter<Vertex *> indexInvalidator)
: mark_timestamp_{mark_timestamp},
deltas_{std::move(deltas)},
commit_timestamp_{std::move(commit_timestamp)},
index_invalidator(std::move(indexInvalidator)) {}
GCDeltas(GCDeltas &&) = default;
GCDeltas &operator=(GCDeltas &&) = default;
@ -441,6 +445,8 @@ class InMemoryStorage final : public Storage {
uint64_t mark_timestamp_{}; //!< a timestamp no active transaction currently has
std::deque<Delta> deltas_; //!< the deltas that need cleaning
std::unique_ptr<std::atomic<uint64_t>> commit_timestamp_{}; //!< the timestamp the deltas are pointing at
utils::BloomFilter<Vertex *>
index_invalidator{}; //!< bloom filter of maybe Vertex * that invalidated one or more indexes
};
// Ownership of linked deltas is transferred to committed_transactions_ once transaction is committed

View File

@ -275,6 +275,10 @@ Storage::Accessor::DetachDelete(std::vector<VertexAccessor *> nodes, std::vector
auto deleted_vertices = maybe_deleted_vertices.GetValue();
for (auto const &vertex : deleted_vertices) {
transaction_.index_invalidator.insert(vertex.vertex_);
}
return std::make_optional<ReturnType>(std::move(deleted_vertices), std::move(deleted_edges));
}

View File

@ -15,6 +15,7 @@
#include <limits>
#include <memory>
#include "utils/bloom_filter.hpp"
#include "utils/memory.hpp"
#include "utils/skip_list.hpp"
@ -89,6 +90,7 @@ struct Transaction {
uint64_t command_id{};
std::deque<Delta> deltas;
utils::BloomFilter<Vertex *> index_invalidator;
utils::pmr::list<MetadataDelta> md_deltas;
bool must_abort{};
IsolationLevel isolation_level{};

View File

@ -123,6 +123,7 @@ Result<bool> VertexAccessor::AddLabel(LabelId label) {
transaction_->constraint_verification_info.AddedLabel(vertex_);
storage_->indices_.UpdateOnAddLabel(label, vertex_, *transaction_);
transaction_->manyDeltasCache.Invalidate(vertex_, label);
transaction_->index_invalidator.insert(vertex_);
return true;
}
@ -151,6 +152,7 @@ Result<bool> VertexAccessor::RemoveLabel(LabelId label) {
storage_->constraints_.unique_constraints_->UpdateOnRemoveLabel(label, *vertex_, transaction_->start_timestamp);
storage_->indices_.UpdateOnRemoveLabel(label, vertex_, *transaction_);
transaction_->manyDeltasCache.Invalidate(vertex_, label);
transaction_->index_invalidator.insert(vertex_);
return true;
}
@ -283,6 +285,7 @@ Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const Pro
}
storage_->indices_.UpdateOnSetProperty(property, value, vertex_, *transaction_);
transaction_->manyDeltasCache.Invalidate(vertex_, property);
transaction_->index_invalidator.insert(vertex_);
return std::move(current_value);
}
@ -314,6 +317,7 @@ Result<bool> VertexAccessor::InitProperties(const std::map<storage::PropertyId,
} else {
transaction->constraint_verification_info.RemovedProperty(vertex);
}
transaction->index_invalidator.insert(vertex);
}
result = true;
}};
@ -352,6 +356,7 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> Vertex
} else {
transaction->constraint_verification_info.RemovedProperty(vertex);
}
transaction->index_invalidator.insert(vertex);
}
}};
std::invoke(atomic_memory_block);
@ -384,6 +389,7 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() {
transaction->manyDeltasCache.Invalidate(vertex, property);
}
vertex->properties.ClearProperties();
transaction->index_invalidator.insert(vertex);
}};
std::invoke(atomic_memory_block);

View File

@ -0,0 +1,45 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <cstdint>
#include <utility>

#include "absl/container/flat_hash_set.h"
#include "absl/hash/hash.h"
namespace memgraph::utils {
template <typename T>
struct BloomFilter {
void insert(T const &value) {
constexpr auto hasher = absl::Hash<T>{};
auto hash_1 = hasher(value);
auto hash_2 = hasher(value + 1987);
store.insert(hash_1);
store.insert(hash_2);
}
bool maybe_contains(T const &value) const {
constexpr auto hasher = absl::Hash<T>{};
auto hash_1 = hasher(value);
if (!store.contains(hash_1)) return false;
auto hash_2 = hasher(value + 1987);
return store.contains(hash_2);
}
void merge(BloomFilter &&other) { store.merge(std::move(other.store)); }
bool empty() const { return store.empty(); }
private:
// Deliberate truncate to uint32_t
absl::flat_hash_set<uint32_t> store; // TODO: replace with roaring bitmap?
};
} // namespace memgraph::utils