From 6c00d146f2dcf44bd0b032e8eb774f41575dcef2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Mon, 31 Jan 2022 17:26:53 +0100 Subject: [PATCH] Add configs for kafka streams (#328) --- .clang-tidy | 1 + src/integrations/constants.hpp | 5 +- src/integrations/kafka/consumer.cpp | 14 +- src/integrations/kafka/consumer.hpp | 4 +- src/integrations/kafka/exceptions.hpp | 25 ++- src/query/frontend/ast/ast.lcp | 54 ++++-- .../frontend/ast/cypher_main_visitor.cpp | 44 ++++- .../frontend/ast/cypher_main_visitor.hpp | 16 +- .../opencypher/grammar/MemgraphCypher.g4 | 8 + .../opencypher/grammar/MemgraphCypherLexer.g4 | 2 + src/query/interpreter.cpp | 24 ++- src/query/stream/sources.cpp | 19 ++- src/query/stream/sources.hpp | 4 +- src/query/stream/streams.cpp | 80 ++++++++- src/query/stream/streams.hpp | 5 +- tests/e2e/streams/kafka_streams_tests.py | 17 +- tests/unit/cypher_main_visitor.cpp | 157 +++++++++++++----- tests/unit/integrations_kafka_consumer.cpp | 4 +- tests/unit/interpreter.cpp | 13 +- tests/unit/query_streams.cpp | 58 ++++++- tests/unit/test_utils.hpp | 16 +- 21 files changed, 471 insertions(+), 99 deletions(-) diff --git a/.clang-tidy b/.clang-tidy index 91ad20646..1560bebe0 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -2,6 +2,7 @@ Checks: '*, -abseil-string-find-str-contains, -altera-struct-pack-align, + -altera-unroll-loops, -android-*, -cert-err58-cpp, -cppcoreguidelines-avoid-c-arrays, diff --git a/src/integrations/constants.hpp b/src/integrations/constants.hpp index 6adb193c7..95782e7e2 100644 --- a/src/integrations/constants.hpp +++ b/src/integrations/constants.hpp @@ -1,4 +1,4 @@ -// Copyright 2021 Memgraph Ltd. +// Copyright 2022 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -10,11 +10,14 @@ // licenses/APL.txt. #pragma once + #include <chrono> +#include <string> namespace integrations { constexpr int64_t kDefaultCheckBatchLimit{1}; constexpr std::chrono::milliseconds kDefaultCheckTimeout{30000}; constexpr std::chrono::milliseconds kMinimumInterval{1}; constexpr int64_t kMinimumSize{1}; +const std::string kReducted{"<REDUCTED>"}; } // namespace integrations diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp index 8bcff2045..051e0e4b8 100644 --- a/src/integrations/kafka/consumer.cpp +++ b/src/integrations/kafka/consumer.cpp @@ -1,4 +1,4 @@ -// Copyright 2021 Memgraph Ltd. +// Copyright 2022 Memgraph Ltd. 
// // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -126,6 +126,18 @@ Consumer::Consumer(ConsumerInfo info, ConsumerFunction consumer_function) std::string error; + for (const auto &[key, value] : info_.public_configs) { + if (conf->set(key, value, error) != RdKafka::Conf::CONF_OK) { + throw SettingCustomConfigFailed(info_.consumer_name, error, key, value); + } + } + + for (const auto &[key, value] : info_.private_configs) { + if (conf->set(key, value, error) != RdKafka::Conf::CONF_OK) { + throw SettingCustomConfigFailed(info_.consumer_name, error, key, kReducted); + } + } + if (conf->set("event_cb", this, error) != RdKafka::Conf::CONF_OK) { throw ConsumerFailedToInitializeException(info_.consumer_name, error); } diff --git a/src/integrations/kafka/consumer.hpp b/src/integrations/kafka/consumer.hpp index 63178b4be..725f03a61 100644 --- a/src/integrations/kafka/consumer.hpp +++ b/src/integrations/kafka/consumer.hpp @@ -1,4 +1,4 @@ -// Copyright 2021 Memgraph Ltd. +// Copyright 2022 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -86,6 +86,8 @@ struct ConsumerInfo { std::string bootstrap_servers; std::chrono::milliseconds batch_interval; int64_t batch_size; + std::unordered_map<std::string, std::string> public_configs; + std::unordered_map<std::string, std::string> private_configs; }; /// Memgraph's Kafka consumer wrapper. diff --git a/src/integrations/kafka/exceptions.hpp b/src/integrations/kafka/exceptions.hpp index 6eab29f1f..3e8d70285 100644 --- a/src/integrations/kafka/exceptions.hpp +++ b/src/integrations/kafka/exceptions.hpp @@ -1,4 +1,4 @@ -// Copyright 2021 Memgraph Ltd. +// Copyright 2022 Memgraph Ltd. 
// // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -11,7 +11,7 @@ #pragma once -#include <string> +#include <string_view> #include "utils/exceptions.hpp" @@ -22,37 +22,46 @@ class KafkaStreamException : public utils::BasicException { class ConsumerFailedToInitializeException : public KafkaStreamException { public: - ConsumerFailedToInitializeException(const std::string &consumer_name, const std::string &error) + ConsumerFailedToInitializeException(const std::string_view consumer_name, const std::string_view error) : KafkaStreamException("Failed to initialize Kafka consumer {} : {}", consumer_name, error) {} }; +class SettingCustomConfigFailed : public ConsumerFailedToInitializeException { + public: + SettingCustomConfigFailed(const std::string_view consumer_name, const std::string_view error, + const std::string_view key, const std::string_view value) + : ConsumerFailedToInitializeException( + consumer_name, + fmt::format(R"(failed to set custom config ("{}": "{}"), because of error {})", key, value, error)) {} +}; + class ConsumerRunningException : public KafkaStreamException { public: - explicit ConsumerRunningException(const std::string &consumer_name) + explicit ConsumerRunningException(const std::string_view consumer_name) : KafkaStreamException("Kafka consumer {} is already running", consumer_name) {} }; class ConsumerStoppedException : public KafkaStreamException { public: - explicit ConsumerStoppedException(const std::string &consumer_name) + explicit ConsumerStoppedException(const std::string_view consumer_name) : KafkaStreamException("Kafka consumer {} is already stopped", consumer_name) {} }; class ConsumerCheckFailedException : public KafkaStreamException { public: - explicit ConsumerCheckFailedException(const std::string &consumer_name, const std::string &error) + explicit ConsumerCheckFailedException(const std::string_view consumer_name, const std::string_view error) : KafkaStreamException("Kafka consumer {} check failed: {}", consumer_name, error) {} }; class ConsumerStartFailedException : public KafkaStreamException { public: - explicit ConsumerStartFailedException(const std::string &consumer_name, const std::string &error) + explicit ConsumerStartFailedException(const std::string_view consumer_name, const std::string_view error) : KafkaStreamException("Starting Kafka consumer {} failed: {}", consumer_name, error) {} }; class TopicNotFoundException : public KafkaStreamException { public: - TopicNotFoundException(const std::string &consumer_name, const std::string &topic_name) + TopicNotFoundException(const std::string_view consumer_name, const std::string_view topic_name) : KafkaStreamException("Kafka consumer {} cannot find topic {}", consumer_name, topic_name) {} }; } // namespace integrations::kafka diff --git a/src/query/frontend/ast/ast.lcp b/src/query/frontend/ast/ast.lcp index ba3998bc5..167fca065 100644 --- a/src/query/frontend/ast/ast.lcp +++ b/src/query/frontend/ast/ast.lcp @@ -54,9 +54,7 @@ cpp<# size_t size = 0; slk::Load(&size, reader); self->${member}.resize(size); - for (size_t i = 0; - i < size; - ++i) { + for (size_t i = 0; i < size; ++i) { self->${member}[i] = query::LoadAstPointer<query::${type}>(storage, reader); } cpp<#)) @@ -75,9 +73,7 @@ #>cpp size_t size = 0; slk::Load(&size, reader); - for (size_t i = 0; - i < size; - ++i) { + for (size_t i = 0; i < size; ++i) { query::PropertyIx key; slk::Load(&key, reader, storage); auto *value = 
query::LoadAstPointer<query::Expression>(storage, reader); @@ -93,6 +89,34 @@ } cpp<#) +(defun slk-save-expression-map (member) + #>cpp + size_t size = self.${member}.size(); + slk::Save(size, builder); + for (const auto &entry : self.${member}) { + query::SaveAstPointer(entry.first, builder); + query::SaveAstPointer(entry.second, builder); + } + cpp<#) + +(defun slk-load-expression-map (member) + #>cpp + size_t size = 0; + slk::Load(&size, reader); + for (size_t i = 0; i < size; ++i) { + auto *key = query::LoadAstPointer<query::Expression>(storage, reader); + auto *value = query::LoadAstPointer<query::Expression>(storage, reader); + self->${member}.emplace(key, value); + } + cpp<#) + +(defun clone-expression-map (source dest) + #>cpp + for (const auto &[key, value] : ${source}) { + ${dest}[key->Clone(storage)] = value->Clone(storage); + } + cpp<#) + (defun slk-load-name-ix (name-type) (lambda (member) #>cpp @@ -1819,9 +1843,7 @@ size_t size = 0; slk::Load(&size, reader); self->${member}.resize(size); - for (size_t i = 0; - i < size; - ++i) { + for (size_t i = 0; i < size; ++i) { slk::Load(&self->${member}[i], reader, storage); } cpp<#) @@ -2531,7 +2553,7 @@ cpp<# :slk-save #'slk-save-ast-pointer :slk-load (slk-load-ast-pointer "Expression")) - (topic_names "std::variant<Expression*, std::vector<std::string>>" :initval "nullptr" + (topic_names "std::variant<Expression*, std::vector<std::string>>" :initval "nullptr" :clone #'clone-variant-topic-names :scope :public) (consumer_group "std::string" :scope :public) @@ -2541,7 +2563,17 @@ cpp<# (service_url "Expression *" :initval "nullptr" :scope :public :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "Expression"))) + :slk-load (slk-load-ast-pointer "Expression")) + + (configs "std::unordered_map<Expression *, Expression *>" :scope :public + :slk-save #'slk-save-expression-map + :slk-load #'slk-load-expression-map + :clone #'clone-expression-map) + + (credentials "std::unordered_map<Expression *, Expression *>" :scope :public + :slk-save #'slk-save-expression-map + :slk-load #'slk-load-expression-map + :clone #'clone-expression-map)) (:public (lcp:define-enum action diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index 16356ac32..b80bc7278 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -554,7 +554,7 @@ std::string_view ToString(const CommonStreamConfigKey key) { __VA_ARGS__ \ }; -GENERATE_STREAM_CONFIG_KEY_ENUM(Kafka, TOPICS, CONSUMER_GROUP, BOOTSTRAP_SERVERS); +GENERATE_STREAM_CONFIG_KEY_ENUM(Kafka, TOPICS, CONSUMER_GROUP, BOOTSTRAP_SERVERS, CONFIGS, CREDENTIALS); std::string_view ToString(const KafkaConfigKey key) { switch (key) { @@ -564,6 +564,10 @@ std::string_view ToString(const KafkaConfigKey key) { return "CONSUMER_GROUP"; case KafkaConfigKey::BOOTSTRAP_SERVERS: return "BOOTSTRAP_SERVERS"; + case KafkaConfigKey::CONFIGS: + return "CONFIGS"; + case KafkaConfigKey::CREDENTIALS: + return "CREDENTIALS"; } } @@ -574,6 +578,21 @@ void MapCommonStreamConfigs(auto &memory, StreamQuery &stream_query) { } } // namespace +antlrcpp::Any CypherMainVisitor::visitConfigKeyValuePair(MemgraphCypher::ConfigKeyValuePairContext *ctx) { + MG_ASSERT(ctx->literal().size() == 2); + return std::pair{ctx->literal(0)->accept(this).as<Expression *>(), ctx->literal(1)->accept(this).as<Expression *>()}; +} + +antlrcpp::Any CypherMainVisitor::visitConfigMap(MemgraphCypher::ConfigMapContext *ctx) { + std::unordered_map<Expression *, Expression *> map; + for (auto *key_value_pair : ctx->configKeyValuePair()) { + // If the queries are cached, then only the stripped query is parsed, so the actual keys cannot be determined + // here. 
That means duplicates cannot be checked. + map.insert(key_value_pair->accept(this).as<std::pair<Expression *, Expression *>>()); + } + return map; +} + antlrcpp::Any CypherMainVisitor::visitKafkaCreateStream(MemgraphCypher::KafkaCreateStreamContext *ctx) { auto *stream_query = storage_->Create<StreamQuery>(); stream_query->action_ = StreamQuery::Action::CREATE_STREAM; @@ -587,6 +606,10 @@ antlrcpp::Any CypherMainVisitor::visitKafkaCreateStream(MemgraphCypher::KafkaCre MapConfig<std::vector<std::string>, Expression *>(memory_, KafkaConfigKey::TOPICS, stream_query->topic_names_); MapConfig<std::string>(memory_, KafkaConfigKey::CONSUMER_GROUP, stream_query->consumer_group_); MapConfig<Expression *>(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS, stream_query->bootstrap_servers_); + MapConfig<std::unordered_map<Expression *, Expression *>>(memory_, KafkaConfigKey::CONFIGS, + stream_query->configs_); + MapConfig<std::unordered_map<Expression *, Expression *>>(memory_, KafkaConfigKey::CREDENTIALS, + stream_query->credentials_); MapCommonStreamConfigs(memory_, *stream_query); @@ -622,18 +645,33 @@ antlrcpp::Any CypherMainVisitor::visitKafkaCreateStreamConfig(MemgraphCypher::Ka if (ctx->TOPICS()) { ThrowIfExists(memory_, KafkaConfigKey::TOPICS); - const auto topics_key = static_cast<uint8_t>(KafkaConfigKey::TOPICS); + constexpr auto topics_key = static_cast<uint8_t>(KafkaConfigKey::TOPICS); GetTopicNames(memory_[topics_key], ctx->topicNames(), *this); return {}; } if (ctx->CONSUMER_GROUP()) { ThrowIfExists(memory_, KafkaConfigKey::CONSUMER_GROUP); - const auto consumer_group_key = static_cast<uint8_t>(KafkaConfigKey::CONSUMER_GROUP); + constexpr auto consumer_group_key = static_cast<uint8_t>(KafkaConfigKey::CONSUMER_GROUP); memory_[consumer_group_key] = JoinSymbolicNamesWithDotsAndMinus(*this, *ctx->consumerGroup); return {}; } + if (ctx->CONFIGS()) { + ThrowIfExists(memory_, KafkaConfigKey::CONFIGS); + constexpr auto configs_key = static_cast<uint8_t>(KafkaConfigKey::CONFIGS); + memory_.emplace(configs_key, ctx->configsMap->accept(this).as<std::unordered_map<Expression *, Expression *>>()); + return {}; + } + + if (ctx->CREDENTIALS()) { + ThrowIfExists(memory_, KafkaConfigKey::CREDENTIALS); + constexpr auto credentials_key = static_cast<uint8_t>(KafkaConfigKey::CREDENTIALS); + memory_.emplace(credentials_key, + ctx->credentialsMap->accept(this).as<std::unordered_map<Expression *, Expression *>>()); + return {}; + } + MG_ASSERT(ctx->BOOTSTRAP_SERVERS()); ThrowIfExists(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS); if (!ctx->bootstrapServers->StringLiteral()) { diff --git a/src/query/frontend/ast/cypher_main_visitor.hpp b/src/query/frontend/ast/cypher_main_visitor.hpp index b5290d86a..431b58480 100644 --- a/src/query/frontend/ast/cypher_main_visitor.hpp +++ b/src/query/frontend/ast/cypher_main_visitor.hpp @@ -1,4 +1,4 @@ -// Copyright 2021 Memgraph Ltd. +// Copyright 2022 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -269,6 +269,16 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor { */ antlrcpp::Any visitCreateStream(MemgraphCypher::CreateStreamContext *ctx) override; + /** + * @return std::pair<Expression*, Expression*> + */ + antlrcpp::Any visitConfigKeyValuePair(MemgraphCypher::ConfigKeyValuePairContext *ctx) override; + + /** + * @return std::unordered_map<Expression*, Expression*> + */ + antlrcpp::Any visitConfigMap(MemgraphCypher::ConfigMapContext *ctx) override; + /** * @return StreamQuery* */ @@ -849,7 +859,9 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor { ParsingContext context_; AstStorage *storage_; - std::unordered_map<uint8_t, std::variant<Expression *, std::string, std::vector<std::string>>> memory_; + std::unordered_map<uint8_t, std::variant<Expression *, std::string, std::vector<std::string>, + std::unordered_map<Expression *, Expression *>>> + memory_; // Set of identifiers from queries. 
std::unordered_set<std::string> users_identifiers; // Identifiers that user didn't name. diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 index e6fc8736f..d91bb42e8 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 @@ -35,7 +35,9 @@ memgraphCypherKeyword : cypherKeyword | COMMIT | COMMITTED | CONFIG + | CONFIGS | CONSUMER_GROUP + | CREDENTIALS | CSV | DATA | DELIMITER @@ -306,9 +308,15 @@ commonCreateStreamConfig : TRANSFORM transformationName=procedureName createStream : kafkaCreateStream | pulsarCreateStream ; +configKeyValuePair : literal ':' literal ; + +configMap : '{' ( configKeyValuePair ( ',' configKeyValuePair )* )? '}' ; + kafkaCreateStreamConfig : TOPICS topicNames | CONSUMER_GROUP consumerGroup=symbolicNameWithDotsAndMinus | BOOTSTRAP_SERVERS bootstrapServers=literal + | CONFIGS configsMap=configMap + | CREDENTIALS credentialsMap=configMap | commonCreateStreamConfig ; diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 index 1ebca5497..50556df8f 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 @@ -40,7 +40,9 @@ CLEAR : C L E A R ; COMMIT : C O M M I T ; COMMITTED : C O M M I T T E D ; CONFIG : C O N F I G ; +CONFIGS : C O N F I G S ; CONSUMER_GROUP : C O N S U M E R UNDERSCORE G R O U P ; +CREDENTIALS : C R E D E N T I A L S ; CSV : C S V ; DATA : D A T A ; DELIMITER : D E L I M I T E R ; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 50d897970..18c3089a9 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -1,4 +1,4 @@ -// Copyright 2021 Memgraph Ltd. +// Copyright 2022 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -564,10 +564,26 @@ Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, Exp } auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator); + const auto get_config_map = [&evaluator](std::unordered_map<Expression *, Expression *> map, + std::string_view map_name) -> std::unordered_map<std::string, std::string> { + std::unordered_map<std::string, std::string> config_map; + for (const auto [key_expr, value_expr] : map) { + const auto key = key_expr->Accept(evaluator); + const auto value = value_expr->Accept(evaluator); + if (!key.IsString() || !value.IsString()) { + throw SemanticException("{} must contain only string keys and values!", map_name); + } + config_map.emplace(key.ValueString(), value.ValueString()); + } + return config_map; + }; + return [interpreter_context, stream_name = stream_query->stream_name_, topic_names = EvaluateTopicNames(evaluator, stream_query->topic_names_), consumer_group = std::move(consumer_group), common_stream_info = std::move(common_stream_info), - bootstrap_servers = std::move(bootstrap), owner = StringPointerToOptional(username)]() mutable { + bootstrap_servers = std::move(bootstrap), owner = StringPointerToOptional(username), + configs = get_config_map(stream_query->configs_, "Configs"), + credentials = get_config_map(stream_query->credentials_, "Credentials")]() mutable { std::string bootstrap = bootstrap_servers ? 
std::move(*bootstrap_servers) : std::string{interpreter_context->config.default_kafka_bootstrap_servers}; @@ -575,7 +591,9 @@ Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, Exp {.common_info = std::move(common_stream_info), .topics = std::move(topic_names), .consumer_group = std::move(consumer_group), - .bootstrap_servers = std::move(bootstrap)}, + .bootstrap_servers = std::move(bootstrap), + .configs = std::move(configs), + .credentials = std::move(credentials)}, std::move(owner)); return std::vector<std::vector<TypedValue>>{}; diff --git a/src/query/stream/sources.cpp b/src/query/stream/sources.cpp index 5c1148672..5f350a694 100644 --- a/src/query/stream/sources.cpp +++ b/src/query/stream/sources.cpp @@ -1,4 +1,4 @@ -// Copyright 2021 Memgraph Ltd. +// Copyright 2022 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -13,6 +13,8 @@ #include <json/json.hpp> +#include "integrations/constants.hpp" + namespace query::stream { KafkaStream::KafkaStream(std::string stream_name, StreamInfo stream_info, ConsumerFunction consumer_function) { @@ -23,6 +25,8 @@ KafkaStream::KafkaStream(std::string stream_name, StreamInfo stream_info, .bootstrap_servers = std::move(stream_info.bootstrap_servers), .batch_interval = stream_info.common_info.batch_interval, .batch_size = stream_info.common_info.batch_size, + .public_configs = std::move(stream_info.configs), + .private_configs = std::move(stream_info.credentials), }; consumer_.emplace(std::move(consumer_info), std::move(consumer_function)); }; @@ -34,7 +38,9 @@ KafkaStream::StreamInfo KafkaStream::Info(std::string transformation_name) const .transformation_name = std::move(transformation_name)}, .topics = info.topics, .consumer_group = info.consumer_group, - .bootstrap_servers = info.bootstrap_servers}; + .bootstrap_servers = info.bootstrap_servers, + .configs = info.public_configs, + .credentials = info.private_configs}; } void KafkaStream::Start() { consumer_->Start(); } @@ -54,6 +60,10 @@ namespace { const std::string kTopicsKey{"topics"}; const std::string kConsumerGroupKey{"consumer_group"}; const std::string kBoostrapServers{"bootstrap_servers"}; +const std::string kConfigs{"configs"}; +const std::string kCredentials{"credentials"}; + +const std::unordered_map<std::string, std::string> kDefaultConfigsMap; } // namespace void to_json(nlohmann::json &data, KafkaStream::StreamInfo &&info) { data[kTopicsKey] = std::move(info.topics); data[kConsumerGroupKey] = info.consumer_group; data[kBoostrapServers] = std::move(info.bootstrap_servers); + data[kConfigs] = std::move(info.configs); + data[kCredentials] = std::move(info.credentials); } void from_json(const nlohmann::json &data, KafkaStream::StreamInfo &info) { data.at(kTopicsKey).get_to(info.topics); data.at(kConsumerGroupKey).get_to(info.consumer_group); data.at(kBoostrapServers).get_to(info.bootstrap_servers); + // These values might not be present in the persisted JSON object + info.configs = data.value(kConfigs, kDefaultConfigsMap); + info.credentials = data.value(kCredentials, kDefaultConfigsMap); } PulsarStream::PulsarStream(std::string stream_name, StreamInfo stream_info, diff --git a/src/query/stream/sources.hpp b/src/query/stream/sources.hpp index ec29890ed..1b5b62f58 100644 --- 
a/src/query/stream/sources.hpp +++ b/src/query/stream/sources.hpp @@ -1,4 +1,4 @@ -// Copyright 2021 Memgraph Ltd. +// Copyright 2022 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -24,6 +24,8 @@ struct KafkaStream { std::vector<std::string> topics; std::string consumer_group; std::string bootstrap_servers; + std::unordered_map<std::string, std::string> configs; + std::unordered_map<std::string, std::string> credentials; }; using Message = integrations::kafka::Message; diff --git a/src/query/stream/streams.cpp b/src/query/stream/streams.cpp index f8d28ffe2..1763956a3 100644 --- a/src/query/stream/streams.cpp +++ b/src/query/stream/streams.cpp @@ -1,4 +1,4 @@ -// Copyright 2021 Memgraph Ltd. +// Copyright 2022 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -18,6 +18,7 @@ #include #include +#include "integrations/constants.hpp" #include "mg_procedure.h" #include "query/db_accessor.hpp" #include "query/discard_value_stream.hpp" @@ -29,6 +30,7 @@ #include "query/stream/sources.hpp" #include "query/typed_value.hpp" #include "utils/event_counter.hpp" +#include "utils/logging.hpp" #include "utils/memory.hpp" #include "utils/on_scope_exit.hpp" #include "utils/pmr/string.hpp" @@ -208,10 +210,12 @@ void Streams::RegisterKafkaProcedures() { constexpr std::string_view consumer_group_result_name = "consumer_group"; constexpr std::string_view topics_result_name = "topics"; constexpr std::string_view bootstrap_servers_result_name = "bootstrap_servers"; + constexpr std::string_view configs_result_name = "configs"; + constexpr std::string_view credentials_result_name = "credentials"; auto get_stream_info = [this, proc_name, consumer_group_result_name, topics_result_name, - bootstrap_servers_result_name](mgp_list *args, mgp_graph * /*graph*/, mgp_result *result, - mgp_memory *memory) { + bootstrap_servers_result_name, configs_result_name, credentials_result_name]( mgp_list *args, mgp_graph * /*graph*/, mgp_result *result, mgp_memory *memory) { auto *arg_stream_name = procedure::Call<mgp_value *>(mgp_list_at, args, 0); const auto *stream_name = procedure::Call<const char *>(mgp_value_get_string, arg_stream_name); auto lock_ptr = streams_.Lock(); @@ -259,13 +263,12 @@ void Streams::RegisterKafkaProcedures() { procedure::MgpUniquePtr<mgp_value> topics_value{nullptr, mgp_value_destroy}; { const auto success = procedure::TryOrSetError( - [&] { return procedure::CreateMgpObject(topics_value, mgp_value_make_list, topic_names.release()); }, + [&] { return procedure::CreateMgpObject(topics_value, mgp_value_make_list, topic_names.get()); }, result); if (!success) { return; } + static_cast<void>(topic_names.release()); } const auto bootstrap_servers_value = @@ -274,6 +277,57 @@ return; } + const auto convert_config_map = + [result, memory](const std::unordered_map<std::string, std::string> &configs_to_convert) + -> procedure::MgpUniquePtr<mgp_value> { + procedure::MgpUniquePtr<mgp_value> configs_value{nullptr, mgp_value_destroy}; + procedure::MgpUniquePtr<mgp_map> configs{nullptr, mgp_map_destroy}; + { + const auto success = procedure::TryOrSetError( + [&] { return procedure::CreateMgpObject(configs, mgp_map_make_empty, memory); }, result); + if (!success) { + return configs_value; + } + } + + for (const auto &[key, value] : configs_to_convert) { + auto value_value = procedure::GetStringValueOrSetError(value.c_str(), 
memory, result); + if (!value_value) { + return configs_value; + } + configs->items.emplace(key, std::move(*value_value)); + } + + { + const auto success = procedure::TryOrSetError( + [&] { return procedure::CreateMgpObject(configs_value, mgp_value_make_map, configs.get()); }, + result); + if (!success) { + return configs_value; + } + static_cast<void>(configs.release()); + } + return configs_value; + }; + + const auto configs_value = convert_config_map(info.configs); + if (configs_value == nullptr) { + return; + } + + using CredentialsType = decltype(KafkaStream::StreamInfo::credentials); + CredentialsType reducted_credentials; + std::transform(info.credentials.begin(), info.credentials.end(), + std::inserter(reducted_credentials, reducted_credentials.end()), + [](const auto &pair) -> CredentialsType::value_type { + return {pair.first, integrations::kReducted}; + }); + + const auto credentials_value = convert_config_map(reducted_credentials); + if (credentials_value == nullptr) { + return; + } + if (!procedure::InsertResultOrSetError(result, record, consumer_group_result_name.data(), consumer_group_value.get())) { return; } @@ -287,6 +341,16 @@ bootstrap_servers_value.get())) { return; } + + if (!procedure::InsertResultOrSetError(result, record, configs_result_name.data(), + configs_value.get())) { + return; + } + + if (!procedure::InsertResultOrSetError(result, record, credentials_result_name.data(), + credentials_value.get())) { + return; + } }, [proc_name](auto && /*other*/) { throw QueryRuntimeException("'{}' can be only used for Kafka stream sources", proc_name); @@ -305,6 +369,10 @@ MGP_ERROR_NO_ERROR); MG_ASSERT(mgp_proc_add_result(&proc, bootstrap_servers_result_name.data(), procedure::Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR); + MG_ASSERT(mgp_proc_add_result(&proc, configs_result_name.data(), procedure::Call<mgp_type *>(mgp_type_map)) == + MGP_ERROR_NO_ERROR); + MG_ASSERT(mgp_proc_add_result(&proc, credentials_result_name.data(), procedure::Call<mgp_type *>(mgp_type_map)) == + MGP_ERROR_NO_ERROR); procedure::gModuleRegistry.RegisterMgProcedure(proc_name, std::move(proc)); } diff --git a/src/query/stream/streams.hpp b/src/query/stream/streams.hpp index b927c4c07..a1afd1a47 100644 --- a/src/query/stream/streams.hpp +++ b/src/query/stream/streams.hpp @@ -1,4 +1,4 @@ -// Copyright 2021 Memgraph Ltd. +// Copyright 2022 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -31,6 +31,7 @@ #include "utils/rw_lock.hpp" #include "utils/synchronized.hpp" +class StreamsTest; namespace query { struct InterpreterContext; @@ -73,6 +74,8 @@ using TransformationResult = std::vector<std::vector<TypedValue>>; /// /// This class is responsible for all query supported actions to happen. class Streams final { + friend StreamsTest; + public: /// Initializes the streams. 
/// diff --git a/tests/e2e/streams/kafka_streams_tests.py b/tests/e2e/streams/kafka_streams_tests.py index b05fc7e8a..1455fe95d 100755 --- a/tests/e2e/streams/kafka_streams_tests.py +++ b/tests/e2e/streams/kafka_streams_tests.py @@ -421,10 +421,13 @@ def test_set_offset(kafka_producer, kafka_topics, connection, transformation): assert comparison_check("Final Message", res[0]) common.execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n") + def test_info_procedure(kafka_topics, connection): cursor = connection.cursor() stream_name = 'test_stream' + configs = {"sasl.username": "michael.scott"} local = "localhost:9092" + credentials = {"sasl.password": "S3cr3tP4ssw0rd"} consumer_group = "ConsumerGr" common.execute_and_fetch_all( cursor, @@ -432,13 +435,21 @@ f"TOPICS {','.join(kafka_topics)} " f"TRANSFORM pulsar_transform.simple " f"CONSUMER_GROUP {consumer_group} " - f"BOOTSTRAP_SERVERS '{local}'" + f"BOOTSTRAP_SERVERS '{local}' " + f"CONFIGS {configs} " + f"CREDENTIALS {credentials}" ) - stream_info = common.execute_and_fetch_all(cursor, f"CALL mg.kafka_stream_info('{stream_name}') YIELD *") + stream_info = common.execute_and_fetch_all( + cursor, f"CALL mg.kafka_stream_info('{stream_name}') YIELD *") - expected_stream_info = [(local, consumer_group, kafka_topics)] + reducted_credentials = {key: "<REDUCTED>" for + key in credentials.keys()} + + expected_stream_info = [ + (local, configs, consumer_group, reducted_credentials, kafka_topics)] common.validate_info(stream_info, expected_stream_info) + if __name__ == "__main__": sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/unit/cypher_main_visitor.cpp b/tests/unit/cypher_main_visitor.cpp index 8eb548243..c3fd82a23 100644 --- a/tests/unit/cypher_main_visitor.cpp +++ b/tests/unit/cypher_main_visitor.cpp @@ -1,4 +1,4 @@ -// Copyright 2021 Memgraph Ltd. +// Copyright 2022 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -9,20 +9,10 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -// Copyright 2021 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. 
-// #include #include #include +#include #include #include #include @@ -38,6 +28,7 @@ #include ////////////////////////////////////////////////////// #include +#include #include #include @@ -87,23 +78,37 @@ class Base { } } + TypedValue GetLiteral(Expression *expression, const bool use_parameter_lookup, + const std::optional<int> &token_position = std::nullopt) const { + if (use_parameter_lookup) { + auto *param_lookup = dynamic_cast<ParameterLookup *>(expression); + if (param_lookup == nullptr) { + ADD_FAILURE(); + return {}; + } + if (token_position) { + EXPECT_EQ(param_lookup->token_position_, *token_position); + } + return TypedValue(parameters_.AtTokenPosition(param_lookup->token_position_)); + } + + auto *literal = dynamic_cast<PrimitiveLiteral *>(expression); + if (literal == nullptr) { + ADD_FAILURE(); + return {}; + } + if (token_position) { + EXPECT_EQ(literal->token_position_, *token_position); + } + return TypedValue(literal->value_); + } + template <typename TValue> void CheckLiteral(Expression *expression, const TValue &expected, const std::optional<int> &token_position = std::nullopt) const { - TypedValue value; - // NOLINTNEXTLINE(performance-unnecessary-copy-initialization) TypedValue expected_tv(expected); - if (!expected_tv.IsNull() && context_.is_query_cached) { - auto *param_lookup = dynamic_cast<ParameterLookup *>(expression); - ASSERT_TRUE(param_lookup); - if (token_position) EXPECT_EQ(param_lookup->token_position_, *token_position); - value = TypedValue(parameters_.AtTokenPosition(param_lookup->token_position_)); - } else { - auto *literal = dynamic_cast<PrimitiveLiteral *>(expression); - ASSERT_TRUE(literal); - if (token_position) ASSERT_EQ(literal->token_position_, *token_position); - value = TypedValue(literal->value_); - } + const auto use_parameter_lookup = !expected_tv.IsNull() && context_.is_query_cached; + TypedValue value = GetLiteral(expression, use_parameter_lookup, token_position); EXPECT_TRUE(TypedValue::BoolEqual{}(value, expected_tv)); } }; @@ -3580,6 +3585,8 @@ void ValidateMostlyEmptyStreamQuery(Base &ast_generator, const std::string &quer EXPECT_EQ(parsed_query->bootstrap_servers_, nullptr); EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_limit_, batch_limit)); EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->timeout_, timeout)); + EXPECT_TRUE(parsed_query->configs_.empty()); + EXPECT_TRUE(parsed_query->credentials_.empty()); } TEST_P(CypherMainVisitorTest, DropStream) { @@ -3661,7 +3668,9 @@ void ValidateCreateKafkaStreamQuery(Base &ast_generator, const std::string &quer const std::string_view transform_name, const std::string_view consumer_group, const std::optional<TypedValue> &batch_interval, const std::optional<TypedValue> &batch_size, - const std::string_view bootstrap_servers = "") { + const std::string_view bootstrap_servers, + const std::unordered_map<std::string, std::string> &configs, + const std::unordered_map<std::string, std::string> &credentials) { SCOPED_TRACE(query_string); StreamQuery *parsed_query{nullptr}; ASSERT_NO_THROW(parsed_query = dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(query_string))) << query_string; @@ -3675,14 +3684,31 @@ void ValidateCreateKafkaStreamQuery(Base &ast_generator, const std::string &quer EXPECT_EQ(parsed_query->batch_limit_, nullptr); if (bootstrap_servers.empty()) { EXPECT_EQ(parsed_query->bootstrap_servers_, nullptr); - return; + } else { + EXPECT_NE(parsed_query->bootstrap_servers_, nullptr); } - EXPECT_NE(parsed_query->bootstrap_servers_, nullptr); + + const auto evaluate_config_map = [&ast_generator](const std::unordered_map<Expression *, Expression *> &config_map) { + std::unordered_map<std::string, std::string> evaluated_config_map; + const auto expr_to_str = 
[&ast_generator](Expression *expression) { + return std::string{ast_generator.GetLiteral(expression, ast_generator.context_.is_query_cached).ValueString()}; + }; + std::transform(config_map.begin(), config_map.end(), + std::inserter(evaluated_config_map, evaluated_config_map.end()), + [&expr_to_str](const auto expr_pair) { + return std::pair{expr_to_str(expr_pair.first), expr_to_str(expr_pair.second)}; + }); + return evaluated_config_map; + }; + + using testing::UnorderedElementsAreArray; + EXPECT_THAT(evaluate_config_map(parsed_query->configs_), UnorderedElementsAreArray(configs.begin(), configs.end())); + EXPECT_THAT(evaluate_config_map(parsed_query->credentials_), + UnorderedElementsAreArray(credentials.begin(), credentials.end())); } TEST_P(CypherMainVisitorTest, CreateKafkaStream) { auto &ast_generator = *GetParam(); - TestInvalidQuery("CREATE KAFKA STREAM", ast_generator); TestInvalidQuery("CREATE KAFKA STREAM invalid stream name TOPICS topic1 TRANSFORM transform", ast_generator); TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS invalid topic name TRANSFORM transform", ast_generator); @@ -3709,6 +3735,13 @@ TEST_P(CypherMainVisitorTest, CreateKafkaStream) { TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS localhost:9092", ast_generator); TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS", ast_generator); + // the keys must be string literals + TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform CONFIGS { symbolicname : 'string' }", + ast_generator); + TestInvalidQuery( + "CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform CREDENTIALS { symbolicname : 'string' }", + ast_generator); + TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform CREDENTIALS 2", ast_generator); const std::vector<std::string> topic_names{"topic1_name.with_dot", "topic1_name.with_multiple.dots", "topic-name.with-multiple.dots-and-dashes"}; @@ -3728,34 +3761,37 @@ TEST_P(CypherMainVisitorTest, CreateKafkaStream) { ValidateCreateKafkaStreamQuery( ast_generator, fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {}", kStreamName, topic_names_as_str, kTransformName), - kStreamName, topic_names, kTransformName, "", std::nullopt, std::nullopt, {}, {}, {}); ValidateCreateKafkaStreamQuery(ast_generator, fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} ", kStreamName, topic_names_as_str, kTransformName, kConsumerGroup), - kStreamName, topic_names, kTransformName, kConsumerGroup, std::nullopt, - std::nullopt); + kStreamName, topic_names, kTransformName, kConsumerGroup, std::nullopt, std::nullopt, + {}, {}, {}); ValidateCreateKafkaStreamQuery(ast_generator, fmt::format("CREATE KAFKA STREAM {} TRANSFORM {} TOPICS {} BATCH_INTERVAL {}", kStreamName, kTransformName, topic_names_as_str, kBatchInterval), - kStreamName, topic_names, kTransformName, "", batch_interval_value, std::nullopt); + kStreamName, topic_names, kTransformName, "", batch_interval_value, std::nullopt, {}, + {}, {}); ValidateCreateKafkaStreamQuery(ast_generator, fmt::format("CREATE KAFKA STREAM {} BATCH_SIZE {} TOPICS {} TRANSFORM {}", kStreamName, kBatchSize, topic_names_as_str, kTransformName), - kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value); + kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value, {}, {}, + {}); 
ValidateCreateKafkaStreamQuery(ast_generator, fmt::format("CREATE KAFKA STREAM {} TOPICS '{}' BATCH_SIZE {} TRANSFORM {}", kStreamName, topic_names_as_str, kBatchSize, kTransformName), - kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value); + kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value, {}, {}, + {}); ValidateCreateKafkaStreamQuery( ast_generator, fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {}", kStreamName, topic_names_as_str, kTransformName, kConsumerGroup, kBatchInterval, kBatchSize), - kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value); + kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, {}, {}, {}); using namespace std::string_literals; const auto host1 = "localhost:9094"s; ValidateCreateKafkaStreamQuery( ast_generator, fmt::format("CREATE KAFKA STREAM {} TOPICS {} CONSUMER_GROUP {} BATCH_SIZE {} BATCH_INTERVAL {} TRANSFORM {} " "BOOTSTRAP_SERVERS '{}'", kStreamName, topic_names_as_str, kConsumerGroup, kBatchSize, kBatchInterval, kTransformName, host1), - kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1); + kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1, {}, + {}); ValidateCreateKafkaStreamQuery( ast_generator, fmt::format("CREATE KAFKA STREAM {} CONSUMER_GROUP {} TOPICS {} BATCH_INTERVAL {} TRANSFORM {} BATCH_SIZE {} " "BOOTSTRAP_SERVERS '{}'", kStreamName, kConsumerGroup, topic_names_as_str, kBatchInterval, kTransformName, kBatchSize, host1), - kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1); + kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1, {}, + {}); const auto host2 = "localhost:9094,localhost:1994,168.1.1.256:345"s; ValidateCreateKafkaStreamQuery( ast_generator, fmt::format("CREATE KAFKA STREAM {} TOPICS {} BOOTSTRAP_SERVERS '{}' CONSUMER_GROUP {} TRANSFORM {} " "BATCH_INTERVAL {} BATCH_SIZE {}", kStreamName, topic_names_as_str, host2, kConsumerGroup, kTransformName, kBatchInterval, kBatchSize), - kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host2); + kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host2, {}, + {}); }; for (const auto &topic_name : topic_names) { @@ -3793,7 +3832,7 @@ TEST_P(CypherMainVisitorTest, CreateKafkaStream) { fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {}", kStreamName, kTopicName, kTransformName, consumer_group), kStreamName, {kTopicName}, kTransformName, consumer_group, std::nullopt, - std::nullopt); + std::nullopt, {}, {}, {}); }; using namespace std::literals; @@ -3803,6 +3842,44 @@ TEST_P(CypherMainVisitorTest, CreateKafkaStream) { for (const auto consumer_group : consumer_groups) { EXPECT_NO_FATAL_FAILURE(check_consumer_group(consumer_group)); } + + auto check_config_map = [&](const std::unordered_map<std::string, std::string> &config_map) { + const std::string kTopicName{"topic1"}; + + const auto map_as_str = std::invoke([&config_map] { + std::stringstream buffer; + buffer << '{'; + if (!config_map.empty()) { + auto it = config_map.begin(); + buffer << fmt::format("'{}': '{}'", it->first, 
it->second); + for (++it; it != config_map.end(); ++it) { + buffer << fmt::format(", '{}': '{}'", it->first, it->second); + } + } + buffer << '}'; + return std::move(buffer).str(); + }); + + ValidateCreateKafkaStreamQuery(ast_generator, + fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONFIGS {}", kStreamName, + kTopicName, kTransformName, map_as_str), + kStreamName, {kTopicName}, kTransformName, "", std::nullopt, std::nullopt, {}, + config_map, {}); + + ValidateCreateKafkaStreamQuery(ast_generator, + fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CREDENTIALS {}", + kStreamName, kTopicName, kTransformName, map_as_str), + kStreamName, {kTopicName}, kTransformName, "", std::nullopt, std::nullopt, {}, {}, + config_map); + }; + + const std::array config_maps = {std::unordered_map<std::string, std::string>{}, + std::unordered_map<std::string, std::string>{{"key", "value"}}, + std::unordered_map<std::string, std::string>{{"key.with.dot", "value.with.dot"}, + {"key with space", "value with space"}}}; + for (const auto &map_to_test : config_maps) { + EXPECT_NO_FATAL_FAILURE(check_config_map(map_to_test)); + } } void ValidateCreatePulsarStreamQuery(Base &ast_generator, const std::string &query_string, diff --git a/tests/unit/integrations_kafka_consumer.cpp b/tests/unit/integrations_kafka_consumer.cpp index 502317658..d598fefaa 100644 --- a/tests/unit/integrations_kafka_consumer.cpp +++ b/tests/unit/integrations_kafka_consumer.cpp @@ -1,4 +1,4 @@ -// Copyright 2021 Memgraph Ltd. +// Copyright 2022 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -59,6 +59,8 @@ struct ConsumerTest : public ::testing::Test { .bootstrap_servers = cluster.Bootstraps(), .batch_interval = kDefaultBatchInterval, .batch_size = kDefaultBatchSize, + .public_configs = {}, + .private_configs = {}, }; }; diff --git a/tests/unit/interpreter.cpp b/tests/unit/interpreter.cpp index bd884f2cf..78ca82497 100644 --- a/tests/unit/interpreter.cpp +++ b/tests/unit/interpreter.cpp @@ -1,15 +1,4 @@ -// Copyright 2021 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -// Copyright 2021 Memgraph Ltd. +// Copyright 2022 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source diff --git a/tests/unit/query_streams.cpp b/tests/unit/query_streams.cpp index 018da55c3..1f26fdcc2 100644 --- a/tests/unit/query_streams.cpp +++ b/tests/unit/query_streams.cpp @@ -1,4 +1,4 @@ -// Copyright 2021 Memgraph Ltd. +// Copyright 2022 Memgraph Ltd. 
// // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -15,12 +15,15 @@ #include #include + +#include "integrations/constants.hpp" #include "integrations/kafka/exceptions.hpp" #include "kafka_mock.hpp" #include "query/config.hpp" #include "query/interpreter.hpp" #include "query/stream/streams.hpp" #include "storage/v2/storage.hpp" +#include "test_utils.hpp" using Streams = query::stream::Streams; using StreamInfo = query::stream::KafkaStream::StreamInfo; @@ -78,6 +81,17 @@ class StreamsTest : public ::testing::Test { EXPECT_EQ(check_data.is_running, status.is_running); } + void CheckConfigAndCredentials(const StreamCheckData &check_data) { + const auto locked_streams = streams_->streams_.ReadLock(); + const auto &stream = locked_streams->at(check_data.name); + const auto *stream_data = std::get_if<Streams::StreamData<query::stream::KafkaStream>>(&stream); + ASSERT_NE(stream_data, nullptr); + const auto stream_info = + stream_data->stream_source->ReadLock()->Info(check_data.info.common_info.transformation_name); + EXPECT_TRUE( + std::equal(check_data.info.configs.begin(), check_data.info.configs.end(), stream_info.configs.begin())); + } + void StartStream(StreamCheckData &check_data) { streams_->Start(check_data.name); check_data.is_running = true; @@ -183,6 +197,18 @@ TEST_F(StreamsTest, RestoreStreams) { stream_info.common_info.batch_interval = std::chrono::milliseconds((i + 1) * 10); stream_info.common_info.batch_size = 1000 + i; stream_check_data.owner = std::string{"owner"} + iteration_postfix; + + // Use the stream index to vary the CONFIGS and CREDENTIALS maps between consumers: + // - 0 means no config and no credential + // - 1 means only config + // - 2 means only credential + // - 3 means both config and credential are set + if (i == 1 || i == 3) { + stream_info.configs.emplace(std::string{"sasl.username"}, std::string{"username"} + iteration_postfix); + } + if (i == 2 || i == 3) { + stream_info.credentials.emplace(std::string{"sasl.password"}, std::string{"password"} + iteration_postfix); + } } mock_cluster_.CreateTopic(stream_info.topics[0]); @@ -198,6 +224,7 @@ TEST_F(StreamsTest, RestoreStreams) { EXPECT_EQ(stream_check_datas.size(), streams_->GetStreamInfo().size()); for (const auto &check_data : stream_check_datas) { ASSERT_NO_FATAL_FAILURE(CheckStreamStatus(check_data)); + ASSERT_NO_FATAL_FAILURE(CheckConfigAndCredentials(check_data)); } }; @@ -253,3 +280,32 @@ TEST_F(StreamsTest, CheckWithTimeout) { EXPECT_LE(timeout, elapsed); EXPECT_LE(elapsed, timeout * 1.2); } + +TEST_F(StreamsTest, CheckInvalidConfig) { + auto stream_info = CreateDefaultStreamInfo(); + const auto stream_name = GetDefaultStreamName(); + constexpr auto kInvalidConfigName = "doesnt.exist"; + constexpr auto kConfigValue = "myprecious"; + stream_info.configs.emplace(kInvalidConfigName, kConfigValue); + const auto checker = [](const std::string_view message) { + EXPECT_TRUE(message.find(kInvalidConfigName) != std::string::npos) << message; + EXPECT_TRUE(message.find(kConfigValue) != std::string::npos) << message; + }; + EXPECT_THROW_WITH_MSG(streams_->Create(stream_name, stream_info, std::nullopt), + integrations::kafka::SettingCustomConfigFailed, checker); +} + +TEST_F(StreamsTest, CheckInvalidCredentials) { + auto stream_info = CreateDefaultStreamInfo(); + const auto stream_name = GetDefaultStreamName(); + constexpr auto kInvalidCredentialName = "doesnt.exist"; + constexpr auto kCredentialValue = 
"myprecious"; + stream_info.credentials.emplace(kInvalidCredentialName, kCredentialValue); + const auto checker = [](const std::string_view message) { + EXPECT_TRUE(message.find(kInvalidCredentialName) != std::string::npos) << message; + EXPECT_TRUE(message.find(integrations::kReducted) != std::string::npos) << message; + EXPECT_TRUE(message.find(kCredentialValue) == std::string::npos) << message; + }; + EXPECT_THROW_WITH_MSG(streams_->Create(stream_name, stream_info, std::nullopt), + integrations::kafka::SettingCustomConfigFailed, checker); +} diff --git a/tests/unit/test_utils.hpp b/tests/unit/test_utils.hpp index d83254e56..f6a8ea232 100644 --- a/tests/unit/test_utils.hpp +++ b/tests/unit/test_utils.hpp @@ -1,4 +1,4 @@ -// Copyright 2021 Memgraph Ltd. +// Copyright 2022 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -34,4 +34,16 @@ TResult ExpectNoError(const char *file, int line, TFunc func, TArgs &&...args) { } } // namespace test_utils -#define EXPECT_MGP_NO_ERROR(type, ...) test_utils::ExpectNoError(__FILE__, __LINE__, __VA_ARGS__) \ No newline at end of file +#define EXPECT_MGP_NO_ERROR(type, ...) test_utils::ExpectNoError(__FILE__, __LINE__, __VA_ARGS__) + +#define EXPECT_THROW_WITH_MSG(statement, expected_exception, msg_checker) \ + EXPECT_THROW( \ + { \ + try { \ + statement; \ + } catch (const expected_exception &e) { \ + EXPECT_NO_FATAL_FAILURE(msg_checker(e.what())); \ + throw; \ + } \ + }, \ + expected_exception);