Add configs for kafka streams (#328)

This commit is contained in:
János Benjamin Antal 2022-01-31 17:26:53 +01:00 committed by GitHub
parent ced84e17b6
commit 6c00d146f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 471 additions and 99 deletions

View File

@ -2,6 +2,7 @@
Checks: '*, Checks: '*,
-abseil-string-find-str-contains, -abseil-string-find-str-contains,
-altera-struct-pack-align, -altera-struct-pack-align,
-altera-unroll-loops,
-android-*, -android-*,
-cert-err58-cpp, -cert-err58-cpp,
-cppcoreguidelines-avoid-c-arrays, -cppcoreguidelines-avoid-c-arrays,

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd. // Copyright 2022 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -10,11 +10,14 @@
// licenses/APL.txt. // licenses/APL.txt.
#pragma once #pragma once
#include <chrono> #include <chrono>
#include <string>
namespace integrations { namespace integrations {
constexpr int64_t kDefaultCheckBatchLimit{1}; constexpr int64_t kDefaultCheckBatchLimit{1};
constexpr std::chrono::milliseconds kDefaultCheckTimeout{30000}; constexpr std::chrono::milliseconds kDefaultCheckTimeout{30000};
constexpr std::chrono::milliseconds kMinimumInterval{1}; constexpr std::chrono::milliseconds kMinimumInterval{1};
constexpr int64_t kMinimumSize{1}; constexpr int64_t kMinimumSize{1};
const std::string kReducted{"<REDUCTED>"};
} // namespace integrations } // namespace integrations

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd. // Copyright 2022 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -126,6 +126,18 @@ Consumer::Consumer(ConsumerInfo info, ConsumerFunction consumer_function)
std::string error; std::string error;
for (const auto &[key, value] : info_.public_configs) {
if (conf->set(key, value, error) != RdKafka::Conf::CONF_OK) {
throw SettingCustomConfigFailed(info_.consumer_name, error, key, value);
}
}
for (const auto &[key, value] : info_.private_configs) {
if (conf->set(key, value, error) != RdKafka::Conf::CONF_OK) {
throw SettingCustomConfigFailed(info_.consumer_name, error, key, kReducted);
}
}
if (conf->set("event_cb", this, error) != RdKafka::Conf::CONF_OK) { if (conf->set("event_cb", this, error) != RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.consumer_name, error); throw ConsumerFailedToInitializeException(info_.consumer_name, error);
} }

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd. // Copyright 2022 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -86,6 +86,8 @@ struct ConsumerInfo {
std::string bootstrap_servers; std::string bootstrap_servers;
std::chrono::milliseconds batch_interval; std::chrono::milliseconds batch_interval;
int64_t batch_size; int64_t batch_size;
std::unordered_map<std::string, std::string> public_configs;
std::unordered_map<std::string, std::string> private_configs;
}; };
/// Memgraphs Kafka consumer wrapper. /// Memgraphs Kafka consumer wrapper.

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd. // Copyright 2022 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -11,7 +11,7 @@
#pragma once #pragma once
#include <string> #include <string_view>
#include "utils/exceptions.hpp" #include "utils/exceptions.hpp"
@ -22,37 +22,46 @@ class KafkaStreamException : public utils::BasicException {
class ConsumerFailedToInitializeException : public KafkaStreamException { class ConsumerFailedToInitializeException : public KafkaStreamException {
public: public:
ConsumerFailedToInitializeException(const std::string &consumer_name, const std::string &error) ConsumerFailedToInitializeException(const std::string_view consumer_name, const std::string_view error)
: KafkaStreamException("Failed to initialize Kafka consumer {} : {}", consumer_name, error) {} : KafkaStreamException("Failed to initialize Kafka consumer {} : {}", consumer_name, error) {}
}; };
class SettingCustomConfigFailed : public ConsumerFailedToInitializeException {
public:
SettingCustomConfigFailed(const std::string_view consumer_name, const std::string_view error,
const std::string_view key, const std::string_view value)
: ConsumerFailedToInitializeException(
consumer_name,
fmt::format(R"(failed to set custom config ("{}": "{}"), because of error {})", key, value, error)) {}
};
class ConsumerRunningException : public KafkaStreamException { class ConsumerRunningException : public KafkaStreamException {
public: public:
explicit ConsumerRunningException(const std::string &consumer_name) explicit ConsumerRunningException(const std::string_view consumer_name)
: KafkaStreamException("Kafka consumer {} is already running", consumer_name) {} : KafkaStreamException("Kafka consumer {} is already running", consumer_name) {}
}; };
class ConsumerStoppedException : public KafkaStreamException { class ConsumerStoppedException : public KafkaStreamException {
public: public:
explicit ConsumerStoppedException(const std::string &consumer_name) explicit ConsumerStoppedException(const std::string_view consumer_name)
: KafkaStreamException("Kafka consumer {} is already stopped", consumer_name) {} : KafkaStreamException("Kafka consumer {} is already stopped", consumer_name) {}
}; };
class ConsumerCheckFailedException : public KafkaStreamException { class ConsumerCheckFailedException : public KafkaStreamException {
public: public:
explicit ConsumerCheckFailedException(const std::string &consumer_name, const std::string &error) explicit ConsumerCheckFailedException(const std::string_view consumer_name, const std::string_view error)
: KafkaStreamException("Kafka consumer {} check failed: {}", consumer_name, error) {} : KafkaStreamException("Kafka consumer {} check failed: {}", consumer_name, error) {}
}; };
class ConsumerStartFailedException : public KafkaStreamException { class ConsumerStartFailedException : public KafkaStreamException {
public: public:
explicit ConsumerStartFailedException(const std::string &consumer_name, const std::string &error) explicit ConsumerStartFailedException(const std::string_view consumer_name, const std::string_view error)
: KafkaStreamException("Starting Kafka consumer {} failed: {}", consumer_name, error) {} : KafkaStreamException("Starting Kafka consumer {} failed: {}", consumer_name, error) {}
}; };
class TopicNotFoundException : public KafkaStreamException { class TopicNotFoundException : public KafkaStreamException {
public: public:
TopicNotFoundException(const std::string &consumer_name, const std::string &topic_name) TopicNotFoundException(const std::string_view consumer_name, const std::string_view topic_name)
: KafkaStreamException("Kafka consumer {} cannot find topic {}", consumer_name, topic_name) {} : KafkaStreamException("Kafka consumer {} cannot find topic {}", consumer_name, topic_name) {}
}; };
} // namespace integrations::kafka } // namespace integrations::kafka

View File

@ -54,9 +54,7 @@ cpp<#
size_t size = 0; size_t size = 0;
slk::Load(&size, reader); slk::Load(&size, reader);
self->${member}.resize(size); self->${member}.resize(size);
for (size_t i = 0; for (size_t i = 0; i < size; ++i) {
i < size;
++i) {
self->${member}[i] = query::LoadAstPointer<query::${type}>(storage, reader); self->${member}[i] = query::LoadAstPointer<query::${type}>(storage, reader);
} }
cpp<#)) cpp<#))
@ -75,9 +73,7 @@ cpp<#
#>cpp #>cpp
size_t size = 0; size_t size = 0;
slk::Load(&size, reader); slk::Load(&size, reader);
for (size_t i = 0; for (size_t i = 0; i < size; ++i) {
i < size;
++i) {
query::PropertyIx key; query::PropertyIx key;
slk::Load(&key, reader, storage); slk::Load(&key, reader, storage);
auto *value = query::LoadAstPointer<query::Expression>(storage, reader); auto *value = query::LoadAstPointer<query::Expression>(storage, reader);
@ -93,6 +89,34 @@ cpp<#
} }
cpp<#) cpp<#)
(defun slk-save-expression-map (member)
#>cpp
size_t size = self.${member}.size();
slk::Save(size, builder);
for (const auto &entry : self.${member}) {
query::SaveAstPointer(entry.first, builder);
query::SaveAstPointer(entry.second, builder);
}
cpp<#)
(defun slk-load-expression-map (member)
#>cpp
size_t size = 0;
slk::Load(&size, reader);
for (size_t i = 0; i < size; ++i) {
auto *key = query::LoadAstPointer<query::Expression>(storage, reader);
auto *value = query::LoadAstPointer<query::Expression>(storage, reader);
self->${member}.emplace(key, value);
}
cpp<#)
(defun clone-expression-map (source dest)
#>cpp
for (const auto &[key, value] : ${source}) {
${dest}[key->Clone(storage)] = value->Clone(storage);
}
cpp<#)
(defun slk-load-name-ix (name-type) (defun slk-load-name-ix (name-type)
(lambda (member) (lambda (member)
#>cpp #>cpp
@ -1819,9 +1843,7 @@ cpp<#
size_t size = 0; size_t size = 0;
slk::Load(&size, reader); slk::Load(&size, reader);
self->${member}.resize(size); self->${member}.resize(size);
for (size_t i = 0; for (size_t i = 0; i < size; ++i) {
i < size;
++i) {
slk::Load(&self->${member}[i], reader, storage); slk::Load(&self->${member}[i], reader, storage);
} }
cpp<#) cpp<#)
@ -2531,7 +2553,7 @@ cpp<#
:slk-save #'slk-save-ast-pointer :slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression")) :slk-load (slk-load-ast-pointer "Expression"))
(topic_names "std::variant<Expression*, std::vector<std::string>>" :initval "nullptr" (topic_names "std::variant<Expression*, std::vector<std::string>>" :initval "nullptr"
:clone #'clone-variant-topic-names :clone #'clone-variant-topic-names
:scope :public) :scope :public)
(consumer_group "std::string" :scope :public) (consumer_group "std::string" :scope :public)
@ -2541,7 +2563,17 @@ cpp<#
(service_url "Expression *" :initval "nullptr" :scope :public (service_url "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer :slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))) :slk-load (slk-load-ast-pointer "Expression"))
(configs "std::unordered_map<Expression *, Expression *>" :scope :public
:slk-save #'slk-save-expression-map
:slk-load #'slk-load-expression-map
:clone #'clone-expression-map)
(credentials "std::unordered_map<Expression *, Expression *>" :scope :public
:slk-save #'slk-save-expression-map
:slk-load #'slk-load-expression-map
:clone #'clone-expression-map))
(:public (:public
(lcp:define-enum action (lcp:define-enum action

View File

@ -554,7 +554,7 @@ std::string_view ToString(const CommonStreamConfigKey key) {
__VA_ARGS__ \ __VA_ARGS__ \
}; };
GENERATE_STREAM_CONFIG_KEY_ENUM(Kafka, TOPICS, CONSUMER_GROUP, BOOTSTRAP_SERVERS); GENERATE_STREAM_CONFIG_KEY_ENUM(Kafka, TOPICS, CONSUMER_GROUP, BOOTSTRAP_SERVERS, CONFIGS, CREDENTIALS);
std::string_view ToString(const KafkaConfigKey key) { std::string_view ToString(const KafkaConfigKey key) {
switch (key) { switch (key) {
@ -564,6 +564,10 @@ std::string_view ToString(const KafkaConfigKey key) {
return "CONSUMER_GROUP"; return "CONSUMER_GROUP";
case KafkaConfigKey::BOOTSTRAP_SERVERS: case KafkaConfigKey::BOOTSTRAP_SERVERS:
return "BOOTSTRAP_SERVERS"; return "BOOTSTRAP_SERVERS";
case KafkaConfigKey::CONFIGS:
return "CONFIGS";
case KafkaConfigKey::CREDENTIALS:
return "CREDENTIALS";
} }
} }
@ -574,6 +578,21 @@ void MapCommonStreamConfigs(auto &memory, StreamQuery &stream_query) {
} }
} // namespace } // namespace
antlrcpp::Any CypherMainVisitor::visitConfigKeyValuePair(MemgraphCypher::ConfigKeyValuePairContext *ctx) {
MG_ASSERT(ctx->literal().size() == 2);
return std::pair{ctx->literal(0)->accept(this).as<Expression *>(), ctx->literal(1)->accept(this).as<Expression *>()};
}
antlrcpp::Any CypherMainVisitor::visitConfigMap(MemgraphCypher::ConfigMapContext *ctx) {
std::unordered_map<Expression *, Expression *> map;
for (auto *key_value_pair : ctx->configKeyValuePair()) {
// If the queries are cached, then only the stripped query is parsed, so the actual keys cannot be determined
// here. That means duplicates cannot be checked.
map.insert(key_value_pair->accept(this).as<std::pair<Expression *, Expression *>>());
}
return map;
}
antlrcpp::Any CypherMainVisitor::visitKafkaCreateStream(MemgraphCypher::KafkaCreateStreamContext *ctx) { antlrcpp::Any CypherMainVisitor::visitKafkaCreateStream(MemgraphCypher::KafkaCreateStreamContext *ctx) {
auto *stream_query = storage_->Create<StreamQuery>(); auto *stream_query = storage_->Create<StreamQuery>();
stream_query->action_ = StreamQuery::Action::CREATE_STREAM; stream_query->action_ = StreamQuery::Action::CREATE_STREAM;
@ -587,6 +606,10 @@ antlrcpp::Any CypherMainVisitor::visitKafkaCreateStream(MemgraphCypher::KafkaCre
MapConfig<true, std::vector<std::string>, Expression *>(memory_, KafkaConfigKey::TOPICS, stream_query->topic_names_); MapConfig<true, std::vector<std::string>, Expression *>(memory_, KafkaConfigKey::TOPICS, stream_query->topic_names_);
MapConfig<false, std::string>(memory_, KafkaConfigKey::CONSUMER_GROUP, stream_query->consumer_group_); MapConfig<false, std::string>(memory_, KafkaConfigKey::CONSUMER_GROUP, stream_query->consumer_group_);
MapConfig<false, Expression *>(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS, stream_query->bootstrap_servers_); MapConfig<false, Expression *>(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS, stream_query->bootstrap_servers_);
MapConfig<false, std::unordered_map<Expression *, Expression *>>(memory_, KafkaConfigKey::CONFIGS,
stream_query->configs_);
MapConfig<false, std::unordered_map<Expression *, Expression *>>(memory_, KafkaConfigKey::CREDENTIALS,
stream_query->credentials_);
MapCommonStreamConfigs(memory_, *stream_query); MapCommonStreamConfigs(memory_, *stream_query);
@ -622,18 +645,33 @@ antlrcpp::Any CypherMainVisitor::visitKafkaCreateStreamConfig(MemgraphCypher::Ka
if (ctx->TOPICS()) { if (ctx->TOPICS()) {
ThrowIfExists(memory_, KafkaConfigKey::TOPICS); ThrowIfExists(memory_, KafkaConfigKey::TOPICS);
const auto topics_key = static_cast<uint8_t>(KafkaConfigKey::TOPICS); constexpr auto topics_key = static_cast<uint8_t>(KafkaConfigKey::TOPICS);
GetTopicNames(memory_[topics_key], ctx->topicNames(), *this); GetTopicNames(memory_[topics_key], ctx->topicNames(), *this);
return {}; return {};
} }
if (ctx->CONSUMER_GROUP()) { if (ctx->CONSUMER_GROUP()) {
ThrowIfExists(memory_, KafkaConfigKey::CONSUMER_GROUP); ThrowIfExists(memory_, KafkaConfigKey::CONSUMER_GROUP);
const auto consumer_group_key = static_cast<uint8_t>(KafkaConfigKey::CONSUMER_GROUP); constexpr auto consumer_group_key = static_cast<uint8_t>(KafkaConfigKey::CONSUMER_GROUP);
memory_[consumer_group_key] = JoinSymbolicNamesWithDotsAndMinus(*this, *ctx->consumerGroup); memory_[consumer_group_key] = JoinSymbolicNamesWithDotsAndMinus(*this, *ctx->consumerGroup);
return {}; return {};
} }
if (ctx->CONFIGS()) {
ThrowIfExists(memory_, KafkaConfigKey::CONFIGS);
constexpr auto configs_key = static_cast<uint8_t>(KafkaConfigKey::CONFIGS);
memory_.emplace(configs_key, ctx->configsMap->accept(this).as<std::unordered_map<Expression *, Expression *>>());
return {};
}
if (ctx->CREDENTIALS()) {
ThrowIfExists(memory_, KafkaConfigKey::CREDENTIALS);
constexpr auto credentials_key = static_cast<uint8_t>(KafkaConfigKey::CREDENTIALS);
memory_.emplace(credentials_key,
ctx->credentialsMap->accept(this).as<std::unordered_map<Expression *, Expression *>>());
return {};
}
MG_ASSERT(ctx->BOOTSTRAP_SERVERS()); MG_ASSERT(ctx->BOOTSTRAP_SERVERS());
ThrowIfExists(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS); ThrowIfExists(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS);
if (!ctx->bootstrapServers->StringLiteral()) { if (!ctx->bootstrapServers->StringLiteral()) {

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd. // Copyright 2022 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -269,6 +269,16 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
*/ */
antlrcpp::Any visitCreateStream(MemgraphCypher::CreateStreamContext *ctx) override; antlrcpp::Any visitCreateStream(MemgraphCypher::CreateStreamContext *ctx) override;
/**
* @return StreamQuery*
*/
antlrcpp::Any visitConfigKeyValuePair(MemgraphCypher::ConfigKeyValuePairContext *ctx) override;
/**
* @return StreamQuery*
*/
antlrcpp::Any visitConfigMap(MemgraphCypher::ConfigMapContext *ctx) override;
/** /**
* @return StreamQuery* * @return StreamQuery*
*/ */
@ -849,7 +859,9 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
ParsingContext context_; ParsingContext context_;
AstStorage *storage_; AstStorage *storage_;
std::unordered_map<uint8_t, std::variant<Expression *, std::string, std::vector<std::string>>> memory_; std::unordered_map<uint8_t, std::variant<Expression *, std::string, std::vector<std::string>,
std::unordered_map<Expression *, Expression *>>>
memory_;
// Set of identifiers from queries. // Set of identifiers from queries.
std::unordered_set<std::string> users_identifiers; std::unordered_set<std::string> users_identifiers;
// Identifiers that user didn't name. // Identifiers that user didn't name.

View File

@ -35,7 +35,9 @@ memgraphCypherKeyword : cypherKeyword
| COMMIT | COMMIT
| COMMITTED | COMMITTED
| CONFIG | CONFIG
| CONFIGS
| CONSUMER_GROUP | CONSUMER_GROUP
| CREDENTIALS
| CSV | CSV
| DATA | DATA
| DELIMITER | DELIMITER
@ -306,9 +308,15 @@ commonCreateStreamConfig : TRANSFORM transformationName=procedureName
createStream : kafkaCreateStream | pulsarCreateStream ; createStream : kafkaCreateStream | pulsarCreateStream ;
configKeyValuePair : literal ':' literal ;
configMap : '{' ( configKeyValuePair ( ',' configKeyValuePair )* )? '}' ;
kafkaCreateStreamConfig : TOPICS topicNames kafkaCreateStreamConfig : TOPICS topicNames
| CONSUMER_GROUP consumerGroup=symbolicNameWithDotsAndMinus | CONSUMER_GROUP consumerGroup=symbolicNameWithDotsAndMinus
| BOOTSTRAP_SERVERS bootstrapServers=literal | BOOTSTRAP_SERVERS bootstrapServers=literal
| CONFIGS configsMap=configMap
| CREDENTIALS credentialsMap=configMap
| commonCreateStreamConfig | commonCreateStreamConfig
; ;

View File

@ -40,7 +40,9 @@ CLEAR : C L E A R ;
COMMIT : C O M M I T ; COMMIT : C O M M I T ;
COMMITTED : C O M M I T T E D ; COMMITTED : C O M M I T T E D ;
CONFIG : C O N F I G ; CONFIG : C O N F I G ;
CONFIGS : C O N F I G S;
CONSUMER_GROUP : C O N S U M E R UNDERSCORE G R O U P ; CONSUMER_GROUP : C O N S U M E R UNDERSCORE G R O U P ;
CREDENTIALS : C R E D E N T I A L S ;
CSV : C S V ; CSV : C S V ;
DATA : D A T A ; DATA : D A T A ;
DELIMITER : D E L I M I T E R ; DELIMITER : D E L I M I T E R ;

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd. // Copyright 2022 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -564,10 +564,26 @@ Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, Exp
} }
auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator); auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator);
const auto get_config_map = [&evaluator](std::unordered_map<Expression *, Expression *> map,
std::string_view map_name) -> std::unordered_map<std::string, std::string> {
std::unordered_map<std::string, std::string> config_map;
for (const auto [key_expr, value_expr] : map) {
const auto key = key_expr->Accept(evaluator);
const auto value = value_expr->Accept(evaluator);
if (!key.IsString() || !value.IsString()) {
throw SemanticException("{} must contain only string keys and values!", map_name);
}
config_map.emplace(key.ValueString(), value.ValueString());
}
return config_map;
};
return [interpreter_context, stream_name = stream_query->stream_name_, return [interpreter_context, stream_name = stream_query->stream_name_,
topic_names = EvaluateTopicNames(evaluator, stream_query->topic_names_), topic_names = EvaluateTopicNames(evaluator, stream_query->topic_names_),
consumer_group = std::move(consumer_group), common_stream_info = std::move(common_stream_info), consumer_group = std::move(consumer_group), common_stream_info = std::move(common_stream_info),
bootstrap_servers = std::move(bootstrap), owner = StringPointerToOptional(username)]() mutable { bootstrap_servers = std::move(bootstrap), owner = StringPointerToOptional(username),
configs = get_config_map(stream_query->configs_, "Configs"),
credentials = get_config_map(stream_query->credentials_, "Credentials")]() mutable {
std::string bootstrap = bootstrap_servers std::string bootstrap = bootstrap_servers
? std::move(*bootstrap_servers) ? std::move(*bootstrap_servers)
: std::string{interpreter_context->config.default_kafka_bootstrap_servers}; : std::string{interpreter_context->config.default_kafka_bootstrap_servers};
@ -575,7 +591,9 @@ Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, Exp
{.common_info = std::move(common_stream_info), {.common_info = std::move(common_stream_info),
.topics = std::move(topic_names), .topics = std::move(topic_names),
.consumer_group = std::move(consumer_group), .consumer_group = std::move(consumer_group),
.bootstrap_servers = std::move(bootstrap)}, .bootstrap_servers = std::move(bootstrap),
.configs = std::move(configs),
.credentials = std::move(credentials)},
std::move(owner)); std::move(owner));
return std::vector<std::vector<TypedValue>>{}; return std::vector<std::vector<TypedValue>>{};

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd. // Copyright 2022 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -13,6 +13,8 @@
#include <json/json.hpp> #include <json/json.hpp>
#include "integrations/constants.hpp"
namespace query::stream { namespace query::stream {
KafkaStream::KafkaStream(std::string stream_name, StreamInfo stream_info, KafkaStream::KafkaStream(std::string stream_name, StreamInfo stream_info,
ConsumerFunction<integrations::kafka::Message> consumer_function) { ConsumerFunction<integrations::kafka::Message> consumer_function) {
@ -23,6 +25,8 @@ KafkaStream::KafkaStream(std::string stream_name, StreamInfo stream_info,
.bootstrap_servers = std::move(stream_info.bootstrap_servers), .bootstrap_servers = std::move(stream_info.bootstrap_servers),
.batch_interval = stream_info.common_info.batch_interval, .batch_interval = stream_info.common_info.batch_interval,
.batch_size = stream_info.common_info.batch_size, .batch_size = stream_info.common_info.batch_size,
.public_configs = std::move(stream_info.configs),
.private_configs = std::move(stream_info.credentials),
}; };
consumer_.emplace(std::move(consumer_info), std::move(consumer_function)); consumer_.emplace(std::move(consumer_info), std::move(consumer_function));
}; };
@ -34,7 +38,9 @@ KafkaStream::StreamInfo KafkaStream::Info(std::string transformation_name) const
.transformation_name = std::move(transformation_name)}, .transformation_name = std::move(transformation_name)},
.topics = info.topics, .topics = info.topics,
.consumer_group = info.consumer_group, .consumer_group = info.consumer_group,
.bootstrap_servers = info.bootstrap_servers}; .bootstrap_servers = info.bootstrap_servers,
.configs = info.public_configs,
.credentials = info.private_configs};
} }
void KafkaStream::Start() { consumer_->Start(); } void KafkaStream::Start() { consumer_->Start(); }
@ -54,6 +60,10 @@ namespace {
const std::string kTopicsKey{"topics"}; const std::string kTopicsKey{"topics"};
const std::string kConsumerGroupKey{"consumer_group"}; const std::string kConsumerGroupKey{"consumer_group"};
const std::string kBoostrapServers{"bootstrap_servers"}; const std::string kBoostrapServers{"bootstrap_servers"};
const std::string kConfigs{"configs"};
const std::string kCredentials{"credentials"};
const std::unordered_map<std::string, std::string> kDefaultConfigsMap;
} // namespace } // namespace
void to_json(nlohmann::json &data, KafkaStream::StreamInfo &&info) { void to_json(nlohmann::json &data, KafkaStream::StreamInfo &&info) {
@ -61,6 +71,8 @@ void to_json(nlohmann::json &data, KafkaStream::StreamInfo &&info) {
data[kTopicsKey] = std::move(info.topics); data[kTopicsKey] = std::move(info.topics);
data[kConsumerGroupKey] = info.consumer_group; data[kConsumerGroupKey] = info.consumer_group;
data[kBoostrapServers] = std::move(info.bootstrap_servers); data[kBoostrapServers] = std::move(info.bootstrap_servers);
data[kConfigs] = std::move(info.configs);
data[kCredentials] = std::move(info.credentials);
} }
void from_json(const nlohmann::json &data, KafkaStream::StreamInfo &info) { void from_json(const nlohmann::json &data, KafkaStream::StreamInfo &info) {
@ -68,6 +80,9 @@ void from_json(const nlohmann::json &data, KafkaStream::StreamInfo &info) {
data.at(kTopicsKey).get_to(info.topics); data.at(kTopicsKey).get_to(info.topics);
data.at(kConsumerGroupKey).get_to(info.consumer_group); data.at(kConsumerGroupKey).get_to(info.consumer_group);
data.at(kBoostrapServers).get_to(info.bootstrap_servers); data.at(kBoostrapServers).get_to(info.bootstrap_servers);
// These values might not be present in the persisted JSON object
info.configs = data.value(kConfigs, kDefaultConfigsMap);
info.credentials = data.value(kCredentials, kDefaultConfigsMap);
} }
PulsarStream::PulsarStream(std::string stream_name, StreamInfo stream_info, PulsarStream::PulsarStream(std::string stream_name, StreamInfo stream_info,

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd. // Copyright 2022 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -24,6 +24,8 @@ struct KafkaStream {
std::vector<std::string> topics; std::vector<std::string> topics;
std::string consumer_group; std::string consumer_group;
std::string bootstrap_servers; std::string bootstrap_servers;
std::unordered_map<std::string, std::string> configs;
std::unordered_map<std::string, std::string> credentials;
}; };
using Message = integrations::kafka::Message; using Message = integrations::kafka::Message;

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd. // Copyright 2022 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -18,6 +18,7 @@
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <json/json.hpp> #include <json/json.hpp>
#include "integrations/constants.hpp"
#include "mg_procedure.h" #include "mg_procedure.h"
#include "query/db_accessor.hpp" #include "query/db_accessor.hpp"
#include "query/discard_value_stream.hpp" #include "query/discard_value_stream.hpp"
@ -29,6 +30,7 @@
#include "query/stream/sources.hpp" #include "query/stream/sources.hpp"
#include "query/typed_value.hpp" #include "query/typed_value.hpp"
#include "utils/event_counter.hpp" #include "utils/event_counter.hpp"
#include "utils/logging.hpp"
#include "utils/memory.hpp" #include "utils/memory.hpp"
#include "utils/on_scope_exit.hpp" #include "utils/on_scope_exit.hpp"
#include "utils/pmr/string.hpp" #include "utils/pmr/string.hpp"
@ -208,10 +210,12 @@ void Streams::RegisterKafkaProcedures() {
constexpr std::string_view consumer_group_result_name = "consumer_group"; constexpr std::string_view consumer_group_result_name = "consumer_group";
constexpr std::string_view topics_result_name = "topics"; constexpr std::string_view topics_result_name = "topics";
constexpr std::string_view bootstrap_servers_result_name = "bootstrap_servers"; constexpr std::string_view bootstrap_servers_result_name = "bootstrap_servers";
constexpr std::string_view configs_result_name = "configs";
constexpr std::string_view credentials_result_name = "credentials";
auto get_stream_info = [this, proc_name, consumer_group_result_name, topics_result_name, auto get_stream_info = [this, proc_name, consumer_group_result_name, topics_result_name,
bootstrap_servers_result_name](mgp_list *args, mgp_graph * /*graph*/, mgp_result *result, bootstrap_servers_result_name, configs_result_name, credentials_result_name](
mgp_memory *memory) { mgp_list *args, mgp_graph * /*graph*/, mgp_result *result, mgp_memory *memory) {
auto *arg_stream_name = procedure::Call<mgp_value *>(mgp_list_at, args, 0); auto *arg_stream_name = procedure::Call<mgp_value *>(mgp_list_at, args, 0);
const auto *stream_name = procedure::Call<const char *>(mgp_value_get_string, arg_stream_name); const auto *stream_name = procedure::Call<const char *>(mgp_value_get_string, arg_stream_name);
auto lock_ptr = streams_.Lock(); auto lock_ptr = streams_.Lock();
@ -259,13 +263,12 @@ void Streams::RegisterKafkaProcedures() {
procedure::MgpUniquePtr<mgp_value> topics_value{nullptr, mgp_value_destroy}; procedure::MgpUniquePtr<mgp_value> topics_value{nullptr, mgp_value_destroy};
{ {
const auto success = procedure::TryOrSetError( const auto success = procedure::TryOrSetError(
[&] { [&] { return procedure::CreateMgpObject(topics_value, mgp_value_make_list, topic_names.get()); },
return procedure::CreateMgpObject(topics_value, mgp_value_make_list, topic_names.release());
},
result); result);
if (!success) { if (!success) {
return; return;
} }
static_cast<void>(topic_names.release());
} }
const auto bootstrap_servers_value = const auto bootstrap_servers_value =
@ -274,6 +277,57 @@ void Streams::RegisterKafkaProcedures() {
return; return;
} }
const auto convert_config_map =
[result, memory](const std::unordered_map<std::string, std::string> &configs_to_convert)
-> procedure::MgpUniquePtr<mgp_value> {
procedure::MgpUniquePtr<mgp_value> configs_value{nullptr, mgp_value_destroy};
procedure::MgpUniquePtr<mgp_map> configs{nullptr, mgp_map_destroy};
{
const auto success = procedure::TryOrSetError(
[&] { return procedure::CreateMgpObject(configs, mgp_map_make_empty, memory); }, result);
if (!success) {
return configs_value;
}
}
for (const auto &[key, value] : configs_to_convert) {
auto value_value = procedure::GetStringValueOrSetError(value.c_str(), memory, result);
if (!value_value) {
return configs_value;
}
configs->items.emplace(key, std::move(*value_value));
}
{
const auto success = procedure::TryOrSetError(
[&] { return procedure::CreateMgpObject(configs_value, mgp_value_make_map, configs.get()); },
result);
if (!success) {
return configs_value;
}
static_cast<void>(configs.release());
}
return configs_value;
};
const auto configs_value = convert_config_map(info.configs);
if (configs_value == nullptr) {
return;
}
using CredentialsType = decltype(KafkaStream::StreamInfo::credentials);
CredentialsType reducted_credentials;
std::transform(info.credentials.begin(), info.credentials.end(),
std::inserter(reducted_credentials, reducted_credentials.end()),
[](const auto &pair) -> CredentialsType::value_type {
return {pair.first, integrations::kReducted};
});
const auto credentials_value = convert_config_map(reducted_credentials);
if (credentials_value == nullptr) {
return;
}
if (!procedure::InsertResultOrSetError(result, record, consumer_group_result_name.data(), if (!procedure::InsertResultOrSetError(result, record, consumer_group_result_name.data(),
consumer_group_value.get())) { consumer_group_value.get())) {
return; return;
@ -287,6 +341,16 @@ void Streams::RegisterKafkaProcedures() {
bootstrap_servers_value.get())) { bootstrap_servers_value.get())) {
return; return;
} }
if (!procedure::InsertResultOrSetError(result, record, configs_result_name.data(),
configs_value.get())) {
return;
}
if (!procedure::InsertResultOrSetError(result, record, credentials_result_name.data(),
credentials_value.get())) {
return;
}
}, },
[proc_name](auto && /*other*/) { [proc_name](auto && /*other*/) {
throw QueryRuntimeException("'{}' can be only used for Kafka stream sources", proc_name); throw QueryRuntimeException("'{}' can be only used for Kafka stream sources", proc_name);
@ -305,6 +369,10 @@ void Streams::RegisterKafkaProcedures() {
MGP_ERROR_NO_ERROR); MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, bootstrap_servers_result_name.data(), MG_ASSERT(mgp_proc_add_result(&proc, bootstrap_servers_result_name.data(),
procedure::Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR); procedure::Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, configs_result_name.data(), procedure::Call<mgp_type *>(mgp_type_map)) ==
MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, credentials_result_name.data(), procedure::Call<mgp_type *>(mgp_type_map)) ==
MGP_ERROR_NO_ERROR);
procedure::gModuleRegistry.RegisterMgProcedure(proc_name, std::move(proc)); procedure::gModuleRegistry.RegisterMgProcedure(proc_name, std::move(proc));
} }

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd. // Copyright 2022 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -31,6 +31,7 @@
#include "utils/rw_lock.hpp" #include "utils/rw_lock.hpp"
#include "utils/synchronized.hpp" #include "utils/synchronized.hpp"
class StreamsTest;
namespace query { namespace query {
struct InterpreterContext; struct InterpreterContext;
@ -73,6 +74,8 @@ using TransformationResult = std::vector<std::vector<TypedValue>>;
/// ///
/// This class is responsible for all query supported actions to happen. /// This class is responsible for all query supported actions to happen.
class Streams final { class Streams final {
friend StreamsTest;
public: public:
/// Initializes the streams. /// Initializes the streams.
/// ///

View File

@ -421,10 +421,13 @@ def test_set_offset(kafka_producer, kafka_topics, connection, transformation):
assert comparison_check("Final Message", res[0]) assert comparison_check("Final Message", res[0])
common.execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n") common.execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")
def test_info_procedure(kafka_topics, connection): def test_info_procedure(kafka_topics, connection):
cursor = connection.cursor() cursor = connection.cursor()
stream_name = 'test_stream' stream_name = 'test_stream'
configs = {"sasl.username": "michael.scott"}
local = "localhost:9092" local = "localhost:9092"
credentials = {"sasl.password": "S3cr3tP4ssw0rd"}
consumer_group = "ConsumerGr" consumer_group = "ConsumerGr"
common.execute_and_fetch_all( common.execute_and_fetch_all(
cursor, cursor,
@ -432,13 +435,21 @@ def test_info_procedure(kafka_topics, connection):
f"TOPICS {','.join(kafka_topics)} " f"TOPICS {','.join(kafka_topics)} "
f"TRANSFORM pulsar_transform.simple " f"TRANSFORM pulsar_transform.simple "
f"CONSUMER_GROUP {consumer_group} " f"CONSUMER_GROUP {consumer_group} "
f"BOOTSTRAP_SERVERS '{local}'" f"BOOTSTRAP_SERVERS '{local}' "
f"CONFIGS {configs} "
f"CREDENTIALS {credentials}"
) )
stream_info = common.execute_and_fetch_all(cursor, f"CALL mg.kafka_stream_info('{stream_name}') YIELD *") stream_info = common.execute_and_fetch_all(
cursor, f"CALL mg.kafka_stream_info('{stream_name}') YIELD *")
expected_stream_info = [(local, consumer_group, kafka_topics)] reducted_credentials = {key: "<REDUCTED>" for
key in credentials.keys()}
expected_stream_info = [
(local, configs, consumer_group, reducted_credentials, kafka_topics)]
common.validate_info(stream_info, expected_stream_info) common.validate_info(stream_info, expected_stream_info)
if __name__ == "__main__": if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"])) sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd. // Copyright 2022 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -9,20 +9,10 @@
// by the Apache License, Version 2.0, included in the file // by the Apache License, Version 2.0, included in the file
// licenses/APL.txt. // licenses/APL.txt.
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
//
#include <algorithm> #include <algorithm>
#include <climits> #include <climits>
#include <limits> #include <limits>
#include <optional>
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include <variant> #include <variant>
@ -38,6 +28,7 @@
#include <json/json.hpp> #include <json/json.hpp>
////////////////////////////////////////////////////// //////////////////////////////////////////////////////
#include <antlr4-runtime.h> #include <antlr4-runtime.h>
#include <gmock/gmock-matchers.h>
#include <gmock/gmock.h> #include <gmock/gmock.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
@ -87,23 +78,37 @@ class Base {
} }
} }
TypedValue GetLiteral(Expression *expression, const bool use_parameter_lookup,
const std::optional<int> &token_position = std::nullopt) const {
if (use_parameter_lookup) {
auto *param_lookup = dynamic_cast<ParameterLookup *>(expression);
if (param_lookup == nullptr) {
ADD_FAILURE();
return {};
}
if (token_position) {
EXPECT_EQ(param_lookup->token_position_, *token_position);
}
return TypedValue(parameters_.AtTokenPosition(param_lookup->token_position_));
}
auto *literal = dynamic_cast<PrimitiveLiteral *>(expression);
if (literal == nullptr) {
ADD_FAILURE();
return {};
}
if (token_position) {
EXPECT_EQ(literal->token_position_, *token_position);
}
return TypedValue(literal->value_);
}
template <class TValue> template <class TValue>
void CheckLiteral(Expression *expression, const TValue &expected, void CheckLiteral(Expression *expression, const TValue &expected,
const std::optional<int> &token_position = std::nullopt) const { const std::optional<int> &token_position = std::nullopt) const {
TypedValue value;
// NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
TypedValue expected_tv(expected); TypedValue expected_tv(expected);
if (!expected_tv.IsNull() && context_.is_query_cached) { const auto use_parameter_lookup = !expected_tv.IsNull() && context_.is_query_cached;
auto *param_lookup = dynamic_cast<ParameterLookup *>(expression); TypedValue value = GetLiteral(expression, use_parameter_lookup, token_position);
ASSERT_TRUE(param_lookup);
if (token_position) EXPECT_EQ(param_lookup->token_position_, *token_position);
value = TypedValue(parameters_.AtTokenPosition(param_lookup->token_position_));
} else {
auto *literal = dynamic_cast<PrimitiveLiteral *>(expression);
ASSERT_TRUE(literal);
if (token_position) ASSERT_EQ(literal->token_position_, *token_position);
value = TypedValue(literal->value_);
}
EXPECT_TRUE(TypedValue::BoolEqual{}(value, expected_tv)); EXPECT_TRUE(TypedValue::BoolEqual{}(value, expected_tv));
} }
}; };
@ -3580,6 +3585,8 @@ void ValidateMostlyEmptyStreamQuery(Base &ast_generator, const std::string &quer
EXPECT_EQ(parsed_query->bootstrap_servers_, nullptr); EXPECT_EQ(parsed_query->bootstrap_servers_, nullptr);
EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_limit_, batch_limit)); EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_limit_, batch_limit));
EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->timeout_, timeout)); EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->timeout_, timeout));
EXPECT_TRUE(parsed_query->configs_.empty());
EXPECT_TRUE(parsed_query->credentials_.empty());
} }
TEST_P(CypherMainVisitorTest, DropStream) { TEST_P(CypherMainVisitorTest, DropStream) {
@ -3661,7 +3668,9 @@ void ValidateCreateKafkaStreamQuery(Base &ast_generator, const std::string &quer
const std::string_view transform_name, const std::string_view consumer_group, const std::string_view transform_name, const std::string_view consumer_group,
const std::optional<TypedValue> &batch_interval, const std::optional<TypedValue> &batch_interval,
const std::optional<TypedValue> &batch_size, const std::optional<TypedValue> &batch_size,
const std::string_view bootstrap_servers = "") { const std::string_view bootstrap_servers,
const std::unordered_map<std::string, std::string> &configs,
const std::unordered_map<std::string, std::string> &credentials) {
SCOPED_TRACE(query_string); SCOPED_TRACE(query_string);
StreamQuery *parsed_query{nullptr}; StreamQuery *parsed_query{nullptr};
ASSERT_NO_THROW(parsed_query = dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(query_string))) << query_string; ASSERT_NO_THROW(parsed_query = dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(query_string))) << query_string;
@ -3675,14 +3684,31 @@ void ValidateCreateKafkaStreamQuery(Base &ast_generator, const std::string &quer
EXPECT_EQ(parsed_query->batch_limit_, nullptr); EXPECT_EQ(parsed_query->batch_limit_, nullptr);
if (bootstrap_servers.empty()) { if (bootstrap_servers.empty()) {
EXPECT_EQ(parsed_query->bootstrap_servers_, nullptr); EXPECT_EQ(parsed_query->bootstrap_servers_, nullptr);
return; } else {
EXPECT_NE(parsed_query->bootstrap_servers_, nullptr);
} }
EXPECT_NE(parsed_query->bootstrap_servers_, nullptr);
const auto evaluate_config_map = [&ast_generator](const std::unordered_map<Expression *, Expression *> &config_map) {
std::unordered_map<std::string, std::string> evaluated_config_map;
const auto expr_to_str = [&ast_generator](Expression *expression) {
return std::string{ast_generator.GetLiteral(expression, ast_generator.context_.is_query_cached).ValueString()};
};
std::transform(config_map.begin(), config_map.end(),
std::inserter(evaluated_config_map, evaluated_config_map.end()),
[&expr_to_str](const auto expr_pair) {
return std::pair{expr_to_str(expr_pair.first), expr_to_str(expr_pair.second)};
});
return evaluated_config_map;
};
using testing::UnorderedElementsAreArray;
EXPECT_THAT(evaluate_config_map(parsed_query->configs_), UnorderedElementsAreArray(configs.begin(), configs.end()));
EXPECT_THAT(evaluate_config_map(parsed_query->credentials_),
UnorderedElementsAreArray(credentials.begin(), credentials.end()));
} }
TEST_P(CypherMainVisitorTest, CreateKafkaStream) { TEST_P(CypherMainVisitorTest, CreateKafkaStream) {
auto &ast_generator = *GetParam(); auto &ast_generator = *GetParam();
TestInvalidQuery("CREATE KAFKA STREAM", ast_generator); TestInvalidQuery("CREATE KAFKA STREAM", ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM invalid stream name TOPICS topic1 TRANSFORM transform", ast_generator); TestInvalidQuery("CREATE KAFKA STREAM invalid stream name TOPICS topic1 TRANSFORM transform", ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS invalid topic name TRANSFORM transform", ast_generator); TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS invalid topic name TRANSFORM transform", ast_generator);
@ -3709,6 +3735,13 @@ TEST_P(CypherMainVisitorTest, CreateKafkaStream) {
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS localhost:9092", TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS localhost:9092",
ast_generator); ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS", ast_generator); TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS", ast_generator);
// the keys must be string literals
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform CONFIGS { symbolicname : 'string' }",
ast_generator);
TestInvalidQuery(
"CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform CREDENTIALS { symbolicname : 'string' }",
ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform CREDENTIALS 2", ast_generator);
const std::vector<std::string> topic_names{"topic1_name.with_dot", "topic1_name.with_multiple.dots", const std::vector<std::string> topic_names{"topic1_name.with_dot", "topic1_name.with_multiple.dots",
"topic-name.with-multiple.dots-and-dashes"}; "topic-name.with-multiple.dots-and-dashes"};
@ -3728,34 +3761,37 @@ TEST_P(CypherMainVisitorTest, CreateKafkaStream) {
ValidateCreateKafkaStreamQuery( ValidateCreateKafkaStreamQuery(
ast_generator, ast_generator,
fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {}", kStreamName, topic_names_as_str, kTransformName), fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {}", kStreamName, topic_names_as_str, kTransformName),
kStreamName, topic_names, kTransformName, "", std::nullopt, std::nullopt); kStreamName, topic_names, kTransformName, "", std::nullopt, std::nullopt, {}, {}, {});
ValidateCreateKafkaStreamQuery(ast_generator, ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} ", fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} ",
kStreamName, topic_names_as_str, kTransformName, kConsumerGroup), kStreamName, topic_names_as_str, kTransformName, kConsumerGroup),
kStreamName, topic_names, kTransformName, kConsumerGroup, std::nullopt, kStreamName, topic_names, kTransformName, kConsumerGroup, std::nullopt, std::nullopt,
std::nullopt); {}, {}, {});
ValidateCreateKafkaStreamQuery(ast_generator, ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} TRANSFORM {} TOPICS {} BATCH_INTERVAL {}", fmt::format("CREATE KAFKA STREAM {} TRANSFORM {} TOPICS {} BATCH_INTERVAL {}",
kStreamName, kTransformName, topic_names_as_str, kBatchInterval), kStreamName, kTransformName, topic_names_as_str, kBatchInterval),
kStreamName, topic_names, kTransformName, "", batch_interval_value, std::nullopt); kStreamName, topic_names, kTransformName, "", batch_interval_value, std::nullopt, {},
{}, {});
ValidateCreateKafkaStreamQuery(ast_generator, ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} BATCH_SIZE {} TOPICS {} TRANSFORM {}", fmt::format("CREATE KAFKA STREAM {} BATCH_SIZE {} TOPICS {} TRANSFORM {}",
kStreamName, kBatchSize, topic_names_as_str, kTransformName), kStreamName, kBatchSize, topic_names_as_str, kTransformName),
kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value); kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value, {}, {},
{});
ValidateCreateKafkaStreamQuery(ast_generator, ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} TOPICS '{}' BATCH_SIZE {} TRANSFORM {}", fmt::format("CREATE KAFKA STREAM {} TOPICS '{}' BATCH_SIZE {} TRANSFORM {}",
kStreamName, topic_names_as_str, kBatchSize, kTransformName), kStreamName, topic_names_as_str, kBatchSize, kTransformName),
kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value); kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value, {}, {},
{});
ValidateCreateKafkaStreamQuery( ValidateCreateKafkaStreamQuery(
ast_generator, ast_generator,
fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {}", fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {}",
kStreamName, topic_names_as_str, kTransformName, kConsumerGroup, kBatchInterval, kBatchSize), kStreamName, topic_names_as_str, kTransformName, kConsumerGroup, kBatchInterval, kBatchSize),
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value); kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, {}, {}, {});
using namespace std::string_literals; using namespace std::string_literals;
const auto host1 = "localhost:9094"s; const auto host1 = "localhost:9094"s;
ValidateCreateKafkaStreamQuery( ValidateCreateKafkaStreamQuery(
@ -3763,14 +3799,16 @@ TEST_P(CypherMainVisitorTest, CreateKafkaStream) {
fmt::format("CREATE KAFKA STREAM {} TOPICS {} CONSUMER_GROUP {} BATCH_SIZE {} BATCH_INTERVAL {} TRANSFORM {} " fmt::format("CREATE KAFKA STREAM {} TOPICS {} CONSUMER_GROUP {} BATCH_SIZE {} BATCH_INTERVAL {} TRANSFORM {} "
"BOOTSTRAP_SERVERS '{}'", "BOOTSTRAP_SERVERS '{}'",
kStreamName, topic_names_as_str, kConsumerGroup, kBatchSize, kBatchInterval, kTransformName, host1), kStreamName, topic_names_as_str, kConsumerGroup, kBatchSize, kBatchInterval, kTransformName, host1),
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1); kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1, {},
{});
ValidateCreateKafkaStreamQuery( ValidateCreateKafkaStreamQuery(
ast_generator, ast_generator,
fmt::format("CREATE KAFKA STREAM {} CONSUMER_GROUP {} TOPICS {} BATCH_INTERVAL {} TRANSFORM {} BATCH_SIZE {} " fmt::format("CREATE KAFKA STREAM {} CONSUMER_GROUP {} TOPICS {} BATCH_INTERVAL {} TRANSFORM {} BATCH_SIZE {} "
"BOOTSTRAP_SERVERS '{}'", "BOOTSTRAP_SERVERS '{}'",
kStreamName, kConsumerGroup, topic_names_as_str, kBatchInterval, kTransformName, kBatchSize, host1), kStreamName, kConsumerGroup, topic_names_as_str, kBatchInterval, kTransformName, kBatchSize, host1),
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1); kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1, {},
{});
const auto host2 = "localhost:9094,localhost:1994,168.1.1.256:345"s; const auto host2 = "localhost:9094,localhost:1994,168.1.1.256:345"s;
ValidateCreateKafkaStreamQuery( ValidateCreateKafkaStreamQuery(
@ -3778,7 +3816,8 @@ TEST_P(CypherMainVisitorTest, CreateKafkaStream) {
fmt::format("CREATE KAFKA STREAM {} TOPICS {} BOOTSTRAP_SERVERS '{}' CONSUMER_GROUP {} TRANSFORM {} " fmt::format("CREATE KAFKA STREAM {} TOPICS {} BOOTSTRAP_SERVERS '{}' CONSUMER_GROUP {} TRANSFORM {} "
"BATCH_INTERVAL {} BATCH_SIZE {}", "BATCH_INTERVAL {} BATCH_SIZE {}",
kStreamName, topic_names_as_str, host2, kConsumerGroup, kTransformName, kBatchInterval, kBatchSize), kStreamName, topic_names_as_str, host2, kConsumerGroup, kTransformName, kBatchInterval, kBatchSize),
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host2); kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host2, {},
{});
}; };
for (const auto &topic_name : topic_names) { for (const auto &topic_name : topic_names) {
@ -3793,7 +3832,7 @@ TEST_P(CypherMainVisitorTest, CreateKafkaStream) {
fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {}", fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {}",
kStreamName, kTopicName, kTransformName, consumer_group), kStreamName, kTopicName, kTransformName, consumer_group),
kStreamName, {kTopicName}, kTransformName, consumer_group, std::nullopt, kStreamName, {kTopicName}, kTransformName, consumer_group, std::nullopt,
std::nullopt); std::nullopt, {}, {}, {});
}; };
using namespace std::literals; using namespace std::literals;
@ -3803,6 +3842,44 @@ TEST_P(CypherMainVisitorTest, CreateKafkaStream) {
for (const auto consumer_group : consumer_groups) { for (const auto consumer_group : consumer_groups) {
EXPECT_NO_FATAL_FAILURE(check_consumer_group(consumer_group)); EXPECT_NO_FATAL_FAILURE(check_consumer_group(consumer_group));
} }
auto check_config_map = [&](const std::unordered_map<std::string, std::string> &config_map) {
const std::string kTopicName{"topic1"};
const auto map_as_str = std::invoke([&config_map] {
std::stringstream buffer;
buffer << '{';
if (!config_map.empty()) {
auto it = config_map.begin();
buffer << fmt::format("'{}': '{}'", it->first, it->second);
for (; it != config_map.end(); ++it) {
buffer << fmt::format(", '{}': '{}'", it->first, it->second);
}
}
buffer << '}';
return std::move(buffer).str();
});
ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONFIGS {}", kStreamName,
kTopicName, kTransformName, map_as_str),
kStreamName, {kTopicName}, kTransformName, "", std::nullopt, std::nullopt, {},
config_map, {});
ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CREDENTIALS {}",
kStreamName, kTopicName, kTransformName, map_as_str),
kStreamName, {kTopicName}, kTransformName, "", std::nullopt, std::nullopt, {}, {},
config_map);
};
const std::array config_maps = {std::unordered_map<std::string, std::string>{},
std::unordered_map<std::string, std::string>{{"key", "value"}},
std::unordered_map<std::string, std::string>{{"key.with.dot", "value.with.doth"},
{"key with space", "value with space"}}};
for (const auto &map_to_test : config_maps) {
EXPECT_NO_FATAL_FAILURE(check_config_map(map_to_test));
}
} }
void ValidateCreatePulsarStreamQuery(Base &ast_generator, const std::string &query_string, void ValidateCreatePulsarStreamQuery(Base &ast_generator, const std::string &query_string,

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd. // Copyright 2022 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -59,6 +59,8 @@ struct ConsumerTest : public ::testing::Test {
.bootstrap_servers = cluster.Bootstraps(), .bootstrap_servers = cluster.Bootstraps(),
.batch_interval = kDefaultBatchInterval, .batch_interval = kDefaultBatchInterval,
.batch_size = kDefaultBatchSize, .batch_size = kDefaultBatchSize,
.public_configs = {},
.private_configs = {},
}; };
}; };

View File

@ -1,15 +1,4 @@
// Copyright 2021 Memgraph Ltd. // Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
// Copyright 2021 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd. // Copyright 2022 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -15,12 +15,15 @@
#include <utility> #include <utility>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include "integrations/constants.hpp"
#include "integrations/kafka/exceptions.hpp" #include "integrations/kafka/exceptions.hpp"
#include "kafka_mock.hpp" #include "kafka_mock.hpp"
#include "query/config.hpp" #include "query/config.hpp"
#include "query/interpreter.hpp" #include "query/interpreter.hpp"
#include "query/stream/streams.hpp" #include "query/stream/streams.hpp"
#include "storage/v2/storage.hpp" #include "storage/v2/storage.hpp"
#include "test_utils.hpp"
using Streams = query::stream::Streams; using Streams = query::stream::Streams;
using StreamInfo = query::stream::KafkaStream::StreamInfo; using StreamInfo = query::stream::KafkaStream::StreamInfo;
@ -78,6 +81,17 @@ class StreamsTest : public ::testing::Test {
EXPECT_EQ(check_data.is_running, status.is_running); EXPECT_EQ(check_data.is_running, status.is_running);
} }
void CheckConfigAndCredentials(const StreamCheckData &check_data) {
const auto locked_streams = streams_->streams_.ReadLock();
const auto &stream = locked_streams->at(check_data.name);
const auto *stream_data = std::get_if<query::stream::Streams::StreamData<query::stream::KafkaStream>>(&stream);
ASSERT_NE(stream_data, nullptr);
const auto stream_info =
stream_data->stream_source->ReadLock()->Info(check_data.info.common_info.transformation_name);
EXPECT_TRUE(
std::equal(check_data.info.configs.begin(), check_data.info.configs.end(), stream_info.configs.begin()));
}
void StartStream(StreamCheckData &check_data) { void StartStream(StreamCheckData &check_data) {
streams_->Start(check_data.name); streams_->Start(check_data.name);
check_data.is_running = true; check_data.is_running = true;
@ -183,6 +197,18 @@ TEST_F(StreamsTest, RestoreStreams) {
stream_info.common_info.batch_interval = std::chrono::milliseconds((i + 1) * 10); stream_info.common_info.batch_interval = std::chrono::milliseconds((i + 1) * 10);
stream_info.common_info.batch_size = 1000 + i; stream_info.common_info.batch_size = 1000 + i;
stream_check_data.owner = std::string{"owner"} + iteration_postfix; stream_check_data.owner = std::string{"owner"} + iteration_postfix;
// These are just random numbers to make the CONFIGS and CREDENTIALS map vary between consumers:
// - 0 means no config, no credential
// - 1 means only config
// - 2 means only credential
// - 3 means both configuration and credential is set
if (i == 1 || i == 3) {
stream_info.configs.emplace(std::string{"sasl.username"}, std::string{"username"} + iteration_postfix);
}
if (i == 2 || i == 3) {
stream_info.credentials.emplace(std::string{"sasl.password"}, std::string{"password"} + iteration_postfix);
}
} }
mock_cluster_.CreateTopic(stream_info.topics[0]); mock_cluster_.CreateTopic(stream_info.topics[0]);
@ -198,6 +224,7 @@ TEST_F(StreamsTest, RestoreStreams) {
EXPECT_EQ(stream_check_datas.size(), streams_->GetStreamInfo().size()); EXPECT_EQ(stream_check_datas.size(), streams_->GetStreamInfo().size());
for (const auto &check_data : stream_check_datas) { for (const auto &check_data : stream_check_datas) {
ASSERT_NO_FATAL_FAILURE(CheckStreamStatus(check_data)); ASSERT_NO_FATAL_FAILURE(CheckStreamStatus(check_data));
ASSERT_NO_FATAL_FAILURE(CheckConfigAndCredentials(check_data));
} }
}; };
@ -253,3 +280,32 @@ TEST_F(StreamsTest, CheckWithTimeout) {
EXPECT_LE(timeout, elapsed); EXPECT_LE(timeout, elapsed);
EXPECT_LE(elapsed, timeout * 1.2); EXPECT_LE(elapsed, timeout * 1.2);
} }
TEST_F(StreamsTest, CheckInvalidConfig) {
auto stream_info = CreateDefaultStreamInfo();
const auto stream_name = GetDefaultStreamName();
constexpr auto kInvalidConfigName = "doesnt.exist";
constexpr auto kConfigValue = "myprecious";
stream_info.configs.emplace(kInvalidConfigName, kConfigValue);
const auto checker = [](const std::string_view message) {
EXPECT_TRUE(message.find(kInvalidConfigName) != std::string::npos) << message;
EXPECT_TRUE(message.find(kConfigValue) != std::string::npos) << message;
};
EXPECT_THROW_WITH_MSG(streams_->Create<query::stream::KafkaStream>(stream_name, stream_info, std::nullopt),
integrations::kafka::SettingCustomConfigFailed, checker);
}
TEST_F(StreamsTest, CheckInvalidCredentials) {
auto stream_info = CreateDefaultStreamInfo();
const auto stream_name = GetDefaultStreamName();
constexpr auto kInvalidCredentialName = "doesnt.exist";
constexpr auto kCredentialValue = "myprecious";
stream_info.credentials.emplace(kInvalidCredentialName, kCredentialValue);
const auto checker = [](const std::string_view message) {
EXPECT_TRUE(message.find(kInvalidCredentialName) != std::string::npos) << message;
EXPECT_TRUE(message.find(integrations::kReducted) != std::string::npos) << message;
EXPECT_TRUE(message.find(kCredentialValue) == std::string::npos) << message;
};
EXPECT_THROW_WITH_MSG(streams_->Create<query::stream::KafkaStream>(stream_name, stream_info, std::nullopt),
integrations::kafka::SettingCustomConfigFailed, checker);
}

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd. // Copyright 2022 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -34,4 +34,16 @@ TResult ExpectNoError(const char *file, int line, TFunc func, TArgs &&...args) {
} }
} // namespace test_utils } // namespace test_utils
#define EXPECT_MGP_NO_ERROR(type, ...) test_utils::ExpectNoError<type>(__FILE__, __LINE__, __VA_ARGS__) #define EXPECT_MGP_NO_ERROR(type, ...) test_utils::ExpectNoError<type>(__FILE__, __LINE__, __VA_ARGS__)
#define EXPECT_THROW_WITH_MSG(statement, expected_exception, msg_checker) \
EXPECT_THROW( \
{ \
try { \
statement; \
} catch (const expected_exception &e) { \
EXPECT_NO_FATAL_FAILURE(msg_checker(e.what())); \
throw; \
} \
}, \
expected_exception);