diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 630599276..2f191c113 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -170,9 +170,8 @@ uint64_t ComputeProfilingKey(const T *obj) { class DistributedCreateNodeCursor : public Cursor { public: using InputOperator = std::shared_ptr<memgraph::query::v2::plan::LogicalOperator>; - DistributedCreateNodeCursor(const InputOperator &op, utils::MemoryResource *mem, - std::vector<const NodeCreationInfo *> nodes_info) - : input_cursor_(op->MakeCursor(mem)), nodes_info_(std::move(nodes_info)) {} + DistributedCreateNodeCursor(const InputOperator &op, utils::MemoryResource *mem, const NodeCreationInfo &node_info) + : input_cursor_(op->MakeCursor(mem)), node_info_(node_info) {} bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP("CreateNode"); @@ -206,27 +205,78 @@ class DistributedCreateNodeCursor : public Cursor { void PlaceNodeOnTheFrame(Frame &frame, ExecutionContext &context) { // TODO(kostasrim) Make this work with batching - const auto primary_label = msgs::Label{.id = nodes_info_[0]->labels[0]}; + const auto primary_label = msgs::Label{.id = node_info_.labels[0]}; msgs::Vertex v{.id = std::make_pair(primary_label, primary_keys_[0])}; - frame[nodes_info_.front()->symbol] = + frame[node_info_.symbol] = TypedValue(query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[0], context.request_router)); } std::vector<msgs::NewVertex> NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) { std::vector<msgs::NewVertex> requests; - // TODO(kostasrim) this assertion should be removed once we support multiple vertex creation - MG_ASSERT(nodes_info_.size() == 1); msgs::PrimaryKey pk; - for (const auto &node_info : nodes_info_) { + msgs::NewVertex rqst; + MG_ASSERT(!node_info_.labels.empty(), "Cannot determine primary label"); + const auto primary_label = node_info_.labels[0]; + // TODO(jbajic) Fix properties not send, + // suggestion: ignore distinction between properties and primary keys + // since schema validation is done on storage side + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr, + storage::v3::View::NEW); + if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info_.properties)) { + for (const auto &[key, value_expression] : *node_info_properties) { + TypedValue val = value_expression->Accept(evaluator); + if (context.request_router->IsPrimaryKey(primary_label, key)) { + rqst.primary_key.push_back(TypedValueToValue(val)); + pk.push_back(TypedValueToValue(val)); + } + } + } else { + auto property_map = evaluator.Visit(*std::get<ParameterLookup *>(node_info_.properties)).ValueMap(); + for (const auto &[key, value] : property_map) { + auto key_str = std::string(key); + auto property_id = context.request_router->NameToProperty(key_str); + if (context.request_router->IsPrimaryKey(primary_label, property_id)) { + rqst.primary_key.push_back(TypedValueToValue(value)); + pk.push_back(TypedValueToValue(value)); + } + } + } + + // TODO(kostasrim) Copy non primary labels as well + rqst.label_ids.push_back(msgs::Label{.id = primary_label}); + src_vertex_props_.push_back(rqst.properties); + requests.push_back(std::move(rqst)); + + primary_keys_.push_back(std::move(pk)); + return requests; + } + + void PlaceNodesOnTheMultiFrame(MultiFrame &multi_frame, ExecutionContext &context) { + auto multi_frame_modifier = multi_frame.GetValidFramesModifier(); + size_t i = 0; + MG_ASSERT(std::distance(multi_frame_modifier.begin(), multi_frame_modifier.end())); + for (auto &frame : multi_frame_modifier) { + const auto primary_label = msgs::Label{.id = node_info_.labels[0]}; + msgs::Vertex v{.id = std::make_pair(primary_label, primary_keys_[i])}; + frame[node_info_.symbol] = TypedValue( + query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[i++], context.request_router)); + } + } + + std::vector<msgs::NewVertex> NodeCreationInfoToRequests(ExecutionContext &context, MultiFrame &multi_frame) { + std::vector<msgs::NewVertex> requests; + auto multi_frame_modifier = multi_frame.GetValidFramesModifier(); + for (auto &frame : multi_frame_modifier) { + msgs::PrimaryKey pk; msgs::NewVertex rqst; - MG_ASSERT(!node_info->labels.empty(), "Cannot determine primary label"); - const auto primary_label = node_info->labels[0]; + MG_ASSERT(!node_info_.labels.empty(), "Cannot determine primary label"); + const auto primary_label = node_info_.labels[0]; // TODO(jbajic) Fix properties not send, // suggestion: ignore distinction between properties and primary keys // since schema validation is done on storage side ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr, storage::v3::View::NEW); - if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info->properties)) { + if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info_.properties)) { for (const auto &[key, value_expression] : *node_info_properties) { TypedValue val = value_expression->Accept(evaluator); if (context.request_router->IsPrimaryKey(primary_label, key)) { @@ -235,7 +285,7 @@ class DistributedCreateNodeCursor : public Cursor { } } } else { - auto property_map = evaluator.Visit(*std::get<ParameterLookup *>(node_info->properties)).ValueMap(); + auto property_map = evaluator.Visit(*std::get<ParameterLookup *>(node_info_.properties)).ValueMap(); for (const auto &[key, value] : property_map) { auto key_str = std::string(key); auto property_id = context.request_router->NameToProperty(key_str); @@ -246,80 +296,19 @@ class DistributedCreateNodeCursor : public Cursor { } } - if (node_info->labels.empty()) { - throw QueryRuntimeException("Primary label must be defined!"); - } // TODO(kostasrim) Copy non primary labels as well rqst.label_ids.push_back(msgs::Label{.id = primary_label}); src_vertex_props_.push_back(rqst.properties); requests.push_back(std::move(rqst)); - } - primary_keys_.push_back(std::move(pk)); - return requests; - } - - void PlaceNodesOnTheMultiFrame(MultiFrame &multi_frame, ExecutionContext &context) { - auto multi_frame_modifier = multi_frame.GetValidFramesModifier(); - size_t i = 0; - MG_ASSERT(std::distance(multi_frame_modifier.begin(), multi_frame_modifier.end())); - for (auto &frame : multi_frame_modifier) { - const auto primary_label = msgs::Label{.id = nodes_info_[0]->labels[0]}; - msgs::Vertex v{.id = std::make_pair(primary_label, primary_keys_[i])}; - frame[nodes_info_.front()->symbol] = TypedValue( - query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[i++], context.request_router)); - } - } - - std::vector<msgs::NewVertex> NodeCreationInfoToRequests(ExecutionContext &context, MultiFrame &multi_frame) { - std::vector<msgs::NewVertex> requests; - auto multi_frame_modifier = multi_frame.GetValidFramesModifier(); - for (auto &frame : multi_frame_modifier) { - msgs::PrimaryKey pk; - for (const auto &node_info : nodes_info_) { - msgs::NewVertex rqst; - MG_ASSERT(!node_info->labels.empty(), "Cannot determine primary label"); - const auto primary_label = node_info->labels[0]; - // TODO(jbajic) Fix properties not send, - // suggestion: ignore distinction between properties and primary keys - // since schema validation is done on storage side - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr, - storage::v3::View::NEW); - if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info->properties)) { - for (const auto &[key, value_expression] : *node_info_properties) { - TypedValue val = value_expression->Accept(evaluator); - if (context.request_router->IsPrimaryKey(primary_label, key)) { - rqst.primary_key.push_back(TypedValueToValue(val)); - pk.push_back(TypedValueToValue(val)); - } - } - } else { - auto property_map = evaluator.Visit(*std::get<ParameterLookup *>(node_info->properties)).ValueMap(); - for (const auto &[key, value] : property_map) { - auto key_str = std::string(key); - auto property_id = context.request_router->NameToProperty(key_str); - if (context.request_router->IsPrimaryKey(primary_label, property_id)) { - rqst.primary_key.push_back(TypedValueToValue(value)); - pk.push_back(TypedValueToValue(value)); - } - } - } - - if (node_info->labels.empty()) { - throw QueryRuntimeException("Primary label must be defined!"); - } - // TODO(kostasrim) Copy non primary labels as well - rqst.label_ids.push_back(msgs::Label{.id = primary_label}); - src_vertex_props_.push_back(rqst.properties); - requests.push_back(std::move(rqst)); - } primary_keys_.push_back(std::move(pk)); } + return requests; } private: const UniqueCursorPtr input_cursor_; - std::vector<const NodeCreationInfo *> nodes_info_; + NodeCreationInfo node_info_; std::vector<std::vector<std::pair<storage::v3::PropertyId, msgs::Value>>> src_vertex_props_; std::vector<msgs::PrimaryKey> primary_keys_; }; @@ -364,7 +353,7 @@ ACCEPT_WITH_INPUT(CreateNode) UniqueCursorPtr CreateNode::MakeCursor(utils::MemoryResource *mem) const { EventCounter::IncrementCounter(EventCounter::CreateNodeOperator); - return MakeUniqueCursorPtr<DistributedCreateNodeCursor>(mem, input_, mem, std::vector{&this->node_info_}); + return MakeUniqueCursorPtr<DistributedCreateNodeCursor>(mem, input_, mem, this->node_info_); } std::vector<Symbol> CreateNode::ModifiedSymbols(const SymbolTable &table) const {