From 13c9bf76af8ed6717e3fe1a553cefe296502053b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?=
 <antaljanosbenjamin@users.noreply.github.com>
Date: Tue, 6 Jul 2021 17:51:11 +0200
Subject: [PATCH] Add e2e tests for streams (#190)

* Add base of e2e tests

* Add python dependencies

* Explicitly close consumer in destructor

* Parametrize tests and add test for CHECK STREAM

* Add tests for SHOW STREAMS

* Add test for concurrent start/stop during check

* Add test for calling check with an already started stream

* Run streams e2e tests on CI servers

Co-authored-by: antonio2368 <antonio2368@users.noreply.github.com>
Co-authored-by: Jure Bajic <jbajic@users.noreply.github.com>
---
 include/mg_procedure.h                        |   6 +-
 src/integrations/kafka/consumer.cpp           |   1 +
 src/query/procedure/mg_procedure_impl.cpp     |   2 +-
 src/query/procedure/py_module.cpp             |   2 +-
 tests/e2e/CMakeLists.txt                      |   1 +
 tests/e2e/runner.py                           |   5 +-
 tests/e2e/streams/CMakeLists.txt              |  11 +
 tests/e2e/streams/docker-compose.yml          |  20 +
 tests/e2e/streams/streams_test_runner.sh      |   6 +
 tests/e2e/streams/streams_tests.py            | 466 ++++++++++++++++++
 .../streams/transformations/CMakeLists.txt    |   1 +
 .../e2e/streams/transformations/transform.py  |  37 ++
 tests/e2e/streams/workloads.yaml              |  14 +
 tests/setup.sh                                |   2 +
 tests/unit/mgp_kafka_c_api.cpp                |   2 +-
 15 files changed, 568 insertions(+), 8 deletions(-)
 create mode 100644 tests/e2e/streams/CMakeLists.txt
 create mode 100644 tests/e2e/streams/docker-compose.yml
 create mode 100755 tests/e2e/streams/streams_test_runner.sh
 create mode 100755 tests/e2e/streams/streams_tests.py
 create mode 100644 tests/e2e/streams/transformations/CMakeLists.txt
 create mode 100644 tests/e2e/streams/transformations/transform.py
 create mode 100644 tests/e2e/streams/workloads.yaml

diff --git a/include/mg_procedure.h b/include/mg_procedure.h
index e13065d16..d6dab79b5 100644
--- a/include/mg_procedure.h
+++ b/include/mg_procedure.h
@@ -815,12 +815,12 @@ struct mgp_message;
 struct mgp_messages;
 
 /// Payload is not null terminated and not a string but rather a byte array.
-/// You need to call mgp_message_get_payload_size() first, to read the size
-/// of the payload.
+/// You need to call mgp_message_payload_size() first to read the size of
+/// the payload.
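+///
+/// A minimal usage sketch (assuming `msg` is a valid mgp_message pointer):
+///   size_t size = mgp_message_payload_size(msg);
+///   const char *payload = mgp_message_get_payload(msg);
+///   /* read `size` bytes from `payload`; it is not null terminated */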
 const char *mgp_message_get_payload(const struct mgp_message *);
 
 /// Return the payload size
-size_t mgp_message_get_payload_size(const struct mgp_message *);
+size_t mgp_message_payload_size(const struct mgp_message *);
 
 /// Return the name of topic
 const char *mgp_message_topic_name(const struct mgp_message *);
diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp
index 929c153a6..44e13590e 100644
--- a/src/integrations/kafka/consumer.cpp
+++ b/src/integrations/kafka/consumer.cpp
@@ -158,6 +158,7 @@ Consumer::Consumer(const std::string &bootstrap_servers, ConsumerInfo info, Cons
 
 Consumer::~Consumer() {
   StopIfRunning();
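+  // Close the consumer explicitly so it leaves its consumer group cleanly
+  // instead of waiting for the broker to time out the session.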
+  consumer_->close();
   RdKafka::TopicPartition::destroy(last_assignment_);
 }
 
diff --git a/src/query/procedure/mg_procedure_impl.cpp b/src/query/procedure/mg_procedure_impl.cpp
index cf345b371..282ea0383 100644
--- a/src/query/procedure/mg_procedure_impl.cpp
+++ b/src/query/procedure/mg_procedure_impl.cpp
@@ -1455,7 +1455,7 @@ bool IsValidIdentifierName(const char *name) {
 
 const char *mgp_message_get_payload(const mgp_message *message) { return message->msg->Payload().data(); }
 
-size_t mgp_message_get_payload_size(const mgp_message *message) { return message->msg->Payload().size(); }
+size_t mgp_message_payload_size(const mgp_message *message) { return message->msg->Payload().size(); }
 
 const char *mgp_message_topic_name(const mgp_message *message) { return message->msg->TopicName().data(); }
 
diff --git a/src/query/procedure/py_module.cpp b/src/query/procedure/py_module.cpp
index 09867af73..c95aedac8 100644
--- a/src/query/procedure/py_module.cpp
+++ b/src/query/procedure/py_module.cpp
@@ -424,7 +424,7 @@ PyObject *PyMessageIsValid(PyMessage *self, PyObject *Py_UNUSED(ignored)) {
 
 PyObject *PyMessageGetPayload(PyMessage *self, PyObject *Py_UNUSED(ignored)) {
   MG_ASSERT(self->message);
-  auto payload_size = mgp_message_get_payload_size(self->message);
+  auto payload_size = mgp_message_payload_size(self->message);
   const auto *payload = mgp_message_get_payload(self->message);
   auto *raw_bytes = PyByteArray_FromStringAndSize(payload, payload_size);
   if (!raw_bytes) {
diff --git a/tests/e2e/CMakeLists.txt b/tests/e2e/CMakeLists.txt
index 1a14bd21a..4b2a11a9d 100644
--- a/tests/e2e/CMakeLists.txt
+++ b/tests/e2e/CMakeLists.txt
@@ -2,3 +2,4 @@ add_subdirectory(replication)
 add_subdirectory(memory)
 add_subdirectory(triggers)
 add_subdirectory(isolation_levels)
+add_subdirectory(streams)
diff --git a/tests/e2e/runner.py b/tests/e2e/runner.py
index 74b69955e..e8dfc5c9f 100755
--- a/tests/e2e/runner.py
+++ b/tests/e2e/runner.py
@@ -52,8 +52,9 @@ def run(args):
             log_file_path = os.path.join(BUILD_DIR, 'logs', config['log_file'])
             binary_args = config['args'] + ["--log-file", log_file_path]
             if 'proc' in workload:
-              procdir = "--query-modules-directory=" + os.path.join(BUILD_DIR, workload['proc'])
-              binary_args.append(procdir)
+                procdir = "--query-modules-directory=" + \
+                    os.path.join(BUILD_DIR, workload['proc'])
+                binary_args.append(procdir)
 
             mg_instance.start(args=binary_args)
             for query in config['setup_queries']:
diff --git a/tests/e2e/streams/CMakeLists.txt b/tests/e2e/streams/CMakeLists.txt
new file mode 100644
index 000000000..d142ae61a
--- /dev/null
+++ b/tests/e2e/streams/CMakeLists.txt
@@ -0,0 +1,11 @@
+function(copy_streams_e2e_python_files FILE_NAME)
+  add_custom_target(memgraph__e2e__streams__${FILE_NAME} ALL
+          COMMAND ${CMAKE_COMMAND} -E copy
+                  ${CMAKE_CURRENT_SOURCE_DIR}/${FILE_NAME}
+                  ${CMAKE_CURRENT_BINARY_DIR}/${FILE_NAME}
+          DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/${FILE_NAME})
+endfunction()
+
+copy_streams_e2e_python_files(streams_tests.py)
+copy_streams_e2e_python_files(streams_test_runner.sh)
+add_subdirectory(transformations)
diff --git a/tests/e2e/streams/docker-compose.yml b/tests/e2e/streams/docker-compose.yml
new file mode 100644
index 000000000..f40bb053a
--- /dev/null
+++ b/tests/e2e/streams/docker-compose.yml
@@ -0,0 +1,20 @@
+version: "3"
+services:
+  zookeeper:
+    image: 'bitnami/zookeeper:3.6.3-debian-10-r33'
+    ports:
+      - '2181:2181'
+    environment:
+      - ALLOW_ANONYMOUS_LOGIN=yes
+  kafka:
+    image: 'bitnami/kafka:2.8.0-debian-10-r49'
+    ports:
+      - '9092:9092'
+    environment:
+      - KAFKA_BROKER_ID=1
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
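+      # Advertise the broker on 127.0.0.1 so that clients running on the
+      # host (e.g. these e2e tests) can reach it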
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
+      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
+      - ALLOW_PLAINTEXT_LISTENER=yes
+    depends_on:
+      - zookeeper
diff --git a/tests/e2e/streams/streams_test_runner.sh b/tests/e2e/streams/streams_test_runner.sh
new file mode 100755
index 000000000..9cc749250
--- /dev/null
+++ b/tests/e2e/streams/streams_test_runner.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+
+# This workaround is necessary to run the tests in the same virtualenv as the
+# e2e runner.py.
+
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+python3 "$DIR/streams_tests.py"
diff --git a/tests/e2e/streams/streams_tests.py b/tests/e2e/streams/streams_tests.py
new file mode 100755
index 000000000..1b528b927
--- /dev/null
+++ b/tests/e2e/streams/streams_tests.py
@@ -0,0 +1,466 @@
+#!/usr/bin/python3
+
+# To run these tests locally, a running Kafka server is necessary. The tests
+# try to connect to localhost:9092.
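+# For example, a suitable Kafka instance can be started with the
+# docker-compose.yml next to this file:
+#   docker-compose -f tests/e2e/streams/docker-compose.yml up -d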
+
+# All tests are implemented in this file, because using the same test fixtures
+# in multiple files is not possible in a straightforward way.
+
+import sys
+import pytest
+import mgclient
+import time
+from multiprocessing import Process, Value
+from kafka import KafkaProducer
+from kafka.admin import KafkaAdminClient, NewTopic
+
+# These are the indices of the different values in a row of the SHOW STREAMS
+# query result.
+NAME = 0
+TOPICS = 1
+CONSUMER_GROUP = 2
+BATCH_INTERVAL = 3
+BATCH_SIZE = 4
+TRANSFORM = 5
+IS_RUNNING = 6
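+# e.g. a freshly created stream with default settings shows up as
+# ("test", ["topic_0"], "mg_consumer", None, None, "transform.simple", False)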
+
+# These are the indices of the query and parameters in a row of the CHECK
+# STREAM query result.
+QUERY = 0
+PARAMS = 1
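+# e.g. a row produced by CHECK STREAM with transform.with_parameters pairs
+# the parametrized query with its parameter map, roughly:
+# ("CREATE (n:MESSAGE {timestamp: $timestamp, payload: $payload, topic: $topic})",
+#  {"timestamp": 1625576671000, "payload": "message", "topic": "topic_0"})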
+
+TRANSFORMATIONS_TO_CHECK = [
+    "transform.simple", "transform.with_parameters"]
+
+SIMPLE_MSG = b'message'
+
+
+def execute_and_fetch_all(cursor, query):
+    cursor.execute(query)
+    return cursor.fetchall()
+
+
+def connect():
+    connection = mgclient.connect(host="localhost", port=7687)
+    connection.autocommit = True
+    return connection
+
+
+@pytest.fixture(autouse=True)
+def connection():
+    connection = connect()
+    yield connection
+    cursor = connection.cursor()
+    execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")
+    stream_infos = execute_and_fetch_all(cursor, "SHOW STREAMS")
+    for stream_info in stream_infos:
+        execute_and_fetch_all(cursor, f"DROP STREAM {stream_info[NAME]}")
+
+
+@pytest.fixture(scope="function")
+def topics():
+    admin_client = KafkaAdminClient(
+        bootstrap_servers="localhost:9092", client_id='test')
+
+    topics = []
+    topics_to_create = []
+    for index in range(3):
+        topic = f"topic_{index}"
+        topics.append(topic)
+        topics_to_create.append(NewTopic(name=topic,
+                                num_partitions=1, replication_factor=1))
+
+    admin_client.create_topics(new_topics=topics_to_create, timeout_ms=5000)
+    yield topics
+    admin_client.delete_topics(topics=topics, timeout_ms=5000)
+
+
+@pytest.fixture(scope="function")
+def producer():
+    yield KafkaProducer(bootstrap_servers="localhost:9092")
+
+
+def timed_wait(fun):
+    start_time = time.time()
+    seconds = 10
+
+    while True:
+        current_time = time.time()
+        elapsed_time = current_time - start_time
+
+        if elapsed_time > seconds:
+            return False
+
+        if fun():
+            return True
+
+        time.sleep(0.1)
+
+
+def check_one_result_row(cursor, query):
+    start_time = time.time()
+    seconds = 10
+
+    while True:
+        current_time = time.time()
+        elapsed_time = current_time - start_time
+
+        if elapsed_time > seconds:
+            return False
+
+        cursor.execute(query)
+        results = cursor.fetchall()
+        if len(results) < 1:
+            time.sleep(0.1)
+            continue
+
+        return len(results) == 1
+
+
+def check_vertex_exists_with_topic_and_payload(cursor, topic, payload_bytes):
+    assert check_one_result_row(cursor,
+                                "MATCH (n: MESSAGE {"
+                                f"payload: '{payload_bytes.decode('utf-8')}',"
+                                f"topic: '{topic}'"
+                                "}) RETURN n")
+
+
+def get_stream_info(cursor, stream_name):
+    stream_infos = execute_and_fetch_all(cursor, "SHOW STREAMS")
+    for stream_info in stream_infos:
+        if stream_info[NAME] == stream_name:
+            return stream_info
+
+    return None
+
+
+def get_is_running(cursor, stream_name):
+    stream_info = get_stream_info(cursor, stream_name)
+
+    assert stream_info
+    return stream_info[IS_RUNNING]
+
+
+def start_stream(cursor, stream_name):
+    execute_and_fetch_all(cursor, f"START STREAM {stream_name}")
+
+    assert get_is_running(cursor, stream_name)
+
+
+def stop_stream(cursor, stream_name):
+    execute_and_fetch_all(cursor, f"STOP STREAM {stream_name}")
+
+    assert not get_is_running(cursor, stream_name)
+
+
+def drop_stream(cursor, stream_name):
+    execute_and_fetch_all(cursor, f"DROP STREAM {stream_name}")
+
+    assert get_stream_info(cursor, stream_name) is None
+
+
+def check_stream_info(cursor, stream_name, expected_stream_info):
+    stream_info = get_stream_info(cursor, stream_name)
+    assert len(stream_info) == len(expected_stream_info)
+    for info, expected_info in zip(stream_info, expected_stream_info):
+        assert info == expected_info
+
+##############################################
+# Tests
+##############################################
+
+
+@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
+def test_simple(producer, topics, connection, transformation):
+    assert len(topics) > 0
+    cursor = connection.cursor()
+    execute_and_fetch_all(cursor,
+                          "CREATE STREAM test "
+                          f"TOPICS {','.join(topics)} "
+                          f"TRANSFORM {transformation}")
+    start_stream(cursor, "test")
+    time.sleep(5)
+
+    for topic in topics:
+        producer.send(topic, SIMPLE_MSG).get(timeout=60)
+
+    for topic in topics:
+        check_vertex_exists_with_topic_and_payload(
+            cursor, topic, SIMPLE_MSG)
+
+
+@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
+def test_separate_consumers(producer, topics, connection, transformation):
+    assert len(topics) > 0
+    cursor = connection.cursor()
+
+    stream_names = []
+    for topic in topics:
+        stream_name = "stream_" + topic
+        stream_names.append(stream_name)
+        execute_and_fetch_all(cursor,
+                              f"CREATE STREAM {stream_name} "
+                              f"TOPICS {topic} "
+                              f"TRANSFORM {transformation}")
+
+    for stream_name in stream_names:
+        start_stream(cursor, stream_name)
+
+    time.sleep(5)
+
+    for topic in topics:
+        producer.send(topic, SIMPLE_MSG).get(timeout=60)
+
+    for topic in topics:
+        check_vertex_exists_with_topic_and_payload(
+            cursor, topic, SIMPLE_MSG)
+
+
+def test_start_from_last_committed_offset(producer, topics, connection):
+    # This test creates a stream, consumes a message to have a committed
+    # offset, then destroys the stream. A new message is sent before the
+    # stream is recreated and then restarted. This simulates when Memgraph is
+    # stopped (stream is destroyed) and then restarted (stream is recreated).
+    # This is of course not as good as restarting Memgraph would be, but
+    # restarting Memgraph during a single workload cannot be done currently.
+    assert len(topics) > 0
+    cursor = connection.cursor()
+    execute_and_fetch_all(cursor,
+                          "CREATE STREAM test "
+                          f"TOPICS {topics[0]} "
+                          "TRANSFORM transform.simple")
+    start_stream(cursor, "test")
+    time.sleep(1)
+
+    producer.send(topics[0], SIMPLE_MSG).get(timeout=60)
+
+    check_vertex_exists_with_topic_and_payload(
+        cursor, topics[0], SIMPLE_MSG)
+
+    stop_stream(cursor, "test")
+    drop_stream(cursor, "test")
+
+    messages = [b"second message", b"third message"]
+    for message in messages:
+        producer.send(topics[0], message).get(timeout=60)
+
+    for message in messages:
+        vertices_with_msg = execute_and_fetch_all(cursor,
+                                                  "MATCH (n: MESSAGE {"
+                                                  f"payload: '{message.decode('utf-8')}'"
+                                                  "}) RETURN n")
+
+        assert len(vertices_with_msg) == 0
+
+    execute_and_fetch_all(cursor,
+                          "CREATE STREAM test "
+                          f"TOPICS {topics[0]} "
+                          "TRANSFORM transform.simple")
+    start_stream(cursor, "test")
+
+    for message in messages:
+        check_vertex_exists_with_topic_and_payload(
+            cursor, topics[0], message)
+
+
+@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
+def test_check_stream(producer, topics, connection, transformation):
+    assert len(topics) > 0
+    cursor = connection.cursor()
+    execute_and_fetch_all(cursor,
+                          "CREATE STREAM test "
+                          f"TOPICS {topics[0]} "
+                          f"TRANSFORM {transformation} "
+                          "BATCH_SIZE 1")
+    start_stream(cursor, "test")
+    time.sleep(1)
+
+    producer.send(topics[0], SIMPLE_MSG).get(timeout=60)
+    stop_stream(cursor, "test")
+
+    messages = [b"first message", b"second message", b"third message"]
+    for message in messages:
+        producer.send(topics[0], message).get(timeout=60)
+
+    def check_check_stream(batch_limit):
+        assert transformation in TRANSFORMATIONS_TO_CHECK
+        test_results = execute_and_fetch_all(
+            cursor, f"CHECK STREAM test BATCH_LIMIT {batch_limit}")
+        assert len(test_results) == batch_limit
+
+        for i in range(batch_limit):
+            message_as_str = messages[i].decode('utf-8')
+            if transformation == "transform.simple":
+                assert f"payload: '{message_as_str}'" in \
+                    test_results[i][QUERY]
+                assert test_results[i][PARAMS] is None
+            else:
+                assert test_results[i][QUERY] == ("CREATE (n:MESSAGE "
+                                                  "{timestamp: $timestamp, "
+                                                  "payload: $payload, "
+                                                  "topic: $topic})")
+                parameters = test_results[i][PARAMS]
+                # This is not a very sophisticated test, but it checks
+                # whether the timestamp has some kind of value.
+                assert parameters["timestamp"] > 1000000000000
+                assert parameters["topic"] == topics[0]
+                assert parameters["payload"] == message_as_str
+
+    check_check_stream(1)
+    check_check_stream(2)
+    check_check_stream(3)
+    start_stream(cursor, "test")
+
+    for message in messages:
+        check_vertex_exists_with_topic_and_payload(
+            cursor, topics[0], message)
+
+
+def test_show_streams(producer, topics, connection):
+    assert len(topics) > 1
+    cursor = connection.cursor()
+    execute_and_fetch_all(cursor,
+                          "CREATE STREAM default_values "
+                          f"TOPICS {topics[0]} "
+                          "TRANSFORM transform.simple")
+
+    consumer_group = "my_special_consumer_group"
+    batch_interval = 42
+    batch_size = 3
+    execute_and_fetch_all(cursor,
+                          "CREATE STREAM complex_values "
+                          f"TOPICS {','.join(topics)} "
+                          "TRANSFORM transform.with_parameters "
+                          f"CONSUMER_GROUP {consumer_group} "
+                          f"BATCH_INTERVAL {batch_interval} "
+                          f"BATCH_SIZE {batch_size} ")
+
+    assert len(execute_and_fetch_all(cursor, "SHOW STREAMS")) == 2
+
+    check_stream_info(cursor, "default_values", ("default_values", [
+                      topics[0]], "mg_consumer", None, None,
+        "transform.simple", False))
+
+    check_stream_info(cursor, "complex_values", ("complex_values", topics,
+                      consumer_group, batch_interval, batch_size,
+                                                 "transform.with_parameters",
+                                                 False))
+
+
+@pytest.mark.parametrize("operation", ["START", "STOP"])
+def test_start_and_stop_during_check(producer, topics, connection, operation):
+    # This test is quite complex. The goal is to call START/STOP queries
+    # while a CHECK query is waiting for its result. Because of the Global
+    # Interpreter Lock, running the queries on multiple threads is not
+    # useful: only one of them can call Cursor.execute at a time. Therefore
+    # the queries are executed from multiple processes, as each process has
+    # its own GIL.
+    # The counter variables are process-safe values used to synchronize the
+    # processes. Each value represents a specific phase of their execution.
+    assert len(topics) > 0
+    assert operation == "START" or operation == "STOP"
+    cursor = connection.cursor()
+    execute_and_fetch_all(cursor,
+                          "CREATE STREAM test_stream "
+                          f"TOPICS {topics[0]} "
+                          "TRANSFORM transform.simple")
+
+    check_counter = Value('i', 0)
+    check_result_len = Value('i', 0)
+    operation_counter = Value('i', 0)
+
+    CHECK_BEFORE_EXECUTE = 1
+    CHECK_AFTER_FETCHALL = 2
+    CHECK_CORRECT_RESULT = 3
+    CHECK_INCORRECT_RESULT = 4
+
+    def call_check(counter, result_len):
+        # This process will call the CHECK query and increment the counter
+        # based on its progress and expected behavior
+        connection = connect()
+        cursor = connection.cursor()
+        counter.value = CHECK_BEFORE_EXECUTE
+        result = execute_and_fetch_all(cursor, "CHECK STREAM test_stream")
+        result_len.value = len(result)
+        counter.value = CHECK_AFTER_FETCHALL
+        if len(result) > 0 and "payload: 'message'" in result[0][QUERY]:
+            counter.value = CHECK_CORRECT_RESULT
+        else:
+            counter.value = CHECK_INCORRECT_RESULT
+
+    OP_BEFORE_EXECUTE = 1
+    OP_AFTER_FETCHALL = 2
+    OP_ALREADY_STOPPED_EXCEPTION = 3
+    OP_INCORRECT_ALREADY_STOPPED_EXCEPTION = 4
+    OP_UNEXPECTED_EXCEPTION = 5
+
+    def call_operation(counter):
+        # This process will call the query with the specified operation and
+        # increment the counter based on its progress and expected behavior
+        connection = connect()
+        cursor = connection.cursor()
+        counter.value = OP_BEFORE_EXECUTE
+        try:
+            execute_and_fetch_all(cursor, f"{operation} STREAM test_stream")
+            counter.value = OP_AFTER_FETCHALL
+        except mgclient.DatabaseError as e:
+            if "Kafka consumer test_stream is already stopped" in str(e):
+                counter.value = OP_ALREADY_STOPPED_EXCEPTION
+            else:
+                counter.value = OP_INCORRECT_ALREADY_STOPPED_EXCEPTION
+        except Exception:
+            counter.value = OP_UNEXPECTED_EXCEPTION
+
+    check_stream_proc = Process(
+        target=call_check, daemon=True, args=(check_counter, check_result_len))
+    operation_proc = Process(target=call_operation,
+                             daemon=True, args=(operation_counter,))
+
+    try:
+        check_stream_proc.start()
+
+        time.sleep(0.5)
+
+        assert timed_wait(lambda: check_counter.value == CHECK_BEFORE_EXECUTE)
+        operation_proc.start()
+        assert timed_wait(lambda: operation_counter.value == OP_BEFORE_EXECUTE)
+
+        producer.send(topics[0], SIMPLE_MSG).get(timeout=60)
+        assert timed_wait(lambda: check_counter.value > CHECK_AFTER_FETCHALL)
+        assert check_counter.value == CHECK_CORRECT_RESULT
+        assert check_result_len.value == 1
+        check_stream_proc.join()
+
+        operation_proc.join()
+        if operation == "START":
+            assert operation_counter.value == OP_AFTER_FETCHALL
+            assert get_is_running(cursor, "test_stream")
+        else:
+            assert operation_counter.value == OP_ALREADY_STOPPED_EXCEPTION
+            assert not get_is_running(cursor, "test_stream")
+
+    finally:
+        # to make sure CHECK STREAM finishes
+        producer.send(topics[0], SIMPLE_MSG).get(timeout=60)
+        if check_stream_proc.is_alive():
+            check_stream_proc.terminate()
+        if operation_proc.is_alive():
+            operation_proc.terminate()
+
+
+def test_check_already_started_stream(producer, topics, connection):
+    assert len(topics) > 0
+    cursor = connection.cursor()
+
+    execute_and_fetch_all(cursor,
+                          "CREATE STREAM started_stream "
+                          f"TOPICS {topics[0]} "
+                          "TRANSFORM transform.simple")
+    start_stream(cursor, "started_stream")
+
+    with pytest.raises(mgclient.DatabaseError):
+        execute_and_fetch_all(cursor, "CHECK STREAM started_stream")
+
+
+if __name__ == "__main__":
+    sys.exit(pytest.main([__file__, "-rA"]))
diff --git a/tests/e2e/streams/transformations/CMakeLists.txt b/tests/e2e/streams/transformations/CMakeLists.txt
new file mode 100644
index 000000000..8594c8ba0
--- /dev/null
+++ b/tests/e2e/streams/transformations/CMakeLists.txt
@@ -0,0 +1 @@
+copy_streams_e2e_python_files(transform.py)
diff --git a/tests/e2e/streams/transformations/transform.py b/tests/e2e/streams/transformations/transform.py
new file mode 100644
index 000000000..b15ff13a1
--- /dev/null
+++ b/tests/e2e/streams/transformations/transform.py
@@ -0,0 +1,37 @@
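+# Transformations used by the streams e2e tests. Each transformation maps a
+# batch of Kafka messages to (query, parameters) records that Memgraph
+# executes.
+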
+import mgp
+
+
+@mgp.transformation
+def simple(context: mgp.TransCtx,
+           messages: mgp.Messages
+           ) -> mgp.Record(query=str, parameters=mgp.Map):
+
+    result_queries = []
+
+    for i in range(0, messages.total_messages()):
+        message = messages.message_at(i)
+        payload_as_str = message.payload().decode("utf-8")
+        result_queries.append(mgp.Record(
+            query=("CREATE (n:MESSAGE {"
+                   f"timestamp: '{message.timestamp()}', "
+                   f"payload: '{payload_as_str}', "
+                   f"topic: '{message.topic_name()}'}})"),
+            parameters=None))
+
+    return result_queries
+
+
+@mgp.transformation
+def with_parameters(context: mgp.TransCtx,
+                    messages: mgp.Messages
+                    ) -> mgp.Record(query=str, parameters=mgp.Map):
+
+    result_queries = []
+
+    for i in range(0, messages.total_messages()):
+        message = messages.message_at(i)
+        payload_as_str = message.payload().decode("utf-8")
+        result_queries.append(mgp.Record(
+            query="CREATE (n:MESSAGE {timestamp: $timestamp, payload: $payload, topic: $topic})",
+            parameters={"timestamp": message.timestamp(),
+                        "payload": payload_as_str,
+                        "topic": message.topic_name()}))
+
+    return result_queries
diff --git a/tests/e2e/streams/workloads.yaml b/tests/e2e/streams/workloads.yaml
new file mode 100644
index 000000000..8d8a836be
--- /dev/null
+++ b/tests/e2e/streams/workloads.yaml
@@ -0,0 +1,14 @@
+template_cluster: &template_cluster
+  cluster:
+    main:
+      args: ["--bolt-port", "7687", "--log-level=TRACE", "--kafka-bootstrap-servers=localhost:9092","--query-execution-timeout-sec=0"]
+      log_file: "streams-e2e.log"
+      setup_queries: []
+      validation_queries: []
+
+workloads:
+  - name: "Streams start, stop and show"
+    binary: "tests/e2e/streams/streams_test_runner.sh"
+    proc: "tests/e2e/streams/transformations/"
+    args: []
+    <<: *template_cluster
diff --git a/tests/setup.sh b/tests/setup.sh
index e4d04cd3f..ff853e4ff 100755
--- a/tests/setup.sh
+++ b/tests/setup.sh
@@ -7,9 +7,11 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 PIP_DEPS=(
    "behave==1.2.6"
    "ldap3==2.6"
+   "kafka-python==2.0.2"
    "neo4j-driver==4.1.1"
    "parse==1.18.0"
    "parse-type==0.5.2"
+   "pytest==6.2.3"
    "pyyaml==5.3.1"
    "six==1.15.0"
 )
diff --git a/tests/unit/mgp_kafka_c_api.cpp b/tests/unit/mgp_kafka_c_api.cpp
index c8a4ddeb3..947888be6 100644
--- a/tests/unit/mgp_kafka_c_api.cpp
+++ b/tests/unit/mgp_kafka_c_api.cpp
@@ -142,7 +142,7 @@ TEST_F(MgpApiTest, TestAllMgpKafkaCApi) {
     EXPECT_EQ(*mgp_message_key(message), expected[i].key);
 
     // Test for payload size
-    EXPECT_EQ(mgp_message_get_payload_size(message), expected[i].payload_size);
+    EXPECT_EQ(mgp_message_payload_size(message), expected[i].payload_size);
     // Test for payload
     EXPECT_FALSE(std::strcmp(mgp_message_get_payload(message), expected[i].payload));
     // Test for topic name