diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index fc20f8b25..8a7ba6ed8 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -729,7 +729,7 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters ¶mete return callback; } case StreamQuery::Action::CHECK_STREAM: { - callback.header = {"query", "parameters"}; + callback.header = {"queries", "raw messages"}; callback.fn = [interpreter_context, stream_name = stream_query->stream_name_, timeout = GetOptionalValue<std::chrono::milliseconds>(stream_query->timeout_, evaluator), batch_limit = GetOptionalValue<int64_t>(stream_query->batch_limit_, evaluator)]() mutable { diff --git a/src/query/stream/streams.cpp b/src/query/stream/streams.cpp index 7e2dd5c6b..f532f4b34 100644 --- a/src/query/stream/streams.cpp +++ b/src/query/stream/streams.cpp @@ -43,6 +43,7 @@ extern const Event MessagesConsumed; namespace memgraph::query::stream { namespace { inline constexpr auto kExpectedTransformationResultSize = 2; +inline constexpr auto kCheckStreamResultSize = 2; const utils::pmr::string query_param_name{"query", utils::NewDeleteResource()}; const utils::pmr::string params_param_name{"parameters", utils::NewDeleteResource()}; @@ -724,15 +725,27 @@ TransformationResult Streams::Check(const std::string &stream_name, std::optiona auto accessor = interpreter_context->db->Access(); CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name); - for (auto &row : result.rows) { - auto [query, parameters] = ExtractTransformationResult(row.values, transformation_name, stream_name); - std::vector<TypedValue> result_row; - result_row.reserve(kExpectedTransformationResultSize); - result_row.push_back(std::move(query)); - result_row.push_back(std::move(parameters)); + auto result_row = std::vector<TypedValue>(); + result_row.reserve(kCheckStreamResultSize); - test_result.push_back(std::move(result_row)); - } + auto queries_and_parameters = std::vector<TypedValue>(result.rows.size()); + std::transform( + result.rows.cbegin(), result.rows.cend(), queries_and_parameters.begin(), [&](const auto &row) { + auto [query, parameters] = ExtractTransformationResult(row.values, transformation_name, stream_name); + + return std::map<std::string, TypedValue>{{"query", std::move(query)}, + {"parameters", std::move(parameters)}}; + }); + result_row.emplace_back(std::move(queries_and_parameters)); + + auto messages_list = std::vector<TypedValue>(messages.size()); + std::transform(messages.cbegin(), messages.cend(), messages_list.begin(), [](const auto &message) { + return std::string_view(message.Payload().data(), message.Payload().size()); + }); + + result_row.emplace_back(std::move(messages_list)); + + test_result.emplace_back(std::move(result_row)); }; locked_stream_source->Check(timeout, batch_limit, consumer_function); diff --git a/tests/e2e/streams/common.py b/tests/e2e/streams/common.py index 7f94429da..8de3637fc 100644 --- a/tests/e2e/streams/common.py +++ b/tests/e2e/streams/common.py @@ -12,7 +12,7 @@ import mgclient import time -from multiprocessing import Process, Value +from multiprocessing import Manager, Process, Value # These are the indices of the different values in the result of SHOW STREAM # query @@ -26,8 +26,10 @@ IS_RUNNING = 6 # These are the indices of the query and parameters in the result of CHECK # STREAM query -QUERY = 0 -PARAMS = 1 +QUERIES = 0 +RAWMESSAGES = 1 +PARAMETERS_LITERAL = "parameters" +QUERY_LITERAL = "query" SIMPLE_MSG = b"message" @@ -45,13 
+47,13 @@ def connect(**kwargs): def timed_wait(fun): start_time = time.time() - seconds = 10 + SECONDS = 10 while True: current_time = time.time() elapsed_time = current_time - start_time - if elapsed_time > seconds: + if elapsed_time > SECONDS: return False if fun(): @@ -62,13 +64,13 @@ def timed_wait(fun): def check_one_result_row(cursor, query): start_time = time.time() - seconds = 10 + SECONDS = 10 while True: current_time = time.time() elapsed_time = current_time - start_time - if elapsed_time > seconds: + if elapsed_time > SECONDS: return False cursor.execute(query) @@ -81,12 +83,10 @@ def check_one_result_row(cursor, query): def check_vertex_exists_with_properties(cursor, properties): - properties_string = ', '.join([f'{k}: {v}' for k, v in properties.items()]) + properties_string = ", ".join([f"{k}: {v}" for k, v in properties.items()]) assert check_one_result_row( cursor, - "MATCH (n: MESSAGE {" - f"{properties_string}" - "}) RETURN n", + f"MATCH (n: MESSAGE {{{properties_string}}}) RETURN n", ) @@ -129,28 +129,27 @@ def validate_info(actual_stream_info, expected_stream_info): for info, expected_info in zip(actual_stream_info, expected_stream_info): assert info == expected_info + def check_stream_info(cursor, stream_name, expected_stream_info): stream_info = get_stream_info(cursor, stream_name) validate_info(stream_info, expected_stream_info) + def kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, payload_bytes): - decoded_payload = payload_bytes.decode('utf-8') - check_vertex_exists_with_properties( - cursor, {'topic': f'"{topic}"', 'payload': f'"{decoded_payload}"'}) + decoded_payload = payload_bytes.decode("utf-8") + check_vertex_exists_with_properties(cursor, {"topic": f'"{topic}"', "payload": f'"{decoded_payload}"'}) -PULSAR_SERVICE_URL = 'pulsar://127.0.0.1:6650' +PULSAR_SERVICE_URL = "pulsar://127.0.0.1:6650" + def pulsar_default_namespace_topic(topic): - return f'persistent://public/default/{topic}' + return f"persistent://public/default/{topic}" def test_start_and_stop_during_check( - operation, - connection, - stream_creator, - message_sender, - already_stopped_error): + operation, connection, stream_creator, message_sender, already_stopped_error, batchSize +): # This test is quite complex. The goal is to call START/STOP queries # while a CHECK query is waiting for its result. Because the Global # Interpreter Lock, running queries on multiple threads is not useful, @@ -161,11 +160,9 @@ def test_start_and_stop_during_check( # synchronize between the different processes. Each value represents a # specific phase of the execution of the processes. 
assert operation in ["START", "STOP"] + assert batchSize == 1 cursor = connection.cursor() - execute_and_fetch_all( - cursor, - stream_creator('test_stream') - ) + execute_and_fetch_all(cursor, stream_creator("test_stream")) check_counter = Value("i", 0) check_result_len = Value("i", 0) @@ -185,7 +182,9 @@ def test_start_and_stop_during_check( result = execute_and_fetch_all(cursor, "CHECK STREAM test_stream") result_len.value = len(result) counter.value = CHECK_AFTER_FETCHALL - if len(result) > 0 and "payload: 'message'" in result[0][QUERY]: + if ( + len(result) > 0 and "payload: 'message'" in result[0][QUERIES][0][QUERY_LITERAL] + ): # The 0 is only correct because batchSize is 1 counter.value = CHECK_CORRECT_RESULT else: counter.value = CHECK_INCORRECT_RESULT @@ -213,12 +212,8 @@ def test_start_and_stop_during_check( except Exception: counter.value = OP_UNEXPECTED_EXCEPTION - check_stream_proc = Process( - target=call_check, daemon=True, args=(check_counter, check_result_len) - ) - operation_proc = Process( - target=call_operation, daemon=True, args=(operation_counter,) - ) + check_stream_proc = Process(target=call_check, daemon=True, args=(check_counter, check_result_len)) + operation_proc = Process(target=call_operation, daemon=True, args=(operation_counter,)) try: check_stream_proc.start() @@ -227,9 +222,7 @@ def test_start_and_stop_during_check( assert timed_wait(lambda: check_counter.value == CHECK_BEFORE_EXECUTE) assert timed_wait(lambda: get_is_running(cursor, "test_stream")) - assert check_counter.value == CHECK_BEFORE_EXECUTE, ( - "SHOW STREAMS " "was blocked until the end of CHECK STREAM" - ) + assert check_counter.value == CHECK_BEFORE_EXECUTE, "SHOW STREAMS " "was blocked until the end of CHECK STREAM" operation_proc.start() assert timed_wait(lambda: operation_counter.value == OP_BEFORE_EXECUTE) @@ -255,31 +248,156 @@ def test_start_and_stop_during_check( if operation_proc.is_alive(): operation_proc.terminate() + def test_start_checked_stream_after_timeout(connection, stream_creator): cursor = connection.cursor() - execute_and_fetch_all( - cursor, - stream_creator('test_stream') - ) + execute_and_fetch_all(cursor, stream_creator("test_stream")) - timeout_ms = 2000 + TIMEOUT_MS = 2000 def call_check(): - execute_and_fetch_all( - connect().cursor(), - f"CHECK STREAM test_stream TIMEOUT {timeout_ms}") + execute_and_fetch_all(connect().cursor(), f"CHECK STREAM test_stream TIMEOUT {TIMEOUT_MS}") check_stream_proc = Process(target=call_check, daemon=True) start = time.time() check_stream_proc.start() - assert timed_wait( - lambda: get_is_running( - cursor, "test_stream")) + assert timed_wait(lambda: get_is_running(cursor, "test_stream")) start_stream(cursor, "test_stream") end = time.time() - assert (end - start) < 1.3 * \ - timeout_ms, "The START STREAM was blocked too long" + assert (end - start) < 1.3 * TIMEOUT_MS, "The START STREAM was blocked too long" assert get_is_running(cursor, "test_stream") stop_stream(cursor, "test_stream") + + +def test_check_stream_same_number_of_queries_than_messages(connection, stream_creator, message_sender): + BATCH_SIZE = 2 + BATCH_LIMIT = 3 + STREAM_NAME = "test_stream" + cursor = connection.cursor() + execute_and_fetch_all(cursor, stream_creator(STREAM_NAME, BATCH_SIZE)) + time.sleep(2) + + test_results = Manager().Namespace() + + def check_stream(stream_name, batch_limit): + connection = connect() + cursor = connection.cursor() + test_results.value = execute_and_fetch_all(cursor, f"CHECK STREAM {stream_name} BATCH_LIMIT {batch_limit} ") + + 
check_stream_proc = Process(target=check_stream, args=(STREAM_NAME, BATCH_LIMIT))
+    check_stream_proc.start()
+    time.sleep(2)
+
+    MESSAGES = [b"01", b"02", b"03", b"04", b"05", b"06"]
+    for message in MESSAGES:
+        message_sender(message)
+
+    check_stream_proc.join()
+
+    # Transformation does not do any filtering: it simply creates a query "Message: {contentOfMessage}" for every message. Queries should be like:
+    # -Batch 1: [{parameters: {"value": "Parameter: 01"}, query: "Message: 01"},
+    #            {parameters: {"value": "Parameter: 02"}, query: "Message: 02"}]
+    # -Batch 2: [{parameters: {"value": "Parameter: 03"}, query: "Message: 03"},
+    #            {parameters: {"value": "Parameter: 04"}, query: "Message: 04"}]
+    # -Batch 3: [{parameters: {"value": "Parameter: 05"}, query: "Message: 05"},
+    #            {parameters: {"value": "Parameter: 06"}, query: "Message: 06"}]
+
+    assert len(test_results.value) == BATCH_LIMIT
+
+    expected_queries_and_raw_messages_1 = (
+        [  # queries
+            {PARAMETERS_LITERAL: {"value": "Parameter: 01"}, QUERY_LITERAL: "Message: 01"},
+            {PARAMETERS_LITERAL: {"value": "Parameter: 02"}, QUERY_LITERAL: "Message: 02"},
+        ],
+        ["01", "02"],  # raw message
+    )
+
+    expected_queries_and_raw_messages_2 = (
+        [  # queries
+            {PARAMETERS_LITERAL: {"value": "Parameter: 03"}, QUERY_LITERAL: "Message: 03"},
+            {PARAMETERS_LITERAL: {"value": "Parameter: 04"}, QUERY_LITERAL: "Message: 04"},
+        ],
+        ["03", "04"],  # raw message
+    )
+
+    expected_queries_and_raw_messages_3 = (
+        [  # queries
+            {PARAMETERS_LITERAL: {"value": "Parameter: 05"}, QUERY_LITERAL: "Message: 05"},
+            {PARAMETERS_LITERAL: {"value": "Parameter: 06"}, QUERY_LITERAL: "Message: 06"},
+        ],
+        ["05", "06"],  # raw message
+    )
+
+    assert expected_queries_and_raw_messages_1 == test_results.value[0]
+    assert expected_queries_and_raw_messages_2 == test_results.value[1]
+    assert expected_queries_and_raw_messages_3 == test_results.value[2]
+
+
+def test_check_stream_different_number_of_queries_than_messages(connection, stream_creator, message_sender):
+    BATCH_SIZE = 2
+    BATCH_LIMIT = 3
+    STREAM_NAME = "test_stream"
+    cursor = connection.cursor()
+    execute_and_fetch_all(cursor, stream_creator(STREAM_NAME, BATCH_SIZE))
+    time.sleep(2)
+
+    results = Manager().Namespace()
+
+    def check_stream(stream_name, batch_limit):
+        connection = connect()
+        cursor = connection.cursor()
+        results.value = execute_and_fetch_all(cursor, f"CHECK STREAM {stream_name} BATCH_LIMIT {batch_limit} ")
+
+    check_stream_proc = Process(target=check_stream, args=(STREAM_NAME, BATCH_LIMIT))
+    check_stream_proc.start()
+    time.sleep(2)
+
+    MESSAGES = [b"a_01", b"a_02", b"03", b"04", b"b_05", b"06"]
+    for message in MESSAGES:
+        message_sender(message)
+
+    check_stream_proc.join()
+
+    # Transformation does some filtering: if a message contains "a", it is ignored.
+    # Transformation also has a special rule: if a message contains "b", it creates an extra query.
+    #
+    # Queries should be like:
+    # -Batch 1: []
+    # -Batch 2: [{parameters: {"value": "Parameter: 03"}, query: "Message: 03"},
+    #            {parameters: {"value": "Parameter: 04"}, query: "Message: 04"}]
+    # -Batch 3: [{parameters: {"value": "Parameter: b_05"}, query: "Message: b_05"},
+    #            {parameters: {"value": "Parameter: extra_b_05"}, query: "Message: extra_b_05"},
+    #            {parameters: {"value": "Parameter: 06"}, query: "Message: 06"}]
+
+    assert len(results.value) == BATCH_LIMIT
+
+    expected_queries_and_raw_messages_1 = (
+        [],  # queries
+        ["a_01", "a_02"],  # raw message
+    )
+
+    expected_queries_and_raw_messages_2 = (
+        [  # queries
+            {PARAMETERS_LITERAL: {"value": "Parameter: 03"}, QUERY_LITERAL: "Message: 03"},
+            {PARAMETERS_LITERAL: {"value": "Parameter: 04"}, QUERY_LITERAL: "Message: 04"},
+        ],
+        ["03", "04"],  # raw message
+    )
+
+    expected_queries_and_raw_messages_3 = (
+        [  # queries
+            {PARAMETERS_LITERAL: {"value": "Parameter: b_05"}, QUERY_LITERAL: "Message: b_05"},
+            {
+                PARAMETERS_LITERAL: {"value": "Parameter: extra_b_05"},
+                QUERY_LITERAL: "Message: extra_b_05",
+            },
+            {PARAMETERS_LITERAL: {"value": "Parameter: 06"}, QUERY_LITERAL: "Message: 06"},
+        ],
+        ["b_05", "06"],  # raw message
+    )
+
+    assert expected_queries_and_raw_messages_1 == results.value[0]
+    assert expected_queries_and_raw_messages_2 == results.value[1]
+    assert expected_queries_and_raw_messages_3 == results.value[2]
diff --git a/tests/e2e/streams/kafka_streams_tests.py b/tests/e2e/streams/kafka_streams_tests.py
index 0d2e14c78..63cc291c9 100755
--- a/tests/e2e/streams/kafka_streams_tests.py
+++ b/tests/e2e/streams/kafka_streams_tests.py
@@ -18,12 +18,10 @@ import time
 from multiprocessing import Process, Value
 import common
 
-TRANSFORMATIONS_TO_CHECK_C = [
-    "empty_transformation"]
+TRANSFORMATIONS_TO_CHECK_C = ["empty_transformation"]
+
+TRANSFORMATIONS_TO_CHECK_PY = ["kafka_transform.simple", "kafka_transform.with_parameters"]
 
-TRANSFORMATIONS_TO_CHECK_PY = [
-    "kafka_transform.simple",
-    "kafka_transform.with_parameters"]
 
 @pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_PY)
 def test_simple(kafka_producer, kafka_topics, connection, transformation):
@@ -31,9 +29,7 @@
     cursor = connection.cursor()
     common.execute_and_fetch_all(
         cursor,
-        "CREATE KAFKA STREAM test "
-        f"TOPICS {','.join(kafka_topics)} "
-        f"TRANSFORM {transformation}",
+        f"CREATE KAFKA STREAM test TOPICS {','.join(kafka_topics)} TRANSFORM {transformation}",
     )
     common.start_stream(cursor, "test")
     time.sleep(5)
@@ -42,16 +38,11 @@
         kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=60)
 
     for topic in kafka_topics:
-        common.kafka_check_vertex_exists_with_topic_and_payload(
-            cursor, topic, common.SIMPLE_MSG)
+        common.kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG)
 
 
 @pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_PY)
-def test_separate_consumers(
-    kafka_producer,
-    kafka_topics,
-    connection,
-    transformation):
+def test_separate_consumers(kafka_producer, kafka_topics, connection, transformation):
     assert len(kafka_topics) > 0
     cursor = connection.cursor()
 
@@ -61,9 +52,7 @@
         stream_names.append(stream_name)
         common.execute_and_fetch_all(
             cursor,
-            f"CREATE KAFKA STREAM {stream_name} "
-            f"TOPICS {topic} "
-            f"TRANSFORM {transformation}",
+            f"CREATE KAFKA STREAM {stream_name} TOPICS {topic} TRANSFORM {transformation}",
         )
 
     for
stream_name in stream_names: @@ -75,12 +64,10 @@ def test_separate_consumers( kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=60) for topic in kafka_topics: - common.kafka_check_vertex_exists_with_topic_and_payload( - cursor, topic, common.SIMPLE_MSG) + common.kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG) -def test_start_from_last_committed_offset( - kafka_producer, kafka_topics, connection): +def test_start_from_last_committed_offset(kafka_producer, kafka_topics, connection): # This test creates a stream, consumes a message to have a committed # offset, then destroys the stream. A new message is sent before the # stream is recreated and then restarted. This simulates when Memgraph is @@ -90,16 +77,15 @@ def test_start_from_last_committed_offset( assert len(kafka_topics) > 0 cursor = connection.cursor() common.execute_and_fetch_all( - cursor, "CREATE KAFKA STREAM test " - f"TOPICS {kafka_topics[0]} " - "TRANSFORM kafka_transform.simple", ) + cursor, + f"CREATE KAFKA STREAM test TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple", + ) common.start_stream(cursor, "test") time.sleep(1) kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60) - common.kafka_check_vertex_exists_with_topic_and_payload( - cursor, kafka_topics[0], common.SIMPLE_MSG) + common.kafka_check_vertex_exists_with_topic_and_payload(cursor, kafka_topics[0], common.SIMPLE_MSG) common.stop_stream(cursor, "test") common.drop_stream(cursor, "test") @@ -111,36 +97,30 @@ def test_start_from_last_committed_offset( for message in messages: vertices_with_msg = common.execute_and_fetch_all( cursor, - "MATCH (n: MESSAGE {" f"payload: '{message.decode('utf-8')}'" "}) RETURN n", + f"MATCH (n: MESSAGE {{payload: '{message.decode('utf-8')}'}}) RETURN n", ) assert len(vertices_with_msg) == 0 common.execute_and_fetch_all( - cursor, "CREATE KAFKA STREAM test " - f"TOPICS {kafka_topics[0]} " - "TRANSFORM kafka_transform.simple", ) + cursor, + f"CREATE KAFKA STREAM test TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple", + ) common.start_stream(cursor, "test") for message in messages: - common.kafka_check_vertex_exists_with_topic_and_payload( - cursor, kafka_topics[0], message) + common.kafka_check_vertex_exists_with_topic_and_payload(cursor, kafka_topics[0], message) @pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_PY) -def test_check_stream( - kafka_producer, - kafka_topics, - connection, - transformation): +def test_check_stream(kafka_producer, kafka_topics, connection, transformation): assert len(kafka_topics) > 0 + BATCH_SIZE = 1 + INDEX_OF_FIRST_BATCH = 0 cursor = connection.cursor() common.execute_and_fetch_all( cursor, - "CREATE KAFKA STREAM test " - f"TOPICS {kafka_topics[0]} " - f"TRANSFORM {transformation} " - "BATCH_SIZE 1", + f"CREATE KAFKA STREAM test TOPICS {kafka_topics[0]} TRANSFORM {transformation} BATCH_SIZE {BATCH_SIZE}", ) common.start_stream(cursor, "test") time.sleep(1) @@ -153,24 +133,28 @@ def test_check_stream( kafka_producer.send(kafka_topics[0], message).get(timeout=60) def check_check_stream(batch_limit): - assert ( - transformation == "kafka_transform.simple" - or transformation == "kafka_transform.with_parameters" - ) - test_results = common.execute_and_fetch_all( - cursor, f"CHECK STREAM test BATCH_LIMIT {batch_limit}" - ) + assert transformation == "kafka_transform.simple" or transformation == "kafka_transform.with_parameters" + test_results = common.execute_and_fetch_all(cursor, f"CHECK STREAM test BATCH_LIMIT 
{batch_limit}") assert len(test_results) == batch_limit for i in range(batch_limit): message_as_str = messages[i].decode("utf-8") + assert ( + BATCH_SIZE == 1 + ) # If batch size != 1, then the usage of INDEX_OF_FIRST_BATCH must change: the result will have a list of queries (pair<parameters,query>) + if transformation == "kafka_transform.simple": - assert f"payload: '{message_as_str}'" in test_results[i][common.QUERY] - assert test_results[i][common.PARAMS] is None + assert ( + f"payload: '{message_as_str}'" + in test_results[i][common.QUERIES][INDEX_OF_FIRST_BATCH][common.QUERY_LITERAL] + ) + assert test_results[i][common.QUERIES][INDEX_OF_FIRST_BATCH][common.PARAMETERS_LITERAL] is None else: - assert f"payload: $payload" in test_results[i][ - common.QUERY] and f"topic: $topic" in test_results[i][common.QUERY] - parameters = test_results[i][common.PARAMS] + assert ( + f"payload: $payload" in test_results[i][common.QUERIES][INDEX_OF_FIRST_BATCH][common.QUERY_LITERAL] + and f"topic: $topic" in test_results[i][common.QUERIES][INDEX_OF_FIRST_BATCH][common.QUERY_LITERAL] + ) + parameters = test_results[i][common.QUERIES][INDEX_OF_FIRST_BATCH][common.PARAMETERS_LITERAL] # this is not a very sofisticated test, but checks if # timestamp has some kind of value assert parameters["timestamp"] > 1000000000000 @@ -183,8 +167,7 @@ def test_check_stream( common.start_stream(cursor, "test") for message in messages: - common.kafka_check_vertex_exists_with_topic_and_payload( - cursor, kafka_topics[0], message) + common.kafka_check_vertex_exists_with_topic_and_payload(cursor, kafka_topics[0], message) def test_show_streams(kafka_producer, kafka_topics, connection): @@ -192,23 +175,15 @@ def test_show_streams(kafka_producer, kafka_topics, connection): cursor = connection.cursor() common.execute_and_fetch_all( cursor, - "CREATE KAFKA STREAM default_values " - f"TOPICS {kafka_topics[0]} " - f"TRANSFORM kafka_transform.simple " - f"BOOTSTRAP_SERVERS 'localhost:9092'", + f"CREATE KAFKA STREAM default_values TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BOOTSTRAP_SERVERS 'localhost:9092'", ) consumer_group = "my_special_consumer_group" - batch_interval = 42 - batch_size = 3 + BATCH_INTERVAL = 42 + BATCH_SIZE = 3 common.execute_and_fetch_all( cursor, - "CREATE KAFKA STREAM complex_values " - f"TOPICS {','.join(kafka_topics)} " - f"TRANSFORM kafka_transform.with_parameters " - f"CONSUMER_GROUP {consumer_group} " - f"BATCH_INTERVAL {batch_interval} " - f"BATCH_SIZE {batch_size} ", + f"CREATE KAFKA STREAM complex_values TOPICS {','.join(kafka_topics)} TRANSFORM kafka_transform.with_parameters CONSUMER_GROUP {consumer_group} BATCH_INTERVAL {BATCH_INTERVAL} BATCH_SIZE {BATCH_SIZE} ", ) assert len(common.execute_and_fetch_all(cursor, "SHOW STREAMS")) == 2 @@ -216,13 +191,7 @@ def test_show_streams(kafka_producer, kafka_topics, connection): common.check_stream_info( cursor, "default_values", - ("default_values", - "kafka", - 100, - 1000, - "kafka_transform.simple", - None, - False), + ("default_values", "kafka", 100, 1000, "kafka_transform.simple", None, False), ) common.check_stream_info( @@ -231,8 +200,8 @@ def test_show_streams(kafka_producer, kafka_topics, connection): ( "complex_values", "kafka", - batch_interval, - batch_size, + BATCH_INTERVAL, + BATCH_SIZE, "kafka_transform.with_parameters", None, False, @@ -241,15 +210,12 @@ def test_show_streams(kafka_producer, kafka_topics, connection): @pytest.mark.parametrize("operation", ["START", "STOP"]) -def test_start_and_stop_during_check( - kafka_producer, 
- kafka_topics, - connection, - operation): +def test_start_and_stop_during_check(kafka_producer, kafka_topics, connection, operation): assert len(kafka_topics) > 1 + BATCH_SIZE = 1 def stream_creator(stream_name): - return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple" + return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE {BATCH_SIZE}" def message_sender(msg): kafka_producer.send(kafka_topics[0], msg).get(timeout=60) @@ -259,7 +225,9 @@ def test_start_and_stop_during_check( connection, stream_creator, message_sender, - "Kafka consumer test_stream is already stopped") + "Kafka consumer test_stream is already stopped", + BATCH_SIZE, + ) def test_check_already_started_stream(kafka_topics, connection): @@ -268,9 +236,7 @@ def test_check_already_started_stream(kafka_topics, connection): common.execute_and_fetch_all( cursor, - "CREATE KAFKA STREAM started_stream " - f"TOPICS {kafka_topics[0]} " - f"TRANSFORM kafka_transform.simple", + f"CREATE KAFKA STREAM started_stream TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple", ) common.start_stream(cursor, "started_stream") @@ -289,41 +255,29 @@ def test_restart_after_error(kafka_producer, kafka_topics, connection): cursor = connection.cursor() common.execute_and_fetch_all( cursor, - "CREATE KAFKA STREAM test_stream " - f"TOPICS {kafka_topics[0]} " - f"TRANSFORM kafka_transform.query", + f"CREATE KAFKA STREAM test_stream TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.query", ) common.start_stream(cursor, "test_stream") time.sleep(1) kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60) - assert common.timed_wait( - lambda: not common.get_is_running( - cursor, "test_stream")) + assert common.timed_wait(lambda: not common.get_is_running(cursor, "test_stream")) common.start_stream(cursor, "test_stream") time.sleep(1) kafka_producer.send(kafka_topics[0], b"CREATE (n:VERTEX { id : 42 })") - assert common.check_one_result_row( - cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n") + assert common.check_one_result_row(cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n") @pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_PY) -def test_bootstrap_server( - kafka_producer, - kafka_topics, - connection, - transformation): +def test_bootstrap_server(kafka_producer, kafka_topics, connection, transformation): assert len(kafka_topics) > 0 cursor = connection.cursor() - local = "localhost:9092" + LOCAL = "localhost:9092" common.execute_and_fetch_all( cursor, - "CREATE KAFKA STREAM test " - f"TOPICS {','.join(kafka_topics)} " - f"TRANSFORM {transformation} " - f"BOOTSTRAP_SERVERS '{local}'", + f"CREATE KAFKA STREAM test TOPICS {','.join(kafka_topics)} TRANSFORM {transformation} BOOTSTRAP_SERVERS '{LOCAL}'", ) common.start_stream(cursor, "test") time.sleep(5) @@ -332,25 +286,17 @@ def test_bootstrap_server( kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=60) for topic in kafka_topics: - common.kafka_check_vertex_exists_with_topic_and_payload( - cursor, topic, common.SIMPLE_MSG) + common.kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG) @pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_PY) -def test_bootstrap_server_empty( - kafka_producer, - kafka_topics, - connection, - transformation): +def test_bootstrap_server_empty(kafka_producer, kafka_topics, connection, transformation): assert len(kafka_topics) > 0 cursor = connection.cursor() with 
pytest.raises(mgclient.DatabaseError): common.execute_and_fetch_all( cursor, - "CREATE KAFKA STREAM test " - f"TOPICS {','.join(kafka_topics)} " - f"TRANSFORM {transformation} " - "BOOTSTRAP_SERVERS ''", + f"CREATE KAFKA STREAM test TOPICS {','.join(kafka_topics)} TRANSFORM {transformation} BOOTSTRAP_SERVERS ''", ) @@ -360,10 +306,7 @@ def test_set_offset(kafka_producer, kafka_topics, connection, transformation): cursor = connection.cursor() common.execute_and_fetch_all( cursor, - "CREATE KAFKA STREAM test " - f"TOPICS {kafka_topics[0]} " - f"TRANSFORM {transformation} " - "BATCH_SIZE 1", + f"CREATE KAFKA STREAM test TOPICS {kafka_topics[0]} TRANSFORM {transformation} BATCH_SIZE 1", ) messages = [f"{i} message" for i in range(1, 21)] @@ -377,27 +320,18 @@ def test_set_offset(kafka_producer, kafka_topics, connection, transformation): else: assert common.check_one_result_row( cursor, - ( - f"MATCH (n: MESSAGE {{payload: '{expected_msgs[-1]}'}})" - "RETURN n" - ), + (f"MATCH (n: MESSAGE {{payload: '{expected_msgs[-1]}'}})" "RETURN n"), ) common.stop_stream(cursor, "test") - res = common.execute_and_fetch_all( - cursor, "MATCH (n) RETURN n.payload" - ) + res = common.execute_and_fetch_all(cursor, "MATCH (n) RETURN n.payload") return res def execute_set_offset_and_consume(id, expected_msgs): - common.execute_and_fetch_all( - cursor, f"CALL mg.kafka_set_stream_offset('test', {id})" - ) + common.execute_and_fetch_all(cursor, f"CALL mg.kafka_set_stream_offset('test', {id})") return consume(expected_msgs) with pytest.raises(mgclient.DatabaseError): - res = common.execute_and_fetch_all( - cursor, "CALL mg.kafka_set_stream_offset('foo', 10)" - ) + res = common.execute_and_fetch_all(cursor, "CALL mg.kafka_set_stream_offset('foo', 10)") def comparison_check(a, b): return a == str(b).strip("'(,)") @@ -426,40 +360,60 @@ def test_set_offset(kafka_producer, kafka_topics, connection, transformation): def test_info_procedure(kafka_topics, connection): cursor = connection.cursor() - stream_name = 'test_stream' - configs = {"sasl.username": "michael.scott"} - local = "localhost:9092" - credentials = {"sasl.password": "S3cr3tP4ssw0rd"} - consumer_group = "ConsumerGr" + STREAM_NAME = "test_stream" + CONFIGS = {"sasl.username": "michael.scott"} + LOCAL = "localhost:9092" + CREDENTIALS = {"sasl.password": "S3cr3tP4ssw0rd"} + CONSUMER_GROUP = "ConsumerGr" common.execute_and_fetch_all( cursor, - f"CREATE KAFKA STREAM {stream_name} " - f"TOPICS {','.join(kafka_topics)} " - f"TRANSFORM pulsar_transform.simple " - f"CONSUMER_GROUP {consumer_group} " - f"BOOTSTRAP_SERVERS '{local}' " - f"CONFIGS {configs} " - f"CREDENTIALS {credentials}" + f"CREATE KAFKA STREAM {STREAM_NAME} TOPICS {','.join(kafka_topics)} TRANSFORM kafka_transform.simple CONSUMER_GROUP {CONSUMER_GROUP} BOOTSTRAP_SERVERS '{LOCAL}' CONFIGS {CONFIGS} CREDENTIALS {CREDENTIALS}", ) - stream_info = common.execute_and_fetch_all( - cursor, f"CALL mg.kafka_stream_info('{stream_name}') YIELD *") + stream_info = common.execute_and_fetch_all(cursor, f"CALL mg.kafka_stream_info('{STREAM_NAME}') YIELD *") - reducted_credentials = {key: "<REDUCTED>" for - key in credentials.keys()} + reducted_credentials = {key: "<REDUCTED>" for key in CREDENTIALS.keys()} - expected_stream_info = [ - (local, configs, consumer_group, reducted_credentials, kafka_topics)] + expected_stream_info = [(LOCAL, CONFIGS, CONSUMER_GROUP, reducted_credentials, kafka_topics)] common.validate_info(stream_info, expected_stream_info) 
-@pytest.mark.parametrize("transformation",TRANSFORMATIONS_TO_CHECK_C) + +@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_C) def test_load_c_transformations(connection, transformation): cursor = connection.cursor() - query = "CALL mg.transformations() YIELD * WITH name WHERE name STARTS WITH 'c_transformations." + transformation + "' RETURN name" - result = common.execute_and_fetch_all( - cursor, query) + query = f"CALL mg.transformations() YIELD * WITH name WHERE name STARTS WITH 'c_transformations.{transformation}' RETURN name" + result = common.execute_and_fetch_all(cursor, query) assert len(result) == 1 - assert result[0][0] == "c_transformations." + transformation - + assert result[0][0] == f"c_transformations.{transformation}" + + +def test_check_stream_same_number_of_queries_than_messages(kafka_producer, kafka_topics, connection): + assert len(kafka_topics) > 0 + + TRANSFORMATION = "common_transform.check_stream_no_filtering" + + def stream_creator(stream_name, batch_size): + return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size}" + + def message_sender(msg): + kafka_producer.send(kafka_topics[0], msg).get(timeout=60) + + common.test_check_stream_same_number_of_queries_than_messages(connection, stream_creator, message_sender) + + +def test_check_stream_different_number_of_queries_than_messages(kafka_producer, kafka_topics, connection): + assert len(kafka_topics) > 0 + + TRANSFORMATION = "common_transform.check_stream_with_filtering" + + def stream_creator(stream_name, batch_size): + return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size}" + + def message_sender(msg): + kafka_producer.send(kafka_topics[0], msg).get(timeout=60) + + common.test_check_stream_different_number_of_queries_than_messages(connection, stream_creator, message_sender) + + if __name__ == "__main__": sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/streams/pulsar_streams_tests.py b/tests/e2e/streams/pulsar_streams_tests.py index 4df7e186e..c77d72372 100755 --- a/tests/e2e/streams/pulsar_streams_tests.py +++ b/tests/e2e/streams/pulsar_streams_tests.py @@ -18,17 +18,14 @@ import time from multiprocessing import Process, Value import common -TRANSFORMATIONS_TO_CHECK = [ - "pulsar_transform.simple", - "pulsar_transform.with_parameters"] +TRANSFORMATIONS_TO_CHECK = ["pulsar_transform.simple", "pulsar_transform.with_parameters"] def check_vertex_exists_with_topic_and_payload(cursor, topic, payload_byte): - decoded_payload = payload_byte.decode('utf-8') + decoded_payload = payload_byte.decode("utf-8") common.check_vertex_exists_with_properties( - cursor, { - 'topic': f'"{common.pulsar_default_namespace_topic(topic)}"', - 'payload': f'"{decoded_payload}"'}) + cursor, {"topic": f'"{common.pulsar_default_namespace_topic(topic)}"', "payload": f'"{decoded_payload}"'} + ) @pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK) @@ -37,30 +34,23 @@ def test_simple(pulsar_client, pulsar_topics, connection, transformation): cursor = connection.cursor() common.execute_and_fetch_all( cursor, - "CREATE PULSAR STREAM test " - f"TOPICS '{','.join(pulsar_topics)}' " - f"TRANSFORM {transformation}", + f"CREATE PULSAR STREAM test TOPICS '{','.join(pulsar_topics)}' TRANSFORM {transformation}", ) common.start_stream(cursor, "test") time.sleep(5) for topic in pulsar_topics: producer = pulsar_client.create_producer( - 
common.pulsar_default_namespace_topic(topic), - send_timeout_millis=60000) + common.pulsar_default_namespace_topic(topic), send_timeout_millis=60000 + ) producer.send(common.SIMPLE_MSG) for topic in pulsar_topics: - check_vertex_exists_with_topic_and_payload( - cursor, topic, common.SIMPLE_MSG) + check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG) @pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK) -def test_separate_consumers( - pulsar_client, - pulsar_topics, - connection, - transformation): +def test_separate_consumers(pulsar_client, pulsar_topics, connection, transformation): assert len(pulsar_topics) > 0 cursor = connection.cursor() @@ -70,9 +60,7 @@ def test_separate_consumers( stream_names.append(stream_name) common.execute_and_fetch_all( cursor, - f"CREATE PULSAR STREAM {stream_name} " - f"TOPICS {topic} " - f"TRANSFORM {transformation}", + f"CREATE PULSAR STREAM {stream_name} TOPICS {topic} TRANSFORM {transformation}", ) for stream_name in stream_names: @@ -81,13 +69,11 @@ def test_separate_consumers( time.sleep(5) for topic in pulsar_topics: - producer = pulsar_client.create_producer( - topic, send_timeout_millis=60000) + producer = pulsar_client.create_producer(topic, send_timeout_millis=60000) producer.send(common.SIMPLE_MSG) for topic in pulsar_topics: - check_vertex_exists_with_topic_and_payload( - cursor, topic, common.SIMPLE_MSG) + check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG) def test_start_from_latest_messages(pulsar_client, pulsar_topics, connection): @@ -99,118 +85,112 @@ def test_start_from_latest_messages(pulsar_client, pulsar_topics, connection): assert len(pulsar_topics) > 0 cursor = connection.cursor() common.execute_and_fetch_all( - cursor, "CREATE PULSAR STREAM test " - f"TOPICS {pulsar_topics[0]} " - "TRANSFORM pulsar_transform.simple", ) + cursor, + f"CREATE PULSAR STREAM test TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple", + ) common.start_stream(cursor, "test") time.sleep(1) def assert_message_not_consumed(message): vertices_with_msg = common.execute_and_fetch_all( cursor, - "MATCH (n: MESSAGE {" f"payload: '{message.decode('utf-8')}'" "}) RETURN n", + f"MATCH (n: MESSAGE {{payload: '{message.decode('utf-8')}'}}) RETURN n", ) assert len(vertices_with_msg) == 0 producer = pulsar_client.create_producer( - common.pulsar_default_namespace_topic( - pulsar_topics[0]), send_timeout_millis=60000) + common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000 + ) producer.send(common.SIMPLE_MSG) - check_vertex_exists_with_topic_and_payload( - cursor, pulsar_topics[0], common.SIMPLE_MSG) + check_vertex_exists_with_topic_and_payload(cursor, pulsar_topics[0], common.SIMPLE_MSG) common.stop_stream(cursor, "test") - next_message = b"NEXT" - producer.send(next_message) + NEXT_MESSAGE = b"NEXT" + producer.send(NEXT_MESSAGE) - assert_message_not_consumed(next_message) + assert_message_not_consumed(NEXT_MESSAGE) common.start_stream(cursor, "test") - check_vertex_exists_with_topic_and_payload( - cursor, pulsar_topics[0], next_message) + check_vertex_exists_with_topic_and_payload(cursor, pulsar_topics[0], NEXT_MESSAGE) common.stop_stream(cursor, "test") common.drop_stream(cursor, "test") - lost_message = b"LOST" - valid_messages = [b"second message", b"third message"] + LOST_MESSAGE = b"LOST" + VALID_MESSAGES = [b"second message", b"third message"] - producer.send(lost_message) + producer.send(LOST_MESSAGE) - assert_message_not_consumed(lost_message) + 
assert_message_not_consumed(LOST_MESSAGE) common.execute_and_fetch_all( - cursor, "CREATE PULSAR STREAM test " - f"TOPICS {pulsar_topics[0]} " - "TRANSFORM pulsar_transform.simple", ) + cursor, + f"CREATE PULSAR STREAM test TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple", + ) - for message in valid_messages: + for message in VALID_MESSAGES: producer.send(message) assert_message_not_consumed(message) common.start_stream(cursor, "test") - assert_message_not_consumed(lost_message) + assert_message_not_consumed(LOST_MESSAGE) - for message in valid_messages: - check_vertex_exists_with_topic_and_payload( - cursor, pulsar_topics[0], message) + for message in VALID_MESSAGES: + check_vertex_exists_with_topic_and_payload(cursor, pulsar_topics[0], message) @pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK) -def test_check_stream( - pulsar_client, - pulsar_topics, - connection, - transformation): +def test_check_stream(pulsar_client, pulsar_topics, connection, transformation): assert len(pulsar_topics) > 0 + BATCH_SIZE = 1 + INDEX_Of_FIRST_BATCH = 0 cursor = connection.cursor() common.execute_and_fetch_all( cursor, - "CREATE PULSAR STREAM test " - f"TOPICS {pulsar_topics[0]} " - f"TRANSFORM {transformation} " - "BATCH_SIZE 1", + f"CREATE PULSAR STREAM test TOPICS {pulsar_topics[0]} TRANSFORM {transformation} BATCH_SIZE {BATCH_SIZE}", ) common.start_stream(cursor, "test") time.sleep(1) producer = pulsar_client.create_producer( - common.pulsar_default_namespace_topic( - pulsar_topics[0]), send_timeout_millis=60000) + common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000 + ) producer.send(common.SIMPLE_MSG) - check_vertex_exists_with_topic_and_payload( - cursor, pulsar_topics[0], common.SIMPLE_MSG) + check_vertex_exists_with_topic_and_payload(cursor, pulsar_topics[0], common.SIMPLE_MSG) common.stop_stream(cursor, "test") - messages = [b"first message", b"second message", b"third message"] - for message in messages: + MESSAGES = [b"first message", b"second message", b"third message"] + for message in MESSAGES: producer.send(message) def check_check_stream(batch_limit): - assert ( - transformation == "pulsar_transform.simple" - or transformation == "pulsar_transform.with_parameters" - ) - test_results = common.execute_and_fetch_all( - cursor, f"CHECK STREAM test BATCH_LIMIT {batch_limit}" - ) + assert transformation == "pulsar_transform.simple" or transformation == "pulsar_transform.with_parameters" + test_results = common.execute_and_fetch_all(cursor, f"CHECK STREAM test BATCH_LIMIT {batch_limit}") assert len(test_results) == batch_limit for i in range(batch_limit): - message_as_str = messages[i].decode("utf-8") + message_as_str = MESSAGES[i].decode("utf-8") + assert ( + BATCH_SIZE == 1 + ) # If batch size != 1, then the usage of INDEX_Of_FIRST_BATCH must change: the result will have a list of queries (pair<parameters,query>) + if transformation == "pulsar_transform.simple": - assert f"payload: '{message_as_str}'" in test_results[i][common.QUERY] - assert test_results[i][common.PARAMS] is None + assert ( + f"payload: '{message_as_str}'" + in test_results[i][common.QUERIES][INDEX_Of_FIRST_BATCH][common.QUERY_LITERAL] + ) + assert test_results[i][common.QUERIES][INDEX_Of_FIRST_BATCH][common.PARAMETERS_LITERAL] is None else: - assert f"payload: $payload" in test_results[i][ - common.QUERY] and f"topic: $topic" in test_results[i][common.QUERY] - parameters = test_results[i][common.PARAMS] - assert parameters["topic"] == 
common.pulsar_default_namespace_topic( - pulsar_topics[0]) + assert ( + f"payload: $payload" in test_results[i][common.QUERIES][INDEX_Of_FIRST_BATCH][common.QUERY_LITERAL] + and f"topic: $topic" in test_results[i][common.QUERIES][INDEX_Of_FIRST_BATCH][common.QUERY_LITERAL] + ) + parameters = test_results[i][common.QUERIES][INDEX_Of_FIRST_BATCH][common.PARAMETERS_LITERAL] + assert parameters["topic"] == common.pulsar_default_namespace_topic(pulsar_topics[0]) assert parameters["payload"] == message_as_str check_check_stream(1) @@ -218,45 +198,37 @@ def test_check_stream( check_check_stream(3) common.start_stream(cursor, "test") - for message in messages: - check_vertex_exists_with_topic_and_payload( - cursor, pulsar_topics[0], message) + for message in MESSAGES: + check_vertex_exists_with_topic_and_payload(cursor, pulsar_topics[0], message) def test_info_procedure(pulsar_client, pulsar_topics, connection): cursor = connection.cursor() - stream_name = 'test_stream' + STREAM_NAME = "test_stream" common.execute_and_fetch_all( cursor, - f"CREATE PULSAR STREAM {stream_name} " - f"TOPICS {','.join(pulsar_topics)} " - f"TRANSFORM pulsar_transform.simple ", + f"CREATE PULSAR STREAM {STREAM_NAME} TOPICS {','.join(pulsar_topics)} TRANSFORM pulsar_transform.simple ", ) - stream_info = common.execute_and_fetch_all(cursor, f"CALL mg.pulsar_stream_info('{stream_name}') YIELD *") + stream_info = common.execute_and_fetch_all(cursor, f"CALL mg.pulsar_stream_info('{STREAM_NAME}') YIELD *") expected_stream_info = [(common.PULSAR_SERVICE_URL, pulsar_topics)] common.validate_info(stream_info, expected_stream_info) + def test_show_streams(pulsar_client, pulsar_topics, connection): assert len(pulsar_topics) > 1 cursor = connection.cursor() common.execute_and_fetch_all( cursor, - "CREATE PULSAR STREAM default_values " - f"TOPICS {pulsar_topics[0]} " - f"TRANSFORM pulsar_transform.simple ", + f"CREATE PULSAR STREAM default_values TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple ", ) - batch_interval = 42 - batch_size = 3 + BATCH_INTERVAL = 42 + BATCH_SIZE = 3 common.execute_and_fetch_all( cursor, - "CREATE PULSAR STREAM complex_values " - f"TOPICS {','.join(pulsar_topics)} " - f"TRANSFORM pulsar_transform.with_parameters " - f"BATCH_INTERVAL {batch_interval} " - f"BATCH_SIZE {batch_size} ", + f"CREATE PULSAR STREAM complex_values TOPICS {','.join(pulsar_topics)} TRANSFORM pulsar_transform.with_parameters BATCH_INTERVAL {BATCH_INTERVAL} BATCH_SIZE {BATCH_SIZE} ", ) assert len(common.execute_and_fetch_all(cursor, "SHOW STREAMS")) == 2 @@ -264,13 +236,7 @@ def test_show_streams(pulsar_client, pulsar_topics, connection): common.check_stream_info( cursor, "default_values", - ("default_values", - "pulsar", - 100, - 1000, - "pulsar_transform.simple", - None, - False), + ("default_values", "pulsar", 100, 1000, "pulsar_transform.simple", None, False), ) common.check_stream_info( @@ -279,8 +245,8 @@ def test_show_streams(pulsar_client, pulsar_topics, connection): ( "complex_values", "pulsar", - batch_interval, - batch_size, + BATCH_INTERVAL, + BATCH_SIZE, "pulsar_transform.with_parameters", None, False, @@ -289,19 +255,16 @@ def test_show_streams(pulsar_client, pulsar_topics, connection): @pytest.mark.parametrize("operation", ["START", "STOP"]) -def test_start_and_stop_during_check( - pulsar_client, - pulsar_topics, - connection, - operation): +def test_start_and_stop_during_check(pulsar_client, pulsar_topics, connection, operation): assert len(pulsar_topics) > 1 + BATCH_SIZE = 1 def stream_creator(stream_name): - 
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple" + return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE {BATCH_SIZE}" producer = pulsar_client.create_producer( - common.pulsar_default_namespace_topic( - pulsar_topics[0]), send_timeout_millis=60000) + common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000 + ) def message_sender(msg): producer.send(msg) @@ -311,7 +274,9 @@ def test_start_and_stop_during_check( connection, stream_creator, message_sender, - "Pulsar consumer test_stream is already stopped") + "Pulsar consumer test_stream is already stopped", + BATCH_SIZE, + ) def test_check_already_started_stream(pulsar_topics, connection): @@ -320,9 +285,7 @@ def test_check_already_started_stream(pulsar_topics, connection): common.execute_and_fetch_all( cursor, - "CREATE PULSAR STREAM started_stream " - f"TOPICS {pulsar_topics[0]} " - f"TRANSFORM pulsar_transform.simple", + f"CREATE PULSAR STREAM started_stream TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple", ) common.start_stream(cursor, "started_stream") @@ -333,6 +296,7 @@ def test_check_already_started_stream(pulsar_topics, connection): def test_start_checked_stream_after_timeout(pulsar_topics, connection): def stream_creator(stream_name): return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple" + common.test_start_checked_stream_after_timeout(connection, stream_creator) @@ -340,53 +304,80 @@ def test_restart_after_error(pulsar_client, pulsar_topics, connection): cursor = connection.cursor() common.execute_and_fetch_all( cursor, - "CREATE PULSAR STREAM test_stream " - f"TOPICS {pulsar_topics[0]} " - f"TRANSFORM pulsar_transform.query", + f"CREATE PULSAR STREAM test_stream TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.query", ) common.start_stream(cursor, "test_stream") time.sleep(1) producer = pulsar_client.create_producer( - common.pulsar_default_namespace_topic( - pulsar_topics[0]), send_timeout_millis=60000) + common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000 + ) producer.send(common.SIMPLE_MSG) - assert common.timed_wait( - lambda: not common.get_is_running( - cursor, "test_stream")) + assert common.timed_wait(lambda: not common.get_is_running(cursor, "test_stream")) common.start_stream(cursor, "test_stream") time.sleep(1) producer.send(b"CREATE (n:VERTEX { id : 42 })") - assert common.check_one_result_row( - cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n") + assert common.check_one_result_row(cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n") @pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK) def test_service_url(pulsar_client, pulsar_topics, connection, transformation): assert len(pulsar_topics) > 0 cursor = connection.cursor() - local = "pulsar://127.0.0.1:6650" + LOCAL = "pulsar://127.0.0.1:6650" common.execute_and_fetch_all( cursor, - "CREATE PULSAR STREAM test " - f"TOPICS {','.join(pulsar_topics)} " - f"TRANSFORM {transformation} " - f"SERVICE_URL '{local}'", + f"CREATE PULSAR STREAM test TOPICS {','.join(pulsar_topics)} TRANSFORM {transformation} SERVICE_URL '{LOCAL}'", ) common.start_stream(cursor, "test") time.sleep(5) for topic in pulsar_topics: producer = pulsar_client.create_producer( - common.pulsar_default_namespace_topic(topic), - send_timeout_millis=60000) + common.pulsar_default_namespace_topic(topic), send_timeout_millis=60000 + ) 
producer.send(common.SIMPLE_MSG) for topic in pulsar_topics: - check_vertex_exists_with_topic_and_payload( - cursor, topic, common.SIMPLE_MSG) + check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG) + + +def test_check_stream_same_number_of_queries_than_messages(pulsar_client, pulsar_topics, connection): + assert len(pulsar_topics) > 0 + + TRANSFORMATION = "common_transform.check_stream_no_filtering" + + def stream_creator(stream_name, batch_size): + return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size} " + + producer = pulsar_client.create_producer( + common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000 + ) + + def message_sender(msg): + producer.send(msg) + + common.test_check_stream_same_number_of_queries_than_messages(connection, stream_creator, message_sender) + + +def test_check_stream_different_number_of_queries_than_messages(pulsar_client, pulsar_topics, connection): + assert len(pulsar_topics) > 0 + + TRANSFORMATION = "common_transform.check_stream_with_filtering" + + def stream_creator(stream_name, batch_size): + return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size} " + + producer = pulsar_client.create_producer( + common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000 + ) + + def message_sender(msg): + producer.send(msg) + + common.test_check_stream_different_number_of_queries_than_messages(connection, stream_creator, message_sender) if __name__ == "__main__": diff --git a/tests/e2e/streams/transformations/CMakeLists.txt b/tests/e2e/streams/transformations/CMakeLists.txt index 5ecafd43a..4d930a5de 100644 --- a/tests/e2e/streams/transformations/CMakeLists.txt +++ b/tests/e2e/streams/transformations/CMakeLists.txt @@ -1,3 +1,4 @@ copy_streams_e2e_python_files(kafka_transform.py) copy_streams_e2e_python_files(pulsar_transform.py) +copy_streams_e2e_python_files(common_transform.py) add_query_module(c_transformations c_transformations.cpp) diff --git a/tests/e2e/streams/transformations/common_transform.py b/tests/e2e/streams/transformations/common_transform.py new file mode 100644 index 000000000..25836c207 --- /dev/null +++ b/tests/e2e/streams/transformations/common_transform.py @@ -0,0 +1,57 @@ +# Copyright 2021 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. 
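+
+# Common transformations for the CHECK STREAM e2e tests: each message becomes a
+# "Message: <payload>" query with a "Parameter: <payload>" parameter. The
+# filtering variant skips messages containing "a" and emits an extra query for
+# messages containing "b".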
+ +import mgp + + +@mgp.transformation +def check_stream_no_filtering( + context: mgp.TransCtx, messages: mgp.Messages +) -> mgp.Record(query=str, parameters=mgp.Map): + + result_queries = [] + + for i in range(0, messages.total_messages()): + message = messages.message_at(i) + payload_as_str = message.payload().decode("utf-8") + result_queries.append( + mgp.Record(query=f"Message: {payload_as_str}", parameters={"value": f"Parameter: {payload_as_str}"}) + ) + + return result_queries + + +@mgp.transformation +def check_stream_with_filtering( + context: mgp.TransCtx, messages: mgp.Messages +) -> mgp.Record(query=str, parameters=mgp.Map): + + result_queries = [] + + for i in range(0, messages.total_messages()): + message = messages.message_at(i) + payload_as_str = message.payload().decode("utf-8") + + if "a" in payload_as_str: + continue + + result_queries.append( + mgp.Record(query=f"Message: {payload_as_str}", parameters={"value": f"Parameter: {payload_as_str}"}) + ) + + if "b" in payload_as_str: + result_queries.append( + mgp.Record( + query=f"Message: extra_{payload_as_str}", parameters={"value": f"Parameter: extra_{payload_as_str}"} + ) + ) + + return result_queries diff --git a/tests/e2e/streams/transformations/kafka_transform.py b/tests/e2e/streams/transformations/kafka_transform.py index 4967e0f40..87a6b8449 100644 --- a/tests/e2e/streams/transformations/kafka_transform.py +++ b/tests/e2e/streams/transformations/kafka_transform.py @@ -13,9 +13,7 @@ import mgp @mgp.transformation -def simple( - context: mgp.TransCtx, messages: mgp.Messages -) -> mgp.Record(query=str, parameters=mgp.Map): +def simple(context: mgp.TransCtx, messages: mgp.Messages) -> mgp.Record(query=str, parameters=mgp.Map): result_queries = [] @@ -32,15 +30,15 @@ def simple( offset: '{message.offset()}', topic: '{message.topic_name()}' }})""", - parameters=None)) + parameters=None, + ) + ) return result_queries @mgp.transformation -def with_parameters( - context: mgp.TransCtx, messages: mgp.Messages -) -> mgp.Record(query=str, parameters=mgp.Map): +def with_parameters(context: mgp.TransCtx, messages: mgp.Messages) -> mgp.Record(query=str, parameters=mgp.Map): result_queries = [] @@ -61,7 +59,10 @@ def with_parameters( "timestamp": message.timestamp(), "payload": payload_as_str, "offset": message.offset(), - "topic": message.topic_name()})) + "topic": message.topic_name(), + }, + ) + ) return result_queries @@ -76,8 +77,6 @@ def query( message = messages.message_at(i) assert message.source_type() == mgp.SOURCE_TYPE_KAFKA payload_as_str = message.payload().decode("utf-8") - result_queries.append( - mgp.Record(query=payload_as_str, parameters=None) - ) + result_queries.append(mgp.Record(query=payload_as_str, parameters=None)) return result_queries diff --git a/tests/e2e/streams/transformations/pulsar_transform.py b/tests/e2e/streams/transformations/pulsar_transform.py index 5379db52c..d8560c2a7 100644 --- a/tests/e2e/streams/transformations/pulsar_transform.py +++ b/tests/e2e/streams/transformations/pulsar_transform.py @@ -13,9 +13,7 @@ import mgp @mgp.transformation -def simple(context: mgp.TransCtx, - messages: mgp.Messages - ) -> mgp.Record(query=str, parameters=mgp.Map): +def simple(context: mgp.TransCtx, messages: mgp.Messages) -> mgp.Record(query=str, parameters=mgp.Map): result_queries = [] @@ -30,15 +28,15 @@ def simple(context: mgp.TransCtx, payload: '{payload_as_str}', topic: '{message.topic_name()}' }})""", - parameters=None)) + parameters=None, + ) + ) return result_queries @mgp.transformation -def 
with_parameters(context: mgp.TransCtx, - messages: mgp.Messages - ) -> mgp.Record(query=str, parameters=mgp.Map): +def with_parameters(context: mgp.TransCtx, messages: mgp.Messages) -> mgp.Record(query=str, parameters=mgp.Map): result_queries = [] @@ -53,23 +51,21 @@ def with_parameters(context: mgp.TransCtx, payload: $payload, topic: $topic })""", - parameters={ - "payload": payload_as_str, - "topic": message.topic_name()})) + parameters={"payload": payload_as_str, "topic": message.topic_name()}, + ) + ) return result_queries @mgp.transformation -def query(messages: mgp.Messages - ) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]): +def query(messages: mgp.Messages) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]): result_queries = [] for i in range(0, messages.total_messages()): message = messages.message_at(i) assert message.source_type() == mgp.SOURCE_TYPE_PULSAR payload_as_str = message.payload().decode("utf-8") - result_queries.append(mgp.Record( - query=payload_as_str, parameters=None)) + result_queries.append(mgp.Record(query=payload_as_str, parameters=None)) return result_queries