Add detach delete in bulk (#1078)

This commit is contained in:
Josipmrden 2023-09-10 18:53:03 +02:00 committed by GitHub
parent ab56abf4ca
commit 58546a9fe1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 1704 additions and 870 deletions

View File

@ -101,7 +101,7 @@ jobs:
echo ${file}
if [[ ${file} == *.py ]]; then
python3 -m black --check --diff ${file}
python3 -m isort --check-only --diff ${file}
python3 -m isort --profile black --check-only --diff ${file}
fi
done
@ -275,11 +275,13 @@ jobs:
- name: Run stress test (plain)
run: |
cd tests/stress
source ve3/bin/activate
./continuous_integration
- name: Run stress test (SSL)
run: |
cd tests/stress
source ve3/bin/activate
./continuous_integration --use-ssl
- name: Run durability test

View File

@ -15,6 +15,7 @@ repos:
hooks:
- id: isort
name: isort (python)
args: ["--profile", "black"]
- repo: https://github.com/pre-commit/mirrors-clang-format
rev: v13.0.0
hooks:

View File

@ -12,6 +12,7 @@
#pragma once
#include <optional>
#include <ranges>
#include <cppitertools/filter.hpp>
#include <cppitertools/imap.hpp>
@ -469,7 +470,7 @@ class DbAccessor final {
std::vector<EdgeAccessor> deleted_edges;
deleted_edges.reserve(edges.size());
std::transform(edges.begin(), edges.end(), std::back_inserter(deleted_edges),
std::ranges::transform(edges, std::back_inserter(deleted_edges),
[](const auto &deleted_edge) { return EdgeAccessor{deleted_edge}; });
return std::make_optional<ReturnType>(vertex, std::move(deleted_edges));
@ -489,6 +490,53 @@ class DbAccessor final {
return std::make_optional<VertexAccessor>(*value);
}
/// Deletes the given vertices and edges through the storage accessor in one bulk call.
///
/// For every vertex in `nodes`, its out-edges and in-edges are prefetched before the
/// storage-level delete is issued. `detach` is forwarded unchanged to the storage accessor.
///
/// @return the storage error if the bulk delete failed; an empty optional if storage
///         returned no value; otherwise the deleted vertices and deleted edges, each
///         wrapped in the query-level accessor type.
storage::Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>> DetachDelete(
    std::vector<VertexAccessor> nodes, std::vector<EdgeAccessor> edges, bool detach) {
  using ReturnType = std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>;

  // The storage layer works on pointers to the underlying storage accessors.
  std::vector<storage::VertexAccessor *> vertex_ptrs;
  std::vector<storage::EdgeAccessor *> edge_ptrs;
  vertex_ptrs.reserve(nodes.size());
  edge_ptrs.reserve(edges.size());

  for (auto &node : nodes) {
    accessor_->PrefetchOutEdges(node.impl_);
    accessor_->PrefetchInEdges(node.impl_);
    vertex_ptrs.push_back(&node.impl_);
  }
  for (auto &edge : edges) {
    edge_ptrs.push_back(&edge.impl_);
  }

  auto bulk_result = accessor_->DetachDelete(std::move(vertex_ptrs), std::move(edge_ptrs), detach);
  if (bulk_result.HasError()) {
    return bulk_result.GetError();
  }

  const auto &maybe_deleted = bulk_result.GetValue();
  if (!maybe_deleted) {
    return std::optional<ReturnType>{};
  }

  const auto &storage_vertices = maybe_deleted->first;
  const auto &storage_edges = maybe_deleted->second;

  // Re-wrap the storage-level accessors into query-level accessors for the caller.
  std::vector<VertexAccessor> deleted_vertices;
  std::vector<EdgeAccessor> deleted_edges;
  deleted_vertices.reserve(storage_vertices.size());
  deleted_edges.reserve(storage_edges.size());
  for (const auto &storage_vertex : storage_vertices) {
    deleted_vertices.push_back(VertexAccessor{storage_vertex});
  }
  for (const auto &storage_edge : storage_edges) {
    deleted_edges.push_back(EdgeAccessor{storage_edge});
  }

  return std::make_optional<ReturnType>(std::move(deleted_vertices), std::move(deleted_edges));
}
storage::PropertyId NameToProperty(const std::string_view name) { return accessor_->NameToProperty(name); }
storage::LabelId NameToLabel(const std::string_view name) { return accessor_->NameToLabel(name); }

View File

@ -2560,15 +2560,12 @@ std::vector<Symbol> Delete::ModifiedSymbols(const SymbolTable &table) const { re
Delete::DeleteCursor::DeleteCursor(const Delete &self, utils::MemoryResource *mem)
: self_(self), input_cursor_(self_.input_->MakeCursor(mem)) {}
bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
SCOPED_PROFILE_OP("Delete");
if (!input_cursor_->Pull(frame, context)) return false;
void Delete::DeleteCursor::UpdateDeleteBuffer(Frame &frame, ExecutionContext &context) {
// Delete should get the latest information, this way it is also possible
// to delete newly added nodes and edges.
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor,
storage::View::NEW);
auto *pull_memory = context.evaluation_context.memory;
// collect expressions results so edges can get deleted before vertices
// this is necessary because an edge that gets deleted could block vertex
@ -2579,84 +2576,57 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
expression_results.emplace_back(expression->Accept(evaluator));
}
auto &dba = *context.db_accessor;
// delete edges first
for (TypedValue &expression_result : expression_results) {
AbortCheck(context);
if (expression_result.type() == TypedValue::Type::Edge) {
auto &ea = expression_result.ValueEdge();
#ifdef MG_ENTERPRISE
if (license::global_license_checker.IsEnterpriseValidFast() && context.auth_checker &&
!(context.auth_checker->Has(ea, query::AuthQuery::FineGrainedPrivilege::CREATE_DELETE) &&
context.auth_checker->Has(ea.To(), storage::View::NEW, query::AuthQuery::FineGrainedPrivilege::UPDATE) &&
context.auth_checker->Has(ea.From(), storage::View::NEW, query::AuthQuery::FineGrainedPrivilege::UPDATE))) {
throw QueryRuntimeException("Edge not deleted due to not having enough permission!");
}
#endif
auto maybe_value = dba.RemoveEdge(&ea);
if (maybe_value.HasError()) {
switch (maybe_value.GetError()) {
case storage::Error::SERIALIZATION_ERROR:
throw TransactionSerializationException();
case storage::Error::DELETED_OBJECT:
case storage::Error::VERTEX_HAS_EDGES:
case storage::Error::PROPERTIES_DISABLED:
case storage::Error::NONEXISTENT_OBJECT:
throw QueryRuntimeException("Unexpected error when deleting an edge.");
}
}
context.execution_stats[ExecutionStats::Key::DELETED_EDGES] += 1;
if (context.trigger_context_collector && maybe_value.GetValue()) {
context.trigger_context_collector->RegisterDeletedObject(*maybe_value.GetValue());
}
}
}
// delete vertices
for (TypedValue &expression_result : expression_results) {
AbortCheck(context);
switch (expression_result.type()) {
case TypedValue::Type::Vertex: {
auto &va = expression_result.ValueVertex();
auto va = expression_result.ValueVertex();
#ifdef MG_ENTERPRISE
if (license::global_license_checker.IsEnterpriseValidFast() && context.auth_checker &&
!context.auth_checker->Has(va, storage::View::NEW, query::AuthQuery::FineGrainedPrivilege::CREATE_DELETE)) {
throw QueryRuntimeException("Vertex not deleted due to not having enough permission!");
}
#endif
if (self_.detach_) {
auto res = dba.DetachRemoveVertex(&va);
if (res.HasError()) {
switch (res.GetError()) {
case storage::Error::SERIALIZATION_ERROR:
throw TransactionSerializationException();
case storage::Error::DELETED_OBJECT:
case storage::Error::VERTEX_HAS_EDGES:
case storage::Error::PROPERTIES_DISABLED:
case storage::Error::NONEXISTENT_OBJECT:
throw QueryRuntimeException("Unexpected error when deleting a node.");
buffer_.nodes.push_back(va);
break;
}
case TypedValue::Type::Edge: {
auto ea = expression_result.ValueEdge();
#ifdef MG_ENTERPRISE
if (license::global_license_checker.IsEnterpriseValidFast() && context.auth_checker &&
!(context.auth_checker->Has(ea, query::AuthQuery::FineGrainedPrivilege::CREATE_DELETE) &&
context.auth_checker->Has(ea.To(), storage::View::NEW, query::AuthQuery::FineGrainedPrivilege::UPDATE) &&
context.auth_checker->Has(ea.From(), storage::View::NEW,
query::AuthQuery::FineGrainedPrivilege::UPDATE))) {
throw QueryRuntimeException("Edge not deleted due to not having enough permission!");
}
#endif
buffer_.edges.push_back(ea);
break;
}
case TypedValue::Type::Null:
break;
// check we're not trying to delete anything except vertices and edges
default:
throw QueryRuntimeException("Only edges and vertices can be deleted.");
}
}
}
bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
SCOPED_PROFILE_OP("Delete");
if (delete_executed_) {
return false;
}
context.execution_stats[ExecutionStats::Key::DELETED_NODES] += 1;
if (*res) {
context.execution_stats[ExecutionStats::Key::DELETED_EDGES] += static_cast<int64_t>((*res)->second.size());
}
std::invoke([&] {
if (!context.trigger_context_collector || !*res) {
return;
if (input_cursor_->Pull(frame, context)) {
UpdateDeleteBuffer(frame, context);
return true;
}
context.trigger_context_collector->RegisterDeletedObject((*res)->first);
if (!context.trigger_context_collector->ShouldRegisterDeletedObject<query::EdgeAccessor>()) {
return;
}
for (const auto &edge : (*res)->second) {
context.trigger_context_collector->RegisterDeletedObject(edge);
}
});
} else {
auto res = dba.RemoveVertex(&va);
auto &dba = *context.db_accessor;
auto res = dba.DetachDelete(std::move(buffer_.nodes), std::move(buffer_.edges), self_.detach_);
if (res.HasError()) {
switch (res.GetError()) {
case storage::Error::SERIALIZATION_ERROR:
@ -2669,31 +2639,36 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
throw QueryRuntimeException("Unexpected error when deleting a node.");
}
}
context.execution_stats[ExecutionStats::Key::DELETED_NODES] += 1;
if (context.trigger_context_collector && res.GetValue()) {
context.trigger_context_collector->RegisterDeletedObject(*res.GetValue());
}
}
break;
if (*res) {
context.execution_stats[ExecutionStats::Key::DELETED_NODES] += static_cast<int64_t>((*res)->first.size());
context.execution_stats[ExecutionStats::Key::DELETED_EDGES] += static_cast<int64_t>((*res)->second.size());
}
// skip Edges (already deleted) and Nulls (can occur in optional
// match)
case TypedValue::Type::Edge:
case TypedValue::Type::Null:
break;
// check we're not trying to delete anything except vertices and edges
default:
throw QueryRuntimeException("Only edges and vertices can be deleted.");
// Update deleted objects for triggers
if (context.trigger_context_collector && *res) {
for (const auto &node : (*res)->first) {
context.trigger_context_collector->RegisterDeletedObject(node);
}
if (context.trigger_context_collector->ShouldRegisterDeletedObject<query::EdgeAccessor>()) {
for (const auto &edge : (*res)->second) {
context.trigger_context_collector->RegisterDeletedObject(edge);
}
}
}
return true;
delete_executed_ = true;
return false;
}
void Delete::DeleteCursor::Shutdown() { input_cursor_->Shutdown(); }
void Delete::DeleteCursor::Reset() { input_cursor_->Reset(); }
// Resets the input cursor and re-arms the cursor so that the bulk delete can run again.
// NOTE(review): buffer_ is not cleared here; any nodes/edges buffered before the reset
// stay queued for the next bulk delete — confirm this is the intended behavior.
void Delete::DeleteCursor::Reset() {
input_cursor_->Reset();
delete_executed_ = false;
}
SetProperty::SetProperty(const std::shared_ptr<LogicalOperator> &input, storage::PropertyId property,
PropertyLookup *lhs, Expression *rhs)
@ -4953,7 +4928,9 @@ class LoadCsvCursor : public Cursor {
if (input_cursor_->Pull(frame, context)) {
if (did_pull_) {
throw QueryRuntimeException(
"LOAD CSV can be executed only once, please check if the cardinality of the operator before LOAD CSV is 1");
"LOAD CSV can be executed only once, please check if the cardinality of the operator before LOAD CSV "
"is "
"1");
}
did_pull_ = true;
}

View File

@ -1121,6 +1121,11 @@ class Produce : public memgraph::query::plan::LogicalOperator {
};
};
/// Accumulates the vertices and edges scheduled for deletion so that they can be
/// handed over together in a single bulk delete call.
struct DeleteBuffer {
// Vertices collected for deletion.
std::vector<VertexAccessor> nodes{};
// Edges collected for deletion.
std::vector<EdgeAccessor> edges{};
};
/// Operator for deleting vertices and edges.
///
/// Has a flag for using DETACH DELETE when deleting vertices.
@ -1168,6 +1173,10 @@ class Delete : public memgraph::query::plan::LogicalOperator {
private:
const Delete &self_;
const UniqueCursorPtr input_cursor_;
DeleteBuffer buffer_;
bool delete_executed_{false};
void UpdateDeleteBuffer(Frame &, ExecutionContext &);
};
};

View File

@ -1085,79 +1085,44 @@ std::optional<VertexAccessor> DiskStorage::DiskAccessor::FindVertex(storage::Gid
return std::nullopt;
}
Result<std::optional<VertexAccessor>> DiskStorage::DiskAccessor::DeleteVertex(VertexAccessor *vertex) {
auto *vertex_ptr = vertex->vertex_;
Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>>
DiskStorage::DiskAccessor::DetachDelete(std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges,
bool detach) {
using ReturnType = std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>;
if (vertex_ptr->deleted) {
return std::optional<VertexAccessor>{};
auto maybe_result = Storage::Accessor::DetachDelete(nodes, edges, detach);
if (maybe_result.HasError()) {
return maybe_result.GetError();
}
if (!vertex_ptr->in_edges.empty() || !vertex_ptr->out_edges.empty()) return Error::VERTEX_HAS_EDGES;
auto value = maybe_result.GetValue();
CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag());
vertex_ptr->deleted = true;
vertices_to_delete_.emplace_back(utils::SerializeIdType(vertex_ptr->gid), utils::SerializeVertex(*vertex_ptr));
transaction_.manyDeltasCache.Invalidate(vertex_ptr);
if (!value) {
return std::make_optional<ReturnType>();
}
auto &[deleted_vertices, deleted_edges] = *value;
auto *disk_storage = static_cast<DiskStorage *>(storage_);
for (const auto &vertex : deleted_vertices) {
vertices_to_delete_.emplace_back(utils::SerializeIdType(vertex.vertex_->gid),
utils::SerializeVertex(*vertex.vertex_));
transaction_.manyDeltasCache.Invalidate(vertex.vertex_);
disk_storage->vertex_count_.fetch_sub(1, std::memory_order_acq_rel);
return std::make_optional<VertexAccessor>(vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_,
config_, true);
}
Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>>
DiskStorage::DiskAccessor::DetachDeleteVertex(VertexAccessor *vertex) {
using ReturnType = std::pair<VertexAccessor, std::vector<EdgeAccessor>>;
auto *vertex_ptr = vertex->vertex_;
if (vertex_ptr->deleted) return std::optional<ReturnType>{};
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> in_edges{vertex_ptr->in_edges};
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> out_edges{vertex_ptr->out_edges};
std::vector<EdgeAccessor> deleted_edges;
for (const auto &item : in_edges) {
auto [edge_type, from_vertex, edge] = item;
EdgeAccessor e(edge, edge_type, from_vertex, vertex_ptr, &transaction_, &storage_->indices_,
&storage_->constraints_, config_);
auto ret = DeleteEdge(&e);
if (ret.HasError()) {
MG_ASSERT(ret.GetError() == Error::SERIALIZATION_ERROR, "Invalid database state!");
return ret.GetError();
}
if (ret.GetValue()) {
deleted_edges.push_back(*ret.GetValue());
}
}
for (const auto &item : out_edges) {
auto [edge_type, to_vertex, edge] = item;
EdgeAccessor e(edge, edge_type, vertex_ptr, to_vertex, &transaction_, &storage_->indices_, &storage_->constraints_,
config_);
auto ret = DeleteEdge(&e);
if (ret.HasError()) {
MG_ASSERT(ret.GetError() == Error::SERIALIZATION_ERROR, "Invalid database state!");
return ret.GetError();
for (const auto &edge : deleted_edges) {
const DiskEdgeKey disk_edge_key(edge.from_vertex_->gid, edge.to_vertex_->gid, edge.edge_type_, edge.edge_,
config_.properties_on_edges);
edges_to_delete_.emplace(disk_edge_key.GetSerializedKey());
transaction_.manyDeltasCache.Invalidate(edge.from_vertex_, edge.edge_type_, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(edge.to_vertex_, edge.edge_type_, EdgeDirection::IN);
transaction_.RemoveModifiedEdge(edge.Gid());
}
if (ret.GetValue()) {
deleted_edges.push_back(*ret.GetValue());
}
}
MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!");
CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag());
vertex_ptr->deleted = true;
vertices_to_delete_.emplace_back(utils::SerializeIdType(vertex_ptr->gid), utils::SerializeVertex(*vertex_ptr));
auto *disk_storage = static_cast<DiskStorage *>(storage_);
disk_storage->vertex_count_.fetch_sub(1, std::memory_order_acq_rel);
transaction_.manyDeltasCache.Invalidate(vertex_ptr);
return std::make_optional<ReturnType>(
VertexAccessor{vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_, config_, true},
std::move(deleted_edges));
return maybe_result;
}
bool DiskStorage::DiskAccessor::PrefetchEdgeFilter(const std::string_view disk_edge_key_str,
@ -1300,73 +1265,6 @@ Result<EdgeAccessor> DiskStorage::DiskAccessor::CreateEdge(VertexAccessor *from,
&storage_->constraints_, config_);
}
// Deletes a single edge: unlinks it from the out-edges of its source vertex and the
// in-edges of its destination vertex, queues its serialized disk key for deletion,
// records the undo deltas and decrements the storage edge count.
//
// Returns an empty optional when the edge is already deleted, otherwise an accessor
// to the deleted edge. The accessor must belong to this accessor's transaction.
Result<std::optional<EdgeAccessor>> DiskStorage::DiskAccessor::DeleteEdge(EdgeAccessor *edge) {
MG_ASSERT(edge->transaction_ == &transaction_,
"EdgeAccessor must be from the same transaction as the storage "
"accessor when deleting an edge!");
const auto edge_ref = edge->edge_;
const auto edge_type = edge->edge_type_;
// The edge object is only dereferenced when properties on edges are enabled.
if (config_.properties_on_edges && edge_ref.ptr->deleted) return std::optional<EdgeAccessor>{};
auto *from_vertex = edge->from_vertex_;
auto *to_vertex = edge->to_vertex_;
MG_ASSERT(!from_vertex->deleted, "Invalid database state!");
if (to_vertex != from_vertex) {
MG_ASSERT(!to_vertex->deleted, "Invalid database state!");
}
// Removes the (edge type, opposite vertex, edge ref) link from an adjacency vector.
// Returns false only when the link is missing and properties on edges are disabled;
// with properties on edges enabled a missing link trips the assert instead.
auto delete_edge_from_storage = [&edge_type, &edge_ref, this](auto *vertex, auto *edges) {
const std::tuple<EdgeTypeId, Vertex *, EdgeRef> link(edge_type, vertex, edge_ref);
auto it = std::find(edges->begin(), edges->end(), link);
if (config_.properties_on_edges) {
MG_ASSERT(it != edges->end(), "Invalid database state!");
} else if (it == edges->end()) {
return false;
}
// Swap with the last element and pop it: removal does not preserve element order.
std::swap(*it, *edges->rbegin());
edges->pop_back();
return true;
};
const auto op1 = delete_edge_from_storage(to_vertex, &from_vertex->out_edges);
const auto op2 = delete_edge_from_storage(from_vertex, &to_vertex->in_edges);
// NOTE(review): the disk key is queued and the modified-edge entry removed before the
// "already deleted" check below, so they also run on the early-return path — confirm intended.
const DiskEdgeKey disk_edge_key(from_vertex->gid, to_vertex->gid, edge_type, edge_ref, config_.properties_on_edges);
edges_to_delete_.emplace(disk_edge_key.GetSerializedKey());
transaction_.RemoveModifiedEdge(edge->Gid());
if (config_.properties_on_edges) {
MG_ASSERT((op1 && op2), "Invalid database state!");
} else {
// Both adjacency vectors must agree: the link is either in both or in neither.
MG_ASSERT((op1 && op2) || (!op1 && !op2), "Invalid database state!");
if (!op1 && !op2) {
// The edge is already deleted.
return std::optional<EdgeAccessor>{};
}
}
if (config_.properties_on_edges) {
auto *edge_ptr = edge_ref.ptr;
CreateAndLinkDelta(&transaction_, edge_ptr, Delta::RecreateObjectTag());
edge_ptr->deleted = true;
}
// Deltas carrying the link data (AddOutEdge / AddInEdge tags) are recorded on both endpoints.
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
storage_->edge_count_.fetch_sub(1, std::memory_order_acq_rel);
// NOTE(review): the trailing `true` presumably marks the accessor as referring to a deleted edge — confirm.
return std::make_optional<EdgeAccessor>(edge_ref, edge_type, from_vertex, to_vertex, &transaction_,
&storage_->indices_, &storage_->constraints_, config_, true);
}
Result<EdgeAccessor> DiskStorage::DiskAccessor::EdgeSetFrom(EdgeAccessor * /*edge*/, VertexAccessor * /*new_from*/) {
MG_ASSERT(false, "EdgeSetFrom is currently only implemented for InMemory storage");
return Error::NONEXISTENT_OBJECT;

View File

@ -172,11 +172,8 @@ class DiskStorage final : public Storage {
throw utils::NotYetImplemented("SetIndexStats(stats) is not implemented for DiskStorage.");
}
/// TODO: It is just marked as deleted but the memory isn't reclaimed because of the in-memory storage
Result<std::optional<VertexAccessor>> DeleteVertex(VertexAccessor *vertex) override;
Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> DetachDeleteVertex(
VertexAccessor *vertex) override;
Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>> DetachDelete(
std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges, bool detach) override;
void PrefetchInEdges(const VertexAccessor &vertex_acc) override;
@ -188,8 +185,6 @@ class DiskStorage final : public Storage {
Result<EdgeAccessor> EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) override;
Result<std::optional<EdgeAccessor>> DeleteEdge(EdgeAccessor *edge) override;
bool LabelIndexExists(LabelId label) const override {
auto *disk_storage = static_cast<DiskStorage *>(storage_);
return disk_storage->indices_.label_index_->IndexExists(label);

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -36,7 +36,6 @@ static_assert(std::is_standard_layout_v<Edge *>, "The Edge * must have a standar
static_assert(std::is_standard_layout_v<EdgeRef>, "The EdgeRef must have a standard layout!");
inline bool operator==(const EdgeRef &a, const EdgeRef &b) noexcept { return a.gid == b.gid; }
inline bool operator<(const EdgeRef &first, const EdgeRef &second) { return first.gid < second.gid; }
inline bool operator!=(const EdgeRef &a, const EdgeRef &b) noexcept { return a.gid != b.gid; }
} // namespace memgraph::storage

View File

@ -245,114 +245,51 @@ std::optional<VertexAccessor> InMemoryStorage::InMemoryAccessor::FindVertex(Gid
return VertexAccessor::Create(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_, view);
}
Result<std::optional<VertexAccessor>> InMemoryStorage::InMemoryAccessor::DeleteVertex(VertexAccessor *vertex) {
MG_ASSERT(vertex->transaction_ == &transaction_,
"VertexAccessor must be from the same transaction as the storage "
"accessor when deleting a vertex!");
auto *vertex_ptr = vertex->vertex_;
Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>>
InMemoryStorage::InMemoryAccessor::DetachDelete(std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges,
bool detach) {
using ReturnType = std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>;
auto guard = std::unique_lock{vertex_ptr->lock};
auto maybe_result = Storage::Accessor::DetachDelete(nodes, edges, detach);
if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR;
if (vertex_ptr->deleted) {
return std::optional<VertexAccessor>{};
if (maybe_result.HasError()) {
return maybe_result.GetError();
}
if (!vertex_ptr->in_edges.empty() || !vertex_ptr->out_edges.empty()) return Error::VERTEX_HAS_EDGES;
auto value = maybe_result.GetValue();
CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag());
vertex_ptr->deleted = true;
transaction_.manyDeltasCache.Invalidate(vertex_ptr);
if (!value) {
return std::make_optional<ReturnType>();
}
auto &[deleted_vertices, deleted_edges] = *value;
// Need to inform the next CollectGarbage call that there are some
// non-transactional deletions that need to be collected
if (transaction_.storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
auto const inform_gc_vertex_deletion = utils::OnScopeExit{[this, &deleted_vertices = deleted_vertices]() {
if (!deleted_vertices.empty() && transaction_.storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
mem_storage->gc_full_scan_vertices_delete_ = true;
}
}};
return std::make_optional<VertexAccessor>(vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_,
config_, true);
}
Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>>
InMemoryStorage::InMemoryAccessor::DetachDeleteVertex(VertexAccessor *vertex) {
using ReturnType = std::pair<VertexAccessor, std::vector<EdgeAccessor>>;
MG_ASSERT(vertex->transaction_ == &transaction_,
"VertexAccessor must be from the same transaction as the storage "
"accessor when deleting a vertex!");
auto *vertex_ptr = vertex->vertex_;
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> in_edges;
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> out_edges;
{
auto guard = std::unique_lock{vertex_ptr->lock};
if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR;
if (vertex_ptr->deleted) return std::optional<ReturnType>{};
in_edges = vertex_ptr->in_edges;
out_edges = vertex_ptr->out_edges;
}
std::vector<EdgeAccessor> deleted_edges;
for (const auto &item : in_edges) {
auto [edge_type, from_vertex, edge] = item;
EdgeAccessor e(edge, edge_type, from_vertex, vertex_ptr, &transaction_, &storage_->indices_,
&storage_->constraints_, config_);
auto ret = DeleteEdge(&e);
if (ret.HasError()) {
MG_ASSERT(ret.GetError() == Error::SERIALIZATION_ERROR, "Invalid database state!");
return ret.GetError();
}
if (ret.GetValue()) {
deleted_edges.push_back(*ret.GetValue());
}
}
for (const auto &item : out_edges) {
auto [edge_type, to_vertex, edge] = item;
EdgeAccessor e(edge, edge_type, vertex_ptr, to_vertex, &transaction_, &storage_->indices_, &storage_->constraints_,
config_);
auto ret = DeleteEdge(&e);
if (ret.HasError()) {
MG_ASSERT(ret.GetError() == Error::SERIALIZATION_ERROR, "Invalid database state!");
return ret.GetError();
}
if (ret.GetValue()) {
deleted_edges.push_back(*ret.GetValue());
}
}
auto guard = std::unique_lock{vertex_ptr->lock};
// We need to check again for serialization errors because we unlocked the
// vertex. Some other transaction could have modified the vertex in the
// meantime if we didn't have any edges to delete.
if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR;
MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!");
CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag());
vertex_ptr->deleted = true;
transaction_.manyDeltasCache.Invalidate(vertex_ptr);
// Need to inform the next CollectGarbage call that there are some
// non-transactional deletions that need to be collected
if (transaction_.storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
auto const inform_gc_edge_deletion = utils::OnScopeExit{[this, &deleted_edges = deleted_edges]() {
if (!deleted_edges.empty() && transaction_.storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
mem_storage->gc_full_scan_vertices_delete_ = true;
mem_storage->gc_full_scan_edges_delete_ = true;
}
}};
for (auto const &vertex : deleted_vertices) {
transaction_.manyDeltasCache.Invalidate(vertex.vertex_);
}
return std::make_optional<ReturnType>(
VertexAccessor{vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_, config_, true},
std::move(deleted_edges));
for (const auto &edge : deleted_edges) {
transaction_.manyDeltasCache.Invalidate(edge.from_vertex_, edge.edge_type_, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(edge.to_vertex_, edge.edge_type_, EdgeDirection::IN);
}
return maybe_result;
}
Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccessor *from, VertexAccessor *to,
@ -700,100 +637,6 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeSetTo(EdgeAccessor *
&storage_->constraints_, config_);
}
// Deletes a single edge: locks the edge (when properties on edges are enabled) and both
// endpoint vertices, unlinks the edge from the source's out-edges and the destination's
// in-edges, records the undo deltas and decrements the storage edge count.
//
// Returns Error::SERIALIZATION_ERROR when PrepareForWrite fails for the edge or either
// endpoint, an empty optional when the edge is already deleted, and otherwise an
// accessor to the deleted edge. The accessor must belong to this accessor's transaction.
Result<std::optional<EdgeAccessor>> InMemoryStorage::InMemoryAccessor::DeleteEdge(EdgeAccessor *edge) {
MG_ASSERT(edge->transaction_ == &transaction_,
"EdgeAccessor must be from the same transaction as the storage "
"accessor when deleting an edge!");
auto &edge_ref = edge->edge_;
auto &edge_type = edge->edge_type_;
// The edge lock is only taken when properties on edges are enabled; otherwise `guard` stays empty.
std::unique_lock<utils::RWSpinLock> guard;
if (config_.properties_on_edges) {
auto *edge_ptr = edge_ref.ptr;
guard = std::unique_lock{edge_ptr->lock};
if (!PrepareForWrite(&transaction_, edge_ptr)) return Error::SERIALIZATION_ERROR;
if (edge_ptr->deleted) return std::optional<EdgeAccessor>{};
}
auto *from_vertex = edge->from_vertex_;
auto *to_vertex = edge->to_vertex_;
// Obtain the locks by `gid` order to avoid lock cycles.
auto guard_from = std::unique_lock{from_vertex->lock, std::defer_lock};
auto guard_to = std::unique_lock{to_vertex->lock, std::defer_lock};
if (from_vertex->gid < to_vertex->gid) {
guard_from.lock();
guard_to.lock();
} else if (from_vertex->gid > to_vertex->gid) {
guard_to.lock();
guard_from.lock();
} else {
// The vertices are the same vertex, only lock one.
guard_from.lock();
}
if (!PrepareForWrite(&transaction_, from_vertex)) return Error::SERIALIZATION_ERROR;
MG_ASSERT(!from_vertex->deleted, "Invalid database state!");
if (to_vertex != from_vertex) {
if (!PrepareForWrite(&transaction_, to_vertex)) return Error::SERIALIZATION_ERROR;
MG_ASSERT(!to_vertex->deleted, "Invalid database state!");
}
// Removes the (edge type, opposite vertex, edge ref) link from an adjacency vector.
// Returns false only when the link is missing and properties on edges are disabled;
// with properties on edges enabled a missing link trips the assert instead.
auto delete_edge_from_storage = [&edge_type, &edge_ref, this](auto *vertex, auto *edges) {
std::tuple<EdgeTypeId, Vertex *, EdgeRef> link(edge_type, vertex, edge_ref);
auto it = std::find(edges->begin(), edges->end(), link);
if (config_.properties_on_edges) {
MG_ASSERT(it != edges->end(), "Invalid database state!");
} else if (it == edges->end()) {
return false;
}
// Swap with the last element and pop it: removal does not preserve element order.
std::swap(*it, *edges->rbegin());
edges->pop_back();
return true;
};
auto op1 = delete_edge_from_storage(to_vertex, &from_vertex->out_edges);
auto op2 = delete_edge_from_storage(from_vertex, &to_vertex->in_edges);
if (config_.properties_on_edges) {
MG_ASSERT((op1 && op2), "Invalid database state!");
} else {
// Both adjacency vectors must agree: the link is either in both or in neither.
MG_ASSERT((op1 && op2) || (!op1 && !op2), "Invalid database state!");
if (!op1 && !op2) {
// The edge is already deleted.
return std::optional<EdgeAccessor>{};
}
}
if (config_.properties_on_edges) {
auto *edge_ptr = edge_ref.ptr;
CreateAndLinkDelta(&transaction_, edge_ptr, Delta::RecreateObjectTag());
edge_ptr->deleted = true;
// Need to inform the next CollectGarbage call that there are some
// non-transactional deletions that need to be collected
if (transaction_.storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
mem_storage->gc_full_scan_edges_delete_ = true;
}
}
// Deltas carrying the link data (AddOutEdge / AddInEdge tags) are recorded on both endpoints.
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
// Decrement edge count.
storage_->edge_count_.fetch_add(-1, std::memory_order_acq_rel);
// NOTE(review): the trailing `true` presumably marks the accessor as referring to a deleted edge — confirm.
return std::make_optional<EdgeAccessor>(edge_ref, edge_type, from_vertex, to_vertex, &transaction_,
&storage_->indices_, &storage_->constraints_, config_, true);
}
// NOLINTNEXTLINE(google-default-arguments)
utils::BasicResult<StorageDataManipulationError, void> InMemoryStorage::InMemoryAccessor::Commit(
const std::optional<uint64_t> desired_commit_timestamp) {

View File

@ -205,14 +205,8 @@ class InMemoryStorage final : public Storage {
labels);
}
/// @return Accessor to the deleted vertex if a deletion took place, std::nullopt otherwise
/// @throw std::bad_alloc
Result<std::optional<VertexAccessor>> DeleteVertex(VertexAccessor *vertex) override;
/// @return Accessor to the deleted vertex and deleted edges if a deletion took place, std::nullopt otherwise
/// @throw std::bad_alloc
Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> DetachDeleteVertex(
VertexAccessor *vertex) override;
Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>> DetachDelete(
std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges, bool detach) override;
void PrefetchInEdges(const VertexAccessor &vertex_acc) override{};
@ -225,10 +219,6 @@ class InMemoryStorage final : public Storage {
Result<EdgeAccessor> EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) override;
/// Accessor to the deleted edge if a deletion took place, std::nullopt otherwise
/// @throw std::bad_alloc
Result<std::optional<EdgeAccessor>> DeleteEdge(EdgeAccessor *edge) override;
bool LabelIndexExists(LabelId label) const override {
return static_cast<InMemoryStorage *>(storage_)->indices_.label_index_->IndexExists(label);
}

View File

@ -9,6 +9,7 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "absl/container/flat_hash_set.h"
#include "spdlog/spdlog.h"
#include "storage/v2/disk/name_id_mapper.hpp"
@ -127,4 +128,368 @@ void Storage::Accessor::AdvanceCommand() {
++transaction_.command_id;
}
/// Deletes a single vertex by delegating to the bulk DetachDelete with detaching disabled.
///
/// @return the error reported by DetachDelete; an empty optional when the bulk call
///         yielded no value or deleted no vertex; otherwise an accessor to the deleted vertex.
Result<std::optional<VertexAccessor>> Storage::Accessor::DeleteVertex(VertexAccessor *vertex) {
  auto bulk_result = DetachDelete({vertex}, {}, false);
  if (bulk_result.HasError()) {
    return bulk_result.GetError();
  }

  const auto &maybe_deleted = bulk_result.GetValue();
  if (!maybe_deleted) {
    return std::optional<VertexAccessor>{};
  }

  const auto &deleted_vertices = maybe_deleted->first;
  const auto &deleted_edges = maybe_deleted->second;

  // A single-vertex, non-detaching delete can remove at most that vertex and no edges.
  MG_ASSERT(deleted_vertices.size() <= 1, "The number of deleted vertices is not less or equal to 1!");
  MG_ASSERT(deleted_edges.empty(),
            "Deleting a vertex without detaching should not have resulted in deleting any edges!");

  if (!deleted_vertices.empty()) {
    return std::make_optional<VertexAccessor>(deleted_vertices[0]);
  }
  return std::optional<VertexAccessor>{};
}
/// Detach-deletes a single vertex (the vertex and every edge attached to it) by delegating to the
/// bulk DetachDelete.
/// @return Accessor to the deleted vertex together with the deleted edges if a deletion took place,
///         std::nullopt otherwise; any error reported by DetachDelete is forwarded unchanged.
Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> Storage::Accessor::DetachDeleteVertex(
    VertexAccessor *vertex) {
  using ReturnType = std::pair<VertexAccessor, std::vector<EdgeAccessor>>;

  auto res = DetachDelete({vertex}, {}, true);
  if (res.HasError()) {
    return res.GetError();
  }

  auto &value = res.GetValue();
  if (!value) {
    return std::optional<ReturnType>{};
  }

  auto &[vertices, edges] = *value;
  MG_ASSERT(vertices.size() <= 1, "The number of detach deleted vertices is not less or equal to 1!");

  // The bulk call skips vertices that are already flagged as deleted, so the list can legitimately
  // be empty. Indexing vertices[0] in that case would read out of bounds; report "nothing deleted"
  // instead, mirroring DeleteVertex.
  if (vertices.empty()) {
    return std::optional<ReturnType>{};
  }

  return std::make_optional<ReturnType>(vertices[0], std::move(edges));
}
/// Deletes a single edge by delegating to the bulk DetachDelete with no vertices.
/// @return Accessor to the deleted edge if a deletion took place, std::nullopt otherwise;
///         any error reported by DetachDelete is forwarded unchanged.
Result<std::optional<EdgeAccessor>> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) {
  auto bulk_result = DetachDelete({}, {edge}, false);
  if (bulk_result.HasError()) return bulk_result.GetError();

  const auto &maybe_deleted = bulk_result.GetValue();
  if (!maybe_deleted.has_value()) return std::optional<EdgeAccessor>{};

  const auto &[deleted_vertices, deleted_edges] = *maybe_deleted;
  MG_ASSERT(deleted_vertices.empty(), "Deleting an edge should not have deleted a vertex!");
  MG_ASSERT(deleted_edges.size() <= 1, "Deleted edges need to be less or equal to 1!");

  // An empty list means the bulk call had nothing to delete for this edge.
  return deleted_edges.empty() ? std::optional<EdgeAccessor>{}
                               : std::make_optional<EdgeAccessor>(deleted_edges.front());
}
/// Bulk delete of vertices and edges.
///
/// Steps, in order:
///  1. filter `nodes` down to the vertices that are not yet deleted,
///  2. collect the neighbouring vertices / edge ids that must be unlinked,
///  3. when `detach` is set, strip every edge from the vertices being deleted,
///  4. unlink the collected edges from the vertices that stay alive,
///  5. delete the vertices themselves.
///
/// @param nodes  vertices requested for deletion
/// @param edges  edges requested for deletion
/// @param detach when true, edges attached to `nodes` are deleted as well; when false, a vertex
///               that still has edges makes the call fail with Error::VERTEX_HAS_EDGES
/// @return pair of (deleted vertices, deleted edges), or the first error hit by any step
Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>>
Storage::Accessor::DetachDelete(std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges, bool detach) {
  using ReturnType = std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>;

  // 1. Gather nodes which are not deleted yet in the system
  auto maybe_nodes_to_delete = PrepareDeletableNodes(nodes);
  if (maybe_nodes_to_delete.HasError()) {
    return maybe_nodes_to_delete.GetError();
  }
  // Take the set out of the result instead of copying it; the result object is not used again.
  const std::unordered_set<Vertex *> nodes_to_delete = std::move(*maybe_nodes_to_delete.GetValue());

  // 2. Gather edges and corresponding node on the other end of the edge for the deletable nodes
  EdgeInfoForDeletion edge_deletion_info = PrepareDeletableEdges(nodes_to_delete, edges, detach);

  // Detach nodes which need to be deleted
  std::unordered_set<Gid> deleted_edge_ids;
  std::vector<EdgeAccessor> deleted_edges;
  if (detach) {
    auto maybe_cleared_edges = ClearEdgesOnVertices(nodes_to_delete, deleted_edge_ids);
    if (maybe_cleared_edges.HasError()) {
      return maybe_cleared_edges.GetError();
    }
    deleted_edges = std::move(*maybe_cleared_edges.GetValue());
  }

  // Detach nodes on the other end, which don't need deletion, by passing once through their vectors
  auto maybe_remaining_edges = DetachRemainingEdges(std::move(edge_deletion_info), deleted_edge_ids);
  if (maybe_remaining_edges.HasError()) {
    return maybe_remaining_edges.GetError();
  }
  auto remaining_edges = std::move(*maybe_remaining_edges.GetValue());
  if (deleted_edges.empty()) {
    // Nothing collected so far (e.g. detach == false): adopt the vector wholesale.
    deleted_edges = std::move(remaining_edges);
  } else {
    deleted_edges.insert(deleted_edges.end(), remaining_edges.begin(), remaining_edges.end());
  }

  // Not const, so the vertex list can be moved out of the result below.
  auto maybe_deleted_vertices = TryDeleteVertices(nodes_to_delete);
  if (maybe_deleted_vertices.HasError()) {
    return maybe_deleted_vertices.GetError();
  }
  auto deleted_vertices = std::move(maybe_deleted_vertices.GetValue());

  return std::make_optional<ReturnType>(std::move(deleted_vertices), std::move(deleted_edges));
}
/// Filters the requested vertices down to those that still have to be deleted.
///
/// Every accessor must belong to this accessor's transaction (asserted). Vertices whose `deleted`
/// flag is already set are silently skipped.
/// @return set of raw vertex pointers to delete, or Error::SERIALIZATION_ERROR when PrepareForWrite
///         rejects one of the vertices
Result<std::optional<std::unordered_set<Vertex *>>> Storage::Accessor::PrepareDeletableNodes(
    const std::vector<VertexAccessor *> &vertices) {
  // Some of the vertices could be already deleted in the system so we need to check
  std::unordered_set<Vertex *> nodes_to_delete{};
  for (const auto &vertex : vertices) {
    MG_ASSERT(vertex->transaction_ == &transaction_,
              "VertexAccessor must be from the same transaction as the storage "
              "accessor when deleting a vertex!");

    auto *vertex_ptr = vertex->vertex_;
    {
      // The lock only covers the write-permission check and the read of the `deleted` flag.
      auto vertex_lock = std::unique_lock{vertex_ptr->lock};

      if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR;

      if (vertex_ptr->deleted) {
        continue;
      }
    }

    nodes_to_delete.insert(vertex_ptr);
  }

  // Move the local set into the result instead of copying it.
  return std::make_optional<std::unordered_set<Vertex *>>(std::move(nodes_to_delete));
}
/// Collects which surviving vertices have to be unlinked from which edges.
///
/// When `detach` is set, every edge of a vertex in `vertices` whose other endpoint is NOT in
/// `vertices` is recorded: the neighbour goes into the src set (for in-edges of the deleted vertex)
/// or the dest set (for out-edges), together with the edge gid. Edges passed explicitly in `edges`
/// are always recorded on both of their endpoints.
EdgeInfoForDeletion Storage::Accessor::PrepareDeletableEdges(const std::unordered_set<Vertex *> &vertices,
                                                             const std::vector<EdgeAccessor *> &edges,
                                                             bool detach) noexcept {
  EdgeInfoForDeletion deletion_info{};

  // Neighbours on the other end of an edge might not get deleted themselves but only cut out of
  // the edge, so for each such neighbour we remember it and the gid of the edge to remove.
  const auto record_surviving_neighbours = [this, &vertices](const auto &attached_edges, auto &neighbours,
                                                             auto &edge_ids) {
    for (const auto &[edge_type, neighbour, edge_ref] : attached_edges) {
      if (vertices.contains(neighbour)) {
        continue;  // the neighbour is deleted as well, its edges get cleared wholesale
      }
      neighbours.insert(neighbour);
      edge_ids.insert(storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid);
    }
  };

  if (detach) {
    for (auto *doomed_vertex : vertices) {
      std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> incoming;
      std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> outgoing;
      {
        // Snapshot both edge lists under a shared lock and inspect the copies afterwards.
        auto snapshot_lock = std::shared_lock{doomed_vertex->lock};
        incoming = doomed_vertex->in_edges;
        outgoing = doomed_vertex->out_edges;
      }

      record_surviving_neighbours(incoming, deletion_info.partial_src_vertices, deletion_info.partial_src_edge_ids);
      record_surviving_neighbours(outgoing, deletion_info.partial_dest_vertices,
                                  deletion_info.partial_dest_edge_ids);
    }
  }

  // Edges named directly in the query have to be removed from both of their endpoints.
  for (auto *requested_edge : edges) {
    deletion_info.partial_src_vertices.insert(requested_edge->from_vertex_);
    deletion_info.partial_dest_vertices.insert(requested_edge->to_vertex_);

    auto const requested_gid = requested_edge->Gid();
    deletion_info.partial_src_edge_ids.insert(requested_gid);
    deletion_info.partial_dest_edge_ids.insert(requested_gid);
  }

  return deletion_info;
}
/// Removes every in- and out-edge from each vertex in `vertices`.
///
/// Each removed edge's gid is inserted into `deleted_edge_ids`. An EdgeAccessor is appended to the
/// result only when the gid was already present in that set, i.e. when the edge has now been
/// removed from both of its endpoints; the remaining edges are reported by whoever removes their
/// other side.
/// @param vertices         vertices whose edge lists are emptied
/// @param deleted_edge_ids in/out set of edge gids removed from at least one endpoint
/// @return accessors of the edges removed from both endpoints, or Error::SERIALIZATION_ERROR when
///         PrepareForWrite rejects a vertex or an edge
Result<std::optional<std::vector<EdgeAccessor>>> Storage::Accessor::ClearEdgesOnVertices(
    const std::unordered_set<Vertex *> &vertices, std::unordered_set<Gid> &deleted_edge_ids) {
  // We want to gather all edges that we delete in this step so that we can proceed with
  // further deletion
  using ReturnType = std::vector<EdgeAccessor>;
  std::vector<EdgeAccessor> deleted_edges{};

  auto clear_edges = [this, &deleted_edges, &deleted_edge_ids](
                         auto *vertex_ptr, auto *attached_edges_to_vertex, auto deletion_delta,
                         auto reverse_vertex_order) -> Result<std::optional<ReturnType>> {
    auto vertex_lock = std::unique_lock{vertex_ptr->lock};
    while (!attached_edges_to_vertex->empty()) {
      // Take the last entry BY VALUE: pop_back() below destroys the element, so references into it
      // must not be used afterwards, yet all three parts are still needed after the pop.
      auto const [edge_type, opposing_vertex, edge_ref] = attached_edges_to_vertex->back();

      std::unique_lock<utils::RWSpinLock> guard;
      if (storage_->config_.items.properties_on_edges) {
        auto edge_ptr = edge_ref.ptr;
        guard = std::unique_lock{edge_ptr->lock};

        if (!PrepareForWrite(&transaction_, edge_ptr)) return Error::SERIALIZATION_ERROR;
      }

      if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR;
      MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!");

      attached_edges_to_vertex->pop_back();
      if (storage_->config_.items.properties_on_edges) {
        auto *edge_ptr = edge_ref.ptr;
        MarkEdgeAsDeleted(edge_ptr);
      }

      auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid;
      auto const [_, was_inserted] = deleted_edge_ids.insert(edge_gid);
      bool const edge_cleared_from_both_directions = !was_inserted;
      if (edge_cleared_from_both_directions) {
        // reverse_vertex_order == true: the list being cleared is out_edges, so vertex_ptr is the
        // source of the edge; otherwise (in_edges) it is the destination.
        auto *from_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex;
        auto *to_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr;
        deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, &transaction_, &storage_->indices_,
                                   &storage_->constraints_, storage_->config_.items, true);
      }

      CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref);
    }

    return std::make_optional<ReturnType>();
  };

  // delete the in and out edges from the nodes we want to delete
  // no need to lock here, we are just passing the pointer of the in and out edges collections
  for (auto *vertex_ptr : vertices) {
    auto maybe_error = clear_edges(vertex_ptr, &vertex_ptr->in_edges, Delta::AddInEdgeTag(), false);
    if (maybe_error.HasError()) {
      return maybe_error;
    }

    maybe_error = clear_edges(vertex_ptr, &vertex_ptr->out_edges, Delta::AddOutEdgeTag(), true);
    if (maybe_error.HasError()) {
      return maybe_error;
    }
  }

  // Move the accumulated accessors into the result instead of copying the vector.
  return std::make_optional<ReturnType>(std::move(deleted_edges));
}
/// Unlinks the edges recorded in `info` from the vertices that are NOT being deleted.
///
/// For every vertex in `info.partial_src_vertices` the matching entries are removed from its
/// out_edges; for every vertex in `info.partial_dest_vertices`, from its in_edges. Each removed
/// edge's gid is inserted into `partially_detached_edge_ids`; an EdgeAccessor is appended to the
/// result only when the gid was already in that set, i.e. when the edge has now been removed from
/// both of its endpoints.
/// @param info                        neighbours and edge gids gathered by PrepareDeletableEdges
/// @param partially_detached_edge_ids in/out set of edge gids removed from at least one endpoint
/// @return accessors of the edges removed from both endpoints, or Error::SERIALIZATION_ERROR when
///         PrepareForWrite rejects a vertex
Result<std::optional<std::vector<EdgeAccessor>>> Storage::Accessor::DetachRemainingEdges(
    EdgeInfoForDeletion info, std::unordered_set<Gid> &partially_detached_edge_ids) {
  using ReturnType = std::vector<EdgeAccessor>;
  std::vector<EdgeAccessor> deleted_edges{};

  auto clear_edges_on_other_direction = [this, &deleted_edges, &partially_detached_edge_ids](
                                            auto *vertex_ptr, auto *edges_attached_to_vertex, auto &set_for_erasure,
                                            auto deletion_delta,
                                            auto reverse_vertex_order) -> Result<std::optional<ReturnType>> {
    auto vertex_lock = std::unique_lock{vertex_ptr->lock};

    if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR;
    MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!");

    // Move the edges that stay to the front; [mid, end) holds the edges to remove.
    auto mid = std::partition(
        edges_attached_to_vertex->begin(), edges_attached_to_vertex->end(), [this, &set_for_erasure](auto &edge) {
          auto const &[edge_type, opposing_vertex, edge_ref] = edge;
          auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid;
          return !set_for_erasure.contains(edge_gid);
        });

    // The references below stay valid: the tail is only erased after the loop has finished.
    for (auto it = mid; it != edges_attached_to_vertex->end(); ++it) {
      auto const &[edge_type, opposing_vertex, edge_ref] = *it;
      std::unique_lock<utils::RWSpinLock> guard;
      if (storage_->config_.items.properties_on_edges) {
        auto edge_ptr = edge_ref.ptr;
        guard = std::unique_lock{edge_ptr->lock};
        // this can happen only if we marked edges for deletion with no nodes,
        // so the method detaching nodes will not do anything
        MarkEdgeAsDeleted(edge_ptr);
      }

      CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref);

      auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid;
      auto const [_, was_inserted] = partially_detached_edge_ids.insert(edge_gid);
      bool const edge_cleared_from_both_directions = !was_inserted;
      if (edge_cleared_from_both_directions) {
        // reverse_vertex_order == true: the list being cleared is out_edges, so vertex_ptr is the
        // source of the edge; otherwise (in_edges) it is the destination.
        auto *from_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex;
        auto *to_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr;
        deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, &transaction_, &storage_->indices_,
                                   &storage_->constraints_, storage_->config_.items, true);
      }
    }

    edges_attached_to_vertex->erase(mid, edges_attached_to_vertex->end());

    return std::make_optional<ReturnType>();
  };

  // remove edges from vertex collections which we aggregated for just detaching
  //
  // The flag follows the convention ClearEdgesOnVertices uses: out_edges -> true (vertex_ptr is the
  // source), in_edges -> false (vertex_ptr is the destination). Passing them the other way round
  // produces EdgeAccessors with from/to swapped.
  for (auto *vertex_ptr : info.partial_src_vertices) {
    auto maybe_error = clear_edges_on_other_direction(vertex_ptr, &vertex_ptr->out_edges, info.partial_src_edge_ids,
                                                      Delta::AddOutEdgeTag(), true);
    if (maybe_error.HasError()) {
      return maybe_error;
    }
  }

  for (auto *vertex_ptr : info.partial_dest_vertices) {
    auto maybe_error = clear_edges_on_other_direction(vertex_ptr, &vertex_ptr->in_edges, info.partial_dest_edge_ids,
                                                      Delta::AddInEdgeTag(), false);
    if (maybe_error.HasError()) {
      return maybe_error;
    }
  }

  // Move the accumulated accessors into the result instead of copying the vector.
  return std::make_optional<ReturnType>(std::move(deleted_edges));
}
/// Marks every vertex in `vertices` as deleted and returns accessors to them.
/// @return accessors of the deleted vertices; Error::SERIALIZATION_ERROR when PrepareForWrite
///         rejects a vertex; Error::VERTEX_HAS_EDGES when a vertex still has in- or out-edges
Result<std::vector<VertexAccessor>> Storage::Accessor::TryDeleteVertices(const std::unordered_set<Vertex *> &vertices) {
  std::vector<VertexAccessor> removed;
  removed.reserve(vertices.size());

  for (auto *candidate : vertices) {
    auto candidate_lock = std::unique_lock{candidate->lock};

    if (!PrepareForWrite(&transaction_, candidate)) {
      return Error::SERIALIZATION_ERROR;
    }
    MG_ASSERT(!candidate->deleted, "Invalid database state!");

    // A vertex can only go away once nothing is attached to it any more.
    const bool is_isolated = candidate->in_edges.empty() && candidate->out_edges.empty();
    if (!is_isolated) {
      return Error::VERTEX_HAS_EDGES;
    }

    CreateAndLinkDelta(&transaction_, candidate, Delta::RecreateObjectTag());
    candidate->deleted = true;

    removed.emplace_back(candidate, &transaction_, &storage_->indices_, &storage_->constraints_,
                         storage_->config_.items, true);
  }

  return removed;
}
/// Flags `edge` as deleted, links a RecreateObject delta to it and decrements the storage edge
/// counter. Does nothing when the edge is already flagged as deleted.
void Storage::Accessor::MarkEdgeAsDeleted(Edge *edge) {
  if (edge->deleted) {
    return;
  }

  CreateAndLinkDelta(&transaction_, edge, Delta::RecreateObjectTag());
  edge->deleted = true;
  storage_->edge_count_.fetch_sub(1, std::memory_order_acq_rel);
}
} // namespace memgraph::storage

View File

@ -68,6 +68,13 @@ struct StorageInfo {
uint64_t disk_usage;
};
/// Bookkeeping for a bulk delete: which vertices that are not themselves being deleted must have
/// which edges removed from their edge lists. Filled by Storage::Accessor::PrepareDeletableEdges
/// and consumed by Storage::Accessor::DetachRemainingEdges.
/// NOTE: PrepareDeletableEdges builds this with designated initializers, so the member order
/// below must not change.
struct EdgeInfoForDeletion {
// Gids of the edges to remove from the out_edges of `partial_src_vertices`.
std::unordered_set<Gid> partial_src_edge_ids{};
// Gids of the edges to remove from the in_edges of `partial_dest_vertices`.
std::unordered_set<Gid> partial_dest_edge_ids{};
// Vertices whose out_edges are filtered against `partial_src_edge_ids`.
std::unordered_set<Vertex *> partial_src_vertices{};
// Vertices whose in_edges are filtered against `partial_dest_edge_ids`.
std::unordered_set<Vertex *> partial_dest_vertices{};
};
class Storage {
friend class ReplicationServer;
friend class ReplicationClient;
@ -111,6 +118,14 @@ class Storage {
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) = 0;
virtual Result<std::optional<VertexAccessor>> DeleteVertex(VertexAccessor *vertex);
virtual Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> DetachDeleteVertex(
VertexAccessor *vertex);
virtual Result<std::optional<std::pair<std::vector<VertexAccessor>, std::vector<EdgeAccessor>>>> DetachDelete(
std::vector<VertexAccessor *> nodes, std::vector<EdgeAccessor *> edges, bool detach);
virtual uint64_t ApproximateVertexCount() const = 0;
virtual uint64_t ApproximateVertexCount(LabelId label) const = 0;
@ -142,11 +157,6 @@ class Storage {
virtual std::vector<LabelId> DeleteLabelIndexStats(std::span<std::string> labels) = 0;
virtual Result<std::optional<VertexAccessor>> DeleteVertex(VertexAccessor *vertex) = 0;
virtual Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> DetachDeleteVertex(
VertexAccessor *vertex) = 0;
virtual void PrefetchInEdges(const VertexAccessor &vertex_acc) = 0;
virtual void PrefetchOutEdges(const VertexAccessor &vertex_acc) = 0;
@ -157,7 +167,7 @@ class Storage {
virtual Result<EdgeAccessor> EdgeSetTo(EdgeAccessor *edge, VertexAccessor *new_to) = 0;
virtual Result<std::optional<EdgeAccessor>> DeleteEdge(EdgeAccessor *edge) = 0;
virtual Result<std::optional<EdgeAccessor>> DeleteEdge(EdgeAccessor *edge);
virtual bool LabelIndexExists(LabelId label) const = 0;
@ -202,6 +212,18 @@ class Storage {
std::optional<uint64_t> commit_timestamp_;
bool is_transaction_active_;
// Detach delete private methods
Result<std::optional<std::unordered_set<Vertex *>>> PrepareDeletableNodes(
const std::vector<VertexAccessor *> &vertices);
EdgeInfoForDeletion PrepareDeletableEdges(const std::unordered_set<Vertex *> &vertices,
const std::vector<EdgeAccessor *> &edges, bool detach) noexcept;
Result<std::optional<std::vector<EdgeAccessor>>> ClearEdgesOnVertices(const std::unordered_set<Vertex *> &vertices,
std::unordered_set<Gid> &deleted_edge_ids);
Result<std::optional<std::vector<EdgeAccessor>>> DetachRemainingEdges(
EdgeInfoForDeletion info, std::unordered_set<Gid> &partially_detached_edge_ids);
Result<std::vector<VertexAccessor>> TryDeleteVertices(const std::unordered_set<Vertex *> &vertices);
void MarkEdgeAsDeleted(Edge *edge);
private:
StorageMode creation_storage_mode_;
};

View File

@ -0,0 +1,211 @@
Feature: Delete
Scenario: Delete all from database and yield number of deleted items
Given an empty graph
And having executed
"""
CREATE (n), (m), (o)
"""
When executing query:
"""
MATCH (n) DETACH DELETE n RETURN COUNT(*) AS cnt
"""
Then the result should be:
| cnt |
| 3 |
Scenario: Delete all from database and match nothing after
Given an empty graph
And having executed
"""
CREATE (n), (m)
"""
When executing query:
"""
MATCH (n) DETACH DELETE n WITH n MATCH (m) RETURN COUNT(*) AS cnt
"""
Then the result should be:
| cnt |
| 0 |
Scenario: Delete relationship in the pattern
Given an empty graph
And having executed
"""
CREATE (n)-[:REL]->(m), (k)-[:REL]->(z)
"""
When executing query:
"""
MATCH (n)-[r:REL]->(m) DELETE r RETURN COUNT(*) AS cnt
"""
Then the result should be:
| cnt |
| 2 |
Scenario: Delete node and return property throws an error
Given an empty graph
And having executed
"""
CREATE (n {prop: 1});
"""
When executing query:
"""
MATCH (n) DETACH DELETE n RETURN n.prop AS prop
"""
Then an error should be raised
Scenario: Delete node, set property without return gives empty result
Given an empty graph
And having executed
"""
CREATE (n {prop: 1});
"""
When executing query:
"""
MATCH (n) DETACH DELETE n SET n.prop = 2
"""
Then the result should be empty
Scenario: Delete node, set property and return throws an error
Given an empty graph
And having executed
"""
CREATE (n {prop: 1});
"""
When executing query:
"""
MATCH (n) DETACH DELETE n SET n.prop = 2 RETURN n
"""
Then an error should be raised
Scenario: Delete node, remove property without return gives empty result
Given an empty graph
And having executed
"""
CREATE (n {prop: 1});
"""
When executing query:
"""
MATCH (n) DETACH DELETE n REMOVE n.prop
"""
Then the result should be empty
Scenario: Delete node, remove property and return throws an error
Given an empty graph
And having executed
"""
CREATE (n {prop: 1});
"""
When executing query:
"""
MATCH (n) DETACH DELETE n REMOVE n.prop RETURN n
"""
Then an error should be raised
Scenario: Delete node, set label without return gives empty result
Given an empty graph
And having executed
"""
CREATE (n {prop: 1});
"""
When executing query:
"""
MATCH (n) DETACH DELETE n SET n:Label
"""
Then the result should be empty
Scenario: Delete node, set label and return throws an error
Given an empty graph
And having executed
"""
CREATE (n {prop: 1});
"""
When executing query:
"""
MATCH (n) DETACH DELETE n SET n:Label RETURN n
"""
Then an error should be raised
Scenario: Delete node, remove label without return gives empty result
Given an empty graph
And having executed
"""
CREATE (n:Label {prop: 1});
"""
When executing query:
"""
MATCH (n) DETACH DELETE n REMOVE n:Label
"""
Then the result should be empty
Scenario: Delete node, remove label and return throws an error
Given an empty graph
And having executed
"""
CREATE (n:Label {prop: 1});
"""
When executing query:
"""
MATCH (n) DETACH DELETE n REMOVE n:Label RETURN n
"""
Then an error should be raised
Scenario: Delete node, set update properties and return throws an error
Given an empty graph
And having executed
"""
CREATE (n:Label {prop: 1});
"""
When executing query:
"""
MATCH (n) DETACH DELETE n SET n += {prop: 2} RETURN n
"""
Then an error should be raised
Scenario: Delete node, set update properties without return gives empty result
Given an empty graph
And having executed
"""
CREATE (n:Label {prop: 1});
"""
When executing query:
"""
MATCH (n) DETACH DELETE n SET n += {prop: 2}
"""
Then the result should be empty
Scenario: Delete node, set replace properties and return throws an error
Given an empty graph
And having executed
"""
CREATE (n:Label {prop: 1});
"""
When executing query:
"""
MATCH (n) DETACH DELETE n SET n = {prop: 2} RETURN n
"""
Then an error should be raised
Scenario: Delete node, set replace properties without return gives empty result
Given an empty graph
And having executed
"""
CREATE (n:Label {prop: 1});
"""
When executing query:
"""
MATCH (n) DETACH DELETE n SET n = {prop: 2}
"""
Then the result should be empty
Scenario: Delete node, set property and return it with aggregation throws an error
Given an empty graph
And having executed
"""
CREATE (n:Label {prop: 1});
"""
When executing query:
"""
MATCH (n) DETACH DELETE n SET n.prop = 1 WITH n RETURN n
"""
Then an error should be raised

View File

@ -12,44 +12,51 @@
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
'''
"""
Large bipartite graph stress test.
'''
"""
import atexit
import logging
import multiprocessing
import time
import atexit
from common import connection_argument_parser, assert_equal, \
OutputData, execute_till_success, \
batch, render, SessionCache
from common import (
OutputData,
SessionCache,
assert_equal,
batch,
connection_argument_parser,
execute_till_success,
render,
)
def parse_args():
'''
"""
Parses user arguments
:return: parsed arguments
'''
"""
parser = connection_argument_parser()
parser.add_argument('--worker-count', type=int,
default=multiprocessing.cpu_count(),
help='Number of concurrent workers.')
parser.add_argument("--logging", default="INFO",
choices=["INFO", "DEBUG", "WARNING", "ERROR"],
help="Logging level")
parser.add_argument('--u-count', type=int, default=100,
help='Size of U set in the bipartite graph.')
parser.add_argument('--v-count', type=int, default=100,
help='Size of V set in the bipartite graph.')
parser.add_argument('--vertex-batch-size', type=int, default=100,
help="Create vertices in batches of this size.")
parser.add_argument('--edge-batching', action='store_true',
help='Create edges in batches.')
parser.add_argument('--edge-batch-size', type=int, default=100,
help='Number of edges in a batch when edges '
'are created in batches.')
parser.add_argument(
"--worker-count", type=int, default=multiprocessing.cpu_count(), help="Number of concurrent workers."
)
parser.add_argument(
"--logging", default="INFO", choices=["INFO", "DEBUG", "WARNING", "ERROR"], help="Logging level"
)
parser.add_argument("--u-count", type=int, default=100, help="Size of U set in the bipartite graph.")
parser.add_argument("--v-count", type=int, default=100, help="Size of V set in the bipartite graph.")
parser.add_argument("--vertex-batch-size", type=int, default=100, help="Create vertices in batches of this size.")
parser.add_argument("--edge-batching", action="store_true", help="Create edges in batches.")
parser.add_argument(
"--edge-batch-size",
type=int,
default=100,
help="Number of edges in a batch when edges " "are created in batches.",
)
parser.add_argument("--isolation-level", type=str, required=True, help="Database isolation level.")
parser.add_argument("--storage-mode", type=str, required=True, help="Database storage mode.")
return parser.parse_args()
@ -62,18 +69,18 @@ atexit.register(SessionCache.cleanup)
def create_u_v_edges(u):
'''
"""
Creates nodes and checks that all nodes were created.
create edges from one vertex in U set to all vertex of V set
:param worker_id: worker id
:return: tuple (worker_id, create execution time, time unit)
'''
"""
start_time = time.time()
session = SessionCache.argument_session(args)
no_failures = 0
match_u = 'MATCH (u:U {id: %d})' % u
match_u = "MATCH (u:U {id: %d})" % u
if args.edge_batching:
# TODO: try to randomize execution, the execution time should
# be smaller, add randomize flag
@ -83,143 +90,120 @@ def create_u_v_edges(u):
query = match_u + "".join(match_v) + "".join(create_u)
no_failures += execute_till_success(session, query)[1]
else:
no_failures += execute_till_success(
session, match_u + ' MATCH (v:V) CREATE (u)-[:R]->(v)')[1]
no_failures += execute_till_success(session, match_u + " MATCH (v:V) CREATE (u)-[:R]->(v)")[1]
end_time = time.time()
return u, end_time - start_time, "s", no_failures
def traverse_from_u_worker(u):
'''
"""
Traverses edges starting from an element of U set.
Traversed labels are: :U -> :V -> :U.
'''
"""
session = SessionCache.argument_session(args)
start_time = time.time()
assert_equal(
args.u_count * args.v_count - args.v_count, # cypher morphism
session.run("MATCH (u1:U {id: %s})-[e1]->(v:V)<-[e2]-(u2:U) "
"RETURN count(v) AS cnt" % u).data()[0]['cnt'],
"Number of traversed edges started "
"from U(id:%s) is wrong!. " % u +
"Expected: %s Actual: %s")
session.run("MATCH (u1:U {id: %s})-[e1]->(v:V)<-[e2]-(u2:U) " "RETURN count(v) AS cnt" % u).data()[0]["cnt"],
"Number of traversed edges started " "from U(id:%s) is wrong!. " % u + "Expected: %s Actual: %s",
)
end_time = time.time()
return u, end_time - start_time, 's'
return u, end_time - start_time, "s"
def traverse_from_v_worker(v):
'''
"""
Traverses edges starting from an element of V set.
Traversed labels are: :V -> :U -> :V.
'''
"""
session = SessionCache.argument_session(args)
start_time = time.time()
assert_equal(
args.u_count * args.v_count - args.u_count, # cypher morphism
session.run("MATCH (v1:V {id: %s})<-[e1]-(u:U)-[e2]->(v2:V) "
"RETURN count(u) AS cnt" % v).data()[0]['cnt'],
"Number of traversed edges started "
"from V(id:%s) is wrong!. " % v +
"Expected: %s Actual: %s")
session.run("MATCH (v1:V {id: %s})<-[e1]-(u:U)-[e2]->(v2:V) " "RETURN count(u) AS cnt" % v).data()[0]["cnt"],
"Number of traversed edges started " "from V(id:%s) is wrong!. " % v + "Expected: %s Actual: %s",
)
end_time = time.time()
return v, end_time - start_time, 's'
return v, end_time - start_time, "s"
def execution_handler():
'''
"""
Initializes client processes, database and starts the execution.
'''
"""
# instance cleanup
session = SessionCache.argument_session(args)
start_time = time.time()
# clean existing database
session.run('MATCH (n) DETACH DELETE n').consume()
session.run("MATCH (n) DETACH DELETE n").consume()
cleanup_end_time = time.time()
output_data.add_measurement("cleanup_time",
cleanup_end_time - start_time)
output_data.add_measurement("cleanup_time", cleanup_end_time - start_time)
log.info("Database is clean.")
# create indices
session.run('CREATE INDEX ON :U').consume()
session.run('CREATE INDEX ON :V').consume()
session.run("CREATE INDEX ON :U").consume()
session.run("CREATE INDEX ON :V").consume()
# create U vertices
for b in batch(render('CREATE (:U {{id: {}}})', range(args.u_count)),
args.vertex_batch_size):
for b in batch(render("CREATE (:U {{id: {}}})", range(args.u_count)), args.vertex_batch_size):
session.run(" ".join(b)).consume()
# create V vertices
for b in batch(render('CREATE (:V {{id: {}}})', range(args.v_count)),
args.vertex_batch_size):
for b in batch(render("CREATE (:V {{id: {}}})", range(args.v_count)), args.vertex_batch_size):
session.run(" ".join(b)).consume()
vertices_create_end_time = time.time()
output_data.add_measurement(
'vertices_create_time',
vertices_create_end_time - cleanup_end_time)
output_data.add_measurement("vertices_create_time", vertices_create_end_time - cleanup_end_time)
log.info("All nodes created.")
# concurrent create execution & tests
with multiprocessing.Pool(args.worker_count) as p:
create_edges_start_time = time.time()
for worker_id, create_time, time_unit, no_failures in \
p.map(create_u_v_edges, [i for i in range(args.u_count)]):
log.info('Worker ID: %s; Create time: %s%s Failures: %s' %
(worker_id, create_time, time_unit, no_failures))
for worker_id, create_time, time_unit, no_failures in p.map(create_u_v_edges, [i for i in range(args.u_count)]):
log.info("Worker ID: %s; Create time: %s%s Failures: %s" % (worker_id, create_time, time_unit, no_failures))
create_edges_end_time = time.time()
output_data.add_measurement(
'edges_create_time',
create_edges_end_time - create_edges_start_time)
output_data.add_measurement("edges_create_time", create_edges_end_time - create_edges_start_time)
# check total number of edges
assert_equal(
args.v_count * args.u_count,
session.run(
'MATCH ()-[r]->() '
'RETURN count(r) AS cnt').data()[0]['cnt'],
"Total number of edges isn't correct! Expected: %s Actual: %s")
session.run("MATCH ()-[r]->() " "RETURN count(r) AS cnt").data()[0]["cnt"],
"Total number of edges isn't correct! Expected: %s Actual: %s",
)
# check traversals starting from all elements of U
traverse_from_u_start_time = time.time()
for u, traverse_u_time, time_unit in \
p.map(traverse_from_u_worker,
[i for i in range(args.u_count)]):
for u, traverse_u_time, time_unit in p.map(traverse_from_u_worker, [i for i in range(args.u_count)]):
log.info("U {id: %s} %s%s" % (u, traverse_u_time, time_unit))
traverse_from_u_end_time = time.time()
output_data.add_measurement(
'traverse_from_u_time',
traverse_from_u_end_time - traverse_from_u_start_time)
output_data.add_measurement("traverse_from_u_time", traverse_from_u_end_time - traverse_from_u_start_time)
# check traversals starting from all elements of V
traverse_from_v_start_time = time.time()
for v, traverse_v_time, time_unit in \
p.map(traverse_from_v_worker,
[i for i in range(args.v_count)]):
for v, traverse_v_time, time_unit in p.map(traverse_from_v_worker, [i for i in range(args.v_count)]):
log.info("V {id: %s} %s%s" % (v, traverse_v_time, time_unit))
traverse_from_v_end_time = time.time()
output_data.add_measurement(
'traverse_from_v_time',
traverse_from_v_end_time - traverse_from_v_start_time)
output_data.add_measurement("traverse_from_v_time", traverse_from_v_end_time - traverse_from_v_start_time)
# check total number of vertices
assert_equal(
args.v_count + args.u_count,
session.run('MATCH (n) RETURN count(n) AS cnt').data()[0]['cnt'],
"Total number of vertices isn't correct! Expected: %s Actual: %s")
session.run("MATCH (n) RETURN count(n) AS cnt").data()[0]["cnt"],
"Total number of vertices isn't correct! Expected: %s Actual: %s",
)
# check total number of edges
assert_equal(
args.v_count * args.u_count,
session.run(
'MATCH ()-[r]->() RETURN count(r) AS cnt').data()[0]['cnt'],
"Total number of edges isn't correct! Expected: %s Actual: %s")
session.run("MATCH ()-[r]->() RETURN count(r) AS cnt").data()[0]["cnt"],
"Total number of edges isn't correct! Expected: %s Actual: %s",
)
end_time = time.time()
output_data.add_measurement("total_execution_time",
end_time - start_time)
output_data.add_measurement("total_execution_time", end_time - start_time)
if __name__ == '__main__':
if __name__ == "__main__":
logging.basicConfig(level=args.logging)
if args.logging != "DEBUG":
logging.getLogger("neo4j").setLevel(logging.WARNING)

View File

@ -11,26 +11,27 @@
# -*- coding: utf-8 -*-
'''
"""
Common methods for writing graph database
integration tests in python.
Only Bolt communication protocol is supported.
'''
"""
import contextlib
import os
from threading import Thread
from time import sleep
import time
from argparse import ArgumentParser
from neo4j import GraphDatabase, TRUST_ALL_CERTIFICATES
from threading import Thread
from gqlalchemy import Memgraph
from neo4j import TRUST_ALL_CERTIFICATES, GraphDatabase
class OutputData:
'''
"""
Encapsulates results and info about the tests.
'''
"""
def __init__(self):
# data in time format (name, time, unit)
@ -39,32 +40,32 @@ class OutputData:
self._statuses = []
def add_measurement(self, name, time, unit="s"):
'''
"""
Stores measurement.
:param name: str, name of measurement
:param time: float, time value
:param unit: str, time unit
'''
"""
self._measurements.append((name, time, unit))
def add_status(self, name, status):
'''
"""
Stores status data point.
:param name: str, name of data point
:param status: printable value
'''
"""
self._statuses.append((name, status))
def dump(self, print_f=print):
'''
"""
Dumps output using the given ouput function.
Args:
print_f - the function that consumes ouptput. Defaults to
the 'print' function.
'''
"""
print_f("Output data:")
for name, status in self._statuses:
print_f(" %s: %s" % (name, status))
@ -73,7 +74,7 @@ class OutputData:
def execute_till_success(session, query, max_retries=1000):
'''
"""
Executes a query within Bolt session until the query is
successfully executed against the database.
@ -86,7 +87,7 @@ def execute_till_success(session, query, max_retries=1000):
:param query: query to execute
:return: tuple (results_data_list, number_of_failures, result_summary)
'''
"""
no_failures = 0
while True:
try:
@ -97,12 +98,24 @@ def execute_till_success(session, query, max_retries=1000):
except Exception:
no_failures += 1
if no_failures >= max_retries:
raise Exception("Query '%s' failed %d times, aborting" %
(query, max_retries))
raise Exception("Query '%s' failed %d times, aborting" % (query, max_retries))
def execute_till_success_gqlalchemy(memgraph: Memgraph, query: str, max_retries=1000):
    """
    Executes a query through a gqlalchemy connection, retrying after every
    failure until the query succeeds.

    :param memgraph: gqlalchemy Memgraph connection used to run the query
    :param query: query to execute
    :param max_retries: number of failed attempts after which the function
                        gives up and raises
    :return: tuple (value returned by memgraph.execute, number_of_failures)
    """
    failed_attempts = 0
    while True:
        try:
            return memgraph.execute(query), failed_attempts
        except Exception:
            failed_attempts += 1
            if failed_attempts < max_retries:
                continue
            raise Exception("Query '%s' failed %d times, aborting" % (query, max_retries))
def batch(input, batch_size):
""" Batches the given input (must be iterable).
"""Batches the given input (must be iterable).
Supports input generators. Returns a generator.
All is lazy. The last batch can contain fewer elements
than `batch_size`, but is for sure more than zero.
@ -134,7 +147,7 @@ def render(template, iterable_arguments):
def assert_equal(expected, actual, message):
'''
"""
Compares expected and actual values. If values are not the same terminate
the execution.
@ -142,45 +155,41 @@ def assert_equal(expected, actual, message):
:param actual: actual value
:param message: str, message in case that the values are not equal, must
contain two placeholders (%s) to print the values.
'''
"""
assert expected == actual, message % (expected, actual)
def connection_argument_parser():
'''
"""
Parses arguments related to establishing database connection like
host, port, username, etc.
:return: An instance of ArgumentParser
'''
"""
parser = ArgumentParser(description=__doc__)
parser.add_argument('--endpoint', type=str, default='127.0.0.1:7687',
help='DBMS instance endpoint. '
'Bolt protocol is the only option.')
parser.add_argument('--username', type=str, default='neo4j',
help='DBMS instance username.')
parser.add_argument('--password', type=int, default='1234',
help='DBMS instance password.')
parser.add_argument('--use-ssl', action='store_true',
help="Is SSL enabled?")
parser.add_argument(
"--endpoint",
type=str,
default="127.0.0.1:7687",
help="DBMS instance endpoint. " "Bolt protocol is the only option.",
)
parser.add_argument("--username", type=str, default="neo4j", help="DBMS instance username.")
parser.add_argument("--password", type=str, default="1234", help="DBMS instance password.")
parser.add_argument("--use-ssl", action="store_true", help="Is SSL enabled?")
return parser
@contextlib.contextmanager
def bolt_session(url, auth, ssl=False):
'''
"""
with wrapper around Bolt session.
:param url: str, e.g. "bolt://127.0.0.1:7687"
:param auth: auth method, goes directly to the Bolt driver constructor
:param ssl: bool, is ssl enabled
'''
driver = GraphDatabase.driver(
url,
auth=auth,
encrypted=ssl,
trust=TRUST_ALL_CERTIFICATES)
"""
driver = GraphDatabase.driver(url, auth=auth, encrypted=ssl, trust=TRUST_ALL_CERTIFICATES)
session = driver.session()
try:
yield session
@ -192,19 +201,37 @@ def bolt_session(url, auth, ssl=False):
# If you are using session with multiprocessing take a look at SesssionCache
# in bipartite for an idea how to reuse sessions.
def argument_session(args):
'''
"""
:return: Bolt session context manager based on program arguments
'''
return bolt_session('bolt://' + args.endpoint,
(args.username, str(args.password)),
args.use_ssl)
"""
return bolt_session("bolt://" + args.endpoint, (args.username, args.password), args.use_ssl)
def argument_driver(args):
return GraphDatabase.driver(
'bolt://' + args.endpoint,
auth=(args.username, str(args.password)),
encrypted=args.use_ssl, trust=TRUST_ALL_CERTIFICATES)
"bolt://" + args.endpoint,
auth=(args.username, args.password),
encrypted=args.use_ssl,
trust=TRUST_ALL_CERTIFICATES,
)
def get_memgraph(args) -> Memgraph:
    """
    Builds a gqlalchemy Memgraph connection from the parsed program arguments.

    :param args: parsed arguments providing `endpoint` ("host:port"),
                 `username`, `password` and `use_ssl`
    :return: Memgraph instance configured for that endpoint
    """
    endpoint_parts = args.endpoint.split(":")
    return Memgraph(
        host=endpoint_parts[0],
        port=int(endpoint_parts[1]),
        username=args.username,
        password=args.password,
        # Encryption is switched on only when SSL was requested.
        encrypted=True if args.use_ssl else False,
    )
# This class is used to create and cache sessions. Session is cached by args
# used to create it and process' pid in which it was created. This makes it
@ -241,9 +268,10 @@ def periodically_execute(callable, args, interval, daemon=True):
interval - time (in seconds) between two calls
daemon - if the execution thread should be a daemon
"""
def periodic_call():
while True:
sleep(interval)
time.sleep(interval)
callable()
Thread(target=periodic_call, args=args, daemon=daemon).start()

View File

@ -1,81 +1,16 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import atexit
import json
import copy
import multiprocessing
import os
import subprocess
import sys
import time
from argparse import Namespace as Args
from subprocess import Popen
from typing import Dict, List
# dataset calibrated for running on Apollo (total 4min)
# bipartite.py runs for approx. 30s
# create_match.py runs for approx. 30s
# long_running runs for 1min
# long_running runs for 2min
SMALL_DATASET = [
{
"test": "bipartite.py",
"options": ["--u-count", "100", "--v-count", "100"],
"timeout": 5,
},
{
"test": "create_match.py",
"options": ["--vertex-count", "40000", "--create-pack-size", "100"],
"timeout": 5,
},
{
"test": "parser.cpp",
"options": ["--per-worker-query-count", "1000"],
"timeout": 5,
},
{
"test": "long_running.cpp",
"options": ["--vertex-count", "1000", "--edge-count", "5000", "--max-time", "1", "--verify", "20"],
"timeout": 5,
},
{
"test": "long_running.cpp",
"options": ["--vertex-count", "10000", "--edge-count", "50000", "--max-time", "2", "--verify", "30"],
"timeout": 5,
},
]
# dataset calibrated for running on daily stress instance (total 9h)
# bipartite.py and create_match.py run for approx. 15min
# long_running runs for 5min x 6 times = 30min
# long_running runs for 8h
LARGE_DATASET = (
[
{
"test": "bipartite.py",
"options": ["--u-count", "300", "--v-count", "300"],
"timeout": 30,
},
{
"test": "create_match.py",
"options": ["--vertex-count", "500000", "--create-pack-size", "500"],
"timeout": 30,
},
]
+ [
{
"test": "long_running.cpp",
"options": ["--vertex-count", "10000", "--edge-count", "40000", "--max-time", "5", "--verify", "60"],
"timeout": 16,
},
]
* 6
+ [
{
"test": "long_running.cpp",
"options": ["--vertex-count", "200000", "--edge-count", "1000000", "--max-time", "480", "--verify", "300"],
"timeout": 500,
},
]
)
from test_config import LARGE_DATASET, SMALL_DATASET, DatabaseMode, DatasetConstants
# paths
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -87,64 +22,12 @@ CERT_FILE = os.path.join(SCRIPT_DIR, ".cert.pem")
# long running stats file
STATS_FILE = os.path.join(SCRIPT_DIR, ".long_running_stats")
SMALL_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE])
LARGE_DATASET[-1]["options"].extend(["--stats-file", STATS_FILE])
# get number of threads
if "THREADS" in os.environ:
THREADS = os.environ["THREADS"]
else:
THREADS = multiprocessing.cpu_count()
THREADS = os.environ["THREADS"] if "THREADS" in os.environ else multiprocessing.cpu_count()
def wait_for_server(port, delay=0.1):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
while subprocess.call(cmd) != 0:
time.sleep(0.01)
time.sleep(delay)
# run test helper function
def run_test(args, test, options, timeout):
print("Running test '{}'".format(test))
# find binary
if test.endswith(".py"):
logging = "DEBUG" if args.verbose else "WARNING"
binary = [args.python, "-u", os.path.join(SCRIPT_DIR, test), "--logging", logging]
elif test.endswith(".cpp"):
exe = os.path.join(BUILD_DIR, "tests", "stress", test[:-4])
binary = [exe]
else:
raise Exception("Test '{}' binary not supported!".format(test))
# start test
cmd = binary + ["--worker-count", str(THREADS)] + options
start = time.time()
ret_test = subprocess.run(cmd, cwd=SCRIPT_DIR, timeout=timeout * 60)
if ret_test.returncode != 0:
raise Exception("Test '{}' binary returned non-zero ({})!".format(test, ret_test.returncode))
runtime = time.time() - start
print(" Done after {:.3f} seconds".format(runtime))
return runtime
# parse arguments
parser = argparse.ArgumentParser(description="Run stress tests on Memgraph.")
parser.add_argument("--memgraph", default=os.path.join(BUILD_DIR, "memgraph"))
parser.add_argument("--log-file", default="")
parser.add_argument("--data-directory", default="")
parser.add_argument("--python", default=os.path.join(SCRIPT_DIR, "ve3", "bin", "python3"), type=str)
parser.add_argument("--large-dataset", action="store_const", const=True, default=False)
parser.add_argument("--use-ssl", action="store_const", const=True, default=False)
parser.add_argument("--verbose", action="store_const", const=True, default=False)
args = parser.parse_args()
# generate temporary SSL certs
if args.use_ssl:
def generate_temporary_ssl_certs():
# https://unix.stackexchange.com/questions/104171/create-ssl-certificate-non-interactively
subj = "/C=HR/ST=Zagreb/L=Zagreb/O=Memgraph/CN=db.memgraph.com"
subprocess.run(
@ -168,9 +51,38 @@ if args.use_ssl:
check=True,
)
# start memgraph
cwd = os.path.dirname(args.memgraph)
cmd = [
def remove_certificates() -> None:
    """Deletes the SSL key file and the SSL certificate file (KEY_FILE, CERT_FILE)."""
    for certificate_path in (KEY_FILE, CERT_FILE):
        os.remove(certificate_path)
def parse_arguments() -> Args:
    """
    Parses the command line of the stress test runner.

    :return: parsed arguments (paths to the Memgraph binary and Python
             interpreter, log/data locations, dataset selection, an optional
             single test to run, SSL and verbosity switches)
    """
    default_python = os.path.join(SCRIPT_DIR, "ve3", "bin", "python3")

    cli = argparse.ArgumentParser(description="Run stress tests on Memgraph.")
    cli.add_argument("--memgraph", default=os.path.join(BUILD_DIR, "memgraph"))
    cli.add_argument("--log-file", default="")
    cli.add_argument("--data-directory", default="")
    cli.add_argument("--python", default=default_python, type=str)
    # Boolean switches: absent -> False, present -> True.
    cli.add_argument("--large-dataset", action="store_true")
    cli.add_argument("--small-dataset", action="store_true")
    cli.add_argument("--specific-test", default="")
    cli.add_argument("--use-ssl", action="store_true")
    cli.add_argument("--verbose", action="store_true")
    return cli.parse_args()
def wait_for_server(port, delay=0.1) -> None:
    """
    Blocks until `nc -z` reports success for 127.0.0.1:`port`.

    :param port: port to probe on 127.0.0.1
    :param delay: extra time (in seconds) to sleep once the probe succeeds
    """
    probe = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
    while True:
        if subprocess.call(probe) == 0:
            break
        # Probe failed; back off briefly before trying again.
        time.sleep(0.01)
    time.sleep(delay)
def start_memgraph(args: Args) -> Popen:
"""Starts Memgraph and return the process"""
cwd = os.path.dirname(args.memgraph)
cmd = [
args.memgraph,
"--bolt-num-workers=" + str(THREADS),
"--storage-properties-on-edges=true",
@ -180,58 +92,143 @@ cmd = [
"--storage-wal-enabled=true",
"--storage-recover-on-startup=false",
"--query-execution-timeout-sec=1200",
]
if not args.verbose:
"--bolt-server-name-for-init=Neo4j/",
]
if not args.verbose:
cmd += ["--log-level", "WARNING"]
if args.log_file:
if args.log_file:
cmd += ["--log-file", args.log_file]
if args.data_directory:
if args.data_directory:
cmd += ["--data-directory", args.data_directory]
if args.use_ssl:
cmd += ["--bolt-cert-file", CERT_FILE, "--bolt-key-file", KEY_FILE]
proc_mg = subprocess.Popen(cmd, cwd=cwd)
wait_for_server(7687)
assert proc_mg.poll() is None, "The database binary died prematurely!"
# at exit cleanup
@atexit.register
def cleanup():
global proc_mg
if proc_mg.poll() != None:
return
proc_mg.kill()
proc_mg.wait()
# run tests
runtimes = {}
dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET
for test in dataset:
if args.use_ssl:
test["options"] += ["--use-ssl"]
runtime = run_test(args, **test)
runtimes[os.path.splitext(test["test"])[0]] = runtime
cmd += ["--bolt-cert-file", CERT_FILE, "--bolt-key-file", KEY_FILE]
memgraph_proc = subprocess.Popen(cmd, cwd=cwd)
wait_for_server(7687)
# stop memgraph
proc_mg.terminate()
ret_mg = proc_mg.wait()
if ret_mg != 0:
assert memgraph_proc.poll() is None, "The database binary died prematurely!"
return memgraph_proc
def stop_memgraph(proc_mg: Popen) -> None:
    """
    Terminates the Memgraph process and waits for it to exit.

    :param proc_mg: running Memgraph process
    :raises Exception: if the process exits with a non-zero return code
    """
    proc_mg.terminate()
    exit_code = proc_mg.wait()
    if exit_code == 0:
        return
    raise Exception("Memgraph binary returned non-zero ({})!".format(exit_code))
# cleanup certificates
if args.use_ssl:
os.remove(KEY_FILE)
os.remove(CERT_FILE)
# measurements
measurements = ""
for key, value in runtimes.items():
def run_test(args: Args, test: str, options: List[str], timeout: int, mode: DatabaseMode) -> float:
    """Runs a single stress test for one database configuration.

    Args:
        args: Arguments passed to the test runner
        test: Test name (file name of the test source)
        options: List of options specific for the test
        timeout: Timeout in minutes
        mode: DatabaseMode (storage mode & isolation level pair)

    Returns:
        Wall-clock run time of the test in seconds.

    Raises:
        Exception: if the test process exits with a non-zero return code.
    """
    print("Running test '{}'".format(test))

    launcher = _find_test_binary(args, test)
    common_flags = ["--worker-count", str(THREADS)]
    mode_flags = ["--isolation-level", mode.isolation_level, "--storage-mode", mode.storage_mode]
    command = launcher + common_flags + mode_flags + options

    started_at = time.time()
    # `timeout` is given in minutes, subprocess expects seconds.
    completed = subprocess.run(command, cwd=SCRIPT_DIR, timeout=timeout * 60)
    if completed.returncode != 0:
        raise Exception("Test '{}' binary returned non-zero ({})!".format(test, completed.returncode))

    elapsed = time.time() - started_at
    print(" Done after {:.3f} seconds".format(elapsed))
    return elapsed
def _find_test_binary(args: Args, test: str) -> List[str]:
    """
    Builds the command prefix used to launch `test`.

    :param args: runner arguments (`python` interpreter path and `verbose` flag)
    :param test: test file name ending in ".py" or ".cpp"
    :return: list of command line tokens that starts the test
    :raises Exception: if the test has any other extension
    """
    if test.endswith(".cpp"):
        # Compiled tests live in the build tree under the source name without ".cpp".
        return [os.path.join(BUILD_DIR, "tests", "stress", test[: -len(".cpp")])]

    if test.endswith(".py"):
        log_level = "DEBUG" if args.verbose else "WARNING"
        script_path = os.path.join(SCRIPT_DIR, test)
        return [args.python, "-u", script_path, "--logging", log_level]

    raise Exception("Test '{}' binary not supported!".format(test))
def run_stress_test_suite(args: Args) -> Dict[str, float]:
    """Runs every selected stress test against a freshly started Memgraph.

    A new Memgraph process is started for each (test, database mode) pair and
    is always shut down afterwards, even when the test itself fails.

    Args:
        args: Parsed runner arguments (dataset selection, SSL, specific test, ...)

    Returns:
        Mapping from test name (without extension) to its run time in seconds.

    Raises:
        Exception: if both dataset sizes are requested, if `--specific-test`
            matches nothing, or if a test / the database exits with an error.
    """

    def cleanup(memgraph_proc):
        # Force-kill the database only if it is still running.
        if memgraph_proc.poll() is not None:
            return
        memgraph_proc.kill()
        memgraph_proc.wait()

    runtimes = {}

    if args.large_dataset and args.small_dataset:
        raise Exception("Choose only a large dataset or a small dataset for stress testing!")

    dataset = LARGE_DATASET if args.large_dataset else SMALL_DATASET

    if args.specific_test:
        dataset = [x for x in dataset if x[DatasetConstants.TEST] == args.specific_test]
        if not dataset:
            raise Exception("Specific dataset is not found for stress testing!")

    for test in dataset:
        if args.use_ssl:
            test[DatasetConstants.OPTIONS] += ["--use-ssl"]

        for mode in test[DatasetConstants.MODE]:
            # Run for every specified combination of storage mode, serialization mode, etc (if extended).
            # The copy keeps the list of modes on the dataset entry intact while
            # this run receives a single mode.
            test_run = copy.deepcopy(test)
            test_run[DatasetConstants.MODE] = mode

            memgraph_proc = start_memgraph(args)
            try:
                runtime = run_test(args, **test_run)
                # NOTE: keyed by test name only, so with several modes the
                # runtime of the last mode is the one that is kept.
                runtimes[os.path.splitext(test[DatasetConstants.TEST])[0]] = runtime
                stop_memgraph(memgraph_proc)
            finally:
                # Without this a failing test (or a failing shutdown) would
                # leave the database process running after the runner exits.
                cleanup(memgraph_proc)

    return runtimes
def write_stats(runtimes: Dict[str, float]) -> None:
measurements = ""
for key, value in runtimes.items():
measurements += "{}.runtime {}\n".format(key, value)
with open(STATS_FILE) as f:
with open(STATS_FILE) as f:
stats = f.read().split("\n")
measurements += "long_running.queries.executed {}\n".format(stats[0])
measurements += "long_running.queries.failed {}\n".format(stats[1])
with open(MEASUREMENTS_FILE, "w") as f:
measurements += "long_running.queries.executed {}\n".format(stats[0])
measurements += "long_running.queries.failed {}\n".format(stats[1])
with open(MEASUREMENTS_FILE, "w") as f:
f.write(measurements)
print("Done!")
if __name__ == "__main__":
    args = parse_arguments()

    if args.use_ssl:
        generate_temporary_ssl_certs()

    try:
        runtimes = run_stress_test_suite(args)
    finally:
        # Remove the generated key/certificate even when the suite fails, so
        # they are not left behind in the script directory.
        if args.use_ssl:
            remove_certificates()

    write_stats(runtimes)

    print("Successfully ran stress tests!")

View File

@ -12,44 +12,48 @@
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
'''
"""
Large scale stress test. Tests only node creation.
The idea is to run this test on machines with huge amount of memory e.g. 2TB.
'''
"""
import logging
import multiprocessing
import random
import time
from collections import defaultdict
from common import connection_argument_parser, argument_session
from common import argument_session, connection_argument_parser
def parse_args():
'''
"""
Parses user arguments
:return: parsed arguments
'''
"""
parser = connection_argument_parser()
# specific
parser.add_argument('--worker-count', type=int,
default=multiprocessing.cpu_count(),
help='Number of concurrent workers.')
parser.add_argument("--logging", default="INFO",
choices=["INFO", "DEBUG", "WARNING", "ERROR"],
help="Logging level")
parser.add_argument('--vertex-count', type=int, default=100,
help='Number of created vertices.')
parser.add_argument('--max-property-value', type=int, default=1000,
help='Maximum value of property - 1. A created node '
'will have a property with random value from 0 to '
'max_property_value - 1.')
parser.add_argument('--create-pack-size', type=int, default=1,
help='Number of CREATE clauses in a query')
parser.add_argument(
"--worker-count", type=int, default=multiprocessing.cpu_count(), help="Number of concurrent workers."
)
parser.add_argument(
"--logging", default="INFO", choices=["INFO", "DEBUG", "WARNING", "ERROR"], help="Logging level"
)
parser.add_argument("--vertex-count", type=int, default=100, help="Number of created vertices.")
parser.add_argument(
"--max-property-value",
type=int,
default=1000,
help="Maximum value of property - 1. A created node "
"will have a property with random value from 0 to "
"max_property_value - 1.",
)
parser.add_argument("--create-pack-size", type=int, default=1, help="Number of CREATE clauses in a query")
parser.add_argument("--isolation-level", type=str, required=True, help="Database isolation level.")
parser.add_argument("--storage-mode", type=str, required=True, help="Database storage mode.")
return parser.parse_args()
@ -58,51 +62,46 @@ args = parse_args()
def create_worker(worker_id):
'''
"""
Creates nodes and checks that all nodes were created.
:param worker_id: worker id
:return: tuple (worker_id, create execution time, time unit)
'''
assert args.vertex_count > 0, 'Number of vertices has to be positive int'
"""
assert args.vertex_count > 0, "Number of vertices has to be positive int"
generated_xs = defaultdict(int)
create_query = ''
create_query = ""
with argument_session(args) as session:
# create vertices
start_time = time.time()
for i in range(0, args.vertex_count):
random_number = random.randint(0, args.max_property_value - 1)
generated_xs[random_number] += 1
create_query += 'CREATE (:Label_T%s {x: %s}) ' % \
(worker_id, random_number)
create_query += "CREATE (:Label_T%s {x: %s}) " % (worker_id, random_number)
# if full back or last item -> execute query
if (i + 1) % args.create_pack_size == 0 or \
i == args.vertex_count - 1:
if (i + 1) % args.create_pack_size == 0 or i == args.vertex_count - 1:
session.run(create_query).consume()
create_query = ''
create_query = ""
create_time = time.time()
# check total count
result_set = session.run('MATCH (n:Label_T%s) RETURN count(n) AS cnt' %
worker_id).data()[0]
assert result_set['cnt'] == args.vertex_count, \
'Create vertices Expected: %s Created: %s' % \
(args.vertex_count, result_set['cnt'])
result_set = session.run("MATCH (n:Label_T%s) RETURN count(n) AS cnt" % worker_id).data()[0]
assert result_set["cnt"] == args.vertex_count, "Create vertices Expected: %s Created: %s" % (
args.vertex_count,
result_set["cnt"],
)
# check count per property value
for i, size in generated_xs.items():
result_set = session.run('MATCH (n:Label_T%s {x: %s}) '
'RETURN count(n) AS cnt'
% (worker_id, i)).data()[0]
assert result_set['cnt'] == size, "Per x count isn't good " \
"(Label: Label_T%s, prop x: %s" % (worker_id, i)
result_set = session.run("MATCH (n:Label_T%s {x: %s}) " "RETURN count(n) AS cnt" % (worker_id, i)).data()[0]
assert result_set["cnt"] == size, "Per x count isn't good " "(Label: Label_T%s, prop x: %s" % (worker_id, i)
return (worker_id, create_time - start_time, "s")
def create_handler():
'''
"""
Initializes processes and starts the execution.
'''
"""
# instance cleanup
with argument_session(args) as session:
session.run("MATCH (n) DETACH DELETE n").consume()
@ -113,21 +112,19 @@ def create_handler():
# concurrent create execution & tests
with multiprocessing.Pool(args.worker_count) as p:
for worker_id, create_time, time_unit in \
p.map(create_worker, [i for i in range(args.worker_count)]):
log.info('Worker ID: %s; Create time: %s%s' %
(worker_id, create_time, time_unit))
for worker_id, create_time, time_unit in p.map(create_worker, [i for i in range(args.worker_count)]):
log.info("Worker ID: %s; Create time: %s%s" % (worker_id, create_time, time_unit))
# check total count
expected_total_count = args.worker_count * args.vertex_count
total_count = session.run(
'MATCH (n) RETURN count(n) AS cnt').data()[0]['cnt']
assert total_count == expected_total_count, \
'Total vertex number: %s Expected: %s' % \
(total_count, expected_total_count)
total_count = session.run("MATCH (n) RETURN count(n) AS cnt").data()[0]["cnt"]
assert total_count == expected_total_count, "Total vertex number: %s Expected: %s" % (
total_count,
expected_total_count,
)
if __name__ == '__main__':
if __name__ == "__main__":
logging.basicConfig(level=args.logging)
if args.logging != "DEBUG":
logging.getLogger("neo4j").setLevel(logging.WARNING)

View File

@ -0,0 +1,228 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
"""
Large bipartite graph stress test.
"""
import logging
import multiprocessing
import random
import time
from argparse import Namespace as Args
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable, Tuple
from common import OutputData, connection_argument_parser, get_memgraph
log = logging.getLogger(__name__)
output_data = OutputData()
NUMBER_NODES_IN_CHAIN = 4
CREATE_FUNCTION = "CREATE"
DELETE_FUNCTION = "DELETE"
def parse_args() -> Args:
    """
    Extends the shared connection argument parser with the options of this
    test and parses the command line.

    :return: parsed arguments
    """
    cli = connection_argument_parser()

    cli.add_argument("--worker-count", type=int, default=4, help="Number of concurrent workers.")
    cli.add_argument("--logging", default="INFO", choices=["INFO", "DEBUG", "WARNING", "ERROR"], help="Logging level")
    cli.add_argument("--repetition-count", type=int, default=1000, help="Number of times to perform the action")

    # Both database mode settings are mandatory string options.
    required_options = (
        ("--isolation-level", "Database isolation level."),
        ("--storage-mode", "Database storage mode."),
    )
    for flag, description in required_options:
        cli.add_argument(flag, type=str, required=True, help=description)

    return cli.parse_args()
args = parse_args()
@dataclass
class Worker:
    """
    Description of one unit of work handed to a pool process.

    The `type` field selects the function the worker performs: `CREATE` runs
    the chain writer, `DELETE` runs the chain deleter.
    """

    # either `CREATE` or `DELETE`
    type: str
    # worker id
    id: int
    # total number of workers, for reference
    total_worker_cnt: int
    # number of times to perform the worker action
    repetition_count: int
    # (possibly sub-second) sleep between two subsequent actions
    sleep_sec: float
def timed_function(name) -> Callable:
    """
    Decorator factory: the decorated function's wall-clock duration is stored
    in `output_data` as a measurement called `name` on every call.

    Nothing is recorded when the decorated function raises.
    """

    def actual_decorator(func) -> Callable:
        @wraps(func)
        def timed_wrapper(*args, **kwargs) -> Any:
            started_at = time.time()
            outcome = func(*args, **kwargs)
            finished_at = time.time()
            output_data.add_measurement(name, finished_at - started_at)
            return outcome

        return timed_wrapper

    return actual_decorator
@timed_function("cleanup_time")
def clean_database() -> None:
    """Detach-deletes every node in the database; the duration is recorded as `cleanup_time`."""
    get_memgraph(args).execute("MATCH (n) DETACH DELETE n")
def create_indices() -> None:
    """Creates the label index on :Node and the label-property index on :Node(id)."""
    connection = get_memgraph(args)
    for index_statement in ("CREATE INDEX ON :Node", "CREATE INDEX ON :Node(id)"):
        connection.execute(index_statement)
def setup_database_mode() -> None:
    """Applies the storage mode and the global isolation level given on the command line."""
    connection = get_memgraph(args)
    mode_statements = (
        f"STORAGE MODE {args.storage_mode}",
        f"SET GLOBAL TRANSACTION ISOLATION LEVEL {args.isolation_level}",
    )
    for statement in mode_statements:
        connection.execute(statement)
def execute_function(worker: Worker) -> Worker:
    """
    Pool entry point: runs the writer or the deleter depending on the worker
    type and hands the worker back once it is done.

    :raises Exception: if the worker type is neither CREATE nor DELETE
    """
    if worker.type == CREATE_FUNCTION:
        run_writer(worker.total_worker_cnt, worker.repetition_count, worker.sleep_sec, worker.id)
    elif worker.type == DELETE_FUNCTION:
        run_deleter(worker.total_worker_cnt, worker.repetition_count, worker.sleep_sec)
    else:
        raise Exception("Worker function not recognized, raising exception!")

    return worker
def run_writer(total_workers_cnt: int, repetition_count: int, sleep_sec: float, worker_id: int) -> None:
    """
    This writer creates a chain and wants to verify after each action if the action he performed is
    a valid graph. A graph is valid if the number of nodes is preserved, and the chain is either
    not present or present completely.

    :param total_workers_cnt: number of writer workers; bounds the expected node count
    :param repetition_count: how many create/verify rounds to perform
    :param sleep_sec: pause (in seconds) between creating the chain and verifying it
    :param worker_id: id of this writer; used as the suffix of its node label
    :raises AssertionError: if the node count or the chain contents are not valid
    """
    memgraph = get_memgraph(args)

    def create() -> None:
        try:
            memgraph.execute(
                f"MERGE (:Node{worker_id} {{id: 1}})-[:REL]-(:Node{worker_id} {{id: 2}})-[:REL]-(:Node{worker_id} {{id: 3}})-[:REL]-(:Node{worker_id} {{id: 4}})"
            )
        except Exception as ex:
            # Creation is best effort (verification still runs afterwards),
            # but record the reason instead of dropping it silently.
            log.info(f"Worker {worker_id} failed to create the chain: {ex}")

    def verify(curr_repetition: int) -> None:
        # We always create X nodes and therefore the number of nodes needs to be always a multiple of X
        count = list(memgraph.execute_and_fetch("MATCH (n) RETURN COUNT(n) AS cnt"))[0]["cnt"]
        log.info(f"Worker {worker_id} verified graph count {count} in repetition {curr_repetition}")

        assert count <= total_workers_cnt * NUMBER_NODES_IN_CHAIN and count % NUMBER_NODES_IN_CHAIN == 0

        ids = list(
            memgraph.execute_and_fetch(
                f"MATCH (n:Node{worker_id} {{id: 1}})-->(m)-->(o)-->(p) RETURN n.id AS id1, m.id AS id2, o.id AS id3, p.id AS id4"
            )
        )

        if ids:
            result = ids[0]
            assert "id1" in result and "id2" in result and "id3" in result and "id4" in result
            assert result["id1"] == 1 and result["id2"] == 2 and result["id3"] == 3 and result["id4"] == 4
            log.info(f"Worker {worker_id} verified graph chain is valid in repetition {curr_repetition}")
        else:
            # Report the current repetition here, not the total repetition count.
            log.info(f"Worker {worker_id} does not have a chain in repetition {curr_repetition}")

    for curr_repetition in range(repetition_count):
        log.info(f"Worker {worker_id} started iteration {curr_repetition}")
        create()
        time.sleep(sleep_sec)
        log.info(f"Worker {worker_id} created chain in iteration {curr_repetition}")
        verify(curr_repetition)
def run_deleter(total_workers_cnt: int, repetition_count: int, sleep_sec: float) -> None:
    """
    Repeatedly picks a random writer id and detach-deletes all nodes carrying
    that writer's label.

    :param total_workers_cnt: number of writers; ids are drawn from 0..total_workers_cnt - 1
    :param repetition_count: number of deletions to attempt
    :param sleep_sec: pause (in seconds) after each attempt
    """
    memgraph = get_memgraph(args)

    def delete_part_of_graph(id: int):
        try:
            memgraph.execute(f"MATCH (n:Node{id}) DETACH DELETE n")
            log.info(f"Worker deleted chain with nodes of id {id}")
        except Exception:
            # A failed deletion is tolerated; the next round simply tries again.
            log.info(f"Worker failed to delete the chain with id {id}")

    for _ in range(repetition_count):
        victim_id = random.randint(0, total_workers_cnt - 1)
        delete_part_of_graph(victim_id)
        time.sleep(sleep_sec)
@timed_function("total_execution_time")
def execution_handler() -> None:
    """
    Prepares the database and runs the writers and the deleter concurrently.

    Out of `--worker-count` processes, all but one create chains and the last
    one deletes them. The total duration is recorded as `total_execution_time`.

    :raises ValueError: if fewer than two workers are requested
    """
    worker_count = args.worker_count
    rep_count = args.repetition_count

    # One worker is always the deleter. With no writers left, the deleter would
    # fail on random.randint(0, -1), so reject that before touching the database.
    if worker_count < 2:
        raise ValueError("--worker-count must be at least 2 (one writer and one deleter), got %s" % worker_count)

    clean_database()
    log.info("Database is clean.")

    setup_database_mode()

    create_indices()

    writer_count = worker_count - 1
    workers = [Worker(CREATE_FUNCTION, x, writer_count, rep_count, 0.2) for x in range(writer_count)]
    workers.append(Worker(DELETE_FUNCTION, -1, writer_count, rep_count, 0.15))

    with multiprocessing.Pool(processes=args.worker_count) as p:
        for worker in p.map(execute_function, workers):
            print(f"Worker {worker.type} finished!")
if __name__ == "__main__":
    logging.basicConfig(level=args.logging)
    execution_handler()
    # The collected measurements are printed only for the DEBUG and INFO levels.
    if args.logging == "DEBUG" or args.logging == "INFO":
        output_data.dump()

View File

@ -1,9 +1,9 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
"""
Durability Stress Test
'''
"""
# TODO (mg_durability_stress_test): extend once we add full durability mode
@ -13,11 +13,11 @@ import os
import random
import shutil
import subprocess
import time
import threading
import time
from multiprocessing import Manager, Pool
from common import connection_argument_parser, SessionCache
from multiprocessing import Pool, Manager
from common import SessionCache, connection_argument_parser
# Constants and args
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -35,11 +35,9 @@ SNAPSHOT_CYCLE_SEC = 5
# Parse command line arguments
parser = connection_argument_parser()
parser.add_argument("--memgraph", default=os.path.join(BUILD_DIR,
"memgraph"))
parser.add_argument("--memgraph", default=os.path.join(BUILD_DIR, "memgraph"))
parser.add_argument("--log-file", default="")
parser.add_argument("--verbose", action="store_const",
const=True, default=False)
parser.add_argument("--verbose", action="store_const", const=True, default=False)
parser.add_argument("--data-directory", default=DATA_DIR)
parser.add_argument("--num-clients", default=multiprocessing.cpu_count())
parser.add_argument("--num-steps", type=int, default=5)
@ -47,14 +45,18 @@ args = parser.parse_args()
# Memgraph run command construction
cwd = os.path.dirname(args.memgraph)
cmd = [args.memgraph, "--bolt-num-workers=" + str(DB_WORKERS),
cmd = [
args.memgraph,
"--bolt-num-workers=" + str(DB_WORKERS),
"--storage-properties-on-edges=true",
"--storage-snapshot-on-exit=false",
"--storage-snapshot-interval-sec=5",
"--storage-snapshot-retention-count=2",
"--storage-wal-enabled=true",
"--storage-recover-on-startup=true",
"--query-execution-timeout-sec=600"]
"--query-execution-timeout-sec=600",
"--bolt-server-name-for-init=Neo4j/v5.11.0 compatible graph database server - Memgraph",
]
if not args.verbose:
cmd += ["--log-level", "WARNING"]
if args.log_file:
@ -104,16 +106,13 @@ def run_client(id, data):
counter = data[id]
# Check recovery for this client
num_nodes_db = session.run(
"MATCH (n:%s) RETURN count(n) as cnt" %
("Client%d" % id)).data()[0]["cnt"]
num_nodes_db = session.run("MATCH (n:%s) RETURN count(n) as cnt" % ("Client%d" % id)).data()[0]["cnt"]
print("Client%d DB: %d; ACTUAL: %d" % (id, num_nodes_db, data[id]))
# Execute a new set of write queries
while True:
try:
session.run("CREATE (n:%s {id:%s}) RETURN n;" %
("Client%d" % id, counter)).consume()
session.run("CREATE (n:%s {id:%s}) RETURN n;" % ("Client%d" % id, counter)).consume()
counter += 1
if counter % 100000 == 0:
print("Client %d executed %d" % (id, counter))
@ -126,8 +125,7 @@ def run_client(id, data):
def run_step():
with Pool(args.num_clients) as p:
p.starmap(run_client, [(id, data)
for id in range(1, args.num_clients + 1)])
p.starmap(run_client, [(id, data) for id in range(1, args.num_clients + 1)])
def main():
@ -143,22 +141,19 @@ def main():
# Also it makes sense to run this test for a longer period of time
# but with small --snapshot-cycle-sec to force that Memgraph is being
# killed in the middle of generating the snapshot.
time.sleep(
random.randint(2 * SNAPSHOT_CYCLE_SEC, 3 * SNAPSHOT_CYCLE_SEC))
time.sleep(random.randint(2 * SNAPSHOT_CYCLE_SEC, 3 * SNAPSHOT_CYCLE_SEC))
clean_memgraph()
# Final check
run_memgraph()
session = SessionCache.argument_session(args)
num_nodes_db = \
session.run("MATCH (n) RETURN count(n) AS cnt").data()[0]["cnt"]
num_nodes_db = session.run("MATCH (n) RETURN count(n) AS cnt").data()[0]["cnt"]
num_nodes = sum(data.values())
# Check that more than 99.9% of data is recoverable.
# NOTE: default WAL flush interval is 1ms.
# Once the full sync durability mode is introduced,
# this test has to be extended.
assert num_nodes_db > 0.999 * num_nodes, \
"Memgraph lost more than 0.001 of data!"
assert num_nodes_db > 0.999 * num_nodes, "Memgraph lost more than 0.001 of data!"
if __name__ == "__main__":

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -47,6 +47,9 @@ DEFINE_bool(global_queries, true, "If queries that modifiy globally should be ex
DEFINE_string(stats_file, "", "File into which to write statistics.");
DEFINE_string(isolation_level, "", "Database isolation level.");
DEFINE_string(storage_mode, "", "Database storage_mode.");
/**
* Encapsulates a Graph and a Bolt session and provides CRUD op functions.
* Also defines a run-loop for a generic executor, and a graph state

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -28,6 +28,8 @@ DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
DEFINE_int32(worker_count, 1, "The number of concurrent workers executing queries against the server.");
DEFINE_int32(per_worker_query_count, 100, "The number of queries each worker will try to execute.");
DEFINE_string(isolation_level, "", "Database isolation level.");
DEFINE_string(storage_mode, "", "Database storage_mode.");
auto make_client() {
mg::Client::Params params;

View File

@ -1 +1,2 @@
neo4j-driver==4.1.1
gqlalchemy==1.3.3

179
tests/stress/test_config.py Normal file
View File

@ -0,0 +1,179 @@
import itertools
import os
from dataclasses import dataclass
from typing import List
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
STATS_FILE = os.path.join(SCRIPT_DIR, ".long_running_stats")
class DatasetConstants:
    """Dictionary keys used by the dataset entries defined in this file."""

    # Name of the test to run (e.g. "bipartite.py", "long_running.cpp").
    TEST = "test"
    # List of command-line style option strings for the test.
    OPTIONS = "options"
    # Time limit for the test; the unit is not visible here (presumably minutes -- TODO confirm).
    TIMEOUT = "timeout"
    # List of DatabaseMode instances associated with the test.
    MODE = "mode"
@dataclass
class DatabaseMode:
    """Pairs a storage mode with an isolation level."""

    # Storage mode name; the values used in this file come from StorageModeConstants.
    storage_mode: str
    # Isolation level name; the values used in this file come from IsolationLevelConstants.
    isolation_level: str
class StorageModeConstants:
    """Names of the storage modes referenced by the stress-test configuration."""

    IN_MEMORY_TRANSACTIONAL = "IN_MEMORY_TRANSACTIONAL"
    IN_MEMORY_ANALYTICAL = "IN_MEMORY_ANALYTICAL"

    @classmethod
    def to_list(cls) -> List[str]:
        """Return every storage mode name as a new list, transactional first."""
        all_modes = (cls.IN_MEMORY_TRANSACTIONAL, cls.IN_MEMORY_ANALYTICAL)
        return list(all_modes)
class IsolationLevelConstants:
    """Names of the isolation levels referenced by the stress-test configuration."""

    SNAPSHOT_ISOLATION = "SNAPSHOT ISOLATION"
    # NOTE(review): the usual spelling is "COMMITTED". The values are left unchanged
    # because the consumer of these strings is not visible here -- confirm what it expects.
    READ_COMMITED = "READ COMMITED"
    READ_UNCOMMITED = "READ UNCOMMITED"

    @classmethod
    def to_list(cls) -> List[str]:
        """Return every isolation level name as a new list, snapshot isolation first."""
        # The attribute is SNAPSHOT_ISOLATION; the class has no SNAPSHOT_SERIALIZATION,
        # so referencing that name raised AttributeError on every call.
        return [cls.SNAPSHOT_ISOLATION, cls.READ_COMMITED, cls.READ_UNCOMMITED]
def get_default_database_mode() -> DatabaseMode:
    """Return the default mode: in-memory transactional storage with snapshot isolation."""
    default_storage = StorageModeConstants.IN_MEMORY_TRANSACTIONAL
    default_isolation = IsolationLevelConstants.SNAPSHOT_ISOLATION
    return DatabaseMode(storage_mode=default_storage, isolation_level=default_isolation)
def get_all_database_modes() -> List[DatabaseMode]:
    """Return one DatabaseMode per (storage mode, isolation level) combination.

    The combinations are the Cartesian product of StorageModeConstants.to_list()
    and IsolationLevelConstants.to_list(), with the storage mode varying slowest.
    """
    storage_modes = StorageModeConstants.to_list()
    isolation_levels = IsolationLevelConstants.to_list()
    return [
        DatabaseMode(storage, isolation)
        for storage, isolation in itertools.product(storage_modes, isolation_levels)
    ]
# Dataset calibrated for running on Apollo (total approx. 4min):
#   bipartite.py runs for approx. 30s
#   create_match.py runs for approx. 30s
#   the first long_running.cpp entry (--max-time 1) runs for 1min
#   the second long_running.cpp entry (--max-time 2) runs for 2min
# NOTE(review): detach_delete.py and parser.cpp are not part of the estimate above.
#
# Every entry is a dict keyed by the DatasetConstants values: the test to run,
# its option strings, its timeout and the list of database modes for it.
SMALL_DATASET = [
    {
        DatasetConstants.TEST: "bipartite.py",
        DatasetConstants.OPTIONS: ["--u-count", "100", "--v-count", "100"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
    {
        DatasetConstants.TEST: "detach_delete.py",
        DatasetConstants.OPTIONS: ["--worker-count", "4", "--repetition-count", "100"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
    {
        DatasetConstants.TEST: "create_match.py",
        DatasetConstants.OPTIONS: ["--vertex-count", "40000", "--create-pack-size", "100"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
    {
        DatasetConstants.TEST: "parser.cpp",
        DatasetConstants.OPTIONS: ["--per-worker-query-count", "1000"],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
    # Short long_running.cpp run on a small graph.
    {
        DatasetConstants.TEST: "long_running.cpp",
        DatasetConstants.OPTIONS: [
            "--vertex-count",
            "1000",
            "--edge-count",
            "5000",
            "--max-time",
            "1",
            "--verify",
            "20",
        ],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
    # Longer long_running.cpp run on a larger graph; the only entry that passes --stats-file.
    {
        DatasetConstants.TEST: "long_running.cpp",
        DatasetConstants.OPTIONS: [
            "--vertex-count",
            "10000",
            "--edge-count",
            "50000",
            "--max-time",
            "2",
            "--verify",
            "30",
            "--stats-file",
            STATS_FILE,
        ],
        DatasetConstants.TIMEOUT: 5,
        DatasetConstants.MODE: [get_default_database_mode()],
    },
]
# Dataset calibrated for running on the daily stress instance (total approx. 9h):
#   bipartite.py and create_match.py run for approx. 15min
#   the short long_running.cpp entry (--max-time 5) runs for 5min x 6 times = 30min
#   the final long_running.cpp entry (--max-time 480) runs for 8h
# NOTE(review): detach_delete.py is not part of the estimate above.
#
# Every entry is a dict keyed by the DatasetConstants values: the test to run,
# its option strings, its timeout and the list of database modes for it.
LARGE_DATASET = (
    [
        {
            DatasetConstants.TEST: "bipartite.py",
            DatasetConstants.OPTIONS: ["--u-count", "300", "--v-count", "300"],
            DatasetConstants.TIMEOUT: 30,
            DatasetConstants.MODE: [get_default_database_mode()],
        },
        {
            DatasetConstants.TEST: "detach_delete.py",
            DatasetConstants.OPTIONS: ["--worker-count", "4", "--repetition-count", "300"],
            DatasetConstants.TIMEOUT: 5,
            DatasetConstants.MODE: [get_default_database_mode()],
        },
        {
            DatasetConstants.TEST: "create_match.py",
            DatasetConstants.OPTIONS: ["--vertex-count", "500000", "--create-pack-size", "500"],
            DatasetConstants.TIMEOUT: 30,
            DatasetConstants.MODE: [get_default_database_mode()],
        },
    ]
    + [
        {
            DatasetConstants.TEST: "long_running.cpp",
            DatasetConstants.OPTIONS: [
                "--vertex-count",
                "10000",
                "--edge-count",
                "40000",
                "--max-time",
                "5",
                "--verify",
                "60",
            ],
            DatasetConstants.TIMEOUT: 16,
            DatasetConstants.MODE: [get_default_database_mode()],
        },
    ]
    # `*` binds tighter than `+`, so only the single-entry list directly above is
    # repeated; the six resulting entries all refer to the same dict object.
    * 6
    + [
        {
            DatasetConstants.TEST: "long_running.cpp",
            DatasetConstants.OPTIONS: [
                "--vertex-count",
                "200000",
                "--edge-count",
                "1000000",
                "--max-time",
                "480",
                "--verify",
                "300",
                "--stats-file",
                STATS_FILE,
            ],
            DatasetConstants.TIMEOUT: 500,
            DatasetConstants.MODE: [get_default_database_mode()],
        },
    ]
)

View File

@ -1182,6 +1182,31 @@ TYPED_TEST(InterpreterTest, ExecutionStatsValues) {
ASSERT_EQ(stats["relationships-deleted"].ValueInt(), 3);
AssertAllValuesAreZero(stats, {"nodes-deleted", "relationships-deleted"});
}
{
auto [stream, qid] = this->Prepare("CREATE (n)-[:TO]->(m);");
this->Pull(&stream);
auto stats = stream.GetSummary().at("stats").ValueMap();
ASSERT_EQ(stats["nodes-created"].ValueInt(), 2);
ASSERT_EQ(stats["relationships-created"].ValueInt(), 1);
AssertAllValuesAreZero(stats, {"nodes-created", "relationships-created"});
}
{
auto [stream, qid] = this->Prepare("MATCH (n)-[r]->(m) DELETE r;");
this->Pull(&stream);
auto stats = stream.GetSummary().at("stats").ValueMap();
ASSERT_EQ(stats["relationships-deleted"].ValueInt(), 1);
AssertAllValuesAreZero(stats, {"nodes-deleted", "relationships-deleted"});
}
{
auto [stream, qid] = this->Prepare("MATCH (n) DELETE n;");
this->Pull(&stream);
auto stats = stream.GetSummary().at("stats").ValueMap();
ASSERT_EQ(stats["nodes-deleted"].ValueInt(), 2);
AssertAllValuesAreZero(stats, {"nodes-deleted", ""});
}
{
auto [stream, qid] = this->Prepare("CREATE (:L1:L2:L3), (:L1), (:L1), (:L2);");
this->Pull(&stream);

View File

@ -775,6 +775,7 @@ TYPED_TEST(QueryPlanTest, Delete) {
}
// detach delete a single vertex
// delete will not happen as we are deleting in bulk
{
auto n = MakeScanAll(this->storage, symbol_table, "n");
auto n_get = this->storage.template Create<Identifier>("n")->MapTo(n.sym_);
@ -783,8 +784,8 @@ TYPED_TEST(QueryPlanTest, Delete) {
auto context = MakeContext(this->storage, symbol_table, &dba);
delete_op->MakeCursor(memgraph::utils::NewDeleteResource())->Pull(frame, context);
dba.AdvanceCommand();
EXPECT_EQ(3, CountIterable(dba.Vertices(memgraph::storage::View::OLD)));
EXPECT_EQ(3, CountEdges(&dba, memgraph::storage::View::OLD));
EXPECT_EQ(4, CountIterable(dba.Vertices(memgraph::storage::View::OLD)));
EXPECT_EQ(6, CountEdges(&dba, memgraph::storage::View::OLD));
}
// delete all remaining edges
@ -797,7 +798,7 @@ TYPED_TEST(QueryPlanTest, Delete) {
auto context = MakeContext(this->storage, symbol_table, &dba);
PullAll(*delete_op, &context);
dba.AdvanceCommand();
EXPECT_EQ(3, CountIterable(dba.Vertices(memgraph::storage::View::OLD)));
EXPECT_EQ(4, CountIterable(dba.Vertices(memgraph::storage::View::OLD)));
EXPECT_EQ(0, CountEdges(&dba, memgraph::storage::View::OLD));
}
@ -1012,10 +1013,11 @@ TYPED_TEST(QueryPlanTest, DeleteTwiceDeleteBlockingEdge) {
}
TYPED_TEST(QueryPlanTest, DeleteReturn) {
// MATCH (n) DETACH DELETE n RETURN n
auto storage_dba = this->db->Access();
memgraph::query::DbAccessor dba(storage_dba.get());
// make a fully-connected (one-direction, no cycles) with 4 nodes
// graph with 4 vertices
auto prop = PROPERTY_PAIR(dba, "property");
for (int i = 0; i < 4; ++i) {
auto va = dba.InsertVertex();
@ -1033,10 +1035,12 @@ TYPED_TEST(QueryPlanTest, DeleteReturn) {
auto n_get = this->storage.template Create<Identifier>("n")->MapTo(n.sym_);
auto delete_op = std::make_shared<plan::Delete>(n.op_, std::vector<Expression *>{n_get}, true);
auto accumulate_op = std::make_shared<plan::Accumulate>(delete_op, delete_op->ModifiedSymbols(symbol_table), true);
auto prop_lookup = PROPERTY_LOOKUP(dba, IDENT("n")->MapTo(n.sym_), prop);
auto n_p =
this->storage.template Create<NamedExpression>("n", prop_lookup)->MapTo(symbol_table.CreateSymbol("bla", true));
auto produce = MakeProduce(delete_op, n_p);
auto produce = MakeProduce(accumulate_op, n_p);
auto context = MakeContext(this->storage, symbol_table, &dba);
ASSERT_THROW(CollectProduce(*produce, &context), QueryRuntimeException);
@ -1757,6 +1761,7 @@ TYPED_TEST(QueryPlanTest, RemoveLabelsOnNull) {
}
TYPED_TEST(QueryPlanTest, DeleteSetProperty) {
// MATCH (n) DELETE n SET n.property = 42 RETURN n
auto storage_dba = this->db->Access();
memgraph::query::DbAccessor dba(storage_dba.get());
// Add a single vertex.
@ -1764,18 +1769,25 @@ TYPED_TEST(QueryPlanTest, DeleteSetProperty) {
dba.AdvanceCommand();
EXPECT_EQ(1, CountIterable(dba.Vertices(memgraph::storage::View::OLD)));
SymbolTable symbol_table;
// MATCH (n) DELETE n SET n.property = 42
auto n = MakeScanAll(this->storage, symbol_table, "n");
auto n_get = this->storage.template Create<Identifier>("n")->MapTo(n.sym_);
auto delete_op = std::make_shared<plan::Delete>(n.op_, std::vector<Expression *>{n_get}, false);
auto prop = PROPERTY_PAIR(dba, "property");
auto n_prop = PROPERTY_LOOKUP(dba, IDENT("n")->MapTo(n.sym_), prop);
auto set_op = std::make_shared<plan::SetProperty>(delete_op, prop.second, n_prop, LITERAL(42));
auto accumulate_op = std::make_shared<plan::Accumulate>(set_op, set_op->ModifiedSymbols(symbol_table), true);
auto prop_lookup = PROPERTY_LOOKUP(dba, IDENT("n")->MapTo(n.sym_), prop);
auto n_p =
this->storage.template Create<NamedExpression>("n", prop_lookup)->MapTo(symbol_table.CreateSymbol("bla", true));
auto produce = MakeProduce(accumulate_op, n_p);
auto context = MakeContext(this->storage, symbol_table, &dba);
EXPECT_THROW(PullAll(*set_op, &context), QueryRuntimeException);
ASSERT_THROW(CollectProduce(*produce, &context), QueryRuntimeException);
}
TYPED_TEST(QueryPlanTest, DeleteSetPropertiesFromMap) {
// MATCH (n) DELETE n SET n = {property: 42} return n
// MATCH (n) DELETE n SET n += {property: 42} return n
auto storage_dba = this->db->Access();
memgraph::query::DbAccessor dba(storage_dba.get());
// Add a single vertex.
@ -1783,7 +1795,6 @@ TYPED_TEST(QueryPlanTest, DeleteSetPropertiesFromMap) {
dba.AdvanceCommand();
EXPECT_EQ(1, CountIterable(dba.Vertices(memgraph::storage::View::OLD)));
SymbolTable symbol_table;
// MATCH (n) DELETE n SET n = {property: 42}
auto n = MakeScanAll(this->storage, symbol_table, "n");
auto n_get = this->storage.template Create<Identifier>("n")->MapTo(n.sym_);
auto delete_op = std::make_shared<plan::Delete>(n.op_, std::vector<Expression *>{n_get}, false);
@ -1793,12 +1804,18 @@ TYPED_TEST(QueryPlanTest, DeleteSetPropertiesFromMap) {
auto *rhs = this->storage.template Create<MapLiteral>(prop_map);
for (auto op_type : {plan::SetProperties::Op::REPLACE, plan::SetProperties::Op::UPDATE}) {
auto set_op = std::make_shared<plan::SetProperties>(delete_op, n.sym_, rhs, op_type);
auto accumulate_op = std::make_shared<plan::Accumulate>(set_op, set_op->ModifiedSymbols(symbol_table), false);
auto prop_lookup = PROPERTY_LOOKUP(dba, IDENT("n")->MapTo(n.sym_), prop);
auto n_p =
this->storage.template Create<NamedExpression>("n", prop_lookup)->MapTo(symbol_table.CreateSymbol("bla", true));
auto produce = MakeProduce(accumulate_op, n_p);
auto context = MakeContext(this->storage, symbol_table, &dba);
EXPECT_THROW(PullAll(*set_op, &context), QueryRuntimeException);
ASSERT_THROW(CollectProduce(*produce, &context), QueryRuntimeException);
}
}
TYPED_TEST(QueryPlanTest, DeleteSetPropertiesFrom) {
// MATCH (n) DELETE n SET n = n RETURN n
auto storage_dba = this->db->Access();
memgraph::query::DbAccessor dba(storage_dba.get());
// Add a single vertex.
@ -1809,19 +1826,25 @@ TYPED_TEST(QueryPlanTest, DeleteSetPropertiesFrom) {
dba.AdvanceCommand();
EXPECT_EQ(1, CountIterable(dba.Vertices(memgraph::storage::View::OLD)));
SymbolTable symbol_table;
// MATCH (n) DELETE n SET n = n
auto n = MakeScanAll(this->storage, symbol_table, "n");
auto n_get = this->storage.template Create<Identifier>("n")->MapTo(n.sym_);
auto delete_op = std::make_shared<plan::Delete>(n.op_, std::vector<Expression *>{n_get}, false);
auto *rhs = IDENT("n")->MapTo(n.sym_);
auto prop = PROPERTY_PAIR(dba, "property");
for (auto op_type : {plan::SetProperties::Op::REPLACE, plan::SetProperties::Op::UPDATE}) {
auto set_op = std::make_shared<plan::SetProperties>(delete_op, n.sym_, rhs, op_type);
auto accumulate_op = std::make_shared<plan::Accumulate>(set_op, set_op->ModifiedSymbols(symbol_table), false);
auto prop_lookup = PROPERTY_LOOKUP(dba, IDENT("n")->MapTo(n.sym_), prop);
auto n_p =
this->storage.template Create<NamedExpression>("n", prop_lookup)->MapTo(symbol_table.CreateSymbol("bla", true));
auto produce = MakeProduce(accumulate_op, n_p);
auto context = MakeContext(this->storage, symbol_table, &dba);
EXPECT_THROW(PullAll(*set_op, &context), QueryRuntimeException);
ASSERT_THROW(CollectProduce(*produce, &context), QueryRuntimeException);
}
}
TYPED_TEST(QueryPlanTest, DeleteRemoveLabels) {
// MATCH (n) DELETE n REMOVE n :label return n
auto storage_dba = this->db->Access();
memgraph::query::DbAccessor dba(storage_dba.get());
// Add a single vertex.
@ -1829,17 +1852,24 @@ TYPED_TEST(QueryPlanTest, DeleteRemoveLabels) {
dba.AdvanceCommand();
EXPECT_EQ(1, CountIterable(dba.Vertices(memgraph::storage::View::OLD)));
SymbolTable symbol_table;
// MATCH (n) DELETE n REMOVE n :label
auto n = MakeScanAll(this->storage, symbol_table, "n");
auto n_get = this->storage.template Create<Identifier>("n")->MapTo(n.sym_);
auto delete_op = std::make_shared<plan::Delete>(n.op_, std::vector<Expression *>{n_get}, false);
std::vector<memgraph::storage::LabelId> labels{dba.NameToLabel("label")};
auto rem_op = std::make_shared<plan::RemoveLabels>(delete_op, n.sym_, labels);
auto accumulate_op = std::make_shared<plan::Accumulate>(rem_op, rem_op->ModifiedSymbols(symbol_table), true);
auto prop = PROPERTY_PAIR(dba, "property");
auto prop_lookup = PROPERTY_LOOKUP(dba, IDENT("n")->MapTo(n.sym_), prop);
auto n_p =
this->storage.template Create<NamedExpression>("n", prop_lookup)->MapTo(symbol_table.CreateSymbol("bla", true));
auto produce = MakeProduce(accumulate_op, n_p);
auto context = MakeContext(this->storage, symbol_table, &dba);
EXPECT_THROW(PullAll(*rem_op, &context), QueryRuntimeException);
ASSERT_THROW(CollectProduce(*produce, &context), QueryRuntimeException);
}
TYPED_TEST(QueryPlanTest, DeleteRemoveProperty) {
// MATCH (n) DELETE n REMOVE n.property RETURN n
auto storage_dba = this->db->Access();
memgraph::query::DbAccessor dba(storage_dba.get());
// Add a single vertex.
@ -1847,15 +1877,20 @@ TYPED_TEST(QueryPlanTest, DeleteRemoveProperty) {
dba.AdvanceCommand();
EXPECT_EQ(1, CountIterable(dba.Vertices(memgraph::storage::View::OLD)));
SymbolTable symbol_table;
// MATCH (n) DELETE n REMOVE n.property
auto n = MakeScanAll(this->storage, symbol_table, "n");
auto n_get = this->storage.template Create<Identifier>("n")->MapTo(n.sym_);
auto delete_op = std::make_shared<plan::Delete>(n.op_, std::vector<Expression *>{n_get}, false);
auto prop = PROPERTY_PAIR(dba, "property");
auto n_prop = PROPERTY_LOOKUP(dba, IDENT("n")->MapTo(n.sym_), prop);
auto rem_op = std::make_shared<plan::RemoveProperty>(delete_op, prop.second, n_prop);
auto accumulate_op = std::make_shared<plan::Accumulate>(rem_op, rem_op->ModifiedSymbols(symbol_table), true);
auto prop_lookup = PROPERTY_LOOKUP(dba, IDENT("n")->MapTo(n.sym_), prop);
auto n_p =
this->storage.template Create<NamedExpression>("n", prop_lookup)->MapTo(symbol_table.CreateSymbol("bla", true));
auto produce = MakeProduce(accumulate_op, n_p);
auto context = MakeContext(this->storage, symbol_table, &dba);
EXPECT_THROW(PullAll(*rem_op, &context), QueryRuntimeException);
ASSERT_THROW(CollectProduce(*produce, &context), QueryRuntimeException);
}
//////////////////////////////////////////////