From 24e2b31367d437e511d8c4dca301b4deec85c5ea Mon Sep 17 00:00:00 2001
From: Teon Banek <teon.banek@memgraph.io>
Date: Wed, 29 Aug 2018 10:28:05 +0200
Subject: [PATCH] Extract distributed BFS as a new operator

Reviewers: mtomic, msantl

Reviewed By: mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1570
---
 src/query/interpret/eval.hpp                  |  16 ++
 src/query/plan/distributed.cpp                |  55 +++++
 src/query/plan/distributed_ops.cpp            | 216 +++++++++++++++++-
 src/query/plan/distributed_ops.lcp            |  44 +++-
 src/query/plan/operator.cpp                   | 208 +----------------
 src/query/plan/operator.lcp                   |   7 +-
 src/query/plan/pretty_print.cpp               |   8 +
 tests/unit/query_plan_match_filter_return.cpp |  26 ++-
 tests/unit/query_planner.cpp                  |  23 +-
 9 files changed, 378 insertions(+), 225 deletions(-)

diff --git a/src/query/interpret/eval.hpp b/src/query/interpret/eval.hpp
index 075050e59..540eaeb9a 100644
--- a/src/query/interpret/eval.hpp
+++ b/src/query/interpret/eval.hpp
@@ -1,3 +1,4 @@
+/// @file
 #pragma once
 
 #include <algorithm>
@@ -554,4 +555,19 @@ class ExpressionEvaluator : public TreeVisitor<TypedValue> {
   const GraphView graph_view_;
 };
 
+/// Evaluates `expr` with `evaluator` and returns the result as an int64_t.
+///
+/// @param what - Name of what's getting evaluated. Used for user feedback (via
+///               exception) when the evaluated value is not an int.
+/// @throw QueryRuntimeException if expression doesn't evaluate to an int.
+inline int64_t EvaluateInt(ExpressionEvaluator *evaluator, Expression *expr,
+                           const std::string &what) {
+  TypedValue value = expr->Accept(*evaluator);
+  try {
+    return value.Value<int64_t>();
+  } catch (const TypedValueException &) {
+    throw QueryRuntimeException(what + " must be an int");
+  }
+}
+
 }  // namespace query
diff --git a/src/query/plan/distributed.cpp b/src/query/plan/distributed.cpp
index 4ff5e8bde..90fce1f40 100644
--- a/src/query/plan/distributed.cpp
+++ b/src/query/plan/distributed.cpp
@@ -338,6 +338,49 @@ class IndependentSubtreeFinder : public DistributedOperatorVisitor {
     return true;
   }
 
+  bool PreVisit(DistributedExpandBfs &exp) override {
+    prev_ops_.push_back(&exp);
+    return true;
+  }
+  bool PostVisit(DistributedExpandBfs &exp) override {
+    prev_ops_.pop_back();
+    if (branch_.subtree) return true;
+
+    // Branches off at `exp` if `expr` uses a forbidden symbol. A null `expr`
+    // (an absent bound or filter) is skipped.
+    auto branch_on_expression = [this, &exp](Expression *expr) {
+      if (!expr) return;
+      UsedSymbolsCollector collector(*symbol_table_);
+      expr->Accept(collector);
+      if (auto found = ContainsForbidden(collector.symbols_)) {
+        SetBranch(exp.input(), &exp, *found);
+      }
+    };
+
+    // The expansion source symbol.
+    if (auto found = FindForbidden(exp.input_symbol())) {
+      SetBranch(exp.input(), &exp, *found);
+    }
+
+    // The node symbol is checked only when expanding to an existing node.
+    if (exp.existing_node()) {
+      if (auto found = FindForbidden(exp.node_symbol())) {
+        SetBranch(exp.input(), &exp, *found);
+      }
+    }
+
+    CHECK(!FindForbidden(exp.edge_symbol()))
+        << "Expand uses an already used edge symbol.";
+
+    // Expressions attached to the expansion: depth bounds first, then the
+    // filter lambda.
+    branch_on_expression(exp.lower_bound());
+    branch_on_expression(exp.upper_bound());
+    branch_on_expression(exp.filter_lambda().expression);
+
+    return true;
+  }
+
   bool PreVisit(ExpandUniquenessFilter<EdgeAccessor> &op) override {
     prev_ops_.push_back(&op);
     return true;
@@ -935,6 +978,18 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
     prev_ops_.push_back(&exp);
     return true;
   }
+  bool PostVisit(ExpandVariable &exp) override {
+    prev_ops_.pop_back();
+    // Only breadth-first expansions are swapped for DistributedExpandBfs; any
+    // other expansion type is left in the plan untouched.
+    if (exp.type() != EdgeAtom::Type::BREADTH_FIRST) return true;
+    SetOnPrevious(std::make_unique<DistributedExpandBfs>(
+        exp.node_symbol(), exp.edge_symbol(), exp.direction(),
+        exp.edge_types(), exp.input(), exp.input_symbol(),
+        exp.existing_node(), exp.graph_view(), exp.lower_bound(),
+        exp.upper_bound(), exp.filter_lambda()));
+    return true;
+  }
 
   // The following operators filter the frame or put something on it. They
   // should be worker local.
diff --git a/src/query/plan/distributed_ops.cpp b/src/query/plan/distributed_ops.cpp
index e6ef3331f..0753212de 100644
--- a/src/query/plan/distributed_ops.cpp
+++ b/src/query/plan/distributed_ops.cpp
@@ -104,13 +104,37 @@ std::vector<Symbol> PullRemoteOrderBy::ModifiedSymbols(
   return input_->ModifiedSymbols(table);
 }
 
+DistributedExpandBfs::DistributedExpandBfs(
+    Symbol node_symbol, Symbol edge_symbol, EdgeAtom::Direction direction,
+    const std::vector<storage::EdgeType> &edge_types,
+    const std::shared_ptr<LogicalOperator> &input, Symbol input_symbol,
+    bool existing_node, GraphView graph_view, Expression *lower_bound,
+    Expression *upper_bound, const ExpandVariable::Lambda &filter_lambda)
+    : ExpandCommon(node_symbol, edge_symbol, direction, edge_types, input,
+                   input_symbol, existing_node, graph_view),
+      lower_bound_(lower_bound),
+      upper_bound_(upper_bound),
+      filter_lambda_(filter_lambda) {}
+
+ACCEPT_WITH_INPUT(DistributedExpandBfs);
+
+std::vector<Symbol> DistributedExpandBfs::ModifiedSymbols(
+    const SymbolTable &table) const {
+  // Whatever the input modifies, followed by this operator's node and edge.
+  auto modified = input_->ModifiedSymbols(table);
+  modified.insert(modified.end(), {node_symbol(), edge_symbol()});
+  return modified;
+}
+
+//////////////////////////////////////////////////////////////////////
+// Cursors
+//////////////////////////////////////////////////////////////////////
+
 namespace {
 
-/** Helper class that wraps remote pulling for cursors that handle results from
- * distributed workers.
- *
- * The command_id should be the command_id at the initialization of a cursor.
- */
+// Helper class that wraps remote pulling for cursors that handle results from
+// distributed workers. The command_id should be the command_id at the
+// initialization of a cursor.
 class RemotePuller {
  public:
   RemotePuller(distributed::PullRpcClients *pull_clients,
@@ -686,6 +710,183 @@ class PullRemoteOrderByCursor : public Cursor {
   bool merge_initialized_ = false;
 };
 
+// Cursor driving one BFS subcursor per worker through BfsRpcClients.
+class DistributedExpandBfsCursor : public query::plan::Cursor {
+ public:
+  DistributedExpandBfsCursor(const DistributedExpandBfs &self,
+                             database::GraphDbAccessor &db)
+      : self_(self), db_(db), input_cursor_(self_.input()->MakeCursor(db)) {
+    // TODO: Pass in a DistributedGraphDb.
+    if (auto *distributed_db =
+            dynamic_cast<database::DistributedGraphDb *>(&db.db())) {
+      bfs_subcursor_clients_ = &distributed_db->bfs_subcursor_clients();
+    }
+    CHECK(bfs_subcursor_clients_);
+    subcursor_ids_ = bfs_subcursor_clients_->CreateBfsSubcursors(
+        db_.transaction_id(), self_.direction(), self_.edge_types(),
+        self_.graph_view());
+    bfs_subcursor_clients_->RegisterSubcursors(subcursor_ids_);
+    VLOG(10) << "BFS subcursors initialized";
+    pull_pos_ = subcursor_ids_.end();
+  }
+
+  ~DistributedExpandBfsCursor() {
+    VLOG(10) << "Removing BFS subcursors";
+    bfs_subcursor_clients_->RemoveBfsSubcursors(subcursor_ids_);
+  }
+
+  bool Pull(Frame &frame, Context &context) override {
+    // TODO(mtomic): lambda filtering in distributed
+    if (self_.filter_lambda().expression) {
+      throw utils::NotYetImplemented("lambda filtering in distributed BFS");
+    }
+
+    // Evaluator for the filtering condition and expansion depth.
+    ExpressionEvaluator evaluator(frame, &context, self_.graph_view());
+
+    while (true) {
+      TypedValue last_vertex;
+
+      if (!skip_rest_) {
+        if (current_depth_ >= lower_bound_) {
+          for (; pull_pos_ != subcursor_ids_.end(); ++pull_pos_) {
+            auto vertex = bfs_subcursor_clients_->Pull(pull_pos_->first,
+                                                       pull_pos_->second, &db_);
+            if (vertex) {
+              last_vertex = *vertex;
+              SwitchAccessor(last_vertex.ValueVertex(), self_.graph_view());
+              break;
+            }
+            VLOG(10) << "Nothing to pull from " << pull_pos_->first;
+          }
+        }
+
+        if (last_vertex.IsVertex()) {
+          // Handle existence flag
+          if (self_.existing_node()) {
+            TypedValue &node = frame[self_.node_symbol()];
+            // Due to optional matching the existing node could be null
+            if (node.IsNull() || (node != last_vertex).ValueBool()) continue;
+            // There is no point in traversing the rest of the graph because BFS
+            // can find only one path to a certain node.
+            skip_rest_ = true;
+          } else {
+            frame[self_.node_symbol()] = last_vertex;
+          }
+
+          VLOG(10) << "Expanded to vertex: " << last_vertex;
+
+          // Reconstruct path
+          std::vector<TypedValue> edges;
+
+          // During path reconstruction, edges crossing worker boundary are
+          // obtained from edge owner to reduce network traffic. If the last
+          // worker queried for its path segment owned the crossing edge,
+          // `current_vertex_addr` will be set. Otherwise, `current_edge_addr`
+          // will be set.
+          std::experimental::optional<storage::VertexAddress>
+              current_vertex_addr = last_vertex.ValueVertex().GlobalAddress();
+          std::experimental::optional<storage::EdgeAddress> current_edge_addr;
+
+          while (true) {
+            DCHECK(static_cast<bool>(current_edge_addr) ^
+                   static_cast<bool>(current_vertex_addr))
+                << "Exactly one of `current_edge_addr` or "
+                   "`current_vertex_addr` "
+                   "should be set during path reconstruction";
+            auto ret = current_edge_addr
+                           ? bfs_subcursor_clients_->ReconstructPath(
+                                 subcursor_ids_, *current_edge_addr, &db_)
+                           : bfs_subcursor_clients_->ReconstructPath(
+                                 subcursor_ids_, *current_vertex_addr, &db_);
+            edges.insert(edges.end(), ret.edges.begin(), ret.edges.end());
+            current_vertex_addr = ret.next_vertex;
+            current_edge_addr = ret.next_edge;
+            if (!current_vertex_addr && !current_edge_addr) break;
+          }
+          std::reverse(edges.begin(), edges.end());
+          for (auto &edge : edges)
+            SwitchAccessor(edge.ValueEdge(), self_.graph_view());
+          frame[self_.edge_symbol()] = std::move(edges);
+          return true;
+        }
+
+        // We're done pulling for this level
+        pull_pos_ = subcursor_ids_.begin();
+
+        // Try to expand again
+        if (current_depth_ < upper_bound_) {
+          VLOG(10) << "Trying to expand again...";
+          current_depth_++;
+          bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, false);
+          if (bfs_subcursor_clients_->ExpandLevel(subcursor_ids_)) continue;
+        }
+      }
+
+      VLOG(10) << "Trying to get a new source...";
+      // We're done with this source, try getting a new one
+      if (!input_cursor_->Pull(frame, context)) return false;
+
+      auto vertex_value = frame[self_.input_symbol()];
+
+      // It is possible that the vertex is Null due to optional matching.
+      if (vertex_value.IsNull()) continue;
+
+      auto vertex = vertex_value.ValueVertex();
+      lower_bound_ = self_.lower_bound()
+                         ? EvaluateInt(&evaluator, self_.lower_bound(),
+                                       "Min depth in breadth-first expansion")
+                         : 1;
+      upper_bound_ = self_.upper_bound()
+                         ? EvaluateInt(&evaluator, self_.upper_bound(),
+                                       "Max depth in breadth-first expansion")
+                         : std::numeric_limits<int64_t>::max();
+      skip_rest_ = false;
+
+      if (upper_bound_ < 1) {
+        throw QueryRuntimeException(
+            "Max depth in breadth-first expansion must be at least 1");
+      }
+
+      VLOG(10) << "Starting BFS from " << vertex << " with limits "
+               << lower_bound_ << ".." << upper_bound_;
+      bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, true);
+      bfs_subcursor_clients_->SetSource(subcursor_ids_, vertex.GlobalAddress());
+      current_depth_ = 1;
+    }
+  }
+
+  void Reset() override {
+    // NOTE(review): input_cursor_ is not reset here; confirm that is intended.
+    bfs_subcursor_clients_->ResetSubcursors(subcursor_ids_);
+    pull_pos_ = subcursor_ids_.end();
+  }
+
+ private:
+  const DistributedExpandBfs &self_;
+  database::GraphDbAccessor &db_;
+  distributed::BfsRpcClients *bfs_subcursor_clients_{nullptr};
+  const std::unique_ptr<query::plan::Cursor> input_cursor_;
+
+  // Depth bounds, recalculated on each pull from the input. Kept as int64_t
+  // because EvaluateInt returns int64_t; narrowing could corrupt large bounds.
+  int64_t lower_bound_{-1};
+  int64_t upper_bound_{-1};
+
+  // When set to true, expansion is restarted from a new source.
+  bool skip_rest_{false};
+
+  // Current depth, set to 1 for each new source. Starts equal to upper_bound_
+  // so that the first Pull skips expansion and pulls from the input.
+  int64_t current_depth_{-1};
+
+  // Map from worker IDs to their corresponding subcursors.
+  std::unordered_map<int16_t, int64_t> subcursor_ids_;
+
+  // Next worker master should try pulling from.
+  std::unordered_map<int16_t, int64_t>::iterator pull_pos_;
+};
+
 }  // namespace
 
 std::unique_ptr<Cursor> PullRemote::MakeCursor(
@@ -703,4 +904,9 @@ std::unique_ptr<Cursor> PullRemoteOrderBy::MakeCursor(
   return std::make_unique<PullRemoteOrderByCursor>(*this, db);
 }
 
+std::unique_ptr<Cursor> DistributedExpandBfs::MakeCursor(
+    database::GraphDbAccessor &db) const {
+  return std::make_unique<DistributedExpandBfsCursor>(*this, db);
+}
+
 }  // namespace query::plan
diff --git a/src/query/plan/distributed_ops.lcp b/src/query/plan/distributed_ops.lcp
index b00a4494d..3cd02bcbc 100644
--- a/src/query/plan/distributed_ops.lcp
+++ b/src/query/plan/distributed_ops.lcp
@@ -15,9 +15,11 @@ cpp<#
 class PullRemote;
 class Synchronize;
 class PullRemoteOrderBy;
+class DistributedExpandBfs;
 
 using DistributedOperatorCompositeVisitor =
-    ::utils::CompositeVisitor<PullRemote, Synchronize, PullRemoteOrderBy>;
+    ::utils::CompositeVisitor<PullRemote, Synchronize, PullRemoteOrderBy,
+                              DistributedExpandBfs>;
 
 /// Base class for visiting regular and distributed LogicalOperator instances.
 class DistributedOperatorVisitor : public HierarchicalLogicalOperatorVisitor,
@@ -169,5 +171,45 @@ by having only one result from each worker.")
   (:private #>cpp PullRemoteOrderBy() {} cpp<#)
   (:serialize :capnp))
 
+(lcp:define-class distributed-expand-bfs (logical-operator expand-common)
+  ((lower-bound "Expression *" :reader t
+                :documentation "Optional lower bound, default is 1"
+                :capnp-type "Ast.Tree" :capnp-init nil
+                :capnp-save #'save-ast-pointer
+                :capnp-load (load-ast-pointer "Expression *"))
+   (upper-bound "Expression *" :reader t
+                :documentation "Optional upper bound, default is infinity"
+                :capnp-type "Ast.Tree" :capnp-init nil
+                :capnp-save #'save-ast-pointer
+                :capnp-load (load-ast-pointer "Expression *"))
+   (filter-lambda "ExpandVariable::Lambda" :reader t
+                  :documentation "Filter that must be satisfied for expansion to succeed."
+                  :capnp-type "ExpandVariable.Lambda"))
+  (:documentation "BFS expansion operator suited for distributed execution.")
+  (:public
+   #>cpp
+   DistributedExpandBfs(Symbol node_symbol, Symbol edge_symbol,
+                        EdgeAtom::Direction direction,
+                        const std::vector<storage::EdgeType> &edge_types,
+                        const std::shared_ptr<LogicalOperator> &input,
+                        Symbol input_symbol, bool existing_node,
+                        GraphView graph_view, Expression *lower_bound,
+                        Expression *upper_bound,
+                        const ExpandVariable::Lambda &filter_lambda);
+
+   bool Accept(HierarchicalLogicalOperatorVisitor &visitor) override;
+   std::unique_ptr<Cursor> MakeCursor(
+       database::GraphDbAccessor &db) const override;
+   std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override;
+
+   bool HasSingleInput() const override { return true; }
+   std::shared_ptr<LogicalOperator> input() const override { return input_; }
+   void set_input(std::shared_ptr<LogicalOperator> input) override {
+     input_ = input;
+   }
+   cpp<#)
+  (:private #>cpp DistributedExpandBfs() {} cpp<#)
+  (:serialize :capnp :inherit-compose '(expand-common)))
+
 (lcp:pop-namespace)
 (lcp:pop-namespace)
diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp
index 69e7d77c0..c7be4eb8d 100644
--- a/src/query/plan/operator.cpp
+++ b/src/query/plan/operator.cpp
@@ -17,7 +17,6 @@
 #include "communication/result_stream_faker.hpp"
 #include "database/distributed_graph_db.hpp"
 #include "database/graph_db_accessor.hpp"
-#include "distributed/bfs_rpc_clients.hpp"
 #include "glue/auth.hpp"
 #include "glue/communication.hpp"
 #include "integrations/kafka/exceptions.hpp"
@@ -800,22 +799,6 @@ auto ExpandFromVertex(const VertexAccessor &vertex,
   return iter::chain.from_iterable(std::move(chain_elements));
 }
 
-/** A helper function for evaluating an expression that's an int.
- *
- * @param evaluator
- * @param expr
- * @param what - Name of what's getting evaluated. Used for user
- * feedback (via exception) when the evaluated value is not an int.
- */
-int64_t EvaluateInt(ExpressionEvaluator &evaluator, Expression *expr,
-                    const std::string &what) {
-  TypedValue value = expr->Accept(evaluator);
-  try {
-    return value.Value<int64_t>();
-  } catch (TypedValueException &e) {
-    throw QueryRuntimeException(what + " must be an int");
-  }
-}
 }  // namespace
 
 class ExpandVariableCursor : public Cursor {
@@ -899,7 +882,7 @@ class ExpandVariableCursor : public Cursor {
       // Evaluate the upper and lower bounds.
       ExpressionEvaluator evaluator(frame, &context, self_.graph_view_);
       auto calc_bound = [&evaluator](auto &bound) {
-        auto value = EvaluateInt(evaluator, bound, "Variable expansion bound");
+        auto value = EvaluateInt(&evaluator, bound, "Variable expansion bound");
         if (value < 0)
           throw QueryRuntimeException(
               "Variable expansion bound must be positive or zero");
@@ -1110,11 +1093,11 @@ class ExpandBfsCursor : public query::plan::Cursor {
         processed_.emplace(vertex, std::experimental::nullopt);
         expand_from_vertex(vertex);
         lower_bound_ = self_.lower_bound_
-                           ? EvaluateInt(evaluator, self_.lower_bound_,
+                           ? EvaluateInt(&evaluator, self_.lower_bound_,
                                          "Min depth in breadth-first expansion")
                            : 1;
         upper_bound_ = self_.upper_bound_
-                           ? EvaluateInt(evaluator, self_.upper_bound_,
+                           ? EvaluateInt(&evaluator, self_.upper_bound_,
                                          "Max depth in breadth-first expansion")
                            : std::numeric_limits<int>::max();
         skip_rest_ = false;
@@ -1199,183 +1182,6 @@ class ExpandBfsCursor : public query::plan::Cursor {
   std::deque<std::pair<EdgeAccessor, VertexAccessor>> to_visit_next_;
 };
 
-class DistributedExpandBfsCursor : public query::plan::Cursor {
- public:
-  DistributedExpandBfsCursor(const ExpandVariable &self,
-                             database::GraphDbAccessor &db)
-      : self_(self), db_(db), input_cursor_(self_.input_->MakeCursor(db)) {
-    // TODO: Pass in a DistributedGraphDb.
-    if (auto *distributed_db =
-            dynamic_cast<database::DistributedGraphDb *>(&db.db())) {
-      bfs_subcursor_clients_ = &distributed_db->bfs_subcursor_clients();
-    }
-    CHECK(bfs_subcursor_clients_);
-    subcursor_ids_ = bfs_subcursor_clients_->CreateBfsSubcursors(
-        db_.transaction_id(), self_.direction(), self_.edge_types(),
-        self_.graph_view());
-    bfs_subcursor_clients_->RegisterSubcursors(subcursor_ids_);
-    VLOG(10) << "BFS subcursors initialized";
-    pull_pos_ = subcursor_ids_.end();
-  }
-
-  ~DistributedExpandBfsCursor() {
-    VLOG(10) << "Removing BFS subcursors";
-    bfs_subcursor_clients_->RemoveBfsSubcursors(subcursor_ids_);
-  }
-
-  bool Pull(Frame &frame, Context &context) override {
-    // TODO(mtomic): lambda filtering in distributed
-    if (self_.filter_lambda_.expression) {
-      throw utils::NotYetImplemented("lambda filtering in distributed BFS");
-    }
-
-    // Evaluator for the filtering condition and expansion depth.
-    ExpressionEvaluator evaluator(frame, &context, self_.graph_view_);
-
-    while (true) {
-      TypedValue last_vertex;
-
-      if (!skip_rest_) {
-        if (current_depth_ >= lower_bound_) {
-          for (; pull_pos_ != subcursor_ids_.end(); ++pull_pos_) {
-            auto vertex = bfs_subcursor_clients_->Pull(pull_pos_->first,
-                                                       pull_pos_->second, &db_);
-            if (vertex) {
-              last_vertex = *vertex;
-              SwitchAccessor(last_vertex.ValueVertex(), self_.graph_view_);
-              break;
-            }
-            VLOG(10) << "Nothing to pull from " << pull_pos_->first;
-          }
-        }
-
-        if (last_vertex.IsVertex()) {
-          // Handle existence flag
-          if (self_.existing_node_) {
-            TypedValue &node = frame[self_.node_symbol_];
-            // Due to optional matching the existing node could be null
-            if (node.IsNull() || (node != last_vertex).ValueBool()) continue;
-            // There is no point in traversing the rest of the graph because BFS
-            // can find only one path to a certain node.
-            skip_rest_ = true;
-          } else {
-            frame[self_.node_symbol_] = last_vertex;
-          }
-
-          VLOG(10) << "Expanded to vertex: " << last_vertex;
-
-          // Reconstruct path
-          std::vector<TypedValue> edges;
-
-          // During path reconstruction, edges crossing worker boundary are
-          // obtained from edge owner to reduce network traffic. If the last
-          // worker queried for its path segment owned the crossing edge,
-          // `current_vertex_addr` will be set. Otherwise, `current_edge_addr`
-          // will be set.
-          std::experimental::optional<storage::VertexAddress>
-              current_vertex_addr = last_vertex.ValueVertex().GlobalAddress();
-          std::experimental::optional<storage::EdgeAddress> current_edge_addr;
-
-          while (true) {
-            DCHECK(static_cast<bool>(current_edge_addr) ^
-                   static_cast<bool>(current_vertex_addr))
-                << "Exactly one of `current_edge_addr` or "
-                   "`current_vertex_addr` "
-                   "should be set during path reconstruction";
-            auto ret = current_edge_addr
-                           ? bfs_subcursor_clients_->ReconstructPath(
-                                 subcursor_ids_, *current_edge_addr, &db_)
-                           : bfs_subcursor_clients_->ReconstructPath(
-                                 subcursor_ids_, *current_vertex_addr, &db_);
-            edges.insert(edges.end(), ret.edges.begin(), ret.edges.end());
-            current_vertex_addr = ret.next_vertex;
-            current_edge_addr = ret.next_edge;
-            if (!current_vertex_addr && !current_edge_addr) break;
-          }
-          std::reverse(edges.begin(), edges.end());
-          for (auto &edge : edges)
-            SwitchAccessor(edge.ValueEdge(), self_.graph_view_);
-          frame[self_.edge_symbol_] = std::move(edges);
-          return true;
-        }
-
-        // We're done pulling for this level
-        pull_pos_ = subcursor_ids_.begin();
-
-        // Try to expand again
-        if (current_depth_ < upper_bound_) {
-          VLOG(10) << "Trying to expand again...";
-          current_depth_++;
-          bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, false);
-          if (bfs_subcursor_clients_->ExpandLevel(subcursor_ids_)) {
-            continue;
-          }
-        }
-      }
-
-      VLOG(10) << "Trying to get a new source...";
-      // We're done with this source, try getting a new one
-      if (!input_cursor_->Pull(frame, context)) return false;
-
-      auto vertex_value = frame[self_.input_symbol_];
-
-      // It is possible that the vertex is Null due to optional matching.
-      if (vertex_value.IsNull()) continue;
-
-      auto vertex = vertex_value.ValueVertex();
-      lower_bound_ = self_.lower_bound_
-                         ? EvaluateInt(evaluator, self_.lower_bound_,
-                                       "Min depth in breadth-first expansion")
-                         : 1;
-      upper_bound_ = self_.upper_bound_
-                         ? EvaluateInt(evaluator, self_.upper_bound_,
-                                       "Max depth in breadth-first expansion")
-                         : std::numeric_limits<int>::max();
-      skip_rest_ = false;
-
-      if (upper_bound_ < 1) {
-        throw QueryRuntimeException(
-            "Max depth in breadth-first expansion must be at least 1");
-      }
-
-      VLOG(10) << "Starting BFS from " << vertex << " with limits "
-               << lower_bound_ << ".." << upper_bound_;
-      bfs_subcursor_clients_->PrepareForExpand(subcursor_ids_, true);
-      bfs_subcursor_clients_->SetSource(subcursor_ids_, vertex.GlobalAddress());
-      current_depth_ = 1;
-    }
-  }
-
-  void Reset() override {
-    bfs_subcursor_clients_->ResetSubcursors(subcursor_ids_);
-    pull_pos_ = subcursor_ids_.end();
-  }
-
- private:
-  const ExpandVariable &self_;
-  database::GraphDbAccessor &db_;
-  distributed::BfsRpcClients *bfs_subcursor_clients_{nullptr};
-  const std::unique_ptr<query::plan::Cursor> input_cursor_;
-
-  // Depth bounds. Calculated on each pull from the input, the initial value
-  // is irrelevant.
-  int lower_bound_{-1};
-  int upper_bound_{-1};
-
-  // When set to true, expansion is restarted from a new source.
-  bool skip_rest_{false};
-
-  // Current depth. Reset for each new expansion, the initial value is
-  // irrelevant.
-  int current_depth_{-1};
-
-  // Map from worker IDs to their corresponding subcursors.
-  std::unordered_map<int16_t, int64_t> subcursor_ids_;
-
-  // Next worker master should try pulling from.
-  std::unordered_map<int16_t, int64_t>::iterator pull_pos_;
-};
-
 class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
  public:
   ExpandWeightedShortestPathCursor(const ExpandVariable &self,
@@ -1459,7 +1265,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
         SwitchAccessor(vertex, self_.graph_view_);
         if (self_.upper_bound_) {
           upper_bound_ =
-              EvaluateInt(evaluator, self_.upper_bound_,
+              EvaluateInt(&evaluator, self_.upper_bound_,
                           "Max depth in weighted shortest path expansion");
           upper_bound_set_ = true;
         } else {
@@ -1613,11 +1419,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
 std::unique_ptr<Cursor> ExpandVariable::MakeCursor(
     database::GraphDbAccessor &db) const {
   if (type_ == EdgeAtom::Type::BREADTH_FIRST) {
-    if (db.db().type() == database::GraphDb::Type::SINGLE_NODE) {
-      return std::make_unique<ExpandBfsCursor>(*this, db);
-    } else {
-      return std::make_unique<DistributedExpandBfsCursor>(*this, db);
-    }
+    return std::make_unique<ExpandBfsCursor>(*this, db);
   } else if (type_ == EdgeAtom::Type::WEIGHTED_SHORTEST_PATH) {
     return std::make_unique<ExpandWeightedShortestPathCursor>(*this, db);
   } else {
diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp
index de6c90aa4..ce9f7da11 100644
--- a/src/query/plan/operator.lcp
+++ b/src/query/plan/operator.lcp
@@ -837,7 +837,7 @@ pulled.")
                                           '(single depth-first breadth-first weighted-shortest-path))
          :capnp-load (lcp:capnp-load-enum "::query::capnp::EdgeAtom::Type" "EdgeAtom::Type"
                                           '(single depth-first breadth-first weighted-shortest-path)))
-   (is-reverse :bool :documentation
+   (is-reverse :bool :reader t :documentation
                "True if the path should be written as expanding from node_symbol to input_symbol.")
    (lower-bound "Expression *" :reader t
                 :capnp-type "Ast.Tree" :capnp-init nil
@@ -855,7 +855,7 @@ pulled.")
                   :capnp-load (lcp:capnp-load-optional
                                "capnp::ExpandVariable::Lambda" "Lambda"
                                "[helper](const auto &reader) { Lambda val; val.Load(reader, helper); return val; }"))
-   (total-weight "std::experimental::optional<Symbol>"
+   (total-weight "std::experimental::optional<Symbol>" :reader t
                  :capnp-save (lcp:capnp-save-optional "::query::capnp::Symbol" "Symbol")
                  :capnp-load (lcp:capnp-load-optional "::query::capnp::Symbol" "Symbol")))
   (:documentation
@@ -936,14 +936,13 @@ pulled.")
    // the Cursors are not declared in the header because
    // it's edges_ and edges_it_ are decltyped using a helper function
    // that should be inaccessible (private class function won't compile)
-   friend class DistributedExpandBfsCursor;
    friend class ExpandVariableCursor;
    friend class ExpandBfsCursor;
    friend class ExpandWeightedShortestPathCursor;
 
    ExpandVariable() {}
    cpp<#)
-   (:serialize :capnp :inherit-compose '(expand-common)))
+  (:serialize :capnp :inherit-compose '(expand-common)))
 
 (lcp:define-class construct-named-path (logical-operator)
   ((input "std::shared_ptr<LogicalOperator>"
diff --git a/src/query/plan/pretty_print.cpp b/src/query/plan/pretty_print.cpp
index d3c5dfc4f..c80bcc376 100644
--- a/src/query/plan/pretty_print.cpp
+++ b/src/query/plan/pretty_print.cpp
@@ -80,6 +80,14 @@ class PlanPrinter final : public DistributedOperatorVisitor {
     return true;
   }
 
+  bool PreVisit(query::plan::DistributedExpandBfs &op) override {
+    WithPrintLn([&](auto &out) {
+      out << "* DistributedExpandBfs";
+      PrintExpand(out, op);
+    });
+    return true;
+  }
+
   bool PreVisit(query::plan::Produce &op) override {
     WithPrintLn([&](auto &out) {
       out << "* Produce {";
diff --git a/tests/unit/query_plan_match_filter_return.cpp b/tests/unit/query_plan_match_filter_return.cpp
index d925d303a..def0cdb3a 100644
--- a/tests/unit/query_plan_match_filter_return.cpp
+++ b/tests/unit/query_plan_match_filter_return.cpp
@@ -17,6 +17,7 @@
 #include "distributed/updates_rpc_server.hpp"
 #include "query/context.hpp"
 #include "query/exceptions.hpp"
+#include "query/plan/distributed_ops.hpp"
 #include "query/plan/operator.hpp"
 
 #include "distributed_common.hpp"
@@ -924,13 +925,21 @@ class QueryPlanExpandBfs
     auto node_sym = symbol_table.CreateSymbol("node", true);
     auto edge_list_sym = symbol_table.CreateSymbol("edgelist_", true);
 
-    last_op = std::make_shared<ExpandVariable>(
-        node_sym, edge_list_sym, EdgeAtom::Type::BREADTH_FIRST, direction,
-        std::vector<storage::EdgeType>{}, false, LITERAL(min_depth),
-        LITERAL(max_depth), last_op, source_sym,
-        static_cast<bool>(existing_node),
-        ExpandVariable::Lambda{inner_edge, inner_node, where},
-        std::experimental::nullopt, std::experimental::nullopt, graph_view);
+    if (GetParam().first == TestType::DISTRIBUTED) {
+      last_op = std::make_shared<DistributedExpandBfs>(
+          node_sym, edge_list_sym, direction, std::vector<storage::EdgeType>{},
+          last_op, source_sym, !!existing_node, graph_view, LITERAL(min_depth),
+          LITERAL(max_depth),
+          ExpandVariable::Lambda{inner_edge, inner_node, where});
+    } else {
+      last_op = std::make_shared<ExpandVariable>(
+          node_sym, edge_list_sym, EdgeAtom::Type::BREADTH_FIRST, direction,
+          std::vector<storage::EdgeType>{}, false, LITERAL(min_depth),
+          LITERAL(max_depth), last_op, source_sym,
+          static_cast<bool>(existing_node),
+          ExpandVariable::Lambda{inner_edge, inner_node, where},
+          std::experimental::nullopt, std::experimental::nullopt, graph_view);
+    }
 
     Frame frame(symbol_table.max_position());
 
@@ -2394,8 +2403,7 @@ TEST(QueryPlan, ScanAllEqualsScanAllByLabelProperty) {
   };
 
   // Make sure there are `vertex_count` results when using scan all
-  auto count_with_scan_all = [&db, &label, &prop](int prop_value,
-                                                  int prop_count) {
+  auto count_with_scan_all = [&db, &prop](int prop_value, int prop_count) {
     AstStorage storage;
     SymbolTable symbol_table;
     auto dba_ptr = db.Access();
diff --git a/tests/unit/query_planner.cpp b/tests/unit/query_planner.cpp
index 24290e868..c582a8252 100644
--- a/tests/unit/query_planner.cpp
+++ b/tests/unit/query_planner.cpp
@@ -131,6 +131,7 @@ class PlanChecker : public DistributedOperatorVisitor {
   }
 
   PRE_VISIT(PullRemoteOrderBy);
+  PRE_VISIT(DistributedExpandBfs);
 
   VISIT(AuthHandler);
 
@@ -192,6 +193,7 @@ using ExpectOrderBy = OpChecker<OrderBy>;
 using ExpectUnwind = OpChecker<Unwind>;
 using ExpectDistinct = OpChecker<Distinct>;
 using ExpectShowStreams = OpChecker<ShowStreams>;
+using ExpectDistributedExpandBfs = OpChecker<DistributedExpandBfs>;
 
 class ExpectExpandVariable : public OpChecker<ExpandVariable> {
  public:
@@ -2241,9 +2243,24 @@ TYPED_TEST(TestPlanner, MatchBfs) {
   bfs->filter_lambda_.inner_node = IDENT("n");
   bfs->filter_lambda_.expression = IDENT("n");
   bfs->upper_bound_ = LITERAL(10);
-  QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), bfs, NODE("m"))), RETURN("r")));
-  CheckPlan<TypeParam>(storage, ExpectScanAll(), ExpectExpandBfs(),
-                       ExpectProduce());
+  auto *as_r = NEXPR("r", IDENT("r"));
+  QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n"), bfs, NODE("m"))), RETURN(as_r)));
+  auto symbol_table = MakeSymbolTable(*storage.query());
+  {
+    FakeDbAccessor dba;
+    auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
+    CheckPlan(planner.plan(), symbol_table, ExpectScanAll(), ExpectExpandBfs(),
+              ExpectProduce());
+  }
+  {
+    ExpectPullRemote pull({symbol_table.at(*as_r)});
+    auto expected = ExpectDistributed(
+        MakeCheckers(ExpectScanAll(), ExpectDistributedExpandBfs(),
+                     ExpectProduce(), pull),
+        MakeCheckers(ExpectScanAll(), ExpectDistributedExpandBfs(),
+                     ExpectProduce()));
+    CheckDistributedPlan<TypeParam>(storage, expected);
+  }
 }
 
 TYPED_TEST(TestPlanner, MatchDoubleScanToExpandExisting) {