diff --git a/tests/e2e/mg_utils.py b/tests/e2e/mg_utils.py
index 9eec91da0..74cc8dc3a 100644
--- a/tests/e2e/mg_utils.py
+++ b/tests/e2e/mg_utils.py
@@ -1,12 +1,11 @@
 import time
 
 
-def mg_sleep_and_assert(expected_value, function_to_retrieve_data, max_duration=20, time_between_attempt=0.05):
+def mg_sleep_and_assert(expected_value, function_to_retrieve_data, max_duration=20, time_between_attempt=0.2):
     result = function_to_retrieve_data()
     start_time = time.time()
     while result != expected_value:
-        current_time = time.time()
-        duration = current_time - start_time
+        duration = time.time() - start_time
         if duration > max_duration:
             assert (
                 False
diff --git a/tests/e2e/streams/common.py b/tests/e2e/streams/common.py
index 43b10ba1c..66a045970 100644
--- a/tests/e2e/streams/common.py
+++ b/tests/e2e/streams/common.py
@@ -9,12 +9,12 @@
 # by the Apache License, Version 2.0, included in the file
 # licenses/APL.txt.
 
+import time
+from multiprocessing import Manager, Process, Value
+
 import mgclient
 import pytest
-import time
-
 from mg_utils import mg_sleep_and_assert
-from multiprocessing import Manager, Process, Value
 
 # These are the indices of the different values in the result of SHOW STREAM
 # query
@@ -103,15 +103,53 @@ def get_stream_info(cursor, stream_name):
 def get_is_running(cursor, stream_name):
     stream_info = get_stream_info(cursor, stream_name)
-
-    assert stream_info
+    assert stream_info is not None
     return stream_info[IS_RUNNING]
 
 
-def start_stream(cursor, stream_name):
-    execute_and_fetch_all(cursor, f"START STREAM {stream_name}")
+def create_stream(
+    cursor,
+    stream_name,
+    topics,
+    transformation,
+    consumer_group=None,
+    batch_interval=None,
+    batch_size=None,
+    bootstrap_servers=None,
+    configs=None,
+    credentials=None,
+):
+    query_str = f"CREATE KAFKA STREAM {stream_name} TOPICS {topics} TRANSFORM {transformation}"
+    if consumer_group is not None:
+        query_str += f" CONSUMER_GROUP {consumer_group}"
+    if batch_interval is not None:
+        query_str += f" BATCH_INTERVAL {batch_interval}"
+    if batch_size is not None:
+        query_str += f" BATCH_SIZE {batch_size}"
+    if bootstrap_servers is not None:
+        query_str += f" BOOTSTRAP_SERVERS {bootstrap_servers}"
+    if configs is not None:
+        query_str += f" CONFIGS {configs}"
+    if credentials is not None:
+        query_str += f" CREDENTIALS {credentials}"
+    execute_and_fetch_all(cursor, query_str)
+
+
+def start_stream(cursor, stream_name, sleep=True):
+    # Sleep is needed because, although is_running already returns True,
+    # the stream cannot accept messages yet
+    execute_and_fetch_all(cursor, f"START STREAM {stream_name}")
     assert get_is_running(cursor, stream_name)
+    if sleep:
+        time.sleep(5)
+
+
+def start_streams(cursor, stream_names):
+    # Start every stream, but sleep only once after all of them have been started
+    for stream_name in stream_names:
+        execute_and_fetch_all(cursor, f"START STREAM {stream_name}")
+        assert get_is_running(cursor, stream_name)
+    time.sleep(5)
 
 
 def start_stream_with_limit(cursor, stream_name, batch_limit, timeout=None):
@@ -123,13 +161,11 @@ def start_stream_with_limit(cursor, stream_name, batch_limit, timeout=None):
 
 def stop_stream(cursor, stream_name):
     execute_and_fetch_all(cursor, f"STOP STREAM {stream_name}")
-
     assert not get_is_running(cursor, stream_name)
 
 
 def drop_stream(cursor, stream_name):
     execute_and_fetch_all(cursor, f"DROP STREAM {stream_name}")
-
     assert get_stream_info(cursor, stream_name) is None
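# A sketch of what the create_stream helper above assembles; the stream and
# topic names here are illustrative, not taken from any test:
#
#   create_stream(cursor, "my_stream", "topic_0", "kafka_transform.simple", batch_size=1)
#
# executes the query:
#
#   CREATE KAFKA STREAM my_stream TOPICS topic_0 TRANSFORM kafka_transform.simple BATCH_SIZE 1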
@@ -227,7 +263,7 @@ def test_start_and_stop_during_check(
 
     try:
         check_stream_proc.start()
-        time.sleep(0.5)
+        time.sleep(3)
 
         assert timed_wait(lambda: check_counter.value == CHECK_BEFORE_EXECUTE)
         assert timed_wait(lambda: get_is_running(cursor, "test_stream"))
@@ -260,25 +296,22 @@ def test_start_and_stop_during_check(
 
 def test_start_checked_stream_after_timeout(connection, stream_creator):
     cursor = connection.cursor()
-    execute_and_fetch_all(cursor, stream_creator("test_stream"))
+    stream_name = "test_start_checked_stream_after_timeout"
+    execute_and_fetch_all(cursor, stream_creator(stream_name))
 
-    TIMEOUT_IN_MS = 2000
-    TIMEOUT_IN_SECONDS = TIMEOUT_IN_MS / 1000
+    timeout_in_ms = 2000
 
     def call_check():
-        execute_and_fetch_all(connect().cursor(), f"CHECK STREAM test_stream TIMEOUT {TIMEOUT_IN_MS}")
+        execute_and_fetch_all(connect().cursor(), f"CHECK STREAM {stream_name} TIMEOUT {timeout_in_ms}")
 
     check_stream_proc = Process(target=call_check, daemon=True)
 
-    start = time.time()
     check_stream_proc.start()
-    assert timed_wait(lambda: get_is_running(cursor, "test_stream"))
-    start_stream(cursor, "test_stream")
-    end = time.time()
+    assert timed_wait(lambda: get_is_running(cursor, stream_name))
+    start_stream(cursor, stream_name)
 
-    assert (end - start) < 1.3 * TIMEOUT_IN_SECONDS, "The START STREAM was blocked too long"
-    assert get_is_running(cursor, "test_stream")
-    stop_stream(cursor, "test_stream")
+    assert get_is_running(cursor, stream_name)
+    stop_stream(cursor, stream_name)
 
 
 def test_check_stream_same_number_of_queries_than_messages(connection, stream_creator, message_sender):
@@ -313,7 +346,6 @@ def test_check_stream_same_number_of_queries_than_messages(connection, stream_cr
     #   #            {parameters: {"value": "Parameter: 04"}, query: "Message: 04"}]
     #   #
     #   # -Batch 3: [{parameters: {"value": "Parameter: 05"}, query: "Message: 05"},
     #   #            {parameters: {"value": "Parameter: 06"}, query: "Message: 06"}]
-
     assert len(test_results.value) == BATCH_LIMIT
 
     expected_queries_and_raw_messages_1 = (
@@ -351,7 +383,7 @@ def test_check_stream_different_number_of_queries_than_messages(connection, stre
     STREAM_NAME = "test_stream"
     cursor = connection.cursor()
     execute_and_fetch_all(cursor, stream_creator(STREAM_NAME, BATCH_SIZE))
-    time.sleep(2)
+    time.sleep(3)
 
     results = Manager().Namespace()
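# mg_sleep_and_assert (tests/e2e/mg_utils.py above) re-polls the getter every
# time_between_attempt seconds (now 0.2) until it returns the expected value,
# and asserts if max_duration (20 s by default) elapses first. A minimal usage
# sketch with an illustrative getter:
#
#   def get_stream_count():
#       return len(execute_and_fetch_all(connect().cursor(), "SHOW STREAMS"))
#
#   assert mg_sleep_and_assert(1, get_stream_count)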
@@ -413,30 +445,32 @@ def test_check_stream_different_number_of_queries_than_messages(connection, stre
 
     assert expected_queries_and_raw_messages_3 == results.value[2]
 
 
-def test_start_stream_with_batch_limit(connection, stream_creator, messages_sender):
-    STREAM_NAME = "test"
+def test_start_stream_with_batch_limit(connection, stream_name, stream_creator, messages_sender):
     BATCH_LIMIT = 5
+    TIMEOUT = 10000
 
     cursor = connection.cursor()
-    execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
+    execute_and_fetch_all(cursor, stream_creator())
+    results = execute_and_fetch_all(connection.cursor(), "SHOW STREAMS")
+    assert len(results) == 1
 
-    def start_new_stream_with_limit(stream_name, batch_limit):
+    def start_new_stream_with_limit():
         connection = connect()
         cursor = connection.cursor()
-        start_stream_with_limit(cursor, stream_name, batch_limit)
-
-    thread_stream_running = Process(target=start_new_stream_with_limit, daemon=True, args=(STREAM_NAME, BATCH_LIMIT))
-    thread_stream_running.start()
+        start_stream_with_limit(cursor, stream_name, BATCH_LIMIT, TIMEOUT)
 
     def is_running():
-        return get_is_running(cursor, STREAM_NAME)
+        return get_is_running(cursor, stream_name)
 
+    thread_stream_running = Process(target=start_new_stream_with_limit)
+    thread_stream_running.start()
+
+    execute_and_fetch_all(connection.cursor(), "SHOW STREAMS")
     assert mg_sleep_and_assert(True, is_running)
-
     messages_sender(BATCH_LIMIT - 1)
 
     # We have not sent enough batches to reach the limit. We check that the stream is still correctly running.
-    assert get_is_running(cursor, STREAM_NAME)
+    assert get_is_running(cursor, stream_name)
 
     # We send a last message to reach the batch_limit
     messages_sender(1)
@@ -549,7 +583,6 @@ def test_check_while_stream_with_batch_limit_running(connection, stream_creator,
     STREAM_NAME = "test_batch_limit_and_check"
     BATCH_LIMIT = 1
     TIMEOUT = 10000
-    TIMEOUT_IN_SECONDS = TIMEOUT / 1000
 
     cursor = connection.cursor()
     execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
@@ -625,7 +658,7 @@ def test_check_stream_with_batch_limit_with_invalid_batch_limit(connection, stre
 
     cursor = connection.cursor()
     execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
-    time.sleep(2)
+    time.sleep(3)
 
     # 1/ checking with batch_limit=-10
     batch_limit = -10
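# The CHECK STREAM calls exercised above dry-run a stopped stream: they report
# the queries a transformation would produce without committing offsets, and
# return once BATCH_LIMIT batches have arrived or, if given, TIMEOUT
# milliseconds have passed. An illustrative invocation (stream name assumed):
#
#   CHECK STREAM my_stream BATCH_LIMIT 2 TIMEOUT 2000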
# "__consumer_offsets" - previous_topics = [ - topic for topic in admin_client.list_topics() if topic != "__consumer_offsets"] + previous_topics = [topic for topic in admin_client.list_topics() if topic != "__consumer_offsets"] if previous_topics: admin_client.delete_topics(topics=previous_topics, timeout_ms=5000) topics = get_topics(3) topics_to_create = [] for topic in topics: - topics_to_create.append( - NewTopic( - name=topic, - num_partitions=1, - replication_factor=1)) + topics_to_create.append(NewTopic(name=topic, num_partitions=1, replication_factor=1)) admin_client.create_topics(new_topics=topics_to_create, timeout_ms=5000) yield topics @@ -68,7 +60,7 @@ def kafka_topics(): @pytest.fixture(scope="function") def kafka_producer(): - yield KafkaProducer(bootstrap_servers="localhost:9092") + yield KafkaProducer(bootstrap_servers=["localhost:29092"], api_version_auto_timeout_ms=10000) @pytest.fixture(scope="function") @@ -80,6 +72,5 @@ def pulsar_client(): def pulsar_topics(): topics = get_topics(3) for topic in topics: - requests.delete( - f'http://127.0.0.1:6652/admin/v2/persistent/public/default/{topic}?force=true') + requests.delete(f"http://localhost:6652/admin/v2/persistent/public/default/{topic}?force=true") yield topics diff --git a/tests/e2e/streams/kafka.yml b/tests/e2e/streams/kafka.yml deleted file mode 100644 index f9d213864..000000000 --- a/tests/e2e/streams/kafka.yml +++ /dev/null @@ -1,20 +0,0 @@ -version: '3.7' -services: - zookeeper: - image: 'bitnami/zookeeper:latest' - ports: - - '2181:2181' - environment: - - ALLOW_ANONYMOUS_LOGIN=yes - kafka: - image: 'bitnami/kafka:latest' - ports: - - '9092:9092' - environment: - - KAFKA_BROKER_ID=1 - - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 - - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 - - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - - ALLOW_PLAINTEXT_LISTENER=yes - depends_on: - - zookeeper diff --git a/tests/e2e/streams/kafka_streams_tests.py b/tests/e2e/streams/kafka_streams_tests.py index 3c1213019..b988a6c26 100755 --- a/tests/e2e/streams/kafka_streams_tests.py +++ b/tests/e2e/streams/kafka_streams_tests.py @@ -12,31 +12,30 @@ # licenses/APL.txt. 
diff --git a/tests/e2e/streams/kafka_streams_tests.py b/tests/e2e/streams/kafka_streams_tests.py
index 3c1213019..b988a6c26 100755
--- a/tests/e2e/streams/kafka_streams_tests.py
+++ b/tests/e2e/streams/kafka_streams_tests.py
@@ -12,31 +12,30 @@
 # licenses/APL.txt.
 
 import sys
-import pytest
-import mgclient
 import time
-from mg_utils import mg_sleep_and_assert
-from multiprocessing import Process, Value
+from multiprocessing import Process
+
 import common
+import mgclient
+import pytest
+from mg_utils import mg_sleep_and_assert
 
 TRANSFORMATIONS_TO_CHECK_C = ["c_transformations.empty_transformation"]
-
 TRANSFORMATIONS_TO_CHECK_PY = ["kafka_transform.simple", "kafka_transform.with_parameters"]
+KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT = 60
 
 
 @pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_PY)
 def test_simple(kafka_producer, kafka_topics, connection, transformation):
     assert len(kafka_topics) > 0
+    stream_name = "test_simple_" + transformation.split(".")[1]
+
     cursor = connection.cursor()
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE KAFKA STREAM test TOPICS {','.join(kafka_topics)} TRANSFORM {transformation}",
-    )
-    common.start_stream(cursor, "test")
-    time.sleep(5)
+    common.create_stream(cursor, stream_name, ",".join(kafka_topics), transformation)
+    common.start_stream(cursor, stream_name)
 
     for topic in kafka_topics:
-        kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=60)
+        kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
 
     for topic in kafka_topics:
         common.kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG)
@@ -47,22 +46,15 @@ def test_separate_consumers(kafka_producer, kafka_topics, connection, transforma
     assert len(kafka_topics) > 0
     cursor = connection.cursor()
 
-    stream_names = []
-    for topic in kafka_topics:
-        stream_name = "stream_" + topic
-        stream_names.append(stream_name)
-        common.execute_and_fetch_all(
-            cursor,
-            f"CREATE KAFKA STREAM {stream_name} TOPICS {topic} TRANSFORM {transformation}",
-        )
+    stream_names = ["stream_" + transformation.split(".")[1] + "_" + topic for topic in kafka_topics]
 
-    for stream_name in stream_names:
-        common.start_stream(cursor, stream_name)
+    for stream_name, topic in zip(stream_names, kafka_topics):
+        common.create_stream(cursor, stream_name, topic, transformation)
 
-    time.sleep(5)
+    common.start_streams(cursor, stream_names)
 
     for topic in kafka_topics:
-        kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=60)
+        kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
 
     for topic in kafka_topics:
         common.kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG)
@@ -77,23 +69,20 @@ def test_start_from_last_committed_offset(kafka_producer, kafka_topics, connecti
     # restarting Memgraph during a single workload cannot be done currently.
     assert len(kafka_topics) > 0
     cursor = connection.cursor()
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE KAFKA STREAM test TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple",
-    )
-    common.start_stream(cursor, "test")
-    time.sleep(1)
 
-    kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60)
+    stream_name = "test_start_from_last_committed_offset"
+    common.create_stream(cursor, stream_name, kafka_topics[0], "kafka_transform.simple")
+    common.start_stream(cursor, stream_name)
+    kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
 
     common.kafka_check_vertex_exists_with_topic_and_payload(cursor, kafka_topics[0], common.SIMPLE_MSG)
 
-    common.stop_stream(cursor, "test")
-    common.drop_stream(cursor, "test")
+    common.stop_stream(cursor, stream_name)
+    common.drop_stream(cursor, stream_name)
 
     messages = [b"second message", b"third message"]
     for message in messages:
-        kafka_producer.send(kafka_topics[0], message).get(timeout=60)
+        kafka_producer.send(kafka_topics[0], message).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
 
     for message in messages:
         vertices_with_msg = common.execute_and_fetch_all(
@@ -103,11 +92,8 @@ def test_start_from_last_committed_offset(kafka_producer, kafka_topics, connecti
 
     assert len(vertices_with_msg) == 0
 
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE KAFKA STREAM test TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple",
-    )
-    common.start_stream(cursor, "test")
+    common.create_stream(cursor, stream_name, kafka_topics[0], "kafka_transform.simple")
+    common.start_stream(cursor, stream_name)
 
     for message in messages:
         common.kafka_check_vertex_exists_with_topic_and_payload(cursor, kafka_topics[0], message)
@@ -118,32 +104,26 @@ def test_check_stream(kafka_producer, kafka_topics, connection, transformation):
     assert len(kafka_topics) > 0
     BATCH_SIZE = 1
     INDEX_OF_FIRST_BATCH = 0
-    cursor = connection.cursor()
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE KAFKA STREAM test TOPICS {kafka_topics[0]} TRANSFORM {transformation} BATCH_SIZE {BATCH_SIZE}",
-    )
-    common.start_stream(cursor, "test")
-    time.sleep(1)
+    stream_name = "test_check_stream_" + transformation.split(".")[1]
 
-    kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60)
-    common.stop_stream(cursor, "test")
+    cursor = connection.cursor()
+
+    common.create_stream(cursor, stream_name, kafka_topics[0], transformation, batch_size=BATCH_SIZE)
+    common.start_stream(cursor, stream_name)
+    kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
+    common.stop_stream(cursor, stream_name)
 
     messages = [b"first message", b"second message", b"third message"]
     for message in messages:
-        kafka_producer.send(kafka_topics[0], message).get(timeout=60)
+        kafka_producer.send(kafka_topics[0], message).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
 
     def check_check_stream(batch_limit):
-        assert transformation == "kafka_transform.simple" or transformation == "kafka_transform.with_parameters"
-        test_results = common.execute_and_fetch_all(cursor, f"CHECK STREAM test BATCH_LIMIT {batch_limit}")
+        test_results = common.execute_and_fetch_all(cursor, f"CHECK STREAM {stream_name} BATCH_LIMIT {batch_limit}")
         assert len(test_results) == batch_limit
 
         for i in range(batch_limit):
             message_as_str = messages[i].decode("utf-8")
-            assert (
-                BATCH_SIZE == 1
-            )  # If batch size != 1, then the usage of INDEX_OF_FIRST_BATCH must change: the result will have a list of queries (pair)
-
+            # If batch size != 1, then the usage of INDEX_OF_FIRST_BATCH must change: the result will have a list of queries (pair)
             if transformation == "kafka_transform.simple":
                 assert (
                     f"payload: '{message_as_str}'"
@@ -165,41 +145,48 @@ def test_check_stream(kafka_producer, kafka_topics, connection, transformation):
     check_check_stream(1)
     check_check_stream(2)
     check_check_stream(3)
-    common.start_stream(cursor, "test")
+    common.start_stream(cursor, stream_name)
 
     for message in messages:
         common.kafka_check_vertex_exists_with_topic_and_payload(cursor, kafka_topics[0], message)
 
 
-def test_show_streams(kafka_producer, kafka_topics, connection):
+def test_show_streams(kafka_topics, connection):
     assert len(kafka_topics) > 1
 
     cursor = connection.cursor()
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE KAFKA STREAM default_values TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BOOTSTRAP_SERVERS 'localhost:9092'",
-    )
 
     consumer_group = "my_special_consumer_group"
     BATCH_INTERVAL = 42
     BATCH_SIZE = 3
-    common.execute_and_fetch_all(
+    default_values_stream = "default_values"
+    complex_values_stream = "complex_values"
+
+    common.create_stream(
+        cursor, default_values_stream, kafka_topics[0], "kafka_transform.simple", bootstrap_servers="'localhost:29092'"
+    )
+    common.create_stream(
         cursor,
-        f"CREATE KAFKA STREAM complex_values TOPICS {','.join(kafka_topics)} TRANSFORM kafka_transform.with_parameters CONSUMER_GROUP {consumer_group} BATCH_INTERVAL {BATCH_INTERVAL} BATCH_SIZE {BATCH_SIZE} ",
+        complex_values_stream,
+        ",".join(kafka_topics),
+        "kafka_transform.with_parameters",
+        consumer_group=consumer_group,
+        batch_interval=BATCH_INTERVAL,
+        batch_size=BATCH_SIZE,
     )
 
     assert len(common.execute_and_fetch_all(cursor, "SHOW STREAMS")) == 2
 
     common.check_stream_info(
         cursor,
-        "default_values",
-        ("default_values", "kafka", 100, 1000, "kafka_transform.simple", None, False),
+        default_values_stream,
+        (default_values_stream, "kafka", 100, 1000, "kafka_transform.simple", None, False),
     )
 
     common.check_stream_info(
         cursor,
-        "complex_values",
+        complex_values_stream,
         (
-            "complex_values",
+            complex_values_stream,
             "kafka",
             BATCH_INTERVAL,
             BATCH_SIZE,
@@ -219,7 +206,7 @@ def test_start_and_stop_during_check(kafka_producer, kafka_topics, connection, o
         return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE {BATCH_SIZE}"
 
     def message_sender(msg):
-        kafka_producer.send(kafka_topics[0], msg).get(timeout=60)
+        kafka_producer.send(kafka_topics[0], msg).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
 
     common.test_start_and_stop_during_check(
         operation,
@@ -235,14 +222,12 @@ def test_check_already_started_stream(kafka_topics, connection):
     assert len(kafka_topics) > 0
     cursor = connection.cursor()
 
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE KAFKA STREAM started_stream TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple",
-    )
-    common.start_stream(cursor, "started_stream")
+    stream_name = "test_check_already_started_stream"
+    common.create_stream(cursor, stream_name, kafka_topics[0], "kafka_transform.simple")
+    common.start_stream(cursor, stream_name)
 
     with pytest.raises(mgclient.DatabaseError):
-        common.execute_and_fetch_all(cursor, "CHECK STREAM started_stream")
+        common.execute_and_fetch_all(cursor, f"CHECK STREAM {stream_name}")
 
 
 def test_start_checked_stream_after_timeout(kafka_topics, connection):
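# Several tests in this file now derive the stream name from the parametrized
# transformation, presumably so the two parametrizations never reuse a name on
# the shared Memgraph instance, e.g.:
#
#   transformation = "kafka_transform.with_parameters"
#   stream_name = "test_simple_" + transformation.split(".")[1]  # "test_simple_with_parameters"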
@@ -254,19 +239,14 @@ def test_restart_after_error(kafka_producer, kafka_topics, connection):
     cursor = connection.cursor()
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE KAFKA STREAM test_stream TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.query",
-    )
+    stream_name = "test_restart_after_error"
+    common.create_stream(cursor, stream_name, kafka_topics[0], "kafka_transform.query")
+    common.start_stream(cursor, stream_name)
 
-    common.start_stream(cursor, "test_stream")
-    time.sleep(1)
+    kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
+    assert common.timed_wait(lambda: not common.get_is_running(cursor, stream_name))
 
-    kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60)
-    assert common.timed_wait(lambda: not common.get_is_running(cursor, "test_stream"))
-
-    common.start_stream(cursor, "test_stream")
-    time.sleep(1)
+    common.start_stream(cursor, stream_name)
 
     kafka_producer.send(kafka_topics[0], b"CREATE (n:VERTEX { id : 42 })")
     assert common.check_one_result_row(cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n")
@@ -275,23 +255,21 @@ def test_bootstrap_server(kafka_producer, kafka_topics, connection, transformation):
     assert len(kafka_topics) > 0
     cursor = connection.cursor()
-    LOCAL = "localhost:9092"
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE KAFKA STREAM test TOPICS {','.join(kafka_topics)} TRANSFORM {transformation} BOOTSTRAP_SERVERS '{LOCAL}'",
-    )
-    common.start_stream(cursor, "test")
-    time.sleep(5)
+    local = "'localhost:29092'"
+    stream_name = "test_bootstrap_server_" + transformation.split(".")[1]
+
+    common.create_stream(cursor, stream_name, ",".join(kafka_topics), transformation, bootstrap_servers=local)
+    common.start_stream(cursor, stream_name)
 
     for topic in kafka_topics:
-        kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=60)
+        kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
 
     for topic in kafka_topics:
         common.kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG)
 
 
 @pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_PY)
-def test_bootstrap_server_empty(kafka_producer, kafka_topics, connection, transformation):
+def test_bootstrap_server_empty(kafka_topics, connection, transformation):
     assert len(kafka_topics) > 0
     cursor = connection.cursor()
     with pytest.raises(mgclient.DatabaseError):
@@ -312,7 +290,7 @@ def test_set_offset(kafka_producer, kafka_topics, connection, transformation):
     messages = [f"{i} message" for i in range(1, 21)]
     for message in messages:
-        kafka_producer.send(kafka_topics[0], message.encode()).get(timeout=60)
+        kafka_producer.send(kafka_topics[0], message.encode()).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
 
     def consume(expected_msgs):
         common.start_stream(cursor, "test")
@@ -352,7 +330,7 @@ def test_set_offset(kafka_producer, kafka_topics, connection, transformation):
     res = execute_set_offset_and_consume(-2, [])
     assert len(res) == 0
     last_msg = "Final Message"
-    kafka_producer.send(kafka_topics[0], last_msg.encode()).get(timeout=60)
+    kafka_producer.send(kafka_topics[0], last_msg.encode()).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
     res = consume([last_msg])
     assert len(res) == 1
     assert comparison_check("Final Message", res[0])
@@ -361,21 +339,27 @@ def test_info_procedure(kafka_topics, connection):
     cursor = connection.cursor()
-    STREAM_NAME = "test_stream"
CONFIGS = {"sasl.username": "michael.scott"} - LOCAL = "localhost:9092" - CREDENTIALS = {"sasl.password": "S3cr3tP4ssw0rd"} - CONSUMER_GROUP = "ConsumerGr" - common.execute_and_fetch_all( + stream_name = "test_stream" + configs = {"sasl.username": "michael.scott"} + local = "localhost:29092" + credentials = {"sasl.password": "S3cr3tP4ssw0rd"} + consumer_group = "ConsumerGr" + + common.create_stream( cursor, - f"CREATE KAFKA STREAM {STREAM_NAME} TOPICS {','.join(kafka_topics)} TRANSFORM kafka_transform.simple CONSUMER_GROUP {CONSUMER_GROUP} BOOTSTRAP_SERVERS '{LOCAL}' CONFIGS {CONFIGS} CREDENTIALS {CREDENTIALS}", + stream_name, + ",".join(kafka_topics), + "kafka_transform.simple", + consumer_group=consumer_group, + bootstrap_servers=f"'{local}'", + configs=configs, + credentials=credentials, ) + stream_info = common.execute_and_fetch_all(cursor, f"CALL mg.kafka_stream_info('{stream_name}') YIELD *") - stream_info = common.execute_and_fetch_all(cursor, f"CALL mg.kafka_stream_info('{STREAM_NAME}') YIELD *") + reducted_credentials = {key: "" for key in credentials.keys()} - reducted_credentials = {key: "" for key in CREDENTIALS.keys()} - - expected_stream_info = [(LOCAL, CONFIGS, CONSUMER_GROUP, reducted_credentials, kafka_topics)] + expected_stream_info = [(local, configs, consumer_group, reducted_credentials, kafka_topics)] common.validate_info(stream_info, expected_stream_info) @@ -398,7 +382,7 @@ def test_check_stream_same_number_of_queries_than_messages(kafka_producer, kafka return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size}" def message_sender(msg): - kafka_producer.send(kafka_topics[0], msg).get(timeout=60) + kafka_producer.send(kafka_topics[0], msg).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT) common.test_check_stream_same_number_of_queries_than_messages(connection, stream_creator, message_sender) @@ -409,30 +393,33 @@ def test_check_stream_different_number_of_queries_than_messages(kafka_producer, TRANSFORMATION = "common_transform.check_stream_with_filtering" def stream_creator(stream_name, batch_size): - return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size}" + return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size}" def message_sender(msg): - kafka_producer.send(kafka_topics[0], msg).get(timeout=60) + kafka_producer.send(kafka_topics[0], msg).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT) common.test_check_stream_different_number_of_queries_than_messages(connection, stream_creator, message_sender) def test_start_stream_with_batch_limit(kafka_producer, kafka_topics, connection): assert len(kafka_topics) > 0 + STREAM_NAME = "test_start_stream_with_batch_limit" - def stream_creator(stream_name): + def stream_creator(): return ( - f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1" + f"CREATE KAFKA STREAM {STREAM_NAME} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1" ) def messages_sender(nof_messages): - for x in range(nof_messages): - kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60) + for _ in range(nof_messages): + kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get( + timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT + ) - common.test_start_stream_with_batch_limit(connection, stream_creator, 
+    common.test_start_stream_with_batch_limit(connection, STREAM_NAME, stream_creator, messages_sender)
 
 
-def test_start_stream_with_batch_limit_timeout(kafka_producer, kafka_topics, connection):
+def test_start_stream_with_batch_limit_timeout(kafka_topics, connection):
     assert len(kafka_topics) > 0
 
     def stream_creator(stream_name):
@@ -443,7 +430,7 @@ def test_start_stream_with_batch_limit_timeout(kafka_producer, kafka_topics, con
     common.test_start_stream_with_batch_limit_timeout(connection, stream_creator)
 
 
-def test_start_stream_with_batch_limit_reaching_timeout(kafka_producer, kafka_topics, connection):
+def test_start_stream_with_batch_limit_reaching_timeout(kafka_topics, connection):
     assert len(kafka_topics) > 0
 
     def stream_creator(stream_name, batch_size):
@@ -461,7 +448,7 @@ def test_start_stream_with_batch_limit_while_check_running(kafka_producer, kafka
     )
 
     def message_sender(message):
-        kafka_producer.send(kafka_topics[0], message).get(timeout=6000)
+        kafka_producer.send(kafka_topics[0], message).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
 
     def setup_function(start_check_stream, cursor, stream_name, batch_limit, timeout):
         thread_stream_check = Process(target=start_check_stream, daemon=True, args=(stream_name, batch_limit, timeout))
@@ -488,12 +475,12 @@ def test_check_while_stream_with_batch_limit_running(kafka_producer, kafka_topic
     )
 
     def message_sender(message):
-        kafka_producer.send(kafka_topics[0], message).get(timeout=6000)
+        kafka_producer.send(kafka_topics[0], message).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
 
     common.test_check_while_stream_with_batch_limit_running(connection, stream_creator, message_sender)
 
 
-def test_start_stream_with_batch_limit_with_invalid_batch_limit(kafka_producer, kafka_topics, connection):
+def test_start_stream_with_batch_limit_with_invalid_batch_limit(kafka_topics, connection):
     assert len(kafka_topics) > 0
 
     def stream_creator(stream_name):
@@ -504,7 +491,7 @@ def test_start_stream_with_batch_limit_with_invalid_batch_limit(kafka_producer,
     common.test_start_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator)
 
 
-def test_check_stream_with_batch_limit_with_invalid_batch_limit(kafka_producer, kafka_topics, connection):
+def test_check_stream_with_batch_limit_with_invalid_batch_limit(kafka_topics, connection):
     assert len(kafka_topics) > 0
 
     def stream_creator(stream_name):
diff --git a/tests/e2e/streams/pulsar.yml b/tests/e2e/streams/pulsar.yml
deleted file mode 100644
index 31241ad66..000000000
--- a/tests/e2e/streams/pulsar.yml
+++ /dev/null
@@ -1,8 +0,0 @@
-version: '3.7'
-services:
-  pulsar:
-    image: 'apachepulsar/pulsar:latest'
-    ports:
-      - '6652:8080'
-      - '6650:6650'
-    entrypoint: ['bin/pulsar', 'standalone']
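# The pulsar tests below create producers straight from the pulsar-client API;
# send() blocks until the broker acknowledges or send_timeout_millis expires.
# A minimal sketch (service URL illustrative; the tests read it from
# common.PULSAR_SERVICE_URL):
#
#   import pulsar
#   client = pulsar.Client("pulsar://127.0.0.1:6650")
#   producer = client.create_producer("persistent://public/default/topic_0", send_timeout_millis=60000)
#   producer.send(b"message")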
diff --git a/tests/e2e/streams/pulsar_streams_tests.py b/tests/e2e/streams/pulsar_streams_tests.py
index f68bf14d6..cf52416cb 100755
--- a/tests/e2e/streams/pulsar_streams_tests.py
+++ b/tests/e2e/streams/pulsar_streams_tests.py
@@ -12,11 +12,12 @@
 # licenses/APL.txt.
 
 import sys
-import pytest
-import mgclient
 import time
 from multiprocessing import Process, Value
+
 import common
+import mgclient
+import pytest
 
 TRANSFORMATIONS_TO_CHECK = ["pulsar_transform.simple", "pulsar_transform.with_parameters"]
@@ -80,12 +81,6 @@ def test_start_from_latest_messages(pulsar_client, pulsar_topics, connection):
     # in between should be lost. Additionally, we check that the consumer continues from the correct message
     # after stopping and starting again.
     assert len(pulsar_topics) > 0
-    cursor = connection.cursor()
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE PULSAR STREAM test TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple",
-    )
-    common.start_stream(cursor, "test")
 
     def assert_message_not_consumed(message):
         vertices_with_msg = common.execute_and_fetch_all(
@@ -95,6 +90,13 @@ def test_start_from_latest_messages(pulsar_client, pulsar_topics, connection):
 
         assert len(vertices_with_msg) == 0
 
+    cursor = connection.cursor()
+    common.execute_and_fetch_all(
+        cursor,
+        f"CREATE PULSAR STREAM test TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple",
+    )
+    common.start_stream(cursor, "test")
+
     producer = pulsar_client.create_producer(
         common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
     )
@@ -131,7 +133,7 @@ def test_start_from_latest_messages(pulsar_client, pulsar_topics, connection):
         producer.send(message)
         assert_message_not_consumed(message)
 
-    common.start_stream(cursor, "test")
+    common.start_stream(cursor, "test", sleep=False)
     assert_message_not_consumed(LOST_MESSAGE)
 
@@ -338,19 +340,20 @@ def test_service_url(pulsar_client, pulsar_topics, connection, transformation):
 
 def test_start_stream_with_batch_limit(pulsar_client, pulsar_topics, connection):
     assert len(pulsar_topics) > 1
+    STREAM_NAME = "test_start_stream_with_batch_limit"
 
-    def stream_creator(stream_name):
-        return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1"
+    def stream_creator():
+        return f"CREATE PULSAR STREAM {STREAM_NAME} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1"
 
     producer = pulsar_client.create_producer(
         common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
     )
 
     def messages_sender(nof_messages):
-        for x in range(nof_messages):
+        for _ in range(nof_messages):
             producer.send(common.SIMPLE_MSG)
 
-    common.test_start_stream_with_batch_limit(connection, stream_creator, messages_sender)
+    common.test_start_stream_with_batch_limit(connection, STREAM_NAME, stream_creator, messages_sender)
 
 
 def test_start_stream_with_batch_limit_timeout(pulsar_client, pulsar_topics, connection):
@@ -409,7 +412,7 @@ def test_check_stream_same_number_of_queries_than_messages(pulsar_client, pulsar
     TRANSFORMATION = "common_transform.check_stream_no_filtering"
 
     def stream_creator(stream_name, batch_size):
-        return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size} "
+        return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size} "
 
     producer = pulsar_client.create_producer(
         common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000