// Copyright 2022 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // License, and you may not use this file except in compliance with the Business Source License. // // As of the Change Date specified in that file, in accordance with // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. #include "kafka_mock.hpp" #include #include namespace details { void RdKafkaDeleter::operator()(rd_kafka_t *rd) { if (rd != nullptr) { rd_kafka_destroy(rd); } } void RdKafkaMockClusterDeleter::operator()(rd_kafka_mock_cluster_t *rd) { if (rd != nullptr) { rd_kafka_mock_cluster_destroy(rd); } } } // namespace details namespace { void TestDeliveryReportCallback(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { if (rkmessage->_private != nullptr) { int *remains = static_cast(rkmessage->_private); (*remains)--; } } } // namespace KafkaClusterMock::KafkaClusterMock(const std::vector &topics) { char errstr[256]; auto *conf = rd_kafka_conf_new(); if (conf == nullptr) { throw std::runtime_error("Couldn't create conf for Kafka mock"); } if (rd_kafka_conf_set(conf, "client.id", "MOCK", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { throw std::runtime_error(std::string("Failed to set client.id: ") + errstr); }; rk_.reset(rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr))); if (rk_ == nullptr) { throw std::runtime_error("Couldn't create producer for Kafka mock"); } if (rk_ == nullptr) { throw std::runtime_error(std::string("Failed to create mock cluster rd_kafka_t: ") + errstr); } static constexpr auto broker_count = 1; cluster_.reset(rd_kafka_mock_cluster_new(rk_.get(), broker_count)); if (cluster_ == nullptr) { throw std::runtime_error("Couldn't create cluster for Kafka mock"); } for (const auto &topic : topics) { CreateTopic(topic); } }; std::string KafkaClusterMock::Bootstraps() const { return rd_kafka_mock_cluster_bootstraps(cluster_.get()); }; void KafkaClusterMock::CreateTopic(const std::string &topic_name) { static constexpr auto partition_count = 1; static constexpr auto replication_factor = 1; rd_kafka_resp_err_t topic_err = rd_kafka_mock_topic_create(cluster_.get(), topic_name.c_str(), partition_count, replication_factor); if (RD_KAFKA_RESP_ERR_NO_ERROR != topic_err) { throw std::runtime_error("Failed to create the mock topic (" + topic_name + "): " + rd_kafka_err2str(topic_err)); } } void KafkaClusterMock::SeedTopic(const std::string &topic_name, std::string_view message) { SeedTopic(topic_name, std::span{message.data(), message.size()}); } void KafkaClusterMock::SeedTopic(const std::string &topic_name, std::span message) { char errstr[256] = {'\0'}; std::string bootstraps_servers = Bootstraps(); rd_kafka_conf_t *conf = rd_kafka_conf_new(); if (conf == nullptr) { throw std::runtime_error("Failed to create configuration for Kafka Mock producer to seed the topic " + topic_name); } rd_kafka_conf_set_dr_msg_cb(conf, TestDeliveryReportCallback); if (rd_kafka_conf_set(conf, "bootstrap.servers", bootstraps_servers.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { throw std::runtime_error("Failed to configure 'bootstrap.servers' to seed the topic " + topic_name + "error: " + errstr); } rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (nullptr == rk) { throw std::runtime_error("Failed to create RdKafka producer to seed the topic " + topic_name + "error: " + errstr); } rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new(); rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name.c_str(), topic_conf); if (nullptr == rkt) { throw std::runtime_error("Failed to create RdKafka topic " + topic_name); } int remains = 1; if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, static_cast(const_cast(message.data())), message.size(), nullptr, 0, &remains) == -1) { throw std::runtime_error("Failed to produce a message on " + topic_name + " to seed it"); } while (remains > 0 && rd_kafka_outq_len(rk) > 0) { rd_kafka_poll(rk, 1000); } rd_kafka_topic_destroy(rkt); rd_kafka_destroy(rk); if (remains != 0) { throw std::runtime_error("Failed to delivered a message on " + topic_name + " to seed it"); } }