Add Kafka consumer (#167)

* Add CMake config for Kafka integration

* Add Consumer

* Add simple unit test for consumer

* Add explicit offset handling and test for it
This commit is contained in:
János Benjamin Antal 2021-06-10 12:43:01 +02:00 committed by Antonio Andelic
parent 3c9a46f823
commit 36afc6c5f3
14 changed files with 1059 additions and 1 deletions

View File

@ -222,3 +222,22 @@ import_external_library(spdlog STATIC
BUILD_COMMAND $(MAKE) spdlog)
include(jemalloc.cmake)
# Setup librdkafka.
import_external_library(librdkafka STATIC
${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/lib/librdkafka.a
${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/include
CMAKE_ARGS -DRDKAFKA_BUILD_STATIC=ON
-DRDKAFKA_BUILD_EXAMPLES=OFF
-DRDKAFKA_BUILD_TESTS=OFF
-DCMAKE_INSTALL_LIBDIR=lib
-DWITH_SSL=ON
# If we want SASL, we need to install it on build machines
-DWITH_SASL=OFF)
target_link_libraries(librdkafka INTERFACE ${OPENSSL_LIBRARIES})
import_library(librdkafka++ STATIC
${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/lib/librdkafka++.a
${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/include
)
target_link_libraries(librdkafka++ INTERFACE librdkafka)

View File

@ -111,6 +111,7 @@ declare -A primary_urls=(
["jemalloc"]="http://$local_cache_host/git/jemalloc.git"
["nlohmann"]="http://$local_cache_host/file/nlohmann/json/b3e5cb7f20dcc5c806e418df34324eca60d17d4e/single_include/nlohmann/json.hpp"
["neo4j"]="http://$local_cache_host/file/neo4j-community-3.2.3-unix.tar.gz"
["librdkafka"]="http://$local_cache_host/git/librdkafka.git"
)
# The goal of secondary urls is to have links to the "source of truth" of
@ -137,6 +138,7 @@ declare -A secondary_urls=(
["jemalloc"]="https://github.com/jemalloc/jemalloc.git"
["nlohmann"]="https://raw.githubusercontent.com/nlohmann/json/b3e5cb7f20dcc5c806e418df34324eca60d17d4e/single_include/nlohmann/json.hpp"
["neo4j"]="https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/neo4j-community-3.2.3-unix.tar.gz"
["librdkafka"]="https://github.com/edenhill/librdkafka.git"
)
# antlr
@ -238,3 +240,7 @@ pushd jemalloc
# https://github.com/ClickHouse/ClickHouse/issues/11121 for motivation.
./autogen.sh --with-malloc-conf="percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000"
popd
# librdkafka
librdkafka_tag="v1.7.0" # (2021-05-06)
repo_clone_try_double "${primary_urls[librdkafka]}" "${secondary_urls[librdkafka]}" "librdkafka" "$librdkafka_tag"

View File

@ -10,6 +10,7 @@ add_subdirectory(telemetry)
add_subdirectory(communication)
add_subdirectory(storage/v2)
add_subdirectory(query)
add_subdirectory(integrations)
add_subdirectory(slk)
add_subdirectory(rpc)
if (MG_ENTERPRISE)

View File

@ -0,0 +1 @@
add_subdirectory(kafka)

View File

@ -0,0 +1,6 @@
set(integrations_kafka_src_files
consumer.cpp
)
add_library(mg-integrations-kafka STATIC ${integrations_kafka_src_files})
target_link_libraries(mg-integrations-kafka mg-utils librdkafka++ librdkafka Threads::Threads zlib)

View File

@ -0,0 +1,308 @@
#include "integrations/kafka/consumer.hpp"
#include <algorithm>
#include <iterator>
#include <memory>
#include <unordered_set>
#include "integrations/kafka/exceptions.hpp"
#include "utils/exceptions.hpp"
#include "utils/logging.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/thread.hpp"
namespace integrations::kafka {
constexpr std::chrono::milliseconds kDefaultBatchInterval{100};
constexpr int64_t kDefaultBatchSize = 1000;
constexpr int64_t kDefaultTestBatchLimit = 1;
Message::Message(std::unique_ptr<RdKafka::Message> &&message) : message_{std::move(message)} {
// Because of these asserts, the message can be safely accessed in the member function functions, because it cannot
// be null and always points to a valid message (not to a wrapped error)
MG_ASSERT(message_.get() != nullptr, "Kafka message cannot be null!");
MG_ASSERT(message_->err() == 0 && message_->c_ptr() != nullptr, "Invalid kafka message!");
};
std::span<const char> Message::Key() const {
const auto *c_message = message_->c_ptr();
return {static_cast<const char *>(c_message->key), c_message->key_len};
}
std::string_view Message::TopicName() const {
const auto *c_message = message_->c_ptr();
return c_message->rkt == nullptr ? std::string_view{} : rd_kafka_topic_name(c_message->rkt);
}
std::span<const char> Message::Payload() const {
const auto *c_message = message_->c_ptr();
return {static_cast<const char *>(c_message->payload), c_message->len};
}
int64_t Message::Timestamp() const {
const auto *c_message = message_->c_ptr();
return rd_kafka_message_timestamp(c_message, nullptr);
}
Consumer::Consumer(ConsumerInfo info) : info_{std::move(info)} {
MG_ASSERT(info_.consumer_function, "Empty consumer function for Kafka consumer");
std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
if (conf == nullptr) {
throw ConsumerFailedToInitializeException(info_.consumer_name, "Couldn't create Kafka configuration!");
}
std::string error;
if (conf->set("event_cb", this, error) != RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.consumer_name, error);
}
if (conf->set("enable.partition.eof", "false", error) != RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.consumer_name, error);
}
if (conf->set("enable.auto.offset.store", "false", error) != RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.consumer_name, error);
}
if (conf->set("bootstrap.servers", info_.bootstrap_servers, error) != RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.consumer_name, error);
}
if (conf->set("group.id", info_.consumer_group, error) != RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.consumer_name, error);
}
consumer_ = std::unique_ptr<RdKafka::KafkaConsumer, std::function<void(RdKafka::KafkaConsumer *)>>(
RdKafka::KafkaConsumer::create(conf.get(), error), [this](auto *consumer) {
this->StopConsuming();
consumer->close();
delete consumer;
});
if (consumer_ == nullptr) {
throw ConsumerFailedToInitializeException(info_.consumer_name, error);
}
RdKafka::Metadata *raw_metadata = nullptr;
if (const auto err = consumer_->metadata(true, nullptr, &raw_metadata, 1000); err != RdKafka::ERR_NO_ERROR) {
delete raw_metadata;
throw ConsumerFailedToInitializeException(info_.consumer_name, RdKafka::err2str(err));
}
std::unique_ptr<RdKafka::Metadata> metadata(raw_metadata);
std::unordered_set<std::string> topic_names_from_metadata{};
std::transform(metadata->topics()->begin(), metadata->topics()->end(),
std::inserter(topic_names_from_metadata, topic_names_from_metadata.begin()),
[](const auto topic_metadata) { return topic_metadata->topic(); });
for (const auto &topic_name : info_.topics) {
if (!topic_names_from_metadata.contains(topic_name)) {
throw TopicNotFoundException(info_.consumer_name, topic_name);
}
}
if (const auto err = consumer_->subscribe(info_.topics); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerFailedToInitializeException(info_.consumer_name, RdKafka::err2str(err));
}
}
void Consumer::Start(std::optional<int64_t> limit_batches) {
if (is_running_) {
throw ConsumerRunningException(info_.consumer_name);
}
StartConsuming(limit_batches);
}
void Consumer::StartIfStopped() {
if (!is_running_) {
StartConsuming(std::nullopt);
}
}
void Consumer::Stop() {
if (!is_running_) {
throw ConsumerStoppedException(info_.consumer_name);
}
StopConsuming();
}
void Consumer::StopIfRunning() {
if (is_running_) {
StopConsuming();
}
}
void Consumer::Test(std::optional<int64_t> limit_batches, const ConsumerFunction &test_consumer_function) {
if (is_running_) {
throw ConsumerRunningException(info_.consumer_name);
}
int64_t num_of_batches = limit_batches.value_or(kDefaultTestBatchLimit);
is_running_.store(true);
std::vector<std::unique_ptr<RdKafka::TopicPartition>> partitions;
{
// Save the current offsets in order to restore them in cleanup
std::vector<RdKafka::TopicPartition *> tmp_partitions;
if (const auto err = consumer_->assignment(tmp_partitions); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerTestFailedException(info_.consumer_name, RdKafka::err2str(err));
}
if (const auto err = consumer_->position(tmp_partitions); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerTestFailedException(info_.consumer_name, RdKafka::err2str(err));
}
partitions.reserve(tmp_partitions.size());
std::transform(
tmp_partitions.begin(), tmp_partitions.end(), std::back_inserter(partitions),
[](RdKafka::TopicPartition *const partition) { return std::unique_ptr<RdKafka::TopicPartition>{partition}; });
}
utils::OnScopeExit cleanup([this, &partitions]() {
is_running_.store(false);
std::vector<RdKafka::TopicPartition *> tmp_partitions;
tmp_partitions.reserve(partitions.size());
std::transform(partitions.begin(), partitions.end(), std::back_inserter(tmp_partitions),
[](const auto &partition) { return partition.get(); });
if (const auto err = consumer_->assign(tmp_partitions); err != RdKafka::ERR_NO_ERROR) {
spdlog::error("Couldn't restore previous offsets after testing Kafka consumer {}!", info_.consumer_name);
throw ConsumerTestFailedException(info_.consumer_name, RdKafka::err2str(err));
}
});
for (int64_t i = 0; i < num_of_batches;) {
auto maybe_batch = GetBatch();
if (maybe_batch.HasError()) {
throw ConsumerTestFailedException(info_.consumer_name, maybe_batch.GetError());
}
const auto &batch = maybe_batch.GetValue();
if (batch.empty()) {
continue;
}
++i;
try {
test_consumer_function(batch);
} catch (const std::exception &e) {
spdlog::warn("Kafka consumer {} test failed with error {}", info_.consumer_name, e.what());
throw ConsumerTestFailedException(info_.consumer_name, e.what());
}
}
}
bool Consumer::IsRunning() const { return is_running_; }
void Consumer::event_cb(RdKafka::Event &event) {
switch (event.type()) {
case RdKafka::Event::Type::EVENT_ERROR:
spdlog::warn("Kafka consumer {} received an error: {}", info_.consumer_name, RdKafka::err2str(event.err()));
break;
case RdKafka::Event::Type::EVENT_STATS:
case RdKafka::Event::Type::EVENT_LOG:
case RdKafka::Event::Type::EVENT_THROTTLE:
break;
}
}
void Consumer::StartConsuming(std::optional<int64_t> limit_batches) {
MG_ASSERT(!is_running_, "Cannot start already running consumer!");
if (thread_.joinable()) {
// This can happen if the thread just finished its last batch, already set is_running_ to false and currently
// shutting down.
thread_.join();
};
is_running_.store(true);
thread_ = std::thread([this, limit_batches]() {
constexpr auto kMaxThreadNameSize = utils::GetMaxThreadNameSize();
const auto full_thread_name = "Cons#" + info_.consumer_name;
utils::ThreadSetName(full_thread_name.substr(0, kMaxThreadNameSize));
int64_t batch_count = 0;
while (is_running_) {
auto maybe_batch = this->GetBatch();
if (maybe_batch.HasError()) {
spdlog::warn("Error happened in consumer {} while fetching messages: {}!", info_.consumer_name,
maybe_batch.GetError());
is_running_.store(false);
}
const auto &batch = maybe_batch.GetValue();
if (batch.empty()) continue;
spdlog::info("Kafka consumer {} is processing a batch", info_.consumer_name);
// TODO (mferencevic): Figure out what to do with all other exceptions.
try {
info_.consumer_function(batch);
consumer_->commitSync();
} catch (const utils::BasicException &e) {
spdlog::warn("Error happened in consumer {} while processing a batch: {}!", info_.consumer_name, e.what());
break;
}
if (limit_batches != std::nullopt && limit_batches <= ++batch_count) {
is_running_.store(false);
break;
}
}
});
}
void Consumer::StopConsuming() {
is_running_.store(false);
if (thread_.joinable()) thread_.join();
}
utils::BasicResult<std::string, std::vector<Message>> Consumer::GetBatch() {
std::vector<Message> batch{};
int64_t batch_size = info_.batch_size.value_or(kDefaultBatchSize);
batch.reserve(batch_size);
auto remaining_timeout_in_ms = info_.batch_interval.value_or(kDefaultBatchInterval).count();
auto start = std::chrono::steady_clock::now();
bool run_batch = true;
for (int64_t i = 0; remaining_timeout_in_ms > 0 && i < batch_size; ++i) {
std::unique_ptr<RdKafka::Message> msg(consumer_->consume(remaining_timeout_in_ms));
switch (msg->err()) {
case RdKafka::ERR__TIMED_OUT:
run_batch = false;
break;
case RdKafka::ERR_NO_ERROR:
batch.emplace_back(std::move(msg));
break;
default:
auto error = msg->errstr();
spdlog::warn("Unexpected error while consuming message in consumer {}, error: {}!", info_.consumer_name,
msg->errstr());
return {std::move(error)};
}
if (!run_batch) {
break;
}
auto now = std::chrono::steady_clock::now();
auto took = std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
remaining_timeout_in_ms = remaining_timeout_in_ms - took.count();
start = now;
}
return {std::move(batch)};
}
} // namespace integrations::kafka

View File

@ -0,0 +1,149 @@
#pragma once
#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <optional>
#include <span>
#include <thread>
#include <utility>
#include <vector>
#include <librdkafka/rdkafka.h>
#include <librdkafka/rdkafkacpp.h>
#include "utils/result.hpp"
namespace integrations::kafka {
/// Wraps the message returned from librdkafka.
///
/// The interface of RdKafka::Message is far from ideal, so this class provides a modern C++ wrapper to it. Some of the
/// problems of RdKafka::Message:
/// - First and foremost, RdKafka::Message might wrap a received message, or an error if something goes wrong during
/// polling. That means some of the getters cannot be called or return rubbish data when it contains an error. Message
/// ensures that the wrapped RdKafka::Message contains a valid message, and not some error.
/// - The topic_name is returned as a string, but the key is returned as a pointer to a string, because it is cached.
/// To unify them Message returns them as string_view without copying them, using the underlying C API.
/// - The payload is returned as void*, so it is better to cast it to char* as soon as possible. Returning the payload
/// as std::span also provides a more idiomatic way to communicate a byte array than returning a raw pointer and a
/// size.
class Message final {
public:
explicit Message(std::unique_ptr<RdKafka::Message> &&message);
Message(Message &&) = default;
Message &operator=(Message &&) = default;
~Message() = default;
Message(const Message &) = delete;
Message &operator=(const Message &) = delete;
/// Returns the key of the message, might be empty.
std::span<const char> Key() const;
/// Returns the name of the topic, might be empty.
std::string_view TopicName() const;
/// Returns the payload.
std::span<const char> Payload() const;
/// Returns the timestamp of the message.
///
/// The timestamp is the number of milliseconds since the epoch (UTC), or 0 if not available.
///
/// The timestamp might have different semantics based on the configuration of the Kafka cluster. It can be the time
/// of message creation or appendage to the log. Currently the Kafka integration doesn't support connections to
/// multiple clusters, the semantics can be figured out from the configuration of the cluster, so the transformations
/// can be implemented knowing that.
int64_t Timestamp() const;
private:
std::unique_ptr<RdKafka::Message> message_;
};
using ConsumerFunction = std::function<void(const std::vector<Message> &)>;
/// ConsumerInfo holds all the information necessary to create a Consumer.
struct ConsumerInfo {
ConsumerFunction consumer_function;
std::string consumer_name;
std::string bootstrap_servers;
std::vector<std::string> topics;
std::string consumer_group;
std::optional<std::chrono::milliseconds> batch_interval;
std::optional<int64_t> batch_size;
};
/// Memgraphs Kafka consumer wrapper.
///
/// Consumer wraps around librdkafka Consumer so it's easier to use it.
/// It extends RdKafka::EventCb in order to listen to error events.
class Consumer final : public RdKafka::EventCb {
public:
/// Creates a new consumer with the given parameters.
///
/// @throws ConsumerFailedToInitializeException if the consumer can't connect
/// to the Kafka endpoint.
explicit Consumer(ConsumerInfo info);
~Consumer() override = default;
Consumer(const Consumer &other) = delete;
Consumer(Consumer &&other) noexcept = delete;
Consumer &operator=(const Consumer &other) = delete;
Consumer &operator=(Consumer &&other) = delete;
/// Starts consuming messages.
///
/// This method will start a new thread which will poll all the topics for messages.
///
/// @param limit_batches if present, the consumer will only consume the given number of batches and stop afterwards.
///
/// @throws ConsumerRunningException if the consumer is already running
void Start(std::optional<int64_t> limit_batches);
/// Starts consuming messages if it is not started already.
///
/// @throws ConsumerNotAvailableException if the consumer isn't initialized
void StartIfStopped();
/// Stops consuming messages.
///
/// @throws ConsumerNotAvailableException if the consumer isn't initialized
/// @throws ConsumerStoppedException if the consumer is already stopped
void Stop();
/// Stops consuming messages if it is not stopped alread.
void StopIfRunning();
/// Performs a synchronous dry-run.
///
/// This function doesn't have any persistent effect on the consumer. The messages are fetched synchronously, so the
/// function returns only when the test run is done, unlike Start, which returns after starting a thread.
///
/// @param limit_batches the consumer will only test the given number of batches. If not present, a default value is
/// used.
/// @param test_consumer_function a function to feed the received messages in, only used during this dry-run.
///
/// @throws ConsumerRunningException if the consumer is alredy running.
void Test(std::optional<int64_t> limit_batches, const ConsumerFunction &test_consumer_function);
/// Returns true if the consumer is actively consuming messages.
bool IsRunning() const;
private:
void event_cb(RdKafka::Event &event) override;
void StartConsuming(std::optional<int64_t> limit_batches);
void StopConsuming();
utils::BasicResult<std::string, std::vector<Message>> GetBatch();
// TODO(antaljanosbenjamin) Maybe split this to store only the necessary information
ConsumerInfo info_;
mutable std::atomic<bool> is_running_{false};
std::optional<int64_t> limit_batches_{std::nullopt};
std::thread thread_;
std::unique_ptr<RdKafka::KafkaConsumer, std::function<void(RdKafka::KafkaConsumer *)>> consumer_;
};
} // namespace integrations::kafka

View File

@ -0,0 +1,39 @@
#pragma once
#include <string>
#include "utils/exceptions.hpp"
class KafkaStreamException : public utils::BasicException {
using utils::BasicException::BasicException;
};
class ConsumerFailedToInitializeException : public KafkaStreamException {
public:
ConsumerFailedToInitializeException(const std::string &consumer_name, const std::string &error)
: KafkaStreamException("Failed to initialize Kafka consumer {} : {}", consumer_name, error) {}
};
class ConsumerRunningException : public KafkaStreamException {
public:
explicit ConsumerRunningException(const std::string &consumer_name)
: KafkaStreamException("Kafka consumer {} is already running", consumer_name) {}
};
class ConsumerStoppedException : public KafkaStreamException {
public:
explicit ConsumerStoppedException(const std::string &consumer_name)
: KafkaStreamException("Kafka consumer {} is already stopped", consumer_name) {}
};
class ConsumerTestFailedException : public KafkaStreamException {
public:
explicit ConsumerTestFailedException(const std::string &consumer_name, const std::string &error)
: KafkaStreamException("Kafka consumer {} test failed: {}", consumer_name, error) {}
};
class TopicNotFoundException : public KafkaStreamException {
public:
TopicNotFoundException(const std::string &consumer_name, const std::string &topic_name)
: KafkaStreamException("Kafka consumer {} cannot find topic {}", consumer_name, topic_name) {}
};

View File

@ -7,7 +7,8 @@
namespace utils {
void ThreadSetName(const std::string &name) {
MG_ASSERT(name.size() <= 16, "Thread name '{}' is too long", name);
constexpr auto max_name_length = GetMaxThreadNameSize();
MG_ASSERT(name.size() <= max_name_length, "Thread name '{}' is too long", max_name_length);
if (prctl(PR_SET_NAME, name.c_str()) != 0) {
spdlog::warn("Couldn't set thread name: {}!", name);

View File

@ -5,6 +5,8 @@
namespace utils {
constexpr size_t GetMaxThreadNameSize() { return 16; }
/// This function sets the thread name of the calling thread.
/// Beware, the name length limit is 16 characters!
void ThreadSetName(const std::string &name);

View File

@ -101,6 +101,7 @@ target_link_libraries(${test_prefix}query_trigger mg-query)
add_unit_test(query_serialization_property_value.cpp)
target_link_libraries(${test_prefix}query_serialization_property_value mg-query)
# Test query/procedure
add_unit_test(query_procedure_mgp_type.cpp)
target_link_libraries(${test_prefix}query_procedure_mgp_type mg-query)
@ -326,3 +327,13 @@ add_custom_command(
add_custom_target(test_lcp ALL DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/test_lcp)
add_test(test_lcp ${CMAKE_CURRENT_BINARY_DIR}/test_lcp)
add_dependencies(memgraph__unit test_lcp)
# Test integrations-kafka
add_library(kafka-mock STATIC kafka_mock.cpp)
target_link_libraries(kafka-mock mg-utils librdkafka++ librdkafka Threads::Threads zlib gtest)
# Include directories are intentionally not set, because kafka-mock isn't meant to be used apart from unit tests
add_unit_test(integrations_kafka_consumer.cpp kafka_mock.cpp)
target_link_libraries(${test_prefix}integrations_kafka_consumer kafka-mock mg-integrations-kafka)

View File

@ -0,0 +1,364 @@
#include <chrono>
#include <optional>
#include <string>
#include <string_view>
#include <thread>
#include <fmt/core.h>
#include <spdlog/common.h>
#include <spdlog/spdlog.h>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "integrations/kafka/consumer.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "kafka_mock.hpp"
#include "utils/timer.hpp"
using namespace integrations::kafka;
namespace {
int SpanToInt(std::span<const char> span) {
int result{0};
if (span.size() != sizeof(int)) {
std::runtime_error("Invalid span size");
}
std::memcpy(&result, span.data(), sizeof(int));
return result;
}
} // namespace
struct ConsumerTest : public ::testing::Test {
ConsumerTest() {}
ConsumerInfo CreateDefaultConsumerInfo() const {
const auto test_name = std::string{::testing::UnitTest::GetInstance()->current_test_info()->name()};
return ConsumerInfo{
.consumer_function = [](const std::vector<Message> &) {},
.consumer_name = "Consumer" + test_name,
.bootstrap_servers = cluster.Bootstraps(),
.topics = {kTopicName},
.consumer_group = "ConsumerGroup " + test_name,
.batch_interval = std::nullopt,
.batch_size = std::nullopt,
};
};
std::unique_ptr<Consumer> CreateConsumer(ConsumerInfo &&info) {
auto custom_consumer_function = std::move(info.consumer_function);
auto last_received_message = std::make_shared<std::atomic<int>>(0);
info.consumer_function = [weak_last_received_message = std::weak_ptr{last_received_message},
custom_consumer_function =
std::move(custom_consumer_function)](const std::vector<Message> &messages) {
auto last_received_message = weak_last_received_message.lock();
if (last_received_message != nullptr) {
*last_received_message = SpanToInt(messages.back().Payload());
} else {
custom_consumer_function(messages);
}
};
auto consumer = std::make_unique<Consumer>(std::move(info));
int sent_messages{1};
SeedTopicWithInt(kTopicName, sent_messages);
consumer->Start(std::nullopt);
if (!consumer->IsRunning()) {
return nullptr;
}
// Send messages to the topic until the consumer starts to receive them. In the first few seconds the consumer
// doesn't get messages because there is no leader in the consumer group. If consumer group leader election timeout
// could be lowered (didn't find anything in librdkafka docs), then this mechanism will become unnecessary.
while (last_received_message->load() == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
SeedTopicWithInt(kTopicName, ++sent_messages);
}
while (last_received_message->load() != sent_messages) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
};
consumer->Stop();
std::this_thread::sleep_for(std::chrono::seconds(4));
return consumer;
}
void SeedTopicWithInt(const std::string &topic_name, int value) {
std::array<char, sizeof(int)> int_as_char{};
std::memcpy(int_as_char.data(), &value, int_as_char.size());
cluster.SeedTopic(topic_name, int_as_char);
}
static const std::string kTopicName;
KafkaClusterMock cluster{{kTopicName}};
};
const std::string ConsumerTest::kTopicName{"FirstTopic"};
TEST_F(ConsumerTest, BatchInterval) {
// There might be ~300ms delay in message delivery with librdkafka mock, thus the batch interval cannot be too small.
constexpr auto kBatchInterval = std::chrono::milliseconds{500};
constexpr std::string_view kMessage = "BatchIntervalTestMessage";
auto info = CreateDefaultConsumerInfo();
std::vector<std::pair<size_t, std::chrono::steady_clock::time_point>> received_timestamps{};
info.batch_interval = kBatchInterval;
auto expected_messages_received = true;
info.consumer_function = [&](const std::vector<Message> &messages) mutable {
received_timestamps.push_back({messages.size(), std::chrono::steady_clock::now()});
for (const auto &message : messages) {
expected_messages_received &= (kMessage == std::string_view(message.Payload().data(), message.Payload().size()));
}
};
auto consumer = CreateConsumer(std::move(info));
consumer->Start(std::nullopt);
ASSERT_TRUE(consumer->IsRunning());
constexpr auto kMessageCount = 7;
for (auto sent_messages = 0; sent_messages < kMessageCount; ++sent_messages) {
cluster.SeedTopic(kTopicName, kMessage);
std::this_thread::sleep_for(kBatchInterval * 0.5);
}
consumer->Stop();
EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received";
auto check_received_timestamp = [&received_timestamps, kBatchInterval](size_t index) {
SCOPED_TRACE("Checking index " + std::to_string(index));
EXPECT_GE(index, 0) << "Cannot check first timestamp!";
const auto message_count = received_timestamps[index].first;
EXPECT_LE(1, message_count);
auto actual_diff = std::chrono::duration_cast<std::chrono::milliseconds>(received_timestamps[index].second -
received_timestamps[index - 1].second);
constexpr auto kMinDiff = kBatchInterval * 0.9;
constexpr auto kMaxDiff = kBatchInterval * 1.1;
EXPECT_LE(kMinDiff.count(), actual_diff.count());
EXPECT_GE(kMaxDiff.count(), actual_diff.count());
};
ASSERT_FALSE(received_timestamps.empty());
EXPECT_TRUE(1 <= received_timestamps[0].first && received_timestamps[0].first <= 2);
EXPECT_LE(3, received_timestamps.size());
for (auto i = 1; i < received_timestamps.size(); ++i) {
check_received_timestamp(i);
}
}
TEST_F(ConsumerTest, StartStop) {
Consumer consumer{CreateDefaultConsumerInfo()};
auto start = [&consumer](const bool use_conditional) {
if (use_conditional) {
consumer.StartIfStopped();
} else {
consumer.Start(std::nullopt);
}
};
auto stop = [&consumer](const bool use_conditional) {
if (use_conditional) {
consumer.StopIfRunning();
} else {
consumer.Stop();
}
};
auto check_config = [&start, &stop, &consumer](const bool use_conditional_start,
const bool use_conditional_stop) mutable {
SCOPED_TRACE(
fmt::format("Conditional start {} and conditional stop {}", use_conditional_start, use_conditional_stop));
EXPECT_FALSE(consumer.IsRunning());
EXPECT_THROW(consumer.Stop(), ConsumerStoppedException);
consumer.StopIfRunning();
EXPECT_FALSE(consumer.IsRunning());
start(use_conditional_start);
EXPECT_TRUE(consumer.IsRunning());
EXPECT_THROW(consumer.Start(std::nullopt), ConsumerRunningException);
consumer.StartIfStopped();
EXPECT_TRUE(consumer.IsRunning());
stop(use_conditional_stop);
EXPECT_FALSE(consumer.IsRunning());
};
constexpr auto kSimpleStart = false;
constexpr auto kSimpleStop = false;
constexpr auto kConditionalStart = true;
constexpr auto kConditionalStop = true;
check_config(kSimpleStart, kSimpleStop);
check_config(kSimpleStart, kConditionalStop);
check_config(kConditionalStart, kSimpleStop);
check_config(kConditionalStart, kConditionalStop);
}
TEST_F(ConsumerTest, BatchSize) {
// Increase default batch interval to give more time for messages to receive
constexpr auto kBatchInterval = std::chrono::milliseconds{1000};
constexpr auto kBatchSize = 3;
auto info = CreateDefaultConsumerInfo();
std::vector<std::pair<size_t, std::chrono::steady_clock::time_point>> received_timestamps{};
info.batch_interval = kBatchInterval;
info.batch_size = kBatchSize;
constexpr std::string_view kMessage = "BatchSizeTestMessage";
auto expected_messages_received = true;
info.consumer_function = [&](const std::vector<Message> &messages) mutable {
received_timestamps.push_back({messages.size(), std::chrono::steady_clock::now()});
for (const auto &message : messages) {
expected_messages_received &= (kMessage == std::string_view(message.Payload().data(), message.Payload().size()));
}
};
auto consumer = CreateConsumer(std::move(info));
consumer->Start(std::nullopt);
ASSERT_TRUE(consumer->IsRunning());
constexpr auto kLastBatchMessageCount = 1;
constexpr auto kMessageCount = 3 * kBatchSize + kLastBatchMessageCount;
for (auto sent_messages = 0; sent_messages < kMessageCount; ++sent_messages) {
cluster.SeedTopic(kTopicName, kMessage);
}
std::this_thread::sleep_for(kBatchInterval * 2);
consumer->Stop();
EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received";
auto check_received_timestamp = [&received_timestamps, kBatchInterval](size_t index, size_t expected_message_count) {
SCOPED_TRACE("Checking index " + std::to_string(index));
EXPECT_GE(index, 0) << "Cannot check first timestamp!";
const auto message_count = received_timestamps[index].first;
EXPECT_EQ(expected_message_count, message_count);
auto actual_diff = std::chrono::duration_cast<std::chrono::milliseconds>(received_timestamps[index].second -
received_timestamps[index - 1].second);
if (expected_message_count == kBatchSize) {
EXPECT_LE(actual_diff, kBatchInterval * 0.5);
} else {
constexpr auto kMinDiff = kBatchInterval * 0.9;
constexpr auto kMaxDiff = kBatchInterval * 1.1;
EXPECT_LE(kMinDiff.count(), actual_diff.count());
EXPECT_GE(kMaxDiff.count(), actual_diff.count());
}
};
ASSERT_FALSE(received_timestamps.empty());
EXPECT_EQ(kBatchSize, received_timestamps[0].first);
constexpr auto kExpectedBatchCount = kMessageCount / kBatchSize + 1;
EXPECT_EQ(kExpectedBatchCount, received_timestamps.size());
for (auto i = 1; i < received_timestamps.size() - 1; ++i) {
check_received_timestamp(i, kBatchSize);
}
check_received_timestamp(received_timestamps.size() - 1, kLastBatchMessageCount);
}
TEST_F(ConsumerTest, InvalidBootstrapServers) {
auto info = CreateDefaultConsumerInfo();
info.bootstrap_servers = "non.existing.host:9092";
EXPECT_THROW(Consumer(std::move(info)), ConsumerFailedToInitializeException);
}
TEST_F(ConsumerTest, InvalidTopic) {
auto info = CreateDefaultConsumerInfo();
info.topics = {"Non existing topic"};
EXPECT_THROW(Consumer(std::move(info)), TopicNotFoundException);
}
TEST_F(ConsumerTest, StartsFromPreviousOffset) {
constexpr auto kBatchSize = 1;
auto info = CreateDefaultConsumerInfo();
info.batch_size = kBatchSize;
std::atomic<int> received_message_count{0};
const std::string kMessagePrefix{"Message"};
auto expected_messages_received = true;
info.consumer_function = [&](const std::vector<Message> &messages) mutable {
auto message_count = received_message_count.load();
for (const auto &message : messages) {
std::string message_payload = kMessagePrefix + std::to_string(message_count++);
expected_messages_received &=
(message_payload == std::string_view(message.Payload().data(), message.Payload().size()));
}
received_message_count = message_count;
};
// This test depends on CreateConsumer starts and stops the consumer, so the offset is stored
auto consumer = CreateConsumer(std::move(info));
ASSERT_FALSE(consumer->IsRunning());
constexpr auto kMessageCount = 4;
for (auto sent_messages = 0; sent_messages < kMessageCount; ++sent_messages) {
cluster.SeedTopic(kTopicName, std::string_view{kMessagePrefix + std::to_string(sent_messages)});
}
auto do_batches = [&](int64_t batch_count) {
SCOPED_TRACE(fmt::format("Already received messages: {}", received_message_count.load()));
consumer->Start(batch_count);
const auto start = std::chrono::steady_clock::now();
ASSERT_TRUE(consumer->IsRunning());
constexpr auto kMaxWaitTime = std::chrono::seconds(5);
while (consumer->IsRunning() && (std::chrono::steady_clock::now() - start) < kMaxWaitTime) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
// it is stopped because of limited batches
ASSERT_FALSE(consumer->IsRunning());
};
ASSERT_NO_FATAL_FAILURE(do_batches(kMessageCount / 2));
ASSERT_NO_FATAL_FAILURE(do_batches(kMessageCount / 2));
EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received";
EXPECT_EQ(received_message_count, kMessageCount);
}
TEST_F(ConsumerTest, TestMethodWorks) {
constexpr auto kBatchSize = 1;
auto info = CreateDefaultConsumerInfo();
info.batch_size = kBatchSize;
const std::string kMessagePrefix{"Message"};
info.consumer_function = [](const std::vector<Message> &messages) mutable {};
// This test depends on CreateConsumer starts and stops the consumer, so the offset is stored
auto consumer = CreateConsumer(std::move(info));
constexpr auto kMessageCount = 4;
for (auto sent_messages = 0; sent_messages < kMessageCount; ++sent_messages) {
cluster.SeedTopic(kTopicName, std::string_view{kMessagePrefix + std::to_string(sent_messages)});
}
// The test shouldn't commit the offsets, so it is possible to consume the same messages multiple times.
auto check_test_method = [&]() {
std::atomic<int> received_message_count{0};
auto expected_messages_received = true;
ASSERT_FALSE(consumer->IsRunning());
consumer->Test(kMessageCount, [&](const std::vector<Message> &messages) mutable {
auto message_count = received_message_count.load();
for (const auto &message : messages) {
std::string message_payload = kMessagePrefix + std::to_string(message_count++);
expected_messages_received &=
(message_payload == std::string_view(message.Payload().data(), message.Payload().size()));
}
received_message_count = message_count;
});
ASSERT_FALSE(consumer->IsRunning());
EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received";
EXPECT_EQ(received_message_count, kMessageCount);
};
{
SCOPED_TRACE("First run");
EXPECT_NO_FATAL_FAILURE(check_test_method());
}
{
SCOPED_TRACE("Second run");
EXPECT_NO_FATAL_FAILURE(check_test_method());
}
}

114
tests/unit/kafka_mock.cpp Normal file
View File

@ -0,0 +1,114 @@
#include "kafka_mock.hpp"
#include <chrono>
#include <thread>
namespace details {
void RdKafkaDeleter::operator()(rd_kafka_t *rd) {
if (rd != nullptr) {
rd_kafka_destroy(rd);
}
}
void RdKafkaMockClusterDeleter::operator()(rd_kafka_mock_cluster_t *rd) {
if (rd != nullptr) {
rd_kafka_mock_cluster_destroy(rd);
}
}
} // namespace details
namespace {
void TestDeliveryReportCallback(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->_private != nullptr) {
int *remains = static_cast<int *>(rkmessage->_private);
(*remains)--;
}
}
} // namespace
KafkaClusterMock::KafkaClusterMock(const std::vector<std::string> &topics) {
char errstr[256];
auto *conf = rd_kafka_conf_new();
if (conf == nullptr) {
throw std::runtime_error("Couldn't create conf for Kafka mock");
}
if (rd_kafka_conf_set(conf, "client.id", "MOCK", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
throw std::runtime_error(std::string("Failed to set client.id: ") + errstr);
};
rk_.reset(rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)));
if (rk_ == nullptr) {
throw std::runtime_error("Couldn't create producer for Kafka mock");
}
if (rk_ == nullptr) {
throw std::runtime_error(std::string("Failed to create mock cluster rd_kafka_t: ") + errstr);
}
constexpr auto broker_count = 1;
cluster_.reset(rd_kafka_mock_cluster_new(rk_.get(), broker_count));
if (cluster_ == nullptr) {
throw std::runtime_error("Couldn't create cluster for Kafka mock");
}
for (const auto &topic : topics) {
constexpr auto partition_count = 1;
constexpr auto replication_factor = 1;
rd_kafka_resp_err_t topic_err =
rd_kafka_mock_topic_create(cluster_.get(), topic.c_str(), partition_count, replication_factor);
if (RD_KAFKA_RESP_ERR_NO_ERROR != topic_err) {
throw std::runtime_error("Failed to create the mock topic (" + topic + "): " + rd_kafka_err2str(topic_err));
}
}
};
std::string KafkaClusterMock::Bootstraps() const { return rd_kafka_mock_cluster_bootstraps(cluster_.get()); };
void KafkaClusterMock::SeedTopic(const std::string &topic_name, std::string_view message) {
SeedTopic(topic_name, std::span{message.data(), message.size()});
}
void KafkaClusterMock::SeedTopic(const std::string &topic_name, std::span<const char> message) {
char errstr[256] = {'\0'};
std::string bootstraps_servers = Bootstraps();
rd_kafka_conf_t *conf = rd_kafka_conf_new();
if (conf == nullptr) {
throw std::runtime_error("Failed to create configuration for Kafka Mock producer to seed the topic " + topic_name);
}
rd_kafka_conf_set_dr_msg_cb(conf, TestDeliveryReportCallback);
if (rd_kafka_conf_set(conf, "bootstrap.servers", bootstraps_servers.c_str(), errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
throw std::runtime_error("Failed to configure 'bootstrap.servers' to seed the topic " + topic_name +
"error: " + errstr);
}
rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (nullptr == rk) {
throw std::runtime_error("Failed to create RdKafka producer to seed the topic " + topic_name + "error: " + errstr);
}
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name.c_str(), topic_conf);
if (nullptr == rkt) {
throw std::runtime_error("Failed to create RdKafka topic " + topic_name);
}
int remains = 1;
if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
static_cast<void *>(const_cast<char *>(message.data())), message.size(), nullptr, 0,
&remains) == -1) {
throw std::runtime_error("Failed to produce a message on " + topic_name + " to seed it");
}
while (remains > 0 && rd_kafka_outq_len(rk) > 0) {
rd_kafka_poll(rk, 1000);
}
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(rk);
if (remains != 0) {
throw std::runtime_error("Failed to delivered a message on " + topic_name + " to seed it");
}
}

37
tests/unit/kafka_mock.hpp Normal file
View File

@ -0,0 +1,37 @@
#pragma once
#include <memory>
#include <span>
#include <string>
#include <string_view>
#include <vector>
#include <librdkafka/rdkafka.h>
#include <librdkafka/rdkafka_mock.h>
#include <librdkafka/rdkafkacpp.h>
// Based on https://github.com/edenhill/librdkafka/issues/2693
namespace details {
struct RdKafkaDeleter {
void operator()(rd_kafka_t *rd);
};
struct RdKafkaMockClusterDeleter {
void operator()(rd_kafka_mock_cluster_t *rd);
};
} // namespace details
using RdKafkaUniquePtr = std::unique_ptr<rd_kafka_t, details::RdKafkaDeleter>;
using RdKafkaMockClusterUniquePtr = std::unique_ptr<rd_kafka_mock_cluster_t, details::RdKafkaMockClusterDeleter>;
class KafkaClusterMock {
public:
explicit KafkaClusterMock(const std::vector<std::string> &topics);
std::string Bootstraps() const;
void SeedTopic(const std::string &topic_name, std::span<const char> message);
void SeedTopic(const std::string &topic_name, std::string_view message);
private:
RdKafkaUniquePtr rk_{nullptr};
RdKafkaMockClusterUniquePtr cluster_{nullptr};
};