diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp index 44e13590e..af073633b 100644 --- a/src/integrations/kafka/consumer.cpp +++ b/src/integrations/kafka/consumer.cpp @@ -1,6 +1,7 @@ #include "integrations/kafka/consumer.hpp" #include <algorithm> +#include <chrono> #include <iterator> #include <memory> #include <unordered_set> @@ -17,7 +18,10 @@ namespace integrations::kafka { constexpr std::chrono::milliseconds kDefaultBatchInterval{100}; constexpr int64_t kDefaultBatchSize = 1000; -constexpr int64_t kDefaultTestBatchLimit = 1; +constexpr int64_t kDefaultCheckBatchLimit = 1; +constexpr std::chrono::milliseconds kDefaultCheckTimeout{30000}; +constexpr std::chrono::milliseconds kMinimumInterval{1}; +constexpr int64_t kMinimumSize{1}; namespace { utils::BasicResult<std::string, std::vector<Message>> GetBatch(RdKafka::KafkaConsumer &consumer, @@ -42,12 +46,14 @@ utils::BasicResult<std::string, std::vector<Message>> GetBatch(RdKafka::KafkaCon case RdKafka::ERR_NO_ERROR: batch.emplace_back(std::move(msg)); break; - + case RdKafka::ERR__MAX_POLL_EXCEEDED: + // max.poll.interval.ms reached between two calls of poll, just continue + spdlog::info("Consumer {} reached the max.poll.interval.ms.", info.consumer_name); + break; default: - // TODO(antaljanosbenjamin): handle RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED auto error = msg->errstr(); - spdlog::warn("Unexpected error while consuming message in consumer {}, error: {}!", info.consumer_name, - msg->errstr()); + spdlog::warn("Unexpected error while consuming message in consumer {}, error: {} (code {})!", + info.consumer_name, msg->errstr(), msg->err()); return {std::move(error)}; } @@ -95,6 +101,14 @@ int64_t Message::Timestamp() const { Consumer::Consumer(const std::string &bootstrap_servers, ConsumerInfo info, ConsumerFunction consumer_function) : info_{std::move(info)}, consumer_function_(std::move(consumer_function)) { MG_ASSERT(consumer_function_, "Empty consumer function for Kafka consumer"); + // NOLINTNEXTLINE (modernize-use-nullptr) + if (info.batch_interval.value_or(kMinimumInterval) < kMinimumInterval) { + throw ConsumerFailedToInitializeException(info_.consumer_name, "Batch interval has to be positive!"); + } + if (info.batch_size.value_or(kMinimumSize) < kMinimumSize) { + throw ConsumerFailedToInitializeException(info_.consumer_name, "Batch size has to be positive!"); + } + std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)); if (conf == nullptr) { throw ConsumerFailedToInitializeException(info_.consumer_name, "Couldn't create Kafka configuration!"); @@ -193,9 +207,17 @@ void Consumer::StopIfRunning() { } } -void Consumer::Test(std::optional<int64_t> limit_batches, const ConsumerFunction &test_consumer_function) const { +void Consumer::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> limit_batches, + const ConsumerFunction &check_consumer_function) const { + // NOLINTNEXTLINE (modernize-use-nullptr) + if (timeout.value_or(kMinimumInterval) < kMinimumInterval) { + throw ConsumerCheckFailedException(info_.consumer_name, "Timeout has to be positive!"); + } + if (limit_batches.value_or(kMinimumSize) < kMinimumSize) { + throw ConsumerCheckFailedException(info_.consumer_name, "Batch limit has to be positive!"); + } // The implementation of this function is questionable: it is const qualified, though it changes the inner state of - // KafkaConsumer. Though it changes the inner state, it saves the current assignment for future Test/Start calls to + // KafkaConsumer. Though it changes the inner state, it saves the current assignment for future Check/Start calls to // restore the current state, so the changes made by this function shouldn't be visible for the users of the class. It // also passes a non const reference of KafkaConsumer to GetBatch function. That means the object is bitwise const // (KafkaConsumer is stored in unique_ptr) and internally mostly synchronized. Mostly, because as Start/Stop requires @@ -209,23 +231,29 @@ void Consumer::Test(std::optional<int64_t> limit_batches, const ConsumerFunction if (last_assignment_.empty()) { if (auto err = consumer_->assignment(last_assignment_); err != RdKafka::ERR_NO_ERROR) { spdlog::warn("Saving the commited offset of consumer {} failed: {}", info_.consumer_name, RdKafka::err2str(err)); - throw ConsumerTestFailedException(info_.consumer_name, - fmt::format("Couldn't save commited offsets: '{}'", RdKafka::err2str(err))); + throw ConsumerCheckFailedException(info_.consumer_name, + fmt::format("Couldn't save commited offsets: '{}'", RdKafka::err2str(err))); } } else { if (auto err = consumer_->assign(last_assignment_); err != RdKafka::ERR_NO_ERROR) { - throw ConsumerTestFailedException(info_.consumer_name, - fmt::format("Couldn't restore commited offsets: '{}'", RdKafka::err2str(err))); + throw ConsumerCheckFailedException(info_.consumer_name, + fmt::format("Couldn't restore commited offsets: '{}'", RdKafka::err2str(err))); } } - int64_t num_of_batches = limit_batches.value_or(kDefaultTestBatchLimit); - + const auto num_of_batches = limit_batches.value_or(kDefaultCheckBatchLimit); + const auto timeout_to_use = timeout.value_or(kDefaultCheckTimeout); + const auto start = std::chrono::steady_clock::now(); for (int64_t i = 0; i < num_of_batches;) { + const auto now = std::chrono::steady_clock::now(); + // NOLINTNEXTLINE (modernize-use-nullptr) + if (now - start >= timeout_to_use) { + throw ConsumerCheckFailedException(info_.consumer_name, "timeout reached"); + } auto maybe_batch = GetBatch(*consumer_, info_, is_running_); if (maybe_batch.HasError()) { - throw ConsumerTestFailedException(info_.consumer_name, maybe_batch.GetError()); + throw ConsumerCheckFailedException(info_.consumer_name, maybe_batch.GetError()); } const auto &batch = maybe_batch.GetValue(); @@ -236,10 +264,10 @@ void Consumer::Test(std::optional<int64_t> limit_batches, const ConsumerFunction ++i; try { - test_consumer_function(batch); + check_consumer_function(batch); } catch (const std::exception &e) { - spdlog::warn("Kafka consumer {} test failed with error {}", info_.consumer_name, e.what()); - throw ConsumerTestFailedException(info_.consumer_name, e.what()); + spdlog::warn("Kafka consumer {} check failed with error {}", info_.consumer_name, e.what()); + throw ConsumerCheckFailedException(info_.consumer_name, e.what()); } } } @@ -298,7 +326,6 @@ void Consumer::StartConsuming() { spdlog::info("Kafka consumer {} is processing a batch", info_.consumer_name); - // TODO (mferencevic): Figure out what to do with all other exceptions. try { consumer_function_(batch); if (auto err = consumer_->commitSync(); err != RdKafka::ERR_NO_ERROR) { diff --git a/src/integrations/kafka/consumer.hpp b/src/integrations/kafka/consumer.hpp index f92f770a7..483e2cecc 100644 --- a/src/integrations/kafka/consumer.hpp +++ b/src/integrations/kafka/consumer.hpp @@ -99,12 +99,10 @@ class Consumer final : public RdKafka::EventCb { /// Starts consuming messages if it is not started already. /// - /// @throws ConsumerNotAvailableException if the consumer isn't initialized void StartIfStopped(); /// Stops consuming messages. /// - /// @throws ConsumerNotAvailableException if the consumer isn't initialized /// @throws ConsumerStoppedException if the consumer is already stopped void Stop(); @@ -118,10 +116,12 @@ class Consumer final : public RdKafka::EventCb { /// /// @param limit_batches the consumer will only test the given number of batches. If not present, a default value is /// used. - /// @param test_consumer_function a function to feed the received messages in, only used during this dry-run. + /// @param check_consumer_function a function to feed the received messages in, only used during this dry-run. /// /// @throws ConsumerRunningException if the consumer is alredy running. - void Test(std::optional<int64_t> limit_batches, const ConsumerFunction &test_consumer_function) const; + /// @throws ConsumerCheckFailedException if check isn't successful. + void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> limit_batches, + const ConsumerFunction &check_consumer_function) const; /// Returns true if the consumer is actively consuming messages. bool IsRunning() const; diff --git a/src/integrations/kafka/exceptions.hpp b/src/integrations/kafka/exceptions.hpp index 81cc48540..b794e0b02 100644 --- a/src/integrations/kafka/exceptions.hpp +++ b/src/integrations/kafka/exceptions.hpp @@ -4,6 +4,7 @@ #include "utils/exceptions.hpp" +namespace integrations::kafka { class KafkaStreamException : public utils::BasicException { using utils::BasicException::BasicException; }; @@ -26,10 +27,10 @@ class ConsumerStoppedException : public KafkaStreamException { : KafkaStreamException("Kafka consumer {} is already stopped", consumer_name) {} }; -class ConsumerTestFailedException : public KafkaStreamException { +class ConsumerCheckFailedException : public KafkaStreamException { public: - explicit ConsumerTestFailedException(const std::string &consumer_name, const std::string &error) - : KafkaStreamException("Kafka consumer {} test failed: {}", consumer_name, error) {} + explicit ConsumerCheckFailedException(const std::string &consumer_name, const std::string &error) + : KafkaStreamException("Kafka consumer {} check failed: {}", consumer_name, error) {} }; class ConsumerStartFailedException : public KafkaStreamException { @@ -43,3 +44,4 @@ class TopicNotFoundException : public KafkaStreamException { TopicNotFoundException(const std::string &consumer_name, const std::string &topic_name) : KafkaStreamException("Kafka consumer {} cannot find topic {}", consumer_name, topic_name) {} }; +} // namespace integrations::kafka diff --git a/src/query/frontend/ast/ast.lcp b/src/query/frontend/ast/ast.lcp index fc7c80203..f8758df95 100644 --- a/src/query/frontend/ast/ast.lcp +++ b/src/query/frontend/ast/ast.lcp @@ -2479,6 +2479,9 @@ cpp<# :slk-save #'slk-save-ast-pointer :slk-load (slk-load-ast-pointer "Expression")) (batch_limit "Expression *" :initval "nullptr" :scope :public + :slk-save #'slk-save-ast-pointer + :slk-load (slk-load-ast-pointer "Expression")) + (timeout "Expression *" :initval "nullptr" :scope :public :slk-save #'slk-save-ast-pointer :slk-load (slk-load-ast-pointer "Expression"))) diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index 7719385a2..9365b12b9 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -564,6 +564,12 @@ antlrcpp::Any CypherMainVisitor::visitCheckStream(MemgraphCypher::CheckStreamCon } stream_query->batch_limit_ = ctx->batchLimit->accept(this); } + if (ctx->TIMEOUT()) { + if (!ctx->timeout->numberLiteral() || !ctx->timeout->numberLiteral()->integerLiteral()) { + throw SemanticException("Timeout should be an integer literal!"); + } + stream_query->timeout_ = ctx->timeout->accept(this); + } return stream_query; } diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 index 9bd72568d..6f27d3853 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 @@ -294,4 +294,4 @@ stopAllStreams : STOP ALL STREAMS ; showStreams : SHOW STREAMS ; -checkStream : CHECK STREAM streamName ( BATCH_LIMIT batchLimit=literal ) ? ; +checkStream : CHECK STREAM streamName ( BATCH_LIMIT batchLimit=literal ) ? ( TIMEOUT timeout=literal ) ? ; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 2fb049626..236d4baf5 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -583,8 +583,9 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters ¶mete case StreamQuery::Action::CHECK_STREAM: { callback.header = {"query", "parameters"}; callback.fn = [interpreter_context, stream_name = stream_query->stream_name_, + timeout = GetOptionalValue<std::chrono::milliseconds>(stream_query->timeout_, evaluator), batch_limit = GetOptionalValue<int64_t>(stream_query->batch_limit_, evaluator)]() mutable { - return interpreter_context->streams.Test(stream_name, batch_limit); + return interpreter_context->streams.Check(stream_name, timeout, batch_limit); }; return callback; } diff --git a/src/query/streams.cpp b/src/query/streams.cpp index 44b3cce1e..ae0c0443b 100644 --- a/src/query/streams.cpp +++ b/src/query/streams.cpp @@ -277,7 +277,8 @@ std::vector<StreamStatus> Streams::GetStreamInfo() const { return result; } -TransformationResult Streams::Test(const std::string &stream_name, std::optional<int64_t> batch_limit) const { +TransformationResult Streams::Check(const std::string &stream_name, std::optional<std::chrono::milliseconds> timeout, + std::optional<int64_t> batch_limit) const { // This depends on the fact that Drop will first acquire a write lock to the consumer, and erase it only after that auto [locked_consumer, transformation_name] = [this, &stream_name]() -> std::pair<SynchronizedConsumer::ReadLockedPtr, std::string> { @@ -307,7 +308,7 @@ TransformationResult Streams::Test(const std::string &stream_name, std::optional } }; - locked_consumer->Test(batch_limit, consumer_function); + locked_consumer->Check(timeout, batch_limit, consumer_function); return test_result; } diff --git a/src/query/streams.hpp b/src/query/streams.hpp index 68f559b57..be8ad5960 100644 --- a/src/query/streams.hpp +++ b/src/query/streams.hpp @@ -122,8 +122,10 @@ class Streams final { /// /// @throws StreamsException if the stream doesn't exist /// @throws ConsumerRunningException if the consumer is alredy running - /// @throws ConsumerTestFailedException if the transformation function throws any std::exception during processing - TransformationResult Test(const std::string &stream_name, std::optional<int64_t> batch_limit = std::nullopt) const; + /// @throws ConsumerCheckFailedException if the transformation function throws any std::exception during processing + TransformationResult Check(const std::string &stream_name, + std::optional<std::chrono::milliseconds> timeout = std::nullopt, + std::optional<int64_t> batch_limit = std::nullopt) const; private: using StreamsMap = std::unordered_map<std::string, StreamData>; diff --git a/tests/e2e/streams/streams_tests.py b/tests/e2e/streams/streams_tests.py index 1b528b927..a0d66b236 100755 --- a/tests/e2e/streams/streams_tests.py +++ b/tests/e2e/streams/streams_tests.py @@ -422,6 +422,9 @@ def test_start_and_stop_during_check(producer, topics, connection, operation): time.sleep(0.5) assert timed_wait(lambda: check_counter.value == CHECK_BEFORE_EXECUTE) + assert timed_wait(lambda: get_is_running(cursor, "test_stream")) + assert check_counter.value == CHECK_BEFORE_EXECUTE, "SHOW STREAMS " \ + "was blocked until the end of CHECK STREAM" operation_proc.start() assert timed_wait(lambda: operation_counter.value == OP_BEFORE_EXECUTE) @@ -448,7 +451,7 @@ def test_start_and_stop_during_check(producer, topics, connection, operation): operation_proc.terminate() -def test_check_already_started_stream(producer, topics, connection): +def test_check_already_started_stream(topics, connection): assert len(topics) > 0 cursor = connection.cursor() @@ -462,5 +465,33 @@ def test_check_already_started_stream(producer, topics, connection): execute_and_fetch_all(cursor, "CHECK STREAM started_stream") +def test_start_checked_stream_after_timeout(topics, connection): + cursor = connection.cursor() + execute_and_fetch_all(cursor, + "CREATE STREAM test_stream " + f"TOPICS {topics[0]} " + f"TRANSFORM transform.simple") + + timeout_ms = 2000 + + def call_check(): + execute_and_fetch_all( + connect().cursor(), + f"CHECK STREAM test_stream TIMEOUT {timeout_ms}") + + check_stream_proc = Process(target=call_check, daemon=True) + + start = time.time() + check_stream_proc.start() + assert timed_wait(lambda: get_is_running(cursor, "test_stream")) + start_stream(cursor, "test_stream") + end = time.time() + + assert (end - start) < 1.3 * \ + timeout_ms, "The START STREAM was blocked too long" + assert get_is_running(cursor, "test_stream") + stop_stream(cursor, "test_stream") + + if __name__ == "__main__": sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/streams/workloads.yaml b/tests/e2e/streams/workloads.yaml index 8d8a836be..624707f13 100644 --- a/tests/e2e/streams/workloads.yaml +++ b/tests/e2e/streams/workloads.yaml @@ -1,7 +1,7 @@ template_cluster: &template_cluster cluster: main: - args: ["--bolt-port", "7687", "--log-level=TRACE", "--kafka-bootstrap-servers=localhost:9092","--query-execution-timeout-sec=0"] + args: ["--bolt-port", "7687", "--log-level=TRACE", "--kafka-bootstrap-servers=localhost:9092", "--query-execution-timeout-sec=0"] log_file: "streams-e2e.log" setup_queries: [] validation_queries: [] diff --git a/tests/unit/cypher_main_visitor.cpp b/tests/unit/cypher_main_visitor.cpp index 67ea169fb..7fe229568 100644 --- a/tests/unit/cypher_main_visitor.cpp +++ b/tests/unit/cypher_main_visitor.cpp @@ -3216,7 +3216,8 @@ void CheckOptionalExpression(Base &ast_generator, Expression *expression, const void ValidateMostlyEmptyStreamQuery(Base &ast_generator, const std::string &query_string, const StreamQuery::Action action, const std::string_view stream_name, - const std::optional<TypedValue> &batch_limit = std::nullopt) { + const std::optional<TypedValue> &batch_limit = std::nullopt, + const std::optional<TypedValue> &timeout = std::nullopt) { auto *parsed_query = dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(query_string)); ASSERT_NE(parsed_query, nullptr); EXPECT_EQ(parsed_query->action_, action); @@ -3227,6 +3228,7 @@ void ValidateMostlyEmptyStreamQuery(Base &ast_generator, const std::string &quer EXPECT_EQ(parsed_query->batch_interval_, nullptr); EXPECT_EQ(parsed_query->batch_size_, nullptr); EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_limit_, batch_limit)); + EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->timeout_, timeout)); } TEST_P(CypherMainVisitorTest, DropStream) { @@ -3413,13 +3415,20 @@ TEST_P(CypherMainVisitorTest, CheckStream) { TestInvalidQuery("CHECK STREAM something,something", ast_generator); TestInvalidQuery("CHECK STREAM something BATCH LIMIT 1", ast_generator); TestInvalidQuery("CHECK STREAM something BATCH_LIMIT", ast_generator); + TestInvalidQuery("CHECK STREAM something TIMEOUT", ast_generator); + TestInvalidQuery("CHECK STREAM something BATCH_LIMIT 1 TIMEOUT", ast_generator); TestInvalidQuery<SemanticException>("CHECK STREAM something BATCH_LIMIT 'it should be an integer'", ast_generator); TestInvalidQuery<SemanticException>("CHECK STREAM something BATCH_LIMIT 2.5", ast_generator); + TestInvalidQuery<SemanticException>("CHECK STREAM something TIMEOUT 'it should be an integer'", ast_generator); ValidateMostlyEmptyStreamQuery(ast_generator, "CHECK STREAM checkedStream", StreamQuery::Action::CHECK_STREAM, "checkedStream"); ValidateMostlyEmptyStreamQuery(ast_generator, "CHECK STREAM checkedStream bAtCH_LIMIT 42", StreamQuery::Action::CHECK_STREAM, "checkedStream", TypedValue(42)); + ValidateMostlyEmptyStreamQuery(ast_generator, "CHECK STREAM checkedStream TimEOuT 666", + StreamQuery::Action::CHECK_STREAM, "checkedStream", std::nullopt, TypedValue(666)); + ValidateMostlyEmptyStreamQuery(ast_generator, "CHECK STREAM checkedStream BATCH_LIMIT 30 TIMEOUT 444", + StreamQuery::Action::CHECK_STREAM, "checkedStream", TypedValue(30), TypedValue(444)); } } // namespace diff --git a/tests/unit/integrations_kafka_consumer.cpp b/tests/unit/integrations_kafka_consumer.cpp index 9cbfdca4d..521adc5c3 100644 --- a/tests/unit/integrations_kafka_consumer.cpp +++ b/tests/unit/integrations_kafka_consumer.cpp @@ -19,6 +19,8 @@ using namespace integrations::kafka; namespace { const auto kDummyConsumerFunction = [](const auto & /*messages*/) {}; +constexpr std::optional<std::chrono::milliseconds> kDontCareTimeout = std::nullopt; + int SpanToInt(std::span<const char> span) { int result{0}; if (span.size() != sizeof(int)) { @@ -277,6 +279,32 @@ TEST_F(ConsumerTest, InvalidTopic) { EXPECT_THROW(Consumer(cluster.Bootstraps(), std::move(info), kDummyConsumerFunction), TopicNotFoundException); } +TEST_F(ConsumerTest, InvalidBatchInterval) { + auto info = CreateDefaultConsumerInfo(); + + info.batch_interval = std::chrono::milliseconds{0}; + EXPECT_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction), ConsumerFailedToInitializeException); + + info.batch_interval = std::chrono::milliseconds{-1}; + EXPECT_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction), ConsumerFailedToInitializeException); + + info.batch_interval = std::chrono::milliseconds{1}; + EXPECT_NO_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction)); +} + +TEST_F(ConsumerTest, InvalidBatchSize) { + auto info = CreateDefaultConsumerInfo(); + + info.batch_size = 0; + EXPECT_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction), ConsumerFailedToInitializeException); + + info.batch_size = -1; + EXPECT_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction), ConsumerFailedToInitializeException); + + info.batch_size = 1; + EXPECT_NO_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction)); +} + TEST_F(ConsumerTest, DISABLED_StartsFromPreviousOffset) { constexpr auto kBatchSize = 1; auto info = CreateDefaultConsumerInfo(); @@ -332,7 +360,7 @@ TEST_F(ConsumerTest, DISABLED_StartsFromPreviousOffset) { ASSERT_NO_FATAL_FAILURE(send_and_consume_messages(2)); } -TEST_F(ConsumerTest, TestMethodWorks) { +TEST_F(ConsumerTest, CheckMethodWorks) { constexpr auto kBatchSize = 1; auto info = CreateDefaultConsumerInfo(); info.batch_size = kBatchSize; @@ -354,7 +382,7 @@ TEST_F(ConsumerTest, TestMethodWorks) { ASSERT_FALSE(consumer->IsRunning()); - consumer->Test(kMessageCount, [&](const std::vector<Message> &messages) mutable { + consumer->Check(kDontCareTimeout, kMessageCount, [&](const std::vector<Message> &messages) mutable { auto message_count = received_message_count.load(); for (const auto &message : messages) { std::string message_payload = kMessagePrefix + std::to_string(message_count++); @@ -379,6 +407,48 @@ TEST_F(ConsumerTest, TestMethodWorks) { } } +TEST_F(ConsumerTest, CheckMethodTimeout) { + Consumer consumer{cluster.Bootstraps(), CreateDefaultConsumerInfo(), kDummyConsumerFunction}; + + std::chrono::milliseconds timeout{3000}; + + const auto start = std::chrono::steady_clock::now(); + EXPECT_THROW(consumer.Check(timeout, std::nullopt, kDummyConsumerFunction), ConsumerCheckFailedException); + const auto end = std::chrono::steady_clock::now(); + + const auto elapsed = (end - start); + EXPECT_LE(timeout, elapsed); + EXPECT_LE(elapsed, timeout * 1.2); +} + +TEST_F(ConsumerTest, CheckWithInvalidTimeout) { + Consumer consumer{cluster.Bootstraps(), CreateDefaultConsumerInfo(), kDummyConsumerFunction}; + + const auto start = std::chrono::steady_clock::now(); + EXPECT_THROW(consumer.Check(std::chrono::milliseconds{0}, std::nullopt, kDummyConsumerFunction), + ConsumerCheckFailedException); + EXPECT_THROW(consumer.Check(std::chrono::milliseconds{-1}, std::nullopt, kDummyConsumerFunction), + ConsumerCheckFailedException); + const auto end = std::chrono::steady_clock::now(); + + constexpr std::chrono::seconds kMaxExpectedTimeout{2}; + EXPECT_LE((end - start), kMaxExpectedTimeout) << "The check most probably failed because of an actual timeout and " + "not because of the invalid value for timeout."; +} + +TEST_F(ConsumerTest, CheckWithInvalidBatchSize) { + Consumer consumer{cluster.Bootstraps(), CreateDefaultConsumerInfo(), kDummyConsumerFunction}; + + const auto start = std::chrono::steady_clock::now(); + EXPECT_THROW(consumer.Check(std::nullopt, 0, kDummyConsumerFunction), ConsumerCheckFailedException); + EXPECT_THROW(consumer.Check(std::nullopt, -1, kDummyConsumerFunction), ConsumerCheckFailedException); + const auto end = std::chrono::steady_clock::now(); + + constexpr std::chrono::seconds kMaxExpectedTimeout{2}; + EXPECT_LE((end - start), kMaxExpectedTimeout) << "The check most probably failed because of an actual timeout and " + "not because of the invalid value for batch size."; +} + TEST_F(ConsumerTest, ConsumerStatus) { const std::string kConsumerName = "ConsumerGroupNameAAAA"; const std::vector<std::string> topics = {"Topic1QWER", "Topic2XCVBB"}; diff --git a/tests/unit/query_streams.cpp b/tests/unit/query_streams.cpp index 3d254c836..d73f04c26 100644 --- a/tests/unit/query_streams.cpp +++ b/tests/unit/query_streams.cpp @@ -4,6 +4,7 @@ #include <utility> #include <gtest/gtest.h> +#include "integrations/kafka/exceptions.hpp" #include "kafka_mock.hpp" #include "query/config.hpp" #include "query/interpreter.hpp" @@ -225,3 +226,19 @@ TEST_F(StreamsTest, RestoreStreams) { check_restore_logic(); } } + +TEST_F(StreamsTest, CheckWithTimeout) { + const auto stream_info = CreateDefaultStreamInfo(); + const auto stream_name = GetDefaultStreamName(); + streams_->Create(stream_name, stream_info); + + std::chrono::milliseconds timeout{3000}; + + const auto start = std::chrono::steady_clock::now(); + EXPECT_THROW(streams_->Check(stream_name, timeout, std::nullopt), integrations::kafka::ConsumerCheckFailedException); + const auto end = std::chrono::steady_clock::now(); + + const auto elapsed = (end - start); + EXPECT_LE(timeout, elapsed); + EXPECT_LE(elapsed, timeout * 1.2); +}