From 41d418515698668de973bc9ed063de912cfee9aa Mon Sep 17 00:00:00 2001 From: Jeremy B <97525434+42jeremy@users.noreply.github.com> Date: Mon, 20 Jun 2022 14:09:45 +0200 Subject: [PATCH] Add limit batches option to start stream query (#392) --- src/integrations/constants.hpp | 1 + src/integrations/kafka/consumer.cpp | 120 ++++++--- src/integrations/kafka/consumer.hpp | 18 +- src/integrations/kafka/exceptions.hpp | 12 + src/integrations/pulsar/consumer.cpp | 106 ++++++-- src/integrations/pulsar/consumer.hpp | 8 +- src/integrations/pulsar/exceptions.hpp | 12 + .../frontend/ast/cypher_main_visitor.cpp | 17 ++ .../opencypher/grammar/MemgraphCypher.g4 | 2 +- src/query/interpreter.cpp | 34 ++- src/query/stream/common.hpp | 3 +- src/query/stream/sources.cpp | 11 +- src/query/stream/sources.hpp | 6 +- src/query/stream/streams.cpp | 35 ++- src/query/stream/streams.hpp | 14 +- tests/e2e/streams/common.py | 251 +++++++++++++++++- tests/e2e/streams/kafka_streams_tests.py | 102 ++++++- tests/e2e/streams/pulsar_streams_tests.py | 85 ++++++ tests/unit/integrations_kafka_consumer.cpp | 121 ++++++--- 19 files changed, 838 insertions(+), 120 deletions(-) diff --git a/src/integrations/constants.hpp b/src/integrations/constants.hpp index 9726c0ac3..ff1ec6f5c 100644 --- a/src/integrations/constants.hpp +++ b/src/integrations/constants.hpp @@ -17,6 +17,7 @@ namespace memgraph::integrations { inline constexpr int64_t kDefaultCheckBatchLimit{1}; +inline constexpr int64_t kMinimumStartBatchLimit{1}; inline constexpr std::chrono::milliseconds kDefaultCheckTimeout{30000}; inline constexpr std::chrono::milliseconds kMinimumInterval{1}; inline constexpr int64_t kMinimumSize{1}; diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp index ea2f6e98d..8674c8442 100644 --- a/src/integrations/kafka/consumer.cpp +++ b/src/integrations/kafka/consumer.cpp @@ -74,6 +74,36 @@ utils::BasicResult> GetBatch(RdKafka::KafkaCon return std::move(batch); } + +void 
CheckAndDestroyLastAssignmentIfNeeded(RdKafka::KafkaConsumer &consumer, const ConsumerInfo &info, + std::vector &last_assignment) { + if (!last_assignment.empty()) { + if (const auto err = consumer.assign(last_assignment); err != RdKafka::ERR_NO_ERROR) { + throw ConsumerStartFailedException(info.consumer_name, + fmt::format("Couldn't restore commited offsets: '{}'", RdKafka::err2str(err))); + } + RdKafka::TopicPartition::destroy(last_assignment); + } +} + +void TryToConsumeBatch(RdKafka::KafkaConsumer &consumer, const ConsumerInfo &info, + const ConsumerFunction &consumer_function, const std::vector &batch) { + consumer_function(batch); + std::vector partitions; + utils::OnScopeExit clear_partitions([&]() { RdKafka::TopicPartition::destroy(partitions); }); + + if (const auto err = consumer.assignment(partitions); err != RdKafka::ERR_NO_ERROR) { + throw ConsumerCommitFailedException( + info.consumer_name, fmt::format("Couldn't get assignment to commit offsets: {}", RdKafka::err2str(err))); + } + if (const auto err = consumer.position(partitions); err != RdKafka::ERR_NO_ERROR) { + throw ConsumerCommitFailedException(info.consumer_name, + fmt::format("Couldn't get offsets from librdkafka {}", RdKafka::err2str(err))); + } + if (const auto err = consumer.commitSync(partitions); err != RdKafka::ERR_NO_ERROR) { + throw ConsumerCommitFailedException(info.consumer_name, RdKafka::err2str(err)); + } +} } // namespace Message::Message(std::unique_ptr &&message) : message_{std::move(message)} { @@ -221,10 +251,21 @@ void Consumer::Start() { StartConsuming(); } -void Consumer::StartIfStopped() { - if (!is_running_) { - StartConsuming(); +void Consumer::StartWithLimit(const uint64_t limit_batches, std::optional timeout) const { + if (is_running_) { + throw ConsumerRunningException(info_.consumer_name); } + if (limit_batches < kMinimumStartBatchLimit) { + throw ConsumerStartFailedException( + info_.consumer_name, fmt::format("Batch limit has to be greater than or equal to {}", 
kMinimumStartBatchLimit)); + } + if (timeout.value_or(kMinimumInterval) < kMinimumInterval) { + throw ConsumerStartFailedException( + info_.consumer_name, + fmt::format("Timeout has to be greater than or equal to {} milliseconds", kMinimumInterval.count())); + } + + StartConsumingWithLimit(limit_batches, timeout); } void Consumer::Stop() { @@ -244,7 +285,7 @@ void Consumer::StopIfRunning() { } } -void Consumer::Check(std::optional timeout, std::optional limit_batches, +void Consumer::Check(std::optional timeout, std::optional limit_batches, const ConsumerFunction &check_consumer_function) const { // NOLINTNEXTLINE (modernize-use-nullptr) if (timeout.value_or(kMinimumInterval) < kMinimumInterval) { @@ -344,13 +385,7 @@ void Consumer::StartConsuming() { is_running_.store(true); - if (!last_assignment_.empty()) { - if (const auto err = consumer_->assign(last_assignment_); err != RdKafka::ERR_NO_ERROR) { - throw ConsumerStartFailedException(info_.consumer_name, - fmt::format("Couldn't restore commited offsets: '{}'", RdKafka::err2str(err))); - } - RdKafka::TopicPartition::destroy(last_assignment_); - } + CheckAndDestroyLastAssignmentIfNeeded(*consumer_, info_, last_assignment_); thread_ = std::thread([this] { static constexpr auto kMaxThreadNameSize = utils::GetMaxThreadNameSize(); @@ -361,33 +396,18 @@ void Consumer::StartConsuming() { while (is_running_) { auto maybe_batch = GetBatch(*consumer_, info_, is_running_); if (maybe_batch.HasError()) { - spdlog::warn("Error happened in consumer {} while fetching messages: {}!", info_.consumer_name, - maybe_batch.GetError()); - break; + throw ConsumerReadMessagesFailedException(info_.consumer_name, maybe_batch.GetError()); } const auto &batch = maybe_batch.GetValue(); - if (batch.empty()) continue; + if (batch.empty()) { + continue; + } spdlog::info("Kafka consumer {} is processing a batch", info_.consumer_name); try { - consumer_function_(batch); - std::vector partitions; - utils::OnScopeExit clear_partitions([&]() { 
RdKafka::TopicPartition::destroy(partitions); }); - - if (const auto err = consumer_->assignment(partitions); err != RdKafka::ERR_NO_ERROR) { - throw ConsumerCheckFailedException( - info_.consumer_name, fmt::format("Couldn't get assignment to commit offsets: {}", RdKafka::err2str(err))); - } - if (const auto err = consumer_->position(partitions); err != RdKafka::ERR_NO_ERROR) { - throw ConsumerCheckFailedException( - info_.consumer_name, fmt::format("Couldn't get offsets from librdkafka {}", RdKafka::err2str(err))); - } - if (const auto err = consumer_->commitSync(partitions); err != RdKafka::ERR_NO_ERROR) { - spdlog::warn("Committing offset of consumer {} failed: {}", info_.consumer_name, RdKafka::err2str(err)); - break; - } + TryToConsumeBatch(*consumer_, info_, consumer_function_, batch); } catch (const std::exception &e) { spdlog::warn("Error happened in consumer {} while processing a batch: {}!", info_.consumer_name, e.what()); break; @@ -398,6 +418,44 @@ void Consumer::StartConsuming() { }); } +void Consumer::StartConsumingWithLimit(uint64_t limit_batches, std::optional timeout) const { + MG_ASSERT(!is_running_, "Cannot start already running consumer!"); + + if (is_running_.exchange(true)) { + throw ConsumerRunningException(info_.consumer_name); + } + utils::OnScopeExit restore_is_running([this] { is_running_.store(false); }); + + CheckAndDestroyLastAssignmentIfNeeded(*consumer_, info_, last_assignment_); + + const auto timeout_to_use = timeout.value_or(kDefaultCheckTimeout); + const auto start = std::chrono::steady_clock::now(); + + for (uint64_t batch_count = 0; batch_count < limit_batches;) { + const auto now = std::chrono::steady_clock::now(); + if (now - start >= timeout_to_use) { + throw ConsumerStartFailedException(info_.consumer_name, "Timeout reached"); + } + + const auto maybe_batch = GetBatch(*consumer_, info_, is_running_); + if (maybe_batch.HasError()) { + throw ConsumerReadMessagesFailedException(info_.consumer_name, maybe_batch.GetError()); + } 
+ const auto &batch = maybe_batch.GetValue(); + + if (batch.empty()) { + continue; + } + ++batch_count; + + spdlog::info("Kafka consumer {} is processing a batch", info_.consumer_name); + + TryToConsumeBatch(*consumer_, info_, consumer_function_, batch); + + spdlog::info("Kafka consumer {} finished processing", info_.consumer_name); + } +} + void Consumer::StopConsuming() { is_running_.store(false); if (thread_.joinable()) thread_.join(); diff --git a/src/integrations/kafka/consumer.hpp b/src/integrations/kafka/consumer.hpp index dcc0f8125..7ad3da4a5 100644 --- a/src/integrations/kafka/consumer.hpp +++ b/src/integrations/kafka/consumer.hpp @@ -113,11 +113,19 @@ class Consumer final : public RdKafka::EventCb { /// This method will start a new thread which will poll all the topics for messages. /// /// @throws ConsumerRunningException if the consumer is already running + /// @throws ConsumerStartFailedException if the commited offsets cannot be restored void Start(); - /// Starts consuming messages if it is not started already. + /// Starts consuming messages. /// - void StartIfStopped(); + /// This method will start a new thread which will poll all the topics for messages. + /// + /// @param limit_batches the consumer will only consume the given number of batches. + /// @param timeout the maximum duration during which the command should run. + /// + /// @throws ConsumerRunningException if the consumer is already running + /// @throws ConsumerStartFailedException if the commited offsets cannot be restored + void StartWithLimit(uint64_t limit_batches, std::optional timeout) const; /// Stops consuming messages. /// @@ -136,9 +144,9 @@ class Consumer final : public RdKafka::EventCb { /// used. /// @param check_consumer_function a function to feed the received messages in, only used during this dry-run. /// - /// @throws ConsumerRunningException if the consumer is alredy running. + /// @throws ConsumerRunningException if the consumer is already running. 
/// @throws ConsumerCheckFailedException if check isn't successful. - void Check(std::optional timeout, std::optional limit_batches, + void Check(std::optional timeout, std::optional limit_batches, const ConsumerFunction &check_consumer_function) const; /// Returns true if the consumer is actively consuming messages. @@ -157,6 +165,7 @@ class Consumer final : public RdKafka::EventCb { void event_cb(RdKafka::Event &event) override; void StartConsuming(); + void StartConsumingWithLimit(uint64_t limit_batches, std::optional timeout) const; void StopConsuming(); @@ -178,7 +187,6 @@ class Consumer final : public RdKafka::EventCb { ConsumerFunction consumer_function_; mutable std::atomic is_running_{false}; mutable std::vector last_assignment_; // Protected by is_running_ - std::optional limit_batches_{std::nullopt}; std::unique_ptr> consumer_; std::thread thread_; ConsumerRebalanceCb cb_; diff --git a/src/integrations/kafka/exceptions.hpp b/src/integrations/kafka/exceptions.hpp index e17221615..b7b277906 100644 --- a/src/integrations/kafka/exceptions.hpp +++ b/src/integrations/kafka/exceptions.hpp @@ -64,4 +64,16 @@ class TopicNotFoundException : public KafkaStreamException { TopicNotFoundException(const std::string_view consumer_name, const std::string_view topic_name) : KafkaStreamException("Kafka consumer {} cannot find topic {}", consumer_name, topic_name) {} }; + +class ConsumerCommitFailedException : public KafkaStreamException { + public: + ConsumerCommitFailedException(const std::string_view consumer_name, const std::string_view error) + : KafkaStreamException("Committing offset of consumer {} failed: {}", consumer_name, error) {} +}; + +class ConsumerReadMessagesFailedException : public KafkaStreamException { + public: + ConsumerReadMessagesFailedException(const std::string_view consumer_name, const std::string_view error) + : KafkaStreamException("Error happened in consumer {} while fetching messages: {}", consumer_name, error) {} +}; } // namespace 
memgraph::integrations::kafka diff --git a/src/integrations/pulsar/consumer.cpp b/src/integrations/pulsar/consumer.cpp index 96dcdfee2..b3a65050b 100644 --- a/src/integrations/pulsar/consumer.cpp +++ b/src/integrations/pulsar/consumer.cpp @@ -11,13 +11,14 @@ #include "integrations/pulsar/consumer.hpp" +#include +#include +#include + #include #include #include -#include -#include - #include "integrations/constants.hpp" #include "integrations/pulsar/exceptions.hpp" #include "utils/concepts.hpp" @@ -33,6 +34,10 @@ namespace { template concept PulsarConsumer = utils::SameAsAnyOf; +template +concept PulsarMessageGetter = + std::same_as>; + pulsar_client::Result ConsumeMessage(pulsar_client::Consumer &consumer, pulsar_client::Message &message, int remaining_timeout_in_ms) { return consumer.receive(message, remaining_timeout_in_ms); @@ -97,6 +102,26 @@ pulsar_client::Client CreateClient(const std::string &service_url) { conf.setLogger(new SpdlogLoggerFactory); return {service_url, conf}; } + +template +void TryToConsumeBatch(TConsumer &consumer, const ConsumerInfo &info, const ConsumerFunction &consumer_function, + pulsar_client::MessageId &last_message_id, const std::vector &batch, + const TPulsarMessageGetter &message_getter) { + consumer_function(batch); + + auto has_message_failed = [&consumer, &info, &last_message_id, &message_getter](const auto &message) { + if (const auto result = consumer.acknowledge(message_getter(message)); result != pulsar_client::ResultOk) { + spdlog::warn("Acknowledging a message of consumer {} failed: {}", info.consumer_name, result); + return true; + } + last_message_id = message_getter(message).getMessageId(); + return false; + }; + + if (std::ranges::any_of(batch, has_message_failed)) { + throw ConsumerAcknowledgeMessagesFailedException(info.consumer_name); + } +} } // namespace Message::Message(pulsar_client::Message &&message) : message_{std::move(message)} {} @@ -137,6 +162,24 @@ void Consumer::Start() { StartConsuming(); } +void 
Consumer::StartWithLimit(const uint64_t limit_batches, + const std::optional timeout) const { + if (is_running_) { + throw ConsumerRunningException(info_.consumer_name); + } + if (limit_batches < kMinimumStartBatchLimit) { + throw ConsumerStartFailedException( + info_.consumer_name, fmt::format("Batch limit has to be greater than or equal to {}", kMinimumStartBatchLimit)); + } + if (timeout.value_or(kMinimumInterval) < kMinimumInterval) { + throw ConsumerStartFailedException( + info_.consumer_name, + fmt::format("Timeout has to be greater than or equal to {} milliseconds", kMinimumInterval.count())); + } + + StartConsumingWithLimit(limit_batches, timeout); +} + void Consumer::Stop() { if (!is_running_) { throw ConsumerStoppedException(info_.consumer_name); @@ -154,7 +197,7 @@ void Consumer::StopIfRunning() { } } -void Consumer::Check(std::optional timeout, std::optional limit_batches, +void Consumer::Check(std::optional timeout, std::optional limit_batches, const ConsumerFunction &check_consumer_function) const { // NOLINTNEXTLINE (modernize-use-nullptr) if (timeout.value_or(kMinimumInterval) < kMinimumInterval) { @@ -240,9 +283,7 @@ void Consumer::StartConsuming() { auto maybe_batch = GetBatch(consumer_, info_, is_running_, last_message_id_); if (maybe_batch.HasError()) { - spdlog::warn("Error happened in consumer {} while fetching messages: {}!", info_.consumer_name, - maybe_batch.GetError()); - break; + throw ConsumerReadMessagesFailedException(info_.consumer_name, maybe_batch.GetError()); } const auto &batch = maybe_batch.GetValue(); @@ -254,18 +295,8 @@ void Consumer::StartConsuming() { spdlog::info("Pulsar consumer {} is processing a batch", info_.consumer_name); try { - consumer_function_(batch); - - if (std::any_of(batch.begin(), batch.end(), [&](const auto &message) { - if (const auto result = consumer_.acknowledge(message.message_); result != pulsar_client::ResultOk) { - spdlog::warn("Acknowledging a message of consumer {} failed: {}", 
info_.consumer_name, result); - return true; - } - last_message_id_ = message.message_.getMessageId(); - return false; - })) { - break; - } + TryToConsumeBatch(consumer_, info_, consumer_function_, last_message_id_, batch, + [&](const Message &message) -> const pulsar_client::Message & { return message.message_; }); } catch (const std::exception &e) { spdlog::warn("Error happened in consumer {} while processing a batch: {}!", info_.consumer_name, e.what()); break; @@ -277,6 +308,43 @@ void Consumer::StartConsuming() { }); } +void Consumer::StartConsumingWithLimit(uint64_t limit_batches, std::optional timeout) const { + if (is_running_.exchange(true)) { + throw ConsumerRunningException(info_.consumer_name); + } + utils::OnScopeExit restore_is_running([this] { is_running_.store(false); }); + + const auto timeout_to_use = timeout.value_or(kDefaultCheckTimeout); + const auto start = std::chrono::steady_clock::now(); + + for (uint64_t batch_count = 0; batch_count < limit_batches;) { + const auto now = std::chrono::steady_clock::now(); + if (now - start >= timeout_to_use) { + throw ConsumerCheckFailedException(info_.consumer_name, "Timeout reached"); + } + + const auto maybe_batch = GetBatch(consumer_, info_, is_running_, last_message_id_); + + if (maybe_batch.HasError()) { + throw ConsumerReadMessagesFailedException(info_.consumer_name, maybe_batch.GetError()); + } + + const auto &batch = maybe_batch.GetValue(); + + if (batch.empty()) { + continue; + } + ++batch_count; + + spdlog::info("Pulsar consumer {} is processing a batch", info_.consumer_name); + + TryToConsumeBatch(consumer_, info_, consumer_function_, last_message_id_, batch, + [](const Message &message) -> const pulsar_client::Message & { return message.message_; }); + + spdlog::info("Pulsar consumer {} finished processing", info_.consumer_name); + } +} + void Consumer::StopConsuming() { is_running_.store(false); if (thread_.joinable()) { diff --git a/src/integrations/pulsar/consumer.hpp 
b/src/integrations/pulsar/consumer.hpp index f3771f1f4..1caa366ad 100644 --- a/src/integrations/pulsar/consumer.hpp +++ b/src/integrations/pulsar/consumer.hpp @@ -58,25 +58,27 @@ class Consumer final { bool IsRunning() const; void Start(); + void StartWithLimit(uint64_t limit_batches, std::optional timeout) const; void Stop(); void StopIfRunning(); - void Check(std::optional timeout, std::optional limit_batches, + void Check(std::optional timeout, std::optional limit_batches, const ConsumerFunction &check_consumer_function) const; const ConsumerInfo &Info() const; private: void StartConsuming(); + void StartConsumingWithLimit(uint64_t limit_batches, std::optional timeout) const; void StopConsuming(); ConsumerInfo info_; mutable pulsar_client::Client client_; - pulsar_client::Consumer consumer_; + mutable pulsar_client::Consumer consumer_; ConsumerFunction consumer_function_; mutable std::atomic is_running_{false}; - pulsar_client::MessageId last_message_id_{pulsar_client::MessageId::earliest()}; + mutable pulsar_client::MessageId last_message_id_{pulsar_client::MessageId::earliest()}; // Protected by is_running_ std::thread thread_; }; } // namespace memgraph::integrations::pulsar diff --git a/src/integrations/pulsar/exceptions.hpp b/src/integrations/pulsar/exceptions.hpp index 87bb442af..53b169ab3 100644 --- a/src/integrations/pulsar/exceptions.hpp +++ b/src/integrations/pulsar/exceptions.hpp @@ -55,4 +55,16 @@ class TopicNotFoundException : public PulsarStreamException { TopicNotFoundException(const std::string &consumer_name, const std::string &topic_name) : PulsarStreamException("Pulsar consumer {} cannot find topic {}", consumer_name, topic_name) {} }; + +class ConsumerReadMessagesFailedException : public PulsarStreamException { + public: + ConsumerReadMessagesFailedException(const std::string_view consumer_name, const std::string_view error) + : PulsarStreamException("Error happened in consumer {} while fetching messages: {}", consumer_name, error) {} +}; + 
+class ConsumerAcknowledgeMessagesFailedException : public PulsarStreamException { + public: + explicit ConsumerAcknowledgeMessagesFailedException(const std::string_view consumer_name) + : PulsarStreamException("Acknowledging a message of consumer {} has failed!", consumer_name) {} +}; } // namespace memgraph::integrations::pulsar diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index 3eac72d95..f4a269dfd 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -775,6 +775,23 @@ antlrcpp::Any CypherMainVisitor::visitDropStream(MemgraphCypher::DropStreamConte antlrcpp::Any CypherMainVisitor::visitStartStream(MemgraphCypher::StartStreamContext *ctx) { auto *stream_query = storage_->Create(); stream_query->action_ = StreamQuery::Action::START_STREAM; + + if (ctx->BATCH_LIMIT()) { + if (!ctx->batchLimit->numberLiteral() || !ctx->batchLimit->numberLiteral()->integerLiteral()) { + throw SemanticException("Batch limit should be an integer literal!"); + } + stream_query->batch_limit_ = ctx->batchLimit->accept(this); + } + if (ctx->TIMEOUT()) { + if (!ctx->timeout->numberLiteral() || !ctx->timeout->numberLiteral()->integerLiteral()) { + throw SemanticException("Timeout should be an integer literal!"); + } + if (!ctx->BATCH_LIMIT()) { + throw SemanticException("Parameter TIMEOUT can only be defined if BATCH_LIMIT is defined"); + } + stream_query->timeout_ = ctx->timeout->accept(this); + } + stream_query->stream_name_ = ctx->streamName()->symbolicName()->accept(this).as(); return stream_query; } diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 index 76fd3038a..b412a474a 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 @@ -351,7 +351,7 @@ pulsarCreateStream : CREATE PULSAR STREAM streamName ( 
pulsarCreateStreamConfig dropStream : DROP STREAM streamName ; -startStream : START STREAM streamName ; +startStream : START STREAM streamName ( BATCH_LIMIT batchLimit=literal ) ? ( TIMEOUT timeout=literal ) ? ; startAllStreams : START ALL STREAMS ; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 80480f2d7..fd2161829 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -687,12 +687,26 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters ¶mete return callback; } case StreamQuery::Action::START_STREAM: { - callback.fn = [interpreter_context, stream_name = stream_query->stream_name_]() { - interpreter_context->streams.Start(stream_name); - return std::vector>{}; - }; - notifications->emplace_back(SeverityLevel::INFO, NotificationCode::START_STREAM, - fmt::format("Started stream {}.", stream_query->stream_name_)); + const auto batch_limit = GetOptionalValue(stream_query->batch_limit_, evaluator); + const auto timeout = GetOptionalValue(stream_query->timeout_, evaluator); + + if (batch_limit.has_value()) { + if (batch_limit.value() < 0) { + throw utils::BasicException("Parameter BATCH_LIMIT cannot hold negative value"); + } + + callback.fn = [interpreter_context, stream_name = stream_query->stream_name_, batch_limit, timeout]() { + interpreter_context->streams.StartWithLimit(stream_name, static_cast(batch_limit.value()), timeout); + return std::vector>{}; + }; + } else { + callback.fn = [interpreter_context, stream_name = stream_query->stream_name_]() { + interpreter_context->streams.Start(stream_name); + return std::vector>{}; + }; + notifications->emplace_back(SeverityLevel::INFO, NotificationCode::START_STREAM, + fmt::format("Started stream {}.", stream_query->stream_name_)); + } return callback; } case StreamQuery::Action::START_ALL_STREAMS: { @@ -762,9 +776,15 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters ¶mete } case StreamQuery::Action::CHECK_STREAM: { callback.header = 
{"queries", "raw messages"}; + + const auto batch_limit = GetOptionalValue(stream_query->batch_limit_, evaluator); + if (batch_limit.has_value() && batch_limit.value() < 0) { + throw utils::BasicException("Parameter BATCH_LIMIT cannot hold negative value"); + } + callback.fn = [interpreter_context, stream_name = stream_query->stream_name_, timeout = GetOptionalValue(stream_query->timeout_, evaluator), - batch_limit = GetOptionalValue(stream_query->batch_limit_, evaluator)]() mutable { + batch_limit]() mutable { return interpreter_context->streams.Check(stream_name, timeout, batch_limit); }; notifications->emplace_back(SeverityLevel::INFO, NotificationCode::CHECK_STREAM, diff --git a/src/query/stream/common.hpp b/src/query/stream/common.hpp index d67de0e7a..0882d9cd4 100644 --- a/src/query/stream/common.hpp +++ b/src/query/stream/common.hpp @@ -52,10 +52,11 @@ concept Stream = requires(TStream stream) { typename TStream::Message; TStream{std::string{""}, typename TStream::StreamInfo{}, ConsumerFunction{}}; { stream.Start() } -> std::same_as; + { stream.StartWithLimit(uint64_t{}, std::optional{}) } -> std::same_as; { stream.Stop() } -> std::same_as; { stream.IsRunning() } -> std::same_as; { - stream.Check(std::optional{}, std::optional{}, + stream.Check(std::optional{}, std::optional{}, ConsumerFunction{}) } -> std::same_as; requires std::same_as().common_info)>, diff --git a/src/query/stream/sources.cpp b/src/query/stream/sources.cpp index e799a5fbb..82ddc6216 100644 --- a/src/query/stream/sources.cpp +++ b/src/query/stream/sources.cpp @@ -44,10 +44,13 @@ KafkaStream::StreamInfo KafkaStream::Info(std::string transformation_name) const } void KafkaStream::Start() { consumer_->Start(); } +void KafkaStream::StartWithLimit(uint64_t batch_limit, std::optional timeout) const { + consumer_->StartWithLimit(batch_limit, timeout); +} void KafkaStream::Stop() { consumer_->Stop(); } bool KafkaStream::IsRunning() const { return consumer_->IsRunning(); } -void 
KafkaStream::Check(std::optional timeout, std::optional batch_limit, +void KafkaStream::Check(std::optional timeout, std::optional batch_limit, const ConsumerFunction &consumer_function) const { consumer_->Check(timeout, batch_limit, consumer_function); } @@ -106,10 +109,12 @@ PulsarStream::StreamInfo PulsarStream::Info(std::string transformation_name) con } void PulsarStream::Start() { consumer_->Start(); } +void PulsarStream::StartWithLimit(uint64_t batch_limit, std::optional timeout) const { + consumer_->StartWithLimit(batch_limit, timeout); +} void PulsarStream::Stop() { consumer_->Stop(); } bool PulsarStream::IsRunning() const { return consumer_->IsRunning(); } - -void PulsarStream::Check(std::optional timeout, std::optional batch_limit, +void PulsarStream::Check(std::optional timeout, std::optional batch_limit, const ConsumerFunction &consumer_function) const { consumer_->Check(timeout, batch_limit, consumer_function); } diff --git a/src/query/stream/sources.hpp b/src/query/stream/sources.hpp index 29548a074..2eeaa39d8 100644 --- a/src/query/stream/sources.hpp +++ b/src/query/stream/sources.hpp @@ -36,10 +36,11 @@ struct KafkaStream { StreamInfo Info(std::string transformation_name) const; void Start(); + void StartWithLimit(uint64_t batch_limit, std::optional timeout) const; void Stop(); bool IsRunning() const; - void Check(std::optional timeout, std::optional batch_limit, + void Check(std::optional timeout, std::optional batch_limit, const ConsumerFunction &consumer_function) const; utils::BasicResult SetStreamOffset(int64_t offset); @@ -71,10 +72,11 @@ struct PulsarStream { StreamInfo Info(std::string transformation_name) const; void Start(); + void StartWithLimit(uint64_t batch_limit, std::optional timeout) const; void Stop(); bool IsRunning() const; - void Check(std::optional timeout, std::optional batch_limit, + void Check(std::optional timeout, std::optional batch_limit, const ConsumerFunction &consumer_function) const; private: diff --git 
a/src/query/stream/streams.cpp b/src/query/stream/streams.cpp index f532f4b34..8a02b188f 100644 --- a/src/query/stream/streams.cpp +++ b/src/query/stream/streams.cpp @@ -456,7 +456,7 @@ void Streams::Create(const std::string &stream_name, typename TStream::StreamInf try { std::visit( - [&](auto &&stream_data) { + [&](const auto &stream_data) { const auto stream_source_ptr = stream_data.stream_source->ReadLock(); Persist(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *stream_source_ptr)); }, @@ -575,7 +575,7 @@ void Streams::RestoreStreams() { auto it = CreateConsumer(*locked_streams_map, stream_name, std::move(status.info), std::move(status.owner)); if (status.is_running) { std::visit( - [&](auto &&stream_data) { + [&](const auto &stream_data) { auto stream_source_ptr = stream_data.stream_source->Lock(); stream_source_ptr->Start(); }, @@ -617,7 +617,7 @@ void Streams::Drop(const std::string &stream_name) { // function can be executing with the consumer, nothing else. // By acquiring the write lock here for the consumer, we make sure there is // no running Test function for this consumer, therefore it can be erased. 
- std::visit([&](auto &&stream_data) { stream_data.stream_source->Lock(); }, it->second); + std::visit([&](const auto &stream_data) { stream_data.stream_source->Lock(); }, it->second); locked_streams->erase(it); if (!storage_.Delete(stream_name)) { @@ -632,7 +632,7 @@ void Streams::Start(const std::string &stream_name) { auto it = GetStream(*locked_streams, stream_name); std::visit( - [&, this](auto &&stream_data) { + [&, this](const auto &stream_data) { auto stream_source_ptr = stream_data.stream_source->Lock(); stream_source_ptr->Start(); Persist(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *stream_source_ptr)); @@ -640,12 +640,27 @@ void Streams::Start(const std::string &stream_name) { it->second); } +void Streams::StartWithLimit(const std::string &stream_name, uint64_t batch_limit, + std::optional timeout) const { + std::optional locked_streams{streams_.ReadLock()}; + auto it = GetStream(**locked_streams, stream_name); + + std::visit( + [&](const auto &stream_data) { + const auto locked_stream_source = stream_data.stream_source->ReadLock(); + locked_streams.reset(); + + locked_stream_source->StartWithLimit(batch_limit, timeout); + }, + it->second); +} + void Streams::Stop(const std::string &stream_name) { auto locked_streams = streams_.Lock(); auto it = GetStream(*locked_streams, stream_name); std::visit( - [&, this](auto &&stream_data) { + [&, this](const auto &stream_data) { auto stream_source_ptr = stream_data.stream_source->Lock(); stream_source_ptr->Stop(); @@ -657,7 +672,7 @@ void Streams::Stop(const std::string &stream_name) { void Streams::StartAll() { for (auto locked_streams = streams_.Lock(); auto &[stream_name, stream_data] : *locked_streams) { std::visit( - [&stream_name = stream_name, this](auto &&stream_data) { + [&stream_name = stream_name, this](const auto &stream_data) { auto locked_stream_source = stream_data.stream_source->Lock(); if (!locked_stream_source->IsRunning()) { locked_stream_source->Start(); @@ 
-672,7 +687,7 @@ void Streams::StartAll() { void Streams::StopAll() { for (auto locked_streams = streams_.Lock(); auto &[stream_name, stream_data] : *locked_streams) { std::visit( - [&stream_name = stream_name, this](auto &&stream_data) { + [&stream_name = stream_name, this](const auto &stream_data) { auto locked_stream_source = stream_data.stream_source->Lock(); if (locked_stream_source->IsRunning()) { locked_stream_source->Stop(); @@ -689,7 +704,7 @@ std::vector> Streams::GetStreamInfo() const { { for (auto locked_streams = streams_.ReadLock(); const auto &[stream_name, stream_data] : *locked_streams) { std::visit( - [&, &stream_name = stream_name](auto &&stream_data) { + [&, &stream_name = stream_name](const auto &stream_data) { auto locked_stream_source = stream_data.stream_source->ReadLock(); auto info = locked_stream_source->Info(stream_data.transformation_name); result.emplace_back(StreamStatus<>{stream_name, StreamType(*locked_stream_source), @@ -703,12 +718,12 @@ std::vector> Streams::GetStreamInfo() const { } TransformationResult Streams::Check(const std::string &stream_name, std::optional timeout, - std::optional batch_limit) const { + std::optional batch_limit) const { std::optional locked_streams{streams_.ReadLock()}; auto it = GetStream(**locked_streams, stream_name); return std::visit( - [&](auto &&stream_data) { + [&](const auto &stream_data) { // This depends on the fact that Drop will first acquire a write lock to the consumer, and erase it only after // that const auto locked_stream_source = stream_data.stream_source->ReadLock(); diff --git a/src/query/stream/streams.hpp b/src/query/stream/streams.hpp index fa556517b..5fe5d3f8e 100644 --- a/src/query/stream/streams.hpp +++ b/src/query/stream/streams.hpp @@ -115,6 +115,17 @@ class Streams final { /// @throws ConsumerRunningException if the consumer is already running void Start(const std::string &stream_name); + /// Start consuming from a stream. 
+ /// + /// @param stream_name name of the stream that needs to be started + /// @param batch_limit number of batches we want to consume before stopping + /// @param timeout the maximum duration during which the command should run. + /// + /// @throws StreamsException if the stream doesn't exist + /// @throws ConsumerRunningException if the consumer is already running + void StartWithLimit(const std::string &stream_name, uint64_t batch_limit, + std::optional timeout) const; + /// Stop consuming from a stream. /// /// @param stream_name name of the stream that needs to be stopped @@ -142,6 +153,7 @@ class Streams final { /// /// @param stream_name name of the stream we want to test /// @param batch_limit number of batches we want to test before stopping + /// @param timeout the maximum duration during which the command should run. /// /// @returns A vector of vectors of TypedValue. Each subvector contains two elements, the query string and the /// nullable parameters map. @@ -151,7 +163,7 @@ class Streams final { /// @throws ConsumerCheckFailedException if the transformation function throws any std::exception during processing TransformationResult Check(const std::string &stream_name, std::optional timeout = std::nullopt, - std::optional batch_limit = std::nullopt) const; + std::optional batch_limit = std::nullopt) const; private: template diff --git a/tests/e2e/streams/common.py b/tests/e2e/streams/common.py index 8de3637fc..1b3315071 100644 --- a/tests/e2e/streams/common.py +++ b/tests/e2e/streams/common.py @@ -10,6 +10,7 @@ # licenses/APL.txt. 
import mgclient +import pytest import time from multiprocessing import Manager, Process, Value @@ -112,6 +113,13 @@ def start_stream(cursor, stream_name): assert get_is_running(cursor, stream_name) +def start_stream_with_limit(cursor, stream_name, batch_limit, timeout=None): + if timeout is not None: + execute_and_fetch_all(cursor, f"START STREAM {stream_name} BATCH_LIMIT {batch_limit} TIMEOUT {timeout} ") + else: + execute_and_fetch_all(cursor, f"START STREAM {stream_name} BATCH_LIMIT {batch_limit}") + + def stop_stream(cursor, stream_name): execute_and_fetch_all(cursor, f"STOP STREAM {stream_name}") @@ -253,10 +261,11 @@ def test_start_checked_stream_after_timeout(connection, stream_creator): cursor = connection.cursor() execute_and_fetch_all(cursor, stream_creator("test_stream")) - TIMEOUT_MS = 2000 + TIMEOUT_IN_MS = 2000 + TIMEOUT_IN_SECONDS = TIMEOUT_IN_MS / 1000 def call_check(): - execute_and_fetch_all(connect().cursor(), f"CHECK STREAM test_stream TIMEOUT {TIMEOUT_MS}") + execute_and_fetch_all(connect().cursor(), f"CHECK STREAM test_stream TIMEOUT {TIMEOUT_IN_MS}") check_stream_proc = Process(target=call_check, daemon=True) @@ -266,7 +275,7 @@ def test_start_checked_stream_after_timeout(connection, stream_creator): start_stream(cursor, "test_stream") end = time.time() - assert (end - start) < 1.3 * TIMEOUT_MS, "The START STREAM was blocked too long" + assert (end - start) < 1.3 * TIMEOUT_IN_SECONDS, "The START STREAM was blocked too long" assert get_is_running(cursor, "test_stream") stop_stream(cursor, "test_stream") @@ -401,3 +410,239 @@ def test_check_stream_different_number_of_queries_than_messages(connection, stre assert expected_queries_and_raw_messages_1 == results.value[0] assert expected_queries_and_raw_messages_2 == results.value[1] assert expected_queries_and_raw_messages_3 == results.value[2] + + +def test_start_stream_with_batch_limit(connection, stream_creator, messages_sender): + STREAM_NAME = "test" + BATCH_LIMIT = 5 + + cursor = 
connection.cursor() + execute_and_fetch_all(cursor, stream_creator(STREAM_NAME)) + + def start_new_stream_with_limit(stream_name, batch_limit): + connection = connect() + cursor = connection.cursor() + start_stream_with_limit(cursor, stream_name, batch_limit) + + thread_stream_running = Process(target=start_new_stream_with_limit, daemon=True, args=(STREAM_NAME, BATCH_LIMIT)) + thread_stream_running.start() + + time.sleep(2) + assert get_is_running(cursor, STREAM_NAME) + + messages_sender(BATCH_LIMIT - 1) + + # We have not sent enough batches to reach the limit. We check that the stream is still correctly running. + assert get_is_running(cursor, STREAM_NAME) + + # We send a last message to reach the batch_limit + messages_sender(1) + + time.sleep(2) + + # We check that the stream has correctly stopped. + assert not get_is_running(cursor, STREAM_NAME) + + +def test_start_stream_with_batch_limit_timeout(connection, stream_creator): + # We check that we get the expected exception when trying to run START STREAM while providing TIMEOUT and not BATCH_LIMIT + STREAM_NAME = "test" + + cursor = connection.cursor() + execute_and_fetch_all(cursor, stream_creator(STREAM_NAME)) + + with pytest.raises(mgclient.DatabaseError): + execute_and_fetch_all(cursor, f"START STREAM {STREAM_NAME} TIMEOUT 3000") + + +def test_start_stream_with_batch_limit_reaching_timeout(connection, stream_creator): + # We check that we get the expected exception when running START STREAM while providing TIMEOUT and BATCH_LIMIT + STREAM_NAME = "test" + BATCH_LIMIT = 5 + TIMEOUT = 3000 + TIMEOUT_IN_SECONDS = TIMEOUT / 1000 + cursor = connection.cursor() + execute_and_fetch_all(cursor, stream_creator(STREAM_NAME, BATCH_SIZE)) + + start_time = time.time() + + with pytest.raises(mgclient.DatabaseError): + execute_and_fetch_all(cursor, f"START STREAM {STREAM_NAME} BATCH_LIMIT {BATCH_LIMIT} TIMEOUT {TIMEOUT}") + + end_time = time.time() + assert ( + end_time - start_time + ) >= TIMEOUT_IN_SECONDS, "The START
STREAM has probably thrown due to something else than timeout!" + + +def test_start_stream_with_batch_limit_while_check_running( + connection, stream_creator, message_sender, setup_function=None +): + # 1/ We check we get the correct exception calling START STREAM with BATCH_LIMIT while a CHECK STREAM is already running. + # 2/ Afterwards, we terminate the CHECK STREAM and start a START STREAM with BATCH_LIMIT + def start_check_stream(stream_name, batch_limit, timeout): + connection = connect() + cursor = connection.cursor() + execute_and_fetch_all(cursor, f"CHECK STREAM {stream_name} BATCH_LIMIT {batch_limit} TIMEOUT {timeout}") + + def start_new_stream_with_limit(stream_name, batch_limit, timeout): + connection = connect() + cursor = connection.cursor() + start_stream_with_limit(cursor, stream_name, batch_limit, timeout=timeout) + + STREAM_NAME = "test_check_and_batch_limit" + BATCH_LIMIT = 1 + TIMEOUT = 10000 + + cursor = connection.cursor() + execute_and_fetch_all(cursor, stream_creator(STREAM_NAME)) + + # 0/ Extra setup needed for Kafka to work correctly if CHECK STREAM is executed before any messages have been consumed.
+ if setup_function is not None: + setup_function(start_check_stream, cursor, STREAM_NAME, BATCH_LIMIT, TIMEOUT) + + # 1/ + thread_stream_check = Process(target=start_check_stream, daemon=True, args=(STREAM_NAME, BATCH_LIMIT, TIMEOUT)) + thread_stream_check.start() + time.sleep(2) + assert get_is_running(cursor, STREAM_NAME) + + with pytest.raises(mgclient.DatabaseError): + start_stream_with_limit(cursor, STREAM_NAME, BATCH_LIMIT, timeout=TIMEOUT) + + assert get_is_running(cursor, STREAM_NAME) + message_sender(SIMPLE_MSG) + thread_stream_check.join() + + assert not get_is_running(cursor, STREAM_NAME) + + # 2/ + thread_stream_running = Process( + target=start_new_stream_with_limit, daemon=True, args=(STREAM_NAME, BATCH_LIMIT + 1, TIMEOUT) + ) # Sending BATCH_LIMIT + 1 messages as BATCH_LIMIT messages have already been sent during the CHECK STREAM (and not consumed) + thread_stream_running.start() + time.sleep(2) + assert get_is_running(cursor, STREAM_NAME) + + message_sender(SIMPLE_MSG) + time.sleep(2) + + assert not get_is_running(cursor, STREAM_NAME) + + +def test_check_while_stream_with_batch_limit_running(connection, stream_creator, message_sender): + # 1/ We check we get the correct exception calling CHECK STREAM while START STREAM with BATCH_LIMIT is already running + # 2/ Afterwards, we terminate the START STREAM with BATCH_LIMIT and start a CHECK STREAM + def start_new_stream_with_limit(stream_name, batch_limit, timeout): + connection = connect() + cursor = connection.cursor() + start_stream_with_limit(cursor, stream_name, batch_limit, timeout=timeout) + + def start_check_stream(stream_name, batch_limit, timeout): + connection = connect() + cursor = connection.cursor() + execute_and_fetch_all(cursor, f"CHECK STREAM {stream_name} BATCH_LIMIT {batch_limit} TIMEOUT {timeout}") + + STREAM_NAME = "test_batch_limit_and_check" + BATCH_LIMIT = 1 + TIMEOUT = 10000 + TIMEOUT_IN_SECONDS = TIMEOUT / 1000 + + cursor = connection.cursor() + execute_and_fetch_all(cursor,
stream_creator(STREAM_NAME)) + + # 1/ + thread_stream_running = Process( + target=start_new_stream_with_limit, daemon=True, args=(STREAM_NAME, BATCH_LIMIT, TIMEOUT) + ) + start_time = time.time() + thread_stream_running.start() + time.sleep(2) + assert get_is_running(cursor, STREAM_NAME) + + with pytest.raises(mgclient.DatabaseError): + execute_and_fetch_all(cursor, f"CHECK STREAM {STREAM_NAME} BATCH_LIMIT {BATCH_LIMIT} TIMEOUT {TIMEOUT}") + + end_time = time.time() + assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The CHECK STREAM has probably thrown due to timeout!" + + message_sender(SIMPLE_MSG) + time.sleep(2) + + assert not get_is_running(cursor, STREAM_NAME) + + # 2/ + thread_stream_check = Process(target=start_check_stream, daemon=True, args=(STREAM_NAME, BATCH_LIMIT, TIMEOUT)) + start_time = time.time() + thread_stream_check.start() + time.sleep(2) + assert get_is_running(cursor, STREAM_NAME) + + message_sender(SIMPLE_MSG) + time.sleep(2) + end_time = time.time() + assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The CHECK STREAM has probably thrown due to timeout!" + + assert not get_is_running(cursor, STREAM_NAME) + + +def test_start_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator): + # We check that we get a correct exception when giving a negative batch_limit + STREAM_NAME = "test_batch_limit_invalid_batch_limit" + TIMEOUT = 10000 + TIMEOUT_IN_SECONDS = TIMEOUT / 1000 + + cursor = connection.cursor() + execute_and_fetch_all(cursor, stream_creator(STREAM_NAME)) + time.sleep(2) + + # 1/ checking with batch_limit=-10 + batch_limit = -10 + start_time = time.time() + + with pytest.raises(mgclient.DatabaseError): + start_stream_with_limit(cursor, STREAM_NAME, batch_limit, timeout=TIMEOUT) + + end_time = time.time() + assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The START STREAM has probably thrown due to timeout!"
+ + # 2/ checking with batch_limit=0 + batch_limit = 0 + start_time = time.time() + + with pytest.raises(mgclient.DatabaseError): + start_stream_with_limit(cursor, STREAM_NAME, batch_limit, timeout=TIMEOUT) + + end_time = time.time() + assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The START STREAM has probably thrown due to timeout!" + + +def test_check_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator): + # We check that we get a correct exception when giving a negative batch_limit + STREAM_NAME = "test_batch_limit_invalid_batch_limit" + TIMEOUT = 10000 + TIMEOUT_IN_SECONDS = TIMEOUT / 1000 + + cursor = connection.cursor() + execute_and_fetch_all(cursor, stream_creator(STREAM_NAME)) + time.sleep(2) + + # 1/ checking with batch_limit=-10 + batch_limit = -10 + start_time = time.time() + + with pytest.raises(mgclient.DatabaseError): + execute_and_fetch_all(cursor, f"CHECK STREAM {STREAM_NAME} BATCH_LIMIT {batch_limit} TIMEOUT {TIMEOUT}") + + end_time = time.time() + assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The CHECK STREAM has probably thrown due to timeout!" + + # 2/ checking with batch_limit=0 + batch_limit = 0 + start_time = time.time() + + with pytest.raises(mgclient.DatabaseError): + execute_and_fetch_all(cursor, f"CHECK STREAM {STREAM_NAME} BATCH_LIMIT {batch_limit} TIMEOUT {TIMEOUT}") + + end_time = time.time() + assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The CHECK STREAM has probably thrown due to timeout!" 
diff --git a/tests/e2e/streams/kafka_streams_tests.py b/tests/e2e/streams/kafka_streams_tests.py index 63cc291c9..65ad1b6b0 100755 --- a/tests/e2e/streams/kafka_streams_tests.py +++ b/tests/e2e/streams/kafka_streams_tests.py @@ -18,7 +18,7 @@ import time from multiprocessing import Process, Value import common -TRANSFORMATIONS_TO_CHECK_C = ["empty_transformation"] +TRANSFORMATIONS_TO_CHECK_C = ["c_transformations.empty_transformation"] TRANSFORMATIONS_TO_CHECK_PY = ["kafka_transform.simple", "kafka_transform.with_parameters"] @@ -381,10 +381,11 @@ def test_info_procedure(kafka_topics, connection): @pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_C) def test_load_c_transformations(connection, transformation): cursor = connection.cursor() - query = f"CALL mg.transformations() YIELD * WITH name WHERE name STARTS WITH 'c_transformations.{transformation}' RETURN name" + + query = f"CALL mg.transformations() YIELD * WITH name WHERE name STARTS WITH '{transformation}' RETURN name" result = common.execute_and_fetch_all(cursor, query) assert len(result) == 1 - assert result[0][0] == f"c_transformations.{transformation}" + assert result[0][0] == transformation def test_check_stream_same_number_of_queries_than_messages(kafka_producer, kafka_topics, connection): @@ -415,5 +416,100 @@ def test_check_stream_different_number_of_queries_than_messages(kafka_producer, common.test_check_stream_different_number_of_queries_than_messages(connection, stream_creator, message_sender) +def test_start_stream_with_batch_limit(kafka_producer, kafka_topics, connection): + assert len(kafka_topics) > 0 + + def stream_creator(stream_name): + return ( + f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1" + ) + + def messages_sender(nof_messages): + for x in range(nof_messages): + kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60) + + common.test_start_stream_with_batch_limit(connection, stream_creator, 
messages_sender) + + +def test_start_stream_with_batch_limit_timeout(kafka_producer, kafka_topics, connection): + assert len(kafka_topics) > 0 + + def stream_creator(stream_name): + return ( + f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1" + ) + + common.test_start_stream_with_batch_limit_timeout(connection, stream_creator) + + +def test_start_stream_with_batch_limit_reaching_timeout(kafka_producer, kafka_topics, connection): + assert len(kafka_topics) > 0 + + def stream_creator(stream_name, batch_size): + return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE {batch_size}" + + common.test_start_stream_with_batch_limit_reaching_timeout(connection, stream_creator) + + +def test_start_stream_with_batch_limit_while_check_running(kafka_producer, kafka_topics, connection): + assert len(kafka_topics) > 0 + + def stream_creator(stream_name): + return ( + f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1" + ) + + def message_sender(message): + kafka_producer.send(kafka_topics[0], message).get(timeout=6000) + + def setup_function(start_check_stream, cursor, stream_name, batch_limit, timeout): + thread_stream_check = Process(target=start_check_stream, daemon=True, args=(stream_name, batch_limit, timeout)) + thread_stream_check.start() + time.sleep(2) + assert common.get_is_running(cursor, stream_name) + message_sender(common.SIMPLE_MSG) + thread_stream_check.join() + + common.test_start_stream_with_batch_limit_while_check_running( + connection, stream_creator, message_sender, setup_function + ) + + +def test_check_while_stream_with_batch_limit_running(kafka_producer, kafka_topics, connection): + assert len(kafka_topics) > 0 + + def stream_creator(stream_name): + return ( + f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1" + ) + + def 
message_sender(message): + kafka_producer.send(kafka_topics[0], message).get(timeout=6000) + + common.test_check_while_stream_with_batch_limit_running(connection, stream_creator, message_sender) + + +def test_start_stream_with_batch_limit_with_invalid_batch_limit(kafka_producer, kafka_topics, connection): + assert len(kafka_topics) > 0 + + def stream_creator(stream_name): + return ( + f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1" + ) + + common.test_start_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator) + + +def test_check_stream_with_batch_limit_with_invalid_batch_limit(kafka_producer, kafka_topics, connection): + assert len(kafka_topics) > 0 + + def stream_creator(stream_name): + return ( + f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1" + ) + + common.test_check_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator) + + if __name__ == "__main__": sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/streams/pulsar_streams_tests.py b/tests/e2e/streams/pulsar_streams_tests.py index c77d72372..e0aecbebc 100755 --- a/tests/e2e/streams/pulsar_streams_tests.py +++ b/tests/e2e/streams/pulsar_streams_tests.py @@ -344,6 +344,73 @@ def test_service_url(pulsar_client, pulsar_topics, connection, transformation): check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG) +def test_start_stream_with_batch_limit(pulsar_client, pulsar_topics, connection): + assert len(pulsar_topics) > 1 + + def stream_creator(stream_name): + return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1" + + producer = pulsar_client.create_producer( + common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000 + ) + + def messages_sender(nof_messages): + for x in range(nof_messages): + producer.send(common.SIMPLE_MSG) + + 
common.test_start_stream_with_batch_limit(connection, stream_creator, messages_sender) + + +def test_start_stream_with_batch_limit_timeout(pulsar_client, pulsar_topics, connection): + assert len(pulsar_topics) > 1 + + def stream_creator(stream_name): + return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1" + + common.test_start_stream_with_batch_limit_timeout(connection, stream_creator) + + +def test_start_stream_with_batch_limit_reaching_timeout(pulsar_client, pulsar_topics, connection): + assert len(pulsar_topics) > 1 + + def stream_creator(stream_name, batch_size): + return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE {batch_size}" + + common.test_start_stream_with_batch_limit_reaching_timeout(connection, stream_creator) + + +def test_start_stream_with_batch_limit_while_check_running(pulsar_client, pulsar_topics, connection): + assert len(pulsar_topics) > 0 + + def stream_creator(stream_name): + return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1" + + producer = pulsar_client.create_producer( + common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000 + ) + + def message_sender(message): + producer.send(message) + + common.test_start_stream_with_batch_limit_while_check_running(connection, stream_creator, message_sender) + + +def test_check_while_stream_with_batch_limit_running(pulsar_client, pulsar_topics, connection): + assert len(pulsar_topics) > 0 + + def stream_creator(stream_name): + return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1" + + producer = pulsar_client.create_producer( + common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000 + ) + + def message_sender(message): + producer.send(message) + + 
common.test_check_while_stream_with_batch_limit_running(connection, stream_creator, message_sender) + + def test_check_stream_same_number_of_queries_than_messages(pulsar_client, pulsar_topics, connection): assert len(pulsar_topics) > 0 @@ -380,5 +447,23 @@ def test_check_stream_different_number_of_queries_than_messages(pulsar_client, p common.test_check_stream_different_number_of_queries_than_messages(connection, stream_creator, message_sender) +def test_start_stream_with_batch_limit_with_invalid_batch_limit(pulsar_client, pulsar_topics, connection): + assert len(pulsar_topics) > 0 + + def stream_creator(stream_name): + return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1" + + common.test_start_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator) + + +def test_check_stream_with_batch_limit_with_invalid_batch_limit(pulsar_client, pulsar_topics, connection): + assert len(pulsar_topics) > 0 + + def stream_creator(stream_name): + return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1" + + common.test_check_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator) + + if __name__ == "__main__": sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/unit/integrations_kafka_consumer.cpp b/tests/unit/integrations_kafka_consumer.cpp index 552431096..426c836c3 100644 --- a/tests/unit/integrations_kafka_consumer.cpp +++ b/tests/unit/integrations_kafka_consumer.cpp @@ -149,7 +149,7 @@ TEST_F(ConsumerTest, BatchInterval) { } consumer->Stop(); - EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received"; + EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received"; auto check_received_timestamp = [&received_timestamps](size_t index) { SCOPED_TRACE("Checking index " + std::to_string(index)); @@ -178,14 +178,6 @@ TEST_F(ConsumerTest, BatchInterval) { 
TEST_F(ConsumerTest, StartStop) { Consumer consumer{CreateDefaultConsumerInfo(), kDummyConsumerFunction}; - auto start = [&consumer](const bool use_conditional) { - if (use_conditional) { - consumer.StartIfStopped(); - } else { - consumer.Start(); - } - }; - auto stop = [&consumer](const bool use_conditional) { if (use_conditional) { consumer.StopIfRunning(); @@ -194,34 +186,28 @@ TEST_F(ConsumerTest, StartStop) { } }; - auto check_config = [&start, &stop, &consumer](const bool use_conditional_start, - const bool use_conditional_stop) mutable { - SCOPED_TRACE( - fmt::format("Conditional start {} and conditional stop {}", use_conditional_start, use_conditional_stop)); + auto check_config = [&stop, &consumer](const bool use_conditional_stop) mutable { + SCOPED_TRACE(fmt::format("Start and conditionally stop {}", use_conditional_stop)); EXPECT_FALSE(consumer.IsRunning()); EXPECT_THROW(consumer.Stop(), ConsumerStoppedException); consumer.StopIfRunning(); EXPECT_FALSE(consumer.IsRunning()); - start(use_conditional_start); + consumer.Start(); EXPECT_TRUE(consumer.IsRunning()); EXPECT_THROW(consumer.Start(), ConsumerRunningException); - consumer.StartIfStopped(); + EXPECT_TRUE(consumer.IsRunning()); stop(use_conditional_stop); EXPECT_FALSE(consumer.IsRunning()); }; - static constexpr auto kSimpleStart = false; static constexpr auto kSimpleStop = false; - static constexpr auto kConditionalStart = true; static constexpr auto kConditionalStop = true; - check_config(kSimpleStart, kSimpleStop); - check_config(kSimpleStart, kConditionalStop); - check_config(kConditionalStart, kSimpleStop); - check_config(kConditionalStart, kConditionalStop); + check_config(kSimpleStop); + check_config(kConditionalStop); } TEST_F(ConsumerTest, BatchSize) { @@ -252,7 +238,7 @@ TEST_F(ConsumerTest, BatchSize) { } std::this_thread::sleep_for(kBatchInterval * 2); consumer->Stop(); - EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received"; + 
EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received"; auto check_received_timestamp = [&received_timestamps](size_t index, size_t expected_message_count) { SCOPED_TRACE("Checking index " + std::to_string(index)); @@ -371,7 +357,7 @@ TEST_F(ConsumerTest, DISABLED_StartsFromPreviousOffset) { EXPECT_EQ(expected_total_messages, received_message_count); EXPECT_NO_THROW(consumer->Stop()); ASSERT_FALSE(consumer->IsRunning()); - EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received"; + EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received"; }; ASSERT_NO_FATAL_FAILURE(send_and_consume_messages(2)); @@ -383,10 +369,9 @@ TEST_F(ConsumerTest, CheckMethodWorks) { auto info = CreateDefaultConsumerInfo(); info.batch_size = kBatchSize; const std::string kMessagePrefix{"Message"}; - auto consumer_function = [](const std::vector &messages) mutable {}; // This test depends on CreateConsumer starts and stops the consumer, so the offset is stored - auto consumer = CreateConsumer(std::move(info), std::move(consumer_function)); + auto consumer = CreateConsumer(std::move(info), kDummyConsumerFunction); static constexpr auto kMessageCount = 4; for (auto sent_messages = 0; sent_messages < kMessageCount; ++sent_messages) { @@ -411,7 +396,7 @@ TEST_F(ConsumerTest, CheckMethodWorks) { }); ASSERT_FALSE(consumer->IsRunning()); - EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received"; + EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received"; EXPECT_EQ(received_message_count, kMessageCount); }; @@ -445,8 +430,6 @@ TEST_F(ConsumerTest, CheckWithInvalidTimeout) { const auto start = std::chrono::steady_clock::now(); EXPECT_THROW(consumer.Check(std::chrono::milliseconds{0}, std::nullopt, kDummyConsumerFunction), ConsumerCheckFailedException); - EXPECT_THROW(consumer.Check(std::chrono::milliseconds{-1}, std::nullopt, kDummyConsumerFunction), 
- ConsumerCheckFailedException); const auto end = std::chrono::steady_clock::now(); static constexpr std::chrono::seconds kMaxExpectedTimeout{2}; @@ -459,7 +442,6 @@ TEST_F(ConsumerTest, CheckWithInvalidBatchSize) { const auto start = std::chrono::steady_clock::now(); EXPECT_THROW(consumer.Check(std::nullopt, 0, kDummyConsumerFunction), ConsumerCheckFailedException); - EXPECT_THROW(consumer.Check(std::nullopt, -1, kDummyConsumerFunction), ConsumerCheckFailedException); const auto end = std::chrono::steady_clock::now(); static constexpr std::chrono::seconds kMaxExpectedTimeout{2}; @@ -496,8 +478,85 @@ TEST_F(ConsumerTest, ConsumerStatus) { check_info(consumer.Info()); consumer.Start(); check_info(consumer.Info()); - consumer.StartIfStopped(); - check_info(consumer.Info()); consumer.StopIfRunning(); check_info(consumer.Info()); } + +TEST_F(ConsumerTest, LimitBatches_CannotStartIfAlreadyRunning) { + static constexpr auto kLimitBatches = 3; + + auto info = CreateDefaultConsumerInfo(); + + auto consumer = CreateConsumer(std::move(info), kDummyConsumerFunction); + + consumer->Start(); + ASSERT_TRUE(consumer->IsRunning()); + + EXPECT_THROW(consumer->StartWithLimit(kLimitBatches, std::nullopt /*timeout*/), ConsumerRunningException); + + EXPECT_TRUE(consumer->IsRunning()); + + consumer->Stop(); + EXPECT_FALSE(consumer->IsRunning()); +} + +TEST_F(ConsumerTest, LimitBatches_SendingMoreThanLimit) { + /* + We send more messages than the BatchSize*LimitBatches: + -Consumer should receive 2*3=6 messages. + -Consumer should not be running afterwards. + */ + static constexpr auto kBatchSize = 2; + static constexpr auto kLimitBatches = 3; + static constexpr auto kNumberOfMessagesToSend = 20; + static constexpr auto kNumberOfMessagesExpected = kBatchSize * kLimitBatches; + static constexpr auto kBatchInterval = + std::chrono::seconds{2}; // We do not want the batch interval to be the limiting factor here. 
+ + auto info = CreateDefaultConsumerInfo(); + info.batch_size = kBatchSize; + info.batch_interval = kBatchInterval; + + static constexpr std::string_view kMessage = "LimitBatchesTestMessage"; + + auto expected_messages_received = true; + auto number_of_messages_received = 0; + auto consumer_function = [&expected_messages_received, + &number_of_messages_received](const std::vector &messages) mutable { + number_of_messages_received += messages.size(); + for (const auto &message : messages) { + expected_messages_received &= (kMessage == std::string_view(message.Payload().data(), message.Payload().size())); + } + }; + + auto consumer = CreateConsumer(std::move(info), consumer_function); + + for (auto sent_messages = 0; sent_messages <= kNumberOfMessagesToSend; ++sent_messages) { + cluster.SeedTopic(kTopicName, kMessage); + } + + consumer->StartWithLimit(kLimitBatches, kDontCareTimeout); + + EXPECT_FALSE(consumer->IsRunning()); + EXPECT_EQ(number_of_messages_received, kNumberOfMessagesExpected); + EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received"; +} + +TEST_F(ConsumerTest, LimitBatches_Timeout_Reached) { + // We do not send any messages, we expect an exception to be thrown. + static constexpr auto kLimitBatches = 3; + + auto info = CreateDefaultConsumerInfo(); + + auto consumer = CreateConsumer(std::move(info), kDummyConsumerFunction); + + std::chrono::milliseconds timeout{3000}; + + const auto start = std::chrono::steady_clock::now(); + EXPECT_THROW(consumer->StartWithLimit(kLimitBatches, timeout), ConsumerStartFailedException); + const auto end = std::chrono::steady_clock::now(); + const auto elapsed = (end - start); + + EXPECT_LE(timeout, elapsed); + EXPECT_LE(elapsed, timeout * 1.2); +}