From 8606e69fd69680ca672fc2af1ba36a88f13a2da6 Mon Sep 17 00:00:00 2001 From: Antonio Andelic <antonio2368@users.noreply.github.com> Date: Mon, 15 Nov 2021 18:25:23 +0100 Subject: [PATCH] Add support for type in show streams (#300) --- src/query/interpreter.cpp | 5 +++-- src/query/stream/common.hpp | 9 +++++++++ tests/e2e/streams/common.py | 11 ++++++----- tests/e2e/streams/kafka_streams_tests.py | 3 ++- tests/e2e/streams/pulsar_streams_tests.py | 3 ++- tests/e2e/streams/streams_owner_tests.py | 2 +- 6 files changed, 23 insertions(+), 10 deletions(-) diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 9ddd27c4f..bf0ac9e4f 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -675,7 +675,7 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters ¶mete return callback; } case StreamQuery::Action::SHOW_STREAMS: { - callback.header = {"name", "batch_interval", "batch_size", "transformation_name", "owner", "is running"}; + callback.header = {"name", "type", "batch_interval", "batch_size", "transformation_name", "owner", "is running"}; callback.fn = [interpreter_context]() { auto streams_status = interpreter_context->streams.GetStreamInfo(); std::vector<std::vector<TypedValue>> results; @@ -696,8 +696,9 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters ¶mete for (const auto &status : streams_status) { std::vector<TypedValue> typed_status; - typed_status.reserve(8); + typed_status.reserve(7); typed_status.emplace_back(status.name); + typed_status.emplace_back(StreamSourceTypeToString(status.type)); stream_info_as_typed_stream_info_emplace_in(typed_status, status.info); if (status.owner.has_value()) { typed_status.emplace_back(*status.owner); diff --git a/src/query/stream/common.hpp b/src/query/stream/common.hpp index 7adc9cdd0..2633ed4f9 100644 --- a/src/query/stream/common.hpp +++ b/src/query/stream/common.hpp @@ -63,6 +63,15 @@ concept Stream = requires(TStream stream) { enum class StreamSourceType : uint8_t { KAFKA, PULSAR }; +constexpr std::string_view StreamSourceTypeToString(StreamSourceType type) { + switch (type) { + case StreamSourceType::KAFKA: + return "KAFKA"; + case StreamSourceType::PULSAR: + return "PULSAR"; + } +} + template <Stream T> StreamSourceType StreamType(const T & /*stream*/); diff --git a/tests/e2e/streams/common.py b/tests/e2e/streams/common.py index a1949826d..d92dbd0d1 100644 --- a/tests/e2e/streams/common.py +++ b/tests/e2e/streams/common.py @@ -17,11 +17,12 @@ from multiprocessing import Process, Value # These are the indices of the different values in the result of SHOW STREAM # query NAME = 0 -BATCH_INTERVAL = 1 -BATCH_SIZE = 2 -TRANSFORM = 3 -OWNER = 4 -IS_RUNNING = 5 +TYPE = 1 +BATCH_INTERVAL = 2 +BATCH_SIZE = 3 +TRANSFORM = 4 +OWNER = 5 +IS_RUNNING = 6 # These are the indices of the query and parameters in the result of CHECK # STREAM query diff --git a/tests/e2e/streams/kafka_streams_tests.py b/tests/e2e/streams/kafka_streams_tests.py index 168b6f56c..4dc1b2084 100755 --- a/tests/e2e/streams/kafka_streams_tests.py +++ b/tests/e2e/streams/kafka_streams_tests.py @@ -218,7 +218,7 @@ def test_show_streams(kafka_producer, kafka_topics, connection): common.check_stream_info( cursor, "default_values", - ("default_values", None, None, "kafka_transform.simple", None, False), + ("default_values", "KAFKA", None, None, "kafka_transform.simple", None, False), ) common.check_stream_info( @@ -226,6 +226,7 @@ def test_show_streams(kafka_producer, kafka_topics, connection): "complex_values", ( "complex_values", + "KAFKA", batch_interval, batch_size, "kafka_transform.with_parameters", diff --git a/tests/e2e/streams/pulsar_streams_tests.py b/tests/e2e/streams/pulsar_streams_tests.py index 04ef9b37e..a64dfbcec 100755 --- a/tests/e2e/streams/pulsar_streams_tests.py +++ b/tests/e2e/streams/pulsar_streams_tests.py @@ -251,7 +251,7 @@ def test_show_streams(pulsar_client, pulsar_topics, connection): common.check_stream_info( cursor, "default_values", - ("default_values", None, None, "pulsar_transform.simple", None, False), + ("default_values", "PULSAR", None, None, "pulsar_transform.simple", None, False), ) common.check_stream_info( @@ -259,6 +259,7 @@ def test_show_streams(pulsar_client, pulsar_topics, connection): "complex_values", ( "complex_values", + "PULSAR", batch_interval, batch_size, "pulsar_transform.with_parameters", diff --git a/tests/e2e/streams/streams_owner_tests.py b/tests/e2e/streams/streams_owner_tests.py index e65150c94..850fd34b9 100644 --- a/tests/e2e/streams/streams_owner_tests.py +++ b/tests/e2e/streams/streams_owner_tests.py @@ -77,7 +77,7 @@ def test_owner_is_shown(kafka_topics, connection): f"TOPICS {kafka_topics[0]} " f"TRANSFORM kafka_transform.simple") - common.check_stream_info(userless_cursor, "test", ("test", None, None, + common.check_stream_info(userless_cursor, "test", ("test", "KAFKA", None, None, "kafka_transform.simple", stream_user, False))