diff --git a/src/query/interpret/eval.hpp b/src/query/interpret/eval.hpp index 075050e59..540eaeb9a 100644 --- a/src/query/interpret/eval.hpp +++ b/src/query/interpret/eval.hpp @@ -1,3 +1,4 @@ +/// @file #pragma once #include @@ -554,4 +555,19 @@ class ExpressionEvaluator : public TreeVisitor { const GraphView graph_view_; }; +/// A helper function for evaluating an expression that's an int. +/// +/// @param what - Name of what's getting evaluated. Used for user feedback (via +/// exception) when the evaluated value is not an int. +/// @throw QueryRuntimeException if expression doesn't evaluate to an int. +inline int64_t EvaluateInt(ExpressionEvaluator *evaluator, Expression *expr, + const std::string &what) { + TypedValue value = expr->Accept(*evaluator); + try { + return value.Value(); + } catch (TypedValueException &e) { + throw QueryRuntimeException(what + " must be an int"); + } +} + } // namespace query diff --git a/src/query/plan/distributed.cpp b/src/query/plan/distributed.cpp index 4ff5e8bde..90fce1f40 100644 --- a/src/query/plan/distributed.cpp +++ b/src/query/plan/distributed.cpp @@ -338,6 +338,49 @@ class IndependentSubtreeFinder : public DistributedOperatorVisitor { return true; } + bool PreVisit(DistributedExpandBfs &exp) override { + prev_ops_.push_back(&exp); + return true; + } + bool PostVisit(DistributedExpandBfs &exp) override { + prev_ops_.pop_back(); + if (branch_.subtree) return true; + if (auto found = FindForbidden(exp.input_symbol())) { + SetBranch(exp.input(), &exp, *found); + } + if (exp.existing_node()) { + if (auto found = FindForbidden(exp.node_symbol())) { + SetBranch(exp.input(), &exp, *found); + } + } + CHECK(!FindForbidden(exp.edge_symbol())) + << "Expand uses an already used edge symbol."; + // Check for bounding expressions. + if (exp.lower_bound()) { + UsedSymbolsCollector collector(*symbol_table_); + exp.lower_bound()->Accept(collector); + if (auto found = ContainsForbidden(collector.symbols_)) { + SetBranch(exp.input(), &exp, *found); + } + } + if (exp.upper_bound()) { + UsedSymbolsCollector collector(*symbol_table_); + exp.upper_bound()->Accept(collector); + if (auto found = ContainsForbidden(collector.symbols_)) { + SetBranch(exp.input(), &exp, *found); + } + } + // Check for lambda expressions + if (exp.filter_lambda().expression) { + UsedSymbolsCollector collector(*symbol_table_); + exp.filter_lambda().expression->Accept(collector); + if (auto found = ContainsForbidden(collector.symbols_)) { + SetBranch(exp.input(), &exp, *found); + } + } + return true; + } + bool PreVisit(ExpandUniquenessFilter &op) override { prev_ops_.push_back(&op); return true; @@ -935,6 +978,18 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor { prev_ops_.push_back(&exp); return true; } + bool PostVisit(ExpandVariable &exp) override { + prev_ops_.pop_back(); + if (exp.type() == EdgeAtom::Type::BREADTH_FIRST) { + auto distributed_bfs = std::make_unique( + exp.node_symbol(), exp.edge_symbol(), exp.direction(), + exp.edge_types(), exp.input(), exp.input_symbol(), + exp.existing_node(), exp.graph_view(), exp.lower_bound(), + exp.upper_bound(), exp.filter_lambda()); + SetOnPrevious(std::move(distributed_bfs)); + } + return true; + } // The following operators filter the frame or put something on it. They // should be worker local. diff --git a/src/query/plan/distributed_ops.cpp b/src/query/plan/distributed_ops.cpp index e6ef3331f..0753212de 100644 --- a/src/query/plan/distributed_ops.cpp +++ b/src/query/plan/distributed_ops.cpp @@ -104,13 +104,37 @@ std::vector PullRemoteOrderBy::ModifiedSymbols( return input_->ModifiedSymbols(table); } +DistributedExpandBfs::DistributedExpandBfs( + Symbol node_symbol, Symbol edge_symbol, EdgeAtom::Direction direction, + const std::vector &edge_types, + const std::shared_ptr &input, Symbol input_symbol, + bool existing_node, GraphView graph_view, Expression *lower_bound, + Expression *upper_bound, const ExpandVariable::Lambda &filter_lambda) + : ExpandCommon(node_symbol, edge_symbol, direction, edge_types, input, + input_symbol, existing_node, graph_view), + lower_bound_(lower_bound), + upper_bound_(upper_bound), + filter_lambda_(filter_lambda) {} + +ACCEPT_WITH_INPUT(DistributedExpandBfs); + +std::vector DistributedExpandBfs::ModifiedSymbols( + const SymbolTable &table) const { + auto symbols = input_->ModifiedSymbols(table); + symbols.emplace_back(node_symbol()); + symbols.emplace_back(edge_symbol()); + return symbols; +} + +////////////////////////////////////////////////////////////////////// +// Cursors +////////////////////////////////////////////////////////////////////// + namespace { -/** Helper class that wraps remote pulling for cursors that handle results from - * distributed workers. - * - * The command_id should be the command_id at the initialization of a cursor. - */ +// Helper class that wraps remote pulling for cursors that handle results from +// distributed workers. The command_id should be the command_id at the +// initialization of a cursor. class RemotePuller { public: RemotePuller(distributed::PullRpcClients *pull_clients, @@ -686,6 +710,183 @@ class PullRemoteOrderByCursor : public Cursor { bool merge_initialized_ = false; }; +class DistributedExpandBfsCursor : public query::plan::Cursor { + public: + DistributedExpandBfsCursor(const DistributedExpandBfs &self, + database::GraphDbAccessor &db) + : self_(self), db_(db), input_cursor_(self_.input()->MakeCursor(db)) { + // TODO: Pass in a DistributedGraphDb. + if (auto *distributed_db = + dynamic_cast(&db.db())) { + bfs_subcursor_clients_ = &distributed_db->bfs_subcursor_clients(); + } + CHECK(bfs_subcursor_clients_); + subcursor_ids_ = bfs_subcursor_clients_->CreateBfsSubcursors( + db_.transaction_id(), self_.direction(), self_.edge_types(), + self_.graph_view()); + bfs_subcursor_clients_->RegisterSubcursors(subcursor_ids_); + VLOG(10) << "BFS subcursors initialized"; + pull_pos_ = subcursor_ids_.end(); + } + + ~DistributedExpandBfsCursor() { + VLOG(10) << "Removing BFS subcursors"; + bfs_subcursor_clients_->RemoveBfsSubcursors(subcursor_ids_); + } + + bool Pull(Frame &frame, Context &context) override { + // TODO(mtomic): lambda filtering in distributed + if (self_.filter_lambda().expression) { + throw utils::NotYetImplemented("lambda filtering in distributed BFS"); + } + + // Evaluator for the filtering condition and expansion depth. + ExpressionEvaluator evaluator(frame, &context, self_.graph_view()); + + while (true) { + TypedValue last_vertex; + + if (!skip_rest_) { + if (current_depth_ >= lower_bound_) { + for (; pull_pos_ != subcursor_ids_.end(); ++pull_pos_) { + auto vertex = bfs_subcursor_clients_->Pull(pull_pos_->first, + pull_pos_->second, &db_); + if (vertex) { + last_vertex = *vertex; + SwitchAccessor(last_vertex.ValueVertex(), self_.graph_view()); + break; + } + VLOG(10) << "Nothing to pull from " << pull_pos_->first; + } + } + + if (last_vertex.IsVertex()) { + // Handle existence flag + if (self_.existing_node()) { + TypedValue &node = frame[self_.node_symbol()]; + // Due to optional matching the existing node could be null + if (node.IsNull() || (node != last_vertex).ValueBool()) continue; + // There is no point in traversing the rest of the graph because BFS + // can find only one path to a certain node. + skip_rest_ = true; + } else { + frame[self_.node_symbol()] = last_vertex; + } + + VLOG(10) << "Expanded to vertex: " << last_vertex; + + // Reconstruct path + std::vector edges; + + // During path reconstruction, edges crossing worker boundary are + // obtained from edge owner to reduce network traffic. If the last + // worker queried for its path segment owned the crossing edge, + // `current_vertex_addr` will be set. Otherwise, `current_edge_addr` + // will be set. + std::experimental::optional + current_vertex_addr = last_vertex.ValueVertex().GlobalAddress(); + std::experimental::optional current_edge_addr; + + while (true) { + DCHECK(static_cast(current_edge_addr) ^ + static_cast(current_vertex_addr)) + << "Exactly one of `current_edge_addr` or " + "`current_vertex_addr` " + "should be set during path reconstruction"; + auto ret = current_edge_addr + ? bfs_subcursor_clients_->ReconstructPath( + subcursor_ids_, *current_edge_addr, &db_) + : bfs_subcursor_clients_->ReconstructPath( + subcursor_ids_, *current_vertex_addr, &db_); + edges.insert(edges.end(), ret.edges.begin(), ret.edges.end()); + current_vertex_addr = ret.next_vertex; + current_edge_addr = ret.next_edge; + if (!current_vertex_addr && !current_edge_addr) break; + } + std::reverse(edges.begin(), edges.end()); + for (auto &edge : edges) + SwitchAccessor(edge.ValueEdge(), self_.graph_view()); + frame[self_.edge_symbol()] = std::move(edges); + return true; + } + + // We're done pulling for this level + pull_pos_ = subcursor_ids_.begin(); + + // Try to expand again + if (current_depth_ < upper_bound_) { + VLOG(10) << "Trying to expand again..."; + current_depth_++; + bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, false); + if (bfs_subcursor_clients_->ExpandLevel(subcursor_ids_)) { + continue; + } + } + } + + VLOG(10) << "Trying to get a new source..."; + // We're done with this source, try getting a new one + if (!input_cursor_->Pull(frame, context)) return false; + + auto vertex_value = frame[self_.input_symbol()]; + + // It is possible that the vertex is Null due to optional matching. + if (vertex_value.IsNull()) continue; + + auto vertex = vertex_value.ValueVertex(); + lower_bound_ = self_.lower_bound() + ? EvaluateInt(&evaluator, self_.lower_bound(), + "Min depth in breadth-first expansion") + : 1; + upper_bound_ = self_.upper_bound() + ? EvaluateInt(&evaluator, self_.upper_bound(), + "Max depth in breadth-first expansion") + : std::numeric_limits::max(); + skip_rest_ = false; + + if (upper_bound_ < 1) { + throw QueryRuntimeException( + "Max depth in breadth-first expansion must be at least 1"); + } + + VLOG(10) << "Starting BFS from " << vertex << " with limits " + << lower_bound_ << ".." << upper_bound_; + bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, true); + bfs_subcursor_clients_->SetSource(subcursor_ids_, vertex.GlobalAddress()); + current_depth_ = 1; + } + } + + void Reset() override { + bfs_subcursor_clients_->ResetSubcursors(subcursor_ids_); + pull_pos_ = subcursor_ids_.end(); + } + + private: + const DistributedExpandBfs &self_; + database::GraphDbAccessor &db_; + distributed::BfsRpcClients *bfs_subcursor_clients_{nullptr}; + const std::unique_ptr input_cursor_; + + // Depth bounds. Calculated on each pull from the input, the initial value + // is irrelevant. + int lower_bound_{-1}; + int upper_bound_{-1}; + + // When set to true, expansion is restarted from a new source. + bool skip_rest_{false}; + + // Current depth. Reset for each new expansion, the initial value is + // irrelevant. + int current_depth_{-1}; + + // Map from worker IDs to their corresponding subcursors. + std::unordered_map subcursor_ids_; + + // Next worker master should try pulling from. + std::unordered_map::iterator pull_pos_; +}; + } // namespace std::unique_ptr PullRemote::MakeCursor( @@ -703,4 +904,9 @@ std::unique_ptr PullRemoteOrderBy::MakeCursor( return std::make_unique(*this, db); } +std::unique_ptr DistributedExpandBfs::MakeCursor( + database::GraphDbAccessor &db) const { + return std::make_unique(*this, db); +} + } // namespace query::plan diff --git a/src/query/plan/distributed_ops.lcp b/src/query/plan/distributed_ops.lcp index b00a4494d..3cd02bcbc 100644 --- a/src/query/plan/distributed_ops.lcp +++ b/src/query/plan/distributed_ops.lcp @@ -15,9 +15,11 @@ cpp<# class PullRemote; class Synchronize; class PullRemoteOrderBy; +class DistributedExpandBfs; using DistributedOperatorCompositeVisitor = - ::utils::CompositeVisitor; + ::utils::CompositeVisitor; /// Base class for visiting regular and distributed LogicalOperator instances. class DistributedOperatorVisitor : public HierarchicalLogicalOperatorVisitor, @@ -169,5 +171,45 @@ by having only one result from each worker.") (:private #>cpp PullRemoteOrderBy() {} cpp<#) (:serialize :capnp)) +(lcp:define-class distributed-expand-bfs (logical-operator expand-common) + ((lower-bound "Expression *" :reader t + :documentation "Optional lower bound, default is 1" + :capnp-type "Ast.Tree" :capnp-init nil + :capnp-save #'save-ast-pointer + :capnp-load (load-ast-pointer "Expression *")) + (upper-bound "Expression *" :reader t + :documentation "Optional upper bound, default is infinity" + :capnp-type "Ast.Tree" :capnp-init nil + :capnp-save #'save-ast-pointer + :capnp-load (load-ast-pointer "Expression *")) + (filter-lambda "ExpandVariable::Lambda" :reader t + :documentation "Filter that must be satisfied for expansion to succeed." + :capnp-type "ExpandVariable.Lambda")) + (:documentation "BFS expansion operator suited for distributed execution.") + (:public + #>cpp + DistributedExpandBfs(Symbol node_symbol, Symbol edge_symbol, + EdgeAtom::Direction direction, + const std::vector &edge_types, + const std::shared_ptr &input, + Symbol input_symbol, bool existing_node, + GraphView graph_view, Expression *lower_bound, + Expression *upper_bound, + const ExpandVariable::Lambda &filter_lambda); + + bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override; + std::unique_ptr MakeCursor( + database::GraphDbAccessor &db) const override; + std::vector ModifiedSymbols(const SymbolTable &) const override; + + bool HasSingleInput() const override { return true; } + std::shared_ptr input() const override { return input_; } + void set_input(std::shared_ptr input) override { + input_ = input; + } + cpp<#) + (:private #>cpp DistributedExpandBfs() {} cpp<#) + (:serialize :capnp :inherit-compose '(expand-common))) + (lcp:pop-namespace) (lcp:pop-namespace) diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 69e7d77c0..c7be4eb8d 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -17,7 +17,6 @@ #include "communication/result_stream_faker.hpp" #include "database/distributed_graph_db.hpp" #include "database/graph_db_accessor.hpp" -#include "distributed/bfs_rpc_clients.hpp" #include "glue/auth.hpp" #include "glue/communication.hpp" #include "integrations/kafka/exceptions.hpp" @@ -800,22 +799,6 @@ auto ExpandFromVertex(const VertexAccessor &vertex, return iter::chain.from_iterable(std::move(chain_elements)); } -/** A helper function for evaluating an expression that's an int. - * - * @param evaluator - * @param expr - * @param what - Name of what's getting evaluated. Used for user - * feedback (via exception) when the evaluated value is not an int. - */ -int64_t EvaluateInt(ExpressionEvaluator &evaluator, Expression *expr, - const std::string &what) { - TypedValue value = expr->Accept(evaluator); - try { - return value.Value(); - } catch (TypedValueException &e) { - throw QueryRuntimeException(what + " must be an int"); - } -} } // namespace class ExpandVariableCursor : public Cursor { @@ -899,7 +882,7 @@ class ExpandVariableCursor : public Cursor { // Evaluate the upper and lower bounds. ExpressionEvaluator evaluator(frame, &context, self_.graph_view_); auto calc_bound = [&evaluator](auto &bound) { - auto value = EvaluateInt(evaluator, bound, "Variable expansion bound"); + auto value = EvaluateInt(&evaluator, bound, "Variable expansion bound"); if (value < 0) throw QueryRuntimeException( "Variable expansion bound must be positive or zero"); @@ -1110,11 +1093,11 @@ class ExpandBfsCursor : public query::plan::Cursor { processed_.emplace(vertex, std::experimental::nullopt); expand_from_vertex(vertex); lower_bound_ = self_.lower_bound_ - ? EvaluateInt(evaluator, self_.lower_bound_, + ? EvaluateInt(&evaluator, self_.lower_bound_, "Min depth in breadth-first expansion") : 1; upper_bound_ = self_.upper_bound_ - ? EvaluateInt(evaluator, self_.upper_bound_, + ? EvaluateInt(&evaluator, self_.upper_bound_, "Max depth in breadth-first expansion") : std::numeric_limits::max(); skip_rest_ = false; @@ -1199,183 +1182,6 @@ class ExpandBfsCursor : public query::plan::Cursor { std::deque> to_visit_next_; }; -class DistributedExpandBfsCursor : public query::plan::Cursor { - public: - DistributedExpandBfsCursor(const ExpandVariable &self, - database::GraphDbAccessor &db) - : self_(self), db_(db), input_cursor_(self_.input_->MakeCursor(db)) { - // TODO: Pass in a DistributedGraphDb. - if (auto *distributed_db = - dynamic_cast(&db.db())) { - bfs_subcursor_clients_ = &distributed_db->bfs_subcursor_clients(); - } - CHECK(bfs_subcursor_clients_); - subcursor_ids_ = bfs_subcursor_clients_->CreateBfsSubcursors( - db_.transaction_id(), self_.direction(), self_.edge_types(), - self_.graph_view()); - bfs_subcursor_clients_->RegisterSubcursors(subcursor_ids_); - VLOG(10) << "BFS subcursors initialized"; - pull_pos_ = subcursor_ids_.end(); - } - - ~DistributedExpandBfsCursor() { - VLOG(10) << "Removing BFS subcursors"; - bfs_subcursor_clients_->RemoveBfsSubcursors(subcursor_ids_); - } - - bool Pull(Frame &frame, Context &context) override { - // TODO(mtomic): lambda filtering in distributed - if (self_.filter_lambda_.expression) { - throw utils::NotYetImplemented("lambda filtering in distributed BFS"); - } - - // Evaluator for the filtering condition and expansion depth. - ExpressionEvaluator evaluator(frame, &context, self_.graph_view_); - - while (true) { - TypedValue last_vertex; - - if (!skip_rest_) { - if (current_depth_ >= lower_bound_) { - for (; pull_pos_ != subcursor_ids_.end(); ++pull_pos_) { - auto vertex = bfs_subcursor_clients_->Pull(pull_pos_->first, - pull_pos_->second, &db_); - if (vertex) { - last_vertex = *vertex; - SwitchAccessor(last_vertex.ValueVertex(), self_.graph_view_); - break; - } - VLOG(10) << "Nothing to pull from " << pull_pos_->first; - } - } - - if (last_vertex.IsVertex()) { - // Handle existence flag - if (self_.existing_node_) { - TypedValue &node = frame[self_.node_symbol_]; - // Due to optional matching the existing node could be null - if (node.IsNull() || (node != last_vertex).ValueBool()) continue; - // There is no point in traversing the rest of the graph because BFS - // can find only one path to a certain node. - skip_rest_ = true; - } else { - frame[self_.node_symbol_] = last_vertex; - } - - VLOG(10) << "Expanded to vertex: " << last_vertex; - - // Reconstruct path - std::vector edges; - - // During path reconstruction, edges crossing worker boundary are - // obtained from edge owner to reduce network traffic. If the last - // worker queried for its path segment owned the crossing edge, - // `current_vertex_addr` will be set. Otherwise, `current_edge_addr` - // will be set. - std::experimental::optional - current_vertex_addr = last_vertex.ValueVertex().GlobalAddress(); - std::experimental::optional current_edge_addr; - - while (true) { - DCHECK(static_cast(current_edge_addr) ^ - static_cast(current_vertex_addr)) - << "Exactly one of `current_edge_addr` or " - "`current_vertex_addr` " - "should be set during path reconstruction"; - auto ret = current_edge_addr - ? bfs_subcursor_clients_->ReconstructPath( - subcursor_ids_, *current_edge_addr, &db_) - : bfs_subcursor_clients_->ReconstructPath( - subcursor_ids_, *current_vertex_addr, &db_); - edges.insert(edges.end(), ret.edges.begin(), ret.edges.end()); - current_vertex_addr = ret.next_vertex; - current_edge_addr = ret.next_edge; - if (!current_vertex_addr && !current_edge_addr) break; - } - std::reverse(edges.begin(), edges.end()); - for (auto &edge : edges) - SwitchAccessor(edge.ValueEdge(), self_.graph_view_); - frame[self_.edge_symbol_] = std::move(edges); - return true; - } - - // We're done pulling for this level - pull_pos_ = subcursor_ids_.begin(); - - // Try to expand again - if (current_depth_ < upper_bound_) { - VLOG(10) << "Trying to expand again..."; - current_depth_++; - bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, false); - if (bfs_subcursor_clients_->ExpandLevel(subcursor_ids_)) { - continue; - } - } - } - - VLOG(10) << "Trying to get a new source..."; - // We're done with this source, try getting a new one - if (!input_cursor_->Pull(frame, context)) return false; - - auto vertex_value = frame[self_.input_symbol_]; - - // It is possible that the vertex is Null due to optional matching. - if (vertex_value.IsNull()) continue; - - auto vertex = vertex_value.ValueVertex(); - lower_bound_ = self_.lower_bound_ - ? EvaluateInt(evaluator, self_.lower_bound_, - "Min depth in breadth-first expansion") - : 1; - upper_bound_ = self_.upper_bound_ - ? EvaluateInt(evaluator, self_.upper_bound_, - "Max depth in breadth-first expansion") - : std::numeric_limits::max(); - skip_rest_ = false; - - if (upper_bound_ < 1) { - throw QueryRuntimeException( - "Max depth in breadth-first expansion must be at least 1"); - } - - VLOG(10) << "Starting BFS from " << vertex << " with limits " - << lower_bound_ << ".." << upper_bound_; - bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, true); - bfs_subcursor_clients_->SetSource(subcursor_ids_, vertex.GlobalAddress()); - current_depth_ = 1; - } - } - - void Reset() override { - bfs_subcursor_clients_->ResetSubcursors(subcursor_ids_); - pull_pos_ = subcursor_ids_.end(); - } - - private: - const ExpandVariable &self_; - database::GraphDbAccessor &db_; - distributed::BfsRpcClients *bfs_subcursor_clients_{nullptr}; - const std::unique_ptr input_cursor_; - - // Depth bounds. Calculated on each pull from the input, the initial value - // is irrelevant. - int lower_bound_{-1}; - int upper_bound_{-1}; - - // When set to true, expansion is restarted from a new source. - bool skip_rest_{false}; - - // Current depth. Reset for each new expansion, the initial value is - // irrelevant. - int current_depth_{-1}; - - // Map from worker IDs to their corresponding subcursors. - std::unordered_map subcursor_ids_; - - // Next worker master should try pulling from. - std::unordered_map::iterator pull_pos_; -}; - class ExpandWeightedShortestPathCursor : public query::plan::Cursor { public: ExpandWeightedShortestPathCursor(const ExpandVariable &self, @@ -1459,7 +1265,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { SwitchAccessor(vertex, self_.graph_view_); if (self_.upper_bound_) { upper_bound_ = - EvaluateInt(evaluator, self_.upper_bound_, + EvaluateInt(&evaluator, self_.upper_bound_, "Max depth in weighted shortest path expansion"); upper_bound_set_ = true; } else { @@ -1613,11 +1419,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { std::unique_ptr ExpandVariable::MakeCursor( database::GraphDbAccessor &db) const { if (type_ == EdgeAtom::Type::BREADTH_FIRST) { - if (db.db().type() == database::GraphDb::Type::SINGLE_NODE) { - return std::make_unique(*this, db); - } else { - return std::make_unique(*this, db); - } + return std::make_unique(*this, db); } else if (type_ == EdgeAtom::Type::WEIGHTED_SHORTEST_PATH) { return std::make_unique(*this, db); } else { diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp index de6c90aa4..ce9f7da11 100644 --- a/src/query/plan/operator.lcp +++ b/src/query/plan/operator.lcp @@ -837,7 +837,7 @@ pulled.") '(single depth-first breadth-first weighted-shortest-path)) :capnp-load (lcp:capnp-load-enum "::query::capnp::EdgeAtom::Type" "EdgeAtom::Type" '(single depth-first breadth-first weighted-shortest-path))) - (is-reverse :bool :documentation + (is-reverse :bool :reader t :documentation "True if the path should be written as expanding from node_symbol to input_symbol.") (lower-bound "Expression *" :reader t :capnp-type "Ast.Tree" :capnp-init nil @@ -855,7 +855,7 @@ pulled.") :capnp-load (lcp:capnp-load-optional "capnp::ExpandVariable::Lambda" "Lambda" "[helper](const auto &reader) { Lambda val; val.Load(reader, helper); return val; }")) - (total-weight "std::experimental::optional" + (total-weight "std::experimental::optional" :reader t :capnp-save (lcp:capnp-save-optional "::query::capnp::Symbol" "Symbol") :capnp-load (lcp:capnp-load-optional "::query::capnp::Symbol" "Symbol"))) (:documentation @@ -936,14 +936,13 @@ pulled.") // the Cursors are not declared in the header because // it's edges_ and edges_it_ are decltyped using a helper function // that should be inaccessible (private class function won't compile) - friend class DistributedExpandBfsCursor; friend class ExpandVariableCursor; friend class ExpandBfsCursor; friend class ExpandWeightedShortestPathCursor; ExpandVariable() {} cpp<#) - (:serialize :capnp :inherit-compose '(expand-common))) + (:serialize :capnp :inherit-compose '(expand-common))) (lcp:define-class construct-named-path (logical-operator) ((input "std::shared_ptr" diff --git a/src/query/plan/pretty_print.cpp b/src/query/plan/pretty_print.cpp index d3c5dfc4f..c80bcc376 100644 --- a/src/query/plan/pretty_print.cpp +++ b/src/query/plan/pretty_print.cpp @@ -80,6 +80,14 @@ class PlanPrinter final : public DistributedOperatorVisitor { return true; } + bool PreVisit(query::plan::DistributedExpandBfs &op) override { + WithPrintLn([&](auto &out) { + out << "* DistributedExpandBfs"; + PrintExpand(out, op); + }); + return true; + } + bool PreVisit(query::plan::Produce &op) override { WithPrintLn([&](auto &out) { out << "* Produce {"; diff --git a/tests/unit/query_plan_match_filter_return.cpp b/tests/unit/query_plan_match_filter_return.cpp index d925d303a..def0cdb3a 100644 --- a/tests/unit/query_plan_match_filter_return.cpp +++ b/tests/unit/query_plan_match_filter_return.cpp @@ -17,6 +17,7 @@ #include "distributed/updates_rpc_server.hpp" #include "query/context.hpp" #include "query/exceptions.hpp" +#include "query/plan/distributed_ops.hpp" #include "query/plan/operator.hpp" #include "distributed_common.hpp" @@ -924,13 +925,21 @@ class QueryPlanExpandBfs auto node_sym = symbol_table.CreateSymbol("node", true); auto edge_list_sym = symbol_table.CreateSymbol("edgelist_", true); - last_op = std::make_shared( - node_sym, edge_list_sym, EdgeAtom::Type::BREADTH_FIRST, direction, - std::vector{}, false, LITERAL(min_depth), - LITERAL(max_depth), last_op, source_sym, - static_cast(existing_node), - ExpandVariable::Lambda{inner_edge, inner_node, where}, - std::experimental::nullopt, std::experimental::nullopt, graph_view); + if (GetParam().first == TestType::DISTRIBUTED) { + last_op = std::make_shared( + node_sym, edge_list_sym, direction, std::vector{}, + last_op, source_sym, !!existing_node, graph_view, LITERAL(min_depth), + LITERAL(max_depth), + ExpandVariable::Lambda{inner_edge, inner_node, where}); + } else { + last_op = std::make_shared( + node_sym, edge_list_sym, EdgeAtom::Type::BREADTH_FIRST, direction, + std::vector{}, false, LITERAL(min_depth), + LITERAL(max_depth), last_op, source_sym, + static_cast(existing_node), + ExpandVariable::Lambda{inner_edge, inner_node, where}, + std::experimental::nullopt, std::experimental::nullopt, graph_view); + } Frame frame(symbol_table.max_position()); @@ -2394,8 +2403,7 @@ TEST(QueryPlan, ScanAllEqualsScanAllByLabelProperty) { }; // Make sure there are `vertex_count` results when using scan all - auto count_with_scan_all = [&db, &label, &prop](int prop_value, - int prop_count) { + auto count_with_scan_all = [&db, &prop](int prop_value, int prop_count) { AstStorage storage; SymbolTable symbol_table; auto dba_ptr = db.Access(); diff --git a/tests/unit/query_planner.cpp b/tests/unit/query_planner.cpp index 24290e868..c582a8252 100644 --- a/tests/unit/query_planner.cpp +++ b/tests/unit/query_planner.cpp @@ -131,6 +131,7 @@ class PlanChecker : public DistributedOperatorVisitor { } PRE_VISIT(PullRemoteOrderBy); + PRE_VISIT(DistributedExpandBfs); VISIT(AuthHandler); @@ -192,6 +193,7 @@ using ExpectOrderBy = OpChecker; using ExpectUnwind = OpChecker; using ExpectDistinct = OpChecker; using ExpectShowStreams = OpChecker; +using ExpectDistributedExpandBfs = OpChecker; class ExpectExpandVariable : public OpChecker { public: @@ -2241,9 +2243,24 @@ TYPED_TEST(TestPlanner, MatchBfs) { bfs->filter_lambda_.inner_node = IDENT("n"); bfs->filter_lambda_.expression = IDENT("n"); bfs->upper_bound_ = LITERAL(10); - QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), bfs, NODE("m"))), RETURN("r"))); - CheckPlan(storage, ExpectScanAll(), ExpectExpandBfs(), - ExpectProduce()); + auto *as_r = NEXPR("r", IDENT("r")); + QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), bfs, NODE("m"))), RETURN(as_r))); + auto symbol_table = MakeSymbolTable(*storage.query()); + { + FakeDbAccessor dba; + auto planner = MakePlanner(dba, storage, symbol_table); + CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpandBfs(), + ExpectProduce()); + } + { + ExpectPullRemote pull({symbol_table.at(*as_r)}); + auto expected = ExpectDistributed( + MakeCheckers(ExpectScanAll(), ExpectDistributedExpandBfs(), + ExpectProduce(), pull), + MakeCheckers(ExpectScanAll(), ExpectDistributedExpandBfs(), + ExpectProduce())); + CheckDistributedPlan(storage, expected); + } } TYPED_TEST(TestPlanner, MatchDoubleScanToExpandExisting) {