memgraph/tests/e2e/triggers/procedures/write.py
2021-10-11 13:39:57 +02:00

69 lines
1.8 KiB
Python

import mgp
@mgp.write_proc
def create_vertex(ctx: mgp.ProcCtx, id: mgp.Any) -> mgp.Record(v=mgp.Any):
    """Create a vertex and initialise its "id" and "tbd" properties.

    The new vertex gets ``"id"`` set to the ``id`` argument and ``"tbd"``
    set to ``0``.

    Returns:
        A record whose ``v`` field is the created vertex. If a
        ``RuntimeError`` is raised while creating the vertex or setting its
        properties, ``v`` holds the error message string instead.
    """
    try:
        vertex = ctx.graph.create_vertex()
        # Same two writes, in the same order: "id" first, then "tbd".
        for key, value in (("id", id), ("tbd", 0)):
            vertex.properties.set(key, value)
    except RuntimeError as err:
        # Report the failure through the record rather than propagating it.
        return mgp.Record(v=str(err))
    else:
        return mgp.Record(v=vertex)
@mgp.write_proc
def delete_vertex(ctx: mgp.ProcCtx, v: mgp.Any) -> mgp.Record():
    """Delete ``v`` via ``ctx.graph.delete_vertex`` and return an empty record.

    Any exception raised by the graph call propagates to the caller.
    """
    graph = ctx.graph
    graph.delete_vertex(v)
    return mgp.Record()
@mgp.write_proc
def detach_delete_vertex(ctx: mgp.ProcCtx, v: mgp.Any) -> mgp.Record():
    """Call ``ctx.graph.detach_delete_vertex`` on ``v``; return an empty record.

    Any exception raised by the graph call propagates to the caller.
    """
    graph = ctx.graph
    graph.detach_delete_vertex(v)
    return mgp.Record()
@mgp.write_proc
def create_edge(
    ctx: mgp.ProcCtx,
    from_vertex: mgp.Vertex,
    to_vertex: mgp.Vertex,
    edge_type: str,
) -> mgp.Record(e=mgp.Any):
    """Create an edge between two vertices and initialise its properties.

    The edge runs from ``from_vertex`` to ``to_vertex`` with type
    ``mgp.EdgeType(edge_type)``. Its ``"id"`` property is set to ``1`` and
    its ``"tbd"`` property to ``0``.

    Returns:
        A record whose ``e`` field is the created edge. If a
        ``RuntimeError`` is raised while creating the edge or setting its
        properties, ``e`` holds the error message string instead.
    """
    try:
        edge = ctx.graph.create_edge(
            from_vertex,
            to_vertex,
            mgp.EdgeType(edge_type),
        )
        # Same two writes, in the same order: "id" first, then "tbd".
        for key, value in (("id", 1), ("tbd", 0)):
            edge.properties.set(key, value)
    except RuntimeError as err:
        # Report the failure through the record rather than propagating it.
        return mgp.Record(e=str(err))
    else:
        return mgp.Record(e=edge)
@mgp.write_proc
def delete_edge(ctx: mgp.ProcCtx, edge: mgp.Edge) -> mgp.Record():
    """Delete ``edge`` via ``ctx.graph.delete_edge`` and return an empty record.

    Any exception raised by the graph call propagates to the caller.
    """
    graph = ctx.graph
    graph.delete_edge(edge)
    return mgp.Record()
@mgp.write_proc
def set_property(ctx: mgp.ProcCtx, object: mgp.Any) -> mgp.Record():
    """Set the ``"id"`` property of ``object`` to ``2``; return an empty record.

    ``object`` must expose a ``properties`` attribute with a ``set`` method.
    ``ctx`` is unused.
    """
    props = object.properties
    props.set("id", 2)
    return mgp.Record()
@mgp.write_proc
def remove_property(ctx: mgp.ProcCtx, object: mgp.Any) -> mgp.Record():
    """Set the ``"tbd"`` property of ``object`` to ``None``; return an empty record.

    ``object`` must expose a ``properties`` attribute with a ``set`` method.
    ``ctx`` is unused.
    """
    props = object.properties
    # NOTE(review): presumably mgp treats a None value as "remove the
    # property" (hence this procedure's name) -- confirm against the mgp docs.
    props.set("tbd", None)
    return mgp.Record()
@mgp.write_proc
def add_label(
    ctx: mgp.ProcCtx,
    object: mgp.Any,
    name: str,
) -> mgp.Record(o=mgp.Any):
    """Call ``object.add_label(name)`` and return ``object`` in a record.

    ``ctx`` is unused.

    Returns:
        A record whose ``o`` field is the same ``object`` that was passed in.
    """
    object.add_label(name)
    result = mgp.Record(o=object)
    return result
@mgp.write_proc
def remove_label(
    ctx: mgp.ProcCtx,
    object: mgp.Any,
    name: str,
) -> mgp.Record(o=mgp.Any):
    """Call ``object.remove_label(name)`` and return ``object`` in a record.

    ``ctx`` is unused.

    Returns:
        A record whose ``o`` field is the same ``object`` that was passed in.
    """
    object.remove_label(name)
    result = mgp.Record(o=object)
    return result