Add e2e tests for streams (#190)
* Add base of e2e tests * Add python dependencies * Explicitly close customer in destructor * Parametrize tests and add test for CHECK STREAM * Add tests for SHOW STREAMS * Add test for concurrent start/stop during check * Add test for calling check with an already started stream * Run streams e2e tests on CI servers Co-authored-by: antonio2368 <antonio2368@users.noreply.github.com> Co-authored-by: Jure Bajic <jbajic@users.noreply.github.com>
This commit is contained in:
parent
2e1a717dcb
commit
13c9bf76af
@ -815,12 +815,12 @@ struct mgp_message;
|
||||
struct mgp_messages;
|
||||
|
||||
/// Payload is not null terminated and not a string but rather a byte array.
|
||||
/// You need to call mgp_message_get_payload_size() first, to read the size
|
||||
/// of the payload.
|
||||
/// You need to call mgp_message_payload_size() first, to read the size of
|
||||
/// the payload.
|
||||
const char *mgp_message_get_payload(const struct mgp_message *);
|
||||
|
||||
/// Return the payload size
|
||||
size_t mgp_message_get_payload_size(const struct mgp_message *);
|
||||
size_t mgp_message_payload_size(const struct mgp_message *);
|
||||
|
||||
/// Return the name of topic
|
||||
const char *mgp_message_topic_name(const struct mgp_message *);
|
||||
|
@ -158,6 +158,7 @@ Consumer::Consumer(const std::string &bootstrap_servers, ConsumerInfo info, Cons
|
||||
|
||||
Consumer::~Consumer() {
|
||||
StopIfRunning();
|
||||
consumer_->close();
|
||||
RdKafka::TopicPartition::destroy(last_assignment_);
|
||||
}
|
||||
|
||||
|
@ -1455,7 +1455,7 @@ bool IsValidIdentifierName(const char *name) {
|
||||
|
||||
const char *mgp_message_get_payload(const mgp_message *message) { return message->msg->Payload().data(); }
|
||||
|
||||
size_t mgp_message_get_payload_size(const mgp_message *message) { return message->msg->Payload().size(); }
|
||||
size_t mgp_message_payload_size(const mgp_message *message) { return message->msg->Payload().size(); }
|
||||
|
||||
const char *mgp_message_topic_name(const mgp_message *message) { return message->msg->TopicName().data(); }
|
||||
|
||||
|
@ -424,7 +424,7 @@ PyObject *PyMessageIsValid(PyMessage *self, PyObject *Py_UNUSED(ignored)) {
|
||||
|
||||
PyObject *PyMessageGetPayload(PyMessage *self, PyObject *Py_UNUSED(ignored)) {
|
||||
MG_ASSERT(self->message);
|
||||
auto payload_size = mgp_message_get_payload_size(self->message);
|
||||
auto payload_size = mgp_message_payload_size(self->message);
|
||||
const auto *payload = mgp_message_get_payload(self->message);
|
||||
auto *raw_bytes = PyByteArray_FromStringAndSize(payload, payload_size);
|
||||
if (!raw_bytes) {
|
||||
|
@ -2,3 +2,4 @@ add_subdirectory(replication)
|
||||
add_subdirectory(memory)
|
||||
add_subdirectory(triggers)
|
||||
add_subdirectory(isolation_levels)
|
||||
add_subdirectory(streams)
|
||||
|
@ -52,8 +52,9 @@ def run(args):
|
||||
log_file_path = os.path.join(BUILD_DIR, 'logs', config['log_file'])
|
||||
binary_args = config['args'] + ["--log-file", log_file_path]
|
||||
if 'proc' in workload:
|
||||
procdir = "--query-modules-directory=" + os.path.join(BUILD_DIR, workload['proc'])
|
||||
binary_args.append(procdir)
|
||||
procdir = "--query-modules-directory=" + \
|
||||
os.path.join(BUILD_DIR, workload['proc'])
|
||||
binary_args.append(procdir)
|
||||
|
||||
mg_instance.start(args=binary_args)
|
||||
for query in config['setup_queries']:
|
||||
|
11
tests/e2e/streams/CMakeLists.txt
Normal file
11
tests/e2e/streams/CMakeLists.txt
Normal file
@ -0,0 +1,11 @@
|
||||
function(copy_streams_e2e_python_files FILE_NAME)
|
||||
add_custom_target(memgraph__e2e__streams__${FILE_NAME} ALL
|
||||
COMMAND ${CMAKE_COMMAND} -E copy
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/${FILE_NAME}
|
||||
${CMAKE_CURRENT_BINARY_DIR}/${FILE_NAME}
|
||||
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/${FILE_NAME})
|
||||
endfunction()
|
||||
|
||||
copy_streams_e2e_python_files(streams_tests.py)
|
||||
copy_streams_e2e_python_files(streams_test_runner.sh)
|
||||
add_subdirectory(transformations)
|
20
tests/e2e/streams/docker-compose.yml
Normal file
20
tests/e2e/streams/docker-compose.yml
Normal file
@ -0,0 +1,20 @@
|
||||
version: "3"
|
||||
services:
|
||||
zookeeper:
|
||||
image: 'bitnami/zookeeper:3.6.3-debian-10-r33'
|
||||
ports:
|
||||
- '2181:2181'
|
||||
environment:
|
||||
- ALLOW_ANONYMOUS_LOGIN=yes
|
||||
kafka:
|
||||
image: 'bitnami/kafka:2.8.0-debian-10-r49'
|
||||
ports:
|
||||
- '9092:9092'
|
||||
environment:
|
||||
- KAFKA_BROKER_ID=1
|
||||
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
|
||||
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
|
||||
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
|
||||
- ALLOW_PLAINTEXT_LISTENER=yes
|
||||
depends_on:
|
||||
- zookeeper
|
6
tests/e2e/streams/streams_test_runner.sh
Executable file
6
tests/e2e/streams/streams_test_runner.sh
Executable file
@ -0,0 +1,6 @@
|
||||
#!/bin/bash
|
||||
|
||||
# This workaround is necessary to run in the same virtualenv as the e2e runner.py
|
||||
|
||||
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
|
||||
python3 "$DIR/streams_tests.py"
|
466
tests/e2e/streams/streams_tests.py
Executable file
466
tests/e2e/streams/streams_tests.py
Executable file
@ -0,0 +1,466 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
# To run these test locally a running Kafka sever is necessery. The test tries
|
||||
# to connect on localhost:9092.
|
||||
|
||||
# All tests are implemented in this file, because using the same test fixtures
|
||||
# in multiple files is not possible in a straightforward way
|
||||
|
||||
import sys
|
||||
import pytest
|
||||
import mgclient
|
||||
import time
|
||||
from multiprocessing import Process, Value
|
||||
from kafka import KafkaProducer
|
||||
from kafka.admin import KafkaAdminClient, NewTopic
|
||||
|
||||
# These are the indices of the different values in the result of SHOW STREAM
|
||||
# query
|
||||
NAME = 0
|
||||
TOPICS = 1
|
||||
CONSUMER_GROUP = 2
|
||||
BATCH_INTERVAL = 3
|
||||
BATCH_SIZE = 4
|
||||
TRANSFORM = 5
|
||||
IS_RUNNING = 6
|
||||
|
||||
# These are the indices of the query and parameters in the result of CHECK
|
||||
# STREAM query
|
||||
QUERY = 0
|
||||
PARAMS = 1
|
||||
|
||||
TRANSFORMATIONS_TO_CHECK = [
|
||||
"transform.simple", "transform.with_parameters"]
|
||||
|
||||
SIMPLE_MSG = b'message'
|
||||
|
||||
|
||||
def execute_and_fetch_all(cursor, query):
|
||||
cursor.execute(query)
|
||||
return cursor.fetchall()
|
||||
|
||||
|
||||
def connect():
|
||||
connection = mgclient.connect(host="localhost", port=7687)
|
||||
connection.autocommit = True
|
||||
return connection
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def connection():
|
||||
connection = connect()
|
||||
yield connection
|
||||
cursor = connection.cursor()
|
||||
execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")
|
||||
stream_infos = execute_and_fetch_all(cursor, "SHOW STREAMS")
|
||||
for stream_info in stream_infos:
|
||||
execute_and_fetch_all(cursor, f"DROP STREAM {stream_info[NAME]}")
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def topics():
|
||||
admin_client = KafkaAdminClient(
|
||||
bootstrap_servers="localhost:9092", client_id='test')
|
||||
|
||||
topics = []
|
||||
topics_to_create = []
|
||||
for index in range(3):
|
||||
topic = f"topic_{index}"
|
||||
topics.append(topic)
|
||||
topics_to_create.append(NewTopic(name=topic,
|
||||
num_partitions=1, replication_factor=1))
|
||||
|
||||
admin_client.create_topics(new_topics=topics_to_create, timeout_ms=5000)
|
||||
yield topics
|
||||
admin_client.delete_topics(topics=topics, timeout_ms=5000)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def producer():
|
||||
yield KafkaProducer(bootstrap_servers="localhost:9092")
|
||||
|
||||
|
||||
def timed_wait(fun):
|
||||
start_time = time.time()
|
||||
seconds = 10
|
||||
|
||||
while True:
|
||||
current_time = time.time()
|
||||
elapsed_time = current_time - start_time
|
||||
|
||||
if elapsed_time > seconds:
|
||||
return False
|
||||
|
||||
if fun():
|
||||
return True
|
||||
|
||||
|
||||
def check_one_result_row(cursor, query):
|
||||
start_time = time.time()
|
||||
seconds = 10
|
||||
|
||||
while True:
|
||||
current_time = time.time()
|
||||
elapsed_time = current_time - start_time
|
||||
|
||||
if elapsed_time > seconds:
|
||||
return False
|
||||
|
||||
cursor.execute(query)
|
||||
results = cursor.fetchall()
|
||||
if len(results) < 1:
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
return len(results) == 1
|
||||
|
||||
|
||||
def check_vertex_exists_with_topic_and_payload(cursor, topic, payload_bytes):
|
||||
assert check_one_result_row(cursor,
|
||||
"MATCH (n: MESSAGE {"
|
||||
f"payload: '{payload_bytes.decode('utf-8')}',"
|
||||
f"topic: '{topic}'"
|
||||
"}) RETURN n")
|
||||
|
||||
|
||||
def get_stream_info(cursor, stream_name):
|
||||
stream_infos = execute_and_fetch_all(cursor, "SHOW STREAMS")
|
||||
for stream_info in stream_infos:
|
||||
if (stream_info[NAME] == stream_name):
|
||||
return stream_info
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_is_running(cursor, stream_name):
|
||||
stream_info = get_stream_info(cursor, stream_name)
|
||||
|
||||
assert stream_info
|
||||
return stream_info[IS_RUNNING]
|
||||
|
||||
|
||||
def start_stream(cursor, stream_name):
|
||||
execute_and_fetch_all(cursor, f"START STREAM {stream_name}")
|
||||
|
||||
assert get_is_running(cursor, stream_name)
|
||||
|
||||
|
||||
def stop_stream(cursor, stream_name):
|
||||
execute_and_fetch_all(cursor, f"STOP STREAM {stream_name}")
|
||||
|
||||
assert not get_is_running(cursor, stream_name)
|
||||
|
||||
|
||||
def drop_stream(cursor, stream_name):
|
||||
execute_and_fetch_all(cursor, f"DROP STREAM {stream_name}")
|
||||
|
||||
assert get_stream_info(cursor, stream_name) is None
|
||||
|
||||
|
||||
def check_stream_info(cursor, stream_name, expected_stream_info):
|
||||
stream_info = get_stream_info(cursor, stream_name)
|
||||
assert len(stream_info) == len(expected_stream_info)
|
||||
for info, expected_info in zip(stream_info, expected_stream_info):
|
||||
assert info == expected_info
|
||||
|
||||
##############################################
|
||||
# Tests
|
||||
##############################################
|
||||
|
||||
|
||||
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
|
||||
def test_simple(producer, topics, connection, transformation):
|
||||
assert len(topics) > 0
|
||||
cursor = connection.cursor()
|
||||
execute_and_fetch_all(cursor,
|
||||
"CREATE STREAM test "
|
||||
f"TOPICS {','.join(topics)} "
|
||||
f"TRANSFORM {transformation}")
|
||||
start_stream(cursor, "test")
|
||||
time.sleep(5)
|
||||
|
||||
for topic in topics:
|
||||
producer.send(topic, SIMPLE_MSG).get(timeout=60)
|
||||
|
||||
for topic in topics:
|
||||
check_vertex_exists_with_topic_and_payload(
|
||||
cursor, topic, SIMPLE_MSG)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
|
||||
def test_separate_consumers(producer, topics, connection, transformation):
|
||||
assert len(topics) > 0
|
||||
cursor = connection.cursor()
|
||||
|
||||
stream_names = []
|
||||
for topic in topics:
|
||||
stream_name = "stream_" + topic
|
||||
stream_names.append(stream_name)
|
||||
execute_and_fetch_all(cursor,
|
||||
f"CREATE STREAM {stream_name} "
|
||||
f"TOPICS {topic} "
|
||||
f"TRANSFORM {transformation}")
|
||||
|
||||
for stream_name in stream_names:
|
||||
start_stream(cursor, stream_name)
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
for topic in topics:
|
||||
producer.send(topic, SIMPLE_MSG).get(timeout=60)
|
||||
|
||||
for topic in topics:
|
||||
check_vertex_exists_with_topic_and_payload(
|
||||
cursor, topic, SIMPLE_MSG)
|
||||
|
||||
|
||||
def test_start_from_last_committed_offset(producer, topics, connection):
|
||||
# This test creates a stream, consumes a message to have a committed
|
||||
# offset, then destroys the stream. A new message is sent before the
|
||||
# stream is recreated and then restarted. This simulates when Memgraph is
|
||||
# stopped (stream is destroyed) and then restarted (stream is recreated).
|
||||
# This is of course not as good as restarting memgraph would be, but
|
||||
# restarting Memgraph during a single workload cannot be done currently.
|
||||
assert len(topics) > 0
|
||||
cursor = connection.cursor()
|
||||
execute_and_fetch_all(cursor,
|
||||
"CREATE STREAM test "
|
||||
f"TOPICS {topics[0]} "
|
||||
"TRANSFORM transform.simple")
|
||||
start_stream(cursor, "test")
|
||||
time.sleep(1)
|
||||
|
||||
producer.send(topics[0], SIMPLE_MSG).get(timeout=60)
|
||||
|
||||
check_vertex_exists_with_topic_and_payload(
|
||||
cursor, topics[0], SIMPLE_MSG)
|
||||
|
||||
stop_stream(cursor, "test")
|
||||
drop_stream(cursor, "test")
|
||||
|
||||
messages = [b"second message", b"third message"]
|
||||
for message in messages:
|
||||
producer.send(topics[0], message).get(timeout=60)
|
||||
|
||||
for message in messages:
|
||||
vertices_with_msg = execute_and_fetch_all(cursor,
|
||||
"MATCH (n: MESSAGE {"
|
||||
f"payload: '{message.decode('utf-8')}'"
|
||||
"}) RETURN n")
|
||||
|
||||
assert len(vertices_with_msg) == 0
|
||||
|
||||
execute_and_fetch_all(cursor,
|
||||
"CREATE STREAM test "
|
||||
f"TOPICS {topics[0]} "
|
||||
"TRANSFORM transform.simple")
|
||||
start_stream(cursor, "test")
|
||||
|
||||
for message in messages:
|
||||
check_vertex_exists_with_topic_and_payload(
|
||||
cursor, topics[0], message)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
|
||||
def test_check_stream(producer, topics, connection, transformation):
|
||||
assert len(topics) > 0
|
||||
cursor = connection.cursor()
|
||||
execute_and_fetch_all(cursor,
|
||||
"CREATE STREAM test "
|
||||
f"TOPICS {topics[0]} "
|
||||
f"TRANSFORM {transformation} "
|
||||
"BATCH_SIZE 1")
|
||||
start_stream(cursor, "test")
|
||||
time.sleep(1)
|
||||
|
||||
producer.send(topics[0], SIMPLE_MSG).get(timeout=60)
|
||||
stop_stream(cursor, "test")
|
||||
|
||||
messages = [b"first message", b"second message", b"third message"]
|
||||
for message in messages:
|
||||
producer.send(topics[0], message).get(timeout=60)
|
||||
|
||||
def check_check_stream(batch_limit):
|
||||
assert transformation == "transform.simple" \
|
||||
or transformation == "transform.with_parameters"
|
||||
test_results = execute_and_fetch_all(
|
||||
cursor, f"CHECK STREAM test BATCH_LIMIT {batch_limit}")
|
||||
assert len(test_results) == batch_limit
|
||||
|
||||
for i in range(batch_limit):
|
||||
message_as_str = messages[i].decode('utf-8')
|
||||
if transformation == "transform.simple":
|
||||
assert f"payload: '{message_as_str}'" in \
|
||||
test_results[i][QUERY]
|
||||
assert test_results[i][PARAMS] is None
|
||||
else:
|
||||
assert test_results[i][QUERY] == ("CREATE (n:MESSAGE "
|
||||
"{timestamp: $timestamp, "
|
||||
"payload: $payload, "
|
||||
"topic: $topic})")
|
||||
parameters = test_results[i][PARAMS]
|
||||
# this is not a very sofisticated test, but checks if
|
||||
# timestamp has some kind of value
|
||||
assert parameters["timestamp"] > 1000000000000
|
||||
assert parameters["topic"] == topics[0]
|
||||
assert parameters["payload"] == message_as_str
|
||||
|
||||
check_check_stream(1)
|
||||
check_check_stream(2)
|
||||
check_check_stream(3)
|
||||
start_stream(cursor, "test")
|
||||
|
||||
for message in messages:
|
||||
check_vertex_exists_with_topic_and_payload(
|
||||
cursor, topics[0], message)
|
||||
|
||||
|
||||
def test_show_streams(producer, topics, connection):
|
||||
assert len(topics) > 1
|
||||
cursor = connection.cursor()
|
||||
execute_and_fetch_all(cursor,
|
||||
"CREATE STREAM default_values "
|
||||
f"TOPICS {topics[0]} "
|
||||
f"TRANSFORM transform.simple")
|
||||
|
||||
consumer_group = "my_special_consumer_group"
|
||||
batch_interval = 42
|
||||
batch_size = 3
|
||||
execute_and_fetch_all(cursor,
|
||||
"CREATE STREAM complex_values "
|
||||
f"TOPICS {','.join(topics)} "
|
||||
f"TRANSFORM transform.with_parameters "
|
||||
f"CONSUMER_GROUP {consumer_group} "
|
||||
f"BATCH_INTERVAL {batch_interval} "
|
||||
f"BATCH_SIZE {batch_size} ")
|
||||
|
||||
assert len(execute_and_fetch_all(cursor, "SHOW STREAMS")) == 2
|
||||
|
||||
check_stream_info(cursor, "default_values", ("default_values", [
|
||||
topics[0]], "mg_consumer", None, None,
|
||||
"transform.simple", False))
|
||||
|
||||
check_stream_info(cursor, "complex_values", ("complex_values", topics,
|
||||
consumer_group, batch_interval, batch_size,
|
||||
"transform.with_parameters",
|
||||
False))
|
||||
|
||||
|
||||
@pytest.mark.parametrize("operation", ["START", "STOP"])
|
||||
def test_start_and_stop_during_check(producer, topics, connection, operation):
|
||||
# This test is quite complex. The goal is to call START/STOP queries
|
||||
# while a CHECK query is waiting for its result. Because the Global
|
||||
# Interpreter Lock, running queries on multiple threads is not useful,
|
||||
# because only one of them can call Cursor::execute at a time. Therefore
|
||||
# multiple processes are used to execute the queries, because different
|
||||
# processes have different GILs.
|
||||
# The counter variables are thread- and process-safe variables to
|
||||
# synchronize between the different processes. Each value represents a
|
||||
# specific phase of the execution of the processes.
|
||||
assert len(topics) > 1
|
||||
assert operation == "START" or operation == "STOP"
|
||||
cursor = connection.cursor()
|
||||
execute_and_fetch_all(cursor,
|
||||
"CREATE STREAM test_stream "
|
||||
f"TOPICS {topics[0]} "
|
||||
f"TRANSFORM transform.simple")
|
||||
|
||||
check_counter = Value('i', 0)
|
||||
check_result_len = Value('i', 0)
|
||||
operation_counter = Value('i', 0)
|
||||
|
||||
CHECK_BEFORE_EXECUTE = 1
|
||||
CHECK_AFTER_FETCHALL = 2
|
||||
CHECK_CORRECT_RESULT = 3
|
||||
CHECK_INCORRECT_RESULT = 4
|
||||
|
||||
def call_check(counter, result_len):
|
||||
# This process will call the CHECK query and increment the counter
|
||||
# based on its progress and expected behavior
|
||||
connection = connect()
|
||||
cursor = connection.cursor()
|
||||
counter.value = CHECK_BEFORE_EXECUTE
|
||||
result = execute_and_fetch_all(cursor, "CHECK STREAM test_stream")
|
||||
result_len.value = len(result)
|
||||
counter.value = CHECK_AFTER_FETCHALL
|
||||
if len(result) > 0 and "payload: 'message'" in result[0][QUERY]:
|
||||
counter.value = CHECK_CORRECT_RESULT
|
||||
else:
|
||||
counter.value = CHECK_INCORRECT_RESULT
|
||||
|
||||
OP_BEFORE_EXECUTE = 1
|
||||
OP_AFTER_FETCHALL = 2
|
||||
OP_ALREADY_STOPPED_EXCEPTION = 3
|
||||
OP_INCORRECT_ALREADY_STOPPED_EXCEPTION = 4
|
||||
OP_UNEXPECTED_EXCEPTION = 5
|
||||
|
||||
def call_operation(counter):
|
||||
# This porcess will call the query with the specified operation and
|
||||
# increment the counter based on its progress and expected behavior
|
||||
connection = connect()
|
||||
cursor = connection.cursor()
|
||||
counter.value = OP_BEFORE_EXECUTE
|
||||
try:
|
||||
execute_and_fetch_all(cursor, f"{operation} STREAM test_stream")
|
||||
counter.value = OP_AFTER_FETCHALL
|
||||
except mgclient.DatabaseError as e:
|
||||
if "Kafka consumer test_stream is already stopped" in str(e):
|
||||
counter.value = OP_ALREADY_STOPPED_EXCEPTION
|
||||
else:
|
||||
counter.value = OP_INCORRECT_ALREADY_STOPPED_EXCEPTION
|
||||
except Exception:
|
||||
counter.value = OP_UNEXPECTED_EXCEPTION
|
||||
|
||||
check_stream_proc = Process(
|
||||
target=call_check, daemon=True, args=(check_counter, check_result_len))
|
||||
operation_proc = Process(target=call_operation,
|
||||
daemon=True, args=(operation_counter,))
|
||||
|
||||
try:
|
||||
check_stream_proc.start()
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
assert timed_wait(lambda: check_counter.value == CHECK_BEFORE_EXECUTE)
|
||||
operation_proc.start()
|
||||
assert timed_wait(lambda: operation_counter.value == OP_BEFORE_EXECUTE)
|
||||
|
||||
producer.send(topics[0], SIMPLE_MSG).get(timeout=60)
|
||||
assert timed_wait(lambda: check_counter.value > CHECK_AFTER_FETCHALL)
|
||||
assert check_counter.value == CHECK_CORRECT_RESULT
|
||||
assert check_result_len.value == 1
|
||||
check_stream_proc.join()
|
||||
|
||||
operation_proc.join()
|
||||
if operation == "START":
|
||||
assert operation_counter.value == OP_AFTER_FETCHALL
|
||||
assert get_is_running(cursor, "test_stream")
|
||||
else:
|
||||
assert operation_counter.value == OP_ALREADY_STOPPED_EXCEPTION
|
||||
assert not get_is_running(cursor, "test_stream")
|
||||
|
||||
finally:
|
||||
# to make sure CHECK STREAM finishes
|
||||
producer.send(topics[0], SIMPLE_MSG).get(timeout=60)
|
||||
if check_stream_proc.is_alive():
|
||||
check_stream_proc.terminate()
|
||||
if operation_proc.is_alive():
|
||||
operation_proc.terminate()
|
||||
|
||||
|
||||
def test_check_already_started_stream(producer, topics, connection):
|
||||
assert len(topics) > 0
|
||||
cursor = connection.cursor()
|
||||
|
||||
execute_and_fetch_all(cursor,
|
||||
"CREATE STREAM started_stream "
|
||||
f"TOPICS {topics[0]} "
|
||||
f"TRANSFORM transform.simple")
|
||||
start_stream(cursor, "started_stream")
|
||||
|
||||
with pytest.raises(mgclient.DatabaseError):
|
||||
execute_and_fetch_all(cursor, "CHECK STREAM started_stream")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(pytest.main([__file__, "-rA"]))
|
1
tests/e2e/streams/transformations/CMakeLists.txt
Normal file
1
tests/e2e/streams/transformations/CMakeLists.txt
Normal file
@ -0,0 +1 @@
|
||||
copy_streams_e2e_python_files(transform.py)
|
37
tests/e2e/streams/transformations/transform.py
Normal file
37
tests/e2e/streams/transformations/transform.py
Normal file
@ -0,0 +1,37 @@
|
||||
import mgp
|
||||
|
||||
|
||||
@mgp.transformation
|
||||
def simple(context: mgp.TransCtx,
|
||||
messages: mgp.Messages
|
||||
) -> mgp.Record(query=str, parameters=mgp.Map):
|
||||
|
||||
result_queries = []
|
||||
|
||||
for i in range(0, messages.total_messages()):
|
||||
message = messages.message_at(i)
|
||||
payload_as_str = message.payload().decode("utf-8")
|
||||
result_queries.append(mgp.Record(
|
||||
query=f"CREATE (n:MESSAGE {{timestamp: '{message.timestamp()}', payload: '{payload_as_str}', topic: '{message.topic_name()}'}})",
|
||||
parameters=None))
|
||||
|
||||
return result_queries
|
||||
|
||||
|
||||
@mgp.transformation
|
||||
def with_parameters(context: mgp.TransCtx,
|
||||
messages: mgp.Messages
|
||||
) -> mgp.Record(query=str, parameters=mgp.Map):
|
||||
|
||||
result_queries = []
|
||||
|
||||
for i in range(0, messages.total_messages()):
|
||||
message = messages.message_at(i)
|
||||
payload_as_str = message.payload().decode("utf-8")
|
||||
result_queries.append(mgp.Record(
|
||||
query="CREATE (n:MESSAGE {timestamp: $timestamp, payload: $payload, topic: $topic})",
|
||||
parameters={"timestamp": message.timestamp(),
|
||||
"payload": payload_as_str,
|
||||
"topic": message.topic_name()}))
|
||||
|
||||
return result_queries
|
14
tests/e2e/streams/workloads.yaml
Normal file
14
tests/e2e/streams/workloads.yaml
Normal file
@ -0,0 +1,14 @@
|
||||
template_cluster: &template_cluster
|
||||
cluster:
|
||||
main:
|
||||
args: ["--bolt-port", "7687", "--log-level=TRACE", "--kafka-bootstrap-servers=localhost:9092","--query-execution-timeout-sec=0"]
|
||||
log_file: "streams-e2e.log"
|
||||
setup_queries: []
|
||||
validation_queries: []
|
||||
|
||||
workloads:
|
||||
- name: "Streams start, stop and show"
|
||||
binary: "tests/e2e/streams/streams_test_runner.sh"
|
||||
proc: "tests/e2e/streams/transformations/"
|
||||
args: []
|
||||
<<: *template_cluster
|
@ -7,9 +7,11 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
|
||||
PIP_DEPS=(
|
||||
"behave==1.2.6"
|
||||
"ldap3==2.6"
|
||||
"kafka-python==2.0.2"
|
||||
"neo4j-driver==4.1.1"
|
||||
"parse==1.18.0"
|
||||
"parse-type==0.5.2"
|
||||
"pytest==6.2.3"
|
||||
"pyyaml==5.3.1"
|
||||
"six==1.15.0"
|
||||
)
|
||||
|
@ -142,7 +142,7 @@ TEST_F(MgpApiTest, TestAllMgpKafkaCApi) {
|
||||
EXPECT_EQ(*mgp_message_key(message), expected[i].key);
|
||||
|
||||
// Test for payload size
|
||||
EXPECT_EQ(mgp_message_get_payload_size(message), expected[i].payload_size);
|
||||
EXPECT_EQ(mgp_message_payload_size(message), expected[i].payload_size);
|
||||
// Test for payload
|
||||
EXPECT_FALSE(std::strcmp(mgp_message_get_payload(message), expected[i].payload));
|
||||
// Test for topic name
|
||||
|
Loading…
Reference in New Issue
Block a user