From 4004e94ca1635756ec4d0061d02cb9880b88b646 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Wed, 16 Jun 2021 16:46:44 +0300 Subject: [PATCH] Introduce messages C-API (#168) --- include/mg_procedure.h | 52 +++++++ libs/CMakeLists.txt | 2 +- src/CMakeLists.txt | 2 +- src/query/CMakeLists.txt | 1 + src/query/procedure/mg_procedure_impl.cpp | 18 +++ src/query/procedure/mg_procedure_impl.hpp | 22 ++- tests/unit/CMakeLists.txt | 3 + tests/unit/mgp_kafka_c_api.cpp | 157 ++++++++++++++++++++++ 8 files changed, 254 insertions(+), 3 deletions(-) create mode 100644 tests/unit/mgp_kafka_c_api.cpp diff --git a/include/mg_procedure.h b/include/mg_procedure.h index 3b0168bcb..5053d8706 100644 --- a/include/mg_procedure.h +++ b/include/mg_procedure.h @@ -802,6 +802,58 @@ int mgp_must_abort(const struct mgp_graph *graph); /// @} +/// @name Kafka message API +/// Currently the API below is for kafka only but in the future +/// mgp_message and mgp_messages might be generic to support +/// other streaming systems. +///@{ + +/// A single Kafka message +struct mgp_message; + +/// A list of Kafka messages +struct mgp_messages; + +/// Payload is not null terminated and not a string but rather a byte array. +/// You need to call mgp_message_get_payload_size() first, to read the size +/// of the payload. +const char *mgp_message_get_payload(const struct mgp_message *); + +/// Return the payload size +size_t mgp_message_get_payload_size(const struct mgp_message *); + +/// Return the name of topic +const char *mgp_message_topic_name(const struct mgp_message *); + +/// Return the key of mgp_message as a byte array +const char *mgp_message_key(const struct mgp_message *); + +/// Return the key size of mgp_message +size_t mgp_message_key_size(const struct mgp_message *); + +/// Return the timestamp of mgp_message as a byte array +int64_t mgp_message_timestamp(const struct mgp_message *); + +/// Return the number of messages contained in the mgp_messages list +size_t mgp_messages_size(const struct mgp_messages *); + +/// Return the message from a messages list at given index +const struct mgp_message *mgp_messages_at(const struct mgp_messages *, size_t); + +/// General type that models a transformation +// struct mgp_trans; + +// TODO @kostasrim +/// General syntax for a transformation callback +// typedef void (*mgp_trans_cb)(const struct mgp_messages, struct mgp_graph *, +// struct mgp_result *, struct mgp_memory*); + +// TODO @kostasrim +/// Adds a transformation cb to the module pointed by mgp_module +// struct mgp_trans *mgp_module_add_transformation(struct mgp_module *module, const char *name, +// mgp_trans_cb cb); +/// @} + #ifdef __cplusplus } // extern "C" #endif diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt index f7655e024..f2cbc6cb2 100644 --- a/libs/CMakeLists.txt +++ b/libs/CMakeLists.txt @@ -234,7 +234,7 @@ import_external_library(librdkafka STATIC -DWITH_SSL=ON # If we want SASL, we need to install it on build machines -DWITH_SASL=OFF) -target_link_libraries(librdkafka INTERFACE ${OPENSSL_LIBRARIES}) +target_link_libraries(librdkafka INTERFACE ${OPENSSL_LIBRARIES} zlib) import_library(librdkafka++ STATIC ${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/lib/librdkafka++.a diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f4092aa2d..1d1c29f5d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -9,8 +9,8 @@ add_subdirectory(kvstore) add_subdirectory(telemetry) add_subdirectory(communication) add_subdirectory(storage/v2) -add_subdirectory(query) add_subdirectory(integrations) +add_subdirectory(query) add_subdirectory(slk) add_subdirectory(rpc) if (MG_ENTERPRISE) diff --git a/src/query/CMakeLists.txt b/src/query/CMakeLists.txt index 53f9ee8f9..812c04450 100644 --- a/src/query/CMakeLists.txt +++ b/src/query/CMakeLists.txt @@ -39,6 +39,7 @@ set(mg_query_sources add_library(mg-query STATIC ${mg_query_sources}) add_dependencies(mg-query generate_lcp_query) target_include_directories(mg-query PUBLIC ${CMAKE_SOURCE_DIR}/include) +target_link_libraries(mg-query mg-integrations-kafka) target_link_libraries(mg-query dl cppitertools) target_link_libraries(mg-query mg-storage-v2 mg-utils mg-kvstore) if("${MG_PYTHON_VERSION}" STREQUAL "") diff --git a/src/query/procedure/mg_procedure_impl.cpp b/src/query/procedure/mg_procedure_impl.cpp index e65f9437a..4325942bb 100644 --- a/src/query/procedure/mg_procedure_impl.cpp +++ b/src/query/procedure/mg_procedure_impl.cpp @@ -1439,3 +1439,21 @@ bool IsValidIdentifierName(const char *name) { } } // namespace query::procedure + +const char *mgp_message_get_payload(const mgp_message *message) { return message->msg->Payload().data(); } + +size_t mgp_message_get_payload_size(const mgp_message *message) { return message->msg->Payload().size(); } + +const char *mgp_message_topic_name(const mgp_message *message) { return message->msg->TopicName().data(); } + +const char *mgp_message_key(const mgp_message *message) { return message->msg->Key().data(); } + +size_t mgp_message_key_size(const struct mgp_message *message) { return message->msg->Key().size(); } + +int64_t mgp_message_timestamp(const mgp_message *message) { return message->msg->Timestamp(); } + +size_t mgp_messages_size(const mgp_messages *messages) { return messages->messages.size(); } + +const mgp_message *mgp_messages_at(const mgp_messages *messages, size_t index) { + return index >= mgp_messages_size(messages) ? nullptr : &messages->messages[index]; +} diff --git a/src/query/procedure/mg_procedure_impl.hpp b/src/query/procedure/mg_procedure_impl.hpp index c60bb8e28..46bcb5358 100644 --- a/src/query/procedure/mg_procedure_impl.hpp +++ b/src/query/procedure/mg_procedure_impl.hpp @@ -8,6 +8,7 @@ #include #include +#include "integrations/kafka/consumer.hpp" #include "query/context.hpp" #include "query/db_accessor.hpp" #include "query/procedure/cypher_types.hpp" @@ -17,7 +18,6 @@ #include "utils/pmr/map.hpp" #include "utils/pmr/string.hpp" #include "utils/pmr/vector.hpp" - /// Wraps memory resource used in custom procedures. /// /// This should have been `using mgp_memory = utils::MemoryResource`, but that's @@ -503,3 +503,23 @@ void PrintProcSignature(const mgp_proc &, std::ostream *); bool IsValidIdentifierName(const char *name); } // namespace query::procedure + +struct mgp_message { + integrations::kafka::Message *msg; +}; + +struct mgp_messages { + using allocator_type = utils::Allocator; + using storage_type = utils::pmr::vector; + explicit mgp_messages(storage_type &&storage) : messages(std::move(storage)) {} + + mgp_messages(const mgp_messages &) = delete; + mgp_messages &operator=(const mgp_messages &) = delete; + + mgp_messages(mgp_messages &&) = delete; + mgp_messages &operator=(mgp_messages &&) = delete; + + ~mgp_messages() = default; + + storage_type messages; +}; diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index c4f944d9f..25c743e2e 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -337,3 +337,6 @@ target_link_libraries(kafka-mock mg-utils librdkafka++ librdkafka Threads::Threa add_unit_test(integrations_kafka_consumer.cpp kafka_mock.cpp) target_link_libraries(${test_prefix}integrations_kafka_consumer kafka-mock mg-integrations-kafka) + +add_unit_test(mgp_kafka_c_api.cpp) +target_link_libraries(${test_prefix}mgp_kafka_c_api mg-query mg-integrations-kafka) diff --git a/tests/unit/mgp_kafka_c_api.cpp b/tests/unit/mgp_kafka_c_api.cpp new file mode 100644 index 000000000..c8a4ddeb3 --- /dev/null +++ b/tests/unit/mgp_kafka_c_api.cpp @@ -0,0 +1,157 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "integrations/kafka/consumer.hpp" +#include "query/procedure/mg_procedure_impl.hpp" +#include "utils/pmr/vector.hpp" + +/// This class implements the interface of RdKafka::Message such that it can be mocked. +/// It's important to note that integrations::kafka::Message member functions +/// use c_ptr() to indirectly access the results inside the rd_kafka_message_s structure +/// effectively bypassing the mocked values returned by the overrides below. Therefore, to +/// protect against accidental use of the public members, the functions are marked as +/// [[noreturn]] and throw an std::logic_error exception. +class MockedRdKafkaMessage : public RdKafka::Message { + public: + explicit MockedRdKafkaMessage(std::string key, std::string payload) + : key_(std::move(key)), payload_(std::move(payload)) { + message_.err = rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__BEGIN; + message_.key = static_cast(key_.data()); + message_.key_len = key_.size(); + message_.offset = 0; + message_.payload = static_cast(payload_.data()); + message_.len = payload_.size(); + rd_kafka_ = rd_kafka_new(rd_kafka_type_t::RD_KAFKA_CONSUMER, nullptr, nullptr, 0); + message_.rkt = rd_kafka_topic_new(rd_kafka_, mocked_topic_name.data(), nullptr); + } + + ~MockedRdKafkaMessage() override { + rd_kafka_destroy(rd_kafka_); + rd_kafka_topic_destroy(message_.rkt); + } + + // The two can be accessed safely. Any use of the other public members should + // be considered accidental (as per the current semantics of the class + // Message) and therefore they are marked as [[noreturn]] and throw + rd_kafka_message_s *c_ptr() override { return &message_; } + + // This is used by Message() constructor + + RdKafka::ErrorCode err() const override { return RdKafka::ErrorCode::ERR_NO_ERROR; } + + [[noreturn]] std::string errstr() const override { ThrowIllegalCallError(); } + + [[noreturn]] RdKafka::Topic *topic() const override { ThrowIllegalCallError(); } + + [[noreturn]] std::string topic_name() const override { ThrowIllegalCallError(); } + + [[noreturn]] int32_t partition() const override { ThrowIllegalCallError(); } + + [[noreturn]] void *payload() const override { ThrowIllegalCallError(); } + + [[noreturn]] size_t len() const override { ThrowIllegalCallError(); } + + [[noreturn]] const std::string *key() const override { ThrowIllegalCallError(); } + + [[noreturn]] const void *key_pointer() const override { ThrowIllegalCallError(); } + + [[noreturn]] size_t key_len() const override { ThrowIllegalCallError(); } + + [[noreturn]] int64_t offset() const override { ThrowIllegalCallError(); } + + [[noreturn]] RdKafka::MessageTimestamp timestamp() const override { ThrowIllegalCallError(); } + + [[noreturn]] void *msg_opaque() const override { ThrowIllegalCallError(); } + + [[noreturn]] int64_t latency() const override { ThrowIllegalCallError(); } + + [[noreturn]] Status status() const override { ThrowIllegalCallError(); } + + [[noreturn]] RdKafka::Headers *headers() override { ThrowIllegalCallError(); } + + [[noreturn]] RdKafka::Headers *headers(RdKafka::ErrorCode *err) override { ThrowIllegalCallError(); } + + [[noreturn]] int32_t broker_id() const override { ThrowIllegalCallError(); } + + private: + [[noreturn]] void ThrowIllegalCallError() const { + throw std::logic_error("This function should not have been called"); + } + + std::string key_; + rd_kafka_t *rd_kafka_; + std::string payload_; + rd_kafka_message_s message_; + + static std::string mocked_topic_name; +}; + +std::string MockedRdKafkaMessage::mocked_topic_name = "Topic1"; + +class MgpApiTest : public ::testing::Test { + public: + using Message = integrations::kafka::Message; + using KafkaMessage = MockedRdKafkaMessage; + + MgpApiTest() { messages_.emplace(CreateMockedBatch()); } + ~MgpApiTest() { messages_.reset(); } + + mgp_messages &Messages() { return *messages_; } + + protected: + struct ExpectedResult { + const char *payload; + const char key; + const char *topic_name; + const size_t payload_size; + }; + + static constexpr std::array expected = {ExpectedResult{"payload1", '1', "Topic1", 8}, + ExpectedResult{"payload2", '2', "Topic1", 8}}; + + private: + utils::pmr::vector CreateMockedBatch() { + std::transform(expected.begin(), expected.end(), std::back_inserter(msgs_storage_), [](const auto expected) { + return Message(std::make_unique(std::string(1, expected.key), expected.payload)); + }); + auto v = utils::pmr::vector(utils::NewDeleteResource()); + v.reserve(expected.size()); + std::transform(msgs_storage_.begin(), msgs_storage_.end(), std::back_inserter(v), + [](auto &msgs) { return mgp_message{&msgs}; }); + return v; + } + + utils::pmr::vector msgs_storage_{utils::NewDeleteResource()}; + std::optional messages_; +}; + +TEST_F(MgpApiTest, TestAllMgpKafkaCApi) { + const mgp_messages &messages = Messages(); + EXPECT_EQ(mgp_messages_size(&messages), expected.size()); + + for (int i = 0; i < expected.size(); ++i) { + const auto *message = mgp_messages_at(&messages, i); + // Test for key and key size. Key size is always 1 in this test. + EXPECT_EQ(mgp_message_key_size(message), 1); + EXPECT_EQ(*mgp_message_key(message), expected[i].key); + + // Test for payload size + EXPECT_EQ(mgp_message_get_payload_size(message), expected[i].payload_size); + // Test for payload + EXPECT_FALSE(std::strcmp(mgp_message_get_payload(message), expected[i].payload)); + // Test for topic name + EXPECT_FALSE(std::strcmp(mgp_message_topic_name(message), expected[i].topic_name)); + } + + // Unfortunately, we can't test timestamp here because we can't mock (as explained above) + // and the test does not have access to the internal rd_kafka_message2msg() function. + // auto expected_timestamp = rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_NOT_AVAILABLE; + // EXPECT_EQ(mgp_message_timestamp(first_msg), expected_timestamp); + // EXPECT_EQ(mgp_message_timestamp(second_msg), expected_timestamp); +}