From 36afc6c5f3cf85997842af9266fad456cc5bd522 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Thu, 10 Jun 2021 12:43:01 +0200 Subject: [PATCH] Add Kafka consumer (#167) * Add CMake config for Kafka integration * Add Consumer * Add simple unit test for consumer * Add explicit offset handling and test for it --- libs/CMakeLists.txt | 19 ++ libs/setup.sh | 6 + src/CMakeLists.txt | 1 + src/integrations/CMakeLists.txt | 1 + src/integrations/kafka/CMakeLists.txt | 6 + src/integrations/kafka/consumer.cpp | 308 +++++++++++++++++ src/integrations/kafka/consumer.hpp | 149 +++++++++ src/integrations/kafka/exceptions.hpp | 39 +++ src/utils/thread.cpp | 3 +- src/utils/thread.hpp | 2 + tests/unit/CMakeLists.txt | 11 + tests/unit/integrations_kafka_consumer.cpp | 364 +++++++++++++++++++++ tests/unit/kafka_mock.cpp | 114 +++++++ tests/unit/kafka_mock.hpp | 37 +++ 14 files changed, 1059 insertions(+), 1 deletion(-) create mode 100644 src/integrations/CMakeLists.txt create mode 100644 src/integrations/kafka/CMakeLists.txt create mode 100644 src/integrations/kafka/consumer.cpp create mode 100644 src/integrations/kafka/consumer.hpp create mode 100644 src/integrations/kafka/exceptions.hpp create mode 100644 tests/unit/integrations_kafka_consumer.cpp create mode 100644 tests/unit/kafka_mock.cpp create mode 100644 tests/unit/kafka_mock.hpp diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt index 9f7941c78..f7655e024 100644 --- a/libs/CMakeLists.txt +++ b/libs/CMakeLists.txt @@ -222,3 +222,22 @@ import_external_library(spdlog STATIC BUILD_COMMAND $(MAKE) spdlog) include(jemalloc.cmake) + +# Setup librdkafka. +import_external_library(librdkafka STATIC + ${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/lib/librdkafka.a + ${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/include + CMAKE_ARGS -DRDKAFKA_BUILD_STATIC=ON + -DRDKAFKA_BUILD_EXAMPLES=OFF + -DRDKAFKA_BUILD_TESTS=OFF + -DCMAKE_INSTALL_LIBDIR=lib + -DWITH_SSL=ON + # If we want SASL, we need to install it on build machines + -DWITH_SASL=OFF) +target_link_libraries(librdkafka INTERFACE ${OPENSSL_LIBRARIES}) + +import_library(librdkafka++ STATIC + ${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/lib/librdkafka++.a + ${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/include +) +target_link_libraries(librdkafka++ INTERFACE librdkafka) diff --git a/libs/setup.sh b/libs/setup.sh index bab12fbfc..322b13900 100755 --- a/libs/setup.sh +++ b/libs/setup.sh @@ -111,6 +111,7 @@ declare -A primary_urls=( ["jemalloc"]="http://$local_cache_host/git/jemalloc.git" ["nlohmann"]="http://$local_cache_host/file/nlohmann/json/b3e5cb7f20dcc5c806e418df34324eca60d17d4e/single_include/nlohmann/json.hpp" ["neo4j"]="http://$local_cache_host/file/neo4j-community-3.2.3-unix.tar.gz" + ["librdkafka"]="http://$local_cache_host/git/librdkafka.git" ) # The goal of secondary urls is to have links to the "source of truth" of @@ -137,6 +138,7 @@ declare -A secondary_urls=( ["jemalloc"]="https://github.com/jemalloc/jemalloc.git" ["nlohmann"]="https://raw.githubusercontent.com/nlohmann/json/b3e5cb7f20dcc5c806e418df34324eca60d17d4e/single_include/nlohmann/json.hpp" ["neo4j"]="https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/neo4j-community-3.2.3-unix.tar.gz" + ["librdkafka"]="https://github.com/edenhill/librdkafka.git" ) # antlr @@ -238,3 +240,7 @@ pushd jemalloc # https://github.com/ClickHouse/ClickHouse/issues/11121 for motivation. 
./autogen.sh --with-malloc-conf="percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000" popd + +# librdkafka +librdkafka_tag="v1.7.0" # (2021-05-06) +repo_clone_try_double "${primary_urls[librdkafka]}" "${secondary_urls[librdkafka]}" "librdkafka" "$librdkafka_tag" diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ddae1b4ad..f4092aa2d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -10,6 +10,7 @@ add_subdirectory(telemetry) add_subdirectory(communication) add_subdirectory(storage/v2) add_subdirectory(query) +add_subdirectory(integrations) add_subdirectory(slk) add_subdirectory(rpc) if (MG_ENTERPRISE) diff --git a/src/integrations/CMakeLists.txt b/src/integrations/CMakeLists.txt new file mode 100644 index 000000000..f7f7449ff --- /dev/null +++ b/src/integrations/CMakeLists.txt @@ -0,0 +1 @@ +add_subdirectory(kafka) diff --git a/src/integrations/kafka/CMakeLists.txt b/src/integrations/kafka/CMakeLists.txt new file mode 100644 index 000000000..7a1bd442f --- /dev/null +++ b/src/integrations/kafka/CMakeLists.txt @@ -0,0 +1,6 @@ +set(integrations_kafka_src_files + consumer.cpp +) + +add_library(mg-integrations-kafka STATIC ${integrations_kafka_src_files}) +target_link_libraries(mg-integrations-kafka mg-utils librdkafka++ librdkafka Threads::Threads zlib) diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp new file mode 100644 index 000000000..d63a71306 --- /dev/null +++ b/src/integrations/kafka/consumer.cpp @@ -0,0 +1,308 @@ +#include "integrations/kafka/consumer.hpp" + +#include +#include +#include +#include + +#include "integrations/kafka/exceptions.hpp" +#include "utils/exceptions.hpp" +#include "utils/logging.hpp" +#include "utils/on_scope_exit.hpp" +#include "utils/thread.hpp" + +namespace integrations::kafka { + +constexpr std::chrono::milliseconds kDefaultBatchInterval{100}; +constexpr int64_t kDefaultBatchSize = 1000; +constexpr int64_t kDefaultTestBatchLimit = 1; + +Message::Message(std::unique_ptr &&message) : message_{std::move(message)} { + // Because of these asserts, the message can be safely accessed in the member function functions, because it cannot + // be null and always points to a valid message (not to a wrapped error) + MG_ASSERT(message_.get() != nullptr, "Kafka message cannot be null!"); + MG_ASSERT(message_->err() == 0 && message_->c_ptr() != nullptr, "Invalid kafka message!"); +}; + +std::span Message::Key() const { + const auto *c_message = message_->c_ptr(); + return {static_cast(c_message->key), c_message->key_len}; +} + +std::string_view Message::TopicName() const { + const auto *c_message = message_->c_ptr(); + return c_message->rkt == nullptr ? 
std::string_view{} : rd_kafka_topic_name(c_message->rkt); +} + +std::span Message::Payload() const { + const auto *c_message = message_->c_ptr(); + return {static_cast(c_message->payload), c_message->len}; +} + +int64_t Message::Timestamp() const { + const auto *c_message = message_->c_ptr(); + return rd_kafka_message_timestamp(c_message, nullptr); +} + +Consumer::Consumer(ConsumerInfo info) : info_{std::move(info)} { + MG_ASSERT(info_.consumer_function, "Empty consumer function for Kafka consumer"); + std::unique_ptr conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)); + if (conf == nullptr) { + throw ConsumerFailedToInitializeException(info_.consumer_name, "Couldn't create Kafka configuration!"); + } + + std::string error; + + if (conf->set("event_cb", this, error) != RdKafka::Conf::CONF_OK) { + throw ConsumerFailedToInitializeException(info_.consumer_name, error); + } + + if (conf->set("enable.partition.eof", "false", error) != RdKafka::Conf::CONF_OK) { + throw ConsumerFailedToInitializeException(info_.consumer_name, error); + } + + if (conf->set("enable.auto.offset.store", "false", error) != RdKafka::Conf::CONF_OK) { + throw ConsumerFailedToInitializeException(info_.consumer_name, error); + } + + if (conf->set("bootstrap.servers", info_.bootstrap_servers, error) != RdKafka::Conf::CONF_OK) { + throw ConsumerFailedToInitializeException(info_.consumer_name, error); + } + + if (conf->set("group.id", info_.consumer_group, error) != RdKafka::Conf::CONF_OK) { + throw ConsumerFailedToInitializeException(info_.consumer_name, error); + } + + consumer_ = std::unique_ptr>( + RdKafka::KafkaConsumer::create(conf.get(), error), [this](auto *consumer) { + this->StopConsuming(); + consumer->close(); + delete consumer; + }); + + if (consumer_ == nullptr) { + throw ConsumerFailedToInitializeException(info_.consumer_name, error); + } + + RdKafka::Metadata *raw_metadata = nullptr; + if (const auto err = consumer_->metadata(true, nullptr, &raw_metadata, 1000); err != RdKafka::ERR_NO_ERROR) { + delete raw_metadata; + throw ConsumerFailedToInitializeException(info_.consumer_name, RdKafka::err2str(err)); + } + std::unique_ptr metadata(raw_metadata); + + std::unordered_set topic_names_from_metadata{}; + std::transform(metadata->topics()->begin(), metadata->topics()->end(), + std::inserter(topic_names_from_metadata, topic_names_from_metadata.begin()), + [](const auto topic_metadata) { return topic_metadata->topic(); }); + + for (const auto &topic_name : info_.topics) { + if (!topic_names_from_metadata.contains(topic_name)) { + throw TopicNotFoundException(info_.consumer_name, topic_name); + } + } + + if (const auto err = consumer_->subscribe(info_.topics); err != RdKafka::ERR_NO_ERROR) { + throw ConsumerFailedToInitializeException(info_.consumer_name, RdKafka::err2str(err)); + } +} + +void Consumer::Start(std::optional limit_batches) { + if (is_running_) { + throw ConsumerRunningException(info_.consumer_name); + } + + StartConsuming(limit_batches); +} + +void Consumer::StartIfStopped() { + if (!is_running_) { + StartConsuming(std::nullopt); + } +} + +void Consumer::Stop() { + if (!is_running_) { + throw ConsumerStoppedException(info_.consumer_name); + } + + StopConsuming(); +} + +void Consumer::StopIfRunning() { + if (is_running_) { + StopConsuming(); + } +} + +void Consumer::Test(std::optional limit_batches, const ConsumerFunction &test_consumer_function) { + if (is_running_) { + throw ConsumerRunningException(info_.consumer_name); + } + + int64_t num_of_batches = 
limit_batches.value_or(kDefaultTestBatchLimit);
+
+  is_running_.store(true);
+
+  std::vector<std::unique_ptr<RdKafka::TopicPartition>> partitions;
+  {
+    // Save the current offsets in order to restore them in cleanup
+    std::vector<RdKafka::TopicPartition *> tmp_partitions;
+    if (const auto err = consumer_->assignment(tmp_partitions); err != RdKafka::ERR_NO_ERROR) {
+      throw ConsumerTestFailedException(info_.consumer_name, RdKafka::err2str(err));
+    }
+    if (const auto err = consumer_->position(tmp_partitions); err != RdKafka::ERR_NO_ERROR) {
+      throw ConsumerTestFailedException(info_.consumer_name, RdKafka::err2str(err));
+    }
+    partitions.reserve(tmp_partitions.size());
+    std::transform(
+        tmp_partitions.begin(), tmp_partitions.end(), std::back_inserter(partitions),
+        [](RdKafka::TopicPartition *const partition) { return std::unique_ptr<RdKafka::TopicPartition>{partition}; });
+  }
+
+  utils::OnScopeExit cleanup([this, &partitions]() {
+    is_running_.store(false);
+
+    std::vector<RdKafka::TopicPartition *> tmp_partitions;
+    tmp_partitions.reserve(partitions.size());
+    std::transform(partitions.begin(), partitions.end(), std::back_inserter(tmp_partitions),
+                   [](const auto &partition) { return partition.get(); });
+
+    if (const auto err = consumer_->assign(tmp_partitions); err != RdKafka::ERR_NO_ERROR) {
+      spdlog::error("Couldn't restore previous offsets after testing Kafka consumer {}!", info_.consumer_name);
+      throw ConsumerTestFailedException(info_.consumer_name, RdKafka::err2str(err));
+    }
+  });
+
+  for (int64_t i = 0; i < num_of_batches;) {
+    auto maybe_batch = GetBatch();
+
+    if (maybe_batch.HasError()) {
+      throw ConsumerTestFailedException(info_.consumer_name, maybe_batch.GetError());
+    }
+
+    const auto &batch = maybe_batch.GetValue();
+
+    if (batch.empty()) {
+      continue;
+    }
+    ++i;
+
+    try {
+      test_consumer_function(batch);
+    } catch (const std::exception &e) {
+      spdlog::warn("Kafka consumer {} test failed with error {}", info_.consumer_name, e.what());
+      throw ConsumerTestFailedException(info_.consumer_name, e.what());
+    }
+  }
+}
+
+bool Consumer::IsRunning() const { return is_running_; }
+
+void Consumer::event_cb(RdKafka::Event &event) {
+  switch (event.type()) {
+    case RdKafka::Event::Type::EVENT_ERROR:
+      spdlog::warn("Kafka consumer {} received an error: {}", info_.consumer_name, RdKafka::err2str(event.err()));
+      break;
+    case RdKafka::Event::Type::EVENT_STATS:
+    case RdKafka::Event::Type::EVENT_LOG:
+    case RdKafka::Event::Type::EVENT_THROTTLE:
+      break;
+  }
+}
+
+void Consumer::StartConsuming(std::optional<int64_t> limit_batches) {
+  MG_ASSERT(!is_running_, "Cannot start already running consumer!");
+
+  if (thread_.joinable()) {
+    // This can happen if the thread just finished its last batch, already set is_running_ to false and is currently
+    // shutting down.
+    thread_.join();
+  };
+
+  is_running_.store(true);
+
+  thread_ = std::thread([this, limit_batches]() {
+    constexpr auto kMaxThreadNameSize = utils::GetMaxThreadNameSize();
+    const auto full_thread_name = "Cons#" + info_.consumer_name;
+
+    utils::ThreadSetName(full_thread_name.substr(0, kMaxThreadNameSize));
+
+    int64_t batch_count = 0;
+
+    while (is_running_) {
+      auto maybe_batch = this->GetBatch();
+      if (maybe_batch.HasError()) {
+        spdlog::warn("Error happened in consumer {} while fetching messages: {}!", info_.consumer_name,
+                     maybe_batch.GetError());
+        is_running_.store(false);
+        break;
+      }
+      const auto &batch = maybe_batch.GetValue();
+
+      if (batch.empty()) continue;
+
+      spdlog::info("Kafka consumer {} is processing a batch", info_.consumer_name);
+
+      // TODO (mferencevic): Figure out what to do with all other exceptions.
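+      // The batch is handed to the consumer function before the offsets are committed below, so if processing
+      // throws (or the process stops before commitSync() finishes) the same messages can be delivered again on a
+      // later run; the consumer function should therefore tolerate seeing a batch more than once.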
+ try { + info_.consumer_function(batch); + consumer_->commitSync(); + } catch (const utils::BasicException &e) { + spdlog::warn("Error happened in consumer {} while processing a batch: {}!", info_.consumer_name, e.what()); + break; + } + + if (limit_batches != std::nullopt && limit_batches <= ++batch_count) { + is_running_.store(false); + break; + } + } + }); +} + +void Consumer::StopConsuming() { + is_running_.store(false); + if (thread_.joinable()) thread_.join(); +} + +utils::BasicResult> Consumer::GetBatch() { + std::vector batch{}; + + int64_t batch_size = info_.batch_size.value_or(kDefaultBatchSize); + batch.reserve(batch_size); + + auto remaining_timeout_in_ms = info_.batch_interval.value_or(kDefaultBatchInterval).count(); + auto start = std::chrono::steady_clock::now(); + + bool run_batch = true; + for (int64_t i = 0; remaining_timeout_in_ms > 0 && i < batch_size; ++i) { + std::unique_ptr msg(consumer_->consume(remaining_timeout_in_ms)); + switch (msg->err()) { + case RdKafka::ERR__TIMED_OUT: + run_batch = false; + break; + + case RdKafka::ERR_NO_ERROR: + batch.emplace_back(std::move(msg)); + break; + + default: + auto error = msg->errstr(); + spdlog::warn("Unexpected error while consuming message in consumer {}, error: {}!", info_.consumer_name, + msg->errstr()); + return {std::move(error)}; + } + + if (!run_batch) { + break; + } + + auto now = std::chrono::steady_clock::now(); + auto took = std::chrono::duration_cast(now - start); + remaining_timeout_in_ms = remaining_timeout_in_ms - took.count(); + start = now; + } + + return {std::move(batch)}; +} + +} // namespace integrations::kafka diff --git a/src/integrations/kafka/consumer.hpp b/src/integrations/kafka/consumer.hpp new file mode 100644 index 000000000..3874725e4 --- /dev/null +++ b/src/integrations/kafka/consumer.hpp @@ -0,0 +1,149 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include "utils/result.hpp" + +namespace integrations::kafka { + +/// Wraps the message returned from librdkafka. +/// +/// The interface of RdKafka::Message is far from ideal, so this class provides a modern C++ wrapper to it. Some of the +/// problems of RdKafka::Message: +/// - First and foremost, RdKafka::Message might wrap a received message, or an error if something goes wrong during +/// polling. That means some of the getters cannot be called or return rubbish data when it contains an error. Message +/// ensures that the wrapped RdKafka::Message contains a valid message, and not some error. +/// - The topic_name is returned as a string, but the key is returned as a pointer to a string, because it is cached. +/// To unify them Message returns them as string_view without copying them, using the underlying C API. +/// - The payload is returned as void*, so it is better to cast it to char* as soon as possible. Returning the payload +/// as std::span also provides a more idiomatic way to communicate a byte array than returning a raw pointer and a +/// size. +class Message final { + public: + explicit Message(std::unique_ptr &&message); + Message(Message &&) = default; + Message &operator=(Message &&) = default; + ~Message() = default; + + Message(const Message &) = delete; + Message &operator=(const Message &) = delete; + + /// Returns the key of the message, might be empty. + std::span Key() const; + + /// Returns the name of the topic, might be empty. + std::string_view TopicName() const; + + /// Returns the payload. 
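+  ///
+  /// The payload is a view over the raw bytes of the message: it is not null terminated and it stays valid only as
+  /// long as this Message instance is alive.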
+  std::span<const char> Payload() const;
+
+  /// Returns the timestamp of the message.
+  ///
+  /// The timestamp is the number of milliseconds since the epoch (UTC), or 0 if not available.
+  ///
+  /// The timestamp might have different semantics based on the configuration of the Kafka cluster. It can be the time
+  /// of message creation or appendage to the log. Because the Kafka integration currently doesn't support connections
+  /// to multiple clusters, the semantics can be figured out from the configuration of the single cluster, so the
+  /// transformations can be implemented knowing that.
+  int64_t Timestamp() const;
+
+ private:
+  std::unique_ptr<RdKafka::Message> message_;
+};
+
+using ConsumerFunction = std::function<void(const std::vector<Message> &)>;
+
+/// ConsumerInfo holds all the information necessary to create a Consumer.
+struct ConsumerInfo {
+  ConsumerFunction consumer_function;
+  std::string consumer_name;
+  std::string bootstrap_servers;
+  std::vector<std::string> topics;
+  std::string consumer_group;
+  std::optional<std::chrono::milliseconds> batch_interval;
+  std::optional<int64_t> batch_size;
+};
+
+/// Memgraph's Kafka consumer wrapper.
+///
+/// Consumer wraps the librdkafka consumer to make it easier to use.
+/// It extends RdKafka::EventCb in order to listen to error events.
+class Consumer final : public RdKafka::EventCb {
+ public:
+  /// Creates a new consumer with the given parameters.
+  ///
+  /// @throws ConsumerFailedToInitializeException if the consumer can't connect
+  /// to the Kafka endpoint.
+  explicit Consumer(ConsumerInfo info);
+  ~Consumer() override = default;
+
+  Consumer(const Consumer &other) = delete;
+  Consumer(Consumer &&other) noexcept = delete;
+  Consumer &operator=(const Consumer &other) = delete;
+  Consumer &operator=(Consumer &&other) = delete;
+
+  /// Starts consuming messages.
+  ///
+  /// This method will start a new thread which will poll all the topics for messages.
+  ///
+  /// @param limit_batches if present, the consumer will only consume the given number of batches and stop afterwards.
+  ///
+  /// @throws ConsumerRunningException if the consumer is already running
+  void Start(std::optional<int64_t> limit_batches);
+
+  /// Starts consuming messages if it is not started already.
+  ///
+  /// @throws ConsumerNotAvailableException if the consumer isn't initialized
+  void StartIfStopped();
+
+  /// Stops consuming messages.
+  ///
+  /// @throws ConsumerNotAvailableException if the consumer isn't initialized
+  /// @throws ConsumerStoppedException if the consumer is already stopped
+  void Stop();
+
+  /// Stops consuming messages if it is not stopped already.
+  void StopIfRunning();
+
+  /// Performs a synchronous dry-run.
+  ///
+  /// This function doesn't have any persistent effect on the consumer. The messages are fetched synchronously, so the
+  /// function returns only when the test run is done, unlike Start, which returns after starting a thread.
+  ///
+  /// @param limit_batches the consumer will only test the given number of batches. If not present, a default value is
+  /// used.
+  /// @param test_consumer_function a function to feed the received messages in, only used during this dry-run.
+  ///
+  /// @throws ConsumerRunningException if the consumer is already running.
+  void Test(std::optional<int64_t> limit_batches, const ConsumerFunction &test_consumer_function);
+
+  /// Returns true if the consumer is actively consuming messages.
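+  ///
+  /// The running state is stored in an atomic flag, so this can be called from any thread.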
+  bool IsRunning() const;
+
+ private:
+  void event_cb(RdKafka::Event &event) override;
+
+  void StartConsuming(std::optional<int64_t> limit_batches);
+
+  void StopConsuming();
+
+  utils::BasicResult<std::string, std::vector<Message>> GetBatch();
+
+  // TODO(antaljanosbenjamin) Maybe split this to store only the necessary information
+  ConsumerInfo info_;
+  mutable std::atomic<bool> is_running_{false};
+  std::optional<int64_t> limit_batches_{std::nullopt};
+  std::thread thread_;
+  std::unique_ptr<RdKafka::KafkaConsumer, std::function<void(RdKafka::KafkaConsumer *)>> consumer_;
+};
+} // namespace integrations::kafka
diff --git a/src/integrations/kafka/exceptions.hpp b/src/integrations/kafka/exceptions.hpp
new file mode 100644
index 000000000..482d8ae2a
--- /dev/null
+++ b/src/integrations/kafka/exceptions.hpp
@@ -0,0 +1,39 @@
+#pragma once
+
+#include
+
+#include "utils/exceptions.hpp"
+
+class KafkaStreamException : public utils::BasicException {
+  using utils::BasicException::BasicException;
+};
+
+class ConsumerFailedToInitializeException : public KafkaStreamException {
+ public:
+  ConsumerFailedToInitializeException(const std::string &consumer_name, const std::string &error)
+      : KafkaStreamException("Failed to initialize Kafka consumer {} : {}", consumer_name, error) {}
+};
+
+class ConsumerRunningException : public KafkaStreamException {
+ public:
+  explicit ConsumerRunningException(const std::string &consumer_name)
+      : KafkaStreamException("Kafka consumer {} is already running", consumer_name) {}
+};
+
+class ConsumerStoppedException : public KafkaStreamException {
+ public:
+  explicit ConsumerStoppedException(const std::string &consumer_name)
+      : KafkaStreamException("Kafka consumer {} is already stopped", consumer_name) {}
+};
+
+class ConsumerTestFailedException : public KafkaStreamException {
+ public:
+  explicit ConsumerTestFailedException(const std::string &consumer_name, const std::string &error)
+      : KafkaStreamException("Kafka consumer {} test failed: {}", consumer_name, error) {}
+};
+
+class TopicNotFoundException : public KafkaStreamException {
+ public:
+  TopicNotFoundException(const std::string &consumer_name, const std::string &topic_name)
+      : KafkaStreamException("Kafka consumer {} cannot find topic {}", consumer_name, topic_name) {}
+};
diff --git a/src/utils/thread.cpp b/src/utils/thread.cpp
index 4b6d48a69..1eb08ff0f 100644
--- a/src/utils/thread.cpp
+++ b/src/utils/thread.cpp
@@ -7,7 +7,8 @@
 namespace utils {
 
 void ThreadSetName(const std::string &name) {
-  MG_ASSERT(name.size() <= 16, "Thread name '{}' is too long", name);
+  constexpr auto max_name_length = GetMaxThreadNameSize();
+  MG_ASSERT(name.size() <= max_name_length, "Thread name '{}' is too long", name);
 
   if (prctl(PR_SET_NAME, name.c_str()) != 0) {
     spdlog::warn("Couldn't set thread name: {}!", name);
diff --git a/src/utils/thread.hpp b/src/utils/thread.hpp
index 2e1e49ce7..15f89a046 100644
--- a/src/utils/thread.hpp
+++ b/src/utils/thread.hpp
@@ -5,6 +5,8 @@
 namespace utils {
 
+constexpr size_t GetMaxThreadNameSize() { return 16; }
+
 /// This function sets the thread name of the calling thread.
 /// Beware, the name length limit is 16 characters!
void ThreadSetName(const std::string &name); diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 7a2c0e5e7..c4f944d9f 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -101,6 +101,7 @@ target_link_libraries(${test_prefix}query_trigger mg-query) add_unit_test(query_serialization_property_value.cpp) target_link_libraries(${test_prefix}query_serialization_property_value mg-query) + # Test query/procedure add_unit_test(query_procedure_mgp_type.cpp) target_link_libraries(${test_prefix}query_procedure_mgp_type mg-query) @@ -326,3 +327,13 @@ add_custom_command( add_custom_target(test_lcp ALL DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/test_lcp) add_test(test_lcp ${CMAKE_CURRENT_BINARY_DIR}/test_lcp) add_dependencies(memgraph__unit test_lcp) + + +# Test integrations-kafka + +add_library(kafka-mock STATIC kafka_mock.cpp) +target_link_libraries(kafka-mock mg-utils librdkafka++ librdkafka Threads::Threads zlib gtest) +# Include directories are intentionally not set, because kafka-mock isn't meant to be used apart from unit tests + +add_unit_test(integrations_kafka_consumer.cpp kafka_mock.cpp) +target_link_libraries(${test_prefix}integrations_kafka_consumer kafka-mock mg-integrations-kafka) diff --git a/tests/unit/integrations_kafka_consumer.cpp b/tests/unit/integrations_kafka_consumer.cpp new file mode 100644 index 000000000..260584885 --- /dev/null +++ b/tests/unit/integrations_kafka_consumer.cpp @@ -0,0 +1,364 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "integrations/kafka/consumer.hpp" +#include "integrations/kafka/exceptions.hpp" +#include "kafka_mock.hpp" +#include "utils/timer.hpp" + +using namespace integrations::kafka; + +namespace { +int SpanToInt(std::span span) { + int result{0}; + if (span.size() != sizeof(int)) { + std::runtime_error("Invalid span size"); + } + std::memcpy(&result, span.data(), sizeof(int)); + return result; +} +} // namespace + +struct ConsumerTest : public ::testing::Test { + ConsumerTest() {} + + ConsumerInfo CreateDefaultConsumerInfo() const { + const auto test_name = std::string{::testing::UnitTest::GetInstance()->current_test_info()->name()}; + return ConsumerInfo{ + .consumer_function = [](const std::vector &) {}, + .consumer_name = "Consumer" + test_name, + .bootstrap_servers = cluster.Bootstraps(), + .topics = {kTopicName}, + .consumer_group = "ConsumerGroup " + test_name, + .batch_interval = std::nullopt, + .batch_size = std::nullopt, + }; + }; + + std::unique_ptr CreateConsumer(ConsumerInfo &&info) { + auto custom_consumer_function = std::move(info.consumer_function); + auto last_received_message = std::make_shared>(0); + info.consumer_function = [weak_last_received_message = std::weak_ptr{last_received_message}, + custom_consumer_function = + std::move(custom_consumer_function)](const std::vector &messages) { + auto last_received_message = weak_last_received_message.lock(); + if (last_received_message != nullptr) { + *last_received_message = SpanToInt(messages.back().Payload()); + } else { + custom_consumer_function(messages); + } + }; + + auto consumer = std::make_unique(std::move(info)); + int sent_messages{1}; + SeedTopicWithInt(kTopicName, sent_messages); + + consumer->Start(std::nullopt); + if (!consumer->IsRunning()) { + return nullptr; + } + + // Send messages to the topic until the consumer starts to receive them. 
In the first few seconds the consumer + // doesn't get messages because there is no leader in the consumer group. If consumer group leader election timeout + // could be lowered (didn't find anything in librdkafka docs), then this mechanism will become unnecessary. + while (last_received_message->load() == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + SeedTopicWithInt(kTopicName, ++sent_messages); + } + + while (last_received_message->load() != sent_messages) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + }; + + consumer->Stop(); + std::this_thread::sleep_for(std::chrono::seconds(4)); + return consumer; + } + + void SeedTopicWithInt(const std::string &topic_name, int value) { + std::array int_as_char{}; + std::memcpy(int_as_char.data(), &value, int_as_char.size()); + + cluster.SeedTopic(topic_name, int_as_char); + } + + static const std::string kTopicName; + KafkaClusterMock cluster{{kTopicName}}; +}; + +const std::string ConsumerTest::kTopicName{"FirstTopic"}; + +TEST_F(ConsumerTest, BatchInterval) { + // There might be ~300ms delay in message delivery with librdkafka mock, thus the batch interval cannot be too small. + constexpr auto kBatchInterval = std::chrono::milliseconds{500}; + constexpr std::string_view kMessage = "BatchIntervalTestMessage"; + auto info = CreateDefaultConsumerInfo(); + std::vector> received_timestamps{}; + info.batch_interval = kBatchInterval; + auto expected_messages_received = true; + info.consumer_function = [&](const std::vector &messages) mutable { + received_timestamps.push_back({messages.size(), std::chrono::steady_clock::now()}); + for (const auto &message : messages) { + expected_messages_received &= (kMessage == std::string_view(message.Payload().data(), message.Payload().size())); + } + }; + + auto consumer = CreateConsumer(std::move(info)); + consumer->Start(std::nullopt); + ASSERT_TRUE(consumer->IsRunning()); + + constexpr auto kMessageCount = 7; + for (auto sent_messages = 0; sent_messages < kMessageCount; ++sent_messages) { + cluster.SeedTopic(kTopicName, kMessage); + std::this_thread::sleep_for(kBatchInterval * 0.5); + } + + consumer->Stop(); + EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received"; + + auto check_received_timestamp = [&received_timestamps, kBatchInterval](size_t index) { + SCOPED_TRACE("Checking index " + std::to_string(index)); + EXPECT_GE(index, 0) << "Cannot check first timestamp!"; + const auto message_count = received_timestamps[index].first; + EXPECT_LE(1, message_count); + + auto actual_diff = std::chrono::duration_cast(received_timestamps[index].second - + received_timestamps[index - 1].second); + constexpr auto kMinDiff = kBatchInterval * 0.9; + constexpr auto kMaxDiff = kBatchInterval * 1.1; + EXPECT_LE(kMinDiff.count(), actual_diff.count()); + EXPECT_GE(kMaxDiff.count(), actual_diff.count()); + }; + + ASSERT_FALSE(received_timestamps.empty()); + + EXPECT_TRUE(1 <= received_timestamps[0].first && received_timestamps[0].first <= 2); + + EXPECT_LE(3, received_timestamps.size()); + for (auto i = 1; i < received_timestamps.size(); ++i) { + check_received_timestamp(i); + } +} + +TEST_F(ConsumerTest, StartStop) { + Consumer consumer{CreateDefaultConsumerInfo()}; + + auto start = [&consumer](const bool use_conditional) { + if (use_conditional) { + consumer.StartIfStopped(); + } else { + consumer.Start(std::nullopt); + } + }; + + auto stop = [&consumer](const bool use_conditional) { + if (use_conditional) { + consumer.StopIfRunning(); + } else { + 
consumer.Stop(); + } + }; + + auto check_config = [&start, &stop, &consumer](const bool use_conditional_start, + const bool use_conditional_stop) mutable { + SCOPED_TRACE( + fmt::format("Conditional start {} and conditional stop {}", use_conditional_start, use_conditional_stop)); + EXPECT_FALSE(consumer.IsRunning()); + EXPECT_THROW(consumer.Stop(), ConsumerStoppedException); + consumer.StopIfRunning(); + EXPECT_FALSE(consumer.IsRunning()); + + start(use_conditional_start); + EXPECT_TRUE(consumer.IsRunning()); + EXPECT_THROW(consumer.Start(std::nullopt), ConsumerRunningException); + consumer.StartIfStopped(); + EXPECT_TRUE(consumer.IsRunning()); + + stop(use_conditional_stop); + EXPECT_FALSE(consumer.IsRunning()); + }; + + constexpr auto kSimpleStart = false; + constexpr auto kSimpleStop = false; + constexpr auto kConditionalStart = true; + constexpr auto kConditionalStop = true; + + check_config(kSimpleStart, kSimpleStop); + check_config(kSimpleStart, kConditionalStop); + check_config(kConditionalStart, kSimpleStop); + check_config(kConditionalStart, kConditionalStop); +} + +TEST_F(ConsumerTest, BatchSize) { + // Increase default batch interval to give more time for messages to receive + constexpr auto kBatchInterval = std::chrono::milliseconds{1000}; + constexpr auto kBatchSize = 3; + auto info = CreateDefaultConsumerInfo(); + std::vector> received_timestamps{}; + info.batch_interval = kBatchInterval; + info.batch_size = kBatchSize; + constexpr std::string_view kMessage = "BatchSizeTestMessage"; + auto expected_messages_received = true; + info.consumer_function = [&](const std::vector &messages) mutable { + received_timestamps.push_back({messages.size(), std::chrono::steady_clock::now()}); + for (const auto &message : messages) { + expected_messages_received &= (kMessage == std::string_view(message.Payload().data(), message.Payload().size())); + } + }; + + auto consumer = CreateConsumer(std::move(info)); + consumer->Start(std::nullopt); + ASSERT_TRUE(consumer->IsRunning()); + + constexpr auto kLastBatchMessageCount = 1; + constexpr auto kMessageCount = 3 * kBatchSize + kLastBatchMessageCount; + for (auto sent_messages = 0; sent_messages < kMessageCount; ++sent_messages) { + cluster.SeedTopic(kTopicName, kMessage); + } + std::this_thread::sleep_for(kBatchInterval * 2); + consumer->Stop(); + EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received"; + + auto check_received_timestamp = [&received_timestamps, kBatchInterval](size_t index, size_t expected_message_count) { + SCOPED_TRACE("Checking index " + std::to_string(index)); + EXPECT_GE(index, 0) << "Cannot check first timestamp!"; + const auto message_count = received_timestamps[index].first; + EXPECT_EQ(expected_message_count, message_count); + + auto actual_diff = std::chrono::duration_cast(received_timestamps[index].second - + received_timestamps[index - 1].second); + if (expected_message_count == kBatchSize) { + EXPECT_LE(actual_diff, kBatchInterval * 0.5); + } else { + constexpr auto kMinDiff = kBatchInterval * 0.9; + constexpr auto kMaxDiff = kBatchInterval * 1.1; + EXPECT_LE(kMinDiff.count(), actual_diff.count()); + EXPECT_GE(kMaxDiff.count(), actual_diff.count()); + } + }; + + ASSERT_FALSE(received_timestamps.empty()); + + EXPECT_EQ(kBatchSize, received_timestamps[0].first); + + constexpr auto kExpectedBatchCount = kMessageCount / kBatchSize + 1; + EXPECT_EQ(kExpectedBatchCount, received_timestamps.size()); + for (auto i = 1; i < received_timestamps.size() - 1; ++i) { + check_received_timestamp(i, 
kBatchSize); + } + check_received_timestamp(received_timestamps.size() - 1, kLastBatchMessageCount); +} + +TEST_F(ConsumerTest, InvalidBootstrapServers) { + auto info = CreateDefaultConsumerInfo(); + info.bootstrap_servers = "non.existing.host:9092"; + EXPECT_THROW(Consumer(std::move(info)), ConsumerFailedToInitializeException); +} + +TEST_F(ConsumerTest, InvalidTopic) { + auto info = CreateDefaultConsumerInfo(); + info.topics = {"Non existing topic"}; + EXPECT_THROW(Consumer(std::move(info)), TopicNotFoundException); +} + +TEST_F(ConsumerTest, StartsFromPreviousOffset) { + constexpr auto kBatchSize = 1; + auto info = CreateDefaultConsumerInfo(); + info.batch_size = kBatchSize; + std::atomic received_message_count{0}; + const std::string kMessagePrefix{"Message"}; + auto expected_messages_received = true; + info.consumer_function = [&](const std::vector &messages) mutable { + auto message_count = received_message_count.load(); + for (const auto &message : messages) { + std::string message_payload = kMessagePrefix + std::to_string(message_count++); + expected_messages_received &= + (message_payload == std::string_view(message.Payload().data(), message.Payload().size())); + } + received_message_count = message_count; + }; + + // This test depends on CreateConsumer starts and stops the consumer, so the offset is stored + auto consumer = CreateConsumer(std::move(info)); + ASSERT_FALSE(consumer->IsRunning()); + + constexpr auto kMessageCount = 4; + for (auto sent_messages = 0; sent_messages < kMessageCount; ++sent_messages) { + cluster.SeedTopic(kTopicName, std::string_view{kMessagePrefix + std::to_string(sent_messages)}); + } + + auto do_batches = [&](int64_t batch_count) { + SCOPED_TRACE(fmt::format("Already received messages: {}", received_message_count.load())); + consumer->Start(batch_count); + const auto start = std::chrono::steady_clock::now(); + ASSERT_TRUE(consumer->IsRunning()); + constexpr auto kMaxWaitTime = std::chrono::seconds(5); + + while (consumer->IsRunning() && (std::chrono::steady_clock::now() - start) < kMaxWaitTime) { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + // it is stopped because of limited batches + ASSERT_FALSE(consumer->IsRunning()); + }; + + ASSERT_NO_FATAL_FAILURE(do_batches(kMessageCount / 2)); + ASSERT_NO_FATAL_FAILURE(do_batches(kMessageCount / 2)); + + EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received"; + EXPECT_EQ(received_message_count, kMessageCount); +} + +TEST_F(ConsumerTest, TestMethodWorks) { + constexpr auto kBatchSize = 1; + auto info = CreateDefaultConsumerInfo(); + info.batch_size = kBatchSize; + const std::string kMessagePrefix{"Message"}; + info.consumer_function = [](const std::vector &messages) mutable {}; + + // This test depends on CreateConsumer starts and stops the consumer, so the offset is stored + auto consumer = CreateConsumer(std::move(info)); + + constexpr auto kMessageCount = 4; + for (auto sent_messages = 0; sent_messages < kMessageCount; ++sent_messages) { + cluster.SeedTopic(kTopicName, std::string_view{kMessagePrefix + std::to_string(sent_messages)}); + } + + // The test shouldn't commit the offsets, so it is possible to consume the same messages multiple times. 
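+  // Consumer::Test saves the current partition offsets up front and restores them in its OnScopeExit cleanup (see
+  // consumer.cpp), which is why both runs of check_test_method below are expected to see the same messages.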
+ auto check_test_method = [&]() { + std::atomic received_message_count{0}; + auto expected_messages_received = true; + + ASSERT_FALSE(consumer->IsRunning()); + + consumer->Test(kMessageCount, [&](const std::vector &messages) mutable { + auto message_count = received_message_count.load(); + for (const auto &message : messages) { + std::string message_payload = kMessagePrefix + std::to_string(message_count++); + expected_messages_received &= + (message_payload == std::string_view(message.Payload().data(), message.Payload().size())); + } + received_message_count = message_count; + }); + ASSERT_FALSE(consumer->IsRunning()); + + EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received"; + EXPECT_EQ(received_message_count, kMessageCount); + }; + + { + SCOPED_TRACE("First run"); + EXPECT_NO_FATAL_FAILURE(check_test_method()); + } + { + SCOPED_TRACE("Second run"); + EXPECT_NO_FATAL_FAILURE(check_test_method()); + } +} diff --git a/tests/unit/kafka_mock.cpp b/tests/unit/kafka_mock.cpp new file mode 100644 index 000000000..ecdfdc928 --- /dev/null +++ b/tests/unit/kafka_mock.cpp @@ -0,0 +1,114 @@ +#include "kafka_mock.hpp" + +#include +#include + +namespace details { +void RdKafkaDeleter::operator()(rd_kafka_t *rd) { + if (rd != nullptr) { + rd_kafka_destroy(rd); + } +} +void RdKafkaMockClusterDeleter::operator()(rd_kafka_mock_cluster_t *rd) { + if (rd != nullptr) { + rd_kafka_mock_cluster_destroy(rd); + } +} +} // namespace details + +namespace { +void TestDeliveryReportCallback(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { + if (rkmessage->_private != nullptr) { + int *remains = static_cast(rkmessage->_private); + (*remains)--; + } +} +} // namespace + +KafkaClusterMock::KafkaClusterMock(const std::vector &topics) { + char errstr[256]; + auto *conf = rd_kafka_conf_new(); + if (conf == nullptr) { + throw std::runtime_error("Couldn't create conf for Kafka mock"); + } + + if (rd_kafka_conf_set(conf, "client.id", "MOCK", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + throw std::runtime_error(std::string("Failed to set client.id: ") + errstr); + }; + + rk_.reset(rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr))); + if (rk_ == nullptr) { + throw std::runtime_error("Couldn't create producer for Kafka mock"); + } + + if (rk_ == nullptr) { + throw std::runtime_error(std::string("Failed to create mock cluster rd_kafka_t: ") + errstr); + } + constexpr auto broker_count = 1; + cluster_.reset(rd_kafka_mock_cluster_new(rk_.get(), broker_count)); + if (cluster_ == nullptr) { + throw std::runtime_error("Couldn't create cluster for Kafka mock"); + } + + for (const auto &topic : topics) { + constexpr auto partition_count = 1; + constexpr auto replication_factor = 1; + rd_kafka_resp_err_t topic_err = + rd_kafka_mock_topic_create(cluster_.get(), topic.c_str(), partition_count, replication_factor); + if (RD_KAFKA_RESP_ERR_NO_ERROR != topic_err) { + throw std::runtime_error("Failed to create the mock topic (" + topic + "): " + rd_kafka_err2str(topic_err)); + } + } +}; + +std::string KafkaClusterMock::Bootstraps() const { return rd_kafka_mock_cluster_bootstraps(cluster_.get()); }; + +void KafkaClusterMock::SeedTopic(const std::string &topic_name, std::string_view message) { + SeedTopic(topic_name, std::span{message.data(), message.size()}); +} + +void KafkaClusterMock::SeedTopic(const std::string &topic_name, std::span message) { + char errstr[256] = {'\0'}; + std::string bootstraps_servers = Bootstraps(); + + rd_kafka_conf_t *conf = rd_kafka_conf_new(); 
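+  // The rest of this function sets up a short-lived producer pointed at the mock cluster: the delivery report
+  // callback decrements `remains`, and the poll loop below blocks until the produced message is acknowledged.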
+ if (conf == nullptr) { + throw std::runtime_error("Failed to create configuration for Kafka Mock producer to seed the topic " + topic_name); + } + rd_kafka_conf_set_dr_msg_cb(conf, TestDeliveryReportCallback); + + if (rd_kafka_conf_set(conf, "bootstrap.servers", bootstraps_servers.c_str(), errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) { + throw std::runtime_error("Failed to configure 'bootstrap.servers' to seed the topic " + topic_name + + "error: " + errstr); + } + + rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); + if (nullptr == rk) { + throw std::runtime_error("Failed to create RdKafka producer to seed the topic " + topic_name + "error: " + errstr); + } + + rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new(); + + rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name.c_str(), topic_conf); + if (nullptr == rkt) { + throw std::runtime_error("Failed to create RdKafka topic " + topic_name); + } + + int remains = 1; + if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, + static_cast(const_cast(message.data())), message.size(), nullptr, 0, + &remains) == -1) { + throw std::runtime_error("Failed to produce a message on " + topic_name + " to seed it"); + } + + while (remains > 0 && rd_kafka_outq_len(rk) > 0) { + rd_kafka_poll(rk, 1000); + } + + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(rk); + if (remains != 0) { + throw std::runtime_error("Failed to delivered a message on " + topic_name + " to seed it"); + } +} diff --git a/tests/unit/kafka_mock.hpp b/tests/unit/kafka_mock.hpp new file mode 100644 index 000000000..ab0838266 --- /dev/null +++ b/tests/unit/kafka_mock.hpp @@ -0,0 +1,37 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include + +// Based on https://github.com/edenhill/librdkafka/issues/2693 +namespace details { +struct RdKafkaDeleter { + void operator()(rd_kafka_t *rd); +}; +struct RdKafkaMockClusterDeleter { + void operator()(rd_kafka_mock_cluster_t *rd); +}; +} // namespace details + +using RdKafkaUniquePtr = std::unique_ptr; +using RdKafkaMockClusterUniquePtr = std::unique_ptr; + +class KafkaClusterMock { + public: + explicit KafkaClusterMock(const std::vector &topics); + + std::string Bootstraps() const; + void SeedTopic(const std::string &topic_name, std::span message); + void SeedTopic(const std::string &topic_name, std::string_view message); + + private: + RdKafkaUniquePtr rk_{nullptr}; + RdKafkaMockClusterUniquePtr cluster_{nullptr}; +};
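
To show how the pieces added by this patch fit together, here is a minimal, self-contained sketch of driving the new Consumer against the librdkafka mock cluster, along the lines of the unit tests above. It is not part of the patch; the topic, group, and consumer names and the timings are made up, and the ConsumerInfo field types are assumed to match the declarations in consumer.hpp (std::optional<std::chrono::milliseconds> batch_interval, std::optional<int64_t> batch_size).

#include <chrono>
#include <iostream>
#include <string_view>
#include <thread>
#include <vector>

#include "integrations/kafka/consumer.hpp"
#include "kafka_mock.hpp"

using namespace integrations::kafka;

int main() {
  // A mock cluster with a single, hypothetical topic, exactly like the unit tests use.
  KafkaClusterMock cluster{{"ExampleTopic"}};
  cluster.SeedTopic("ExampleTopic", std::string_view{"hello"});

  ConsumerInfo info{
      // Message::Payload() is assumed to return a std::span<const char> over the raw bytes.
      .consumer_function =
          [](const std::vector<Message> &messages) {
            for (const auto &message : messages) {
              const auto payload = message.Payload();
              std::cout << "Received: " << std::string_view{payload.data(), payload.size()} << '\n';
            }
          },
      .consumer_name = "ExampleConsumer",
      .bootstrap_servers = cluster.Bootstraps(),
      .topics = {"ExampleTopic"},
      .consumer_group = "ExampleGroup",
      .batch_interval = std::chrono::milliseconds{100},
      .batch_size = 10,
  };

  Consumer consumer{std::move(info)};  // throws ConsumerFailedToInitializeException on bad config
  consumer.Start(std::nullopt);        // polls on a background thread until stopped

  // As the tests above note, the mock cluster can take a few seconds before messages start flowing.
  std::this_thread::sleep_for(std::chrono::seconds(5));
  consumer.StopIfRunning();
  return 0;
}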