Add remaining query functionality SHOW STREAMS (#184)

* Add query functionality SHOW STREAMS
This commit is contained in:
Kostas Kyrimis 2021-07-02 11:20:32 +03:00 committed by Antonio Andelic
parent a928c158da
commit a37755ce43
9 changed files with 80 additions and 10 deletions

View File

@ -534,6 +534,12 @@ antlrcpp::Any CypherMainVisitor::visitStopAllStreams(MemgraphCypher::StopAllStre
return stream_query;
}
antlrcpp::Any CypherMainVisitor::visitShowStreams(MemgraphCypher::ShowStreamsContext *ctx) {
auto *stream_query = storage_->Create<StreamQuery>();
stream_query->action_ = StreamQuery::Action::SHOW_STREAMS;
return stream_query;
}
antlrcpp::Any CypherMainVisitor::visitCypherUnion(MemgraphCypher::CypherUnionContext *ctx) {
bool distinct = !ctx->ALL();
auto *cypher_union = storage_->Create<CypherUnion>(distinct);

View File

@ -283,6 +283,11 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
*/
antlrcpp::Any visitStopAllStreams(MemgraphCypher::StopAllStreamsContext *ctx) override;
/**
* @return StreamQuery*
*/
antlrcpp::Any visitShowStreams(MemgraphCypher::ShowStreamsContext *ctx) override;
/**
* @return CypherUnion*
*/

View File

@ -147,6 +147,7 @@ streamQuery : createStream
| startAllStreams
| stopStream
| stopAllStreams
| showStreams
;
loadCsv : LOAD CSV FROM csvFile ( WITH | NO ) HEADER
@ -286,3 +287,5 @@ startAllStreams : START ALL STREAMS ;
stopStream : STOP STREAM streamName ;
stopAllStreams : STOP ALL STREAMS ;
showStreams : SHOW STREAMS ;

View File

@ -68,6 +68,7 @@ ROLE : R O L E ;
ROLES : R O L E S ;
QUOTE : Q U O T E ;
SESSION : S E S S I O N ;
SHOW : S H O W ;
SNAPSHOT : S N A P S H O T ;
START : S T A R T ;
STATS : S T A T S ;

View File

@ -530,9 +530,54 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
};
return callback;
}
case StreamQuery::Action::SHOW_STREAMS:
case StreamQuery::Action::TEST_STREAM:
throw std::logic_error("not implemented");
case StreamQuery::Action::SHOW_STREAMS: {
callback.header = {"name", "topics", "consumer_group", "batch_interval", "batch_size", "transformation_name",
"is running"};
callback.fn = [interpreter_context]() {
auto streams_status = interpreter_context->streams.GetStreamInfo();
std::vector<std::vector<TypedValue>> results;
results.reserve(streams_status.size());
auto topics_as_typed_topics = [](const auto &topics) {
std::vector<TypedValue> typed_topics;
typed_topics.reserve(topics.size());
for (const auto &elem : topics) {
typed_topics.emplace_back(elem);
}
return typed_topics;
};
auto stream_info_as_typed_stream_info_emplace_in = [topics_as_typed_topics](auto &typed_status,
const auto &stream_info) {
typed_status.emplace_back(topics_as_typed_topics(stream_info.topics));
typed_status.emplace_back(stream_info.consumer_group);
if (stream_info.batch_interval.has_value()) {
typed_status.emplace_back(stream_info.batch_interval->count());
} else {
typed_status.emplace_back();
}
if (stream_info.batch_size.has_value()) {
typed_status.emplace_back(*stream_info.batch_size);
} else {
typed_status.emplace_back();
}
typed_status.emplace_back(stream_info.transformation_name);
};
for (const auto &status : streams_status) {
std::vector<TypedValue> typed_status;
typed_status.reserve(7);
typed_status.emplace_back(status.name);
stream_info_as_typed_stream_info_emplace_in(typed_status, status.info);
typed_status.emplace_back(status.is_running);
results.push_back(std::move(typed_status));
}
return results;
};
return callback;
case StreamQuery::Action::TEST_STREAM:
throw std::logic_error("not implemented");
}
}
}

View File

@ -188,7 +188,7 @@ void Streams::StopAll() {
}
}
std::vector<StreamStatus> Streams::Show() const {
std::vector<StreamStatus> Streams::GetStreamInfo() const {
std::vector<StreamStatus> result;
{
for (auto locked_streams = streams_.ReadLock(); const auto &[stream_name, stream_data] : *locked_streams) {

View File

@ -113,7 +113,7 @@ class Streams final {
/// Return current status for all streams.
/// It might happend that the is_running field is out of date if the one of the streams stops during the invocation of
/// this function because of an error.
std::vector<StreamStatus> Show() const;
std::vector<StreamStatus> GetStreamInfo() const;
/// Do a dry-run consume from a stream.
///

View File

@ -3252,6 +3252,16 @@ TEST_P(CypherMainVisitorTest, StartAllStreams) {
ValidateMostlyEmptyStreamQuery(ast_generator, "StARt AlL StrEAMS", StreamQuery::Action::START_ALL_STREAMS, "");
}
TEST_P(CypherMainVisitorTest, ShowStreams) {
auto &ast_generator = *GetParam();
TestInvalidQuery("SHOW ALL", ast_generator);
TestInvalidQuery("SHOW STREAM", ast_generator);
TestInvalidQuery("SHOW STREAMS ALL", ast_generator);
ValidateMostlyEmptyStreamQuery(ast_generator, "SHOW STREAMS", StreamQuery::Action::SHOW_STREAMS, "");
}
TEST_P(CypherMainVisitorTest, StopStream) {
auto &ast_generator = *GetParam();

View File

@ -68,7 +68,7 @@ class StreamsTest : public ::testing::Test {
void CheckStreamStatus(const StreamCheckData &check_data) {
SCOPED_TRACE(fmt::format("Checking status of '{}'", check_data.name));
const auto &stream_statuses = streams_->Show();
const auto &stream_statuses = streams_->GetStreamInfo();
auto it = std::find_if(stream_statuses.begin(), stream_statuses.end(),
[&check_data](const auto &stream_status) { return stream_status.name == check_data.name; });
ASSERT_NE(it, stream_statuses.end());
@ -122,7 +122,7 @@ TEST_F(StreamsTest, SimpleStreamManagement) {
EXPECT_NO_FATAL_FAILURE(CheckStreamStatus(check_data));
EXPECT_NO_THROW(streams_->Drop(check_data.name));
EXPECT_TRUE(streams_->Show().empty());
EXPECT_TRUE(streams_->GetStreamInfo().empty());
}
TEST_F(StreamsTest, CreateAlreadyExisting) {
@ -183,16 +183,16 @@ TEST_F(StreamsTest, RestoreStreams) {
const auto check_restore_logic = [&stream_check_datas, this]() {
// Reset the Streams object to trigger reloading
ResetStreamsObject();
EXPECT_TRUE(streams_->Show().empty());
EXPECT_TRUE(streams_->GetStreamInfo().empty());
streams_->RestoreStreams();
EXPECT_EQ(stream_check_datas.size(), streams_->Show().size());
EXPECT_EQ(stream_check_datas.size(), streams_->GetStreamInfo().size());
for (const auto &check_data : stream_check_datas) {
ASSERT_NO_FATAL_FAILURE(CheckStreamStatus(check_data));
}
};
streams_->RestoreStreams();
EXPECT_TRUE(streams_->Show().empty());
EXPECT_TRUE(streams_->GetStreamInfo().empty());
for (auto &check_data : stream_check_datas) {
streams_->Create(check_data.name, check_data.info);