diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt index 1cd2dcd72..a2d350557 100644 --- a/libs/CMakeLists.txt +++ b/libs/CMakeLists.txt @@ -260,6 +260,7 @@ import_external_library(protobuf STATIC CONFIGURE_COMMAND true) set(BOOST_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/boost/lib) +set(BOOST_ROOT ${BOOST_ROOT} PARENT_SCOPE) import_external_library(pulsar STATIC ${CMAKE_CURRENT_SOURCE_DIR}/pulsar/pulsar-client-cpp/lib/libpulsarwithdeps.a diff --git a/libs/setup.sh b/libs/setup.sh index 282dfaf52..0099d7c37 100755 --- a/libs/setup.sh +++ b/libs/setup.sh @@ -280,7 +280,7 @@ tar -xzf boost_1_77_0.tar.gz mv boost_1_77_0 boost pushd boost ./bootstrap.sh --prefix=$(pwd)/lib --with-libraries="system,regex" -./b2 -j$(nproc) install +./b2 -j$(nproc) install variant=release popd #pulsar diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp index b7a287450..ce5e5d928 100644 --- a/src/integrations/kafka/consumer.cpp +++ b/src/integrations/kafka/consumer.cpp @@ -175,7 +175,16 @@ Consumer::Consumer(ConsumerInfo info, ConsumerFunction consumer_function) std::inserter(topic_names_from_metadata, topic_names_from_metadata.begin()), [](const auto topic_metadata) { return topic_metadata->topic(); }); + constexpr size_t max_topic_name_length = 249; + constexpr auto is_valid_topic_name = [](const auto c) { return std::isalnum(c) || c == '.' || c == '_' || c == '-'; }; + for (const auto &topic_name : info_.topics) { + if (topic_name.size() > max_topic_name_length || + std::any_of(topic_name.begin(), topic_name.end(), [&](const auto c) { return !is_valid_topic_name(c); })) { + throw ConsumerFailedToInitializeException(info_.consumer_name, + fmt::format("'{}' is an invalid topic name", topic_name)); + } + if (!topic_names_from_metadata.contains(topic_name)) { throw TopicNotFoundException(info_.consumer_name, topic_name); } diff --git a/src/integrations/pulsar/consumer.cpp b/src/integrations/pulsar/consumer.cpp index 224aca9f8..8567049f9 100644 --- a/src/integrations/pulsar/consumer.cpp +++ b/src/integrations/pulsar/consumer.cpp @@ -99,9 +99,8 @@ class SpdlogLoggerFactory : public pulsar_client::LoggerFactory { }; pulsar_client::Client CreateClient(const std::string &service_url) { - static SpdlogLoggerFactory logger_factory; pulsar_client::ClientConfiguration conf; - conf.setLogger(&logger_factory); + conf.setLogger(new SpdlogLoggerFactory); return {service_url, conf}; } } // namespace diff --git a/src/memgraph.cpp b/src/memgraph.cpp index e4c9ce120..ed1414752 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -190,6 +190,9 @@ DEFINE_bool(telemetry_enabled, false, DEFINE_string(kafka_bootstrap_servers, "", "List of default Kafka brokers as a comma separated list of broker host or host:port."); +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(pulsar_service_url, "", "Default URL used while connecting to Pulsar brokers."); + // Audit logging flags. 
#ifdef MG_ENTERPRISE DEFINE_bool(audit_enabled, false, "Set to true to enable audit logging."); @@ -1122,7 +1125,8 @@ int main(int argc, char **argv) { query::InterpreterContext interpreter_context{&db, {.query = {.allow_load_csv = FLAGS_allow_load_csv}, .execution_timeout_sec = FLAGS_query_execution_timeout_sec, - .default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers}, + .default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers, + .default_pulsar_service_url = FLAGS_pulsar_service_url}, FLAGS_data_directory}; #ifdef MG_ENTERPRISE SessionData session_data{&db, &interpreter_context, &auth, &audit_log}; diff --git a/src/query/CMakeLists.txt b/src/query/CMakeLists.txt index 7e3ee8cd9..a816a465e 100644 --- a/src/query/CMakeLists.txt +++ b/src/query/CMakeLists.txt @@ -40,10 +40,12 @@ set(mg_query_sources trigger_context.cpp typed_value.cpp) +find_package(Boost REQUIRED) + add_library(mg-query STATIC ${mg_query_sources}) add_dependencies(mg-query generate_lcp_query) target_include_directories(mg-query PUBLIC ${CMAKE_SOURCE_DIR}/include) -target_link_libraries(mg-query dl cppitertools) +target_link_libraries(mg-query dl cppitertools Boost::headers) target_link_libraries(mg-query mg-integrations-pulsar mg-integrations-kafka mg-storage-v2 mg-utils mg-kvstore mg-memory) if("${MG_PYTHON_VERSION}" STREQUAL "") find_package(Python3 3.5 REQUIRED COMPONENTS Development) diff --git a/src/query/config.hpp b/src/query/config.hpp index 16a7b5b1c..4d18cf19c 100644 --- a/src/query/config.hpp +++ b/src/query/config.hpp @@ -22,5 +22,6 @@ struct InterpreterConfig { double execution_timeout_sec{600.0}; std::string default_kafka_bootstrap_servers; + std::string default_pulsar_service_url; }; } // namespace query diff --git a/src/query/frontend/ast/ast.lcp b/src/query/frontend/ast/ast.lcp index 1c0b546fc..ba3998bc5 100644 --- a/src/query/frontend/ast/ast.lcp +++ b/src/query/frontend/ast/ast.lcp @@ -2498,25 +2498,48 @@ cpp<# (:serialize (:slk)) (:clone)) +(defun clone-variant-topic-names (source destination) + #>cpp + if (auto *topic_expression = std::get_if<Expression *>(&${source})) { + if (*topic_expression == nullptr) { + ${destination} = nullptr; + } else { + ${destination} = (*topic_expression)->Clone(storage); + } + } else { + ${destination} = std::get<std::vector<std::string>>(${source}); + } + cpp<#) + (lcp:define-class stream-query (query) ((action "Action" :scope :public) + (type "Type" :scope :public) (stream_name "std::string" :scope :public) - (topic_names "std::vector<std::string>" :scope :public) - (transform_name "std::string" :scope :public) - (consumer_group "std::string" :scope :public) - (batch_interval "Expression *" :initval "nullptr" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "Expression")) - (batch_size "Expression *" :initval "nullptr" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "Expression")) + (batch_limit "Expression *" :initval "nullptr" :scope :public :slk-save #'slk-save-ast-pointer :slk-load (slk-load-ast-pointer "Expression")) (timeout "Expression *" :initval "nullptr" :scope :public :slk-save #'slk-save-ast-pointer :slk-load (slk-load-ast-pointer "Expression")) + + (transform_name "std::string" :scope :public) + (batch_interval "Expression *" :initval "nullptr" :scope :public + :slk-save #'slk-save-ast-pointer + :slk-load (slk-load-ast-pointer "Expression")) + (batch_size "Expression *" :initval "nullptr" :scope :public + :slk-save #'slk-save-ast-pointer + :slk-load (slk-load-ast-pointer "Expression")) + + (topic_names "std::variant<Expression*, std::vector<std::string>>"
:initval "nullptr" + :clone #'clone-variant-topic-names + :scope :public) + (consumer_group "std::string" :scope :public) (bootstrap_servers "Expression *" :initval "nullptr" :scope :public + :slk-save #'slk-save-ast-pointer + :slk-load (slk-load-ast-pointer "Expression")) + + (service_url "Expression *" :initval "nullptr" :scope :public :slk-save #'slk-save-ast-pointer :slk-load (slk-load-ast-pointer "Expression"))) @@ -2524,6 +2547,9 @@ cpp<# (lcp:define-enum action (create-stream drop-stream start-stream stop-stream start-all-streams stop-all-streams show-streams check-stream) (:serialize)) + (lcp:define-enum type + (kafka pulsar) + (:serialize)) #>cpp StreamQuery() = default; diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index 7a1978452..5d46aa292 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -18,6 +18,7 @@ // of the same name, EOF. // This hides the definition of the macro which causes // the compilation to fail. +#include "query/frontend/ast/ast.hpp" #include "query/frontend/ast/ast_visitor.hpp" #include "query/procedure/module.hpp" ////////////////////////////////////////////////////// @@ -31,14 +32,18 @@ #include #include #include +#include #include #include #include #include +#include + #include "query/exceptions.hpp" #include "query/frontend/parsing.hpp" #include "query/interpret/awesome_memgraph_functions.hpp" +#include "query/stream/common.hpp" #include "utils/exceptions.hpp" #include "utils/logging.hpp" #include "utils/string.hpp" @@ -492,45 +497,243 @@ antlrcpp::Any CypherMainVisitor::visitStreamQuery(MemgraphCypher::StreamQueryCon } antlrcpp::Any CypherMainVisitor::visitCreateStream(MemgraphCypher::CreateStreamContext *ctx) { + MG_ASSERT(ctx->children.size() == 1, "CreateStreamQuery should have exactly one child!"); + auto *stream_query = ctx->children[0]->accept(this).as(); + query_ = stream_query; + return stream_query; +} + +namespace { +std::vector TopicNamesFromSymbols( + antlr4::tree::ParseTreeVisitor &visitor, + const std::vector &topic_name_symbols) { + MG_ASSERT(!topic_name_symbols.empty()); + std::vector topic_names; + topic_names.reserve(topic_name_symbols.size()); + std::transform(topic_name_symbols.begin(), topic_name_symbols.end(), std::back_inserter(topic_names), + [&visitor](auto *topic_name) { return JoinSymbolicNamesWithDotsAndMinus(visitor, *topic_name); }); + return topic_names; +} + +template +concept EnumUint8 = std::is_enum_v && std::same_as>; + +template +void MapConfig(auto &memory, const EnumUint8 auto &enum_key, auto &destination) { + const auto key = static_cast(enum_key); + if (!memory.contains(key)) { + if constexpr (required) { + throw SemanticException("Config {} is required.", ToString(enum_key)); + } else { + return; + } + } + + std::visit( + [&](T &&value) { + using ValueType = std::decay_t; + if constexpr (utils::SameAsAnyOf) { + destination = std::forward(value); + } else { + LOG_FATAL("Invalid type mapped"); + } + }, + std::move(memory[key])); + memory.erase(key); +} + +enum class CommonStreamConfigKey : uint8_t { TRANSFORM, BATCH_INTERVAL, BATCH_SIZE, END }; + +std::string_view ToString(const CommonStreamConfigKey key) { + switch (key) { + case CommonStreamConfigKey::TRANSFORM: + return "TRANSFORM"; + case CommonStreamConfigKey::BATCH_INTERVAL: + return "BATCH_INTERVAL"; + case CommonStreamConfigKey::BATCH_SIZE: + return "BATCH_SIZE"; + case CommonStreamConfigKey::END: + LOG_FATAL("Invalid config key 
used"); + } +} + +// NOLINTNEXTLINE(cppcoreguidelines-macro-usage) +#define GENERATE_STREAM_CONFIG_KEY_ENUM(stream, first_config, ...) \ + enum class BOOST_PP_CAT(stream, ConfigKey) : uint8_t { \ + first_config = static_cast(CommonStreamConfigKey::END), \ + __VA_ARGS__ \ + }; + +GENERATE_STREAM_CONFIG_KEY_ENUM(Kafka, TOPICS, CONSUMER_GROUP, BOOTSTRAP_SERVERS); + +std::string_view ToString(const KafkaConfigKey key) { + switch (key) { + case KafkaConfigKey::TOPICS: + return "TOPICS"; + case KafkaConfigKey::CONSUMER_GROUP: + return "CONSUMER_GROUP"; + case KafkaConfigKey::BOOTSTRAP_SERVERS: + return "BOOTSTRAP_SERVERS"; + } +} + +void MapCommonStreamConfigs(auto &memory, StreamQuery &stream_query) { + MapConfig(memory, CommonStreamConfigKey::TRANSFORM, stream_query.transform_name_); + MapConfig(memory, CommonStreamConfigKey::BATCH_INTERVAL, stream_query.batch_interval_); + MapConfig(memory, CommonStreamConfigKey::BATCH_SIZE, stream_query.batch_size_); +} +} // namespace + +antlrcpp::Any CypherMainVisitor::visitKafkaCreateStream(MemgraphCypher::KafkaCreateStreamContext *ctx) { auto *stream_query = storage_->Create(); stream_query->action_ = StreamQuery::Action::CREATE_STREAM; + stream_query->type_ = StreamQuery::Type::KAFKA; stream_query->stream_name_ = ctx->streamName()->symbolicName()->accept(this).as(); - auto *topic_names_ctx = ctx->topicNames(); - MG_ASSERT(topic_names_ctx != nullptr); - auto topic_names = topic_names_ctx->symbolicNameWithDotsAndMinus(); - MG_ASSERT(!topic_names.empty()); - stream_query->topic_names_.reserve(topic_names.size()); - std::transform(topic_names.begin(), topic_names.end(), std::back_inserter(stream_query->topic_names_), - [this](auto *topic_name) { return JoinSymbolicNamesWithDotsAndMinus(*this, *topic_name); }); + for (auto *create_config_ctx : ctx->kafkaCreateStreamConfig()) { + create_config_ctx->accept(this); + } - stream_query->transform_name_ = JoinSymbolicNames(this, ctx->transformationName->symbolicName()); + MapConfig, Expression *>(memory_, KafkaConfigKey::TOPICS, stream_query->topic_names_); + MapConfig(memory_, KafkaConfigKey::CONSUMER_GROUP, stream_query->consumer_group_); + MapConfig(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS, stream_query->bootstrap_servers_); + + MapCommonStreamConfigs(memory_, *stream_query); + + return stream_query; +} + +namespace { +void ThrowIfExists(const auto &map, const EnumUint8 auto &enum_key) { + const auto key = static_cast(enum_key); + if (map.contains(key)) { + throw SemanticException("{} defined multiple times in the query", ToString(enum_key)); + } +} + +void GetTopicNames(auto &destination, MemgraphCypher::TopicNamesContext *topic_names_ctx, + antlr4::tree::ParseTreeVisitor &visitor) { + MG_ASSERT(topic_names_ctx != nullptr); + if (auto *symbolic_topic_names_ctx = topic_names_ctx->symbolicTopicNames()) { + destination = TopicNamesFromSymbols(visitor, symbolic_topic_names_ctx->symbolicNameWithDotsAndMinus()); + } else { + if (!topic_names_ctx->literal()->StringLiteral()) { + throw SemanticException("Topic names should be defined as a string literal or as symbolic names"); + } + destination = topic_names_ctx->accept(&visitor).as(); + } +} +} // namespace + +antlrcpp::Any CypherMainVisitor::visitKafkaCreateStreamConfig(MemgraphCypher::KafkaCreateStreamConfigContext *ctx) { + if (ctx->commonCreateStreamConfig()) { + return ctx->commonCreateStreamConfig()->accept(this); + } + + if (ctx->TOPICS()) { + ThrowIfExists(memory_, KafkaConfigKey::TOPICS); + const auto topics_key = static_cast(KafkaConfigKey::TOPICS); + 
GetTopicNames(memory_[topics_key], ctx->topicNames(), *this); + return {}; + } if (ctx->CONSUMER_GROUP()) { - stream_query->consumer_group_ = JoinSymbolicNamesWithDotsAndMinus(*this, *ctx->consumerGroup); + ThrowIfExists(memory_, KafkaConfigKey::CONSUMER_GROUP); + const auto consumer_group_key = static_cast<uint8_t>(KafkaConfigKey::CONSUMER_GROUP); + memory_[consumer_group_key] = JoinSymbolicNamesWithDotsAndMinus(*this, *ctx->consumerGroup); + return {}; + } + + MG_ASSERT(ctx->BOOTSTRAP_SERVERS()); + ThrowIfExists(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS); + if (!ctx->bootstrapServers->StringLiteral()) { + throw SemanticException("Bootstrap servers should be a string!"); + } + + const auto bootstrap_servers_key = static_cast<uint8_t>(KafkaConfigKey::BOOTSTRAP_SERVERS); + memory_[bootstrap_servers_key] = ctx->bootstrapServers->accept(this).as<Expression *>(); + return {}; +} + +namespace { +GENERATE_STREAM_CONFIG_KEY_ENUM(Pulsar, TOPICS, SERVICE_URL); + +std::string_view ToString(const PulsarConfigKey key) { + switch (key) { + case PulsarConfigKey::TOPICS: + return "TOPICS"; + case PulsarConfigKey::SERVICE_URL: + return "SERVICE_URL"; + } +} +} // namespace + +antlrcpp::Any CypherMainVisitor::visitPulsarCreateStream(MemgraphCypher::PulsarCreateStreamContext *ctx) { + auto *stream_query = storage_->Create<StreamQuery>(); + stream_query->action_ = StreamQuery::Action::CREATE_STREAM; + stream_query->type_ = StreamQuery::Type::PULSAR; + stream_query->stream_name_ = ctx->streamName()->symbolicName()->accept(this).as<std::string>(); + + for (auto *create_config_ctx : ctx->pulsarCreateStreamConfig()) { + create_config_ctx->accept(this); + } + + MapConfig<true, std::vector<std::string>, Expression *>(memory_, PulsarConfigKey::TOPICS, stream_query->topic_names_); + MapConfig<false, Expression *>(memory_, PulsarConfigKey::SERVICE_URL, stream_query->service_url_); + + MapCommonStreamConfigs(memory_, *stream_query); + + return stream_query; +} + +antlrcpp::Any CypherMainVisitor::visitPulsarCreateStreamConfig(MemgraphCypher::PulsarCreateStreamConfigContext *ctx) { + if (ctx->commonCreateStreamConfig()) { + return ctx->commonCreateStreamConfig()->accept(this); + } + + if (ctx->TOPICS()) { + ThrowIfExists(memory_, PulsarConfigKey::TOPICS); + const auto topics_key = static_cast<uint8_t>(PulsarConfigKey::TOPICS); + GetTopicNames(memory_[topics_key], ctx->topicNames(), *this); + return {}; + } + + MG_ASSERT(ctx->SERVICE_URL()); + ThrowIfExists(memory_, PulsarConfigKey::SERVICE_URL); + if (!ctx->serviceUrl->StringLiteral()) { + throw SemanticException("Service URL must be a string!"); + } + const auto service_url_key = static_cast<uint8_t>(PulsarConfigKey::SERVICE_URL); + memory_[service_url_key] = ctx->serviceUrl->accept(this).as<Expression *>(); + return {}; +} + +antlrcpp::Any CypherMainVisitor::visitCommonCreateStreamConfig(MemgraphCypher::CommonCreateStreamConfigContext *ctx) { + if (ctx->TRANSFORM()) { + ThrowIfExists(memory_, CommonStreamConfigKey::TRANSFORM); + const auto transform_key = static_cast<uint8_t>(CommonStreamConfigKey::TRANSFORM); + memory_[transform_key] = JoinSymbolicNames(this, ctx->transformationName->symbolicName()); + return {}; } if (ctx->BATCH_INTERVAL()) { + ThrowIfExists(memory_, CommonStreamConfigKey::BATCH_INTERVAL); if (!ctx->batchInterval->numberLiteral() || !ctx->batchInterval->numberLiteral()->integerLiteral()) { - throw SemanticException("Batch interval should be an integer literal!"); + throw SemanticException("Batch interval must be an integer literal!"); } - stream_query->batch_interval_ = ctx->batchInterval->accept(this); + const auto batch_interval_key = static_cast<uint8_t>(CommonStreamConfigKey::BATCH_INTERVAL); + 
memory_[batch_interval_key] = ctx->batchInterval->accept(this).as<Expression *>(); + return {}; } - if (ctx->BATCH_SIZE()) { - if (!ctx->batchSize->numberLiteral() || !ctx->batchSize->numberLiteral()->integerLiteral()) { - throw SemanticException("Batch size should be an integer literal!"); - } - stream_query->batch_size_ = ctx->batchSize->accept(this); + MG_ASSERT(ctx->BATCH_SIZE()); + ThrowIfExists(memory_, CommonStreamConfigKey::BATCH_SIZE); + if (!ctx->batchSize->numberLiteral() || !ctx->batchSize->numberLiteral()->integerLiteral()) { + throw SemanticException("Batch size must be an integer literal!"); } - if (ctx->BOOTSTRAP_SERVERS()) { - if (!ctx->bootstrapServers->StringLiteral()) { - throw SemanticException("Bootstrap servers should be a string!"); - } - stream_query->bootstrap_servers_ = ctx->bootstrapServers->accept(this); - } - - return stream_query; + const auto batch_size_key = static_cast<uint8_t>(CommonStreamConfigKey::BATCH_SIZE); + memory_[batch_size_key] = ctx->batchSize->accept(this).as<Expression *>(); + return {}; } antlrcpp::Any CypherMainVisitor::visitDropStream(MemgraphCypher::DropStreamContext *ctx) { diff --git a/src/query/frontend/ast/cypher_main_visitor.hpp b/src/query/frontend/ast/cypher_main_visitor.hpp index bef65c4f5..b5290d86a 100644 --- a/src/query/frontend/ast/cypher_main_visitor.hpp +++ b/src/query/frontend/ast/cypher_main_visitor.hpp @@ -269,6 +269,31 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor { */ antlrcpp::Any visitCreateStream(MemgraphCypher::CreateStreamContext *ctx) override; + /** + * @return StreamQuery* + */ + antlrcpp::Any visitKafkaCreateStream(MemgraphCypher::KafkaCreateStreamContext *ctx) override; + + /** + * @return StreamQuery* + */ + antlrcpp::Any visitKafkaCreateStreamConfig(MemgraphCypher::KafkaCreateStreamConfigContext *ctx) override; + + /** + * @return StreamQuery* + */ + antlrcpp::Any visitPulsarCreateStreamConfig(MemgraphCypher::PulsarCreateStreamConfigContext *ctx) override; + + /** + * @return StreamQuery* + */ + antlrcpp::Any visitPulsarCreateStream(MemgraphCypher::PulsarCreateStreamContext *ctx) override; + + /** + * @return StreamQuery* + */ + antlrcpp::Any visitCommonCreateStreamConfig(MemgraphCypher::CommonCreateStreamConfigContext *ctx) override; + /** * @return StreamQuery* */ @@ -824,6 +849,7 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor { ParsingContext context_; AstStorage *storage_; + std::unordered_map<uint8_t, std::variant<Expression *, std::string, std::vector<std::string>>> memory_; // Set of identifiers from queries. std::unordered_set<std::string> users_identifiers; // Identifiers that user didn't name. diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 index bbdad0d42..e6fc8736f 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 @@ -295,15 +295,32 @@ symbolicNameWithMinus : symbolicName ( MINUS symbolicName )* ; symbolicNameWithDotsAndMinus: symbolicNameWithMinus ( DOT symbolicNameWithMinus )* ; -topicNames : symbolicNameWithDotsAndMinus ( COMMA symbolicNameWithDotsAndMinus )* ; +symbolicTopicNames : symbolicNameWithDotsAndMinus ( COMMA symbolicNameWithDotsAndMinus )* ; -createStream : CREATE STREAM streamName - TOPICS topicNames - TRANSFORM transformationName=procedureName - ( CONSUMER_GROUP consumerGroup=symbolicNameWithDotsAndMinus ) ? - ( BATCH_INTERVAL batchInterval=literal ) ? - ( BATCH_SIZE batchSize=literal ) ? - ( BOOTSTRAP_SERVERS bootstrapServers=literal) ?
; +topicNames : symbolicTopicNames | literal ; + +commonCreateStreamConfig : TRANSFORM transformationName=procedureName + | BATCH_INTERVAL batchInterval=literal + | BATCH_SIZE batchSize=literal + ; + +createStream : kafkaCreateStream | pulsarCreateStream ; + +kafkaCreateStreamConfig : TOPICS topicNames + | CONSUMER_GROUP consumerGroup=symbolicNameWithDotsAndMinus + | BOOTSTRAP_SERVERS bootstrapServers=literal + | commonCreateStreamConfig + ; + +kafkaCreateStream : CREATE KAFKA STREAM streamName ( kafkaCreateStreamConfig ) * ; + + +pulsarCreateStreamConfig : TOPICS topicNames + | SERVICE_URL serviceUrl=literal + | commonCreateStreamConfig + ; + +pulsarCreateStream : CREATE PULSAR STREAM streamName ( pulsarCreateStreamConfig ) * ; dropStream : DROP STREAM streamName ; diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 index 69e023656..1ebca5497 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 @@ -62,6 +62,7 @@ HEADER : H E A D E R ; IDENTIFIED : I D E N T I F I E D ; IGNORE : I G N O R E ; ISOLATION : I S O L A T I O N ; +KAFKA : K A F K A ; LEVEL : L E V E L ; LOAD : L O A D ; LOCK : L O C K ; @@ -72,6 +73,7 @@ NO : N O ; PASSWORD : P A S S W O R D ; PORT : P O R T ; PRIVILEGES : P R I V I L E G E S ; +PULSAR : P U L S A R ; READ : R E A D ; READ_FILE : R E A D UNDERSCORE F I L E ; REGISTER : R E G I S T E R ; @@ -82,6 +84,7 @@ REVOKE : R E V O K E ; ROLE : R O L E ; ROLES : R O L E S ; QUOTE : Q U O T E ; +SERVICE_URL : S E R V I C E UNDERSCORE U R L ; SESSION : S E S S I O N ; SETTING : S E T T I N G ; SETTINGS : S E T T I N G S ; diff --git a/src/query/frontend/stripped_lexer_constants.hpp b/src/query/frontend/stripped_lexer_constants.hpp index 49645f36b..ff28498b1 100644 --- a/src/query/frontend/stripped_lexer_constants.hpp +++ b/src/query/frontend/stripped_lexer_constants.hpp @@ -199,7 +199,10 @@ const trie::Trie kKeywords = {"union", "check", "setting", "settings", - "bootstrap_servers"}; + "bootstrap_servers", + "kafka", + "pulsar", + "service_url"}; // Unicode codepoints that are allowed at the start of the unescaped name. const std::bitset kUnescapedNameAllowedStarts( diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 571c96f08..9ddd27c4f 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -40,6 +40,7 @@ #include "query/plan/planner.hpp" #include "query/plan/profile.hpp" #include "query/plan/vertex_count_cache.hpp" +#include "query/stream/common.hpp" #include "query/trigger.hpp" #include "query/typed_value.hpp" #include "storage/v2/property_value.hpp" @@ -57,6 +58,7 @@ #include "utils/settings.hpp" #include "utils/string.hpp" #include "utils/tsc.hpp" +#include "utils/variant_helpers.hpp" namespace EventCounter { extern Event ReadQuery; @@ -91,7 +93,8 @@ void UpdateTypeCount(const plan::ReadWriteTypeChecker::RWType type) { struct Callback { std::vector<std::string> header; - std::function<std::vector<std::vector<TypedValue>>()> fn; + using CallbackFunction = std::function<std::vector<std::vector<TypedValue>>()>; + CallbackFunction fn; bool should_abort_query{false}; }; @@ -529,6 +532,77 @@ std::optional<std::string> StringPointerToOptional(const std::string *str) { return str == nullptr ?
std::nullopt : std::make_optional(*str); } +CommonStreamInfo GetCommonStreamInfo(StreamQuery *stream_query, ExpressionEvaluator &evaluator) { + return {.batch_interval = GetOptionalValue<std::chrono::milliseconds>(stream_query->batch_interval_, evaluator), + .batch_size = GetOptionalValue<int64_t>(stream_query->batch_size_, evaluator), + .transformation_name = stream_query->transform_name_}; +} + +std::vector<std::string> EvaluateTopicNames(ExpressionEvaluator &evaluator, + std::variant<Expression *, std::vector<std::string>> topic_variant) { + return std::visit(utils::Overloaded{[&](Expression *expression) { + auto topic_names = expression->Accept(evaluator); + MG_ASSERT(topic_names.IsString()); + return utils::Split(topic_names.ValueString(), ","); + }, + [&](std::vector<std::string> topic_names) { return topic_names; }}, + std::move(topic_variant)); +} + +Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, ExpressionEvaluator &evaluator, + InterpreterContext *interpreter_context, + const std::string *username) { + constexpr std::string_view kDefaultConsumerGroup = "mg_consumer"; + std::string consumer_group{stream_query->consumer_group_.empty() ? kDefaultConsumerGroup + : stream_query->consumer_group_}; + + auto bootstrap = GetOptionalStringValue(stream_query->bootstrap_servers_, evaluator); + if (bootstrap && bootstrap->empty()) { + throw SemanticException("Bootstrap servers must not be an empty string!"); + } + auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator); + + return [interpreter_context, stream_name = stream_query->stream_name_, + topic_names = EvaluateTopicNames(evaluator, stream_query->topic_names_), + consumer_group = std::move(consumer_group), common_stream_info = std::move(common_stream_info), + bootstrap_servers = std::move(bootstrap), owner = StringPointerToOptional(username)]() mutable { + std::string bootstrap = bootstrap_servers + ? std::move(*bootstrap_servers) + : std::string{interpreter_context->config.default_kafka_bootstrap_servers}; + interpreter_context->streams.Create(stream_name, + {.common_info = std::move(common_stream_info), + .topics = std::move(topic_names), + .consumer_group = std::move(consumer_group), + .bootstrap_servers = std::move(bootstrap)}, + std::move(owner)); + + return std::vector<std::vector<TypedValue>>{}; + }; +} + +Callback::CallbackFunction GetPulsarCreateCallback(StreamQuery *stream_query, ExpressionEvaluator &evaluator, + InterpreterContext *interpreter_context, + const std::string *username) { + auto service_url = GetOptionalStringValue(stream_query->service_url_, evaluator); + if (service_url && service_url->empty()) { + throw SemanticException("Service URL must not be an empty string!"); + } + auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator); + return [interpreter_context, stream_name = stream_query->stream_name_, + topic_names = EvaluateTopicNames(evaluator, stream_query->topic_names_), + common_stream_info = std::move(common_stream_info), service_url = std::move(service_url), + owner = StringPointerToOptional(username)]() mutable { + std::string url = + service_url ?
std::move(*service_url) : std::string{interpreter_context->config.default_pulsar_service_url}; + interpreter_context->streams.Create( + stream_name, + {.common_info = std::move(common_stream_info), .topics = std::move(topic_names), .service_url = std::move(url)}, + std::move(owner)); + + return std::vector<std::vector<TypedValue>>{}; + }; +} + Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &parameters, InterpreterContext *interpreter_context, DbAccessor *db_accessor, const std::string *username, std::vector<Notification> *notifications) { @@ -545,44 +619,14 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete switch (stream_query->action_) { case StreamQuery::Action::CREATE_STREAM: { EventCounter::IncrementCounter(EventCounter::StreamsCreated); - constexpr std::string_view kDefaultConsumerGroup = "mg_consumer"; - std::string consumer_group{stream_query->consumer_group_.empty() ? kDefaultConsumerGroup - : stream_query->consumer_group_}; - - auto bootstrap = GetOptionalStringValue(stream_query->bootstrap_servers_, evaluator); - if (bootstrap && bootstrap->empty()) { - throw SemanticException("Bootstrap servers must not be an empty string!"); + switch (stream_query->type_) { + case StreamQuery::Type::KAFKA: + callback.fn = GetKafkaCreateCallback(stream_query, evaluator, interpreter_context, username); + break; + case StreamQuery::Type::PULSAR: + callback.fn = GetPulsarCreateCallback(stream_query, evaluator, interpreter_context, username); + break; } - callback.fn = [interpreter_context, stream_name = stream_query->stream_name_, - topic_names = stream_query->topic_names_, consumer_group = std::move(consumer_group), - batch_interval = - GetOptionalValue<std::chrono::milliseconds>(stream_query->batch_interval_, evaluator), - batch_size = GetOptionalValue<int64_t>(stream_query->batch_size_, evaluator), - transformation_name = stream_query->transform_name_, bootstrap_servers = std::move(bootstrap), - owner = StringPointerToOptional(username)]() mutable { - std::string bootstrap = bootstrap_servers - ? std::move(*bootstrap_servers) - : std::string{interpreter_context->config.default_kafka_bootstrap_servers}; - interpreter_context->streams.Create( - stream_name, - {.common_info = {.batch_interval = batch_interval, - .batch_size = batch_size, - .transformation_name = std::move(transformation_name)}, - .topics = std::move(topic_names), - .consumer_group = std::move(consumer_group), - .bootstrap_servers = std::move(bootstrap)}, - std::move(owner)); - // interpreter_context->streams.Create( - // stream_name, - // {.common_info = {.batch_interval = batch_interval, - // .batch_size = batch_size, - // .transformation_name = std::move(transformation_name)}, - // .topics = std::move(topic_names), - // .service_url = std::move(bootstrap)}, - // std::move(owner)); - - return std::vector<std::vector<TypedValue>>{}; - }; notifications->emplace_back(SeverityLevel::INFO, NotificationCode::CREATE_STREAM, fmt::format("Created stream {}.", stream_query->stream_name_)); return callback; diff --git a/src/utils/variant_helpers.hpp b/src/utils/variant_helpers.hpp new file mode 100644 index 000000000..c7321fc64 --- /dev/null +++ b/src/utils/variant_helpers.hpp @@ -0,0 +1,22 @@ +// Copyright 2021 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License.
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +namespace utils { +template <typename... Ts> +struct Overloaded : Ts... { + using Ts::operator()...; +}; + +template <typename... Ts> +Overloaded(Ts...) -> Overloaded<Ts...>; +} // namespace utils diff --git a/tests/e2e/streams/streams_owner_tests.py b/tests/e2e/streams/streams_owner_tests.py index d1fb62f79..777346ae4 100644 --- a/tests/e2e/streams/streams_owner_tests.py +++ b/tests/e2e/streams/streams_owner_tests.py @@ -36,7 +36,7 @@ def test_ownerless_stream(producer, topics, connection): assert len(topics) > 0 userless_cursor = connection.cursor() common.execute_and_fetch_all(userless_cursor, - "CREATE STREAM ownerless " + "CREATE KAFKA STREAM ownerless " f"TOPICS {topics[0]} " f"TRANSFORM transform.simple") common.start_stream(userless_cursor, "ownerless") @@ -73,7 +73,7 @@ def test_owner_is_shown(topics, connection): create_stream_user(userless_cursor, stream_user) stream_cursor = get_cursor_with_user(stream_user) - common.execute_and_fetch_all(stream_cursor, "CREATE STREAM test " + common.execute_and_fetch_all(stream_cursor, "CREATE KAFKA STREAM test " f"TOPICS {topics[0]} " f"TRANSFORM transform.simple") @@ -94,7 +94,7 @@ def test_insufficient_privileges(producer, topics, connection): stream_cursor = get_cursor_with_user(stream_user) common.execute_and_fetch_all(stream_cursor, - "CREATE STREAM insufficient_test " + "CREATE KAFKA STREAM insufficient_test " f"TOPICS {topics[0]} " f"TRANSFORM transform.simple") @@ -139,7 +139,7 @@ def test_happy_case(producer, topics, connection): admin_cursor, f"GRANT CREATE TO {stream_user}") common.execute_and_fetch_all(stream_cursor, - "CREATE STREAM insufficient_test " + "CREATE KAFKA STREAM insufficient_test " f"TOPICS {topics[0]} " f"TRANSFORM transform.simple") diff --git a/tests/e2e/streams/streams_tests.py b/tests/e2e/streams/streams_tests.py index 9e2ac241b..a573a0cb2 100755 --- a/tests/e2e/streams/streams_tests.py +++ b/tests/e2e/streams/streams_tests.py @@ -34,7 +34,7 @@ def test_simple(producer, topics, connection, transformation): cursor = connection.cursor() common.execute_and_fetch_all( cursor, - "CREATE STREAM test " + "CREATE KAFKA STREAM test " f"TOPICS {','.join(topics)} " f"TRANSFORM {transformation}", ) @@ -61,7 +61,7 @@ def test_separate_consumers(producer, topics, connection, transformation): stream_names.append(stream_name) common.execute_and_fetch_all( cursor, - f"CREATE STREAM {stream_name} " + f"CREATE KAFKA STREAM {stream_name} " f"TOPICS {topic} " f"TRANSFORM {transformation}", ) @@ -91,9 +91,9 @@ def test_start_from_last_committed_offset(producer, topics, connection): cursor = connection.cursor() common.execute_and_fetch_all( cursor, - "CREATE STREAM test " + "CREATE KAFKA STREAM test " f"TOPICS {topics[0]} " - "TRANSFORM transform.simple", + "TRANSFORM kafka_transform.simple", ) common.start_stream(cursor, "test") time.sleep(1) @@ -123,9 +123,9 @@ def test_start_from_last_committed_offset(producer, topics, connection): common.execute_and_fetch_all( cursor, - "CREATE STREAM test " + "CREATE KAFKA STREAM test " f"TOPICS {topics[0]} " - "TRANSFORM transform.simple", + "TRANSFORM kafka_transform.simple", ) common.start_stream(cursor, "test") @@ -141,7 +141,7 @@ def test_check_stream(producer, topics, connection, transformation): cursor = connection.cursor() common.execute_and_fetch_all( cursor, - "CREATE STREAM test " +
"CREATE KAFKA STREAM test " f"TOPICS {topics[0]} " f"TRANSFORM {transformation} " "BATCH_SIZE 1", @@ -202,7 +202,7 @@ def test_show_streams(producer, topics, connection): cursor = connection.cursor() common.execute_and_fetch_all( cursor, - "CREATE STREAM default_values " + "CREATE KAFKA STREAM default_values " f"TOPICS {topics[0]} " f"TRANSFORM transform.simple " f"BOOTSTRAP_SERVERS 'localhost:9092'", @@ -213,7 +213,7 @@ def test_show_streams(producer, topics, connection): batch_size = 3 common.execute_and_fetch_all( cursor, - "CREATE STREAM complex_values " + "CREATE KAFKA STREAM complex_values " f"TOPICS {','.join(topics)} " f"TRANSFORM transform.with_parameters " f"CONSUMER_GROUP {consumer_group} " @@ -259,7 +259,7 @@ def test_start_and_stop_during_check(producer, topics, connection, operation): cursor = connection.cursor() common.execute_and_fetch_all( cursor, - "CREATE STREAM test_stream " + "CREATE KAFKA STREAM test_stream " f"TOPICS {topics[0]} " f"TRANSFORM transform.simple", ) @@ -371,7 +371,7 @@ def test_check_already_started_stream(topics, connection): common.execute_and_fetch_all( cursor, - "CREATE STREAM started_stream " + "CREATE KAFKA STREAM started_stream " f"TOPICS {topics[0]} " f"TRANSFORM transform.simple", ) @@ -385,7 +385,7 @@ def test_start_checked_stream_after_timeout(topics, connection): cursor = connection.cursor() common.execute_and_fetch_all( cursor, - "CREATE STREAM test_stream " + "CREATE KAFKA STREAM test_stream " f"TOPICS {topics[0]} " f"TRANSFORM transform.simple", ) @@ -419,7 +419,7 @@ def test_restart_after_error(producer, topics, connection): cursor = connection.cursor() common.execute_and_fetch_all( cursor, - "CREATE STREAM test_stream " + "CREATE KAFKA STREAM test_stream " f"TOPICS {topics[0]} " f"TRANSFORM transform.query", ) @@ -447,7 +447,7 @@ def test_bootstrap_server(producer, topics, connection, transformation): local = "localhost:9092" common.execute_and_fetch_all( cursor, - "CREATE STREAM test " + "CREATE KAFKA STREAM test " f"TOPICS {','.join(topics)} " f"TRANSFORM {transformation} " f"BOOTSTRAP_SERVERS '{local}'", @@ -471,7 +471,7 @@ def test_bootstrap_server_empty(producer, topics, connection, transformation): with pytest.raises(mgclient.DatabaseError): common.execute_and_fetch_all( cursor, - "CREATE STREAM test " + "CREATE KAFKA STREAM test " f"TOPICS {','.join(topics)} " f"TRANSFORM {transformation} " "BOOTSTRAP_SERVERS ''", diff --git a/tests/unit/cypher_main_visitor.cpp b/tests/unit/cypher_main_visitor.cpp index 4963c22a4..3f57f5735 100644 --- a/tests/unit/cypher_main_visitor.cpp +++ b/tests/unit/cypher_main_visitor.cpp @@ -52,6 +52,7 @@ #include "query/typed_value.hpp" #include "utils/string.hpp" +#include "utils/variant_helpers.hpp" using namespace query; using namespace query::frontend; @@ -2943,6 +2944,7 @@ TEST_P(CypherMainVisitorTest, CallProcedureWithMemoryUnlimited) { namespace { template void TestInvalidQuery(const auto &query, Base &ast_generator) { + SCOPED_TRACE(query); EXPECT_THROW(ast_generator.ParseQuery(query), TException) << query; } @@ -3564,11 +3566,17 @@ void ValidateMostlyEmptyStreamQuery(Base &ast_generator, const std::string &quer ASSERT_NE(parsed_query, nullptr); EXPECT_EQ(parsed_query->action_, action); EXPECT_EQ(parsed_query->stream_name_, stream_name); - EXPECT_TRUE(parsed_query->topic_names_.empty()); + auto topic_names = std::get_if(&parsed_query->topic_names_); + EXPECT_NE(topic_names, nullptr); + EXPECT_EQ(*topic_names, nullptr); + EXPECT_TRUE(topic_names); + EXPECT_FALSE(*topic_names); 
EXPECT_TRUE(parsed_query->transform_name_.empty()); EXPECT_TRUE(parsed_query->consumer_group_.empty()); EXPECT_EQ(parsed_query->batch_interval_, nullptr); EXPECT_EQ(parsed_query->batch_size_, nullptr); + EXPECT_EQ(parsed_query->service_url_, nullptr); + EXPECT_EQ(parsed_query->bootstrap_servers_, nullptr); EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_limit_, batch_limit)); EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->timeout_, timeout)); } @@ -3637,17 +3645,28 @@ TEST_P(CypherMainVisitorTest, StopAllStreams) { ValidateMostlyEmptyStreamQuery(ast_generator, "SToP ALL STReaMS", StreamQuery::Action::STOP_ALL_STREAMS, ""); } -void ValidateCreateStreamQuery(Base &ast_generator, const std::string &query_string, const std::string_view stream_name, - const std::vector<std::string> &topic_names, const std::string_view transform_name, - const std::string_view consumer_group, const std::optional<TypedValue> &batch_interval, - const std::optional<TypedValue> &batch_size, - const std::string_view bootstrap_servers = "") { +void ValidateTopicNames(const auto &topic_names, const std::vector<std::string> &expected_topic_names, + Base &ast_generator) { + std::visit(utils::Overloaded{ + [&](Expression *expression) { + ast_generator.CheckLiteral(expression, utils::Join(expected_topic_names, ",")); + }, + [&](const std::vector<std::string> &topic_names) { EXPECT_EQ(topic_names, expected_topic_names); }}, + topic_names); +} + +void ValidateCreateKafkaStreamQuery(Base &ast_generator, const std::string &query_string, + const std::string_view stream_name, const std::vector<std::string> &topic_names, + const std::string_view transform_name, const std::string_view consumer_group, + const std::optional<TypedValue> &batch_interval, + const std::optional<TypedValue> &batch_size, + const std::string_view bootstrap_servers = "") { + SCOPED_TRACE(query_string); StreamQuery *parsed_query{nullptr}; ASSERT_NO_THROW(parsed_query = dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(query_string))) << query_string; ASSERT_NE(parsed_query, nullptr); EXPECT_EQ(parsed_query->stream_name_, stream_name); - - EXPECT_EQ(parsed_query->topic_names_, topic_names); + ValidateTopicNames(parsed_query->topic_names_, topic_names, ast_generator); EXPECT_EQ(parsed_query->transform_name_, transform_name); EXPECT_EQ(parsed_query->consumer_group_, consumer_group); EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_interval_, batch_interval)); @@ -3660,37 +3679,35 @@ void ValidateCreateStreamQuery(Base &ast_generator, const std::string &query_str EXPECT_NE(parsed_query->bootstrap_servers_, nullptr); } -TEST_P(CypherMainVisitorTest, CreateStream) { +TEST_P(CypherMainVisitorTest, CreateKafkaStream) { auto &ast_generator = *GetParam(); - TestInvalidQuery("CREATE STREAM", ast_generator); - TestInvalidQuery("CREATE STREAM invalid stream name TOPICS topic1 TRANSFORM transform", ast_generator); - TestInvalidQuery("CREATE STREAM stream TOPICS invalid topic name TRANSFORM transform", ast_generator); - TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM invalid transform name", ast_generator); - TestInvalidQuery("CREATE STREAM stream TRANSFORM transform", ast_generator); - TestInvalidQuery("CREATE STREAM stream TOPICS TRANSFORM transform", ast_generator); - TestInvalidQuery("CREATE STREAM stream TOPICS topic1", ast_generator); - TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM", ast_generator); - TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform CONSUMER_GROUP", ast_generator); - TestInvalidQuery("CREATE STREAM stream
TOPICS topic1 TRANSFORM transform CONSUMER_GROUP invalid consumer group", + TestInvalidQuery("CREATE KAFKA STREAM", ast_generator); + TestInvalidQuery("CREATE KAFKA STREAM invalid stream name TOPICS topic1 TRANSFORM transform", ast_generator); + TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS invalid topic name TRANSFORM transform", ast_generator); + TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM invalid transformation name", ast_generator); + // required configs are missing + TestInvalidQuery("CREATE KAFKA STREAM stream TRANSFORM transform", ast_generator); + TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS TRANSFORM transform", ast_generator); + // required configs are missing + TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1", ast_generator); + TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM", ast_generator); + TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform CONSUMER_GROUP", ast_generator); + TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform CONSUMER_GROUP invalid consumer group", ast_generator); - TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BATCH_INTERVAL", ast_generator); + TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BATCH_INTERVAL", ast_generator); TestInvalidQuery( - "CREATE STREAM stream TOPICS topic1 TRANSFORM transform BATCH_INTERVAL 'invalid interval'", ast_generator); - TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BATCH_SIZE", ast_generator); + "CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BATCH_INTERVAL 'invalid interval'", ast_generator); + TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform TOPICS topic2", + ast_generator); + TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BATCH_SIZE", ast_generator); TestInvalidQuery( - "CREATE STREAM stream TOPICS topic1 TRANSFORM transform BATCH_SIZE 'invalid size'", ast_generator); - TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BATCH_SIZE 2 BATCH_INTERVAL 3", + "CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BATCH_SIZE 'invalid size'", ast_generator); + TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1, TRANSFORM transform BATCH_SIZE 2 CONSUMER_GROUP Gru", ast_generator); - TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BATCH_INVERVAL 2 CONSUMER_GROUP Gru", + TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS localhost:9092", ast_generator); - TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BATCH_SIZE 2 CONSUMER_GROUP Gru", - ast_generator); - TestInvalidQuery("CREATE STREAM stream TOPICS topic1, TRANSFORM transform BATCH_SIZE 2 CONSUMER_GROUP Gru", - ast_generator); - TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS localhost:9092", - ast_generator); - TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS", ast_generator); + TestInvalidQuery("CREATE KAFKA STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS", ast_generator); const std::vector<std::string> topic_names{"topic1_name.with_dot", "topic1_name.with_multiple.dots", "topic-name.with-multiple.dots-and-dashes"}; @@ -3707,45 +3724,59 @@ TEST_P(CypherMainVisitorTest, CreateStream) { const auto topic_names_as_str = utils::Join(topic_names, ","); - 
ValidateCreateKafkaStreamQuery( ast_generator, - fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {}", kStreamName, topic_names_as_str, kTransformName), + fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {}", kStreamName, topic_names_as_str, kTransformName), kStreamName, topic_names, kTransformName, "", std::nullopt, std::nullopt); - ValidateCreateStreamQuery(ast_generator, - fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} ", kStreamName, - topic_names_as_str, kTransformName, kConsumerGroup), - kStreamName, topic_names, kTransformName, kConsumerGroup, std::nullopt, std::nullopt); + ValidateCreateKafkaStreamQuery(ast_generator, + fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} ", + kStreamName, topic_names_as_str, kTransformName, kConsumerGroup), + kStreamName, topic_names, kTransformName, kConsumerGroup, std::nullopt, + std::nullopt); - ValidateCreateStreamQuery(ast_generator, - fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} BATCH_INTERVAL {}", kStreamName, - topic_names_as_str, kTransformName, kBatchInterval), - kStreamName, topic_names, kTransformName, "", batch_interval_value, std::nullopt); + ValidateCreateKafkaStreamQuery(ast_generator, + fmt::format("CREATE KAFKA STREAM {} TRANSFORM {} TOPICS {} BATCH_INTERVAL {}", + kStreamName, kTransformName, topic_names_as_str, kBatchInterval), + kStreamName, topic_names, kTransformName, "", batch_interval_value, std::nullopt); - ValidateCreateStreamQuery(ast_generator, - fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} BATCH_SIZE {}", kStreamName, - topic_names_as_str, kTransformName, kBatchSize), - kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value); + ValidateCreateKafkaStreamQuery(ast_generator, + fmt::format("CREATE KAFKA STREAM {} BATCH_SIZE {} TOPICS {} TRANSFORM {}", + kStreamName, kBatchSize, topic_names_as_str, kTransformName), + kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value); - ValidateCreateStreamQuery( + ValidateCreateKafkaStreamQuery(ast_generator, + fmt::format("CREATE KAFKA STREAM {} TOPICS '{}' BATCH_SIZE {} TRANSFORM {}", + kStreamName, topic_names_as_str, kBatchSize, kTransformName), + kStreamName, topic_names, kTransformName, "", std::nullopt, batch_size_value); + + ValidateCreateKafkaStreamQuery( ast_generator, - fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {}", + fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {}", kStreamName, topic_names_as_str, kTransformName, kConsumerGroup, kBatchInterval, kBatchSize), kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value); using namespace std::string_literals; const auto host1 = "localhost:9094"s; - ValidateCreateStreamQuery( + ValidateCreateKafkaStreamQuery( ast_generator, - fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {} " + fmt::format("CREATE KAFKA STREAM {} TOPICS {} CONSUMER_GROUP {} BATCH_SIZE {} BATCH_INTERVAL {} TRANSFORM {} " "BOOTSTRAP_SERVERS '{}'", - kStreamName, topic_names_as_str, kTransformName, kConsumerGroup, kBatchInterval, kBatchSize, host1), + kStreamName, topic_names_as_str, kConsumerGroup, kBatchSize, kBatchInterval, kTransformName, host1), kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1); - const auto host2 = "localhost:9094,localhost:1994,168.1.1.256:345"s; - 
ValidateCreateStreamQuery( ast_generator, - fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {} " "BOOTSTRAP_SERVERS '{}'", - kStreamName, topic_names_as_str, kTransformName, kConsumerGroup, kBatchInterval, kBatchSize, host2), + fmt::format("CREATE KAFKA STREAM {} CONSUMER_GROUP {} TOPICS {} BATCH_INTERVAL {} TRANSFORM {} BATCH_SIZE {} " "BOOTSTRAP_SERVERS '{}'", kStreamName, kConsumerGroup, topic_names_as_str, kBatchInterval, kTransformName, kBatchSize, host1), + kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1); + + const auto host2 = "localhost:9094,localhost:1994,168.1.1.256:345"s; + ValidateCreateKafkaStreamQuery( + ast_generator, + fmt::format("CREATE KAFKA STREAM {} TOPICS {} BOOTSTRAP_SERVERS '{}' CONSUMER_GROUP {} TRANSFORM {} " + "BATCH_INTERVAL {} BATCH_SIZE {}", + kStreamName, topic_names_as_str, host2, kConsumerGroup, kTransformName, kBatchInterval, kBatchSize), kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host2); }; @@ -3757,10 +3788,11 @@ TEST_P(CypherMainVisitorTest, CreateStream) { auto check_consumer_group = [&](const std::string_view consumer_group) { const std::string kTopicName{"topic1"}; - ValidateCreateStreamQuery(ast_generator, - fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {}", kStreamName, - kTopicName, kTransformName, consumer_group), - kStreamName, {kTopicName}, kTransformName, consumer_group, std::nullopt, std::nullopt); + ValidateCreateKafkaStreamQuery(ast_generator, + fmt::format("CREATE KAFKA STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {}", + kStreamName, kTopicName, kTransformName, consumer_group), + kStreamName, {kTopicName}, kTransformName, consumer_group, std::nullopt, + std::nullopt); }; using namespace std::literals; @@ -3772,6 +3804,129 @@ TEST_P(CypherMainVisitorTest, CreateStream) { } } +void ValidateCreatePulsarStreamQuery(Base &ast_generator, const std::string &query_string, + const std::string_view stream_name, const std::vector<std::string> &topic_names, + const std::string_view transform_name, + const std::optional<TypedValue> &batch_interval, + const std::optional<TypedValue> &batch_size, const std::string_view service_url) { + SCOPED_TRACE(query_string); + + StreamQuery *parsed_query{nullptr}; + ASSERT_NO_THROW(parsed_query = dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(query_string))) << query_string; + ASSERT_NE(parsed_query, nullptr); + EXPECT_EQ(parsed_query->stream_name_, stream_name); + ValidateTopicNames(parsed_query->topic_names_, topic_names, ast_generator); + EXPECT_EQ(parsed_query->transform_name_, transform_name); + EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_interval_, batch_interval)); + EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_size_, batch_size)); + EXPECT_EQ(parsed_query->batch_limit_, nullptr); + if (service_url.empty()) { + EXPECT_EQ(parsed_query->service_url_, nullptr); + return; + } + EXPECT_NE(parsed_query->service_url_, nullptr); +} + +TEST_P(CypherMainVisitorTest, CreatePulsarStream) { + auto &ast_generator = *GetParam(); + + TestInvalidQuery("CREATE PULSAR STREAM", ast_generator); + TestInvalidQuery("CREATE PULSAR STREAM stream", ast_generator); + TestInvalidQuery("CREATE PULSAR STREAM stream TOPICS", ast_generator); + TestInvalidQuery("CREATE PULSAR STREAM stream TOPICS topic_name", ast_generator); + TestInvalidQuery("CREATE PULSAR STREAM stream TOPICS topic_name TRANSFORM",
ast_generator); + TestInvalidQuery("CREATE PULSAR STREAM stream TOPICS topic_name TRANSFORM transform.name SERVICE_URL", ast_generator); + TestInvalidQuery( + "CREATE PULSAR STREAM stream TOPICS topic_name TRANSFORM transform.name SERVICE_URL 1", ast_generator); + TestInvalidQuery( + "CREATE PULSAR STREAM stream TOPICS topic_name TRANSFORM transform.name BOOTSTRAP_SERVERS 'bootstrap'", + ast_generator); + TestInvalidQuery( + "CREATE PULSAR STREAM stream TOPICS topic_name TRANSFORM transform.name SERVICE_URL 'test' TOPICS topic_name", + ast_generator); + TestInvalidQuery( + "CREATE PULSAR STREAM stream TRANSFORM transform.name TOPICS topic_name TRANSFORM transform.name SERVICE_URL " + "'test'", + ast_generator); + TestInvalidQuery( + "CREATE PULSAR STREAM stream BATCH_INTERVAL 1 TOPICS topic_name TRANSFORM transform.name SERVICE_URL 'test' " + "BATCH_INTERVAL 1000", + ast_generator); + TestInvalidQuery( + "CREATE PULSAR STREAM stream BATCH_INTERVAL 'a' TOPICS topic_name TRANSFORM transform.name SERVICE_URL 'test'", + ast_generator); + TestInvalidQuery( + "CREATE PULSAR STREAM stream BATCH_SIZE 'a' TOPICS topic_name TRANSFORM transform.name SERVICE_URL 'test'", + ast_generator); + + const std::vector<std::string> topic_names{"topic1", "topic2"}; + const std::string topic_names_str = utils::Join(topic_names, ","); + constexpr std::string_view kStreamName{"PulsarStream"}; + constexpr std::string_view kTransformName{"boringTransformation"}; + constexpr std::string_view kServiceUrl{"localhost"}; + constexpr int kBatchSize{1000}; + constexpr int kBatchInterval{231321}; + + { + SCOPED_TRACE("single topic"); + ValidateCreatePulsarStreamQuery( + ast_generator, + fmt::format("CREATE PULSAR STREAM {} TOPICS {} TRANSFORM {}", kStreamName, topic_names[0], kTransformName), + kStreamName, {topic_names[0]}, kTransformName, std::nullopt, std::nullopt, ""); + } + { + SCOPED_TRACE("multiple topics"); + ValidateCreatePulsarStreamQuery( + ast_generator, + fmt::format("CREATE PULSAR STREAM {} TRANSFORM {} TOPICS {}", kStreamName, kTransformName, topic_names_str), + kStreamName, topic_names, kTransformName, std::nullopt, std::nullopt, ""); + } + { + SCOPED_TRACE("topic name in string"); + ValidateCreatePulsarStreamQuery( + ast_generator, + fmt::format("CREATE PULSAR STREAM {} TRANSFORM {} TOPICS '{}'", kStreamName, kTransformName, topic_names_str), + kStreamName, topic_names, kTransformName, std::nullopt, std::nullopt, ""); + } + { + SCOPED_TRACE("service url"); + ValidateCreatePulsarStreamQuery(ast_generator, + fmt::format("CREATE PULSAR STREAM {} SERVICE_URL '{}' TRANSFORM {} TOPICS {}", + kStreamName, kServiceUrl, kTransformName, topic_names_str), + kStreamName, topic_names, kTransformName, std::nullopt, std::nullopt, kServiceUrl); + ValidateCreatePulsarStreamQuery(ast_generator, + fmt::format("CREATE PULSAR STREAM {} TRANSFORM {} SERVICE_URL '{}' TOPICS {}", + kStreamName, kTransformName, kServiceUrl, topic_names_str), + kStreamName, topic_names, kTransformName, std::nullopt, std::nullopt, kServiceUrl); + } + { + SCOPED_TRACE("batch size"); + ValidateCreatePulsarStreamQuery( + ast_generator, + fmt::format("CREATE PULSAR STREAM {} SERVICE_URL '{}' BATCH_SIZE {} TRANSFORM {} TOPICS {}", kStreamName, + kServiceUrl, kBatchSize, kTransformName, topic_names_str), + kStreamName, topic_names, kTransformName, std::nullopt, TypedValue(kBatchSize), kServiceUrl); + ValidateCreatePulsarStreamQuery( + ast_generator, + fmt::format("CREATE PULSAR STREAM {} TRANSFORM {} SERVICE_URL '{}' TOPICS {} BATCH_SIZE {}", kStreamName, + 
kTransformName, kServiceUrl, topic_names_str, kBatchSize), + kStreamName, topic_names, kTransformName, std::nullopt, TypedValue(kBatchSize), kServiceUrl); + } + { + SCOPED_TRACE("batch interval"); + ValidateCreatePulsarStreamQuery( + ast_generator, + fmt::format("CREATE PULSAR STREAM {} BATCH_INTERVAL {} SERVICE_URL '{}' BATCH_SIZE {} TRANSFORM {} TOPICS {}", + kStreamName, kBatchInterval, kServiceUrl, kBatchSize, kTransformName, topic_names_str), + kStreamName, topic_names, kTransformName, TypedValue(kBatchInterval), TypedValue(kBatchSize), kServiceUrl); + ValidateCreatePulsarStreamQuery( + ast_generator, + fmt::format("CREATE PULSAR STREAM {} TRANSFORM {} SERVICE_URL '{}' BATCH_INTERVAL {} TOPICS {} BATCH_SIZE {}", + kStreamName, kTransformName, kServiceUrl, kBatchInterval, topic_names_str, kBatchSize), + kStreamName, topic_names, kTransformName, TypedValue(kBatchInterval), TypedValue(kBatchSize), kServiceUrl); + } +} + TEST_P(CypherMainVisitorTest, CheckStream) { auto &ast_generator = *GetParam(); diff --git a/tests/unit/integrations_kafka_consumer.cpp b/tests/unit/integrations_kafka_consumer.cpp index 639f7b93c..08dd6a0b1 100644 --- a/tests/unit/integrations_kafka_consumer.cpp +++ b/tests/unit/integrations_kafka_consumer.cpp @@ -286,7 +286,7 @@ TEST_F(ConsumerTest, InvalidBootstrapServers) { TEST_F(ConsumerTest, InvalidTopic) { auto info = CreateDefaultConsumerInfo(); - info.topics = {"Non existing topic"}; + info.topics = {"Nonexistingtopic"}; EXPECT_THROW(Consumer(std::move(info), kDummyConsumerFunction), TopicNotFoundException); }
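Editor's note, a sketch of the syntax this diff introduces, reconstructed from the grammar and tests above (the stream, topic, and transformation names are illustrative): stream creation is now prefixed with the broker type, the CREATE configs may appear in any order after the stream name, and each config may appear at most once (enforced by ThrowIfExists). TOPICS and TRANSFORM are required; BOOTSTRAP_SERVERS and SERVICE_URL fall back to the new --kafka-bootstrap-servers and --pulsar-service-url flags when omitted. TOPICS accepts symbolic names or a single string literal that is split on commas when the stream is created:

CREATE KAFKA STREAM test TOPICS topic1,topic2 TRANSFORM transform.simple CONSUMER_GROUP mg_group BATCH_INTERVAL 100 BATCH_SIZE 10 BOOTSTRAP_SERVERS 'localhost:9092'

CREATE PULSAR STREAM test TOPICS 'topic1,topic2' TRANSFORM transform.simple SERVICE_URL 'localhost'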