Use string literals in config maps
This commit is contained in:
parent
382d96496a
commit
0bc73da66d
@ -93,6 +93,36 @@ cpp<#
|
|||||||
}
|
}
|
||||||
cpp<#)
|
cpp<#)
|
||||||
|
|
||||||
|
(defun slk-save-expression-map (member)
|
||||||
|
#>cpp
|
||||||
|
size_t size = self.${member}.size();
|
||||||
|
slk::Save(size, builder);
|
||||||
|
for (const auto &entry : self.${member}) {
|
||||||
|
query::SaveAstPointer(entry.first, builder);
|
||||||
|
query::SaveAstPointer(entry.second, builder);
|
||||||
|
}
|
||||||
|
cpp<#)
|
||||||
|
|
||||||
|
(defun slk-load-expression-map (member)
|
||||||
|
#>cpp
|
||||||
|
size_t size = 0;
|
||||||
|
slk::Load(&size, reader);
|
||||||
|
for (size_t i = 0;
|
||||||
|
i < size;
|
||||||
|
++i) {
|
||||||
|
auto *key = query::LoadAstPointer<query::Expression>(storage, reader);
|
||||||
|
auto *value = query::LoadAstPointer<query::Expression>(storage, reader);
|
||||||
|
self->${member}.emplace(key, value);
|
||||||
|
}
|
||||||
|
cpp<#)
|
||||||
|
|
||||||
|
(defun clone-expression-map (source dest)
|
||||||
|
#>cpp
|
||||||
|
for (const auto &[key, value] : ${source}) {
|
||||||
|
${dest}[key->Clone(storage)] = value->Clone(storage);
|
||||||
|
}
|
||||||
|
cpp<#)
|
||||||
|
|
||||||
(defun slk-load-name-ix (name-type)
|
(defun slk-load-name-ix (name-type)
|
||||||
(lambda (member)
|
(lambda (member)
|
||||||
#>cpp
|
#>cpp
|
||||||
@ -2543,13 +2573,15 @@ cpp<#
|
|||||||
:slk-save #'slk-save-ast-pointer
|
:slk-save #'slk-save-ast-pointer
|
||||||
:slk-load (slk-load-ast-pointer "Expression"))
|
:slk-load (slk-load-ast-pointer "Expression"))
|
||||||
|
|
||||||
(configs "Expression *" :initval "nullptr" :scope :public
|
(configs "std::unordered_map<Expression *, Expression *>" :scope :public
|
||||||
:slk-save #'slk-save-ast-pointer
|
:slk-save #'slk-save-expression-map
|
||||||
:slk-load (slk-load-ast-pointer "Expression"))
|
:slk-load #'slk-load-expression-map
|
||||||
|
:clone #'clone-expression-map)
|
||||||
|
|
||||||
(credentials "Expression *" :initval "nullptr" :scope :public
|
(credentials "std::unordered_map<Expression *, Expression *>" :scope :public
|
||||||
:slk-save #'slk-save-ast-pointer
|
:slk-save #'slk-save-expression-map
|
||||||
:slk-load (slk-load-ast-pointer "Expression")))
|
:slk-load #'slk-load-expression-map
|
||||||
|
:clone #'clone-expression-map))
|
||||||
|
|
||||||
(:public
|
(:public
|
||||||
(lcp:define-enum action
|
(lcp:define-enum action
|
||||||
|
@ -588,6 +588,19 @@ void MapCommonStreamConfigs(auto &memory, StreamQuery &stream_query) {
|
|||||||
}
|
}
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
|
antlrcpp::Any CypherMainVisitor::visitConfigKeyValuePair(MemgraphCypher::ConfigKeyValuePairContext *ctx) {
|
||||||
|
MG_ASSERT(ctx->literal().size() == 2);
|
||||||
|
return std::pair{ctx->literal(0)->accept(this).as<Expression *>(), ctx->literal(1)->accept(this).as<Expression *>()};
|
||||||
|
}
|
||||||
|
|
||||||
|
antlrcpp::Any CypherMainVisitor::visitConfigMap(MemgraphCypher::ConfigMapContext *ctx) {
|
||||||
|
std::unordered_map<Expression *, Expression *> map;
|
||||||
|
for (auto *key_value_pair : ctx->configKeyValuePair()) {
|
||||||
|
map.insert(key_value_pair->accept(this).as<std::pair<Expression *, Expression *>>());
|
||||||
|
}
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
antlrcpp::Any CypherMainVisitor::visitKafkaCreateStream(MemgraphCypher::KafkaCreateStreamContext *ctx) {
|
antlrcpp::Any CypherMainVisitor::visitKafkaCreateStream(MemgraphCypher::KafkaCreateStreamContext *ctx) {
|
||||||
auto *stream_query = storage_->Create<StreamQuery>();
|
auto *stream_query = storage_->Create<StreamQuery>();
|
||||||
stream_query->action_ = StreamQuery::Action::CREATE_STREAM;
|
stream_query->action_ = StreamQuery::Action::CREATE_STREAM;
|
||||||
@ -601,8 +614,10 @@ antlrcpp::Any CypherMainVisitor::visitKafkaCreateStream(MemgraphCypher::KafkaCre
|
|||||||
MapConfig<true, std::vector<std::string>, Expression *>(memory_, KafkaConfigKey::TOPICS, stream_query->topic_names_);
|
MapConfig<true, std::vector<std::string>, Expression *>(memory_, KafkaConfigKey::TOPICS, stream_query->topic_names_);
|
||||||
MapConfig<false, std::string>(memory_, KafkaConfigKey::CONSUMER_GROUP, stream_query->consumer_group_);
|
MapConfig<false, std::string>(memory_, KafkaConfigKey::CONSUMER_GROUP, stream_query->consumer_group_);
|
||||||
MapConfig<false, Expression *>(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS, stream_query->bootstrap_servers_);
|
MapConfig<false, Expression *>(memory_, KafkaConfigKey::BOOTSTRAP_SERVERS, stream_query->bootstrap_servers_);
|
||||||
MapConfig<false, Expression *>(memory_, KafkaConfigKey::CONFIGS, stream_query->configs_);
|
MapConfig<false, std::unordered_map<Expression *, Expression *>>(memory_, KafkaConfigKey::CONFIGS,
|
||||||
MapConfig<false, Expression *>(memory_, KafkaConfigKey::CREDENTIALS, stream_query->credentials_);
|
stream_query->configs_);
|
||||||
|
MapConfig<false, std::unordered_map<Expression *, Expression *>>(memory_, KafkaConfigKey::CREDENTIALS,
|
||||||
|
stream_query->credentials_);
|
||||||
|
|
||||||
MapCommonStreamConfigs(memory_, *stream_query);
|
MapCommonStreamConfigs(memory_, *stream_query);
|
||||||
|
|
||||||
@ -651,22 +666,17 @@ antlrcpp::Any CypherMainVisitor::visitKafkaCreateStreamConfig(MemgraphCypher::Ka
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (ctx->CONFIGS()) {
|
if (ctx->CONFIGS()) {
|
||||||
if (!ctx->configsMap->mapLiteral()) {
|
|
||||||
throw SemanticException("Configs must be a map literal!");
|
|
||||||
}
|
|
||||||
ThrowIfExists(memory_, KafkaConfigKey::CONFIGS);
|
ThrowIfExists(memory_, KafkaConfigKey::CONFIGS);
|
||||||
constexpr auto configs_key = static_cast<uint8_t>(KafkaConfigKey::CONFIGS);
|
constexpr auto configs_key = static_cast<uint8_t>(KafkaConfigKey::CONFIGS);
|
||||||
memory_.emplace(configs_key, ctx->configsMap->accept(this).as<Expression *>());
|
memory_.emplace(configs_key, ctx->configsMap->accept(this).as<std::unordered_map<Expression *, Expression *>>());
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ctx->CREDENTIALS()) {
|
if (ctx->CREDENTIALS()) {
|
||||||
if (!ctx->credentialsMap->mapLiteral()) {
|
|
||||||
throw SemanticException("Credentials must be a map literal!");
|
|
||||||
}
|
|
||||||
ThrowIfExists(memory_, KafkaConfigKey::CREDENTIALS);
|
ThrowIfExists(memory_, KafkaConfigKey::CREDENTIALS);
|
||||||
constexpr auto credentials_key = static_cast<uint8_t>(KafkaConfigKey::CREDENTIALS);
|
constexpr auto credentials_key = static_cast<uint8_t>(KafkaConfigKey::CREDENTIALS);
|
||||||
memory_.emplace(credentials_key, ctx->credentialsMap->accept(this).as<Expression *>());
|
memory_.emplace(credentials_key,
|
||||||
|
ctx->credentialsMap->accept(this).as<std::unordered_map<Expression *, Expression *>>());
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright 2021 Memgraph Ltd.
|
// Copyright 2022 Memgraph Ltd.
|
||||||
//
|
//
|
||||||
// Use of this software is governed by the Business Source License
|
// Use of this software is governed by the Business Source License
|
||||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
@ -269,6 +269,16 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
|
|||||||
*/
|
*/
|
||||||
antlrcpp::Any visitCreateStream(MemgraphCypher::CreateStreamContext *ctx) override;
|
antlrcpp::Any visitCreateStream(MemgraphCypher::CreateStreamContext *ctx) override;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return StreamQuery*
|
||||||
|
*/
|
||||||
|
antlrcpp::Any visitConfigKeyValuePair(MemgraphCypher::ConfigKeyValuePairContext *ctx) override;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return StreamQuery*
|
||||||
|
*/
|
||||||
|
antlrcpp::Any visitConfigMap(MemgraphCypher::ConfigMapContext *ctx) override;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return StreamQuery*
|
* @return StreamQuery*
|
||||||
*/
|
*/
|
||||||
@ -849,7 +859,9 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
|
|||||||
ParsingContext context_;
|
ParsingContext context_;
|
||||||
AstStorage *storage_;
|
AstStorage *storage_;
|
||||||
|
|
||||||
std::unordered_map<uint8_t, std::variant<Expression *, std::string, std::vector<std::string>>> memory_;
|
std::unordered_map<uint8_t, std::variant<Expression *, std::string, std::vector<std::string>,
|
||||||
|
std::unordered_map<Expression *, Expression *>>>
|
||||||
|
memory_;
|
||||||
// Set of identifiers from queries.
|
// Set of identifiers from queries.
|
||||||
std::unordered_set<std::string> users_identifiers;
|
std::unordered_set<std::string> users_identifiers;
|
||||||
// Identifiers that user didn't name.
|
// Identifiers that user didn't name.
|
||||||
|
@ -308,11 +308,15 @@ commonCreateStreamConfig : TRANSFORM transformationName=procedureName
|
|||||||
|
|
||||||
createStream : kafkaCreateStream | pulsarCreateStream ;
|
createStream : kafkaCreateStream | pulsarCreateStream ;
|
||||||
|
|
||||||
|
configKeyValuePair : literal ':' literal ;
|
||||||
|
|
||||||
|
configMap : '{' ( configKeyValuePair ( ',' configKeyValuePair )* )? '}' ;
|
||||||
|
|
||||||
kafkaCreateStreamConfig : TOPICS topicNames
|
kafkaCreateStreamConfig : TOPICS topicNames
|
||||||
| CONSUMER_GROUP consumerGroup=symbolicNameWithDotsAndMinus
|
| CONSUMER_GROUP consumerGroup=symbolicNameWithDotsAndMinus
|
||||||
| BOOTSTRAP_SERVERS bootstrapServers=literal
|
| BOOTSTRAP_SERVERS bootstrapServers=literal
|
||||||
| CONFIGS configsMap=literal
|
| CONFIGS configsMap=configMap
|
||||||
| CREDENTIALS credentialsMap=literal
|
| CREDENTIALS credentialsMap=configMap
|
||||||
| commonCreateStreamConfig
|
| commonCreateStreamConfig
|
||||||
;
|
;
|
||||||
|
|
||||||
|
@ -564,19 +564,16 @@ Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, Exp
|
|||||||
}
|
}
|
||||||
auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator);
|
auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator);
|
||||||
|
|
||||||
const auto get_config_map = [&evaluator](Expression *config_literal,
|
const auto get_config_map = [&evaluator](std::unordered_map<Expression *, Expression *> map,
|
||||||
std::string_view map_name) -> std::unordered_map<std::string, std::string> {
|
std::string_view map_name) -> std::unordered_map<std::string, std::string> {
|
||||||
if (config_literal == nullptr) {
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
const auto evaluated_config = config_literal->Accept(evaluator);
|
|
||||||
MG_ASSERT(evaluated_config.IsMap());
|
|
||||||
std::unordered_map<std::string, std::string> config_map;
|
std::unordered_map<std::string, std::string> config_map;
|
||||||
for (const auto &[key, value] : evaluated_config.ValueMap()) {
|
for (const auto [key_expr, value_expr] : map) {
|
||||||
if (!value.IsString()) {
|
auto key = key_expr->Accept(evaluator);
|
||||||
throw SemanticException("{} must contain only string values!", map_name);
|
auto value = value_expr->Accept(evaluator);
|
||||||
|
if (!key.IsString() || !value.IsString()) {
|
||||||
|
throw SemanticException("{} must contain only string keys and values!", map_name);
|
||||||
}
|
}
|
||||||
config_map.emplace(key, value.ValueString());
|
config_map.emplace(key.ValueString(), value.ValueString());
|
||||||
}
|
}
|
||||||
return config_map;
|
return config_map;
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user