Add support for type in show streams (#300)

This commit is contained in:
Antonio Andelic 2021-11-15 18:25:23 +01:00 committed by Antonio Andelic
parent c7b045bffc
commit 8606e69fd6
6 changed files with 23 additions and 10 deletions

View File

@ -675,7 +675,7 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
return callback;
}
case StreamQuery::Action::SHOW_STREAMS: {
callback.header = {"name", "batch_interval", "batch_size", "transformation_name", "owner", "is running"};
callback.header = {"name", "type", "batch_interval", "batch_size", "transformation_name", "owner", "is running"};
callback.fn = [interpreter_context]() {
auto streams_status = interpreter_context->streams.GetStreamInfo();
std::vector<std::vector<TypedValue>> results;
@ -696,8 +696,9 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
for (const auto &status : streams_status) {
std::vector<TypedValue> typed_status;
typed_status.reserve(8);
typed_status.reserve(7);
typed_status.emplace_back(status.name);
typed_status.emplace_back(StreamSourceTypeToString(status.type));
stream_info_as_typed_stream_info_emplace_in(typed_status, status.info);
if (status.owner.has_value()) {
typed_status.emplace_back(*status.owner);

View File

@ -63,6 +63,15 @@ concept Stream = requires(TStream stream) {
enum class StreamSourceType : uint8_t { KAFKA, PULSAR };
constexpr std::string_view StreamSourceTypeToString(StreamSourceType type) {
switch (type) {
case StreamSourceType::KAFKA:
return "KAFKA";
case StreamSourceType::PULSAR:
return "PULSAR";
}
}
template <Stream T>
StreamSourceType StreamType(const T & /*stream*/);

View File

@ -17,11 +17,12 @@ from multiprocessing import Process, Value
# These are the indices of the different values in the result of SHOW STREAM
# query
NAME = 0
BATCH_INTERVAL = 1
BATCH_SIZE = 2
TRANSFORM = 3
OWNER = 4
IS_RUNNING = 5
TYPE = 1
BATCH_INTERVAL = 2
BATCH_SIZE = 3
TRANSFORM = 4
OWNER = 5
IS_RUNNING = 6
# These are the indices of the query and parameters in the result of CHECK
# STREAM query

View File

@ -218,7 +218,7 @@ def test_show_streams(kafka_producer, kafka_topics, connection):
common.check_stream_info(
cursor,
"default_values",
("default_values", None, None, "kafka_transform.simple", None, False),
("default_values", "KAFKA", None, None, "kafka_transform.simple", None, False),
)
common.check_stream_info(
@ -226,6 +226,7 @@ def test_show_streams(kafka_producer, kafka_topics, connection):
"complex_values",
(
"complex_values",
"KAFKA",
batch_interval,
batch_size,
"kafka_transform.with_parameters",

View File

@ -251,7 +251,7 @@ def test_show_streams(pulsar_client, pulsar_topics, connection):
common.check_stream_info(
cursor,
"default_values",
("default_values", None, None, "pulsar_transform.simple", None, False),
("default_values", "PULSAR", None, None, "pulsar_transform.simple", None, False),
)
common.check_stream_info(
@ -259,6 +259,7 @@ def test_show_streams(pulsar_client, pulsar_topics, connection):
"complex_values",
(
"complex_values",
"PULSAR",
batch_interval,
batch_size,
"pulsar_transform.with_parameters",

View File

@ -77,7 +77,7 @@ def test_owner_is_shown(kafka_topics, connection):
f"TOPICS {kafka_topics[0]} "
f"TRANSFORM kafka_transform.simple")
common.check_stream_info(userless_cursor, "test", ("test", None, None,
common.check_stream_info(userless_cursor, "test", ("test", "KAFKA", None, None,
"kafka_transform.simple", stream_user, False))