Add CONFIGS and CREDENTIALS to CREATE KAFKA STREAM

This commit is contained in:
János Benjamin Antal 2022-01-19 15:24:10 +01:00
parent e1f31d3d02
commit b9dd12c88c
10 changed files with 108 additions and 9 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -30,6 +30,8 @@
namespace integrations::kafka {
namespace {
const std::string kReducted = "<REDUCTED>";
utils::BasicResult<std::string, std::vector<Message>> GetBatch(RdKafka::KafkaConsumer &consumer,
const ConsumerInfo &info,
std::atomic<bool> &is_running) {
@ -126,6 +128,18 @@ Consumer::Consumer(ConsumerInfo info, ConsumerFunction consumer_function)
std::string error;
for (const auto &[key, value] : info_.public_configs) {
if (conf->set(key, value, error) != RdKafka::Conf::CONF_OK) {
throw SettingCustomConfigFailed(info_.consumer_name, error, key, value);
}
}
for (const auto &[key, value] : info_.private_configs) {
if (conf->set(key, value, error) != RdKafka::Conf::CONF_OK) {
throw SettingCustomConfigFailed(info_.consumer_name, error, key, kReducted);
}
}
if (conf->set("event_cb", this, error) != RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.consumer_name, error);
}

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -86,6 +86,8 @@ struct ConsumerInfo {
std::string bootstrap_servers;
std::chrono::milliseconds batch_interval;
int64_t batch_size;
std::unordered_map<std::string, std::string> public_configs;
std::unordered_map<std::string, std::string> private_configs;
};
/// Memgraphs Kafka consumer wrapper.

View File

@ -26,6 +26,14 @@ class ConsumerFailedToInitializeException : public KafkaStreamException {
: KafkaStreamException("Failed to initialize Kafka consumer {} : {}", consumer_name, error) {}
};
class SettingCustomConfigFailed : public KafkaStreamException {
public:
SettingCustomConfigFailed(const std::string &consumer_name, const std::string &error, const std::string &key,
const std::string &value)
: KafkaStreamException(R"(Failed to set custom config ("{}": "{}") for Kafka consumer {} : {})", key, value,
consumer_name, error) {}
};
class ConsumerRunningException : public KafkaStreamException {
public:
explicit ConsumerRunningException(const std::string &consumer_name)

View File

@ -2531,7 +2531,7 @@ cpp<#
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(topic_names "std::variant<Expression*, std::vector<std::string>>" :initval "nullptr"
(topic_names "std::variant<Expression*, std::vector<std::string>>" :initval "nullptr"
:clone #'clone-variant-topic-names
:scope :public)
(consumer_group "std::string" :scope :public)
@ -2540,6 +2540,14 @@ cpp<#
:slk-load (slk-load-ast-pointer "Expression"))
(service_url "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(configs "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(credentials "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression")))

View File

@ -564,7 +564,7 @@ std::string_view ToString(const CommonStreamConfigKey key) {
__VA_ARGS__ \
};
GENERATE_STREAM_CONFIG_KEY_ENUM(Kafka, TOPICS, CONSUMER_GROUP, BOOTSTRAP_SERVERS);
GENERATE_STREAM_CONFIG_KEY_ENUM(Kafka, TOPICS, CONSUMER_GROUP, BOOTSTRAP_SERVERS, CONFIGS, CREDENTIALS);
std::string_view ToString(const KafkaConfigKey key) {
switch (key) {
@ -574,6 +574,10 @@ std::string_view ToString(const KafkaConfigKey key) {
return "CONSUMER_GROUP";
case KafkaConfigKey::BOOTSTRAP_SERVERS:
return "BOOTSTRAP_SERVERS";
case KafkaConfigKey::CONFIGS:
return "CONFIGS";
case KafkaConfigKey::CREDENTIALS:
return "CREDENTIALS";
}
}
@ -597,6 +601,8 @@ antlrcpp::Any CypherMainVisitor::visitKafkaCreateStream(MemgraphCypher::KafkaCre
MapConfig<true, std::vector<std::string>, Expression *>(memory_, KafkaConfigKey::TOPICS, stream_query->topic_names_);
MapConfig<false, std::string>(memory_, KafkaConfigKey::CONSUMER_GROUP, stream_query->consumer_group_);
MapConfig<false, Expression *>(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS, stream_query->bootstrap_servers_);
MapConfig<false, Expression *>(memory_, KafkaConfigKey::CONFIGS, stream_query->configs_);
MapConfig<false, Expression *>(memory_, KafkaConfigKey::CREDENTIALS, stream_query->credentials_);
MapCommonStreamConfigs(memory_, *stream_query);
@ -632,18 +638,38 @@ antlrcpp::Any CypherMainVisitor::visitKafkaCreateStreamConfig(MemgraphCypher::Ka
if (ctx->TOPICS()) {
ThrowIfExists(memory_, KafkaConfigKey::TOPICS);
const auto topics_key = static_cast<uint8_t>(KafkaConfigKey::TOPICS);
constexpr auto topics_key = static_cast<uint8_t>(KafkaConfigKey::TOPICS);
GetTopicNames(memory_[topics_key], ctx->topicNames(), *this);
return {};
}
if (ctx->CONSUMER_GROUP()) {
ThrowIfExists(memory_, KafkaConfigKey::CONSUMER_GROUP);
const auto consumer_group_key = static_cast<uint8_t>(KafkaConfigKey::CONSUMER_GROUP);
constexpr auto consumer_group_key = static_cast<uint8_t>(KafkaConfigKey::CONSUMER_GROUP);
memory_[consumer_group_key] = JoinSymbolicNamesWithDotsAndMinus(*this, *ctx->consumerGroup);
return {};
}
if (ctx->CONFIGS()) {
if (!ctx->configsMap->mapLiteral()) {
throw SemanticException("Configs must be a map literal!");
}
ThrowIfExists(memory_, KafkaConfigKey::CONFIGS);
constexpr auto configs_key = static_cast<uint8_t>(KafkaConfigKey::CONFIGS);
memory_.emplace(configs_key, ctx->configsMap->accept(this).as<Expression *>());
return {};
}
if (ctx->CREDENTIALS()) {
if (!ctx->credentialsMap->mapLiteral()) {
throw SemanticException("Credentials must be a map literal!");
}
ThrowIfExists(memory_, KafkaConfigKey::CREDENTIALS);
constexpr auto credentials_key = static_cast<uint8_t>(KafkaConfigKey::CREDENTIALS);
memory_.emplace(credentials_key, ctx->credentialsMap->accept(this).as<Expression *>());
return {};
}
MG_ASSERT(ctx->BOOTSTRAP_SERVERS());
ThrowIfExists(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS);
if (!ctx->bootstrapServers->StringLiteral()) {

View File

@ -35,7 +35,9 @@ memgraphCypherKeyword : cypherKeyword
| COMMIT
| COMMITTED
| CONFIG
| CONFIGS
| CONSUMER_GROUP
| CREDENTIALS
| CSV
| DATA
| DELIMITER
@ -309,6 +311,8 @@ createStream : kafkaCreateStream | pulsarCreateStream ;
kafkaCreateStreamConfig : TOPICS topicNames
| CONSUMER_GROUP consumerGroup=symbolicNameWithDotsAndMinus
| BOOTSTRAP_SERVERS bootstrapServers=literal
| CONFIGS configsMap=literal
| CREDENTIALS credentialsMap=literal
| commonCreateStreamConfig
;

View File

@ -40,7 +40,9 @@ CLEAR : C L E A R ;
COMMIT : C O M M I T ;
COMMITTED : C O M M I T T E D ;
CONFIG : C O N F I G ;
CONFIGS : C O N F I G S;
CONSUMER_GROUP : C O N S U M E R UNDERSCORE G R O U P ;
CREDENTIALS : C R E D E N T I A L S ;
CSV : C S V ;
DATA : D A T A ;
DELIMITER : D E L I M I T E R ;

View File

@ -564,10 +564,29 @@ Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, Exp
}
auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator);
const auto get_config_map = [&evaluator](Expression *config_literal,
std::string_view map_name) -> std::unordered_map<std::string, std::string> {
if (config_literal == nullptr) {
return {};
}
const auto evaluated_config = config_literal->Accept(evaluator);
MG_ASSERT(evaluated_config.IsMap());
std::unordered_map<std::string, std::string> config_map;
for (const auto &[key, value] : evaluated_config.ValueMap()) {
if (!value.IsString()) {
throw SemanticException("{} must contain only string values!", map_name);
}
config_map.emplace(key, value.ValueString());
}
return config_map;
};
return [interpreter_context, stream_name = stream_query->stream_name_,
topic_names = EvaluateTopicNames(evaluator, stream_query->topic_names_),
consumer_group = std::move(consumer_group), common_stream_info = std::move(common_stream_info),
bootstrap_servers = std::move(bootstrap), owner = StringPointerToOptional(username)]() mutable {
bootstrap_servers = std::move(bootstrap), owner = StringPointerToOptional(username),
configs = get_config_map(stream_query->configs_, "Configs"),
credentials = get_config_map(stream_query->credentials_, "Credentials")]() mutable {
std::string bootstrap = bootstrap_servers
? std::move(*bootstrap_servers)
: std::string{interpreter_context->config.default_kafka_bootstrap_servers};
@ -575,7 +594,9 @@ Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, Exp
{.common_info = std::move(common_stream_info),
.topics = std::move(topic_names),
.consumer_group = std::move(consumer_group),
.bootstrap_servers = std::move(bootstrap)},
.bootstrap_servers = std::move(bootstrap),
.configs = std::move(configs),
.credentials = std::move(credentials)},
std::move(owner));
return std::vector<std::vector<TypedValue>>{};

View File

@ -23,6 +23,8 @@ KafkaStream::KafkaStream(std::string stream_name, StreamInfo stream_info,
.bootstrap_servers = std::move(stream_info.bootstrap_servers),
.batch_interval = stream_info.common_info.batch_interval,
.batch_size = stream_info.common_info.batch_size,
.public_configs = stream_info.configs,
.private_configs = stream_info.credentials,
};
consumer_.emplace(std::move(consumer_info), std::move(consumer_function));
};
@ -34,7 +36,8 @@ KafkaStream::StreamInfo KafkaStream::Info(std::string transformation_name) const
.transformation_name = std::move(transformation_name)},
.topics = info.topics,
.consumer_group = info.consumer_group,
.bootstrap_servers = info.bootstrap_servers};
.bootstrap_servers = info.bootstrap_servers,
.configs = info.public_configs};
}
void KafkaStream::Start() { consumer_->Start(); }
@ -54,6 +57,10 @@ namespace {
const std::string kTopicsKey{"topics"};
const std::string kConsumerGroupKey{"consumer_group"};
const std::string kBoostrapServers{"bootstrap_servers"};
const std::string kConfigs{"configs"};
const std::string kCredentials{"credentials"};
const std::unordered_map<std::string, std::string> kDefaultConfigsMap;
} // namespace
void to_json(nlohmann::json &data, KafkaStream::StreamInfo &&info) {
@ -61,6 +68,8 @@ void to_json(nlohmann::json &data, KafkaStream::StreamInfo &&info) {
data[kTopicsKey] = std::move(info.topics);
data[kConsumerGroupKey] = info.consumer_group;
data[kBoostrapServers] = std::move(info.bootstrap_servers);
data[kConfigs] = std::move(info.configs);
data[kCredentials] = std::move(info.credentials);
}
void from_json(const nlohmann::json &data, KafkaStream::StreamInfo &info) {
@ -68,6 +77,9 @@ void from_json(const nlohmann::json &data, KafkaStream::StreamInfo &info) {
data.at(kTopicsKey).get_to(info.topics);
data.at(kConsumerGroupKey).get_to(info.consumer_group);
data.at(kBoostrapServers).get_to(info.bootstrap_servers);
// These values might not be present in the persisted JSON object
info.configs = data.value(kConfigs, kDefaultConfigsMap);
info.credentials = data.value(kCredentials, kDefaultConfigsMap);
}
PulsarStream::PulsarStream(std::string stream_name, StreamInfo stream_info,

View File

@ -24,6 +24,8 @@ struct KafkaStream {
std::vector<std::string> topics;
std::string consumer_group;
std::string bootstrap_servers;
std::unordered_map<std::string, std::string> configs;
std::unordered_map<std::string, std::string> credentials;
};
using Message = integrations::kafka::Message;