diff --git a/src/query/plan/distributed.cpp b/src/query/plan/distributed.cpp
index 8bcd534e8..5505aedf2 100644
--- a/src/query/plan/distributed.cpp
+++ b/src/query/plan/distributed.cpp
@@ -45,7 +45,7 @@ struct Branch {
   std::shared_ptr<LogicalOperator> subtree;
   // Optional parent of the subtree which needs to come after the Cartesian
   // operator, because it depends on some other Cartesian branch.
-  // parent_start is shared_ptr, because the ownership will be transfered to
+  // parent_start is shared_ptr, because the ownership will be transferred to
   // some other operator.
   std::shared_ptr<LogicalOperator> parent_start;
   // parent_end is pointer, because we may only change its input.
@@ -54,6 +54,20 @@ struct Branch {
   std::experimental::optional<int64_t> depends_on;
 };
 
+// Find the subtree parent, below which no operator uses symbols found in the
+// `forbidden_symbols` set.
+//
+// Branch is returned {subtree, parent, depends_on}, where the parent may be
+// nullptr if the given `op` is already a subtree which doesn't use
+// `forbidden_symbols`. `depends_on` is set to the minimum index of the
+// `forbidden_symbols` that the operators above the `subtree` depend on. The
+// returned `parent` is therefore the last operator which depends on
+// `forbidden_symbols`.
+Branch FindIndependentSubtree(
+    const std::shared_ptr<LogicalOperator> &op,
+    const std::vector<std::vector<Symbol>> &forbidden_symbols,
+    SymbolTable *symbol_table, AstStorage *storage);
+
 // Find the subtree parent, below which no operator uses symbols found in the
 // `forbidden_symbols` set. The operator tree may be modified in cases when an
 // indexed lookup is split to regular lookup + filtering.
@@ -332,6 +346,19 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
     return true;
   }
 
+  bool PreVisit(ConstructNamedPath &op) override {
+    prev_ops_.push_back(&op);
+    return true;
+  }
+  bool PostVisit(ConstructNamedPath &op) override {
+    prev_ops_.pop_back();
+    if (branch_.subtree) return true;
+    if (auto found = ContainsForbidden(op.path_elements())) {
+      SetBranch(op.input(), &op, *found);
+    }
+    return true;
+  }
+
   bool PreVisit(Filter &op) override {
     prev_ops_.push_back(&op);
     return true;
@@ -366,7 +393,19 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
   }
 
   bool PreVisit(Optional &optional) override {
-    throw utils::NotYetImplemented("distributed Cartesian planning");
+    prev_ops_.push_back(&optional);
+    optional.input()->Accept(*this);
+    return false;
+  }
+  bool PostVisit(Optional &optional) override {
+    prev_ops_.pop_back();
+    if (branch_.subtree) return true;
+    auto optional_branch = FindIndependentSubtree(
+        optional.optional(), forbidden_symbols_, symbol_table_, storage_);
+    if (optional_branch.depends_on) {
+      SetBranch(optional.input(), &optional, *optional_branch.depends_on);
+    }
+    return true;
   }
 
   bool PreVisit(Unwind &unwind) override {
@@ -663,16 +702,7 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
   }
 };
 
-// Find the subtree parent, below which no operator uses symbols found in the
-// `forbidden_symbols` set.
-//
-// Branch is returned {subtree, parent, depends_on}, where the parent may be
-// nullptr if the given `op` is already a subtree which doesn't use
-// `forbidden_symbols`. `depends_on` is set to the minimum index of the
-// `forbidden_symbols` that the operators above the `subtree` depend on. The
-// returned `parent` is therefore the last operator which depends on
-// `forbidden_symbols`.
-auto FindIndependentSubtree(
+Branch FindIndependentSubtree(
     const std::shared_ptr<LogicalOperator> &op,
     const std::vector<std::vector<Symbol>> &forbidden_symbols,
     SymbolTable *symbol_table, AstStorage *storage) {
@@ -809,11 +839,25 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
   }
 
   void AddForCartesian(ScanAll *scan) {
+    if (!lhs_optional_symbols_.empty() && cartesian_symbols_.empty()) {
+      // If we are planning Optional, include the LHS symbols as initial
+      // Cartesian, because Optional behaves like a Cartesian.
+      cartesian_symbols_.emplace_back(lhs_optional_symbols_);
+    }
     cartesian_branches_.emplace_back(MakeCartesianBranch(scan->input()));
     // Collect modified symbols of the whole branch (independent subtree +
     // parent subtree).
-    cartesian_symbols_.emplace_back(
-        scan->input()->ModifiedSymbols(distributed_plan_.symbol_table));
+    auto modified_symbols =
+        scan->input()->ModifiedSymbols(distributed_plan_.symbol_table);
+    if (!lhs_optional_symbols_.empty() && cartesian_symbols_.size() == 1) {
+      // If we are planning Optional, its LHS symbols are set as initial
+      // Cartesian, so we need to extend with true initial symbols.
+      cartesian_symbols_[0].insert(cartesian_symbols_[0].end(),
+                                   modified_symbols.begin(),
+                                   modified_symbols.end());
+    } else {
+      cartesian_symbols_.emplace_back(modified_symbols);
+    }
     // Rewire the scan to be cut from the branch.
     scan->set_input(std::make_shared<Once>());
   }
@@ -902,8 +946,101 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
     prev_ops_.push_back(&op);
     return true;
   }
+
   bool PreVisit(Optional &op) override {
     prev_ops_.push_back(&op);
+    // Optional part will be planned separately in PostVisit.
+    op.input()->Accept(*this);
+    return false;
+  }
+
+  bool PostVisit(Optional &op) override {
+    prev_ops_.pop_back();
+    // Optional is kind of like a special case of Cartesian, for each LHS pull
+    // it will exhaust the RHS (optional part). We want to plan the RHS as if it
+    // was a regular stand-alone plan. For that we will create a new
+    // DistributedPlan and the corresponding planner. After the plan is
+    // complete, we need to merge it into our current plan. The behaviour is
+    // controlled as follows:
+    // 1) RHS plan doesn't have any worker plans
+    //    - We don't care where it is executed, so we don't perform any merges.
+    // 2) RHS plan does have worker plans
+    //    - We need to finish planning our worker plans and merge them
+    //    - We preserve RHS worker plans by appending to our distributed plan.
+    //    - Merge the independent subtree of RHS if it needs worker plan.
+    DistributedPlan optional_plan;
+    optional_plan.master_plan_id = distributed_plan_.master_plan_id;
+    // Use dummy produce to simulate an end of the query, thus forcing merge
+    // points to be planned (like Cartesian).
+    optional_plan.master_plan = std::make_unique<Produce>(
+        op.optional(), std::vector<NamedExpression *>{});
+    // Temporary transfer symbol table and storage
+    optional_plan.symbol_table = std::move(distributed_plan_.symbol_table);
+    optional_plan.ast_storage = std::move(distributed_plan_.ast_storage);
+    // Plan the optional branch
+    DistributedPlanner optional_planner(optional_plan, next_plan_id_);
+    CHECK(lhs_optional_symbols_.empty()) << "Unexpected nested Optional";
+    // Pass LHS symbols, so that the planner puts dependant operations after a
+    // merge point.
+    optional_planner.lhs_optional_symbols_ =
+        op.input()->ModifiedSymbols(optional_plan.symbol_table);
+    optional_plan.master_plan->Accept(optional_planner);
+    // Revert storage and symbol table
+    distributed_plan_.ast_storage = std::move(optional_plan.ast_storage);
+    distributed_plan_.symbol_table = std::move(optional_plan.symbol_table);
+    CHECK(!optional_planner.NeedsSynchronize()) << "Optional shouldn't write";
+    CHECK(!NeedsSynchronize()) << "Expected Synchronize before Optional";
+    if (optional_plan.worker_plans.empty() && !optional_planner.ShouldSplit()) {
+      // Case 1)
+      // Optional subtree doesn't create any worker plans (i.e. has no ScanAll),
+      // we continue as normal.
+      return true;
+    }
+    // Case 2)
+    // Since we have worker plans in optional subtree, we need to merge our
+    // plans on master, because Optional behaves like Cartesian.
+    if (!cartesian_branches_.empty()) {
+      Split(op, PlanCartesian(op.input()));
+    } else if (ShouldSplit()) {
+      auto input = op.input();
+      auto pull_id = AddWorkerPlan(input);
+      Split(op, std::make_shared<PullRemote>(
+                    input, pull_id,
+                    input->ModifiedSymbols(distributed_plan_.symbol_table)));
+    }
+    on_master_ = true;
+    // Add new worker plans from optional subtree
+    for (const auto &plan : optional_plan.worker_plans) {
+      distributed_plan_.worker_plans.emplace_back(plan);
+    }
+    CHECK(dynamic_cast<Produce *>(optional_plan.master_plan.get()));
+    if (optional_planner.on_master_) {
+      // This means that optional planned a Cartesian and the dependencies
+      // on LHS symbols should have been taken care of.
+      SetOnPrevious(std::make_unique<Optional>(
+          op.input(), optional_plan.master_plan->input(),
+          op.optional_symbols()));
+      return true;
+    }
+    CHECK(optional_planner.ShouldSplit());
+    // We need to plan a pull remote, but first determine the which subtree
+    // can be independently pulled.
+    auto branch = FindIndependentSubtree(
+        optional_plan.master_plan->input(),
+        {optional_planner.lhs_optional_symbols_},
+        &distributed_plan_.symbol_table, &distributed_plan_.ast_storage);
+    auto pull_id = AddWorkerPlan(branch.subtree);
+    // TODO: Possible optimization is to pull only needed symbols.
+    auto pull_remote = std::make_shared<PullRemote>(
+        branch.subtree, pull_id,
+        branch.subtree->ModifiedSymbols(distributed_plan_.symbol_table));
+    std::shared_ptr<LogicalOperator> new_opt = pull_remote;
+    if (branch.depends_on) {
+      branch.parent_end->set_input(pull_remote);
+      new_opt = branch.parent_start;
+    }
+    SetOnPrevious(
+        std::make_unique<Optional>(op.input(), new_opt, op.optional_symbols()));
     return true;
   }
 
@@ -1238,7 +1375,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
   }
   bool PostVisit(Accumulate &acc) override {
     prev_ops_.pop_back();
-    DCHECK(needs_synchronize_)
+    CHECK(needs_synchronize_)
         << "Expected Accumulate to follow a write operator";
     // Create a synchronization point. Use pull remote to fetch accumulated
     // symbols from workers. Accumulation is done through Synchronize, so we
@@ -1405,6 +1542,9 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
   // Symbols modified by the currently stored Cartesian branches. Each vector
   // corresponds to the above Cartesian branch.
   std::vector<std::vector<Symbol>> cartesian_symbols_;
+  // Symbols used in the left hand side (input) of the Optional operator. This
+  // is set only if we are planning the optional part separately.
+  std::vector<Symbol> lhs_optional_symbols_;
   bool has_scan_all_ = false;
   bool needs_synchronize_ = false;
   bool should_split_ = false;
@@ -1431,7 +1571,7 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
 
   void SetOnPrevious(std::unique_ptr<LogicalOperator> input_op) {
     auto *prev_op = prev_ops_.back();
-    DCHECK(prev_op)
+    CHECK(prev_op)
         << "SetOnPrevious should only be called when there is a previously "
            "visited operation";
     if (!prev_op->HasSingleInput())
diff --git a/tests/unit/distributed_interpretation.cpp b/tests/unit/distributed_interpretation.cpp
index 33d97c082..7a7eff06e 100644
--- a/tests/unit/distributed_interpretation.cpp
+++ b/tests/unit/distributed_interpretation.cpp
@@ -57,7 +57,7 @@ class DistributedInterpretationTest : public DistributedGraphDbTest {
 
 TEST_F(DistributedInterpretationTest, PullTest) {
   auto results = Run("OPTIONAL MATCH(n) UNWIND(RANGE(0, 20)) AS X RETURN 1");
-  ASSERT_EQ(results.size(), 3 * 21);
+  ASSERT_EQ(results.size(), 1 * 21);
 
   for (auto result : results) {
     ASSERT_EQ(result.size(), 1U);
diff --git a/tests/unit/query_planner.cpp b/tests/unit/query_planner.cpp
index c781cf898..d5d164977 100644
--- a/tests/unit/query_planner.cpp
+++ b/tests/unit/query_planner.cpp
@@ -1143,8 +1143,6 @@ TYPED_TEST(TestPlanner, OptionalMatchNamedPatternReturn) {
   auto as_p = AS("p");
   QUERY(SINGLE_QUERY(OPTIONAL_MATCH(pattern), RETURN("p", as_p)));
   auto symbol_table = MakeSymbolTable(*storage.query());
-  std::list<BaseOpChecker *> optional{new ExpectScanAll(), new ExpectExpand(),
-                                      new ExpectConstructNamedPath()};
   auto get_symbol = [&symbol_table](const auto *ast_node) {
     return symbol_table.at(*ast_node->identifier_);
   };
@@ -1152,13 +1150,15 @@ TYPED_TEST(TestPlanner, OptionalMatchNamedPatternReturn) {
                                        get_symbol(edge), get_symbol(node_m)};
   FakeDbAccessor dba;
   auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
+  std::list<BaseOpChecker *> optional{new ExpectScanAll(), new ExpectExpand(),
+                                      new ExpectConstructNamedPath()};
   CheckPlan(planner.plan(), symbol_table,
             ExpectOptional(optional_symbols, optional), ExpectProduce());
+  optional.push_back(new ExpectPullRemote(optional_symbols));
   auto expected = ExpectDistributed(
-      MakeCheckers(ExpectOptional(optional_symbols, optional), ExpectProduce(),
-                   ExpectPullRemote({symbol_table.at(*as_p)})),
-      MakeCheckers(ExpectOptional(optional_symbols, optional),
-                   ExpectProduce()));
+      MakeCheckers(ExpectOptional(optional_symbols, optional), ExpectProduce()),
+      MakeCheckers(ExpectScanAll(), ExpectExpand(),
+                   ExpectConstructNamedPath()));
   CheckDistributedPlan(planner.plan(), symbol_table, expected);
 }
 
@@ -1304,8 +1304,7 @@ TYPED_TEST(TestPlanner, MultiMatch) {
       MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectExpand(),
                    ExpectExpandUniquenessFilter<EdgeAccessor>(), right_pull);
   auto expected = ExpectDistributed(
-      MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
-                   ExpectProduce()),
+      MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectProduce()),
       MakeCheckers(ExpectScanAll(), ExpectExpand()),
       MakeCheckers(ExpectScanAll(), ExpectExpand(), ExpectExpand(),
                    ExpectExpandUniquenessFilter<EdgeAccessor>()));
@@ -2647,9 +2646,8 @@ TYPED_TEST(TestPlanner, DistributedCartesianCreateExpand) {
       MakeCheckers(ExpectScanAll(),
                    ExpectPullRemote({symbol_table.at(*node_b->identifier_)}));
   auto expected = ExpectDistributed(
-      MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
-                   ExpectCreateExpand(), ExpectSynchronize(false),
-                   ExpectProduce()),
+      MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectCreateExpand(),
+                   ExpectSynchronize(false), ExpectProduce()),
       MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll()));
   auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
   CheckDistributedPlan(planner.plan(), symbol_table, expected);
@@ -2673,8 +2671,7 @@ TYPED_TEST(TestPlanner, DistributedCartesianExpand) {
   auto right_cart = MakeCheckers(ExpectScanAll(), ExpectExpand(),
                                  ExpectPullRemote({sym_b, sym_e, sym_c}));
   auto expected = ExpectDistributed(
-      MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
-                   ExpectProduce()),
+      MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectProduce()),
       MakeCheckers(ExpectScanAll()),
       MakeCheckers(ExpectScanAll(), ExpectExpand()));
   FakeDbAccessor dba;
@@ -2696,8 +2693,8 @@ TYPED_TEST(TestPlanner, DistributedCartesianExpandToExisting) {
   auto sym_b = symbol_table.at(*node_b->identifier_);
   auto right_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_b}));
   auto expected = ExpectDistributed(
-      MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
-                   ExpectExpand(), ExpectProduce()),
+      MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectExpand(),
+                   ExpectProduce()),
       MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll()));
   FakeDbAccessor dba;
   auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
@@ -2718,8 +2715,8 @@ TYPED_TEST(TestPlanner, DistributedCartesianExpandFromExisting) {
   auto sym_b = symbol_table.at(*node_b->identifier_);
   auto right_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_b}));
   auto expected = ExpectDistributed(
-      MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
-                   ExpectExpand(), ExpectProduce()),
+      MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectExpand(),
+                   ExpectProduce()),
       MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll()));
   FakeDbAccessor dba;
   auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
@@ -2746,12 +2743,10 @@ TYPED_TEST(TestPlanner, DistributedCartesianFilter) {
   auto mid_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_b}));
   auto right_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_c}));
   auto mid_right_cart =
-      MakeCheckers(ExpectCartesian(std::move(mid_cart), std::move(right_cart)),
-                   ExpectFilter());
+      MakeCheckers(ExpectCartesian(mid_cart, right_cart), ExpectFilter());
   auto expected = ExpectDistributed(
-      MakeCheckers(
-          ExpectCartesian(std::move(left_cart), std::move(mid_right_cart)),
-          ExpectFilter(), ExpectProduce()),
+      MakeCheckers(ExpectCartesian(left_cart, mid_right_cart), ExpectFilter(),
+                   ExpectProduce()),
       MakeCheckers(ExpectScanAll(), ExpectFilter()),
       MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll()));
   FakeDbAccessor dba;
@@ -2782,8 +2777,8 @@ TYPED_TEST(TestPlanner, DistributedCartesianIndexedScanByProperty) {
   auto right_cart =
       MakeCheckers(ExpectScanAllByLabel(), ExpectPullRemote({sym_b}));
   auto expected = ExpectDistributed(
-      MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
-                   ExpectFilter(), ExpectProduce()),
+      MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectFilter(),
+                   ExpectProduce()),
       MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAllByLabel()));
   auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
   CheckDistributedPlan(planner.plan(), symbol_table, expected);
@@ -2812,8 +2807,8 @@ TYPED_TEST(TestPlanner, DistributedCartesianIndexedScanByLowerBound) {
   auto right_cart =
       MakeCheckers(ExpectScanAllByLabel(), ExpectPullRemote({sym_b}));
   auto expected = ExpectDistributed(
-      MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
-                   ExpectFilter(), ExpectProduce()),
+      MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectFilter(),
+                   ExpectProduce()),
       MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAllByLabel()));
   auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
   CheckDistributedPlan(planner.plan(), symbol_table, expected);
@@ -2842,8 +2837,8 @@ TYPED_TEST(TestPlanner, DistributedCartesianIndexedScanByUpperBound) {
   auto right_cart =
       MakeCheckers(ExpectScanAllByLabel(), ExpectPullRemote({sym_b}));
   auto expected = ExpectDistributed(
-      MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
-                   ExpectFilter(), ExpectProduce()),
+      MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectFilter(),
+                   ExpectProduce()),
       MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAllByLabel()));
   auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
   CheckDistributedPlan(planner.plan(), symbol_table, expected);
@@ -2881,8 +2876,8 @@ TEST(TestPlanner, DistributedCartesianIndexedScanByBothBounds) {
   auto right_cart =
       MakeCheckers(ExpectScanAllByLabel(), ExpectPullRemote({sym_b}));
   auto expected = ExpectDistributed(
-      MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
-                   ExpectFilter(), ExpectProduce()),
+      MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectFilter(),
+                   ExpectProduce()),
       MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAllByLabel()));
   CheckDistributedPlan(*produce, symbol_table, expected);
 }
@@ -2920,8 +2915,8 @@ TEST(TestPlanner, DistributedCartesianIndexedScanByLowerWithBothBounds) {
                        label, prop, lower_bound, std::experimental::nullopt),
                    ExpectPullRemote({sym_b}));
   auto expected = ExpectDistributed(
-      MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
-                   ExpectFilter(), ExpectProduce()),
+      MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectFilter(),
+                   ExpectProduce()),
       MakeCheckers(ExpectScanAll()),
       MakeCheckers(ExpectScanAllByLabelPropertyRange(
           label, prop, lower_bound, std::experimental::nullopt)));
@@ -2961,8 +2956,8 @@ TEST(TestPlanner, DistributedCartesianIndexedScanByUpperWithBothBounds) {
                        label, prop, std::experimental::nullopt, upper_bound),
                    ExpectPullRemote({sym_b}));
   auto expected = ExpectDistributed(
-      MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
-                   ExpectFilter(), ExpectProduce()),
+      MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectFilter(),
+                   ExpectProduce()),
       MakeCheckers(ExpectScanAll()),
       MakeCheckers(ExpectScanAllByLabelPropertyRange(
           label, prop, std::experimental::nullopt, upper_bound)));
@@ -2982,11 +2977,11 @@ TYPED_TEST(TestPlanner, DistributedCartesianProduce) {
       MakeCheckers(ExpectScanAll(), ExpectProduce(), ExpectPullRemote({sym_a}));
   auto sym_b = symbol_table.at(*node_b->identifier_);
   auto right_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_b}));
-  auto expected = ExpectDistributed(
-      MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
-                   ExpectFilter(), ExpectProduce()),
-      MakeCheckers(ExpectScanAll(), ExpectProduce()),
-      MakeCheckers(ExpectScanAll()));
+  auto expected =
+      ExpectDistributed(MakeCheckers(ExpectCartesian(left_cart, right_cart),
+                                     ExpectFilter(), ExpectProduce()),
+                        MakeCheckers(ExpectScanAll(), ExpectProduce()),
+                        MakeCheckers(ExpectScanAll()));
   FakeDbAccessor dba;
   auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
   CheckDistributedPlan(planner.plan(), symbol_table, expected);
@@ -3005,8 +3000,8 @@ TYPED_TEST(TestPlanner, DistributedCartesianUnwind) {
   auto sym_b = symbol_table.at(*node_b->identifier_);
   auto right_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_b}));
   auto expected = ExpectDistributed(
-      MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
-                   ExpectUnwind(), ExpectProduce()),
+      MakeCheckers(ExpectCartesian(left_cart, right_cart), ExpectUnwind(),
+                   ExpectProduce()),
       MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll()));
   FakeDbAccessor dba;
   auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
@@ -3029,7 +3024,7 @@ TYPED_TEST(TestPlanner, DistributedCartesianCreateNode) {
   auto sym_c = symbol_table.at(*node_c->identifier_);
   auto right_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_c}));
   auto expected = ExpectDistributed(
-      MakeCheckers(ExpectCartesian(std::move(left_cart), std::move(right_cart)),
+      MakeCheckers(ExpectCartesian(left_cart, right_cart),
                    ExpectCreateNode(true), ExpectSynchronize(false)),
       MakeCheckers(ExpectScanAll(), ExpectCreateNode()),
       MakeCheckers(ExpectScanAll()));
@@ -3038,6 +3033,76 @@ TYPED_TEST(TestPlanner, DistributedCartesianCreateNode) {
   CheckDistributedPlan(planner.plan(), symbol_table, expected);
 }
 
+TYPED_TEST(TestPlanner, DistributedOptionalExpand) {
+  // Test MATCH (n) OPTIONAL MATCH (n)-[e]-(m) RETURN e;
+  AstStorage storage;
+  auto *node_n = NODE("n");
+  auto *edge_e = EDGE("e");
+  auto *node_m = NODE("m");
+  auto *ret_e = RETURN("e");
+  QUERY(SINGLE_QUERY(MATCH(PATTERN(node_n)),
+                     OPTIONAL_MATCH(PATTERN(node_n, edge_e, node_m)), ret_e));
+  auto symbol_table = MakeSymbolTable(*storage.query());
+  auto sym_e = symbol_table.at(*ret_e->body_.named_expressions[0]);
+  std::list<BaseOpChecker *> optional{new ExpectExpand()};
+  auto expected = ExpectDistributed(
+      MakeCheckers(ExpectScanAll(), ExpectOptional(optional), ExpectProduce(),
+                   ExpectPullRemote({sym_e})),
+      MakeCheckers(ExpectScanAll(), ExpectOptional(optional), ExpectProduce()));
+  FakeDbAccessor dba;
+  auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
+  CheckDistributedPlan(planner.plan(), symbol_table, expected);
+}
+
+TYPED_TEST(TestPlanner, DistributedOptionalCartesian) {
+  // Test MATCH (a) OPTIONAL MATCH (b), (c) WHERE b > a RETURN c;
+  AstStorage storage;
+  auto *node_a = NODE("a");
+  auto *node_b = NODE("b");
+  auto *node_c = NODE("c");
+  QUERY(SINGLE_QUERY(
+      MATCH(PATTERN(node_a)), OPTIONAL_MATCH(PATTERN(node_b), PATTERN(node_c)),
+      WHERE(GREATER(node_b->identifier_, node_a->identifier_)), RETURN("c")));
+  auto symbol_table = MakeSymbolTable(*storage.query());
+  auto sym_a = symbol_table.at(*node_a->identifier_);
+  auto sym_b = symbol_table.at(*node_b->identifier_);
+  auto sym_c = symbol_table.at(*node_c->identifier_);
+  auto left_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_b}));
+  auto right_cart = MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_c}));
+  std::list<BaseOpChecker *> optional{
+      new ExpectCartesian(left_cart, right_cart), new ExpectFilter()};
+  auto expected = ExpectDistributed(
+      MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}),
+                   ExpectOptional(optional), ExpectProduce()),
+      MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll()),
+      MakeCheckers(ExpectScanAll()));
+  FakeDbAccessor dba;
+  auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
+  CheckDistributedPlan(planner.plan(), symbol_table, expected);
+}
+
+TYPED_TEST(TestPlanner, DistributedOptionalScanExpandExisting) {
+  // Test MATCH (a) OPTIONAL MATCH (b)-[e]-(a) RETURN e;
+  AstStorage storage;
+  auto *node_a = NODE("a");
+  auto *node_b = NODE("b");
+  QUERY(SINGLE_QUERY(MATCH(PATTERN(node_a)),
+                     OPTIONAL_MATCH(PATTERN(node_b, EDGE("e"), NODE("a"))),
+                     RETURN("e")));
+  auto symbol_table = MakeSymbolTable(*storage.query());
+  auto sym_a = symbol_table.at(*node_a->identifier_);
+  auto sym_b = symbol_table.at(*node_b->identifier_);
+  std::list<BaseOpChecker *> optional{
+      new ExpectScanAll(), new ExpectPullRemote({sym_b}), new ExpectExpand()};
+  auto expected = ExpectDistributed(
+      MakeCheckers(ExpectScanAll(), ExpectPullRemote({sym_a}),
+                   ExpectOptional(optional), ExpectProduce()),
+      MakeCheckers(ExpectScanAll()), MakeCheckers(ExpectScanAll()));
+  FakeDbAccessor dba;
+  auto planner = MakePlanner<TypeParam>(dba, storage, symbol_table);
+  CheckDistributedPlan(planner.plan(), symbol_table, expected);
+}
+
 TEST(CapnpSerial, Union) {
   std::vector<Symbol> left_symbols{
       Symbol("symbol", 1, true, Symbol::Type::Edge)};