Revert hostname changes for kafka and pulsar

This commit is contained in:
Deda 2024-03-01 11:31:33 +01:00
parent 03be032b25
commit 4a63e665c2
4 changed files with 17 additions and 17 deletions

View File

@ -9,7 +9,7 @@
# by the Apache License, Version 2.0, included in the file # by the Apache License, Version 2.0, included in the file
# licenses/APL.txt. # licenses/APL.txt.
import os # import os
import pulsar import pulsar
import pytest import pytest
from common import NAME, PULSAR_SERVICE_URL, connect, execute_and_fetch_all from common import NAME, PULSAR_SERVICE_URL, connect, execute_and_fetch_all
@ -21,9 +21,9 @@ import requests
# To run these test locally a running Kafka sever is necessery. The test tries # To run these test locally a running Kafka sever is necessery. The test tries
# to connect on localhost:9092. # to connect on localhost:9092.
KAFKA_HOSTNAME=os.getenv("KAFKA_HOSTNAME", "localhost") # KAFKA_HOSTNAME=os.getenv("KAFKA_HOSTNAME", "localhost")
PULSAR_HOSTNAME=os.getenv("PULSAR_HOSTNAME", "localhost") # PULSAR_HOSTNAME=os.getenv("PULSAR_HOSTNAME", "localhost")
PULSAR_PORT="6652" if PULSAR_HOSTNAME == "localhost" else "8080" # PULSAR_PORT="6652" if PULSAR_HOSTNAME == "localhost" else "8080"
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def connection(): def connection():
@ -45,7 +45,7 @@ def get_topics(num):
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def kafka_topics(): def kafka_topics():
admin_client = KafkaAdminClient(bootstrap_servers=f"kafka:29092", client_id="test") admin_client = KafkaAdminClient(bootstrap_servers="localhost:29092", client_id="test")
# The issue arises if we remove default kafka topics, e.g. # The issue arises if we remove default kafka topics, e.g.
# "__consumer_offsets" # "__consumer_offsets"
previous_topics = [topic for topic in admin_client.list_topics() if topic != "__consumer_offsets"] previous_topics = [topic for topic in admin_client.list_topics() if topic != "__consumer_offsets"]
@ -64,7 +64,7 @@ def kafka_topics():
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def kafka_producer(): def kafka_producer():
yield KafkaProducer(bootstrap_servers=[f"kafka:29092"], api_version_auto_timeout_ms=10000) yield KafkaProducer(bootstrap_servers=["localhost:29092"], api_version_auto_timeout_ms=10000)
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
@ -76,5 +76,5 @@ def pulsar_client():
def pulsar_topics(): def pulsar_topics():
topics = get_topics(3) topics = get_topics(3)
for topic in topics: for topic in topics:
requests.delete(f"http://pulsar:6652/admin/v2/persistent/public/default/{topic}?force=true") requests.delete(f"http://localhost:6652/admin/v2/persistent/public/default/{topic}?force=true")
yield topics yield topics

View File

@ -11,7 +11,7 @@
# by the Apache License, Version 2.0, included in the file # by the Apache License, Version 2.0, included in the file
# licenses/APL.txt. # licenses/APL.txt.
import os # import os
import sys import sys
import time import time
from multiprocessing import Process from multiprocessing import Process
@ -24,7 +24,7 @@ from mg_utils import mg_sleep_and_assert
TRANSFORMATIONS_TO_CHECK_C = ["c_transformations.empty_transformation"] TRANSFORMATIONS_TO_CHECK_C = ["c_transformations.empty_transformation"]
TRANSFORMATIONS_TO_CHECK_PY = ["kafka_transform.simple", "kafka_transform.with_parameters"] TRANSFORMATIONS_TO_CHECK_PY = ["kafka_transform.simple", "kafka_transform.with_parameters"]
KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT = 60 KAFKA_PRODUCER_SENDING_MSG_DEFAULT_TIMEOUT = 60
KAFKA_HOSTNAME=os.getenv("KAFKA_HOSTNAME", "localhost") # KAFKA_HOSTNAME=os.getenv("KAFKA_HOSTNAME", "localhost")
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_PY) @pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_PY)
def test_simple(kafka_producer, kafka_topics, connection, transformation): def test_simple(kafka_producer, kafka_topics, connection, transformation):
@ -163,7 +163,7 @@ def test_show_streams(kafka_topics, connection):
complex_values_stream = "complex_values" complex_values_stream = "complex_values"
common.create_stream( common.create_stream(
cursor, default_values_stream, kafka_topics[0], "kafka_transform.simple", bootstrap_servers=f"'kafka:29092'" cursor, default_values_stream, kafka_topics[0], "kafka_transform.simple", bootstrap_servers="'localhost:29092'"
) )
common.create_stream( common.create_stream(
cursor, cursor,
@ -256,7 +256,7 @@ def test_restart_after_error(kafka_producer, kafka_topics, connection):
def test_bootstrap_server(kafka_producer, kafka_topics, connection, transformation): def test_bootstrap_server(kafka_producer, kafka_topics, connection, transformation):
assert len(kafka_topics) > 0 assert len(kafka_topics) > 0
cursor = connection.cursor() cursor = connection.cursor()
local = f"'kafka:29092'" local = "'localhost:29092'"
stream_name = "test_bootstrap_server_" + transformation.split(".")[1] stream_name = "test_bootstrap_server_" + transformation.split(".")[1]
common.create_stream(cursor, stream_name, ",".join(kafka_topics), transformation, bootstrap_servers=local) common.create_stream(cursor, stream_name, ",".join(kafka_topics), transformation, bootstrap_servers=local)
@ -342,7 +342,7 @@ def test_info_procedure(kafka_topics, connection):
cursor = connection.cursor() cursor = connection.cursor()
stream_name = "test_stream" stream_name = "test_stream"
configs = {"sasl.username": "michael.scott"} configs = {"sasl.username": "michael.scott"}
local = f"kafka:29092" local = "localhost:29092"
credentials = {"sasl.password": "S3cr3tP4ssw0rd"} credentials = {"sasl.password": "S3cr3tP4ssw0rd"}
consumer_group = "ConsumerGr" consumer_group = "ConsumerGr"

View File

@ -11,7 +11,7 @@
# by the Apache License, Version 2.0, included in the file # by the Apache License, Version 2.0, included in the file
# licenses/APL.txt. # licenses/APL.txt.
import os # import os
import sys import sys
import time import time
from multiprocessing import Process, Value from multiprocessing import Process, Value
@ -21,7 +21,7 @@ import mgclient
import pytest import pytest
TRANSFORMATIONS_TO_CHECK = ["pulsar_transform.simple", "pulsar_transform.with_parameters"] TRANSFORMATIONS_TO_CHECK = ["pulsar_transform.simple", "pulsar_transform.with_parameters"]
PULSAR_HOSTNAME=os.getenv("PULSAR_HOSTNAME", "127.0.0.1") # PULSAR_HOSTNAME=os.getenv("PULSAR_HOSTNAME", "127.0.0.1")
def check_vertex_exists_with_topic_and_payload(cursor, topic, payload_byte): def check_vertex_exists_with_topic_and_payload(cursor, topic, payload_byte):
@ -323,7 +323,7 @@ def test_restart_after_error(pulsar_client, pulsar_topics, connection):
def test_service_url(pulsar_client, pulsar_topics, connection, transformation): def test_service_url(pulsar_client, pulsar_topics, connection, transformation):
assert len(pulsar_topics) > 0 assert len(pulsar_topics) > 0
cursor = connection.cursor() cursor = connection.cursor()
LOCAL = f"pulsar://pulsar:6650" LOCAL = "pulsar://127.0.0.1:6650"
common.execute_and_fetch_all( common.execute_and_fetch_all(
cursor, cursor,
f"CREATE PULSAR STREAM test TOPICS {','.join(pulsar_topics)} TRANSFORM {transformation} SERVICE_URL '{LOCAL}'", f"CREATE PULSAR STREAM test TOPICS {','.join(pulsar_topics)} TRANSFORM {transformation} SERVICE_URL '{LOCAL}'",

View File

@ -2,9 +2,9 @@ stream_args: &stream_args
- "--bolt-port" - "--bolt-port"
- "7687" - "7687"
- "--log-level=DEBUG" - "--log-level=DEBUG"
- "--kafka-bootstrap-servers=kafka:9092" - "--kafka-bootstrap-servers=localhost:9092"
- "--query-execution-timeout-sec=0" - "--query-execution-timeout-sec=0"
- "--pulsar-service-url=pulsar:6650" - "--pulsar-service-url=pulsar://127.0.0.1:6650"
in_memory_cluster: &in_memory_cluster in_memory_cluster: &in_memory_cluster
cluster: cluster: