Performance tuning based on stress test (#1572)

Minor changes that speedup the large stress test.
Also now uses a stop token for a more productive shutdown. No need to wait for expensive GC runs.
This commit is contained in:
Gareth Andrew Lloyd 2024-01-25 17:14:58 +00:00 committed by GitHub
parent 38ade99652
commit 9f7118d893
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 285 additions and 176 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -72,8 +72,9 @@ class TypedValueVectorCompare final {
/// Raise QueryRuntimeException if the value for symbol isn't of expected type.
inline void ExpectType(const Symbol &symbol, const TypedValue &value, TypedValue::Type expected) {
if (value.type() != expected)
if (value.type() != expected) [[unlikely]] {
throw QueryRuntimeException("Expected a {} for '{}', but got {}.", expected, symbol.name(), value.type());
}
}
inline void ProcessError(const storage::Error error) {

View File

@ -1209,7 +1209,8 @@ class PropertyLookup : public memgraph::query::Expression {
}
protected:
PropertyLookup(Expression *expression, PropertyIx property) : expression_(expression), property_(property) {}
PropertyLookup(Expression *expression, PropertyIx property)
: expression_(expression), property_(std::move(property)) {}
private:
friend class AstStorage;
@ -1805,9 +1806,9 @@ class EdgeAtom : public memgraph::query::PatternAtom {
static const utils::TypeInfo kType;
const utils::TypeInfo &GetTypeInfo() const override { return kType; }
enum class Type { SINGLE, DEPTH_FIRST, BREADTH_FIRST, WEIGHTED_SHORTEST_PATH, ALL_SHORTEST_PATHS };
enum class Type : uint8_t { SINGLE, DEPTH_FIRST, BREADTH_FIRST, WEIGHTED_SHORTEST_PATH, ALL_SHORTEST_PATHS };
enum class Direction { IN, OUT, BOTH };
enum class Direction : uint8_t { IN, OUT, BOTH };
/// Lambda for use in filtering or weight calculation during variable expand.
struct Lambda {
@ -2216,7 +2217,7 @@ class IndexQuery : public memgraph::query::Query {
protected:
IndexQuery(Action action, LabelIx label, std::vector<PropertyIx> properties)
: action_(action), label_(label), properties_(properties) {}
: action_(action), label_(std::move(label)), properties_(std::move(properties)) {}
private:
friend class AstStorage;

View File

@ -1121,11 +1121,11 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
throw QueryRuntimeException("Unexpected error when getting properties.");
}
}
return *maybe_props;
return *std::move(maybe_props);
}
template <class TRecordAccessor>
storage::PropertyValue GetProperty(const TRecordAccessor &record_accessor, PropertyIx prop) {
storage::PropertyValue GetProperty(const TRecordAccessor &record_accessor, const PropertyIx &prop) {
auto maybe_prop = record_accessor.GetProperty(view_, ctx_->properties[prop.ix]);
if (maybe_prop.HasError() && maybe_prop.GetError() == storage::Error::NONEXISTENT_OBJECT) {
// This is a very nasty and temporary hack in order to make MERGE work.
@ -1148,7 +1148,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
throw QueryRuntimeException("Unexpected error when getting a property.");
}
}
return *maybe_prop;
return *std::move(maybe_prop);
}
template <class TRecordAccessor>
@ -1178,7 +1178,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
return *maybe_prop;
}
storage::LabelId GetLabel(LabelIx label) { return ctx_->labels[label.ix]; }
storage::LabelId GetLabel(const LabelIx &label) { return ctx_->labels[label.ix]; }
Frame *frame_;
const SymbolTable *symbol_table_;

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -257,7 +257,7 @@ void Filters::EraseFilter(const FilterInfo &filter) {
all_filters_.end());
}
void Filters::EraseLabelFilter(const Symbol &symbol, LabelIx label, std::vector<Expression *> *removed_filters) {
void Filters::EraseLabelFilter(const Symbol &symbol, const LabelIx &label, std::vector<Expression *> *removed_filters) {
for (auto filter_it = all_filters_.begin(); filter_it != all_filters_.end();) {
if (filter_it->type != FilterInfo::Type::Label) {
++filter_it;

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -366,7 +366,8 @@ class Filters final {
/// Remove a label filter for symbol; may invalidate iterators.
/// If removed_filters is not nullptr, fills the vector with original
/// `Expression *` which are now completely removed.
void EraseLabelFilter(const Symbol &, LabelIx, std::vector<Expression *> *removed_filters = nullptr);
void EraseLabelFilter(const Symbol &symbol, const LabelIx &label,
std::vector<Expression *> *removed_filters = nullptr);
/// Returns a vector of FilterInfo for properties.
auto PropertyFilters(const Symbol &symbol) const {

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -655,9 +655,9 @@ class IndexLookupRewriter final : public HierarchicalLogicalOperatorVisitor {
}
}
storage::LabelId GetLabel(LabelIx label) { return db_->NameToLabel(label.name); }
storage::LabelId GetLabel(const LabelIx &label) { return db_->NameToLabel(label.name); }
storage::PropertyId GetProperty(PropertyIx prop) { return db_->NameToProperty(prop.name); }
storage::PropertyId GetProperty(const PropertyIx &prop) { return db_->NameToProperty(prop.name); }
std::optional<LabelIx> FindBestLabelIndex(const std::unordered_set<LabelIx> &labels) {
MG_ASSERT(!labels.empty(), "Trying to find the best label without any labels.");

View File

@ -271,9 +271,9 @@ class RuleBasedPlanner {
private:
TPlanningContext *context_;
storage::LabelId GetLabel(LabelIx label) { return context_->db->NameToLabel(label.name); }
storage::LabelId GetLabel(const LabelIx &label) { return context_->db->NameToLabel(label.name); }
storage::PropertyId GetProperty(PropertyIx prop) { return context_->db->NameToProperty(prop.name); }
storage::PropertyId GetProperty(const PropertyIx &prop) { return context_->db->NameToProperty(prop.name); }
storage::EdgeTypeId GetEdgeType(EdgeTypeIx edge_type) { return context_->db->NameToEdgeType(edge_type.name); }

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -125,9 +125,7 @@ TypedValue::TypedValue(storage::PropertyValue &&other, utils::MemoryResource *me
case storage::PropertyValue::Type::List: {
type_ = Type::List;
auto &vec = other.ValueList();
new (&list_v) TVector(memory_);
list_v.reserve(vec.size());
for (auto &v : vec) list_v.emplace_back(std::move(v));
new (&list_v) TVector(std::make_move_iterator(vec.begin()), std::make_move_iterator(vec.end()), memory_);
break;
}
case storage::PropertyValue::Type::Map: {
@ -324,13 +322,13 @@ TypedValue::operator storage::PropertyValue() const {
#define DEFINE_VALUE_AND_TYPE_GETTERS(type_param, type_enum, field) \
type_param &TypedValue::Value##type_enum() { \
if (type_ != Type::type_enum) \
if (type_ != Type::type_enum) [[unlikely]] \
throw TypedValueException("TypedValue is of type '{}', not '{}'", type_, Type::type_enum); \
return field; \
} \
\
const type_param &TypedValue::Value##type_enum() const { \
if (type_ != Type::type_enum) \
if (type_ != Type::type_enum) [[unlikely]] \
throw TypedValueException("TypedValue is of type '{}', not '{}'", type_, Type::type_enum); \
return field; \
} \

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -1983,18 +1983,34 @@ void CreateSnapshot(Storage *storage, Transaction *transaction, const std::files
snapshot.WritePropertyValue(item.second);
}
const auto &in_edges = maybe_in_edges.GetValue().edges;
snapshot.WriteUint(in_edges.size());
for (const auto &item : in_edges) {
snapshot.WriteUint(item.Gid().AsUint());
snapshot.WriteUint(item.FromVertex().Gid().AsUint());
write_mapping(item.EdgeType());
}
const auto &out_edges = maybe_out_edges.GetValue().edges;
snapshot.WriteUint(out_edges.size());
for (const auto &item : out_edges) {
snapshot.WriteUint(item.Gid().AsUint());
snapshot.WriteUint(item.ToVertex().Gid().AsUint());
write_mapping(item.EdgeType());
if (storage->config_.salient.items.properties_on_edges) {
snapshot.WriteUint(in_edges.size());
for (const auto &item : in_edges) {
snapshot.WriteUint(item.GidPropertiesOnEdges().AsUint());
snapshot.WriteUint(item.FromVertex().Gid().AsUint());
write_mapping(item.EdgeType());
}
snapshot.WriteUint(out_edges.size());
for (const auto &item : out_edges) {
snapshot.WriteUint(item.GidPropertiesOnEdges().AsUint());
snapshot.WriteUint(item.ToVertex().Gid().AsUint());
write_mapping(item.EdgeType());
}
} else {
snapshot.WriteUint(in_edges.size());
for (const auto &item : in_edges) {
snapshot.WriteUint(item.GidNoPropertiesOnEdges().AsUint());
snapshot.WriteUint(item.FromVertex().Gid().AsUint());
write_mapping(item.EdgeType());
}
snapshot.WriteUint(out_edges.size());
for (const auto &item : out_edges) {
snapshot.WriteUint(item.GidNoPropertiesOnEdges().AsUint());
snapshot.WriteUint(item.ToVertex().Gid().AsUint());
write_mapping(item.EdgeType());
}
}
}

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -225,19 +225,19 @@ Result<PropertyValue> EdgeAccessor::GetProperty(PropertyId property, View view)
if (!storage_->config_.salient.items.properties_on_edges) return PropertyValue();
bool exists = true;
bool deleted = false;
PropertyValue value;
std::optional<PropertyValue> value;
Delta *delta = nullptr;
{
auto guard = std::shared_lock{edge_.ptr->lock};
deleted = edge_.ptr->deleted;
value = edge_.ptr->properties.GetProperty(property);
value.emplace(edge_.ptr->properties.GetProperty(property));
delta = edge_.ptr->delta;
}
ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &value, property](const Delta &delta) {
switch (delta.action) {
case Delta::Action::SET_PROPERTY: {
if (delta.property.key == property) {
value = delta.property.value;
*value = delta.property.value;
}
break;
}
@ -261,7 +261,7 @@ Result<PropertyValue> EdgeAccessor::GetProperty(PropertyId property, View view)
});
if (!exists) return Error::NONEXISTENT_OBJECT;
if (!for_deleted_ && deleted) return Error::DELETED_OBJECT;
return std::move(value);
return *std::move(value);
}
Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::Properties(View view) const {

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -85,6 +85,8 @@ class EdgeAccessor final {
/// @throw std::bad_alloc
Result<std::map<PropertyId, PropertyValue>> Properties(View view) const;
auto GidPropertiesOnEdges() const -> Gid { return edge_.ptr->gid; }
auto GidNoPropertiesOnEdges() const -> Gid { return edge_.gid; }
Gid Gid() const noexcept;
bool IsCycle() const { return from_vertex_ == to_vertex_; }

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -32,10 +32,10 @@ void Indices::AbortEntries(LabelId label, std::span<std::pair<PropertyValue, Ver
->AbortEntries(label, vertices, exact_start_timestamp);
}
void Indices::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp) const {
static_cast<InMemoryLabelIndex *>(label_index_.get())->RemoveObsoleteEntries(oldest_active_start_timestamp);
void Indices::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token) const {
static_cast<InMemoryLabelIndex *>(label_index_.get())->RemoveObsoleteEntries(oldest_active_start_timestamp, token);
static_cast<InMemoryLabelPropertyIndex *>(label_property_index_.get())
->RemoveObsoleteEntries(oldest_active_start_timestamp);
->RemoveObsoleteEntries(oldest_active_start_timestamp, std::move(token));
}
void Indices::UpdateOnAddLabel(LabelId label, Vertex *vertex, const Transaction &tx) const {

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -33,7 +33,7 @@ struct Indices {
/// This function should be called from garbage collection to clean-up the
/// index.
/// TODO: unused in disk indices
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp) const;
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token) const;
/// Surgical removal of entries that was inserted this transaction
/// TODO: unused in disk indices

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -21,11 +21,18 @@
namespace memgraph::storage {
namespace {
template <Delta::Action... actions>
struct ActionSet {
constexpr bool contains(Delta::Action action) const { return ((action == actions) || ...); }
};
/// Traverses deltas visible from transaction with start timestamp greater than
/// the provided timestamp, and calls the provided callback function for each
/// delta. If the callback ever returns true, traversal is stopped and the
/// function returns true. Otherwise, the function returns false.
template <typename TCallback>
template <ActionSet interesting, typename TCallback>
inline bool AnyVersionSatisfiesPredicate(uint64_t timestamp, const Delta *delta, const TCallback &predicate) {
while (delta != nullptr) {
const auto ts = delta->timestamp->load(std::memory_order_acquire);
@ -33,7 +40,7 @@ inline bool AnyVersionSatisfiesPredicate(uint64_t timestamp, const Delta *delta,
if (ts < timestamp) {
break;
}
if (predicate(*delta)) {
if (interesting.contains(delta->action) && predicate(*delta)) {
return true;
}
// Move to the next delta.
@ -42,6 +49,8 @@ inline bool AnyVersionSatisfiesPredicate(uint64_t timestamp, const Delta *delta,
return false;
}
} // namespace
/// Helper function for label index garbage collection. Returns true if there's
/// a reachable version of the vertex that has the given label.
inline bool AnyVersionHasLabel(const Vertex &vertex, LabelId label, uint64_t timestamp) {
@ -57,7 +66,10 @@ inline bool AnyVersionHasLabel(const Vertex &vertex, LabelId label, uint64_t tim
if (!deleted && has_label) {
return true;
}
return AnyVersionSatisfiesPredicate(timestamp, delta, [&has_label, &deleted, label](const Delta &delta) {
constexpr auto interesting =
ActionSet<Delta::Action::ADD_LABEL, Delta::Action::REMOVE_LABEL, Delta::Action::RECREATE_OBJECT,
Delta::Action::DELETE_DESERIALIZED_OBJECT, Delta::Action::DELETE_OBJECT>{};
return AnyVersionSatisfiesPredicate<interesting>(timestamp, delta, [&has_label, &deleted, label](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_LABEL:
if (delta.label == label) {
@ -98,10 +110,10 @@ inline bool AnyVersionHasLabel(const Vertex &vertex, LabelId label, uint64_t tim
/// property value.
inline bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, PropertyId key, const PropertyValue &value,
uint64_t timestamp) {
bool has_label{false};
bool current_value_equal_to_value{value.IsNull()};
bool deleted{false};
const Delta *delta = nullptr;
Delta const *delta;
bool deleted;
bool has_label;
bool current_value_equal_to_value;
{
auto guard = std::shared_lock{vertex.lock};
delta = vertex.delta;
@ -116,7 +128,10 @@ inline bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, Prop
return true;
}
return AnyVersionSatisfiesPredicate(
constexpr auto interesting = ActionSet<Delta::Action::ADD_LABEL, Delta::Action::REMOVE_LABEL,
Delta::Action::SET_PROPERTY, Delta::Action::RECREATE_OBJECT,
Delta::Action::DELETE_DESERIALIZED_OBJECT, Delta::Action::DELETE_OBJECT>{};
return AnyVersionSatisfiesPredicate<interesting>(
timestamp, delta, [&has_label, &current_value_equal_to_value, &deleted, label, key, &value](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_LABEL:

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -16,6 +16,7 @@
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/indices/indices_utils.hpp"
#include "storage/v2/inmemory/storage.hpp"
#include "utils/counter.hpp"
namespace memgraph::storage {
@ -79,10 +80,18 @@ std::vector<LabelId> InMemoryLabelIndex::ListIndices() const {
return ret;
}
void InMemoryLabelIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp) {
void InMemoryLabelIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token) {
auto maybe_stop = utils::ResettableCounter<2048>();
for (auto &label_storage : index_) {
// before starting index, check if stop_requested
if (token.stop_requested()) return;
auto vertices_acc = label_storage.second.access();
for (auto it = vertices_acc.begin(); it != vertices_acc.end();) {
// Hot loop, don't check stop_requested every time
if (maybe_stop() && token.stop_requested()) return;
auto next_it = it;
++next_it;

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -54,7 +54,7 @@ class InMemoryLabelIndex : public storage::LabelIndex {
std::vector<LabelId> ListIndices() const override;
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp);
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token);
/// Surgical removal of entries that was inserted this transaction
void AbortEntries(LabelId labelId, std::span<Vertex *const> vertices, uint64_t exact_start_timestamp);

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -13,6 +13,7 @@
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/indices/indices_utils.hpp"
#include "storage/v2/inmemory/storage.hpp"
#include "utils/counter.hpp"
#include "utils/logging.hpp"
namespace memgraph::storage {
@ -139,10 +140,18 @@ std::vector<std::pair<LabelId, PropertyId>> InMemoryLabelPropertyIndex::ListIndi
return ret;
}
void InMemoryLabelPropertyIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp) {
void InMemoryLabelPropertyIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token) {
auto maybe_stop = utils::ResettableCounter<2048>();
for (auto &[label_property, index] : index_) {
// before starting index, check if stop_requested
if (token.stop_requested()) return;
auto index_acc = index.access();
for (auto it = index_acc.begin(); it != index_acc.end();) {
// Hot loop, don't check stop_requested every time
if (maybe_stop() && token.stop_requested()) return;
auto next_it = it;
++next_it;

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -60,7 +60,7 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex {
std::vector<std::pair<LabelId, PropertyId>> ListIndices() const override;
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp);
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token);
void AbortEntries(PropertyId property, std::span<std::pair<PropertyValue, Vertex *> const> vertices,
uint64_t exact_start_timestamp);

View File

@ -154,6 +154,8 @@ InMemoryStorage::InMemoryStorage(Config config)
}
InMemoryStorage::~InMemoryStorage() {
stop_source.request_stop();
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
gc_runner_.Stop();
}
@ -1575,9 +1577,12 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
if (run_index_cleanup) {
// This operation is very expensive as it traverses through all of the items
// in every index every time.
indices_.RemoveObsoleteEntries(oldest_active_start_timestamp);
auto *mem_unique_constraints = static_cast<InMemoryUniqueConstraints *>(constraints_.unique_constraints_.get());
mem_unique_constraints->RemoveObsoleteEntries(oldest_active_start_timestamp);
auto token = stop_source.get_token();
if (!token.stop_requested()) {
indices_.RemoveObsoleteEntries(oldest_active_start_timestamp, token);
auto *mem_unique_constraints = static_cast<InMemoryUniqueConstraints *>(constraints_.unique_constraints_.get());
mem_unique_constraints->RemoveObsoleteEntries(oldest_active_start_timestamp, std::move(token));
}
}
{
@ -2125,8 +2130,11 @@ void InMemoryStorage::CreateSnapshotHandler(
// Run the snapshot thread (if enabled)
if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED) {
snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval,
[this]() { this->create_snapshot_handler(); });
snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval, [this, token = stop_source.get_token()]() {
if (!token.stop_requested()) {
this->create_snapshot_handler();
}
});
}
}
IndicesInfo InMemoryStorage::InMemoryAccessor::ListAllIndices() const {

View File

@ -460,6 +460,9 @@ class InMemoryStorage final : public Storage {
// Moved the create snapshot to a user defined handler so we can remove the global replication state from the storage
std::function<void()> create_snapshot_handler{};
// A way to tell async operation to stop
std::stop_source stop_source;
};
} // namespace memgraph::storage

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -15,6 +15,7 @@
#include "storage/v2/constraints/utils.hpp"
#include "storage/v2/durability/recovery_type.hpp"
#include "storage/v2/id_types.hpp"
#include "utils/counter.hpp"
#include "utils/logging.hpp"
#include "utils/skip_list.hpp"
namespace memgraph::storage {
@ -487,10 +488,18 @@ std::vector<std::pair<LabelId, std::set<PropertyId>>> InMemoryUniqueConstraints:
return ret;
}
void InMemoryUniqueConstraints::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp) {
void InMemoryUniqueConstraints::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token) {
auto maybe_stop = utils::ResettableCounter<2048>();
for (auto &[label_props, storage] : constraints_) {
// before starting constraint, check if stop_requested
if (token.stop_requested()) return;
auto acc = storage.access();
for (auto it = acc.begin(); it != acc.end();) {
// Hot loop, don't check stop_requested every time
if (maybe_stop() && token.stop_requested()) return;
auto next_it = it;
++next_it;

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -122,7 +122,7 @@ class InMemoryUniqueConstraints : public UniqueConstraints {
std::vector<std::pair<LabelId, std::set<PropertyId>>> ListConstraints() const override;
/// GC method that removes outdated entries from constraints' storages.
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp);
void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp, std::stop_token token);
void Clear() override;

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -123,21 +123,21 @@ class PropertyValue {
// value getters for primitive types
/// @throw PropertyValueException if value isn't of correct type.
bool ValueBool() const {
if (type_ != Type::Bool) {
if (type_ != Type::Bool) [[unlikely]] {
throw PropertyValueException("The value isn't a bool!");
}
return bool_v;
}
/// @throw PropertyValueException if value isn't of correct type.
int64_t ValueInt() const {
if (type_ != Type::Int) {
if (type_ != Type::Int) [[unlikely]] {
throw PropertyValueException("The value isn't an int!");
}
return int_v;
}
/// @throw PropertyValueException if value isn't of correct type.
double ValueDouble() const {
if (type_ != Type::Double) {
if (type_ != Type::Double) [[unlikely]] {
throw PropertyValueException("The value isn't a double!");
}
return double_v;
@ -145,7 +145,7 @@ class PropertyValue {
/// @throw PropertyValueException if value isn't of correct type.
TemporalData ValueTemporalData() const {
if (type_ != Type::TemporalData) {
if (type_ != Type::TemporalData) [[unlikely]] {
throw PropertyValueException("The value isn't a temporal data!");
}
@ -155,7 +155,7 @@ class PropertyValue {
// const value getters for non-primitive types
/// @throw PropertyValueException if value isn't of correct type.
const std::string &ValueString() const {
if (type_ != Type::String) {
if (type_ != Type::String) [[unlikely]] {
throw PropertyValueException("The value isn't a string!");
}
return string_v;
@ -163,7 +163,7 @@ class PropertyValue {
/// @throw PropertyValueException if value isn't of correct type.
const std::vector<PropertyValue> &ValueList() const {
if (type_ != Type::List) {
if (type_ != Type::List) [[unlikely]] {
throw PropertyValueException("The value isn't a list!");
}
return list_v;
@ -171,7 +171,7 @@ class PropertyValue {
/// @throw PropertyValueException if value isn't of correct type.
const std::map<std::string, PropertyValue> &ValueMap() const {
if (type_ != Type::Map) {
if (type_ != Type::Map) [[unlikely]] {
throw PropertyValueException("The value isn't a map!");
}
return map_v;
@ -180,7 +180,7 @@ class PropertyValue {
// reference value getters for non-primitive types
/// @throw PropertyValueException if value isn't of correct type.
std::string &ValueString() {
if (type_ != Type::String) {
if (type_ != Type::String) [[unlikely]] {
throw PropertyValueException("The value isn't a string!");
}
return string_v;
@ -188,7 +188,7 @@ class PropertyValue {
/// @throw PropertyValueException if value isn't of correct type.
std::vector<PropertyValue> &ValueList() {
if (type_ != Type::List) {
if (type_ != Type::List) [[unlikely]] {
throw PropertyValueException("The value isn't a list!");
}
return list_v;
@ -196,7 +196,7 @@ class PropertyValue {
/// @throw PropertyValueException if value isn't of correct type.
std::map<std::string, PropertyValue> &ValueMap() {
if (type_ != Type::Map) {
if (type_ != Type::Map) [[unlikely]] {
throw PropertyValueException("The value isn't a map!");
}
return map_v;
@ -279,7 +279,7 @@ inline bool operator==(const PropertyValue &first, const PropertyValue &second)
case PropertyValue::Type::Bool:
return first.ValueBool() == second.ValueBool();
case PropertyValue::Type::Int:
if (second.type() == PropertyValue::Type::Double) {
if (second.type() == PropertyValue::Type::Double) [[unlikely]] {
return first.ValueInt() == second.ValueDouble();
} else {
return first.ValueInt() == second.ValueInt();
@ -310,7 +310,7 @@ inline bool operator<(const PropertyValue &first, const PropertyValue &second) n
case PropertyValue::Type::Bool:
return first.ValueBool() < second.ValueBool();
case PropertyValue::Type::Int:
if (second.type() == PropertyValue::Type::Double) {
if (second.type() == PropertyValue::Type::Double) [[unlikely]] {
return first.ValueInt() < second.ValueDouble();
} else {
return first.ValueInt() < second.ValueInt();
@ -363,36 +363,35 @@ inline PropertyValue::PropertyValue(const PropertyValue &other) : type_(other.ty
}
}
inline PropertyValue::PropertyValue(PropertyValue &&other) noexcept : type_(other.type_) {
switch (other.type_) {
inline PropertyValue::PropertyValue(PropertyValue &&other) noexcept : type_(std::exchange(other.type_, Type::Null)) {
switch (type_) {
case Type::Null:
break;
case Type::Bool:
this->bool_v = other.bool_v;
bool_v = other.bool_v;
break;
case Type::Int:
this->int_v = other.int_v;
int_v = other.int_v;
break;
case Type::Double:
this->double_v = other.double_v;
double_v = other.double_v;
break;
case Type::String:
new (&string_v) std::string(std::move(other.string_v));
std::construct_at(&string_v, std::move(other.string_v));
std::destroy_at(&other.string_v);
break;
case Type::List:
new (&list_v) std::vector<PropertyValue>(std::move(other.list_v));
std::construct_at(&list_v, std::move(other.list_v));
std::destroy_at(&other.list_v);
break;
case Type::Map:
new (&map_v) std::map<std::string, PropertyValue>(std::move(other.map_v));
std::construct_at(&map_v, std::move(other.map_v));
std::destroy_at(&other.map_v);
break;
case Type::TemporalData:
this->temporal_data_v = other.temporal_data_v;
temporal_data_v = other.temporal_data_v;
break;
}
// reset the type of other
other.DestroyValue();
other.type_ = Type::Null;
}
inline PropertyValue &PropertyValue::operator=(const PropertyValue &other) {
@ -431,46 +430,48 @@ inline PropertyValue &PropertyValue::operator=(const PropertyValue &other) {
}
inline PropertyValue &PropertyValue::operator=(PropertyValue &&other) noexcept {
if (this == &other) return *this;
if (type_ == other.type_) {
// maybe the same object, check if no work is required
if (this == &other) return *this;
DestroyValue();
type_ = other.type_;
switch (other.type_) {
case Type::Null:
break;
case Type::Bool:
this->bool_v = other.bool_v;
break;
case Type::Int:
this->int_v = other.int_v;
break;
case Type::Double:
this->double_v = other.double_v;
break;
case Type::String:
new (&string_v) std::string(std::move(other.string_v));
break;
case Type::List:
new (&list_v) std::vector<PropertyValue>(std::move(other.list_v));
break;
case Type::Map:
new (&map_v) std::map<std::string, PropertyValue>(std::move(other.map_v));
break;
case Type::TemporalData:
this->temporal_data_v = other.temporal_data_v;
break;
switch (type_) {
case Type::Null:
break;
case Type::Bool:
bool_v = other.bool_v;
break;
case Type::Int:
int_v = other.int_v;
break;
case Type::Double:
double_v = other.double_v;
break;
case Type::String:
string_v = std::move(other.string_v);
std::destroy_at(&other.string_v);
break;
case Type::List:
list_v = std::move(other.list_v);
std::destroy_at(&other.list_v);
break;
case Type::Map:
map_v = std::move(other.map_v);
std::destroy_at(&other.map_v);
break;
case Type::TemporalData:
temporal_data_v = other.temporal_data_v;
break;
}
other.type_ = Type::Null;
return *this;
} else {
std::destroy_at(this);
return *std::construct_at(std::launder(this), std::move(other));
}
// reset the type of other
other.DestroyValue();
other.type_ = Type::Null;
return *this;
}
inline void PropertyValue::DestroyValue() noexcept {
switch (type_) {
switch (std::exchange(type_, Type::Null)) {
// destructor for primitive types does nothing
case Type::Null:
case Type::Bool:

29
src/utils/counter.hpp Normal file
View File

@ -0,0 +1,29 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <cstdint>
namespace memgraph::utils {
/// A resetable counter, every Nth call returns true
template <std::size_t N>
auto ResettableCounter() {
return [counter = N]() mutable {
--counter;
if (counter != 0) return false;
counter = N;
return true;
};
}
} // namespace memgraph::utils

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -504,7 +504,7 @@ void OutputFile::Close() noexcept {
void OutputFile::FlushBuffer(bool force_flush) {
MG_ASSERT(IsOpen(), "Flushing an unopend file.");
if (!force_flush && buffer_position_.load() < kFileBufferSize) return;
if (!force_flush && buffer_position_ < kFileBufferSize) return;
std::unique_lock flush_guard(flush_lock_);
FlushBufferInternal();

View File

@ -365,9 +365,7 @@ class SkipListGc final {
leftover.Push(*item);
}
}
while ((item = leftover.Pop())) {
deleted_.Push(*item);
}
deleted_ = std::move(leftover);
}
MemoryResource *GetMemoryResource() const { return memory_; }
@ -384,11 +382,14 @@ class SkipListGc final {
}
// Delete all items that have to be garbage collected.
std::optional<TDeleted> item;
while ((item = deleted_.Pop())) {
size_t bytes = SkipListNodeSize(*item->second);
item->second->~TNode();
memory_->Deallocate(item->second, bytes);
{
std::optional<TDeleted> item;
std::unique_lock guard(lock_);
while ((item = deleted_.Pop())) {
size_t bytes = SkipListNodeSize(*item->second);
item->second->~TNode();
memory_->Deallocate(item->second, bytes);
}
}
// Reset all variables.
@ -1028,40 +1029,46 @@ class SkipList final : detail::SkipListNode_base {
continue;
}
std::unique_lock<SpinLock> guards[kSkipListMaxHeight];
TNode *pred, *succ, *prev_pred = nullptr;
bool valid = true;
// The paper has a wrong condition here. In the paper it states that this
// loop should have `(layer <= top_layer)`, but that isn't correct.
for (int layer = 0; valid && (layer < top_layer); ++layer) {
pred = preds[layer];
succ = succs[layer];
if (pred != prev_pred) {
guards[layer] = std::unique_lock<SpinLock>(pred->lock);
prev_pred = pred;
TNode *new_node;
{
TNode *prev_pred = nullptr;
bool valid = true;
std::unique_lock<SpinLock> guards[kSkipListMaxHeight];
// The paper has a wrong condition here. In the paper it states that this
// loop should have `(layer <= top_layer)`, but that isn't correct.
for (int layer = 0; valid && (layer < top_layer); ++layer) {
TNode *pred = preds[layer];
TNode *succ = succs[layer];
if (pred != prev_pred) {
guards[layer] = std::unique_lock<SpinLock>(pred->lock);
prev_pred = pred;
}
// Existence test is missing in the paper.
valid = !pred->marked.load(std::memory_order_acquire) &&
pred->nexts[layer].load(std::memory_order_acquire) == succ &&
(succ == nullptr || !succ->marked.load(std::memory_order_acquire));
}
// Existence test is missing in the paper.
valid = !pred->marked.load(std::memory_order_acquire) &&
pred->nexts[layer].load(std::memory_order_acquire) == succ &&
(succ == nullptr || !succ->marked.load(std::memory_order_acquire));
}
if (!valid) continue;
if (!valid) continue;
size_t node_bytes = sizeof(TNode) + top_layer * sizeof(std::atomic<TNode *>);
void *ptr = GetMemoryResource()->Allocate(node_bytes);
// `calloc` would be faster, but the API has no such call.
memset(ptr, 0, node_bytes);
auto *new_node = static_cast<TNode *>(ptr);
// Construct through allocator so it propagates if needed.
Allocator<TNode> allocator(GetMemoryResource());
allocator.construct(new_node, top_layer, std::forward<TObjUniv>(object));
size_t node_bytes = sizeof(TNode) + top_layer * sizeof(std::atomic<TNode *>);
// The paper is also wrong here. It states that the loop should go up to
// `top_layer` which is wrong.
for (int layer = 0; layer < top_layer; ++layer) {
new_node->nexts[layer].store(succs[layer], std::memory_order_release);
preds[layer]->nexts[layer].store(new_node, std::memory_order_release);
MemoryResource *memoryResource = GetMemoryResource();
void *ptr = memoryResource->Allocate(node_bytes);
// `calloc` would be faster, but the API has no such call.
memset(ptr, 0, node_bytes);
new_node = static_cast<TNode *>(ptr);
// Construct through allocator so it propagates if needed.
Allocator<TNode> allocator(memoryResource);
allocator.construct(new_node, top_layer, std::forward<TObjUniv>(object));
// The paper is also wrong here. It states that the loop should go up to
// `top_layer` which is wrong.
for (int layer = 0; layer < top_layer; ++layer) {
new_node->nexts[layer].store(succs[layer], std::memory_order_release);
preds[layer]->nexts[layer].store(new_node, std::memory_order_release);
}
}
new_node->fully_linked.store(true, std::memory_order_release);