On delete triggers invalid edge reference (#717)

* Added check if there is invalid reference to the underlying edge

* Added fix and e2e tests

* Isolation levels tracking based on from_vertex_

* Added explicit transaction test + edge accessor changes based on the vertex_edge

* Autocommit on tests, initialize deleted by checking out_edges

Co-authored-by: Marko Budiselić <marko.budiselic@memgraph.com>
This commit is contained in:
Andi 2023-01-18 15:05:10 +01:00 committed by GitHub
parent 8b834c702c
commit 156e2cd095
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 266 additions and 6 deletions

View File

@ -12,6 +12,7 @@
#include "storage/v2/edge_accessor.hpp"
#include <memory>
#include <tuple>
#include "storage/v2/mvcc.hpp"
#include "storage/v2/property_value.hpp"
@ -21,8 +22,47 @@
namespace memgraph::storage {
bool EdgeAccessor::IsVisible(const View view) const {
bool deleted = true;
bool exists = true;
bool deleted = true;
// When edges don't have properties, their isolation level is still dictated by MVCC ->
// iterate over the deltas of the from_vertex_ and see which deltas can be applied on edges.
if (!config_.properties_on_edges) {
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(from_vertex_->lock);
// Initialize deleted by checking if out edges contain edge_
deleted = std::find_if(from_vertex_->out_edges.begin(), from_vertex_->out_edges.end(), [&](const auto &out_edge) {
return std::get<2>(out_edge) == edge_;
}) == from_vertex_->out_edges.end();
delta = from_vertex_->delta;
}
ApplyDeltasForRead(transaction_, delta, view, [&](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::SET_PROPERTY:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::DELETE_OBJECT:
break;
case Delta::Action::ADD_OUT_EDGE: { // relevant for the from_vertex_ -> we just deleted the edge
if (delta.vertex_edge.edge == edge_) {
deleted = false;
}
break;
}
case Delta::Action::REMOVE_OUT_EDGE: { // also relevant for the from_vertex_ -> we just added the edge
if (delta.vertex_edge.edge == edge_) {
exists = false;
}
break;
}
}
});
return exists && (for_deleted_ || !deleted);
}
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(edge_.ptr->lock);
@ -49,7 +89,6 @@ bool EdgeAccessor::IsVisible(const View view) const {
}
}
});
return exists && (for_deleted_ || !deleted);
}

View File

@ -20,3 +20,10 @@ add_subdirectory(procedures)
add_dependencies(memgraph__e2e__triggers__on_create memgraph__e2e__triggers__write.py)
add_dependencies(memgraph__e2e__triggers__on_update memgraph__e2e__triggers__write.py)
add_dependencies(memgraph__e2e__triggers__on_delete memgraph__e2e__triggers__write.py)
function(copy_triggers_e2e_python_files FILE_NAME)
copy_e2e_python_files(triggers ${FILE_NAME})
endfunction()
copy_triggers_e2e_python_files(common.py)
copy_triggers_e2e_python_files(triggers_properties_false.py)

View File

@ -0,0 +1,34 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import typing
import mgclient
import pytest
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
cursor.execute(query, params)
return cursor.fetchall()
@pytest.fixture
def connect(**kwargs) -> mgclient.Connection:
connection = mgclient.connect(host="localhost", port=7687, **kwargs)
connection.autocommit = True
triggers_list = execute_and_fetch_all(connection.cursor(), "SHOW TRIGGERS;")
for trigger in triggers_list:
execute_and_fetch_all(connection.cursor(), f"DROP TRIGGER {trigger[0]}")
execute_and_fetch_all(connection.cursor(), "MATCH (n) DETACH DELETE n")
yield connection
for trigger in triggers_list:
execute_and_fetch_all(connection.cursor(), f"DROP TRIGGER {trigger[0]}")
execute_and_fetch_all(connection.cursor(), "MATCH (n) DETACH DELETE n")

View File

@ -0,0 +1,170 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import sys
import mgclient
import pytest
from common import connect, execute_and_fetch_all
@pytest.mark.parametrize("ba_commit", ["BEFORE COMMIT", "AFTER COMMIT"])
def test_create_on_create(ba_commit, connect):
"""
Args:
ba_commit (str): BEFORE OR AFTER commit
"""
cursor = connect.cursor()
QUERY_TRIGGER_CREATE = f"""
CREATE TRIGGER CreateTriggerEdgesCount
ON --> CREATE
{ba_commit}
EXECUTE
CREATE (n:CreatedEdge {{count: size(createdEdges)}})
"""
execute_and_fetch_all(cursor, QUERY_TRIGGER_CREATE)
execute_and_fetch_all(cursor, "CREATE (n:Node {id: 1})")
execute_and_fetch_all(cursor, "CREATE (n:Node {id: 2})")
res = execute_and_fetch_all(cursor, "MATCH (n:Node) RETURN n")
assert len(res) == 2
res2 = execute_and_fetch_all(cursor, "MATCH (n:CreatedEdge) RETURN n")
assert len(res2) == 0
QUERY_CREATE_EDGE = """
MATCH (n:Node {id: 1}), (m:Node {id: 2})
CREATE (n)-[r:TYPE]->(m);
"""
execute_and_fetch_all(cursor, QUERY_CREATE_EDGE)
# See if trigger was triggered
nodes = execute_and_fetch_all(cursor, "MATCH (n:Node) RETURN n")
assert len(nodes) == 2
created_edges = execute_and_fetch_all(cursor, "MATCH (n:CreatedEdge) RETURN n")
assert len(created_edges) == 1
# execute_and_fetch_all(cursor, "DROP TRIGGER CreateTriggerEdgesCount")
# execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n;")
@pytest.mark.parametrize("ba_commit", ["AFTER COMMIT", "BEFORE COMMIT"])
def test_create_on_delete(ba_commit, connect):
"""
Args:
ba_commit (str): BEFORE OR AFTER commit
"""
cursor = connect.cursor()
QUERY_TRIGGER_CREATE = f"""
CREATE TRIGGER DeleteTriggerEdgesCount
ON --> DELETE
{ba_commit}
EXECUTE
CREATE (n:DeletedEdge {{count: size(deletedEdges)}})
"""
# Setup queries
execute_and_fetch_all(cursor, QUERY_TRIGGER_CREATE)
execute_and_fetch_all(cursor, "CREATE (n:Node {id: 1})")
execute_and_fetch_all(cursor, "CREATE (n:Node {id: 2})")
execute_and_fetch_all(cursor, "CREATE (n:Node {id: 3})")
execute_and_fetch_all(cursor, "CREATE (n:Node {id: 4})")
res = execute_and_fetch_all(cursor, "MATCH (n:Node) RETURN n")
assert len(res) == 4
res2 = execute_and_fetch_all(cursor, "MATCH (n:DeletedEdge) RETURN n")
assert len(res2) == 0
# create an edge that will be deleted
QUERY_CREATE_EDGE = """
MATCH (n:Node {id: 1}), (m:Node {id: 2})
CREATE (n)-[r:TYPE]->(m);
"""
execute_and_fetch_all(cursor, QUERY_CREATE_EDGE)
# create an edge that won't be deleted
QUERY_CREATE_EDGE_NO_DELETE = """
MATCH (n:Node {id: 3}), (m:Node {id: 4})
CREATE (n)-[r:NO_DELETE_EDGE]->(m);
"""
execute_and_fetch_all(cursor, QUERY_CREATE_EDGE_NO_DELETE)
# Delete only one type of the edger
QUERY_DELETE_EDGE = """
MATCH ()-[r:TYPE]->()
DELETE r;
"""
execute_and_fetch_all(cursor, QUERY_DELETE_EDGE)
# See if trigger was triggered
nodes = execute_and_fetch_all(cursor, "MATCH (n:Node) RETURN n")
assert len(nodes) == 4
# Check how many edges got deleted
deleted_edges = execute_and_fetch_all(cursor, "MATCH (n:DeletedEdge) RETURN n")
assert len(deleted_edges) == 1
# execute_and_fetch_all(cursor, "DROP TRIGGER DeleteTriggerEdgesCount")
# execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")``
@pytest.mark.parametrize("ba_commit", ["BEFORE COMMIT", "AFTER COMMIT"])
def test_create_on_delete_explicit_transaction(ba_commit):
"""
Args:
ba_commit (str): BEFORE OR AFTER commit
"""
connection_with_autocommit = mgclient.connect(host="localhost", port=7687)
connection_with_autocommit.autocommit = True
cursor_autocommit = connection_with_autocommit.cursor()
QUERY_TRIGGER_CREATE = f"""
CREATE TRIGGER DeleteTriggerEdgesCountExplicit
ON --> DELETE
{ba_commit}
EXECUTE
CREATE (n:DeletedEdge {{count: size(deletedEdges)}})
"""
# Setup queries
execute_and_fetch_all(cursor_autocommit, QUERY_TRIGGER_CREATE)
# Start explicit transaction on the execution of the first command
connection_without_autocommit = mgclient.connect(host="localhost", port=7687)
connection_without_autocommit.autocommit = False
cursor = connection_without_autocommit.cursor()
execute_and_fetch_all(cursor, "CREATE (n:Node {id: 1})")
execute_and_fetch_all(cursor, "CREATE (n:Node {id: 2})")
execute_and_fetch_all(cursor, "CREATE (n:Node {id: 3})")
execute_and_fetch_all(cursor, "CREATE (n:Node {id: 4})")
res = execute_and_fetch_all(cursor, "MATCH (n:Node) RETURN n")
assert len(res) == 4
res2 = execute_and_fetch_all(cursor, "MATCH (n:DeletedEdge) RETURN n;")
assert len(res2) == 0
QUERY_CREATE_EDGE = """
MATCH (n:Node {id: 1}), (m:Node {id: 2})
CREATE (n)-[r:TYPE]->(m);
"""
execute_and_fetch_all(cursor, QUERY_CREATE_EDGE)
# create an edge that won't be deleted
QUERY_CREATE_EDGE_NO_DELETE = """
MATCH (n:Node {id: 3}), (m:Node {id: 4})
CREATE (n)-[r:NO_DELETE_EDGE]->(m);
"""
execute_and_fetch_all(cursor, QUERY_CREATE_EDGE_NO_DELETE)
# Delete only one type of the edger
QUERY_DELETE_EDGE = """
MATCH ()-[r:TYPE]->()
DELETE r;
"""
execute_and_fetch_all(cursor, QUERY_DELETE_EDGE)
connection_without_autocommit.commit() # finish explicit transaction
nodes = execute_and_fetch_all(cursor, "MATCH (n:Node) RETURN n")
assert len(nodes) == 4
# Check how many edges got deleted
deleted_nodes_edges = execute_and_fetch_all(cursor, "MATCH (n:DeletedEdge) RETURN n")
assert len(deleted_nodes_edges) == 0
# Delete with the original cursor because triggers aren't allowed in multi-transaction environment
execute_and_fetch_all(cursor_autocommit, "DROP TRIGGER DeleteTriggerEdgesCountExplicit")
execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")
connection_without_autocommit.commit() # finish explicit transaction
nodes = execute_and_fetch_all(cursor_autocommit, "MATCH (n) RETURN n")
assert len(nodes) == 0
connection_with_autocommit.close()
connection_without_autocommit.close()
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -2,7 +2,14 @@ bolt_port: &bolt_port "7687"
template_cluster: &template_cluster
cluster:
main:
args: ["--bolt-port", *bolt_port, "--log-level=TRACE"]
args: ["--bolt-port", *bolt_port, "--log-level=TRACE", "--storage-properties-on-edges=True"]
log_file: "triggers-e2e.log"
setup_queries: []
validation_queries: []
storage_properties_edges_false: &storage_properties_edges_false
cluster:
main:
args: ["--bolt-port", *bolt_port, "--log-level=TRACE", "--also-log-to-stderr", "--storage-properties-on-edges=False"]
log_file: "triggers-e2e.log"
setup_queries: []
validation_queries: []
@ -18,7 +25,7 @@ workloads:
args: ["--bolt-port", *bolt_port]
proc: "tests/e2e/triggers/procedures/"
<<: *template_cluster
- name: "ON DELETE Triggers"
- name: "ON DELETE Triggers Storage Properties On Edges True"
binary: "tests/e2e/triggers/memgraph__e2e__triggers__on_delete"
args: ["--bolt-port", *bolt_port]
proc: "tests/e2e/triggers/procedures/"
@ -27,5 +34,8 @@ workloads:
binary: "tests/e2e/triggers/memgraph__e2e__triggers__privileges"
args: ["--bolt-port", *bolt_port]
<<: *template_cluster
- name: "ON DELETE Triggers Storage Properties On Edges False" # should be the same as the python file
binary: "tests/e2e/pytest_runner.sh"
proc: "tests/e2e/triggers/procedures/"
args: ["triggers/triggers_properties_false.py"]
<<: *storage_properties_edges_false