diff --git a/.github/workflows/diff.yaml b/.github/workflows/diff.yaml index 7746ddb91..3671d36b5 100644 --- a/.github/workflows/diff.yaml +++ b/.github/workflows/diff.yaml @@ -101,7 +101,7 @@ jobs: echo ${file} if [[ ${file} == *.py ]]; then python3 -m black --check --diff ${file} - python3 -m isort --check-only --diff ${file} + python3 -m isort --profile black --check-only --diff ${file} fi done @@ -275,11 +275,13 @@ jobs: - name: Run stress test (plain) run: | cd tests/stress + source ve3/bin/activate ./continuous_integration - name: Run stress test (SSL) run: | cd tests/stress + source ve3/bin/activate ./continuous_integration --use-ssl - name: Run durability test diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index df5572bc9..0c94cc552 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -15,6 +15,7 @@ repos: hooks: - id: isort name: isort (python) + args: ["--profile", "black"] - repo: https://github.com/pre-commit/mirrors-clang-format rev: v13.0.0 hooks: diff --git a/src/query/db_accessor.hpp b/src/query/db_accessor.hpp index 229df8f4b..7d9001593 100644 --- a/src/query/db_accessor.hpp +++ b/src/query/db_accessor.hpp @@ -12,6 +12,7 @@ #pragma once #include +#include #include #include @@ -469,8 +470,8 @@ class DbAccessor final { std::vector deleted_edges; deleted_edges.reserve(edges.size()); - std::transform(edges.begin(), edges.end(), std::back_inserter(deleted_edges), - [](const auto &deleted_edge) { return EdgeAccessor{deleted_edge}; }); + std::ranges::transform(edges, std::back_inserter(deleted_edges), + [](const auto &deleted_edge) { return EdgeAccessor{deleted_edge}; }); return std::make_optional(vertex, std::move(deleted_edges)); } @@ -489,6 +490,53 @@ class DbAccessor final { return std::make_optional(*value); } + storage::Result, std::vector>>> DetachDelete( + std::vector nodes, std::vector edges, bool detach) { + using ReturnType = std::pair, std::vector>; + + std::vector nodes_impl; + std::vector edges_impl; + + nodes_impl.reserve(nodes.size()); + edges_impl.reserve(edges.size()); + + for (auto &vertex_accessor : nodes) { + accessor_->PrefetchOutEdges(vertex_accessor.impl_); + accessor_->PrefetchInEdges(vertex_accessor.impl_); + + nodes_impl.push_back(&vertex_accessor.impl_); + } + + for (auto &edge_accessor : edges) { + edges_impl.push_back(&edge_accessor.impl_); + } + + auto res = accessor_->DetachDelete(std::move(nodes_impl), std::move(edges_impl), detach); + if (res.HasError()) { + return res.GetError(); + } + + const auto &value = res.GetValue(); + if (!value) { + return std::optional{}; + } + + const auto &[val_vertices, val_edges] = *value; + + std::vector deleted_vertices; + std::vector deleted_edges; + + deleted_vertices.reserve(val_vertices.size()); + deleted_edges.reserve(val_edges.size()); + + std::ranges::transform(val_vertices, std::back_inserter(deleted_vertices), + [](const auto &deleted_vertex) { return VertexAccessor{deleted_vertex}; }); + std::ranges::transform(val_edges, std::back_inserter(deleted_edges), + [](const auto &deleted_edge) { return EdgeAccessor{deleted_edge}; }); + + return std::make_optional(std::move(deleted_vertices), std::move(deleted_edges)); + } + storage::PropertyId NameToProperty(const std::string_view name) { return accessor_->NameToProperty(name); } storage::LabelId NameToLabel(const std::string_view name) { return accessor_->NameToLabel(name); } diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 615ee4741..4700ed82d 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -2560,15 +2560,12 @@ std::vector Delete::ModifiedSymbols(const SymbolTable &table) const { re Delete::DeleteCursor::DeleteCursor(const Delete &self, utils::MemoryResource *mem) : self_(self), input_cursor_(self_.input_->MakeCursor(mem)) {} -bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) { - SCOPED_PROFILE_OP("Delete"); - - if (!input_cursor_->Pull(frame, context)) return false; - +void Delete::DeleteCursor::UpdateDeleteBuffer(Frame &frame, ExecutionContext &context) { // Delete should get the latest information, this way it is also possible // to delete newly added nodes and edges. ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor, storage::View::NEW); + auto *pull_memory = context.evaluation_context.memory; // collect expressions results so edges can get deleted before vertices // this is necessary because an edge that gets deleted could block vertex @@ -2579,107 +2576,34 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) { expression_results.emplace_back(expression->Accept(evaluator)); } - auto &dba = *context.db_accessor; - // delete edges first - for (TypedValue &expression_result : expression_results) { - AbortCheck(context); - if (expression_result.type() == TypedValue::Type::Edge) { - auto &ea = expression_result.ValueEdge(); -#ifdef MG_ENTERPRISE - if (license::global_license_checker.IsEnterpriseValidFast() && context.auth_checker && - !(context.auth_checker->Has(ea, query::AuthQuery::FineGrainedPrivilege::CREATE_DELETE) && - context.auth_checker->Has(ea.To(), storage::View::NEW, query::AuthQuery::FineGrainedPrivilege::UPDATE) && - context.auth_checker->Has(ea.From(), storage::View::NEW, query::AuthQuery::FineGrainedPrivilege::UPDATE))) { - throw QueryRuntimeException("Edge not deleted due to not having enough permission!"); - } -#endif - auto maybe_value = dba.RemoveEdge(&ea); - if (maybe_value.HasError()) { - switch (maybe_value.GetError()) { - case storage::Error::SERIALIZATION_ERROR: - throw TransactionSerializationException(); - case storage::Error::DELETED_OBJECT: - case storage::Error::VERTEX_HAS_EDGES: - case storage::Error::PROPERTIES_DISABLED: - case storage::Error::NONEXISTENT_OBJECT: - throw QueryRuntimeException("Unexpected error when deleting an edge."); - } - } - context.execution_stats[ExecutionStats::Key::DELETED_EDGES] += 1; - if (context.trigger_context_collector && maybe_value.GetValue()) { - context.trigger_context_collector->RegisterDeletedObject(*maybe_value.GetValue()); - } - } - } - - // delete vertices for (TypedValue &expression_result : expression_results) { AbortCheck(context); switch (expression_result.type()) { case TypedValue::Type::Vertex: { - auto &va = expression_result.ValueVertex(); + auto va = expression_result.ValueVertex(); #ifdef MG_ENTERPRISE if (license::global_license_checker.IsEnterpriseValidFast() && context.auth_checker && !context.auth_checker->Has(va, storage::View::NEW, query::AuthQuery::FineGrainedPrivilege::CREATE_DELETE)) { throw QueryRuntimeException("Vertex not deleted due to not having enough permission!"); } #endif - if (self_.detach_) { - auto res = dba.DetachRemoveVertex(&va); - if (res.HasError()) { - switch (res.GetError()) { - case storage::Error::SERIALIZATION_ERROR: - throw TransactionSerializationException(); - case storage::Error::DELETED_OBJECT: - case storage::Error::VERTEX_HAS_EDGES: - case storage::Error::PROPERTIES_DISABLED: - case storage::Error::NONEXISTENT_OBJECT: - throw QueryRuntimeException("Unexpected error when deleting a node."); - } - } - - context.execution_stats[ExecutionStats::Key::DELETED_NODES] += 1; - if (*res) { - context.execution_stats[ExecutionStats::Key::DELETED_EDGES] += static_cast((*res)->second.size()); - } - std::invoke([&] { - if (!context.trigger_context_collector || !*res) { - return; - } - - context.trigger_context_collector->RegisterDeletedObject((*res)->first); - if (!context.trigger_context_collector->ShouldRegisterDeletedObject()) { - return; - } - for (const auto &edge : (*res)->second) { - context.trigger_context_collector->RegisterDeletedObject(edge); - } - }); - } else { - auto res = dba.RemoveVertex(&va); - if (res.HasError()) { - switch (res.GetError()) { - case storage::Error::SERIALIZATION_ERROR: - throw TransactionSerializationException(); - case storage::Error::VERTEX_HAS_EDGES: - throw RemoveAttachedVertexException(); - case storage::Error::DELETED_OBJECT: - case storage::Error::PROPERTIES_DISABLED: - case storage::Error::NONEXISTENT_OBJECT: - throw QueryRuntimeException("Unexpected error when deleting a node."); - } - } - context.execution_stats[ExecutionStats::Key::DELETED_NODES] += 1; - if (context.trigger_context_collector && res.GetValue()) { - context.trigger_context_collector->RegisterDeletedObject(*res.GetValue()); - } - } + buffer_.nodes.push_back(va); + break; + } + case TypedValue::Type::Edge: { + auto ea = expression_result.ValueEdge(); +#ifdef MG_ENTERPRISE + if (license::global_license_checker.IsEnterpriseValidFast() && context.auth_checker && + !(context.auth_checker->Has(ea, query::AuthQuery::FineGrainedPrivilege::CREATE_DELETE) && + context.auth_checker->Has(ea.To(), storage::View::NEW, query::AuthQuery::FineGrainedPrivilege::UPDATE) && + context.auth_checker->Has(ea.From(), storage::View::NEW, + query::AuthQuery::FineGrainedPrivilege::UPDATE))) { + throw QueryRuntimeException("Edge not deleted due to not having enough permission!"); + } +#endif + buffer_.edges.push_back(ea); break; } - - // skip Edges (already deleted) and Nulls (can occur in optional - // match) - case TypedValue::Type::Edge: case TypedValue::Type::Null: break; // check we're not trying to delete anything except vertices and edges @@ -2687,13 +2611,64 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) { throw QueryRuntimeException("Only edges and vertices can be deleted."); } } +} - return true; +bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) { + SCOPED_PROFILE_OP("Delete"); + + if (delete_executed_) { + return false; + } + + if (input_cursor_->Pull(frame, context)) { + UpdateDeleteBuffer(frame, context); + return true; + } + + auto &dba = *context.db_accessor; + auto res = dba.DetachDelete(std::move(buffer_.nodes), std::move(buffer_.edges), self_.detach_); + if (res.HasError()) { + switch (res.GetError()) { + case storage::Error::SERIALIZATION_ERROR: + throw TransactionSerializationException(); + case storage::Error::VERTEX_HAS_EDGES: + throw RemoveAttachedVertexException(); + case storage::Error::DELETED_OBJECT: + case storage::Error::PROPERTIES_DISABLED: + case storage::Error::NONEXISTENT_OBJECT: + throw QueryRuntimeException("Unexpected error when deleting a node."); + } + } + + if (*res) { + context.execution_stats[ExecutionStats::Key::DELETED_NODES] += static_cast((*res)->first.size()); + context.execution_stats[ExecutionStats::Key::DELETED_EDGES] += static_cast((*res)->second.size()); + } + + // Update deleted objects for triggers + if (context.trigger_context_collector && *res) { + for (const auto &node : (*res)->first) { + context.trigger_context_collector->RegisterDeletedObject(node); + } + + if (context.trigger_context_collector->ShouldRegisterDeletedObject()) { + for (const auto &edge : (*res)->second) { + context.trigger_context_collector->RegisterDeletedObject(edge); + } + } + } + + delete_executed_ = true; + + return false; } void Delete::DeleteCursor::Shutdown() { input_cursor_->Shutdown(); } -void Delete::DeleteCursor::Reset() { input_cursor_->Reset(); } +void Delete::DeleteCursor::Reset() { + input_cursor_->Reset(); + delete_executed_ = false; +} SetProperty::SetProperty(const std::shared_ptr &input, storage::PropertyId property, PropertyLookup *lhs, Expression *rhs) @@ -4953,7 +4928,9 @@ class LoadCsvCursor : public Cursor { if (input_cursor_->Pull(frame, context)) { if (did_pull_) { throw QueryRuntimeException( - "LOAD CSV can be executed only once, please check if the cardinality of the operator before LOAD CSV is 1"); + "LOAD CSV can be executed only once, please check if the cardinality of the operator before LOAD CSV " + "is " + "1"); } did_pull_ = true; } diff --git a/src/query/plan/operator.hpp b/src/query/plan/operator.hpp index d243ba129..e38d6ddeb 100644 --- a/src/query/plan/operator.hpp +++ b/src/query/plan/operator.hpp @@ -1121,6 +1121,11 @@ class Produce : public memgraph::query::plan::LogicalOperator { }; }; +struct DeleteBuffer { + std::vector nodes{}; + std::vector edges{}; +}; + /// Operator for deleting vertices and edges. /// /// Has a flag for using DETACH DELETE when deleting vertices. @@ -1168,6 +1173,10 @@ class Delete : public memgraph::query::plan::LogicalOperator { private: const Delete &self_; const UniqueCursorPtr input_cursor_; + DeleteBuffer buffer_; + bool delete_executed_{false}; + + void UpdateDeleteBuffer(Frame &, ExecutionContext &); }; }; diff --git a/src/storage/v2/disk/storage.cpp b/src/storage/v2/disk/storage.cpp index fe123799f..fdd4f3e1b 100644 --- a/src/storage/v2/disk/storage.cpp +++ b/src/storage/v2/disk/storage.cpp @@ -1085,79 +1085,44 @@ std::optional DiskStorage::DiskAccessor::FindVertex(storage::Gid return std::nullopt; } -Result> DiskStorage::DiskAccessor::DeleteVertex(VertexAccessor *vertex) { - auto *vertex_ptr = vertex->vertex_; +Result, std::vector>>> +DiskStorage::DiskAccessor::DetachDelete(std::vector nodes, std::vector edges, + bool detach) { + using ReturnType = std::pair, std::vector>; - if (vertex_ptr->deleted) { - return std::optional{}; + auto maybe_result = Storage::Accessor::DetachDelete(nodes, edges, detach); + + if (maybe_result.HasError()) { + return maybe_result.GetError(); } - if (!vertex_ptr->in_edges.empty() || !vertex_ptr->out_edges.empty()) return Error::VERTEX_HAS_EDGES; + auto value = maybe_result.GetValue(); - CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag()); - vertex_ptr->deleted = true; - vertices_to_delete_.emplace_back(utils::SerializeIdType(vertex_ptr->gid), utils::SerializeVertex(*vertex_ptr)); - transaction_.manyDeltasCache.Invalidate(vertex_ptr); + if (!value) { + return std::make_optional(); + } + + auto &[deleted_vertices, deleted_edges] = *value; auto *disk_storage = static_cast(storage_); - disk_storage->vertex_count_.fetch_sub(1, std::memory_order_acq_rel); - - return std::make_optional(vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_, - config_, true); -} - -Result>>> -DiskStorage::DiskAccessor::DetachDeleteVertex(VertexAccessor *vertex) { - using ReturnType = std::pair>; - auto *vertex_ptr = vertex->vertex_; - - if (vertex_ptr->deleted) return std::optional{}; - - std::vector> in_edges{vertex_ptr->in_edges}; - std::vector> out_edges{vertex_ptr->out_edges}; - - std::vector deleted_edges; - for (const auto &item : in_edges) { - auto [edge_type, from_vertex, edge] = item; - EdgeAccessor e(edge, edge_type, from_vertex, vertex_ptr, &transaction_, &storage_->indices_, - &storage_->constraints_, config_); - auto ret = DeleteEdge(&e); - if (ret.HasError()) { - MG_ASSERT(ret.GetError() == Error::SERIALIZATION_ERROR, "Invalid database state!"); - return ret.GetError(); - } - - if (ret.GetValue()) { - deleted_edges.push_back(*ret.GetValue()); - } - } - for (const auto &item : out_edges) { - auto [edge_type, to_vertex, edge] = item; - EdgeAccessor e(edge, edge_type, vertex_ptr, to_vertex, &transaction_, &storage_->indices_, &storage_->constraints_, - config_); - auto ret = DeleteEdge(&e); - if (ret.HasError()) { - MG_ASSERT(ret.GetError() == Error::SERIALIZATION_ERROR, "Invalid database state!"); - return ret.GetError(); - } - - if (ret.GetValue()) { - deleted_edges.push_back(*ret.GetValue()); - } + for (const auto &vertex : deleted_vertices) { + vertices_to_delete_.emplace_back(utils::SerializeIdType(vertex.vertex_->gid), + utils::SerializeVertex(*vertex.vertex_)); + transaction_.manyDeltasCache.Invalidate(vertex.vertex_); + disk_storage->vertex_count_.fetch_sub(1, std::memory_order_acq_rel); } - MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!"); + for (const auto &edge : deleted_edges) { + const DiskEdgeKey disk_edge_key(edge.from_vertex_->gid, edge.to_vertex_->gid, edge.edge_type_, edge.edge_, + config_.properties_on_edges); + edges_to_delete_.emplace(disk_edge_key.GetSerializedKey()); - CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag()); - vertex_ptr->deleted = true; - vertices_to_delete_.emplace_back(utils::SerializeIdType(vertex_ptr->gid), utils::SerializeVertex(*vertex_ptr)); - auto *disk_storage = static_cast(storage_); - disk_storage->vertex_count_.fetch_sub(1, std::memory_order_acq_rel); - transaction_.manyDeltasCache.Invalidate(vertex_ptr); + transaction_.manyDeltasCache.Invalidate(edge.from_vertex_, edge.edge_type_, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(edge.to_vertex_, edge.edge_type_, EdgeDirection::IN); + transaction_.RemoveModifiedEdge(edge.Gid()); + } - return std::make_optional( - VertexAccessor{vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_, config_, true}, - std::move(deleted_edges)); + return maybe_result; } bool DiskStorage::DiskAccessor::PrefetchEdgeFilter(const std::string_view disk_edge_key_str, @@ -1300,73 +1265,6 @@ Result DiskStorage::DiskAccessor::CreateEdge(VertexAccessor *from, &storage_->constraints_, config_); } -Result> DiskStorage::DiskAccessor::DeleteEdge(EdgeAccessor *edge) { - MG_ASSERT(edge->transaction_ == &transaction_, - "EdgeAccessor must be from the same transaction as the storage " - "accessor when deleting an edge!"); - const auto edge_ref = edge->edge_; - const auto edge_type = edge->edge_type_; - - if (config_.properties_on_edges && edge_ref.ptr->deleted) return std::optional{}; - - auto *from_vertex = edge->from_vertex_; - auto *to_vertex = edge->to_vertex_; - - MG_ASSERT(!from_vertex->deleted, "Invalid database state!"); - - if (to_vertex != from_vertex) { - MG_ASSERT(!to_vertex->deleted, "Invalid database state!"); - } - - auto delete_edge_from_storage = [&edge_type, &edge_ref, this](auto *vertex, auto *edges) { - const std::tuple link(edge_type, vertex, edge_ref); - auto it = std::find(edges->begin(), edges->end(), link); - if (config_.properties_on_edges) { - MG_ASSERT(it != edges->end(), "Invalid database state!"); - } else if (it == edges->end()) { - return false; - } - std::swap(*it, *edges->rbegin()); - edges->pop_back(); - return true; - }; - - const auto op1 = delete_edge_from_storage(to_vertex, &from_vertex->out_edges); - const auto op2 = delete_edge_from_storage(from_vertex, &to_vertex->in_edges); - - const DiskEdgeKey disk_edge_key(from_vertex->gid, to_vertex->gid, edge_type, edge_ref, config_.properties_on_edges); - edges_to_delete_.emplace(disk_edge_key.GetSerializedKey()); - - transaction_.RemoveModifiedEdge(edge->Gid()); - - if (config_.properties_on_edges) { - MG_ASSERT((op1 && op2), "Invalid database state!"); - } else { - MG_ASSERT((op1 && op2) || (!op1 && !op2), "Invalid database state!"); - if (!op1 && !op2) { - // The edge is already deleted. - return std::optional{}; - } - } - - if (config_.properties_on_edges) { - auto *edge_ptr = edge_ref.ptr; - CreateAndLinkDelta(&transaction_, edge_ptr, Delta::RecreateObjectTag()); - edge_ptr->deleted = true; - } - - CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref); - - transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); - transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); - - storage_->edge_count_.fetch_sub(1, std::memory_order_acq_rel); - - return std::make_optional(edge_ref, edge_type, from_vertex, to_vertex, &transaction_, - &storage_->indices_, &storage_->constraints_, config_, true); -} - Result DiskStorage::DiskAccessor::EdgeSetFrom(EdgeAccessor * /*edge*/, VertexAccessor * /*new_from*/) { MG_ASSERT(false, "EdgeSetFrom is currently only implemented for InMemory storage"); return Error::NONEXISTENT_OBJECT; diff --git a/src/storage/v2/disk/storage.hpp b/src/storage/v2/disk/storage.hpp index c8124e7a1..a3e3d4e62 100644 --- a/src/storage/v2/disk/storage.hpp +++ b/src/storage/v2/disk/storage.hpp @@ -172,11 +172,8 @@ class DiskStorage final : public Storage { throw utils::NotYetImplemented("SetIndexStats(stats) is not implemented for DiskStorage."); } - /// TODO: It is just marked as deleted but the memory isn't reclaimed because of the in-memory storage - Result> DeleteVertex(VertexAccessor *vertex) override; - - Result>>> DetachDeleteVertex( - VertexAccessor *vertex) override; + Result, std::vector>>> DetachDelete( + std::vector nodes, std::vector edges, bool detach) override; void PrefetchInEdges(const VertexAccessor &vertex_acc) override; @@ -188,8 +185,6 @@ class DiskStorage final : public Storage { Result EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) override; - Result> DeleteEdge(EdgeAccessor *edge) override; - bool LabelIndexExists(LabelId label) const override { auto *disk_storage = static_cast(storage_); return disk_storage->indices_.label_index_->IndexExists(label); diff --git a/src/storage/v2/edge_ref.hpp b/src/storage/v2/edge_ref.hpp index 967dd9439..3764879f4 100644 --- a/src/storage/v2/edge_ref.hpp +++ b/src/storage/v2/edge_ref.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -36,7 +36,6 @@ static_assert(std::is_standard_layout_v, "The Edge * must have a standar static_assert(std::is_standard_layout_v, "The EdgeRef must have a standard layout!"); inline bool operator==(const EdgeRef &a, const EdgeRef &b) noexcept { return a.gid == b.gid; } - +inline bool operator<(const EdgeRef &first, const EdgeRef &second) { return first.gid < second.gid; } inline bool operator!=(const EdgeRef &a, const EdgeRef &b) noexcept { return a.gid != b.gid; } - } // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index d26b75b9b..7a4d579af 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -245,114 +245,51 @@ std::optional InMemoryStorage::InMemoryAccessor::FindVertex(Gid return VertexAccessor::Create(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_, view); } -Result> InMemoryStorage::InMemoryAccessor::DeleteVertex(VertexAccessor *vertex) { - MG_ASSERT(vertex->transaction_ == &transaction_, - "VertexAccessor must be from the same transaction as the storage " - "accessor when deleting a vertex!"); - auto *vertex_ptr = vertex->vertex_; +Result, std::vector>>> +InMemoryStorage::InMemoryAccessor::DetachDelete(std::vector nodes, std::vector edges, + bool detach) { + using ReturnType = std::pair, std::vector>; - auto guard = std::unique_lock{vertex_ptr->lock}; + auto maybe_result = Storage::Accessor::DetachDelete(nodes, edges, detach); - if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR; - - if (vertex_ptr->deleted) { - return std::optional{}; + if (maybe_result.HasError()) { + return maybe_result.GetError(); } - if (!vertex_ptr->in_edges.empty() || !vertex_ptr->out_edges.empty()) return Error::VERTEX_HAS_EDGES; + auto value = maybe_result.GetValue(); - CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag()); - vertex_ptr->deleted = true; - transaction_.manyDeltasCache.Invalidate(vertex_ptr); + if (!value) { + return std::make_optional(); + } + + auto &[deleted_vertices, deleted_edges] = *value; // Need to inform the next CollectGarbage call that there are some // non-transactional deletions that need to be collected - if (transaction_.storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) { - auto *mem_storage = static_cast(storage_); - mem_storage->gc_full_scan_vertices_delete_ = true; - } - - return std::make_optional(vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_, - config_, true); -} - -Result>>> -InMemoryStorage::InMemoryAccessor::DetachDeleteVertex(VertexAccessor *vertex) { - using ReturnType = std::pair>; - - MG_ASSERT(vertex->transaction_ == &transaction_, - "VertexAccessor must be from the same transaction as the storage " - "accessor when deleting a vertex!"); - auto *vertex_ptr = vertex->vertex_; - - std::vector> in_edges; - std::vector> out_edges; - - { - auto guard = std::unique_lock{vertex_ptr->lock}; - - if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR; - - if (vertex_ptr->deleted) return std::optional{}; - - in_edges = vertex_ptr->in_edges; - out_edges = vertex_ptr->out_edges; - } - - std::vector deleted_edges; - for (const auto &item : in_edges) { - auto [edge_type, from_vertex, edge] = item; - EdgeAccessor e(edge, edge_type, from_vertex, vertex_ptr, &transaction_, &storage_->indices_, - &storage_->constraints_, config_); - auto ret = DeleteEdge(&e); - if (ret.HasError()) { - MG_ASSERT(ret.GetError() == Error::SERIALIZATION_ERROR, "Invalid database state!"); - return ret.GetError(); + auto const inform_gc_vertex_deletion = utils::OnScopeExit{[this, &deleted_vertices = deleted_vertices]() { + if (!deleted_vertices.empty() && transaction_.storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) { + auto *mem_storage = static_cast(storage_); + mem_storage->gc_full_scan_vertices_delete_ = true; } + }}; - if (ret.GetValue()) { - deleted_edges.push_back(*ret.GetValue()); - } - } - for (const auto &item : out_edges) { - auto [edge_type, to_vertex, edge] = item; - EdgeAccessor e(edge, edge_type, vertex_ptr, to_vertex, &transaction_, &storage_->indices_, &storage_->constraints_, - config_); - auto ret = DeleteEdge(&e); - if (ret.HasError()) { - MG_ASSERT(ret.GetError() == Error::SERIALIZATION_ERROR, "Invalid database state!"); - return ret.GetError(); + auto const inform_gc_edge_deletion = utils::OnScopeExit{[this, &deleted_edges = deleted_edges]() { + if (!deleted_edges.empty() && transaction_.storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) { + auto *mem_storage = static_cast(storage_); + mem_storage->gc_full_scan_edges_delete_ = true; } + }}; - if (ret.GetValue()) { - deleted_edges.push_back(*ret.GetValue()); - } + for (auto const &vertex : deleted_vertices) { + transaction_.manyDeltasCache.Invalidate(vertex.vertex_); } - auto guard = std::unique_lock{vertex_ptr->lock}; - - // We need to check again for serialization errors because we unlocked the - // vertex. Some other transaction could have modified the vertex in the - // meantime if we didn't have any edges to delete. - - if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR; - - MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!"); - - CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag()); - vertex_ptr->deleted = true; - transaction_.manyDeltasCache.Invalidate(vertex_ptr); - - // Need to inform the next CollectGarbage call that there are some - // non-transactional deletions that need to be collected - if (transaction_.storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) { - auto *mem_storage = static_cast(storage_); - mem_storage->gc_full_scan_vertices_delete_ = true; + for (const auto &edge : deleted_edges) { + transaction_.manyDeltasCache.Invalidate(edge.from_vertex_, edge.edge_type_, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(edge.to_vertex_, edge.edge_type_, EdgeDirection::IN); } - return std::make_optional( - VertexAccessor{vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_, config_, true}, - std::move(deleted_edges)); + return maybe_result; } Result InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccessor *from, VertexAccessor *to, @@ -700,100 +637,6 @@ Result InMemoryStorage::InMemoryAccessor::EdgeSetTo(EdgeAccessor * &storage_->constraints_, config_); } -Result> InMemoryStorage::InMemoryAccessor::DeleteEdge(EdgeAccessor *edge) { - MG_ASSERT(edge->transaction_ == &transaction_, - "EdgeAccessor must be from the same transaction as the storage " - "accessor when deleting an edge!"); - auto &edge_ref = edge->edge_; - auto &edge_type = edge->edge_type_; - - std::unique_lock guard; - if (config_.properties_on_edges) { - auto *edge_ptr = edge_ref.ptr; - guard = std::unique_lock{edge_ptr->lock}; - - if (!PrepareForWrite(&transaction_, edge_ptr)) return Error::SERIALIZATION_ERROR; - - if (edge_ptr->deleted) return std::optional{}; - } - - auto *from_vertex = edge->from_vertex_; - auto *to_vertex = edge->to_vertex_; - - // Obtain the locks by `gid` order to avoid lock cycles. - auto guard_from = std::unique_lock{from_vertex->lock, std::defer_lock}; - auto guard_to = std::unique_lock{to_vertex->lock, std::defer_lock}; - if (from_vertex->gid < to_vertex->gid) { - guard_from.lock(); - guard_to.lock(); - } else if (from_vertex->gid > to_vertex->gid) { - guard_to.lock(); - guard_from.lock(); - } else { - // The vertices are the same vertex, only lock one. - guard_from.lock(); - } - - if (!PrepareForWrite(&transaction_, from_vertex)) return Error::SERIALIZATION_ERROR; - MG_ASSERT(!from_vertex->deleted, "Invalid database state!"); - - if (to_vertex != from_vertex) { - if (!PrepareForWrite(&transaction_, to_vertex)) return Error::SERIALIZATION_ERROR; - MG_ASSERT(!to_vertex->deleted, "Invalid database state!"); - } - - auto delete_edge_from_storage = [&edge_type, &edge_ref, this](auto *vertex, auto *edges) { - std::tuple link(edge_type, vertex, edge_ref); - auto it = std::find(edges->begin(), edges->end(), link); - if (config_.properties_on_edges) { - MG_ASSERT(it != edges->end(), "Invalid database state!"); - } else if (it == edges->end()) { - return false; - } - std::swap(*it, *edges->rbegin()); - edges->pop_back(); - return true; - }; - - auto op1 = delete_edge_from_storage(to_vertex, &from_vertex->out_edges); - auto op2 = delete_edge_from_storage(from_vertex, &to_vertex->in_edges); - - if (config_.properties_on_edges) { - MG_ASSERT((op1 && op2), "Invalid database state!"); - } else { - MG_ASSERT((op1 && op2) || (!op1 && !op2), "Invalid database state!"); - if (!op1 && !op2) { - // The edge is already deleted. - return std::optional{}; - } - } - - if (config_.properties_on_edges) { - auto *edge_ptr = edge_ref.ptr; - CreateAndLinkDelta(&transaction_, edge_ptr, Delta::RecreateObjectTag()); - edge_ptr->deleted = true; - - // Need to inform the next CollectGarbage call that there are some - // non-transactional deletions that need to be collected - if (transaction_.storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) { - auto *mem_storage = static_cast(storage_); - mem_storage->gc_full_scan_edges_delete_ = true; - } - } - - CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref); - - transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); - transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); - - // Decrement edge count. - storage_->edge_count_.fetch_add(-1, std::memory_order_acq_rel); - - return std::make_optional(edge_ref, edge_type, from_vertex, to_vertex, &transaction_, - &storage_->indices_, &storage_->constraints_, config_, true); -} - // NOLINTNEXTLINE(google-default-arguments) utils::BasicResult InMemoryStorage::InMemoryAccessor::Commit( const std::optional desired_commit_timestamp) { diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp index 32faff226..cf9e9a062 100644 --- a/src/storage/v2/inmemory/storage.hpp +++ b/src/storage/v2/inmemory/storage.hpp @@ -205,14 +205,8 @@ class InMemoryStorage final : public Storage { labels); } - /// @return Accessor to the deleted vertex if a deletion took place, std::nullopt otherwise - /// @throw std::bad_alloc - Result> DeleteVertex(VertexAccessor *vertex) override; - - /// @return Accessor to the deleted vertex and deleted edges if a deletion took place, std::nullopt otherwise - /// @throw std::bad_alloc - Result>>> DetachDeleteVertex( - VertexAccessor *vertex) override; + Result, std::vector>>> DetachDelete( + std::vector nodes, std::vector edges, bool detach) override; void PrefetchInEdges(const VertexAccessor &vertex_acc) override{}; @@ -225,10 +219,6 @@ class InMemoryStorage final : public Storage { Result EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) override; - /// Accessor to the deleted edge if a deletion took place, std::nullopt otherwise - /// @throw std::bad_alloc - Result> DeleteEdge(EdgeAccessor *edge) override; - bool LabelIndexExists(LabelId label) const override { return static_cast(storage_)->indices_.label_index_->IndexExists(label); } diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index 1ac88cfb1..27748ebba 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +#include "absl/container/flat_hash_set.h" #include "spdlog/spdlog.h" #include "storage/v2/disk/name_id_mapper.hpp" @@ -127,4 +128,368 @@ void Storage::Accessor::AdvanceCommand() { ++transaction_.command_id; } +Result> Storage::Accessor::DeleteVertex(VertexAccessor *vertex) { + auto res = DetachDelete({vertex}, {}, false); + + if (res.HasError()) { + return res.GetError(); + } + + const auto &value = res.GetValue(); + if (!value) { + return std::optional{}; + } + + const auto &[vertices, edges] = *value; + + MG_ASSERT(vertices.size() <= 1, "The number of deleted vertices is not less or equal to 1!"); + MG_ASSERT(edges.empty(), "Deleting a vertex without detaching should not have resulted in deleting any edges!"); + + if (vertices.empty()) { + return std::optional{}; + } + + return std::make_optional(vertices[0]); +} + +Result>>> Storage::Accessor::DetachDeleteVertex( + VertexAccessor *vertex) { + using ReturnType = std::pair>; + + auto res = DetachDelete({vertex}, {}, true); + + if (res.HasError()) { + return res.GetError(); + } + + auto &value = res.GetValue(); + if (!value) { + return std::optional{}; + } + + auto &[vertices, edges] = *value; + + MG_ASSERT(vertices.size() <= 1, "The number of detach deleted vertices is not less or equal to 1!"); + + return std::make_optional(vertices[0], std::move(edges)); +} + +Result> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) { + auto res = DetachDelete({}, {edge}, false); + + if (res.HasError()) { + return res.GetError(); + } + + const auto &value = res.GetValue(); + if (!value) { + return std::optional{}; + } + + const auto &[vertices, edges] = *value; + + MG_ASSERT(vertices.empty(), "Deleting an edge should not have deleted a vertex!"); + MG_ASSERT(edges.size() <= 1, "Deleted edges need to be less or equal to 1!"); + + if (edges.empty()) { + return std::optional{}; + } + + return std::make_optional(edges[0]); +} + +Result, std::vector>>> +Storage::Accessor::DetachDelete(std::vector nodes, std::vector edges, bool detach) { + using ReturnType = std::pair, std::vector>; + // 1. Gather nodes which are not deleted yet in the system + auto maybe_nodes_to_delete = PrepareDeletableNodes(nodes); + if (maybe_nodes_to_delete.HasError()) { + return maybe_nodes_to_delete.GetError(); + } + const std::unordered_set nodes_to_delete = *maybe_nodes_to_delete.GetValue(); + + // 2. Gather edges and corresponding node on the other end of the edge for the deletable nodes + EdgeInfoForDeletion edge_deletion_info = PrepareDeletableEdges(nodes_to_delete, edges, detach); + + // Detach nodes which need to be deleted + std::unordered_set deleted_edge_ids; + std::vector deleted_edges; + if (detach) { + auto maybe_cleared_edges = ClearEdgesOnVertices(nodes_to_delete, deleted_edge_ids); + if (maybe_cleared_edges.HasError()) { + return maybe_cleared_edges.GetError(); + } + + deleted_edges = *maybe_cleared_edges.GetValue(); + } + + // Detach nodes on the other end, which don't need deletion, by passing once through their vectors + auto maybe_remaining_edges = DetachRemainingEdges(std::move(edge_deletion_info), deleted_edge_ids); + if (maybe_remaining_edges.HasError()) { + return maybe_remaining_edges.GetError(); + } + const std::vector remaining_edges = *maybe_remaining_edges.GetValue(); + deleted_edges.insert(deleted_edges.end(), remaining_edges.begin(), remaining_edges.end()); + + auto const maybe_deleted_vertices = TryDeleteVertices(nodes_to_delete); + if (maybe_deleted_vertices.HasError()) { + return maybe_deleted_vertices.GetError(); + } + + auto deleted_vertices = maybe_deleted_vertices.GetValue(); + + return std::make_optional(std::move(deleted_vertices), std::move(deleted_edges)); +} + +Result>> Storage::Accessor::PrepareDeletableNodes( + const std::vector &vertices) { + // Some of the vertices could be already deleted in the system so we need to check + std::unordered_set nodes_to_delete{}; + for (const auto &vertex : vertices) { + MG_ASSERT(vertex->transaction_ == &transaction_, + "VertexAccessor must be from the same transaction as the storage " + "accessor when deleting a vertex!"); + auto *vertex_ptr = vertex->vertex_; + + { + auto vertex_lock = std::unique_lock{vertex_ptr->lock}; + + if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR; + + if (vertex_ptr->deleted) { + continue; + } + } + + nodes_to_delete.insert(vertex_ptr); + } + + return std::make_optional>(nodes_to_delete); +} + +EdgeInfoForDeletion Storage::Accessor::PrepareDeletableEdges(const std::unordered_set &vertices, + const std::vector &edges, + bool detach) noexcept { + std::unordered_set partial_src_vertices; + std::unordered_set partial_dest_vertices; + std::unordered_set src_edge_ids; + std::unordered_set dest_edge_ids; + + auto try_adding_partial_delete_vertices = [this, &vertices](auto &partial_delete_vertices, auto &edge_ids, + auto &item) { + // For the nodes on the other end of the edge, they might not get deleted in the system but only cut out + // of the edge. Therefore, information is gathered in this step to account for every vertices' in and out + // edges and what must be deleted + const auto &[edge_type, opposing_vertex, edge] = item; + if (!vertices.contains(opposing_vertex)) { + partial_delete_vertices.insert(opposing_vertex); + auto const edge_gid = storage_->config_.items.properties_on_edges ? edge.ptr->gid : edge.gid; + edge_ids.insert(edge_gid); + } + }; + + // add nodes which need to be detached on the other end of the edge + if (detach) { + for (auto *vertex_ptr : vertices) { + std::vector> in_edges; + std::vector> out_edges; + + { + auto vertex_lock = std::shared_lock{vertex_ptr->lock}; + in_edges = vertex_ptr->in_edges; + out_edges = vertex_ptr->out_edges; + } + + for (auto const &item : in_edges) { + try_adding_partial_delete_vertices(partial_src_vertices, src_edge_ids, item); + } + for (auto const &item : out_edges) { + try_adding_partial_delete_vertices(partial_dest_vertices, dest_edge_ids, item); + } + } + } + + // also add edges which we want to delete from the query + for (const auto &edge_accessor : edges) { + partial_src_vertices.insert(edge_accessor->from_vertex_); + partial_dest_vertices.insert(edge_accessor->to_vertex_); + + auto const edge_gid = edge_accessor->Gid(); + src_edge_ids.insert(edge_gid); + dest_edge_ids.insert(edge_gid); + } + + return EdgeInfoForDeletion{.partial_src_edge_ids = std::move(src_edge_ids), + .partial_dest_edge_ids = std::move(dest_edge_ids), + .partial_src_vertices = std::move(partial_src_vertices), + .partial_dest_vertices = std::move(partial_dest_vertices)}; +} + +Result>> Storage::Accessor::ClearEdgesOnVertices( + const std::unordered_set &vertices, std::unordered_set &deleted_edge_ids) { + // We want to gather all edges that we delete in this step so that we can proceed with + // further deletion + using ReturnType = std::vector; + std::vector deleted_edges{}; + + auto clear_edges = [this, &deleted_edges, &deleted_edge_ids]( + auto *vertex_ptr, auto *attached_edges_to_vertex, auto deletion_delta, + auto reverse_vertex_order) -> Result> { + auto vertex_lock = std::unique_lock{vertex_ptr->lock}; + while (!attached_edges_to_vertex->empty()) { + // get the information about the last edge in the vertex collection + auto const &[edge_type, opposing_vertex, edge_ref] = *attached_edges_to_vertex->rbegin(); + + std::unique_lock guard; + if (storage_->config_.items.properties_on_edges) { + auto edge_ptr = edge_ref.ptr; + guard = std::unique_lock{edge_ptr->lock}; + + if (!PrepareForWrite(&transaction_, edge_ptr)) return Error::SERIALIZATION_ERROR; + } + + if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR; + MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!"); + + attached_edges_to_vertex->pop_back(); + if (storage_->config_.items.properties_on_edges) { + auto *edge_ptr = edge_ref.ptr; + MarkEdgeAsDeleted(edge_ptr); + } + + auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid; + auto const [_, was_inserted] = deleted_edge_ids.insert(edge_gid); + bool const edge_cleared_from_both_directions = !was_inserted; + if (edge_cleared_from_both_directions) { + auto *from_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex; + auto *to_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr; + deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, &transaction_, &storage_->indices_, + &storage_->constraints_, storage_->config_.items, true); + } + + CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref); + } + + return std::make_optional(); + }; + + // delete the in and out edges from the nodes we want to delete + // no need to lock here, we are just passing the pointer of the in and out edges collections + for (auto *vertex_ptr : vertices) { + auto maybe_error = clear_edges(vertex_ptr, &vertex_ptr->in_edges, Delta::AddInEdgeTag(), false); + if (maybe_error.HasError()) { + return maybe_error; + } + + maybe_error = clear_edges(vertex_ptr, &vertex_ptr->out_edges, Delta::AddOutEdgeTag(), true); + if (maybe_error.HasError()) { + return maybe_error; + } + } + + return std::make_optional(deleted_edges); +} + +Result>> Storage::Accessor::DetachRemainingEdges( + EdgeInfoForDeletion info, std::unordered_set &partially_detached_edge_ids) { + using ReturnType = std::vector; + std::vector deleted_edges{}; + + auto clear_edges_on_other_direction = [this, &deleted_edges, &partially_detached_edge_ids]( + auto *vertex_ptr, auto *edges_attached_to_vertex, auto &set_for_erasure, + auto deletion_delta, + auto reverse_vertex_order) -> Result> { + auto vertex_lock = std::unique_lock{vertex_ptr->lock}; + + if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR; + MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!"); + + auto mid = std::partition( + edges_attached_to_vertex->begin(), edges_attached_to_vertex->end(), [this, &set_for_erasure](auto &edge) { + auto const &[edge_type, opposing_vertex, edge_ref] = edge; + auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid; + return !set_for_erasure.contains(edge_gid); + }); + + for (auto it = mid; it != edges_attached_to_vertex->end(); it++) { + auto const &[edge_type, opposing_vertex, edge_ref] = *it; + std::unique_lock guard; + if (storage_->config_.items.properties_on_edges) { + auto edge_ptr = edge_ref.ptr; + guard = std::unique_lock{edge_ptr->lock}; + // this can happen only if we marked edges for deletion with no nodes, + // so the method detaching nodes will not do anything + MarkEdgeAsDeleted(edge_ptr); + } + + CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref); + + auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid; + auto const [_, was_inserted] = partially_detached_edge_ids.insert(edge_gid); + bool const edge_cleared_from_both_directions = !was_inserted; + if (edge_cleared_from_both_directions) { + auto *from_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex; + auto *to_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr; + deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, &transaction_, &storage_->indices_, + &storage_->constraints_, storage_->config_.items, true); + } + } + + edges_attached_to_vertex->erase(mid, edges_attached_to_vertex->end()); + + return std::make_optional(); + }; + + // remove edges from vertex collections which we aggregated for just detaching + for (auto *vertex_ptr : info.partial_src_vertices) { + auto maybe_error = clear_edges_on_other_direction(vertex_ptr, &vertex_ptr->out_edges, info.partial_src_edge_ids, + Delta::AddOutEdgeTag(), false); + if (maybe_error.HasError()) { + return maybe_error; + } + } + for (auto *vertex_ptr : info.partial_dest_vertices) { + auto maybe_error = clear_edges_on_other_direction(vertex_ptr, &vertex_ptr->in_edges, info.partial_dest_edge_ids, + Delta::AddInEdgeTag(), true); + if (maybe_error.HasError()) { + return maybe_error; + } + } + + return std::make_optional(deleted_edges); +} + +Result> Storage::Accessor::TryDeleteVertices(const std::unordered_set &vertices) { + std::vector deleted_vertices; + deleted_vertices.reserve(vertices.size()); + + for (auto *vertex_ptr : vertices) { + auto vertex_lock = std::unique_lock{vertex_ptr->lock}; + + if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR; + + MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!"); + + if (!vertex_ptr->in_edges.empty() || !vertex_ptr->out_edges.empty()) { + return Error::VERTEX_HAS_EDGES; + } + + CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag()); + vertex_ptr->deleted = true; + + deleted_vertices.emplace_back(vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_, + storage_->config_.items, true); + } + + return deleted_vertices; +} + +void Storage::Accessor::MarkEdgeAsDeleted(Edge *edge) { + if (!edge->deleted) { + CreateAndLinkDelta(&transaction_, edge, Delta::RecreateObjectTag()); + edge->deleted = true; + storage_->edge_count_.fetch_sub(1, std::memory_order_acq_rel); + } +} + } // namespace memgraph::storage diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 399325966..8f97ebb58 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -68,6 +68,13 @@ struct StorageInfo { uint64_t disk_usage; }; +struct EdgeInfoForDeletion { + std::unordered_set partial_src_edge_ids{}; + std::unordered_set partial_dest_edge_ids{}; + std::unordered_set partial_src_vertices{}; + std::unordered_set partial_dest_vertices{}; +}; + class Storage { friend class ReplicationServer; friend class ReplicationClient; @@ -111,6 +118,14 @@ class Storage { const std::optional> &lower_bound, const std::optional> &upper_bound, View view) = 0; + virtual Result> DeleteVertex(VertexAccessor *vertex); + + virtual Result>>> DetachDeleteVertex( + VertexAccessor *vertex); + + virtual Result, std::vector>>> DetachDelete( + std::vector nodes, std::vector edges, bool detach); + virtual uint64_t ApproximateVertexCount() const = 0; virtual uint64_t ApproximateVertexCount(LabelId label) const = 0; @@ -142,11 +157,6 @@ class Storage { virtual std::vector DeleteLabelIndexStats(std::span labels) = 0; - virtual Result> DeleteVertex(VertexAccessor *vertex) = 0; - - virtual Result>>> DetachDeleteVertex( - VertexAccessor *vertex) = 0; - virtual void PrefetchInEdges(const VertexAccessor &vertex_acc) = 0; virtual void PrefetchOutEdges(const VertexAccessor &vertex_acc) = 0; @@ -157,7 +167,7 @@ class Storage { virtual Result EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) = 0; - virtual Result> DeleteEdge(EdgeAccessor *edge) = 0; + virtual Result> DeleteEdge(EdgeAccessor *edge); virtual bool LabelIndexExists(LabelId label) const = 0; @@ -202,6 +212,18 @@ class Storage { std::optional commit_timestamp_; bool is_transaction_active_; + // Detach delete private methods + Result>> PrepareDeletableNodes( + const std::vector &vertices); + EdgeInfoForDeletion PrepareDeletableEdges(const std::unordered_set &vertices, + const std::vector &edges, bool detach) noexcept; + Result>> ClearEdgesOnVertices(const std::unordered_set &vertices, + std::unordered_set &deleted_edge_ids); + Result>> DetachRemainingEdges( + EdgeInfoForDeletion info, std::unordered_set &partially_detached_edge_ids); + Result> TryDeleteVertices(const std::unordered_set &vertices); + void MarkEdgeAsDeleted(Edge *edge); + private: StorageMode creation_storage_mode_; }; diff --git a/tests/gql_behave/tests/memgraph_V1/features/delete.feature b/tests/gql_behave/tests/memgraph_V1/features/delete.feature new file mode 100644 index 000000000..027e40b94 --- /dev/null +++ b/tests/gql_behave/tests/memgraph_V1/features/delete.feature @@ -0,0 +1,211 @@ +Feature: Delete + + Scenario: Delete all from database and yield number of deleted items + Given an empty graph + And having executed + """ + CREATE (n), (m), (o) + """ + When executing query: + """ + MATCH (n) DETACH DELETE n RETURN COUNT(*) AS cnt + """ + Then the result should be: + | cnt | + | 3 | + + Scenario: Delete all from database and match nothing after + Given an empty graph + And having executed + """ + CREATE (n), (m) + """ + When executing query: + """ + MATCH (n) DETACH DELETE n WITH n MATCH (m) RETURN COUNT(*) AS cnt + """ + Then the result should be: + | cnt | + | 0 | + + Scenario: Delete relationship in the pattern + Given an empty graph + And having executed + """ + CREATE (n)-[:REL]->(m), (k)-[:REL]->(z) + """ + When executing query: + """ + MATCH (n)-[r:REL]->(m) DELETE r RETURN COUNT(*) AS cnt + """ + Then the result should be: + | cnt | + | 2 | + + Scenario: Delete node and return property throws an error + Given an empty graph + And having executed + """ + CREATE (n {prop: 1}); + """ + When executing query: + """ + MATCH (n) DETACH DELETE n RETURN n.prop AS prop + """ + Then an error should be raised + + Scenario: Delete node, set property throws an error + Given an empty graph + And having executed + """ + CREATE (n {prop: 1}); + """ + When executing query: + """ + MATCH (n) DETACH DELETE n SET n.prop = 2 + """ + Then the result should be empty + + Scenario: Delete node, set property and return throws an error + Given an empty graph + And having executed + """ + CREATE (n {prop: 1}); + """ + When executing query: + """ + MATCH (n) DETACH DELETE n SET n.prop = 2 RETURN n + """ + Then an error should be raised + + Scenario: Delete node, remove property throws an error + Given an empty graph + And having executed + """ + CREATE (n {prop: 1}); + """ + When executing query: + """ + MATCH (n) DETACH DELETE n REMOVE n.prop + """ + Then the result should be empty + + Scenario: Delete node, remove property and return throws an error + Given an empty graph + And having executed + """ + CREATE (n {prop: 1}); + """ + When executing query: + """ + MATCH (n) DETACH DELETE n REMOVE n.prop RETURN n + """ + Then an error should be raised + + Scenario: Delete node, set label throws an error + Given an empty graph + And having executed + """ + CREATE (n {prop: 1}); + """ + When executing query: + """ + MATCH (n) DETACH DELETE n SET n:Label + """ + Then the result should be empty + + Scenario: Delete node, set label and return throws an error + Given an empty graph + And having executed + """ + CREATE (n {prop: 1}); + """ + When executing query: + """ + MATCH (n) DETACH DELETE n SET n:Label RETURN n + """ + Then an error should be raised + + Scenario: Delete node, remove label throws an error + Given an empty graph + And having executed + """ + CREATE (n:Label {prop: 1}); + """ + When executing query: + """ + MATCH (n) DETACH DELETE n REMOVE n:Label + """ + Then the result should be empty + + Scenario: Delete node, remove label and return throws an error + Given an empty graph + And having executed + """ + CREATE (n:Label {prop: 1}); + """ + When executing query: + """ + MATCH (n) DETACH DELETE n REMOVE n:Label RETURN n + """ + Then an error should be raised + + Scenario: Delete node, set update properties and return throws an error + Given an empty graph + And having executed + """ + CREATE (n:Label {prop: 1}); + """ + When executing query: + """ + MATCH (n) DETACH DELETE n SET n += {prop: 2} RETURN n + """ + Then an error should be raised + + Scenario: Delete node, set properties throws an error + Given an empty graph + And having executed + """ + CREATE (n:Label {prop: 1}); + """ + When executing query: + """ + MATCH (n) DETACH DELETE n SET n += {prop: 2} + """ + Then the result should be empty + + Scenario: Delete node, set replace properties and return throws an error + Given an empty graph + And having executed + """ + CREATE (n:Label {prop: 1}); + """ + When executing query: + """ + MATCH (n) DETACH DELETE n SET n = {prop: 2} RETURN n + """ + Then an error should be raised + + Scenario: Delete node, set replace properties throws an error + Given an empty graph + And having executed + """ + CREATE (n:Label {prop: 1}); + """ + When executing query: + """ + MATCH (n) DETACH DELETE n SET n = {prop: 2} + """ + Then the result should be empty + + Scenario: Delete node, set property and return it with aggregation throws an error + Given an empty graph + And having executed + """ + CREATE (n:Label {prop: 1}); + """ + When executing query: + """ + MATCH (n) DETACH DELETE n SET n.prop = 1 WITH n RETURN n + """ + Then an error should be raised diff --git a/tests/stress/bipartite.py b/tests/stress/bipartite.py index 3932906c4..610b65b15 100644 --- a/tests/stress/bipartite.py +++ b/tests/stress/bipartite.py @@ -12,44 +12,51 @@ # by the Apache License, Version 2.0, included in the file # licenses/APL.txt. -''' +""" Large bipartite graph stress test. -''' +""" +import atexit import logging import multiprocessing import time -import atexit -from common import connection_argument_parser, assert_equal, \ - OutputData, execute_till_success, \ - batch, render, SessionCache +from common import ( + OutputData, + SessionCache, + assert_equal, + batch, + connection_argument_parser, + execute_till_success, + render, +) def parse_args(): - ''' + """ Parses user arguments :return: parsed arguments - ''' + """ parser = connection_argument_parser() - parser.add_argument('--worker-count', type=int, - default=multiprocessing.cpu_count(), - help='Number of concurrent workers.') - parser.add_argument("--logging", default="INFO", - choices=["INFO", "DEBUG", "WARNING", "ERROR"], - help="Logging level") - parser.add_argument('--u-count', type=int, default=100, - help='Size of U set in the bipartite graph.') - parser.add_argument('--v-count', type=int, default=100, - help='Size of V set in the bipartite graph.') - parser.add_argument('--vertex-batch-size', type=int, default=100, - help="Create vertices in batches of this size.") - parser.add_argument('--edge-batching', action='store_true', - help='Create edges in batches.') - parser.add_argument('--edge-batch-size', type=int, default=100, - help='Number of edges in a batch when edges ' - 'are created in batches.') + parser.add_argument( + "--worker-count", type=int, default=multiprocessing.cpu_count(), help="Number of concurrent workers." + ) + parser.add_argument( + "--logging", default="INFO", choices=["INFO", "DEBUG", "WARNING", "ERROR"], help="Logging level" + ) + parser.add_argument("--u-count", type=int, default=100, help="Size of U set in the bipartite graph.") + parser.add_argument("--v-count", type=int, default=100, help="Size of V set in the bipartite graph.") + parser.add_argument("--vertex-batch-size", type=int, default=100, help="Create vertices in batches of this size.") + parser.add_argument("--edge-batching", action="store_true", help="Create edges in batches.") + parser.add_argument( + "--edge-batch-size", + type=int, + default=100, + help="Number of edges in a batch when edges " "are created in batches.", + ) + parser.add_argument("--isolation-level", type=str, required=True, help="Database isolation level.") + parser.add_argument("--storage-mode", type=str, required=True, help="Database storage mode.") return parser.parse_args() @@ -62,18 +69,18 @@ atexit.register(SessionCache.cleanup) def create_u_v_edges(u): - ''' + """ Creates nodes and checks that all nodes were created. create edges from one vertex in U set to all vertex of V set :param worker_id: worker id :return: tuple (worker_id, create execution time, time unit) - ''' + """ start_time = time.time() session = SessionCache.argument_session(args) no_failures = 0 - match_u = 'MATCH (u:U {id: %d})' % u + match_u = "MATCH (u:U {id: %d})" % u if args.edge_batching: # TODO: try to randomize execution, the execution time should # be smaller, add randomize flag @@ -83,143 +90,120 @@ def create_u_v_edges(u): query = match_u + "".join(match_v) + "".join(create_u) no_failures += execute_till_success(session, query)[1] else: - no_failures += execute_till_success( - session, match_u + ' MATCH (v:V) CREATE (u)-[:R]->(v)')[1] + no_failures += execute_till_success(session, match_u + " MATCH (v:V) CREATE (u)-[:R]->(v)")[1] end_time = time.time() return u, end_time - start_time, "s", no_failures def traverse_from_u_worker(u): - ''' + """ Traverses edges starting from an element of U set. Traversed labels are: :U -> :V -> :U. - ''' + """ session = SessionCache.argument_session(args) start_time = time.time() assert_equal( args.u_count * args.v_count - args.v_count, # cypher morphism - session.run("MATCH (u1:U {id: %s})-[e1]->(v:V)<-[e2]-(u2:U) " - "RETURN count(v) AS cnt" % u).data()[0]['cnt'], - "Number of traversed edges started " - "from U(id:%s) is wrong!. " % u + - "Expected: %s Actual: %s") + session.run("MATCH (u1:U {id: %s})-[e1]->(v:V)<-[e2]-(u2:U) " "RETURN count(v) AS cnt" % u).data()[0]["cnt"], + "Number of traversed edges started " "from U(id:%s) is wrong!. " % u + "Expected: %s Actual: %s", + ) end_time = time.time() - return u, end_time - start_time, 's' + return u, end_time - start_time, "s" def traverse_from_v_worker(v): - ''' + """ Traverses edges starting from an element of V set. Traversed labels are: :V -> :U -> :V. - ''' + """ session = SessionCache.argument_session(args) start_time = time.time() assert_equal( - args.u_count * args.v_count - args.u_count, # cypher morphism - session.run("MATCH (v1:V {id: %s})<-[e1]-(u:U)-[e2]->(v2:V) " - "RETURN count(u) AS cnt" % v).data()[0]['cnt'], - "Number of traversed edges started " - "from V(id:%s) is wrong!. " % v + - "Expected: %s Actual: %s") + args.u_count * args.v_count - args.u_count, # cypher morphism + session.run("MATCH (v1:V {id: %s})<-[e1]-(u:U)-[e2]->(v2:V) " "RETURN count(u) AS cnt" % v).data()[0]["cnt"], + "Number of traversed edges started " "from V(id:%s) is wrong!. " % v + "Expected: %s Actual: %s", + ) end_time = time.time() - return v, end_time - start_time, 's' + return v, end_time - start_time, "s" def execution_handler(): - ''' + """ Initializes client processes, database and starts the execution. - ''' + """ # instance cleanup session = SessionCache.argument_session(args) start_time = time.time() # clean existing database - session.run('MATCH (n) DETACH DELETE n').consume() + session.run("MATCH (n) DETACH DELETE n").consume() cleanup_end_time = time.time() - output_data.add_measurement("cleanup_time", - cleanup_end_time - start_time) + output_data.add_measurement("cleanup_time", cleanup_end_time - start_time) log.info("Database is clean.") # create indices - session.run('CREATE INDEX ON :U').consume() - session.run('CREATE INDEX ON :V').consume() + session.run("CREATE INDEX ON :U").consume() + session.run("CREATE INDEX ON :V").consume() # create U vertices - for b in batch(render('CREATE (:U {{id: {}}})', range(args.u_count)), - args.vertex_batch_size): + for b in batch(render("CREATE (:U {{id: {}}})", range(args.u_count)), args.vertex_batch_size): session.run(" ".join(b)).consume() # create V vertices - for b in batch(render('CREATE (:V {{id: {}}})', range(args.v_count)), - args.vertex_batch_size): + for b in batch(render("CREATE (:V {{id: {}}})", range(args.v_count)), args.vertex_batch_size): session.run(" ".join(b)).consume() vertices_create_end_time = time.time() - output_data.add_measurement( - 'vertices_create_time', - vertices_create_end_time - cleanup_end_time) + output_data.add_measurement("vertices_create_time", vertices_create_end_time - cleanup_end_time) log.info("All nodes created.") # concurrent create execution & tests with multiprocessing.Pool(args.worker_count) as p: create_edges_start_time = time.time() - for worker_id, create_time, time_unit, no_failures in \ - p.map(create_u_v_edges, [i for i in range(args.u_count)]): - log.info('Worker ID: %s; Create time: %s%s Failures: %s' % - (worker_id, create_time, time_unit, no_failures)) + for worker_id, create_time, time_unit, no_failures in p.map(create_u_v_edges, [i for i in range(args.u_count)]): + log.info("Worker ID: %s; Create time: %s%s Failures: %s" % (worker_id, create_time, time_unit, no_failures)) create_edges_end_time = time.time() - output_data.add_measurement( - 'edges_create_time', - create_edges_end_time - create_edges_start_time) + output_data.add_measurement("edges_create_time", create_edges_end_time - create_edges_start_time) # check total number of edges assert_equal( args.v_count * args.u_count, - session.run( - 'MATCH ()-[r]->() ' - 'RETURN count(r) AS cnt').data()[0]['cnt'], - "Total number of edges isn't correct! Expected: %s Actual: %s") + session.run("MATCH ()-[r]->() " "RETURN count(r) AS cnt").data()[0]["cnt"], + "Total number of edges isn't correct! Expected: %s Actual: %s", + ) # check traversals starting from all elements of U traverse_from_u_start_time = time.time() - for u, traverse_u_time, time_unit in \ - p.map(traverse_from_u_worker, - [i for i in range(args.u_count)]): + for u, traverse_u_time, time_unit in p.map(traverse_from_u_worker, [i for i in range(args.u_count)]): log.info("U {id: %s} %s%s" % (u, traverse_u_time, time_unit)) traverse_from_u_end_time = time.time() - output_data.add_measurement( - 'traverse_from_u_time', - traverse_from_u_end_time - traverse_from_u_start_time) + output_data.add_measurement("traverse_from_u_time", traverse_from_u_end_time - traverse_from_u_start_time) # check traversals starting from all elements of V traverse_from_v_start_time = time.time() - for v, traverse_v_time, time_unit in \ - p.map(traverse_from_v_worker, - [i for i in range(args.v_count)]): + for v, traverse_v_time, time_unit in p.map(traverse_from_v_worker, [i for i in range(args.v_count)]): log.info("V {id: %s} %s%s" % (v, traverse_v_time, time_unit)) traverse_from_v_end_time = time.time() - output_data.add_measurement( - 'traverse_from_v_time', - traverse_from_v_end_time - traverse_from_v_start_time) + output_data.add_measurement("traverse_from_v_time", traverse_from_v_end_time - traverse_from_v_start_time) # check total number of vertices assert_equal( args.v_count + args.u_count, - session.run('MATCH (n) RETURN count(n) AS cnt').data()[0]['cnt'], - "Total number of vertices isn't correct! Expected: %s Actual: %s") + session.run("MATCH (n) RETURN count(n) AS cnt").data()[0]["cnt"], + "Total number of vertices isn't correct! Expected: %s Actual: %s", + ) # check total number of edges assert_equal( args.v_count * args.u_count, - session.run( - 'MATCH ()-[r]->() RETURN count(r) AS cnt').data()[0]['cnt'], - "Total number of edges isn't correct! Expected: %s Actual: %s") + session.run("MATCH ()-[r]->() RETURN count(r) AS cnt").data()[0]["cnt"], + "Total number of edges isn't correct! Expected: %s Actual: %s", + ) end_time = time.time() - output_data.add_measurement("total_execution_time", - end_time - start_time) + output_data.add_measurement("total_execution_time", end_time - start_time) -if __name__ == '__main__': +if __name__ == "__main__": logging.basicConfig(level=args.logging) if args.logging != "DEBUG": logging.getLogger("neo4j").setLevel(logging.WARNING) diff --git a/tests/stress/common.py b/tests/stress/common.py index 3648d0e0a..0a3f3e92b 100644 --- a/tests/stress/common.py +++ b/tests/stress/common.py @@ -11,26 +11,27 @@ # -*- coding: utf-8 -*- -''' +""" Common methods for writing graph database integration tests in python. Only Bolt communication protocol is supported. -''' +""" import contextlib import os -from threading import Thread -from time import sleep - +import time from argparse import ArgumentParser -from neo4j import GraphDatabase, TRUST_ALL_CERTIFICATES +from threading import Thread + +from gqlalchemy import Memgraph +from neo4j import TRUST_ALL_CERTIFICATES, GraphDatabase class OutputData: - ''' + """ Encapsulates results and info about the tests. - ''' + """ def __init__(self): # data in time format (name, time, unit) @@ -39,32 +40,32 @@ class OutputData: self._statuses = [] def add_measurement(self, name, time, unit="s"): - ''' + """ Stores measurement. :param name: str, name of measurement :param time: float, time value :param unit: str, time unit - ''' + """ self._measurements.append((name, time, unit)) def add_status(self, name, status): - ''' + """ Stores status data point. :param name: str, name of data point :param status: printable value - ''' + """ self._statuses.append((name, status)) def dump(self, print_f=print): - ''' + """ Dumps output using the given ouput function. Args: print_f - the function that consumes ouptput. Defaults to the 'print' function. - ''' + """ print_f("Output data:") for name, status in self._statuses: print_f(" %s: %s" % (name, status)) @@ -73,7 +74,7 @@ class OutputData: def execute_till_success(session, query, max_retries=1000): - ''' + """ Executes a query within Bolt session until the query is successfully executed against the database. @@ -86,7 +87,7 @@ def execute_till_success(session, query, max_retries=1000): :param query: query to execute :return: tuple (results_data_list, number_of_failures, result_summary) - ''' + """ no_failures = 0 while True: try: @@ -97,12 +98,24 @@ def execute_till_success(session, query, max_retries=1000): except Exception: no_failures += 1 if no_failures >= max_retries: - raise Exception("Query '%s' failed %d times, aborting" % - (query, max_retries)) + raise Exception("Query '%s' failed %d times, aborting" % (query, max_retries)) + + +def execute_till_success_gqlalchemy(memgraph: Memgraph, query: str, max_retries=1000): + """Same method as execute_till_success, but for gqlalchemy.""" + no_failures = 0 + while True: + try: + result = memgraph.execute(query) + return result, no_failures + except Exception: + no_failures += 1 + if no_failures >= max_retries: + raise Exception("Query '%s' failed %d times, aborting" % (query, max_retries)) def batch(input, batch_size): - """ Batches the given input (must be iterable). + """Batches the given input (must be iterable). Supports input generators. Returns a generator. All is lazy. The last batch can contain less elements then `batch_size`, but is for sure more then zero. @@ -134,7 +147,7 @@ def render(template, iterable_arguments): def assert_equal(expected, actual, message): - ''' + """ Compares expected and actual values. If values are not the same terminate the execution. @@ -142,45 +155,41 @@ def assert_equal(expected, actual, message): :param actual: actual value :param message: str, message in case that the values are not equal, must contain two placeholders (%s) to print the values. - ''' + """ assert expected == actual, message % (expected, actual) def connection_argument_parser(): - ''' + """ Parses arguments related to establishing database connection like host, port, username, etc. :return: An instance of ArgumentParser - ''' + """ parser = ArgumentParser(description=__doc__) - parser.add_argument('--endpoint', type=str, default='127.0.0.1:7687', - help='DBMS instance endpoint. ' - 'Bolt protocol is the only option.') - parser.add_argument('--username', type=str, default='neo4j', - help='DBMS instance username.') - parser.add_argument('--password', type=int, default='1234', - help='DBMS instance password.') - parser.add_argument('--use-ssl', action='store_true', - help="Is SSL enabled?") + parser.add_argument( + "--endpoint", + type=str, + default="127.0.0.1:7687", + help="DBMS instance endpoint. " "Bolt protocol is the only option.", + ) + parser.add_argument("--username", type=str, default="neo4j", help="DBMS instance username.") + parser.add_argument("--password", type=str, default="1234", help="DBMS instance password.") + parser.add_argument("--use-ssl", action="store_true", help="Is SSL enabled?") return parser @contextlib.contextmanager def bolt_session(url, auth, ssl=False): - ''' + """ with wrapper around Bolt session. :param url: str, e.g. "bolt://127.0.0.1:7687" :param auth: auth method, goes directly to the Bolt driver constructor :param ssl: bool, is ssl enabled - ''' - driver = GraphDatabase.driver( - url, - auth=auth, - encrypted=ssl, - trust=TRUST_ALL_CERTIFICATES) + """ + driver = GraphDatabase.driver(url, auth=auth, encrypted=ssl, trust=TRUST_ALL_CERTIFICATES) session = driver.session() try: yield session @@ -192,19 +201,37 @@ def bolt_session(url, auth, ssl=False): # If you are using session with multiprocessing take a look at SesssionCache # in bipartite for an idea how to reuse sessions. def argument_session(args): - ''' + """ :return: Bolt session context manager based on program arguments - ''' - return bolt_session('bolt://' + args.endpoint, - (args.username, str(args.password)), - args.use_ssl) + """ + return bolt_session("bolt://" + args.endpoint, (args.username, args.password), args.use_ssl) def argument_driver(args): return GraphDatabase.driver( - 'bolt://' + args.endpoint, - auth=(args.username, str(args.password)), - encrypted=args.use_ssl, trust=TRUST_ALL_CERTIFICATES) + "bolt://" + args.endpoint, + auth=(args.username, args.password), + encrypted=args.use_ssl, + trust=TRUST_ALL_CERTIFICATES, + ) + + +def get_memgraph(args) -> Memgraph: + host_port = args.endpoint.split(":") + + connection_params = { + "host": host_port[0], + "port": int(host_port[1]), + "username": args.username, + "password": args.password, + "encrypted": False, + } + + if args.use_ssl: + connection_params["encrypted"] = True + + return Memgraph(**connection_params) + # This class is used to create and cache sessions. Session is cached by args # used to create it and process' pid in which it was created. This makes it @@ -219,8 +246,8 @@ class SessionCache: key = tuple(vars(args).items()) + (os.getpid(),) if key in SessionCache.cache: return SessionCache.cache[key][1] - driver = argument_driver(args) # | - session = driver.session() # V + driver = argument_driver(args) # | + session = driver.session() # V SessionCache.cache[key] = (driver, session) return session @@ -241,9 +268,10 @@ def periodically_execute(callable, args, interval, daemon=True): interval - time (in seconds) between two calls deamon - if the execution thread should be a daemon """ + def periodic_call(): while True: - sleep(interval) + time.sleep(interval) callable() Thread(target=periodic_call, args=args, daemon=daemon).start() diff --git a/tests/stress/continuous_integration b/tests/stress/continuous_integration index 87d5bcaf6..8744eecae 100755 --- a/tests/stress/continuous_integration +++ b/tests/stress/continuous_integration @@ -1,81 +1,16 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- - import argparse -import atexit -import json +import copy import multiprocessing import os import subprocess -import sys import time +from argparse import Namespace as Args +from subprocess import Popen +from typing import Dict, List -# dataset calibrated for running on Apollo (total 4min) -# bipartite.py runs for approx. 30s -# create_match.py runs for approx. 30s -# long_running runs for 1min -# long_running runs for 2min -SMALL_DATASET = [ - { - "test": "bipartite.py", - "options": ["--u-count", "100", "--v-count", "100"], - "timeout": 5, - }, - { - "test": "create_match.py", - "options": ["--vertex-count", "40000", "--create-pack-size", "100"], - "timeout": 5, - }, - { - "test": "parser.cpp", - "options": ["--per-worker-query-count", "1000"], - "timeout": 5, - }, - { - "test": "long_running.cpp", - "options": ["--vertex-count", "1000", "--edge-count", "5000", "--max-time", "1", "--verify", "20"], - "timeout": 5, - }, - { - "test": "long_running.cpp", - "options": ["--vertex-count", "10000", "--edge-count", "50000", "--max-time", "2", "--verify", "30"], - "timeout": 5, - }, -] - -# dataset calibrated for running on daily stress instance (total 9h) -# bipartite.py and create_match.py run for approx. 15min -# long_running runs for 5min x 6 times = 30min -# long_running runs for 8h -LARGE_DATASET = ( - [ - { - "test": "bipartite.py", - "options": ["--u-count", "300", "--v-count", "300"], - "timeout": 30, - }, - { - "test": "create_match.py", - "options": ["--vertex-count", "500000", "--create-pack-size", "500"], - "timeout": 30, - }, - ] - + [ - { - "test": "long_running.cpp", - "options": ["--vertex-count", "10000", "--edge-count", "40000", "--max-time", "5", "--verify", "60"], - "timeout": 16, - }, - ] - * 6 - + [ - { - "test": "long_running.cpp", - "options": ["--vertex-count", "200000", "--edge-count", "1000000", "--max-time", "480", "--verify", "300"], - "timeout": 500, - }, - ] -) +from test_config import LARGE_DATASET, SMALL_DATASET, DatabaseMode, DatasetConstants # paths SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -87,64 +22,12 @@ CERT_FILE = os.path.join(SCRIPT_DIR, ".cert.pem") # long running stats file STATS_FILE = os.path.join(SCRIPT_DIR, ".long_running_stats") -SMALL_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE]) -LARGE_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE]) # get number of threads -if "THREADS" in os.environ: - THREADS = os.environ["THREADS"] -else: - THREADS = multiprocessing.cpu_count() +THREADS = os.environ["THREADS"] if "THREADS" in os.environ else multiprocessing.cpu_count() -def wait_for_server(port, delay=0.1): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] - while subprocess.call(cmd) != 0: - time.sleep(0.01) - time.sleep(delay) - - -# run test helper function -def run_test(args, test, options, timeout): - print("Running test '{}'".format(test)) - - # find binary - if test.endswith(".py"): - logging = "DEBUG" if args.verbose else "WARNING" - binary = [args.python, "-u", os.path.join(SCRIPT_DIR, test), "--logging", logging] - elif test.endswith(".cpp"): - exe = os.path.join(BUILD_DIR, "tests", "stress", test[:-4]) - binary = [exe] - else: - raise Exception("Test '{}' binary not supported!".format(test)) - - # start test - cmd = binary + ["--worker-count", str(THREADS)] + options - start = time.time() - ret_test = subprocess.run(cmd, cwd=SCRIPT_DIR, timeout=timeout * 60) - - if ret_test.returncode != 0: - raise Exception("Test '{}' binary returned non-zero ({})!".format(test, ret_test.returncode)) - - runtime = time.time() - start - print(" Done after {:.3f} seconds".format(runtime)) - - return runtime - - -# parse arguments -parser = argparse.ArgumentParser(description="Run stress tests on Memgraph.") -parser.add_argument("--memgraph", default=os.path.join(BUILD_DIR, "memgraph")) -parser.add_argument("--log-file", default="") -parser.add_argument("--data-directory", default="") -parser.add_argument("--python", default=os.path.join(SCRIPT_DIR, "ve3", "bin", "python3"), type=str) -parser.add_argument("--large-dataset", action="store_const", const=True, default=False) -parser.add_argument("--use-ssl", action="store_const", const=True, default=False) -parser.add_argument("--verbose", action="store_const", const=True, default=False) -args = parser.parse_args() - -# generate temporary SSL certs -if args.use_ssl: +def generate_temporary_ssl_certs(): # https://unix.stackexchange.com/questions/104171/create-ssl-certificate-non-interactively subj = "/C=HR/ST=Zagreb/L=Zagreb/O=Memgraph/CN=db.memgraph.com" subprocess.run( @@ -168,70 +51,184 @@ if args.use_ssl: check=True, ) -# start memgraph -cwd = os.path.dirname(args.memgraph) -cmd = [ - args.memgraph, - "--bolt-num-workers=" + str(THREADS), - "--storage-properties-on-edges=true", - "--storage-snapshot-on-exit=true", - "--storage-snapshot-interval-sec=600", - "--storage-snapshot-retention-count=1", - "--storage-wal-enabled=true", - "--storage-recover-on-startup=false", - "--query-execution-timeout-sec=1200", -] -if not args.verbose: - cmd += ["--log-level", "WARNING"] -if args.log_file: - cmd += ["--log-file", args.log_file] -if args.data_directory: - cmd += ["--data-directory", args.data_directory] -if args.use_ssl: - cmd += ["--bolt-cert-file", CERT_FILE, "--bolt-key-file", KEY_FILE] -proc_mg = subprocess.Popen(cmd, cwd=cwd) -wait_for_server(7687) -assert proc_mg.poll() is None, "The database binary died prematurely!" -# at exit cleanup -@atexit.register -def cleanup(): - global proc_mg - if proc_mg.poll() != None: - return - proc_mg.kill() - proc_mg.wait() - - -# run tests -runtimes = {} -dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET -for test in dataset: - if args.use_ssl: - test["options"] += ["--use-ssl"] - runtime = run_test(args, **test) - runtimes[os.path.splitext(test["test"])[0]] = runtime - -# stop memgraph -proc_mg.terminate() -ret_mg = proc_mg.wait() -if ret_mg != 0: - raise Exception("Memgraph binary returned non-zero ({})!".format(ret_mg)) - -# cleanup certificates -if args.use_ssl: +def remove_certificates() -> None: os.remove(KEY_FILE) os.remove(CERT_FILE) -# measurements -measurements = "" -for key, value in runtimes.items(): - measurements += "{}.runtime {}\n".format(key, value) -with open(STATS_FILE) as f: - stats = f.read().split("\n") -measurements += "long_running.queries.executed {}\n".format(stats[0]) -measurements += "long_running.queries.failed {}\n".format(stats[1]) -with open(MEASUREMENTS_FILE, "w") as f: - f.write(measurements) -print("Done!") +def parse_arguments() -> Args: + # parse arguments + parser = argparse.ArgumentParser(description="Run stress tests on Memgraph.") + parser.add_argument("--memgraph", default=os.path.join(BUILD_DIR, "memgraph")) + parser.add_argument("--log-file", default="") + parser.add_argument("--data-directory", default="") + parser.add_argument("--python", default=os.path.join(SCRIPT_DIR, "ve3", "bin", "python3"), type=str) + parser.add_argument("--large-dataset", action="store_const", const=True, default=False) + parser.add_argument("--small-dataset", action="store_const", const=True, default=False) + parser.add_argument("--specific-test", default="") + parser.add_argument("--use-ssl", action="store_const", const=True, default=False) + parser.add_argument("--verbose", action="store_const", const=True, default=False) + return parser.parse_args() + + +def wait_for_server(port, delay=0.1) -> None: + cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] + while subprocess.call(cmd) != 0: + time.sleep(0.01) + time.sleep(delay) + + +def start_memgraph(args: Args) -> Popen: + """Starts Memgraph and return the process""" + cwd = os.path.dirname(args.memgraph) + cmd = [ + args.memgraph, + "--bolt-num-workers=" + str(THREADS), + "--storage-properties-on-edges=true", + "--storage-snapshot-on-exit=true", + "--storage-snapshot-interval-sec=600", + "--storage-snapshot-retention-count=1", + "--storage-wal-enabled=true", + "--storage-recover-on-startup=false", + "--query-execution-timeout-sec=1200", + "--bolt-server-name-for-init=Neo4j/", + ] + if not args.verbose: + cmd += ["--log-level", "WARNING"] + if args.log_file: + cmd += ["--log-file", args.log_file] + if args.data_directory: + cmd += ["--data-directory", args.data_directory] + if args.use_ssl: + cmd += ["--bolt-cert-file", CERT_FILE, "--bolt-key-file", KEY_FILE] + memgraph_proc = subprocess.Popen(cmd, cwd=cwd) + wait_for_server(7687) + + assert memgraph_proc.poll() is None, "The database binary died prematurely!" + + return memgraph_proc + + +def stop_memgraph(proc_mg: Popen) -> None: + proc_mg.terminate() + ret_mg = proc_mg.wait() + if ret_mg != 0: + raise Exception("Memgraph binary returned non-zero ({})!".format(ret_mg)) + + +def run_test(args: Args, test: str, options: List[str], timeout: int, mode: DatabaseMode) -> float: + """Runs tests for a set of specific database configuration. + + Args: + args: Arguments passed to the test + test: Test name + options: List of options specific for each test + timeout: Timeout in minutes + mode: DatabaseMode (storage mode & isolation level pair) + """ + print("Running test '{}'".format(test)) + + binary = _find_test_binary(args, test) + + # start test + cmd = ( + binary + + [ + "--worker-count", + str(THREADS), + "--isolation-level", + mode.isolation_level, + "--storage-mode", + mode.storage_mode, + ] + + options + ) + start = time.time() + ret_test = subprocess.run(cmd, cwd=SCRIPT_DIR, timeout=timeout * 60) + + if ret_test.returncode != 0: + raise Exception("Test '{}' binary returned non-zero ({})!".format(test, ret_test.returncode)) + + runtime = time.time() - start + print(" Done after {:.3f} seconds".format(runtime)) + + return runtime + + +def _find_test_binary(args: Args, test: str) -> List[str]: + if test.endswith(".py"): + logging = "DEBUG" if args.verbose else "WARNING" + return [args.python, "-u", os.path.join(SCRIPT_DIR, test), "--logging", logging] + + if test.endswith(".cpp"): + exe = os.path.join(BUILD_DIR, "tests", "stress", test[:-4]) + return [exe] + + raise Exception("Test '{}' binary not supported!".format(test)) + + +def run_stress_test_suite(args: Args) -> Dict[str, float]: + def cleanup(memgraph_proc): + if memgraph_proc.poll() != None: + return + memgraph_proc.kill() + memgraph_proc.wait() + + runtimes = {} + + if args.large_dataset and args.small_dataset: + raise Exception("Choose only a large dataset or a small dataset for stress testing!") + + dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET + if args.specific_test: + dataset = [x for x in dataset if x[DatasetConstants.TEST] == args.specific_test] + if not len(dataset): + raise Exception("Specific dataset is not found for stress testing!") + + for test in dataset: + if args.use_ssl: + test[DatasetConstants.OPTIONS] += ["--use-ssl"] + + for mode in test[DatasetConstants.MODE]: + test_run = copy.deepcopy(test) + + # Run for every specified combination of storage mode, serialization mode, etc (if extended) + test_run[DatasetConstants.MODE] = mode + + memgraph_proc = start_memgraph(args) + runtime = run_test(args, **test_run) + runtimes[os.path.splitext(test[DatasetConstants.TEST])[0]] = runtime + + stop_memgraph(memgraph_proc) + cleanup(memgraph_proc) + + return runtimes + + +def write_stats(runtimes: Dict[str, float]) -> None: + measurements = "" + for key, value in runtimes.items(): + measurements += "{}.runtime {}\n".format(key, value) + with open(STATS_FILE) as f: + stats = f.read().split("\n") + measurements += "long_running.queries.executed {}\n".format(stats[0]) + measurements += "long_running.queries.failed {}\n".format(stats[1]) + with open(MEASUREMENTS_FILE, "w") as f: + f.write(measurements) + + +if __name__ == "__main__": + args = parse_arguments() + + if args.use_ssl: + generate_temporary_ssl_certs() + + runtimes = run_stress_test_suite(args) + + if args.use_ssl: + remove_certificates() + + write_stats(runtimes) + + print("Successfully ran stress tests!") diff --git a/tests/stress/create_match.py b/tests/stress/create_match.py index 70fc6cb29..399e68db7 100644 --- a/tests/stress/create_match.py +++ b/tests/stress/create_match.py @@ -12,44 +12,48 @@ # by the Apache License, Version 2.0, included in the file # licenses/APL.txt. -''' +""" Large scale stress test. Tests only node creation. The idea is to run this test on machines with huge amount of memory e.g. 2TB. -''' +""" import logging import multiprocessing import random import time - from collections import defaultdict -from common import connection_argument_parser, argument_session + +from common import argument_session, connection_argument_parser def parse_args(): - ''' + """ Parses user arguments :return: parsed arguments - ''' + """ parser = connection_argument_parser() # specific - parser.add_argument('--worker-count', type=int, - default=multiprocessing.cpu_count(), - help='Number of concurrent workers.') - parser.add_argument("--logging", default="INFO", - choices=["INFO", "DEBUG", "WARNING", "ERROR"], - help="Logging level") - parser.add_argument('--vertex-count', type=int, default=100, - help='Number of created vertices.') - parser.add_argument('--max-property-value', type=int, default=1000, - help='Maximum value of property - 1. A created node ' - 'will have a property with random value from 0 to ' - 'max_property_value - 1.') - parser.add_argument('--create-pack-size', type=int, default=1, - help='Number of CREATE clauses in a query') + parser.add_argument( + "--worker-count", type=int, default=multiprocessing.cpu_count(), help="Number of concurrent workers." + ) + parser.add_argument( + "--logging", default="INFO", choices=["INFO", "DEBUG", "WARNING", "ERROR"], help="Logging level" + ) + parser.add_argument("--vertex-count", type=int, default=100, help="Number of created vertices.") + parser.add_argument( + "--max-property-value", + type=int, + default=1000, + help="Maximum value of property - 1. A created node " + "will have a property with random value from 0 to " + "max_property_value - 1.", + ) + parser.add_argument("--create-pack-size", type=int, default=1, help="Number of CREATE clauses in a query") + parser.add_argument("--isolation-level", type=str, required=True, help="Database isolation level.") + parser.add_argument("--storage-mode", type=str, required=True, help="Database storage mode.") return parser.parse_args() @@ -58,51 +62,46 @@ args = parse_args() def create_worker(worker_id): - ''' + """ Creates nodes and checks that all nodes were created. :param worker_id: worker id :return: tuple (worker_id, create execution time, time unit) - ''' - assert args.vertex_count > 0, 'Number of vertices has to be positive int' + """ + assert args.vertex_count > 0, "Number of vertices has to be positive int" generated_xs = defaultdict(int) - create_query = '' + create_query = "" with argument_session(args) as session: # create vertices start_time = time.time() for i in range(0, args.vertex_count): random_number = random.randint(0, args.max_property_value - 1) generated_xs[random_number] += 1 - create_query += 'CREATE (:Label_T%s {x: %s}) ' % \ - (worker_id, random_number) + create_query += "CREATE (:Label_T%s {x: %s}) " % (worker_id, random_number) # if full back or last item -> execute query - if (i + 1) % args.create_pack_size == 0 or \ - i == args.vertex_count - 1: + if (i + 1) % args.create_pack_size == 0 or i == args.vertex_count - 1: session.run(create_query).consume() - create_query = '' + create_query = "" create_time = time.time() # check total count - result_set = session.run('MATCH (n:Label_T%s) RETURN count(n) AS cnt' % - worker_id).data()[0] - assert result_set['cnt'] == args.vertex_count, \ - 'Create vertices Expected: %s Created: %s' % \ - (args.vertex_count, result_set['cnt']) + result_set = session.run("MATCH (n:Label_T%s) RETURN count(n) AS cnt" % worker_id).data()[0] + assert result_set["cnt"] == args.vertex_count, "Create vertices Expected: %s Created: %s" % ( + args.vertex_count, + result_set["cnt"], + ) # check count per property value for i, size in generated_xs.items(): - result_set = session.run('MATCH (n:Label_T%s {x: %s}) ' - 'RETURN count(n) AS cnt' - % (worker_id, i)).data()[0] - assert result_set['cnt'] == size, "Per x count isn't good " \ - "(Label: Label_T%s, prop x: %s" % (worker_id, i) + result_set = session.run("MATCH (n:Label_T%s {x: %s}) " "RETURN count(n) AS cnt" % (worker_id, i)).data()[0] + assert result_set["cnt"] == size, "Per x count isn't good " "(Label: Label_T%s, prop x: %s" % (worker_id, i) return (worker_id, create_time - start_time, "s") def create_handler(): - ''' + """ Initializes processes and starts the execution. - ''' + """ # instance cleanup with argument_session(args) as session: session.run("MATCH (n) DETACH DELETE n").consume() @@ -113,21 +112,19 @@ def create_handler(): # concurrent create execution & tests with multiprocessing.Pool(args.worker_count) as p: - for worker_id, create_time, time_unit in \ - p.map(create_worker, [i for i in range(args.worker_count)]): - log.info('Worker ID: %s; Create time: %s%s' % - (worker_id, create_time, time_unit)) + for worker_id, create_time, time_unit in p.map(create_worker, [i for i in range(args.worker_count)]): + log.info("Worker ID: %s; Create time: %s%s" % (worker_id, create_time, time_unit)) # check total count expected_total_count = args.worker_count * args.vertex_count - total_count = session.run( - 'MATCH (n) RETURN count(n) AS cnt').data()[0]['cnt'] - assert total_count == expected_total_count, \ - 'Total vertex number: %s Expected: %s' % \ - (total_count, expected_total_count) + total_count = session.run("MATCH (n) RETURN count(n) AS cnt").data()[0]["cnt"] + assert total_count == expected_total_count, "Total vertex number: %s Expected: %s" % ( + total_count, + expected_total_count, + ) -if __name__ == '__main__': +if __name__ == "__main__": logging.basicConfig(level=args.logging) if args.logging != "DEBUG": logging.getLogger("neo4j").setLevel(logging.WARNING) diff --git a/tests/stress/detach_delete.py b/tests/stress/detach_delete.py new file mode 100644 index 000000000..7b7b8b7d4 --- /dev/null +++ b/tests/stress/detach_delete.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# Copyright 2023 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +""" +Large bipartite graph stress test. +""" + +import logging +import multiprocessing +import random +import time +from argparse import Namespace as Args +from dataclasses import dataclass +from functools import wraps +from typing import Any, Callable, Tuple + +from common import OutputData, connection_argument_parser, get_memgraph + +log = logging.getLogger(__name__) +output_data = OutputData() + +NUMBER_NODES_IN_CHAIN = 4 +CREATE_FUNCTION = "CREATE" +DELETE_FUNCTION = "DELETE" + + +def parse_args() -> Args: + """ + Parses user arguments + + :return: parsed arguments + """ + parser = connection_argument_parser() + parser.add_argument("--worker-count", type=int, default=4, help="Number of concurrent workers.") + parser.add_argument( + "--logging", default="INFO", choices=["INFO", "DEBUG", "WARNING", "ERROR"], help="Logging level" + ) + parser.add_argument("--repetition-count", type=int, default=1000, help="Number of times to perform the action") + parser.add_argument("--isolation-level", type=str, required=True, help="Database isolation level.") + parser.add_argument("--storage-mode", type=str, required=True, help="Database storage mode.") + + return parser.parse_args() + + +args = parse_args() + + +@dataclass +class Worker: + """ + Class that performs a function defined in the `type` argument + + Args: + type - either `CREATE` or `DELETE`, signifying the function that's going to be performed + by the worker + id - worker id + total_worker_cnt - total number of workers for reference + repetition_count - number of times to perform the worker action + sleep_sec - float for subsecond sleeping between two subsequent actions + """ + + type: str + id: int + total_worker_cnt: int + repetition_count: int + sleep_sec: float + + +def timed_function(name) -> Callable: + """ + Times performed function + """ + + def actual_decorator(func) -> Callable: + @wraps(func) + def timed_wrapper(*args, **kwargs) -> Any: + start_time = time.time() + result = func(*args, **kwargs) + end_time = time.time() + output_data.add_measurement(name, end_time - start_time) + return result + + return timed_wrapper + + return actual_decorator + + +@timed_function("cleanup_time") +def clean_database() -> None: + memgraph = get_memgraph(args) + memgraph.execute("MATCH (n) DETACH DELETE n") + + +def create_indices() -> None: + memgraph = get_memgraph(args) + memgraph.execute("CREATE INDEX ON :Node") + memgraph.execute("CREATE INDEX ON :Node(id)") + + +def setup_database_mode() -> None: + memgraph = get_memgraph(args) + memgraph.execute(f"STORAGE MODE {args.storage_mode}") + memgraph.execute(f"SET GLOBAL TRANSACTION ISOLATION LEVEL {args.isolation_level}") + + +def execute_function(worker: Worker) -> Worker: + """ + Executes the function based on the worker type + """ + if worker.type == CREATE_FUNCTION: + run_writer(worker.total_worker_cnt, worker.repetition_count, worker.sleep_sec, worker.id) + return worker + + if worker.type == DELETE_FUNCTION: + run_deleter(worker.total_worker_cnt, worker.repetition_count, worker.sleep_sec) + return worker + + raise Exception("Worker function not recognized, raising exception!") + + +def run_writer(total_workers_cnt: int, repetition_count: int, sleep_sec: float, worker_id: int) -> int: + """ + This writer creates a chain and wants to verify after each action if the action he performed is + a valid graph. A graph is valid if the number of nodes is preserved, and the chain is either + not present or present completely. + """ + memgraph = get_memgraph(args) + + def create(): + try: + memgraph.execute( + f"MERGE (:Node{worker_id} {{id: 1}})-[:REL]-(:Node{worker_id} {{id: 2}})-[:REL]-(:Node{worker_id} {{id: 3}})-[:REL]-(:Node{worker_id} {{id: 4}})" + ) + except Exception as ex: + pass + + def verify() -> Tuple[bool, int]: + # We always create X nodes and therefore the number of nodes needs to be always a fraction of X + count = list(memgraph.execute_and_fetch(f"MATCH (n) RETURN COUNT(n) AS cnt"))[0]["cnt"] + log.info(f"Worker {worker_id} verified graph count {count} in repetition {curr_repetition}") + + assert count <= total_workers_cnt * NUMBER_NODES_IN_CHAIN and count % NUMBER_NODES_IN_CHAIN == 0 + + ids = list( + memgraph.execute_and_fetch( + f"MATCH (n:Node{worker_id} {{id: 1}})-->(m)-->(o)-->(p) RETURN n.id AS id1, m.id AS id2, o.id AS id3, p.id AS id4" + ) + ) + + if len(ids): + result = ids[0] + assert "id1" in result and "id2" in result and "id3" in result and "id4" in result + assert result["id1"] == 1 and result["id2"] == 2 and result["id3"] == 3 and result["id4"] == 4 + log.info(f"Worker {worker_id} verified graph chain is valid in repetition {curr_repetition}") + else: + log.info(f"Worker {worker_id} does not have a chain in repetition {repetition_count}") + + curr_repetition = 0 + + while curr_repetition < repetition_count: + log.info(f"Worker {worker_id} started iteration {curr_repetition}") + create() + time.sleep(sleep_sec) + log.info(f"Worker {worker_id} created chain in iteration {curr_repetition}") + + verify() + + curr_repetition += 1 + + +def run_deleter(total_workers_cnt: int, repetition_count: int, sleep_sec: float) -> None: + """ + Periodic deletion of an arbitrary chain in the graph + """ + memgraph = get_memgraph(args) + + def delete_part_of_graph(id: int): + try: + memgraph.execute(f"MATCH (n:Node{id}) DETACH DELETE n") + log.info(f"Worker deleted chain with nodes of id {id}") + except Exception as ex: + log.info(f"Worker failed to delete the chain with id {id}") + pass + + curr_repetition = 0 + while curr_repetition < repetition_count: + random_part_of_graph = random.randint(0, total_workers_cnt - 1) + delete_part_of_graph(random_part_of_graph) + time.sleep(sleep_sec) + curr_repetition += 1 + + +@timed_function("total_execution_time") +def execution_handler() -> None: + clean_database() + log.info("Database is clean.") + + setup_database_mode() + + create_indices() + + worker_count = args.worker_count + rep_count = args.repetition_count + + workers = [Worker(CREATE_FUNCTION, x, worker_count - 1, rep_count, 0.2) for x in range(worker_count - 1)] + workers.append(Worker(DELETE_FUNCTION, -1, worker_count - 1, rep_count, 0.15)) + + with multiprocessing.Pool(processes=args.worker_count) as p: + for worker in p.map(execute_function, workers): + print(f"Worker {worker.type} finished!") + + +if __name__ == "__main__": + logging.basicConfig(level=args.logging) + execution_handler() + if args.logging in ["DEBUG", "INFO"]: + output_data.dump() diff --git a/tests/stress/durability b/tests/stress/durability index d107e57ef..59878aa34 100755 --- a/tests/stress/durability +++ b/tests/stress/durability @@ -1,9 +1,9 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -''' +""" Durability Stress Test -''' +""" # TODO (mg_durability_stress_test): extend once we add full durability mode @@ -13,11 +13,11 @@ import os import random import shutil import subprocess -import time import threading +import time +from multiprocessing import Manager, Pool -from common import connection_argument_parser, SessionCache -from multiprocessing import Pool, Manager +from common import SessionCache, connection_argument_parser # Constants and args SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -35,11 +35,9 @@ SNAPSHOT_CYCLE_SEC = 5 # Parse command line arguments parser = connection_argument_parser() -parser.add_argument("--memgraph", default=os.path.join(BUILD_DIR, - "memgraph")) +parser.add_argument("--memgraph", default=os.path.join(BUILD_DIR, "memgraph")) parser.add_argument("--log-file", default="") -parser.add_argument("--verbose", action="store_const", - const=True, default=False) +parser.add_argument("--verbose", action="store_const", const=True, default=False) parser.add_argument("--data-directory", default=DATA_DIR) parser.add_argument("--num-clients", default=multiprocessing.cpu_count()) parser.add_argument("--num-steps", type=int, default=5) @@ -47,14 +45,18 @@ args = parser.parse_args() # Memgraph run command construction cwd = os.path.dirname(args.memgraph) -cmd = [args.memgraph, "--bolt-num-workers=" + str(DB_WORKERS), - "--storage-properties-on-edges=true", - "--storage-snapshot-on-exit=false", - "--storage-snapshot-interval-sec=5", - "--storage-snapshot-retention-count=2", - "--storage-wal-enabled=true", - "--storage-recover-on-startup=true", - "--query-execution-timeout-sec=600"] +cmd = [ + args.memgraph, + "--bolt-num-workers=" + str(DB_WORKERS), + "--storage-properties-on-edges=true", + "--storage-snapshot-on-exit=false", + "--storage-snapshot-interval-sec=5", + "--storage-snapshot-retention-count=2", + "--storage-wal-enabled=true", + "--storage-recover-on-startup=true", + "--query-execution-timeout-sec=600", + "--bolt-server-name-for-init=Neo4j/v5.11.0 compatible graph database server - Memgraph", +] if not args.verbose: cmd += ["--log-level", "WARNING"] if args.log_file: @@ -104,16 +106,13 @@ def run_client(id, data): counter = data[id] # Check recovery for this client - num_nodes_db = session.run( - "MATCH (n:%s) RETURN count(n) as cnt" % - ("Client%d" % id)).data()[0]["cnt"] + num_nodes_db = session.run("MATCH (n:%s) RETURN count(n) as cnt" % ("Client%d" % id)).data()[0]["cnt"] print("Client%d DB: %d; ACTUAL: %d" % (id, num_nodes_db, data[id])) # Execute a new set of write queries while True: try: - session.run("CREATE (n:%s {id:%s}) RETURN n;" % - ("Client%d" % id, counter)).consume() + session.run("CREATE (n:%s {id:%s}) RETURN n;" % ("Client%d" % id, counter)).consume() counter += 1 if counter % 100000 == 0: print("Client %d executed %d" % (id, counter)) @@ -126,8 +125,7 @@ def run_client(id, data): def run_step(): with Pool(args.num_clients) as p: - p.starmap(run_client, [(id, data) - for id in range(1, args.num_clients + 1)]) + p.starmap(run_client, [(id, data) for id in range(1, args.num_clients + 1)]) def main(): @@ -143,22 +141,19 @@ def main(): # Also it makes sense to run this test for a longer period of time # but with small --snapshot-cycle-sec to force that Memgraph is being # killed in the middle of generating the snapshot. - time.sleep( - random.randint(2 * SNAPSHOT_CYCLE_SEC, 3 * SNAPSHOT_CYCLE_SEC)) + time.sleep(random.randint(2 * SNAPSHOT_CYCLE_SEC, 3 * SNAPSHOT_CYCLE_SEC)) clean_memgraph() # Final check run_memgraph() session = SessionCache.argument_session(args) - num_nodes_db = \ - session.run("MATCH (n) RETURN count(n) AS cnt").data()[0]["cnt"] + num_nodes_db = session.run("MATCH (n) RETURN count(n) AS cnt").data()[0]["cnt"] num_nodes = sum(data.values()) # Check that more than 99.9% of data is recoverable. # NOTE: default WAL flush interval is 1ms. # Once the full sync durability mode is introduced, # this test has to be extended. - assert num_nodes_db > 0.999 * num_nodes, \ - "Memgraph lost more than 0.001 of data!" + assert num_nodes_db > 0.999 * num_nodes, "Memgraph lost more than 0.001 of data!" if __name__ == "__main__": diff --git a/tests/stress/long_running.cpp b/tests/stress/long_running.cpp index f266cf614..edf4b8452 100644 --- a/tests/stress/long_running.cpp +++ b/tests/stress/long_running.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -47,6 +47,9 @@ DEFINE_bool(global_queries, true, "If queries that modifiy globally should be ex DEFINE_string(stats_file, "", "File into which to write statistics."); +DEFINE_string(isolation_level, "", "Database isolation level."); +DEFINE_string(storage_mode, "", "Database storage_mode."); + /** * Encapsulates a Graph and a Bolt session and provides CRUD op functions. * Also defines a run-loop for a generic exectutor, and a graph state diff --git a/tests/stress/parser.cpp b/tests/stress/parser.cpp index b20cdd257..226a1957c 100644 --- a/tests/stress/parser.cpp +++ b/tests/stress/parser.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -28,6 +28,8 @@ DEFINE_string(password, "", "Password for the database"); DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); DEFINE_int32(worker_count, 1, "The number of concurrent workers executing queries against the server."); DEFINE_int32(per_worker_query_count, 100, "The number of queries each worker will try to execute."); +DEFINE_string(isolation_level, "", "Database isolation level."); +DEFINE_string(storage_mode, "", "Database storage_mode."); auto make_client() { mg::Client::Params params; diff --git a/tests/stress/requirements.txt b/tests/stress/requirements.txt index 7702f337d..a1c23a56d 100644 --- a/tests/stress/requirements.txt +++ b/tests/stress/requirements.txt @@ -1 +1,2 @@ neo4j-driver==4.1.1 +gqlalchemy==1.3.3 diff --git a/tests/stress/test_config.py b/tests/stress/test_config.py new file mode 100644 index 000000000..f740a150e --- /dev/null +++ b/tests/stress/test_config.py @@ -0,0 +1,179 @@ +import itertools +import os +from dataclasses import dataclass +from typing import List + +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +STATS_FILE = os.path.join(SCRIPT_DIR, ".long_running_stats") + + +class DatasetConstants: + TEST = "test" + OPTIONS = "options" + TIMEOUT = "timeout" + MODE = "mode" + + +@dataclass +class DatabaseMode: + storage_mode: str + isolation_level: str + + +class StorageModeConstants: + IN_MEMORY_TRANSACTIONAL = "IN_MEMORY_TRANSACTIONAL" + IN_MEMORY_ANALYTICAL = "IN_MEMORY_ANALYTICAL" + + @classmethod + def to_list(cls) -> List[str]: + return [cls.IN_MEMORY_TRANSACTIONAL, cls.IN_MEMORY_ANALYTICAL] + + +class IsolationLevelConstants: + SNAPSHOT_ISOLATION = "SNAPSHOT ISOLATION" + READ_COMMITED = "READ COMMITED" + READ_UNCOMMITED = "READ UNCOMMITED" + + @classmethod + def to_list(cls) -> List[str]: + return [cls.SNAPSHOT_SERIALIZATION, cls.READ_COMMITED, cls.READ_UNCOMMITED] + + +def get_default_database_mode() -> DatabaseMode: + return DatabaseMode(StorageModeConstants.IN_MEMORY_TRANSACTIONAL, IsolationLevelConstants.SNAPSHOT_ISOLATION) + + +def get_all_database_modes() -> List[DatabaseMode]: + return [ + DatabaseMode(x[0], x[1]) + for x in itertools.product(StorageModeConstants.to_list(), IsolationLevelConstants.to_list()) + ] + + +# dataset calibrated for running on Apollo (total 4min) +# bipartite.py runs for approx. 30s +# create_match.py runs for approx. 30s +# long_running runs for 1min +# long_running runs for 2min +SMALL_DATASET = [ + { + DatasetConstants.TEST: "bipartite.py", + DatasetConstants.OPTIONS: ["--u-count", "100", "--v-count", "100"], + DatasetConstants.TIMEOUT: 5, + DatasetConstants.MODE: [get_default_database_mode()], + }, + { + DatasetConstants.TEST: "detach_delete.py", + DatasetConstants.OPTIONS: ["--worker-count", "4", "--repetition-count", "100"], + DatasetConstants.TIMEOUT: 5, + DatasetConstants.MODE: [get_default_database_mode()], + }, + { + DatasetConstants.TEST: "create_match.py", + DatasetConstants.OPTIONS: ["--vertex-count", "40000", "--create-pack-size", "100"], + DatasetConstants.TIMEOUT: 5, + DatasetConstants.MODE: [get_default_database_mode()], + }, + { + DatasetConstants.TEST: "parser.cpp", + DatasetConstants.OPTIONS: ["--per-worker-query-count", "1000"], + DatasetConstants.TIMEOUT: 5, + DatasetConstants.MODE: [get_default_database_mode()], + }, + { + DatasetConstants.TEST: "long_running.cpp", + DatasetConstants.OPTIONS: [ + "--vertex-count", + "1000", + "--edge-count", + "5000", + "--max-time", + "1", + "--verify", + "20", + ], + DatasetConstants.TIMEOUT: 5, + DatasetConstants.MODE: [get_default_database_mode()], + }, + { + DatasetConstants.TEST: "long_running.cpp", + DatasetConstants.OPTIONS: [ + "--vertex-count", + "10000", + "--edge-count", + "50000", + "--max-time", + "2", + "--verify", + "30", + "--stats-file", + STATS_FILE, + ], + DatasetConstants.TIMEOUT: 5, + DatasetConstants.MODE: [get_default_database_mode()], + }, +] + +# dataset calibrated for running on daily stress instance (total 9h) +# bipartite.py and create_match.py run for approx. 15min +# long_running runs for 5min x 6 times = 30min +# long_running runs for 8h +LARGE_DATASET = ( + [ + { + DatasetConstants.TEST: "bipartite.py", + DatasetConstants.OPTIONS: ["--u-count", "300", "--v-count", "300"], + DatasetConstants.TIMEOUT: 30, + DatasetConstants.MODE: [get_default_database_mode()], + }, + { + DatasetConstants.TEST: "detach_delete.py", + DatasetConstants.OPTIONS: ["--worker-count", "4", "--repetition-count", "300"], + DatasetConstants.TIMEOUT: 5, + DatasetConstants.MODE: [get_default_database_mode()], + }, + { + DatasetConstants.TEST: "create_match.py", + DatasetConstants.OPTIONS: ["--vertex-count", "500000", "--create-pack-size", "500"], + DatasetConstants.TIMEOUT: 30, + DatasetConstants.MODE: [get_default_database_mode()], + }, + ] + + [ + { + DatasetConstants.TEST: "long_running.cpp", + DatasetConstants.OPTIONS: [ + "--vertex-count", + "10000", + "--edge-count", + "40000", + "--max-time", + "5", + "--verify", + "60", + ], + DatasetConstants.TIMEOUT: 16, + DatasetConstants.MODE: [get_default_database_mode()], + }, + ] + * 6 + + [ + { + DatasetConstants.TEST: "long_running.cpp", + DatasetConstants.OPTIONS: [ + "--vertex-count", + "200000", + "--edge-count", + "1000000", + "--max-time", + "480", + "--verify", + "300", + "--stats-file", + STATS_FILE, + ], + DatasetConstants.TIMEOUT: 500, + DatasetConstants.MODE: [get_default_database_mode()], + }, + ] +) diff --git a/tests/unit/interpreter.cpp b/tests/unit/interpreter.cpp index ad01586dd..4e94b987c 100644 --- a/tests/unit/interpreter.cpp +++ b/tests/unit/interpreter.cpp @@ -1182,6 +1182,31 @@ TYPED_TEST(InterpreterTest, ExecutionStatsValues) { ASSERT_EQ(stats["relationships-deleted"].ValueInt(), 3); AssertAllValuesAreZero(stats, {"nodes-deleted", "relationships-deleted"}); } + { + auto [stream, qid] = this->Prepare("CREATE (n)-[:TO]->(m);"); + this->Pull(&stream); + + auto stats = stream.GetSummary().at("stats").ValueMap(); + ASSERT_EQ(stats["nodes-created"].ValueInt(), 2); + ASSERT_EQ(stats["relationships-created"].ValueInt(), 1); + AssertAllValuesAreZero(stats, {"nodes-created", "relationships-created"}); + } + { + auto [stream, qid] = this->Prepare("MATCH (n)-[r]->(m) DELETE r;"); + this->Pull(&stream); + + auto stats = stream.GetSummary().at("stats").ValueMap(); + ASSERT_EQ(stats["relationships-deleted"].ValueInt(), 1); + AssertAllValuesAreZero(stats, {"nodes-deleted", "relationships-deleted"}); + } + { + auto [stream, qid] = this->Prepare("MATCH (n) DELETE n;"); + this->Pull(&stream); + + auto stats = stream.GetSummary().at("stats").ValueMap(); + ASSERT_EQ(stats["nodes-deleted"].ValueInt(), 2); + AssertAllValuesAreZero(stats, {"nodes-deleted", ""}); + } { auto [stream, qid] = this->Prepare("CREATE (:L1:L2:L3), (:L1), (:L1), (:L2);"); this->Pull(&stream); diff --git a/tests/unit/query_plan_create_set_remove_delete.cpp b/tests/unit/query_plan_create_set_remove_delete.cpp index 5f3c1c851..e3873b20a 100644 --- a/tests/unit/query_plan_create_set_remove_delete.cpp +++ b/tests/unit/query_plan_create_set_remove_delete.cpp @@ -775,6 +775,7 @@ TYPED_TEST(QueryPlanTest, Delete) { } // detach delete a single vertex + // delete will not happen as we are deleting in bulk { auto n = MakeScanAll(this->storage, symbol_table, "n"); auto n_get = this->storage.template Create("n")->MapTo(n.sym_); @@ -783,8 +784,8 @@ TYPED_TEST(QueryPlanTest, Delete) { auto context = MakeContext(this->storage, symbol_table, &dba); delete_op->MakeCursor(memgraph::utils::NewDeleteResource())->Pull(frame, context); dba.AdvanceCommand(); - EXPECT_EQ(3, CountIterable(dba.Vertices(memgraph::storage::View::OLD))); - EXPECT_EQ(3, CountEdges(&dba, memgraph::storage::View::OLD)); + EXPECT_EQ(4, CountIterable(dba.Vertices(memgraph::storage::View::OLD))); + EXPECT_EQ(6, CountEdges(&dba, memgraph::storage::View::OLD)); } // delete all remaining edges @@ -797,7 +798,7 @@ TYPED_TEST(QueryPlanTest, Delete) { auto context = MakeContext(this->storage, symbol_table, &dba); PullAll(*delete_op, &context); dba.AdvanceCommand(); - EXPECT_EQ(3, CountIterable(dba.Vertices(memgraph::storage::View::OLD))); + EXPECT_EQ(4, CountIterable(dba.Vertices(memgraph::storage::View::OLD))); EXPECT_EQ(0, CountEdges(&dba, memgraph::storage::View::OLD)); } @@ -1012,10 +1013,11 @@ TYPED_TEST(QueryPlanTest, DeleteTwiceDeleteBlockingEdge) { } TYPED_TEST(QueryPlanTest, DeleteReturn) { + // MATCH (n) DETACH DELETE n RETURN n auto storage_dba = this->db->Access(); memgraph::query::DbAccessor dba(storage_dba.get()); - // make a fully-connected (one-direction, no cycles) with 4 nodes + // graph with 4 vertices auto prop = PROPERTY_PAIR(dba, "property"); for (int i = 0; i < 4; ++i) { auto va = dba.InsertVertex(); @@ -1033,10 +1035,12 @@ TYPED_TEST(QueryPlanTest, DeleteReturn) { auto n_get = this->storage.template Create("n")->MapTo(n.sym_); auto delete_op = std::make_shared(n.op_, std::vector{n_get}, true); + auto accumulate_op = std::make_shared(delete_op, delete_op->ModifiedSymbols(symbol_table), true); + auto prop_lookup = PROPERTY_LOOKUP(dba, IDENT("n")->MapTo(n.sym_), prop); auto n_p = this->storage.template Create("n", prop_lookup)->MapTo(symbol_table.CreateSymbol("bla", true)); - auto produce = MakeProduce(delete_op, n_p); + auto produce = MakeProduce(accumulate_op, n_p); auto context = MakeContext(this->storage, symbol_table, &dba); ASSERT_THROW(CollectProduce(*produce, &context), QueryRuntimeException); @@ -1757,6 +1761,7 @@ TYPED_TEST(QueryPlanTest, RemoveLabelsOnNull) { } TYPED_TEST(QueryPlanTest, DeleteSetProperty) { + // MATCH (n) DELETE n SET n.property = 42 RETURN n auto storage_dba = this->db->Access(); memgraph::query::DbAccessor dba(storage_dba.get()); // Add a single vertex. @@ -1764,18 +1769,25 @@ TYPED_TEST(QueryPlanTest, DeleteSetProperty) { dba.AdvanceCommand(); EXPECT_EQ(1, CountIterable(dba.Vertices(memgraph::storage::View::OLD))); SymbolTable symbol_table; - // MATCH (n) DELETE n SET n.property = 42 auto n = MakeScanAll(this->storage, symbol_table, "n"); auto n_get = this->storage.template Create("n")->MapTo(n.sym_); auto delete_op = std::make_shared(n.op_, std::vector{n_get}, false); auto prop = PROPERTY_PAIR(dba, "property"); auto n_prop = PROPERTY_LOOKUP(dba, IDENT("n")->MapTo(n.sym_), prop); auto set_op = std::make_shared(delete_op, prop.second, n_prop, LITERAL(42)); + auto accumulate_op = std::make_shared(set_op, set_op->ModifiedSymbols(symbol_table), true); + + auto prop_lookup = PROPERTY_LOOKUP(dba, IDENT("n")->MapTo(n.sym_), prop); + auto n_p = + this->storage.template Create("n", prop_lookup)->MapTo(symbol_table.CreateSymbol("bla", true)); + auto produce = MakeProduce(accumulate_op, n_p); auto context = MakeContext(this->storage, symbol_table, &dba); - EXPECT_THROW(PullAll(*set_op, &context), QueryRuntimeException); + ASSERT_THROW(CollectProduce(*produce, &context), QueryRuntimeException); } TYPED_TEST(QueryPlanTest, DeleteSetPropertiesFromMap) { + // MATCH (n) DELETE n SET n = {property: 42} return n + // MATCH (n) DELETE n SET n += {property: 42} return n auto storage_dba = this->db->Access(); memgraph::query::DbAccessor dba(storage_dba.get()); // Add a single vertex. @@ -1783,7 +1795,6 @@ TYPED_TEST(QueryPlanTest, DeleteSetPropertiesFromMap) { dba.AdvanceCommand(); EXPECT_EQ(1, CountIterable(dba.Vertices(memgraph::storage::View::OLD))); SymbolTable symbol_table; - // MATCH (n) DELETE n SET n = {property: 42} auto n = MakeScanAll(this->storage, symbol_table, "n"); auto n_get = this->storage.template Create("n")->MapTo(n.sym_); auto delete_op = std::make_shared(n.op_, std::vector{n_get}, false); @@ -1793,12 +1804,18 @@ TYPED_TEST(QueryPlanTest, DeleteSetPropertiesFromMap) { auto *rhs = this->storage.template Create(prop_map); for (auto op_type : {plan::SetProperties::Op::REPLACE, plan::SetProperties::Op::UPDATE}) { auto set_op = std::make_shared(delete_op, n.sym_, rhs, op_type); + auto accumulate_op = std::make_shared(set_op, set_op->ModifiedSymbols(symbol_table), false); + auto prop_lookup = PROPERTY_LOOKUP(dba, IDENT("n")->MapTo(n.sym_), prop); + auto n_p = + this->storage.template Create("n", prop_lookup)->MapTo(symbol_table.CreateSymbol("bla", true)); + auto produce = MakeProduce(accumulate_op, n_p); auto context = MakeContext(this->storage, symbol_table, &dba); - EXPECT_THROW(PullAll(*set_op, &context), QueryRuntimeException); + ASSERT_THROW(CollectProduce(*produce, &context), QueryRuntimeException); } } TYPED_TEST(QueryPlanTest, DeleteSetPropertiesFrom) { + // MATCH (n) DELETE n SET n = n RETURN n auto storage_dba = this->db->Access(); memgraph::query::DbAccessor dba(storage_dba.get()); // Add a single vertex. @@ -1809,19 +1826,25 @@ TYPED_TEST(QueryPlanTest, DeleteSetPropertiesFrom) { dba.AdvanceCommand(); EXPECT_EQ(1, CountIterable(dba.Vertices(memgraph::storage::View::OLD))); SymbolTable symbol_table; - // MATCH (n) DELETE n SET n = n auto n = MakeScanAll(this->storage, symbol_table, "n"); auto n_get = this->storage.template Create("n")->MapTo(n.sym_); auto delete_op = std::make_shared(n.op_, std::vector{n_get}, false); auto *rhs = IDENT("n")->MapTo(n.sym_); + auto prop = PROPERTY_PAIR(dba, "property"); for (auto op_type : {plan::SetProperties::Op::REPLACE, plan::SetProperties::Op::UPDATE}) { auto set_op = std::make_shared(delete_op, n.sym_, rhs, op_type); + auto accumulate_op = std::make_shared(set_op, set_op->ModifiedSymbols(symbol_table), false); + auto prop_lookup = PROPERTY_LOOKUP(dba, IDENT("n")->MapTo(n.sym_), prop); + auto n_p = + this->storage.template Create("n", prop_lookup)->MapTo(symbol_table.CreateSymbol("bla", true)); + auto produce = MakeProduce(accumulate_op, n_p); auto context = MakeContext(this->storage, symbol_table, &dba); - EXPECT_THROW(PullAll(*set_op, &context), QueryRuntimeException); + ASSERT_THROW(CollectProduce(*produce, &context), QueryRuntimeException); } } TYPED_TEST(QueryPlanTest, DeleteRemoveLabels) { + // MATCH (n) DELETE n REMOVE n :label return n auto storage_dba = this->db->Access(); memgraph::query::DbAccessor dba(storage_dba.get()); // Add a single vertex. @@ -1829,17 +1852,24 @@ TYPED_TEST(QueryPlanTest, DeleteRemoveLabels) { dba.AdvanceCommand(); EXPECT_EQ(1, CountIterable(dba.Vertices(memgraph::storage::View::OLD))); SymbolTable symbol_table; - // MATCH (n) DELETE n REMOVE n :label auto n = MakeScanAll(this->storage, symbol_table, "n"); auto n_get = this->storage.template Create("n")->MapTo(n.sym_); auto delete_op = std::make_shared(n.op_, std::vector{n_get}, false); std::vector labels{dba.NameToLabel("label")}; auto rem_op = std::make_shared(delete_op, n.sym_, labels); + auto accumulate_op = std::make_shared(rem_op, rem_op->ModifiedSymbols(symbol_table), true); + + auto prop = PROPERTY_PAIR(dba, "property"); + auto prop_lookup = PROPERTY_LOOKUP(dba, IDENT("n")->MapTo(n.sym_), prop); + auto n_p = + this->storage.template Create("n", prop_lookup)->MapTo(symbol_table.CreateSymbol("bla", true)); + auto produce = MakeProduce(accumulate_op, n_p); auto context = MakeContext(this->storage, symbol_table, &dba); - EXPECT_THROW(PullAll(*rem_op, &context), QueryRuntimeException); + ASSERT_THROW(CollectProduce(*produce, &context), QueryRuntimeException); } TYPED_TEST(QueryPlanTest, DeleteRemoveProperty) { + // MATCH (n) DELETE n REMOVE n.property RETURN n auto storage_dba = this->db->Access(); memgraph::query::DbAccessor dba(storage_dba.get()); // Add a single vertex. @@ -1847,15 +1877,20 @@ TYPED_TEST(QueryPlanTest, DeleteRemoveProperty) { dba.AdvanceCommand(); EXPECT_EQ(1, CountIterable(dba.Vertices(memgraph::storage::View::OLD))); SymbolTable symbol_table; - // MATCH (n) DELETE n REMOVE n.property auto n = MakeScanAll(this->storage, symbol_table, "n"); auto n_get = this->storage.template Create("n")->MapTo(n.sym_); auto delete_op = std::make_shared(n.op_, std::vector{n_get}, false); auto prop = PROPERTY_PAIR(dba, "property"); auto n_prop = PROPERTY_LOOKUP(dba, IDENT("n")->MapTo(n.sym_), prop); auto rem_op = std::make_shared(delete_op, prop.second, n_prop); + auto accumulate_op = std::make_shared(rem_op, rem_op->ModifiedSymbols(symbol_table), true); + + auto prop_lookup = PROPERTY_LOOKUP(dba, IDENT("n")->MapTo(n.sym_), prop); + auto n_p = + this->storage.template Create("n", prop_lookup)->MapTo(symbol_table.CreateSymbol("bla", true)); + auto produce = MakeProduce(accumulate_op, n_p); auto context = MakeContext(this->storage, symbol_table, &dba); - EXPECT_THROW(PullAll(*rem_op, &context), QueryRuntimeException); + ASSERT_THROW(CollectProduce(*produce, &context), QueryRuntimeException); } //////////////////////////////////////////////