Add support for Pulsar queries (#292)

Antonio Andelic authored on 2021-11-15 11:12:07 +01:00, committed by Antonio Andelic
parent 0e4719018a
commit b66cc66503
19 changed files with 679 additions and 164 deletions


@@ -260,6 +260,7 @@ import_external_library(protobuf STATIC
CONFIGURE_COMMAND true)
set(BOOST_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/boost/lib)
set(BOOST_ROOT ${BOOST_ROOT} PARENT_SCOPE)
import_external_library(pulsar STATIC
${CMAKE_CURRENT_SOURCE_DIR}/pulsar/pulsar-client-cpp/lib/libpulsarwithdeps.a


@@ -280,7 +280,7 @@ tar -xzf boost_1_77_0.tar.gz
mv boost_1_77_0 boost
pushd boost
./bootstrap.sh --prefix=$(pwd)/lib --with-libraries="system,regex"
./b2 -j$(nproc) install
./b2 -j$(nproc) install variant=release
popd
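(Note: b2 builds the debug variant by default, so passing variant=release makes the bundled Boost build optimized, presumably to match the release flavor of the other vendored dependencies.)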
#pulsar


@@ -175,7 +175,16 @@ Consumer::Consumer(ConsumerInfo info, ConsumerFunction consumer_function)
std::inserter(topic_names_from_metadata, topic_names_from_metadata.begin()),
[](const auto topic_metadata) { return topic_metadata->topic(); });
constexpr size_t max_topic_name_length = 249;
constexpr auto is_valid_topic_name = [](const auto c) { return std::isalnum(c) || c == '.' || c == '_' || c == '-'; };
for (const auto &topic_name : info_.topics) {
if (topic_name.size() > max_topic_name_length ||
std::any_of(topic_name.begin(), topic_name.end(), [&](const auto c) { return !is_valid_topic_name(c); })) {
throw ConsumerFailedToInitializeException(info_.consumer_name,
fmt::format("'{}' is an invalid topic name", topic_name));
}
if (!topic_names_from_metadata.contains(topic_name)) {
throw TopicNotFoundException(info_.consumer_name, topic_name);
}
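For reference, a minimal standalone sketch of the same rule (the helper name is ours, not the commit's): Kafka restricts topic names to at most 249 characters drawn from [a-zA-Z0-9._-].

#include <algorithm>
#include <cctype>
#include <cstddef>
#include <string>

// Returns true if `name` is a syntactically valid Kafka topic name.
bool IsValidTopicName(const std::string &name) {
  constexpr std::size_t kMaxTopicNameLength = 249;
  const auto valid_char = [](const unsigned char c) {
    return std::isalnum(c) != 0 || c == '.' || c == '_' || c == '-';
  };
  return !name.empty() && name.size() <= kMaxTopicNameLength &&
         std::all_of(name.begin(), name.end(), valid_char);
}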


@@ -99,9 +99,8 @@ class SpdlogLoggerFactory : public pulsar_client::LoggerFactory {
};
pulsar_client::Client CreateClient(const std::string &service_url) {
static SpdlogLoggerFactory logger_factory;
pulsar_client::ClientConfiguration conf;
conf.setLogger(&logger_factory);
conf.setLogger(new SpdlogLoggerFactory);
return {service_url, conf};
}
} // namespace
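Note: the switch from a static factory to a heap allocation suggests that pulsar-client-cpp's ClientConfiguration::setLogger takes ownership of the LoggerFactory pointer; handing it the address of a static object would otherwise end with the client deleting memory it does not own.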


@@ -190,6 +190,9 @@ DEFINE_bool(telemetry_enabled, false,
DEFINE_string(kafka_bootstrap_servers, "",
"List of default Kafka brokers as a comma separated list of broker host or host:port.");
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_string(pulsar_service_url, "", "Default URL used while connecting to Pulsar brokers.");
// Audit logging flags.
#ifdef MG_ENTERPRISE
DEFINE_bool(audit_enabled, false, "Set to true to enable audit logging.");
@@ -1122,7 +1125,8 @@ int main(int argc, char **argv) {
query::InterpreterContext interpreter_context{&db,
{.query = {.allow_load_csv = FLAGS_allow_load_csv},
.execution_timeout_sec = FLAGS_query_execution_timeout_sec,
.default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers},
.default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers,
.default_pulsar_service_url = FLAGS_pulsar_service_url},
FLAGS_data_directory};
#ifdef MG_ENTERPRISE
SessionData session_data{&db, &interpreter_context, &auth, &audit_log};
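With this flag an instance-wide default can be set at startup, e.g. --pulsar-service-url=pulsar://localhost:6650 (the standard non-TLS Pulsar broker URL); the interpreter changes below fall back to it whenever a CREATE PULSAR STREAM query omits SERVICE_URL.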


@@ -40,10 +40,12 @@ set(mg_query_sources
trigger_context.cpp
typed_value.cpp)
find_package(Boost REQUIRED)
add_library(mg-query STATIC ${mg_query_sources})
add_dependencies(mg-query generate_lcp_query)
target_include_directories(mg-query PUBLIC ${CMAKE_SOURCE_DIR}/include)
target_link_libraries(mg-query dl cppitertools)
target_link_libraries(mg-query dl cppitertools Boost::headers)
target_link_libraries(mg-query mg-integrations-pulsar mg-integrations-kafka mg-storage-v2 mg-utils mg-kvstore mg-memory)
if("${MG_PYTHON_VERSION}" STREQUAL "")
find_package(Python3 3.5 REQUIRED COMPONENTS Development)


@@ -22,5 +22,6 @@ struct InterpreterConfig {
double execution_timeout_sec{600.0};
std::string default_kafka_bootstrap_servers;
std::string default_pulsar_service_url;
};
} // namespace query


@@ -2498,25 +2498,48 @@ cpp<#
(:serialize (:slk))
(:clone))
(defun clone-variant-topic-names (source destination)
#>cpp
if (auto *topic_expression = std::get_if<Expression*>(&${source})) {
if (*topic_expression == nullptr) {
${destination} = nullptr;
} else {
${destination} = (*topic_expression)->Clone(storage);
}
} else {
${destination} = std::get<std::vector<std::string>>(${source});
}
cpp<#)
(lcp:define-class stream-query (query)
((action "Action" :scope :public)
(type "Type" :scope :public)
(stream_name "std::string" :scope :public)
(topic_names "std::vector<std::string>" :scope :public)
(transform_name "std::string" :scope :public)
(consumer_group "std::string" :scope :public)
(batch_interval "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(batch_size "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(batch_limit "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(timeout "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(transform_name "std::string" :scope :public)
(batch_interval "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(batch_size "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(topic_names "std::variant<Expression*, std::vector<std::string>>" :initval "nullptr"
:clone #'clone-variant-topic-names
:scope :public)
(consumer_group "std::string" :scope :public)
(bootstrap_servers "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(service_url "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression")))
@@ -2524,6 +2547,9 @@ cpp<#
(lcp:define-enum action
(create-stream drop-stream start-stream stop-stream start-all-streams stop-all-streams show-streams check-stream)
(:serialize))
(lcp:define-enum type
(kafka pulsar)
(:serialize))
#>cpp
StreamQuery() = default;
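A minimal standalone sketch of the clone-variant-topic-names logic above; Expr is a hypothetical stand-in for the AST's Expression (the real Clone allocates from AstStorage rather than the heap).

#include <string>
#include <variant>
#include <vector>

struct Expr {
  std::string text;
  Expr *Clone() const { return new Expr{text}; }  // leaks in this sketch; storage-owned in the real AST
};

using TopicNames = std::variant<Expr *, std::vector<std::string>>;

// Deep-clones the expression alternative, plain-copies the name list.
TopicNames CloneTopicNames(const TopicNames &source) {
  if (const auto *expr = std::get_if<Expr *>(&source)) {
    return *expr ? (*expr)->Clone() : nullptr;
  }
  return std::get<std::vector<std::string>>(source);
}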


@@ -18,6 +18,7 @@
// of the same name, EOF.
// This hides the definition of the macro which causes
// the compilation to fail.
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/ast/ast_visitor.hpp"
#include "query/procedure/module.hpp"
//////////////////////////////////////////////////////
@@ -31,14 +32,18 @@
#include <limits>
#include <string>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <variant>
#include <vector>
#include <boost/preprocessor/cat.hpp>
#include "query/exceptions.hpp"
#include "query/frontend/parsing.hpp"
#include "query/interpret/awesome_memgraph_functions.hpp"
#include "query/stream/common.hpp"
#include "utils/exceptions.hpp"
#include "utils/logging.hpp"
#include "utils/string.hpp"
@@ -492,45 +497,243 @@ antlrcpp::Any CypherMainVisitor::visitStreamQuery(MemgraphCypher::StreamQueryCon
}
antlrcpp::Any CypherMainVisitor::visitCreateStream(MemgraphCypher::CreateStreamContext *ctx) {
MG_ASSERT(ctx->children.size() == 1, "CreateStreamQuery should have exactly one child!");
auto *stream_query = ctx->children[0]->accept(this).as<StreamQuery *>();
query_ = stream_query;
return stream_query;
}
namespace {
std::vector<std::string> TopicNamesFromSymbols(
antlr4::tree::ParseTreeVisitor &visitor,
const std::vector<MemgraphCypher::SymbolicNameWithDotsAndMinusContext *> &topic_name_symbols) {
MG_ASSERT(!topic_name_symbols.empty());
std::vector<std::string> topic_names;
topic_names.reserve(topic_name_symbols.size());
std::transform(topic_name_symbols.begin(), topic_name_symbols.end(), std::back_inserter(topic_names),
[&visitor](auto *topic_name) { return JoinSymbolicNamesWithDotsAndMinus(visitor, *topic_name); });
return topic_names;
}
template <typename T>
concept EnumUint8 = std::is_enum_v<T> && std::same_as<uint8_t, std::underlying_type_t<T>>;
template <bool required, typename... ValueTypes>
void MapConfig(auto &memory, const EnumUint8 auto &enum_key, auto &destination) {
const auto key = static_cast<uint8_t>(enum_key);
if (!memory.contains(key)) {
if constexpr (required) {
throw SemanticException("Config {} is required.", ToString(enum_key));
} else {
return;
}
}
std::visit(
[&]<typename T>(T &&value) {
using ValueType = std::decay_t<T>;
if constexpr (utils::SameAsAnyOf<ValueType, ValueTypes...>) {
destination = std::forward<T>(value);
} else {
LOG_FATAL("Invalid type mapped");
}
},
std::move(memory[key]));
memory.erase(key);
}
enum class CommonStreamConfigKey : uint8_t { TRANSFORM, BATCH_INTERVAL, BATCH_SIZE, END };
std::string_view ToString(const CommonStreamConfigKey key) {
switch (key) {
case CommonStreamConfigKey::TRANSFORM:
return "TRANSFORM";
case CommonStreamConfigKey::BATCH_INTERVAL:
return "BATCH_INTERVAL";
case CommonStreamConfigKey::BATCH_SIZE:
return "BATCH_SIZE";
case CommonStreamConfigKey::END:
LOG_FATAL("Invalid config key used");
}
}
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define GENERATE_STREAM_CONFIG_KEY_ENUM(stream, first_config, ...) \
enum class BOOST_PP_CAT(stream, ConfigKey) : uint8_t { \
first_config = static_cast<uint8_t>(CommonStreamConfigKey::END), \
__VA_ARGS__ \
};
GENERATE_STREAM_CONFIG_KEY_ENUM(Kafka, TOPICS, CONSUMER_GROUP, BOOTSTRAP_SERVERS);
std::string_view ToString(const KafkaConfigKey key) {
switch (key) {
case KafkaConfigKey::TOPICS:
return "TOPICS";
case KafkaConfigKey::CONSUMER_GROUP:
return "CONSUMER_GROUP";
case KafkaConfigKey::BOOTSTRAP_SERVERS:
return "BOOTSTRAP_SERVERS";
}
}
void MapCommonStreamConfigs(auto &memory, StreamQuery &stream_query) {
MapConfig<true, std::string>(memory, CommonStreamConfigKey::TRANSFORM, stream_query.transform_name_);
MapConfig<false, Expression *>(memory, CommonStreamConfigKey::BATCH_INTERVAL, stream_query.batch_interval_);
MapConfig<false, Expression *>(memory, CommonStreamConfigKey::BATCH_SIZE, stream_query.batch_size_);
}
} // namespace
antlrcpp::Any CypherMainVisitor::visitKafkaCreateStream(MemgraphCypher::KafkaCreateStreamContext *ctx) {
auto *stream_query = storage_->Create<StreamQuery>();
stream_query->action_ = StreamQuery::Action::CREATE_STREAM;
stream_query->type_ = StreamQuery::Type::KAFKA;
stream_query->stream_name_ = ctx->streamName()->symbolicName()->accept(this).as<std::string>();
auto *topic_names_ctx = ctx->topicNames();
MG_ASSERT(topic_names_ctx != nullptr);
auto topic_names = topic_names_ctx->symbolicNameWithDotsAndMinus();
MG_ASSERT(!topic_names.empty());
stream_query->topic_names_.reserve(topic_names.size());
std::transform(topic_names.begin(), topic_names.end(), std::back_inserter(stream_query->topic_names_),
[this](auto *topic_name) { return JoinSymbolicNamesWithDotsAndMinus(*this, *topic_name); });
for (auto *create_config_ctx : ctx->kafkaCreateStreamConfig()) {
create_config_ctx->accept(this);
}
stream_query->transform_name_ = JoinSymbolicNames(this, ctx->transformationName->symbolicName());
MapConfig<true, std::vector<std::string>, Expression *>(memory_, KafkaConfigKey::TOPICS, stream_query->topic_names_);
MapConfig<false, std::string>(memory_, KafkaConfigKey::CONSUMER_GROUP, stream_query->consumer_group_);
MapConfig<false, Expression *>(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS, stream_query->bootstrap_servers_);
MapCommonStreamConfigs(memory_, *stream_query);
return stream_query;
}
namespace {
void ThrowIfExists(const auto &map, const EnumUint8 auto &enum_key) {
const auto key = static_cast<uint8_t>(enum_key);
if (map.contains(key)) {
throw SemanticException("{} defined multiple times in the query", ToString(enum_key));
}
}
void GetTopicNames(auto &destination, MemgraphCypher::TopicNamesContext *topic_names_ctx,
antlr4::tree::ParseTreeVisitor &visitor) {
MG_ASSERT(topic_names_ctx != nullptr);
if (auto *symbolic_topic_names_ctx = topic_names_ctx->symbolicTopicNames()) {
destination = TopicNamesFromSymbols(visitor, symbolic_topic_names_ctx->symbolicNameWithDotsAndMinus());
} else {
if (!topic_names_ctx->literal()->StringLiteral()) {
throw SemanticException("Topic names should be defined as a string literal or as symbolic names");
}
destination = topic_names_ctx->accept(&visitor).as<Expression *>();
}
}
} // namespace
antlrcpp::Any CypherMainVisitor::visitKafkaCreateStreamConfig(MemgraphCypher::KafkaCreateStreamConfigContext *ctx) {
if (ctx->commonCreateStreamConfig()) {
return ctx->commonCreateStreamConfig()->accept(this);
}
if (ctx->TOPICS()) {
ThrowIfExists(memory_, KafkaConfigKey::TOPICS);
const auto topics_key = static_cast<uint8_t>(KafkaConfigKey::TOPICS);
GetTopicNames(memory_[topics_key], ctx->topicNames(), *this);
return {};
}
if (ctx->CONSUMER_GROUP()) {
stream_query->consumer_group_ = JoinSymbolicNamesWithDotsAndMinus(*this, *ctx->consumerGroup);
ThrowIfExists(memory_, KafkaConfigKey::CONSUMER_GROUP);
const auto consumer_group_key = static_cast<uint8_t>(KafkaConfigKey::CONSUMER_GROUP);
memory_[consumer_group_key] = JoinSymbolicNamesWithDotsAndMinus(*this, *ctx->consumerGroup);
return {};
}
MG_ASSERT(ctx->BOOTSTRAP_SERVERS());
ThrowIfExists(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS);
if (!ctx->bootstrapServers->StringLiteral()) {
throw SemanticException("Bootstrap servers should be a string!");
}
const auto bootstrap_servers_key = static_cast<uint8_t>(KafkaConfigKey::BOOTSTRAP_SERVERS);
memory_[bootstrap_servers_key] = ctx->bootstrapServers->accept(this).as<Expression *>();
return {};
}
namespace {
GENERATE_STREAM_CONFIG_KEY_ENUM(Pulsar, TOPICS, SERVICE_URL);
std::string_view ToString(const PulsarConfigKey key) {
switch (key) {
case PulsarConfigKey::TOPICS:
return "TOPICS";
case PulsarConfigKey::SERVICE_URL:
return "SERVICE_URL";
}
}
} // namespace
antlrcpp::Any CypherMainVisitor::visitPulsarCreateStream(MemgraphCypher::PulsarCreateStreamContext *ctx) {
auto *stream_query = storage_->Create<StreamQuery>();
stream_query->action_ = StreamQuery::Action::CREATE_STREAM;
stream_query->type_ = StreamQuery::Type::PULSAR;
stream_query->stream_name_ = ctx->streamName()->symbolicName()->accept(this).as<std::string>();
for (auto *create_config_ctx : ctx->pulsarCreateStreamConfig()) {
create_config_ctx->accept(this);
}
MapConfig<true, std::vector<std::string>, Expression *>(memory_, PulsarConfigKey::TOPICS, stream_query->topic_names_);
MapConfig<false, Expression *>(memory_, PulsarConfigKey::SERVICE_URL, stream_query->service_url_);
MapCommonStreamConfigs(memory_, *stream_query);
return stream_query;
}
antlrcpp::Any CypherMainVisitor::visitPulsarCreateStreamConfig(MemgraphCypher::PulsarCreateStreamConfigContext *ctx) {
if (ctx->commonCreateStreamConfig()) {
return ctx->commonCreateStreamConfig()->accept(this);
}
if (ctx->TOPICS()) {
ThrowIfExists(memory_, PulsarConfigKey::TOPICS);
const auto topics_key = static_cast<uint8_t>(PulsarConfigKey::TOPICS);
GetTopicNames(memory_[topics_key], ctx->topicNames(), *this);
return {};
}
MG_ASSERT(ctx->SERVICE_URL());
ThrowIfExists(memory_, PulsarConfigKey::SERVICE_URL);
if (!ctx->serviceUrl->StringLiteral()) {
throw SemanticException("Service URL must be a string!");
}
const auto service_url_key = static_cast<uint8_t>(PulsarConfigKey::SERVICE_URL);
memory_[service_url_key] = ctx->serviceUrl->accept(this).as<Expression *>();
return {};
}
antlrcpp::Any CypherMainVisitor::visitCommonCreateStreamConfig(MemgraphCypher::CommonCreateStreamConfigContext *ctx) {
if (ctx->TRANSFORM()) {
ThrowIfExists(memory_, CommonStreamConfigKey::TRANSFORM);
const auto transform_key = static_cast<uint8_t>(CommonStreamConfigKey::TRANSFORM);
memory_[transform_key] = JoinSymbolicNames(this, ctx->transformationName->symbolicName());
return {};
}
if (ctx->BATCH_INTERVAL()) {
ThrowIfExists(memory_, CommonStreamConfigKey::BATCH_INTERVAL);
if (!ctx->batchInterval->numberLiteral() || !ctx->batchInterval->numberLiteral()->integerLiteral()) {
throw SemanticException("Batch interval should be an integer literal!");
throw SemanticException("Batch interval must be an integer literal!");
}
stream_query->batch_interval_ = ctx->batchInterval->accept(this);
const auto batch_interval_key = static_cast<uint8_t>(CommonStreamConfigKey::BATCH_INTERVAL);
memory_[batch_interval_key] = ctx->batchInterval->accept(this).as<Expression *>();
return {};
}
if (ctx->BATCH_SIZE()) {
if (!ctx->batchSize->numberLiteral() || !ctx->batchSize->numberLiteral()->integerLiteral()) {
throw SemanticException("Batch size should be an integer literal!");
}
stream_query->batch_size_ = ctx->batchSize->accept(this);
MG_ASSERT(ctx->BATCH_SIZE());
ThrowIfExists(memory_, CommonStreamConfigKey::BATCH_SIZE);
if (!ctx->batchSize->numberLiteral() || !ctx->batchSize->numberLiteral()->integerLiteral()) {
throw SemanticException("Batch size must be an integer literal!");
}
if (ctx->BOOTSTRAP_SERVERS()) {
if (!ctx->bootstrapServers->StringLiteral()) {
throw SemanticException("Bootstrap servers should be a string!");
}
stream_query->bootstrap_servers_ = ctx->bootstrapServers->accept(this);
}
return stream_query;
const auto batch_size_key = static_cast<uint8_t>(CommonStreamConfigKey::BATCH_SIZE);
memory_[batch_size_key] = ctx->batchSize->accept(this).as<Expression *>();
return {};
}
antlrcpp::Any CypherMainVisitor::visitDropStream(MemgraphCypher::DropStreamContext *ctx) {
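A simplified, self-contained sketch of the memory_/MapConfig pattern introduced above (names reduced, and one accepted value type per key instead of the SameAsAnyOf pack): every visited config clause lands in a map keyed by its enum value, duplicates are detected at visit time via the map, and after visiting, entries are moved into the query object with required keys enforced.

#include <cstdint>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <variant>
#include <vector>

enum class Key : uint8_t { TRANSFORM, TOPICS };
using Memory = std::unordered_map<uint8_t, std::variant<std::string, std::vector<std::string>>>;

template <bool required, typename ValueType>
void MapConfig(Memory &memory, const Key key, ValueType &destination) {
  const auto raw = static_cast<uint8_t>(key);
  const auto it = memory.find(raw);
  if (it == memory.end()) {
    if constexpr (required) {
      throw std::runtime_error("required config missing");
    }
    return;  // optional config keeps its default
  }
  destination = std::move(std::get<ValueType>(it->second));
  memory.erase(it);
}
// usage: MapConfig<true, std::string>(memory, Key::TRANSFORM, query_transform_name);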


@@ -269,6 +269,31 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
*/
antlrcpp::Any visitCreateStream(MemgraphCypher::CreateStreamContext *ctx) override;
/**
* @return StreamQuery*
*/
antlrcpp::Any visitKafkaCreateStream(MemgraphCypher::KafkaCreateStreamContext *ctx) override;
/**
* @return void (an empty antlrcpp::Any; the parsed config is stored into memory_)
*/
antlrcpp::Any visitKafkaCreateStreamConfig(MemgraphCypher::KafkaCreateStreamConfigContext *ctx) override;
/**
* @return void (an empty antlrcpp::Any; the parsed config is stored into memory_)
*/
antlrcpp::Any visitPulsarCreateStreamConfig(MemgraphCypher::PulsarCreateStreamConfigContext *ctx) override;
/**
* @return StreamQuery*
*/
antlrcpp::Any visitPulsarCreateStream(MemgraphCypher::PulsarCreateStreamContext *ctx) override;
/**
* @return void (an empty antlrcpp::Any; the parsed config is stored into memory_)
*/
antlrcpp::Any visitCommonCreateStreamConfig(MemgraphCypher::CommonCreateStreamConfigContext *ctx) override;
/**
* @return StreamQuery*
*/
@@ -824,6 +849,7 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
ParsingContext context_;
AstStorage *storage_;
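// Scratch space for stream-config clauses: filled while visiting CREATE ... STREAM configs,
// keyed by the uint8_t value of a config-key enum, and drained by MapConfig.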
std::unordered_map<uint8_t, std::variant<Expression *, std::string, std::vector<std::string>>> memory_;
// Set of identifiers from queries.
std::unordered_set<std::string> users_identifiers;
// Identifiers that user didn't name.


@@ -295,15 +295,32 @@ symbolicNameWithMinus : symbolicName ( MINUS symbolicName )* ;
symbolicNameWithDotsAndMinus: symbolicNameWithMinus ( DOT symbolicNameWithMinus )* ;
topicNames : symbolicNameWithDotsAndMinus ( COMMA symbolicNameWithDotsAndMinus )* ;
symbolicTopicNames : symbolicNameWithDotsAndMinus ( COMMA symbolicNameWithDotsAndMinus )* ;
createStream : CREATE STREAM streamName
TOPICS topicNames
TRANSFORM transformationName=procedureName
( CONSUMER_GROUP consumerGroup=symbolicNameWithDotsAndMinus ) ?
( BATCH_INTERVAL batchInterval=literal ) ?
( BATCH_SIZE batchSize=literal ) ?
( BOOTSTRAP_SERVERS bootstrapServers=literal) ? ;
topicNames : symbolicTopicNames | literal ;
commonCreateStreamConfig : TRANSFORM transformationName=procedureName
| BATCH_INTERVAL batchInterval=literal
| BATCH_SIZE batchSize=literal
;
createStream : kafkaCreateStream | pulsarCreateStream ;
kafkaCreateStreamConfig : TOPICS topicNames
| CONSUMER_GROUP consumerGroup=symbolicNameWithDotsAndMinus
| BOOTSTRAP_SERVERS bootstrapServers=literal
| commonCreateStreamConfig
;
kafkaCreateStream : CREATE KAFKA STREAM streamName ( kafkaCreateStreamConfig ) * ;
pulsarCreateStreamConfig : TOPICS topicNames
| SERVICE_URL serviceUrl=literal
| commonCreateStreamConfig
;
pulsarCreateStream : CREATE PULSAR STREAM streamName ( pulsarCreateStreamConfig ) * ;
dropStream : DROP STREAM streamName ;
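With the configs factored into repeatable clauses, their order is no longer fixed; for example (illustrative queries, not from the commit) both
CREATE KAFKA STREAM myStream TOPICS topic1,topic2 TRANSFORM mod.transform CONSUMER_GROUP group1 BOOTSTRAP_SERVERS 'localhost:9092'
and
CREATE PULSAR STREAM myStream SERVICE_URL 'pulsar://localhost:6650' TRANSFORM mod.transform TOPICS topic1
parse, while a clause given twice is rejected by the visitor's ThrowIfExists check rather than by the grammar.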


@@ -62,6 +62,7 @@ HEADER : H E A D E R ;
IDENTIFIED : I D E N T I F I E D ;
IGNORE : I G N O R E ;
ISOLATION : I S O L A T I O N ;
KAFKA : K A F K A ;
LEVEL : L E V E L ;
LOAD : L O A D ;
LOCK : L O C K ;
@@ -72,6 +73,7 @@ NO : N O ;
PASSWORD : P A S S W O R D ;
PORT : P O R T ;
PRIVILEGES : P R I V I L E G E S ;
PULSAR : P U L S A R ;
READ : R E A D ;
READ_FILE : R E A D UNDERSCORE F I L E ;
REGISTER : R E G I S T E R ;
@@ -82,6 +84,7 @@ REVOKE : R E V O K E ;
ROLE : R O L E ;
ROLES : R O L E S ;
QUOTE : Q U O T E ;
SERVICE_URL : S E R V I C E UNDERSCORE U R L ;
SESSION : S E S S I O N ;
SETTING : S E T T I N G ;
SETTINGS : S E T T I N G S ;


@@ -199,7 +199,10 @@ const trie::Trie kKeywords = {"union",
"check",
"setting",
"settings",
"bootstrap_servers"};
"bootstrap_servers",
"kafka",
"pulsar",
"service_url"};
// Unicode codepoints that are allowed at the start of the unescaped name.
const std::bitset<kBitsetSize> kUnescapedNameAllowedStarts(


@@ -40,6 +40,7 @@
#include "query/plan/planner.hpp"
#include "query/plan/profile.hpp"
#include "query/plan/vertex_count_cache.hpp"
#include "query/stream/common.hpp"
#include "query/trigger.hpp"
#include "query/typed_value.hpp"
#include "storage/v2/property_value.hpp"
@@ -57,6 +58,7 @@
#include "utils/settings.hpp"
#include "utils/string.hpp"
#include "utils/tsc.hpp"
#include "utils/variant_helpers.hpp"
namespace EventCounter {
extern Event ReadQuery;
@@ -91,7 +93,8 @@ void UpdateTypeCount(const plan::ReadWriteTypeChecker::RWType type) {
struct Callback {
std::vector<std::string> header;
std::function<std::vector<std::vector<TypedValue>>()> fn;
using CallbackFunction = std::function<std::vector<std::vector<TypedValue>>()>;
CallbackFunction fn;
bool should_abort_query{false};
};
@@ -529,6 +532,77 @@ std::optional<std::string> StringPointerToOptional(const std::string *str) {
return str == nullptr ? std::nullopt : std::make_optional(*str);
}
CommonStreamInfo GetCommonStreamInfo(StreamQuery *stream_query, ExpressionEvaluator &evaluator) {
return {.batch_interval = GetOptionalValue<std::chrono::milliseconds>(stream_query->batch_interval_, evaluator),
.batch_size = GetOptionalValue<int64_t>(stream_query->batch_size_, evaluator),
.transformation_name = stream_query->transform_name_};
}
std::vector<std::string> EvaluateTopicNames(ExpressionEvaluator &evaluator,
std::variant<Expression *, std::vector<std::string>> topic_variant) {
return std::visit(utils::Overloaded{[&](Expression *expression) {
auto topic_names = expression->Accept(evaluator);
MG_ASSERT(topic_names.IsString());
return utils::Split(topic_names.ValueString(), ",");
},
[&](std::vector<std::string> topic_names) { return topic_names; }},
std::move(topic_variant));
}
Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, ExpressionEvaluator &evaluator,
InterpreterContext *interpreter_context,
const std::string *username) {
constexpr std::string_view kDefaultConsumerGroup = "mg_consumer";
std::string consumer_group{stream_query->consumer_group_.empty() ? kDefaultConsumerGroup
: stream_query->consumer_group_};
auto bootstrap = GetOptionalStringValue(stream_query->bootstrap_servers_, evaluator);
if (bootstrap && bootstrap->empty()) {
throw SemanticException("Bootstrap servers must not be an empty string!");
}
auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator);
return [interpreter_context, stream_name = stream_query->stream_name_,
topic_names = EvaluateTopicNames(evaluator, stream_query->topic_names_),
consumer_group = std::move(consumer_group), common_stream_info = std::move(common_stream_info),
bootstrap_servers = std::move(bootstrap), owner = StringPointerToOptional(username)]() mutable {
std::string bootstrap = bootstrap_servers
? std::move(*bootstrap_servers)
: std::string{interpreter_context->config.default_kafka_bootstrap_servers};
interpreter_context->streams.Create<query::KafkaStream>(stream_name,
{.common_info = std::move(common_stream_info),
.topics = std::move(topic_names),
.consumer_group = std::move(consumer_group),
.bootstrap_servers = std::move(bootstrap)},
std::move(owner));
return std::vector<std::vector<TypedValue>>{};
};
}
Callback::CallbackFunction GetPulsarCreateCallback(StreamQuery *stream_query, ExpressionEvaluator &evaluator,
InterpreterContext *interpreter_context,
const std::string *username) {
auto service_url = GetOptionalStringValue(stream_query->service_url_, evaluator);
if (service_url && service_url->empty()) {
throw SemanticException("Service URL must not be an empty string!");
}
auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator);
return [interpreter_context, stream_name = stream_query->stream_name_,
topic_names = EvaluateTopicNames(evaluator, stream_query->topic_names_),
common_stream_info = std::move(common_stream_info), service_url = std::move(service_url),
owner = StringPointerToOptional(username)]() mutable {
std::string url =
service_url ? std::move(*service_url) : std::string{interpreter_context->config.default_pulsar_service_url};
interpreter_context->streams.Create<query::PulsarStream>(
stream_name,
{.common_info = std::move(common_stream_info), .topics = std::move(topic_names), .service_url = std::move(url)},
std::move(owner));
return std::vector<std::vector<TypedValue>>{};
};
}
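A reduced sketch of the shape shared by these Get*CreateCallback helpers: expressions are evaluated eagerly while the evaluator is still alive, and the results are moved into the closure, so the returned callback owns all of its data and can safely run later (all names here are illustrative).

#include <functional>
#include <string>
#include <utility>
#include <vector>

using Callback = std::function<std::vector<std::string>()>;

Callback MakeCreateCallback(std::string stream_name, std::vector<std::string> topics) {
  // Capture by value/move: no references into parser or evaluator state survive.
  return [stream_name = std::move(stream_name), topics = std::move(topics)]() mutable {
    // ... create the stream from the captured, already-evaluated values ...
    return std::vector<std::string>{};
  };
}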
Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &parameters,
InterpreterContext *interpreter_context, DbAccessor *db_accessor,
const std::string *username, std::vector<Notification> *notifications) {
@@ -545,44 +619,14 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
switch (stream_query->action_) {
case StreamQuery::Action::CREATE_STREAM: {
EventCounter::IncrementCounter(EventCounter::StreamsCreated);
constexpr std::string_view kDefaultConsumerGroup = "mg_consumer";
std::string consumer_group{stream_query->consumer_group_.empty() ? kDefaultConsumerGroup
: stream_query->consumer_group_};
auto bootstrap = GetOptionalStringValue(stream_query->bootstrap_servers_, evaluator);
if (bootstrap && bootstrap->empty()) {
throw SemanticException("Bootstrap servers must not be an empty string!");
switch (stream_query->type_) {
case StreamQuery::Type::KAFKA:
callback.fn = GetKafkaCreateCallback(stream_query, evaluator, interpreter_context, username);
break;
case StreamQuery::Type::PULSAR:
callback.fn = GetPulsarCreateCallback(stream_query, evaluator, interpreter_context, username);
break;
}
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_,
topic_names = stream_query->topic_names_, consumer_group = std::move(consumer_group),
batch_interval =
GetOptionalValue<std::chrono::milliseconds>(stream_query->batch_interval_, evaluator),
batch_size = GetOptionalValue<int64_t>(stream_query->batch_size_, evaluator),
transformation_name = stream_query->transform_name_, bootstrap_servers = std::move(bootstrap),
owner = StringPointerToOptional(username)]() mutable {
std::string bootstrap = bootstrap_servers
? std::move(*bootstrap_servers)
: std::string{interpreter_context->config.default_kafka_bootstrap_servers};
interpreter_context->streams.Create<query::KafkaStream>(
stream_name,
{.common_info = {.batch_interval = batch_interval,
.batch_size = batch_size,
.transformation_name = std::move(transformation_name)},
.topics = std::move(topic_names),
.consumer_group = std::move(consumer_group),
.bootstrap_servers = std::move(bootstrap)},
std::move(owner));
// interpreter_context->streams.Create<query::PulsarStream>(
// stream_name,
// {.common_info = {.batch_interval = batch_interval,
// .batch_size = batch_size,
// .transformation_name = std::move(transformation_name)},
// .topics = std::move(topic_names),
// .service_url = std::move(bootstrap)},
// std::move(owner));
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::CREATE_STREAM,
fmt::format("Created stream {}.", stream_query->stream_name_));
return callback;


@@ -0,0 +1,22 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
namespace utils {
template <class... Ts>
struct Overloaded : Ts... {
using Ts::operator()...;
};
template <class... Ts>
Overloaded(Ts...) -> Overloaded<Ts...>;
} // namespace utils
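utils::Overloaded is the usual lambda-overload-set idiom for std::visit; a self-contained usage sketch mirroring EvaluateTopicNames in the interpreter:

#include <iostream>
#include <string>
#include <variant>

template <class... Ts>
struct Overloaded : Ts... {
  using Ts::operator()...;
};
template <class... Ts>
Overloaded(Ts...) -> Overloaded<Ts...>;

int main() {
  // Either an already-split list or a raw string, as with topic names.
  std::variant<int, std::string> value{std::string{"topic1,topic2"}};
  std::visit(Overloaded{[](const int i) { std::cout << "int: " << i << '\n'; },
                        [](const std::string &s) { std::cout << "string: " << s << '\n'; }},
             value);
}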


@@ -36,7 +36,7 @@ def test_ownerless_stream(producer, topics, connection):
assert len(topics) > 0
userless_cursor = connection.cursor()
common.execute_and_fetch_all(userless_cursor,
"CREATE STREAM ownerless "
"CREATE KAFKA STREAM ownerless "
f"TOPICS {topics[0]} "
f"TRANSFORM transform.simple")
common.start_stream(userless_cursor, "ownerless")
@@ -73,7 +73,7 @@ def test_owner_is_shown(topics, connection):
create_stream_user(userless_cursor, stream_user)
stream_cursor = get_cursor_with_user(stream_user)
common.execute_and_fetch_all(stream_cursor, "CREATE STREAM test "
common.execute_and_fetch_all(stream_cursor, "CREATE KAFKA STREAM test "
f"TOPICS {topics[0]} "
f"TRANSFORM transform.simple")
@@ -94,7 +94,7 @@ def test_insufficient_privileges(producer, topics, connection):
stream_cursor = get_cursor_with_user(stream_user)
common.execute_and_fetch_all(stream_cursor,
"CREATE STREAM insufficient_test "
"CREATE KAFKA STREAM insufficient_test "
f"TOPICS {topics[0]} "
f"TRANSFORM transform.simple")
@@ -139,7 +139,7 @@ def test_happy_case(producer, topics, connection):
admin_cursor, f"GRANT CREATE TO {stream_user}")
common.execute_and_fetch_all(stream_cursor,
"CREATE STREAM insufficient_test "
"CREATE KAFKA STREAM insufficient_test "
f"TOPICS {topics[0]} "
f"TRANSFORM transform.simple")


@@ -34,7 +34,7 @@ def test_simple(producer, topics, connection, transformation):
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE STREAM test "
"CREATE KAFKA STREAM test "
f"TOPICS {','.join(topics)} "
f"TRANSFORM {transformation}",
)
@@ -61,7 +61,7 @@ def test_separate_consumers(producer, topics, connection, transformation):
stream_names.append(stream_name)
common.execute_and_fetch_all(
cursor,
f"CREATE STREAM {stream_name} "
f"CREATE KAFKA STREAM {stream_name} "
f"TOPICS {topic} "
f"TRANSFORM {transformation}",
)
@@ -91,9 +91,9 @@ def test_start_from_last_committed_offset(producer, topics, connection):
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE STREAM test "
"CREATE KAFKA STREAM test "
f"TOPICS {topics[0]} "
"TRANSFORM transform.simple",
"TRANSFORM kafka_transform.simple",
)
common.start_stream(cursor, "test")
time.sleep(1)
@@ -123,9 +123,9 @@ def test_start_from_last_committed_offset(producer, topics, connection):
common.execute_and_fetch_all(
cursor,
"CREATE STREAM test "
"CREATE KAFKA STREAM test "
f"TOPICS {topics[0]} "
"TRANSFORM transform.simple",
"TRANSFORM kafka_transform.simple",
)
common.start_stream(cursor, "test")
@@ -141,7 +141,7 @@ def test_check_stream(producer, topics, connection, transformation):
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE STREAM test "
"CREATE KAFKA STREAM test "
f"TOPICS {topics[0]} "
f"TRANSFORM {transformation} "
"BATCH_SIZE 1",
@@ -202,7 +202,7 @@ def test_show_streams(producer, topics, connection):
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE STREAM default_values "
"CREATE KAFKA STREAM default_values "
f"TOPICS {topics[0]} "
f"TRANSFORM transform.simple "
f"BOOTSTRAP_SERVERS 'localhost:9092'",
@@ -213,7 +213,7 @@ def test_show_streams(producer, topics, connection):
batch_size = 3
common.execute_and_fetch_all(
cursor,
"CREATE STREAM complex_values "
"CREATE KAFKA STREAM complex_values "
f"TOPICS {','.join(topics)} "
f"TRANSFORM transform.with_parameters "
f"CONSUMER_GROUP {consumer_group} "
@@ -259,7 +259,7 @@ def test_start_and_stop_during_check(producer, topics, connection, operation):
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE STREAM test_stream "
"CREATE KAFKA STREAM test_stream "
f"TOPICS {topics[0]} "
f"TRANSFORM transform.simple",
)
@@ -371,7 +371,7 @@ def test_check_already_started_stream(topics, connection):
common.execute_and_fetch_all(
cursor,
"CREATE STREAM started_stream "
"CREATE KAFKA STREAM started_stream "
f"TOPICS {topics[0]} "
f"TRANSFORM transform.simple",
)
@@ -385,7 +385,7 @@ def test_start_checked_stream_after_timeout(topics, connection):
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE STREAM test_stream "
"CREATE KAFKA STREAM test_stream "
f"TOPICS {topics[0]} "
f"TRANSFORM transform.simple",
)
@@ -419,7 +419,7 @@ def test_restart_after_error(producer, topics, connection):
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE STREAM test_stream "
"CREATE KAFKA STREAM test_stream "
f"TOPICS {topics[0]} "
f"TRANSFORM transform.query",
)
@@ -447,7 +447,7 @@ def test_bootstrap_server(producer, topics, connection, transformation):
local = "localhost:9092"
common.execute_and_fetch_all(
cursor,
"CREATE STREAM test "
"CREATE KAFKA STREAM test "
f"TOPICS {','.join(topics)} "
f"TRANSFORM {transformation} "
f"BOOTSTRAP_SERVERS '{local}'",
@@ -471,7 +471,7 @@ def test_bootstrap_server_empty(producer, topics, connection, transformation):
with pytest.raises(mgclient.DatabaseError):
common.execute_and_fetch_all(
cursor,
"CREATE STREAM test "
"CREATE KAFKA STREAM test "
f"TOPICS {','.join(topics)} "
f"TRANSFORM {transformation} "
"BOOTSTRAP_SERVERS ''",


@@ -52,6 +52,7 @@
#include "query/typed_value.hpp"
#include "utils/string.hpp"
#include "utils/variant_helpers.hpp"
using namespace query;
using namespace query::frontend;
@@ -2943,6 +2944,7 @@ TEST_P(CypherMainVisitorTest, CallProcedureWithMemoryUnlimited) {
namespace {
template <typename TException = SyntaxException>
void TestInvalidQuery(const auto &query, Base &ast_generator) {
SCOPED_TRACE(query);
EXPECT_THROW(ast_generator.ParseQuery(query), TException) << query;
}
@@ -3564,11 +3566,17 @@ void ValidateMostlyEmptyStreamQuery(Base &ast_generator, const std::string &quer
ASSERT_NE(parsed_query, nullptr);
EXPECT_EQ(parsed_query->action_, action);
EXPECT_EQ(parsed_query->stream_name_, stream_name);
EXPECT_TRUE(parsed_query->topic_names_.empty());
auto topic_names = std::get_if<Expression *>(&parsed_query->topic_names_);
EXPECT_NE(topic_names, nullptr);
EXPECT_EQ(*topic_names, nullptr);
EXPECT_TRUE(topic_names);
EXPECT_FALSE(*topic_names);
EXPECT_TRUE(parsed_query->transform_name_.empty());
EXPECT_TRUE(parsed_query->consumer_group_.empty());
EXPECT_EQ(parsed_query->batch_interval_, nullptr);
EXPECT_EQ(parsed_query->batch_size_, nullptr);
EXPECT_EQ(parsed_query->service_url_, nullptr);
EXPECT_EQ(parsed_query->bootstrap_servers_, nullptr);
EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_limit_, batch_limit));
EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->timeout_, timeout));
}
@@ -3637,17 +3645,28 @@ TEST_P(CypherMainVisitorTest, StopAllStreams) {
ValidateMostlyEmptyStreamQuery(ast_generator, "SToP ALL STReaMS", StreamQuery::Action::STOP_ALL_STREAMS, "");
}
void ValidateCreateStreamQuery(Base &ast_generator, const std::string &query_string, const std::string_view stream_name,
const std::vector<std::string> &topic_names, const std::string_view transform_name,
const std::string_view consumer_group, const std::optional<TypedValue> &batch_interval,
const std::optional<TypedValue> &batch_size,
const std::string_view bootstrap_servers = "") {
void ValidateTopicNames(const auto &topic_names, const std::vector<std::string> &expected_topic_names,
Base &ast_generator) {
std::visit(utils::Overloaded{
[&](Expression *expression) {
ast_generator.CheckLiteral(expression, utils::Join(expected_topic_names, ","));
},
[&](const std::vector<std::string> &topic_names) { EXPECT_EQ(topic_names, expected_topic_names); }},
topic_names);
}
void ValidateCreateKafkaStreamQuery(Base &ast_generator, const std::string &query_string,
const std::string_view stream_name, const std::vector<std::string> &topic_names,
const std::string_view transform_name, const std::string_view consumer_group,
const std::optional<TypedValue> &batch_interval,
const std::optional<TypedValue> &batch_size,
const std::string_view bootstrap_servers = "") {
SCOPED_TRACE(query_string);
StreamQuery *parsed_query{nullptr};
ASSERT_NO_THROW(parsed_query = dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(query_string))) << query_string;
ASSERT_NE(parsed_query, nullptr);
EXPECT_EQ(parsed_query->stream_name_, stream_name);
EXPECT_EQ(parsed_query->topic_names_, topic_names);
ValidateTopicNames(parsed_query->topic_names_, topic_names, ast_generator);
EXPECT_EQ(parsed_query->transform_name_, transform_name);
EXPECT_EQ(parsed_query->consumer_group_, consumer_group);
EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_interval_, batch_interval));
@@ -3660,37 +3679,35 @@ void ValidateCreateStreamQuery(Base &ast_generator, const std::string &query_str
EXPECT_NE(parsed_query->bootstrap_servers_, nullptr);
}
TEST_P(CypherMainVisitorTest, CreateStream) {
TEST_P(CypherMainVisitorTest, CreateKafkaStream) {
auto &ast_generator = *GetParam();
TestInvalidQuery("CREATE STREAM", ast_generator);
TestInvalidQuery("CREATE STREAM invalid stream name TOPICS topic1 TRANSFORM transform", ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS invalid topic name TRANSFORM transform", ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM invalid transform name", ast_generator);
TestInvalidQuery("CREATE STREAM stream TRANSFORM transform", ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS TRANSFORM transform", ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS topic1", ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM", ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform CONSUMER_GROUP", ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform CONSUMER_GROUP invalid consumer group",
TestInvalidQuery("CREATE KAFKA STREAM", ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM invalid stream name TOPICS topic1 TRANSFORM transform", ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS invalid topic name TRANSFORM transform", ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM invalid transformation name", ast_generator);
// required configs are missing
TestInvalidQuery<SemanticException>("CREATE KAFKA STREAM stream TRANSFORM transform", ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS TRANSFORM transform", ast_generator);
// required configs are missing
TestInvalidQuery<SemanticException>("CREATE KAFKA STREAM stream TOPICS topic1", ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM", ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform CONSUMER_GROUP", ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform CONSUMER_GROUP invalid consumer group",
ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BATCH_INTERVAL", ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BATCH_INTERVAL", ast_generator);
TestInvalidQuery<SemanticException>(
"CREATE STREAM stream TOPICS topic1 TRANSFORM transform BATCH_INTERVAL 'invalid interval'", ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BATCH_SIZE", ast_generator);
"CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BATCH_INTERVAL 'invalid interval'", ast_generator);
TestInvalidQuery<SemanticException>("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform TOPICS topic2",
ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BATCH_SIZE", ast_generator);
TestInvalidQuery<SemanticException>(
"CREATE STREAM stream TOPICS topic1 TRANSFORM transform BATCH_SIZE 'invalid size'", ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BATCH_SIZE 2 BATCH_INTERVAL 3",
"CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BATCH_SIZE 'invalid size'", ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1, TRANSFORM transform BATCH_SIZE 2 CONSUMER_GROUP Gru",
ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BATCH_INVERVAL 2 CONSUMER_GROUP Gru",
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS localhost:9092",
ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BATCH_SIZE 2 CONSUMER_GROUP Gru",
ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS topic1, TRANSFORM transform BATCH_SIZE 2 CONSUMER_GROUP Gru",
ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS localhost:9092",
ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS", ast_generator);
TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS", ast_generator);
const std::vector<std::string> topic_names{"topic1_name.with_dot", "topic1_name.with_multiple.dots",
"topic-name.with-multiple.dots-and-dashes"};
@@ -3707,45 +3724,59 @@ TEST_P(CypherMainVisitorTest, CreateStream) {
const auto topic_names_as_str = utils::Join(topic_names, ",");
ValidateCreateStreamQuery(
ValidateCreateKafkaStreamQuery(
ast_generator,
fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {}", kStreamName, topic_names_as_str, kTransformName),
fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {}", kStreamName, topic_names_as_str, kTransformName),
kStreamName, topic_names, kTransformName, "", std::nullopt, std::nullopt);
ValidateCreateStreamQuery(ast_generator,
fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} ", kStreamName,
topic_names_as_str, kTransformName, kConsumerGroup),
kStreamName, topic_names, kTransformName, kConsumerGroup, std::nullopt, std::nullopt);
ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} ",
kStreamName, topic_names_as_str, kTransformName, kConsumerGroup),
kStreamName, topic_names, kTransformName, kConsumerGroup, std::nullopt,
std::nullopt);
ValidateCreateStreamQuery(ast_generator,
fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} BATCH_INTERVAL {}", kStreamName,
topic_names_as_str, kTransformName, kBatchInterval),
kStreamName, topic_names, kTransformName, "", batch_interval_value, std::nullopt);
ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} TRANSFORM {} TOPICS {} BATCH_INTERVAL {}",
kStreamName, kTransformName, topic_names_as_str, kBatchInterval),
kStreamName, topic_names, kTransformName, "", batch_interval_value, std::nullopt);
ValidateCreateStreamQuery(ast_generator,
fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} BATCH_SIZE {}", kStreamName,
topic_names_as_str, kTransformName, kBatchSize),
kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value);
ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} BATCH_SIZE {} TOPICS {} TRANSFORM {}",
kStreamName, kBatchSize, topic_names_as_str, kTransformName),
kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value);
ValidateCreateStreamQuery(
ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} TOPICS '{}' BATCH_SIZE {} TRANSFORM {}",
kStreamName, topic_names_as_str, kBatchSize, kTransformName),
kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value);
ValidateCreateKafkaStreamQuery(
ast_generator,
fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {}",
fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {}",
kStreamName, topic_names_as_str, kTransformName, kConsumerGroup, kBatchInterval, kBatchSize),
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value);
using namespace std::string_literals;
const auto host1 = "localhost:9094"s;
ValidateCreateStreamQuery(
ValidateCreateKafkaStreamQuery(
ast_generator,
fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {} "
fmt::format("CREATE KAFKA STREAM {} TOPICS {} CONSUMER_GROUP {} BATCH_SIZE {} BATCH_INTERVAL {} TRANSFORM {} "
"BOOTSTRAP_SERVERS '{}'",
kStreamName, topic_names_as_str, kTransformName, kConsumerGroup, kBatchInterval, kBatchSize, host1),
kStreamName, topic_names_as_str, kConsumerGroup, kBatchSize, kBatchInterval, kTransformName, host1),
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1);
const auto host2 = "localhost:9094,localhost:1994,168.1.1.256:345"s;
ValidateCreateStreamQuery(
ValidateCreateKafkaStreamQuery(
ast_generator,
fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {} "
fmt::format("CREATE KAFKA STREAM {} CONSUMER_GROUP {} TOPICS {} BATCH_INTERVAL {} TRANSFORM {} BATCH_SIZE {} "
"BOOTSTRAP_SERVERS '{}'",
kStreamName, topic_names_as_str, kTransformName, kConsumerGroup, kBatchInterval, kBatchSize, host2),
kStreamName, kConsumerGroup, topic_names_as_str, kBatchInterval, kTransformName, kBatchSize, host1),
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1);
const auto host2 = "localhost:9094,localhost:1994,168.1.1.256:345"s;
ValidateCreateKafkaStreamQuery(
ast_generator,
fmt::format("CREATE KAFKA STREAM {} TOPICS {} BOOTSTRAP_SERVERS '{}' CONSUMER_GROUP {} TRANSFORM {} "
"BATCH_INTERVAL {} BATCH_SIZE {}",
kStreamName, topic_names_as_str, host2, kConsumerGroup, kTransformName, kBatchInterval, kBatchSize),
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host2);
};
@@ -3757,10 +3788,11 @@ TEST_P(CypherMainVisitorTest, CreateStream) {
auto check_consumer_group = [&](const std::string_view consumer_group) {
const std::string kTopicName{"topic1"};
ValidateCreateStreamQuery(ast_generator,
fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {}", kStreamName,
kTopicName, kTransformName, consumer_group),
kStreamName, {kTopicName}, kTransformName, consumer_group, std::nullopt, std::nullopt);
ValidateCreateKafkaStreamQuery(ast_generator,
fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {}",
kStreamName, kTopicName, kTransformName, consumer_group),
kStreamName, {kTopicName}, kTransformName, consumer_group, std::nullopt,
std::nullopt);
};
using namespace std::literals;
@@ -3772,6 +3804,129 @@ TEST_P(CypherMainVisitorTest, CreateStream) {
}
}
void ValidateCreatePulsarStreamQuery(Base &ast_generator, const std::string &query_string,
const std::string_view stream_name, const std::vector<std::string> &topic_names,
const std::string_view transform_name,
const std::optional<TypedValue> &batch_interval,
const std::optional<TypedValue> &batch_size, const std::string_view service_url) {
SCOPED_TRACE(query_string);
StreamQuery *parsed_query{nullptr};
ASSERT_NO_THROW(parsed_query = dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(query_string))) << query_string;
ASSERT_NE(parsed_query, nullptr);
EXPECT_EQ(parsed_query->stream_name_, stream_name);
ValidateTopicNames(parsed_query->topic_names_, topic_names, ast_generator);
EXPECT_EQ(parsed_query->transform_name_, transform_name);
EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_interval_, batch_interval));
EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_size_, batch_size));
EXPECT_EQ(parsed_query->batch_limit_, nullptr);
if (service_url.empty()) {
EXPECT_EQ(parsed_query->service_url_, nullptr);
return;
}
EXPECT_NE(parsed_query->service_url_, nullptr);
}
TEST_P(CypherMainVisitorTest, CreatePulsarStream) {
auto &ast_generator = *GetParam();
TestInvalidQuery("CREATE PULSAR STREAM", ast_generator);
TestInvalidQuery<SemanticException>("CREATE PULSAR STREAM stream", ast_generator);
TestInvalidQuery("CREATE PULSAR STREAM stream TOPICS", ast_generator);
TestInvalidQuery<SemanticException>("CREATE PULSAR STREAM stream TOPICS topic_name", ast_generator);
TestInvalidQuery("CREATE PULSAR STREAM stream TOPICS topic_name TRANSFORM", ast_generator);
TestInvalidQuery("CREATE PULSAR STREAM stream TOPICS topic_name TRANSFORM transform.name SERVICE_URL", ast_generator);
TestInvalidQuery<SemanticException>(
"CREATE PULSAR STREAM stream TOPICS topic_name TRANSFORM transform.name SERVICE_URL 1", ast_generator);
TestInvalidQuery(
"CREATE PULSAR STREAM stream TOPICS topic_name TRANSFORM transform.name BOOTSTRAP_SERVERS 'bootstrap'",
ast_generator);
TestInvalidQuery<SemanticException>(
"CREATE PULSAR STREAM stream TOPICS topic_name TRANSFORM transform.name SERVICE_URL 'test' TOPICS topic_name",
ast_generator);
TestInvalidQuery<SemanticException>(
"CREATE PULSAR STREAM stream TRANSFORM transform.name TOPICS topic_name TRANSFORM transform.name SERVICE_URL "
"'test'",
ast_generator);
TestInvalidQuery<SemanticException>(
"CREATE PULSAR STREAM stream BATCH_INTERVAL 1 TOPICS topic_name TRANSFORM transform.name SERVICE_URL 'test' "
"BATCH_INTERVAL 1000",
ast_generator);
TestInvalidQuery<SemanticException>(
"CREATE PULSAR STREAM stream BATCH_INTERVAL 'a' TOPICS topic_name TRANSFORM transform.name SERVICE_URL 'test'",
ast_generator);
TestInvalidQuery<SemanticException>(
"CREATE PULSAR STREAM stream BATCH_SIZE 'a' TOPICS topic_name TRANSFORM transform.name SERVICE_URL 'test'",
ast_generator);
const std::vector<std::string> topic_names{"topic1", "topic2"};
const std::string topic_names_str = utils::Join(topic_names, ",");
constexpr std::string_view kStreamName{"PulsarStream"};
constexpr std::string_view kTransformName{"boringTransformation"};
constexpr std::string_view kServiceUrl{"localhost"};
constexpr int kBatchSize{1000};
constexpr int kBatchInterval{231321};
{
SCOPED_TRACE("single topic");
ValidateCreatePulsarStreamQuery(
ast_generator,
fmt::format("CREATE PULSAR STREAM {} TOPICS {} TRANSFORM {}", kStreamName, topic_names[0], kTransformName),
kStreamName, {topic_names[0]}, kTransformName, std::nullopt, std::nullopt, "");
}
{
SCOPED_TRACE("multiple topics");
ValidateCreatePulsarStreamQuery(
ast_generator,
fmt::format("CREATE PULSAR STREAM {} TRANSFORM {} TOPICS {}", kStreamName, kTransformName, topic_names_str),
kStreamName, topic_names, kTransformName, std::nullopt, std::nullopt, "");
}
{
SCOPED_TRACE("topic name in string");
ValidateCreatePulsarStreamQuery(
ast_generator,
fmt::format("CREATE PULSAR STREAM {} TRANSFORM {} TOPICS '{}'", kStreamName, kTransformName, topic_names_str),
kStreamName, topic_names, kTransformName, std::nullopt, std::nullopt, "");
}
{
SCOPED_TRACE("service url");
ValidateCreatePulsarStreamQuery(ast_generator,
fmt::format("CREATE PULSAR STREAM {} SERVICE_URL '{}' TRANSFORM {} TOPICS {}",
kStreamName, kServiceUrl, kTransformName, topic_names_str),
kStreamName, topic_names, kTransformName, std::nullopt, std::nullopt, kServiceUrl);
ValidateCreatePulsarStreamQuery(ast_generator,
fmt::format("CREATE PULSAR STREAM {} TRANSFORM {} SERVICE_URL '{}' TOPICS {}",
kStreamName, kTransformName, kServiceUrl, topic_names_str),
kStreamName, topic_names, kTransformName, std::nullopt, std::nullopt, kServiceUrl);
}
{
SCOPED_TRACE("batch size");
ValidateCreatePulsarStreamQuery(
ast_generator,
fmt::format("CREATE PULSAR STREAM {} SERVICE_URL '{}' BATCH_SIZE {} TRANSFORM {} TOPICS {}", kStreamName,
kServiceUrl, kBatchSize, kTransformName, topic_names_str),
kStreamName, topic_names, kTransformName, std::nullopt, TypedValue(kBatchSize), kServiceUrl);
ValidateCreatePulsarStreamQuery(
ast_generator,
fmt::format("CREATE PULSAR STREAM {} TRANSFORM {} SERVICE_URL '{}' TOPICS {} BATCH_SIZE {}", kStreamName,
kTransformName, kServiceUrl, topic_names_str, kBatchSize),
kStreamName, topic_names, kTransformName, std::nullopt, TypedValue(kBatchSize), kServiceUrl);
}
{
SCOPED_TRACE("batch interval");
ValidateCreatePulsarStreamQuery(
ast_generator,
fmt::format("CREATE PULSAR STREAM {} BATCH_INTERVAL {} SERVICE_URL '{}' BATCH_SIZE {} TRANSFORM {} TOPICS {}",
kStreamName, kBatchInterval, kServiceUrl, kBatchSize, kTransformName, topic_names_str),
kStreamName, topic_names, kTransformName, TypedValue(kBatchInterval), TypedValue(kBatchSize), kServiceUrl);
ValidateCreatePulsarStreamQuery(
ast_generator,
fmt::format("CREATE PULSAR STREAM {} TRANSFORM {} SERVICE_URL '{}' BATCH_INTERVAL {} TOPICS {} BATCH_SIZE {}",
kStreamName, kTransformName, kServiceUrl, kBatchInterval, topic_names_str, kBatchSize),
kStreamName, topic_names, kTransformName, TypedValue(kBatchInterval), TypedValue(kBatchSize), kServiceUrl);
}
}
TEST_P(CypherMainVisitorTest, CheckStream) {
auto &ast_generator = *GetParam();


@@ -286,7 +286,7 @@ TEST_F(ConsumerTest, InvalidBootstrapServers) {
TEST_F(ConsumerTest, InvalidTopic) {
auto info = CreateDefaultConsumerInfo();
info.topics = {"Non existing topic"};
info.topics = {"Nonexistingtopic"};
EXPECT_THROW(Consumer(std::move(info), kDummyConsumerFunction), TopicNotFoundException);
}
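Note: the topic is renamed because the new validation in Consumer rejects names containing spaces before the metadata lookup ever runs; the test needs a syntactically valid but nonexistent name to still reach the TopicNotFoundException path.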