Compare commits

...

23 Commits

Author SHA1 Message Date
János Benjamin Antal
f57843088b Add flag for after commit trigger pool size 2022-02-04 14:52:06 +01:00
János Benjamin Antal
6a2e566492 Use asynchronous committing 2022-02-04 14:35:31 +01:00
János Benjamin Antal
fbf52a4ce2 Merge remote-tracking branch 'origin/T0805-MG-fix-cpp-transformation-result-add' into MG-for-mrma 2022-01-31 16:20:51 +01:00
János Benjamin Antal
59cc1d5f2b Fix use after move errors in case of transaction retry 2022-01-31 16:10:55 +01:00
János Benjamin Antal
6c2763b492 Abort the transaction in case of serialization error 2022-01-31 16:09:49 +01:00
josipmrden
80905b83c1 Added fix for loading C++ transformations, reversed logic whether result has been added 2022-01-26 14:36:51 +01:00
Kostas
ba0262559b add microseconds precision to now TT functions 2022-01-24 22:13:59 +02:00
János Benjamin Antal
cc281355d6 Constify local variables 2022-01-24 13:51:13 +01:00
János Benjamin Antal
1a73c8178c Fix comment's grammar 2022-01-24 13:50:59 +01:00
János Benjamin Antal
8dd1a4cc97 Formatting changes 2022-01-24 13:50:41 +01:00
János Benjamin Antal
397752311f Use string_view in kafka exceptions 2022-01-24 13:47:48 +01:00
János Benjamin Antal
4ae1202a68 Fix cleanup in case of failure 2022-01-24 10:30:30 +01:00
János Benjamin Antal
9b7801379c Use move when it is possible 2022-01-24 10:29:48 +01:00
János Benjamin Antal
339dec2a3b Do not use DMG_ASSERT 2022-01-24 10:29:28 +01:00
János Benjamin Antal
214db8da48
Merge branch 'master' into T0791-MG-configs-for-kafka-streams 2022-01-24 08:29:22 +01:00
János Benjamin Antal
8a34bf95d2 Add empty line to the end of the file 2022-01-21 16:24:23 +01:00
János Benjamin Antal
635fe8b42f Update e2e tests 2022-01-21 16:19:17 +01:00
János Benjamin Antal
5c576b95af Disable altera-unroll-loops clang-tidy check 2022-01-21 16:18:40 +01:00
János Benjamin Antal
8ea6b48879 Add unit tests 2022-01-21 15:51:51 +01:00
János Benjamin Antal
1e3de8e76a Add configs and credentials to mg.kafka_stream_info 2022-01-20 09:05:02 +01:00
János Benjamin Antal
0bc73da66d Use string literals in config maps 2022-01-19 17:42:10 +01:00
János Benjamin Antal
382d96496a Add CONFIGS and CREDENTIALS 2022-01-19 15:24:39 +01:00
János Benjamin Antal
b9dd12c88c Add CONFIGS and CREDENTIALS to CREATE KAFKA STREAM 2022-01-19 15:24:10 +01:00
26 changed files with 543 additions and 152 deletions

View File

@ -2,6 +2,7 @@
Checks: '*,
-abseil-string-find-str-contains,
-altera-struct-pack-align,
-altera-unroll-loops,
-android-*,
-cert-err58-cpp,
-cppcoreguidelines-avoid-c-arrays,

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -10,11 +10,14 @@
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <string>
namespace integrations {
constexpr int64_t kDefaultCheckBatchLimit{1};
constexpr std::chrono::milliseconds kDefaultCheckTimeout{30000};
constexpr std::chrono::milliseconds kMinimumInterval{1};
constexpr int64_t kMinimumSize{1};
const std::string kReducted{"<REDUCTED>"};
} // namespace integrations

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -109,7 +109,10 @@ int64_t Message::Offset() const {
}
Consumer::Consumer(ConsumerInfo info, ConsumerFunction consumer_function)
: info_{std::move(info)}, consumer_function_(std::move(consumer_function)), cb_(info_.consumer_name) {
: info_{std::move(info)},
consumer_function_(std::move(consumer_function)),
cb_(info_.consumer_name),
offset_cb_(info_.consumer_name) {
MG_ASSERT(consumer_function_, "Empty consumer function for Kafka consumer");
// NOLINTNEXTLINE (modernize-use-nullptr)
if (info_.batch_interval < kMinimumInterval) {
@ -126,6 +129,18 @@ Consumer::Consumer(ConsumerInfo info, ConsumerFunction consumer_function)
std::string error;
for (const auto &[key, value] : info_.public_configs) {
if (conf->set(key, value, error) != RdKafka::Conf::CONF_OK) {
throw SettingCustomConfigFailed(info_.consumer_name, error, key, value);
}
}
for (const auto &[key, value] : info_.private_configs) {
if (conf->set(key, value, error) != RdKafka::Conf::CONF_OK) {
throw SettingCustomConfigFailed(info_.consumer_name, error, key, kReducted);
}
}
if (conf->set("event_cb", this, error) != RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.consumer_name, error);
}
@ -134,6 +149,10 @@ Consumer::Consumer(ConsumerInfo info, ConsumerFunction consumer_function)
throw ConsumerFailedToInitializeException(info_.consumer_name, error);
}
if (conf->set("offset_commit_cb", &offset_cb_, error) != RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.consumer_name, error);
}
if (conf->set("enable.partition.eof", "false", error) != RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.consumer_name, error);
}
@ -370,10 +389,12 @@ void Consumer::StartConsuming() {
throw ConsumerCheckFailedException(
info_.consumer_name, fmt::format("Couldn't get offsets from librdkafka {}", RdKafka::err2str(err)));
}
if (const auto err = consumer_->commitSync(partitions); err != RdKafka::ERR_NO_ERROR) {
spdlog::trace("Got offset positions for {}.", info_.consumer_name);
if (const auto err = consumer_->commitAsync(partitions); err != RdKafka::ERR_NO_ERROR) {
spdlog::warn("Committing offset of consumer {} failed: {}", info_.consumer_name, RdKafka::err2str(err));
break;
}
spdlog::trace("Requested committing offsets asynchronously for {}.", info_.consumer_name);
} catch (const std::exception &e) {
spdlog::warn("Error happened in consumer {} while processing a batch: {}!", info_.consumer_name, e.what());
break;
@ -436,4 +457,16 @@ void Consumer::ConsumerRebalanceCb::rebalance_cb(RdKafka::KafkaConsumer *consume
}
}
void Consumer::ConsumerRebalanceCb::set_offset(int64_t offset) { offset_ = offset; }
Consumer::OffsetCommitCb::OffsetCommitCb(std::string consumer_name) : consumer_name_{consumer_name} {};
void Consumer::OffsetCommitCb::offset_commit_cb(RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition *> & /*offsets*/) {
if (err != RdKafka::ErrorCode::ERR_NO_ERROR) {
spdlog::error("Committing offset failed for {} with error \"{}\"", consumer_name_, RdKafka::err2str(err));
} else {
spdlog::trace("Committing offset succeeded for {}", consumer_name_);
}
}
} // namespace integrations::kafka

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -86,6 +86,8 @@ struct ConsumerInfo {
std::string bootstrap_servers;
std::chrono::milliseconds batch_interval;
int64_t batch_size;
std::unordered_map<std::string, std::string> public_configs;
std::unordered_map<std::string, std::string> private_configs;
};
/// Memgraphs Kafka consumer wrapper.
@ -172,6 +174,16 @@ class Consumer final : public RdKafka::EventCb {
std::string consumer_name_;
};
class OffsetCommitCb : public RdKafka::OffsetCommitCb {
public:
explicit OffsetCommitCb(std::string consumer_name);
void offset_commit_cb(RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition *> &offsets) final;
private:
std::string consumer_name_;
};
ConsumerInfo info_;
ConsumerFunction consumer_function_;
mutable std::atomic<bool> is_running_{false};
@ -180,5 +192,6 @@ class Consumer final : public RdKafka::EventCb {
std::unique_ptr<RdKafka::KafkaConsumer, std::function<void(RdKafka::KafkaConsumer *)>> consumer_;
std::thread thread_;
ConsumerRebalanceCb cb_;
OffsetCommitCb offset_cb_;
};
} // namespace integrations::kafka

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -11,7 +11,7 @@
#pragma once
#include <string>
#include <string_view>
#include "utils/exceptions.hpp"
@ -22,37 +22,46 @@ class KafkaStreamException : public utils::BasicException {
class ConsumerFailedToInitializeException : public KafkaStreamException {
public:
ConsumerFailedToInitializeException(const std::string &consumer_name, const std::string &error)
ConsumerFailedToInitializeException(std::string_view consumer_name, std::string_view error)
: KafkaStreamException("Failed to initialize Kafka consumer {} : {}", consumer_name, error) {}
};
class SettingCustomConfigFailed : public ConsumerFailedToInitializeException {
public:
SettingCustomConfigFailed(std::string_view consumer_name, std::string_view error, std::string_view key,
std::string_view value)
: ConsumerFailedToInitializeException(
consumer_name,
fmt::format(R"(failed to set custom config ("{}": "{}"), because of error {})", key, value, error)) {}
};
class ConsumerRunningException : public KafkaStreamException {
public:
explicit ConsumerRunningException(const std::string &consumer_name)
explicit ConsumerRunningException(std::string_view consumer_name)
: KafkaStreamException("Kafka consumer {} is already running", consumer_name) {}
};
class ConsumerStoppedException : public KafkaStreamException {
public:
explicit ConsumerStoppedException(const std::string &consumer_name)
explicit ConsumerStoppedException(std::string_view consumer_name)
: KafkaStreamException("Kafka consumer {} is already stopped", consumer_name) {}
};
class ConsumerCheckFailedException : public KafkaStreamException {
public:
explicit ConsumerCheckFailedException(const std::string &consumer_name, const std::string &error)
explicit ConsumerCheckFailedException(std::string_view consumer_name, std::string_view error)
: KafkaStreamException("Kafka consumer {} check failed: {}", consumer_name, error) {}
};
class ConsumerStartFailedException : public KafkaStreamException {
public:
explicit ConsumerStartFailedException(const std::string &consumer_name, const std::string &error)
explicit ConsumerStartFailedException(std::string_view consumer_name, std::string_view error)
: KafkaStreamException("Starting Kafka consumer {} failed: {}", consumer_name, error) {}
};
class TopicNotFoundException : public KafkaStreamException {
public:
TopicNotFoundException(const std::string &consumer_name, const std::string &topic_name)
TopicNotFoundException(std::string_view consumer_name, std::string_view topic_name)
: KafkaStreamException("Kafka consumer {} cannot find topic {}", consumer_name, topic_name) {}
};
} // namespace integrations::kafka

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -219,10 +219,18 @@ DEFINE_double(query_execution_timeout_sec, 600,
"Maximum allowed query execution time. Queries exceeding this "
"limit will be aborted. Value of 0 means no limit.");
DEFINE_VALIDATED_uint64(after_commit_trigger_pool_size, 1, "Number of threads to process after commit triggers.", {
if (value == 0) {
std::cout << "At least one thread is required for processing after commit triggers!" << std::endl;
return false;
}
return true;
});
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint64(
memory_limit, 0,
"Total memory limit in MiB. Set to 0 to use the default values which are 100\% of the phyisical memory if the swap "
DEFINE_uint64(memory_limit, 0,
"Total memory limit in MiB. Set to 0 to use the default values which are 100\% of the phyisical "
"memory if the swap "
"is enabled and 90\% of the physical memory otherwise.");
namespace {
@ -1135,6 +1143,7 @@ int main(int argc, char **argv) {
&db,
{.query = {.allow_load_csv = FLAGS_allow_load_csv},
.execution_timeout_sec = FLAGS_query_execution_timeout_sec,
.after_commit_trigger_pool_size = FLAGS_after_commit_trigger_pool_size,
.default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers,
.default_pulsar_service_url = FLAGS_pulsar_service_url,
.stream_transaction_conflict_retries = FLAGS_stream_transaction_conflict_retries,

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -22,6 +22,8 @@ struct InterpreterConfig {
// The default execution timeout is 10 minutes.
double execution_timeout_sec{600.0};
size_t after_commit_trigger_pool_size;
std::string default_kafka_bootstrap_servers;
std::string default_pulsar_service_url;
uint32_t stream_transaction_conflict_retries;

View File

@ -54,9 +54,7 @@ cpp<#
size_t size = 0;
slk::Load(&size, reader);
self->${member}.resize(size);
for (size_t i = 0;
i < size;
++i) {
for (size_t i = 0; i < size; ++i) {
self->${member}[i] = query::LoadAstPointer<query::${type}>(storage, reader);
}
cpp<#))
@ -75,9 +73,7 @@ cpp<#
#>cpp
size_t size = 0;
slk::Load(&size, reader);
for (size_t i = 0;
i < size;
++i) {
for (size_t i = 0; i < size; ++i) {
query::PropertyIx key;
slk::Load(&key, reader, storage);
auto *value = query::LoadAstPointer<query::Expression>(storage, reader);
@ -93,6 +89,34 @@ cpp<#
}
cpp<#)
(defun slk-save-expression-map (member)
#>cpp
size_t size = self.${member}.size();
slk::Save(size, builder);
for (const auto &entry : self.${member}) {
query::SaveAstPointer(entry.first, builder);
query::SaveAstPointer(entry.second, builder);
}
cpp<#)
(defun slk-load-expression-map (member)
#>cpp
size_t size = 0;
slk::Load(&size, reader);
for (size_t i = 0; i < size; ++i) {
auto *key = query::LoadAstPointer<query::Expression>(storage, reader);
auto *value = query::LoadAstPointer<query::Expression>(storage, reader);
self->${member}.emplace(key, value);
}
cpp<#)
(defun clone-expression-map (source dest)
#>cpp
for (const auto &[key, value] : ${source}) {
${dest}[key->Clone(storage)] = value->Clone(storage);
}
cpp<#)
(defun slk-load-name-ix (name-type)
(lambda (member)
#>cpp
@ -1819,9 +1843,7 @@ cpp<#
size_t size = 0;
slk::Load(&size, reader);
self->${member}.resize(size);
for (size_t i = 0;
i < size;
++i) {
for (size_t i = 0; i < size; ++i) {
slk::Load(&self->${member}[i], reader, storage);
}
cpp<#)
@ -2541,7 +2563,17 @@ cpp<#
(service_url "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression")))
:slk-load (slk-load-ast-pointer "Expression"))
(configs "std::unordered_map<Expression *, Expression *>" :scope :public
:slk-save #'slk-save-expression-map
:slk-load #'slk-load-expression-map
:clone #'clone-expression-map)
(credentials "std::unordered_map<Expression *, Expression *>" :scope :public
:slk-save #'slk-save-expression-map
:slk-load #'slk-load-expression-map
:clone #'clone-expression-map))
(:public
(lcp:define-enum action

View File

@ -554,7 +554,7 @@ std::string_view ToString(const CommonStreamConfigKey key) {
__VA_ARGS__ \
};
GENERATE_STREAM_CONFIG_KEY_ENUM(Kafka, TOPICS, CONSUMER_GROUP, BOOTSTRAP_SERVERS);
GENERATE_STREAM_CONFIG_KEY_ENUM(Kafka, TOPICS, CONSUMER_GROUP, BOOTSTRAP_SERVERS, CONFIGS, CREDENTIALS);
std::string_view ToString(const KafkaConfigKey key) {
switch (key) {
@ -564,6 +564,10 @@ std::string_view ToString(const KafkaConfigKey key) {
return "CONSUMER_GROUP";
case KafkaConfigKey::BOOTSTRAP_SERVERS:
return "BOOTSTRAP_SERVERS";
case KafkaConfigKey::CONFIGS:
return "CONFIGS";
case KafkaConfigKey::CREDENTIALS:
return "CREDENTIALS";
}
}
@ -574,6 +578,21 @@ void MapCommonStreamConfigs(auto &memory, StreamQuery &stream_query) {
}
} // namespace
antlrcpp::Any CypherMainVisitor::visitConfigKeyValuePair(MemgraphCypher::ConfigKeyValuePairContext *ctx) {
MG_ASSERT(ctx->literal().size() == 2);
return std::pair{ctx->literal(0)->accept(this).as<Expression *>(), ctx->literal(1)->accept(this).as<Expression *>()};
}
antlrcpp::Any CypherMainVisitor::visitConfigMap(MemgraphCypher::ConfigMapContext *ctx) {
std::unordered_map<Expression *, Expression *> map;
for (auto *key_value_pair : ctx->configKeyValuePair()) {
// If the queries are cached, then only the stripped query is parsed, so the actual keys cannot be determined
// here. That means duplicates cannot be checked.
map.insert(key_value_pair->accept(this).as<std::pair<Expression *, Expression *>>());
}
return map;
}
antlrcpp::Any CypherMainVisitor::visitKafkaCreateStream(MemgraphCypher::KafkaCreateStreamContext *ctx) {
auto *stream_query = storage_->Create<StreamQuery>();
stream_query->action_ = StreamQuery::Action::CREATE_STREAM;
@ -587,6 +606,10 @@ antlrcpp::Any CypherMainVisitor::visitKafkaCreateStream(MemgraphCypher::KafkaCre
MapConfig<true, std::vector<std::string>, Expression *>(memory_, KafkaConfigKey::TOPICS, stream_query->topic_names_);
MapConfig<false, std::string>(memory_, KafkaConfigKey::CONSUMER_GROUP, stream_query->consumer_group_);
MapConfig<false, Expression *>(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS, stream_query->bootstrap_servers_);
MapConfig<false, std::unordered_map<Expression *, Expression *>>(memory_, KafkaConfigKey::CONFIGS,
stream_query->configs_);
MapConfig<false, std::unordered_map<Expression *, Expression *>>(memory_, KafkaConfigKey::CREDENTIALS,
stream_query->credentials_);
MapCommonStreamConfigs(memory_, *stream_query);
@ -622,18 +645,33 @@ antlrcpp::Any CypherMainVisitor::visitKafkaCreateStreamConfig(MemgraphCypher::Ka
if (ctx->TOPICS()) {
ThrowIfExists(memory_, KafkaConfigKey::TOPICS);
const auto topics_key = static_cast<uint8_t>(KafkaConfigKey::TOPICS);
constexpr auto topics_key = static_cast<uint8_t>(KafkaConfigKey::TOPICS);
GetTopicNames(memory_[topics_key], ctx->topicNames(), *this);
return {};
}
if (ctx->CONSUMER_GROUP()) {
ThrowIfExists(memory_, KafkaConfigKey::CONSUMER_GROUP);
const auto consumer_group_key = static_cast<uint8_t>(KafkaConfigKey::CONSUMER_GROUP);
constexpr auto consumer_group_key = static_cast<uint8_t>(KafkaConfigKey::CONSUMER_GROUP);
memory_[consumer_group_key] = JoinSymbolicNamesWithDotsAndMinus(*this, *ctx->consumerGroup);
return {};
}
if (ctx->CONFIGS()) {
ThrowIfExists(memory_, KafkaConfigKey::CONFIGS);
constexpr auto configs_key = static_cast<uint8_t>(KafkaConfigKey::CONFIGS);
memory_.emplace(configs_key, ctx->configsMap->accept(this).as<std::unordered_map<Expression *, Expression *>>());
return {};
}
if (ctx->CREDENTIALS()) {
ThrowIfExists(memory_, KafkaConfigKey::CREDENTIALS);
constexpr auto credentials_key = static_cast<uint8_t>(KafkaConfigKey::CREDENTIALS);
memory_.emplace(credentials_key,
ctx->credentialsMap->accept(this).as<std::unordered_map<Expression *, Expression *>>());
return {};
}
MG_ASSERT(ctx->BOOTSTRAP_SERVERS());
ThrowIfExists(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS);
if (!ctx->bootstrapServers->StringLiteral()) {

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -269,6 +269,16 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
*/
antlrcpp::Any visitCreateStream(MemgraphCypher::CreateStreamContext *ctx) override;
/**
* @return StreamQuery*
*/
antlrcpp::Any visitConfigKeyValuePair(MemgraphCypher::ConfigKeyValuePairContext *ctx) override;
/**
* @return StreamQuery*
*/
antlrcpp::Any visitConfigMap(MemgraphCypher::ConfigMapContext *ctx) override;
/**
* @return StreamQuery*
*/
@ -849,7 +859,9 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
ParsingContext context_;
AstStorage *storage_;
std::unordered_map<uint8_t, std::variant<Expression *, std::string, std::vector<std::string>>> memory_;
std::unordered_map<uint8_t, std::variant<Expression *, std::string, std::vector<std::string>,
std::unordered_map<Expression *, Expression *>>>
memory_;
// Set of identifiers from queries.
std::unordered_set<std::string> users_identifiers;
// Identifiers that user didn't name.

View File

@ -35,7 +35,9 @@ memgraphCypherKeyword : cypherKeyword
| COMMIT
| COMMITTED
| CONFIG
| CONFIGS
| CONSUMER_GROUP
| CREDENTIALS
| CSV
| DATA
| DELIMITER
@ -306,9 +308,15 @@ commonCreateStreamConfig : TRANSFORM transformationName=procedureName
createStream : kafkaCreateStream | pulsarCreateStream ;
configKeyValuePair : literal ':' literal ;
configMap : '{' ( configKeyValuePair ( ',' configKeyValuePair )* )? '}' ;
kafkaCreateStreamConfig : TOPICS topicNames
| CONSUMER_GROUP consumerGroup=symbolicNameWithDotsAndMinus
| BOOTSTRAP_SERVERS bootstrapServers=literal
| CONFIGS configsMap=configMap
| CREDENTIALS credentialsMap=configMap
| commonCreateStreamConfig
;

View File

@ -40,7 +40,9 @@ CLEAR : C L E A R ;
COMMIT : C O M M I T ;
COMMITTED : C O M M I T T E D ;
CONFIG : C O N F I G ;
CONFIGS : C O N F I G S;
CONSUMER_GROUP : C O N S U M E R UNDERSCORE G R O U P ;
CREDENTIALS : C R E D E N T I A L S ;
CSV : C S V ;
DATA : D A T A ;
DELIMITER : D E L I M I T E R ;

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -564,10 +564,26 @@ Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, Exp
}
auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator);
const auto get_config_map = [&evaluator](std::unordered_map<Expression *, Expression *> map,
std::string_view map_name) -> std::unordered_map<std::string, std::string> {
std::unordered_map<std::string, std::string> config_map;
for (const auto [key_expr, value_expr] : map) {
const auto key = key_expr->Accept(evaluator);
const auto value = value_expr->Accept(evaluator);
if (!key.IsString() || !value.IsString()) {
throw SemanticException("{} must contain only string keys and values!", map_name);
}
config_map.emplace(key.ValueString(), value.ValueString());
}
return config_map;
};
return [interpreter_context, stream_name = stream_query->stream_name_,
topic_names = EvaluateTopicNames(evaluator, stream_query->topic_names_),
consumer_group = std::move(consumer_group), common_stream_info = std::move(common_stream_info),
bootstrap_servers = std::move(bootstrap), owner = StringPointerToOptional(username)]() mutable {
bootstrap_servers = std::move(bootstrap), owner = StringPointerToOptional(username),
configs = get_config_map(stream_query->configs_, "Configs"),
credentials = get_config_map(stream_query->credentials_, "Credentials")]() mutable {
std::string bootstrap = bootstrap_servers
? std::move(*bootstrap_servers)
: std::string{interpreter_context->config.default_kafka_bootstrap_servers};
@ -575,7 +591,9 @@ Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, Exp
{.common_info = std::move(common_stream_info),
.topics = std::move(topic_names),
.consumer_group = std::move(consumer_group),
.bootstrap_servers = std::move(bootstrap)},
.bootstrap_servers = std::move(bootstrap),
.configs = std::move(configs),
.credentials = std::move(credentials)},
std::move(owner));
return std::vector<std::vector<TypedValue>>{};
@ -969,7 +987,11 @@ using RWType = plan::ReadWriteTypeChecker::RWType;
InterpreterContext::InterpreterContext(storage::Storage *db, const InterpreterConfig config,
const std::filesystem::path &data_directory)
: db(db), trigger_store(data_directory / "triggers"), config(config), streams{this, data_directory / "streams"} {}
: db(db),
config(config),
trigger_store(data_directory / "triggers"),
after_commit_trigger_pool{config.after_commit_trigger_pool_size},
streams{this, data_directory / "streams"} {}
Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) {
MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL");

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -168,6 +168,7 @@ struct InterpreterContext {
const std::filesystem::path &data_directory);
storage::Storage *db;
const InterpreterConfig config;
// ANTLR has singleton instance that is shared between threads. It is
// protected by locks inside of ANTLR. Unfortunately, they are not protected
@ -186,9 +187,7 @@ struct InterpreterContext {
utils::SkipList<PlanCacheEntry> plan_cache;
TriggerStore trigger_store;
utils::ThreadPool after_commit_trigger_pool{1};
const InterpreterConfig config;
utils::ThreadPool after_commit_trigger_pool;
query::stream::Streams streams;
};

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -421,8 +421,8 @@ bool SharedLibraryModule::Load(const std::filesystem::path &file_path) {
return with_error(error);
}
for (auto &trans : module_def->transformations) {
const bool was_result_added = MgpTransAddFixedResult(&trans.second);
if (!was_result_added) {
const auto error_code = MgpTransAddFixedResult(&trans.second);
if (error_code != MGP_ERROR_NO_ERROR) {
const auto error =
fmt::format("Unable to add result to transformation in module {}; add result failed", file_path);
return with_error(error);

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -13,6 +13,8 @@
#include <json/json.hpp>
#include "integrations/constants.hpp"
namespace query::stream {
KafkaStream::KafkaStream(std::string stream_name, StreamInfo stream_info,
ConsumerFunction<integrations::kafka::Message> consumer_function) {
@ -23,6 +25,8 @@ KafkaStream::KafkaStream(std::string stream_name, StreamInfo stream_info,
.bootstrap_servers = std::move(stream_info.bootstrap_servers),
.batch_interval = stream_info.common_info.batch_interval,
.batch_size = stream_info.common_info.batch_size,
.public_configs = std::move(stream_info.configs),
.private_configs = std::move(stream_info.credentials),
};
consumer_.emplace(std::move(consumer_info), std::move(consumer_function));
};
@ -34,7 +38,9 @@ KafkaStream::StreamInfo KafkaStream::Info(std::string transformation_name) const
.transformation_name = std::move(transformation_name)},
.topics = info.topics,
.consumer_group = info.consumer_group,
.bootstrap_servers = info.bootstrap_servers};
.bootstrap_servers = info.bootstrap_servers,
.configs = info.public_configs,
.credentials = info.private_configs};
}
void KafkaStream::Start() { consumer_->Start(); }
@ -54,6 +60,10 @@ namespace {
const std::string kTopicsKey{"topics"};
const std::string kConsumerGroupKey{"consumer_group"};
const std::string kBoostrapServers{"bootstrap_servers"};
const std::string kConfigs{"configs"};
const std::string kCredentials{"credentials"};
const std::unordered_map<std::string, std::string> kDefaultConfigsMap;
} // namespace
void to_json(nlohmann::json &data, KafkaStream::StreamInfo &&info) {
@ -61,6 +71,8 @@ void to_json(nlohmann::json &data, KafkaStream::StreamInfo &&info) {
data[kTopicsKey] = std::move(info.topics);
data[kConsumerGroupKey] = info.consumer_group;
data[kBoostrapServers] = std::move(info.bootstrap_servers);
data[kConfigs] = std::move(info.configs);
data[kCredentials] = std::move(info.credentials);
}
void from_json(const nlohmann::json &data, KafkaStream::StreamInfo &info) {
@ -68,6 +80,9 @@ void from_json(const nlohmann::json &data, KafkaStream::StreamInfo &info) {
data.at(kTopicsKey).get_to(info.topics);
data.at(kConsumerGroupKey).get_to(info.consumer_group);
data.at(kBoostrapServers).get_to(info.bootstrap_servers);
// These values might not be present in the persisted JSON object
info.configs = data.value(kConfigs, kDefaultConfigsMap);
info.credentials = data.value(kCredentials, kDefaultConfigsMap);
}
PulsarStream::PulsarStream(std::string stream_name, StreamInfo stream_info,

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -24,6 +24,8 @@ struct KafkaStream {
std::vector<std::string> topics;
std::string consumer_group;
std::string bootstrap_servers;
std::unordered_map<std::string, std::string> configs;
std::unordered_map<std::string, std::string> credentials;
};
using Message = integrations::kafka::Message;

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -18,6 +18,7 @@
#include <spdlog/spdlog.h>
#include <json/json.hpp>
#include "integrations/constants.hpp"
#include "mg_procedure.h"
#include "query/db_accessor.hpp"
#include "query/discard_value_stream.hpp"
@ -29,6 +30,7 @@
#include "query/stream/sources.hpp"
#include "query/typed_value.hpp"
#include "utils/event_counter.hpp"
#include "utils/logging.hpp"
#include "utils/memory.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/pmr/string.hpp"
@ -54,7 +56,7 @@ auto GetStream(auto &map, const std::string &stream_name) {
}
std::pair<TypedValue /*query*/, TypedValue /*parameters*/> ExtractTransformationResult(
utils::pmr::map<utils::pmr::string, TypedValue> &&values, const std::string_view transformation_name,
const utils::pmr::map<utils::pmr::string, TypedValue> &values, const std::string_view transformation_name,
const std::string_view stream_name) {
if (values.size() != kExpectedTransformationResultSize) {
throw StreamsException(
@ -62,7 +64,7 @@ std::pair<TypedValue /*query*/, TypedValue /*parameters*/> ExtractTransformation
transformation_name, stream_name);
}
auto get_value = [&](const utils::pmr::string &field_name) mutable -> TypedValue & {
auto get_value = [&](const utils::pmr::string &field_name) mutable -> const TypedValue & {
auto it = values.find(field_name);
if (it == values.end()) {
throw StreamsException{"Transformation '{}' in stream '{}' did not yield a record with '{}' field.",
@ -71,11 +73,11 @@ std::pair<TypedValue /*query*/, TypedValue /*parameters*/> ExtractTransformation
return it->second;
};
auto &query_value = get_value(query_param_name);
const auto &query_value = get_value(query_param_name);
MG_ASSERT(query_value.IsString());
auto &params_value = get_value(params_param_name);
const auto &params_value = get_value(params_param_name);
MG_ASSERT(params_value.IsNull() || params_value.IsMap());
return {std::move(query_value), std::move(params_value)};
return {query_value, params_value};
}
template <typename TMessage>
@ -208,10 +210,12 @@ void Streams::RegisterKafkaProcedures() {
constexpr std::string_view consumer_group_result_name = "consumer_group";
constexpr std::string_view topics_result_name = "topics";
constexpr std::string_view bootstrap_servers_result_name = "bootstrap_servers";
constexpr std::string_view configs_result_name = "configs";
constexpr std::string_view credentials_result_name = "credentials";
auto get_stream_info = [this, proc_name, consumer_group_result_name, topics_result_name,
bootstrap_servers_result_name](mgp_list *args, mgp_graph * /*graph*/, mgp_result *result,
mgp_memory *memory) {
bootstrap_servers_result_name, configs_result_name, credentials_result_name](
mgp_list *args, mgp_graph * /*graph*/, mgp_result *result, mgp_memory *memory) {
auto *arg_stream_name = procedure::Call<mgp_value *>(mgp_list_at, args, 0);
const auto *stream_name = procedure::Call<const char *>(mgp_value_get_string, arg_stream_name);
auto lock_ptr = streams_.Lock();
@ -259,13 +263,12 @@ void Streams::RegisterKafkaProcedures() {
procedure::MgpUniquePtr<mgp_value> topics_value{nullptr, mgp_value_destroy};
{
const auto success = procedure::TryOrSetError(
[&] {
return procedure::CreateMgpObject(topics_value, mgp_value_make_list, topic_names.release());
},
[&] { return procedure::CreateMgpObject(topics_value, mgp_value_make_list, topic_names.get()); },
result);
if (!success) {
return;
}
static_cast<void>(topic_names.release());
}
const auto bootstrap_servers_value =
@ -274,6 +277,57 @@ void Streams::RegisterKafkaProcedures() {
return;
}
const auto convert_config_map =
[result, memory](const std::unordered_map<std::string, std::string> &configs_to_convert)
-> procedure::MgpUniquePtr<mgp_value> {
procedure::MgpUniquePtr<mgp_value> configs_value{nullptr, mgp_value_destroy};
procedure::MgpUniquePtr<mgp_map> configs{nullptr, mgp_map_destroy};
{
const auto success = procedure::TryOrSetError(
[&] { return procedure::CreateMgpObject(configs, mgp_map_make_empty, memory); }, result);
if (!success) {
return configs_value;
}
}
for (const auto &[key, value] : configs_to_convert) {
auto value_value = procedure::GetStringValueOrSetError(value.c_str(), memory, result);
if (!value_value) {
return configs_value;
}
configs->items.emplace(key, std::move(*value_value));
}
{
const auto success = procedure::TryOrSetError(
[&] { return procedure::CreateMgpObject(configs_value, mgp_value_make_map, configs.get()); },
result);
if (!success) {
return configs_value;
}
static_cast<void>(configs.release());
}
return configs_value;
};
const auto configs_value = convert_config_map(info.configs);
if (configs_value == nullptr) {
return;
}
using CredentialsType = decltype(KafkaStream::StreamInfo::credentials);
CredentialsType reducted_credentials;
std::transform(info.credentials.begin(), info.credentials.end(),
std::inserter(reducted_credentials, reducted_credentials.end()),
[](const auto &pair) -> CredentialsType::value_type {
return {pair.first, integrations::kReducted};
});
const auto credentials_value = convert_config_map(reducted_credentials);
if (credentials_value == nullptr) {
return;
}
if (!procedure::InsertResultOrSetError(result, record, consumer_group_result_name.data(),
consumer_group_value.get())) {
return;
@ -287,6 +341,16 @@ void Streams::RegisterKafkaProcedures() {
bootstrap_servers_value.get())) {
return;
}
if (!procedure::InsertResultOrSetError(result, record, configs_result_name.data(),
configs_value.get())) {
return;
}
if (!procedure::InsertResultOrSetError(result, record, credentials_result_name.data(),
credentials_value.get())) {
return;
}
},
[proc_name](auto && /*other*/) {
throw QueryRuntimeException("'{}' can be only used for Kafka stream sources", proc_name);
@ -305,6 +369,10 @@ void Streams::RegisterKafkaProcedures() {
MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, bootstrap_servers_result_name.data(),
procedure::Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, configs_result_name.data(), procedure::Call<mgp_type *>(mgp_type_map)) ==
MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, credentials_result_name.data(), procedure::Call<mgp_type *>(mgp_type_map)) ==
MGP_ERROR_NO_ERROR);
procedure::gModuleRegistry.RegisterMgProcedure(proc_name, std::move(proc));
}
@ -462,8 +530,7 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std
interpreter->BeginTransaction();
for (auto &row : result.rows) {
spdlog::trace("Processing row in stream '{}'", stream_name);
auto [query_value, params_value] =
ExtractTransformationResult(std::move(row.values), transformation_name, stream_name);
auto [query_value, params_value] = ExtractTransformationResult(row.values, transformation_name, stream_name);
storage::PropertyValue params_prop{params_value};
std::string query{query_value.ValueString()};
@ -484,6 +551,7 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std
result.rows.clear();
break;
} catch (const query::TransactionSerializationException &e) {
interpreter->Abort();
if (i == total_retries) {
throw;
}
@ -680,8 +748,7 @@ TransformationResult Streams::Check(const std::string &stream_name, std::optiona
CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name);
for (auto &row : result.rows) {
auto [query, parameters] =
ExtractTransformationResult(std::move(row.values), transformation_name, stream_name);
auto [query, parameters] = ExtractTransformationResult(row.values, transformation_name, stream_name);
std::vector<TypedValue> result_row;
result_row.reserve(kExpectedTransformationResultSize);
result_row.push_back(std::move(query));

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -31,6 +31,7 @@
#include "utils/rw_lock.hpp"
#include "utils/synchronized.hpp"
class StreamsTest;
namespace query {
struct InterpreterContext;
@ -73,6 +74,8 @@ using TransformationResult = std::vector<std::vector<TypedValue>>;
///
/// This class is responsible for all query supported actions to happen.
class Streams final {
friend StreamsTest;
public:
/// Initializes the streams.
///

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -79,36 +79,15 @@ Date::Date(const DateParameters &date_parameters) {
day = date_parameters.day;
}
namespace {
tm GetUtcFromSystemClockOrThrow() {
namespace chrono = std::chrono;
const auto today = chrono::system_clock::to_time_t(chrono::system_clock::now());
tm utc_today;
if (!gmtime_r(&today, &utc_today)) {
throw temporal::InvalidArgumentException("Can't access clock's UTC time");
}
return utc_today;
}
Date UtcToday() { return UtcLocalDateTime().date; }
int64_t TMYearToUtcYear(int year) { return year + 1900; }
int64_t TMMonthToUtcMonth(int month) { return month + 1; }
} // namespace
Date UtcToday() {
const auto utc_today = GetUtcFromSystemClockOrThrow();
return Date({TMYearToUtcYear(utc_today.tm_year), TMMonthToUtcMonth(utc_today.tm_mon), utc_today.tm_mday});
}
LocalTime UtcLocalTime() {
const auto utc_today = GetUtcFromSystemClockOrThrow();
return LocalTime({utc_today.tm_hour, utc_today.tm_min, utc_today.tm_sec});
}
LocalTime UtcLocalTime() { return UtcLocalDateTime().local_time; }
LocalDateTime UtcLocalDateTime() {
const auto utc_today = GetUtcFromSystemClockOrThrow();
return LocalDateTime({TMYearToUtcYear(utc_today.tm_year), TMMonthToUtcMonth(utc_today.tm_mon), utc_today.tm_mday},
{utc_today.tm_hour, utc_today.tm_min, utc_today.tm_sec});
namespace chrono = std::chrono;
using TpMicros = chrono::time_point<chrono::system_clock, chrono::microseconds>;
TpMicros ts = chrono::time_point_cast<chrono::microseconds>(chrono::system_clock::now());
return LocalDateTime(ts.time_since_epoch().count());
}
namespace {

View File

@ -421,10 +421,13 @@ def test_set_offset(kafka_producer, kafka_topics, connection, transformation):
assert comparison_check("Final Message", res[0])
common.execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")
def test_info_procedure(kafka_topics, connection):
cursor = connection.cursor()
stream_name = 'test_stream'
configs = {"sasl.username": "michael.scott"}
local = "localhost:9092"
credentials = {"sasl.password": "S3cr3tP4ssw0rd"}
consumer_group = "ConsumerGr"
common.execute_and_fetch_all(
cursor,
@ -433,12 +436,20 @@ def test_info_procedure(kafka_topics, connection):
f"TRANSFORM pulsar_transform.simple "
f"CONSUMER_GROUP {consumer_group} "
f"BOOTSTRAP_SERVERS '{local}' "
f"CONFIGS {configs} "
f"CREDENTIALS {credentials}"
)
stream_info = common.execute_and_fetch_all(cursor, f"CALL mg.kafka_stream_info('{stream_name}') YIELD *")
stream_info = common.execute_and_fetch_all(
cursor, f"CALL mg.kafka_stream_info('{stream_name}') YIELD *")
expected_stream_info = [(local, consumer_group, kafka_topics)]
reducted_credentials = {key: "<REDUCTED>" for (
key, value) in credentials.items()}
expected_stream_info = [
(local, configs, consumer_group, reducted_credentials, kafka_topics)]
common.validate_info(stream_info, expected_stream_info)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -9,20 +9,10 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
//
#include <algorithm>
#include <climits>
#include <limits>
#include <optional>
#include <string>
#include <unordered_map>
#include <variant>
@ -38,6 +28,7 @@
#include <json/json.hpp>
//////////////////////////////////////////////////////
#include <antlr4-runtime.h>
#include <gmock/gmock-matchers.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
@ -87,23 +78,37 @@ class Base {
}
}
TypedValue GetLiteral(Expression *expression, const bool use_parameter_lookup,
const std::optional<int> &token_position = std::nullopt) const {
if (use_parameter_lookup) {
auto *param_lookup = dynamic_cast<ParameterLookup *>(expression);
if (param_lookup == nullptr) {
ADD_FAILURE();
return {};
}
if (token_position) {
EXPECT_EQ(param_lookup->token_position_, *token_position);
}
return TypedValue(parameters_.AtTokenPosition(param_lookup->token_position_));
}
auto *literal = dynamic_cast<PrimitiveLiteral *>(expression);
if (literal == nullptr) {
ADD_FAILURE();
return {};
}
if (token_position) {
EXPECT_EQ(literal->token_position_, *token_position);
}
return TypedValue(literal->value_);
}
template <class TValue>
void CheckLiteral(Expression *expression, const TValue &expected,
const std::optional<int> &token_position = std::nullopt) const {
TypedValue value;
// NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
TypedValue expected_tv(expected);
if (!expected_tv.IsNull() && context_.is_query_cached) {
auto *param_lookup = dynamic_cast<ParameterLookup *>(expression);
ASSERT_TRUE(param_lookup);
if (token_position) EXPECT_EQ(param_lookup->token_position_, *token_position);
value = TypedValue(parameters_.AtTokenPosition(param_lookup->token_position_));
} else {
auto *literal = dynamic_cast<PrimitiveLiteral *>(expression);
ASSERT_TRUE(literal);
if (token_position) ASSERT_EQ(literal->token_position_, *token_position);
value = TypedValue(literal->value_);
}
const auto use_parameter_lookup = !expected_tv.IsNull() && context_.is_query_cached;
TypedValue value = GetLiteral(expression, use_parameter_lookup, token_position);
EXPECT_TRUE(TypedValue::BoolEqual{}(value, expected_tv));
}
};
@ -3580,6 +3585,8 @@ void ValidateMostlyEmptyStreamQuery(Base &ast_generator, const std::string &quer
EXPECT_EQ(parsed_query->bootstrap_servers_, nullptr);
EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_limit_, batch_limit));
EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->timeout_, timeout));
EXPECT_TRUE(parsed_query->configs_.empty());
EXPECT_TRUE(parsed_query->credentials_.empty());
}
TEST_P(CypherMainVisitorTest, DropStream) {
@ -3661,7 +3668,9 @@ void ValidateCreateKafkaStreamQuery(Base &ast_generator, const std::string &quer
const std::string_view transform_name, const std::string_view consumer_group,
const std::optional<TypedValue> &batch_interval,
const std::optional<TypedValue> &batch_size,
const std::string_view bootstrap_servers = "") {
const std::string_view bootstrap_servers,
const std::unordered_map<std::string, std::string> &configs,
const std::unordered_map<std::string, std::string> &credentials) {
SCOPED_TRACE(query_string);
StreamQuery *parsed_query{nullptr};
ASSERT_NO_THROW(parsed_query = dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(query_string))) << query_string;
@ -3675,14 +3684,31 @@ void ValidateCreateKafkaStreamQuery(Base &ast_generator, const std::string &quer
EXPECT_EQ(parsed_query->batch_limit_, nullptr);
if (bootstrap_servers.empty()) {
EXPECT_EQ(parsed_query->bootstrap_servers_, nullptr);
return;
}
} else {
EXPECT_NE(parsed_query->bootstrap_servers_, nullptr);
}
const auto evaluate_config_map = [&ast_generator](const std::unordered_map<Expression *, Expression *> &config_map) {
std::unordered_map<std::string, std::string> evaluated_config_map;
const auto expr_to_str = [&ast_generator](Expression *expression) {
return std::string{ast_generator.GetLiteral(expression, ast_generator.context_.is_query_cached).ValueString()};
};
std::transform(config_map.begin(), config_map.end(),
std::inserter(evaluated_config_map, evaluated_config_map.end()),
[&expr_to_str](const auto expr_pair) {
return std::pair{expr_to_str(expr_pair.first), expr_to_str(expr_pair.second)};
});
return evaluated_config_map;
};
using testing::UnorderedElementsAreArray;
EXPECT_THAT(evaluate_config_map(parsed_query->configs_), UnorderedElementsAreArray(configs.begin(), configs.end()));
EXPECT_THAT(evaluate_config_map(parsed_query->credentials_),
UnorderedElementsAreArray(credentials.begin(), credentials.end()));
}
TEST_P(CypherMainVisitorTest, CreateKafkaStream) {
auto &ast_generator = *GetParam();
TestInvalidQuery("CREATE KAFKA STREAM", ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM invalid stream name TOPICS topic1 TRANSFORM transform", ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS invalid topic name TRANSFORM transform", ast_generator);
@ -3709,6 +3735,13 @@ TEST_P(CypherMainVisitorTest, CreateKafkaStream) {
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS localhost:9092",
ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS", ast_generator);
// the keys must be string literals
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform CONFIGS { symbolicname : 'string' }",
ast_generator);
TestInvalidQuery(
"CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform CREDENTIALS { symbolicname : 'string' }",
ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform CREDENTIALS 2", ast_generator);
const std::vector<std::string> topic_names{"topic1_name.with_dot", "topic1_name.with_multiple.dots",
"topic-name.with-multiple.dots-and-dashes"};
@ -3728,34 +3761,37 @@ TEST_P(CypherMainVisitorTest, CreateKafkaStream) {
ValidateCreateKafkaStreamQuery(
ast_generator,
fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {}", kStreamName, topic_names_as_str, kTransformName),
kStreamName, topic_names, kTransformName, "", std::nullopt, std::nullopt);
kStreamName, topic_names, kTransformName, "", std::nullopt, std::nullopt, {}, {}, {});
ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} ",
kStreamName, topic_names_as_str, kTransformName, kConsumerGroup),
kStreamName, topic_names, kTransformName, kConsumerGroup, std::nullopt,
std::nullopt);
kStreamName, topic_names, kTransformName, kConsumerGroup, std::nullopt, std::nullopt,
{}, {}, {});
ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} TRANSFORM {} TOPICS {} BATCH_INTERVAL {}",
kStreamName, kTransformName, topic_names_as_str, kBatchInterval),
kStreamName, topic_names, kTransformName, "", batch_interval_value, std::nullopt);
kStreamName, topic_names, kTransformName, "", batch_interval_value, std::nullopt, {},
{}, {});
ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} BATCH_SIZE {} TOPICS {} TRANSFORM {}",
kStreamName, kBatchSize, topic_names_as_str, kTransformName),
kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value);
kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value, {}, {},
{});
ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} TOPICS '{}' BATCH_SIZE {} TRANSFORM {}",
kStreamName, topic_names_as_str, kBatchSize, kTransformName),
kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value);
kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value, {}, {},
{});
ValidateCreateKafkaStreamQuery(
ast_generator,
fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {}",
kStreamName, topic_names_as_str, kTransformName, kConsumerGroup, kBatchInterval, kBatchSize),
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value);
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, {}, {}, {});
using namespace std::string_literals;
const auto host1 = "localhost:9094"s;
ValidateCreateKafkaStreamQuery(
@ -3763,14 +3799,16 @@ TEST_P(CypherMainVisitorTest, CreateKafkaStream) {
fmt::format("CREATE KAFKA STREAM {} TOPICS {} CONSUMER_GROUP {} BATCH_SIZE {} BATCH_INTERVAL {} TRANSFORM {} "
"BOOTSTRAP_SERVERS '{}'",
kStreamName, topic_names_as_str, kConsumerGroup, kBatchSize, kBatchInterval, kTransformName, host1),
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1);
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1, {},
{});
ValidateCreateKafkaStreamQuery(
ast_generator,
fmt::format("CREATE KAFKA STREAM {} CONSUMER_GROUP {} TOPICS {} BATCH_INTERVAL {} TRANSFORM {} BATCH_SIZE {} "
"BOOTSTRAP_SERVERS '{}'",
kStreamName, kConsumerGroup, topic_names_as_str, kBatchInterval, kTransformName, kBatchSize, host1),
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1);
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1, {},
{});
const auto host2 = "localhost:9094,localhost:1994,168.1.1.256:345"s;
ValidateCreateKafkaStreamQuery(
@ -3778,7 +3816,8 @@ TEST_P(CypherMainVisitorTest, CreateKafkaStream) {
fmt::format("CREATE KAFKA STREAM {} TOPICS {} BOOTSTRAP_SERVERS '{}' CONSUMER_GROUP {} TRANSFORM {} "
"BATCH_INTERVAL {} BATCH_SIZE {}",
kStreamName, topic_names_as_str, host2, kConsumerGroup, kTransformName, kBatchInterval, kBatchSize),
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host2);
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host2, {},
{});
};
for (const auto &topic_name : topic_names) {
@ -3793,7 +3832,7 @@ TEST_P(CypherMainVisitorTest, CreateKafkaStream) {
fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {}",
kStreamName, kTopicName, kTransformName, consumer_group),
kStreamName, {kTopicName}, kTransformName, consumer_group, std::nullopt,
std::nullopt);
std::nullopt, {}, {}, {});
};
using namespace std::literals;
@ -3803,6 +3842,44 @@ TEST_P(CypherMainVisitorTest, CreateKafkaStream) {
for (const auto consumer_group : consumer_groups) {
EXPECT_NO_FATAL_FAILURE(check_consumer_group(consumer_group));
}
auto check_config_map = [&](const std::unordered_map<std::string, std::string> &config_map) {
const std::string kTopicName{"topic1"};
const auto map_as_str = std::invoke([&config_map] {
std::stringstream buffer;
buffer << '{';
if (!config_map.empty()) {
const auto &first_item = *config_map.begin();
buffer << fmt::format("'{}': '{}'", first_item.first, first_item.second);
for (auto it = ++config_map.begin(); it != config_map.end(); ++it) {
buffer << fmt::format(", '{}': '{}'", it->first, it->second);
}
}
buffer << '}';
return std::move(buffer).str();
});
ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONFIGS {}", kStreamName,
kTopicName, kTransformName, map_as_str),
kStreamName, {kTopicName}, kTransformName, "", std::nullopt, std::nullopt, {},
config_map, {});
ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CREDENTIALS {}",
kStreamName, kTopicName, kTransformName, map_as_str),
kStreamName, {kTopicName}, kTransformName, "", std::nullopt, std::nullopt, {}, {},
config_map);
};
const std::array<std::unordered_map<std::string, std::string>, 3> config_maps = {
std::unordered_map<std::string, std::string>{}, std::unordered_map<std::string, std::string>{{"key", "value"}},
std::unordered_map<std::string, std::string>{{"key.with.dot", "value.with.doth"},
{"key with space", "value with space"}}};
for (const auto &map_to_test : config_maps) {
EXPECT_NO_FATAL_FAILURE(check_config_map(map_to_test));
}
}
void ValidateCreatePulsarStreamQuery(Base &ast_generator, const std::string &query_string,

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -59,6 +59,8 @@ struct ConsumerTest : public ::testing::Test {
.bootstrap_servers = cluster.Bootstraps(),
.batch_interval = kDefaultBatchInterval,
.batch_size = kDefaultBatchSize,
.public_configs = {},
.private_configs = {},
};
};

View File

@ -1,15 +1,4 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -15,12 +15,15 @@
#include <utility>
#include <gtest/gtest.h>
#include "integrations/constants.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "kafka_mock.hpp"
#include "query/config.hpp"
#include "query/interpreter.hpp"
#include "query/stream/streams.hpp"
#include "storage/v2/storage.hpp"
#include "test_utils.hpp"
using Streams = query::stream::Streams;
using StreamInfo = query::stream::KafkaStream::StreamInfo;
@ -78,6 +81,17 @@ class StreamsTest : public ::testing::Test {
EXPECT_EQ(check_data.is_running, status.is_running);
}
void CheckConfigAndCredentials(const StreamCheckData &check_data) {
const auto locked_streams = streams_->streams_.ReadLock();
const auto &stream = locked_streams->at(check_data.name);
const auto *stream_data = std::get_if<query::stream::Streams::StreamData<query::stream::KafkaStream>>(&stream);
ASSERT_NE(stream_data, nullptr);
const auto stream_info =
stream_data->stream_source->ReadLock()->Info(check_data.info.common_info.transformation_name);
EXPECT_TRUE(
std::equal(check_data.info.configs.begin(), check_data.info.configs.end(), stream_info.configs.begin()));
}
void StartStream(StreamCheckData &check_data) {
streams_->Start(check_data.name);
check_data.is_running = true;
@ -183,6 +197,13 @@ TEST_F(StreamsTest, RestoreStreams) {
stream_info.common_info.batch_interval = std::chrono::milliseconds((i + 1) * 10);
stream_info.common_info.batch_size = 1000 + i;
stream_check_data.owner = std::string{"owner"} + iteration_postfix;
if (i == 1 || i == 3) {
stream_info.configs.emplace(std::string{"sasl.username"}, std::string{"username"} + iteration_postfix);
}
if (i == 2 || i == 3) {
stream_info.credentials.emplace(std::string{"sasl.password"}, std::string{"password"} + iteration_postfix);
}
}
mock_cluster_.CreateTopic(stream_info.topics[0]);
@ -198,6 +219,7 @@ TEST_F(StreamsTest, RestoreStreams) {
EXPECT_EQ(stream_check_datas.size(), streams_->GetStreamInfo().size());
for (const auto &check_data : stream_check_datas) {
ASSERT_NO_FATAL_FAILURE(CheckStreamStatus(check_data));
ASSERT_NO_FATAL_FAILURE(CheckConfigAndCredentials(check_data));
}
};
@ -253,3 +275,32 @@ TEST_F(StreamsTest, CheckWithTimeout) {
EXPECT_LE(timeout, elapsed);
EXPECT_LE(elapsed, timeout * 1.2);
}
TEST_F(StreamsTest, CheckInvalidConfig) {
auto stream_info = CreateDefaultStreamInfo();
const auto stream_name = GetDefaultStreamName();
constexpr auto kInvalidConfigName = "doesnt.exist";
constexpr auto kConfigValue = "myprecious";
stream_info.configs.emplace(kInvalidConfigName, kConfigValue);
const auto checker = [](const std::string_view message) {
EXPECT_TRUE(message.find(kInvalidConfigName) != std::string::npos) << message;
EXPECT_TRUE(message.find(kConfigValue) != std::string::npos) << message;
};
EXPECT_THROW_WITH_MSG(streams_->Create<query::stream::KafkaStream>(stream_name, stream_info, std::nullopt),
integrations::kafka::SettingCustomConfigFailed, checker);
}
TEST_F(StreamsTest, CheckInvalidCredentials) {
auto stream_info = CreateDefaultStreamInfo();
const auto stream_name = GetDefaultStreamName();
constexpr auto kInvalidCredentialName = "doesnt.exist";
constexpr auto kCredentialValue = "myprecious";
stream_info.credentials.emplace(kInvalidCredentialName, kCredentialValue);
const auto checker = [](const std::string_view message) {
EXPECT_TRUE(message.find(kInvalidCredentialName) != std::string::npos) << message;
EXPECT_TRUE(message.find(integrations::kReducted) != std::string::npos) << message;
EXPECT_TRUE(message.find(kCredentialValue) == std::string::npos) << message;
};
EXPECT_THROW_WITH_MSG(streams_->Create<query::stream::KafkaStream>(stream_name, stream_info, std::nullopt),
integrations::kafka::SettingCustomConfigFailed, checker);
}

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -35,3 +35,15 @@ TResult ExpectNoError(const char *file, int line, TFunc func, TArgs &&...args) {
} // namespace test_utils
#define EXPECT_MGP_NO_ERROR(type, ...) test_utils::ExpectNoError<type>(__FILE__, __LINE__, __VA_ARGS__)
#define EXPECT_THROW_WITH_MSG(statement, expected_exception, msg_checker) \
EXPECT_THROW( \
{ \
try { \
statement; \
} catch (const expected_exception &e) { \
EXPECT_NO_FATAL_FAILURE(msg_checker(e.what())); \
throw; \
} \
}, \
expected_exception);