Add bootstrap servers to create stream (#274)

Kostas Kyrimis 2021-10-18 10:49:00 +02:00 committed by GitHub
parent 2c86fefbb5
commit 10196f3d7d
13 changed files with 343 additions and 165 deletions

View File

@@ -188,7 +188,7 @@ DEFINE_bool(telemetry_enabled, false,
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_string(kafka_bootstrap_servers, "",
"List of Kafka brokers as a comma separated list of broker host or host:port.");
"List of default Kafka brokers as a comma separated list of broker host or host:port.");
// Audit logging flags.
#ifdef MG_ENTERPRISE
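
With the per-stream BOOTSTRAP_SERVERS clause added below, this flag becomes the instance-wide default only, e.g. --kafka_bootstrap_servers="broker1:9092,broker2:9093"; individual CREATE STREAM queries can now override it.
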
@@ -999,8 +999,8 @@ int main(int argc, char **argv) {
auto gil = py::EnsureGIL();
auto maybe_exc = py::AppendToSysPath(py_support_dir.c_str());
if (maybe_exc) {
spdlog::error(
utils::MessageWithLink("Unable to load support for embedded Python: {}.", *maybe_exc, "https://memgr.ph/python"));
spdlog::error(utils::MessageWithLink("Unable to load support for embedded Python: {}.", *maybe_exc,
"https://memgr.ph/python"));
}
} else {
spdlog::error(utils::MessageWithLink("Unable to load support for embedded Python: missing directory {}.",

View File

@@ -2514,6 +2514,9 @@ cpp<#
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(timeout "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(bootstrap_servers "Expression *" :initval "nullptr" :scope :public
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression")))
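
For orientation, a rough sketch of the members this LCP spec generates on the C++ StreamQuery class; illustrative only, since the real generated code also carries the SLK save/load hooks named above:

struct Expression;  // AST expression node, defined elsewhere in the real tree

struct StreamQuery {
  Expression *batch_limit_{nullptr};
  Expression *timeout_{nullptr};
  Expression *bootstrap_servers_{nullptr};  // new in this commit; stays nullptr when the clause is omitted
};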

View File

@@ -523,6 +523,12 @@ antlrcpp::Any CypherMainVisitor::visitCreateStream(MemgraphCypher::CreateStreamC
}
stream_query->batch_size_ = ctx->batchSize->accept(this);
}
if (ctx->BOOTSTRAP_SERVERS()) {
if (!ctx->bootstrapServers->StringLiteral()) {
throw SemanticException("Bootstrap servers should be a string!");
}
stream_query->bootstrap_servers_ = ctx->bootstrapServers->accept(this);
}
return stream_query;
}
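
Only a string literal passes this check: a numeric literal such as BOOTSTRAP_SERVERS 2 would trip the SemanticException above, while an unquoted BOOTSTRAP_SERVERS localhost:9092 never reaches it because it already fails to parse (see the visitor tests at the end of this commit).
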

View File

@@ -29,6 +29,7 @@ memgraphCypherKeyword : cypherKeyword
| BATCH_LIMIT
| BATCH_SIZE
| BEFORE
| BOOTSTRAP_SERVERS
| CHECK
| CLEAR
| COMMIT
@@ -301,7 +302,8 @@ createStream : CREATE STREAM streamName
TRANSFORM transformationName=procedureName
( CONSUMER_GROUP consumerGroup=symbolicNameWithDotsAndMinus ) ?
( BATCH_INTERVAL batchInterval=literal ) ?
( BATCH_SIZE batchSize=literal ) ? ;
( BATCH_SIZE batchSize=literal ) ?
( BOOTSTRAP_SERVERS bootstrapServers=literal) ? ;
dropStream : DROP STREAM streamName ;
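
Each option clause stays independently optional, but when several are present they must appear in the order listed, e.g. CREATE STREAM s TOPICS t1,t2 TRANSFORM mod.fn BATCH_SIZE 10 BOOTSTRAP_SERVERS 'localhost:9092'.
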

View File

@@ -25,81 +25,82 @@ import CypherLexer ;
UNDERSCORE : '_' ;
AFTER : A F T E R ;
ALTER : A L T E R ;
ASYNC : A S Y N C ;
AUTH : A U T H ;
BAD : B A D ;
BATCH_INTERVAL : B A T C H UNDERSCORE I N T E R V A L ;
BATCH_LIMIT : B A T C H UNDERSCORE L I M I T ;
BATCH_SIZE : B A T C H UNDERSCORE S I Z E ;
BEFORE : B E F O R E ;
CHECK : C H E C K ;
CLEAR : C L E A R ;
COMMIT : C O M M I T ;
COMMITTED : C O M M I T T E D ;
CONFIG : C O N F I G ;
CONSUMER_GROUP : C O N S U M E R UNDERSCORE G R O U P ;
CSV : C S V ;
DATA : D A T A ;
DELIMITER : D E L I M I T E R ;
DATABASE : D A T A B A S E ;
DENY : D E N Y ;
DIRECTORY : D I R E C T O R Y ;
DROP : D R O P ;
DUMP : D U M P ;
DURABILITY : D U R A B I L I T Y ;
EXECUTE : E X E C U T E ;
FOR : F O R ;
FREE : F R E E ;
FREE_MEMORY : F R E E UNDERSCORE M E M O R Y ;
FROM : F R O M ;
GLOBAL : G L O B A L ;
GRANT : G R A N T ;
GRANTS : G R A N T S ;
HEADER : H E A D E R ;
IDENTIFIED : I D E N T I F I E D ;
IGNORE : I G N O R E ;
ISOLATION : I S O L A T I O N ;
LEVEL : L E V E L ;
LOAD : L O A D ;
LOCK : L O C K ;
MAIN : M A I N ;
MODE : M O D E ;
NEXT : N E X T ;
NO : N O ;
PASSWORD : P A S S W O R D ;
PORT : P O R T ;
PRIVILEGES : P R I V I L E G E S ;
READ : R E A D ;
READ_FILE : R E A D UNDERSCORE F I L E ;
REGISTER : R E G I S T E R ;
REPLICA : R E P L I C A ;
REPLICAS : R E P L I C A S ;
REPLICATION : R E P L I C A T I O N ;
REVOKE : R E V O K E ;
ROLE : R O L E ;
ROLES : R O L E S ;
QUOTE : Q U O T E ;
SESSION : S E S S I O N ;
SETTING : S E T T I N G ;
SETTINGS : S E T T I N G S ;
SNAPSHOT : S N A P S H O T ;
START : S T A R T ;
STATS : S T A T S ;
STOP : S T O P ;
STREAM : S T R E A M ;
STREAMS : S T R E A M S ;
SYNC : S Y N C ;
TIMEOUT : T I M E O U T ;
TO : T O ;
TOPICS : T O P I C S;
TRANSACTION : T R A N S A C T I O N ;
TRANSFORM : T R A N S F O R M ;
TRIGGER : T R I G G E R ;
TRIGGERS : T R I G G E R S ;
UNCOMMITTED : U N C O M M I T T E D ;
UNLOCK : U N L O C K ;
UPDATE : U P D A T E ;
USER : U S E R ;
USERS : U S E R S ;
AFTER : A F T E R ;
ALTER : A L T E R ;
ASYNC : A S Y N C ;
AUTH : A U T H ;
BAD : B A D ;
BATCH_INTERVAL : B A T C H UNDERSCORE I N T E R V A L ;
BATCH_LIMIT : B A T C H UNDERSCORE L I M I T ;
BATCH_SIZE : B A T C H UNDERSCORE S I Z E ;
BEFORE : B E F O R E ;
BOOTSTRAP_SERVERS : B O O T S T R A P UNDERSCORE S E R V E R S ;
CHECK : C H E C K ;
CLEAR : C L E A R ;
COMMIT : C O M M I T ;
COMMITTED : C O M M I T T E D ;
CONFIG : C O N F I G ;
CONSUMER_GROUP : C O N S U M E R UNDERSCORE G R O U P ;
CSV : C S V ;
DATA : D A T A ;
DELIMITER : D E L I M I T E R ;
DATABASE : D A T A B A S E ;
DENY : D E N Y ;
DIRECTORY : D I R E C T O R Y ;
DROP : D R O P ;
DUMP : D U M P ;
DURABILITY : D U R A B I L I T Y ;
EXECUTE : E X E C U T E ;
FOR : F O R ;
FREE : F R E E ;
FREE_MEMORY : F R E E UNDERSCORE M E M O R Y ;
FROM : F R O M ;
GLOBAL : G L O B A L ;
GRANT : G R A N T ;
GRANTS : G R A N T S ;
HEADER : H E A D E R ;
IDENTIFIED : I D E N T I F I E D ;
IGNORE : I G N O R E ;
ISOLATION : I S O L A T I O N ;
LEVEL : L E V E L ;
LOAD : L O A D ;
LOCK : L O C K ;
MAIN : M A I N ;
MODE : M O D E ;
NEXT : N E X T ;
NO : N O ;
PASSWORD : P A S S W O R D ;
PORT : P O R T ;
PRIVILEGES : P R I V I L E G E S ;
READ : R E A D ;
READ_FILE : R E A D UNDERSCORE F I L E ;
REGISTER : R E G I S T E R ;
REPLICA : R E P L I C A ;
REPLICAS : R E P L I C A S ;
REPLICATION : R E P L I C A T I O N ;
REVOKE : R E V O K E ;
ROLE : R O L E ;
ROLES : R O L E S ;
QUOTE : Q U O T E ;
SESSION : S E S S I O N ;
SETTING : S E T T I N G ;
SETTINGS : S E T T I N G S ;
SNAPSHOT : S N A P S H O T ;
START : S T A R T ;
STATS : S T A T S ;
STOP : S T O P ;
STREAM : S T R E A M ;
STREAMS : S T R E A M S ;
SYNC : S Y N C ;
TIMEOUT : T I M E O U T ;
TO : T O ;
TOPICS : T O P I C S;
TRANSACTION : T R A N S A C T I O N ;
TRANSFORM : T R A N S F O R M ;
TRIGGER : T R I G G E R ;
TRIGGERS : T R I G G E R S ;
UNCOMMITTED : U N C O M M I T T E D ;
UNLOCK : U N L O C K ;
UPDATE : U P D A T E ;
USER : U S E R ;
USERS : U S E R S ;

View File

@@ -89,61 +89,117 @@ class Trie {
const int kBitsetSize = 65536;
const trie::Trie kKeywords = {"union", "all",
"optional", "match",
"unwind", "as",
"merge", "on",
"create", "set",
"detach", "delete",
"remove", "with",
"distinct", "return",
"order", "by",
"skip", "limit",
"ascending", "asc",
"descending", "desc",
"where", "or",
"xor", "and",
"not", "in",
"starts", "ends",
"contains", "is",
"null", "case",
"when", "then",
"else", "end",
"count", "filter",
"extract", "any",
"none", "single",
"true", "false",
"reduce", "coalesce",
"user", "password",
"alter", "drop",
"show", "stats",
"unique", "explain",
"profile", "storage",
"index", "info",
"exists", "assert",
"constraint", "node",
"key", "dump",
"database", "call",
"yield", "memory",
"mb", "kb",
"unlimited", "free",
"procedure", "query",
"free_memory", "read_file",
"lock_path", "after",
"before", "execute",
"transaction", "trigger",
"triggers", "update",
"comitted", "uncomitted",
"global", "isolation",
"level", "next",
"read", "session",
"snapshot", "transaction",
"batch_limit", "batch_interval",
"batch_size", "consumer_group",
"start", "stream",
"streams", "transform",
"topics", "check",
"setting", "settings"};
const trie::Trie kKeywords = {"union",
"all",
"optional",
"match",
"unwind",
"as",
"merge",
"on",
"create",
"set",
"detach",
"delete",
"remove",
"with",
"distinct",
"return",
"order",
"by",
"skip",
"limit",
"ascending",
"asc",
"descending",
"desc",
"where",
"or",
"xor",
"and",
"not",
"in",
"starts",
"ends",
"contains",
"is",
"null",
"case",
"when",
"then",
"else",
"end",
"count",
"filter",
"extract",
"any",
"none",
"single",
"true",
"false",
"reduce",
"coalesce",
"user",
"password",
"alter",
"drop",
"show",
"stats",
"unique",
"explain",
"profile",
"storage",
"index",
"info",
"exists",
"assert",
"constraint",
"node",
"key",
"dump",
"database",
"call",
"yield",
"memory",
"mb",
"kb",
"unlimited",
"free",
"procedure",
"query",
"free_memory",
"read_file",
"lock_path",
"after",
"before",
"execute",
"transaction",
"trigger",
"triggers",
"update",
"comitted",
"uncomitted",
"global",
"isolation",
"level",
"next",
"read",
"session",
"snapshot",
"transaction",
"batch_limit",
"batch_interval",
"batch_size",
"consumer_group",
"start",
"stream",
"streams",
"transform",
"topics",
"check",
"setting",
"settings",
"bootstrap_servers"};
// Unicode codepoints that are allowed at the start of the unescaped name.
const std::bitset<kBitsetSize> kUnescapedNameAllowedStarts(

View File

@@ -106,6 +106,17 @@ std::optional<TResult> GetOptionalValue(query::Expression *expression, Expressio
return {};
};
std::optional<std::string> GetOptionalStringValue(query::Expression *expression, ExpressionEvaluator &evaluator) {
if (expression != nullptr) {
auto value = expression->Accept(evaluator);
MG_ASSERT(value.IsNull() || value.IsString());
if (value.IsString()) {
return {std::string(value.ValueString().begin(), value.ValueString().end())};
}
}
return {};
};
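
The element-wise std::string copy here is presumably needed because ValueString() returns an allocator-aware string type rather than a plain std::string, so there is no direct conversion.
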
class ReplQueryHandler final : public query::ReplicationQueryHandler {
public:
explicit ReplQueryHandler(storage::Storage *db) : db_(db) {}
@@ -520,21 +531,28 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
std::string consumer_group{stream_query->consumer_group_.empty() ? kDefaultConsumerGroup
: stream_query->consumer_group_};
callback.fn =
[interpreter_context, stream_name = stream_query->stream_name_, topic_names = stream_query->topic_names_,
consumer_group = std::move(consumer_group),
batch_interval = GetOptionalValue<std::chrono::milliseconds>(stream_query->batch_interval_, evaluator),
batch_size = GetOptionalValue<int64_t>(stream_query->batch_size_, evaluator),
transformation_name = stream_query->transform_name_, owner = StringPointerToOptional(username)]() mutable {
interpreter_context->streams.Create(stream_name,
query::StreamInfo{.topics = std::move(topic_names),
.consumer_group = std::move(consumer_group),
.batch_interval = batch_interval,
.batch_size = batch_size,
.transformation_name = std::move(transformation_name),
.owner = std::move(owner)});
return std::vector<std::vector<TypedValue>>{};
};
auto bootstrap = GetOptionalStringValue(stream_query->bootstrap_servers_, evaluator);
if (bootstrap && bootstrap->empty()) {
throw SemanticException("Bootstrap servers must not be an empty string!");
}
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_,
topic_names = stream_query->topic_names_, consumer_group = std::move(consumer_group),
batch_interval =
GetOptionalValue<std::chrono::milliseconds>(stream_query->batch_interval_, evaluator),
batch_size = GetOptionalValue<int64_t>(stream_query->batch_size_, evaluator),
transformation_name = stream_query->transform_name_, bootstrap_servers = std::move(bootstrap),
owner = StringPointerToOptional(username)]() mutable {
std::string bootstrap = bootstrap_servers ? std::move(*bootstrap_servers) : "";
interpreter_context->streams.Create(stream_name,
query::StreamInfo{.topics = std::move(topic_names),
.consumer_group = std::move(consumer_group),
.batch_interval = batch_interval,
.batch_size = batch_size,
.transformation_name = std::move(transformation_name),
.owner = std::move(owner),
.bootstrap_servers = std::move(bootstrap)});
return std::vector<std::vector<TypedValue>>{};
};
return callback;
}
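
A self-contained sketch of the capture pattern in the callback above, with illustrative names (not Memgraph's): the optional user-supplied value is validated eagerly, then moved into the deferred callback, where an absent value collapses to the empty string that later selects the instance-wide default:

#include <functional>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>

int main() {
  std::optional<std::string> bootstrap{"broker1:9092,broker2:9093"};
  // Eager validation, mirroring the SemanticException above.
  if (bootstrap && bootstrap->empty()) {
    throw std::invalid_argument{"Bootstrap servers must not be an empty string!"};
  }
  // Move the optional into the callback; it is consumed on first invocation.
  std::function<void()> callback = [servers = std::move(bootstrap)]() mutable {
    const std::string effective = servers ? std::move(*servers) : "";  // "" later means "use the default"
    std::cout << (effective.empty() ? "<instance default>" : effective) << '\n';
  };
  callback();
}
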
case StreamQuery::Action::START_STREAM: {
@@ -573,8 +591,11 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
return callback;
}
case StreamQuery::Action::SHOW_STREAMS: {
callback.header = {"name", "topics", "consumer_group", "batch_interval", "batch_size", "transformation_name",
"owner", "is running"};
callback.header = {"name", "topics",
"consumer_group", "batch_interval",
"batch_size", "transformation_name",
"owner", "bootstrap_servers",
"is running"};
callback.fn = [interpreter_context]() {
auto streams_status = interpreter_context->streams.GetStreamInfo();
std::vector<std::vector<TypedValue>> results;
@@ -588,8 +609,8 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
return typed_topics;
};
auto stream_info_as_typed_stream_info_emplace_in = [topics_as_typed_topics](auto &typed_status,
const auto &stream_info) {
auto stream_info_as_typed_stream_info_emplace_in = [topics_as_typed_topics, interpreter_context](
auto &typed_status, const auto &stream_info) {
typed_status.emplace_back(topics_as_typed_topics(stream_info.topics));
typed_status.emplace_back(stream_info.consumer_group);
if (stream_info.batch_interval.has_value()) {
@@ -608,11 +629,16 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
} else {
typed_status.emplace_back();
}
if (stream_info.bootstrap_servers.empty()) {
typed_status.emplace_back(interpreter_context->streams.BootstrapServers());
} else {
typed_status.emplace_back(stream_info.bootstrap_servers);
}
};
for (const auto &status : streams_status) {
std::vector<TypedValue> typed_status;
typed_status.reserve(7);
typed_status.reserve(8);
typed_status.emplace_back(status.name);
stream_info_as_typed_stream_info_emplace_in(typed_status, status.info);
typed_status.emplace_back(status.is_running);

View File

@@ -118,6 +118,7 @@ const std::string kBatchSizeKey{"batch_size"};
const std::string kIsRunningKey{"is_running"};
const std::string kTransformationName{"transformation_name"};
const std::string kOwner{"owner"};
const std::string kBoostrapServers{"bootstrap_servers"};
void to_json(nlohmann::json &data, StreamStatus &&status) {
auto &info = status.info;
@@ -145,6 +146,8 @@ void to_json(nlohmann::json &data, StreamStatus &&status) {
} else {
data[kOwner] = nullptr;
}
data[kBoostrapServers] = std::move(info.bootstrap_servers);
}
void from_json(const nlohmann::json &data, StreamStatus &status) {
@@ -174,6 +177,8 @@ void from_json(const nlohmann::json &data, StreamStatus &status) {
} else {
info.owner = {};
}
info.bootstrap_servers = data.value(kBoostrapServers, "");
}
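
A minimal sketch of the backwards-compatibility property of this read path (assuming only nlohmann::json's value() accessor, which the surrounding code already uses): a status blob persisted before this commit has no bootstrap_servers key and reads back as the empty string, i.e. "use the global default":

#include <iostream>
#include <nlohmann/json.hpp>

int main() {
  // A pre-commit status blob: the bootstrap_servers key was never written.
  const nlohmann::json old_blob = {{"is_running", true}};
  // value() falls back to the supplied default when the key is absent.
  const auto servers = old_blob.value("bootstrap_servers", std::string{});
  std::cout << '"' << servers << '"' << '\n';  // prints ""
}
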
Streams::Streams(InterpreterContext *interpreter_context, std::string bootstrap_servers,
@@ -410,10 +415,13 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std
.batch_size = stream_info.batch_size,
};
auto bootstrap_servers =
stream_info.bootstrap_servers.empty() ? bootstrap_servers_ : std::move(stream_info.bootstrap_servers);
auto insert_result = map.insert_or_assign(
stream_name, StreamData{std::move(stream_info.transformation_name), std::move(stream_info.owner),
std::make_unique<SynchronizedConsumer>(bootstrap_servers_, std::move(consumer_info),
std::move(consumer_function))});
stream_name,
StreamData{std::move(stream_info.transformation_name), std::move(stream_info.owner),
std::make_unique<SynchronizedConsumer>(std::move(bootstrap_servers), std::move(consumer_info),
std::move(consumer_function))});
MG_ASSERT(insert_result.second, "Unexpected error during storing consumer '{}'", stream_name);
return insert_result.first;
}
@@ -425,4 +433,5 @@ void Streams::Persist(StreamStatus &&status) {
}
}
std::string_view Streams::BootstrapServers() const { return bootstrap_servers_; }
} // namespace query
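
Taken together, the selection rule CreateConsumer implements above reduces to a one-liner; a standalone sketch with illustrative names:

#include <iostream>
#include <string>
#include <string_view>

// The per-stream value wins; the empty string means "fall back to the instance default".
std::string EffectiveBootstrapServers(std::string_view per_stream, std::string_view instance_default) {
  return std::string{per_stream.empty() ? instance_default : per_stream};
}

int main() {
  std::cout << EffectiveBootstrapServers("", "localhost:9092") << '\n';              // localhost:9092
  std::cout << EffectiveBootstrapServers("broker1:9093", "localhost:9092") << '\n';  // broker1:9093
}
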

View File

@@ -40,6 +40,7 @@ struct StreamInfo {
std::optional<int64_t> batch_size;
std::string transformation_name;
std::optional<std::string> owner;
std::string bootstrap_servers;
};
struct StreamStatus {
@@ -139,6 +140,9 @@ class Streams final {
std::optional<std::chrono::milliseconds> timeout = std::nullopt,
std::optional<int64_t> batch_limit = std::nullopt) const;
/// Return the configuration value passed to memgraph.
std::string_view BootstrapServers() const;
private:
using StreamsMap = std::unordered_map<std::string, StreamData>;
using SynchronizedStreamsMap = utils::Synchronized<StreamsMap, utils::WritePrioritizedRWLock>;

View File

@@ -10,7 +10,8 @@ BATCH_INTERVAL = 3
BATCH_SIZE = 4
TRANSFORM = 5
OWNER = 6
IS_RUNNING = 7
BOOTSTRAP_SERVERS = 7
IS_RUNNING = 8
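
BOOTSTRAP_SERVERS slots in just before IS_RUNNING, matching the widened SHOW STREAMS header earlier in this commit, so every test that indexes result rows positionally shifts by one.
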
def execute_and_fetch_all(cursor, query):

View File

@@ -68,7 +68,7 @@ def test_owner_is_shown(topics, connection):
common.check_stream_info(userless_cursor, "test", ("test", [
topics[0]], "mg_consumer", None, None,
"transform.simple", stream_user, False))
"transform.simple", stream_user, "localhost:9092", False))
def test_insufficient_privileges(producer, topics, connection):

View File

@@ -172,7 +172,8 @@ def test_show_streams(producer, topics, connection):
common.execute_and_fetch_all(cursor,
"CREATE STREAM default_values "
f"TOPICS {topics[0]} "
f"TRANSFORM transform.simple")
f"TRANSFORM transform.simple "
f"BOOTSTRAP_SERVERS \'localhost:9092\'")
consumer_group = "my_special_consumer_group"
batch_interval = 42
@@ -189,7 +190,7 @@ def test_show_streams(producer, topics, connection):
common.check_stream_info(cursor, "default_values", ("default_values", [
topics[0]], "mg_consumer", None, None,
"transform.simple", None, False))
"transform.simple", None, "localhost:9092", False))
common.check_stream_info(cursor, "complex_values", (
"complex_values",
@@ -199,6 +200,7 @@ def test_show_streams(producer, topics, connection):
batch_size,
"transform.with_parameters",
None,
"localhost:9092",
False))
@@ -377,5 +379,38 @@ def test_restart_after_error(producer, topics, connection):
cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n")
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
def test_bootstrap_server(producer, topics, connection, transformation):
assert len(topics) > 0
cursor = connection.cursor()
local = "localhost:9092"
common.execute_and_fetch_all(cursor,
"CREATE STREAM test "
f"TOPICS {','.join(topics)} "
f"TRANSFORM {transformation} "
f"BOOTSTRAP_SERVERS \'{local}\'")
common.start_stream(cursor, "test")
time.sleep(5)
for topic in topics:
producer.send(topic, SIMPLE_MSG).get(timeout=60)
for topic in topics:
common.check_vertex_exists_with_topic_and_payload(
cursor, topic, SIMPLE_MSG)
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
def test_bootstrap_server_empty(producer, topics, connection, transformation):
assert len(topics) > 0
cursor = connection.cursor()
with pytest.raises(mgclient.DatabaseError):
common.execute_and_fetch_all(cursor,
"CREATE STREAM test "
f"TOPICS {','.join(topics)} "
f"TRANSFORM {transformation} "
"BOOTSTRAP_SERVERS ''")
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@@ -1,3 +1,14 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
//
#include <algorithm>
#include <climits>
#include <limits>
@@ -3618,7 +3629,8 @@ TEST_P(CypherMainVisitorTest, StopAllStreams) {
void ValidateCreateStreamQuery(Base &ast_generator, const std::string &query_string, const std::string_view stream_name,
const std::vector<std::string> &topic_names, const std::string_view transform_name,
const std::string_view consumer_group, const std::optional<TypedValue> &batch_interval,
const std::optional<TypedValue> &batch_size) {
const std::optional<TypedValue> &batch_size,
const std::string_view bootstrap_servers = "") {
StreamQuery *parsed_query{nullptr};
ASSERT_NO_THROW(parsed_query = dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(query_string))) << query_string;
ASSERT_NE(parsed_query, nullptr);
@@ -3630,6 +3642,11 @@ void ValidateCreateStreamQuery(Base &ast_generator, const std::string &query_str
EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_interval_, batch_interval));
EXPECT_NO_FATAL_FAILURE(CheckOptionalExpression(ast_generator, parsed_query->batch_size_, batch_size));
EXPECT_EQ(parsed_query->batch_limit_, nullptr);
if (bootstrap_servers.empty()) {
EXPECT_EQ(parsed_query->bootstrap_servers_, nullptr);
return;
}
EXPECT_NE(parsed_query->bootstrap_servers_, nullptr);
}
TEST_P(CypherMainVisitorTest, CreateStream) {
@@ -3660,6 +3677,9 @@ TEST_P(CypherMainVisitorTest, CreateStream) {
ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS topic1, TRANSFORM transform BATCH_SIZE 2 CONSUMER_GROUP Gru",
ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS localhost:9092",
ast_generator);
TestInvalidQuery("CREATE STREAM stream TOPICS topic1 TRANSFORM transform BOOTSTRAP_SERVERS", ast_generator);
const std::vector<std::string> topic_names{"topic1_name.with_dot", "topic1_name.with_multiple.dots",
"topic-name.with-multiple.dots-and-dashes"};
@@ -3701,6 +3721,21 @@ TEST_P(CypherMainVisitorTest, CreateStream) {
fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {}",
kStreamName, topic_names_as_str, kTransformName, kConsumerGroup, kBatchInterval, kBatchSize),
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value);
using namespace std::string_literals;
const auto host1 = "localhost:9094"s;
ValidateCreateStreamQuery(
ast_generator,
fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {} "
"BOOTSTRAP_SERVERS '{}'",
kStreamName, topic_names_as_str, kTransformName, kConsumerGroup, kBatchInterval, kBatchSize, host1),
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host1);
const auto host2 = "localhost:9094,localhost:1994,168.1.1.256:345"s;
ValidateCreateStreamQuery(
ast_generator,
fmt::format("CREATE STREAM {} TOPICS {} TRANSFORM {} CONSUMER_GROUP {} BATCH_INTERVAL {} BATCH_SIZE {} "
"BOOTSTRAP_SERVERS '{}'",
kStreamName, topic_names_as_str, kTransformName, kConsumerGroup, kBatchInterval, kBatchSize, host2),
kStreamName, topic_names, kTransformName, kConsumerGroup, batch_interval_value, batch_size_value, host2);
};
for (const auto &topic_name : topic_names) {