From 72ee3fda853a4be71d22e5761ea1d44e0631c0e2 Mon Sep 17 00:00:00 2001 From: Teon Banek Date: Thu, 20 Dec 2018 09:38:23 +0100 Subject: [PATCH] Replace NodeAtom and EdgeAtom with CreationInfo Summary: This (almost) removes the dependency of operators on NodeAtom and EdgeAtom. Only EdgeAtom::Direction is needed. The change was done as the initial step of removing dependency on storage from Ast. Additionally, it makes sense for LogicalOperator to only depend on Expression classes. Reviewers: mtomic, llugovic Reviewed By: llugovic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1779 --- src/query/plan/distributed.cpp | 16 +- src/query/plan/distributed_ops.cpp | 61 ++++---- src/query/plan/distributed_ops.lcp | 47 +++--- src/query/plan/operator.cpp | 56 ++++--- src/query/plan/operator.lcp | 142 +++++++++++++++--- src/query/plan/rule_based_planner.cpp | 36 ++++- tests/unit/distributed_query_plan.cpp | 5 +- .../unit/query_plan_accumulate_aggregate.cpp | 9 +- tests/unit/query_plan_bag_semantics.cpp | 4 +- .../query_plan_create_set_remove_delete.cpp | 76 +++++----- 10 files changed, 283 insertions(+), 169 deletions(-) diff --git a/src/query/plan/distributed.cpp b/src/query/plan/distributed.cpp index fc2db5fe6..713ee76b4 100644 --- a/src/query/plan/distributed.cpp +++ b/src/query/plan/distributed.cpp @@ -464,8 +464,8 @@ class IndependentSubtreeFinder : public DistributedOperatorVisitor { } bool PostVisit(DistributedCreateNode &op) override { prev_ops_.pop_back(); - CHECK(!FindForbidden(symbol_table_->at(*(op.node_atom_->identifier_)))); - for (auto &kv : op.node_atom_->properties_) { + CHECK(!FindForbidden(op.node_info_.symbol)); + for (auto &kv : op.node_info_.properties) { UsedSymbolsCollector collector(*symbol_table_); kv.second->Accept(collector); CHECK(!ContainsForbidden(collector.symbols_)); @@ -480,14 +480,14 @@ class IndependentSubtreeFinder : public DistributedOperatorVisitor { bool PostVisit(DistributedCreateExpand &op) override { prev_ops_.pop_back(); CHECK(!FindForbidden(op.input_symbol_)); - CHECK(!FindForbidden(symbol_table_->at(*(op.node_atom_->identifier_)))); - CHECK(!FindForbidden(symbol_table_->at(*(op.edge_atom_->identifier_)))); - for (auto &kv : op.node_atom_->properties_) { + CHECK(!FindForbidden(op.node_info_.symbol)); + CHECK(!FindForbidden(op.edge_info_.symbol)); + for (auto &kv : op.node_info_.properties) { UsedSymbolsCollector collector(*symbol_table_); kv.second->Accept(collector); CHECK(!ContainsForbidden(collector.symbols_)); } - for (auto &kv : op.edge_atom_->properties_) { + for (auto &kv : op.edge_info_.properties) { UsedSymbolsCollector collector(*symbol_table_); kv.second->Accept(collector); CHECK(!ContainsForbidden(collector.symbols_)); @@ -1441,7 +1441,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor { // node creation to workers. bool create_on_random_worker = !ShouldSplit(); auto distributed_create = std::make_unique( - op.input(), op.node_atom_, create_on_random_worker); + op.input(), op.node_info_, create_on_random_worker); if (prev_ops_.empty()) distributed_plan_.master_plan = std::move(distributed_create); else @@ -1460,7 +1460,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor { Split(op, PlanCartesian(op.input())); } auto distributed_create = std::make_unique( - op.node_atom_, op.edge_atom_, op.input(), op.input_symbol_, + op.node_info_, op.edge_info_, op.input(), op.input_symbol_, op.existing_node_); if (prev_ops_.empty()) distributed_plan_.master_plan = std::move(distributed_create); diff --git a/src/query/plan/distributed_ops.cpp b/src/query/plan/distributed_ops.cpp index c11f1e784..8692c5d56 100644 --- a/src/query/plan/distributed_ops.cpp +++ b/src/query/plan/distributed_ops.cpp @@ -30,8 +30,8 @@ DEFINE_HIDDEN_int32(remote_pull_sleep_micros, 10, namespace query::plan { // Create a vertex on this GraphDb and return it. Defined in operator.cpp -VertexAccessor &CreateLocalVertex(NodeAtom *node_atom, Frame &frame, - Context &context); +VertexAccessor &CreateLocalVertex(const NodeCreationInfo &node_info, + Frame *frame, const Context &context); bool PullRemote::Accept(HierarchicalLogicalOperatorVisitor &visitor) { auto *distributed_visitor = @@ -170,10 +170,10 @@ std::vector DistributedExpandBfs::ModifiedSymbols( } DistributedCreateNode::DistributedCreateNode( - const std::shared_ptr &input, NodeAtom *node_atom, - bool on_random_worker) + const std::shared_ptr &input, + const NodeCreationInfo &node_info, bool on_random_worker) : input_(input), - node_atom_(node_atom), + node_info_(node_info), on_random_worker_(on_random_worker) {} ACCEPT_WITH_INPUT(DistributedCreateNode); @@ -181,16 +181,16 @@ ACCEPT_WITH_INPUT(DistributedCreateNode); std::vector DistributedCreateNode::ModifiedSymbols( const SymbolTable &table) const { auto symbols = input_->ModifiedSymbols(table); - symbols.emplace_back(table.at(*node_atom_->identifier_)); + symbols.emplace_back(node_info_.symbol); return symbols; } DistributedCreateExpand::DistributedCreateExpand( - NodeAtom *node_atom, EdgeAtom *edge_atom, + const NodeCreationInfo &node_info, const EdgeCreationInfo &edge_info, const std::shared_ptr &input, Symbol input_symbol, bool existing_node) - : node_atom_(node_atom), - edge_atom_(edge_atom), + : node_info_(node_info), + edge_info_(edge_info), input_(input ? input : std::make_shared()), input_symbol_(input_symbol), existing_node_(existing_node) {} @@ -200,8 +200,8 @@ ACCEPT_WITH_INPUT(DistributedCreateExpand); std::vector DistributedCreateExpand::ModifiedSymbols( const SymbolTable &table) const { auto symbols = input_->ModifiedSymbols(table); - symbols.emplace_back(table.at(*node_atom_->identifier_)); - symbols.emplace_back(table.at(*edge_atom_->identifier_)); + symbols.emplace_back(node_info_.symbol); + symbols.emplace_back(edge_info_.symbol); return symbols; } @@ -1258,7 +1258,8 @@ int RandomWorkerId(const database::DistributedGraphDb &db) { } // Creates a vertex on the GraphDb with the given worker_id. Can be this worker. -VertexAccessor &CreateVertexOnWorker(int worker_id, NodeAtom *node_atom, +VertexAccessor &CreateVertexOnWorker(int worker_id, + const NodeCreationInfo &node_info, Frame &frame, Context &context) { auto &dba = context.db_accessor_; @@ -1267,7 +1268,7 @@ VertexAccessor &CreateVertexOnWorker(int worker_id, NodeAtom *node_atom, int current_worker_id = distributed_db->WorkerId(); if (worker_id == current_worker_id) - return CreateLocalVertex(node_atom, frame, context); + return CreateLocalVertex(node_info, &frame, context); std::unordered_map properties; @@ -1276,20 +1277,20 @@ VertexAccessor &CreateVertexOnWorker(int worker_id, NodeAtom *node_atom, ExpressionEvaluator evaluator(&frame, context.symbol_table_, context.evaluation_context_, &context.db_accessor_, GraphView::NEW); - for (auto &kv : node_atom->properties_) { + for (auto &kv : node_info.properties) { auto value = kv.second->Accept(evaluator); if (!value.IsPropertyValue()) { throw QueryRuntimeException("'{}' cannot be used as a property value.", value.type()); } - properties.emplace(kv.first.second, std::move(value)); + properties.emplace(kv.first, std::move(value)); } auto new_node = - database::InsertVertexIntoRemote(&dba, worker_id, node_atom->labels_, + database::InsertVertexIntoRemote(&dba, worker_id, node_info.labels, properties, std::experimental::nullopt); - frame[context.symbol_table_.at(*node_atom->identifier_)] = new_node; - return frame[context.symbol_table_.at(*node_atom->identifier_)].ValueVertex(); + frame[node_info.symbol] = new_node; + return frame[node_info.symbol].ValueVertex(); } class DistributedCreateNodeCursor : public query::plan::Cursor { @@ -1299,18 +1300,17 @@ class DistributedCreateNodeCursor : public query::plan::Cursor { : input_cursor_(self->input()->MakeCursor(*dba)), // TODO: Replace this with some other mechanism db_(dynamic_cast(&dba->db())), - node_atom_(self->node_atom_), + node_info_(self->node_info_), on_random_worker_(self->on_random_worker_) { CHECK(db_); - CHECK(node_atom_); } bool Pull(Frame &frame, Context &context) override { if (input_cursor_->Pull(frame, context)) { if (on_random_worker_) { - CreateVertexOnWorker(RandomWorkerId(*db_), node_atom_, frame, context); + CreateVertexOnWorker(RandomWorkerId(*db_), node_info_, frame, context); } else { - CreateLocalVertex(node_atom_, frame, context); + CreateLocalVertex(node_info_, &frame, context); } return true; } @@ -1324,7 +1324,7 @@ class DistributedCreateNodeCursor : public query::plan::Cursor { private: std::unique_ptr input_cursor_; database::DistributedGraphDb *db_{nullptr}; - NodeAtom *node_atom_{nullptr}; + NodeCreationInfo node_info_; bool on_random_worker_{false}; }; @@ -1361,7 +1361,7 @@ class DistributedCreateExpandCursor : public query::plan::Cursor { auto *dba = &context.db_accessor_; // create an edge between the two nodes - switch (self_->edge_atom_->direction_) { + switch (self_->edge_info_.direction) { case EdgeAtom::Direction::IN: CreateEdge(&v2, &v1, &frame, context.symbol_table_, &evaluator, dba); break; @@ -1385,13 +1385,12 @@ class DistributedCreateExpandCursor : public query::plan::Cursor { VertexAccessor &OtherVertex(int worker_id, Frame &frame, Context &context) { if (self_->existing_node_) { - const auto &dest_node_symbol = - context.symbol_table_.at(*self_->node_atom_->identifier_); + const auto &dest_node_symbol = self_->node_info_.symbol; TypedValue &dest_node_value = frame[dest_node_symbol]; ExpectType(dest_node_symbol, dest_node_value, TypedValue::Type::Vertex); return dest_node_value.Value(); } else { - return CreateVertexOnWorker(worker_id, self_->node_atom_, frame, context); + return CreateVertexOnWorker(worker_id, self_->node_info_, frame, context); } } @@ -1400,10 +1399,10 @@ class DistributedCreateExpandCursor : public query::plan::Cursor { ExpressionEvaluator *evaluator, database::GraphDbAccessor *dba) { EdgeAccessor edge = - dba->InsertEdge(*from, *to, self_->edge_atom_->edge_types_[0]); - for (auto kv : self_->edge_atom_->properties_) - PropsSetChecked(&edge, kv.first.second, kv.second->Accept(*evaluator)); - (*frame)[symbol_table.at(*self_->edge_atom_->identifier_)] = edge; + dba->InsertEdge(*from, *to, self_->edge_info_.edge_type); + for (auto kv : self_->edge_info_.properties) + PropsSetChecked(&edge, kv.first, kv.second->Accept(*evaluator)); + (*frame)[self_->edge_info_.symbol] = edge; } private: diff --git a/src/query/plan/distributed_ops.lcp b/src/query/plan/distributed_ops.lcp index 86d16de52..58192fd88 100644 --- a/src/query/plan/distributed_ops.lcp +++ b/src/query/plan/distributed_ops.lcp @@ -320,18 +320,22 @@ by having only one result from each worker.") :slk-load #'slk-load-operator-pointer :capnp-save #'save-operator-pointer :capnp-load #'load-operator-pointer) - (node-atom "NodeAtom *" :initval "nullptr" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "NodeAtom") - :capnp-type "Ast.Tree" :capnp-init nil - :capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "NodeAtom *")) + (node-info "NodeCreationInfo" :scope :public + :slk-save (lambda (m) + #>cpp + slk::Save(self.${m}, builder, helper); + cpp<#) + :slk-load (lambda (m) + #>cpp + slk::Load(&self->${m}, reader, helper); + cpp<#)) (on-random-worker :bool :initval "false" :scope :public)) (:documentation "Create nodes in distributed environment.") (:public #>cpp DistributedCreateNode() {} DistributedCreateNode(const std::shared_ptr &input, - NodeAtom *node_atom, bool on_random_worker); + const NodeCreationInfo &node_info, bool on_random_worker); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; std::unique_ptr MakeCursor( @@ -347,16 +351,24 @@ by having only one result from each worker.") (:serialize (:slk) (:capnp))) (lcp:define-class distributed-create-expand (logical-operator) - ((node-atom "NodeAtom *" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "NodeAtom") - :capnp-type "Ast.Tree" :capnp-init nil - :capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "NodeAtom *")) - (edge-atom "EdgeAtom *" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "EdgeAtom") - :capnp-type "Ast.Tree" :capnp-init nil - :capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "EdgeAtom *")) + ((node-info "NodeCreationInfo" :scope :public + :slk-save (lambda (m) + #>cpp + slk::Save(self.${m}, builder, helper); + cpp<#) + :slk-load (lambda (m) + #>cpp + slk::Load(&self->${m}, reader, helper); + cpp<#)) + (edge-info "EdgeCreationInfo" :scope :public + :slk-save (lambda (m) + #>cpp + slk::Save(self.${m}, builder, helper); + cpp<#) + :slk-load (lambda (m) + #>cpp + slk::Load(&self->${m}, reader, helper); + cpp<#)) (input "std::shared_ptr" :scope :public :slk-save #'slk-save-operator-pointer :slk-load #'slk-load-operator-pointer @@ -368,7 +380,8 @@ by having only one result from each worker.") (:public #>cpp DistributedCreateExpand() {} - DistributedCreateExpand(NodeAtom *node_atom, EdgeAtom *edge_atom, + DistributedCreateExpand(const NodeCreationInfo &node_info, + const EdgeCreationInfo &edge_info, const std::shared_ptr &input, Symbol input_symbol, bool existing_node); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index d470c5557..1d0a3b9f8 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -89,26 +89,25 @@ void Once::OnceCursor::Shutdown() {} void Once::OnceCursor::Reset() { did_pull_ = false; } CreateNode::CreateNode(const std::shared_ptr &input, - NodeAtom *node_atom) - : input_(input ? input : std::make_shared()), node_atom_(node_atom) {} + const NodeCreationInfo &node_info) + : input_(input ? input : std::make_shared()), node_info_(node_info) {} // Creates a vertex on this GraphDb. Returns a reference to vertex placed on the // frame. -VertexAccessor &CreateLocalVertex(NodeAtom *node_atom, Frame &frame, - Context &context) { +VertexAccessor &CreateLocalVertex(const NodeCreationInfo &node_info, + Frame *frame, const Context &context) { auto &dba = context.db_accessor_; auto new_node = dba.InsertVertex(); - for (auto label : node_atom->labels_) new_node.add_label(label); - + for (auto label : node_info.labels) new_node.add_label(label); // Evaluator should use the latest accessors, as modified in this query, when // setting properties on new nodes. - ExpressionEvaluator evaluator(&frame, context.symbol_table_, + ExpressionEvaluator evaluator(frame, context.symbol_table_, context.evaluation_context_, &context.db_accessor_, GraphView::NEW); - for (auto &kv : node_atom->properties_) - PropsSetChecked(&new_node, kv.first.second, kv.second->Accept(evaluator)); - frame[context.symbol_table_.at(*node_atom->identifier_)] = new_node; - return frame[context.symbol_table_.at(*node_atom->identifier_)].ValueVertex(); + for (auto &kv : node_info.properties) + PropsSetChecked(&new_node, kv.first, kv.second->Accept(evaluator)); + (*frame)[node_info.symbol] = new_node; + return (*frame)[node_info.symbol].ValueVertex(); } ACCEPT_WITH_INPUT(CreateNode) @@ -121,7 +120,7 @@ std::unique_ptr CreateNode::MakeCursor( std::vector CreateNode::ModifiedSymbols( const SymbolTable &table) const { auto symbols = input_->ModifiedSymbols(table); - symbols.emplace_back(table.at(*node_atom_->identifier_)); + symbols.emplace_back(node_info_.symbol); return symbols; } @@ -131,7 +130,7 @@ CreateNode::CreateNodeCursor::CreateNodeCursor(const CreateNode &self, bool CreateNode::CreateNodeCursor::Pull(Frame &frame, Context &context) { if (input_cursor_->Pull(frame, context)) { - CreateLocalVertex(self_.node_atom_, frame, context); + CreateLocalVertex(self_.node_info_, &frame, context); return true; } return false; @@ -141,11 +140,12 @@ void CreateNode::CreateNodeCursor::Shutdown() { input_cursor_->Shutdown(); } void CreateNode::CreateNodeCursor::Reset() { input_cursor_->Reset(); } -CreateExpand::CreateExpand(NodeAtom *node_atom, EdgeAtom *edge_atom, +CreateExpand::CreateExpand(const NodeCreationInfo &node_info, + const EdgeCreationInfo &edge_info, const std::shared_ptr &input, Symbol input_symbol, bool existing_node) - : node_atom_(node_atom), - edge_atom_(edge_atom), + : node_info_(node_info), + edge_info_(edge_info), input_(input ? input : std::make_shared()), input_symbol_(input_symbol), existing_node_(existing_node) {} @@ -160,8 +160,8 @@ std::unique_ptr CreateExpand::MakeCursor( std::vector CreateExpand::ModifiedSymbols( const SymbolTable &table) const { auto symbols = input_->ModifiedSymbols(table); - symbols.emplace_back(table.at(*node_atom_->identifier_)); - symbols.emplace_back(table.at(*edge_atom_->identifier_)); + symbols.emplace_back(node_info_.symbol); + symbols.emplace_back(edge_info_.symbol); return symbols; } @@ -190,7 +190,7 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, Context &context) { v2.SwitchNew(); // create an edge between the two nodes - switch (self_.edge_atom_->direction_) { + switch (self_.edge_info_.direction) { case EdgeAtom::Direction::IN: CreateEdge(v2, v1, frame, context.symbol_table_, evaluator); break; @@ -215,24 +215,22 @@ void CreateExpand::CreateExpandCursor::Reset() { input_cursor_->Reset(); } VertexAccessor &CreateExpand::CreateExpandCursor::OtherVertex( Frame &frame, Context &context) { if (self_.existing_node_) { - const auto &dest_node_symbol = - context.symbol_table_.at(*self_.node_atom_->identifier_); - TypedValue &dest_node_value = frame[dest_node_symbol]; - ExpectType(dest_node_symbol, dest_node_value, TypedValue::Type::Vertex); + TypedValue &dest_node_value = frame[self_.node_info_.symbol]; + ExpectType(self_.node_info_.symbol, dest_node_value, + TypedValue::Type::Vertex); return dest_node_value.Value(); } else { - return CreateLocalVertex(self_.node_atom_, frame, context); + return CreateLocalVertex(self_.node_info_, &frame, context); } } void CreateExpand::CreateExpandCursor::CreateEdge( VertexAccessor &from, VertexAccessor &to, Frame &frame, const SymbolTable &symbol_table, ExpressionEvaluator &evaluator) { - EdgeAccessor edge = - db_.InsertEdge(from, to, self_.edge_atom_->edge_types_[0]); - for (auto kv : self_.edge_atom_->properties_) - PropsSetChecked(&edge, kv.first.second, kv.second->Accept(evaluator)); - frame[symbol_table.at(*self_.edge_atom_->identifier_)] = edge; + EdgeAccessor edge = db_.InsertEdge(from, to, self_.edge_info_.edge_type); + for (auto kv : self_.edge_info_.properties) + PropsSetChecked(&edge, kv.first, kv.second->Accept(evaluator)); + frame[self_.edge_info_.symbol] = edge; } template diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp index f914d2b94..d3240b530 100644 --- a/src/query/plan/operator.lcp +++ b/src/query/plan/operator.lcp @@ -373,17 +373,81 @@ and false on every following Pull.") cpp<#) (:serialize (:slk) (:capnp))) +(defun slk-save-properties (member) + #>cpp + size_t size = self.${member}.size(); + slk::Save(size, builder); + for (const auto &kv : self.${member}) { + slk::Save(kv.first, builder); + query::SaveAstPointer(kv.second, builder, &helper->saved_ast_uids); + } + cpp<#) + +(defun slk-load-properties (member) + #>cpp + size_t size = 0; + slk::Load(&size, reader); + self->${member}.resize(size); + for (size_t i = 0; i < size; ++i) { + storage::Property prop; + slk::Load(&prop, reader); + auto *expr = query::LoadAstPointer( + &helper->ast_storage, reader, &helper->loaded_ast_uids); + self->${member}[i] = {prop, expr}; + } + cpp<#) + +(defun capnp-save-properties (builder member capnp-name) + #>cpp + for (size_t i = 0; i < ${member}.size(); ++i) { + auto prop_builder = ${builder}[i].initFirst(); + storage::Save(${member}[i].first, &prop_builder); + auto expr_builder = ${builder}[i].initSecond(); + Save(*${member}[i].second, &expr_builder, &helper->saved_ast_uids); + } + cpp<#) + +(defun capnp-load-properties (reader member capnp-name) + #>cpp + for (const auto &pair_reader : ${reader}) { + auto prop_reader = pair_reader.getFirst(); + storage::Property prop; + storage::Load(&prop, prop_reader); + auto *expr = static_cast(Load( + &helper->ast_storage, pair_reader.getSecond(), &helper->loaded_ast_uids)); + ${member}.emplace_back(prop, expr); + } + cpp<#) + +(lcp:define-struct node-creation-info () + ((symbol "Symbol") + (labels "std::vector") + (properties "std::vector>" + :slk-save #'slk-save-properties + :slk-load #'slk-load-properties + :capnp-type "List(Utils.Pair(Storage.Property, Ast.Tree))" + :capnp-save #'capnp-save-properties + :capnp-load #'capnp-load-properties)) + (:serialize (:slk :save-args '((helper "query::plan::LogicalOperator::SaveHelper *")) + :load-args '((helper "query::plan::LogicalOperator::SlkLoadHelper *"))) + (:capnp :save-args '((helper "LogicalOperator::SaveHelper *")) + :load-args '((helper "LogicalOperator::LoadHelper *"))))) + (lcp:define-class create-node (logical-operator) ((input "std::shared_ptr" :scope :public :slk-save #'slk-save-operator-pointer :slk-load #'slk-load-operator-pointer :capnp-save #'save-operator-pointer :capnp-load #'load-operator-pointer) - (node-atom "NodeAtom *" :initval "nullptr" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "NodeAtom") - :capnp-type "Ast.Tree" :capnp-init nil - :capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "NodeAtom *"))) + (node-info "NodeCreationInfo" :scope :public + :slk-save (lambda (m) + #>cpp + slk::Save(self.${m}, builder, helper); + cpp<#) + :slk-load (lambda (m) + #>cpp + slk::Load(&self->${m}, reader, helper); + cpp<#))) (:documentation "Operator for creating a node. @@ -401,11 +465,10 @@ a preceeding `MATCH`), or multiple nodes (`MATCH ... CREATE` or * created (a single successful @c Cursor::Pull from this op's @c Cursor). * If a valid input, then a node will be created for each * successful pull from the given input. - * @param node_atom @c NodeAtom with information on how to create a node. - * @param on_random_worker If the node should be created locally or on random - * worker. + * @param node_info @c NodeCreationInfo */ - CreateNode(const std::shared_ptr &input, NodeAtom *node_atom); + CreateNode(const std::shared_ptr &input, + const NodeCreationInfo &node_info); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; std::unique_ptr MakeCursor( database::GraphDbAccessor &db) const override; @@ -433,19 +496,47 @@ a preceeding `MATCH`), or multiple nodes (`MATCH ... CREATE` or cpp<#) (:serialize (:slk) (:capnp))) +(lcp:define-struct edge-creation-info () + ((symbol "Symbol") + (properties "std::vector>" + :slk-save #'slk-save-properties + :slk-load #'slk-load-properties + :capnp-type "List(Utils.Pair(Storage.Property, Ast.Tree))" + :capnp-save #'capnp-save-properties + :capnp-load #'capnp-load-properties) + (edge-type "storage::EdgeType") + (direction "EdgeAtom::Direction" :initval "EdgeAtom::Direction::BOTH" + :capnp-type "Ast.EdgeAtom.Direction" :capnp-init nil + :capnp-save (lcp:capnp-save-enum "::query::capnp::EdgeAtom::Direction" + "EdgeAtom::Direction" + '(in out both)) + :capnp-load (lcp:capnp-load-enum "::query::capnp::EdgeAtom::Direction" + "EdgeAtom::Direction" + '(in out both)))) + (:serialize (:slk :save-args '((helper "query::plan::LogicalOperator::SaveHelper *")) + :load-args '((helper "query::plan::LogicalOperator::SlkLoadHelper *"))) + (:capnp :save-args '((helper "LogicalOperator::SaveHelper *")) + :load-args '((helper "LogicalOperator::LoadHelper *"))))) + (lcp:define-class create-expand (logical-operator) - ( - ;; info on what's getting expanded - (node-atom "NodeAtom *" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "NodeAtom") - :capnp-type "Ast.Tree" :capnp-init nil - :capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "NodeAtom *")) - (edge-atom "EdgeAtom *" :scope :public - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "EdgeAtom") - :capnp-type "Ast.Tree" :capnp-init nil - :capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "EdgeAtom *")) + ((node-info "NodeCreationInfo" :scope :public + :slk-save (lambda (m) + #>cpp + slk::Save(self.${m}, builder, helper); + cpp<#) + :slk-load (lambda (m) + #>cpp + slk::Load(&self->${m}, reader, helper); + cpp<#)) + (edge-info "EdgeCreationInfo" :scope :public + :slk-save (lambda (m) + #>cpp + slk::Save(self.${m}, builder, helper); + cpp<#) + :slk-load (lambda (m) + #>cpp + slk::Load(&self->${m}, reader, helper); + cpp<#)) ;; the input op and the symbol under which the op's result ;; can be found in the frame (input "std::shared_ptr" :scope :public @@ -475,9 +566,9 @@ chained in cases when longer paths need creating. /** @brief Construct @c CreateExpand. * - * @param node_atom @c NodeAtom at the end of the edge. Used to create a node, - * unless it refers to an existing one. - * @param edge_atom @c EdgeAtom with information for the edge to be created. + * @param node_info @c NodeCreationInfo at the end of the edge. + * Used to create a node, unless it refers to an existing one. + * @param edge_info @c EdgeCreationInfo for the edge to be created. * @param input Optional. Previous @c LogicalOperator which will be pulled. * For each successful @c Cursor::Pull, this operator will create an * expansion. @@ -485,7 +576,8 @@ chained in cases when longer paths need creating. * @param existing_node @c bool indicating whether the @c node_atom refers to * an existing node. If @c false, the operator will also create the node. */ - CreateExpand(NodeAtom *node_atom, EdgeAtom *edge_atom, + CreateExpand(const NodeCreationInfo &node_info, + const EdgeCreationInfo &edge_info, const std::shared_ptr &input, Symbol input_symbol, bool existing_node); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; diff --git a/src/query/plan/rule_based_planner.cpp b/src/query/plan/rule_based_planner.cpp index b7792ccd6..2fd2435d6 100644 --- a/src/query/plan/rule_based_planner.cpp +++ b/src/query/plan/rule_based_planner.cpp @@ -616,11 +616,24 @@ std::unique_ptr GenCreateForPattern( Pattern &pattern, std::unique_ptr input_op, const SymbolTable &symbol_table, std::unordered_set &bound_symbols) { + auto node_to_creation_info = [&](const NodeAtom &node) { + const auto &node_symbol = symbol_table.at(*node.identifier_); + std::vector labels(node.labels_); + std::vector> properties; + properties.reserve(node.properties_.size()); + for (const auto &kv : node.properties_) { + properties.push_back({kv.first.second, kv.second}); + } + return NodeCreationInfo{node_symbol, labels, properties}; + }; + auto base = [&](NodeAtom *node) -> std::unique_ptr { - if (bound_symbols.insert(symbol_table.at(*node->identifier_)).second) - return std::make_unique(std::move(input_op), node); - else + if (bound_symbols.insert(symbol_table.at(*node->identifier_)).second) { + auto node_info = node_to_creation_info(*node); + return std::make_unique(std::move(input_op), node_info); + } else { return std::move(input_op); + } }; auto collect = [&](std::unique_ptr last_op, @@ -633,11 +646,22 @@ std::unique_ptr GenCreateForPattern( if (!bound_symbols.insert(symbol_table.at(*node->identifier_)).second) { node_existing = true; } - if (!bound_symbols.insert(symbol_table.at(*edge->identifier_)).second) { + const auto &edge_symbol = symbol_table.at(*edge->identifier_); + if (!bound_symbols.insert(edge_symbol).second) { LOG(FATAL) << "Symbols used for created edges cannot be redeclared."; } - return std::make_unique(node, edge, std::move(last_op), - input_symbol, node_existing); + auto node_info = node_to_creation_info(*node); + std::vector> properties; + properties.reserve(edge->properties_.size()); + for (const auto &kv : edge->properties_) { + properties.push_back({kv.first.second, kv.second}); + } + CHECK(edge->edge_types_.size() == 1) + << "Creating an edge with a single type should be required by syntax"; + EdgeCreationInfo edge_info{edge_symbol, properties, edge->edge_types_[0], + edge->direction_}; + return std::make_unique( + node_info, edge_info, std::move(last_op), input_symbol, node_existing); }; auto last_op = diff --git a/tests/unit/distributed_query_plan.cpp b/tests/unit/distributed_query_plan.cpp index ead2f549e..541474668 100644 --- a/tests/unit/distributed_query_plan.cpp +++ b/tests/unit/distributed_query_plan.cpp @@ -285,9 +285,8 @@ TEST_F(DistributedQueryPlan, Create) { auto range = FN("range", LITERAL(0), LITERAL(1000)); auto x = ctx.symbol_table_.CreateSymbol("x", true); auto unwind = std::make_shared(nullptr, range, x); - auto node = NODE("n"); - ctx.symbol_table_[*node->identifier_] = - ctx.symbol_table_.CreateSymbol("n", true); + NodeCreationInfo node; + node.symbol = ctx.symbol_table_.CreateSymbol("n", true); auto create = std::make_shared(unwind, node, true); PullAll(create, *dba, ctx.symbol_table_); diff --git a/tests/unit/query_plan_accumulate_aggregate.cpp b/tests/unit/query_plan_accumulate_aggregate.cpp index a4c3dcf5b..7f659a47a 100644 --- a/tests/unit/query_plan_accumulate_aggregate.cpp +++ b/tests/unit/query_plan_accumulate_aggregate.cpp @@ -86,19 +86,16 @@ TEST(QueryPlan, Accumulate) { TEST(QueryPlan, AccumulateAdvance) { // we simulate 'CREATE (n) WITH n AS n MATCH (m) RETURN m' // to get correct results we need to advance the command - auto check = [&](bool advance) { database::GraphDb db; auto dba = db.Access(); AstStorage storage; SymbolTable symbol_table; - - auto node = NODE("n"); - auto sym_n = symbol_table.CreateSymbol("n", true); - symbol_table[*node->identifier_] = sym_n; + NodeCreationInfo node; + node.symbol = symbol_table.CreateSymbol("n", true); auto create = std::make_shared(nullptr, node); auto accumulate = std::make_shared( - create, std::vector{sym_n}, advance); + create, std::vector{node.symbol}, advance); auto match = MakeScanAll(storage, symbol_table, "m", accumulate); EXPECT_EQ(advance ? 1 : 0, PullAll(match.op_, *dba, symbol_table)); }; diff --git a/tests/unit/query_plan_bag_semantics.cpp b/tests/unit/query_plan_bag_semantics.cpp index 21d808ac4..eac724e4c 100644 --- a/tests/unit/query_plan_bag_semantics.cpp +++ b/tests/unit/query_plan_bag_semantics.cpp @@ -93,8 +93,8 @@ TEST(QueryPlan, CreateLimit) { SymbolTable symbol_table; auto n = MakeScanAll(storage, symbol_table, "n1"); - auto m = NODE("m"); - symbol_table[*m->identifier_] = symbol_table.CreateSymbol("m", true); + NodeCreationInfo m; + m.symbol = symbol_table.CreateSymbol("m", true); auto c = std::make_shared(n.op_, m); auto skip = std::make_shared(c, LITERAL(1)); diff --git a/tests/unit/query_plan_create_set_remove_delete.cpp b/tests/unit/query_plan_create_set_remove_delete.cpp index dae6eb80f..ea95c5f37 100644 --- a/tests/unit/query_plan_create_set_remove_delete.cpp +++ b/tests/unit/query_plan_create_set_remove_delete.cpp @@ -27,10 +27,10 @@ TEST(QueryPlan, CreateNodeWithAttributes) { AstStorage storage; SymbolTable symbol_table; - auto node = NODE("n"); - symbol_table[*node->identifier_] = symbol_table.CreateSymbol("n", true); - node->labels_.emplace_back(label); - node->properties_[property] = LITERAL(42); + NodeCreationInfo node; + node.symbol = symbol_table.CreateSymbol("n", true); + node.labels.emplace_back(label); + node.properties.emplace_back(property.second, LITERAL(42)); auto create = std::make_shared(nullptr, node); PullAll(create, dba, symbol_table); @@ -62,22 +62,21 @@ TEST(QueryPlan, CreateReturn) { AstStorage storage; SymbolTable symbol_table; - auto node = NODE("n"); - auto sym_n = symbol_table.CreateSymbol("n", true); - symbol_table[*node->identifier_] = sym_n; - node->labels_.emplace_back(label); - node->properties_[property] = LITERAL(42); + NodeCreationInfo node; + node.symbol = symbol_table.CreateSymbol("n", true); + node.labels.emplace_back(label); + node.properties.emplace_back(property.second, LITERAL(42)); auto create = std::make_shared(nullptr, node); auto named_expr_n = NEXPR("n", IDENT("n")); symbol_table[*named_expr_n] = symbol_table.CreateSymbol("named_expr_n", true); - symbol_table[*named_expr_n->expression_] = sym_n; + symbol_table[*named_expr_n->expression_] = node.symbol; auto prop_lookup = PROPERTY_LOOKUP("n", property); - symbol_table[*prop_lookup->expression_] = sym_n; + symbol_table[*prop_lookup->expression_] = node.symbol; auto named_expr_n_p = NEXPR("n", prop_lookup); symbol_table[*named_expr_n_p] = symbol_table.CreateSymbol("named_expr_n_p", true); - symbol_table[*named_expr_n->expression_] = sym_n; + symbol_table[*named_expr_n->expression_] = node.symbol; auto produce = MakeProduce(create, named_expr_n, named_expr_n_p); auto results = CollectProduce(produce.get(), symbol_table, dba); @@ -112,29 +111,25 @@ TEST(QueryPlan, CreateExpand) { int before_e = CountIterable(dba.Edges(false)); // data for the first node - auto n = NODE("n"); - n->labels_.emplace_back(label_node_1); - n->properties_[property] = LITERAL(1); - auto n_sym = symbol_table.CreateSymbol("n", true); - symbol_table[*n->identifier_] = n_sym; + NodeCreationInfo n; + n.symbol = symbol_table.CreateSymbol("n", true); + n.labels.emplace_back(label_node_1); + n.properties.emplace_back(property.second, LITERAL(1)); // data for the second node - auto m = NODE("m"); - m->labels_.emplace_back(label_node_2); - m->properties_[property] = LITERAL(2); - if (cycle) - symbol_table[*m->identifier_] = n_sym; - else - symbol_table[*m->identifier_] = symbol_table.CreateSymbol("m", true); + NodeCreationInfo m; + m.symbol = cycle ? n.symbol : symbol_table.CreateSymbol("m", true); + m.labels.emplace_back(label_node_2); + m.properties.emplace_back(property.second, LITERAL(2)); - auto r = EDGE("r", EdgeAtom::Direction::OUT); - symbol_table[*r->identifier_] = symbol_table.CreateSymbol("r", true); - r->edge_types_.emplace_back(edge_type); - r->properties_[property] = LITERAL(3); + EdgeCreationInfo r; + r.symbol = symbol_table.CreateSymbol("r", true); + r.edge_type = edge_type; + r.properties.emplace_back(property.second, LITERAL(3)); auto create_op = std::make_shared(nullptr, n); auto create_expand = - std::make_shared(m, r, create_op, n_sym, cycle); + std::make_shared(m, r, create_op, n.symbol, cycle); PullAll(create_expand, dba, symbol_table); dba.AdvanceCommand(); @@ -184,8 +179,8 @@ TEST(QueryPlan, MatchCreateNode) { // first node auto n_scan_all = MakeScanAll(storage, symbol_table, "n"); // second node - auto m = NODE("m"); - symbol_table[*m->identifier_] = symbol_table.CreateSymbol("m", true); + NodeCreationInfo m; + m.symbol = symbol_table.CreateSymbol("m", true); // creation op auto create_node = std::make_shared(n_scan_all.op_, m); @@ -222,15 +217,13 @@ TEST(QueryPlan, MatchCreateExpand) { auto n_scan_all = MakeScanAll(storage, symbol_table, "n"); // data for the second node - auto m = NODE("m"); - if (cycle) - symbol_table[*m->identifier_] = n_scan_all.sym_; - else - symbol_table[*m->identifier_] = symbol_table.CreateSymbol("m", true); + NodeCreationInfo m; + m.symbol = cycle ? n_scan_all.sym_ : symbol_table.CreateSymbol("m", true); - auto r = EDGE("r", EdgeAtom::Direction::OUT); - symbol_table[*r->identifier_] = symbol_table.CreateSymbol("r", true); - r->edge_types_.emplace_back(edge_type); + EdgeCreationInfo r; + r.symbol = symbol_table.CreateSymbol("r", true); + r.direction = EdgeAtom::Direction::OUT; + r.edge_type = edge_type; auto create_expand = std::make_shared(m, r, n_scan_all.op_, n_scan_all.sym_, cycle); @@ -855,9 +848,8 @@ TEST(QueryPlan, MergeNoInput) { AstStorage storage; SymbolTable symbol_table; - auto node = NODE("n"); - auto sym_n = symbol_table.CreateSymbol("n", true); - symbol_table[*node->identifier_] = sym_n; + NodeCreationInfo node; + node.symbol = symbol_table.CreateSymbol("n", true); auto create = std::make_shared(nullptr, node); auto merge = std::make_shared(nullptr, create, create);