memgraph/tests/e2e/write_procedures/common.py
János Benjamin Antal be9ed7e879 Python wrapper for write procedures (#234)
* Rename mgp_graph_remove to mgp_graph_delete

* Add mgp_graph_detach_delete

* Add PyGraph functions

* Add _mgp exceptions

* Use unified error handling in python wrapper

* Ignore clang-tidy warnings

* Add mgp.Graph, mgp.Vertex and mgp.Edge mutable functions

* Add python write procedure registration

* Add `is_write` result field to mg.procedures

* Use storage::View::NEW for write procedures

* Add simple tests for write procedures

* Remove false information about IDs
2021-10-02 13:17:41 +02:00

24 lines
660 B
Python

import mgclient
import typing
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str,
params: dict = {}) -> typing.List[tuple]:
cursor.execute(query, params)
return cursor.fetchall()
def connect(**kwargs) -> mgclient.Connection:
connection = mgclient.connect(host="localhost", port=7687, **kwargs)
connection.autocommit = True
return connection
def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int):
results = execute_and_fetch_all(cursor, query)
return len(results) == n
def has_one_result_row(cursor: mgclient.Cursor, query: str):
return has_n_result_row(cursor, query, 1)