Pulsar e2e tests (#296)
parent b66cc66503
commit c7b045bffc
@ -45,7 +45,8 @@ pulsar_client::Result ConsumeMessage(pulsar_client::Reader &reader, pulsar_clien
template <PulsarConsumer TConsumer>
utils::BasicResult<std::string, std::vector<Message>> GetBatch(TConsumer &consumer, const ConsumerInfo &info,
                                                               std::atomic<bool> &is_running) {
                                                               std::atomic<bool> &is_running,
                                                               const pulsar_client::MessageId &last_message_id) {
  std::vector<Message> batch{};

  const auto batch_size = info.batch_size.value_or(kDefaultBatchSize);
@ -54,14 +55,16 @@ utils::BasicResult<std::string, std::vector<Message>> GetBatch(TConsumer &consum
  auto remaining_timeout_in_ms = info.batch_interval.value_or(kDefaultBatchInterval).count();
  auto start = std::chrono::steady_clock::now();

  for (int64_t i = 0; remaining_timeout_in_ms > 0 && i < batch_size && is_running.load(); ++i) {
  while (remaining_timeout_in_ms > 0 && batch.size() < batch_size && is_running) {
    pulsar_client::Message message;
    const auto result = ConsumeMessage(consumer, message, remaining_timeout_in_ms);
    switch (result) {
      case pulsar_client::Result::ResultTimeout:
        return std::move(batch);
      case pulsar_client::Result::ResultOk:
        batch.emplace_back(Message{std::move(message)});
        if (message.getMessageId() != last_message_id) {
          batch.emplace_back(Message{std::move(message)});
        }
        break;
      default:
        spdlog::warn(fmt::format("Unexpected error while consuming message from consumer {}, error: {}",
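For intuition, here is the shape of the updated batching loop as a minimal Python sketch (illustrative only; the consumer here is hypothetical and returns None on timeout, this is not the Memgraph implementation). The new last_message_id parameter lets the check-time reader skip a re-delivery of the message that was already processed:

import time

def get_batch(consumer, batch_size, batch_interval_ms, is_running, last_message_id):
    batch = []
    deadline = time.monotonic() + batch_interval_ms / 1000.0
    while time.monotonic() < deadline and len(batch) < batch_size and is_running():
        remaining_ms = max(0, int((deadline - time.monotonic()) * 1000))
        message = consumer.receive(remaining_ms)  # hypothetical API: None on timeout
        if message is None:  # a timeout ends the batch early, like ResultTimeout
            break
        if message.message_id() != last_message_id:  # skip the re-read last message
            batch.append(message)
    return batch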
@ -87,15 +90,7 @@ class SpdlogLogger : public pulsar_client::Logger {
};

class SpdlogLoggerFactory : public pulsar_client::LoggerFactory {
  pulsar_client::Logger *getLogger(const std::string & /*file_name*/) override {
    if (!logger_) {
      logger_ = std::make_unique<SpdlogLogger>();
    }
    return logger_.get();
  }

 private:
  std::unique_ptr<SpdlogLogger> logger_;
  pulsar_client::Logger *getLogger(const std::string & /*file_name*/) override { return new SpdlogLogger; }
};

pulsar_client::Client CreateClient(const std::string &service_url) {
@ -111,6 +106,8 @@ std::span<const char> Message::Payload() const {
  return {static_cast<const char *>(message_.getData()), message_.getLength()};
}

std::string_view Message::TopicName() const { return message_.getTopicName(); }

Consumer::Consumer(ConsumerInfo info, ConsumerFunction consumer_function)
    : info_{std::move(info)},
      client_{CreateClient(info_.service_url)},
@ -203,7 +200,7 @@ void Consumer::Check(std::optional<std::chrono::milliseconds> timeout, std::opti
    throw ConsumerCheckFailedException(info_.consumer_name, "Timeout reached");
  }

  auto maybe_batch = GetBatch(reader, info_, is_running_);
  auto maybe_batch = GetBatch(reader, info_, is_running_, last_message_id_);

  if (maybe_batch.HasError()) {
    throw ConsumerCheckFailedException(info_.consumer_name, maybe_batch.GetError());
@ -241,7 +238,7 @@ void Consumer::StartConsuming() {
  utils::ThreadSetName(full_thread_name.substr(0, kMaxThreadNameSize));

  while (is_running_) {
    auto maybe_batch = GetBatch(consumer_, info_, is_running_);
    auto maybe_batch = GetBatch(consumer_, info_, is_running_, last_message_id_);

    if (maybe_batch.HasError()) {
      spdlog::warn("Error happened in consumer {} while fetching messages: {}!", info_.consumer_name,
@ -28,6 +28,7 @@ class Message final {
  explicit Message(pulsar_client::Message &&message);

  std::span<const char> Payload() const;
  std::string_view TopicName() const;

 private:
  pulsar_client::Message message_;
@ -35,6 +35,7 @@
#include "utils/memory.hpp"
#include "utils/string.hpp"
#include "utils/temporal.hpp"
#include "utils/variant_helpers.hpp"

// This file contains implementation of top level C API functions, but this is
// all actually part of query::procedure. So use that namespace for simplicity.
@ -2496,16 +2497,9 @@ mgp_error mgp_message_payload(mgp_message *message, const char **result) {
  return WrapExceptions(
      [message] {
        return std::visit(
            []<typename T>(T &&msg) -> const char * {
              using MessageType = std::decay_t<T>;
              if constexpr (std::same_as<MessageType, mgp_message::KafkaMessage>) {
                return msg->Payload().data();
              } else if constexpr (std::same_as<MessageType, mgp_message::PulsarMessage>) {
                return msg.Payload().data();
              } else {
                throw std::invalid_argument("Invalid source type");
              }
            },
            utils::Overloaded{[](const mgp_message::KafkaMessage &msg) { return msg->Payload().data(); },
                              [](const mgp_message::PulsarMessage &msg) { return msg.Payload().data(); },
                              [](const auto & /*other*/) { throw std::invalid_argument("Invalid source type"); }},
            message->msg);
      },
      result);
@ -2515,16 +2509,9 @@ mgp_error mgp_message_payload_size(mgp_message *message, size_t *result) {
  return WrapExceptions(
      [message] {
        return std::visit(
            []<typename T>(T &&msg) -> size_t {
              using MessageType = std::decay_t<T>;
              if constexpr (std::same_as<MessageType, mgp_message::KafkaMessage>) {
                return msg->Payload().size();
              } else if constexpr (std::same_as<MessageType, mgp_message::PulsarMessage>) {
                return msg.Payload().size();
              } else {
                throw std::invalid_argument("Invalid source type");
              }
            },
            utils::Overloaded{[](const mgp_message::KafkaMessage &msg) { return msg->Payload().size(); },
                              [](const mgp_message::PulsarMessage &msg) { return msg.Payload().size(); },
                              [](const auto & /*other*/) { throw std::invalid_argument("Invalid source type"); }},
            message->msg);
      },
      result);
@ -2534,14 +2521,9 @@ mgp_error mgp_message_topic_name(mgp_message *message, const char **result) {
  return WrapExceptions(
      [message] {
        return std::visit(
            []<typename T>(T &&msg) -> const char * {
              using MessageType = std::decay_t<T>;
              if constexpr (std::same_as<MessageType, mgp_message::KafkaMessage>) {
                return msg->TopicName().data();
              } else {
                throw std::invalid_argument("Invalid source type");
              }
            },
            utils::Overloaded{[](const mgp_message::KafkaMessage &msg) { return msg->TopicName().data(); },
                              [](const mgp_message::PulsarMessage &msg) { return msg.TopicName().data(); },
                              [](const auto & /*other*/) { throw std::invalid_argument("Invalid source type"); }},
            message->msg);
      },
      result);
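The three hunks above swap the generic if constexpr lambda for utils::Overloaded, which assembles an overload set so std::visit dispatches each variant alternative to its own handler. Purely for illustration, the same dispatch-per-type shape in Python using functools.singledispatch (the message classes here are hypothetical stand-ins, not the real mgp_message types):

from functools import singledispatch


class KafkaMessage:  # hypothetical stand-in for mgp_message::KafkaMessage
    def payload(self):
        return b"kafka payload"


class PulsarMessage:  # hypothetical stand-in for mgp_message::PulsarMessage
    def payload(self):
        return b"pulsar payload"


@singledispatch
def message_payload(msg):
    # fallback handler, like the `const auto &` overload above
    raise ValueError("Invalid source type")


@message_payload.register
def _(msg: KafkaMessage):
    return msg.payload()


@message_payload.register
def _(msg: PulsarMessage):
    return msg.payload()


print(message_payload(KafkaMessage()))  # b'kafka payload'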
@ -4,7 +4,8 @@ endfunction()

copy_streams_e2e_python_files(common.py)
copy_streams_e2e_python_files(conftest.py)
copy_streams_e2e_python_files(streams_tests.py)
copy_streams_e2e_python_files(kafka_streams_tests.py)
copy_streams_e2e_python_files(streams_owner_tests.py)
copy_streams_e2e_python_files(pulsar_streams_tests.py)

add_subdirectory(transformations)
@ -12,6 +12,8 @@
import mgclient
import time

from multiprocessing import Process, Value

# These are the indices of the different values in the result of SHOW STREAM
# query
NAME = 0
@ -21,6 +23,13 @@ TRANSFORM = 3
OWNER = 4
IS_RUNNING = 5

# These are the indices of the query and parameters in the result of CHECK
# STREAM query
QUERY = 0
PARAMS = 1

SIMPLE_MSG = b"message"


def execute_and_fetch_all(cursor, query):
    cursor.execute(query)
@ -70,12 +79,12 @@ def check_one_result_row(cursor, query):
    return len(results) == 1


def check_vertex_exists_with_topic_and_payload(cursor, topic, payload_bytes):
def check_vertex_exists_with_properties(cursor, properties):
    properties_string = ', '.join([f'{k}: {v}' for k, v in properties.items()])
    assert check_one_result_row(
        cursor,
        "MATCH (n: MESSAGE {"
        f"payload: '{payload_bytes.decode('utf-8')}',"
        f"topic: '{topic}'"
        f"{properties_string}"
        "}) RETURN n",
    )
@ -119,3 +128,152 @@ def check_stream_info(cursor, stream_name, expected_stream_info):
    assert len(stream_info) == len(expected_stream_info)
    for info, expected_info in zip(stream_info, expected_stream_info):
        assert info == expected_info


def kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, payload_bytes):
    decoded_payload = payload_bytes.decode('utf-8')
    check_vertex_exists_with_properties(
        cursor, {'topic': f'"{topic}"', 'payload': f'"{decoded_payload}"'})


def pulsar_default_namespace_topic(topic):
    return f'persistent://public/default/{topic}'
def test_start_and_stop_during_check(
        operation,
        connection,
        stream_creator,
        message_sender,
        already_stopped_error):
    # This test is quite complex. The goal is to call START/STOP queries
    # while a CHECK query is waiting for its result. Because of the Global
    # Interpreter Lock, running the queries on multiple threads is not
    # useful, as only one of them can call Cursor::execute at a time.
    # Therefore multiple processes are used to execute the queries, because
    # each process has its own GIL.
    # The counter variables are thread- and process-safe variables used to
    # synchronize the different processes. Each value represents a specific
    # phase of the execution of the processes.
    assert operation in ["START", "STOP"]
    cursor = connection.cursor()
    execute_and_fetch_all(
        cursor,
        stream_creator('test_stream')
    )

    check_counter = Value("i", 0)
    check_result_len = Value("i", 0)
    operation_counter = Value("i", 0)

    CHECK_BEFORE_EXECUTE = 1
    CHECK_AFTER_FETCHALL = 2
    CHECK_CORRECT_RESULT = 3
    CHECK_INCORRECT_RESULT = 4

    def call_check(counter, result_len):
        # This process will call the CHECK query and increment the counter
        # based on its progress and expected behavior
        connection = connect()
        cursor = connection.cursor()
        counter.value = CHECK_BEFORE_EXECUTE
        result = execute_and_fetch_all(cursor, "CHECK STREAM test_stream")
        result_len.value = len(result)
        counter.value = CHECK_AFTER_FETCHALL
        if len(result) > 0 and "payload: 'message'" in result[0][QUERY]:
            counter.value = CHECK_CORRECT_RESULT
        else:
            counter.value = CHECK_INCORRECT_RESULT

    OP_BEFORE_EXECUTE = 1
    OP_AFTER_FETCHALL = 2
    OP_ALREADY_STOPPED_EXCEPTION = 3
    OP_INCORRECT_ALREADY_STOPPED_EXCEPTION = 4
    OP_UNEXPECTED_EXCEPTION = 5

    def call_operation(counter):
        # This process will call the query with the specified operation and
        # increment the counter based on its progress and expected behavior
        connection = connect()
        cursor = connection.cursor()
        counter.value = OP_BEFORE_EXECUTE
        try:
            execute_and_fetch_all(cursor, f"{operation} STREAM test_stream")
            counter.value = OP_AFTER_FETCHALL
        except mgclient.DatabaseError as e:
            if already_stopped_error in str(e):
                counter.value = OP_ALREADY_STOPPED_EXCEPTION
            else:
                counter.value = OP_INCORRECT_ALREADY_STOPPED_EXCEPTION
        except Exception:
            counter.value = OP_UNEXPECTED_EXCEPTION

    check_stream_proc = Process(
        target=call_check, daemon=True, args=(check_counter, check_result_len)
    )
    operation_proc = Process(
        target=call_operation, daemon=True, args=(operation_counter,)
    )

    try:
        check_stream_proc.start()

        time.sleep(0.5)

        assert timed_wait(lambda: check_counter.value == CHECK_BEFORE_EXECUTE)
        assert timed_wait(lambda: get_is_running(cursor, "test_stream"))
        assert check_counter.value == CHECK_BEFORE_EXECUTE, (
            "SHOW STREAMS was blocked until the end of CHECK STREAM"
        )
        operation_proc.start()
        assert timed_wait(lambda: operation_counter.value == OP_BEFORE_EXECUTE)

        message_sender(SIMPLE_MSG)
        assert timed_wait(lambda: check_counter.value > CHECK_AFTER_FETCHALL)
        assert check_counter.value == CHECK_CORRECT_RESULT
        assert check_result_len.value == 1
        check_stream_proc.join()

        operation_proc.join()
        if operation == "START":
            assert operation_counter.value == OP_AFTER_FETCHALL
            assert get_is_running(cursor, "test_stream")
        else:
            assert operation_counter.value == OP_ALREADY_STOPPED_EXCEPTION
            assert not get_is_running(cursor, "test_stream")

    finally:
        # to make sure CHECK STREAM finishes
        message_sender(SIMPLE_MSG)
        if check_stream_proc.is_alive():
            check_stream_proc.terminate()
        if operation_proc.is_alive():
            operation_proc.terminate()
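The counters above follow a simple pattern: each worker process advances a shared multiprocessing.Value through numbered phases while the parent polls it. A stripped-down, runnable sketch of that pattern (illustrative only, not part of the test suite):

import time
from multiprocessing import Process, Value


def worker(counter):
    counter.value = 1          # phase 1: before the blocking call
    time.sleep(0.1)            # stands in for the blocking CHECK STREAM query
    counter.value = 2          # phase 2: call finished


if __name__ == "__main__":
    counter = Value("i", 0)    # "i" = shared C int, safe across processes
    proc = Process(target=worker, daemon=True, args=(counter,))
    proc.start()
    while counter.value < 1:   # wait until the worker reaches phase 1
        time.sleep(0.01)
    proc.join()
    assert counter.value == 2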
def test_start_checked_stream_after_timeout(connection, stream_creator):
    cursor = connection.cursor()
    execute_and_fetch_all(
        cursor,
        stream_creator('test_stream')
    )

    timeout_ms = 2000

    def call_check():
        execute_and_fetch_all(
            connect().cursor(),
            f"CHECK STREAM test_stream TIMEOUT {timeout_ms}")

    check_stream_proc = Process(target=call_check, daemon=True)

    start = time.time()
    check_stream_proc.start()
    assert timed_wait(
        lambda: get_is_running(
            cursor, "test_stream"))
    start_stream(cursor, "test_stream")
    end = time.time()

    # time.time() measures seconds, so convert timeout_ms before comparing
    assert (end - start) < 1.3 * timeout_ms / 1000.0, \
        "The START STREAM was blocked too long"
    assert get_is_running(cursor, "test_stream")
    stop_stream(cursor, "test_stream")
@ -13,6 +13,9 @@ import pytest
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

import pulsar
import requests

from common import NAME, connect, execute_and_fetch_all

# To run these tests locally a running Kafka server is necessary. The test tries
@ -33,20 +36,30 @@ def connection():
        execute_and_fetch_all(cursor, f"DROP USER {username}")


def get_topics(num):
    return [f'topic_{i}' for i in range(num)]


@pytest.fixture(scope="function")
def topics():
    admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092", client_id="test")
    # The issue arises if we remove default kafka topics, e.g. "__consumer_offsets"
    previous_topics = [topic for topic in admin_client.list_topics() if topic != "__consumer_offsets"]
def kafka_topics():
    admin_client = KafkaAdminClient(
        bootstrap_servers="localhost:9092",
        client_id="test")
    # The issue arises if we remove default kafka topics, e.g.
    # "__consumer_offsets"
    previous_topics = [
        topic for topic in admin_client.list_topics() if topic != "__consumer_offsets"]
    if previous_topics:
        admin_client.delete_topics(topics=previous_topics, timeout_ms=5000)

    topics = []
    topics = get_topics(3)
    topics_to_create = []
    for index in range(3):
        topic = f"topic_{index}"
        topics.append(topic)
        topics_to_create.append(NewTopic(name=topic, num_partitions=1, replication_factor=1))
    for topic in topics:
        topics_to_create.append(
            NewTopic(
                name=topic,
                num_partitions=1,
                replication_factor=1))

    admin_client.create_topics(new_topics=topics_to_create, timeout_ms=5000)
    yield topics
@ -54,5 +67,19 @@ def topics():


@pytest.fixture(scope="function")
def producer():
def kafka_producer():
    yield KafkaProducer(bootstrap_servers="localhost:9092")


@pytest.fixture(scope="function")
def pulsar_client():
    yield pulsar.Client('pulsar://127.0.0.1:6650')


@pytest.fixture(scope="function")
def pulsar_topics():
    topics = get_topics(3)
    for topic in topics:
        requests.delete(
            f'http://127.0.0.1:6652/admin/v2/persistent/public/default/{topic}?force=true')
    yield topics
@ -18,3 +18,9 @@ services:
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  pulsar:
    image: 'apachepulsar/pulsar:2.8.1'
    ports:
      - '6652:8080'
      - '6650:6650'
    entrypoint: ['bin/pulsar', 'standalone']
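The new service runs Pulsar standalone, exposing the broker on 6650 and mapping the admin REST API (container port 8080) to host port 6652; these are exactly the endpoints the fixtures above use:

import pulsar
import requests

client = pulsar.Client('pulsar://127.0.0.1:6650')  # broker port from the compose file

# Admin REST API through the mapped port; force-delete a test topic.
requests.delete(
    'http://127.0.0.1:6652/admin/v2/persistent/public/default/topic_0?force=true')

client.close()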
422 tests/e2e/streams/kafka_streams_tests.py Executable file
@ -0,0 +1,422 @@
#!/usr/bin/python3

# Copyright 2021 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.

import sys
import pytest
import mgclient
import time
from multiprocessing import Process, Value
import common

TRANSFORMATIONS_TO_CHECK = [
    "kafka_transform.simple",
    "kafka_transform.with_parameters"]


@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
def test_simple(kafka_producer, kafka_topics, connection, transformation):
    assert len(kafka_topics) > 0
    cursor = connection.cursor()
    common.execute_and_fetch_all(
        cursor,
        "CREATE KAFKA STREAM test "
        f"TOPICS {','.join(kafka_topics)} "
        f"TRANSFORM {transformation}",
    )
    common.start_stream(cursor, "test")
    time.sleep(5)

    for topic in kafka_topics:
        kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=60)

    for topic in kafka_topics:
        common.kafka_check_vertex_exists_with_topic_and_payload(
            cursor, topic, common.SIMPLE_MSG)
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
def test_separate_consumers(
        kafka_producer,
        kafka_topics,
        connection,
        transformation):
    assert len(kafka_topics) > 0
    cursor = connection.cursor()

    stream_names = []
    for topic in kafka_topics:
        stream_name = "stream_" + topic
        stream_names.append(stream_name)
        common.execute_and_fetch_all(
            cursor,
            f"CREATE KAFKA STREAM {stream_name} "
            f"TOPICS {topic} "
            f"TRANSFORM {transformation}",
        )

    for stream_name in stream_names:
        common.start_stream(cursor, stream_name)

    time.sleep(5)

    for topic in kafka_topics:
        kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=60)

    for topic in kafka_topics:
        common.kafka_check_vertex_exists_with_topic_and_payload(
            cursor, topic, common.SIMPLE_MSG)
def test_start_from_last_committed_offset(
        kafka_producer, kafka_topics, connection):
    # This test creates a stream, consumes a message to have a committed
    # offset, then destroys the stream. A new message is sent before the
    # stream is recreated and then restarted. This simulates when Memgraph is
    # stopped (stream is destroyed) and then restarted (stream is recreated).
    # This is of course not as good as restarting Memgraph would be, but
    # restarting Memgraph during a single workload cannot be done currently.
    assert len(kafka_topics) > 0
    cursor = connection.cursor()
    common.execute_and_fetch_all(
        cursor, "CREATE KAFKA STREAM test "
        f"TOPICS {kafka_topics[0]} "
        "TRANSFORM kafka_transform.simple", )
    common.start_stream(cursor, "test")
    time.sleep(1)

    kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60)

    common.kafka_check_vertex_exists_with_topic_and_payload(
        cursor, kafka_topics[0], common.SIMPLE_MSG)

    common.stop_stream(cursor, "test")
    common.drop_stream(cursor, "test")

    messages = [b"second message", b"third message"]
    for message in messages:
        kafka_producer.send(kafka_topics[0], message).get(timeout=60)

    for message in messages:
        vertices_with_msg = common.execute_and_fetch_all(
            cursor,
            "MATCH (n: MESSAGE {" f"payload: '{message.decode('utf-8')}'" "}) RETURN n",
        )

        assert len(vertices_with_msg) == 0

    common.execute_and_fetch_all(
        cursor, "CREATE KAFKA STREAM test "
        f"TOPICS {kafka_topics[0]} "
        "TRANSFORM kafka_transform.simple", )
    common.start_stream(cursor, "test")

    for message in messages:
        common.kafka_check_vertex_exists_with_topic_and_payload(
            cursor, kafka_topics[0], message)
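The behavior this test relies on, resuming from the last committed offset when a consumer group reappears, can be observed directly with kafka-python. A minimal sketch (assumes a local broker on localhost:9092; the topic and group names are hypothetical):

from kafka import KafkaConsumer

# A consumer in a named group resumes from its last committed offset.
consumer = KafkaConsumer(
    "topic_0",                      # hypothetical topic name
    bootstrap_servers="localhost:9092",
    group_id="my_group",
    auto_offset_reset="earliest",   # only used when no committed offset exists
    enable_auto_commit=False,
)
for record in consumer:
    print(record.value)
    consumer.commit()               # commit after processing, conceptually what
    break                           # the stream's consumer group does
consumer.close()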
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
def test_check_stream(
        kafka_producer,
        kafka_topics,
        connection,
        transformation):
    assert len(kafka_topics) > 0
    cursor = connection.cursor()
    common.execute_and_fetch_all(
        cursor,
        "CREATE KAFKA STREAM test "
        f"TOPICS {kafka_topics[0]} "
        f"TRANSFORM {transformation} "
        "BATCH_SIZE 1",
    )
    common.start_stream(cursor, "test")
    time.sleep(1)

    kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60)
    common.stop_stream(cursor, "test")

    messages = [b"first message", b"second message", b"third message"]
    for message in messages:
        kafka_producer.send(kafka_topics[0], message).get(timeout=60)

    def check_check_stream(batch_limit):
        assert (
            transformation == "kafka_transform.simple"
            or transformation == "kafka_transform.with_parameters"
        )
        test_results = common.execute_and_fetch_all(
            cursor, f"CHECK STREAM test BATCH_LIMIT {batch_limit}"
        )
        assert len(test_results) == batch_limit

        for i in range(batch_limit):
            message_as_str = messages[i].decode("utf-8")
            if transformation == "kafka_transform.simple":
                assert f"payload: '{message_as_str}'" in test_results[i][common.QUERY]
                assert test_results[i][common.PARAMS] is None
            else:
                assert test_results[i][common.QUERY] == (
                    "CREATE (n:MESSAGE "
                    "{timestamp: $timestamp, "
                    "payload: $payload, "
                    "topic: $topic})"
                )
                parameters = test_results[i][common.PARAMS]
                # this is not a very sophisticated test, but it checks that
                # the timestamp has some kind of value
                assert parameters["timestamp"] > 1000000000000
                assert parameters["topic"] == kafka_topics[0]
                assert parameters["payload"] == message_as_str

    check_check_stream(1)
    check_check_stream(2)
    check_check_stream(3)
    common.start_stream(cursor, "test")

    for message in messages:
        common.kafka_check_vertex_exists_with_topic_and_payload(
            cursor, kafka_topics[0], message)
def test_show_streams(kafka_producer, kafka_topics, connection):
    assert len(kafka_topics) > 1
    cursor = connection.cursor()
    common.execute_and_fetch_all(
        cursor,
        "CREATE KAFKA STREAM default_values "
        f"TOPICS {kafka_topics[0]} "
        f"TRANSFORM kafka_transform.simple "
        f"BOOTSTRAP_SERVERS 'localhost:9092'",
    )

    consumer_group = "my_special_consumer_group"
    batch_interval = 42
    batch_size = 3
    common.execute_and_fetch_all(
        cursor,
        "CREATE KAFKA STREAM complex_values "
        f"TOPICS {','.join(kafka_topics)} "
        f"TRANSFORM kafka_transform.with_parameters "
        f"CONSUMER_GROUP {consumer_group} "
        f"BATCH_INTERVAL {batch_interval} "
        f"BATCH_SIZE {batch_size} ",
    )

    assert len(common.execute_and_fetch_all(cursor, "SHOW STREAMS")) == 2

    common.check_stream_info(
        cursor,
        "default_values",
        ("default_values", None, None, "kafka_transform.simple", None, False),
    )

    common.check_stream_info(
        cursor,
        "complex_values",
        (
            "complex_values",
            batch_interval,
            batch_size,
            "kafka_transform.with_parameters",
            None,
            False,
        ),
    )
@pytest.mark.parametrize("operation", ["START", "STOP"])
def test_start_and_stop_during_check(
        kafka_producer,
        kafka_topics,
        connection,
        operation):
    assert len(kafka_topics) > 1

    def stream_creator(stream_name):
        return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple"

    def message_sender(msg):
        kafka_producer.send(kafka_topics[0], msg).get(timeout=60)

    common.test_start_and_stop_during_check(
        operation,
        connection,
        stream_creator,
        message_sender,
        "Kafka consumer test_stream is already stopped")
def test_check_already_started_stream(kafka_topics, connection):
    assert len(kafka_topics) > 0
    cursor = connection.cursor()

    common.execute_and_fetch_all(
        cursor,
        "CREATE KAFKA STREAM started_stream "
        f"TOPICS {kafka_topics[0]} "
        f"TRANSFORM kafka_transform.simple",
    )
    common.start_stream(cursor, "started_stream")

    with pytest.raises(mgclient.DatabaseError):
        common.execute_and_fetch_all(cursor, "CHECK STREAM started_stream")


def test_start_checked_stream_after_timeout(kafka_topics, connection):
    def stream_creator(stream_name):
        return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple"

    common.test_start_checked_stream_after_timeout(connection, stream_creator)


def test_restart_after_error(kafka_producer, kafka_topics, connection):
    cursor = connection.cursor()
    common.execute_and_fetch_all(
        cursor,
        "CREATE KAFKA STREAM test_stream "
        f"TOPICS {kafka_topics[0]} "
        f"TRANSFORM kafka_transform.query",
    )

    common.start_stream(cursor, "test_stream")
    time.sleep(1)

    kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60)
    assert common.timed_wait(
        lambda: not common.get_is_running(
            cursor, "test_stream"))

    common.start_stream(cursor, "test_stream")
    time.sleep(1)
    kafka_producer.send(kafka_topics[0], b"CREATE (n:VERTEX { id : 42 })")
    assert common.check_one_result_row(
        cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n")
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
def test_bootstrap_server(
        kafka_producer,
        kafka_topics,
        connection,
        transformation):
    assert len(kafka_topics) > 0
    cursor = connection.cursor()
    local = "localhost:9092"
    common.execute_and_fetch_all(
        cursor,
        "CREATE KAFKA STREAM test "
        f"TOPICS {','.join(kafka_topics)} "
        f"TRANSFORM {transformation} "
        f"BOOTSTRAP_SERVERS '{local}'",
    )
    common.start_stream(cursor, "test")
    time.sleep(5)

    for topic in kafka_topics:
        kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=60)

    for topic in kafka_topics:
        common.kafka_check_vertex_exists_with_topic_and_payload(
            cursor, topic, common.SIMPLE_MSG)


@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
def test_bootstrap_server_empty(
        kafka_producer,
        kafka_topics,
        connection,
        transformation):
    assert len(kafka_topics) > 0
    cursor = connection.cursor()
    with pytest.raises(mgclient.DatabaseError):
        common.execute_and_fetch_all(
            cursor,
            "CREATE KAFKA STREAM test "
            f"TOPICS {','.join(kafka_topics)} "
            f"TRANSFORM {transformation} "
            "BOOTSTRAP_SERVERS ''",
        )
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
def test_set_offset(kafka_producer, kafka_topics, connection, transformation):
    assert len(kafka_topics) > 0
    cursor = connection.cursor()
    common.execute_and_fetch_all(
        cursor,
        "CREATE KAFKA STREAM test "
        f"TOPICS {kafka_topics[0]} "
        f"TRANSFORM {transformation} "
        "BATCH_SIZE 1",
    )

    messages = [f"{i} message" for i in range(1, 21)]
    for message in messages:
        kafka_producer.send(kafka_topics[0], message.encode()).get(timeout=60)

    def consume(expected_msgs):
        common.start_stream(cursor, "test")
        if len(expected_msgs) == 0:
            time.sleep(2)
        else:
            assert common.check_one_result_row(
                cursor,
                (
                    f"MATCH (n: MESSAGE {{payload: '{expected_msgs[-1]}'}})"
                    "RETURN n"
                ),
            )
        common.stop_stream(cursor, "test")
        res = common.execute_and_fetch_all(
            cursor, "MATCH (n) RETURN n.payload"
        )
        return res

    def execute_set_offset_and_consume(id, expected_msgs):
        common.execute_and_fetch_all(
            cursor, f"CALL mg.kafka_set_stream_offset('test', {id})"
        )
        return consume(expected_msgs)

    with pytest.raises(mgclient.DatabaseError):
        res = common.execute_and_fetch_all(
            cursor, "CALL mg.kafka_set_stream_offset('foo', 10)"
        )

    def comparison_check(a, b):
        return a == str(b).strip("'(,)")

    res = execute_set_offset_and_consume(10, messages[10:])
    assert len(res) == 10
    assert all([comparison_check(a, b) for a, b in zip(messages[10:], res)])
    common.execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")

    res = execute_set_offset_and_consume(-1, messages)
    assert len(res) == len(messages)
    assert all([comparison_check(a, b) for a, b in zip(messages, res)])
    res = common.execute_and_fetch_all(cursor, "MATCH (n) return n.offset")
    assert all([comparison_check(str(i), res[i]) for i in range(1, 20)])
    res = common.execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")

    res = execute_set_offset_and_consume(-2, [])
    assert len(res) == 0
    last_msg = "Final Message"
    kafka_producer.send(kafka_topics[0], last_msg.encode()).get(timeout=60)
    res = consume([last_msg])
    assert len(res) == 1
    assert comparison_check("Final Message", res[0])
    common.execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")


if __name__ == "__main__":
    sys.exit(pytest.main([__file__, "-rA"]))
373 tests/e2e/streams/pulsar_streams_tests.py Executable file
@ -0,0 +1,373 @@
#!/usr/bin/python3

# Copyright 2021 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.

import sys
import pytest
import mgclient
import time
from multiprocessing import Process, Value
import common

TRANSFORMATIONS_TO_CHECK = [
    "pulsar_transform.simple",
    "pulsar_transform.with_parameters"]


def check_vertex_exists_with_topic_and_payload(cursor, topic, payload_byte):
    decoded_payload = payload_byte.decode('utf-8')
    common.check_vertex_exists_with_properties(
        cursor, {
            'topic': f'"{common.pulsar_default_namespace_topic(topic)}"', 'payload': f'"{decoded_payload}"'})


@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
def test_simple(pulsar_client, pulsar_topics, connection, transformation):
    assert len(pulsar_topics) > 0
    cursor = connection.cursor()
    common.execute_and_fetch_all(
        cursor,
        "CREATE PULSAR STREAM test "
        f"TOPICS '{','.join(pulsar_topics)}' "
        f"TRANSFORM {transformation}",
    )
    common.start_stream(cursor, "test")
    time.sleep(5)

    for topic in pulsar_topics:
        producer = pulsar_client.create_producer(
            common.pulsar_default_namespace_topic(topic),
            send_timeout_millis=60000)
        producer.send(common.SIMPLE_MSG)

    for topic in pulsar_topics:
        check_vertex_exists_with_topic_and_payload(
            cursor, topic, common.SIMPLE_MSG)
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
def test_separate_consumers(
        pulsar_client,
        pulsar_topics,
        connection,
        transformation):
    assert len(pulsar_topics) > 0
    cursor = connection.cursor()

    stream_names = []
    for topic in pulsar_topics:
        stream_name = "stream_" + topic
        stream_names.append(stream_name)
        common.execute_and_fetch_all(
            cursor,
            f"CREATE PULSAR STREAM {stream_name} "
            f"TOPICS {topic} "
            f"TRANSFORM {transformation}",
        )

    for stream_name in stream_names:
        common.start_stream(cursor, stream_name)

    time.sleep(5)

    for topic in pulsar_topics:
        producer = pulsar_client.create_producer(
            topic, send_timeout_millis=60000)
        producer.send(common.SIMPLE_MSG)

    for topic in pulsar_topics:
        check_vertex_exists_with_topic_and_payload(
            cursor, topic, common.SIMPLE_MSG)
def test_start_from_latest_messages(pulsar_client, pulsar_topics, connection):
    # This test creates a stream, consumes a message, then destroys the
    # stream. A new message is sent before the stream is recreated, and
    # additional messages after the stream was recreated. The Pulsar consumer
    # should only receive messages that were sent after the consumer was
    # created; everything in between should be lost. Additionally, we check
    # that the consumer continues from the correct message after stopping
    # and starting again.
    assert len(pulsar_topics) > 0
    cursor = connection.cursor()
    common.execute_and_fetch_all(
        cursor, "CREATE PULSAR STREAM test "
        f"TOPICS {pulsar_topics[0]} "
        "TRANSFORM pulsar_transform.simple", )
    common.start_stream(cursor, "test")
    time.sleep(1)

    def assert_message_not_consumed(message):
        vertices_with_msg = common.execute_and_fetch_all(
            cursor,
            "MATCH (n: MESSAGE {" f"payload: '{message.decode('utf-8')}'" "}) RETURN n",
        )

        assert len(vertices_with_msg) == 0

    producer = pulsar_client.create_producer(
        common.pulsar_default_namespace_topic(
            pulsar_topics[0]), send_timeout_millis=60000)
    producer.send(common.SIMPLE_MSG)

    check_vertex_exists_with_topic_and_payload(
        cursor, pulsar_topics[0], common.SIMPLE_MSG)

    common.stop_stream(cursor, "test")

    next_message = b"NEXT"
    producer.send(next_message)

    assert_message_not_consumed(next_message)

    common.start_stream(cursor, "test")
    check_vertex_exists_with_topic_and_payload(
        cursor, pulsar_topics[0], next_message)
    common.stop_stream(cursor, "test")

    common.drop_stream(cursor, "test")

    lost_message = b"LOST"
    valid_messages = [b"second message", b"third message"]

    producer.send(lost_message)

    assert_message_not_consumed(lost_message)

    common.execute_and_fetch_all(
        cursor, "CREATE PULSAR STREAM test "
        f"TOPICS {pulsar_topics[0]} "
        "TRANSFORM pulsar_transform.simple", )

    for message in valid_messages:
        producer.send(message)
        assert_message_not_consumed(message)

    common.start_stream(cursor, "test")

    assert_message_not_consumed(lost_message)

    for message in valid_messages:
        check_vertex_exists_with_topic_and_payload(
            cursor, pulsar_topics[0], message)
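The "start from latest" semantics exercised above match creating a Pulsar reader at MessageId.latest: anything published before the reader exists is never delivered to it. A minimal sketch with the Python client (assumes the standalone broker from the compose file and a hypothetical topic):

import pulsar

client = pulsar.Client('pulsar://127.0.0.1:6650')
producer = client.create_producer('persistent://public/default/topic_0')

producer.send(b'LOST')  # published before the reader exists: never delivered

# A reader positioned at MessageId.latest only sees messages sent afterwards.
reader = client.create_reader('persistent://public/default/topic_0',
                              pulsar.MessageId.latest)
producer.send(b'SEEN')
msg = reader.read_next(timeout_millis=10000)
assert msg.data() == b'SEEN'

client.close()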
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
def test_check_stream(
        pulsar_client,
        pulsar_topics,
        connection,
        transformation):
    assert len(pulsar_topics) > 0
    cursor = connection.cursor()
    common.execute_and_fetch_all(
        cursor,
        "CREATE PULSAR STREAM test "
        f"TOPICS {pulsar_topics[0]} "
        f"TRANSFORM {transformation} "
        "BATCH_SIZE 1",
    )
    common.start_stream(cursor, "test")
    time.sleep(1)

    producer = pulsar_client.create_producer(
        common.pulsar_default_namespace_topic(
            pulsar_topics[0]), send_timeout_millis=60000)
    producer.send(common.SIMPLE_MSG)
    check_vertex_exists_with_topic_and_payload(
        cursor, pulsar_topics[0], common.SIMPLE_MSG)
    common.stop_stream(cursor, "test")

    messages = [b"first message", b"second message", b"third message"]
    for message in messages:
        producer.send(message)

    def check_check_stream(batch_limit):
        assert (
            transformation == "pulsar_transform.simple"
            or transformation == "pulsar_transform.with_parameters"
        )
        test_results = common.execute_and_fetch_all(
            cursor, f"CHECK STREAM test BATCH_LIMIT {batch_limit}"
        )
        assert len(test_results) == batch_limit

        for i in range(batch_limit):
            message_as_str = messages[i].decode("utf-8")
            if transformation == "pulsar_transform.simple":
                assert f"payload: '{message_as_str}'" in test_results[i][common.QUERY]
                assert test_results[i][common.PARAMS] is None
            else:
                assert test_results[i][common.QUERY] == (
                    "CREATE (n:MESSAGE "
                    "{payload: $payload, "
                    "topic: $topic})"
                )
                parameters = test_results[i][common.PARAMS]
                assert parameters["topic"] == common.pulsar_default_namespace_topic(
                    pulsar_topics[0])
                assert parameters["payload"] == message_as_str

    check_check_stream(1)
    check_check_stream(2)
    check_check_stream(3)
    common.start_stream(cursor, "test")

    for message in messages:
        check_vertex_exists_with_topic_and_payload(
            cursor, pulsar_topics[0], message)
def test_show_streams(pulsar_client, pulsar_topics, connection):
    assert len(pulsar_topics) > 1
    cursor = connection.cursor()
    common.execute_and_fetch_all(
        cursor,
        "CREATE PULSAR STREAM default_values "
        f"TOPICS {pulsar_topics[0]} "
        f"TRANSFORM pulsar_transform.simple ",
    )

    batch_interval = 42
    batch_size = 3
    common.execute_and_fetch_all(
        cursor,
        "CREATE PULSAR STREAM complex_values "
        f"TOPICS {','.join(pulsar_topics)} "
        f"TRANSFORM pulsar_transform.with_parameters "
        f"BATCH_INTERVAL {batch_interval} "
        f"BATCH_SIZE {batch_size} ",
    )

    assert len(common.execute_and_fetch_all(cursor, "SHOW STREAMS")) == 2

    common.check_stream_info(
        cursor,
        "default_values",
        ("default_values", None, None, "pulsar_transform.simple", None, False),
    )

    common.check_stream_info(
        cursor,
        "complex_values",
        (
            "complex_values",
            batch_interval,
            batch_size,
            "pulsar_transform.with_parameters",
            None,
            False,
        ),
    )
@pytest.mark.parametrize("operation", ["START", "STOP"])
def test_start_and_stop_during_check(
        pulsar_client,
        pulsar_topics,
        connection,
        operation):
    assert len(pulsar_topics) > 1

    def stream_creator(stream_name):
        return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple"

    producer = pulsar_client.create_producer(
        common.pulsar_default_namespace_topic(
            pulsar_topics[0]), send_timeout_millis=60000)

    def message_sender(msg):
        producer.send(msg)

    common.test_start_and_stop_during_check(
        operation,
        connection,
        stream_creator,
        message_sender,
        "Pulsar consumer test_stream is already stopped")


def test_check_already_started_stream(pulsar_topics, connection):
    assert len(pulsar_topics) > 0
    cursor = connection.cursor()

    common.execute_and_fetch_all(
        cursor,
        "CREATE PULSAR STREAM started_stream "
        f"TOPICS {pulsar_topics[0]} "
        f"TRANSFORM pulsar_transform.simple",
    )
    common.start_stream(cursor, "started_stream")

    with pytest.raises(mgclient.DatabaseError):
        common.execute_and_fetch_all(cursor, "CHECK STREAM started_stream")


def test_start_checked_stream_after_timeout(pulsar_topics, connection):
    def stream_creator(stream_name):
        return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple"
    common.test_start_checked_stream_after_timeout(connection, stream_creator)
def test_restart_after_error(pulsar_client, pulsar_topics, connection):
    cursor = connection.cursor()
    common.execute_and_fetch_all(
        cursor,
        "CREATE PULSAR STREAM test_stream "
        f"TOPICS {pulsar_topics[0]} "
        f"TRANSFORM pulsar_transform.query",
    )

    common.start_stream(cursor, "test_stream")
    time.sleep(1)

    producer = pulsar_client.create_producer(
        common.pulsar_default_namespace_topic(
            pulsar_topics[0]), send_timeout_millis=60000)
    producer.send(common.SIMPLE_MSG)
    assert common.timed_wait(
        lambda: not common.get_is_running(
            cursor, "test_stream"))

    common.start_stream(cursor, "test_stream")
    time.sleep(1)
    producer.send(b"CREATE (n:VERTEX { id : 42 })")
    assert common.check_one_result_row(
        cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n")


@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
def test_service_url(pulsar_client, pulsar_topics, connection, transformation):
    assert len(pulsar_topics) > 0
    cursor = connection.cursor()
    local = "pulsar://127.0.0.1:6650"
    common.execute_and_fetch_all(
        cursor,
        "CREATE PULSAR STREAM test "
        f"TOPICS {','.join(pulsar_topics)} "
        f"TRANSFORM {transformation} "
        f"SERVICE_URL '{local}'",
    )
    common.start_stream(cursor, "test")
    time.sleep(5)

    for topic in pulsar_topics:
        producer = pulsar_client.create_producer(
            common.pulsar_default_namespace_topic(topic),
            send_timeout_millis=60000)
        producer.send(common.SIMPLE_MSG)

    for topic in pulsar_topics:
        check_vertex_exists_with_topic_and_payload(
            cursor, topic, common.SIMPLE_MSG)


if __name__ == "__main__":
    sys.exit(pytest.main([__file__, "-rA"]))
@ -32,20 +32,20 @@ def create_stream_user(cursor, stream_user):
        cursor, f"GRANT STREAM TO {stream_user}")


def test_ownerless_stream(producer, topics, connection):
    assert len(topics) > 0
def test_ownerless_stream(kafka_producer, kafka_topics, connection):
    assert len(kafka_topics) > 0
    userless_cursor = connection.cursor()
    common.execute_and_fetch_all(userless_cursor,
                                 "CREATE KAFKA STREAM ownerless "
                                 f"TOPICS {topics[0]} "
                                 f"TRANSFORM transform.simple")
                                 f"TOPICS {kafka_topics[0]} "
                                 f"TRANSFORM kafka_transform.simple")
    common.start_stream(userless_cursor, "ownerless")
    time.sleep(1)

    admin_user = "admin_user"
    create_admin_user(userless_cursor, admin_user)

    producer.send(topics[0], b"first message").get(timeout=60)
    kafka_producer.send(kafka_topics[0], b"first message").get(timeout=60)
    assert common.timed_wait(
        lambda: not common.get_is_running(userless_cursor, "ownerless"))
@ -57,16 +57,16 @@ def test_ownerless_stream(producer, topics, connection):
    time.sleep(1)

    second_message = b"second message"
    producer.send(topics[0], second_message).get(timeout=60)
    common.check_vertex_exists_with_topic_and_payload(
        userless_cursor, topics[0], second_message)
    kafka_producer.send(kafka_topics[0], second_message).get(timeout=60)
    common.kafka_check_vertex_exists_with_topic_and_payload(
        userless_cursor, kafka_topics[0], second_message)

    assert len(common.execute_and_fetch_all(
        userless_cursor, "MATCH (n) RETURN n")) == 1


def test_owner_is_shown(topics, connection):
    assert len(topics) > 0
def test_owner_is_shown(kafka_topics, connection):
    assert len(kafka_topics) > 0
    userless_cursor = connection.cursor()

    stream_user = "stream_user"
@ -74,15 +74,15 @@ def test_owner_is_shown(topics, connection):
    stream_cursor = get_cursor_with_user(stream_user)

    common.execute_and_fetch_all(stream_cursor, "CREATE KAFKA STREAM test "
                                 f"TOPICS {topics[0]} "
                                 f"TRANSFORM transform.simple")
                                 f"TOPICS {kafka_topics[0]} "
                                 f"TRANSFORM kafka_transform.simple")

    common.check_stream_info(userless_cursor, "test", ("test", None, None,
                             "transform.simple", stream_user, False))
                             "kafka_transform.simple", stream_user, False))
def test_insufficient_privileges(producer, topics, connection):
    assert len(topics) > 0
def test_insufficient_privileges(kafka_producer, kafka_topics, connection):
    assert len(kafka_topics) > 0
    userless_cursor = connection.cursor()

    admin_user = "admin_user"
@ -95,15 +95,15 @@ def test_insufficient_privileges(producer, topics, connection):

    common.execute_and_fetch_all(stream_cursor,
                                 "CREATE KAFKA STREAM insufficient_test "
                                 f"TOPICS {topics[0]} "
                                 f"TRANSFORM transform.simple")
                                 f"TOPICS {kafka_topics[0]} "
                                 f"TRANSFORM kafka_transform.simple")

    # the stream is started by admin, but should check against the owner
    # privileges
    common.start_stream(admin_cursor, "insufficient_test")
    time.sleep(1)

    producer.send(topics[0], b"first message").get(timeout=60)
    kafka_producer.send(kafka_topics[0], b"first message").get(timeout=60)
    assert common.timed_wait(
        lambda: not common.get_is_running(userless_cursor, "insufficient_test"))

@ -116,16 +116,16 @@ def test_insufficient_privileges(producer, topics, connection):
    time.sleep(1)

    second_message = b"second message"
    producer.send(topics[0], second_message).get(timeout=60)
    common.check_vertex_exists_with_topic_and_payload(
        userless_cursor, topics[0], second_message)
    kafka_producer.send(kafka_topics[0], second_message).get(timeout=60)
    common.kafka_check_vertex_exists_with_topic_and_payload(
        userless_cursor, kafka_topics[0], second_message)

    assert len(common.execute_and_fetch_all(
        userless_cursor, "MATCH (n) RETURN n")) == 1


def test_happy_case(producer, topics, connection):
    assert len(topics) > 0
def test_happy_case(kafka_producer, kafka_topics, connection):
    assert len(kafka_topics) > 0
    userless_cursor = connection.cursor()

    admin_user = "admin_user"
@ -140,17 +140,17 @@ def test_happy_case(producer, topics, connection):

    common.execute_and_fetch_all(stream_cursor,
                                 "CREATE KAFKA STREAM insufficient_test "
                                 f"TOPICS {topics[0]} "
                                 f"TRANSFORM transform.simple")
                                 f"TOPICS {kafka_topics[0]} "
                                 f"TRANSFORM kafka_transform.simple")

    common.start_stream(stream_cursor, "insufficient_test")
    time.sleep(1)

    first_message = b"first message"
    producer.send(topics[0], first_message).get(timeout=60)
    kafka_producer.send(kafka_topics[0], first_message).get(timeout=60)

    common.check_vertex_exists_with_topic_and_payload(
        userless_cursor, topics[0], first_message)
    common.kafka_check_vertex_exists_with_topic_and_payload(
        userless_cursor, kafka_topics[0], first_message)

    assert len(common.execute_and_fetch_all(
        userless_cursor, "MATCH (n) RETURN n")) == 1
@ -1 +1,2 @@
copy_streams_e2e_python_files(transform.py)
copy_streams_e2e_python_files(kafka_transform.py)
copy_streams_e2e_python_files(pulsar_transform.py)
61 tests/e2e/streams/transformations/pulsar_transform.py Normal file
@ -0,0 +1,61 @@
# Copyright 2021 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.

import mgp


@mgp.transformation
def simple(context: mgp.TransCtx,
           messages: mgp.Messages
           ) -> mgp.Record(query=str, parameters=mgp.Map):

    result_queries = []

    for i in range(0, messages.total_messages()):
        message = messages.message_at(i)
        payload_as_str = message.payload().decode("utf-8")
        result_queries.append(mgp.Record(
            query=f"CREATE (n:MESSAGE {{payload: '{payload_as_str}', topic: '{message.topic_name()}'}})",
            parameters=None))

    return result_queries


@mgp.transformation
def with_parameters(context: mgp.TransCtx,
                    messages: mgp.Messages
                    ) -> mgp.Record(query=str, parameters=mgp.Map):

    result_queries = []

    for i in range(0, messages.total_messages()):
        message = messages.message_at(i)
        payload_as_str = message.payload().decode("utf-8")
        result_queries.append(mgp.Record(
            query="CREATE (n:MESSAGE {payload: $payload, topic: $topic})",
            parameters={"payload": payload_as_str, "topic": message.topic_name()}))

    return result_queries


@mgp.transformation
def query(messages: mgp.Messages
          ) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
    result_queries = []

    for i in range(0, messages.total_messages()):
        message = messages.message_at(i)
        payload_as_str = message.payload().decode("utf-8")
        result_queries.append(mgp.Record(
            query=payload_as_str, parameters=None))

    return result_queries
@ -1,19 +1,24 @@
template_cluster: &template_cluster
  cluster:
    main:
      args: ["--bolt-port", "7687", "--log-level=TRACE", "--kafka-bootstrap-servers=localhost:9092", "--query-execution-timeout-sec=0"]
      args: ["--bolt-port", "7687", "--log-level=DEBUG", "--kafka-bootstrap-servers=localhost:9092", "--query-execution-timeout-sec=0", "--pulsar-service-url=pulsar://127.0.0.1:6650"]
      log_file: "streams-e2e.log"
      setup_queries: []
      validation_queries: []

workloads:
  - name: "Streams start, stop and show"
  - name: "Kafka streams start, stop and show"
    binary: "tests/e2e/pytest_runner.sh"
    proc: "tests/e2e/streams/transformations/"
    args: ["streams/streams_tests.py"]
    args: ["streams/kafka_streams_tests.py"]
    <<: *template_cluster
  - name: "Streams with users"
    binary: "tests/e2e/pytest_runner.sh"
    proc: "tests/e2e/streams/transformations/"
    args: ["streams/streams_owner_tests.py"]
    <<: *template_cluster
  - name: "Pulsar streams start, stop and show"
    binary: "tests/e2e/pytest_runner.sh"
    proc: "tests/e2e/streams/transformations/"
    args: ["streams/pulsar_streams_tests.py"]
    <<: *template_cluster
@ -8,6 +8,7 @@ PIP_DEPS=(
    "behave==1.2.6"
    "ldap3==2.6"
    "kafka-python==2.0.2"
    "requests==2.25.1"
    "neo4j-driver==4.1.1"
    "parse==1.18.0"
    "parse-type==0.5.2"
@ -28,6 +29,19 @@ set +u
source "ve3/bin/activate"
set -u

# https://docs.python.org/3/library/sys.html#sys.version_info
PYTHON_MINOR=$(python3 -c 'import sys; print(sys.version_info[1])')

# install pulsar-client
# NOTE (2021-11-15): PyPI doesn't contain pulsar-client for Python 3.9, so we have to use
# our manually built wheel file. When they update the repository, pulsar-client can be
# added as a regular PIP dependency
if [ $PYTHON_MINOR -lt 9 ]; then
    pip --timeout 1000 install "pulsar-client==2.8.1"
else
    pip --timeout 1000 install https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pulsar_client-2.8.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl
fi

for pkg in "${PIP_DEPS[@]}"; do
    pip --timeout 1000 install "$pkg"
done