From 50b2646765027f068d3c77c57786bf968f7e4c82 Mon Sep 17 00:00:00 2001 From: Marin Tomic Date: Mon, 29 Oct 2018 14:26:47 +0100 Subject: [PATCH] Compose ExpandCommon instead of inheriting from it Reviewers: teon.banek, llugovic Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1703 --- src/query/plan/distributed.cpp | 36 ++- src/query/plan/distributed_ops.cpp | 136 ++++++---- src/query/plan/distributed_ops.lcp | 45 +++- src/query/plan/distributed_pretty_print.cpp | 16 +- src/query/plan/operator.cpp | 238 +++++++++--------- src/query/plan/operator.lcp | 179 ++++++------- src/query/plan/pretty_print.cpp | 26 +- src/query/plan/pretty_print.hpp | 2 - src/query/plan/rule_based_planner.hpp | 10 +- tests/unit/bfs_distributed.cpp | 2 +- tests/unit/bfs_single_node.cpp | 14 +- tests/unit/distributed_query_plan.cpp | 4 +- tests/unit/query_cost_estimator.cpp | 10 +- tests/unit/query_plan_common.hpp | 6 +- tests/unit/query_plan_match_filter_return.cpp | 32 +-- 15 files changed, 394 insertions(+), 362 deletions(-) diff --git a/src/query/plan/distributed.cpp b/src/query/plan/distributed.cpp index c72aa5da4..e62e8375f 100644 --- a/src/query/plan/distributed.cpp +++ b/src/query/plan/distributed.cpp @@ -157,9 +157,8 @@ class IndependentSubtreeFinder : public DistributedOperatorVisitor { branch_.depends_on = lower_depends; if (upper_depends || !scan.upper_bound_) { // Cases 2) and 1.b) - new_scan = - std::make_shared(scan.input(), scan.output_symbol_, - scan.label_, scan.graph_view_); + new_scan = std::make_shared( + scan.input(), scan.output_symbol_, scan.label_, scan.graph_view_); } else { // Case 1.a) new_scan = std::make_shared( @@ -193,13 +192,11 @@ class IndependentSubtreeFinder : public DistributedOperatorVisitor { // Case 1.a) new_scan = std::make_shared( scan.input(), scan.output_symbol_, scan.label_, scan.property_, - scan.lower_bound_, std::experimental::nullopt, - scan.graph_view_); + scan.lower_bound_, std::experimental::nullopt, scan.graph_view_); } else { // Case 1.b) new_scan = std::make_shared( - scan.input(), scan.output_symbol_, scan.label_, - scan.graph_view_); + scan.input(), scan.output_symbol_, scan.label_, scan.graph_view_); } } } @@ -259,12 +256,12 @@ class IndependentSubtreeFinder : public DistributedOperatorVisitor { if (auto found = FindForbidden(exp.input_symbol_)) { SetBranch(exp.input(), &exp, *found); } - if (exp.existing_node_) { - if (auto found = FindForbidden(exp.node_symbol_)) { + if (exp.common_.existing_node) { + if (auto found = FindForbidden(exp.common_.node_symbol)) { SetBranch(exp.input(), &exp, *found); } } - CHECK(!FindForbidden(exp.edge_symbol_)) + CHECK(!FindForbidden(exp.common_.edge_symbol)) << "Expand uses an already used edge symbol."; return true; } @@ -279,12 +276,12 @@ class IndependentSubtreeFinder : public DistributedOperatorVisitor { if (auto found = FindForbidden(exp.input_symbol_)) { SetBranch(exp.input(), &exp, *found); } - if (exp.existing_node_) { - if (auto found = FindForbidden(exp.node_symbol_)) { + if (exp.common_.existing_node) { + if (auto found = FindForbidden(exp.common_.node_symbol)) { SetBranch(exp.input(), &exp, *found); } } - CHECK(!FindForbidden(exp.edge_symbol_)) + CHECK(!FindForbidden(exp.common_.edge_symbol)) << "Expand uses an already used edge symbol."; // Check for bounding expressions. if (exp.lower_bound_) { @@ -331,12 +328,12 @@ class IndependentSubtreeFinder : public DistributedOperatorVisitor { if (auto found = FindForbidden(exp.input_symbol_)) { SetBranch(exp.input(), &exp, *found); } - if (exp.existing_node_) { - if (auto found = FindForbidden(exp.node_symbol_)) { + if (exp.common_.existing_node) { + if (auto found = FindForbidden(exp.common_.node_symbol)) { SetBranch(exp.input(), &exp, *found); } } - CHECK(!FindForbidden(exp.edge_symbol_)) + CHECK(!FindForbidden(exp.common_.edge_symbol)) << "Expand uses an already used edge symbol."; // Check for bounding expressions. if (exp.lower_bound_) { @@ -959,8 +956,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor { bool PostVisit(Expand &exp) override { prev_ops_.pop_back(); auto distributed_expand = std::make_unique( - exp.node_symbol_, exp.edge_symbol_, exp.direction_, exp.edge_types_, - exp.input(), exp.input_symbol_, exp.existing_node_, exp.graph_view_); + exp.input(), exp.input_symbol_, exp.common_); SetOnPrevious(std::move(distributed_expand)); return true; } @@ -973,9 +969,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor { prev_ops_.pop_back(); if (exp.type_ == EdgeAtom::Type::BREADTH_FIRST) { auto distributed_bfs = std::make_unique( - exp.node_symbol_, exp.edge_symbol_, exp.direction_, - exp.edge_types_, exp.input(), exp.input_symbol_, - exp.existing_node_, exp.graph_view_, exp.lower_bound_, + exp.input(), exp.input_symbol_, exp.common_, exp.lower_bound_, exp.upper_bound_, exp.filter_lambda_); SetOnPrevious(std::move(distributed_bfs)); } diff --git a/src/query/plan/distributed_ops.cpp b/src/query/plan/distributed_ops.cpp index af029a9fd..edcebfe60 100644 --- a/src/query/plan/distributed_ops.cpp +++ b/src/query/plan/distributed_ops.cpp @@ -107,24 +107,54 @@ std::vector PullRemoteOrderBy::ModifiedSymbols( return input_->ModifiedSymbols(table); } +DistributedExpand::DistributedExpand( + const std::shared_ptr &input, Symbol input_symbol, + Symbol node_symbol, Symbol edge_symbol, EdgeAtom::Direction direction, + const std::vector &edge_types, bool existing_node, + GraphView graph_view) + : input_(input ? input : std::make_shared()), + input_symbol_(input_symbol), + common_{node_symbol, edge_symbol, direction, + edge_types, existing_node, graph_view} {} + +DistributedExpand::DistributedExpand( + const std::shared_ptr &input, Symbol input_symbol, + const ExpandCommon &common) + : input_(input ? input : std::make_shared()), + input_symbol_(input_symbol), + common_(common) {} + ACCEPT_WITH_INPUT(DistributedExpand); std::vector DistributedExpand::ModifiedSymbols( const SymbolTable &table) const { auto symbols = input_->ModifiedSymbols(table); - symbols.emplace_back(node_symbol_); - symbols.emplace_back(edge_symbol_); + symbols.emplace_back(common_.node_symbol); + symbols.emplace_back(common_.edge_symbol); return symbols; } DistributedExpandBfs::DistributedExpandBfs( - Symbol node_symbol, Symbol edge_symbol, EdgeAtom::Direction direction, - const std::vector &edge_types, const std::shared_ptr &input, Symbol input_symbol, - bool existing_node, GraphView graph_view, Expression *lower_bound, + Symbol node_symbol, Symbol edge_symbol, EdgeAtom::Direction direction, + const std::vector &edge_types, bool existing_node, + GraphView graph_view, Expression *lower_bound, Expression *upper_bound, + const ExpansionLambda &filter_lambda) + : input_(input ? input : std::make_shared()), + input_symbol_(input_symbol), + common_{node_symbol, edge_symbol, direction, + edge_types, existing_node, graph_view}, + lower_bound_(lower_bound), + upper_bound_(upper_bound), + filter_lambda_(filter_lambda) {} + +DistributedExpandBfs::DistributedExpandBfs( + const std::shared_ptr &input, Symbol input_symbol, + const ExpandCommon &common, Expression *lower_bound, Expression *upper_bound, const ExpansionLambda &filter_lambda) - : ExpandCommon(node_symbol, edge_symbol, direction, edge_types, input, - input_symbol, existing_node, graph_view), + : input_(input ? input : std::make_shared()), + input_symbol_(input_symbol), + common_(common), lower_bound_(lower_bound), upper_bound_(upper_bound), filter_lambda_(filter_lambda) {} @@ -134,8 +164,8 @@ ACCEPT_WITH_INPUT(DistributedExpandBfs); std::vector DistributedExpandBfs::ModifiedSymbols( const SymbolTable &table) const { auto symbols = input_->ModifiedSymbols(table); - symbols.emplace_back(node_symbol_); - symbols.emplace_back(edge_symbol_); + symbols.emplace_back(common_.node_symbol); + symbols.emplace_back(common_.edge_symbol); return symbols; } @@ -794,13 +824,13 @@ class DistributedExpandCursor : public query::plan::Cursor { // A helper function for expanding a node from an edge. auto pull_node = [this, &frame](const EdgeAccessor &new_edge, EdgeAtom::Direction direction) { - if (self_->existing_node_) return; + if (self_->common_.existing_node) return; switch (direction) { case EdgeAtom::Direction::IN: - frame[self_->node_symbol_] = new_edge.from(); + frame[self_->common_.node_symbol] = new_edge.from(); break; case EdgeAtom::Direction::OUT: - frame[self_->node_symbol_] = new_edge.to(); + frame[self_->common_.node_symbol] = new_edge.to(); break; case EdgeAtom::Direction::BOTH: LOG(FATAL) << "Must indicate exact expansion direction here"; @@ -828,8 +858,8 @@ class DistributedExpandCursor : public query::plan::Cursor { auto put_future_edge_on_frame = [this, &frame](auto &future) { auto edge_to = future.edge_to.get(); frame.elems() = future.frame_elems; - frame[self_->edge_symbol_] = edge_to.first; - frame[self_->node_symbol_] = edge_to.second; + frame[self_->common_.edge_symbol] = edge_to.first; + frame[self_->common_.node_symbol] = edge_to.second; }; while (true) { @@ -858,8 +888,8 @@ class DistributedExpandCursor : public query::plan::Cursor { // attempt to get a value from the incoming edges if (in_edges_ && *in_edges_it_ != in_edges_->end()) { auto edge = *(*in_edges_it_)++; - if (edge.address().is_local() || self_->existing_node_) { - frame[self_->edge_symbol_] = edge; + if (edge.address().is_local() || self_->common_.existing_node) { + frame[self_->common_.edge_symbol] = edge; pull_node(edge, EdgeAtom::Direction::IN); return true; } else { @@ -874,10 +904,11 @@ class DistributedExpandCursor : public query::plan::Cursor { // when expanding in EdgeAtom::Direction::BOTH directions // we should do only one expansion for cycles, and it was // already done in the block above - if (self_->direction_ == EdgeAtom::Direction::BOTH && edge.is_cycle()) + if (self_->common_.direction == EdgeAtom::Direction::BOTH && + edge.is_cycle()) continue; - if (edge.address().is_local() || self_->existing_node_) { - frame[self_->edge_symbol_] = edge; + if (edge.address().is_local() || self_->common_.existing_node) { + frame[self_->common_.edge_symbol] = edge; pull_node(edge, EdgeAtom::Direction::OUT); return true; } else { @@ -947,22 +978,22 @@ class DistributedExpandCursor : public query::plan::Cursor { ExpectType(self_->input_symbol_, vertex_value, TypedValue::Type::Vertex); auto &vertex = vertex_value.Value(); - SwitchAccessor(vertex, self_->graph_view_); + SwitchAccessor(vertex, self_->common_.graph_view); - auto direction = self_->direction_; + auto direction = self_->common_.direction; if (direction == EdgeAtom::Direction::IN || direction == EdgeAtom::Direction::BOTH) { - if (self_->existing_node_) { - TypedValue &existing_node = frame[self_->node_symbol_]; + if (self_->common_.existing_node) { + TypedValue &existing_node = frame[self_->common_.node_symbol]; // old_node_value may be Null when using optional matching if (!existing_node.IsNull()) { - ExpectType(self_->node_symbol_, existing_node, + ExpectType(self_->common_.node_symbol, existing_node, TypedValue::Type::Vertex); - in_edges_.emplace( - vertex.in(existing_node.ValueVertex(), &self_->edge_types_)); + in_edges_.emplace(vertex.in(existing_node.ValueVertex(), + &self_->common_.edge_types)); } } else { - in_edges_.emplace(vertex.in(&self_->edge_types_)); + in_edges_.emplace(vertex.in(&self_->common_.edge_types)); } if (in_edges_) { in_edges_it_.emplace(in_edges_->begin()); @@ -971,17 +1002,17 @@ class DistributedExpandCursor : public query::plan::Cursor { if (direction == EdgeAtom::Direction::OUT || direction == EdgeAtom::Direction::BOTH) { - if (self_->existing_node_) { - TypedValue &existing_node = frame[self_->node_symbol_]; + if (self_->common_.existing_node) { + TypedValue &existing_node = frame[self_->common_.node_symbol]; // old_node_value may be Null when using optional matching if (!existing_node.IsNull()) { - ExpectType(self_->node_symbol_, existing_node, + ExpectType(self_->common_.node_symbol, existing_node, TypedValue::Type::Vertex); - out_edges_.emplace( - vertex.out(existing_node.ValueVertex(), &self_->edge_types_)); + out_edges_.emplace(vertex.out(existing_node.ValueVertex(), + &self_->common_.edge_types)); } } else { - out_edges_.emplace(vertex.out(&self_->edge_types_)); + out_edges_.emplace(vertex.out(&self_->common_.edge_types)); } if (out_edges_) { out_edges_it_.emplace(out_edges_->begin()); @@ -993,6 +1024,12 @@ class DistributedExpandCursor : public query::plan::Cursor { } private: + using InEdgeT = decltype(std::declval().in()); + using InEdgeIteratorT = decltype(std::declval().in().begin()); + using OutEdgeT = decltype(std::declval().out()); + using OutEdgeIteratorT = + decltype(std::declval().out().begin()); + struct FutureExpand { utils::Future> edge_to; std::vector frame_elems; @@ -1003,11 +1040,10 @@ class DistributedExpandCursor : public query::plan::Cursor { // The iterable over edges and the current edge iterator are referenced via // optional because they can not be initialized in the constructor of // this class. They are initialized once for each pull from the input. - std::experimental::optional in_edges_; - std::experimental::optional in_edges_it_; - std::experimental::optional out_edges_; - std::experimental::optional - out_edges_it_; + std::experimental::optional in_edges_; + std::experimental::optional in_edges_it_; + std::experimental::optional out_edges_; + std::experimental::optional out_edges_it_; // Stores the last frame before we yield the frame for future edge. It needs // to be restored afterward. std::vector last_frame_; @@ -1022,7 +1058,7 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { DistributedExpandBfsCursor(const DistributedExpandBfs &self, database::GraphDbAccessor &db) : self_(self), db_(db), input_cursor_(self_.input()->MakeCursor(db)) { - CHECK(self_.graph_view_ == GraphView::OLD) + CHECK(self_.common_.graph_view == GraphView::OLD) << "ExpandVariable should only be planned with GraphView::OLD"; } @@ -1041,8 +1077,8 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { } CHECK(bfs_subcursor_clients_); subcursor_ids_ = bfs_subcursor_clients_->CreateBfsSubcursors( - dba, self_.direction_, self_.edge_types_, self_.filter_lambda_, - symbol_table, evaluation_context); + dba, self_.common_.direction, self_.common_.edge_types, + self_.filter_lambda_, symbol_table, evaluation_context); bfs_subcursor_clients_->RegisterSubcursors(subcursor_ids_); VLOG(10) << "BFS subcursors initialized"; pull_pos_ = subcursor_ids_.end(); @@ -1056,9 +1092,9 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { } // Evaluator for the filtering condition and expansion depth. - ExpressionEvaluator evaluator(&frame, context.symbol_table_, - context.evaluation_context_, - &context.db_accessor_, self_.graph_view_); + ExpressionEvaluator evaluator( + &frame, context.symbol_table_, context.evaluation_context_, + &context.db_accessor_, self_.common_.graph_view); while (true) { if (context.db_accessor_.should_abort()) throw HintedAbortError(); @@ -1079,14 +1115,14 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { if (last_vertex.IsVertex()) { // Handle existence flag - if (self_.existing_node_) { - TypedValue &node = frame[self_.node_symbol_]; + if (self_.common_.existing_node) { + TypedValue &node = frame[self_.common_.node_symbol]; if ((node != last_vertex).ValueBool()) continue; // There is no point in traversing the rest of the graph because BFS // can find only one path to a certain node. skip_rest_ = true; } else { - frame[self_.node_symbol_] = last_vertex; + frame[self_.common_.node_symbol] = last_vertex; } VLOG(10) << "Expanded to vertex: " << last_vertex; @@ -1120,7 +1156,7 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { if (!current_vertex_addr && !current_edge_addr) break; } std::reverse(edges.begin(), edges.end()); - frame[self_.edge_symbol_] = std::move(edges); + frame[self_.common_.edge_symbol] = std::move(edges); return true; } @@ -1146,7 +1182,9 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { // Source or sink node could be null due to optional matching. if (vertex_value.IsNull()) continue; - if (self_.existing_node_ && frame[self_.node_symbol_].IsNull()) continue; + if (self_.common_.existing_node && + frame[self_.common_.node_symbol].IsNull()) + continue; auto vertex = vertex_value.ValueVertex(); lower_bound_ = self_.lower_bound_ diff --git a/src/query/plan/distributed_ops.lcp b/src/query/plan/distributed_ops.lcp index b66f3065f..ef939d0f8 100644 --- a/src/query/plan/distributed_ops.lcp +++ b/src/query/plan/distributed_ops.lcp @@ -190,12 +190,23 @@ by having only one result from each worker.") cpp<#) (:serialize :capnp)) -(lcp:define-class distributed-expand (logical-operator expand-common) - () +(lcp:define-class distributed-expand (logical-operator) + ((input "std::shared_ptr" :scope :public + :capnp-save #'save-operator-pointer + :capnp-load #'load-operator-pointer) + (input-symbol "Symbol" :scope :public) + (common "ExpandCommon" :scope :public)) (:documentation "Distributed version of Expand operator") (:public #>cpp - using ExpandCommon::ExpandCommon; + DistributedExpand() {} + DistributedExpand(const std::shared_ptr &input, + Symbol input_symbol, Symbol node_symbol, Symbol edge_symbol, + EdgeAtom::Direction direction, + const std::vector &edge_types, + bool existing_node, GraphView graph_view); + DistributedExpand(const std::shared_ptr &input, + Symbol input_symbol, const ExpandCommon &common); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; std::unique_ptr MakeCursor( @@ -208,10 +219,15 @@ by having only one result from each worker.") input_ = input; } cpp<#) - (:serialize :capnp :inherit-compose '(expand-common))) + (:serialize :capnp)) -(lcp:define-class distributed-expand-bfs (logical-operator expand-common) - ((lower-bound "Expression *" :scope :public +(lcp:define-class distributed-expand-bfs (logical-operator) + ((input "std::shared_ptr" :scope :public + :capnp-save #'save-operator-pointer + :capnp-load #'load-operator-pointer) + (input-symbol "Symbol" :scope :public) + (common "ExpandCommon" :scope :public) + (lower-bound "Expression *" :scope :public :documentation "Optional lower bound, default is 1" :capnp-type "Ast.Tree" :capnp-init nil :capnp-save #'save-ast-pointer @@ -236,13 +252,16 @@ by having only one result from each worker.") (:public #>cpp DistributedExpandBfs() {} - DistributedExpandBfs(Symbol node_symbol, Symbol edge_symbol, - EdgeAtom::Direction direction, + DistributedExpandBfs(const std::shared_ptr &input, + Symbol input_symbol, Symbol node_symbol, + Symbol edge_symbol, EdgeAtom::Direction direction, const std::vector &edge_types, - const std::shared_ptr &input, - Symbol input_symbol, bool existing_node, - GraphView graph_view, Expression *lower_bound, - Expression *upper_bound, + bool existing_node, GraphView graph_view, + Expression *lower_bound, Expression *upper_bound, + const ExpansionLambda &filter_lambda); + DistributedExpandBfs(const std::shared_ptr &input, + Symbol input_symbol, const ExpandCommon &common, + Expression *lower_bound, Expression *upper_bound, const ExpansionLambda &filter_lambda); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; @@ -256,7 +275,7 @@ by having only one result from each worker.") input_ = input; } cpp<#) - (:serialize :capnp :inherit-compose '(expand-common))) + (:serialize :capnp)) (lcp:define-class distributed-create-node (logical-operator) ((input "std::shared_ptr" :scope :public diff --git a/src/query/plan/distributed_pretty_print.cpp b/src/query/plan/distributed_pretty_print.cpp index db092e49a..3e5208ee5 100644 --- a/src/query/plan/distributed_pretty_print.cpp +++ b/src/query/plan/distributed_pretty_print.cpp @@ -4,16 +4,24 @@ namespace query::plan { bool DistributedPlanPrinter::PreVisit(query::plan::DistributedExpand &op) { WithPrintLn([&](auto &out) { - out << "* DistributedExpand"; - PrintExpand(op); + out << "* DistributedExpand (" << op.input_symbol_.name() << ")" + << (op.common_.direction == query::EdgeAtom::Direction::IN ? "<-" : "-") + << "[" << op.common_.edge_symbol.name() << "]" + << (op.common_.direction == query::EdgeAtom::Direction::OUT ? "->" + : "-") + << "(" << op.common_.node_symbol.name() << ")"; }); return true; } bool DistributedPlanPrinter::PreVisit(query::plan::DistributedExpandBfs &op) { WithPrintLn([&](auto &out) { - out << "* DistributedExpandBfs"; - PrintExpand(op); + out << "* DistributedExpandBfs (" << op.input_symbol_.name() << ")" + << (op.common_.direction == query::EdgeAtom::Direction::IN ? "<-" : "-") + << "[" << op.common_.edge_symbol.name() << "]" + << (op.common_.direction == query::EdgeAtom::Direction::OUT ? "->" + : "-") + << "(" << op.common_.node_symbol.name() << ")"; }); return true; } diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 21ed347ef..7f88ecfeb 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -414,34 +414,25 @@ std::unique_ptr ScanAllByLabelPropertyValue::MakeCursor( output_symbol_, input_->MakeCursor(db), std::move(vertices), db); } -ExpandCommon::ExpandCommon(Symbol node_symbol, Symbol edge_symbol, - EdgeAtom::Direction direction, - const std::vector &edge_types, - const std::shared_ptr &input, - Symbol input_symbol, bool existing_node, - GraphView graph_view) - : node_symbol_(node_symbol), - edge_symbol_(edge_symbol), - direction_(direction), - edge_types_(edge_types), - input_(input ? input : std::make_shared()), - input_symbol_(input_symbol), - existing_node_(existing_node), - graph_view_(graph_view) {} - -bool ExpandCommon::HandleExistingNode(const VertexAccessor &new_node, - Frame &frame) const { - if (existing_node_) { - TypedValue &old_node_value = frame[node_symbol_]; - // old_node_value may be Null when using optional matching - if (old_node_value.IsNull()) return false; - ExpectType(node_symbol_, old_node_value, TypedValue::Type::Vertex); - return old_node_value.Value() == new_node; - } else { - frame[node_symbol_] = new_node; - return true; - } +namespace { +bool CheckExistingNode(const VertexAccessor &new_node, + const Symbol &existing_node_sym, Frame &frame) { + const TypedValue &existing_node = frame[existing_node_sym]; + if (existing_node.IsNull()) return false; + ExpectType(existing_node_sym, existing_node, TypedValue::Type::Vertex); + return existing_node.ValueVertex() != new_node; } +} // namespace + +Expand::Expand(const std::shared_ptr &input, + Symbol input_symbol, Symbol node_symbol, Symbol edge_symbol, + EdgeAtom::Direction direction, + const std::vector &edge_types, + bool existing_node, GraphView graph_view) + : input_(input ? input : std::make_shared()), + input_symbol_(input_symbol), + common_{node_symbol, edge_symbol, direction, + edge_types, existing_node, graph_view} {} ACCEPT_WITH_INPUT(Expand) @@ -452,8 +443,8 @@ std::unique_ptr Expand::MakeCursor( std::vector Expand::ModifiedSymbols(const SymbolTable &table) const { auto symbols = input_->ModifiedSymbols(table); - symbols.emplace_back(node_symbol_); - symbols.emplace_back(edge_symbol_); + symbols.emplace_back(common_.node_symbol); + symbols.emplace_back(common_.edge_symbol); return symbols; } @@ -465,13 +456,13 @@ bool Expand::ExpandCursor::Pull(Frame &frame, Context &context) { // A helper function for expanding a node from an edge. auto pull_node = [this, &frame](const EdgeAccessor &new_edge, EdgeAtom::Direction direction) { - if (self_.existing_node_) return; + if (self_.common_.existing_node) return; switch (direction) { case EdgeAtom::Direction::IN: - frame[self_.node_symbol_] = new_edge.from(); + frame[self_.common_.node_symbol] = new_edge.from(); break; case EdgeAtom::Direction::OUT: - frame[self_.node_symbol_] = new_edge.to(); + frame[self_.common_.node_symbol] = new_edge.to(); break; case EdgeAtom::Direction::BOTH: LOG(FATAL) << "Must indicate exact expansion direction here"; @@ -483,7 +474,7 @@ bool Expand::ExpandCursor::Pull(Frame &frame, Context &context) { // attempt to get a value from the incoming edges if (in_edges_ && *in_edges_it_ != in_edges_->end()) { auto edge = *(*in_edges_it_)++; - frame[self_.edge_symbol_] = edge; + frame[self_.common_.edge_symbol] = edge; pull_node(edge, EdgeAtom::Direction::IN); return true; } @@ -494,9 +485,10 @@ bool Expand::ExpandCursor::Pull(Frame &frame, Context &context) { // when expanding in EdgeAtom::Direction::BOTH directions // we should do only one expansion for cycles, and it was // already done in the block above - if (self_.direction_ == EdgeAtom::Direction::BOTH && edge.is_cycle()) + if (self_.common_.direction == EdgeAtom::Direction::BOTH && + edge.is_cycle()) continue; - frame[self_.edge_symbol_] = edge; + frame[self_.common_.edge_symbol] = edge; pull_node(edge, EdgeAtom::Direction::OUT); return true; } @@ -531,22 +523,22 @@ bool Expand::ExpandCursor::InitEdges(Frame &frame, Context &context) { ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex); auto &vertex = vertex_value.Value(); - SwitchAccessor(vertex, self_.graph_view_); + SwitchAccessor(vertex, self_.common_.graph_view); - auto direction = self_.direction_; + auto direction = self_.common_.direction; if (direction == EdgeAtom::Direction::IN || direction == EdgeAtom::Direction::BOTH) { - if (self_.existing_node_) { - TypedValue &existing_node = frame[self_.node_symbol_]; + if (self_.common_.existing_node) { + TypedValue &existing_node = frame[self_.common_.node_symbol]; // old_node_value may be Null when using optional matching if (!existing_node.IsNull()) { - ExpectType(self_.node_symbol_, existing_node, + ExpectType(self_.common_.node_symbol, existing_node, TypedValue::Type::Vertex); - in_edges_.emplace( - vertex.in(existing_node.ValueVertex(), &self_.edge_types_)); + in_edges_.emplace(vertex.in(existing_node.ValueVertex(), + &self_.common_.edge_types)); } } else { - in_edges_.emplace(vertex.in(&self_.edge_types_)); + in_edges_.emplace(vertex.in(&self_.common_.edge_types)); } if (in_edges_) { in_edges_it_.emplace(in_edges_->begin()); @@ -555,17 +547,17 @@ bool Expand::ExpandCursor::InitEdges(Frame &frame, Context &context) { if (direction == EdgeAtom::Direction::OUT || direction == EdgeAtom::Direction::BOTH) { - if (self_.existing_node_) { - TypedValue &existing_node = frame[self_.node_symbol_]; + if (self_.common_.existing_node) { + TypedValue &existing_node = frame[self_.common_.node_symbol]; // old_node_value may be Null when using optional matching if (!existing_node.IsNull()) { - ExpectType(self_.node_symbol_, existing_node, + ExpectType(self_.common_.node_symbol, existing_node, TypedValue::Type::Vertex); - out_edges_.emplace( - vertex.out(existing_node.ValueVertex(), &self_.edge_types_)); + out_edges_.emplace(vertex.out(existing_node.ValueVertex(), + &self_.common_.edge_types)); } } else { - out_edges_.emplace(vertex.out(&self_.edge_types_)); + out_edges_.emplace(vertex.out(&self_.common_.edge_types)); } if (out_edges_) { out_edges_it_.emplace(out_edges_->begin()); @@ -577,16 +569,18 @@ bool Expand::ExpandCursor::InitEdges(Frame &frame, Context &context) { } ExpandVariable::ExpandVariable( + const std::shared_ptr &input, Symbol input_symbol, Symbol node_symbol, Symbol edge_symbol, EdgeAtom::Type type, EdgeAtom::Direction direction, const std::vector &edge_types, bool is_reverse, - Expression *lower_bound, Expression *upper_bound, - const std::shared_ptr &input, Symbol input_symbol, - bool existing_node, ExpansionLambda filter_lambda, + Expression *lower_bound, Expression *upper_bound, bool existing_node, + ExpansionLambda filter_lambda, std::experimental::optional weight_lambda, std::experimental::optional total_weight, GraphView graph_view) - : ExpandCommon(node_symbol, edge_symbol, direction, edge_types, input, - input_symbol, existing_node, graph_view), + : input_(input ? input : std::make_shared()), + input_symbol_(input_symbol), + common_{node_symbol, edge_symbol, direction, + edge_types, existing_node, graph_view}, type_(type), is_reverse_(is_reverse), lower_bound_(lower_bound), @@ -608,8 +602,8 @@ ACCEPT_WITH_INPUT(ExpandVariable) std::vector ExpandVariable::ModifiedSymbols( const SymbolTable &table) const { auto symbols = input_->ModifiedSymbols(table); - symbols.emplace_back(node_symbol_); - symbols.emplace_back(edge_symbol_); + symbols.emplace_back(common_.node_symbol); + symbols.emplace_back(common_.edge_symbol); return symbols; } @@ -666,9 +660,9 @@ class ExpandVariableCursor : public Cursor { : self_(self), input_cursor_(self.input_->MakeCursor(db)) {} bool Pull(Frame &frame, Context &context) override { - ExpressionEvaluator evaluator(&frame, context.symbol_table_, - context.evaluation_context_, - &context.db_accessor_, self_.graph_view_); + ExpressionEvaluator evaluator( + &frame, context.symbol_table_, context.evaluation_context_, + &context.db_accessor_, self_.common_.graph_view); while (true) { if (Expand(frame, context)) return true; @@ -677,7 +671,10 @@ class ExpandVariableCursor : public Cursor { if (lower_bound_ == 0) { auto &start_vertex = frame[self_.input_symbol_].Value(); - if (self_.HandleExistingNode(start_vertex, frame)) { + if (!self_.common_.existing_node || + CheckExistingNode(start_vertex, self_.common_.node_symbol, + frame)) { + frame[self_.common_.node_symbol] = start_vertex; return true; } } @@ -712,7 +709,7 @@ class ExpandVariableCursor : public Cursor { // the expansion currently being Pulled std::vector(), EdgeAtom::Direction::IN, - self_.edge_types_))> + self_.common_.edge_types))> edges_; // an iterator indicating the possition in the corresponding edges_ @@ -740,13 +737,14 @@ class ExpandVariableCursor : public Cursor { ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex); auto &vertex = vertex_value.Value(); - SwitchAccessor(vertex, self_.graph_view_); + SwitchAccessor(vertex, self_.common_.graph_view); // Evaluate the upper and lower bounds. ExpressionEvaluator evaluator(&frame, context.symbol_table_, context.evaluation_context_, - &context.db_accessor_, self_.graph_view_); + &context.db_accessor_, + self_.common_.graph_view); auto calc_bound = [&evaluator](auto &bound) { auto value = EvaluateInt(&evaluator, bound, "Variable expansion bound"); if (value < 0) @@ -760,14 +758,14 @@ class ExpandVariableCursor : public Cursor { : std::numeric_limits::max(); if (upper_bound_ > 0) { - SwitchAccessor(vertex, self_.graph_view_); - edges_.emplace_back( - ExpandFromVertex(vertex, self_.direction_, self_.edge_types_)); + SwitchAccessor(vertex, self_.common_.graph_view); + edges_.emplace_back(ExpandFromVertex(vertex, self_.common_.direction, + self_.common_.edge_types)); edges_it_.emplace_back(edges_.back().begin()); } // reset the frame value to an empty edge list - frame[self_.edge_symbol_] = std::vector(); + frame[self_.common_.edge_symbol] = std::vector(); return true; } @@ -805,9 +803,9 @@ class ExpandVariableCursor : public Cursor { * vertex and another Pull from the input cursor should be performed. */ bool Expand(Frame &frame, Context &context) { - ExpressionEvaluator evaluator(&frame, context.symbol_table_, - context.evaluation_context_, - &context.db_accessor_, self_.graph_view_); + ExpressionEvaluator evaluator( + &frame, context.symbol_table_, context.evaluation_context_, + &context.db_accessor_, self_.common_.graph_view); // Some expansions might not be valid due to edge uniqueness and // existing_node criterions, so expand in a loop until either the input // vertex is exhausted or a valid variable-length expansion is available. @@ -825,7 +823,7 @@ class ExpandVariableCursor : public Cursor { // we use this a lot std::vector &edges_on_frame = - frame[self_.edge_symbol_].Value>(); + frame[self_.common_.edge_symbol].Value>(); // it is possible that edges_on_frame does not contain as many // elements as edges_ due to edge-uniqueness (when a whole layer @@ -861,7 +859,11 @@ class ExpandVariableCursor : public Cursor { ? current_edge.first.from() : current_edge.first.to(); - if (!self_.HandleExistingNode(current_vertex, frame)) continue; + if (self_.common_.existing_node && + !CheckExistingNode(current_vertex, self_.common_.node_symbol, frame)) + continue; + + frame[self_.common_.node_symbol] = current_vertex; // Skip expanding out of filtered expansion. frame[self_.filter_lambda_.inner_edge_symbol] = current_edge.first; @@ -873,9 +875,9 @@ class ExpandVariableCursor : public Cursor { // we are doing depth-first search, so place the current // edge's expansions onto the stack, if we should continue to expand if (upper_bound_ > static_cast(edges_.size())) { - SwitchAccessor(current_vertex, self_.graph_view_); - edges_.emplace_back(ExpandFromVertex(current_vertex, self_.direction_, - self_.edge_types_)); + SwitchAccessor(current_vertex, self_.common_.graph_view); + edges_.emplace_back(ExpandFromVertex( + current_vertex, self_.common_.direction, self_.common_.edge_types)); edges_it_.emplace_back(edges_.back().begin()); } @@ -893,11 +895,12 @@ class STShortestPathCursor : public query::plan::Cursor { STShortestPathCursor(const ExpandVariable &self, database::GraphDbAccessor &dba) : self_(self), input_cursor_(self_.input()->MakeCursor(dba)) { - CHECK(self_.graph_view_ == GraphView::OLD) + CHECK(self_.common_.graph_view == GraphView::OLD) << "ExpandVariable should only be planned with GraphView::OLD"; - CHECK(self_.existing_node_) << "s-t shortest path algorithm should only " - "be used when `existing_node` flag is " - "set!"; + CHECK(self_.common_.existing_node) + << "s-t shortest path algorithm should only " + "be used when `existing_node` flag is " + "set!"; } bool Pull(Frame &frame, Context &context) override { @@ -906,7 +909,7 @@ class STShortestPathCursor : public query::plan::Cursor { &context.db_accessor_, GraphView::OLD); while (input_cursor_->Pull(frame, context)) { auto source_tv = frame[self_.input_symbol_]; - auto sink_tv = frame[self_.node_symbol_]; + auto sink_tv = frame[self_.common_.node_symbol]; // It is possible that source or sink vertex is Null due to optional // matching. @@ -969,7 +972,7 @@ class STShortestPathCursor : public query::plan::Cursor { last_edge->from_is(last_vertex) ? last_edge->to() : last_edge->from(); result.emplace_back(*last_edge); } - frame->at(self_.edge_symbol_) = std::move(result); + frame->at(self_.common_.edge_symbol) = std::move(result); } bool ShouldExpand(const VertexAccessor &vertex, const EdgeAccessor &edge, @@ -1029,8 +1032,8 @@ class STShortestPathCursor : public query::plan::Cursor { if (current_length > upper_bound) return false; for (const auto &vertex : source_frontier) { - if (self_.direction_ != EdgeAtom::Direction::IN) { - for (const auto &edge : vertex.out(&self_.edge_types_)) { + if (self_.common_.direction != EdgeAtom::Direction::IN) { + for (const auto &edge : vertex.out(&self_.common_.edge_types)) { if (ShouldExpand(edge.to(), edge, frame, evaluator) && !Contains(in_edge, edge.to())) { in_edge.emplace(edge.to(), edge); @@ -1046,8 +1049,8 @@ class STShortestPathCursor : public query::plan::Cursor { } } } - if (self_.direction_ != EdgeAtom::Direction::OUT) { - for (const auto &edge : vertex.in(&self_.edge_types_)) { + if (self_.common_.direction != EdgeAtom::Direction::OUT) { + for (const auto &edge : vertex.in(&self_.common_.edge_types)) { if (ShouldExpand(edge.from(), edge, frame, evaluator) && !Contains(in_edge, edge.from())) { in_edge.emplace(edge.from(), edge); @@ -1077,8 +1080,8 @@ class STShortestPathCursor : public query::plan::Cursor { // endpoint we pass to `should_expand`, because everything is // reversed. for (const auto &vertex : sink_frontier) { - if (self_.direction_ != EdgeAtom::Direction::OUT) { - for (const auto &edge : vertex.out(&self_.edge_types_)) { + if (self_.common_.direction != EdgeAtom::Direction::OUT) { + for (const auto &edge : vertex.out(&self_.common_.edge_types)) { if (ShouldExpand(vertex, edge, frame, evaluator) && !Contains(out_edge, edge.to())) { out_edge.emplace(edge.to(), edge); @@ -1094,8 +1097,8 @@ class STShortestPathCursor : public query::plan::Cursor { } } } - if (self_.direction_ != EdgeAtom::Direction::IN) { - for (const auto &edge : vertex.in(&self_.edge_types_)) { + if (self_.common_.direction != EdgeAtom::Direction::IN) { + for (const auto &edge : vertex.in(&self_.common_.edge_types)) { if (ShouldExpand(vertex, edge, frame, evaluator) && !Contains(out_edge, edge.from())) { out_edge.emplace(edge.from(), edge); @@ -1125,12 +1128,13 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { SingleSourceShortestPathCursor(const ExpandVariable &self, database::GraphDbAccessor &db) : self_(self), input_cursor_(self_.input()->MakeCursor(db)) { - CHECK(self_.graph_view_ == GraphView::OLD) + CHECK(self_.common_.graph_view == GraphView::OLD) << "ExpandVariable should only be planned with GraphView::OLD"; - CHECK(!self_.existing_node_) << "Single source shortest path algorithm " - "should not be used when `existing_node` " - "flag is set, s-t shortest path algorithm " - "should be used instead!"; + CHECK(!self_.common_.existing_node) + << "Single source shortest path algorithm " + "should not be used when `existing_node` " + "flag is set, s-t shortest path algorithm " + "should be used instead!"; } bool Pull(Frame &frame, Context &context) override { @@ -1169,12 +1173,12 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { // from the given vertex. skips expansions that don't satisfy // the "where" condition. auto expand_from_vertex = [this, &expand_pair](VertexAccessor &vertex) { - if (self_.direction_ != EdgeAtom::Direction::IN) { - for (const EdgeAccessor &edge : vertex.out(&self_.edge_types_)) + if (self_.common_.direction != EdgeAtom::Direction::IN) { + for (const EdgeAccessor &edge : vertex.out(&self_.common_.edge_types)) expand_pair(edge, edge.to()); } - if (self_.direction_ != EdgeAtom::Direction::OUT) { - for (const EdgeAccessor &edge : vertex.in(&self_.edge_types_)) + if (self_.common_.direction != EdgeAtom::Direction::OUT) { + for (const EdgeAccessor &edge : vertex.in(&self_.common_.edge_types)) expand_pair(edge, edge.from()); } }; @@ -1241,11 +1245,11 @@ class SingleSourceShortestPathCursor : public query::plan::Cursor { if (static_cast(edge_list.size()) < lower_bound_) continue; - frame[self_.node_symbol_] = expansion.second; + frame[self_.common_.node_symbol] = expansion.second; // place edges on the frame in the correct order std::reverse(edge_list.begin(), edge_list.end()); - frame[self_.edge_symbol_] = std::move(edge_list); + frame[self_.common_.edge_symbol] = std::move(edge_list); return true; } @@ -1286,9 +1290,9 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { : self_(self), input_cursor_(self_.input_->MakeCursor(db)) {} bool Pull(Frame &frame, Context &context) override { - ExpressionEvaluator evaluator(&frame, context.symbol_table_, - context.evaluation_context_, - &context.db_accessor_, self_.graph_view_); + ExpressionEvaluator evaluator( + &frame, context.symbol_table_, context.evaluation_context_, + &context.db_accessor_, self_.common_.graph_view); auto create_state = [this](VertexAccessor vertex, int depth) { return std::make_pair(vertex, upper_bound_set_ ? depth : 0); }; @@ -1299,8 +1303,8 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { auto expand_pair = [this, &evaluator, &frame, &create_state]( EdgeAccessor edge, VertexAccessor vertex, double weight, int depth) { - SwitchAccessor(edge, self_.graph_view_); - SwitchAccessor(vertex, self_.graph_view_); + SwitchAccessor(edge, self_.common_.graph_view); + SwitchAccessor(vertex, self_.common_.graph_view); if (self_.filter_lambda_.expression) { frame[self_.filter_lambda_.inner_edge_symbol] = edge; @@ -1338,13 +1342,13 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { // the "where" condition. auto expand_from_vertex = [this, &expand_pair](VertexAccessor &vertex, double weight, int depth) { - if (self_.direction_ != EdgeAtom::Direction::IN) { - for (const EdgeAccessor &edge : vertex.out(&self_.edge_types_)) { + if (self_.common_.direction != EdgeAtom::Direction::IN) { + for (const EdgeAccessor &edge : vertex.out(&self_.common_.edge_types)) { expand_pair(edge, edge.to(), weight, depth); } } - if (self_.direction_ != EdgeAtom::Direction::OUT) { - for (const EdgeAccessor &edge : vertex.in(&self_.edge_types_)) { + if (self_.common_.direction != EdgeAtom::Direction::OUT) { + for (const EdgeAccessor &edge : vertex.in(&self_.common_.edge_types)) { expand_pair(edge, edge.from(), weight, depth); } } @@ -1357,13 +1361,13 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { auto vertex_value = frame[self_.input_symbol_]; if (vertex_value.IsNull()) continue; auto vertex = vertex_value.Value(); - if (self_.existing_node_) { - TypedValue &node = frame[self_.node_symbol_]; + if (self_.common_.existing_node) { + TypedValue &node = frame[self_.common_.node_symbol]; // Due to optional matching the existing node could be null. // Skip expansion for such nodes. if (node.IsNull()) continue; } - SwitchAccessor(vertex, self_.graph_view_); + SwitchAccessor(vertex, self_.common_.graph_view); if (self_.upper_bound_) { upper_bound_ = EvaluateInt(&evaluator, self_.upper_bound_, @@ -1435,8 +1439,8 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { } // Place destination node on the frame, handle existence flag. - if (self_.existing_node_) { - TypedValue &node = frame[self_.node_symbol_]; + if (self_.common_.existing_node) { + TypedValue &node = frame[self_.common_.node_symbol]; if ((node != current_vertex).Value()) continue; else @@ -1444,14 +1448,14 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { // shortest to existing node. ClearQueue(); } else { - frame[self_.node_symbol_] = current_vertex; + frame[self_.common_.node_symbol] = current_vertex; } if (!self_.is_reverse_) { // Place edges on the frame in the correct order. std::reverse(edge_list.begin(), edge_list.end()); } - frame[self_.edge_symbol_] = std::move(edge_list); + frame[self_.common_.edge_symbol] = std::move(edge_list); frame[self_.total_weight_.value()] = current_weight; yielded_vertices_.insert(current_vertex); return true; @@ -1524,7 +1528,7 @@ std::unique_ptr ExpandVariable::MakeCursor( database::GraphDbAccessor &db) const { switch (type_) { case EdgeAtom::Type::BREADTH_FIRST: - if (existing_node_) { + if (common_.existing_node) { return std::make_unique(*this, db); } else { return std::make_unique(*this, db); diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp index 53d1a76b3..bc855392b 100644 --- a/src/query/plan/operator.lcp +++ b/src/query/plan/operator.lcp @@ -649,106 +649,55 @@ property value. cpp<#) (:serialize :capnp)) -(lcp:define-class expand-common () +(lcp:define-struct expand-common () ( ;; info on what's getting expanded - (node-symbol "Symbol" :scope :public) - (edge-symbol "Symbol" :scope :public) - (direction "EdgeAtom::Direction" :scope :public + (node-symbol "Symbol" + :documentation "Symbol pointing to the node to be expanded. +This is where the new node will be stored.") + (edge-symbol "Symbol" + :documentation "Symbol for the edges to be expanded. +This is where a TypedValue containing a list of expanded edges will be stored.") + (direction "EdgeAtom::Direction" + :documentation "EdgeAtom::Direction determining the direction of edge +expansion. The direction is relative to the starting vertex for each expansion." :capnp-type "Ast.EdgeAtom.Direction" :capnp-init nil - :capnp-save (lcp:capnp-save-enum "::query::capnp::EdgeAtom::Direction" "EdgeAtom::Direction" + :capnp-save (lcp:capnp-save-enum "::query::capnp::EdgeAtom::Direction" + "EdgeAtom::Direction" '(in out both)) - :capnp-load (lcp:capnp-load-enum "::query::capnp::EdgeAtom::Direction" "EdgeAtom::Direction" + :capnp-load (lcp:capnp-load-enum "::query::capnp::EdgeAtom::Direction" + "EdgeAtom::Direction" '(in out both))) - (edge-types "std::vector" :scope :public - :capnp-save (lcp:capnp-save-vector "::storage::capnp::Common" "storage::EdgeType") - :capnp-load (lcp:capnp-load-vector "::storage::capnp::Common" "storage::EdgeType")) - ;; the input op and the symbol under which the op's result - ;; can be found in the frame - (input "std::shared_ptr" :scope :public - :capnp-save #'save-operator-pointer - :capnp-load #'load-operator-pointer) - (input-symbol "Symbol" :scope :public) - (existing-node :bool :scope :public :documentation - "If the given node atom refer to a symbol that has already - been expanded and should be just validated in the frame.") + (edge-types "std::vector" + :documentation "storage::EdgeType specifying which edges we want +to expand. If empty, all edges are valid. If not empty, only edges with one of +the given types are valid." + :capnp-save (lcp:capnp-save-vector "::storage::capnp::Common" + "storage::EdgeType") + :capnp-load (lcp:capnp-load-vector "::storage::capnp::Common" + "storage::EdgeType")) + (existing-node :bool :documentation "If the given node atom refer to a symbol +that has already been expanded and should be just validated in the frame.") (graph-view "GraphView" :scope :public :capnp-init nil - :capnp-save (lcp:capnp-save-enum "::query::capnp::GraphView" "GraphView" + :capnp-save (lcp:capnp-save-enum "::query::capnp::GraphView" + "GraphView" '(old new)) - :capnp-load (lcp:capnp-load-enum "::query::capnp::GraphView" "GraphView" + :capnp-load (lcp:capnp-load-enum "::query::capnp::GraphView" + "GraphView" '(old new)) :documentation - "from which state the input node should get expanded")) - (:documentation - "Common functionality and data members of single-edge and variable-length -expansion") - (:public - #>cpp - /** - * Initializes common expansion parameters. - * - * Edge/Node existence are controlled with booleans. 'true' - * denotes that this expansion references an already - * Pulled node/edge, and should only be checked for equality - * during expansion. - * - * Expansion can be done from old or new state of the vertex - * the expansion originates from. This is controlled with a - * constructor argument. - * - * @param node_symbol Symbol pointing to the node to be expanded. This is - * where the new node will be stored. - * @param edge_symbol Symbol for the edges to be expanded. This is where - * a TypedValue containing a list of expanded edges will be stored. - * @param direction EdgeAtom::Direction determining the direction of edge - * expansion. The direction is relative to the starting vertex for each - * expansion. - * @param edge_types storage::EdgeType specifying which edges we - * want to expand. If empty, all edges are valid. If not empty, only edges - * with one of the given types are valid. - * @param input Optional LogicalOperator that preceeds this one. - * @param input_symbol Symbol that points to a VertexAccessor in the Frame - * that expansion should emanate from. - * @param existing_node If or not the node to be expanded is already present - * in the Frame and should just be checked for equality. - */ - ExpandCommon(Symbol node_symbol, Symbol edge_symbol, - EdgeAtom::Direction direction, - const std::vector &edge_types, - const std::shared_ptr &input, - Symbol input_symbol, bool existing_node, - GraphView graph_view); - - // types that we'll use for members in subclasses - using InEdgeT = decltype(std::declval().in()); - using InEdgeIteratorT = decltype(std::declval().in().begin()); - using OutEdgeT = decltype(std::declval().out()); - using OutEdgeIteratorT = - decltype(std::declval().out().begin()); - cpp<#) - (:protected - #>cpp - virtual ~ExpandCommon() {} - - /** - * For a newly expanded node handles existence checking and - * frame placement. - * - * @return If or not the given new_node is a valid expansion. It is not - * valid only when matching and existing node and new_node does not match - * the old. - */ - bool HandleExistingNode(const VertexAccessor &new_node, Frame &frame) const; - - ExpandCommon() {} - cpp<#) + "State from which the input node should get expanded.")) (:serialize :capnp :save-args '((helper "LogicalOperator::SaveHelper *")) :load-args '((helper "LogicalOperator::LoadHelper *")))) -(lcp:define-class expand (logical-operator expand-common) - () +(lcp:define-class expand (logical-operator) + ((input "std::shared_ptr" :scope :public + :capnp-save #'save-operator-pointer + :capnp-load #'load-operator-pointer) + (input-symbol "Symbol" :scope :public) + (common "ExpandCommon" :scope :public)) (:documentation "Expansion operator. For a node existing in the frame it expands one edge and one node and places them on the frame. @@ -765,10 +714,19 @@ pulled.") (:public #>cpp /** - * Creates an expansion. All parameters are forwarded to @c ExpandCommon and - * are documented there. + * Creates an expansion. All parameters except input and input_symbol are + * forwarded to @c ExpandCommon and are documented there. + * + * @param input Optional logical operator that preceeds this one. + * @param input_symbol Symbol that points to a VertexAccessor in the frame + * that expansion should emanate from. */ - using ExpandCommon::ExpandCommon; + Expand(const std::shared_ptr &input, Symbol input_symbol, + Symbol node_symbol, Symbol edge_symbol, EdgeAtom::Direction direction, + const std::vector &edge_types, bool existing_node, + GraphView graph_view); + + Expand() {} bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; std::unique_ptr MakeCursor( @@ -789,6 +747,12 @@ pulled.") void Reset() override; private: + using InEdgeT = decltype(std::declval().in()); + using InEdgeIteratorT = decltype(std::declval().in().begin()); + using OutEdgeT = decltype(std::declval().out()); + using OutEdgeIteratorT = + decltype(std::declval().out().begin()); + const Expand &self_; const std::unique_ptr input_cursor_; database::GraphDbAccessor &db_; @@ -804,7 +768,7 @@ pulled.") bool InitEdges(Frame &, Context &); }; cpp<#) - (:serialize :capnp :inherit-compose '(expand-common))) + (:serialize :capnp)) (lcp:define-struct expansion-lambda () ((inner-edge-symbol "Symbol" :documentation "Currently expanded edge symbol.") @@ -832,8 +796,13 @@ pulled.") :load-args '((ast-storage "AstStorage *") (loaded-ast-uids "std::vector *")))) -(lcp:define-class expand-variable (logical-operator expand-common) - ((type "EdgeAtom::Type" :scope :public :capnp-type "Ast.EdgeAtom.Type" +(lcp:define-class expand-variable (logical-operator) + ((input "std::shared_ptr" :scope :public + :capnp-save #'save-operator-pointer + :capnp-load #'load-operator-pointer) + (input-symbol "Symbol" :scope :public) + (common "ExpandCommon" :scope :public) + (type "EdgeAtom::Type" :scope :public :capnp-type "Ast.EdgeAtom.Type" :capnp-init nil :capnp-save (lcp:capnp-save-enum "::query::capnp::EdgeAtom::Type" "EdgeAtom::Type" '(single depth-first breadth-first weighted-shortest-path)) @@ -903,6 +872,9 @@ pulled.") * Expansion length bounds are both inclusive (as in Neo's Cypher * implementation). * + * @param input Optional logical operator that preceeds this one. + * @param input_symbol Symbol that points to a VertexAccessor in the frame + * that expansion should emanate from. * @param type - Either Type::DEPTH_FIRST (default variable-length expansion), * or Type::BREADTH_FIRST. * @param is_reverse Set to `true` if the edges written on frame should expand @@ -919,17 +891,16 @@ pulled.") * @param filter_ The filter that must be satisfied for an expansion to * succeed. Can use inner(node/edge) symbols. If nullptr, it is ignored. */ - ExpandVariable(Symbol node_symbol, Symbol edge_symbol, EdgeAtom::Type type, - EdgeAtom::Direction direction, - const std::vector &edge_types, - bool is_reverse, Expression *lower_bound, - Expression *upper_bound, - const std::shared_ptr &input, - Symbol input_symbol, bool existing_node, - ExpansionLambda filter_lambda, - std::experimental::optional weight_lambda, - std::experimental::optional total_weight, - GraphView graph_view); + ExpandVariable(const std::shared_ptr &input, + Symbol input_symbol, Symbol node_symbol, Symbol edge_symbol, + EdgeAtom::Type type, EdgeAtom::Direction direction, + const std::vector &edge_types, + bool is_reverse, Expression *lower_bound, + Expression *upper_bound, bool existing_node, + ExpansionLambda filter_lambda, + std::experimental::optional weight_lambda, + std::experimental::optional total_weight, + GraphView graph_view); bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; std::unique_ptr MakeCursor( @@ -950,7 +921,7 @@ pulled.") friend class ExpandVariableCursor; friend class ExpandWeightedShortestPathCursor; cpp<#) - (:serialize :capnp :inherit-compose '(expand-common))) + (:serialize :capnp)) (lcp:define-class construct-named-path (logical-operator) ((input "std::shared_ptr" :scope :public diff --git a/src/query/plan/pretty_print.cpp b/src/query/plan/pretty_print.cpp index 1a0c346f2..ee9e644c6 100644 --- a/src/query/plan/pretty_print.cpp +++ b/src/query/plan/pretty_print.cpp @@ -57,16 +57,26 @@ bool PlanPrinter::PreVisit(query::plan::ScanAllByLabelPropertyRange &op) { bool PlanPrinter::PreVisit(query::plan::Expand &op) { WithPrintLn([&](auto &out) { - out << "* Expand"; - PrintExpand(op); + *out_ << "* Expand (" << op.input_symbol_.name() << ")" + << (op.common_.direction == query::EdgeAtom::Direction::IN ? "<-" + : "-") + << "[" << op.common_.edge_symbol.name() << "]" + << (op.common_.direction == query::EdgeAtom::Direction::OUT ? "->" + : "-") + << "(" << op.common_.node_symbol.name() << ")"; }); return true; } bool PlanPrinter::PreVisit(query::plan::ExpandVariable &op) { WithPrintLn([&](auto &out) { - out << "* ExpandVariable"; - PrintExpand(op); + *out_ << "* ExpandVariable (" << op.input_symbol_.name() << ")" + << (op.common_.direction == query::EdgeAtom::Direction::IN ? "<-" + : "-") + << "[" << op.common_.edge_symbol.name() << "]" + << (op.common_.direction == query::EdgeAtom::Direction::OUT ? "->" + : "-") + << "(" << op.common_.node_symbol.name() << ")"; }); return true; } @@ -173,14 +183,6 @@ void PlanPrinter::Branch(query::plan::LogicalOperator &op, --depth_; } -void PlanPrinter::PrintExpand(const query::plan::ExpandCommon &op) { - *out_ << " (" << op.input_symbol_.name() << ")" - << (op.direction_ == query::EdgeAtom::Direction::IN ? "<-" : "-") << "[" - << op.edge_symbol_.name() << "]" - << (op.direction_ == query::EdgeAtom::Direction::OUT ? "->" : "-") - << "(" << op.node_symbol_.name() << ")"; -} - void PrettyPrint(const database::GraphDbAccessor &dba, const LogicalOperator *plan_root, std::ostream *out) { PlanPrinter printer(&dba, out); diff --git a/src/query/plan/pretty_print.hpp b/src/query/plan/pretty_print.hpp index 1d257b51a..b480db57d 100644 --- a/src/query/plan/pretty_print.hpp +++ b/src/query/plan/pretty_print.hpp @@ -93,8 +93,6 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor { /// and printing the branch name. void Branch(LogicalOperator &op, const std::string &branch_name = ""); - void PrintExpand(const query::plan::ExpandCommon &op); - int64_t depth_{0}; private: diff --git a/src/query/plan/rule_based_planner.hpp b/src/query/plan/rule_based_planner.hpp index 072407d1c..c3c763a17 100644 --- a/src/query/plan/rule_based_planner.hpp +++ b/src/query/plan/rule_based_planner.hpp @@ -404,9 +404,9 @@ class RuleBasedPlanner { // TODO: Pass weight lambda. last_op = std::make_unique( - node_symbol, edge_symbol, edge->type_, expansion.direction, - edge->edge_types_, expansion.is_flipped, edge->lower_bound_, - edge->upper_bound_, std::move(last_op), node1_symbol, + std::move(last_op), node1_symbol, node_symbol, edge_symbol, + edge->type_, expansion.direction, edge->edge_types_, + expansion.is_flipped, edge->lower_bound_, edge->upper_bound_, existing_node, filter_lambda, weight_lambda, total_weight, match_context.graph_view); } else { @@ -427,8 +427,8 @@ class RuleBasedPlanner { } } last_op = std::make_unique( - node_symbol, edge_symbol, expansion.direction, edge->edge_types_, - std::move(last_op), node1_symbol, existing_node, + std::move(last_op), node1_symbol, node_symbol, edge_symbol, + expansion.direction, edge->edge_types_, existing_node, match_context.graph_view); } diff --git a/tests/unit/bfs_distributed.cpp b/tests/unit/bfs_distributed.cpp index f085f3beb..4ca5b140c 100644 --- a/tests/unit/bfs_distributed.cpp +++ b/tests/unit/bfs_distributed.cpp @@ -29,7 +29,7 @@ class DistributedDb : public Database { Expression *lower_bound, Expression *upper_bound, const ExpansionLambda &filter_lambda) override { return std::make_unique( - sink_sym, edge_sym, direction, edge_types, input, source_sym, + input, source_sym, sink_sym, edge_sym, direction, edge_types, existing_node, GraphView::OLD, lower_bound, upper_bound, filter_lambda); } diff --git a/tests/unit/bfs_single_node.cpp b/tests/unit/bfs_single_node.cpp index 577e63e57..a766deb26 100644 --- a/tests/unit/bfs_single_node.cpp +++ b/tests/unit/bfs_single_node.cpp @@ -24,15 +24,13 @@ class SingleNodeDb : public Database { Expression *lower_bound, Expression *upper_bound, const ExpansionLambda &filter_lambda) override { return std::make_unique( - sink_sym, edge_sym, EdgeAtom::Type::BREADTH_FIRST, direction, - edge_types, false, lower_bound, upper_bound, input, source_sym, - existing_node, filter_lambda, std::experimental::nullopt, - std::experimental::nullopt, GraphView::OLD); + input, source_sym, sink_sym, edge_sym, EdgeAtom::Type::BREADTH_FIRST, + direction, edge_types, false, lower_bound, upper_bound, existing_node, + filter_lambda, std::experimental::nullopt, std::experimental::nullopt, + GraphView::OLD); } - std::pair, - std::vector> - BuildGraph( + std::pair, std::vector> BuildGraph( database::GraphDbAccessor *dba, const std::vector &vertex_locations, const std::vector> &edges) override { std::vector vertex_addr; @@ -84,7 +82,7 @@ TEST_P(SingleNodeBfsTest, All) { FilterLambdaType filter_lambda_type; std::tie(lower_bound, upper_bound, direction, edge_types, known_sink, filter_lambda_type) = GetParam(); - BfsTest(db_.get(), lower_bound, upper_bound, direction,edge_types, + BfsTest(db_.get(), lower_bound, upper_bound, direction, edge_types, known_sink, filter_lambda_type); } diff --git a/tests/unit/distributed_query_plan.cpp b/tests/unit/distributed_query_plan.cpp index da514aa34..f2dfbdc89 100644 --- a/tests/unit/distributed_query_plan.cpp +++ b/tests/unit/distributed_query_plan.cpp @@ -51,8 +51,8 @@ ExpandTuple MakeDistributedExpand( auto node_sym = symbol_table.CreateSymbol(node_identifier, true); symbol_table[*node->identifier_] = node_sym; - auto op = std::make_shared(node_sym, edge_sym, direction, - edge_types, input, input_symbol, + auto op = std::make_shared(input, input_symbol, node_sym, + edge_sym, direction, edge_types, existing_node, graph_view); return ExpandTuple{edge, edge_sym, node, node_sym, op}; diff --git a/tests/unit/query_cost_estimator.cpp b/tests/unit/query_cost_estimator.cpp index b69e740a9..43c34db5d 100644 --- a/tests/unit/query_cost_estimator.cpp +++ b/tests/unit/query_cost_estimator.cpp @@ -163,17 +163,17 @@ TEST_F(QueryCostEstimator, ScanAllByLabelPropertyRangeConstExpr) { } TEST_F(QueryCostEstimator, Expand) { - MakeOp(NextSymbol(), NextSymbol(), EdgeAtom::Direction::IN, - std::vector{}, last_op_, NextSymbol(), + MakeOp(last_op_, NextSymbol(), NextSymbol(), NextSymbol(), + EdgeAtom::Direction::IN, std::vector{}, false, GraphView::OLD); EXPECT_COST(CardParam::kExpand * CostParam::kExpand); } TEST_F(QueryCostEstimator, ExpandVariable) { MakeOp( - NextSymbol(), NextSymbol(), EdgeAtom::Type::DEPTH_FIRST, - EdgeAtom::Direction::IN, std::vector{}, false, nullptr, - nullptr, last_op_, NextSymbol(), false, + last_op_, NextSymbol(), NextSymbol(), NextSymbol(), + EdgeAtom::Type::DEPTH_FIRST, EdgeAtom::Direction::IN, + std::vector{}, false, nullptr, nullptr, false, ExpansionLambda{NextSymbol(), NextSymbol(), nullptr}, std::experimental::nullopt, std::experimental::nullopt, GraphView::OLD); EXPECT_COST(CardParam::kExpandVariable * CostParam::kExpandVariable); diff --git a/tests/unit/query_plan_common.hpp b/tests/unit/query_plan_common.hpp index c00fae60a..33df9dbd1 100644 --- a/tests/unit/query_plan_common.hpp +++ b/tests/unit/query_plan_common.hpp @@ -168,9 +168,9 @@ ExpandTuple MakeExpand(AstStorage &storage, SymbolTable &symbol_table, auto node_sym = symbol_table.CreateSymbol(node_identifier, true); symbol_table[*node->identifier_] = node_sym; - auto op = - std::make_shared(node_sym, edge_sym, direction, edge_types, input, - input_symbol, existing_node, graph_view); + auto op = std::make_shared(input, input_symbol, node_sym, edge_sym, + direction, edge_types, existing_node, + graph_view); return ExpandTuple{edge, edge_sym, node, node_sym, op}; } diff --git a/tests/unit/query_plan_match_filter_return.cpp b/tests/unit/query_plan_match_filter_return.cpp index d4c0e0fd8..3dec32810 100644 --- a/tests/unit/query_plan_match_filter_return.cpp +++ b/tests/unit/query_plan_match_filter_return.cpp @@ -6,12 +6,12 @@ #include +#include +#include #include #include #include #include -#include -#include #include "communication/result_stream_faker.hpp" #include "database/single_node/graph_db.hpp" @@ -561,16 +561,16 @@ class QueryPlanExpandVariable : public testing::Test { }; return std::make_shared( - n_to_sym, edge_sym, EdgeAtom::Type::DEPTH_FIRST, direction, - edge_types, is_reverse, convert(lower), convert(upper), filter_op, - n_from.sym_, false, + filter_op, n_from.sym_, n_to_sym, edge_sym, + EdgeAtom::Type::DEPTH_FIRST, direction, edge_types, is_reverse, + convert(lower), convert(upper), false, ExpansionLambda{symbol_table.CreateSymbol("inner_edge", false), symbol_table.CreateSymbol("inner_node", false), nullptr}, std::experimental::nullopt, std::experimental::nullopt, graph_view); } else - return std::make_shared(n_to_sym, edge_sym, direction, edge_types, - filter_op, n_from.sym_, false, + return std::make_shared(filter_op, n_from.sym_, n_to_sym, + edge_sym, direction, edge_types, false, graph_view); } @@ -916,9 +916,10 @@ class QueryPlanExpandWeightedShortestPath : public testing::Test { : symbol_table.CreateSymbol("node", true); auto edge_list_sym = symbol_table.CreateSymbol("edgelist_", true); auto filter_lambda = last_op = std::make_shared( - node_sym, edge_list_sym, EdgeAtom::Type::WEIGHTED_SHORTEST_PATH, - direction, std::vector{}, false, nullptr, - max_depth ? LITERAL(max_depth.value()) : nullptr, last_op, n.sym_, + last_op, n.sym_, node_sym, edge_list_sym, + EdgeAtom::Type::WEIGHTED_SHORTEST_PATH, direction, + std::vector{}, false, nullptr, + max_depth ? LITERAL(max_depth.value()) : nullptr, existing_node_input != nullptr, ExpansionLambda{filter_edge, filter_node, where}, ExpansionLambda{weight_edge, weight_node, @@ -1339,8 +1340,8 @@ TEST(QueryPlan, OptionalMatchThenExpandToMissingNode) { auto node = NODE("n"); symbol_table[*node->identifier_] = with_n_sym; auto expand = std::make_shared( - with_n_sym, edge_sym, edge_direction, std::vector{}, - m.op_, m.sym_, true, GraphView::OLD); + m.op_, m.sym_, with_n_sym, edge_sym, edge_direction, + std::vector{}, true, GraphView::OLD); // RETURN m auto m_ne = NEXPR("m", IDENT("m")); symbol_table[*m_ne->expression_] = m.sym_; @@ -1372,10 +1373,9 @@ TEST(QueryPlan, ExpandExistingNode) { EdgeAtom::Direction::OUT, {}, "n", with_existing, GraphView::OLD); if (with_existing) - r_n.op_ = - std::make_shared(n.sym_, r_n.edge_sym_, r_n.edge_->direction_, - std::vector{}, n.op_, - n.sym_, with_existing, GraphView::OLD); + r_n.op_ = std::make_shared( + n.op_, n.sym_, n.sym_, r_n.edge_sym_, r_n.edge_->direction_, + std::vector{}, with_existing, GraphView::OLD); // make a named expression and a produce auto output = NEXPR("n", IDENT("n"));