Adding "raw message" column to the result returned by CHECK STREAM query (#394)

* Adding "raw message" column to the result return by CHECK STREAM query

* Update way results of CHECK STREAM are built

* Adapting CHECK STREAM integration tests (Pulsar/Kafka) to run with new result structure

* Adding new tests covering the check stream functionality

* Uppercase constants in stream tests

* Reformat f-strings
This commit is contained in:
Jeremy B 2022-06-08 23:17:44 +02:00 committed by GitHub
parent 21ad5d4328
commit a0bc1371dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 507 additions and 378 deletions

View File

@ -729,7 +729,7 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
return callback;
}
case StreamQuery::Action::CHECK_STREAM: {
callback.header = {"query", "parameters"};
callback.header = {"queries", "raw messages"};
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_,
timeout = GetOptionalValue<std::chrono::milliseconds>(stream_query->timeout_, evaluator),
batch_limit = GetOptionalValue<int64_t>(stream_query->batch_limit_, evaluator)]() mutable {

View File

@ -43,6 +43,7 @@ extern const Event MessagesConsumed;
namespace memgraph::query::stream {
namespace {
inline constexpr auto kExpectedTransformationResultSize = 2;
inline constexpr auto kCheckStreamResultSize = 2;
const utils::pmr::string query_param_name{"query", utils::NewDeleteResource()};
const utils::pmr::string params_param_name{"parameters", utils::NewDeleteResource()};
@ -724,15 +725,27 @@ TransformationResult Streams::Check(const std::string &stream_name, std::optiona
auto accessor = interpreter_context->db->Access();
CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name);
for (auto &row : result.rows) {
auto [query, parameters] = ExtractTransformationResult(row.values, transformation_name, stream_name);
std::vector<TypedValue> result_row;
result_row.reserve(kExpectedTransformationResultSize);
result_row.push_back(std::move(query));
result_row.push_back(std::move(parameters));
auto result_row = std::vector<TypedValue>();
result_row.reserve(kCheckStreamResultSize);
test_result.push_back(std::move(result_row));
}
auto queries_and_parameters = std::vector<TypedValue>(result.rows.size());
std::transform(
result.rows.cbegin(), result.rows.cend(), queries_and_parameters.begin(), [&](const auto &row) {
auto [query, parameters] = ExtractTransformationResult(row.values, transformation_name, stream_name);
return std::map<std::string, TypedValue>{{"query", std::move(query)},
{"parameters", std::move(parameters)}};
});
result_row.emplace_back(std::move(queries_and_parameters));
auto messages_list = std::vector<TypedValue>(messages.size());
std::transform(messages.cbegin(), messages.cend(), messages_list.begin(), [](const auto &message) {
return std::string_view(message.Payload().data(), message.Payload().size());
});
result_row.emplace_back(std::move(messages_list));
test_result.emplace_back(std::move(result_row));
};
locked_stream_source->Check(timeout, batch_limit, consumer_function);

View File

@ -12,7 +12,7 @@
import mgclient
import time
from multiprocessing import Process, Value
from multiprocessing import Manager, Process, Value
# These are the indices of the different values in the result of SHOW STREAM
# query
@ -26,8 +26,10 @@ IS_RUNNING = 6
# These are the indices of the query and parameters in the result of CHECK
# STREAM query
QUERY = 0
PARAMS = 1
QUERIES = 0
RAWMESSAGES = 1
PARAMETERS_LITERAL = "parameters"
QUERY_LITERAL = "query"
SIMPLE_MSG = b"message"
@ -45,13 +47,13 @@ def connect(**kwargs):
def timed_wait(fun):
start_time = time.time()
seconds = 10
SECONDS = 10
while True:
current_time = time.time()
elapsed_time = current_time - start_time
if elapsed_time > seconds:
if elapsed_time > SECONDS:
return False
if fun():
@ -62,13 +64,13 @@ def timed_wait(fun):
def check_one_result_row(cursor, query):
start_time = time.time()
seconds = 10
SECONDS = 10
while True:
current_time = time.time()
elapsed_time = current_time - start_time
if elapsed_time > seconds:
if elapsed_time > SECONDS:
return False
cursor.execute(query)
@ -81,12 +83,10 @@ def check_one_result_row(cursor, query):
def check_vertex_exists_with_properties(cursor, properties):
properties_string = ', '.join([f'{k}: {v}' for k, v in properties.items()])
properties_string = ", ".join([f"{k}: {v}" for k, v in properties.items()])
assert check_one_result_row(
cursor,
"MATCH (n: MESSAGE {"
f"{properties_string}"
"}) RETURN n",
f"MATCH (n: MESSAGE {{{properties_string}}}) RETURN n",
)
@ -129,28 +129,27 @@ def validate_info(actual_stream_info, expected_stream_info):
for info, expected_info in zip(actual_stream_info, expected_stream_info):
assert info == expected_info
def check_stream_info(cursor, stream_name, expected_stream_info):
stream_info = get_stream_info(cursor, stream_name)
validate_info(stream_info, expected_stream_info)
def kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, payload_bytes):
decoded_payload = payload_bytes.decode('utf-8')
check_vertex_exists_with_properties(
cursor, {'topic': f'"{topic}"', 'payload': f'"{decoded_payload}"'})
decoded_payload = payload_bytes.decode("utf-8")
check_vertex_exists_with_properties(cursor, {"topic": f'"{topic}"', "payload": f'"{decoded_payload}"'})
PULSAR_SERVICE_URL = 'pulsar://127.0.0.1:6650'
PULSAR_SERVICE_URL = "pulsar://127.0.0.1:6650"
def pulsar_default_namespace_topic(topic):
return f'persistent://public/default/{topic}'
return f"persistent://public/default/{topic}"
def test_start_and_stop_during_check(
operation,
connection,
stream_creator,
message_sender,
already_stopped_error):
operation, connection, stream_creator, message_sender, already_stopped_error, batchSize
):
# This test is quite complex. The goal is to call START/STOP queries
# while a CHECK query is waiting for its result. Because the Global
# Interpreter Lock, running queries on multiple threads is not useful,
@ -161,11 +160,9 @@ def test_start_and_stop_during_check(
# synchronize between the different processes. Each value represents a
# specific phase of the execution of the processes.
assert operation in ["START", "STOP"]
assert batchSize == 1
cursor = connection.cursor()
execute_and_fetch_all(
cursor,
stream_creator('test_stream')
)
execute_and_fetch_all(cursor, stream_creator("test_stream"))
check_counter = Value("i", 0)
check_result_len = Value("i", 0)
@ -185,7 +182,9 @@ def test_start_and_stop_during_check(
result = execute_and_fetch_all(cursor, "CHECK STREAM test_stream")
result_len.value = len(result)
counter.value = CHECK_AFTER_FETCHALL
if len(result) > 0 and "payload: 'message'" in result[0][QUERY]:
if (
len(result) > 0 and "payload: 'message'" in result[0][QUERIES][0][QUERY_LITERAL]
): # The 0 is only correct because batchSize is 1
counter.value = CHECK_CORRECT_RESULT
else:
counter.value = CHECK_INCORRECT_RESULT
@ -213,12 +212,8 @@ def test_start_and_stop_during_check(
except Exception:
counter.value = OP_UNEXPECTED_EXCEPTION
check_stream_proc = Process(
target=call_check, daemon=True, args=(check_counter, check_result_len)
)
operation_proc = Process(
target=call_operation, daemon=True, args=(operation_counter,)
)
check_stream_proc = Process(target=call_check, daemon=True, args=(check_counter, check_result_len))
operation_proc = Process(target=call_operation, daemon=True, args=(operation_counter,))
try:
check_stream_proc.start()
@ -227,9 +222,7 @@ def test_start_and_stop_during_check(
assert timed_wait(lambda: check_counter.value == CHECK_BEFORE_EXECUTE)
assert timed_wait(lambda: get_is_running(cursor, "test_stream"))
assert check_counter.value == CHECK_BEFORE_EXECUTE, (
"SHOW STREAMS " "was blocked until the end of CHECK STREAM"
)
assert check_counter.value == CHECK_BEFORE_EXECUTE, "SHOW STREAMS " "was blocked until the end of CHECK STREAM"
operation_proc.start()
assert timed_wait(lambda: operation_counter.value == OP_BEFORE_EXECUTE)
@ -255,31 +248,156 @@ def test_start_and_stop_during_check(
if operation_proc.is_alive():
operation_proc.terminate()
def test_start_checked_stream_after_timeout(connection, stream_creator):
cursor = connection.cursor()
execute_and_fetch_all(
cursor,
stream_creator('test_stream')
)
execute_and_fetch_all(cursor, stream_creator("test_stream"))
timeout_ms = 2000
TIMEOUT_MS = 2000
def call_check():
execute_and_fetch_all(
connect().cursor(),
f"CHECK STREAM test_stream TIMEOUT {timeout_ms}")
execute_and_fetch_all(connect().cursor(), f"CHECK STREAM test_stream TIMEOUT {TIMEOUT_MS}")
check_stream_proc = Process(target=call_check, daemon=True)
start = time.time()
check_stream_proc.start()
assert timed_wait(
lambda: get_is_running(
cursor, "test_stream"))
assert timed_wait(lambda: get_is_running(cursor, "test_stream"))
start_stream(cursor, "test_stream")
end = time.time()
assert (end - start) < 1.3 * \
timeout_ms, "The START STREAM was blocked too long"
assert (end - start) < 1.3 * TIMEOUT_MS, "The START STREAM was blocked too long"
assert get_is_running(cursor, "test_stream")
stop_stream(cursor, "test_stream")
def test_check_stream_same_number_of_queries_than_messages(connection, stream_creator, message_sender):
BATCH_SIZE = 2
BATCH_LIMIT = 3
STREAM_NAME = "test_stream"
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator(STREAM_NAME, BATCH_SIZE))
time.sleep(2)
test_results = Manager().Namespace()
def check_stream(stream_name, batch_limit):
connection = connect()
cursor = connection.cursor()
test_results.value = execute_and_fetch_all(cursor, f"CHECK STREAM {stream_name} BATCH_LIMIT {batch_limit} ")
check_stream_proc = Process(target=check_stream, args=(STREAM_NAME, BATCH_LIMIT))
check_stream_proc.start()
time.sleep(2)
MESSAGES = [b"01", b"02", b"03", b"04", b"05", b"06"]
for message in MESSAGES:
message_sender(message)
check_stream_proc.join()
# # Transformation does not do any filtering and simply create queries as "Messages: {contentOfMessage}". Queries should be like:
# # -Batch 1: [{parameters: {"value": "Parameter: 01"}, query: "Message: 01"},
# # {parameters: {"value": "Parameter: 02"}, query: "Message: 02"}]
# # -Batch 2: [{parameters: {"value": "Parameter: 03"}, query: "Message: 03"},
# # {parameters: {"value": "Parameter: 04"}, query: "Message: 04"}]
# # -Batch 3: [{parameters: {"value": "Parameter: 05"}, query: "Message: 05"},
# # {parameters: {"value": "Parameter: 06"}, query: "Message: 06"}]
assert len(test_results.value) == BATCH_LIMIT
expected_queries_and_raw_messages_1 = (
[ # queries
{PARAMETERS_LITERAL: {"value": "Parameter: 01"}, QUERY_LITERAL: "Message: 01"},
{PARAMETERS_LITERAL: {"value": "Parameter: 02"}, QUERY_LITERAL: "Message: 02"},
],
["01", "02"], # raw message
)
expected_queries_and_raw_messages_2 = (
[ # queries
{PARAMETERS_LITERAL: {"value": "Parameter: 03"}, QUERY_LITERAL: "Message: 03"},
{PARAMETERS_LITERAL: {"value": "Parameter: 04"}, QUERY_LITERAL: "Message: 04"},
],
["03", "04"], # raw message
)
expected_queries_and_raw_messages_3 = (
[ # queries
{PARAMETERS_LITERAL: {"value": "Parameter: 05"}, QUERY_LITERAL: "Message: 05"},
{PARAMETERS_LITERAL: {"value": "Parameter: 06"}, QUERY_LITERAL: "Message: 06"},
],
["05", "06"], # raw message
)
assert expected_queries_and_raw_messages_1 == test_results.value[0]
assert expected_queries_and_raw_messages_2 == test_results.value[1]
assert expected_queries_and_raw_messages_3 == test_results.value[2]
def test_check_stream_different_number_of_queries_than_messages(connection, stream_creator, message_sender):
BATCH_SIZE = 2
BATCH_LIMIT = 3
STREAM_NAME = "test_stream"
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator(STREAM_NAME, BATCH_SIZE))
time.sleep(2)
results = Manager().Namespace()
def check_stream(stream_name, batch_limit):
connection = connect()
cursor = connection.cursor()
results.value = execute_and_fetch_all(cursor, f"CHECK STREAM {stream_name} BATCH_LIMIT {batch_limit} ")
check_stream_proc = Process(target=check_stream, args=(STREAM_NAME, BATCH_LIMIT))
check_stream_proc.start()
time.sleep(2)
MESSAGES = [b"a_01", b"a_02", b"03", b"04", b"b_05", b"06"]
for message in MESSAGES:
message_sender(message)
check_stream_proc.join()
# Transformation does some filtering: if message contains "a", it is ignored.
# Transformation also has special rule to create query if message is "b": it create more queries.
#
# Queries should be like:
# -Batch 1: []
# -Batch 2: [{parameters: {"value": "Parameter: 03"}, query: "Message: 03"},
# {parameters: {"value": "Parameter: 04"}, query: "Message: 04"}]
# -Batch 3: [{parameters: {"value": "Parameter: 05"}, query: "Message: 05"},
# {parameters: {"value": "Parameter: extra_05"}, query: "Message: extra_05"}
# {parameters: {"value": "Parameter: 06"}, query: "Message: 06"}]
assert len(results.value) == BATCH_LIMIT
expected_queries_and_raw_messages_1 = (
[], # queries
["a_01", "a_02"], # raw message
)
expected_queries_and_raw_messages_2 = (
[ # queries
{PARAMETERS_LITERAL: {"value": "Parameter: 03"}, QUERY_LITERAL: "Message: 03"},
{PARAMETERS_LITERAL: {"value": "Parameter: 04"}, QUERY_LITERAL: "Message: 04"},
],
["03", "04"], # raw message
)
expected_queries_and_raw_messages_3 = (
[ # queries
{PARAMETERS_LITERAL: {"value": "Parameter: b_05"}, QUERY_LITERAL: "Message: b_05"},
{
PARAMETERS_LITERAL: {"value": "Parameter: extra_b_05"},
QUERY_LITERAL: "Message: extra_b_05",
},
{PARAMETERS_LITERAL: {"value": "Parameter: 06"}, QUERY_LITERAL: "Message: 06"},
],
["b_05", "06"], # raw message
)
assert expected_queries_and_raw_messages_1 == results.value[0]
assert expected_queries_and_raw_messages_2 == results.value[1]
assert expected_queries_and_raw_messages_3 == results.value[2]

View File

@ -18,12 +18,10 @@ import time
from multiprocessing import Process, Value
import common
TRANSFORMATIONS_TO_CHECK_C = [
"empty_transformation"]
TRANSFORMATIONS_TO_CHECK_C = ["empty_transformation"]
TRANSFORMATIONS_TO_CHECK_PY = ["kafka_transform.simple", "kafka_transform.with_parameters"]
TRANSFORMATIONS_TO_CHECK_PY = [
"kafka_transform.simple",
"kafka_transform.with_parameters"]
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_PY)
def test_simple(kafka_producer, kafka_topics, connection, transformation):
@ -31,9 +29,7 @@ def test_simple(kafka_producer, kafka_topics, connection, transformation):
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE KAFKA STREAM test "
f"TOPICS {','.join(kafka_topics)} "
f"TRANSFORM {transformation}",
f"CREATE KAFKA STREAM test TOPICS {','.join(kafka_topics)} TRANSFORM {transformation}",
)
common.start_stream(cursor, "test")
time.sleep(5)
@ -42,16 +38,11 @@ def test_simple(kafka_producer, kafka_topics, connection, transformation):
kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=60)
for topic in kafka_topics:
common.kafka_check_vertex_exists_with_topic_and_payload(
cursor, topic, common.SIMPLE_MSG)
common.kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG)
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_PY)
def test_separate_consumers(
kafka_producer,
kafka_topics,
connection,
transformation):
def test_separate_consumers(kafka_producer, kafka_topics, connection, transformation):
assert len(kafka_topics) > 0
cursor = connection.cursor()
@ -61,9 +52,7 @@ def test_separate_consumers(
stream_names.append(stream_name)
common.execute_and_fetch_all(
cursor,
f"CREATE KAFKA STREAM {stream_name} "
f"TOPICS {topic} "
f"TRANSFORM {transformation}",
f"CREATE KAFKA STREAM {stream_name} TOPICS {topic} TRANSFORM {transformation}",
)
for stream_name in stream_names:
@ -75,12 +64,10 @@ def test_separate_consumers(
kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=60)
for topic in kafka_topics:
common.kafka_check_vertex_exists_with_topic_and_payload(
cursor, topic, common.SIMPLE_MSG)
common.kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG)
def test_start_from_last_committed_offset(
kafka_producer, kafka_topics, connection):
def test_start_from_last_committed_offset(kafka_producer, kafka_topics, connection):
# This test creates a stream, consumes a message to have a committed
# offset, then destroys the stream. A new message is sent before the
# stream is recreated and then restarted. This simulates when Memgraph is
@ -90,16 +77,15 @@ def test_start_from_last_committed_offset(
assert len(kafka_topics) > 0
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor, "CREATE KAFKA STREAM test "
f"TOPICS {kafka_topics[0]} "
"TRANSFORM kafka_transform.simple", )
cursor,
f"CREATE KAFKA STREAM test TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple",
)
common.start_stream(cursor, "test")
time.sleep(1)
kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60)
common.kafka_check_vertex_exists_with_topic_and_payload(
cursor, kafka_topics[0], common.SIMPLE_MSG)
common.kafka_check_vertex_exists_with_topic_and_payload(cursor, kafka_topics[0], common.SIMPLE_MSG)
common.stop_stream(cursor, "test")
common.drop_stream(cursor, "test")
@ -111,36 +97,30 @@ def test_start_from_last_committed_offset(
for message in messages:
vertices_with_msg = common.execute_and_fetch_all(
cursor,
"MATCH (n: MESSAGE {" f"payload: '{message.decode('utf-8')}'" "}) RETURN n",
f"MATCH (n: MESSAGE {{payload: '{message.decode('utf-8')}'}}) RETURN n",
)
assert len(vertices_with_msg) == 0
common.execute_and_fetch_all(
cursor, "CREATE KAFKA STREAM test "
f"TOPICS {kafka_topics[0]} "
"TRANSFORM kafka_transform.simple", )
cursor,
f"CREATE KAFKA STREAM test TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple",
)
common.start_stream(cursor, "test")
for message in messages:
common.kafka_check_vertex_exists_with_topic_and_payload(
cursor, kafka_topics[0], message)
common.kafka_check_vertex_exists_with_topic_and_payload(cursor, kafka_topics[0], message)
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_PY)
def test_check_stream(
kafka_producer,
kafka_topics,
connection,
transformation):
def test_check_stream(kafka_producer, kafka_topics, connection, transformation):
assert len(kafka_topics) > 0
BATCH_SIZE = 1
INDEX_OF_FIRST_BATCH = 0
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE KAFKA STREAM test "
f"TOPICS {kafka_topics[0]} "
f"TRANSFORM {transformation} "
"BATCH_SIZE 1",
f"CREATE KAFKA STREAM test TOPICS {kafka_topics[0]} TRANSFORM {transformation} BATCH_SIZE {BATCH_SIZE}",
)
common.start_stream(cursor, "test")
time.sleep(1)
@ -153,24 +133,28 @@ def test_check_stream(
kafka_producer.send(kafka_topics[0], message).get(timeout=60)
def check_check_stream(batch_limit):
assert (
transformation == "kafka_transform.simple"
or transformation == "kafka_transform.with_parameters"
)
test_results = common.execute_and_fetch_all(
cursor, f"CHECK STREAM test BATCH_LIMIT {batch_limit}"
)
assert transformation == "kafka_transform.simple" or transformation == "kafka_transform.with_parameters"
test_results = common.execute_and_fetch_all(cursor, f"CHECK STREAM test BATCH_LIMIT {batch_limit}")
assert len(test_results) == batch_limit
for i in range(batch_limit):
message_as_str = messages[i].decode("utf-8")
assert (
BATCH_SIZE == 1
) # If batch size != 1, then the usage of INDEX_OF_FIRST_BATCH must change: the result will have a list of queries (pair<parameters,query>)
if transformation == "kafka_transform.simple":
assert f"payload: '{message_as_str}'" in test_results[i][common.QUERY]
assert test_results[i][common.PARAMS] is None
assert (
f"payload: '{message_as_str}'"
in test_results[i][common.QUERIES][INDEX_OF_FIRST_BATCH][common.QUERY_LITERAL]
)
assert test_results[i][common.QUERIES][INDEX_OF_FIRST_BATCH][common.PARAMETERS_LITERAL] is None
else:
assert f"payload: $payload" in test_results[i][
common.QUERY] and f"topic: $topic" in test_results[i][common.QUERY]
parameters = test_results[i][common.PARAMS]
assert (
f"payload: $payload" in test_results[i][common.QUERIES][INDEX_OF_FIRST_BATCH][common.QUERY_LITERAL]
and f"topic: $topic" in test_results[i][common.QUERIES][INDEX_OF_FIRST_BATCH][common.QUERY_LITERAL]
)
parameters = test_results[i][common.QUERIES][INDEX_OF_FIRST_BATCH][common.PARAMETERS_LITERAL]
# this is not a very sofisticated test, but checks if
# timestamp has some kind of value
assert parameters["timestamp"] > 1000000000000
@ -183,8 +167,7 @@ def test_check_stream(
common.start_stream(cursor, "test")
for message in messages:
common.kafka_check_vertex_exists_with_topic_and_payload(
cursor, kafka_topics[0], message)
common.kafka_check_vertex_exists_with_topic_and_payload(cursor, kafka_topics[0], message)
def test_show_streams(kafka_producer, kafka_topics, connection):
@ -192,23 +175,15 @@ def test_show_streams(kafka_producer, kafka_topics, connection):
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE KAFKA STREAM default_values "
f"TOPICS {kafka_topics[0]} "
f"TRANSFORM kafka_transform.simple "
f"BOOTSTRAP_SERVERS 'localhost:9092'",
f"CREATE KAFKA STREAM default_values TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BOOTSTRAP_SERVERS 'localhost:9092'",
)
consumer_group = "my_special_consumer_group"
batch_interval = 42
batch_size = 3
BATCH_INTERVAL = 42
BATCH_SIZE = 3
common.execute_and_fetch_all(
cursor,
"CREATE KAFKA STREAM complex_values "
f"TOPICS {','.join(kafka_topics)} "
f"TRANSFORM kafka_transform.with_parameters "
f"CONSUMER_GROUP {consumer_group} "
f"BATCH_INTERVAL {batch_interval} "
f"BATCH_SIZE {batch_size} ",
f"CREATE KAFKA STREAM complex_values TOPICS {','.join(kafka_topics)} TRANSFORM kafka_transform.with_parameters CONSUMER_GROUP {consumer_group} BATCH_INTERVAL {BATCH_INTERVAL} BATCH_SIZE {BATCH_SIZE} ",
)
assert len(common.execute_and_fetch_all(cursor, "SHOW STREAMS")) == 2
@ -216,13 +191,7 @@ def test_show_streams(kafka_producer, kafka_topics, connection):
common.check_stream_info(
cursor,
"default_values",
("default_values",
"kafka",
100,
1000,
"kafka_transform.simple",
None,
False),
("default_values", "kafka", 100, 1000, "kafka_transform.simple", None, False),
)
common.check_stream_info(
@ -231,8 +200,8 @@ def test_show_streams(kafka_producer, kafka_topics, connection):
(
"complex_values",
"kafka",
batch_interval,
batch_size,
BATCH_INTERVAL,
BATCH_SIZE,
"kafka_transform.with_parameters",
None,
False,
@ -241,15 +210,12 @@ def test_show_streams(kafka_producer, kafka_topics, connection):
@pytest.mark.parametrize("operation", ["START", "STOP"])
def test_start_and_stop_during_check(
kafka_producer,
kafka_topics,
connection,
operation):
def test_start_and_stop_during_check(kafka_producer, kafka_topics, connection, operation):
assert len(kafka_topics) > 1
BATCH_SIZE = 1
def stream_creator(stream_name):
return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple"
return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE {BATCH_SIZE}"
def message_sender(msg):
kafka_producer.send(kafka_topics[0], msg).get(timeout=60)
@ -259,7 +225,9 @@ def test_start_and_stop_during_check(
connection,
stream_creator,
message_sender,
"Kafka consumer test_stream is already stopped")
"Kafka consumer test_stream is already stopped",
BATCH_SIZE,
)
def test_check_already_started_stream(kafka_topics, connection):
@ -268,9 +236,7 @@ def test_check_already_started_stream(kafka_topics, connection):
common.execute_and_fetch_all(
cursor,
"CREATE KAFKA STREAM started_stream "
f"TOPICS {kafka_topics[0]} "
f"TRANSFORM kafka_transform.simple",
f"CREATE KAFKA STREAM started_stream TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple",
)
common.start_stream(cursor, "started_stream")
@ -289,41 +255,29 @@ def test_restart_after_error(kafka_producer, kafka_topics, connection):
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE KAFKA STREAM test_stream "
f"TOPICS {kafka_topics[0]} "
f"TRANSFORM kafka_transform.query",
f"CREATE KAFKA STREAM test_stream TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.query",
)
common.start_stream(cursor, "test_stream")
time.sleep(1)
kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60)
assert common.timed_wait(
lambda: not common.get_is_running(
cursor, "test_stream"))
assert common.timed_wait(lambda: not common.get_is_running(cursor, "test_stream"))
common.start_stream(cursor, "test_stream")
time.sleep(1)
kafka_producer.send(kafka_topics[0], b"CREATE (n:VERTEX { id : 42 })")
assert common.check_one_result_row(
cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n")
assert common.check_one_result_row(cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n")
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_PY)
def test_bootstrap_server(
kafka_producer,
kafka_topics,
connection,
transformation):
def test_bootstrap_server(kafka_producer, kafka_topics, connection, transformation):
assert len(kafka_topics) > 0
cursor = connection.cursor()
local = "localhost:9092"
LOCAL = "localhost:9092"
common.execute_and_fetch_all(
cursor,
"CREATE KAFKA STREAM test "
f"TOPICS {','.join(kafka_topics)} "
f"TRANSFORM {transformation} "
f"BOOTSTRAP_SERVERS '{local}'",
f"CREATE KAFKA STREAM test TOPICS {','.join(kafka_topics)} TRANSFORM {transformation} BOOTSTRAP_SERVERS '{LOCAL}'",
)
common.start_stream(cursor, "test")
time.sleep(5)
@ -332,25 +286,17 @@ def test_bootstrap_server(
kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=60)
for topic in kafka_topics:
common.kafka_check_vertex_exists_with_topic_and_payload(
cursor, topic, common.SIMPLE_MSG)
common.kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG)
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_PY)
def test_bootstrap_server_empty(
kafka_producer,
kafka_topics,
connection,
transformation):
def test_bootstrap_server_empty(kafka_producer, kafka_topics, connection, transformation):
assert len(kafka_topics) > 0
cursor = connection.cursor()
with pytest.raises(mgclient.DatabaseError):
common.execute_and_fetch_all(
cursor,
"CREATE KAFKA STREAM test "
f"TOPICS {','.join(kafka_topics)} "
f"TRANSFORM {transformation} "
"BOOTSTRAP_SERVERS ''",
f"CREATE KAFKA STREAM test TOPICS {','.join(kafka_topics)} TRANSFORM {transformation} BOOTSTRAP_SERVERS ''",
)
@ -360,10 +306,7 @@ def test_set_offset(kafka_producer, kafka_topics, connection, transformation):
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE KAFKA STREAM test "
f"TOPICS {kafka_topics[0]} "
f"TRANSFORM {transformation} "
"BATCH_SIZE 1",
f"CREATE KAFKA STREAM test TOPICS {kafka_topics[0]} TRANSFORM {transformation} BATCH_SIZE 1",
)
messages = [f"{i} message" for i in range(1, 21)]
@ -377,27 +320,18 @@ def test_set_offset(kafka_producer, kafka_topics, connection, transformation):
else:
assert common.check_one_result_row(
cursor,
(
f"MATCH (n: MESSAGE {{payload: '{expected_msgs[-1]}'}})"
"RETURN n"
),
(f"MATCH (n: MESSAGE {{payload: '{expected_msgs[-1]}'}})" "RETURN n"),
)
common.stop_stream(cursor, "test")
res = common.execute_and_fetch_all(
cursor, "MATCH (n) RETURN n.payload"
)
res = common.execute_and_fetch_all(cursor, "MATCH (n) RETURN n.payload")
return res
def execute_set_offset_and_consume(id, expected_msgs):
common.execute_and_fetch_all(
cursor, f"CALL mg.kafka_set_stream_offset('test', {id})"
)
common.execute_and_fetch_all(cursor, f"CALL mg.kafka_set_stream_offset('test', {id})")
return consume(expected_msgs)
with pytest.raises(mgclient.DatabaseError):
res = common.execute_and_fetch_all(
cursor, "CALL mg.kafka_set_stream_offset('foo', 10)"
)
res = common.execute_and_fetch_all(cursor, "CALL mg.kafka_set_stream_offset('foo', 10)")
def comparison_check(a, b):
return a == str(b).strip("'(,)")
@ -426,40 +360,60 @@ def test_set_offset(kafka_producer, kafka_topics, connection, transformation):
def test_info_procedure(kafka_topics, connection):
cursor = connection.cursor()
stream_name = 'test_stream'
configs = {"sasl.username": "michael.scott"}
local = "localhost:9092"
credentials = {"sasl.password": "S3cr3tP4ssw0rd"}
consumer_group = "ConsumerGr"
STREAM_NAME = "test_stream"
CONFIGS = {"sasl.username": "michael.scott"}
LOCAL = "localhost:9092"
CREDENTIALS = {"sasl.password": "S3cr3tP4ssw0rd"}
CONSUMER_GROUP = "ConsumerGr"
common.execute_and_fetch_all(
cursor,
f"CREATE KAFKA STREAM {stream_name} "
f"TOPICS {','.join(kafka_topics)} "
f"TRANSFORM pulsar_transform.simple "
f"CONSUMER_GROUP {consumer_group} "
f"BOOTSTRAP_SERVERS '{local}' "
f"CONFIGS {configs} "
f"CREDENTIALS {credentials}"
f"CREATE KAFKA STREAM {STREAM_NAME} TOPICS {','.join(kafka_topics)} TRANSFORM kafka_transform.simple CONSUMER_GROUP {CONSUMER_GROUP} BOOTSTRAP_SERVERS '{LOCAL}' CONFIGS {CONFIGS} CREDENTIALS {CREDENTIALS}",
)
stream_info = common.execute_and_fetch_all(
cursor, f"CALL mg.kafka_stream_info('{stream_name}') YIELD *")
stream_info = common.execute_and_fetch_all(cursor, f"CALL mg.kafka_stream_info('{STREAM_NAME}') YIELD *")
reducted_credentials = {key: "<REDUCTED>" for
key in credentials.keys()}
reducted_credentials = {key: "<REDUCTED>" for key in CREDENTIALS.keys()}
expected_stream_info = [
(local, configs, consumer_group, reducted_credentials, kafka_topics)]
expected_stream_info = [(LOCAL, CONFIGS, CONSUMER_GROUP, reducted_credentials, kafka_topics)]
common.validate_info(stream_info, expected_stream_info)
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_C)
def test_load_c_transformations(connection, transformation):
cursor = connection.cursor()
query = "CALL mg.transformations() YIELD * WITH name WHERE name STARTS WITH 'c_transformations." + transformation + "' RETURN name"
result = common.execute_and_fetch_all(
cursor, query)
query = f"CALL mg.transformations() YIELD * WITH name WHERE name STARTS WITH 'c_transformations.{transformation}' RETURN name"
result = common.execute_and_fetch_all(cursor, query)
assert len(result) == 1
assert result[0][0] == "c_transformations." + transformation
assert result[0][0] == f"c_transformations.{transformation}"
def test_check_stream_same_number_of_queries_than_messages(kafka_producer, kafka_topics, connection):
assert len(kafka_topics) > 0
TRANSFORMATION = "common_transform.check_stream_no_filtering"
def stream_creator(stream_name, batch_size):
return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size}"
def message_sender(msg):
kafka_producer.send(kafka_topics[0], msg).get(timeout=60)
common.test_check_stream_same_number_of_queries_than_messages(connection, stream_creator, message_sender)
def test_check_stream_different_number_of_queries_than_messages(kafka_producer, kafka_topics, connection):
assert len(kafka_topics) > 0
TRANSFORMATION = "common_transform.check_stream_with_filtering"
def stream_creator(stream_name, batch_size):
return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size}"
def message_sender(msg):
kafka_producer.send(kafka_topics[0], msg).get(timeout=60)
common.test_check_stream_different_number_of_queries_than_messages(connection, stream_creator, message_sender)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -18,17 +18,14 @@ import time
from multiprocessing import Process, Value
import common
TRANSFORMATIONS_TO_CHECK = [
"pulsar_transform.simple",
"pulsar_transform.with_parameters"]
TRANSFORMATIONS_TO_CHECK = ["pulsar_transform.simple", "pulsar_transform.with_parameters"]
def check_vertex_exists_with_topic_and_payload(cursor, topic, payload_byte):
decoded_payload = payload_byte.decode('utf-8')
decoded_payload = payload_byte.decode("utf-8")
common.check_vertex_exists_with_properties(
cursor, {
'topic': f'"{common.pulsar_default_namespace_topic(topic)}"',
'payload': f'"{decoded_payload}"'})
cursor, {"topic": f'"{common.pulsar_default_namespace_topic(topic)}"', "payload": f'"{decoded_payload}"'}
)
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
@ -37,30 +34,23 @@ def test_simple(pulsar_client, pulsar_topics, connection, transformation):
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE PULSAR STREAM test "
f"TOPICS '{','.join(pulsar_topics)}' "
f"TRANSFORM {transformation}",
f"CREATE PULSAR STREAM test TOPICS '{','.join(pulsar_topics)}' TRANSFORM {transformation}",
)
common.start_stream(cursor, "test")
time.sleep(5)
for topic in pulsar_topics:
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(topic),
send_timeout_millis=60000)
common.pulsar_default_namespace_topic(topic), send_timeout_millis=60000
)
producer.send(common.SIMPLE_MSG)
for topic in pulsar_topics:
check_vertex_exists_with_topic_and_payload(
cursor, topic, common.SIMPLE_MSG)
check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG)
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
def test_separate_consumers(
pulsar_client,
pulsar_topics,
connection,
transformation):
def test_separate_consumers(pulsar_client, pulsar_topics, connection, transformation):
assert len(pulsar_topics) > 0
cursor = connection.cursor()
@ -70,9 +60,7 @@ def test_separate_consumers(
stream_names.append(stream_name)
common.execute_and_fetch_all(
cursor,
f"CREATE PULSAR STREAM {stream_name} "
f"TOPICS {topic} "
f"TRANSFORM {transformation}",
f"CREATE PULSAR STREAM {stream_name} TOPICS {topic} TRANSFORM {transformation}",
)
for stream_name in stream_names:
@ -81,13 +69,11 @@ def test_separate_consumers(
time.sleep(5)
for topic in pulsar_topics:
producer = pulsar_client.create_producer(
topic, send_timeout_millis=60000)
producer = pulsar_client.create_producer(topic, send_timeout_millis=60000)
producer.send(common.SIMPLE_MSG)
for topic in pulsar_topics:
check_vertex_exists_with_topic_and_payload(
cursor, topic, common.SIMPLE_MSG)
check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG)
def test_start_from_latest_messages(pulsar_client, pulsar_topics, connection):
@ -99,118 +85,112 @@ def test_start_from_latest_messages(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 0
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor, "CREATE PULSAR STREAM test "
f"TOPICS {pulsar_topics[0]} "
"TRANSFORM pulsar_transform.simple", )
cursor,
f"CREATE PULSAR STREAM test TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple",
)
common.start_stream(cursor, "test")
time.sleep(1)
def assert_message_not_consumed(message):
vertices_with_msg = common.execute_and_fetch_all(
cursor,
"MATCH (n: MESSAGE {" f"payload: '{message.decode('utf-8')}'" "}) RETURN n",
f"MATCH (n: MESSAGE {{payload: '{message.decode('utf-8')}'}}) RETURN n",
)
assert len(vertices_with_msg) == 0
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(
pulsar_topics[0]), send_timeout_millis=60000)
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
)
producer.send(common.SIMPLE_MSG)
check_vertex_exists_with_topic_and_payload(
cursor, pulsar_topics[0], common.SIMPLE_MSG)
check_vertex_exists_with_topic_and_payload(cursor, pulsar_topics[0], common.SIMPLE_MSG)
common.stop_stream(cursor, "test")
next_message = b"NEXT"
producer.send(next_message)
NEXT_MESSAGE = b"NEXT"
producer.send(NEXT_MESSAGE)
assert_message_not_consumed(next_message)
assert_message_not_consumed(NEXT_MESSAGE)
common.start_stream(cursor, "test")
check_vertex_exists_with_topic_and_payload(
cursor, pulsar_topics[0], next_message)
check_vertex_exists_with_topic_and_payload(cursor, pulsar_topics[0], NEXT_MESSAGE)
common.stop_stream(cursor, "test")
common.drop_stream(cursor, "test")
lost_message = b"LOST"
valid_messages = [b"second message", b"third message"]
LOST_MESSAGE = b"LOST"
VALID_MESSAGES = [b"second message", b"third message"]
producer.send(lost_message)
producer.send(LOST_MESSAGE)
assert_message_not_consumed(lost_message)
assert_message_not_consumed(LOST_MESSAGE)
common.execute_and_fetch_all(
cursor, "CREATE PULSAR STREAM test "
f"TOPICS {pulsar_topics[0]} "
"TRANSFORM pulsar_transform.simple", )
cursor,
f"CREATE PULSAR STREAM test TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple",
)
for message in valid_messages:
for message in VALID_MESSAGES:
producer.send(message)
assert_message_not_consumed(message)
common.start_stream(cursor, "test")
assert_message_not_consumed(lost_message)
assert_message_not_consumed(LOST_MESSAGE)
for message in valid_messages:
check_vertex_exists_with_topic_and_payload(
cursor, pulsar_topics[0], message)
for message in VALID_MESSAGES:
check_vertex_exists_with_topic_and_payload(cursor, pulsar_topics[0], message)
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
def test_check_stream(
pulsar_client,
pulsar_topics,
connection,
transformation):
def test_check_stream(pulsar_client, pulsar_topics, connection, transformation):
assert len(pulsar_topics) > 0
BATCH_SIZE = 1
INDEX_Of_FIRST_BATCH = 0
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE PULSAR STREAM test "
f"TOPICS {pulsar_topics[0]} "
f"TRANSFORM {transformation} "
"BATCH_SIZE 1",
f"CREATE PULSAR STREAM test TOPICS {pulsar_topics[0]} TRANSFORM {transformation} BATCH_SIZE {BATCH_SIZE}",
)
common.start_stream(cursor, "test")
time.sleep(1)
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(
pulsar_topics[0]), send_timeout_millis=60000)
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
)
producer.send(common.SIMPLE_MSG)
check_vertex_exists_with_topic_and_payload(
cursor, pulsar_topics[0], common.SIMPLE_MSG)
check_vertex_exists_with_topic_and_payload(cursor, pulsar_topics[0], common.SIMPLE_MSG)
common.stop_stream(cursor, "test")
messages = [b"first message", b"second message", b"third message"]
for message in messages:
MESSAGES = [b"first message", b"second message", b"third message"]
for message in MESSAGES:
producer.send(message)
def check_check_stream(batch_limit):
assert (
transformation == "pulsar_transform.simple"
or transformation == "pulsar_transform.with_parameters"
)
test_results = common.execute_and_fetch_all(
cursor, f"CHECK STREAM test BATCH_LIMIT {batch_limit}"
)
assert transformation == "pulsar_transform.simple" or transformation == "pulsar_transform.with_parameters"
test_results = common.execute_and_fetch_all(cursor, f"CHECK STREAM test BATCH_LIMIT {batch_limit}")
assert len(test_results) == batch_limit
for i in range(batch_limit):
message_as_str = messages[i].decode("utf-8")
message_as_str = MESSAGES[i].decode("utf-8")
assert (
BATCH_SIZE == 1
) # If batch size != 1, then the usage of INDEX_Of_FIRST_BATCH must change: the result will have a list of queries (pair<parameters,query>)
if transformation == "pulsar_transform.simple":
assert f"payload: '{message_as_str}'" in test_results[i][common.QUERY]
assert test_results[i][common.PARAMS] is None
assert (
f"payload: '{message_as_str}'"
in test_results[i][common.QUERIES][INDEX_Of_FIRST_BATCH][common.QUERY_LITERAL]
)
assert test_results[i][common.QUERIES][INDEX_Of_FIRST_BATCH][common.PARAMETERS_LITERAL] is None
else:
assert f"payload: $payload" in test_results[i][
common.QUERY] and f"topic: $topic" in test_results[i][common.QUERY]
parameters = test_results[i][common.PARAMS]
assert parameters["topic"] == common.pulsar_default_namespace_topic(
pulsar_topics[0])
assert (
f"payload: $payload" in test_results[i][common.QUERIES][INDEX_Of_FIRST_BATCH][common.QUERY_LITERAL]
and f"topic: $topic" in test_results[i][common.QUERIES][INDEX_Of_FIRST_BATCH][common.QUERY_LITERAL]
)
parameters = test_results[i][common.QUERIES][INDEX_Of_FIRST_BATCH][common.PARAMETERS_LITERAL]
assert parameters["topic"] == common.pulsar_default_namespace_topic(pulsar_topics[0])
assert parameters["payload"] == message_as_str
check_check_stream(1)
@ -218,45 +198,37 @@ def test_check_stream(
check_check_stream(3)
common.start_stream(cursor, "test")
for message in messages:
check_vertex_exists_with_topic_and_payload(
cursor, pulsar_topics[0], message)
for message in MESSAGES:
check_vertex_exists_with_topic_and_payload(cursor, pulsar_topics[0], message)
def test_info_procedure(pulsar_client, pulsar_topics, connection):
cursor = connection.cursor()
stream_name = 'test_stream'
STREAM_NAME = "test_stream"
common.execute_and_fetch_all(
cursor,
f"CREATE PULSAR STREAM {stream_name} "
f"TOPICS {','.join(pulsar_topics)} "
f"TRANSFORM pulsar_transform.simple ",
f"CREATE PULSAR STREAM {STREAM_NAME} TOPICS {','.join(pulsar_topics)} TRANSFORM pulsar_transform.simple ",
)
stream_info = common.execute_and_fetch_all(cursor, f"CALL mg.pulsar_stream_info('{stream_name}') YIELD *")
stream_info = common.execute_and_fetch_all(cursor, f"CALL mg.pulsar_stream_info('{STREAM_NAME}') YIELD *")
expected_stream_info = [(common.PULSAR_SERVICE_URL, pulsar_topics)]
common.validate_info(stream_info, expected_stream_info)
def test_show_streams(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 1
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE PULSAR STREAM default_values "
f"TOPICS {pulsar_topics[0]} "
f"TRANSFORM pulsar_transform.simple ",
f"CREATE PULSAR STREAM default_values TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple ",
)
batch_interval = 42
batch_size = 3
BATCH_INTERVAL = 42
BATCH_SIZE = 3
common.execute_and_fetch_all(
cursor,
"CREATE PULSAR STREAM complex_values "
f"TOPICS {','.join(pulsar_topics)} "
f"TRANSFORM pulsar_transform.with_parameters "
f"BATCH_INTERVAL {batch_interval} "
f"BATCH_SIZE {batch_size} ",
f"CREATE PULSAR STREAM complex_values TOPICS {','.join(pulsar_topics)} TRANSFORM pulsar_transform.with_parameters BATCH_INTERVAL {BATCH_INTERVAL} BATCH_SIZE {BATCH_SIZE} ",
)
assert len(common.execute_and_fetch_all(cursor, "SHOW STREAMS")) == 2
@ -264,13 +236,7 @@ def test_show_streams(pulsar_client, pulsar_topics, connection):
common.check_stream_info(
cursor,
"default_values",
("default_values",
"pulsar",
100,
1000,
"pulsar_transform.simple",
None,
False),
("default_values", "pulsar", 100, 1000, "pulsar_transform.simple", None, False),
)
common.check_stream_info(
@ -279,8 +245,8 @@ def test_show_streams(pulsar_client, pulsar_topics, connection):
(
"complex_values",
"pulsar",
batch_interval,
batch_size,
BATCH_INTERVAL,
BATCH_SIZE,
"pulsar_transform.with_parameters",
None,
False,
@ -289,19 +255,16 @@ def test_show_streams(pulsar_client, pulsar_topics, connection):
@pytest.mark.parametrize("operation", ["START", "STOP"])
def test_start_and_stop_during_check(
pulsar_client,
pulsar_topics,
connection,
operation):
def test_start_and_stop_during_check(pulsar_client, pulsar_topics, connection, operation):
assert len(pulsar_topics) > 1
BATCH_SIZE = 1
def stream_creator(stream_name):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple"
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE {BATCH_SIZE}"
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(
pulsar_topics[0]), send_timeout_millis=60000)
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
)
def message_sender(msg):
producer.send(msg)
@ -311,7 +274,9 @@ def test_start_and_stop_during_check(
connection,
stream_creator,
message_sender,
"Pulsar consumer test_stream is already stopped")
"Pulsar consumer test_stream is already stopped",
BATCH_SIZE,
)
def test_check_already_started_stream(pulsar_topics, connection):
@ -320,9 +285,7 @@ def test_check_already_started_stream(pulsar_topics, connection):
common.execute_and_fetch_all(
cursor,
"CREATE PULSAR STREAM started_stream "
f"TOPICS {pulsar_topics[0]} "
f"TRANSFORM pulsar_transform.simple",
f"CREATE PULSAR STREAM started_stream TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple",
)
common.start_stream(cursor, "started_stream")
@ -333,6 +296,7 @@ def test_check_already_started_stream(pulsar_topics, connection):
def test_start_checked_stream_after_timeout(pulsar_topics, connection):
def stream_creator(stream_name):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple"
common.test_start_checked_stream_after_timeout(connection, stream_creator)
@ -340,53 +304,80 @@ def test_restart_after_error(pulsar_client, pulsar_topics, connection):
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE PULSAR STREAM test_stream "
f"TOPICS {pulsar_topics[0]} "
f"TRANSFORM pulsar_transform.query",
f"CREATE PULSAR STREAM test_stream TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.query",
)
common.start_stream(cursor, "test_stream")
time.sleep(1)
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(
pulsar_topics[0]), send_timeout_millis=60000)
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
)
producer.send(common.SIMPLE_MSG)
assert common.timed_wait(
lambda: not common.get_is_running(
cursor, "test_stream"))
assert common.timed_wait(lambda: not common.get_is_running(cursor, "test_stream"))
common.start_stream(cursor, "test_stream")
time.sleep(1)
producer.send(b"CREATE (n:VERTEX { id : 42 })")
assert common.check_one_result_row(
cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n")
assert common.check_one_result_row(cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n")
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
def test_service_url(pulsar_client, pulsar_topics, connection, transformation):
assert len(pulsar_topics) > 0
cursor = connection.cursor()
local = "pulsar://127.0.0.1:6650"
LOCAL = "pulsar://127.0.0.1:6650"
common.execute_and_fetch_all(
cursor,
"CREATE PULSAR STREAM test "
f"TOPICS {','.join(pulsar_topics)} "
f"TRANSFORM {transformation} "
f"SERVICE_URL '{local}'",
f"CREATE PULSAR STREAM test TOPICS {','.join(pulsar_topics)} TRANSFORM {transformation} SERVICE_URL '{LOCAL}'",
)
common.start_stream(cursor, "test")
time.sleep(5)
for topic in pulsar_topics:
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(topic),
send_timeout_millis=60000)
common.pulsar_default_namespace_topic(topic), send_timeout_millis=60000
)
producer.send(common.SIMPLE_MSG)
for topic in pulsar_topics:
check_vertex_exists_with_topic_and_payload(
cursor, topic, common.SIMPLE_MSG)
check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG)
def test_check_stream_same_number_of_queries_than_messages(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 0
TRANSFORMATION = "common_transform.check_stream_no_filtering"
def stream_creator(stream_name, batch_size):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size} "
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
)
def message_sender(msg):
producer.send(msg)
common.test_check_stream_same_number_of_queries_than_messages(connection, stream_creator, message_sender)
def test_check_stream_different_number_of_queries_than_messages(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 0
TRANSFORMATION = "common_transform.check_stream_with_filtering"
def stream_creator(stream_name, batch_size):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size} "
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
)
def message_sender(msg):
producer.send(msg)
common.test_check_stream_different_number_of_queries_than_messages(connection, stream_creator, message_sender)
if __name__ == "__main__":

View File

@ -1,3 +1,4 @@
copy_streams_e2e_python_files(kafka_transform.py)
copy_streams_e2e_python_files(pulsar_transform.py)
copy_streams_e2e_python_files(common_transform.py)
add_query_module(c_transformations c_transformations.cpp)

View File

@ -0,0 +1,57 @@
# Copyright 2021 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import mgp
@mgp.transformation
def check_stream_no_filtering(
context: mgp.TransCtx, messages: mgp.Messages
) -> mgp.Record(query=str, parameters=mgp.Map):
result_queries = []
for i in range(0, messages.total_messages()):
message = messages.message_at(i)
payload_as_str = message.payload().decode("utf-8")
result_queries.append(
mgp.Record(query=f"Message: {payload_as_str}", parameters={"value": f"Parameter: {payload_as_str}"})
)
return result_queries
@mgp.transformation
def check_stream_with_filtering(
context: mgp.TransCtx, messages: mgp.Messages
) -> mgp.Record(query=str, parameters=mgp.Map):
result_queries = []
for i in range(0, messages.total_messages()):
message = messages.message_at(i)
payload_as_str = message.payload().decode("utf-8")
if "a" in payload_as_str:
continue
result_queries.append(
mgp.Record(query=f"Message: {payload_as_str}", parameters={"value": f"Parameter: {payload_as_str}"})
)
if "b" in payload_as_str:
result_queries.append(
mgp.Record(
query=f"Message: extra_{payload_as_str}", parameters={"value": f"Parameter: extra_{payload_as_str}"}
)
)
return result_queries

View File

@ -13,9 +13,7 @@ import mgp
@mgp.transformation
def simple(
context: mgp.TransCtx, messages: mgp.Messages
) -> mgp.Record(query=str, parameters=mgp.Map):
def simple(context: mgp.TransCtx, messages: mgp.Messages) -> mgp.Record(query=str, parameters=mgp.Map):
result_queries = []
@ -32,15 +30,15 @@ def simple(
offset: '{message.offset()}',
topic: '{message.topic_name()}'
}})""",
parameters=None))
parameters=None,
)
)
return result_queries
@mgp.transformation
def with_parameters(
context: mgp.TransCtx, messages: mgp.Messages
) -> mgp.Record(query=str, parameters=mgp.Map):
def with_parameters(context: mgp.TransCtx, messages: mgp.Messages) -> mgp.Record(query=str, parameters=mgp.Map):
result_queries = []
@ -61,7 +59,10 @@ def with_parameters(
"timestamp": message.timestamp(),
"payload": payload_as_str,
"offset": message.offset(),
"topic": message.topic_name()}))
"topic": message.topic_name(),
},
)
)
return result_queries
@ -76,8 +77,6 @@ def query(
message = messages.message_at(i)
assert message.source_type() == mgp.SOURCE_TYPE_KAFKA
payload_as_str = message.payload().decode("utf-8")
result_queries.append(
mgp.Record(query=payload_as_str, parameters=None)
)
result_queries.append(mgp.Record(query=payload_as_str, parameters=None))
return result_queries

View File

@ -13,9 +13,7 @@ import mgp
@mgp.transformation
def simple(context: mgp.TransCtx,
messages: mgp.Messages
) -> mgp.Record(query=str, parameters=mgp.Map):
def simple(context: mgp.TransCtx, messages: mgp.Messages) -> mgp.Record(query=str, parameters=mgp.Map):
result_queries = []
@ -30,15 +28,15 @@ def simple(context: mgp.TransCtx,
payload: '{payload_as_str}',
topic: '{message.topic_name()}'
}})""",
parameters=None))
parameters=None,
)
)
return result_queries
@mgp.transformation
def with_parameters(context: mgp.TransCtx,
messages: mgp.Messages
) -> mgp.Record(query=str, parameters=mgp.Map):
def with_parameters(context: mgp.TransCtx, messages: mgp.Messages) -> mgp.Record(query=str, parameters=mgp.Map):
result_queries = []
@ -53,23 +51,21 @@ def with_parameters(context: mgp.TransCtx,
payload: $payload,
topic: $topic
})""",
parameters={
"payload": payload_as_str,
"topic": message.topic_name()}))
parameters={"payload": payload_as_str, "topic": message.topic_name()},
)
)
return result_queries
@mgp.transformation
def query(messages: mgp.Messages
) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
def query(messages: mgp.Messages) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
result_queries = []
for i in range(0, messages.total_messages()):
message = messages.message_at(i)
assert message.source_type() == mgp.SOURCE_TYPE_PULSAR
payload_as_str = message.payload().decode("utf-8")
result_queries.append(mgp.Record(
query=payload_as_str, parameters=None))
result_queries.append(mgp.Record(query=payload_as_str, parameters=None))
return result_queries