Fix Kafka's NoBrokersAvailableInfo issue (#1578)

This commit is contained in:
Andi 2023-12-20 20:03:06 +01:00 committed by GitHub
parent 4ef86efb6f
commit f11b3c6d9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 204 additions and 219 deletions

View File

@ -1,12 +1,11 @@
 import time

-def mg_sleep_and_assert(expected_value, function_to_retrieve_data, max_duration=20, time_between_attempt=0.05):
+def mg_sleep_and_assert(expected_value, function_to_retrieve_data, max_duration=20, time_between_attempt=0.2):
     result = function_to_retrieve_data()
     start_time = time.time()
     while result != expected_value:
-        current_time = time.time()
-        duration = current_time - start_time
+        duration = time.time() - start_time
         if duration > max_duration:
             assert (
                 False
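For reference, a minimal sketch of the complete polling helper as it could look after this change; the retry tail, the assertion message, and the return value are assumptions, since the hunk above is truncated:

import time


def mg_sleep_and_assert(expected_value, function_to_retrieve_data, max_duration=20, time_between_attempt=0.2):
    # Poll the data source until it returns the expected value or the deadline expires.
    result = function_to_retrieve_data()
    start_time = time.time()
    while result != expected_value:
        duration = time.time() - start_time
        if duration > max_duration:
            # Assumed failure message; the original assert body is cut off in the diff.
            assert False, f"Expected {expected_value}, last value was {result} after {max_duration}s"
        time.sleep(time_between_attempt)
        result = function_to_retrieve_data()
    return result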

View File

@ -9,12 +9,12 @@
 # by the Apache License, Version 2.0, included in the file
 # licenses/APL.txt.
+import time
+from multiprocessing import Manager, Process, Value
 import mgclient
 import pytest
-import time
 from mg_utils import mg_sleep_and_assert
-from multiprocessing import Manager, Process, Value

 # These are the indices of the different values in the result of SHOW STREAM
 # query
@ -103,15 +103,53 @@ def get_stream_info(cursor, stream_name):
 def get_is_running(cursor, stream_name):
     stream_info = get_stream_info(cursor, stream_name)
-    assert stream_info is not None
+    assert stream_info
     return stream_info[IS_RUNNING]

-def start_stream(cursor, stream_name):
-    execute_and_fetch_all(cursor, f"START STREAM {stream_name}")
+def create_stream(
+    cursor,
+    stream_name,
+    topics,
+    transformation,
+    consumer_group=None,
+    batch_interval=None,
+    batch_size=None,
+    bootstrap_servers=None,
+    configs=None,
+    credentials=None,
+):
+    query_str = f"CREATE KAFKA STREAM {stream_name} TOPICS {topics} TRANSFORM {transformation}"
+    if consumer_group is not None:
+        query_str += f" CONSUMER_GROUP {consumer_group}"
+    if batch_interval is not None:
+        query_str += f" BATCH_INTERVAL {batch_interval}"
+    if batch_size is not None:
+        query_str += f" BATCH_SIZE {batch_size}"
+    if bootstrap_servers is not None:
+        query_str += f" BOOTSTRAP_SERVERS {bootstrap_servers}"
+    if configs is not None:
+        query_str += f" CONFIGS {configs}"
+    if credentials is not None:
+        query_str += f" CREDENTIALS {credentials}"
+    execute_and_fetch_all(cursor, query_str)

+def start_stream(cursor, stream_name, sleep=True):
+    # Sleep is needed because although is_running returns True,
+    # the stream cannot accept messages yet
+    execute_and_fetch_all(cursor, f"START STREAM {stream_name}")
     assert get_is_running(cursor, stream_name)
+    if sleep:
+        time.sleep(5)

+def start_streams(cursor, stream_names):
+    # Start every stream but don't sleep after each creation
+    for stream_name in stream_names:
+        execute_and_fetch_all(cursor, f"START STREAM {stream_name}")
+        assert get_is_running(cursor, stream_name)
+    time.sleep(5)
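A hypothetical call site for the new helpers (the stream name and the cursor/kafka_topics fixtures are assumed here, mirroring the tests further down): create_stream assembles the CREATE KAFKA STREAM query from keyword arguments, and start_stream now sleeps so the stream can actually accept messages before the test produces any.

# Hypothetical usage; cursor and kafka_topics come from the test fixtures shown later in this commit.
create_stream(
    cursor,
    "example_stream",
    ",".join(kafka_topics),
    "kafka_transform.simple",
    batch_size=1,
    bootstrap_servers="'localhost:29092'",
)
start_stream(cursor, "example_stream")  # sleeps ~5s by default so the consumer is ready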
 def start_stream_with_limit(cursor, stream_name, batch_limit, timeout=None):
@ -123,13 +161,11 @@ def start_stream_with_limit(cursor, stream_name, batch_limit, timeout=None):
def stop_stream(cursor, stream_name): def stop_stream(cursor, stream_name):
execute_and_fetch_all(cursor, f"STOP STREAM {stream_name}") execute_and_fetch_all(cursor, f"STOP STREAM {stream_name}")
assert not get_is_running(cursor, stream_name) assert not get_is_running(cursor, stream_name)
def drop_stream(cursor, stream_name): def drop_stream(cursor, stream_name):
execute_and_fetch_all(cursor, f"DROP STREAM {stream_name}") execute_and_fetch_all(cursor, f"DROP STREAM {stream_name}")
assert get_stream_info(cursor, stream_name) is None assert get_stream_info(cursor, stream_name) is None
@ -227,7 +263,7 @@ def test_start_and_stop_during_check(
     try:
         check_stream_proc.start()
-        time.sleep(0.5)
+        time.sleep(3)
         assert timed_wait(lambda: check_counter.value == CHECK_BEFORE_EXECUTE)
         assert timed_wait(lambda: get_is_running(cursor, "test_stream"))
@ -260,25 +296,22 @@ def test_start_and_stop_during_check(
 def test_start_checked_stream_after_timeout(connection, stream_creator):
     cursor = connection.cursor()
-    execute_and_fetch_all(cursor, stream_creator("test_stream"))
+    stream_name = "test_start_checked_stream_after_timeout"
+    execute_and_fetch_all(cursor, stream_creator(stream_name))

-    TIMEOUT_IN_MS = 2000
-    TIMEOUT_IN_SECONDS = TIMEOUT_IN_MS / 1000
+    timeout_in_ms = 2000

     def call_check():
-        execute_and_fetch_all(connect().cursor(), f"CHECK STREAM test_stream TIMEOUT {TIMEOUT_IN_MS}")
+        execute_and_fetch_all(connect().cursor(), f"CHECK STREAM {stream_name} TIMEOUT {timeout_in_ms}")

     check_stream_proc = Process(target=call_check, daemon=True)

-    start = time.time()
     check_stream_proc.start()
-    assert timed_wait(lambda: get_is_running(cursor, "test_stream"))
-    start_stream(cursor, "test_stream")
-    end = time.time()
-    assert (end - start) < 1.3 * TIMEOUT_IN_SECONDS, "The START STREAM was blocked too long"
-    assert get_is_running(cursor, "test_stream")
-    stop_stream(cursor, "test_stream")
+    assert timed_wait(lambda: get_is_running(cursor, stream_name))
+    start_stream(cursor, stream_name)
+    assert get_is_running(cursor, stream_name)
+    stop_stream(cursor, stream_name)
def test_check_stream_same_number_of_queries_than_messages(connection, stream_creator, message_sender): def test_check_stream_same_number_of_queries_than_messages(connection, stream_creator, message_sender):
@ -313,7 +346,6 @@ def test_check_stream_same_number_of_queries_than_messages(connection, stream_cr
# # {parameters: {"value": "Parameter: 04"}, query: "Message: 04"}] # # {parameters: {"value": "Parameter: 04"}, query: "Message: 04"}]
# # -Batch 3: [{parameters: {"value": "Parameter: 05"}, query: "Message: 05"}, # # -Batch 3: [{parameters: {"value": "Parameter: 05"}, query: "Message: 05"},
# # {parameters: {"value": "Parameter: 06"}, query: "Message: 06"}] # # {parameters: {"value": "Parameter: 06"}, query: "Message: 06"}]
assert len(test_results.value) == BATCH_LIMIT assert len(test_results.value) == BATCH_LIMIT
expected_queries_and_raw_messages_1 = ( expected_queries_and_raw_messages_1 = (
@ -351,7 +383,7 @@ def test_check_stream_different_number_of_queries_than_messages(connection, stre
     STREAM_NAME = "test_stream"
     cursor = connection.cursor()
     execute_and_fetch_all(cursor, stream_creator(STREAM_NAME, BATCH_SIZE))
-    time.sleep(2)
+    time.sleep(3)

     results = Manager().Namespace()
@ -413,30 +445,32 @@ def test_check_stream_different_number_of_queries_than_messages(connection, stre
assert expected_queries_and_raw_messages_3 == results.value[2] assert expected_queries_and_raw_messages_3 == results.value[2]
-def test_start_stream_with_batch_limit(connection, stream_creator, messages_sender):
-    STREAM_NAME = "test"
+def test_start_stream_with_batch_limit(connection, stream_name, stream_creator, messages_sender):
     BATCH_LIMIT = 5
+    TIMEOUT = 10000

     cursor = connection.cursor()
-    execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
+    execute_and_fetch_all(cursor, stream_creator())
+    results = execute_and_fetch_all(connection.cursor(), "SHOW STREAMS")
+    assert len(results) == 1

-    def start_new_stream_with_limit(stream_name, batch_limit):
+    def start_new_stream_with_limit():
         connection = connect()
         cursor = connection.cursor()
-        start_stream_with_limit(cursor, stream_name, batch_limit)
+        start_stream_with_limit(cursor, stream_name, BATCH_LIMIT, TIMEOUT)

-    thread_stream_running = Process(target=start_new_stream_with_limit, daemon=True, args=(STREAM_NAME, BATCH_LIMIT))
-    thread_stream_running.start()

     def is_running():
-        return get_is_running(cursor, STREAM_NAME)
+        return get_is_running(cursor, stream_name)

+    thread_stream_running = Process(target=start_new_stream_with_limit)
+    thread_stream_running.start()
+    execute_and_fetch_all(connection.cursor(), "SHOW STREAMS")

     assert mg_sleep_and_assert(True, is_running)
     messages_sender(BATCH_LIMIT - 1)

     # We have not sent enough batches to reach the limit. We check that the stream is still correctly running.
-    assert get_is_running(cursor, STREAM_NAME)
+    assert get_is_running(cursor, stream_name)

     # We send a last message to reach the batch_limit
     messages_sender(1)
@ -549,7 +583,6 @@ def test_check_while_stream_with_batch_limit_running(connection, stream_creator,
     STREAM_NAME = "test_batch_limit_and_check"
     BATCH_LIMIT = 1
     TIMEOUT = 10000
-    TIMEOUT_IN_SECONDS = TIMEOUT / 1000

     cursor = connection.cursor()
     execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
@ -625,7 +658,7 @@ def test_check_stream_with_batch_limit_with_invalid_batch_limit(connection, stre
     cursor = connection.cursor()
     execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
-    time.sleep(2)
+    time.sleep(3)

     # 1/ checking with batch_limit=-10
     batch_limit = -10

View File

@ -9,15 +9,14 @@
 # by the Apache License, Version 2.0, included in the file
 # licenses/APL.txt.
+import pulsar
 import pytest
+from common import NAME, PULSAR_SERVICE_URL, connect, execute_and_fetch_all
 from kafka import KafkaProducer
 from kafka.admin import KafkaAdminClient, NewTopic
-import pulsar
 import requests
-from common import NAME, connect, execute_and_fetch_all, PULSAR_SERVICE_URL

 # To run these tests locally, a running Kafka server is necessary. The test tries
 # to connect on localhost:9092.
@ -37,29 +36,22 @@ def connection():
 def get_topics(num):
-    return [f'topic_{i}' for i in range(num)]
+    return [f"topic_{i}" for i in range(num)]

 @pytest.fixture(scope="function")
 def kafka_topics():
-    admin_client = KafkaAdminClient(
-        bootstrap_servers="localhost:9092",
-        client_id="test")
+    admin_client = KafkaAdminClient(bootstrap_servers="localhost:29092", client_id="test")
     # The issue arises if we remove default kafka topics, e.g.
     # "__consumer_offsets"
-    previous_topics = [
-        topic for topic in admin_client.list_topics() if topic != "__consumer_offsets"]
+    previous_topics = [topic for topic in admin_client.list_topics() if topic != "__consumer_offsets"]
     if previous_topics:
         admin_client.delete_topics(topics=previous_topics, timeout_ms=5000)

     topics = get_topics(3)
     topics_to_create = []
     for topic in topics:
-        topics_to_create.append(
-            NewTopic(
-                name=topic,
-                num_partitions=1,
-                replication_factor=1))
+        topics_to_create.append(NewTopic(name=topic, num_partitions=1, replication_factor=1))

     admin_client.create_topics(new_topics=topics_to_create, timeout_ms=5000)
     yield topics
@ -68,7 +60,7 @@ def kafka_topics():
 @pytest.fixture(scope="function")
 def kafka_producer():
-    yield KafkaProducer(bootstrap_servers="localhost:9092")
+    yield KafkaProducer(bootstrap_servers=["localhost:29092"], api_version_auto_timeout_ms=10000)

 @pytest.fixture(scope="function")
@ -80,6 +72,5 @@ def pulsar_client():
 def pulsar_topics():
     topics = get_topics(3)
     for topic in topics:
-        requests.delete(
-            f'http://127.0.0.1:6652/admin/v2/persistent/public/default/{topic}?force=true')
+        requests.delete(f"http://localhost:6652/admin/v2/persistent/public/default/{topic}?force=true")
     yield topics
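The fixtures now point at localhost:29092 and give kafka-python a longer API-version probe, which addresses the broker-availability failure this commit targets (kafka-python raises NoBrokersAvailable when it cannot reach a broker). If the broker were still slow to come up, a readiness wait along these lines could be added; the helper name and retry policy below are assumptions, not part of this commit:

import time

from kafka.admin import KafkaAdminClient
from kafka.errors import NoBrokersAvailable


def wait_for_kafka(bootstrap_servers="localhost:29092", max_duration=30, pause=1.0):
    # Retry until the broker answers, so fixtures don't fail with NoBrokersAvailable
    # while the container is still starting up.
    deadline = time.time() + max_duration
    while True:
        try:
            KafkaAdminClient(bootstrap_servers=bootstrap_servers, client_id="readiness_probe").close()
            return
        except NoBrokersAvailable:
            if time.time() > deadline:
                raise
            time.sleep(pause)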

View File

@ -1,20 +0,0 @@
version: '3.7'
services:
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper

View File

@ -12,31 +12,30 @@
 # licenses/APL.txt.
 import sys
-import pytest
-import mgclient
 import time
-from mg_utils import mg_sleep_and_assert
-from multiprocessing import Process, Value
+from multiprocessing import Process

 import common
+import mgclient
+import pytest
+from mg_utils import mg_sleep_and_assert

 TRANSFORMATIONS_TO_CHECK_C = ["c_transformations.empty_transformation"]
 TRANSFORMATIONS_TO_CHECK_PY = ["kafka_transform.simple", "kafka_transform.with_parameters"]
+KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT = 60
 @pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_PY)
 def test_simple(kafka_producer, kafka_topics, connection, transformation):
     assert len(kafka_topics) > 0
+    stream_name = "test_simple_" + transformation.split(".")[1]
     cursor = connection.cursor()
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE KAFKA STREAM test TOPICS {','.join(kafka_topics)} TRANSFORM {transformation}",
-    )
-    common.start_stream(cursor, "test")
-    time.sleep(5)
+    common.create_stream(cursor, stream_name, ",".join(kafka_topics), transformation)
+    common.start_stream(cursor, stream_name)

     for topic in kafka_topics:
-        kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=60)
+        kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)

     for topic in kafka_topics:
         common.kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG)
@ -47,22 +46,15 @@ def test_separate_consumers(kafka_producer, kafka_topics, connection, transforma
     assert len(kafka_topics) > 0
     cursor = connection.cursor()

-    stream_names = []
-    for topic in kafka_topics:
-        stream_name = "stream_" + topic
-        stream_names.append(stream_name)
-        common.execute_and_fetch_all(
-            cursor,
-            f"CREATE KAFKA STREAM {stream_name} TOPICS {topic} TRANSFORM {transformation}",
-        )
+    stream_names = ["stream_" + transformation.split(".")[1] + "_" + topic for topic in kafka_topics]

-    for stream_name in stream_names:
-        common.start_stream(cursor, stream_name)
-    time.sleep(5)
+    for stream_name, topic in zip(stream_names, kafka_topics):
+        common.create_stream(cursor, stream_name, topic, transformation)
+    common.start_streams(cursor, stream_names)

     for topic in kafka_topics:
-        kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=60)
+        kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)

     for topic in kafka_topics:
         common.kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG)
@ -77,23 +69,20 @@ def test_start_from_last_committed_offset(kafka_producer, kafka_topics, connecti
     # restarting Memgraph during a single workload cannot be done currently.
     assert len(kafka_topics) > 0
     cursor = connection.cursor()
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE KAFKA STREAM test TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple",
-    )
-    common.start_stream(cursor, "test")
-    time.sleep(1)
-    kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60)
+
+    stream_name = "test_start_from_last_committed_offset"
+    common.create_stream(cursor, stream_name, kafka_topics[0], "kafka_transform.simple")
+    common.start_stream(cursor, stream_name)
+    kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)

     common.kafka_check_vertex_exists_with_topic_and_payload(cursor, kafka_topics[0], common.SIMPLE_MSG)

-    common.stop_stream(cursor, "test")
-    common.drop_stream(cursor, "test")
+    common.stop_stream(cursor, stream_name)
+    common.drop_stream(cursor, stream_name)

     messages = [b"second message", b"third message"]
     for message in messages:
-        kafka_producer.send(kafka_topics[0], message).get(timeout=60)
+        kafka_producer.send(kafka_topics[0], message).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)

     for message in messages:
         vertices_with_msg = common.execute_and_fetch_all(
@ -103,11 +92,8 @@ def test_start_from_last_committed_offset(kafka_producer, kafka_topics, connecti
         assert len(vertices_with_msg) == 0

-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE KAFKA STREAM test TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple",
-    )
-    common.start_stream(cursor, "test")
+    common.create_stream(cursor, stream_name, kafka_topics[0], "kafka_transform.simple")
+    common.start_stream(cursor, stream_name)

     for message in messages:
         common.kafka_check_vertex_exists_with_topic_and_payload(cursor, kafka_topics[0], message)
@ -118,32 +104,26 @@ def test_check_stream(kafka_producer, kafka_topics, connection, transformation):
     assert len(kafka_topics) > 0
     BATCH_SIZE = 1
     INDEX_OF_FIRST_BATCH = 0
-    cursor = connection.cursor()
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE KAFKA STREAM test TOPICS {kafka_topics[0]} TRANSFORM {transformation} BATCH_SIZE {BATCH_SIZE}",
-    )
-    common.start_stream(cursor, "test")
-    time.sleep(1)
-    kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60)
-    common.stop_stream(cursor, "test")
+    stream_name = "test_check_stream_" + transformation.split(".")[1]
+    cursor = connection.cursor()
+    common.create_stream(cursor, stream_name, kafka_topics[0], transformation, batch_size=BATCH_SIZE)
+    common.start_stream(cursor, stream_name)
+    kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
+    common.stop_stream(cursor, stream_name)

     messages = [b"first message", b"second message", b"third message"]
     for message in messages:
-        kafka_producer.send(kafka_topics[0], message).get(timeout=60)
+        kafka_producer.send(kafka_topics[0], message).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)

     def check_check_stream(batch_limit):
-        assert transformation == "kafka_transform.simple" or transformation == "kafka_transform.with_parameters"
-        test_results = common.execute_and_fetch_all(cursor, f"CHECK STREAM test BATCH_LIMIT {batch_limit}")
+        test_results = common.execute_and_fetch_all(cursor, f"CHECK STREAM {stream_name} BATCH_LIMIT {batch_limit}")
         assert len(test_results) == batch_limit

         for i in range(batch_limit):
             message_as_str = messages[i].decode("utf-8")
-            assert (
-                BATCH_SIZE == 1
-            )  # If batch size != 1, then the usage of INDEX_OF_FIRST_BATCH must change: the result will have a list of queries (pair<parameters,query>)
+            # If batch size != 1, then the usage of INDEX_OF_FIRST_BATCH must change: the result will have a list of queries (pair<parameters,query>)
             if transformation == "kafka_transform.simple":
                 assert (
                     f"payload: '{message_as_str}'"
@ -165,41 +145,48 @@ def test_check_stream(kafka_producer, kafka_topics, connection, transformation):
     check_check_stream(1)
     check_check_stream(2)
     check_check_stream(3)
-    common.start_stream(cursor, "test")
+    common.start_stream(cursor, stream_name)

     for message in messages:
         common.kafka_check_vertex_exists_with_topic_and_payload(cursor, kafka_topics[0], message)

-def test_show_streams(kafka_producer, kafka_topics, connection):
+def test_show_streams(kafka_topics, connection):
     assert len(kafka_topics) > 1
     cursor = connection.cursor()
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE KAFKA STREAM default_values TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BOOTSTRAP_SERVERS 'localhost:9092'",
-    )
     consumer_group = "my_special_consumer_group"
     BATCH_INTERVAL = 42
     BATCH_SIZE = 3
-    common.execute_and_fetch_all(
+    default_values_stream = "default_values"
+    complex_values_stream = "complex_values"
+    common.create_stream(
+        cursor, default_values_stream, kafka_topics[0], "kafka_transform.simple", bootstrap_servers="'localhost:29092'"
+    )
+    common.create_stream(
         cursor,
-        f"CREATE KAFKA STREAM complex_values TOPICS {','.join(kafka_topics)} TRANSFORM kafka_transform.with_parameters CONSUMER_GROUP {consumer_group} BATCH_INTERVAL {BATCH_INTERVAL} BATCH_SIZE {BATCH_SIZE} ",
+        complex_values_stream,
+        ",".join(kafka_topics),
+        "kafka_transform.with_parameters",
+        consumer_group=consumer_group,
+        batch_interval=BATCH_INTERVAL,
+        batch_size=BATCH_SIZE,
     )

     assert len(common.execute_and_fetch_all(cursor, "SHOW STREAMS")) == 2
     common.check_stream_info(
         cursor,
-        "default_values",
-        ("default_values", "kafka", 100, 1000, "kafka_transform.simple", None, False),
+        default_values_stream,
+        (default_values_stream, "kafka", 100, 1000, "kafka_transform.simple", None, False),
     )
     common.check_stream_info(
         cursor,
-        "complex_values",
+        complex_values_stream,
         (
-            "complex_values",
+            complex_values_stream,
             "kafka",
             BATCH_INTERVAL,
             BATCH_SIZE,
@ -219,7 +206,7 @@ def test_start_and_stop_during_check(kafka_producer, kafka_topics, connection, o
return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE {BATCH_SIZE}" return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE {BATCH_SIZE}"
def message_sender(msg): def message_sender(msg):
kafka_producer.send(kafka_topics[0], msg).get(timeout=60) kafka_producer.send(kafka_topics[0], msg).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
common.test_start_and_stop_during_check( common.test_start_and_stop_during_check(
operation, operation,
@ -235,14 +222,12 @@ def test_check_already_started_stream(kafka_topics, connection):
     assert len(kafka_topics) > 0
     cursor = connection.cursor()
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE KAFKA STREAM started_stream TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple",
-    )
-    common.start_stream(cursor, "started_stream")
+    stream_name = "test_check_already_started_stream"
+    common.create_stream(cursor, stream_name, kafka_topics[0], "kafka_transform.simple")
+    common.start_stream(cursor, stream_name)

     with pytest.raises(mgclient.DatabaseError):
-        common.execute_and_fetch_all(cursor, "CHECK STREAM started_stream")
+        common.execute_and_fetch_all(cursor, f"CHECK STREAM {stream_name}")
def test_start_checked_stream_after_timeout(kafka_topics, connection): def test_start_checked_stream_after_timeout(kafka_topics, connection):
@ -254,19 +239,14 @@ def test_start_checked_stream_after_timeout(kafka_topics, connection):
 def test_restart_after_error(kafka_producer, kafka_topics, connection):
     cursor = connection.cursor()
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE KAFKA STREAM test_stream TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.query",
-    )
-    common.start_stream(cursor, "test_stream")
-    time.sleep(1)
+    stream_name = "test_restart_after_error"
+    common.create_stream(cursor, stream_name, kafka_topics[0], "kafka_transform.query")
+    common.start_stream(cursor, stream_name)

-    kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60)
-    assert common.timed_wait(lambda: not common.get_is_running(cursor, "test_stream"))
+    kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
+    assert common.timed_wait(lambda: not common.get_is_running(cursor, stream_name))

-    common.start_stream(cursor, "test_stream")
-    time.sleep(1)
+    common.start_stream(cursor, stream_name)
     kafka_producer.send(kafka_topics[0], b"CREATE (n:VERTEX { id : 42 })")
     assert common.check_one_result_row(cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n")
@ -275,23 +255,21 @@ def test_restart_after_error(kafka_producer, kafka_topics, connection):
 def test_bootstrap_server(kafka_producer, kafka_topics, connection, transformation):
     assert len(kafka_topics) > 0
     cursor = connection.cursor()
-    LOCAL = "localhost:9092"
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE KAFKA STREAM test TOPICS {','.join(kafka_topics)} TRANSFORM {transformation} BOOTSTRAP_SERVERS '{LOCAL}'",
-    )
-    common.start_stream(cursor, "test")
-    time.sleep(5)
+    local = "'localhost:29092'"
+    stream_name = "test_bootstrap_server_" + transformation.split(".")[1]
+
+    common.create_stream(cursor, stream_name, ",".join(kafka_topics), transformation, bootstrap_servers=local)
+    common.start_stream(cursor, stream_name)

     for topic in kafka_topics:
-        kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=60)
+        kafka_producer.send(topic, common.SIMPLE_MSG).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)

     for topic in kafka_topics:
         common.kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG)

 @pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_PY)
-def test_bootstrap_server_empty(kafka_producer, kafka_topics, connection, transformation):
+def test_bootstrap_server_empty(kafka_topics, connection, transformation):
     assert len(kafka_topics) > 0
     cursor = connection.cursor()
     with pytest.raises(mgclient.DatabaseError):
@ -312,7 +290,7 @@ def test_set_offset(kafka_producer, kafka_topics, connection, transformation):
messages = [f"{i} message" for i in range(1, 21)] messages = [f"{i} message" for i in range(1, 21)]
for message in messages: for message in messages:
kafka_producer.send(kafka_topics[0], message.encode()).get(timeout=60) kafka_producer.send(kafka_topics[0], message.encode()).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
def consume(expected_msgs): def consume(expected_msgs):
common.start_stream(cursor, "test") common.start_stream(cursor, "test")
@ -352,7 +330,7 @@ def test_set_offset(kafka_producer, kafka_topics, connection, transformation):
res = execute_set_offset_and_consume(-2, []) res = execute_set_offset_and_consume(-2, [])
assert len(res) == 0 assert len(res) == 0
last_msg = "Final Message" last_msg = "Final Message"
kafka_producer.send(kafka_topics[0], last_msg.encode()).get(timeout=60) kafka_producer.send(kafka_topics[0], last_msg.encode()).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
res = consume([last_msg]) res = consume([last_msg])
assert len(res) == 1 assert len(res) == 1
assert comparison_check("Final Message", res[0]) assert comparison_check("Final Message", res[0])
@ -361,21 +339,27 @@ def test_set_offset(kafka_producer, kafka_topics, connection, transformation):
 def test_info_procedure(kafka_topics, connection):
     cursor = connection.cursor()
-    STREAM_NAME = "test_stream"
-    CONFIGS = {"sasl.username": "michael.scott"}
-    LOCAL = "localhost:9092"
-    CREDENTIALS = {"sasl.password": "S3cr3tP4ssw0rd"}
-    CONSUMER_GROUP = "ConsumerGr"
-    common.execute_and_fetch_all(
+    stream_name = "test_stream"
+    configs = {"sasl.username": "michael.scott"}
+    local = "localhost:29092"
+    credentials = {"sasl.password": "S3cr3tP4ssw0rd"}
+    consumer_group = "ConsumerGr"
+
+    common.create_stream(
         cursor,
-        f"CREATE KAFKA STREAM {STREAM_NAME} TOPICS {','.join(kafka_topics)} TRANSFORM kafka_transform.simple CONSUMER_GROUP {CONSUMER_GROUP} BOOTSTRAP_SERVERS '{LOCAL}' CONFIGS {CONFIGS} CREDENTIALS {CREDENTIALS}",
+        stream_name,
+        ",".join(kafka_topics),
+        "kafka_transform.simple",
+        consumer_group=consumer_group,
+        bootstrap_servers=f"'{local}'",
+        configs=configs,
+        credentials=credentials,
     )

-    stream_info = common.execute_and_fetch_all(cursor, f"CALL mg.kafka_stream_info('{STREAM_NAME}') YIELD *")
-    reducted_credentials = {key: "<REDUCTED>" for key in CREDENTIALS.keys()}
-    expected_stream_info = [(LOCAL, CONFIGS, CONSUMER_GROUP, reducted_credentials, kafka_topics)]
+    stream_info = common.execute_and_fetch_all(cursor, f"CALL mg.kafka_stream_info('{stream_name}') YIELD *")
+    reducted_credentials = {key: "<REDUCTED>" for key in credentials.keys()}
+    expected_stream_info = [(local, configs, consumer_group, reducted_credentials, kafka_topics)]
     common.validate_info(stream_info, expected_stream_info)
@ -398,7 +382,7 @@ def test_check_stream_same_number_of_queries_than_messages(kafka_producer, kafka
return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size}" return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size}"
def message_sender(msg): def message_sender(msg):
kafka_producer.send(kafka_topics[0], msg).get(timeout=60) kafka_producer.send(kafka_topics[0], msg).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
common.test_check_stream_same_number_of_queries_than_messages(connection, stream_creator, message_sender) common.test_check_stream_same_number_of_queries_than_messages(connection, stream_creator, message_sender)
@ -409,30 +393,33 @@ def test_check_stream_different_number_of_queries_than_messages(kafka_producer,
TRANSFORMATION = "common_transform.check_stream_with_filtering" TRANSFORMATION = "common_transform.check_stream_with_filtering"
def stream_creator(stream_name, batch_size): def stream_creator(stream_name, batch_size):
return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size}" return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size}"
def message_sender(msg): def message_sender(msg):
kafka_producer.send(kafka_topics[0], msg).get(timeout=60) kafka_producer.send(kafka_topics[0], msg).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
common.test_check_stream_different_number_of_queries_than_messages(connection, stream_creator, message_sender) common.test_check_stream_different_number_of_queries_than_messages(connection, stream_creator, message_sender)
 def test_start_stream_with_batch_limit(kafka_producer, kafka_topics, connection):
     assert len(kafka_topics) > 0
+    STREAM_NAME = "test_start_stream_with_batch_limit"

-    def stream_creator(stream_name):
+    def stream_creator():
         return (
-            f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1"
+            f"CREATE KAFKA STREAM {STREAM_NAME} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1"
         )

     def messages_sender(nof_messages):
-        for x in range(nof_messages):
-            kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60)
+        for _ in range(nof_messages):
+            kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(
+                timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT
+            )

-    common.test_start_stream_with_batch_limit(connection, stream_creator, messages_sender)
+    common.test_start_stream_with_batch_limit(connection, STREAM_NAME, stream_creator, messages_sender)
def test_start_stream_with_batch_limit_timeout(kafka_producer, kafka_topics, connection): def test_start_stream_with_batch_limit_timeout(kafka_topics, connection):
assert len(kafka_topics) > 0 assert len(kafka_topics) > 0
def stream_creator(stream_name): def stream_creator(stream_name):
@ -443,7 +430,7 @@ def test_start_stream_with_batch_limit_timeout(kafka_producer, kafka_topics, con
common.test_start_stream_with_batch_limit_timeout(connection, stream_creator) common.test_start_stream_with_batch_limit_timeout(connection, stream_creator)
def test_start_stream_with_batch_limit_reaching_timeout(kafka_producer, kafka_topics, connection): def test_start_stream_with_batch_limit_reaching_timeout(kafka_topics, connection):
assert len(kafka_topics) > 0 assert len(kafka_topics) > 0
def stream_creator(stream_name, batch_size): def stream_creator(stream_name, batch_size):
@ -461,7 +448,7 @@ def test_start_stream_with_batch_limit_while_check_running(kafka_producer, kafka
) )
def message_sender(message): def message_sender(message):
kafka_producer.send(kafka_topics[0], message).get(timeout=6000) kafka_producer.send(kafka_topics[0], message).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
def setup_function(start_check_stream, cursor, stream_name, batch_limit, timeout): def setup_function(start_check_stream, cursor, stream_name, batch_limit, timeout):
thread_stream_check = Process(target=start_check_stream, daemon=True, args=(stream_name, batch_limit, timeout)) thread_stream_check = Process(target=start_check_stream, daemon=True, args=(stream_name, batch_limit, timeout))
@ -488,12 +475,12 @@ def test_check_while_stream_with_batch_limit_running(kafka_producer, kafka_topic
) )
def message_sender(message): def message_sender(message):
kafka_producer.send(kafka_topics[0], message).get(timeout=6000) kafka_producer.send(kafka_topics[0], message).get(timeout=KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT)
common.test_check_while_stream_with_batch_limit_running(connection, stream_creator, message_sender) common.test_check_while_stream_with_batch_limit_running(connection, stream_creator, message_sender)
def test_start_stream_with_batch_limit_with_invalid_batch_limit(kafka_producer, kafka_topics, connection): def test_start_stream_with_batch_limit_with_invalid_batch_limit(kafka_topics, connection):
assert len(kafka_topics) > 0 assert len(kafka_topics) > 0
def stream_creator(stream_name): def stream_creator(stream_name):
@ -504,7 +491,7 @@ def test_start_stream_with_batch_limit_with_invalid_batch_limit(kafka_producer,
common.test_start_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator) common.test_start_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator)
def test_check_stream_with_batch_limit_with_invalid_batch_limit(kafka_producer, kafka_topics, connection): def test_check_stream_with_batch_limit_with_invalid_batch_limit(kafka_topics, connection):
assert len(kafka_topics) > 0 assert len(kafka_topics) > 0
def stream_creator(stream_name): def stream_creator(stream_name):

View File

@ -1,8 +0,0 @@
version: '3.7'
services:
pulsar:
image: 'apachepulsar/pulsar:latest'
ports:
- '6652:8080'
- '6650:6650'
entrypoint: ['bin/pulsar', 'standalone']

View File

@ -12,11 +12,12 @@
 # licenses/APL.txt.
 import sys
-import pytest
-import mgclient
 import time
 from multiprocessing import Process, Value

 import common
+import mgclient
+import pytest
TRANSFORMATIONS_TO_CHECK = ["pulsar_transform.simple", "pulsar_transform.with_parameters"] TRANSFORMATIONS_TO_CHECK = ["pulsar_transform.simple", "pulsar_transform.with_parameters"]
@ -80,12 +81,6 @@ def test_start_from_latest_messages(pulsar_client, pulsar_topics, connection):
     # in between should be lost. Additionally, we check that consumer continues from the correct message
     # after stopping and starting again.
     assert len(pulsar_topics) > 0
-    cursor = connection.cursor()
-    common.execute_and_fetch_all(
-        cursor,
-        f"CREATE PULSAR STREAM test TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple",
-    )
-    common.start_stream(cursor, "test")
def assert_message_not_consumed(message): def assert_message_not_consumed(message):
vertices_with_msg = common.execute_and_fetch_all( vertices_with_msg = common.execute_and_fetch_all(
@ -95,6 +90,13 @@ def test_start_from_latest_messages(pulsar_client, pulsar_topics, connection):
         assert len(vertices_with_msg) == 0

+    cursor = connection.cursor()
+    common.execute_and_fetch_all(
+        cursor,
+        f"CREATE PULSAR STREAM test TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple",
+    )
+    common.start_stream(cursor, "test")
producer = pulsar_client.create_producer( producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000 common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
) )
@ -131,7 +133,7 @@ def test_start_from_latest_messages(pulsar_client, pulsar_topics, connection):
         producer.send(message)
         assert_message_not_consumed(message)

-    common.start_stream(cursor, "test")
+    common.start_stream(cursor, "test", sleep=False)

     assert_message_not_consumed(LOST_MESSAGE)
@ -338,19 +340,20 @@ def test_service_url(pulsar_client, pulsar_topics, connection, transformation):
 def test_start_stream_with_batch_limit(pulsar_client, pulsar_topics, connection):
     assert len(pulsar_topics) > 1
+    STREAM_NAME = "test_start_stream_with_batch_limit"

-    def stream_creator(stream_name):
-        return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1"
+    def stream_creator():
+        return f"CREATE PULSAR STREAM {STREAM_NAME} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1"

     producer = pulsar_client.create_producer(
         common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
     )

     def messages_sender(nof_messages):
-        for x in range(nof_messages):
+        for _ in range(nof_messages):
             producer.send(common.SIMPLE_MSG)

-    common.test_start_stream_with_batch_limit(connection, stream_creator, messages_sender)
+    common.test_start_stream_with_batch_limit(connection, STREAM_NAME, stream_creator, messages_sender)
def test_start_stream_with_batch_limit_timeout(pulsar_client, pulsar_topics, connection): def test_start_stream_with_batch_limit_timeout(pulsar_client, pulsar_topics, connection):
@ -409,7 +412,7 @@ def test_check_stream_same_number_of_queries_than_messages(pulsar_client, pulsar
TRANSFORMATION = "common_transform.check_stream_no_filtering" TRANSFORMATION = "common_transform.check_stream_no_filtering"
def stream_creator(stream_name, batch_size): def stream_creator(stream_name, batch_size):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size} " return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM {TRANSFORMATION} BATCH_INTERVAL 3000 BATCH_SIZE {batch_size} "
producer = pulsar_client.create_producer( producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000 common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000