Hardcode kafka and pulsar as hostnames
This commit is contained in:
parent
0d7f51f1ae
commit
197f3f4804
@ -45,7 +45,7 @@ def get_topics(num):
|
|||||||
|
|
||||||
@pytest.fixture(scope="function")
|
@pytest.fixture(scope="function")
|
||||||
def kafka_topics():
|
def kafka_topics():
|
||||||
admin_client = KafkaAdminClient(bootstrap_servers=f"{KAFKA_HOSTNAME}:29092", client_id="test")
|
admin_client = KafkaAdminClient(bootstrap_servers=f"kafka:29092", client_id="test")
|
||||||
# The issue arises if we remove default kafka topics, e.g.
|
# The issue arises if we remove default kafka topics, e.g.
|
||||||
# "__consumer_offsets"
|
# "__consumer_offsets"
|
||||||
previous_topics = [topic for topic in admin_client.list_topics() if topic != "__consumer_offsets"]
|
previous_topics = [topic for topic in admin_client.list_topics() if topic != "__consumer_offsets"]
|
||||||
@ -64,7 +64,7 @@ def kafka_topics():
|
|||||||
|
|
||||||
@pytest.fixture(scope="function")
|
@pytest.fixture(scope="function")
|
||||||
def kafka_producer():
|
def kafka_producer():
|
||||||
yield KafkaProducer(bootstrap_servers=[f"{KAFKA_HOSTNAME}:29092"], api_version_auto_timeout_ms=10000)
|
yield KafkaProducer(bootstrap_servers=[f"kafka:29092"], api_version_auto_timeout_ms=10000)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="function")
|
@pytest.fixture(scope="function")
|
||||||
@ -76,5 +76,5 @@ def pulsar_client():
|
|||||||
def pulsar_topics():
|
def pulsar_topics():
|
||||||
topics = get_topics(3)
|
topics = get_topics(3)
|
||||||
for topic in topics:
|
for topic in topics:
|
||||||
requests.delete(f"http://{PULSAR_HOSTNAME}:{PULSAR_PORT}/admin/v2/persistent/public/default/{topic}?force=true")
|
requests.delete(f"http://pulsar:6652/admin/v2/persistent/public/default/{topic}?force=true")
|
||||||
yield topics
|
yield topics
|
||||||
|
@ -163,7 +163,7 @@ def test_show_streams(kafka_topics, connection):
|
|||||||
complex_values_stream = "complex_values"
|
complex_values_stream = "complex_values"
|
||||||
|
|
||||||
common.create_stream(
|
common.create_stream(
|
||||||
cursor, default_values_stream, kafka_topics[0], "kafka_transform.simple", bootstrap_servers=f"'{KAFKA_HOSTNAME}:29092'"
|
cursor, default_values_stream, kafka_topics[0], "kafka_transform.simple", bootstrap_servers=f"'kafka:29092'"
|
||||||
)
|
)
|
||||||
common.create_stream(
|
common.create_stream(
|
||||||
cursor,
|
cursor,
|
||||||
@ -256,7 +256,7 @@ def test_restart_after_error(kafka_producer, kafka_topics, connection):
|
|||||||
def test_bootstrap_server(kafka_producer, kafka_topics, connection, transformation):
|
def test_bootstrap_server(kafka_producer, kafka_topics, connection, transformation):
|
||||||
assert len(kafka_topics) > 0
|
assert len(kafka_topics) > 0
|
||||||
cursor = connection.cursor()
|
cursor = connection.cursor()
|
||||||
local = f"'{KAFKA_HOSTNAME}:29092'"
|
local = f"'kafka:29092'"
|
||||||
stream_name = "test_bootstrap_server_" + transformation.split(".")[1]
|
stream_name = "test_bootstrap_server_" + transformation.split(".")[1]
|
||||||
|
|
||||||
common.create_stream(cursor, stream_name, ",".join(kafka_topics), transformation, bootstrap_servers=local)
|
common.create_stream(cursor, stream_name, ",".join(kafka_topics), transformation, bootstrap_servers=local)
|
||||||
@ -342,7 +342,7 @@ def test_info_procedure(kafka_topics, connection):
|
|||||||
cursor = connection.cursor()
|
cursor = connection.cursor()
|
||||||
stream_name = "test_stream"
|
stream_name = "test_stream"
|
||||||
configs = {"sasl.username": "michael.scott"}
|
configs = {"sasl.username": "michael.scott"}
|
||||||
local = f"{KAFKA_HOSTNAME}:29092"
|
local = f"kafka:29092"
|
||||||
credentials = {"sasl.password": "S3cr3tP4ssw0rd"}
|
credentials = {"sasl.password": "S3cr3tP4ssw0rd"}
|
||||||
consumer_group = "ConsumerGr"
|
consumer_group = "ConsumerGr"
|
||||||
|
|
||||||
|
@ -323,7 +323,7 @@ def test_restart_after_error(pulsar_client, pulsar_topics, connection):
|
|||||||
def test_service_url(pulsar_client, pulsar_topics, connection, transformation):
|
def test_service_url(pulsar_client, pulsar_topics, connection, transformation):
|
||||||
assert len(pulsar_topics) > 0
|
assert len(pulsar_topics) > 0
|
||||||
cursor = connection.cursor()
|
cursor = connection.cursor()
|
||||||
LOCAL = f"pulsar://{PULSAR_HOSTNAME}:6650"
|
LOCAL = f"pulsar://pulsar:6650"
|
||||||
common.execute_and_fetch_all(
|
common.execute_and_fetch_all(
|
||||||
cursor,
|
cursor,
|
||||||
f"CREATE PULSAR STREAM test TOPICS {','.join(pulsar_topics)} TRANSFORM {transformation} SERVICE_URL '{LOCAL}'",
|
f"CREATE PULSAR STREAM test TOPICS {','.join(pulsar_topics)} TRANSFORM {transformation} SERVICE_URL '{LOCAL}'",
|
||||||
|
Loading…
Reference in New Issue
Block a user