diff --git a/src/query/plan/distributed.cpp b/src/query/plan/distributed.cpp index fc2db5fe6..713ee76b4 100644 --- a/src/query/plan/distributed.cpp +++ b/src/query/plan/distributed.cpp @@ -464,8 +464,8 @@ class IndependentSubtreeFinder : public DistributedOperatorVisitor { } bool PostVisit(DistributedCreateNode &op) override { prev_ops_.pop_back(); - CHECK(!FindForbidden(symbol_table_->at(*(op.node_atom_->identifier_)))); - for (auto &kv : op.node_atom_->properties_) { + CHECK(!FindForbidden(op.node_info_.symbol)); + for (auto &kv : op.node_info_.properties) { UsedSymbolsCollector collector(*symbol_table_); kv.second->Accept(collector); CHECK(!ContainsForbidden(collector.symbols_)); @@ -480,14 +480,14 @@ class IndependentSubtreeFinder : public DistributedOperatorVisitor { bool PostVisit(DistributedCreateExpand &op) override { prev_ops_.pop_back(); CHECK(!FindForbidden(op.input_symbol_)); - CHECK(!FindForbidden(symbol_table_->at(*(op.node_atom_->identifier_)))); - CHECK(!FindForbidden(symbol_table_->at(*(op.edge_atom_->identifier_)))); - for (auto &kv : op.node_atom_->properties_) { + CHECK(!FindForbidden(op.node_info_.symbol)); + CHECK(!FindForbidden(op.edge_info_.symbol)); + for (auto &kv : op.node_info_.properties) { UsedSymbolsCollector collector(*symbol_table_); kv.second->Accept(collector); CHECK(!ContainsForbidden(collector.symbols_)); } - for (auto &kv : op.edge_atom_->properties_) { + for (auto &kv : op.edge_info_.properties) { UsedSymbolsCollector collector(*symbol_table_); kv.second->Accept(collector); CHECK(!ContainsForbidden(collector.symbols_)); @@ -1441,7 +1441,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor { // node creation to workers. bool create_on_random_worker = !ShouldSplit(); auto distributed_create = std::make_unique( - op.input(), op.node_atom_, create_on_random_worker); + op.input(), op.node_info_, create_on_random_worker); if (prev_ops_.empty()) distributed_plan_.master_plan = std::move(distributed_create); else @@ -1460,7 +1460,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor { Split(op, PlanCartesian(op.input())); } auto distributed_create = std::make_unique( - op.node_atom_, op.edge_atom_, op.input(), op.input_symbol_, + op.node_info_, op.edge_info_, op.input(), op.input_symbol_, op.existing_node_); if (prev_ops_.empty()) distributed_plan_.master_plan = std::move(distributed_create); diff --git a/src/query/plan/distributed_ops.cpp b/src/query/plan/distributed_ops.cpp index c11f1e784..8692c5d56 100644 --- a/src/query/plan/distributed_ops.cpp +++ b/src/query/plan/distributed_ops.cpp @@ -30,8 +30,8 @@ DEFINE_HIDDEN_int32(remote_pull_sleep_micros, 10, namespace query::plan { // Create a vertex on this GraphDb and return it. Defined in operator.cpp -VertexAccessor &CreateLocalVertex(NodeAtom *node_atom, Frame &frame, - Context &context); +VertexAccessor &CreateLocalVertex(const NodeCreationInfo &node_info, + Frame *frame, const Context &context); bool PullRemote::Accept(HierarchicalLogicalOperatorVisitor &visitor) { auto *distributed_visitor = @@ -170,10 +170,10 @@ std::vector DistributedExpandBfs::ModifiedSymbols( } DistributedCreateNode::DistributedCreateNode( - const std::shared_ptr &input, NodeAtom *node_atom, - bool on_random_worker) + const std::shared_ptr &input, + const NodeCreationInfo &node_info, bool on_random_worker) : input_(input), - node_atom_(node_atom), + node_info_(node_info), on_random_worker_(on_random_worker) {} ACCEPT_WITH_INPUT(DistributedCreateNode); @@ -181,16 +181,16 @@ ACCEPT_WITH_INPUT(DistributedCreateNode); std::vector DistributedCreateNode::ModifiedSymbols( const SymbolTable &table) const { auto symbols = input_->ModifiedSymbols(table); - symbols.emplace_back(table.at(*node_atom_->identifier_)); + symbols.emplace_back(node_info_.symbol); return symbols; } DistributedCreateExpand::DistributedCreateExpand( - NodeAtom *node_atom, EdgeAtom *edge_atom, + const NodeCreationInfo &node_info, const EdgeCreationInfo &edge_info, const std::shared_ptr &input, Symbol input_symbol, bool existing_node) - : node_atom_(node_atom), - edge_atom_(edge_atom), + : node_info_(node_info), + edge_info_(edge_info), input_(input ? input : std::make_shared()), input_symbol_(input_symbol), existing_node_(existing_node) {} @@ -200,8 +200,8 @@ ACCEPT_WITH_INPUT(DistributedCreateExpand); std::vector DistributedCreateExpand::ModifiedSymbols( const SymbolTable &table) const { auto symbols = input_->ModifiedSymbols(table); - symbols.emplace_back(table.at(*node_atom_->identifier_)); - symbols.emplace_back(table.at(*edge_atom_->identifier_)); + symbols.emplace_back(node_info_.symbol); + symbols.emplace_back(edge_info_.symbol); return symbols; } @@ -1258,7 +1258,8 @@ int RandomWorkerId(const database::DistributedGraphDb &db) { } // Creates a vertex on the GraphDb with the given worker_id. Can be this worker. -VertexAccessor &CreateVertexOnWorker(int worker_id, NodeAtom *node_atom, +VertexAccessor &CreateVertexOnWorker(int worker_id, + const NodeCreationInfo &node_info, Frame &frame, Context &context) { auto &dba = context.db_accessor_; @@ -1267,7 +1268,7 @@ VertexAccessor &CreateVertexOnWorker(int worker_id, NodeAtom *node_atom, int current_worker_id = distributed_db->WorkerId(); if (worker_id == current_worker_id) - return CreateLocalVertex(node_atom, frame, context); + return CreateLocalVertex(node_info, &frame, context); std::unordered_map properties; @@ -1276,20 +1277,20 @@ VertexAccessor &CreateVertexOnWorker(int worker_id, NodeAtom *node_atom, ExpressionEvaluator evaluator(&frame, context.symbol_table_, context.evaluation_context_, &context.db_accessor_, GraphView::NEW); - for (auto &kv : node_atom->properties_) { + for (auto &kv : node_info.properties) { auto value = kv.second->Accept(evaluator); if (!value.IsPropertyValue()) { throw QueryRuntimeException("'{}' cannot be used as a property value.", value.type()); } - properties.emplace(kv.first.second, std::move(value)); + properties.emplace(kv.first, std::move(value)); } auto new_node = - database::InsertVertexIntoRemote(&dba, worker_id, node_atom->labels_, + database::InsertVertexIntoRemote(&dba, worker_id, node_info.labels, properties, std::experimental::nullopt); - frame[context.symbol_table_.at(*node_atom->identifier_)] = new_node; - return frame[context.symbol_table_.at(*node_atom->identifier_)].ValueVertex(); + frame[node_info.symbol] = new_node; + return frame[node_info.symbol].ValueVertex(); } class DistributedCreateNodeCursor : public query::plan::Cursor { @@ -1299,18 +1300,17 @@ class DistributedCreateNodeCursor : public query::plan::Cursor { : input_cursor_(self->input()->MakeCursor(*dba)), // TODO: Replace this with some other mechanism db_(dynamic_cast(&dba->db())), - node_atom_(self->node_atom_), + node_info_(self->node_info_), on_random_worker_(self->on_random_worker_) { CHECK(db_); - CHECK(node_atom_); } bool Pull(Frame &frame, Context &context) override { if (input_cursor_->Pull(frame, context)) { if (on_random_worker_) { - CreateVertexOnWorker(RandomWorkerId(*db_), node_atom_, frame, context); + CreateVertexOnWorker(RandomWorkerId(*db_), node_info_, frame, context); } else { - CreateLocalVertex(node_atom_, frame, context); + CreateLocalVertex(node_info_, &frame, context); } return true; } @@ -1324,7 +1324,7 @@ class DistributedCreateNodeCursor : public query::plan::Cursor { private: std::unique_ptr input_cursor_; database::DistributedGraphDb *db_{nullptr}; - NodeAtom *node_atom_{nullptr}; + NodeCreationInfo node_info_; bool on_random_worker_{false}; }; @@ -1361,7 +1361,7 @@ class DistributedCreateExpandCursor : public query::plan::Cursor { auto *dba = &context.db_accessor_; // create an edge between the two nodes - switch (self_->edge_atom_->direction_) { + switch (self_->edge_info_.direction) { case EdgeAtom::Direction::IN: CreateEdge(&v2, &v1, &frame, context.symbol_table_, &evaluator, dba); break; @@ -1385,13 +1385,12 @@ class DistributedCreateExpandCursor : public query::plan::Cursor { VertexAccessor &OtherVertex(int worker_id, Frame &frame, Context &context) { if (self_->existing_node_) { - const auto &dest_node_symbol = - context.symbol_table_.at(*self_->node_atom_->identifier_); + const auto &dest_node_symbol = self_->node_info_.symbol; TypedValue &dest_node_value = frame[dest_node_symbol]; ExpectType(dest_node_symbol, dest_node_value, TypedValue::Type::Vertex); return dest_node_value.Value(); } else { - return CreateVertexOnWorker(worker_id, self_->node_atom_, frame, context); + return CreateVertexOnWorker(worker_id, self_->node_info_, frame, context); } } @@ -1400,10 +1399,10 @@ class DistributedCreateExpandCursor : public query::plan::Cursor { ExpressionEvaluator *evaluator, database::GraphDbAccessor *dba) { EdgeAccessor edge = - dba->InsertEdge(*from, *to, self_->edge_atom_->edge_types_[0]); - for (auto kv : self_->edge_atom_->properties_) - PropsSetChecked(&edge, kv.first.second, kv.second->Accept(*evaluator)); - (*frame)[symbol_table.at(*self_->edge_atom_->identifier_)] = edge; + dba->InsertEdge(*from, *to, self_->edge_info_.edge_type); + for (auto kv : self_->edge_info_.properties) + PropsSetChecked(&edge, kv.first, kv.second->Accept(*evaluator)); + (*frame)[self_->edge_info_.symbol] = edge; } private: diff --git a/src/query/plan/distributed_ops.lcp b/src/query/plan/distributed_ops.lcp index 86d16de52..58192fd88 100644 --- a/src/query/plan/distributed_ops.lcp +++ b/src/query/plan/distributed_ops.lcp @@ -320,18 +320,22 @@ by having only one result from each worker.") :slk-load #'slk-load-operator-pointer :capnp-save #'save-operator-pointer :capnp-load #'load-operator-pointer) - (node-atom "NodeAtom *" :initval "nullptr" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "NodeAtom") - :capnp-type "Ast.Tree" :capnp-init nil - :capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "NodeAtom *")) + (node-info "NodeCreationInfo" :scope :public + :slk-save (lambda (m) + #>cpp + slk::Save(self.${m}, builder, helper); + cpp<#) + :slk-load (lambda (m) + #>cpp + slk::Load(&self->${m}, reader, helper); + cpp<#)) (on-random-worker :bool :initval "false" :scope :public)) (:documentation "Create nodes in distributed environment.") (:public #>cpp DistributedCreateNode() {} DistributedCreateNode(const std::shared_ptr &input, - NodeAtom *node_atom, bool on_random_worker); + const NodeCreationInfo &node_info, bool on_random_worker); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; std::unique_ptr MakeCursor( @@ -347,16 +351,24 @@ by having only one result from each worker.") (:serialize (:slk) (:capnp))) (lcp:define-class distributed-create-expand (logical-operator) - ((node-atom "NodeAtom *" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "NodeAtom") - :capnp-type "Ast.Tree" :capnp-init nil - :capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "NodeAtom *")) - (edge-atom "EdgeAtom *" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "EdgeAtom") - :capnp-type "Ast.Tree" :capnp-init nil - :capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "EdgeAtom *")) + ((node-info "NodeCreationInfo" :scope :public + :slk-save (lambda (m) + #>cpp + slk::Save(self.${m}, builder, helper); + cpp<#) + :slk-load (lambda (m) + #>cpp + slk::Load(&self->${m}, reader, helper); + cpp<#)) + (edge-info "EdgeCreationInfo" :scope :public + :slk-save (lambda (m) + #>cpp + slk::Save(self.${m}, builder, helper); + cpp<#) + :slk-load (lambda (m) + #>cpp + slk::Load(&self->${m}, reader, helper); + cpp<#)) (input "std::shared_ptr" :scope :public :slk-save #'slk-save-operator-pointer :slk-load #'slk-load-operator-pointer @@ -368,7 +380,8 @@ by having only one result from each worker.") (:public #>cpp DistributedCreateExpand() {} - DistributedCreateExpand(NodeAtom *node_atom, EdgeAtom *edge_atom, + DistributedCreateExpand(const NodeCreationInfo &node_info, + const EdgeCreationInfo &edge_info, const std::shared_ptr &input, Symbol input_symbol, bool existing_node); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index d470c5557..1d0a3b9f8 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -89,26 +89,25 @@ void Once::OnceCursor::Shutdown() {} void Once::OnceCursor::Reset() { did_pull_ = false; } CreateNode::CreateNode(const std::shared_ptr &input, - NodeAtom *node_atom) - : input_(input ? input : std::make_shared()), node_atom_(node_atom) {} + const NodeCreationInfo &node_info) + : input_(input ? input : std::make_shared()), node_info_(node_info) {} // Creates a vertex on this GraphDb. Returns a reference to vertex placed on the // frame. -VertexAccessor &CreateLocalVertex(NodeAtom *node_atom, Frame &frame, - Context &context) { +VertexAccessor &CreateLocalVertex(const NodeCreationInfo &node_info, + Frame *frame, const Context &context) { auto &dba = context.db_accessor_; auto new_node = dba.InsertVertex(); - for (auto label : node_atom->labels_) new_node.add_label(label); - + for (auto label : node_info.labels) new_node.add_label(label); // Evaluator should use the latest accessors, as modified in this query, when // setting properties on new nodes. - ExpressionEvaluator evaluator(&frame, context.symbol_table_, + ExpressionEvaluator evaluator(frame, context.symbol_table_, context.evaluation_context_, &context.db_accessor_, GraphView::NEW); - for (auto &kv : node_atom->properties_) - PropsSetChecked(&new_node, kv.first.second, kv.second->Accept(evaluator)); - frame[context.symbol_table_.at(*node_atom->identifier_)] = new_node; - return frame[context.symbol_table_.at(*node_atom->identifier_)].ValueVertex(); + for (auto &kv : node_info.properties) + PropsSetChecked(&new_node, kv.first, kv.second->Accept(evaluator)); + (*frame)[node_info.symbol] = new_node; + return (*frame)[node_info.symbol].ValueVertex(); } ACCEPT_WITH_INPUT(CreateNode) @@ -121,7 +120,7 @@ std::unique_ptr CreateNode::MakeCursor( std::vector CreateNode::ModifiedSymbols( const SymbolTable &table) const { auto symbols = input_->ModifiedSymbols(table); - symbols.emplace_back(table.at(*node_atom_->identifier_)); + symbols.emplace_back(node_info_.symbol); return symbols; } @@ -131,7 +130,7 @@ CreateNode::CreateNodeCursor::CreateNodeCursor(const CreateNode &self, bool CreateNode::CreateNodeCursor::Pull(Frame &frame, Context &context) { if (input_cursor_->Pull(frame, context)) { - CreateLocalVertex(self_.node_atom_, frame, context); + CreateLocalVertex(self_.node_info_, &frame, context); return true; } return false; @@ -141,11 +140,12 @@ void CreateNode::CreateNodeCursor::Shutdown() { input_cursor_->Shutdown(); } void CreateNode::CreateNodeCursor::Reset() { input_cursor_->Reset(); } -CreateExpand::CreateExpand(NodeAtom *node_atom, EdgeAtom *edge_atom, +CreateExpand::CreateExpand(const NodeCreationInfo &node_info, + const EdgeCreationInfo &edge_info, const std::shared_ptr &input, Symbol input_symbol, bool existing_node) - : node_atom_(node_atom), - edge_atom_(edge_atom), + : node_info_(node_info), + edge_info_(edge_info), input_(input ? input : std::make_shared()), input_symbol_(input_symbol), existing_node_(existing_node) {} @@ -160,8 +160,8 @@ std::unique_ptr CreateExpand::MakeCursor( std::vector CreateExpand::ModifiedSymbols( const SymbolTable &table) const { auto symbols = input_->ModifiedSymbols(table); - symbols.emplace_back(table.at(*node_atom_->identifier_)); - symbols.emplace_back(table.at(*edge_atom_->identifier_)); + symbols.emplace_back(node_info_.symbol); + symbols.emplace_back(edge_info_.symbol); return symbols; } @@ -190,7 +190,7 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, Context &context) { v2.SwitchNew(); // create an edge between the two nodes - switch (self_.edge_atom_->direction_) { + switch (self_.edge_info_.direction) { case EdgeAtom::Direction::IN: CreateEdge(v2, v1, frame, context.symbol_table_, evaluator); break; @@ -215,24 +215,22 @@ void CreateExpand::CreateExpandCursor::Reset() { input_cursor_->Reset(); } VertexAccessor &CreateExpand::CreateExpandCursor::OtherVertex( Frame &frame, Context &context) { if (self_.existing_node_) { - const auto &dest_node_symbol = - context.symbol_table_.at(*self_.node_atom_->identifier_); - TypedValue &dest_node_value = frame[dest_node_symbol]; - ExpectType(dest_node_symbol, dest_node_value, TypedValue::Type::Vertex); + TypedValue &dest_node_value = frame[self_.node_info_.symbol]; + ExpectType(self_.node_info_.symbol, dest_node_value, + TypedValue::Type::Vertex); return dest_node_value.Value(); } else { - return CreateLocalVertex(self_.node_atom_, frame, context); + return CreateLocalVertex(self_.node_info_, &frame, context); } } void CreateExpand::CreateExpandCursor::CreateEdge( VertexAccessor &from, VertexAccessor &to, Frame &frame, const SymbolTable &symbol_table, ExpressionEvaluator &evaluator) { - EdgeAccessor edge = - db_.InsertEdge(from, to, self_.edge_atom_->edge_types_[0]); - for (auto kv : self_.edge_atom_->properties_) - PropsSetChecked(&edge, kv.first.second, kv.second->Accept(evaluator)); - frame[symbol_table.at(*self_.edge_atom_->identifier_)] = edge; + EdgeAccessor edge = db_.InsertEdge(from, to, self_.edge_info_.edge_type); + for (auto kv : self_.edge_info_.properties) + PropsSetChecked(&edge, kv.first, kv.second->Accept(evaluator)); + frame[self_.edge_info_.symbol] = edge; } template diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp index f914d2b94..d3240b530 100644 --- a/src/query/plan/operator.lcp +++ b/src/query/plan/operator.lcp @@ -373,17 +373,81 @@ and false on every following Pull.") cpp<#) (:serialize (:slk) (:capnp))) +(defun slk-save-properties (member) + #>cpp + size_t size = self.${member}.size(); + slk::Save(size, builder); + for (const auto &kv : self.${member}) { + slk::Save(kv.first, builder); + query::SaveAstPointer(kv.second, builder, &helper->saved_ast_uids); + } + cpp<#) + +(defun slk-load-properties (member) + #>cpp + size_t size = 0; + slk::Load(&size, reader); + self->${member}.resize(size); + for (size_t i = 0; i < size; ++i) { + storage::Property prop; + slk::Load(&prop, reader); + auto *expr = query::LoadAstPointer( + &helper->ast_storage, reader, &helper->loaded_ast_uids); + self->${member}[i] = {prop, expr}; + } + cpp<#) + +(defun capnp-save-properties (builder member capnp-name) + #>cpp + for (size_t i = 0; i < ${member}.size(); ++i) { + auto prop_builder = ${builder}[i].initFirst(); + storage::Save(${member}[i].first, &prop_builder); + auto expr_builder = ${builder}[i].initSecond(); + Save(*${member}[i].second, &expr_builder, &helper->saved_ast_uids); + } + cpp<#) + +(defun capnp-load-properties (reader member capnp-name) + #>cpp + for (const auto &pair_reader : ${reader}) { + auto prop_reader = pair_reader.getFirst(); + storage::Property prop; + storage::Load(&prop, prop_reader); + auto *expr = static_cast(Load( + &helper->ast_storage, pair_reader.getSecond(), &helper->loaded_ast_uids)); + ${member}.emplace_back(prop, expr); + } + cpp<#) + +(lcp:define-struct node-creation-info () + ((symbol "Symbol") + (labels "std::vector") + (properties "std::vector>" + :slk-save #'slk-save-properties + :slk-load #'slk-load-properties + :capnp-type "List(Utils.Pair(Storage.Property, Ast.Tree))" + :capnp-save #'capnp-save-properties + :capnp-load #'capnp-load-properties)) + (:serialize (:slk :save-args '((helper "query::plan::LogicalOperator::SaveHelper *")) + :load-args '((helper "query::plan::LogicalOperator::SlkLoadHelper *"))) + (:capnp :save-args '((helper "LogicalOperator::SaveHelper *")) + :load-args '((helper "LogicalOperator::LoadHelper *"))))) + (lcp:define-class create-node (logical-operator) ((input "std::shared_ptr" :scope :public :slk-save #'slk-save-operator-pointer :slk-load #'slk-load-operator-pointer :capnp-save #'save-operator-pointer :capnp-load #'load-operator-pointer) - (node-atom "NodeAtom *" :initval "nullptr" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "NodeAtom") - :capnp-type "Ast.Tree" :capnp-init nil - :capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "NodeAtom *"))) + (node-info "NodeCreationInfo" :scope :public + :slk-save (lambda (m) + #>cpp + slk::Save(self.${m}, builder, helper); + cpp<#) + :slk-load (lambda (m) + #>cpp + slk::Load(&self->${m}, reader, helper); + cpp<#))) (:documentation "Operator for creating a node. @@ -401,11 +465,10 @@ a preceeding `MATCH`), or multiple nodes (`MATCH ... CREATE` or * created (a single successful @c Cursor::Pull from this op's @c Cursor). * If a valid input, then a node will be created for each * successful pull from the given input. - * @param node_atom @c NodeAtom with information on how to create a node. - * @param on_random_worker If the node should be created locally or on random - * worker. + * @param node_info @c NodeCreationInfo */ - CreateNode(const std::shared_ptr &input, NodeAtom *node_atom); + CreateNode(const std::shared_ptr &input, + const NodeCreationInfo &node_info); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; std::unique_ptr MakeCursor( database::GraphDbAccessor &db) const override; @@ -433,19 +496,47 @@ a preceeding `MATCH`), or multiple nodes (`MATCH ... CREATE` or cpp<#) (:serialize (:slk) (:capnp))) +(lcp:define-struct edge-creation-info () + ((symbol "Symbol") + (properties "std::vector>" + :slk-save #'slk-save-properties + :slk-load #'slk-load-properties + :capnp-type "List(Utils.Pair(Storage.Property, Ast.Tree))" + :capnp-save #'capnp-save-properties + :capnp-load #'capnp-load-properties) + (edge-type "storage::EdgeType") + (direction "EdgeAtom::Direction" :initval "EdgeAtom::Direction::BOTH" + :capnp-type "Ast.EdgeAtom.Direction" :capnp-init nil + :capnp-save (lcp:capnp-save-enum "::query::capnp::EdgeAtom::Direction" + "EdgeAtom::Direction" + '(in out both)) + :capnp-load (lcp:capnp-load-enum "::query::capnp::EdgeAtom::Direction" + "EdgeAtom::Direction" + '(in out both)))) + (:serialize (:slk :save-args '((helper "query::plan::LogicalOperator::SaveHelper *")) + :load-args '((helper "query::plan::LogicalOperator::SlkLoadHelper *"))) + (:capnp :save-args '((helper "LogicalOperator::SaveHelper *")) + :load-args '((helper "LogicalOperator::LoadHelper *"))))) + (lcp:define-class create-expand (logical-operator) - ( - ;; info on what's getting expanded - (node-atom "NodeAtom *" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "NodeAtom") - :capnp-type "Ast.Tree" :capnp-init nil - :capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "NodeAtom *")) - (edge-atom "EdgeAtom *" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "EdgeAtom") - :capnp-type "Ast.Tree" :capnp-init nil - :capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "EdgeAtom *")) + ((node-info "NodeCreationInfo" :scope :public + :slk-save (lambda (m) + #>cpp + slk::Save(self.${m}, builder, helper); + cpp<#) + :slk-load (lambda (m) + #>cpp + slk::Load(&self->${m}, reader, helper); + cpp<#)) + (edge-info "EdgeCreationInfo" :scope :public + :slk-save (lambda (m) + #>cpp + slk::Save(self.${m}, builder, helper); + cpp<#) + :slk-load (lambda (m) + #>cpp + slk::Load(&self->${m}, reader, helper); + cpp<#)) ;; the input op and the symbol under which the op's result ;; can be found in the frame (input "std::shared_ptr" :scope :public @@ -475,9 +566,9 @@ chained in cases when longer paths need creating. /** @brief Construct @c CreateExpand. * - * @param node_atom @c NodeAtom at the end of the edge. Used to create a node, - * unless it refers to an existing one. - * @param edge_atom @c EdgeAtom with information for the edge to be created. + * @param node_info @c NodeCreationInfo at the end of the edge. + * Used to create a node, unless it refers to an existing one. + * @param edge_info @c EdgeCreationInfo for the edge to be created. * @param input Optional. Previous @c LogicalOperator which will be pulled. * For each successful @c Cursor::Pull, this operator will create an * expansion. @@ -485,7 +576,8 @@ chained in cases when longer paths need creating. * @param existing_node @c bool indicating whether the @c node_atom refers to * an existing node. If @c false, the operator will also create the node. */ - CreateExpand(NodeAtom *node_atom, EdgeAtom *edge_atom, + CreateExpand(const NodeCreationInfo &node_info, + const EdgeCreationInfo &edge_info, const std::shared_ptr &input, Symbol input_symbol, bool existing_node); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; diff --git a/src/query/plan/rule_based_planner.cpp b/src/query/plan/rule_based_planner.cpp index b7792ccd6..2fd2435d6 100644 --- a/src/query/plan/rule_based_planner.cpp +++ b/src/query/plan/rule_based_planner.cpp @@ -616,11 +616,24 @@ std::unique_ptr GenCreateForPattern( Pattern &pattern, std::unique_ptr input_op, const SymbolTable &symbol_table, std::unordered_set &bound_symbols) { + auto node_to_creation_info = [&](const NodeAtom &node) { + const auto &node_symbol = symbol_table.at(*node.identifier_); + std::vector labels(node.labels_); + std::vector> properties; + properties.reserve(node.properties_.size()); + for (const auto &kv : node.properties_) { + properties.push_back({kv.first.second, kv.second}); + } + return NodeCreationInfo{node_symbol, labels, properties}; + }; + auto base = [&](NodeAtom *node) -> std::unique_ptr { - if (bound_symbols.insert(symbol_table.at(*node->identifier_)).second) - return std::make_unique(std::move(input_op), node); - else + if (bound_symbols.insert(symbol_table.at(*node->identifier_)).second) { + auto node_info = node_to_creation_info(*node); + return std::make_unique(std::move(input_op), node_info); + } else { return std::move(input_op); + } }; auto collect = [&](std::unique_ptr last_op, @@ -633,11 +646,22 @@ std::unique_ptr GenCreateForPattern( if (!bound_symbols.insert(symbol_table.at(*node->identifier_)).second) { node_existing = true; } - if (!bound_symbols.insert(symbol_table.at(*edge->identifier_)).second) { + const auto &edge_symbol = symbol_table.at(*edge->identifier_); + if (!bound_symbols.insert(edge_symbol).second) { LOG(FATAL) << "Symbols used for created edges cannot be redeclared."; } - return std::make_unique(node, edge, std::move(last_op), - input_symbol, node_existing); + auto node_info = node_to_creation_info(*node); + std::vector> properties; + properties.reserve(edge->properties_.size()); + for (const auto &kv : edge->properties_) { + properties.push_back({kv.first.second, kv.second}); + } + CHECK(edge->edge_types_.size() == 1) + << "Creating an edge with a single type should be required by syntax"; + EdgeCreationInfo edge_info{edge_symbol, properties, edge->edge_types_[0], + edge->direction_}; + return std::make_unique( + node_info, edge_info, std::move(last_op), input_symbol, node_existing); }; auto last_op = diff --git a/tests/unit/distributed_query_plan.cpp b/tests/unit/distributed_query_plan.cpp index ead2f549e..541474668 100644 --- a/tests/unit/distributed_query_plan.cpp +++ b/tests/unit/distributed_query_plan.cpp @@ -285,9 +285,8 @@ TEST_F(DistributedQueryPlan, Create) { auto range = FN("range", LITERAL(0), LITERAL(1000)); auto x = ctx.symbol_table_.CreateSymbol("x", true); auto unwind = std::make_shared(nullptr, range, x); - auto node = NODE("n"); - ctx.symbol_table_[*node->identifier_] = - ctx.symbol_table_.CreateSymbol("n", true); + NodeCreationInfo node; + node.symbol = ctx.symbol_table_.CreateSymbol("n", true); auto create = std::make_shared(unwind, node, true); PullAll(create, *dba, ctx.symbol_table_); diff --git a/tests/unit/query_plan_accumulate_aggregate.cpp b/tests/unit/query_plan_accumulate_aggregate.cpp index a4c3dcf5b..7f659a47a 100644 --- a/tests/unit/query_plan_accumulate_aggregate.cpp +++ b/tests/unit/query_plan_accumulate_aggregate.cpp @@ -86,19 +86,16 @@ TEST(QueryPlan, Accumulate) { TEST(QueryPlan, AccumulateAdvance) { // we simulate 'CREATE (n) WITH n AS n MATCH (m) RETURN m' // to get correct results we need to advance the command - auto check = [&](bool advance) { database::GraphDb db; auto dba = db.Access(); AstStorage storage; SymbolTable symbol_table; - - auto node = NODE("n"); - auto sym_n = symbol_table.CreateSymbol("n", true); - symbol_table[*node->identifier_] = sym_n; + NodeCreationInfo node; + node.symbol = symbol_table.CreateSymbol("n", true); auto create = std::make_shared(nullptr, node); auto accumulate = std::make_shared( - create, std::vector{sym_n}, advance); + create, std::vector{node.symbol}, advance); auto match = MakeScanAll(storage, symbol_table, "m", accumulate); EXPECT_EQ(advance ? 1 : 0, PullAll(match.op_, *dba, symbol_table)); }; diff --git a/tests/unit/query_plan_bag_semantics.cpp b/tests/unit/query_plan_bag_semantics.cpp index 21d808ac4..eac724e4c 100644 --- a/tests/unit/query_plan_bag_semantics.cpp +++ b/tests/unit/query_plan_bag_semantics.cpp @@ -93,8 +93,8 @@ TEST(QueryPlan, CreateLimit) { SymbolTable symbol_table; auto n = MakeScanAll(storage, symbol_table, "n1"); - auto m = NODE("m"); - symbol_table[*m->identifier_] = symbol_table.CreateSymbol("m", true); + NodeCreationInfo m; + m.symbol = symbol_table.CreateSymbol("m", true); auto c = std::make_shared(n.op_, m); auto skip = std::make_shared(c, LITERAL(1)); diff --git a/tests/unit/query_plan_create_set_remove_delete.cpp b/tests/unit/query_plan_create_set_remove_delete.cpp index dae6eb80f..ea95c5f37 100644 --- a/tests/unit/query_plan_create_set_remove_delete.cpp +++ b/tests/unit/query_plan_create_set_remove_delete.cpp @@ -27,10 +27,10 @@ TEST(QueryPlan, CreateNodeWithAttributes) { AstStorage storage; SymbolTable symbol_table; - auto node = NODE("n"); - symbol_table[*node->identifier_] = symbol_table.CreateSymbol("n", true); - node->labels_.emplace_back(label); - node->properties_[property] = LITERAL(42); + NodeCreationInfo node; + node.symbol = symbol_table.CreateSymbol("n", true); + node.labels.emplace_back(label); + node.properties.emplace_back(property.second, LITERAL(42)); auto create = std::make_shared(nullptr, node); PullAll(create, dba, symbol_table); @@ -62,22 +62,21 @@ TEST(QueryPlan, CreateReturn) { AstStorage storage; SymbolTable symbol_table; - auto node = NODE("n"); - auto sym_n = symbol_table.CreateSymbol("n", true); - symbol_table[*node->identifier_] = sym_n; - node->labels_.emplace_back(label); - node->properties_[property] = LITERAL(42); + NodeCreationInfo node; + node.symbol = symbol_table.CreateSymbol("n", true); + node.labels.emplace_back(label); + node.properties.emplace_back(property.second, LITERAL(42)); auto create = std::make_shared(nullptr, node); auto named_expr_n = NEXPR("n", IDENT("n")); symbol_table[*named_expr_n] = symbol_table.CreateSymbol("named_expr_n", true); - symbol_table[*named_expr_n->expression_] = sym_n; + symbol_table[*named_expr_n->expression_] = node.symbol; auto prop_lookup = PROPERTY_LOOKUP("n", property); - symbol_table[*prop_lookup->expression_] = sym_n; + symbol_table[*prop_lookup->expression_] = node.symbol; auto named_expr_n_p = NEXPR("n", prop_lookup); symbol_table[*named_expr_n_p] = symbol_table.CreateSymbol("named_expr_n_p", true); - symbol_table[*named_expr_n->expression_] = sym_n; + symbol_table[*named_expr_n->expression_] = node.symbol; auto produce = MakeProduce(create, named_expr_n, named_expr_n_p); auto results = CollectProduce(produce.get(), symbol_table, dba); @@ -112,29 +111,25 @@ TEST(QueryPlan, CreateExpand) { int before_e = CountIterable(dba.Edges(false)); // data for the first node - auto n = NODE("n"); - n->labels_.emplace_back(label_node_1); - n->properties_[property] = LITERAL(1); - auto n_sym = symbol_table.CreateSymbol("n", true); - symbol_table[*n->identifier_] = n_sym; + NodeCreationInfo n; + n.symbol = symbol_table.CreateSymbol("n", true); + n.labels.emplace_back(label_node_1); + n.properties.emplace_back(property.second, LITERAL(1)); // data for the second node - auto m = NODE("m"); - m->labels_.emplace_back(label_node_2); - m->properties_[property] = LITERAL(2); - if (cycle) - symbol_table[*m->identifier_] = n_sym; - else - symbol_table[*m->identifier_] = symbol_table.CreateSymbol("m", true); + NodeCreationInfo m; + m.symbol = cycle ? n.symbol : symbol_table.CreateSymbol("m", true); + m.labels.emplace_back(label_node_2); + m.properties.emplace_back(property.second, LITERAL(2)); - auto r = EDGE("r", EdgeAtom::Direction::OUT); - symbol_table[*r->identifier_] = symbol_table.CreateSymbol("r", true); - r->edge_types_.emplace_back(edge_type); - r->properties_[property] = LITERAL(3); + EdgeCreationInfo r; + r.symbol = symbol_table.CreateSymbol("r", true); + r.edge_type = edge_type; + r.properties.emplace_back(property.second, LITERAL(3)); auto create_op = std::make_shared(nullptr, n); auto create_expand = - std::make_shared(m, r, create_op, n_sym, cycle); + std::make_shared(m, r, create_op, n.symbol, cycle); PullAll(create_expand, dba, symbol_table); dba.AdvanceCommand(); @@ -184,8 +179,8 @@ TEST(QueryPlan, MatchCreateNode) { // first node auto n_scan_all = MakeScanAll(storage, symbol_table, "n"); // second node - auto m = NODE("m"); - symbol_table[*m->identifier_] = symbol_table.CreateSymbol("m", true); + NodeCreationInfo m; + m.symbol = symbol_table.CreateSymbol("m", true); // creation op auto create_node = std::make_shared(n_scan_all.op_, m); @@ -222,15 +217,13 @@ TEST(QueryPlan, MatchCreateExpand) { auto n_scan_all = MakeScanAll(storage, symbol_table, "n"); // data for the second node - auto m = NODE("m"); - if (cycle) - symbol_table[*m->identifier_] = n_scan_all.sym_; - else - symbol_table[*m->identifier_] = symbol_table.CreateSymbol("m", true); + NodeCreationInfo m; + m.symbol = cycle ? n_scan_all.sym_ : symbol_table.CreateSymbol("m", true); - auto r = EDGE("r", EdgeAtom::Direction::OUT); - symbol_table[*r->identifier_] = symbol_table.CreateSymbol("r", true); - r->edge_types_.emplace_back(edge_type); + EdgeCreationInfo r; + r.symbol = symbol_table.CreateSymbol("r", true); + r.direction = EdgeAtom::Direction::OUT; + r.edge_type = edge_type; auto create_expand = std::make_shared(m, r, n_scan_all.op_, n_scan_all.sym_, cycle); @@ -855,9 +848,8 @@ TEST(QueryPlan, MergeNoInput) { AstStorage storage; SymbolTable symbol_table; - auto node = NODE("n"); - auto sym_n = symbol_table.CreateSymbol("n", true); - symbol_table[*node->identifier_] = sym_n; + NodeCreationInfo node; + node.symbol = symbol_table.CreateSymbol("n", true); auto create = std::make_shared(nullptr, node); auto merge = std::make_shared(nullptr, create, create);