diff --git a/src/query/frontend/ast/ast.lcp b/src/query/frontend/ast/ast.lcp index b4436abaa..d5ba0db69 100644 --- a/src/query/frontend/ast/ast.lcp +++ b/src/query/frontend/ast/ast.lcp @@ -93,6 +93,36 @@ cpp<# } cpp<#) +(defun slk-save-expression-map (member) + #>cpp + size_t size = self.${member}.size(); + slk::Save(size, builder); + for (const auto &entry : self.${member}) { + query::SaveAstPointer(entry.first, builder); + query::SaveAstPointer(entry.second, builder); + } + cpp<#) + +(defun slk-load-expression-map (member) + #>cpp + size_t size = 0; + slk::Load(&size, reader); + for (size_t i = 0; + i < size; + ++i) { + auto *key = query::LoadAstPointer(storage, reader); + auto *value = query::LoadAstPointer(storage, reader); + self->${member}.emplace(key, value); + } + cpp<#) + +(defun clone-expression-map (source dest) + #>cpp + for (const auto &[key, value] : ${source}) { + ${dest}[key->Clone(storage)] = value->Clone(storage); + } + cpp<#) + (defun slk-load-name-ix (name-type) (lambda (member) #>cpp @@ -2543,13 +2573,15 @@ cpp<# :slk-save #'slk-save-ast-pointer :slk-load (slk-load-ast-pointer "Expression")) - (configs "Expression *" :initval "nullptr" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "Expression")) + (configs "std::unordered_map" :scope :public + :slk-save #'slk-save-expression-map + :slk-load #'slk-load-expression-map + :clone #'clone-expression-map) - (credentials "Expression *" :initval "nullptr" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "Expression"))) + (credentials "std::unordered_map" :scope :public + :slk-save #'slk-save-expression-map + :slk-load #'slk-load-expression-map + :clone #'clone-expression-map)) (:public (lcp:define-enum action diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index 42832858c..90d3c2af1 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -588,6 +588,19 @@ void MapCommonStreamConfigs(auto &memory, StreamQuery &stream_query) { } } // namespace +antlrcpp::Any CypherMainVisitor::visitConfigKeyValuePair(MemgraphCypher::ConfigKeyValuePairContext *ctx) { + MG_ASSERT(ctx->literal().size() == 2); + return std::pair{ctx->literal(0)->accept(this).as(), ctx->literal(1)->accept(this).as()}; +} + +antlrcpp::Any CypherMainVisitor::visitConfigMap(MemgraphCypher::ConfigMapContext *ctx) { + std::unordered_map map; + for (auto *key_value_pair : ctx->configKeyValuePair()) { + map.insert(key_value_pair->accept(this).as>()); + } + return map; +} + antlrcpp::Any CypherMainVisitor::visitKafkaCreateStream(MemgraphCypher::KafkaCreateStreamContext *ctx) { auto *stream_query = storage_->Create(); stream_query->action_ = StreamQuery::Action::CREATE_STREAM; @@ -601,8 +614,10 @@ antlrcpp::Any CypherMainVisitor::visitKafkaCreateStream(MemgraphCypher::KafkaCre MapConfig, Expression *>(memory_, KafkaConfigKey::TOPICS, stream_query->topic_names_); MapConfig(memory_, KafkaConfigKey::CONSUMER_GROUP, stream_query->consumer_group_); MapConfig(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS, stream_query->bootstrap_servers_); - MapConfig(memory_, KafkaConfigKey::CONFIGS, stream_query->configs_); - MapConfig(memory_, KafkaConfigKey::CREDENTIALS, stream_query->credentials_); + MapConfig>(memory_, KafkaConfigKey::CONFIGS, + stream_query->configs_); + MapConfig>(memory_, KafkaConfigKey::CREDENTIALS, + stream_query->credentials_); MapCommonStreamConfigs(memory_, *stream_query); @@ -651,22 +666,17 @@ antlrcpp::Any CypherMainVisitor::visitKafkaCreateStreamConfig(MemgraphCypher::Ka } if (ctx->CONFIGS()) { - if (!ctx->configsMap->mapLiteral()) { - throw SemanticException("Configs must be a map literal!"); - } ThrowIfExists(memory_, KafkaConfigKey::CONFIGS); constexpr auto configs_key = static_cast(KafkaConfigKey::CONFIGS); - memory_.emplace(configs_key, ctx->configsMap->accept(this).as()); + memory_.emplace(configs_key, ctx->configsMap->accept(this).as>()); return {}; } if (ctx->CREDENTIALS()) { - if (!ctx->credentialsMap->mapLiteral()) { - throw SemanticException("Credentials must be a map literal!"); - } ThrowIfExists(memory_, KafkaConfigKey::CREDENTIALS); constexpr auto credentials_key = static_cast(KafkaConfigKey::CREDENTIALS); - memory_.emplace(credentials_key, ctx->credentialsMap->accept(this).as()); + memory_.emplace(credentials_key, + ctx->credentialsMap->accept(this).as>()); return {}; } diff --git a/src/query/frontend/ast/cypher_main_visitor.hpp b/src/query/frontend/ast/cypher_main_visitor.hpp index b5290d86a..431b58480 100644 --- a/src/query/frontend/ast/cypher_main_visitor.hpp +++ b/src/query/frontend/ast/cypher_main_visitor.hpp @@ -1,4 +1,4 @@ -// Copyright 2021 Memgraph Ltd. +// Copyright 2022 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -269,6 +269,16 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor { */ antlrcpp::Any visitCreateStream(MemgraphCypher::CreateStreamContext *ctx) override; + /** + * @return StreamQuery* + */ + antlrcpp::Any visitConfigKeyValuePair(MemgraphCypher::ConfigKeyValuePairContext *ctx) override; + + /** + * @return StreamQuery* + */ + antlrcpp::Any visitConfigMap(MemgraphCypher::ConfigMapContext *ctx) override; + /** * @return StreamQuery* */ @@ -849,7 +859,9 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor { ParsingContext context_; AstStorage *storage_; - std::unordered_map>> memory_; + std::unordered_map, + std::unordered_map>> + memory_; // Set of identifiers from queries. std::unordered_set users_identifiers; // Identifiers that user didn't name. diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 index 188dd473f..d91bb42e8 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 @@ -308,11 +308,15 @@ commonCreateStreamConfig : TRANSFORM transformationName=procedureName createStream : kafkaCreateStream | pulsarCreateStream ; +configKeyValuePair : literal ':' literal ; + +configMap : '{' ( configKeyValuePair ( ',' configKeyValuePair )* )? '}' ; + kafkaCreateStreamConfig : TOPICS topicNames | CONSUMER_GROUP consumerGroup=symbolicNameWithDotsAndMinus | BOOTSTRAP_SERVERS bootstrapServers=literal - | CONFIGS configsMap=literal - | CREDENTIALS credentialsMap=literal + | CONFIGS configsMap=configMap + | CREDENTIALS credentialsMap=configMap | commonCreateStreamConfig ; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 6e3c984f9..e33d40d61 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -564,19 +564,16 @@ Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, Exp } auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator); - const auto get_config_map = [&evaluator](Expression *config_literal, + const auto get_config_map = [&evaluator](std::unordered_map map, std::string_view map_name) -> std::unordered_map { - if (config_literal == nullptr) { - return {}; - } - const auto evaluated_config = config_literal->Accept(evaluator); - MG_ASSERT(evaluated_config.IsMap()); std::unordered_map config_map; - for (const auto &[key, value] : evaluated_config.ValueMap()) { - if (!value.IsString()) { - throw SemanticException("{} must contain only string values!", map_name); + for (const auto [key_expr, value_expr] : map) { + auto key = key_expr->Accept(evaluator); + auto value = value_expr->Accept(evaluator); + if (!key.IsString() || !value.IsString()) { + throw SemanticException("{} must contain only string keys and values!", map_name); } - config_map.emplace(key, value.ValueString()); + config_map.emplace(key.ValueString(), value.ValueString()); } return config_map; };