diff --git a/tests/integration/kafka/transform.py b/tests/integration/kafka/transform.py index 1fd87ca9b..d0740f360 100644 --- a/tests/integration/kafka/transform.py +++ b/tests/integration/kafka/transform.py @@ -9,7 +9,7 @@ def stream(batch): for item in batch: message = item.decode("utf-8").strip().split() if len(message) == 1: - ret.append(("CREATE (:node {num: $num})", {"num": message[0]})) + ret.append(("MERGE (:node {num: $num})", {"num": message[0]})) elif len(message) == 2: - ret.append(("MATCH (n:node {num: $num1}), (m:node {num: $num2}) CREATE (n)-[:et]->(m)", {"num1": message[0], "num2": message[1]})) + ret.append(("MATCH (n:node {num: $num1}), (m:node {num: $num2}) MERGE (n)-[:et]->(m)", {"num1": message[0], "num2": message[1]})) return ret