Small improvements for streams (#193)

* Add timeout to CHECK STREAM

* Handle RdKafka::ERR__MAX_POLL_EXCEEDED in consumer

Co-authored-by: Jure Bajic <jbajic@users.noreply.github.com>

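Editorial note on usage: the change makes queries such as CHECK STREAM myStream TIMEOUT 2000 or CHECK STREAM myStream BATCH_LIMIT 2 TIMEOUT 5000 valid (stream names illustrative; the timeout is an integer literal interpreted as milliseconds). When a clause is omitted, the defaults introduced below apply: a batch limit of 1 and a timeout of 30000 ms.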
János Benjamin Antal 2021-07-07 15:33:06 +02:00 committed by Antonio Andelic
parent 6cfec787dc
commit fb5a2ed4b6
14 changed files with 205 additions and 36 deletions

View File

@@ -1,6 +1,7 @@
#include "integrations/kafka/consumer.hpp"
#include <algorithm>
#include <chrono>
#include <iterator>
#include <memory>
#include <unordered_set>
@@ -17,7 +18,10 @@ namespace integrations::kafka {
constexpr std::chrono::milliseconds kDefaultBatchInterval{100};
constexpr int64_t kDefaultBatchSize = 1000;
constexpr int64_t kDefaultTestBatchLimit = 1;
constexpr int64_t kDefaultCheckBatchLimit = 1;
constexpr std::chrono::milliseconds kDefaultCheckTimeout{30000};
constexpr std::chrono::milliseconds kMinimumInterval{1};
constexpr int64_t kMinimumSize{1};
namespace {
utils::BasicResult<std::string, std::vector<Message>> GetBatch(RdKafka::KafkaConsumer &consumer,
@@ -42,12 +46,14 @@ utils::BasicResult<std::string, std::vector<Message>> GetBatch(RdKafka::KafkaCon
case RdKafka::ERR_NO_ERROR:
batch.emplace_back(std::move(msg));
break;
case RdKafka::ERR__MAX_POLL_EXCEEDED:
// max.poll.interval.ms elapsed between two poll calls, just continue
spdlog::info("Consumer {} reached max.poll.interval.ms.", info.consumer_name);
break;
default:
// TODO(antaljanosbenjamin): handle RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED
auto error = msg->errstr();
spdlog::warn("Unexpected error while consuming message in consumer {}, error: {}!", info.consumer_name,
msg->errstr());
spdlog::warn("Unexpected error while consuming message in consumer {}, error: {} (code {})!",
info.consumer_name, msg->errstr(), msg->err());
return {std::move(error)};
}
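librdkafka reports ERR__MAX_POLL_EXCEEDED when more than max.poll.interval.ms passes between two poll calls, so deployments whose batches are slow to process may want to raise that limit. A minimal sketch, assuming a conf object like the one created in the constructor below (the consumer name and the 10-minute value are purely illustrative):

std::string errstr;
std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
// "600000" ms = 10 minutes, chosen only for illustration.
if (conf->set("max.poll.interval.ms", "600000", errstr) != RdKafka::Conf::CONF_OK) {
  // Surface configuration errors the same way the constructor does.
  throw ConsumerFailedToInitializeException("my-consumer", errstr);
}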
@@ -95,6 +101,14 @@ int64_t Message::Timestamp() const {
Consumer::Consumer(const std::string &bootstrap_servers, ConsumerInfo info, ConsumerFunction consumer_function)
: info_{std::move(info)}, consumer_function_(std::move(consumer_function)) {
MG_ASSERT(consumer_function_, "Empty consumer function for Kafka consumer");
// NOLINTNEXTLINE (modernize-use-nullptr)
if (info_.batch_interval.value_or(kMinimumInterval) < kMinimumInterval) {
throw ConsumerFailedToInitializeException(info_.consumer_name, "Batch interval has to be positive!");
}
if (info_.batch_size.value_or(kMinimumSize) < kMinimumSize) {
throw ConsumerFailedToInitializeException(info_.consumer_name, "Batch size has to be positive!");
}
std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
if (conf == nullptr) {
throw ConsumerFailedToInitializeException(info_.consumer_name, "Couldn't create Kafka configuration!");
@@ -193,9 +207,17 @@ void Consumer::StopIfRunning() {
}
}
void Consumer::Test(std::optional<int64_t> limit_batches, const ConsumerFunction &test_consumer_function) const {
void Consumer::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> limit_batches,
const ConsumerFunction &check_consumer_function) const {
// NOLINTNEXTLINE (modernize-use-nullptr)
if (timeout.value_or(kMinimumInterval) < kMinimumInterval) {
throw ConsumerCheckFailedException(info_.consumer_name, "Timeout has to be positive!");
}
if (limit_batches.value_or(kMinimumSize) < kMinimumSize) {
throw ConsumerCheckFailedException(info_.consumer_name, "Batch limit has to be positive!");
}
// The implementation of this function is questionable: it is const qualified, though it changes the inner state of
// KafkaConsumer. Though it changes the inner state, it saves the current assignment for future Test/Start calls to
// KafkaConsumer. Though it changes the inner state, it saves the current assignment for future Check/Start calls to
// restore the current state, so the changes made by this function shouldn't be visible for the users of the class. It
// also passes a non const reference of KafkaConsumer to GetBatch function. That means the object is bitwise const
// (KafkaConsumer is stored in unique_ptr) and internally mostly synchronized. Mostly, because as Start/Stop requires
@@ -209,23 +231,29 @@ void Consumer::Test(std::optional<int64_t> limit_batches, const ConsumerFunction
if (last_assignment_.empty()) {
if (auto err = consumer_->assignment(last_assignment_); err != RdKafka::ERR_NO_ERROR) {
spdlog::warn("Saving the commited offset of consumer {} failed: {}", info_.consumer_name, RdKafka::err2str(err));
throw ConsumerTestFailedException(info_.consumer_name,
fmt::format("Couldn't save commited offsets: '{}'", RdKafka::err2str(err)));
throw ConsumerCheckFailedException(info_.consumer_name,
fmt::format("Couldn't save commited offsets: '{}'", RdKafka::err2str(err)));
}
} else {
if (auto err = consumer_->assign(last_assignment_); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerTestFailedException(info_.consumer_name,
fmt::format("Couldn't restore committed offsets: '{}'", RdKafka::err2str(err)));
throw ConsumerCheckFailedException(info_.consumer_name,
fmt::format("Couldn't restore committed offsets: '{}'", RdKafka::err2str(err)));
}
}
int64_t num_of_batches = limit_batches.value_or(kDefaultTestBatchLimit);
const auto num_of_batches = limit_batches.value_or(kDefaultCheckBatchLimit);
const auto timeout_to_use = timeout.value_or(kDefaultCheckTimeout);
const auto start = std::chrono::steady_clock::now();
for (int64_t i = 0; i < num_of_batches;) {
const auto now = std::chrono::steady_clock::now();
// NOLINTNEXTLINE (modernize-use-nullptr)
if (now - start >= timeout_to_use) {
throw ConsumerCheckFailedException(info_.consumer_name, "timeout reached");
}
auto maybe_batch = GetBatch(*consumer_, info_, is_running_);
if (maybe_batch.HasError()) {
throw ConsumerTestFailedException(info_.consumer_name, maybe_batch.GetError());
throw ConsumerCheckFailedException(info_.consumer_name, maybe_batch.GetError());
}
const auto &batch = maybe_batch.GetValue();
@@ -236,10 +264,10 @@ void Consumer::Test(std::optional<int64_t> limit_batches, const ConsumerFunction
++i;
try {
test_consumer_function(batch);
check_consumer_function(batch);
} catch (const std::exception &e) {
spdlog::warn("Kafka consumer {} test failed with error {}", info_.consumer_name, e.what());
throw ConsumerTestFailedException(info_.consumer_name, e.what());
spdlog::warn("Kafka consumer {} check failed with error {}", info_.consumer_name, e.what());
throw ConsumerCheckFailedException(info_.consumer_name, e.what());
}
}
}
@@ -298,7 +326,6 @@ void Consumer::StartConsuming() {
spdlog::info("Kafka consumer {} is processing a batch", info_.consumer_name);
// TODO (mferencevic): Figure out what to do with all other exceptions.
try {
consumer_function_(batch);
if (auto err = consumer_->commitSync(); err != RdKafka::ERR_NO_ERROR) {

View File

@@ -99,12 +99,10 @@ class Consumer final : public RdKafka::EventCb {
/// Starts consuming messages if it is not started already.
///
/// @throws ConsumerNotAvailableException if the consumer isn't initialized
void StartIfStopped();
/// Stops consuming messages.
///
/// @throws ConsumerNotAvailableException if the consumer isn't initialized
/// @throws ConsumerStoppedException if the consumer is already stopped
void Stop();
@@ -118,10 +116,12 @@ class Consumer final : public RdKafka::EventCb {
///
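/// @param timeout the maximum wall-clock duration the check may take before a ConsumerCheckFailedException is
/// thrown. If not present, a default value is used.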
/// @param limit_batches the consumer will only check the given number of batches. If not present, a default value is
/// used.
/// @param test_consumer_function a function to feed the received messages in, only used during this dry-run.
/// @param check_consumer_function a function to feed the received messages in, only used during this dry-run.
///
/// @throws ConsumerRunningException if the consumer is already running.
void Test(std::optional<int64_t> limit_batches, const ConsumerFunction &test_consumer_function) const;
/// @throws ConsumerCheckFailedException if check isn't successful.
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> limit_batches,
const ConsumerFunction &check_consumer_function) const;
/// Returns true if the consumer is actively consuming messages.
bool IsRunning() const;
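A minimal usage sketch of the new Check() signature (consumer construction elided; the 2 s timeout and single-batch limit are illustrative, and the integrations::kafka namespace is assumed):

// Dry-run at most one batch and give up if nothing arrives within two seconds.
size_t message_count = 0;
consumer.Check(std::chrono::milliseconds{2000}, /*limit_batches=*/1,
               [&message_count](const std::vector<Message> &messages) {
                 // Only count the delivered messages; a real callback would feed a transformation.
                 message_count += messages.size();
               });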

View File

@@ -4,6 +4,7 @@
#include "utils/exceptions.hpp"
namespace integrations::kafka {
class KafkaStreamException : public utils::BasicException {
using utils::BasicException::BasicException;
};
@@ -26,10 +27,10 @@ class ConsumerStoppedException : public KafkaStreamException {
: KafkaStreamException("Kafka consumer {} is already stopped", consumer_name) {}
};
class ConsumerTestFailedException : public KafkaStreamException {
class ConsumerCheckFailedException : public KafkaStreamException {
public:
explicit ConsumerTestFailedException(const std::string &consumer_name, const std::string &error)
: KafkaStreamException("Kafka consumer {} test failed: {}", consumer_name, error) {}
explicit ConsumerCheckFailedException(const std::string &consumer_name, const std::string &error)
: KafkaStreamException("Kafka consumer {} check failed: {}", consumer_name, error) {}
};
class ConsumerStartFailedException : public KafkaStreamException {
@@ -43,3 +44,4 @@ class TopicNotFoundException : public KafkaStreamException {
TopicNotFoundException(const std::string &consumer_name, const std::string &topic_name)
: KafkaStreamException("Kafka consumer {} cannot find topic {}", consumer_name, topic_name) {}
};
} // namespace integrations::kafka

View File

@@ -2479,6 +2479,9 @@ cpp<#
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(batch_limit "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(timeout "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression")))

View File

@@ -564,6 +564,12 @@ antlrcpp::Any CypherMainVisitor::visitCheckStream(MemgraphCypher::CheckStreamCon
}
stream_query->batch_limit_ = ctx->batchLimit->accept(this);
}
if (ctx->TIMEOUT()) {
if (!ctx->timeout->numberLiteral() || !ctx->timeout->numberLiteral()->integerLiteral()) {
throw SemanticException("Timeout should be an integer literal!");
}
stream_query->timeout_ = ctx->timeout->accept(this);
}
return stream_query;
}

View File

@@ -294,4 +294,4 @@ stopAllStreams : STOP ALL STREAMS ;
showStreams : SHOW STREAMS ;
checkStream : CHECK STREAM streamName ( BATCH_LIMIT batchLimit=literal ) ? ;
checkStream : CHECK STREAM streamName ( BATCH_LIMIT batchLimit=literal ) ? ( TIMEOUT timeout=literal ) ? ;
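Note that the grammar fixes the clause order: BATCH_LIMIT, when present, must precede TIMEOUT, so a query like CHECK STREAM s TIMEOUT 100 BATCH_LIMIT 1 does not parse, while each clause can still be omitted independently.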

View File

@@ -583,8 +583,9 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
case StreamQuery::Action::CHECK_STREAM: {
callback.header = {"query", "parameters"};
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_,
timeout = GetOptionalValue<std::chrono::milliseconds>(stream_query->timeout_, evaluator),
batch_limit = GetOptionalValue<int64_t>(stream_query->batch_limit_, evaluator)]() mutable {
return interpreter_context->streams.Test(stream_name, batch_limit);
return interpreter_context->streams.Check(stream_name, timeout, batch_limit);
};
return callback;
}

View File

@@ -277,7 +277,8 @@ std::vector<StreamStatus> Streams::GetStreamInfo() const {
return result;
}
TransformationResult Streams::Test(const std::string &stream_name, std::optional<int64_t> batch_limit) const {
TransformationResult Streams::Check(const std::string &stream_name, std::optional<std::chrono::milliseconds> timeout,
std::optional<int64_t> batch_limit) const {
// This depends on the fact that Drop will first acquire a write lock to the consumer, and erase it only after that
auto [locked_consumer,
transformation_name] = [this, &stream_name]() -> std::pair<SynchronizedConsumer::ReadLockedPtr, std::string> {
@@ -307,7 +308,7 @@ TransformationResult Streams::Test(const std::string &stream_name, std::optional
}
};
locked_consumer->Test(batch_limit, consumer_function);
locked_consumer->Check(timeout, batch_limit, consumer_function);
return test_result;
}
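For reference, a hedged sketch of a call site mirroring the interpreter change above (streams is assumed to be a Streams instance; the stream name and values are illustrative):

// Dry-run "myStream" for one batch with a 2 s timeout; the returned
// TransformationResult carries the produced query strings and their
// parameters (cf. the "query"/"parameters" header above).
auto results = streams.Check("myStream", std::chrono::milliseconds{2000}, /*batch_limit=*/1);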

View File

@@ -122,8 +122,10 @@ class Streams final {
///
/// @throws StreamsException if the stream doesn't exist
/// @throws ConsumerRunningException if the consumer is already running
/// @throws ConsumerTestFailedException if the transformation function throws any std::exception during processing
TransformationResult Test(const std::string &stream_name, std::optional<int64_t> batch_limit = std::nullopt) const;
/// @throws ConsumerCheckFailedException if the transformation function throws any std::exception during processing
TransformationResult Check(const std::string &stream_name,
std::optional<std::chrono::milliseconds> timeout = std::nullopt,
std::optional<int64_t> batch_limit = std::nullopt) const;
private:
using StreamsMap = std::unordered_map<std::string, StreamData>;

View File

@@ -422,6 +422,9 @@ def test_start_and_stop_during_check(producer, topics, connection, operation):
time.sleep(0.5)
assert timed_wait(lambda: check_counter.value == CHECK_BEFORE_EXECUTE)
assert timed_wait(lambda: get_is_running(cursor, "test_stream"))
assert check_counter.value == CHECK_BEFORE_EXECUTE, "SHOW STREAMS " \
"was blocked until the end of CHECK STREAM"
operation_proc.start()
assert timed_wait(lambda: operation_counter.value == OP_BEFORE_EXECUTE)
@@ -448,7 +451,7 @@ def test_start_and_stop_during_check(producer, topics, connection, operation):
operation_proc.terminate()
def test_check_already_started_stream(producer, topics, connection):
def test_check_already_started_stream(topics, connection):
assert len(topics) > 0
cursor = connection.cursor()
@@ -462,5 +465,33 @@ def test_check_already_started_stream(producer, topics, connection):
execute_and_fetch_all(cursor, "CHECK STREAM started_stream")
def test_start_checked_stream_after_timeout(topics, connection):
cursor = connection.cursor()
execute_and_fetch_all(cursor,
"CREATE STREAM test_stream "
f"TOPICS {topics[0]} "
f"TRANSFORM transform.simple")
timeout_ms = 2000
def call_check():
execute_and_fetch_all(
connect().cursor(),
f"CHECK STREAM test_stream TIMEOUT {timeout_ms}")
check_stream_proc = Process(target=call_check, daemon=True)
start = time.time()
check_stream_proc.start()
assert timed_wait(lambda: get_is_running(cursor, "test_stream"))
start_stream(cursor, "test_stream")
end = time.time()
assert (end - start) < 1.3 * \
(timeout_ms / 1000), "The START STREAM was blocked too long"
assert get_is_running(cursor, "test_stream")
stop_stream(cursor, "test_stream")
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@@ -1,7 +1,7 @@
template_cluster: &template_cluster
cluster:
main:
args: ["--bolt-port", "7687", "--log-level=TRACE", "--kafka-bootstrap-servers=localhost:9092","--query-execution-timeout-sec=0"]
args: ["--bolt-port", "7687", "--log-level=TRACE", "--kafka-bootstrap-servers=localhost:9092", "--query-execution-timeout-sec=0"]
log_file: "streams-e2e.log"
setup_queries: []
validation_queries: []

View File

@@ -3216,7 +3216,8 @@ void CheckOptionalExpression(Base &ast_generator, Expression *expression, const
void ValidateMostlyEmptyStreamQuery(Base &ast_generator, const std::string &query_string,
const StreamQuery::Action action, const std::string_view stream_name,
const std::optional<TypedValue> &batch_limit = std::nullopt) {
const std::optional<TypedValue> &batch_limit = std::nullopt,
const std::optional<TypedValue> &timeout = std::nullopt) {
auto *parsed_query = dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(query_string));
ASSERT_NE(parsed_query, nullptr);
EXPECT_EQ(parsed_query->action_, action);
@@ -3227,6 +3228,7 @@ void ValidateMostlyEmptyStreamQuery(Base &ast_generator, const std::string &quer
EXPECT_EQ(parsed_query->batch_interval_, nullptr);
EXPECT_EQ(parsed_query->batch_size_, nullptr);
EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_limit_, batch_limit));
EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->timeout_, timeout));
}
TEST_P(CypherMainVisitorTest, DropStream) {
@@ -3413,13 +3415,20 @@ TEST_P(CypherMainVisitorTest, CheckStream) {
TestInvalidQuery("CHECK STREAM something,something", ast_generator);
TestInvalidQuery("CHECK STREAM something BATCH LIMIT 1", ast_generator);
TestInvalidQuery("CHECK STREAM something BATCH_LIMIT", ast_generator);
TestInvalidQuery("CHECK STREAM something TIMEOUT", ast_generator);
TestInvalidQuery("CHECK STREAM something BATCH_LIMIT 1 TIMEOUT", ast_generator);
TestInvalidQuery<SemanticException>("CHECK STREAM something BATCH_LIMIT 'it should be an integer'", ast_generator);
TestInvalidQuery<SemanticException>("CHECK STREAM something BATCH_LIMIT 2.5", ast_generator);
TestInvalidQuery<SemanticException>("CHECK STREAM something TIMEOUT 'it should be an integer'", ast_generator);
ValidateMostlyEmptyStreamQuery(ast_generator, "CHECK STREAM checkedStream", StreamQuery::Action::CHECK_STREAM,
"checkedStream");
ValidateMostlyEmptyStreamQuery(ast_generator, "CHECK STREAM checkedStream bAtCH_LIMIT 42",
StreamQuery::Action::CHECK_STREAM, "checkedStream", TypedValue(42));
ValidateMostlyEmptyStreamQuery(ast_generator, "CHECK STREAM checkedStream TimEOuT 666",
StreamQuery::Action::CHECK_STREAM, "checkedStream", std::nullopt, TypedValue(666));
ValidateMostlyEmptyStreamQuery(ast_generator, "CHECK STREAM checkedStream BATCH_LIMIT 30 TIMEOUT 444",
StreamQuery::Action::CHECK_STREAM, "checkedStream", TypedValue(30), TypedValue(444));
}
} // namespace

View File

@@ -19,6 +19,8 @@ using namespace integrations::kafka;
namespace {
const auto kDummyConsumerFunction = [](const auto & /*messages*/) {};
constexpr std::optional<std::chrono::milliseconds> kDontCareTimeout = std::nullopt;
int SpanToInt(std::span<const char> span) {
int result{0};
if (span.size() != sizeof(int)) {
@@ -277,6 +279,32 @@ TEST_F(ConsumerTest, InvalidTopic) {
EXPECT_THROW(Consumer(cluster.Bootstraps(), std::move(info), kDummyConsumerFunction), TopicNotFoundException);
}
TEST_F(ConsumerTest, InvalidBatchInterval) {
auto info = CreateDefaultConsumerInfo();
info.batch_interval = std::chrono::milliseconds{0};
EXPECT_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction), ConsumerFailedToInitializeException);
info.batch_interval = std::chrono::milliseconds{-1};
EXPECT_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction), ConsumerFailedToInitializeException);
info.batch_interval = std::chrono::milliseconds{1};
EXPECT_NO_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction));
}
TEST_F(ConsumerTest, InvalidBatchSize) {
auto info = CreateDefaultConsumerInfo();
info.batch_size = 0;
EXPECT_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction), ConsumerFailedToInitializeException);
info.batch_size = -1;
EXPECT_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction), ConsumerFailedToInitializeException);
info.batch_size = 1;
EXPECT_NO_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction));
}
TEST_F(ConsumerTest, DISABLED_StartsFromPreviousOffset) {
constexpr auto kBatchSize = 1;
auto info = CreateDefaultConsumerInfo();
@@ -332,7 +360,7 @@ TEST_F(ConsumerTest, DISABLED_StartsFromPreviousOffset) {
ASSERT_NO_FATAL_FAILURE(send_and_consume_messages(2));
}
TEST_F(ConsumerTest, TestMethodWorks) {
TEST_F(ConsumerTest, CheckMethodWorks) {
constexpr auto kBatchSize = 1;
auto info = CreateDefaultConsumerInfo();
info.batch_size = kBatchSize;
@@ -354,7 +382,7 @@ TEST_F(ConsumerTest, TestMethodWorks) {
ASSERT_FALSE(consumer->IsRunning());
consumer->Test(kMessageCount, [&](const std::vector<Message> &messages) mutable {
consumer->Check(kDontCareTimeout, kMessageCount, [&](const std::vector<Message> &messages) mutable {
auto message_count = received_message_count.load();
for (const auto &message : messages) {
std::string message_payload = kMessagePrefix + std::to_string(message_count++);
@@ -379,6 +407,48 @@ TEST_F(ConsumerTest, TestMethodWorks) {
}
}
TEST_F(ConsumerTest, CheckMethodTimeout) {
Consumer consumer{cluster.Bootstraps(), CreateDefaultConsumerInfo(), kDummyConsumerFunction};
std::chrono::milliseconds timeout{3000};
const auto start = std::chrono::steady_clock::now();
EXPECT_THROW(consumer.Check(timeout, std::nullopt, kDummyConsumerFunction), ConsumerCheckFailedException);
const auto end = std::chrono::steady_clock::now();
const auto elapsed = (end - start);
EXPECT_LE(timeout, elapsed);
EXPECT_LE(elapsed, timeout * 1.2);
}
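The 1.2 factor on the upper bound leaves room for the fact that Check evaluates its deadline only between GetBatch calls, so a run can overshoot the timeout by up to one poll cycle before ConsumerCheckFailedException is thrown.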
TEST_F(ConsumerTest, CheckWithInvalidTimeout) {
Consumer consumer{cluster.Bootstraps(), CreateDefaultConsumerInfo(), kDummyConsumerFunction};
const auto start = std::chrono::steady_clock::now();
EXPECT_THROW(consumer.Check(std::chrono::milliseconds{0}, std::nullopt, kDummyConsumerFunction),
ConsumerCheckFailedException);
EXPECT_THROW(consumer.Check(std::chrono::milliseconds{-1}, std::nullopt, kDummyConsumerFunction),
ConsumerCheckFailedException);
const auto end = std::chrono::steady_clock::now();
constexpr std::chrono::seconds kMaxExpectedTimeout{2};
EXPECT_LE((end - start), kMaxExpectedTimeout) << "The check most probably failed because of an actual timeout and "
"not because of the invalid value for timeout.";
}
TEST_F(ConsumerTest, CheckWithInvalidBatchSize) {
Consumer consumer{cluster.Bootstraps(), CreateDefaultConsumerInfo(), kDummyConsumerFunction};
const auto start = std::chrono::steady_clock::now();
EXPECT_THROW(consumer.Check(std::nullopt, 0, kDummyConsumerFunction), ConsumerCheckFailedException);
EXPECT_THROW(consumer.Check(std::nullopt, -1, kDummyConsumerFunction), ConsumerCheckFailedException);
const auto end = std::chrono::steady_clock::now();
constexpr std::chrono::seconds kMaxExpectedTimeout{2};
EXPECT_LE((end - start), kMaxExpectedTimeout) << "The check most probably failed because of an actual timeout and "
"not because of the invalid value for batch size.";
}
TEST_F(ConsumerTest, ConsumerStatus) {
const std::string kConsumerName = "ConsumerGroupNameAAAA";
const std::vector<std::string> topics = {"Topic1QWER", "Topic2XCVBB"};

View File

@@ -4,6 +4,7 @@
#include <utility>
#include <gtest/gtest.h>
#include "integrations/kafka/exceptions.hpp"
#include "kafka_mock.hpp"
#include "query/config.hpp"
#include "query/interpreter.hpp"
@@ -225,3 +226,19 @@ TEST_F(StreamsTest, RestoreStreams) {
check_restore_logic();
}
}
TEST_F(StreamsTest, CheckWithTimeout) {
const auto stream_info = CreateDefaultStreamInfo();
const auto stream_name = GetDefaultStreamName();
streams_->Create(stream_name, stream_info);
std::chrono::milliseconds timeout{3000};
const auto start = std::chrono::steady_clock::now();
EXPECT_THROW(streams_->Check(stream_name, timeout, std::nullopt), integrations::kafka::ConsumerCheckFailedException);
const auto end = std::chrono::steady_clock::now();
const auto elapsed = (end - start);
EXPECT_LE(timeout, elapsed);
EXPECT_LE(elapsed, timeout * 1.2);
}