From 14f92b4a0fcce743246da0b73ef8db53a6e91399 Mon Sep 17 00:00:00 2001 From: Gareth Andrew Lloyd Date: Fri, 1 Dec 2023 12:38:48 +0000 Subject: [PATCH] Bugfix: correct replication handler (#1540) Fixes root cause of a cascade of failures in replication code: - Replica handling of deleting an edge is now corrected. Now tolerant of multiple edges of the same relationship type. - Improved robustness: correct exception handling around failed stream of current WAL file. This now means a REPLICA failure will no longer prevent transactions on MAIN from performing WAL writes. - Slightly better diagnostic messages, not user friendly but helps get developer to correct root cause quicker. - Proactively remove vertex+edges during Abort rather than defer to GC to do that work, this included fixing constraints and indexes to be safe. Co-authored-by: Andreja Tonev --- src/dbms/inmemory/replication_handlers.cpp | 86 ++++++---- src/integrations/pulsar/consumer.hpp | 3 +- src/storage/v2/constraints/constraints.cpp | 4 + src/storage/v2/constraints/constraints.hpp | 4 + .../v2/disk/edge_import_mode_cache.cpp | 7 +- src/storage/v2/disk/storage.cpp | 45 ++++++ src/storage/v2/disk/storage.hpp | 3 + src/storage/v2/indices/indices.cpp | 19 +++ src/storage/v2/indices/indices.hpp | 17 ++ .../v2/indices/label_property_index.hpp | 5 + src/storage/v2/inmemory/label_index.cpp | 46 +++++- src/storage/v2/inmemory/label_index.hpp | 15 +- .../v2/inmemory/label_property_index.cpp | 63 +++++++- .../v2/inmemory/label_property_index.hpp | 30 +++- src/storage/v2/inmemory/storage.cpp | 147 ++++++++++++++++-- src/storage/v2/inmemory/storage.hpp | 5 + .../v2/inmemory/unique_constraints.cpp | 33 +++- .../v2/inmemory/unique_constraints.hpp | 4 + src/storage/v2/storage.hpp | 3 + .../test_query_modules/module_test.cpp | 10 +- tests/e2e/replication/CMakeLists.txt | 1 + tests/e2e/replication/edge_delete.py | 56 +++++++ tests/e2e/replication/workloads.yaml | 22 +++ 23 files changed, 558 insertions(+), 70 deletions(-) create mode 100755 tests/e2e/replication/edge_delete.py diff --git a/src/dbms/inmemory/replication_handlers.cpp b/src/dbms/inmemory/replication_handlers.cpp index ce1f6da20..a5f56ee3d 100644 --- a/src/dbms/inmemory/replication_handlers.cpp +++ b/src/dbms/inmemory/replication_handlers.cpp @@ -370,8 +370,9 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage constexpr bool kSharedAccess = false; std::optional> commit_timestamp_and_accessor; - auto get_transaction = [storage, &commit_timestamp_and_accessor](uint64_t commit_timestamp, - bool unique = kSharedAccess) { + auto const get_transaction = [storage, &commit_timestamp_and_accessor]( + uint64_t commit_timestamp, + bool unique = kSharedAccess) -> storage::InMemoryStorage::ReplicationAccessor * { if (!commit_timestamp_and_accessor) { std::unique_ptr acc = nullptr; if (unique) { @@ -415,9 +416,11 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage spdlog::trace(" Delete vertex {}", delta.vertex_create_delete.gid.AsUint()); auto *transaction = get_transaction(timestamp); auto vertex = transaction->FindVertex(delta.vertex_create_delete.gid, View::NEW); - if (!vertex) throw utils::BasicException("Invalid transaction!"); + if (!vertex) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); auto ret = transaction->DeleteVertex(&*vertex); - if (ret.HasError() || !ret.GetValue()) throw utils::BasicException("Invalid transaction!"); + if (ret.HasError() || !ret.GetValue()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::VERTEX_ADD_LABEL: { @@ -425,9 +428,11 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage delta.vertex_add_remove_label.label); auto *transaction = get_transaction(timestamp); auto vertex = transaction->FindVertex(delta.vertex_add_remove_label.gid, View::NEW); - if (!vertex) throw utils::BasicException("Invalid transaction!"); + if (!vertex) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); auto ret = vertex->AddLabel(transaction->NameToLabel(delta.vertex_add_remove_label.label)); - if (ret.HasError() || !ret.GetValue()) throw utils::BasicException("Invalid transaction!"); + if (ret.HasError() || !ret.GetValue()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::VERTEX_REMOVE_LABEL: { @@ -435,9 +440,11 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage delta.vertex_add_remove_label.label); auto *transaction = get_transaction(timestamp); auto vertex = transaction->FindVertex(delta.vertex_add_remove_label.gid, View::NEW); - if (!vertex) throw utils::BasicException("Invalid transaction!"); + if (!vertex) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); auto ret = vertex->RemoveLabel(transaction->NameToLabel(delta.vertex_add_remove_label.label)); - if (ret.HasError() || !ret.GetValue()) throw utils::BasicException("Invalid transaction!"); + if (ret.HasError() || !ret.GetValue()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::VERTEX_SET_PROPERTY: { @@ -445,10 +452,12 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage delta.vertex_edge_set_property.property, delta.vertex_edge_set_property.value); auto *transaction = get_transaction(timestamp); auto vertex = transaction->FindVertex(delta.vertex_edge_set_property.gid, View::NEW); - if (!vertex) throw utils::BasicException("Invalid transaction!"); + if (!vertex) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); auto ret = vertex->SetProperty(transaction->NameToProperty(delta.vertex_edge_set_property.property), delta.vertex_edge_set_property.value); - if (ret.HasError()) throw utils::BasicException("Invalid transaction!"); + if (ret.HasError()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::EDGE_CREATE: { @@ -457,13 +466,16 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage delta.edge_create_delete.from_vertex.AsUint(), delta.edge_create_delete.to_vertex.AsUint()); auto *transaction = get_transaction(timestamp); auto from_vertex = transaction->FindVertex(delta.edge_create_delete.from_vertex, View::NEW); - if (!from_vertex) throw utils::BasicException("Invalid transaction!"); + if (!from_vertex) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); auto to_vertex = transaction->FindVertex(delta.edge_create_delete.to_vertex, View::NEW); - if (!to_vertex) throw utils::BasicException("Invalid transaction!"); + if (!to_vertex) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); auto edge = transaction->CreateEdgeEx(&*from_vertex, &*to_vertex, transaction->NameToEdgeType(delta.edge_create_delete.edge_type), delta.edge_create_delete.gid); - if (edge.HasError()) throw utils::BasicException("Invalid transaction!"); + if (edge.HasError()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::EDGE_DELETE: { @@ -472,16 +484,17 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage delta.edge_create_delete.from_vertex.AsUint(), delta.edge_create_delete.to_vertex.AsUint()); auto *transaction = get_transaction(timestamp); auto from_vertex = transaction->FindVertex(delta.edge_create_delete.from_vertex, View::NEW); - if (!from_vertex) throw utils::BasicException("Invalid transaction!"); + if (!from_vertex) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); auto to_vertex = transaction->FindVertex(delta.edge_create_delete.to_vertex, View::NEW); - if (!to_vertex) throw utils::BasicException("Invalid transaction!"); - auto edges = from_vertex->OutEdges(View::NEW, {transaction->NameToEdgeType(delta.edge_create_delete.edge_type)}, - &*to_vertex); - if (edges.HasError()) throw utils::BasicException("Invalid transaction!"); - if (edges->edges.size() != 1) throw utils::BasicException("Invalid transaction!"); - auto &edge = (*edges).edges[0]; - auto ret = transaction->DeleteEdge(&edge); - if (ret.HasError()) throw utils::BasicException("Invalid transaction!"); + if (!to_vertex) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); + auto edgeType = transaction->NameToEdgeType(delta.edge_create_delete.edge_type); + auto edge = + transaction->FindEdge(delta.edge_create_delete.gid, View::NEW, edgeType, &*from_vertex, &*to_vertex); + if (!edge) throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); + if (auto ret = transaction->DeleteEdge(&*edge); ret.HasError()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::EDGE_SET_PROPERTY: { @@ -498,7 +511,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage // yields an accessor that is only valid for managing the edge's // properties. auto edge = edge_acc.find(delta.vertex_edge_set_property.gid); - if (edge == edge_acc.end()) throw utils::BasicException("Invalid transaction!"); + if (edge == edge_acc.end()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); // The edge visibility check must be done here manually because we // don't allow direct access to the edges through the public API. { @@ -530,7 +544,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage } } }); - if (!is_visible) throw utils::BasicException("Invalid transaction!"); + if (!is_visible) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); } EdgeRef edge_ref(&*edge); // Here we create an edge accessor that we will use to get the @@ -543,7 +558,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage auto ret = ea.SetProperty(transaction->NameToProperty(delta.vertex_edge_set_property.property), delta.vertex_edge_set_property.value); - if (ret.HasError()) throw utils::BasicException("Invalid transaction!"); + if (ret.HasError()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } @@ -553,7 +569,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage throw utils::BasicException("Invalid commit data!"); auto ret = commit_timestamp_and_accessor->second.Commit(commit_timestamp_and_accessor->first, false /* not main */); - if (ret.HasError()) throw utils::BasicException("Invalid transaction!"); + if (ret.HasError()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); commit_timestamp_and_accessor = std::nullopt; break; } @@ -563,14 +580,14 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage // Need to send the timestamp auto *transaction = get_transaction(timestamp, kUniqueAccess); if (transaction->CreateIndex(storage->NameToLabel(delta.operation_label.label)).HasError()) - throw utils::BasicException("Invalid transaction!"); + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::LABEL_INDEX_DROP: { spdlog::trace(" Drop label index on :{}", delta.operation_label.label); auto *transaction = get_transaction(timestamp, kUniqueAccess); if (transaction->DropIndex(storage->NameToLabel(delta.operation_label.label)).HasError()) - throw utils::BasicException("Invalid transaction!"); + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::LABEL_INDEX_STATS_SET: { @@ -601,7 +618,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage ->CreateIndex(storage->NameToLabel(delta.operation_label_property.label), storage->NameToProperty(delta.operation_label_property.property)) .HasError()) - throw utils::BasicException("Invalid transaction!"); + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP: { @@ -612,7 +629,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage ->DropIndex(storage->NameToLabel(delta.operation_label_property.label), storage->NameToProperty(delta.operation_label_property.property)) .HasError()) - throw utils::BasicException("Invalid transaction!"); + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::LABEL_PROPERTY_INDEX_STATS_SET: { @@ -644,7 +661,8 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage auto ret = transaction->CreateExistenceConstraint(storage->NameToLabel(delta.operation_label_property.label), storage->NameToProperty(delta.operation_label_property.property)); - if (ret.HasError()) throw utils::BasicException("Invalid transaction!"); + if (ret.HasError()) + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: { @@ -655,7 +673,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage ->DropExistenceConstraint(storage->NameToLabel(delta.operation_label_property.label), storage->NameToProperty(delta.operation_label_property.property)) .HasError()) - throw utils::BasicException("Invalid transaction!"); + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE: { @@ -670,7 +688,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage auto ret = transaction->CreateUniqueConstraint(storage->NameToLabel(delta.operation_label_properties.label), properties); if (!ret.HasValue() || ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS) - throw utils::BasicException("Invalid transaction!"); + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); break; } case WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP: { @@ -685,7 +703,7 @@ uint64_t InMemoryReplicationHandlers::ReadAndApplyDelta(storage::InMemoryStorage auto ret = transaction->DropUniqueConstraint(storage->NameToLabel(delta.operation_label_properties.label), properties); if (ret != UniqueConstraints::DeletionStatus::SUCCESS) { - throw utils::BasicException("Invalid transaction!"); + throw utils::BasicException("Invalid transaction! Please raise an issue, {}:{}", __FILE__, __LINE__); } break; } diff --git a/src/integrations/pulsar/consumer.hpp b/src/integrations/pulsar/consumer.hpp index 1caa366ad..06ed3a550 100644 --- a/src/integrations/pulsar/consumer.hpp +++ b/src/integrations/pulsar/consumer.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -10,6 +10,7 @@ // licenses/APL.txt. #pragma once + #include #include #include diff --git a/src/storage/v2/constraints/constraints.cpp b/src/storage/v2/constraints/constraints.cpp index 42128511c..6a6554db4 100644 --- a/src/storage/v2/constraints/constraints.cpp +++ b/src/storage/v2/constraints/constraints.cpp @@ -29,4 +29,8 @@ Constraints::Constraints(const Config &config, StorageMode storage_mode) { }; }); } + +void Constraints::AbortEntries(std::span vertices, uint64_t exact_start_timestamp) const { + static_cast(unique_constraints_.get())->AbortEntries(vertices, exact_start_timestamp); +} } // namespace memgraph::storage diff --git a/src/storage/v2/constraints/constraints.hpp b/src/storage/v2/constraints/constraints.hpp index 8469a5470..1f5ef999e 100644 --- a/src/storage/v2/constraints/constraints.hpp +++ b/src/storage/v2/constraints/constraints.hpp @@ -11,6 +11,8 @@ #pragma once +#include + #include "storage/v2/config.hpp" #include "storage/v2/constraints/existence_constraints.hpp" #include "storage/v2/constraints/unique_constraints.hpp" @@ -27,6 +29,8 @@ struct Constraints { Constraints &operator=(Constraints &&) = delete; ~Constraints() = default; + void AbortEntries(std::span vertices, uint64_t exact_start_timestamp) const; + std::unique_ptr existence_constraints_; std::unique_ptr unique_constraints_; }; diff --git a/src/storage/v2/disk/edge_import_mode_cache.cpp b/src/storage/v2/disk/edge_import_mode_cache.cpp index 2a29a1606..cd1ca0dc2 100644 --- a/src/storage/v2/disk/edge_import_mode_cache.cpp +++ b/src/storage/v2/disk/edge_import_mode_cache.cpp @@ -10,7 +10,9 @@ // licenses/APL.txt. #include "storage/v2/disk//edge_import_mode_cache.hpp" + #include + #include "storage/v2/disk/label_property_index.hpp" #include "storage/v2/indices/indices.hpp" #include "storage/v2/inmemory/label_index.hpp" @@ -28,7 +30,7 @@ EdgeImportModeCache::EdgeImportModeCache(const Config &config) InMemoryLabelIndex::Iterable EdgeImportModeCache::Vertices(LabelId label, View view, Storage *storage, Transaction *transaction) const { auto *mem_label_index = static_cast(in_memory_indices_.label_index_.get()); - return mem_label_index->Vertices(label, view, storage, transaction); + return mem_label_index->Vertices(label, vertices_.access(), view, storage, transaction); } InMemoryLabelPropertyIndex::Iterable EdgeImportModeCache::Vertices( @@ -37,7 +39,8 @@ InMemoryLabelPropertyIndex::Iterable EdgeImportModeCache::Vertices( Transaction *transaction) const { auto *mem_label_property_index = static_cast(in_memory_indices_.label_property_index_.get()); - return mem_label_property_index->Vertices(label, property, lower_bound, upper_bound, view, storage, transaction); + return mem_label_property_index->Vertices(label, property, vertices_.access(), lower_bound, upper_bound, view, + storage, transaction); } bool EdgeImportModeCache::CreateIndex(LabelId label, PropertyId property, diff --git a/src/storage/v2/disk/storage.cpp b/src/storage/v2/disk/storage.cpp index 09b28943c..073d1fbd1 100644 --- a/src/storage/v2/disk/storage.cpp +++ b/src/storage/v2/disk/storage.cpp @@ -71,6 +71,37 @@ namespace memgraph::storage { +namespace { + +auto FindEdges(const View view, EdgeTypeId edge_type, const VertexAccessor *from_vertex, VertexAccessor *to_vertex) + -> Result { + auto use_out_edges = [](Vertex const *from_vertex, Vertex const *to_vertex) { + // Obtain the locks by `gid` order to avoid lock cycles. + auto guard_from = std::unique_lock{from_vertex->lock, std::defer_lock}; + auto guard_to = std::unique_lock{to_vertex->lock, std::defer_lock}; + if (from_vertex->gid < to_vertex->gid) { + guard_from.lock(); + guard_to.lock(); + } else if (from_vertex->gid > to_vertex->gid) { + guard_to.lock(); + guard_from.lock(); + } else { + // The vertices are the same vertex, only lock one. + guard_from.lock(); + } + + // With the potentially cheaper side FindEdges + const auto out_n = from_vertex->out_edges.size(); + const auto in_n = to_vertex->in_edges.size(); + return out_n <= in_n; + }; + + return use_out_edges(from_vertex->vertex_, to_vertex->vertex_) ? from_vertex->OutEdges(view, {edge_type}, to_vertex) + : to_vertex->InEdges(view, {edge_type}, from_vertex); +} + +} // namespace + using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler; namespace { @@ -949,6 +980,20 @@ Result DiskStorage::DiskAccessor::CreateEdge(VertexAccessor *from, return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_); } +std::optional DiskStorage::DiskAccessor::FindEdge(Gid gid, View view, EdgeTypeId edge_type, + VertexAccessor *from_vertex, + VertexAccessor *to_vertex) { + auto res = FindEdges(view, edge_type, from_vertex, to_vertex); + if (res.HasError()) return std::nullopt; // TODO: use a Result type + + auto const it = std::ranges::find_if( + res->edges, [gid](EdgeAccessor const &edge_accessor) { return edge_accessor.edge_.ptr->gid == gid; }); + + if (it == res->edges.end()) return std::nullopt; // TODO: use a Result type + + return *it; +} + Result DiskStorage::DiskAccessor::EdgeSetFrom(EdgeAccessor * /*edge*/, VertexAccessor * /*new_from*/) { MG_ASSERT(false, "EdgeSetFrom is currently only implemented for InMemory storage"); return Error::NONEXISTENT_OBJECT; diff --git a/src/storage/v2/disk/storage.hpp b/src/storage/v2/disk/storage.hpp index e93566c09..8640462de 100644 --- a/src/storage/v2/disk/storage.hpp +++ b/src/storage/v2/disk/storage.hpp @@ -121,6 +121,9 @@ class DiskStorage final : public Storage { Result CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) override; + std::optional FindEdge(Gid gid, View view, EdgeTypeId edge_type, VertexAccessor *from_vertex, + VertexAccessor *to_vertex) override; + Result EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) override; Result EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) override; diff --git a/src/storage/v2/indices/indices.cpp b/src/storage/v2/indices/indices.cpp index bf7295de2..e0b194ad4 100644 --- a/src/storage/v2/indices/indices.cpp +++ b/src/storage/v2/indices/indices.cpp @@ -17,6 +17,21 @@ namespace memgraph::storage { +void Indices::AbortEntries(LabelId labelId, std::span vertices, uint64_t exact_start_timestamp) const { + static_cast(label_index_.get())->AbortEntries(labelId, vertices, exact_start_timestamp); +} + +void Indices::AbortEntries(PropertyId property, std::span const> vertices, + uint64_t exact_start_timestamp) const { + static_cast(label_property_index_.get()) + ->AbortEntries(property, vertices, exact_start_timestamp); +} +void Indices::AbortEntries(LabelId label, std::span const> vertices, + uint64_t exact_start_timestamp) const { + static_cast(label_property_index_.get()) + ->AbortEntries(label, vertices, exact_start_timestamp); +} + void Indices::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp) const { static_cast(label_index_.get())->RemoveObsoleteEntries(oldest_active_start_timestamp); static_cast(label_property_index_.get()) @@ -50,4 +65,8 @@ Indices::Indices(const Config &config, StorageMode storage_mode) { }); } +Indices::IndexStats Indices::Analysis() const { + return {static_cast(label_index_.get())->Analysis(), + static_cast(label_property_index_.get())->Analysis()}; +} } // namespace memgraph::storage diff --git a/src/storage/v2/indices/indices.hpp b/src/storage/v2/indices/indices.hpp index 9a71107cd..33bd429e6 100644 --- a/src/storage/v2/indices/indices.hpp +++ b/src/storage/v2/indices/indices.hpp @@ -12,6 +12,9 @@ #pragma once #include +#include + +#include "storage/v2/id_types.hpp" #include "storage/v2/indices/label_index.hpp" #include "storage/v2/indices/label_property_index.hpp" #include "storage/v2/storage_mode.hpp" @@ -32,6 +35,20 @@ struct Indices { /// TODO: unused in disk indices void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp) const; + /// Surgical removal of entries that was inserted this transaction + /// TODO: unused in disk indices + void AbortEntries(LabelId labelId, std::span vertices, uint64_t exact_start_timestamp) const; + void AbortEntries(PropertyId property, std::span const> vertices, + uint64_t exact_start_timestamp) const; + void AbortEntries(LabelId label, std::span const> vertices, + uint64_t exact_start_timestamp) const; + + struct IndexStats { + std::vector label; + LabelPropertyIndex::IndexStats property_label; + }; + IndexStats Analysis() const; + // Indices are updated whenever an update occurs, instead of only on commit or // advance command. This is necessary because we want indices to support `NEW` // view for use in Merge. diff --git a/src/storage/v2/indices/label_property_index.hpp b/src/storage/v2/indices/label_property_index.hpp index 84908b3e9..e8389fbea 100644 --- a/src/storage/v2/indices/label_property_index.hpp +++ b/src/storage/v2/indices/label_property_index.hpp @@ -19,6 +19,11 @@ namespace memgraph::storage { class LabelPropertyIndex { public: + struct IndexStats { + std::map> l2p; + std::map> p2l; + }; + LabelPropertyIndex() = default; LabelPropertyIndex(const LabelPropertyIndex &) = delete; LabelPropertyIndex(LabelPropertyIndex &&) = delete; diff --git a/src/storage/v2/inmemory/label_index.cpp b/src/storage/v2/inmemory/label_index.cpp index 31c3634be..82ead4b44 100644 --- a/src/storage/v2/inmemory/label_index.cpp +++ b/src/storage/v2/inmemory/label_index.cpp @@ -10,8 +10,12 @@ // licenses/APL.txt. #include "storage/v2/inmemory/label_index.hpp" + +#include + #include "storage/v2/constraints/constraints.hpp" #include "storage/v2/indices/indices_utils.hpp" +#include "storage/v2/inmemory/storage.hpp" namespace memgraph::storage { @@ -96,9 +100,23 @@ void InMemoryLabelIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_time } } -InMemoryLabelIndex::Iterable::Iterable(utils::SkipList::Accessor index_accessor, LabelId label, View view, - Storage *storage, Transaction *transaction) - : index_accessor_(std::move(index_accessor)), +void InMemoryLabelIndex::AbortEntries(LabelId labelId, std::span vertices, + uint64_t exact_start_timestamp) { + auto const it = index_.find(labelId); + if (it == index_.end()) return; + + auto &label_storage = it->second; + auto vertices_acc = label_storage.access(); + for (auto *vertex : vertices) { + vertices_acc.remove(Entry{vertex, exact_start_timestamp}); + } +} + +InMemoryLabelIndex::Iterable::Iterable(utils::SkipList::Accessor index_accessor, + utils::SkipList::ConstAccessor vertices_accessor, LabelId label, + View view, Storage *storage, Transaction *transaction) + : pin_accessor_(std::move(vertices_accessor)), + index_accessor_(std::move(index_accessor)), label_(label), view_(view), storage_(storage), @@ -147,9 +165,21 @@ void InMemoryLabelIndex::RunGC() { InMemoryLabelIndex::Iterable InMemoryLabelIndex::Vertices(LabelId label, View view, Storage *storage, Transaction *transaction) { + DMG_ASSERT(storage->storage_mode_ == StorageMode::IN_MEMORY_TRANSACTIONAL || + storage->storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL, + "LabelIndex trying to access InMemory vertices from OnDisk!"); + auto vertices_acc = static_cast(storage)->vertices_.access(); const auto it = index_.find(label); MG_ASSERT(it != index_.end(), "Index for label {} doesn't exist", label.AsUint()); - return {it->second.access(), label, view, storage, transaction}; + return {it->second.access(), std::move(vertices_acc), label, view, storage, transaction}; +} + +InMemoryLabelIndex::Iterable InMemoryLabelIndex::Vertices( + LabelId label, memgraph::utils::SkipList::ConstAccessor vertices_acc, View view, + Storage *storage, Transaction *transaction) { + const auto it = index_.find(label); + MG_ASSERT(it != index_.end(), "Index for label {} doesn't exist", label.AsUint()); + return {it->second.access(), std::move(vertices_acc), label, view, storage, transaction}; } void InMemoryLabelIndex::SetIndexStats(const storage::LabelId &label, const storage::LabelIndexStats &stats) { @@ -187,4 +217,12 @@ bool InMemoryLabelIndex::DeleteIndexStats(const storage::LabelId &label) { return false; } +std::vector InMemoryLabelIndex::Analysis() const { + std::vector res; + res.reserve(index_.size()); + for (const auto &[label, _] : index_) { + res.emplace_back(label); + } + return res; +} } // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/label_index.hpp b/src/storage/v2/inmemory/label_index.hpp index 7d606574b..21df32deb 100644 --- a/src/storage/v2/inmemory/label_index.hpp +++ b/src/storage/v2/inmemory/label_index.hpp @@ -11,6 +11,8 @@ #pragma once +#include + #include "storage/v2/constraints/constraints.hpp" #include "storage/v2/indices/label_index.hpp" #include "storage/v2/indices/label_index_stats.hpp" @@ -56,10 +58,15 @@ class InMemoryLabelIndex : public storage::LabelIndex { void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp); + /// Surgical removal of entries that was inserted this transaction + void AbortEntries(LabelId labelId, std::span vertices, uint64_t exact_start_timestamp); + + std::vector Analysis() const; + class Iterable { public: - Iterable(utils::SkipList::Accessor index_accessor, LabelId label, View view, Storage *storage, - Transaction *transaction); + Iterable(utils::SkipList::Accessor index_accessor, utils::SkipList::ConstAccessor vertices_accessor, + LabelId label, View view, Storage *storage, Transaction *transaction); class Iterator { public: @@ -85,6 +92,7 @@ class InMemoryLabelIndex : public storage::LabelIndex { Iterator end() { return {this, index_accessor_.end()}; } private: + utils::SkipList::ConstAccessor pin_accessor_; utils::SkipList::Accessor index_accessor_; LabelId label_; View view_; @@ -98,6 +106,9 @@ class InMemoryLabelIndex : public storage::LabelIndex { Iterable Vertices(LabelId label, View view, Storage *storage, Transaction *transaction); + Iterable Vertices(LabelId label, memgraph::utils::SkipList::ConstAccessor vertices_acc, + View view, Storage *storage, Transaction *transaction); + void SetIndexStats(const storage::LabelId &label, const storage::LabelIndexStats &stats); std::optional GetIndexStats(const storage::LabelId &label) const; diff --git a/src/storage/v2/inmemory/label_property_index.cpp b/src/storage/v2/inmemory/label_property_index.cpp index fa5cce444..f61b9dd11 100644 --- a/src/storage/v2/inmemory/label_property_index.cpp +++ b/src/storage/v2/inmemory/label_property_index.cpp @@ -12,6 +12,8 @@ #include "storage/v2/inmemory/label_property_index.hpp" #include "storage/v2/constraints/constraints.hpp" #include "storage/v2/indices/indices_utils.hpp" +#include "storage/v2/inmemory/storage.hpp" +#include "utils/logging.hpp" namespace memgraph::storage { @@ -101,11 +103,12 @@ void InMemoryLabelPropertyIndex::UpdateOnSetProperty(PropertyId property, const return; } - if (!indices_by_property_.contains(property)) { + auto index = indices_by_property_.find(property); + if (index == indices_by_property_.end()) { return; } - for (const auto &[_, storage] : indices_by_property_.at(property)) { + for (const auto &[_, storage] : index->second) { auto acc = storage->access(); acc.insert(Entry{value, vertex, tx.start_timestamp}); } @@ -220,12 +223,14 @@ const PropertyValue kSmallestMap = PropertyValue(std::map(0), std::numeric_limits::min()}); -InMemoryLabelPropertyIndex::Iterable::Iterable(utils::SkipList::Accessor index_accessor, LabelId label, +InMemoryLabelPropertyIndex::Iterable::Iterable(utils::SkipList::Accessor index_accessor, + utils::SkipList::ConstAccessor vertices_accessor, LabelId label, PropertyId property, const std::optional> &lower_bound, const std::optional> &upper_bound, View view, Storage *storage, Transaction *transaction) - : index_accessor_(std::move(index_accessor)), + : pin_accessor_(std::move(vertices_accessor)), + index_accessor_(std::move(index_accessor)), label_(label), property_(property), lower_bound_(lower_bound), @@ -428,9 +433,57 @@ InMemoryLabelPropertyIndex::Iterable InMemoryLabelPropertyIndex::Vertices( LabelId label, PropertyId property, const std::optional> &lower_bound, const std::optional> &upper_bound, View view, Storage *storage, Transaction *transaction) { + DMG_ASSERT(storage->storage_mode_ == StorageMode::IN_MEMORY_TRANSACTIONAL || + storage->storage_mode_ == StorageMode::IN_MEMORY_ANALYTICAL, + "PropertyLabel index trying to access InMemory vertices from OnDisk!"); + auto vertices_acc = static_cast(storage)->vertices_.access(); auto it = index_.find({label, property}); MG_ASSERT(it != index_.end(), "Index for label {} and property {} doesn't exist", label.AsUint(), property.AsUint()); - return {it->second.access(), label, property, lower_bound, upper_bound, view, storage, transaction}; + return {it->second.access(), std::move(vertices_acc), label, property, lower_bound, upper_bound, view, storage, + transaction}; } +InMemoryLabelPropertyIndex::Iterable InMemoryLabelPropertyIndex::Vertices( + LabelId label, PropertyId property, + memgraph::utils::SkipList::ConstAccessor vertices_acc, + const std::optional> &lower_bound, + const std::optional> &upper_bound, View view, Storage *storage, + Transaction *transaction) { + auto it = index_.find({label, property}); + MG_ASSERT(it != index_.end(), "Index for label {} and property {} doesn't exist", label.AsUint(), property.AsUint()); + return {it->second.access(), std::move(vertices_acc), label, property, lower_bound, upper_bound, view, storage, + transaction}; +} + +void InMemoryLabelPropertyIndex::AbortEntries(PropertyId property, + std::span const> vertices, + uint64_t exact_start_timestamp) { + auto const it = indices_by_property_.find(property); + if (it == indices_by_property_.end()) return; + + auto &indices = it->second; + for (const auto &[_, index] : indices) { + auto index_acc = index->access(); + for (auto const &[value, vertex] : vertices) { + index_acc.remove(Entry{value, vertex, exact_start_timestamp}); + } + } +} + +void InMemoryLabelPropertyIndex::AbortEntries(LabelId label, + std::span const> vertices, + uint64_t exact_start_timestamp) { + for (auto &[label_prop, storage] : index_) { + if (label_prop.first != label) { + continue; + } + + auto index_acc = storage.access(); + for (const auto &[property, vertex] : vertices) { + if (!property.IsNull()) { + index_acc.remove(Entry{property, vertex, exact_start_timestamp}); + } + } + } +} } // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/label_property_index.hpp b/src/storage/v2/inmemory/label_property_index.hpp index 7f8c54909..ae96a37f8 100644 --- a/src/storage/v2/inmemory/label_property_index.hpp +++ b/src/storage/v2/inmemory/label_property_index.hpp @@ -11,9 +11,13 @@ #pragma once +#include + #include "storage/v2/constraints/constraints.hpp" +#include "storage/v2/id_types.hpp" #include "storage/v2/indices/label_property_index.hpp" #include "storage/v2/indices/label_property_index_stats.hpp" +#include "storage/v2/property_value.hpp" #include "utils/rw_lock.hpp" #include "utils/synchronized.hpp" @@ -61,10 +65,25 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex { void RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp); + void AbortEntries(PropertyId property, std::span const> vertices, + uint64_t exact_start_timestamp); + void AbortEntries(LabelId label, std::span const> vertices, + uint64_t exact_start_timestamp); + + IndexStats Analysis() const { + IndexStats res{}; + for (const auto &[lp, _] : index_) { + const auto &[label, property] = lp; + res.l2p[label].emplace_back(property); + res.p2l[property].emplace_back(label); + } + return res; + } + class Iterable { public: - Iterable(utils::SkipList::Accessor index_accessor, LabelId label, PropertyId property, - const std::optional> &lower_bound, + Iterable(utils::SkipList::Accessor index_accessor, utils::SkipList::ConstAccessor vertices_accessor, + LabelId label, PropertyId property, const std::optional> &lower_bound, const std::optional> &upper_bound, View view, Storage *storage, Transaction *transaction); @@ -92,6 +111,7 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex { Iterator end(); private: + utils::SkipList::ConstAccessor pin_accessor_; utils::SkipList::Accessor index_accessor_; LabelId label_; PropertyId property_; @@ -131,6 +151,12 @@ class InMemoryLabelPropertyIndex : public storage::LabelPropertyIndex { const std::optional> &upper_bound, View view, Storage *storage, Transaction *transaction); + Iterable Vertices(LabelId label, PropertyId property, + memgraph::utils::SkipList::ConstAccessor vertices_acc, + const std::optional> &lower_bound, + const std::optional> &upper_bound, View view, Storage *storage, + Transaction *transaction); + private: std::map, utils::SkipList> index_; std::unordered_map *>> indices_by_property_; diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index c27018d24..558330996 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -10,21 +10,57 @@ // licenses/APL.txt. #include "storage/v2/inmemory/storage.hpp" +#include +#include #include "dbms/constants.hpp" #include "memory/global_memory_control.hpp" #include "storage/v2/durability/durability.hpp" #include "storage/v2/durability/snapshot.hpp" +#include "storage/v2/edge_direction.hpp" +#include "storage/v2/id_types.hpp" #include "storage/v2/metadata_delta.hpp" /// REPLICATION /// #include "dbms/inmemory/replication_handlers.hpp" #include "storage/v2/inmemory/replication/recovery.hpp" #include "storage/v2/inmemory/unique_constraints.hpp" +#include "storage/v2/property_value.hpp" #include "utils/resource_lock.hpp" #include "utils/stat.hpp" namespace memgraph::storage { +namespace { + +auto FindEdges(const View view, EdgeTypeId edge_type, const VertexAccessor *from_vertex, VertexAccessor *to_vertex) + -> Result { + auto use_out_edges = [](Vertex const *from_vertex, Vertex const *to_vertex) { + // Obtain the locks by `gid` order to avoid lock cycles. + auto guard_from = std::unique_lock{from_vertex->lock, std::defer_lock}; + auto guard_to = std::unique_lock{to_vertex->lock, std::defer_lock}; + if (from_vertex->gid < to_vertex->gid) { + guard_from.lock(); + guard_to.lock(); + } else if (from_vertex->gid > to_vertex->gid) { + guard_to.lock(); + guard_from.lock(); + } else { + // The vertices are the same vertex, only lock one. + guard_from.lock(); + } + + // With the potentially cheaper side FindEdges + const auto out_n = from_vertex->out_edges.size(); + const auto in_n = to_vertex->in_edges.size(); + return out_n <= in_n; + }; + + return use_out_edges(from_vertex->vertex_, to_vertex->vertex_) ? from_vertex->OutEdges(view, {edge_type}, to_vertex) + : to_vertex->InEdges(view, {edge_type}, from_vertex); +} + +}; // namespace + using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler; InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode) @@ -315,6 +351,24 @@ Result InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_); } +std::optional InMemoryStorage::InMemoryAccessor::FindEdge(Gid gid, const View view, EdgeTypeId edge_type, + VertexAccessor *from_vertex, + VertexAccessor *to_vertex) { + auto res = FindEdges(view, edge_type, from_vertex, to_vertex); + if (res.HasError()) return std::nullopt; // TODO: use a Result type + + auto const it = std::invoke([this, gid, &res]() { + auto const byGid = [gid](EdgeAccessor const &edge_accessor) { return edge_accessor.edge_.gid == gid; }; + auto const byEdgePtr = [gid](EdgeAccessor const &edge_accessor) { return edge_accessor.edge_.ptr->gid == gid; }; + if (config_.properties_on_edges) return std::ranges::find_if(res->edges, byEdgePtr); + return std::ranges::find_if(res->edges, byGid); + }); + + if (it == res->edges.end()) return std::nullopt; // TODO: use a Result type + + return *it; +} + Result InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, storage::Gid gid) { MG_ASSERT(from->transaction_ == to->transaction_, @@ -697,7 +751,8 @@ utils::BasicResult InMemoryStorage::InMemoryAcce could_replicate_all_sync_replicas = mem_storage->AppendToWalDataDefinition(transaction_, *commit_timestamp_); // protected by engine_guard // TODO: release lock, and update all deltas to have a local copy of the commit timestamp - transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release); // protected by engine_guard + transaction_.commit_timestamp->store(*commit_timestamp_, + std::memory_order_release); // protected by engine_guard // Replica can only update the last commit timestamp with // the commits received from main. if (is_main || desired_commit_timestamp.has_value()) { @@ -823,6 +878,21 @@ void InMemoryStorage::InMemoryAccessor::Abort() { std::list my_deleted_vertices; std::list my_deleted_edges; + std::map> label_cleanup; + std::map>> label_property_cleanup; + std::map>> property_cleanup; + + // CONSTRAINTS + if (transaction_.constraint_verification_info.NeedsUniqueConstraintVerification()) { + // Need to remove elements from constraints before handling of the deltas, so the elements match the correct + // values + auto vertices_to_check = transaction_.constraint_verification_info.GetVerticesForUniqueConstraintChecking(); + auto vertices_to_check_v = std::vector{vertices_to_check.begin(), vertices_to_check.end()}; + storage_->constraints_.AbortEntries(vertices_to_check_v, transaction_.start_timestamp); + } + + const auto index_stats = storage_->indices_.Analysis(); + for (const auto &delta : transaction_.deltas.use()) { auto prev = delta.prev.Get(); switch (prev.type) { @@ -838,6 +908,24 @@ void InMemoryStorage::InMemoryAccessor::Abort() { MG_ASSERT(it != vertex->labels.end(), "Invalid database state!"); std::swap(*it, *vertex->labels.rbegin()); vertex->labels.pop_back(); + + // For label index + // check if there is a label index for the label and add entry if so + // For property label index + // check if we care about the label; this will return all the propertyIds we care about and then get + // the current property value + if (std::binary_search(index_stats.label.begin(), index_stats.label.end(), current->label)) { + label_cleanup[current->label].emplace_back(vertex); + } + const auto &properties = index_stats.property_label.l2p.find(current->label); + if (properties != index_stats.property_label.l2p.end()) { + for (const auto &property : properties->second) { + auto current_value = vertex->properties.GetProperty(property); + if (!current_value.IsNull()) { + label_property_cleanup[current->label].emplace_back(std::move(current_value), vertex); + } + } + } break; } case Delta::Action::ADD_LABEL: { @@ -847,6 +935,18 @@ void InMemoryStorage::InMemoryAccessor::Abort() { break; } case Delta::Action::SET_PROPERTY: { + // For label index nothing + // For property label index + // check if we care about the property, this will return all the labels and then get current property + // value + const auto &labels = index_stats.property_label.p2l.find(current->property.key); + if (labels != index_stats.property_label.p2l.end()) { + auto current_value = vertex->properties.GetProperty(current->property.key); + if (!current_value.IsNull()) { + property_cleanup[current->property.key].emplace_back(std::move(current_value), vertex); + } + } + // Setting the correct value vertex->properties.SetProperty(current->property.key, current->property.value); break; } @@ -963,7 +1063,7 @@ void InMemoryStorage::InMemoryAccessor::Abort() { auto *mem_storage = static_cast(storage_); { - std::unique_lock engine_guard(storage_->engine_lock_); + auto engine_guard = std::unique_lock(storage_->engine_lock_); uint64_t mark_timestamp = storage_->timestamp_; // Take garbage_undo_buffers lock while holding the engine lock to make // sure that entries are sorted by mark timestamp in the list. @@ -975,10 +1075,37 @@ void InMemoryStorage::InMemoryAccessor::Abort() { garbage_undo_buffers.emplace_back(mark_timestamp, std::move(transaction_.deltas), std::move(transaction_.commit_timestamp)); }); - mem_storage->deleted_vertices_.WithLock( - [&](auto &deleted_vertices) { deleted_vertices.splice(deleted_vertices.begin(), my_deleted_vertices); }); - mem_storage->deleted_edges_.WithLock( - [&](auto &deleted_edges) { deleted_edges.splice(deleted_edges.begin(), my_deleted_edges); }); + + /// We MUST unlink (aka. remove) entries in indexes and constraints + /// before we unlink (aka. remove) vertices from storage + /// this is because they point into vertices skip_list + + // INDICES + for (auto const &[label, vertices] : label_cleanup) { + storage_->indices_.AbortEntries(label, vertices, transaction_.start_timestamp); + } + for (auto const &[label, prop_vertices] : label_property_cleanup) { + storage_->indices_.AbortEntries(label, prop_vertices, transaction_.start_timestamp); + } + for (auto const &[property, prop_vertices] : property_cleanup) { + storage_->indices_.AbortEntries(property, prop_vertices, transaction_.start_timestamp); + } + + // VERTICES + { + auto vertices_acc = mem_storage->vertices_.access(); + for (auto gid : my_deleted_vertices) { + vertices_acc.remove(gid); + } + } + + // EDGES + { + auto edges_acc = mem_storage->edges_.access(); + for (auto gid : my_deleted_edges) { + edges_acc.remove(gid); + } + } } mem_storage->commit_log_->MarkFinished(transaction_.start_timestamp); @@ -1271,8 +1398,6 @@ void InMemoryStorage::CollectGarbage(std::unique_lock main_ // vertices that appear in an index also exist in main storage. std::list current_deleted_edges; std::list current_deleted_vertices; - deleted_vertices_->swap(current_deleted_vertices); - deleted_edges_->swap(current_deleted_edges); auto const need_full_scan_vertices = gc_full_scan_vertices_delete_.exchange(false); auto const need_full_scan_edges = gc_full_scan_edges_delete_.exchange(false); @@ -1922,12 +2047,12 @@ utils::BasicResult InMemoryStorage::Create void InMemoryStorage::FreeMemory(std::unique_lock main_guard) { CollectGarbage(std::move(main_guard)); + static_cast(indices_.label_index_.get())->RunGC(); + static_cast(indices_.label_property_index_.get())->RunGC(); + // SkipList is already threadsafe vertices_.run_gc(); edges_.run_gc(); - - static_cast(indices_.label_index_.get())->RunGC(); - static_cast(indices_.label_property_index_.get())->RunGC(); } uint64_t InMemoryStorage::CommitTimestamp(const std::optional desired_commit_timestamp) { diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp index 3c50326b2..48d6f0cb7 100644 --- a/src/storage/v2/inmemory/storage.hpp +++ b/src/storage/v2/inmemory/storage.hpp @@ -51,6 +51,8 @@ class InMemoryStorage final : public Storage { friend std::vector GetRecoverySteps(uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker, const InMemoryStorage *storage); + friend class InMemoryLabelIndex; + friend class InMemoryLabelPropertyIndex; public: enum class CreateSnapshotError : uint8_t { DisabledForReplica, ReachedMaxNumTries }; @@ -185,6 +187,9 @@ class InMemoryStorage final : public Storage { /// @throw std::bad_alloc Result CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) override; + std::optional FindEdge(Gid gid, View view, EdgeTypeId edge_type, VertexAccessor *from_vertex, + VertexAccessor *to_vertex) override; + Result EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) override; Result EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) override; diff --git a/src/storage/v2/inmemory/unique_constraints.cpp b/src/storage/v2/inmemory/unique_constraints.cpp index 78929bc38..6a2945883 100644 --- a/src/storage/v2/inmemory/unique_constraints.cpp +++ b/src/storage/v2/inmemory/unique_constraints.cpp @@ -256,11 +256,12 @@ bool InMemoryUniqueConstraints::Entry::operator==(const std::vectorlabels) { - if (!constraints_by_label_.contains(label)) { + const auto &constraint = constraints_by_label_.find(label); + if (constraint == constraints_by_label_.end()) { continue; } - for (auto &[props, storage] : constraints_by_label_.at(label)) { + for (auto &[props, storage] : constraint->second) { auto values = vertex->properties.ExtractPropertyValues(props); if (!values) { @@ -273,6 +274,28 @@ void InMemoryUniqueConstraints::UpdateBeforeCommit(const Vertex *vertex, const T } } +void InMemoryUniqueConstraints::AbortEntries(std::span vertices, uint64_t exact_start_timestamp) { + for (const auto &vertex : vertices) { + for (const auto &label : vertex->labels) { + const auto &constraint = constraints_by_label_.find(label); + if (constraint == constraints_by_label_.end()) { + return; + } + + for (auto &[props, storage] : constraint->second) { + auto values = vertex->properties.ExtractPropertyValues(props); + + if (!values) { + continue; + } + + auto acc = storage->access(); + acc.remove(Entry{std::move(*values), vertex, exact_start_timestamp}); + } + } + } +} + utils::BasicResult InMemoryUniqueConstraints::CreateConstraint(LabelId label, const std::set &properties, utils::SkipList::Accessor vertices) { @@ -364,12 +387,14 @@ std::optional InMemoryUniqueConstraints::Validate(const Ver if (vertex.deleted) { return std::nullopt; } + for (const auto &label : vertex.labels) { - if (!constraints_by_label_.contains(label)) { + const auto &constraint = constraints_by_label_.find(label); + if (constraint == constraints_by_label_.end()) { continue; } - for (const auto &[properties, storage] : constraints_by_label_.at(label)) { + for (const auto &[properties, storage] : constraint->second) { auto value_array = vertex.properties.ExtractPropertyValues(properties); if (!value_array) { diff --git a/src/storage/v2/inmemory/unique_constraints.hpp b/src/storage/v2/inmemory/unique_constraints.hpp index 45472ca74..d1e590357 100644 --- a/src/storage/v2/inmemory/unique_constraints.hpp +++ b/src/storage/v2/inmemory/unique_constraints.hpp @@ -11,6 +11,8 @@ #pragma once +#include + #include "storage/v2/constraints/unique_constraints.hpp" namespace memgraph::storage { @@ -54,6 +56,8 @@ class InMemoryUniqueConstraints : public UniqueConstraints { void UpdateBeforeCommit(const Vertex *vertex, std::unordered_set &added_labels, std::unordered_set &added_properties, const Transaction &tx); + void AbortEntries(std::span vertices, uint64_t exact_start_timestamp); + /// Creates unique constraint on the given `label` and a list of `properties`. /// Returns constraint violation if there are multiple vertices with the same /// label and property values. Returns `CreationStatus::ALREADY_EXISTS` if diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 4142da3ca..60752d101 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -195,6 +195,9 @@ class Storage { virtual Result CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) = 0; + virtual std::optional FindEdge(Gid gid, View view, EdgeTypeId edge_type, VertexAccessor *from_vertex, + VertexAccessor *to_vertex) = 0; + virtual Result EdgeSetFrom(EdgeAccessor *edge, VertexAccessor *new_from) = 0; virtual Result EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) = 0; diff --git a/tests/e2e/concurrent_query_modules/test_query_modules/module_test.cpp b/tests/e2e/concurrent_query_modules/test_query_modules/module_test.cpp index 44479d900..b53dda881 100644 --- a/tests/e2e/concurrent_query_modules/test_query_modules/module_test.cpp +++ b/tests/e2e/concurrent_query_modules/test_query_modules/module_test.cpp @@ -16,11 +16,11 @@ #include #include -constexpr char *kProcedureHackerNews = "hacker_news"; -constexpr char *kArgumentHackerNewsVotes = "votes"; -constexpr char *kArgumentHackerNewsItemHourAge = "item_hour_age"; -constexpr char *kArgumentHackerNewsGravity = "gravity"; -constexpr char *kReturnHackerNewsScore = "score"; +constexpr char const *kProcedureHackerNews = "hacker_news"; +constexpr char const *kArgumentHackerNewsVotes = "votes"; +constexpr char const *kArgumentHackerNewsItemHourAge = "item_hour_age"; +constexpr char const *kArgumentHackerNewsGravity = "gravity"; +constexpr char const *kReturnHackerNewsScore = "score"; void HackerNews(mgp_list *args, mgp_graph *memgraph_graph, mgp_result *result, mgp_memory *memory) { mgp::MemoryDispatcherGuard guard(memory); diff --git a/tests/e2e/replication/CMakeLists.txt b/tests/e2e/replication/CMakeLists.txt index b054981c8..39f179a3d 100644 --- a/tests/e2e/replication/CMakeLists.txt +++ b/tests/e2e/replication/CMakeLists.txt @@ -13,6 +13,7 @@ copy_e2e_python_files(replication_show common.py) copy_e2e_python_files(replication_show conftest.py) copy_e2e_python_files(replication_show show.py) copy_e2e_python_files(replication_show show_while_creating_invalid_state.py) +copy_e2e_python_files(replication_show edge_delete.py) copy_e2e_python_files_from_parent_folder(replication_show ".." memgraph.py) copy_e2e_python_files_from_parent_folder(replication_show ".." interactive_mg_runner.py) copy_e2e_python_files_from_parent_folder(replication_show ".." mg_utils.py) diff --git a/tests/e2e/replication/edge_delete.py b/tests/e2e/replication/edge_delete.py new file mode 100755 index 000000000..0e25faee1 --- /dev/null +++ b/tests/e2e/replication/edge_delete.py @@ -0,0 +1,56 @@ +# Copyright 2022 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import sys +import time + +import pytest +from common import execute_and_fetch_all +from mg_utils import mg_sleep_and_assert + + +# BUGFIX: for issue https://github.com/memgraph/memgraph/issues/1515 +def test_replication_handles_delete_when_multiple_edges_of_same_type(connection): + # Goal is to check the timestamp are correctly computed from the information we get from replicas. + # 0/ Check original state of replicas. + # 1/ Add nodes and edges to MAIN, then delete the edges. + # 2/ Check state of replicas. + + # 0/ + conn = connection(7687, "main") + conn.autocommit = True + cursor = conn.cursor() + actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) + + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "async", 0, 0, "ready"), + } + assert actual_data == expected_data + + # 1/ + execute_and_fetch_all(cursor, "CREATE (a)-[r:X]->(b) CREATE (a)-[:X]->(b) DELETE r;") + + # 2/ + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 2, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "async", 2, 0, "ready"), + } + + def retrieve_data(): + return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) + + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/replication/workloads.yaml b/tests/e2e/replication/workloads.yaml index 72269d652..fc239b221 100644 --- a/tests/e2e/replication/workloads.yaml +++ b/tests/e2e/replication/workloads.yaml @@ -8,6 +8,23 @@ template_validation_queries: &template_validation_queries validation_queries: - <<: *template_test_nodes_query - <<: *template_test_edges_query +template_simple_cluster: &template_simple_cluster + cluster: + replica_1: + args: [ "--bolt-port", "7688", "--log-level=TRACE" ] + log_file: "replication-e2e-replica1.log" + setup_queries: [ "SET REPLICATION ROLE TO REPLICA WITH PORT 10001;" ] + replica_2: + args: ["--bolt-port", "7689", "--log-level=TRACE"] + log_file: "replication-e2e-replica2.log" + setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"] + main: + args: ["--bolt-port", "7687", "--log-level=TRACE"] + log_file: "replication-e2e-main.log" + setup_queries: [ + "REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001'", + "REGISTER REPLICA replica_2 ASYNC TO '127.0.0.1:10002'", + ] template_cluster: &template_cluster cluster: replica_1: @@ -83,3 +100,8 @@ workloads: - name: "Show while creating invalid state" binary: "tests/e2e/pytest_runner.sh" args: ["replication/show_while_creating_invalid_state.py"] + + - name: "Delete edge replication" + binary: "tests/e2e/pytest_runner.sh" + args: ["replication/edge_delete.py"] + <<: *template_simple_cluster