memgraph/tests/e2e/triggers/common.py
Andi 156e2cd095
On delete triggers invalid edge reference (#717)
* Added check if there is invalid reference to the underlying edge

* Added fix and e2e tests

* Isolation levels tracking based on from_vertex_

* Added explicit transaction test + edge accessor changes based on the vertex_edge

* Autocommit on tests, initialize deleted by checking out_edges

Co-authored-by: Marko Budiselić <marko.budiselic@memgraph.com>
2023-01-18 15:05:10 +01:00

35 lines
1.4 KiB
Python

# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import typing
import mgclient
import pytest
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
cursor.execute(query, params)
return cursor.fetchall()
@pytest.fixture
def connect(**kwargs) -> mgclient.Connection:
connection = mgclient.connect(host="localhost", port=7687, **kwargs)
connection.autocommit = True
triggers_list = execute_and_fetch_all(connection.cursor(), "SHOW TRIGGERS;")
for trigger in triggers_list:
execute_and_fetch_all(connection.cursor(), f"DROP TRIGGER {trigger[0]}")
execute_and_fetch_all(connection.cursor(), "MATCH (n) DETACH DELETE n")
yield connection
for trigger in triggers_list:
execute_and_fetch_all(connection.cursor(), f"DROP TRIGGER {trigger[0]}")
execute_and_fetch_all(connection.cursor(), "MATCH (n) DETACH DELETE n")