From 14f92b4a0fcce743246da0b73ef8db53a6e91399 Mon Sep 17 00:00:00 2001 From: Gareth Andrew Lloyd Date: Fri, 1 Dec 2023 12:38:48 +0000 Subject: [PATCH 1/2] Bugfix: correct replication handler (#1540) Fixes the root cause of a cascade of failures in replication code: - Replica handling of deleting an edge is now corrected. It is now tolerant of multiple edges of the same relationship type. - Improved robustness: correct exception handling around a failed stream of the current WAL file. This means a REPLICA failure will no longer prevent transactions on MAIN from performing WAL writes. - Slightly better diagnostic messages; they are not user-friendly, but they help a developer get to the root cause quicker. - Proactively remove vertices and edges during Abort rather than deferring that work to GC; this included fixing constraints and indexes to be safe. Co-authored-by: Andreja Tonev --- src/dbms/inmemory/replication_handlers.cpp | 86 ++++++---- src/integrations/pulsar/consumer.hpp | 3 +- src/storage/v2/constraints/constraints.cpp | 4 + src/storage/v2/constraints/constraints.hpp | 4 + .../v2/disk/edge_import_mode_cache.cpp | 7 +- src/storage/v2/disk/storage.cpp | 45 ++++++ src/storage/v2/disk/storage.hpp | 3 + src/storage/v2/indices/indices.cpp | 19 +++ src/storage/v2/indices/indices.hpp | 17 ++ .../v2/indices/label_property_index.hpp | 5 + src/storage/v2/inmemory/label_index.cpp | 46 +++++- src/storage/v2/inmemory/label_index.hpp | 15 +- .../v2/inmemory/label_property_index.cpp | 63 +++++++- .../v2/inmemory/label_property_index.hpp | 30 +++- src/storage/v2/inmemory/storage.cpp | 147 ++++++++++++++++-- src/storage/v2/inmemory/storage.hpp | 5 + .../v2/inmemory/unique_constraints.cpp | 33 +++- .../v2/inmemory/unique_constraints.hpp | 4 + src/storage/v2/storage.hpp | 3 + .../test_query_modules/module_test.cpp | 10 +- tests/e2e/replication/CMakeLists.txt | 1 + tests/e2e/replication/edge_delete.py | 56 +++++++ tests/e2e/replication/workloads.yaml | 22 +++ 23 files changed, 558 insertions(+), 70 
deletions(-) create mode 100755 tests/e2e/replication/edge_delete.py diff --git a/src/dbms/inmemory/replication_handlers.cpp b/src/dbms/inmemory/replication_handlers.cpp index ce1f6da20..a5f56ee3d 100644 --- a/src/dbms/inmemory/replication_handlers.cpp +++ b/src/dbms/inmemory/replication_handlers.cpp @@ -370,8 +370,9 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage constexpr bool kSharedAccess = false; std::optional> commit_timestamp_and_accessor; - auto get_transaction = [storage, &commit_timestamp_and_accessor](uint64_t commit_timestamp, - bool unique = kSharedAccess) { + auto const get_transaction = [storage, &commit_timestamp_and_accessor]( + uint64_t commit_timestamp, + bool unique = kSharedAccess) -> storage::InMemoryStorage::ReplicationAccessor * { if (!commit_timestamp_and_accessor) { std::unique_ptr acc = nullptr; if (unique) { @@ -415,9 +416,11 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage spdlog::trace(" Delete vertex {}", delta.vertex_create_delete.gid.AsUint()); auto *transaction = get_transaction(timestamp); auto vertex = transaction->FindVertex(delta.vertex_create_delete.gid, View::NEW); - if (!vertex) throw utils::BasicException("Invalid transaction!"); + if (!vertex) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); auto ret = transaction->DeleteVertex(&*vertex); - if (ret.HasError() || !ret.GetValue()) throw utils::BasicException("Invalid transaction!"); + if (ret.HasError() || !ret.GetValue()) + throw utils::BasicException("Invalid transaction! 
Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::VERTEX_ADD_LABEL: { @@ -425,9 +428,11 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage delta.vertex_add_remove_label.label); auto *transaction = get_transaction(timestamp); auto vertex = transaction->FindVertex(delta.vertex_add_remove_label.gid, View::NEW); - if (!vertex) throw utils::BasicException("Invalid transaction!"); + if (!vertex) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); auto ret = vertex->AddLabel(transaction->NameToLabel(delta.vertex_add_remove_label.label)); - if (ret.HasError() || !ret.GetValue()) throw utils::BasicException("Invalid transaction!"); + if (ret.HasError() || !ret.GetValue()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::VERTEX_REMOVE_LABEL: { @@ -435,9 +440,11 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage delta.vertex_add_remove_label.label); auto *transaction = get_transaction(timestamp); auto vertex = transaction->FindVertex(delta.vertex_add_remove_label.gid, View::NEW); - if (!vertex) throw utils::BasicException("Invalid transaction!"); + if (!vertex) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); auto ret = vertex->RemoveLabel(transaction->NameToLabel(delta.vertex_add_remove_label.label)); - if (ret.HasError() || !ret.GetValue()) throw utils::BasicException("Invalid transaction!"); + if (ret.HasError() || !ret.GetValue()) + throw utils::BasicException("Invalid transaction! 
Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::VERTEX_SET_PROPERTY: { @@ -445,10 +452,12 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage delta.vertex_edge_set_property.property, delta.vertex_edge_set_property.value); auto *transaction = get_transaction(timestamp); auto vertex = transaction->FindVertex(delta.vertex_edge_set_property.gid, View::NEW); - if (!vertex) throw utils::BasicException("Invalid transaction!"); + if (!vertex) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); auto ret = vertex->SetProperty(transaction->NameToProperty(delta.vertex_edge_set_property.property), delta.vertex_edge_set_property.value); - if (ret.HasError()) throw utils::BasicException("Invalid transaction!"); + if (ret.HasError()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::EDGE_CREATE: { @@ -457,13 +466,16 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage delta.edge_create_delete.from_vertex.AsUint(), delta.edge_create_delete.to_vertex.AsUint()); auto *transaction = get_transaction(timestamp); auto from_vertex = transaction->FindVertex(delta.edge_create_delete.from_vertex, View::NEW); - if (!from_vertex) throw utils::BasicException("Invalid transaction!"); + if (!from_vertex) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); auto to_vertex = transaction->FindVertex(delta.edge_create_delete.to_vertex, View::NEW); - if (!to_vertex) throw utils::BasicException("Invalid transaction!"); + if (!to_vertex) + throw utils::BasicException("Invalid transaction! 
Please raise an issue, {}:{}", __FILE__, __LINE__); auto edge = transaction->CreateEdgeEx(&*from_vertex, &*to_vertex, transaction->NameToEdgeType(delta.edge_create_delete.edge_type), delta.edge_create_delete.gid); - if (edge.HasError()) throw utils::BasicException("Invalid transaction!"); + if (edge.HasError()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::EDGE_DELETE: { @@ -472,16 +484,17 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage delta.edge_create_delete.from_vertex.AsUint(), delta.edge_create_delete.to_vertex.AsUint()); auto *transaction = get_transaction(timestamp); auto from_vertex = transaction->FindVertex(delta.edge_create_delete.from_vertex, View::NEW); - if (!from_vertex) throw utils::BasicException("Invalid transaction!"); + if (!from_vertex) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); auto to_vertex = transaction->FindVertex(delta.edge_create_delete.to_vertex, View::NEW); - if (!to_vertex) throw utils::BasicException("Invalid transaction!"); - auto edges = from_vertex->OutEdges(View::NEW, {transaction->NameToEdgeType(delta.edge_create_delete.edge_type)}, - &*to_vertex); - if (edges.HasError()) throw utils::BasicException("Invalid transaction!"); - if (edges->edges.size() != 1) throw utils::BasicException("Invalid transaction!"); - auto &edge = (*edges).edges[0]; - auto ret = transaction->DeleteEdge(&edge); - if (ret.HasError()) throw utils::BasicException("Invalid transaction!"); + if (!to_vertex) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); + auto edgeType = transaction->NameToEdgeType(delta.edge_create_delete.edge_type); + auto edge = + transaction->FindEdge(delta.edge_create_delete.gid, View::NEW, edgeType, &*from_vertex, &*to_vertex); + if (!edge) throw utils::BasicException("Invalid transaction! 
Please raise an issue, {}:{}", __FILE__, __LINE__); + if (auto ret = transaction->DeleteEdge(&*edge); ret.HasError()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::EDGE_SET_PROPERTY: { @@ -498,7 +511,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage // yields an accessor that is only valid for managing the edge's // properties. auto edge = edge_acc.find(delta.vertex_edge_set_property.gid); - if (edge == edge_acc.end()) throw utils::BasicException("Invalid transaction!"); + if (edge == edge_acc.end()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); // The edge visibility check must be done here manually because we // don't allow direct access to the edges through the public API. { @@ -530,7 +544,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage } } }); - if (!is_visible) throw utils::BasicException("Invalid transaction!"); + if (!is_visible) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); } EdgeRef edge_ref(&*edge); // Here we create an edge accessor that we will use to get the @@ -543,7 +558,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage auto ret = ea.SetProperty(transaction->NameToProperty(delta.vertex_edge_set_property.property), delta.vertex_edge_set_property.value); - if (ret.HasError()) throw utils::BasicException("Invalid transaction!"); + if (ret.HasError()) + throw utils::BasicException("Invalid transaction! 
Please raise an issue, {}:{}", __FILE__, __LINE__); break; } @@ -553,7 +569,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage throw utils::BasicException("Invalid commit data!"); auto ret = commit_timestamp_and_accessor->second.Commit(commit_timestamp_and_accessor->first, false /* not main */); - if (ret.HasError()) throw utils::BasicException("Invalid transaction!"); + if (ret.HasError()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); commit_timestamp_and_accessor = std::nullopt; break; } @@ -563,14 +580,14 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage // Need to send the timestamp auto *transaction = get_transaction(timestamp, kUniqueAccess); if (transaction->CreateIndex(storage->NameToLabel(delta.operation_label.label)).HasError()) - throw utils::BasicException("Invalid transaction!"); + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::LABEL_INDEX_DROP: { spdlog::trace(" Drop label index on :{}", delta.operation_label.label); auto *transaction = get_transaction(timestamp, kUniqueAccess); if (transaction->DropIndex(storage->NameToLabel(delta.operation_label.label)).HasError()) - throw utils::BasicException("Invalid transaction!"); + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::LABEL_INDEX_STATS_SET: { @@ -601,7 +618,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage ->CreateIndex(storage->NameToLabel(delta.operation_label_property.label), storage->NameToProperty(delta.operation_label_property.property)) .HasError()) - throw utils::BasicException("Invalid transaction!"); + throw utils::BasicException("Invalid transaction! 
Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP: { @@ -612,7 +629,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage ->DropIndex(storage->NameToLabel(delta.operation_label_property.label), storage->NameToProperty(delta.operation_label_property.property)) .HasError()) - throw utils::BasicException("Invalid transaction!"); + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::LABEL_PROPERTY_INDEX_STATS_SET: { @@ -644,7 +661,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage auto ret = transaction->CreateExistenceConstraint(storage->NameToLabel(delta.operation_label_property.label), storage->NameToProperty(delta.operation_label_property.property)); - if (ret.HasError()) throw utils::BasicException("Invalid transaction!"); + if (ret.HasError()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: { @@ -655,7 +673,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage ->DropExistenceConstraint(storage->NameToLabel(delta.operation_label_property.label), storage->NameToProperty(delta.operation_label_property.property)) .HasError()) - throw utils::BasicException("Invalid transaction!"); + throw utils::BasicException("Invalid transaction! 
Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE: { @@ -670,7 +688,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage auto ret = transaction->CreateUniqueConstraint(storage->NameToLabel(delta.operation_label_properties.label), properties); if (!ret.HasValue() || ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS) - throw utils::BasicException("Invalid transaction!"); + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP: { @@ -685,7 +703,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage auto ret = transaction->DropUniqueConstraint(storage->NameToLabel(delta.operation_label_properties.label), properties); if (ret != UniqueConstraints::DeletionStatus::SUCCESS) { - throw utils::BasicException("Invalid transaction!"); + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); } break; } diff --git a/src/integrations/pulsar/consumer.hpp b/src/integrations/pulsar/consumer.hpp index 1caa366ad..06ed3a550 100644 --- a/src/integrations/pulsar/consumer.hpp +++ b/src/integrations/pulsar/consumer.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -10,6 +10,7 @@ // licenses/APL.txt. 
#pragma once + #include #include #include diff --git a/src/storage/v2/constraints/constraints.cpp b/src/storage/v2/constraints/constraints.cpp index 42128511c..6a6554db4 100644 --- a/src/storage/v2/constraints/constraints.cpp +++ b/src/storage/v2/constraints/constraints.cpp @@ -29,4 +29,8 @@ Constraints::Constraints(const Config &config, StorageMode storage_mode) { }; }); } + +void Constraints::AbortEntries(std::span vertices, uint64_t exact_start_timestamp) const { + static_cast(unique_constraints_.get())->AbortEntries(vertices, exact_start_timestamp); +} } // namespace memgraph::storage diff --git a/src/storage/v2/constraints/constraints.hpp b/src/storage/v2/constraints/constraints.hpp index 8469a5470..1f5ef999e 100644 --- a/src/storage/v2/constraints/constraints.hpp +++ b/src/storage/v2/constraints/constraints.hpp @@ -11,6 +11,8 @@ #pragma once +#include + #include "storage/v2/config.hpp" #include "storage/v2/constraints/existence_constraints.hpp" #include "storage/v2/constraints/unique_constraints.hpp" @@ -27,6 +29,8 @@ struct Constraints { Constraints &operator=(Constraints &&) = delete; ~Constraints() = default; + void AbortEntries(std::span vertices, uint64_t exact_start_timestamp) const; + std::unique_ptr existence_constraints_; std::unique_ptr unique_constraints_; }; diff --git a/src/storage/v2/disk/edge_import_mode_cache.cpp b/src/storage/v2/disk/edge_import_mode_cache.cpp index 2a29a1606..cd1ca0dc2 100644 --- a/src/storage/v2/disk/edge_import_mode_cache.cpp +++ b/src/storage/v2/disk/edge_import_mode_cache.cpp @@ -10,7 +10,9 @@ // licenses/APL.txt. 
#include "storage/v2/disk//edge_import_mode_cache.hpp" + #include + #include "storage/v2/disk/label_property_index.hpp" #include "storage/v2/indices/indices.hpp" #include "storage/v2/inmemory/label_index.hpp" @@ -28,7 +30,7 @@ EdgeImportModeCache::EdgeImportModeCache(const Config &config) InMemoryLabelIndex::Iterable EdgeImportModeCache::Vertices(LabelId label, View view, Storage *storage, Transaction *transaction) const { auto *mem_label_index = static_cast(in_memory_indices_.label_index_.get()); - return mem_label_index->Vertices(label, view, storage, transaction); + return mem_label_index->Vertices(label, vertices_.access(), view, storage, transaction); } InMemoryLabelPropertyIndex::Iterable EdgeImportModeCache::Vertices( @@ -37,7 +39,8 @@ InMemoryLabelPropertyIndex::Iterable EdgeImportModeCache::Vertices( Transaction *transaction) const { auto *mem_label_property_index = static_cast(in_memory_indices_.label_property_index_.get()); - return mem_label_property_index->Vertices(label, property, lower_bound, upper_bound, view, storage, transaction); + return mem_label_property_index->Vertices(label, property, vertices_.access(), lower_bound, upper_bound, view, + storage, transaction); } bool EdgeImportModeCache::CreateIndex(LabelId label, PropertyId property, diff --git a/src/storage/v2/disk/storage.cpp b/src/storage/v2/disk/storage.cpp index 09b28943c..073d1fbd1 100644 --- a/src/storage/v2/disk/storage.cpp +++ b/src/storage/v2/disk/storage.cpp @@ -71,6 +71,37 @@ namespace memgraph::storage { +namespace { + +auto FindEdges(const View view, EdgeTypeId edge_type, const VertexAccessor *from_vertex, VertexAccessor *to_vertex) + -> Result { + auto use_out_edges = [](Vertex const *from_vertex, Vertex const *to_vertex) { + // Obtain the locks by `gid` order to avoid lock cycles. 
+ auto guard_from = std::unique_lock{from_vertex->lock, std::defer_lock}; + auto guard_to = std::unique_lock{to_vertex->lock, std::defer_lock}; + if (from_vertex->gid < to_vertex->gid) { + guard_from.lock(); + guard_to.lock(); + } else if (from_vertex->gid > to_vertex->gid) { + guard_to.lock(); + guard_from.lock(); + } else { + // The vertices are the same vertex, only lock one. + guard_from.lock(); + } + + // With the potentially cheaper side FindEdges + const auto out_n = from_vertex->out_edges.size(); + const auto in_n = to_vertex->in_edges.size(); + return out_n <= in_n; + }; + + return use_out_edges(from_vertex->vertex_, to_vertex->vertex_) ? from_vertex->OutEdges(view, {edge_type}, to_vertex) + : to_vertex->InEdges(view, {edge_type}, from_vertex); +} + +} // namespace + using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler; namespace { @@ -949,6 +980,20 @@ Result DiskStorage::DiskAccessor::CreateEdge(VertexAccessor *from, return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_); } +std::optional DiskStorage::DiskAccessor::FindEdge(Gid gid, View view, EdgeTypeId edge_type, + VertexAccessor *from_vertex, + VertexAccessor *to_vertex) { + auto res = FindEdges(view, edge_type, from_vertex, to_vertex); + if (res.HasError()) return std::nullopt; // TODO: use a Result type + + auto const it = std::ranges::find_if( + res->edges, [gid](EdgeAccessor const &edge_accessor) { return edge_accessor.edge_.ptr->gid == gid; }); + + if (it == res->edges.end()) return std::nullopt; // TODO: use a Result type + + return *it; +} + Result DiskStorage::DiskAccessor::EdgeSetFrom(EdgeAccessor * /*edge*/, VertexAccessor * /*new_from*/) { MG_ASSERT(false, "EdgeSetFrom is currently only implemented for InMemory storage"); return Error::NONEXISTENT_OBJECT; diff --git a/src/storage/v2/disk/storage.hpp b/src/storage/v2/disk/storage.hpp index e93566c09..8640462de 100644 --- a/src/storage/v2/disk/storage.hpp +++ 
b/src/storage/v2/disk/storage.hpp @@ -121,6 +121,9 @@ class DiskStorage final : public Storage { Result CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) override; + std::optional FindEdge(Gid gid, View view, EdgeTypeId edge_type, VertexAccessor *from_vertex, + VertexAccessor *to_vertex) override; + Result EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) override; Result EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) override; diff --git a/src/storage/v2/indices/indices.cpp b/src/storage/v2/indices/indices.cpp index bf7295de2..e0b194ad4 100644 --- a/src/storage/v2/indices/indices.cpp +++ b/src/storage/v2/indices/indices.cpp @@ -17,6 +17,21 @@ namespace memgraph::storage { +void Indices::AbortEntries(LabelId labelId, std::span vertices, uint64_t exact_start_timestamp) const { + static_cast(label_index_.get())->AbortEntries(labelId, vertices, exact_start_timestamp); +} + +void Indices::AbortEntries(PropertyId property, std::span const> vertices, + uint64_t exact_start_timestamp) const { + static_cast(label_property_index_.get()) + ->AbortEntries(property, vertices, exact_start_timestamp); +} +void Indices::AbortEntries(LabelId label, std::span const> vertices, + uint64_t exact_start_timestamp) const { + static_cast(label_property_index_.get()) + ->AbortEntries(label, vertices, exact_start_timestamp); +} + void Indices::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp) const { static_cast(label_index_.get())->RemoveObsoleteEntries(oldest_active_start_timestamp); static_cast(label_property_index_.get()) @@ -50,4 +65,8 @@ Indices::Indices(const Config &config, StorageMode storage_mode) { }); } +Indices::IndexStats Indices::Analysis() const { + return {static_cast(label_index_.get())->Analysis(), + static_cast(label_property_index_.get())->Analysis()}; +} } // namespace memgraph::storage diff --git a/src/storage/v2/indices/indices.hpp b/src/storage/v2/indices/indices.hpp index 9a71107cd..33bd429e6 100644 --- 
a/src/storage/v2/indices/indices.hpp +++ b/src/storage/v2/indices/indices.hpp @@ -12,6 +12,9 @@ #pragma once #include +#include + +#include "storage/v2/id_types.hpp" #include "storage/v2/indices/label_index.hpp" #include "storage/v2/indices/label_property_index.hpp" #include "storage/v2/storage_mode.hpp" @@ -32,6 +35,20 @@ struct Indices { /// TODO: unused in disk indices void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp) const; + /// Surgical removal of entries that was inserted this transaction + /// TODO: unused in disk indices + void AbortEntries(LabelId labelId, std::span vertices, uint64_t exact_start_timestamp) const; + void AbortEntries(PropertyId property, std::span const> vertices, + uint64_t exact_start_timestamp) const; + void AbortEntries(LabelId label, std::span const> vertices, + uint64_t exact_start_timestamp) const; + + struct IndexStats { + std::vector label; + LabelPropertyIndex::IndexStats property_label; + }; + IndexStats Analysis() const; + // Indices are updated whenever an update occurs, instead of only on commit or // advance command. This is necessary because we want indices to support `NEW` // view for use in Merge. diff --git a/src/storage/v2/indices/label_property_index.hpp b/src/storage/v2/indices/label_property_index.hpp index 84908b3e9..e8389fbea 100644 --- a/src/storage/v2/indices/label_property_index.hpp +++ b/src/storage/v2/indices/label_property_index.hpp @@ -19,6 +19,11 @@ namespace memgraph::storage { class LabelPropertyIndex { public: + struct IndexStats { + std::map> l2p; + std::map> p2l; + }; + LabelPropertyIndex() = default; LabelPropertyIndex(const LabelPropertyIndex &) = delete; LabelPropertyIndex(LabelPropertyIndex &&) = delete; diff --git a/src/storage/v2/inmemory/label_index.cpp b/src/storage/v2/inmemory/label_index.cpp index 31c3634be..82ead4b44 100644 --- a/src/storage/v2/inmemory/label_index.cpp +++ b/src/storage/v2/inmemory/label_index.cpp @@ -10,8 +10,12 @@ // licenses/APL.txt. 
#include "storage/v2/inmemory/label_index.hpp" + +#include + #include "storage/v2/constraints/constraints.hpp" #include "storage/v2/indices/indices_utils.hpp" +#include "storage/v2/inmemory/storage.hpp" namespace memgraph::storage { @@ -96,9 +100,23 @@ void InMemoryLabelIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_time } } -InMemoryLabelIndex::Iterable::Iterable(utils::SkipList::Accessor index_accessor, LabelId label, View view, - Storage *storage, Transaction *transaction) - : index_accessor_(std::move(index_accessor)), +void InMemoryLabelIndex::AbortEntries(LabelId labelId, std::span vertices, + uint64_t exact_start_timestamp) { + auto const it = index_.find(labelId); + if (it == index_.end()) return; + + auto &label_storage = it->second; + auto vertices_acc = label_storage.access(); + for (auto *vertex : vertices) { + vertices_acc.remove(Entry{vertex, exact_start_timestamp}); + } +} + +InMemoryLabelIndex::Iterable::Iterable(utils::SkipList::Accessor index_accessor, + utils::SkipList::ConstAccessor vertices_accessor, LabelId label, + View view, Storage *storage, Transaction *transaction) + : pin_accessor_(std::move(vertices_accessor)), + index_accessor_(std::move(index_accessor)), label_(label), view_(view), storage_(storage), @@ -147,9 +165,21 @@ void InMemoryLabelIndex::RunGC() { InMemoryLabelIndex::Iterable InMemoryLabelIndex::Vertices(LabelId label, View view, Storage *storage, Transaction *transaction) { + DMG_ASSERT(storage->storage_mode_ == StorageMode::IN_MEMORY_TRANSACTIONAL || + storage->storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL, + "LabelIndex trying to access InMemory vertices from OnDisk!"); + auto vertices_acc = static_cast(storage)->vertices_.access(); const auto it = index_.find(label); MG_ASSERT(it != index_.end(), "Index for label {} doesn't exist", label.AsUint()); - return {it->second.access(), label, view, storage, transaction}; + return {it->second.access(), std::move(vertices_acc), label, view, storage, transaction}; +} 
+ +InMemoryLabelIndex::Iterable InMemoryLabelIndex::Vertices( + LabelId label, memgraph::utils::SkipList::ConstAccessor vertices_acc, View view, + Storage *storage, Transaction *transaction) { + const auto it = index_.find(label); + MG_ASSERT(it != index_.end(), "Index for label {} doesn't exist", label.AsUint()); + return {it->second.access(), std::move(vertices_acc), label, view, storage, transaction}; } void InMemoryLabelIndex::SetIndexStats(const storage::LabelId &label, const storage::LabelIndexStats &stats) { @@ -187,4 +217,12 @@ bool InMemoryLabelIndex::DeleteIndexStats(const storage::LabelId &label) { return false; } +std::vector InMemoryLabelIndex::Analysis() const { + std::vector res; + res.reserve(index_.size()); + for (const auto &[label, _] : index_) { + res.emplace_back(label); + } + return res; +} } // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/label_index.hpp b/src/storage/v2/inmemory/label_index.hpp index 7d606574b..21df32deb 100644 --- a/src/storage/v2/inmemory/label_index.hpp +++ b/src/storage/v2/inmemory/label_index.hpp @@ -11,6 +11,8 @@ #pragma once +#include + #include "storage/v2/constraints/constraints.hpp" #include "storage/v2/indices/label_index.hpp" #include "storage/v2/indices/label_index_stats.hpp" @@ -56,10 +58,15 @@ class InMemoryLabelIndex : public storage::LabelIndex { void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp); + /// Surgical removal of entries that was inserted this transaction + void AbortEntries(LabelId labelId, std::span vertices, uint64_t exact_start_timestamp); + + std::vector Analysis() const; + class Iterable { public: - Iterable(utils::SkipList::Accessor index_accessor, LabelId label, View view, Storage *storage, - Transaction *transaction); + Iterable(utils::SkipList::Accessor index_accessor, utils::SkipList::ConstAccessor vertices_accessor, + LabelId label, View view, Storage *storage, Transaction *transaction); class Iterator { public: @@ -85,6 +92,7 @@ class 
InMemoryLabelIndex : public storage::LabelIndex { Iterator end() { return {this, index_accessor_.end()}; } private: + utils::SkipList::ConstAccessor pin_accessor_; utils::SkipList::Accessor index_accessor_; LabelId label_; View view_; @@ -98,6 +106,9 @@ class InMemoryLabelIndex : public storage::LabelIndex { Iterable Vertices(LabelId label, View view, Storage *storage, Transaction *transaction); + Iterable Vertices(LabelId label, memgraph::utils::SkipList::ConstAccessor vertices_acc, + View view, Storage *storage, Transaction *transaction); + void SetIndexStats(const storage::LabelId &label, const storage::LabelIndexStats &stats); std::optional GetIndexStats(const storage::LabelId &label) const; diff --git a/src/storage/v2/inmemory/label_property_index.cpp b/src/storage/v2/inmemory/label_property_index.cpp index fa5cce444..f61b9dd11 100644 --- a/src/storage/v2/inmemory/label_property_index.cpp +++ b/src/storage/v2/inmemory/label_property_index.cpp @@ -12,6 +12,8 @@ #include "storage/v2/inmemory/label_property_index.hpp" #include "storage/v2/constraints/constraints.hpp" #include "storage/v2/indices/indices_utils.hpp" +#include "storage/v2/inmemory/storage.hpp" +#include "utils/logging.hpp" namespace memgraph::storage { @@ -101,11 +103,12 @@ void InMemoryLabelPropertyIndex::UpdateOnSetProperty(PropertyId property, const return; } - if (!indices_by_property_.contains(property)) { + auto index = indices_by_property_.find(property); + if (index == indices_by_property_.end()) { return; } - for (const auto &[_, storage] : indices_by_property_.at(property)) { + for (const auto &[_, storage] : index->second) { auto acc = storage->access(); acc.insert(Entry{value, vertex, tx.start_timestamp}); } @@ -220,12 +223,14 @@ const PropertyValue kSmallestMap = PropertyValue(std::map(0), std::numeric_limits::min()}); -InMemoryLabelPropertyIndex::Iterable::Iterable(utils::SkipList::Accessor index_accessor, LabelId label, 
+InMemoryLabelPropertyIndex::Iterable::Iterable(utils::SkipList::Accessor index_accessor, + utils::SkipList::ConstAccessor vertices_accessor, LabelId label, PropertyId property, const std::optional> &lower_bound, const std::optional> &upper_bound, View view, Storage *storage, Transaction *transaction) - : index_accessor_(std::move(index_accessor)), + : pin_accessor_(std::move(vertices_accessor)), + index_accessor_(std::move(index_accessor)), label_(label), property_(property), lower_bound_(lower_bound), @@ -428,9 +433,57 @@ InMemoryLabelPropertyIndex::Iterable InMemoryLabelPropertyIndex::Vertices( LabelId label, PropertyId property, const std::optional> &lower_bound, const std::optional> &upper_bound, View view, Storage *storage, Transaction *transaction) { + DMG_ASSERT(storage->storage_mode_ == StorageMode::IN_MEMORY_TRANSACTIONAL || + storage->storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL, + "PropertyLabel index trying to access InMemory vertices from OnDisk!"); + auto vertices_acc = static_cast(storage)->vertices_.access(); auto it = index_.find({label, property}); MG_ASSERT(it != index_.end(), "Index for label {} and property {} doesn't exist", label.AsUint(), property.AsUint()); - return {it->second.access(), label, property, lower_bound, upper_bound, view, storage, transaction}; + return {it->second.access(), std::move(vertices_acc), label, property, lower_bound, upper_bound, view, storage, + transaction}; } +InMemoryLabelPropertyIndex::Iterable InMemoryLabelPropertyIndex::Vertices( + LabelId label, PropertyId property, + memgraph::utils::SkipList::ConstAccessor vertices_acc, + const std::optional> &lower_bound, + const std::optional> &upper_bound, View view, Storage *storage, + Transaction *transaction) { + auto it = index_.find({label, property}); + MG_ASSERT(it != index_.end(), "Index for label {} and property {} doesn't exist", label.AsUint(), property.AsUint()); + return {it->second.access(), std::move(vertices_acc), label, property, lower_bound, 
upper_bound, view, storage, + transaction}; +} + +void InMemoryLabelPropertyIndex::AbortEntries(PropertyId property, + std::span const> vertices, + uint64_t exact_start_timestamp) { + auto const it = indices_by_property_.find(property); + if (it == indices_by_property_.end()) return; + + auto &indices = it->second; + for (const auto &[_, index] : indices) { + auto index_acc = index->access(); + for (auto const &[value, vertex] : vertices) { + index_acc.remove(Entry{value, vertex, exact_start_timestamp}); + } + } +} + +void InMemoryLabelPropertyIndex::AbortEntries(LabelId label, + std::span const> vertices, + uint64_t exact_start_timestamp) { + for (auto &[label_prop, storage] : index_) { + if (label_prop.first != label) { + continue; + } + + auto index_acc = storage.access(); + for (const auto &[property, vertex] : vertices) { + if (!property.IsNull()) { + index_acc.remove(Entry{property, vertex, exact_start_timestamp}); + } + } + } +} } // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/label_property_index.hpp b/src/storage/v2/inmemory/label_property_index.hpp index 7f8c54909..ae96a37f8 100644 --- a/src/storage/v2/inmemory/label_property_index.hpp +++ b/src/storage/v2/inmemory/label_property_index.hpp @@ -11,9 +11,13 @@ #pragma once +#include + #include "storage/v2/constraints/constraints.hpp" +#include "storage/v2/id_types.hpp" #include "storage/v2/indices/label_property_index.hpp" #include "storage/v2/indices/label_property_index_stats.hpp" +#include "storage/v2/property_value.hpp" #include "utils/rw_lock.hpp" #include "utils/synchronized.hpp" @@ -61,10 +65,25 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex { void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp); + void AbortEntries(PropertyId property, std::span const> vertices, + uint64_t exact_start_timestamp); + void AbortEntries(LabelId label, std::span const> vertices, + uint64_t exact_start_timestamp); + + IndexStats Analysis() const { + IndexStats 
res{}; + for (const auto &[lp, _] : index_) { + const auto &[label, property] = lp; + res.l2p[label].emplace_back(property); + res.p2l[property].emplace_back(label); + } + return res; + } + class Iterable { public: - Iterable(utils::SkipList::Accessor index_accessor, LabelId label, PropertyId property, - const std::optional> &lower_bound, + Iterable(utils::SkipList::Accessor index_accessor, utils::SkipList::ConstAccessor vertices_accessor, + LabelId label, PropertyId property, const std::optional> &lower_bound, const std::optional> &upper_bound, View view, Storage *storage, Transaction *transaction); @@ -92,6 +111,7 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex { Iterator end(); private: + utils::SkipList::ConstAccessor pin_accessor_; utils::SkipList::Accessor index_accessor_; LabelId label_; PropertyId property_; @@ -131,6 +151,12 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex { const std::optional> &upper_bound, View view, Storage *storage, Transaction *transaction); + Iterable Vertices(LabelId label, PropertyId property, + memgraph::utils::SkipList::ConstAccessor vertices_acc, + const std::optional> &lower_bound, + const std::optional> &upper_bound, View view, Storage *storage, + Transaction *transaction); + private: std::map, utils::SkipList> index_; std::unordered_map *>> indices_by_property_; diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index c27018d24..558330996 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -10,21 +10,57 @@ // licenses/APL.txt. 
#include "storage/v2/inmemory/storage.hpp" +#include +#include #include "dbms/constants.hpp" #include "memory/global_memory_control.hpp" #include "storage/v2/durability/durability.hpp" #include "storage/v2/durability/snapshot.hpp" +#include "storage/v2/edge_direction.hpp" +#include "storage/v2/id_types.hpp" #include "storage/v2/metadata_delta.hpp" /// REPLICATION /// #include "dbms/inmemory/replication_handlers.hpp" #include "storage/v2/inmemory/replication/recovery.hpp" #include "storage/v2/inmemory/unique_constraints.hpp" +#include "storage/v2/property_value.hpp" #include "utils/resource_lock.hpp" #include "utils/stat.hpp" namespace memgraph::storage { +namespace { + +auto FindEdges(const View view, EdgeTypeId edge_type, const VertexAccessor *from_vertex, VertexAccessor *to_vertex) + -> Result { + auto use_out_edges = [](Vertex const *from_vertex, Vertex const *to_vertex) { + // Obtain the locks by `gid` order to avoid lock cycles. + auto guard_from = std::unique_lock{from_vertex->lock, std::defer_lock}; + auto guard_to = std::unique_lock{to_vertex->lock, std::defer_lock}; + if (from_vertex->gid < to_vertex->gid) { + guard_from.lock(); + guard_to.lock(); + } else if (from_vertex->gid > to_vertex->gid) { + guard_to.lock(); + guard_from.lock(); + } else { + // The vertices are the same vertex, only lock one. + guard_from.lock(); + } + + // With the potentially cheaper side FindEdges + const auto out_n = from_vertex->out_edges.size(); + const auto in_n = to_vertex->in_edges.size(); + return out_n <= in_n; + }; + + return use_out_edges(from_vertex->vertex_, to_vertex->vertex_) ? 
from_vertex->OutEdges(view, {edge_type}, to_vertex) + : to_vertex->InEdges(view, {edge_type}, from_vertex); +} + +}; // namespace + using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler; InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode) @@ -315,6 +351,24 @@ Result InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_); } +std::optional InMemoryStorage::InMemoryAccessor::FindEdge(Gid gid, const View view, EdgeTypeId edge_type, + VertexAccessor *from_vertex, + VertexAccessor *to_vertex) { + auto res = FindEdges(view, edge_type, from_vertex, to_vertex); + if (res.HasError()) return std::nullopt; // TODO: use a Result type + + auto const it = std::invoke([this, gid, &res]() { + auto const byGid = [gid](EdgeAccessor const &edge_accessor) { return edge_accessor.edge_.gid == gid; }; + auto const byEdgePtr = [gid](EdgeAccessor const &edge_accessor) { return edge_accessor.edge_.ptr->gid == gid; }; + if (config_.properties_on_edges) return std::ranges::find_if(res->edges, byEdgePtr); + return std::ranges::find_if(res->edges, byGid); + }); + + if (it == res->edges.end()) return std::nullopt; // TODO: use a Result type + + return *it; +} + Result InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, storage::Gid gid) { MG_ASSERT(from->transaction_ == to->transaction_, @@ -697,7 +751,8 @@ utils::BasicResult InMemoryStorage::InMemoryAcce could_replicate_all_sync_replicas = mem_storage->AppendToWalDataDefinition(transaction_, *commit_timestamp_); // protected by engine_guard // TODO: release lock, and update all deltas to have a local copy of the commit timestamp - transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release); // protected by engine_guard + transaction_.commit_timestamp->store(*commit_timestamp_, + std::memory_order_release); // protected by 
engine_guard // Replica can only update the last commit timestamp with // the commits received from main. if (is_main || desired_commit_timestamp.has_value()) { @@ -823,6 +878,21 @@ void InMemoryStorage::InMemoryAccessor::Abort() { std::list my_deleted_vertices; std::list my_deleted_edges; + std::map> label_cleanup; + std::map>> label_property_cleanup; + std::map>> property_cleanup; + + // CONSTRAINTS + if (transaction_.constraint_verification_info.NeedsUniqueConstraintVerification()) { + // Need to remove elements from constraints before handling of the deltas, so the elements match the correct + // values + auto vertices_to_check = transaction_.constraint_verification_info.GetVerticesForUniqueConstraintChecking(); + auto vertices_to_check_v = std::vector{vertices_to_check.begin(), vertices_to_check.end()}; + storage_->constraints_.AbortEntries(vertices_to_check_v, transaction_.start_timestamp); + } + + const auto index_stats = storage_->indices_.Analysis(); + for (const auto &delta : transaction_.deltas.use()) { auto prev = delta.prev.Get(); switch (prev.type) { @@ -838,6 +908,24 @@ void InMemoryStorage::InMemoryAccessor::Abort() { MG_ASSERT(it != vertex->labels.end(), "Invalid database state!"); std::swap(*it, *vertex->labels.rbegin()); vertex->labels.pop_back(); + + // For label index + // check if there is a label index for the label and add entry if so + // For property label index + // check if we care about the label; this will return all the propertyIds we care about and then get + // the current property value + if (std::binary_search(index_stats.label.begin(), index_stats.label.end(), current->label)) { + label_cleanup[current->label].emplace_back(vertex); + } + const auto &properties = index_stats.property_label.l2p.find(current->label); + if (properties != index_stats.property_label.l2p.end()) { + for (const auto &property : properties->second) { + auto current_value = vertex->properties.GetProperty(property); + if (!current_value.IsNull()) { + 
label_property_cleanup[current->label].emplace_back(std::move(current_value), vertex); + } + } + } break; } case Delta::Action::ADD_LABEL: { @@ -847,6 +935,18 @@ void InMemoryStorage::InMemoryAccessor::Abort() { break; } case Delta::Action::SET_PROPERTY: { + // For label index nothing + // For property label index + // check if we care about the property, this will return all the labels and then get current property + // value + const auto &labels = index_stats.property_label.p2l.find(current->property.key); + if (labels != index_stats.property_label.p2l.end()) { + auto current_value = vertex->properties.GetProperty(current->property.key); + if (!current_value.IsNull()) { + property_cleanup[current->property.key].emplace_back(std::move(current_value), vertex); + } + } + // Setting the correct value vertex->properties.SetProperty(current->property.key, current->property.value); break; } @@ -963,7 +1063,7 @@ void InMemoryStorage::InMemoryAccessor::Abort() { auto *mem_storage = static_cast(storage_); { - std::unique_lock engine_guard(storage_->engine_lock_); + auto engine_guard = std::unique_lock(storage_->engine_lock_); uint64_t mark_timestamp = storage_->timestamp_; // Take garbage_undo_buffers lock while holding the engine lock to make // sure that entries are sorted by mark timestamp in the list. @@ -975,10 +1075,37 @@ void InMemoryStorage::InMemoryAccessor::Abort() { garbage_undo_buffers.emplace_back(mark_timestamp, std::move(transaction_.deltas), std::move(transaction_.commit_timestamp)); }); - mem_storage->deleted_vertices_.WithLock( - [&](auto &deleted_vertices) { deleted_vertices.splice(deleted_vertices.begin(), my_deleted_vertices); }); - mem_storage->deleted_edges_.WithLock( - [&](auto &deleted_edges) { deleted_edges.splice(deleted_edges.begin(), my_deleted_edges); }); + + /// We MUST unlink (aka. remove) entries in indexes and constraints + /// before we unlink (aka. 
remove) vertices from storage + /// this is because they point into vertices skip_list + + // INDICES + for (auto const &[label, vertices] : label_cleanup) { + storage_->indices_.AbortEntries(label, vertices, transaction_.start_timestamp); + } + for (auto const &[label, prop_vertices] : label_property_cleanup) { + storage_->indices_.AbortEntries(label, prop_vertices, transaction_.start_timestamp); + } + for (auto const &[property, prop_vertices] : property_cleanup) { + storage_->indices_.AbortEntries(property, prop_vertices, transaction_.start_timestamp); + } + + // VERTICES + { + auto vertices_acc = mem_storage->vertices_.access(); + for (auto gid : my_deleted_vertices) { + vertices_acc.remove(gid); + } + } + + // EDGES + { + auto edges_acc = mem_storage->edges_.access(); + for (auto gid : my_deleted_edges) { + edges_acc.remove(gid); + } + } } mem_storage->commit_log_->MarkFinished(transaction_.start_timestamp); @@ -1271,8 +1398,6 @@ void InMemoryStorage::CollectGarbage(std::unique_lock main_ // vertices that appear in an index also exist in main storage. 
std::list current_deleted_edges; std::list current_deleted_vertices; - deleted_vertices_->swap(current_deleted_vertices); - deleted_edges_->swap(current_deleted_edges); auto const need_full_scan_vertices = gc_full_scan_vertices_delete_.exchange(false); auto const need_full_scan_edges = gc_full_scan_edges_delete_.exchange(false); @@ -1922,12 +2047,12 @@ utils::BasicResult InMemoryStorage::Create void InMemoryStorage::FreeMemory(std::unique_lock main_guard) { CollectGarbage(std::move(main_guard)); + static_cast(indices_.label_index_.get())->RunGC(); + static_cast(indices_.label_property_index_.get())->RunGC(); + // SkipList is already threadsafe vertices_.run_gc(); edges_.run_gc(); - - static_cast(indices_.label_index_.get())->RunGC(); - static_cast(indices_.label_property_index_.get())->RunGC(); } uint64_t InMemoryStorage::CommitTimestamp(const std::optional desired_commit_timestamp) { diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp index 3c50326b2..48d6f0cb7 100644 --- a/src/storage/v2/inmemory/storage.hpp +++ b/src/storage/v2/inmemory/storage.hpp @@ -51,6 +51,8 @@ class InMemoryStorage final : public Storage { friend std::vector GetRecoverySteps(uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker, const InMemoryStorage *storage); + friend class InMemoryLabelIndex; + friend class InMemoryLabelPropertyIndex; public: enum class CreateSnapshotError : uint8_t { DisabledForReplica, ReachedMaxNumTries }; @@ -185,6 +187,9 @@ class InMemoryStorage final : public Storage { /// @throw std::bad_alloc Result CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) override; + std::optional FindEdge(Gid gid, View view, EdgeTypeId edge_type, VertexAccessor *from_vertex, + VertexAccessor *to_vertex) override; + Result EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) override; Result EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) override; diff --git 
a/src/storage/v2/inmemory/unique_constraints.cpp b/src/storage/v2/inmemory/unique_constraints.cpp index 78929bc38..6a2945883 100644 --- a/src/storage/v2/inmemory/unique_constraints.cpp +++ b/src/storage/v2/inmemory/unique_constraints.cpp @@ -256,11 +256,12 @@ bool InMemoryUniqueConstraints::Entry::operator==(const std::vectorlabels) { - if (!constraints_by_label_.contains(label)) { + const auto &constraint = constraints_by_label_.find(label); + if (constraint == constraints_by_label_.end()) { continue; } - for (auto &[props, storage] : constraints_by_label_.at(label)) { + for (auto &[props, storage] : constraint->second) { auto values = vertex->properties.ExtractPropertyValues(props); if (!values) { @@ -273,6 +274,28 @@ void InMemoryUniqueConstraints::UpdateBeforeCommit(const Vertex *vertex, const T } } +void InMemoryUniqueConstraints::AbortEntries(std::span vertices, uint64_t exact_start_timestamp) { + for (const auto &vertex : vertices) { + for (const auto &label : vertex->labels) { + const auto &constraint = constraints_by_label_.find(label); + if (constraint == constraints_by_label_.end()) { + continue; // no unique constraint on this label; the vertex's other labels and the remaining vertices still need cleanup + } + + for (auto &[props, storage] : constraint->second) { + auto values = vertex->properties.ExtractPropertyValues(props); + + if (!values) { + continue; + } + + auto acc = storage->access(); + acc.remove(Entry{std::move(*values), vertex, exact_start_timestamp}); + } + } + } +} + utils::BasicResult InMemoryUniqueConstraints::CreateConstraint(LabelId label, const std::set &properties, utils::SkipList::Accessor vertices) { @@ -364,12 +387,14 @@ std::optional InMemoryUniqueConstraints::Validate(const Ver if (vertex.deleted) { return std::nullopt; } + for (const auto &label : vertex.labels) { - if (!constraints_by_label_.contains(label)) { + const auto &constraint = constraints_by_label_.find(label); + if (constraint == constraints_by_label_.end()) { continue; } - for (const auto &[properties, storage] : constraints_by_label_.at(label)) { + for (const auto
&[properties, storage] : constraint->second) { auto value_array = vertex.properties.ExtractPropertyValues(properties); if (!value_array) { diff --git a/src/storage/v2/inmemory/unique_constraints.hpp b/src/storage/v2/inmemory/unique_constraints.hpp index 45472ca74..d1e590357 100644 --- a/src/storage/v2/inmemory/unique_constraints.hpp +++ b/src/storage/v2/inmemory/unique_constraints.hpp @@ -11,6 +11,8 @@ #pragma once +#include + #include "storage/v2/constraints/unique_constraints.hpp" namespace memgraph::storage { @@ -54,6 +56,8 @@ class InMemoryUniqueConstraints : public UniqueConstraints { void UpdateBeforeCommit(const Vertex *vertex, std::unordered_set &added_labels, std::unordered_set &added_properties, const Transaction &tx); + void AbortEntries(std::span vertices, uint64_t exact_start_timestamp); + /// Creates unique constraint on the given `label` and a list of `properties`. /// Returns constraint violation if there are multiple vertices with the same /// label and property values. 
Returns `CreationStatus::ALREADY_EXISTS` if diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 4142da3ca..60752d101 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -195,6 +195,9 @@ class Storage { virtual Result CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) = 0; + virtual std::optional FindEdge(Gid gid, View view, EdgeTypeId edge_type, VertexAccessor *from_vertex, + VertexAccessor *to_vertex) = 0; + virtual Result EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) = 0; virtual Result EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) = 0; diff --git a/tests/e2e/concurrent_query_modules/test_query_modules/module_test.cpp b/tests/e2e/concurrent_query_modules/test_query_modules/module_test.cpp index 44479d900..b53dda881 100644 --- a/tests/e2e/concurrent_query_modules/test_query_modules/module_test.cpp +++ b/tests/e2e/concurrent_query_modules/test_query_modules/module_test.cpp @@ -16,11 +16,11 @@ #include #include -constexpr char *kProcedureHackerNews = "hacker_news"; -constexpr char *kArgumentHackerNewsVotes = "votes"; -constexpr char *kArgumentHackerNewsItemHourAge = "item_hour_age"; -constexpr char *kArgumentHackerNewsGravity = "gravity"; -constexpr char *kReturnHackerNewsScore = "score"; +constexpr char const *kProcedureHackerNews = "hacker_news"; +constexpr char const *kArgumentHackerNewsVotes = "votes"; +constexpr char const *kArgumentHackerNewsItemHourAge = "item_hour_age"; +constexpr char const *kArgumentHackerNewsGravity = "gravity"; +constexpr char const *kReturnHackerNewsScore = "score"; void HackerNews(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) { mgp::MemoryDispatcherGuard guard(memory); diff --git a/tests/e2e/replication/CMakeLists.txt b/tests/e2e/replication/CMakeLists.txt index b054981c8..39f179a3d 100644 --- a/tests/e2e/replication/CMakeLists.txt +++ b/tests/e2e/replication/CMakeLists.txt @@ -13,6 +13,7 @@ 
copy_e2e_python_files(replication_show common.py) copy_e2e_python_files(replication_show conftest.py) copy_e2e_python_files(replication_show show.py) copy_e2e_python_files(replication_show show_while_creating_invalid_state.py) +copy_e2e_python_files(replication_show edge_delete.py) copy_e2e_python_files_from_parent_folder(replication_show ".." memgraph.py) copy_e2e_python_files_from_parent_folder(replication_show ".." interactive_mg_runner.py) copy_e2e_python_files_from_parent_folder(replication_show ".." mg_utils.py) diff --git a/tests/e2e/replication/edge_delete.py b/tests/e2e/replication/edge_delete.py new file mode 100755 index 000000000..0e25faee1 --- /dev/null +++ b/tests/e2e/replication/edge_delete.py @@ -0,0 +1,56 @@ +# Copyright 2022 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import sys +import time + +import pytest +from common import execute_and_fetch_all +from mg_utils import mg_sleep_and_assert + + +# BUGFIX: for issue https://github.com/memgraph/memgraph/issues/1515 +def test_replication_handles_delete_when_multiple_edges_of_same_type(connection): + # Goal is to check the timestamp are correctly computed from the information we get from replicas. + # 0/ Check original state of replicas. + # 1/ Add nodes and edges to MAIN, then delete the edges. + # 2/ Check state of replicas. 
+ + # 0/ + conn = connection(7687, "main") + conn.autocommit = True + cursor = conn.cursor() + actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) + + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "async", 0, 0, "ready"), + } + assert actual_data == expected_data + + # 1/ + execute_and_fetch_all(cursor, "CREATE (a)-[r:X]->(b) CREATE (a)-[:X]->(b) DELETE r;") + + # 2/ + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 2, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "async", 2, 0, "ready"), + } + + def retrieve_data(): + return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) + + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/replication/workloads.yaml b/tests/e2e/replication/workloads.yaml index 72269d652..fc239b221 100644 --- a/tests/e2e/replication/workloads.yaml +++ b/tests/e2e/replication/workloads.yaml @@ -8,6 +8,23 @@ template_validation_queries: &template_validation_queries validation_queries: - <<: *template_test_nodes_query - <<: *template_test_edges_query +template_simple_cluster: &template_simple_cluster + cluster: + replica_1: + args: [ "--bolt-port", "7688", "--log-level=TRACE" ] + log_file: "replication-e2e-replica1.log" + setup_queries: [ "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;" ] + replica_2: + args: ["--bolt-port", "7689", "--log-level=TRACE"] + log_file: "replication-e2e-replica2.log" + setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"] + main: + args: ["--bolt-port", "7687", "--log-level=TRACE"] + log_file: "replication-e2e-main.log" + setup_queries: [ + "REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001'", + "REGISTER REPLICA replica_2 ASYNC TO '127.0.0.1:10002'", + ] template_cluster: &template_cluster cluster: replica_1: @@ -83,3 +100,8 @@ workloads: - name: "Show 
while creating invalid state" binary: "tests/e2e/pytest_runner.sh" args: ["replication/show_while_creating_invalid_state.py"] + + - name: "Delete edge replication" + binary: "tests/e2e/pytest_runner.sh" + args: ["replication/edge_delete.py"] + <<: *template_simple_cluster From 3ccd78ac71d423fd15ca55998a766c8d08cdaa46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ante=20Pu=C5=A1i=C4=87?= Date: Sat, 2 Dec 2023 20:03:40 +0100 Subject: [PATCH 2/2] Add path and weight to variable expand filter (#1434) Co-authored-by: Aidar Samerkhanov --- src/query/frontend/ast/ast.hpp | 6 + .../frontend/ast/cypher_main_visitor.cpp | 30 ++++ .../frontend/opencypher/grammar/Cypher.g4 | 2 +- .../frontend/semantic/symbol_generator.cpp | 23 ++- src/query/plan/operator.cpp | 167 ++++++++++++------ src/query/plan/operator.hpp | 6 + src/query/plan/preprocess.cpp | 14 ++ src/query/plan/rewrite/index_lookup.hpp | 5 + src/query/plan/rule_based_planner.hpp | 53 +++++- src/query/plan/variable_start_planner.cpp | 3 +- .../tests/memgraph_V1/features/match.feature | 72 ++++++++ .../features/memgraph_allshortest.feature | 100 +++++++++++ .../memgraph_V1/features/memgraph_bfs.feature | 92 ++++++++++ .../features/memgraph_wshortest.feature | 100 +++++++++++ .../memgraph_V1/graphs/graph_edges.cypher | 2 + .../memgraph_V1/graphs/graph_index.cypher | 2 + tests/unit/cypher_main_visitor.cpp | 86 +++++++++ 17 files changed, 692 insertions(+), 71 deletions(-) create mode 100644 tests/gql_behave/tests/memgraph_V1/graphs/graph_edges.cypher create mode 100644 tests/gql_behave/tests/memgraph_V1/graphs/graph_index.cypher diff --git a/src/query/frontend/ast/ast.hpp b/src/query/frontend/ast/ast.hpp index d63736c85..a36f1a8b5 100644 --- a/src/query/frontend/ast/ast.hpp +++ b/src/query/frontend/ast/ast.hpp @@ -1818,6 +1818,10 @@ class EdgeAtom : public memgraph::query::PatternAtom { memgraph::query::Identifier *inner_edge{nullptr}; /// Argument identifier for the destination node of the edge. 
memgraph::query::Identifier *inner_node{nullptr}; + /// Argument identifier for the currently-accumulated path. + memgraph::query::Identifier *accumulated_path{nullptr}; + /// Argument identifier for the weight of the currently-accumulated path. + memgraph::query::Identifier *accumulated_weight{nullptr}; /// Evaluates the result of the lambda. memgraph::query::Expression *expression{nullptr}; @@ -1825,6 +1829,8 @@ class EdgeAtom : public memgraph::query::PatternAtom { Lambda object; object.inner_edge = inner_edge ? inner_edge->Clone(storage) : nullptr; object.inner_node = inner_node ? inner_node->Clone(storage) : nullptr; + object.accumulated_path = accumulated_path ? accumulated_path->Clone(storage) : nullptr; + object.accumulated_weight = accumulated_weight ? accumulated_weight->Clone(storage) : nullptr; object.expression = expression ? expression->Clone(storage) : nullptr; return object; } diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index 4bf7f36fd..cf9709a31 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -1978,6 +1978,15 @@ antlrcpp::Any CypherMainVisitor::visitRelationshipPattern(MemgraphCypher::Relati edge_lambda.inner_edge = storage_->Create(traversed_edge_variable); auto traversed_node_variable = std::any_cast(lambda->traversed_node->accept(this)); edge_lambda.inner_node = storage_->Create(traversed_node_variable); + if (lambda->accumulated_path) { + auto accumulated_path_variable = std::any_cast(lambda->accumulated_path->accept(this)); + edge_lambda.accumulated_path = storage_->Create(accumulated_path_variable); + + if (lambda->accumulated_weight) { + auto accumulated_weight_variable = std::any_cast(lambda->accumulated_weight->accept(this)); + edge_lambda.accumulated_weight = storage_->Create(accumulated_weight_variable); + } + } edge_lambda.expression = std::any_cast(lambda->expression()->accept(this)); return edge_lambda; 
}; @@ -2002,6 +2011,15 @@ antlrcpp::Any CypherMainVisitor::visitRelationshipPattern(MemgraphCypher::Relati // In variable expansion inner variables are mandatory. anonymous_identifiers.push_back(&edge->filter_lambda_.inner_edge); anonymous_identifiers.push_back(&edge->filter_lambda_.inner_node); + + // TODO: In what use case do we need accumulated path and weight here? + if (edge->filter_lambda_.accumulated_path) { + anonymous_identifiers.push_back(&edge->filter_lambda_.accumulated_path); + + if (edge->filter_lambda_.accumulated_weight) { + anonymous_identifiers.push_back(&edge->filter_lambda_.accumulated_weight); + } + } break; case 1: if (edge->type_ == EdgeAtom::Type::WEIGHTED_SHORTEST_PATH || @@ -2013,9 +2031,21 @@ antlrcpp::Any CypherMainVisitor::visitRelationshipPattern(MemgraphCypher::Relati // Add mandatory inner variables for filter lambda. anonymous_identifiers.push_back(&edge->filter_lambda_.inner_edge); anonymous_identifiers.push_back(&edge->filter_lambda_.inner_node); + if (edge->filter_lambda_.accumulated_path) { + anonymous_identifiers.push_back(&edge->filter_lambda_.accumulated_path); + + if (edge->filter_lambda_.accumulated_weight) { + anonymous_identifiers.push_back(&edge->filter_lambda_.accumulated_weight); + } + } } else { // Other variable expands only have the filter lambda. edge->filter_lambda_ = visit_lambda(relationshipLambdas[0]); + if (edge->filter_lambda_.accumulated_weight) { + throw SemanticException( + "Accumulated weight in filter lambda can be used only with " + "shortest paths expansion."); + } } break; case 2: diff --git a/src/query/frontend/opencypher/grammar/Cypher.g4 b/src/query/frontend/opencypher/grammar/Cypher.g4 index 53f1fc765..783613695 100644 --- a/src/query/frontend/opencypher/grammar/Cypher.g4 +++ b/src/query/frontend/opencypher/grammar/Cypher.g4 @@ -175,7 +175,7 @@ relationshipDetail : '[' ( name=variable )? ( relationshipTypes )? ( variableExp | '[' ( name=variable )? ( relationshipTypes )? ( variableExpansion )? 
relationshipLambda ( total_weight=variable )? (relationshipLambda )? ']' | '[' ( name=variable )? ( relationshipTypes )? ( variableExpansion )? (properties )* ( relationshipLambda total_weight=variable )? (relationshipLambda )? ']'; -relationshipLambda: '(' traversed_edge=variable ',' traversed_node=variable '|' expression ')'; +relationshipLambda: '(' traversed_edge=variable ',' traversed_node=variable ( ',' accumulated_path=variable )? ( ',' accumulated_weight=variable )? '|' expression ')'; variableExpansion : '*' (BFS | WSHORTEST | ALLSHORTEST)? ( expression )? ( '..' ( expression )? )? ; diff --git a/src/query/frontend/semantic/symbol_generator.cpp b/src/query/frontend/semantic/symbol_generator.cpp index 30790ee4e..a3e855301 100644 --- a/src/query/frontend/semantic/symbol_generator.cpp +++ b/src/query/frontend/semantic/symbol_generator.cpp @@ -658,8 +658,16 @@ bool SymbolGenerator::PreVisit(EdgeAtom &edge_atom) { scope.in_edge_range = false; scope.in_pattern = false; if (edge_atom.filter_lambda_.expression) { - VisitWithIdentifiers(edge_atom.filter_lambda_.expression, - {edge_atom.filter_lambda_.inner_edge, edge_atom.filter_lambda_.inner_node}); + std::vector filter_lambda_identifiers{edge_atom.filter_lambda_.inner_edge, + edge_atom.filter_lambda_.inner_node}; + if (edge_atom.filter_lambda_.accumulated_path) { + filter_lambda_identifiers.emplace_back(edge_atom.filter_lambda_.accumulated_path); + + if (edge_atom.filter_lambda_.accumulated_weight) { + filter_lambda_identifiers.emplace_back(edge_atom.filter_lambda_.accumulated_weight); + } + } + VisitWithIdentifiers(edge_atom.filter_lambda_.expression, filter_lambda_identifiers); } else { // Create inner symbols, but don't bind them in scope, since they are to // be used in the missing filter expression. 
@@ -668,6 +676,17 @@ bool SymbolGenerator::PreVisit(EdgeAtom &edge_atom) { auto *inner_node = edge_atom.filter_lambda_.inner_node; inner_node->MapTo( symbol_table_->CreateSymbol(inner_node->name_, inner_node->user_declared_, Symbol::Type::VERTEX)); + if (edge_atom.filter_lambda_.accumulated_path) { + auto *accumulated_path = edge_atom.filter_lambda_.accumulated_path; + accumulated_path->MapTo( + symbol_table_->CreateSymbol(accumulated_path->name_, accumulated_path->user_declared_, Symbol::Type::PATH)); + + if (edge_atom.filter_lambda_.accumulated_weight) { + auto *accumulated_weight = edge_atom.filter_lambda_.accumulated_weight; + accumulated_weight->MapTo(symbol_table_->CreateSymbol( + accumulated_weight->name_, accumulated_weight->user_declared_, Symbol::Type::NUMBER)); + } + } } if (edge_atom.weight_lambda_.expression) { VisitWithIdentifiers(edge_atom.weight_lambda_.expression, diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 63bf5cd40..24ce66a69 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -1138,6 +1138,11 @@ class ExpandVariableCursor : public Cursor { edges_it_.emplace_back(edges_.back().begin()); } + if (self_.filter_lambda_.accumulated_path_symbol) { + // Add initial vertex of path to the accumulated path + frame[self_.filter_lambda_.accumulated_path_symbol.value()] = Path(vertex); + } + // reset the frame value to an empty edge list auto *pull_memory = context.evaluation_context.memory; frame[self_.common_.edge_symbol] = TypedValue::TVector(pull_memory); @@ -1234,6 +1239,13 @@ class ExpandVariableCursor : public Cursor { // Skip expanding out of filtered expansion. 
frame[self_.filter_lambda_.inner_edge_symbol] = current_edge.first; frame[self_.filter_lambda_.inner_node_symbol] = current_vertex; + if (self_.filter_lambda_.accumulated_path_symbol) { + MG_ASSERT(frame[self_.filter_lambda_.accumulated_path_symbol.value()].IsPath(), + "Accumulated path must be path"); + Path &accumulated_path = frame[self_.filter_lambda_.accumulated_path_symbol.value()].ValuePath(); + accumulated_path.Expand(current_edge.first); + accumulated_path.Expand(current_vertex); + } if (self_.filter_lambda_.expression && !EvaluateFilter(evaluator, self_.filter_lambda_.expression)) continue; // we are doing depth-first search, so place the current @@ -1546,6 +1558,13 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { #endif frame[self_.filter_lambda_.inner_edge_symbol] = edge; frame[self_.filter_lambda_.inner_node_symbol] = vertex; + if (self_.filter_lambda_.accumulated_path_symbol) { + MG_ASSERT(frame[self_.filter_lambda_.accumulated_path_symbol.value()].IsPath(), + "Accumulated path must have Path type"); + Path &accumulated_path = frame[self_.filter_lambda_.accumulated_path_symbol.value()].ValuePath(); + accumulated_path.Expand(edge); + accumulated_path.Expand(vertex); + } if (self_.filter_lambda_.expression) { TypedValue result = self_.filter_lambda_.expression->Accept(evaluator); @@ -1607,6 +1626,11 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { const auto &vertex = vertex_value.ValueVertex(); processed_.emplace(vertex, std::nullopt); + if (self_.filter_lambda_.accumulated_path_symbol) { + // Add initial vertex of path to the accumulated path + frame[self_.filter_lambda_.accumulated_path_symbol.value()] = Path(vertex); + } + expand_from_vertex(vertex); // go back to loop start and see if we expanded anything @@ -1677,6 +1701,10 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { namespace { void CheckWeightType(TypedValue current_weight, utils::MemoryResource *memory) { + if 
(current_weight.IsNull()) { + return; + } + if (!current_weight.IsNumeric() && !current_weight.IsDuration()) { throw QueryRuntimeException("Calculated weight must be numeric or a Duration, got {}.", current_weight.type()); } @@ -1694,6 +1722,34 @@ void CheckWeightType(TypedValue current_weight, utils::MemoryResource *memory) { } } +void ValidateWeightTypes(const TypedValue &lhs, const TypedValue &rhs) { + if ((lhs.IsNumeric() && rhs.IsNumeric()) || (lhs.IsDuration() && rhs.IsDuration())) { + return; + } + throw QueryRuntimeException(utils::MessageWithLink( + "All weights should be of the same type, either numeric or a Duration. Please update the weight " + "expression or the filter expression.", + "https://memgr.ph/wsp")); +} + +TypedValue CalculateNextWeight(const std::optional &weight_lambda, + const TypedValue &total_weight, ExpressionEvaluator evaluator) { + if (!weight_lambda) { + return {}; + } + auto *memory = evaluator.GetMemoryResource(); + TypedValue current_weight = weight_lambda->expression->Accept(evaluator); + CheckWeightType(current_weight, memory); + + if (total_weight.IsNull()) { + return current_weight; + } + + ValidateWeightTypes(current_weight, total_weight); + + return TypedValue(current_weight, memory) + total_weight; +} + } // namespace class ExpandWeightedShortestPathCursor : public query::plan::Cursor { @@ -1722,7 +1778,6 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { auto expand_pair = [this, &evaluator, &frame, &create_state, &context]( const EdgeAccessor &edge, const VertexAccessor &vertex, const TypedValue &total_weight, int64_t depth) { - auto *memory = evaluator.GetMemoryResource(); #ifdef MG_ENTERPRISE if (license::global_license_checker.IsEnterpriseValidFast() && context.auth_checker && !(context.auth_checker->Has(vertex, storage::View::OLD, @@ -1731,32 +1786,31 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { return; } #endif + + frame[self_.weight_lambda_->inner_edge_symbol] = 
edge; + frame[self_.weight_lambda_->inner_node_symbol] = vertex; + TypedValue next_weight = CalculateNextWeight(self_.weight_lambda_, total_weight, evaluator); + if (self_.filter_lambda_.expression) { frame[self_.filter_lambda_.inner_edge_symbol] = edge; frame[self_.filter_lambda_.inner_node_symbol] = vertex; + if (self_.filter_lambda_.accumulated_path_symbol) { + MG_ASSERT(frame[self_.filter_lambda_.accumulated_path_symbol.value()].IsPath(), + "Accumulated path must be path"); + Path &accumulated_path = frame[self_.filter_lambda_.accumulated_path_symbol.value()].ValuePath(); + accumulated_path.Expand(edge); + accumulated_path.Expand(vertex); + + if (self_.filter_lambda_.accumulated_weight_symbol) { + frame[self_.filter_lambda_.accumulated_weight_symbol.value()] = next_weight; + } + } if (!EvaluateFilter(evaluator, self_.filter_lambda_.expression)) return; } - frame[self_.weight_lambda_->inner_edge_symbol] = edge; - frame[self_.weight_lambda_->inner_node_symbol] = vertex; - - TypedValue current_weight = self_.weight_lambda_->expression->Accept(evaluator); - - CheckWeightType(current_weight, memory); - auto next_state = create_state(vertex, depth); - TypedValue next_weight = std::invoke([&] { - if (total_weight.IsNull()) { - return current_weight; - } - - ValidateWeightTypes(current_weight, total_weight); - - return TypedValue(current_weight, memory) + total_weight; - }); - auto found_it = total_cost_.find(next_state); if (found_it != total_cost_.end() && (found_it->second.IsNull() || (found_it->second <= next_weight).ValueBool())) return; @@ -1796,6 +1850,10 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { // Skip expansion for such nodes. 
if (node.IsNull()) continue; } + if (self_.filter_lambda_.accumulated_path_symbol) { + // Add initial vertex of path to the accumulated path + frame[self_.filter_lambda_.accumulated_path_symbol.value()] = Path(vertex); + } if (self_.upper_bound_) { upper_bound_ = EvaluateInt(&evaluator, self_.upper_bound_, "Max depth in weighted shortest path expansion"); upper_bound_set_ = true; @@ -1808,12 +1866,17 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { "Maximum depth in weighted shortest path expansion must be at " "least 1."); + frame[self_.weight_lambda_->inner_edge_symbol] = TypedValue(); + frame[self_.weight_lambda_->inner_node_symbol] = vertex; + TypedValue current_weight = + CalculateNextWeight(self_.weight_lambda_, /* total_weight */ TypedValue(), evaluator); + // Clear existing data structures. previous_.clear(); total_cost_.clear(); yielded_vertices_.clear(); - pq_.emplace(TypedValue(), 0, vertex, std::nullopt); + pq_.emplace(current_weight, 0, vertex, std::nullopt); // We are adding the starting vertex to the set of yielded vertices // because we don't want to yield paths that end with the starting // vertex. @@ -1913,15 +1976,6 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { // Keeps track of vertices for which we yielded a path already. utils::pmr::unordered_set yielded_vertices_; - static void ValidateWeightTypes(const TypedValue &lhs, const TypedValue &rhs) { - if (!((lhs.IsNumeric() && lhs.IsNumeric()) || (rhs.IsDuration() && rhs.IsDuration()))) { - throw QueryRuntimeException(utils::MessageWithLink( - "All weights should be of the same type, either numeric or a Duration. Please update the weight " - "expression or the filter expression.", - "https://memgr.ph/wsp")); - } - } - // Priority queue comparator. Keep lowest weight on top of the queue. class PriorityQueueComparator { public: @@ -1979,36 +2033,32 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor { // queue. 
auto expand_vertex = [this, &evaluator, &frame](const EdgeAccessor &edge, const EdgeAtom::Direction direction, const TypedValue &total_weight, int64_t depth) { - auto *memory = evaluator.GetMemoryResource(); - auto const &next_vertex = direction == EdgeAtom::Direction::IN ? edge.From() : edge.To(); + // Evaluate current weight + frame[self_.weight_lambda_->inner_edge_symbol] = edge; + frame[self_.weight_lambda_->inner_node_symbol] = next_vertex; + TypedValue next_weight = CalculateNextWeight(self_.weight_lambda_, total_weight, evaluator); + // If filter expression exists, evaluate filter if (self_.filter_lambda_.expression) { frame[self_.filter_lambda_.inner_edge_symbol] = edge; frame[self_.filter_lambda_.inner_node_symbol] = next_vertex; + if (self_.filter_lambda_.accumulated_path_symbol) { + MG_ASSERT(frame[self_.filter_lambda_.accumulated_path_symbol.value()].IsPath(), + "Accumulated path must be path"); + Path &accumulated_path = frame[self_.filter_lambda_.accumulated_path_symbol.value()].ValuePath(); + accumulated_path.Expand(edge); + accumulated_path.Expand(next_vertex); + + if (self_.filter_lambda_.accumulated_weight_symbol) { + frame[self_.filter_lambda_.accumulated_weight_symbol.value()] = next_weight; + } + } if (!EvaluateFilter(evaluator, self_.filter_lambda_.expression)) return; } - // Evaluate current weight - frame[self_.weight_lambda_->inner_edge_symbol] = edge; - frame[self_.weight_lambda_->inner_node_symbol] = next_vertex; - - TypedValue current_weight = self_.weight_lambda_->expression->Accept(evaluator); - - CheckWeightType(current_weight, memory); - - TypedValue next_weight = std::invoke([&] { - if (total_weight.IsNull()) { - return current_weight; - } - - ValidateWeightTypes(current_weight, total_weight); - - return TypedValue(current_weight, memory) + total_weight; - }); - auto found_it = visited_cost_.find(next_vertex); // Check if the vertex has already been processed. 
if (found_it != visited_cost_.end()) { @@ -2200,7 +2250,17 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor { traversal_stack_.clear(); total_cost_.clear(); - expand_from_vertex(*start_vertex, TypedValue(), 0); + if (self_.filter_lambda_.accumulated_path_symbol) { + // Add initial vertex of path to the accumulated path + frame[self_.filter_lambda_.accumulated_path_symbol.value()] = Path(*start_vertex); + } + + frame[self_.weight_lambda_->inner_edge_symbol] = TypedValue(); + frame[self_.weight_lambda_->inner_node_symbol] = *start_vertex; + TypedValue current_weight = + CalculateNextWeight(self_.weight_lambda_, /* total_weight */ TypedValue(), evaluator); + + expand_from_vertex(*start_vertex, current_weight, 0); visited_cost_.emplace(*start_vertex, 0); frame[self_.common_.edge_symbol] = TypedValue::TVector(memory); } @@ -2252,15 +2312,6 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor { // Stack indicating the traversal level. utils::pmr::list> traversal_stack_; - static void ValidateWeightTypes(const TypedValue &lhs, const TypedValue &rhs) { - if (!((lhs.IsNumeric() && lhs.IsNumeric()) || (rhs.IsDuration() && rhs.IsDuration()))) { - throw QueryRuntimeException(utils::MessageWithLink( - "All weights should be of the same type, either numeric or a Duration. Please update the weight " - "expression or the filter expression.", - "https://memgr.ph/wsp")); - } - } - // Priority queue comparator. Keep lowest weight on top of the queue. class PriorityQueueComparator { public: diff --git a/src/query/plan/operator.hpp b/src/query/plan/operator.hpp index 03df07378..8fa3d3a7c 100644 --- a/src/query/plan/operator.hpp +++ b/src/query/plan/operator.hpp @@ -917,12 +917,18 @@ struct ExpansionLambda { Symbol inner_node_symbol; /// Expression used in lambda during expansion. Expression *expression; + /// Currently expanded accumulated path symbol. + std::optional accumulated_path_symbol; + /// Currently expanded accumulated weight symbol. 
+ std::optional accumulated_weight_symbol; ExpansionLambda Clone(AstStorage *storage) const { ExpansionLambda object; object.inner_edge_symbol = inner_edge_symbol; object.inner_node_symbol = inner_node_symbol; object.expression = expression ? expression->Clone(storage) : nullptr; + object.accumulated_path_symbol = accumulated_path_symbol; + object.accumulated_weight_symbol = accumulated_weight_symbol; return object; } }; diff --git a/src/query/plan/preprocess.cpp b/src/query/plan/preprocess.cpp index e03c51841..22899cbc0 100644 --- a/src/query/plan/preprocess.cpp +++ b/src/query/plan/preprocess.cpp @@ -74,6 +74,13 @@ std::vector NormalizePatterns(const SymbolTable &symbol_table, const // Remove symbols which are bound by lambda arguments. collector.symbols_.erase(symbol_table.at(*edge->filter_lambda_.inner_edge)); collector.symbols_.erase(symbol_table.at(*edge->filter_lambda_.inner_node)); + if (edge->filter_lambda_.accumulated_path) { + collector.symbols_.erase(symbol_table.at(*edge->filter_lambda_.accumulated_path)); + + if (edge->filter_lambda_.accumulated_weight) { + collector.symbols_.erase(symbol_table.at(*edge->filter_lambda_.accumulated_weight)); + } + } if (edge->type_ == EdgeAtom::Type::WEIGHTED_SHORTEST_PATH || edge->type_ == EdgeAtom::Type::ALL_SHORTEST_PATHS) { collector.symbols_.erase(symbol_table.at(*edge->weight_lambda_.inner_edge)); @@ -295,6 +302,13 @@ void Filters::CollectPatternFilters(Pattern &pattern, SymbolTable &symbol_table, prop_pair.second->Accept(collector); collector.symbols_.emplace(symbol_table.at(*atom->filter_lambda_.inner_node)); collector.symbols_.emplace(symbol_table.at(*atom->filter_lambda_.inner_edge)); + if (atom->filter_lambda_.accumulated_path) { + collector.symbols_.emplace(symbol_table.at(*atom->filter_lambda_.accumulated_path)); + + if (atom->filter_lambda_.accumulated_weight) { + collector.symbols_.emplace(symbol_table.at(*atom->filter_lambda_.accumulated_weight)); + } + } // First handle the inline property filter. 
auto *property_lookup = storage.Create(atom->filter_lambda_.inner_edge, prop_pair.first); auto *prop_equal = storage.Create(property_lookup, prop_pair.second); diff --git a/src/query/plan/rewrite/index_lookup.hpp b/src/query/plan/rewrite/index_lookup.hpp index 4054f8c12..7bb88f659 100644 --- a/src/query/plan/rewrite/index_lookup.hpp +++ b/src/query/plan/rewrite/index_lookup.hpp @@ -171,6 +171,11 @@ class IndexLookupRewriter final : public HierarchicalLogicalOperatorVisitor { if (expand.common_.existing_node) { return true; } + if (expand.type_ == EdgeAtom::Type::BREADTH_FIRST && expand.filter_lambda_.accumulated_path_symbol) { + // When accumulated path is used, we cannot use ST shortest path algorithm. + return false; + } + std::unique_ptr indexed_scan; ScanAll dst_scan(expand.input(), expand.common_.node_symbol, storage::View::OLD); // With expand to existing we only get real gains with BFS, because we use a diff --git a/src/query/plan/rule_based_planner.hpp b/src/query/plan/rule_based_planner.hpp index bdac76a93..b58d0170b 100644 --- a/src/query/plan/rule_based_planner.hpp +++ b/src/query/plan/rule_based_planner.hpp @@ -705,9 +705,9 @@ class RuleBasedPlanner { std::optional total_weight; if (edge->type_ == EdgeAtom::Type::WEIGHTED_SHORTEST_PATH || edge->type_ == EdgeAtom::Type::ALL_SHORTEST_PATHS) { - weight_lambda.emplace(ExpansionLambda{symbol_table.at(*edge->weight_lambda_.inner_edge), - symbol_table.at(*edge->weight_lambda_.inner_node), - edge->weight_lambda_.expression}); + weight_lambda.emplace(ExpansionLambda{.inner_edge_symbol = symbol_table.at(*edge->weight_lambda_.inner_edge), + .inner_node_symbol = symbol_table.at(*edge->weight_lambda_.inner_node), + .expression = edge->weight_lambda_.expression}); total_weight.emplace(symbol_table.at(*edge->total_weight_)); } @@ -715,12 +715,28 @@ class RuleBasedPlanner { ExpansionLambda filter_lambda; filter_lambda.inner_edge_symbol = symbol_table.at(*edge->filter_lambda_.inner_edge); filter_lambda.inner_node_symbol 
= symbol_table.at(*edge->filter_lambda_.inner_node); + if (edge->filter_lambda_.accumulated_path) { + filter_lambda.accumulated_path_symbol = symbol_table.at(*edge->filter_lambda_.accumulated_path); + + if (edge->filter_lambda_.accumulated_weight) { + filter_lambda.accumulated_weight_symbol = symbol_table.at(*edge->filter_lambda_.accumulated_weight); + } + } { // Bind the inner edge and node symbols so they're available for // inline filtering in ExpandVariable. bool inner_edge_bound = bound_symbols.insert(filter_lambda.inner_edge_symbol).second; bool inner_node_bound = bound_symbols.insert(filter_lambda.inner_node_symbol).second; MG_ASSERT(inner_edge_bound && inner_node_bound, "An inner edge and node can't be bound from before"); + if (filter_lambda.accumulated_path_symbol) { + bool accumulated_path_bound = bound_symbols.insert(*filter_lambda.accumulated_path_symbol).second; + MG_ASSERT(accumulated_path_bound, "The accumulated path can't be bound from before"); + + if (filter_lambda.accumulated_weight_symbol) { + bool accumulated_weight_bound = bound_symbols.insert(*filter_lambda.accumulated_weight_symbol).second; + MG_ASSERT(accumulated_weight_bound, "The accumulated weight can't be bound from before"); + } + } } // Join regular filters with lambda filter expression, so that they // are done inline together. Semantic analysis should guarantee that @@ -731,15 +747,34 @@ class RuleBasedPlanner { // filtering (they use the inner symbols. If they were not collected, // we have to remove them manually because no other filter-extraction // will ever bind them again. 
- filters.erase( - std::remove_if(filters.begin(), filters.end(), - [e = filter_lambda.inner_edge_symbol, n = filter_lambda.inner_node_symbol](FilterInfo &fi) { - return utils::Contains(fi.used_symbols, e) || utils::Contains(fi.used_symbols, n); - }), - filters.end()); + std::vector inner_symbols = {filter_lambda.inner_edge_symbol, filter_lambda.inner_node_symbol}; + if (filter_lambda.accumulated_path_symbol) { + inner_symbols.emplace_back(*filter_lambda.accumulated_path_symbol); + + if (filter_lambda.accumulated_weight_symbol) { + inner_symbols.emplace_back(*filter_lambda.accumulated_weight_symbol); + } + } + + filters.erase(std::remove_if(filters.begin(), filters.end(), + [&inner_symbols](FilterInfo &fi) { + for (const auto &symbol : inner_symbols) { + if (utils::Contains(fi.used_symbols, symbol)) return true; + } + return false; + }), + filters.end()); + // Unbind the temporarily bound inner symbols for filtering. bound_symbols.erase(filter_lambda.inner_edge_symbol); bound_symbols.erase(filter_lambda.inner_node_symbol); + if (filter_lambda.accumulated_path_symbol) { + bound_symbols.erase(*filter_lambda.accumulated_path_symbol); + + if (filter_lambda.accumulated_weight_symbol) { + bound_symbols.erase(*filter_lambda.accumulated_weight_symbol); + } + } if (total_weight) { bound_symbols.insert(*total_weight); diff --git a/src/query/plan/variable_start_planner.cpp b/src/query/plan/variable_start_planner.cpp index 1c230628a..4aa3580d0 100644 --- a/src/query/plan/variable_start_planner.cpp +++ b/src/query/plan/variable_start_planner.cpp @@ -72,8 +72,9 @@ void AddNextExpansions(const Symbol &node_symbol, const Matching &matching, cons // We are not expanding from node1, so flip the expansion. 
DMG_ASSERT(expansion.node2 && symbol_table.at(*expansion.node2->identifier_) == node_symbol, "Expected node_symbol to be bound in node2"); - if (expansion.edge->type_ != EdgeAtom::Type::BREADTH_FIRST) { + if (expansion.edge->type_ != EdgeAtom::Type::BREADTH_FIRST && !expansion.edge->filter_lambda_.accumulated_path) { // BFS must *not* be flipped. Doing that changes the BFS results. + // When filter lambda uses accumulated path, path must not be flipped. std::swap(expansion.node1, expansion.node2); expansion.is_flipped = true; if (expansion.direction != EdgeAtom::Direction::BOTH) { diff --git a/tests/gql_behave/tests/memgraph_V1/features/match.feature b/tests/gql_behave/tests/memgraph_V1/features/match.feature index cf41c20f2..227ad9ad6 100644 --- a/tests/gql_behave/tests/memgraph_V1/features/match.feature +++ b/tests/gql_behave/tests/memgraph_V1/features/match.feature @@ -699,3 +699,75 @@ Feature: Match Then the result should be | date(n.time) | | 2021-10-05 | + + Scenario: Variable expand with filter by size of accumulated path + Given an empty graph + And having executed: + """ + CREATE (:Person {id: 1})-[:KNOWS]->(:Person {id: 2})-[:KNOWS]->(:Person {id: 3})-[:KNOWS]->(:Person {id: 4}); + """ + When executing query: + """ + MATCH path = (:Person {id: 1})-[* (e, n, p | size(p) < 4)]->(:Person {id: 4}) RETURN path + """ + Then the result should be + | path | + | <(:Person{id:1})-[:KNOWS]->(:Person{id:2})-[:KNOWS]->(:Person{id:3})-[:KNOWS]->(:Person{id:4})> | + + Scenario: Variable expand with filter by last edge type of accumulated path + Given an empty graph + And having executed: + """ + CREATE (:Person {id: 1})-[:KNOWS]->(:Person {id: 2})-[:KNOWS]->(:Person {id: 3})-[:KNOWS]->(:Person {id: 4}); + """ + When executing query: + """ + MATCH path = (:Person {id: 1})-[* (e, n, p | type(relationships(p)[-1]) = 'KNOWS')]->(:Person {id: 4}) RETURN path + """ + Then the result should be + | path | + | 
<(:Person{id:1})-[:KNOWS]->(:Person{id:2})-[:KNOWS]->(:Person{id:3})-[:KNOWS]->(:Person{id:4})> | + + Scenario: Variable expand with too restricted filter by size of accumulated path + Given an empty graph + And having executed: + """ + CREATE (:Person {id: 1})-[:KNOWS]->(:Person {id: 2})-[:KNOWS]->(:Person {id: 3})-[:KNOWS]->(:Person {id: 4}); + """ + When executing query: + """ + MATCH path = (:Person {id: 1})-[* (e, n, p | size(p) < 3)]->(:Person {id: 4}) RETURN path + """ + Then the result should be empty + + Scenario: Variable expand with too restricted filter by last edge type of accumulated path + Given an empty graph + And having executed: + """ + CREATE (:Person {id: 1})-[:KNOWS]->(:Person {id: 2})-[:KNOWS]->(:Person {id: 3})-[:KNOWS]->(:Person {id: 4}); + """ + When executing query: + """ + MATCH path = (:Person {id: 1})-[* (e, n, p | type(relationships(p)[-1]) = 'Invalid')]->(:Person {id: 4}) RETURN path + """ + Then the result should be empty + + Scenario: Test DFS variable expand with filter by edge type1 + Given graph "graph_edges" + When executing query: + """ + MATCH path=(:label1)-[* (e, n, p | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1'))]->(:label3) RETURN path; + """ + Then the result should be: + | path | + | <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | + + Scenario: Test DFS variable expand with filter by edge type2 + Given graph "graph_edges" + When executing query: + """ + MATCH path=(:label1)-[* (e, n, p | NOT(type(e)='type2' AND type(last(relationships(p))) = 'type2'))]->(:label3) RETURN path; + """ + Then the result should be: + | path | + | <(:label1 {id: 1})-[:type1 {id: 1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})> | diff --git a/tests/gql_behave/tests/memgraph_V1/features/memgraph_allshortest.feature b/tests/gql_behave/tests/memgraph_V1/features/memgraph_allshortest.feature index 73fb9e75b..29dc0a5ef 100644 --- a/tests/gql_behave/tests/memgraph_V1/features/memgraph_allshortest.feature +++ 
b/tests/gql_behave/tests/memgraph_V1/features/memgraph_allshortest.feature @@ -203,3 +203,103 @@ Feature: All Shortest Path Then the result should be: | total_cost | | 20.3 | + + Scenario: Test match AllShortest with accumulated path filtered by order of ids + Given an empty graph + And having executed: + """ + CREATE (:label1 {id: 1})-[:type1 {id:1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})-[:type1 {id: 3}]->(:label4 {id: 4}); + """ + When executing query: + """ + MATCH pth=(:label1)-[*ALLSHORTEST (r, n | r.id) total_weight (e,n,p | e.id > 0 and (nodes(p)[-1]).id > (nodes(p)[-2]).id)]->(:label4) RETURN pth, total_weight; + """ + Then the result should be: + | pth | total_weight | + | <(:label1{id:1})-[:type1{id:1}]->(:label2{id:2})-[:type1{id:2}]->(:label3{id:3})-[:type1{id:3}]->(:label4{id:4})> | 6 | + + Scenario: Test match AllShortest with accumulated path filtered by edge type1 + Given graph "graph_edges" + When executing query: + """ + MATCH path=(:label1)-[*ALLSHORTEST (r, n | r.id) total_weight (e, n, p | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1'))]->(:label3) RETURN path, total_weight; + """ + Then the result should be: + | path | total_weight | + | <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | 10 | + + Scenario: Test match AllShortest with accumulated path filtered by edge type2 + Given graph "graph_edges" + When executing query: + """ + MATCH path=(:label1)-[*ALLSHORTEST (r, n | r.id) total_weight (e, n, p | NOT(type(e)='type2' AND type(last(relationships(p))) = 'type2'))]->(:label3) RETURN path, total_weight; + """ + Then the result should be: + | path | total_weight | + | <(:label1 {id: 1})-[:type1 {id: 1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})> | 3 | + + Scenario: Test match AllShortest with accumulated path filtered by edge type1 and accumulated weight based on edge + Given graph "graph_edges" + When executing query: + """ + MATCH path=(:label1)-[*ALLSHORTEST (r, n | r.id) total_weight 
(e, n, p, w | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1') AND w > 0)]->(:label3) RETURN path, total_weight; + """ + Then the result should be: + | path | total_weight | + | <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | 10 | + + Scenario: Test match AllShortest with accumulated path filtered by edge type1 and accumulated weight based on edge too restricted + Given graph "graph_edges" + When executing query: + """ + MATCH path=(:label1)-[*ALLSHORTEST (r, n | r.id) total_weight (e, n, p, w | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1') AND w < 10)]->(:label3) RETURN path, total_weight; + """ + Then the result should be empty + + Scenario: Test match AllShortest with accumulated path filtered by edge type1 and accumulated weight based on vertex is int + Given graph "graph_edges" + When executing query: + """ + MATCH path=(:label1)-[*ALLSHORTEST (r, n | n.id) total_weight (e, n, p, w | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1') AND w > 0)]->(:label3) RETURN path, total_weight; + """ + Then the result should be: + | path | total_weight | + | <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | 4 | + + Scenario: Test match allShortest with accumulated path filtered by edge type1 and accumulated weight based on vertex and edge are ints + Given graph "graph_edges" + When executing query: + """ + MATCH path=(:label1)-[*ALLSHORTEST (r, n | n.id + coalesce(r.id, 0)) total_weight (e, n, p, w | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1') AND w > 0)]->(:label3) RETURN path, total_weight; + """ + Then the result should be: + | path | total_weight | + | <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | 14 | + + Scenario: Test match AllShortest with accumulated path filtered by edge type1 and accumulated weight based on vertex and edge are doubles + Given an empty graph + And having executed: + """ + CREATE (:label1 {id: 1})-[:type1 {id:1.5}]->(:label2 {id: 2})-[:type1 {id: 
2.1}]->(:label3 {id: 3})-[:type1 {id: 3.4}]->(:label4 {id: 4}); + """ + When executing query: + """ + MATCH path=(:label1)-[*ALLSHORTEST (r, n | n.id + coalesce(r.id, 0)) total_weight (e, n, p, w | w > 0)]->(:label3) RETURN path, total_weight; + """ + Then the result should be: + | path | total_weight | + | <(:label1 {id: 1})-[:type1 {id: 1.5}]->(:label2 {id: 2})-[:type1 {id: 2.1}]->(:label3 {id: 3})> | 9.6 | + + Scenario: Test match AllShortest with accumulated path filtered by order of ids and accumulated weight based on both vertex and edge is duration + Given an empty graph + And having executed: + """ + CREATE (:station {name: "A", arrival: localTime("08:00"), departure: localTime("08:15")})-[:ride {id: 1, duration: duration("PT1H5M")}]->(:station {name: "B", arrival: localtime("09:20"), departure: localTime("09:30")})-[:ride {id: 2, duration: duration("PT30M")}]->(:station {name: "C", arrival: localTime("10:00"), departure: localTime("10:20")}); + """ + When executing query: + """ + MATCH path=(:station {name:"A"})-[*ALLSHORTEST (r, v | v.departure - v.arrival + coalesce(r.duration, duration("PT0M"))) total_weight (r,n,p,w | (nodes(p)[-1]).name > (nodes(p)[-2]).name AND not(w is null))]->(:station {name:"C"}) RETURN path, total_weight; + """ + Then the result should be: + | path | total_weight | + | <(:station {arrival: 08:00:00.000000000, departure: 08:15:00.000000000, name: 'A'})-[:ride {duration: PT1H5M, id: 1}]->(:station {arrival: 09:20:00.000000000, departure: 09:30:00.000000000, name: 'B'})-[:ride {duration: PT30M, id: 2}]->(:station {arrival: 10:00:00.000000000, departure: 10:20:00.000000000, name: 'C'})> | PT2H20M | diff --git a/tests/gql_behave/tests/memgraph_V1/features/memgraph_bfs.feature b/tests/gql_behave/tests/memgraph_V1/features/memgraph_bfs.feature index d47566012..2736a6d71 100644 --- a/tests/gql_behave/tests/memgraph_V1/features/memgraph_bfs.feature +++ b/tests/gql_behave/tests/memgraph_V1/features/memgraph_bfs.feature @@ -121,3 +121,95 
@@ Feature: Bfs Then the result should be: | p | | <(:Node {id: 2})-[:LINK {date: '2023-03'}]->(:Node {id: 3})> | + + Scenario: Test BFS variable expand with filter by last edge type of accumulated path + Given an empty graph + And having executed: + """ + CREATE (:label1 {id: 1})-[:type1 {id:1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3}); + """ + When executing query: + """ + MATCH pth=(:label1)-[*BFS (e,n,p | type(relationships(p)[-1]) = 'type1')]->(:label3) return pth; + """ + Then the result should be: + | pth | + | <(:label1{id:1})-[:type1{id:1}]->(:label2{id:2})-[:type1{id:2}]->(:label3{id:3})> | + + Scenario: Test BFS variable expand with restrict filter by last edge type of accumulated path + Given an empty graph + And having executed: + """ + CREATE (:label1 {id: 1})-[:type1 {id:1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3}); + """ + When executing query: + """ + MATCH pth=(:label1)-[*BFS (e,n,p | type(relationships(p)[-1]) = 'type2')]->(:label2) return pth; + """ + Then the result should be empty + + Scenario: Test BFS variable expand with filter by size of accumulated path + Given an empty graph + And having executed: + """ + CREATE (:label1 {id: 1})-[:type1 {id:1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3}); + """ + When executing query: + """ + MATCH pth=(:label1)-[*BFS (e,n,p | size(p) < 3)]->(:label3) return pth; + """ + Then the result should be: + | pth | + | <(:label1{id:1})-[:type1{id:1}]->(:label2{id:2})-[:type1{id:2}]->(:label3{id:3})> | + + Scenario: Test BFS variable expand with restrict filter by size of accumulated path + Given an empty graph + And having executed: + """ + CREATE (:label1 {id: 1})-[:type1 {id:1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3}); + """ + When executing query: + """ + MATCH pth=(:label1)-[*BFS (e,n,p | size(p) < 2)]->(:label3) return pth; + """ + Then the result should be empty + + Scenario: Test BFS variable expand with filter by order of ids in accumulated path when 
target vertex is indexed + Given graph "graph_index" + When executing query: + """ + MATCH pth=(:label1)-[*BFS (e,n,p | (nodes(p)[-1]).id > (nodes(p)[-2]).id)]->(:label4) return pth; + """ + Then the result should be: + | pth | + | <(:label1 {id: 1})-[:type1 {id: 1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})-[:type1 {id: 3}]->(:label4 {id: 4})> | + + Scenario: Test BFS variable expand with filter by order of ids in accumulated path when target vertex is NOT indexed + Given graph "graph_index" + When executing query: + """ + MATCH pth=(:label1)-[*BFS (e,n,p | (nodes(p)[-1]).id > (nodes(p)[-2]).id)]->(:label3) return pth; + """ + Then the result should be: + | pth | + | <(:label1 {id: 1})-[:type1 {id: 1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})> | + + Scenario: Test BFS variable expand with filter by edge type1 + Given graph "graph_edges" + When executing query: + """ + MATCH path=(:label1)-[*BFS (e, n, p | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1'))]->(:label3) RETURN path; + """ + Then the result should be: + | path | + | <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | + + Scenario: Test BFS variable expand with filter by edge type2 + Given graph "graph_edges" + When executing query: + """ + MATCH path=(:label1)-[*BFS (e, n, p | NOT(type(e)='type2' AND type(last(relationships(p))) = 'type2'))]->(:label3) RETURN path; + """ + Then the result should be: + | path | + | <(:label1 {id: 1})-[:type1 {id: 1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})> | diff --git a/tests/gql_behave/tests/memgraph_V1/features/memgraph_wshortest.feature b/tests/gql_behave/tests/memgraph_V1/features/memgraph_wshortest.feature index 1c98c2830..819bc94b3 100644 --- a/tests/gql_behave/tests/memgraph_V1/features/memgraph_wshortest.feature +++ b/tests/gql_behave/tests/memgraph_V1/features/memgraph_wshortest.feature @@ -155,3 +155,103 @@ Feature: Weighted Shortest Path MATCH (n {a:'0'})-[le *wShortest 10 (e, n | e.w ) w]->(m) 
RETURN m.a, size(le) as s, w """ Then an error should be raised + + Scenario: Test match wShortest with accumulated path filtered by order of ids + Given an empty graph + And having executed: + """ + CREATE (:label1 {id: 1})-[:type1 {id:1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})-[:type1 {id: 3}]->(:label4 {id: 4}); + """ + When executing query: + """ + MATCH pth=(:label1)-[*WSHORTEST (r, n | r.id) total_weight (e,n,p | e.id > 0 and (nodes(p)[-1]).id > (nodes(p)[-2]).id)]->(:label4) RETURN pth, total_weight; + """ + Then the result should be: + | pth | total_weight | + | <(:label1{id:1})-[:type1{id:1}]->(:label2{id:2})-[:type1{id:2}]->(:label3{id:3})-[:type1{id:3}]->(:label4{id:4})> | 6 | + + Scenario: Test match wShortest with accumulated path filtered by edge type1 + Given graph "graph_edges" + When executing query: + """ + MATCH path=(:label1)-[*WSHORTEST (r, n | r.id) total_weight (e, n, p | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1'))]->(:label3) RETURN path, total_weight; + """ + Then the result should be: + | path | total_weight | + | <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | 10 | + + Scenario: Test match wShortest with accumulated path filtered by edge type2 + Given graph "graph_edges" + When executing query: + """ + MATCH path=(:label1)-[*WSHORTEST (r, n | r.id) total_weight (e, n, p | NOT(type(e)='type2' AND type(last(relationships(p))) = 'type2'))]->(:label3) RETURN path, total_weight; + """ + Then the result should be: + | path | total_weight | + | <(:label1 {id: 1})-[:type1 {id: 1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})> | 3 | + + Scenario: Test match wShortest with accumulated path filtered by edge type1 and accumulated weight based on edge + Given graph "graph_edges" + When executing query: + """ + MATCH path=(:label1)-[*WSHORTEST (r, n | r.id) total_weight (e, n, p, w | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1') AND w > 0)]->(:label3) RETURN path, total_weight; 
+ """ + Then the result should be: + | path | total_weight | + | <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | 10 | + + Scenario: Test match wShortest with accumulated path filtered by edge type1 and accumulated weight based on edge too restricted + Given graph "graph_edges" + When executing query: + """ + MATCH path=(:label1)-[*WSHORTEST (r, n | r.id) total_weight (e, n, p, w | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1') AND w < 10)]->(:label3) RETURN path, total_weight; + """ + Then the result should be empty + + Scenario: Test match wShortest with accumulated path filtered by edge type1 and accumulated weight based on vertex is int + Given graph "graph_edges" + When executing query: + """ + MATCH path=(:label1)-[*WSHORTEST (r, n | n.id) total_weight (e, n, p, w | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1') AND w > 0)]->(:label3) RETURN path, total_weight; + """ + Then the result should be: + | path | total_weight | + | <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | 4 | + + Scenario: Test match wShortest with accumulated path filtered by edge type1 and accumulated weight based on vertex and edge are ints + Given graph "graph_edges" + When executing query: + """ + MATCH path=(:label1)-[*WSHORTEST (r, n | n.id + coalesce(r.id, 0)) total_weight (e, n, p, w | NOT(type(e)='type1' AND type(last(relationships(p))) = 'type1') AND w > 0)]->(:label3) RETURN path, total_weight; + """ + Then the result should be: + | path | total_weight | + | <(:label1 {id: 1})-[:type2 {id: 10}]->(:label3 {id: 3})> | 14 | + + Scenario: Test match wShortest with accumulated path filtered by edge type1 and accumulated weight based on vertex and edge are doubles + Given an empty graph + And having executed: + """ + CREATE (:label1 {id: 1})-[:type1 {id:1.5}]->(:label2 {id: 2})-[:type1 {id: 2.1}]->(:label3 {id: 3})-[:type1 {id: 3.4}]->(:label4 {id: 4}); + """ + When executing query: + """ + MATCH path=(:label1)-[*WSHORTEST (r, n | 
n.id + coalesce(r.id, 0)) total_weight (e, n, p, w | w > 0)]->(:label3) RETURN path, total_weight; + """ + Then the result should be: + | path | total_weight | + | <(:label1 {id: 1})-[:type1 {id: 1.5}]->(:label2 {id: 2})-[:type1 {id: 2.1}]->(:label3 {id: 3})> | 9.6 | + + Scenario: Test match wShortest with accumulated path filtered by order of ids and accumulated weight based on both vertex and edge is duration + Given an empty graph + And having executed: + """ + CREATE (:station {name: "A", arrival: localTime("08:00"), departure: localTime("08:15")})-[:ride {id: 1, duration: duration("PT1H5M")}]->(:station {name: "B", arrival: localtime("09:20"), departure: localTime("09:30")})-[:ride {id: 2, duration: duration("PT30M")}]->(:station {name: "C", arrival: localTime("10:00"), departure: localTime("10:20")}); + """ + When executing query: + """ + MATCH path=(:station {name:"A"})-[*WSHORTEST (r, v | v.departure - v.arrival + coalesce(r.duration, duration("PT0M"))) total_weight (r,n,p,w | (nodes(p)[-1]).name > (nodes(p)[-2]).name AND not(w is null))]->(:station {name:"C"}) RETURN path, total_weight; + """ + Then the result should be: + | path | total_weight | + | <(:station {arrival: 08:00:00.000000000, departure: 08:15:00.000000000, name: 'A'})-[:ride {duration: PT1H5M, id: 1}]->(:station {arrival: 09:20:00.000000000, departure: 09:30:00.000000000, name: 'B'})-[:ride {duration: PT30M, id: 2}]->(:station {arrival: 10:00:00.000000000, departure: 10:20:00.000000000, name: 'C'})> | PT2H20M | diff --git a/tests/gql_behave/tests/memgraph_V1/graphs/graph_edges.cypher b/tests/gql_behave/tests/memgraph_V1/graphs/graph_edges.cypher new file mode 100644 index 000000000..06e7cdb5c --- /dev/null +++ b/tests/gql_behave/tests/memgraph_V1/graphs/graph_edges.cypher @@ -0,0 +1,2 @@ +CREATE (:label1 {id: 1})-[:type1 {id:1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})-[:type1 {id: 3}]->(:label4 {id: 4}); +MATCH (n :label1), (m :label3) CREATE (n)-[:type2 {id: 10}]->(m); diff 
--git a/tests/gql_behave/tests/memgraph_V1/graphs/graph_index.cypher b/tests/gql_behave/tests/memgraph_V1/graphs/graph_index.cypher new file mode 100644 index 000000000..ef30012d1 --- /dev/null +++ b/tests/gql_behave/tests/memgraph_V1/graphs/graph_index.cypher @@ -0,0 +1,2 @@ +CREATE INDEX ON :label4; +CREATE (:label1 {id: 1})-[:type1 {id:1}]->(:label2 {id: 2})-[:type1 {id: 2}]->(:label3 {id: 3})-[:type1 {id:3}]->(:label4 {id: 4}); diff --git a/tests/unit/cypher_main_visitor.cpp b/tests/unit/cypher_main_visitor.cpp index 54453de09..31fd95c6c 100644 --- a/tests/unit/cypher_main_visitor.cpp +++ b/tests/unit/cypher_main_visitor.cpp @@ -1925,6 +1925,41 @@ TEST_P(CypherMainVisitorTest, MatchBfsReturn) { ASSERT_TRUE(eq); } +TEST_P(CypherMainVisitorTest, MatchBfsFilterByPathReturn) { + auto &ast_generator = *GetParam(); + { + const auto *query = dynamic_cast( + ast_generator.ParseQuery("MATCH pth=(r:type1 {id: 1})<-[*BFS ..10 (e, n, p | startNode(relationships(e)[-1]) = " + "c:type2)]->(:type3 {id: 3}) RETURN pth;")); + ASSERT_TRUE(query); + ASSERT_TRUE(query->single_query_); + const auto *match = dynamic_cast(query->single_query_->clauses_[0]); + ASSERT_TRUE(match); + ASSERT_EQ(match->patterns_.size(), 1U); + ASSERT_EQ(match->patterns_[0]->atoms_.size(), 3U); + auto *bfs = dynamic_cast(match->patterns_[0]->atoms_[1]); + ASSERT_TRUE(bfs); + EXPECT_TRUE(bfs->IsVariable()); + EXPECT_EQ(bfs->filter_lambda_.inner_edge->name_, "e"); + EXPECT_TRUE(bfs->filter_lambda_.inner_edge->user_declared_); + EXPECT_EQ(bfs->filter_lambda_.inner_node->name_, "n"); + EXPECT_TRUE(bfs->filter_lambda_.inner_node->user_declared_); + EXPECT_EQ(bfs->filter_lambda_.accumulated_path->name_, "p"); + EXPECT_TRUE(bfs->filter_lambda_.accumulated_path->user_declared_); + EXPECT_EQ(bfs->filter_lambda_.accumulated_weight, nullptr); + } +} + +TEST_P(CypherMainVisitorTest, SemanticExceptionOnBfsFilterByWeight) { + auto &ast_generator = *GetParam(); + { + ASSERT_THROW(ast_generator.ParseQuery( + "MATCH 
pth=(:type1 {id: 1})<-[*BFS ..10 (e, n, p, w | startNode(relationships(e)[-1] AND w > 0) = " + "c:type2)]->(:type3 {id: 3}) RETURN pth;"), + SemanticException); + } +} + TEST_P(CypherMainVisitorTest, MatchVariableLambdaSymbols) { auto &ast_generator = *GetParam(); auto *query = dynamic_cast(ast_generator.ParseQuery("MATCH () -[*]- () RETURN *")); @@ -1981,6 +2016,57 @@ TEST_P(CypherMainVisitorTest, MatchWShortestReturn) { EXPECT_TRUE(shortest->total_weight_->user_declared_); } +TEST_P(CypherMainVisitorTest, MatchWShortestFilterByPathReturn) { + auto &ast_generator = *GetParam(); + { + const auto *query = dynamic_cast( + ast_generator.ParseQuery("MATCH pth=()-[r:type1 *wShortest 10 (we, wn | 42) total_weight " + "(e, n, p | startNode(relationships(e)[-1]) = c:type3)]->(:type2) RETURN pth")); + ASSERT_TRUE(query); + ASSERT_TRUE(query->single_query_); + const auto *match = dynamic_cast(query->single_query_->clauses_[0]); + ASSERT_TRUE(match); + ASSERT_EQ(match->patterns_.size(), 1U); + ASSERT_EQ(match->patterns_[0]->atoms_.size(), 3U); + auto *shortestPath = dynamic_cast(match->patterns_[0]->atoms_[1]); + ASSERT_TRUE(shortestPath); + EXPECT_TRUE(shortestPath->IsVariable()); + EXPECT_EQ(shortestPath->filter_lambda_.inner_edge->name_, "e"); + EXPECT_TRUE(shortestPath->filter_lambda_.inner_edge->user_declared_); + EXPECT_EQ(shortestPath->filter_lambda_.inner_node->name_, "n"); + EXPECT_TRUE(shortestPath->filter_lambda_.inner_node->user_declared_); + EXPECT_EQ(shortestPath->filter_lambda_.accumulated_path->name_, "p"); + EXPECT_TRUE(shortestPath->filter_lambda_.accumulated_path->user_declared_); + EXPECT_EQ(shortestPath->filter_lambda_.accumulated_weight, nullptr); + } +} + +TEST_P(CypherMainVisitorTest, MatchWShortestFilterByPathWeightReturn) { + auto &ast_generator = *GetParam(); + { + const auto *query = dynamic_cast(ast_generator.ParseQuery( + "MATCH pth=()-[r:type1 *wShortest 10 (we, wn | 42) total_weight " + "(e, n, p, w | startNode(relationships(e)[-1]) = c:type3 
AND w < 50)]->(:type2) RETURN pth")); + ASSERT_TRUE(query); + ASSERT_TRUE(query->single_query_); + const auto *match = dynamic_cast(query->single_query_->clauses_[0]); + ASSERT_TRUE(match); + ASSERT_EQ(match->patterns_.size(), 1U); + ASSERT_EQ(match->patterns_[0]->atoms_.size(), 3U); + auto *shortestPath = dynamic_cast(match->patterns_[0]->atoms_[1]); + ASSERT_TRUE(shortestPath); + EXPECT_TRUE(shortestPath->IsVariable()); + EXPECT_EQ(shortestPath->filter_lambda_.inner_edge->name_, "e"); + EXPECT_TRUE(shortestPath->filter_lambda_.inner_edge->user_declared_); + EXPECT_EQ(shortestPath->filter_lambda_.inner_node->name_, "n"); + EXPECT_TRUE(shortestPath->filter_lambda_.inner_node->user_declared_); + EXPECT_EQ(shortestPath->filter_lambda_.accumulated_path->name_, "p"); + EXPECT_TRUE(shortestPath->filter_lambda_.accumulated_path->user_declared_); + EXPECT_EQ(shortestPath->filter_lambda_.accumulated_weight->name_, "w"); + EXPECT_TRUE(shortestPath->filter_lambda_.accumulated_weight->user_declared_); + } +} + TEST_P(CypherMainVisitorTest, MatchWShortestNoFilterReturn) { auto &ast_generator = *GetParam(); auto *query =