Add limit batches option to start stream query (#392)

This commit is contained in:
Jeremy B 2022-06-20 14:09:45 +02:00 committed by GitHub
parent 599c0a641f
commit 41d4185156
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 838 additions and 120 deletions

View File

@ -17,6 +17,7 @@
namespace memgraph::integrations {
inline constexpr int64_t kDefaultCheckBatchLimit{1};
inline constexpr int64_t kMinimumStartBatchLimit{1};
inline constexpr std::chrono::milliseconds kDefaultCheckTimeout{30000};
inline constexpr std::chrono::milliseconds kMinimumInterval{1};
inline constexpr int64_t kMinimumSize{1};

View File

@ -74,6 +74,36 @@ utils::BasicResult<std::string, std::vector<Message>> GetBatch(RdKafka::KafkaCon
return std::move(batch);
}
void CheckAndDestroyLastAssignmentIfNeeded(RdKafka::KafkaConsumer &consumer, const ConsumerInfo &info,
std::vector<RdKafka::TopicPartition *> &last_assignment) {
if (!last_assignment.empty()) {
if (const auto err = consumer.assign(last_assignment); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerStartFailedException(info.consumer_name,
fmt::format("Couldn't restore commited offsets: '{}'", RdKafka::err2str(err)));
}
RdKafka::TopicPartition::destroy(last_assignment);
}
}
void TryToConsumeBatch(RdKafka::KafkaConsumer &consumer, const ConsumerInfo &info,
const ConsumerFunction &consumer_function, const std::vector<Message> &batch) {
consumer_function(batch);
std::vector<RdKafka::TopicPartition *> partitions;
utils::OnScopeExit clear_partitions([&]() { RdKafka::TopicPartition::destroy(partitions); });
if (const auto err = consumer.assignment(partitions); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerCommitFailedException(
info.consumer_name, fmt::format("Couldn't get assignment to commit offsets: {}", RdKafka::err2str(err)));
}
if (const auto err = consumer.position(partitions); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerCommitFailedException(info.consumer_name,
fmt::format("Couldn't get offsets from librdkafka {}", RdKafka::err2str(err)));
}
if (const auto err = consumer.commitSync(partitions); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerCommitFailedException(info.consumer_name, RdKafka::err2str(err));
}
}
} // namespace
Message::Message(std::unique_ptr<RdKafka::Message> &&message) : message_{std::move(message)} {
@ -221,10 +251,21 @@ void Consumer::Start() {
StartConsuming();
}
void Consumer::StartIfStopped() {
if (!is_running_) {
StartConsuming();
void Consumer::StartWithLimit(const uint64_t limit_batches, std::optional<std::chrono::milliseconds> timeout) const {
if (is_running_) {
throw ConsumerRunningException(info_.consumer_name);
}
if (limit_batches < kMinimumStartBatchLimit) {
throw ConsumerStartFailedException(
info_.consumer_name, fmt::format("Batch limit has to be greater than or equal to {}", kMinimumStartBatchLimit));
}
if (timeout.value_or(kMinimumInterval) < kMinimumInterval) {
throw ConsumerStartFailedException(
info_.consumer_name,
fmt::format("Timeout has to be greater than or equal to {} milliseconds", kMinimumInterval.count()));
}
StartConsumingWithLimit(limit_batches, timeout);
}
void Consumer::Stop() {
@ -244,7 +285,7 @@ void Consumer::StopIfRunning() {
}
}
void Consumer::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> limit_batches,
void Consumer::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<uint64_t> limit_batches,
const ConsumerFunction &check_consumer_function) const {
// NOLINTNEXTLINE (modernize-use-nullptr)
if (timeout.value_or(kMinimumInterval) < kMinimumInterval) {
@ -344,13 +385,7 @@ void Consumer::StartConsuming() {
is_running_.store(true);
if (!last_assignment_.empty()) {
if (const auto err = consumer_->assign(last_assignment_); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerStartFailedException(info_.consumer_name,
fmt::format("Couldn't restore commited offsets: '{}'", RdKafka::err2str(err)));
}
RdKafka::TopicPartition::destroy(last_assignment_);
}
CheckAndDestroyLastAssignmentIfNeeded(*consumer_, info_, last_assignment_);
thread_ = std::thread([this] {
static constexpr auto kMaxThreadNameSize = utils::GetMaxThreadNameSize();
@ -361,33 +396,18 @@ void Consumer::StartConsuming() {
while (is_running_) {
auto maybe_batch = GetBatch(*consumer_, info_, is_running_);
if (maybe_batch.HasError()) {
spdlog::warn("Error happened in consumer {} while fetching messages: {}!", info_.consumer_name,
maybe_batch.GetError());
break;
throw ConsumerReadMessagesFailedException(info_.consumer_name, maybe_batch.GetError());
}
const auto &batch = maybe_batch.GetValue();
if (batch.empty()) continue;
if (batch.empty()) {
continue;
}
spdlog::info("Kafka consumer {} is processing a batch", info_.consumer_name);
try {
consumer_function_(batch);
std::vector<RdKafka::TopicPartition *> partitions;
utils::OnScopeExit clear_partitions([&]() { RdKafka::TopicPartition::destroy(partitions); });
if (const auto err = consumer_->assignment(partitions); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerCheckFailedException(
info_.consumer_name, fmt::format("Couldn't get assignment to commit offsets: {}", RdKafka::err2str(err)));
}
if (const auto err = consumer_->position(partitions); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerCheckFailedException(
info_.consumer_name, fmt::format("Couldn't get offsets from librdkafka {}", RdKafka::err2str(err)));
}
if (const auto err = consumer_->commitSync(partitions); err != RdKafka::ERR_NO_ERROR) {
spdlog::warn("Committing offset of consumer {} failed: {}", info_.consumer_name, RdKafka::err2str(err));
break;
}
TryToConsumeBatch(*consumer_, info_, consumer_function_, batch);
} catch (const std::exception &e) {
spdlog::warn("Error happened in consumer {} while processing a batch: {}!", info_.consumer_name, e.what());
break;
@ -398,6 +418,44 @@ void Consumer::StartConsuming() {
});
}
void Consumer::StartConsumingWithLimit(uint64_t limit_batches, std::optional<std::chrono::milliseconds> timeout) const {
MG_ASSERT(!is_running_, "Cannot start already running consumer!");
if (is_running_.exchange(true)) {
throw ConsumerRunningException(info_.consumer_name);
}
utils::OnScopeExit restore_is_running([this] { is_running_.store(false); });
CheckAndDestroyLastAssignmentIfNeeded(*consumer_, info_, last_assignment_);
const auto timeout_to_use = timeout.value_or(kDefaultCheckTimeout);
const auto start = std::chrono::steady_clock::now();
for (uint64_t batch_count = 0; batch_count < limit_batches;) {
const auto now = std::chrono::steady_clock::now();
if (now - start >= timeout_to_use) {
throw ConsumerStartFailedException(info_.consumer_name, "Timeout reached");
}
const auto maybe_batch = GetBatch(*consumer_, info_, is_running_);
if (maybe_batch.HasError()) {
throw ConsumerReadMessagesFailedException(info_.consumer_name, maybe_batch.GetError());
}
const auto &batch = maybe_batch.GetValue();
if (batch.empty()) {
continue;
}
++batch_count;
spdlog::info("Kafka consumer {} is processing a batch", info_.consumer_name);
TryToConsumeBatch(*consumer_, info_, consumer_function_, batch);
spdlog::info("Kafka consumer {} finished processing", info_.consumer_name);
}
}
void Consumer::StopConsuming() {
is_running_.store(false);
if (thread_.joinable()) thread_.join();

View File

@ -113,11 +113,19 @@ class Consumer final : public RdKafka::EventCb {
/// This method will start a new thread which will poll all the topics for messages.
///
/// @throws ConsumerRunningException if the consumer is already running
/// @throws ConsumerStartFailedException if the commited offsets cannot be restored
void Start();
/// Starts consuming messages if it is not started already.
/// Starts consuming messages.
///
void StartIfStopped();
/// This method will start a new thread which will poll all the topics for messages.
///
/// @param limit_batches the consumer will only consume the given number of batches.
/// @param timeout the maximum duration during which the command should run.
///
/// @throws ConsumerRunningException if the consumer is already running
/// @throws ConsumerStartFailedException if the commited offsets cannot be restored
void StartWithLimit(uint64_t limit_batches, std::optional<std::chrono::milliseconds> timeout) const;
/// Stops consuming messages.
///
@ -136,9 +144,9 @@ class Consumer final : public RdKafka::EventCb {
/// used.
/// @param check_consumer_function a function to feed the received messages in, only used during this dry-run.
///
/// @throws ConsumerRunningException if the consumer is alredy running.
/// @throws ConsumerRunningException if the consumer is already running.
/// @throws ConsumerCheckFailedException if check isn't successful.
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> limit_batches,
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<uint64_t> limit_batches,
const ConsumerFunction &check_consumer_function) const;
/// Returns true if the consumer is actively consuming messages.
@ -157,6 +165,7 @@ class Consumer final : public RdKafka::EventCb {
void event_cb(RdKafka::Event &event) override;
void StartConsuming();
void StartConsumingWithLimit(uint64_t limit_batches, std::optional<std::chrono::milliseconds> timeout) const;
void StopConsuming();
@ -178,7 +187,6 @@ class Consumer final : public RdKafka::EventCb {
ConsumerFunction consumer_function_;
mutable std::atomic<bool> is_running_{false};
mutable std::vector<RdKafka::TopicPartition *> last_assignment_; // Protected by is_running_
std::optional<int64_t> limit_batches_{std::nullopt};
std::unique_ptr<RdKafka::KafkaConsumer, std::function<void(RdKafka::KafkaConsumer *)>> consumer_;
std::thread thread_;
ConsumerRebalanceCb cb_;

View File

@ -64,4 +64,16 @@ class TopicNotFoundException : public KafkaStreamException {
TopicNotFoundException(const std::string_view consumer_name, const std::string_view topic_name)
: KafkaStreamException("Kafka consumer {} cannot find topic {}", consumer_name, topic_name) {}
};
class ConsumerCommitFailedException : public KafkaStreamException {
public:
ConsumerCommitFailedException(const std::string_view consumer_name, const std::string_view error)
: KafkaStreamException("Committing offset of consumer {} failed: {}", consumer_name, error) {}
};
class ConsumerReadMessagesFailedException : public KafkaStreamException {
public:
ConsumerReadMessagesFailedException(const std::string_view consumer_name, const std::string_view error)
: KafkaStreamException("Error happened in consumer {} while fetching messages: {}", consumer_name, error) {}
};
} // namespace memgraph::integrations::kafka

View File

@ -11,13 +11,14 @@
#include "integrations/pulsar/consumer.hpp"
#include <algorithm>
#include <chrono>
#include <thread>
#include <fmt/format.h>
#include <pulsar/Client.h>
#include <pulsar/InitialPosition.h>
#include <chrono>
#include <thread>
#include "integrations/constants.hpp"
#include "integrations/pulsar/exceptions.hpp"
#include "utils/concepts.hpp"
@ -33,6 +34,10 @@ namespace {
template <typename T>
concept PulsarConsumer = utils::SameAsAnyOf<T, pulsar_client::Consumer, pulsar_client::Reader>;
template <typename TFunc>
concept PulsarMessageGetter =
std::same_as<const pulsar_client::Message &, std::invoke_result_t<TFunc, const Message &>>;
pulsar_client::Result ConsumeMessage(pulsar_client::Consumer &consumer, pulsar_client::Message &message,
int remaining_timeout_in_ms) {
return consumer.receive(message, remaining_timeout_in_ms);
@ -97,6 +102,26 @@ pulsar_client::Client CreateClient(const std::string &service_url) {
conf.setLogger(new SpdlogLoggerFactory);
return {service_url, conf};
}
template <PulsarConsumer TConsumer, PulsarMessageGetter TPulsarMessageGetter>
void TryToConsumeBatch(TConsumer &consumer, const ConsumerInfo &info, const ConsumerFunction &consumer_function,
pulsar_client::MessageId &last_message_id, const std::vector<Message> &batch,
const TPulsarMessageGetter &message_getter) {
consumer_function(batch);
auto has_message_failed = [&consumer, &info, &last_message_id, &message_getter](const auto &message) {
if (const auto result = consumer.acknowledge(message_getter(message)); result != pulsar_client::ResultOk) {
spdlog::warn("Acknowledging a message of consumer {} failed: {}", info.consumer_name, result);
return true;
}
last_message_id = message_getter(message).getMessageId();
return false;
};
if (std::ranges::any_of(batch, has_message_failed)) {
throw ConsumerAcknowledgeMessagesFailedException(info.consumer_name);
}
}
} // namespace
Message::Message(pulsar_client::Message &&message) : message_{std::move(message)} {}
@ -137,6 +162,24 @@ void Consumer::Start() {
StartConsuming();
}
void Consumer::StartWithLimit(const uint64_t limit_batches,
const std::optional<std::chrono::milliseconds> timeout) const {
if (is_running_) {
throw ConsumerRunningException(info_.consumer_name);
}
if (limit_batches < kMinimumStartBatchLimit) {
throw ConsumerStartFailedException(
info_.consumer_name, fmt::format("Batch limit has to be greater than or equal to {}", kMinimumStartBatchLimit));
}
if (timeout.value_or(kMinimumInterval) < kMinimumInterval) {
throw ConsumerStartFailedException(
info_.consumer_name,
fmt::format("Timeout has to be greater than or equal to {} milliseconds", kMinimumInterval.count()));
}
StartConsumingWithLimit(limit_batches, timeout);
}
void Consumer::Stop() {
if (!is_running_) {
throw ConsumerStoppedException(info_.consumer_name);
@ -154,7 +197,7 @@ void Consumer::StopIfRunning() {
}
}
void Consumer::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> limit_batches,
void Consumer::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<uint64_t> limit_batches,
const ConsumerFunction &check_consumer_function) const {
// NOLINTNEXTLINE (modernize-use-nullptr)
if (timeout.value_or(kMinimumInterval) < kMinimumInterval) {
@ -240,9 +283,7 @@ void Consumer::StartConsuming() {
auto maybe_batch = GetBatch(consumer_, info_, is_running_, last_message_id_);
if (maybe_batch.HasError()) {
spdlog::warn("Error happened in consumer {} while fetching messages: {}!", info_.consumer_name,
maybe_batch.GetError());
break;
throw ConsumerReadMessagesFailedException(info_.consumer_name, maybe_batch.GetError());
}
const auto &batch = maybe_batch.GetValue();
@ -254,18 +295,8 @@ void Consumer::StartConsuming() {
spdlog::info("Pulsar consumer {} is processing a batch", info_.consumer_name);
try {
consumer_function_(batch);
if (std::any_of(batch.begin(), batch.end(), [&](const auto &message) {
if (const auto result = consumer_.acknowledge(message.message_); result != pulsar_client::ResultOk) {
spdlog::warn("Acknowledging a message of consumer {} failed: {}", info_.consumer_name, result);
return true;
}
last_message_id_ = message.message_.getMessageId();
return false;
})) {
break;
}
TryToConsumeBatch(consumer_, info_, consumer_function_, last_message_id_, batch,
[&](const Message &message) -> const pulsar_client::Message & { return message.message_; });
} catch (const std::exception &e) {
spdlog::warn("Error happened in consumer {} while processing a batch: {}!", info_.consumer_name, e.what());
break;
@ -277,6 +308,43 @@ void Consumer::StartConsuming() {
});
}
void Consumer::StartConsumingWithLimit(uint64_t limit_batches, std::optional<std::chrono::milliseconds> timeout) const {
if (is_running_.exchange(true)) {
throw ConsumerRunningException(info_.consumer_name);
}
utils::OnScopeExit restore_is_running([this] { is_running_.store(false); });
const auto timeout_to_use = timeout.value_or(kDefaultCheckTimeout);
const auto start = std::chrono::steady_clock::now();
for (uint64_t batch_count = 0; batch_count < limit_batches;) {
const auto now = std::chrono::steady_clock::now();
if (now - start >= timeout_to_use) {
throw ConsumerCheckFailedException(info_.consumer_name, "Timeout reached");
}
const auto maybe_batch = GetBatch(consumer_, info_, is_running_, last_message_id_);
if (maybe_batch.HasError()) {
throw ConsumerReadMessagesFailedException(info_.consumer_name, maybe_batch.GetError());
}
const auto &batch = maybe_batch.GetValue();
if (batch.empty()) {
continue;
}
++batch_count;
spdlog::info("Pulsar consumer {} is processing a batch", info_.consumer_name);
TryToConsumeBatch(consumer_, info_, consumer_function_, last_message_id_, batch,
[](const Message &message) -> const pulsar_client::Message & { return message.message_; });
spdlog::info("Pulsar consumer {} finished processing", info_.consumer_name);
}
}
void Consumer::StopConsuming() {
is_running_.store(false);
if (thread_.joinable()) {

View File

@ -58,25 +58,27 @@ class Consumer final {
bool IsRunning() const;
void Start();
void StartWithLimit(uint64_t limit_batches, std::optional<std::chrono::milliseconds> timeout) const;
void Stop();
void StopIfRunning();
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> limit_batches,
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<uint64_t> limit_batches,
const ConsumerFunction &check_consumer_function) const;
const ConsumerInfo &Info() const;
private:
void StartConsuming();
void StartConsumingWithLimit(uint64_t limit_batches, std::optional<std::chrono::milliseconds> timeout) const;
void StopConsuming();
ConsumerInfo info_;
mutable pulsar_client::Client client_;
pulsar_client::Consumer consumer_;
mutable pulsar_client::Consumer consumer_;
ConsumerFunction consumer_function_;
mutable std::atomic<bool> is_running_{false};
pulsar_client::MessageId last_message_id_{pulsar_client::MessageId::earliest()};
mutable pulsar_client::MessageId last_message_id_{pulsar_client::MessageId::earliest()}; // Protected by is_running_
std::thread thread_;
};
} // namespace memgraph::integrations::pulsar

View File

@ -55,4 +55,16 @@ class TopicNotFoundException : public PulsarStreamException {
TopicNotFoundException(const std::string &consumer_name, const std::string &topic_name)
: PulsarStreamException("Pulsar consumer {} cannot find topic {}", consumer_name, topic_name) {}
};
class ConsumerReadMessagesFailedException : public PulsarStreamException {
public:
ConsumerReadMessagesFailedException(const std::string_view consumer_name, const std::string_view error)
: PulsarStreamException("Error happened in consumer {} while fetching messages: {}", consumer_name, error) {}
};
class ConsumerAcknowledgeMessagesFailedException : public PulsarStreamException {
public:
explicit ConsumerAcknowledgeMessagesFailedException(const std::string_view consumer_name)
: PulsarStreamException("Acknowledging a message of consumer {} has failed!", consumer_name) {}
};
} // namespace memgraph::integrations::pulsar

View File

@ -775,6 +775,23 @@ antlrcpp::Any CypherMainVisitor::visitDropStream(MemgraphCypher::DropStreamConte
antlrcpp::Any CypherMainVisitor::visitStartStream(MemgraphCypher::StartStreamContext *ctx) {
auto *stream_query = storage_->Create<StreamQuery>();
stream_query->action_ = StreamQuery::Action::START_STREAM;
if (ctx->BATCH_LIMIT()) {
if (!ctx->batchLimit->numberLiteral() || !ctx->batchLimit->numberLiteral()->integerLiteral()) {
throw SemanticException("Batch limit should be an integer literal!");
}
stream_query->batch_limit_ = ctx->batchLimit->accept(this);
}
if (ctx->TIMEOUT()) {
if (!ctx->timeout->numberLiteral() || !ctx->timeout->numberLiteral()->integerLiteral()) {
throw SemanticException("Timeout should be an integer literal!");
}
if (!ctx->BATCH_LIMIT()) {
throw SemanticException("Parameter TIMEOUT can only be defined if BATCH_LIMIT is defined");
}
stream_query->timeout_ = ctx->timeout->accept(this);
}
stream_query->stream_name_ = ctx->streamName()->symbolicName()->accept(this).as<std::string>();
return stream_query;
}

View File

@ -351,7 +351,7 @@ pulsarCreateStream : CREATE PULSAR STREAM streamName ( pulsarCreateStreamConfig
dropStream : DROP STREAM streamName ;
startStream : START STREAM streamName ;
startStream : START STREAM streamName ( BATCH_LIMIT batchLimit=literal ) ? ( TIMEOUT timeout=literal ) ? ;
startAllStreams : START ALL STREAMS ;

View File

@ -687,12 +687,26 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
return callback;
}
case StreamQuery::Action::START_STREAM: {
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_]() {
interpreter_context->streams.Start(stream_name);
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::START_STREAM,
fmt::format("Started stream {}.", stream_query->stream_name_));
const auto batch_limit = GetOptionalValue<int64_t>(stream_query->batch_limit_, evaluator);
const auto timeout = GetOptionalValue<std::chrono::milliseconds>(stream_query->timeout_, evaluator);
if (batch_limit.has_value()) {
if (batch_limit.value() < 0) {
throw utils::BasicException("Parameter BATCH_LIMIT cannot hold negative value");
}
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_, batch_limit, timeout]() {
interpreter_context->streams.StartWithLimit(stream_name, static_cast<uint64_t>(batch_limit.value()), timeout);
return std::vector<std::vector<TypedValue>>{};
};
} else {
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_]() {
interpreter_context->streams.Start(stream_name);
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::START_STREAM,
fmt::format("Started stream {}.", stream_query->stream_name_));
}
return callback;
}
case StreamQuery::Action::START_ALL_STREAMS: {
@ -762,9 +776,15 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
}
case StreamQuery::Action::CHECK_STREAM: {
callback.header = {"queries", "raw messages"};
const auto batch_limit = GetOptionalValue<int64_t>(stream_query->batch_limit_, evaluator);
if (batch_limit.has_value() && batch_limit.value() < 0) {
throw utils::BasicException("Parameter BATCH_LIMIT cannot hold negative value");
}
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_,
timeout = GetOptionalValue<std::chrono::milliseconds>(stream_query->timeout_, evaluator),
batch_limit = GetOptionalValue<int64_t>(stream_query->batch_limit_, evaluator)]() mutable {
batch_limit]() mutable {
return interpreter_context->streams.Check(stream_name, timeout, batch_limit);
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::CHECK_STREAM,

View File

@ -52,10 +52,11 @@ concept Stream = requires(TStream stream) {
typename TStream::Message;
TStream{std::string{""}, typename TStream::StreamInfo{}, ConsumerFunction<typename TStream::Message>{}};
{ stream.Start() } -> std::same_as<void>;
{ stream.StartWithLimit(uint64_t{}, std::optional<std::chrono::milliseconds>{}) } -> std::same_as<void>;
{ stream.Stop() } -> std::same_as<void>;
{ stream.IsRunning() } -> std::same_as<bool>;
{
stream.Check(std::optional<std::chrono::milliseconds>{}, std::optional<int64_t>{},
stream.Check(std::optional<std::chrono::milliseconds>{}, std::optional<uint64_t>{},
ConsumerFunction<typename TStream::Message>{})
} -> std::same_as<void>;
requires std::same_as<std::decay_t<decltype(std::declval<typename TStream::StreamInfo>().common_info)>,

View File

@ -44,10 +44,13 @@ KafkaStream::StreamInfo KafkaStream::Info(std::string transformation_name) const
}
void KafkaStream::Start() { consumer_->Start(); }
void KafkaStream::StartWithLimit(uint64_t batch_limit, std::optional<std::chrono::milliseconds> timeout) const {
consumer_->StartWithLimit(batch_limit, timeout);
}
void KafkaStream::Stop() { consumer_->Stop(); }
bool KafkaStream::IsRunning() const { return consumer_->IsRunning(); }
void KafkaStream::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> batch_limit,
void KafkaStream::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<uint64_t> batch_limit,
const ConsumerFunction<integrations::kafka::Message> &consumer_function) const {
consumer_->Check(timeout, batch_limit, consumer_function);
}
@ -106,10 +109,12 @@ PulsarStream::StreamInfo PulsarStream::Info(std::string transformation_name) con
}
void PulsarStream::Start() { consumer_->Start(); }
void PulsarStream::StartWithLimit(uint64_t batch_limit, std::optional<std::chrono::milliseconds> timeout) const {
consumer_->StartWithLimit(batch_limit, timeout);
}
void PulsarStream::Stop() { consumer_->Stop(); }
bool PulsarStream::IsRunning() const { return consumer_->IsRunning(); }
void PulsarStream::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> batch_limit,
void PulsarStream::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<uint64_t> batch_limit,
const ConsumerFunction<Message> &consumer_function) const {
consumer_->Check(timeout, batch_limit, consumer_function);
}

View File

@ -36,10 +36,11 @@ struct KafkaStream {
StreamInfo Info(std::string transformation_name) const;
void Start();
void StartWithLimit(uint64_t batch_limit, std::optional<std::chrono::milliseconds> timeout) const;
void Stop();
bool IsRunning() const;
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> batch_limit,
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<uint64_t> batch_limit,
const ConsumerFunction<Message> &consumer_function) const;
utils::BasicResult<std::string> SetStreamOffset(int64_t offset);
@ -71,10 +72,11 @@ struct PulsarStream {
StreamInfo Info(std::string transformation_name) const;
void Start();
void StartWithLimit(uint64_t batch_limit, std::optional<std::chrono::milliseconds> timeout) const;
void Stop();
bool IsRunning() const;
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> batch_limit,
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<uint64_t> batch_limit,
const ConsumerFunction<Message> &consumer_function) const;
private:

View File

@ -456,7 +456,7 @@ void Streams::Create(const std::string &stream_name, typename TStream::StreamInf
try {
std::visit(
[&](auto &&stream_data) {
[&](const auto &stream_data) {
const auto stream_source_ptr = stream_data.stream_source->ReadLock();
Persist(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *stream_source_ptr));
},
@ -575,7 +575,7 @@ void Streams::RestoreStreams() {
auto it = CreateConsumer<T>(*locked_streams_map, stream_name, std::move(status.info), std::move(status.owner));
if (status.is_running) {
std::visit(
[&](auto &&stream_data) {
[&](const auto &stream_data) {
auto stream_source_ptr = stream_data.stream_source->Lock();
stream_source_ptr->Start();
},
@ -617,7 +617,7 @@ void Streams::Drop(const std::string &stream_name) {
// function can be executing with the consumer, nothing else.
// By acquiring the write lock here for the consumer, we make sure there is
// no running Test function for this consumer, therefore it can be erased.
std::visit([&](auto &&stream_data) { stream_data.stream_source->Lock(); }, it->second);
std::visit([&](const auto &stream_data) { stream_data.stream_source->Lock(); }, it->second);
locked_streams->erase(it);
if (!storage_.Delete(stream_name)) {
@ -632,7 +632,7 @@ void Streams::Start(const std::string &stream_name) {
auto it = GetStream(*locked_streams, stream_name);
std::visit(
[&, this](auto &&stream_data) {
[&, this](const auto &stream_data) {
auto stream_source_ptr = stream_data.stream_source->Lock();
stream_source_ptr->Start();
Persist(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *stream_source_ptr));
@ -640,12 +640,27 @@ void Streams::Start(const std::string &stream_name) {
it->second);
}
void Streams::StartWithLimit(const std::string &stream_name, uint64_t batch_limit,
std::optional<std::chrono::milliseconds> timeout) const {
std::optional locked_streams{streams_.ReadLock()};
auto it = GetStream(**locked_streams, stream_name);
std::visit(
[&](const auto &stream_data) {
const auto locked_stream_source = stream_data.stream_source->ReadLock();
locked_streams.reset();
locked_stream_source->StartWithLimit(batch_limit, timeout);
},
it->second);
}
void Streams::Stop(const std::string &stream_name) {
auto locked_streams = streams_.Lock();
auto it = GetStream(*locked_streams, stream_name);
std::visit(
[&, this](auto &&stream_data) {
[&, this](const auto &stream_data) {
auto stream_source_ptr = stream_data.stream_source->Lock();
stream_source_ptr->Stop();
@ -657,7 +672,7 @@ void Streams::Stop(const std::string &stream_name) {
void Streams::StartAll() {
for (auto locked_streams = streams_.Lock(); auto &[stream_name, stream_data] : *locked_streams) {
std::visit(
[&stream_name = stream_name, this](auto &&stream_data) {
[&stream_name = stream_name, this](const auto &stream_data) {
auto locked_stream_source = stream_data.stream_source->Lock();
if (!locked_stream_source->IsRunning()) {
locked_stream_source->Start();
@ -672,7 +687,7 @@ void Streams::StartAll() {
void Streams::StopAll() {
for (auto locked_streams = streams_.Lock(); auto &[stream_name, stream_data] : *locked_streams) {
std::visit(
[&stream_name = stream_name, this](auto &&stream_data) {
[&stream_name = stream_name, this](const auto &stream_data) {
auto locked_stream_source = stream_data.stream_source->Lock();
if (locked_stream_source->IsRunning()) {
locked_stream_source->Stop();
@ -689,7 +704,7 @@ std::vector<StreamStatus<>> Streams::GetStreamInfo() const {
{
for (auto locked_streams = streams_.ReadLock(); const auto &[stream_name, stream_data] : *locked_streams) {
std::visit(
[&, &stream_name = stream_name](auto &&stream_data) {
[&, &stream_name = stream_name](const auto &stream_data) {
auto locked_stream_source = stream_data.stream_source->ReadLock();
auto info = locked_stream_source->Info(stream_data.transformation_name);
result.emplace_back(StreamStatus<>{stream_name, StreamType(*locked_stream_source),
@ -703,12 +718,12 @@ std::vector<StreamStatus<>> Streams::GetStreamInfo() const {
}
TransformationResult Streams::Check(const std::string &stream_name, std::optional<std::chrono::milliseconds> timeout,
std::optional<int64_t> batch_limit) const {
std::optional<uint64_t> batch_limit) const {
std::optional locked_streams{streams_.ReadLock()};
auto it = GetStream(**locked_streams, stream_name);
return std::visit(
[&](auto &&stream_data) {
[&](const auto &stream_data) {
// This depends on the fact that Drop will first acquire a write lock to the consumer, and erase it only after
// that
const auto locked_stream_source = stream_data.stream_source->ReadLock();

View File

@ -115,6 +115,17 @@ class Streams final {
/// @throws ConsumerRunningException if the consumer is already running
void Start(const std::string &stream_name);
/// Start consuming from a stream.
///
/// @param stream_name name of the stream that needs to be started
/// @param batch_limit number of batches we want to consume before stopping
/// @param timeout the maximum duration during which the command should run.
///
/// @throws StreamsException if the stream doesn't exist
/// @throws ConsumerRunningException if the consumer is already running
void StartWithLimit(const std::string &stream_name, uint64_t batch_limit,
std::optional<std::chrono::milliseconds> timeout) const;
/// Stop consuming from a stream.
///
/// @param stream_name name of the stream that needs to be stopped
@ -142,6 +153,7 @@ class Streams final {
///
/// @param stream_name name of the stream we want to test
/// @param batch_limit number of batches we want to test before stopping
/// @param timeout the maximum duration during which the command should run.
///
/// @returns A vector of vectors of TypedValue. Each subvector contains two elements, the query string and the
/// nullable parameters map.
@ -151,7 +163,7 @@ class Streams final {
/// @throws ConsumerCheckFailedException if the transformation function throws any std::exception during processing
TransformationResult Check(const std::string &stream_name,
std::optional<std::chrono::milliseconds> timeout = std::nullopt,
std::optional<int64_t> batch_limit = std::nullopt) const;
std::optional<uint64_t> batch_limit = std::nullopt) const;
private:
template <Stream TStream>

View File

@ -10,6 +10,7 @@
# licenses/APL.txt.
import mgclient
import pytest
import time
from multiprocessing import Manager, Process, Value
@ -112,6 +113,13 @@ def start_stream(cursor, stream_name):
assert get_is_running(cursor, stream_name)
def start_stream_with_limit(cursor, stream_name, batch_limit, timeout=None):
if timeout is not None:
execute_and_fetch_all(cursor, f"START STREAM {stream_name} BATCH_LIMIT {batch_limit} TIMEOUT {timeout} ")
else:
execute_and_fetch_all(cursor, f"START STREAM {stream_name} BATCH_LIMIT {batch_limit}")
def stop_stream(cursor, stream_name):
execute_and_fetch_all(cursor, f"STOP STREAM {stream_name}")
@ -253,10 +261,11 @@ def test_start_checked_stream_after_timeout(connection, stream_creator):
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator("test_stream"))
TIMEOUT_MS = 2000
TIMEOUT_IN_MS = 2000
TIMEOUT_IN_SECONDS = TIMEOUT_IN_MS / 1000
def call_check():
execute_and_fetch_all(connect().cursor(), f"CHECK STREAM test_stream TIMEOUT {TIMEOUT_MS}")
execute_and_fetch_all(connect().cursor(), f"CHECK STREAM test_stream TIMEOUT {TIMEOUT_IN_MS}")
check_stream_proc = Process(target=call_check, daemon=True)
@ -266,7 +275,7 @@ def test_start_checked_stream_after_timeout(connection, stream_creator):
start_stream(cursor, "test_stream")
end = time.time()
assert (end - start) < 1.3 * TIMEOUT_MS, "The START STREAM was blocked too long"
assert (end - start) < 1.3 * TIMEOUT_IN_SECONDS, "The START STREAM was blocked too long"
assert get_is_running(cursor, "test_stream")
stop_stream(cursor, "test_stream")
@ -401,3 +410,239 @@ def test_check_stream_different_number_of_queries_than_messages(connection, stre
assert expected_queries_and_raw_messages_1 == results.value[0]
assert expected_queries_and_raw_messages_2 == results.value[1]
assert expected_queries_and_raw_messages_3 == results.value[2]
def test_start_stream_with_batch_limit(connection, stream_creator, messages_sender):
STREAM_NAME = "test"
BATCH_LIMIT = 5
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
def start_new_stream_with_limit(stream_name, batch_limit):
connection = connect()
cursor = connection.cursor()
start_stream_with_limit(cursor, stream_name, batch_limit)
thread_stream_running = Process(target=start_new_stream_with_limit, daemon=True, args=(STREAM_NAME, BATCH_LIMIT))
thread_stream_running.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
messages_sender(BATCH_LIMIT - 1)
# We have not sent enough batches to reach the limit. We check that the stream is still correctly running.
assert get_is_running(cursor, STREAM_NAME)
# We send a last message to reach the batch_limit
messages_sender(1)
time.sleep(2)
# We check that the stream has correctly stoped.
assert not get_is_running(cursor, STREAM_NAME)
def test_start_stream_with_batch_limit_timeout(connection, stream_creator):
# We check that we get the expected exception when trying to run START STREAM while providing TIMEOUT and not BATCH_LIMIT
STREAM_NAME = "test"
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, f"START STREAM {STREAM_NAME} TIMEOUT 3000")
def test_start_stream_with_batch_limit_reaching_timeout(connection, stream_creator):
# We check that we get the expected exception when running START STREAM while providing TIMEOUT and BATCH_LIMIT
STREAM_NAME = "test"
BATCH_LIMIT = 5
TIMEOUT = 3000
TIMEOUT_IN_SECONDS = TIMEOUT / 1000
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator(STREAM_NAME, BATCH_SIZE))
start_time = time.time()
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, f"START STREAM {STREAM_NAME} BATCH_LIMIT {BATCH_LIMIT} TIMEOUT {TIMEOUT}")
end_time = time.time()
assert (
end_time - start_time
) >= TIMEOUT_IN_SECONDS, "The START STREAM has probably thrown due to something else than timeout!"
def test_start_stream_with_batch_limit_while_check_running(
connection, stream_creator, message_sender, setup_function=None
):
# 1/ We check we get the correct exception calling START STREAM with BATCH_LIMIT while a CHECK STREAM is already running.
# 2/ Afterwards, we terminate the CHECK STREAM and start a START STREAM with BATCH_LIMIT
def start_check_stream(stream_name, batch_limit, timeout):
connection = connect()
cursor = connection.cursor()
execute_and_fetch_all(cursor, f"CHECK STREAM {stream_name} BATCH_LIMIT {batch_limit} TIMEOUT {timeout}")
def start_new_stream_with_limit(stream_name, batch_limit, timeout):
connection = connect()
cursor = connection.cursor()
start_stream_with_limit(cursor, stream_name, batch_limit, timeout=timeout)
STREAM_NAME = "test_check_and_batch_limit"
BATCH_LIMIT = 1
TIMEOUT = 10000
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
# 0/ Extra setup needed for Kafka to works correctly if Check stream is execute before any messages have been consumed.
if setup_function is not None:
setup_function(start_check_stream, cursor, STREAM_NAME, BATCH_LIMIT, TIMEOUT)
# 1/
thread_stream_check = Process(target=start_check_stream, daemon=True, args=(STREAM_NAME, BATCH_LIMIT, TIMEOUT))
thread_stream_check.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
with pytest.raises(mgclient.DatabaseError):
start_stream_with_limit(cursor, STREAM_NAME, BATCH_LIMIT, timeout=TIMEOUT)
assert get_is_running(cursor, STREAM_NAME)
message_sender(SIMPLE_MSG)
thread_stream_check.join()
assert not get_is_running(cursor, STREAM_NAME)
# 2/
thread_stream_running = Process(
target=start_new_stream_with_limit, daemon=True, args=(STREAM_NAME, BATCH_LIMIT + 1, TIMEOUT)
) # Sending BATCH_LIMIT + 1 messages as BATCH_LIMIT messages have already been sent during the CHECK STREAM (and not consumed)
thread_stream_running.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
message_sender(SIMPLE_MSG)
time.sleep(2)
assert not get_is_running(cursor, STREAM_NAME)
def test_check_while_stream_with_batch_limit_running(connection, stream_creator, message_sender):
# 1/ We check we get the correct exception calling CHECK STREAM while START STREAM with BATCH_LIMIT is already running
# 2/ Afterwards, we terminate the START STREAM with BATCH_LIMIT and start a CHECK STREAM
def start_new_stream_with_limit(stream_name, batch_limit, timeout):
connection = connect()
cursor = connection.cursor()
start_stream_with_limit(cursor, stream_name, batch_limit, timeout=timeout)
def start_check_stream(stream_name, batch_limit, timeout):
connection = connect()
cursor = connection.cursor()
execute_and_fetch_all(cursor, f"CHECK STREAM {stream_name} BATCH_LIMIT {batch_limit} TIMEOUT {timeout}")
STREAM_NAME = "test_batch_limit_and_check"
BATCH_LIMIT = 1
TIMEOUT = 10000
TIMEOUT_IN_SECONDS = TIMEOUT / 1000
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
# 1/
thread_stream_running = Process(
target=start_new_stream_with_limit, daemon=True, args=(STREAM_NAME, BATCH_LIMIT, TIMEOUT)
)
start_time = time.time()
thread_stream_running.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, f"CHECK STREAM {STREAM_NAME} BATCH_LIMIT {BATCH_LIMIT} TIMEOUT {TIMEOUT}")
end_time = time.time()
assert (end_time - start_time) < 0.8 * TIMEOUT, "The CHECK STREAM has probably thrown due to timeout!"
message_sender(SIMPLE_MSG)
time.sleep(2)
assert not get_is_running(cursor, STREAM_NAME)
# 2/
thread_stream_check = Process(target=start_check_stream, daemon=True, args=(STREAM_NAME, BATCH_LIMIT, TIMEOUT))
start_time = time.time()
thread_stream_check.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
message_sender(SIMPLE_MSG)
time.sleep(2)
end_time = time.time()
assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The CHECK STREAM has probably thrown due to timeout!"
assert not get_is_running(cursor, STREAM_NAME)
def test_start_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator):
# We check that we get a correct exception when giving a negative batch_limit
STREAM_NAME = "test_batch_limit_invalid_batch_limit"
TIMEOUT = 10000
TIMEOUT_IN_SECONDS = TIMEOUT / 1000
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
time.sleep(2)
# 1/ checking with batch_limit=-10
batch_limit = -10
start_time = time.time()
with pytest.raises(mgclient.DatabaseError):
start_stream_with_limit(cursor, STREAM_NAME, batch_limit, timeout=TIMEOUT)
end_time = time.time()
assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The START STREAM has probably thrown due to timeout!"
# 2/ checking with batch_limit=0
batch_limit = 0
start_time = time.time()
with pytest.raises(mgclient.DatabaseError):
start_stream_with_limit(cursor, STREAM_NAME, batch_limit, timeout=TIMEOUT)
end_time = time.time()
assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The START STREAM has probably thrown due to timeout!"
def test_check_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator):
# We check that we get a correct exception when giving a negative batch_limit
STREAM_NAME = "test_batch_limit_invalid_batch_limit"
TIMEOUT = 10000
TIMEOUT_IN_SECONDS = TIMEOUT / 1000
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
time.sleep(2)
# 1/ checking with batch_limit=-10
batch_limit = -10
start_time = time.time()
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, f"CHECK STREAM {STREAM_NAME} BATCH_LIMIT {batch_limit} TIMEOUT {TIMEOUT}")
end_time = time.time()
assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The CHECK STREAM has probably thrown due to timeout!"
# 2/ checking with batch_limit=0
batch_limit = 0
start_time = time.time()
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, f"CHECK STREAM {STREAM_NAME} BATCH_LIMIT {batch_limit} TIMEOUT {TIMEOUT}")
end_time = time.time()
assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The CHECK STREAM has probably thrown due to timeout!"

View File

@ -18,7 +18,7 @@ import time
from multiprocessing import Process, Value
import common
TRANSFORMATIONS_TO_CHECK_C = ["empty_transformation"]
TRANSFORMATIONS_TO_CHECK_C = ["c_transformations.empty_transformation"]
TRANSFORMATIONS_TO_CHECK_PY = ["kafka_transform.simple", "kafka_transform.with_parameters"]
@ -381,10 +381,11 @@ def test_info_procedure(kafka_topics, connection):
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_C)
def test_load_c_transformations(connection, transformation):
cursor = connection.cursor()
query = f"CALL mg.transformations() YIELD * WITH name WHERE name STARTS WITH 'c_transformations.{transformation}' RETURN name"
query = f"CALL mg.transformations() YIELD * WITH name WHERE name STARTS WITH '{transformation}' RETURN name"
result = common.execute_and_fetch_all(cursor, query)
assert len(result) == 1
assert result[0][0] == f"c_transformations.{transformation}"
assert result[0][0] == transformation
def test_check_stream_same_number_of_queries_than_messages(kafka_producer, kafka_topics, connection):
@ -415,5 +416,100 @@ def test_check_stream_different_number_of_queries_than_messages(kafka_producer,
common.test_check_stream_different_number_of_queries_than_messages(connection, stream_creator, message_sender)
def test_start_stream_with_batch_limit(kafka_producer, kafka_topics, connection):
assert len(kafka_topics) > 0
def stream_creator(stream_name):
return (
f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1"
)
def messages_sender(nof_messages):
for x in range(nof_messages):
kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60)
common.test_start_stream_with_batch_limit(connection, stream_creator, messages_sender)
def test_start_stream_with_batch_limit_timeout(kafka_producer, kafka_topics, connection):
assert len(kafka_topics) > 0
def stream_creator(stream_name):
return (
f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1"
)
common.test_start_stream_with_batch_limit_timeout(connection, stream_creator)
def test_start_stream_with_batch_limit_reaching_timeout(kafka_producer, kafka_topics, connection):
assert len(kafka_topics) > 0
def stream_creator(stream_name, batch_size):
return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE {batch_size}"
common.test_start_stream_with_batch_limit_reaching_timeout(connection, stream_creator)
def test_start_stream_with_batch_limit_while_check_running(kafka_producer, kafka_topics, connection):
assert len(kafka_topics) > 0
def stream_creator(stream_name):
return (
f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1"
)
def message_sender(message):
kafka_producer.send(kafka_topics[0], message).get(timeout=6000)
def setup_function(start_check_stream, cursor, stream_name, batch_limit, timeout):
thread_stream_check = Process(target=start_check_stream, daemon=True, args=(stream_name, batch_limit, timeout))
thread_stream_check.start()
time.sleep(2)
assert common.get_is_running(cursor, stream_name)
message_sender(common.SIMPLE_MSG)
thread_stream_check.join()
common.test_start_stream_with_batch_limit_while_check_running(
connection, stream_creator, message_sender, setup_function
)
def test_check_while_stream_with_batch_limit_running(kafka_producer, kafka_topics, connection):
assert len(kafka_topics) > 0
def stream_creator(stream_name):
return (
f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1"
)
def message_sender(message):
kafka_producer.send(kafka_topics[0], message).get(timeout=6000)
common.test_check_while_stream_with_batch_limit_running(connection, stream_creator, message_sender)
def test_start_stream_with_batch_limit_with_invalid_batch_limit(kafka_producer, kafka_topics, connection):
assert len(kafka_topics) > 0
def stream_creator(stream_name):
return (
f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1"
)
common.test_start_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator)
def test_check_stream_with_batch_limit_with_invalid_batch_limit(kafka_producer, kafka_topics, connection):
assert len(kafka_topics) > 0
def stream_creator(stream_name):
return (
f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1"
)
common.test_check_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -344,6 +344,73 @@ def test_service_url(pulsar_client, pulsar_topics, connection, transformation):
check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG)
def test_start_stream_with_batch_limit(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 1
def stream_creator(stream_name):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1"
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
)
def messages_sender(nof_messages):
for x in range(nof_messages):
producer.send(common.SIMPLE_MSG)
common.test_start_stream_with_batch_limit(connection, stream_creator, messages_sender)
def test_start_stream_with_batch_limit_timeout(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 1
def stream_creator(stream_name):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1"
common.test_start_stream_with_batch_limit_timeout(connection, stream_creator)
def test_start_stream_with_batch_limit_reaching_timeout(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 1
def stream_creator(stream_name, batch_size):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE {batch_size}"
common.test_start_stream_with_batch_limit_reaching_timeout(connection, stream_creator)
def test_start_stream_with_batch_limit_while_check_running(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 0
def stream_creator(stream_name):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1"
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
)
def message_sender(message):
producer.send(message)
common.test_start_stream_with_batch_limit_while_check_running(connection, stream_creator, message_sender)
def test_check_while_stream_with_batch_limit_running(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 0
def stream_creator(stream_name):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1"
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
)
def message_sender(message):
producer.send(message)
common.test_check_while_stream_with_batch_limit_running(connection, stream_creator, message_sender)
def test_check_stream_same_number_of_queries_than_messages(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 0
@ -380,5 +447,23 @@ def test_check_stream_different_number_of_queries_than_messages(pulsar_client, p
common.test_check_stream_different_number_of_queries_than_messages(connection, stream_creator, message_sender)
def test_start_stream_with_batch_limit_with_invalid_batch_limit(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 0
def stream_creator(stream_name):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1"
common.test_start_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator)
def test_check_stream_with_batch_limit_with_invalid_batch_limit(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 0
def stream_creator(stream_name):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1"
common.test_check_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -149,7 +149,7 @@ TEST_F(ConsumerTest, BatchInterval) {
}
consumer->Stop();
EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received";
EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received";
auto check_received_timestamp = [&received_timestamps](size_t index) {
SCOPED_TRACE("Checking index " + std::to_string(index));
@ -178,14 +178,6 @@ TEST_F(ConsumerTest, BatchInterval) {
TEST_F(ConsumerTest, StartStop) {
Consumer consumer{CreateDefaultConsumerInfo(), kDummyConsumerFunction};
auto start = [&consumer](const bool use_conditional) {
if (use_conditional) {
consumer.StartIfStopped();
} else {
consumer.Start();
}
};
auto stop = [&consumer](const bool use_conditional) {
if (use_conditional) {
consumer.StopIfRunning();
@ -194,34 +186,28 @@ TEST_F(ConsumerTest, StartStop) {
}
};
auto check_config = [&start, &stop, &consumer](const bool use_conditional_start,
const bool use_conditional_stop) mutable {
SCOPED_TRACE(
fmt::format("Conditional start {} and conditional stop {}", use_conditional_start, use_conditional_stop));
auto check_config = [&stop, &consumer](const bool use_conditional_stop) mutable {
SCOPED_TRACE(fmt::format("Start and conditionally stop {}", use_conditional_stop));
EXPECT_FALSE(consumer.IsRunning());
EXPECT_THROW(consumer.Stop(), ConsumerStoppedException);
consumer.StopIfRunning();
EXPECT_FALSE(consumer.IsRunning());
start(use_conditional_start);
consumer.Start();
EXPECT_TRUE(consumer.IsRunning());
EXPECT_THROW(consumer.Start(), ConsumerRunningException);
consumer.StartIfStopped();
EXPECT_TRUE(consumer.IsRunning());
stop(use_conditional_stop);
EXPECT_FALSE(consumer.IsRunning());
};
static constexpr auto kSimpleStart = false;
static constexpr auto kSimpleStop = false;
static constexpr auto kConditionalStart = true;
static constexpr auto kConditionalStop = true;
check_config(kSimpleStart, kSimpleStop);
check_config(kSimpleStart, kConditionalStop);
check_config(kConditionalStart, kSimpleStop);
check_config(kConditionalStart, kConditionalStop);
check_config(kSimpleStop);
check_config(kConditionalStop);
}
TEST_F(ConsumerTest, BatchSize) {
@ -252,7 +238,7 @@ TEST_F(ConsumerTest, BatchSize) {
}
std::this_thread::sleep_for(kBatchInterval * 2);
consumer->Stop();
EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received";
EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received";
auto check_received_timestamp = [&received_timestamps](size_t index, size_t expected_message_count) {
SCOPED_TRACE("Checking index " + std::to_string(index));
@ -371,7 +357,7 @@ TEST_F(ConsumerTest, DISABLED_StartsFromPreviousOffset) {
EXPECT_EQ(expected_total_messages, received_message_count);
EXPECT_NO_THROW(consumer->Stop());
ASSERT_FALSE(consumer->IsRunning());
EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received";
EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received";
};
ASSERT_NO_FATAL_FAILURE(send_and_consume_messages(2));
@ -383,10 +369,9 @@ TEST_F(ConsumerTest, CheckMethodWorks) {
auto info = CreateDefaultConsumerInfo();
info.batch_size = kBatchSize;
const std::string kMessagePrefix{"Message"};
auto consumer_function = [](const std::vector<Message> &messages) mutable {};
// This test depends on CreateConsumer starts and stops the consumer, so the offset is stored
auto consumer = CreateConsumer(std::move(info), std::move(consumer_function));
auto consumer = CreateConsumer(std::move(info), kDummyConsumerFunction);
static constexpr auto kMessageCount = 4;
for (auto sent_messages = 0; sent_messages < kMessageCount; ++sent_messages) {
@ -411,7 +396,7 @@ TEST_F(ConsumerTest, CheckMethodWorks) {
});
ASSERT_FALSE(consumer->IsRunning());
EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received";
EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received";
EXPECT_EQ(received_message_count, kMessageCount);
};
@ -445,8 +430,6 @@ TEST_F(ConsumerTest, CheckWithInvalidTimeout) {
const auto start = std::chrono::steady_clock::now();
EXPECT_THROW(consumer.Check(std::chrono::milliseconds{0}, std::nullopt, kDummyConsumerFunction),
ConsumerCheckFailedException);
EXPECT_THROW(consumer.Check(std::chrono::milliseconds{-1}, std::nullopt, kDummyConsumerFunction),
ConsumerCheckFailedException);
const auto end = std::chrono::steady_clock::now();
static constexpr std::chrono::seconds kMaxExpectedTimeout{2};
@ -459,7 +442,6 @@ TEST_F(ConsumerTest, CheckWithInvalidBatchSize) {
const auto start = std::chrono::steady_clock::now();
EXPECT_THROW(consumer.Check(std::nullopt, 0, kDummyConsumerFunction), ConsumerCheckFailedException);
EXPECT_THROW(consumer.Check(std::nullopt, -1, kDummyConsumerFunction), ConsumerCheckFailedException);
const auto end = std::chrono::steady_clock::now();
static constexpr std::chrono::seconds kMaxExpectedTimeout{2};
@ -496,8 +478,85 @@ TEST_F(ConsumerTest, ConsumerStatus) {
check_info(consumer.Info());
consumer.Start();
check_info(consumer.Info());
consumer.StartIfStopped();
check_info(consumer.Info());
consumer.StopIfRunning();
check_info(consumer.Info());
}
TEST_F(ConsumerTest, LimitBatches_CannotStartIfAlreadyRunning) {
static constexpr auto kLimitBatches = 3;
auto info = CreateDefaultConsumerInfo();
auto consumer = CreateConsumer(std::move(info), kDummyConsumerFunction);
consumer->Start();
ASSERT_TRUE(consumer->IsRunning());
EXPECT_THROW(consumer->StartWithLimit(kLimitBatches, std::nullopt /*timeout*/), ConsumerRunningException);
EXPECT_TRUE(consumer->IsRunning());
consumer->Stop();
EXPECT_FALSE(consumer->IsRunning());
}
TEST_F(ConsumerTest, LimitBatches_SendingMoreThanLimit) {
/*
We send more messages than the BatchSize*LimitBatches:
-Consumer should receive 2*3=6 messages.
-Consumer should not be running afterwards.
*/
static constexpr auto kBatchSize = 2;
static constexpr auto kLimitBatches = 3;
static constexpr auto kNumberOfMessagesToSend = 20;
static constexpr auto kNumberOfMessagesExpected = kBatchSize * kLimitBatches;
static constexpr auto kBatchInterval =
std::chrono::seconds{2}; // We do not want the batch interval to be the limiting factor here.
auto info = CreateDefaultConsumerInfo();
info.batch_size = kBatchSize;
info.batch_interval = kBatchInterval;
static constexpr std::string_view kMessage = "LimitBatchesTestMessage";
auto expected_messages_received = true;
auto number_of_messages_received = 0;
auto consumer_function = [&expected_messages_received,
&number_of_messages_received](const std::vector<Message> &messages) mutable {
number_of_messages_received += messages.size();
for (const auto &message : messages) {
expected_messages_received &= (kMessage == std::string_view(message.Payload().data(), message.Payload().size()));
}
};
auto consumer = CreateConsumer(std::move(info), consumer_function);
for (auto sent_messages = 0; sent_messages <= kNumberOfMessagesToSend; ++sent_messages) {
cluster.SeedTopic(kTopicName, kMessage);
}
consumer->StartWithLimit(kLimitBatches, kDontCareTimeout);
EXPECT_FALSE(consumer->IsRunning());
EXPECT_EQ(number_of_messages_received, kNumberOfMessagesExpected);
EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received";
}
TEST_F(ConsumerTest, LimitBatches_Timeout_Reached) {
// We do not send any messages, we expect an exeption to be thrown.
static constexpr auto kLimitBatches = 3;
auto info = CreateDefaultConsumerInfo();
auto consumer = CreateConsumer(std::move(info), kDummyConsumerFunction);
std::chrono::milliseconds timeout{3000};
const auto start = std::chrono::steady_clock::now();
EXPECT_THROW(consumer->StartWithLimit(kLimitBatches, timeout), ConsumerStartFailedException);
const auto end = std::chrono::steady_clock::now();
const auto elapsed = (end - start);
EXPECT_LE(timeout, elapsed);
EXPECT_LE(elapsed, timeout * 1.2);
}