diff --git a/src/memgraph.cpp b/src/memgraph.cpp index f9db57488..2f2567d47 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -188,7 +188,7 @@ DEFINE_bool(telemetry_enabled, false, // NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) DEFINE_string(kafka_bootstrap_servers, "", - "List of Kafka brokers as a comma separated list of broker host or host:port."); + "List of default Kafka brokers as a comma separated list of broker host or host:port."); // Audit logging flags. #ifdef MG_ENTERPRISE @@ -999,8 +999,8 @@ int main(int argc, char **argv) { auto gil = py::EnsureGIL(); auto maybe_exc = py::AppendToSysPath(py_support_dir.c_str()); if (maybe_exc) { - spdlog::error( - utils::MessageWithLink("Unable to load support for embedded Python: {}.", *maybe_exc, "https://memgr.ph/python")); + spdlog::error(utils::MessageWithLink("Unable to load support for embedded Python: {}.", *maybe_exc, + "https://memgr.ph/python")); } } else { spdlog::error(utils::MessageWithLink("Unable to load support for embedded Python: missing directory {}.", diff --git a/src/query/frontend/ast/ast.lcp b/src/query/frontend/ast/ast.lcp index 34cbac370..1c0b546fc 100644 --- a/src/query/frontend/ast/ast.lcp +++ b/src/query/frontend/ast/ast.lcp @@ -2514,6 +2514,9 @@ cpp<# :slk-save #'slk-save-ast-pointer :slk-load (slk-load-ast-pointer "Expression")) (timeout "Expression *" :initval "nullptr" :scope :public + :slk-save #'slk-save-ast-pointer + :slk-load (slk-load-ast-pointer "Expression")) + (bootstrap_servers "Expression *" :initval "nullptr" :scope :public :slk-save #'slk-save-ast-pointer :slk-load (slk-load-ast-pointer "Expression"))) diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index 53147e811..7a1978452 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -523,6 +523,12 @@ antlrcpp::Any CypherMainVisitor::visitCreateStream(MemgraphCypher::CreateStreamC } stream_query->batch_size_ = ctx->batchSize->accept(this); } + if (ctx->BOOTSTRAP_SERVERS()) { + if (!ctx->bootstrapServers->StringLiteral()) { + throw SemanticException("Bootstrap servers should be a string!"); + } + stream_query->bootstrap_servers_ = ctx->bootstrapServers->accept(this); + } return stream_query; } diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 index 615cb6839..bbdad0d42 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 @@ -29,6 +29,7 @@ memgraphCypherKeyword : cypherKeyword | BATCH_LIMIT | BATCH_SIZE | BEFORE + | BOOTSTRAP_SERVERS | CHECK | CLEAR | COMMIT @@ -301,7 +302,8 @@ createStream : CREATE STREAM streamName TRANSFORM transformationName=procedureName ( CONSUMER_GROUP consumerGroup=symbolicNameWithDotsAndMinus ) ? ( BATCH_INTERVAL batchInterval=literal ) ? - ( BATCH_SIZE batchSize=literal ) ? ; + ( BATCH_SIZE batchSize=literal ) ? + ( BOOTSTRAP_SERVERS bootstrapServers=literal) ? ; dropStream : DROP STREAM streamName ; diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 index 3b7476639..69e023656 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 @@ -25,81 +25,82 @@ import CypherLexer ; UNDERSCORE : '_' ; -AFTER : A F T E R ; -ALTER : A L T E R ; -ASYNC : A S Y N C ; -AUTH : A U T H ; -BAD : B A D ; -BATCH_INTERVAL : B A T C H UNDERSCORE I N T E R V A L ; -BATCH_LIMIT : B A T C H UNDERSCORE L I M I T ; -BATCH_SIZE : B A T C H UNDERSCORE S I Z E ; -BEFORE : B E F O R E ; -CHECK : C H E C K ; -CLEAR : C L E A R ; -COMMIT : C O M M I T ; -COMMITTED : C O M M I T T E D ; -CONFIG : C O N F I G ; -CONSUMER_GROUP : C O N S U M E R UNDERSCORE G R O U P ; -CSV : C S V ; -DATA : D A T A ; -DELIMITER : D E L I M I T E R ; -DATABASE : D A T A B A S E ; -DENY : D E N Y ; -DIRECTORY : D I R E C T O R Y ; -DROP : D R O P ; -DUMP : D U M P ; -DURABILITY : D U R A B I L I T Y ; -EXECUTE : E X E C U T E ; -FOR : F O R ; -FREE : F R E E ; -FREE_MEMORY : F R E E UNDERSCORE M E M O R Y ; -FROM : F R O M ; -GLOBAL : G L O B A L ; -GRANT : G R A N T ; -GRANTS : G R A N T S ; -HEADER : H E A D E R ; -IDENTIFIED : I D E N T I F I E D ; -IGNORE : I G N O R E ; -ISOLATION : I S O L A T I O N ; -LEVEL : L E V E L ; -LOAD : L O A D ; -LOCK : L O C K ; -MAIN : M A I N ; -MODE : M O D E ; -NEXT : N E X T ; -NO : N O ; -PASSWORD : P A S S W O R D ; -PORT : P O R T ; -PRIVILEGES : P R I V I L E G E S ; -READ : R E A D ; -READ_FILE : R E A D UNDERSCORE F I L E ; -REGISTER : R E G I S T E R ; -REPLICA : R E P L I C A ; -REPLICAS : R E P L I C A S ; -REPLICATION : R E P L I C A T I O N ; -REVOKE : R E V O K E ; -ROLE : R O L E ; -ROLES : R O L E S ; -QUOTE : Q U O T E ; -SESSION : S E S S I O N ; -SETTING : S E T T I N G ; -SETTINGS : S E T T I N G S ; -SNAPSHOT : S N A P S H O T ; -START : S T A R T ; -STATS : S T A T S ; -STOP : S T O P ; -STREAM : S T R E A M ; -STREAMS : S T R E A M S ; -SYNC : S Y N C ; -TIMEOUT : T I M E O U T ; -TO : T O ; -TOPICS : T O P I C S; -TRANSACTION : T R A N S A C T I O N ; -TRANSFORM : T R A N S F O R M ; -TRIGGER : T R I G G E R ; -TRIGGERS : T R I G G E R S ; -UNCOMMITTED : U N C O M M I T T E D ; -UNLOCK : U N L O C K ; -UPDATE : U P D A T E ; -USER : U S E R ; -USERS : U S E R S ; +AFTER : A F T E R ; +ALTER : A L T E R ; +ASYNC : A S Y N C ; +AUTH : A U T H ; +BAD : B A D ; +BATCH_INTERVAL : B A T C H UNDERSCORE I N T E R V A L ; +BATCH_LIMIT : B A T C H UNDERSCORE L I M I T ; +BATCH_SIZE : B A T C H UNDERSCORE S I Z E ; +BEFORE : B E F O R E ; +BOOTSTRAP_SERVERS : B O O T S T R A P UNDERSCORE S E R V E R S ; +CHECK : C H E C K ; +CLEAR : C L E A R ; +COMMIT : C O M M I T ; +COMMITTED : C O M M I T T E D ; +CONFIG : C O N F I G ; +CONSUMER_GROUP : C O N S U M E R UNDERSCORE G R O U P ; +CSV : C S V ; +DATA : D A T A ; +DELIMITER : D E L I M I T E R ; +DATABASE : D A T A B A S E ; +DENY : D E N Y ; +DIRECTORY : D I R E C T O R Y ; +DROP : D R O P ; +DUMP : D U M P ; +DURABILITY : D U R A B I L I T Y ; +EXECUTE : E X E C U T E ; +FOR : F O R ; +FREE : F R E E ; +FREE_MEMORY : F R E E UNDERSCORE M E M O R Y ; +FROM : F R O M ; +GLOBAL : G L O B A L ; +GRANT : G R A N T ; +GRANTS : G R A N T S ; +HEADER : H E A D E R ; +IDENTIFIED : I D E N T I F I E D ; +IGNORE : I G N O R E ; +ISOLATION : I S O L A T I O N ; +LEVEL : L E V E L ; +LOAD : L O A D ; +LOCK : L O C K ; +MAIN : M A I N ; +MODE : M O D E ; +NEXT : N E X T ; +NO : N O ; +PASSWORD : P A S S W O R D ; +PORT : P O R T ; +PRIVILEGES : P R I V I L E G E S ; +READ : R E A D ; +READ_FILE : R E A D UNDERSCORE F I L E ; +REGISTER : R E G I S T E R ; +REPLICA : R E P L I C A ; +REPLICAS : R E P L I C A S ; +REPLICATION : R E P L I C A T I O N ; +REVOKE : R E V O K E ; +ROLE : R O L E ; +ROLES : R O L E S ; +QUOTE : Q U O T E ; +SESSION : S E S S I O N ; +SETTING : S E T T I N G ; +SETTINGS : S E T T I N G S ; +SNAPSHOT : S N A P S H O T ; +START : S T A R T ; +STATS : S T A T S ; +STOP : S T O P ; +STREAM : S T R E A M ; +STREAMS : S T R E A M S ; +SYNC : S Y N C ; +TIMEOUT : T I M E O U T ; +TO : T O ; +TOPICS : T O P I C S; +TRANSACTION : T R A N S A C T I O N ; +TRANSFORM : T R A N S F O R M ; +TRIGGER : T R I G G E R ; +TRIGGERS : T R I G G E R S ; +UNCOMMITTED : U N C O M M I T T E D ; +UNLOCK : U N L O C K ; +UPDATE : U P D A T E ; +USER : U S E R ; +USERS : U S E R S ; diff --git a/src/query/frontend/stripped_lexer_constants.hpp b/src/query/frontend/stripped_lexer_constants.hpp index fc98d47c3..49645f36b 100644 --- a/src/query/frontend/stripped_lexer_constants.hpp +++ b/src/query/frontend/stripped_lexer_constants.hpp @@ -89,61 +89,117 @@ class Trie { const int kBitsetSize = 65536; -const trie::Trie kKeywords = {"union", "all", - "optional", "match", - "unwind", "as", - "merge", "on", - "create", "set", - "detach", "delete", - "remove", "with", - "distinct", "return", - "order", "by", - "skip", "limit", - "ascending", "asc", - "descending", "desc", - "where", "or", - "xor", "and", - "not", "in", - "starts", "ends", - "contains", "is", - "null", "case", - "when", "then", - "else", "end", - "count", "filter", - "extract", "any", - "none", "single", - "true", "false", - "reduce", "coalesce", - "user", "password", - "alter", "drop", - "show", "stats", - "unique", "explain", - "profile", "storage", - "index", "info", - "exists", "assert", - "constraint", "node", - "key", "dump", - "database", "call", - "yield", "memory", - "mb", "kb", - "unlimited", "free", - "procedure", "query", - "free_memory", "read_file", - "lock_path", "after", - "before", "execute", - "transaction", "trigger", - "triggers", "update", - "comitted", "uncomitted", - "global", "isolation", - "level", "next", - "read", "session", - "snapshot", "transaction", - "batch_limit", "batch_interval", - "batch_size", "consumer_group", - "start", "stream", - "streams", "transform", - "topics", "check", - "setting", "settings"}; +const trie::Trie kKeywords = {"union", + "all", + "optional", + "match", + "unwind", + "as", + "merge", + "on", + "create", + "set", + "detach", + "delete", + "remove", + "with", + "distinct", + "return", + "order", + "by", + "skip", + "limit", + "ascending", + "asc", + "descending", + "desc", + "where", + "or", + "xor", + "and", + "not", + "in", + "starts", + "ends", + "contains", + "is", + "null", + "case", + "when", + "then", + "else", + "end", + "count", + "filter", + "extract", + "any", + "none", + "single", + "true", + "false", + "reduce", + "coalesce", + "user", + "password", + "alter", + "drop", + "show", + "stats", + "unique", + "explain", + "profile", + "storage", + "index", + "info", + "exists", + "assert", + "constraint", + "node", + "key", + "dump", + "database", + "call", + "yield", + "memory", + "mb", + "kb", + "unlimited", + "free", + "procedure", + "query", + "free_memory", + "read_file", + "lock_path", + "after", + "before", + "execute", + "transaction", + "trigger", + "triggers", + "update", + "comitted", + "uncomitted", + "global", + "isolation", + "level", + "next", + "read", + "session", + "snapshot", + "transaction", + "batch_limit", + "batch_interval", + "batch_size", + "consumer_group", + "start", + "stream", + "streams", + "transform", + "topics", + "check", + "setting", + "settings", + "bootstrap_servers"}; // Unicode codepoints that are allowed at the start of the unescaped name. const std::bitset kUnescapedNameAllowedStarts( diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index c0f946fa8..15af664e0 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -106,6 +106,17 @@ std::optional GetOptionalValue(query::Expression *expression, Expressio return {}; }; +std::optional GetOptionalStringValue(query::Expression *expression, ExpressionEvaluator &evaluator) { + if (expression != nullptr) { + auto value = expression->Accept(evaluator); + MG_ASSERT(value.IsNull() || value.IsString()); + if (value.IsString()) { + return {std::string(value.ValueString().begin(), value.ValueString().end())}; + } + } + return {}; +}; + class ReplQueryHandler final : public query::ReplicationQueryHandler { public: explicit ReplQueryHandler(storage::Storage *db) : db_(db) {} @@ -520,21 +531,28 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters ¶mete std::string consumer_group{stream_query->consumer_group_.empty() ? kDefaultConsumerGroup : stream_query->consumer_group_}; - callback.fn = - [interpreter_context, stream_name = stream_query->stream_name_, topic_names = stream_query->topic_names_, - consumer_group = std::move(consumer_group), - batch_interval = GetOptionalValue(stream_query->batch_interval_, evaluator), - batch_size = GetOptionalValue(stream_query->batch_size_, evaluator), - transformation_name = stream_query->transform_name_, owner = StringPointerToOptional(username)]() mutable { - interpreter_context->streams.Create(stream_name, - query::StreamInfo{.topics = std::move(topic_names), - .consumer_group = std::move(consumer_group), - .batch_interval = batch_interval, - .batch_size = batch_size, - .transformation_name = std::move(transformation_name), - .owner = std::move(owner)}); - return std::vector>{}; - }; + auto bootstrap = GetOptionalStringValue(stream_query->bootstrap_servers_, evaluator); + if (bootstrap && bootstrap->empty()) { + throw SemanticException("Bootstrap servers must not be an empty string!"); + } + callback.fn = [interpreter_context, stream_name = stream_query->stream_name_, + topic_names = stream_query->topic_names_, consumer_group = std::move(consumer_group), + batch_interval = + GetOptionalValue(stream_query->batch_interval_, evaluator), + batch_size = GetOptionalValue(stream_query->batch_size_, evaluator), + transformation_name = stream_query->transform_name_, bootstrap_servers = std::move(bootstrap), + owner = StringPointerToOptional(username)]() mutable { + std::string bootstrap = bootstrap_servers ? std::move(*bootstrap_servers) : ""; + interpreter_context->streams.Create(stream_name, + query::StreamInfo{.topics = std::move(topic_names), + .consumer_group = std::move(consumer_group), + .batch_interval = batch_interval, + .batch_size = batch_size, + .transformation_name = std::move(transformation_name), + .owner = std::move(owner), + .bootstrap_servers = std::move(bootstrap)}); + return std::vector>{}; + }; return callback; } case StreamQuery::Action::START_STREAM: { @@ -573,8 +591,11 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters ¶mete return callback; } case StreamQuery::Action::SHOW_STREAMS: { - callback.header = {"name", "topics", "consumer_group", "batch_interval", "batch_size", "transformation_name", - "owner", "is running"}; + callback.header = {"name", "topics", + "consumer_group", "batch_interval", + "batch_size", "transformation_name", + "owner", "bootstrap_servers", + "is running"}; callback.fn = [interpreter_context]() { auto streams_status = interpreter_context->streams.GetStreamInfo(); std::vector> results; @@ -588,8 +609,8 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters ¶mete return typed_topics; }; - auto stream_info_as_typed_stream_info_emplace_in = [topics_as_typed_topics](auto &typed_status, - const auto &stream_info) { + auto stream_info_as_typed_stream_info_emplace_in = [topics_as_typed_topics, interpreter_context]( + auto &typed_status, const auto &stream_info) { typed_status.emplace_back(topics_as_typed_topics(stream_info.topics)); typed_status.emplace_back(stream_info.consumer_group); if (stream_info.batch_interval.has_value()) { @@ -608,11 +629,16 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters ¶mete } else { typed_status.emplace_back(); } + if (stream_info.bootstrap_servers.empty()) { + typed_status.emplace_back(interpreter_context->streams.BootstrapServers()); + } else { + typed_status.emplace_back(stream_info.bootstrap_servers); + } }; for (const auto &status : streams_status) { std::vector typed_status; - typed_status.reserve(7); + typed_status.reserve(8); typed_status.emplace_back(status.name); stream_info_as_typed_stream_info_emplace_in(typed_status, status.info); typed_status.emplace_back(status.is_running); diff --git a/src/query/streams.cpp b/src/query/streams.cpp index 4c8534502..2841b0088 100644 --- a/src/query/streams.cpp +++ b/src/query/streams.cpp @@ -118,6 +118,7 @@ const std::string kBatchSizeKey{"batch_size"}; const std::string kIsRunningKey{"is_running"}; const std::string kTransformationName{"transformation_name"}; const std::string kOwner{"owner"}; +const std::string kBoostrapServers{"bootstrap_servers"}; void to_json(nlohmann::json &data, StreamStatus &&status) { auto &info = status.info; @@ -145,6 +146,8 @@ void to_json(nlohmann::json &data, StreamStatus &&status) { } else { data[kOwner] = nullptr; } + + data[kBoostrapServers] = std::move(info.bootstrap_servers); } void from_json(const nlohmann::json &data, StreamStatus &status) { @@ -174,6 +177,8 @@ void from_json(const nlohmann::json &data, StreamStatus &status) { } else { info.owner = {}; } + + info.owner = data.value(kBoostrapServers, ""); } Streams::Streams(InterpreterContext *interpreter_context, std::string bootstrap_servers, @@ -410,10 +415,13 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std .batch_size = stream_info.batch_size, }; + auto bootstrap_servers = + stream_info.bootstrap_servers.empty() ? bootstrap_servers_ : std::move(stream_info.bootstrap_servers); auto insert_result = map.insert_or_assign( - stream_name, StreamData{std::move(stream_info.transformation_name), std::move(stream_info.owner), - std::make_unique(bootstrap_servers_, std::move(consumer_info), - std::move(consumer_function))}); + stream_name, + StreamData{std::move(stream_info.transformation_name), std::move(stream_info.owner), + std::make_unique(std::move(bootstrap_servers), std::move(consumer_info), + std::move(consumer_function))}); MG_ASSERT(insert_result.second, "Unexpected error during storing consumer '{}'", stream_name); return insert_result.first; } @@ -425,4 +433,5 @@ void Streams::Persist(StreamStatus &&status) { } } +std::string_view Streams::BootstrapServers() const { return bootstrap_servers_; } } // namespace query diff --git a/src/query/streams.hpp b/src/query/streams.hpp index b05c90356..91155967d 100644 --- a/src/query/streams.hpp +++ b/src/query/streams.hpp @@ -40,6 +40,7 @@ struct StreamInfo { std::optional batch_size; std::string transformation_name; std::optional owner; + std::string bootstrap_servers; }; struct StreamStatus { @@ -139,6 +140,9 @@ class Streams final { std::optional timeout = std::nullopt, std::optional batch_limit = std::nullopt) const; + /// Return the configuration value passed to memgraph. + std::string_view BootstrapServers() const; + private: using StreamsMap = std::unordered_map; using SynchronizedStreamsMap = utils::Synchronized; diff --git a/tests/e2e/streams/common.py b/tests/e2e/streams/common.py index 4245781b4..7b0cb1044 100644 --- a/tests/e2e/streams/common.py +++ b/tests/e2e/streams/common.py @@ -10,7 +10,8 @@ BATCH_INTERVAL = 3 BATCH_SIZE = 4 TRANSFORM = 5 OWNER = 6 -IS_RUNNING = 7 +BOOTSTRAP_SERVERS = 7 +IS_RUNNING = 8 def execute_and_fetch_all(cursor, query): diff --git a/tests/e2e/streams/streams_owner_tests.py b/tests/e2e/streams/streams_owner_tests.py index 1281b8fce..6d1765f1d 100644 --- a/tests/e2e/streams/streams_owner_tests.py +++ b/tests/e2e/streams/streams_owner_tests.py @@ -68,7 +68,7 @@ def test_owner_is_shown(topics, connection): common.check_stream_info(userless_cursor, "test", ("test", [ topics[0]], "mg_consumer", None, None, - "transform.simple", stream_user, False)) + "transform.simple", stream_user, "localhost:9092", False)) def test_insufficient_privileges(producer, topics, connection): diff --git a/tests/e2e/streams/streams_tests.py b/tests/e2e/streams/streams_tests.py index ef26cafdd..7ed53530a 100755 --- a/tests/e2e/streams/streams_tests.py +++ b/tests/e2e/streams/streams_tests.py @@ -172,7 +172,8 @@ def test_show_streams(producer, topics, connection): common.execute_and_fetch_all(cursor, "CREATE STREAM default_values " f"TOPICS {topics[0]} " - f"TRANSFORM transform.simple") + f"TRANSFORM transform.simple " + f"BOOTSTRAP_SERVERS \'localhost:9092\'") consumer_group = "my_special_consumer_group" batch_interval = 42 @@ -189,7 +190,7 @@ def test_show_streams(producer, topics, connection): common.check_stream_info(cursor, "default_values", ("default_values", [ topics[0]], "mg_consumer", None, None, - "transform.simple", None, False)) + "transform.simple", None, "localhost:9092", False)) common.check_stream_info(cursor, "complex_values", ( "complex_values", @@ -199,6 +200,7 @@ def test_show_streams(producer, topics, connection): batch_size, "transform.with_parameters", None, + "localhost:9092", False)) @@ -377,5 +379,38 @@ def test_restart_after_error(producer, topics, connection): cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n") +@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK) +def test_bootstrap_server(producer, topics, connection, transformation): + assert len(topics) > 0 + cursor = connection.cursor() + local = "localhost:9092" + common.execute_and_fetch_all(cursor, + "CREATE STREAM test " + f"TOPICS {','.join(topics)} " + f"TRANSFORM {transformation} " + f"BOOTSTRAP_SERVERS \'{local}\'") + common.start_stream(cursor, "test") + time.sleep(5) + + for topic in topics: + producer.send(topic, SIMPLE_MSG).get(timeout=60) + + for topic in topics: + common.check_vertex_exists_with_topic_and_payload( + cursor, topic, SIMPLE_MSG) + + +@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK) +def test_bootstrap_server_empty(producer, topics, connection, transformation): + assert len(topics) > 0 + cursor = connection.cursor() + with pytest.raises(mgclient.DatabaseError): + common.execute_and_fetch_all(cursor, + "CREATE STREAM test " + f"TOPICS {','.join(topics)} " + f"TRANSFORM {transformation} " + "BOOTSTRAP_SERVERS ''") + + if __name__ == "__main__": sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/unit/cypher_main_visitor.cpp b/tests/unit/cypher_main_visitor.cpp index 64f580f6e..5569fa418 100644 --- a/tests/unit/cypher_main_visitor.cpp +++ b/tests/unit/cypher_main_visitor.cpp @@ -1,3 +1,14 @@ +// Copyright 2021 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +// #include #include #include @@ -3618,7 +3629,8 @@ TEST_P(CypherMainVisitorTest, StopAllStreams) { void ValidateCreateStreamQuery(Base &ast_generator, const std::string &query_string, const std::string_view stream_name, const std::vector &topic_names, const std::string_view transform_name, const std::string_view consumer_group, const std::optional &batch_interval, - const std::optional &batch_size) { + const std::optional &batch_size, + const std::string_view bootstrap_servers = "") { StreamQuery *parsed_query{nullptr}; ASSERT_NO_THROW(parsed_query = dynamic_cast(ast_generator.ParseQuery(query_string))) << query_string; ASSERT_NE(parsed_query, nullptr); @@ -3630,6 +3642,11 @@ void ValidateCreateStreamQuery(Base &ast_generator, const std::string &query_str EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_interval_, batch_interval)); EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_size_, batch_size)); EXPECT_EQ(parsed_query->batch_limit_, nullptr); + if (bootstrap_servers.empty()) { + EXPECT_EQ(parsed_query->bootstrap_servers_, nullptr); + return; + } + EXPECT_NE(parsed_query->bootstrap_servers_, nullptr); } TEST_P(CypherMainVisitorTest, CreateStream) { @@ -3660,6 +3677,9 @@ TEST_P(CypherMainVisitorTest, CreateStream) { ast_generator); TestInvalidQuery("CREATE STREAM stream TOPICS topic1, TRANSFORM transform BATCH_SIZE 2 CONSUMER_GROUP Gru", ast_generator); + TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS localhost:9092", + ast_generator); + TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS", ast_generator); const std::vector topic_names{"topic1_name.with_dot", "topic1_name.with_multiple.dots", "topic-name.with-multiple.dots-and-dashes"}; @@ -3701,6 +3721,21 @@ TEST_P(CypherMainVisitorTest, CreateStream) { fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {}", kStreamName, topic_names_as_str, kTransformName, kConsumerGroup, kBatchInterval, kBatchSize), kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value); + using namespace std::string_literals; + const auto host1 = "localhost:9094"s; + ValidateCreateStreamQuery( + ast_generator, + fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {} " + "BOOTSTRAP_SERVERS '{}'", + kStreamName, topic_names_as_str, kTransformName, kConsumerGroup, kBatchInterval, kBatchSize, host1), + kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1); + const auto host2 = "localhost:9094,localhost:1994,168.1.1.256:345"s; + ValidateCreateStreamQuery( + ast_generator, + fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {} " + "BOOTSTRAP_SERVERS '{}'", + kStreamName, topic_names_as_str, kTransformName, kConsumerGroup, kBatchInterval, kBatchSize, host2), + kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host2); }; for (const auto &topic_name : topic_names) {