From cbc9420c170ada593d80fa241c6533025f8a11a5 Mon Sep 17 00:00:00 2001
From: florijan <florijan@memgraph.io>
Date: Thu, 22 Feb 2018 17:20:17 +0100
Subject: [PATCH] Support distributed query::plan::CreateExpand

Summary:
- The expansion vertex gets created on the origin's worker
- The edge automatically gets created wherever necessary
- Vertex creation logic reuse
- End to end test

Reviewers: teon.banek, msantl

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1227
---
 src/query/plan/operator.cpp               | 124 +++++-----
 src/query/plan/operator.hpp               |  20 +-
 tests/unit/distributed_common.hpp         |   6 +
 tests/unit/distributed_graph_db.cpp       | 240 --------------------
 tests/unit/distributed_interpretation.cpp |  98 ++++----
 tests/unit/distributed_query_plan.cpp     | 264 ++++++++++++++++++++++
 6 files changed, 388 insertions(+), 364 deletions(-)
 create mode 100644 tests/unit/distributed_query_plan.cpp

diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp
index 670a1985e..1dfe21c91 100644
--- a/src/query/plan/operator.cpp
+++ b/src/query/plan/operator.cpp
@@ -4,6 +4,7 @@
 #include <future>
 #include <limits>
 #include <queue>
+#include <random>
 #include <string>
 #include <type_traits>
 #include <utility>
@@ -102,6 +103,65 @@ CreateNode::CreateNode(const std::shared_ptr<LogicalOperator> &input,
       node_atom_(node_atom),
       on_random_worker_(on_random_worker) {}
 
+namespace {
+
+// Picks one of the ids reported by db.GetWorkerIds() at random and returns it.
+int RandomWorkerId(database::GraphDb &db) {
+  // One generator/distribution pair per thread, seeded from random_device.
+  thread_local std::mt19937 generator{std::random_device{}()};
+  thread_local std::uniform_int_distribution<int> distribution;
+  auto ids = db.GetWorkerIds();
+  return ids[distribution(generator) % ids.size()];
+}
+
+// Inserts a vertex through context.db_accessor_ and applies the labels and
+// properties of node_atom. Returns the vertex as stored in the frame.
+VertexAccessor &CreateLocalVertex(NodeAtom *node_atom, Frame &frame,
+                                  Context &context) {
+  auto &dba = context.db_accessor_;
+  auto vertex = dba.InsertVertex();
+  for (auto label : node_atom->labels_) vertex.add_label(label);
+
+  // GraphView::NEW: property expressions must see this query's own changes.
+  ExpressionEvaluator evaluator(frame, context.parameters_,
+                                context.symbol_table_, dba, GraphView::NEW);
+  for (auto &prop : node_atom->properties_)
+    PropsSetChecked(vertex, prop.first.second, prop.second->Accept(evaluator));
+  const auto &symbol = context.symbol_table_.at(*node_atom->identifier_);
+  frame[symbol] = vertex;
+  return frame[symbol].ValueVertex();
+}
+
+// Creates a vertex on the worker with the given id and stores it in the frame.
+// When worker_id is this db's own WorkerId() the vertex is created locally.
+VertexAccessor &CreateVertexOnWorker(int worker_id, NodeAtom *node_atom,
+                                     Frame &frame, Context &context) {
+  auto &dba = context.db_accessor_;
+  if (worker_id == dba.db().WorkerId())
+    return CreateLocalVertex(node_atom, frame, context);
+
+  // GraphView::NEW: property expressions must see this query's own changes.
+  ExpressionEvaluator evaluator(frame, context.parameters_,
+                                context.symbol_table_, dba, GraphView::NEW);
+  // Evaluate every property up front; the values travel with the insert call.
+  std::unordered_map<storage::Property, query::TypedValue> properties;
+  for (auto &prop : node_atom->properties_) {
+    auto evaluated = prop.second->Accept(evaluator);
+    if (!evaluated.IsPropertyValue()) {
+      throw QueryRuntimeException("'{}' cannot be used as a property value.",
+                                  evaluated.type());
+    }
+    properties.emplace(prop.first.second, std::move(evaluated));
+  }
+
+  const auto &symbol = context.symbol_table_.at(*node_atom->identifier_);
+  auto remote_vertex =
+      dba.InsertVertexIntoRemote(worker_id, node_atom->labels_, properties);
+  frame[symbol] = remote_vertex;
+  return frame[symbol].ValueVertex();
+}
+}  // namespace
+
 ACCEPT_WITH_INPUT(CreateNode)
 
 std::unique_ptr<Cursor> CreateNode::MakeCursor(
@@ -123,15 +183,10 @@ CreateNode::CreateNodeCursor::CreateNodeCursor(const CreateNode &self,
 bool CreateNode::CreateNodeCursor::Pull(Frame &frame, Context &context) {
   if (input_cursor_->Pull(frame, context)) {
     if (self_.on_random_worker_) {
-      auto worker_ids = context.db_accessor_.db().GetWorkerIds();
-      auto worker_id = worker_ids[rand_(gen_) % worker_ids.size()];
-      if (worker_id == context.db_accessor_.db().WorkerId()) {
-        CreateLocally(frame, context);
-      } else {
-        CreateOnWorker(worker_id, frame, context);
-      }
+      CreateVertexOnWorker(RandomWorkerId(db_.db()), self_.node_atom_, frame,
+                           context);
     } else {
-      CreateLocally(frame, context);
+      CreateLocalVertex(self_.node_atom_, frame, context);
     }
     return true;
   }
@@ -140,43 +195,6 @@ bool CreateNode::CreateNodeCursor::Pull(Frame &frame, Context &context) {
 
 void CreateNode::CreateNodeCursor::Reset() { input_cursor_->Reset(); }
 
-void CreateNode::CreateNodeCursor::CreateLocally(Frame &frame,
-                                                 Context &context) {
-  auto new_node = db_.InsertVertex();
-  for (auto label : self_.node_atom_->labels_) new_node.add_label(label);
-
-  // Evaluator should use the latest accessors, as modified in this query, when
-  // setting properties on new nodes.
-  ExpressionEvaluator evaluator(frame, context.parameters_,
-                                context.symbol_table_, db_, GraphView::NEW);
-  for (auto &kv : self_.node_atom_->properties_)
-    PropsSetChecked(new_node, kv.first.second, kv.second->Accept(evaluator));
-  frame[context.symbol_table_.at(*self_.node_atom_->identifier_)] = new_node;
-}
-
-void CreateNode::CreateNodeCursor::CreateOnWorker(int worker_id, Frame &frame,
-                                                  Context &context) {
-  std::unordered_map<storage::Property, query::TypedValue> properties;
-
-  // Evaluator should use the latest accessors, as modified in this query, when
-  // setting properties on new nodes.
-  ExpressionEvaluator evaluator(frame, context.parameters_,
-                                context.symbol_table_, db_, GraphView::NEW);
-  for (auto &kv : self_.node_atom_->properties_) {
-    auto value = kv.second->Accept(evaluator);
-    if (!value.IsPropertyValue()) {
-      throw QueryRuntimeException("'{}' cannot be used as a property value.",
-                                  value.type());
-    }
-    properties.emplace(kv.first.second, std::move(value));
-  }
-
-  auto new_node = context.db_accessor_.InsertVertexIntoRemote(
-      worker_id, self_.node_atom_->labels_, properties);
-
-  frame[context.symbol_table_.at(*self_.node_atom_->identifier_)] = new_node;
-}
-
 CreateExpand::CreateExpand(NodeAtom *node_atom, EdgeAtom *edge_atom,
                            const std::shared_ptr<LogicalOperator> &input,
                            Symbol input_symbol, bool existing_node)
@@ -221,7 +239,7 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, Context &context) {
   v1.SwitchNew();
 
   // get the destination vertex (possibly an existing node)
-  auto &v2 = OtherVertex(frame, context.symbol_table_, evaluator);
+  auto &v2 = OtherVertex(v1.GlobalAddress().worker_id(), frame, context);
   v2.SwitchNew();
 
   // create an edge between the two nodes
@@ -246,23 +264,15 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, Context &context) {
 void CreateExpand::CreateExpandCursor::Reset() { input_cursor_->Reset(); }
 
 VertexAccessor &CreateExpand::CreateExpandCursor::OtherVertex(
-    Frame &frame, const SymbolTable &symbol_table,
-    ExpressionEvaluator &evaluator) {
+    int worker_id, Frame &frame, Context &context) {
   if (self_.existing_node_) {
     const auto &dest_node_symbol =
-        symbol_table.at(*self_.node_atom_->identifier_);
+        context.symbol_table_.at(*self_.node_atom_->identifier_);
     TypedValue &dest_node_value = frame[dest_node_symbol];
     ExpectType(dest_node_symbol, dest_node_value, TypedValue::Type::Vertex);
     return dest_node_value.Value<VertexAccessor>();
   } else {
-    // the node does not exist, it needs to be created
-    auto node = db_.InsertVertex();
-    for (auto label : self_.node_atom_->labels_) node.add_label(label);
-    for (auto kv : self_.node_atom_->properties_)
-      PropsSetChecked(node, kv.first.second, kv.second->Accept(evaluator));
-    auto symbol = symbol_table.at(*self_.node_atom_->identifier_);
-    frame[symbol] = node;
-    return frame[symbol].Value<VertexAccessor>();
+    return CreateVertexOnWorker(worker_id, self_.node_atom_, frame, context);
   }
 }
 
diff --git a/src/query/plan/operator.hpp b/src/query/plan/operator.hpp
index c3ba23e95..5d5d5f0e2 100644
--- a/src/query/plan/operator.hpp
+++ b/src/query/plan/operator.hpp
@@ -5,7 +5,6 @@
 #include <experimental/optional>
 #include <future>
 #include <memory>
-#include <random>
 #include <unordered_map>
 #include <unordered_set>
 #include <utility>
@@ -274,16 +273,6 @@ class CreateNode : public LogicalOperator {
     const CreateNode &self_;
     database::GraphDbAccessor &db_;
     const std::unique_ptr<Cursor> input_cursor_;
-
-    // For random worker choosing in distributed.
-    std::mt19937 gen_{std::random_device{}()};
-    std::uniform_int_distribution<int> rand_;
-
-    /** Creates a single node locally and places it in the frame. */
-    void CreateLocally(Frame &, Context &);
-
-    /** Creates a single node on the given worker and places it in the frame. */
-    void CreateOnWorker(int worker_id, Frame &, Context &);
   };
 
   friend class boost::serialization::access;
@@ -372,12 +361,9 @@ class CreateExpand : public LogicalOperator {
     database::GraphDbAccessor &db_;
     const std::unique_ptr<Cursor> input_cursor_;
 
-    /**
-     *  Helper function for getting an existing node or creating a new one.
-     *  @return The newly created or already existing node.
-     */
-    VertexAccessor &OtherVertex(Frame &frame, const SymbolTable &symbol_table,
-                                ExpressionEvaluator &evaluator);
+    /** Gets the existing node (if existing_node_ == true), or creates a new
+     * node (on the given worker) and returns it. */
+    VertexAccessor &OtherVertex(int worker_id, Frame &frame, Context &context);
 
     /**
      * Helper function for creating an edge and adding it
diff --git a/tests/unit/distributed_common.hpp b/tests/unit/distributed_common.hpp
index 57c0dbdce..3fed8029e 100644
--- a/tests/unit/distributed_common.hpp
+++ b/tests/unit/distributed_common.hpp
@@ -109,6 +109,12 @@ class DistributedGraphDbTest : public ::testing::Test {
     return edge_ga;
   }
 
+  auto VertexCount(database::GraphDb &db) {  // vertices seen by a new accessor
+    database::GraphDbAccessor dba{db};
+    auto vertices = dba.Vertices(false);  // keep the range alive for begin/end
+    return std::distance(vertices.begin(), vertices.end());
+  }
+
  private:
   std::unique_ptr<database::Master> master_;
   std::vector<std::unique_ptr<WorkerInThread>> workers_;
diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp
index 68d4d48a6..77ab5c9fd 100644
--- a/tests/unit/distributed_graph_db.cpp
+++ b/tests/unit/distributed_graph_db.cpp
@@ -119,164 +119,6 @@ TEST_F(DistributedGraphDbTest, DispatchPlan) {
   check_for_worker(worker(2));
 }
 
-TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
-  GraphDbAccessor dba{master()};
-  Context ctx{dba};
-  SymbolGenerator symbol_generator{ctx.symbol_table_};
-  AstTreeStorage storage;
-
-  // Query plan for: UNWIND [42, true, "bla", 1, 2] as x RETURN x
-  using namespace query;
-  auto list =
-      LIST(LITERAL(42), LITERAL(true), LITERAL("bla"), LITERAL(1), LITERAL(2));
-  auto x = ctx.symbol_table_.CreateSymbol("x", true);
-  auto unwind = std::make_shared<plan::Unwind>(nullptr, list, x);
-  auto x_expr = IDENT("x");
-  ctx.symbol_table_[*x_expr] = x;
-  auto x_ne = NEXPR("x", x_expr);
-  ctx.symbol_table_[*x_ne] = ctx.symbol_table_.CreateSymbol("x_ne", true);
-  auto produce = MakeProduce(unwind, x_ne);
-
-  // Test that the plan works locally.
-  auto results = CollectProduce(produce.get(), ctx.symbol_table_, dba);
-  ASSERT_EQ(results.size(), 5);
-
-  const int plan_id = 42;
-  master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_);
-
-  Parameters params;
-  std::vector<query::Symbol> symbols{ctx.symbol_table_[*x_ne]};
-  auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba,
-                                               int worker_id) {
-    return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id,
-                                                     params, symbols, false, 3);
-  };
-  auto expect_first_batch = [](auto &batch) {
-    EXPECT_EQ(batch.pull_state,
-              distributed::RemotePullState::CURSOR_IN_PROGRESS);
-    ASSERT_EQ(batch.frames.size(), 3);
-    ASSERT_EQ(batch.frames[0].size(), 1);
-    EXPECT_EQ(batch.frames[0][0].ValueInt(), 42);
-    EXPECT_EQ(batch.frames[1][0].ValueBool(), true);
-    EXPECT_EQ(batch.frames[2][0].ValueString(), "bla");
-  };
-  auto expect_second_batch = [](auto &batch) {
-    EXPECT_EQ(batch.pull_state, distributed::RemotePullState::CURSOR_EXHAUSTED);
-    ASSERT_EQ(batch.frames.size(), 2);
-    ASSERT_EQ(batch.frames[0].size(), 1);
-    EXPECT_EQ(batch.frames[0][0].ValueInt(), 1);
-    EXPECT_EQ(batch.frames[1][0].ValueInt(), 2);
-  };
-
-  GraphDbAccessor dba_1{master()};
-  GraphDbAccessor dba_2{master()};
-  for (int worker_id : {1, 2}) {
-    // TODO flor, proper test async here.
-    auto tx1_batch1 = remote_pull(dba_1, worker_id).get();
-    expect_first_batch(tx1_batch1);
-    auto tx2_batch1 = remote_pull(dba_2, worker_id).get();
-    expect_first_batch(tx2_batch1);
-    auto tx2_batch2 = remote_pull(dba_2, worker_id).get();
-    expect_second_batch(tx2_batch2);
-    auto tx1_batch2 = remote_pull(dba_1, worker_id).get();
-    expect_second_batch(tx1_batch2);
-  }
-}
-
-TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
-  // Create some data on the master and both workers. Eeach edge (3 of them) and
-  // vertex (6 of them) will be uniquely identified with their worker id and
-  // sequence ID, so we can check we retrieved all.
-  storage::Property prop;
-  {
-    GraphDbAccessor dba{master()};
-    prop = dba.Property("prop");
-    auto create_data = [prop](GraphDbAccessor &dba, int worker_id) {
-      auto v1 = dba.InsertVertex();
-      v1.PropsSet(prop, worker_id * 10);
-      auto v2 = dba.InsertVertex();
-      v2.PropsSet(prop, worker_id * 10 + 1);
-      auto e12 = dba.InsertEdge(v1, v2, dba.EdgeType("et"));
-      e12.PropsSet(prop, worker_id * 10 + 2);
-    };
-    create_data(dba, 0);
-    GraphDbAccessor dba_w1{worker(1), dba.transaction_id()};
-    create_data(dba_w1, 1);
-    GraphDbAccessor dba_w2{worker(2), dba.transaction_id()};
-    create_data(dba_w2, 2);
-    dba.Commit();
-  }
-
-  GraphDbAccessor dba{master()};
-  Context ctx{dba};
-  SymbolGenerator symbol_generator{ctx.symbol_table_};
-  AstTreeStorage storage;
-
-  // Query plan for: MATCH p = (n)-[r]->(m) return [n, r], m, p
-  // Use this query to test graph elements are transferred correctly in
-  // collections too.
-  auto n = MakeScanAll(storage, ctx.symbol_table_, "n");
-  auto r_m =
-      MakeExpand(storage, ctx.symbol_table_, n.op_, n.sym_, "r",
-                 EdgeAtom::Direction::OUT, {}, "m", false, GraphView::OLD);
-  auto p_sym = ctx.symbol_table_.CreateSymbol("p", true);
-  auto p = std::make_shared<query::plan::ConstructNamedPath>(
-      r_m.op_, p_sym,
-      std::vector<Symbol>{n.sym_, r_m.edge_sym_, r_m.node_sym_});
-  auto return_n = IDENT("n");
-  ctx.symbol_table_[*return_n] = n.sym_;
-  auto return_r = IDENT("r");
-  ctx.symbol_table_[*return_r] = r_m.edge_sym_;
-  auto return_n_r = NEXPR("[n, r]", LIST(return_n, return_r));
-  ctx.symbol_table_[*return_n_r] = ctx.symbol_table_.CreateSymbol("", true);
-  auto return_m = NEXPR("m", IDENT("m"));
-  ctx.symbol_table_[*return_m->expression_] = r_m.node_sym_;
-  ctx.symbol_table_[*return_m] = ctx.symbol_table_.CreateSymbol("", true);
-  auto return_p = NEXPR("p", IDENT("p"));
-  ctx.symbol_table_[*return_p->expression_] = p_sym;
-  ctx.symbol_table_[*return_p] = ctx.symbol_table_.CreateSymbol("", true);
-  auto produce = MakeProduce(p, return_n_r, return_m, return_p);
-
-  auto check_result = [prop](
-      int worker_id,
-      const std::vector<std::vector<query::TypedValue>> &frames) {
-    int offset = worker_id * 10;
-    ASSERT_EQ(frames.size(), 1);
-    auto &row = frames[0];
-    ASSERT_EQ(row.size(), 3);
-    auto &list = row[0].ValueList();
-    ASSERT_EQ(list.size(), 2);
-    ASSERT_EQ(list[0].ValueVertex().PropsAt(prop).Value<int64_t>(), offset);
-    ASSERT_EQ(list[1].ValueEdge().PropsAt(prop).Value<int64_t>(), offset + 2);
-    ASSERT_EQ(row[1].ValueVertex().PropsAt(prop).Value<int64_t>(), offset + 1);
-    auto &path = row[2].ValuePath();
-    ASSERT_EQ(path.size(), 1);
-    ASSERT_EQ(path.vertices()[0].PropsAt(prop).Value<int64_t>(), offset);
-    ASSERT_EQ(path.edges()[0].PropsAt(prop).Value<int64_t>(), offset + 2);
-    ASSERT_EQ(path.vertices()[1].PropsAt(prop).Value<int64_t>(), offset + 1);
-  };
-
-  // Test that the plan works locally.
-  auto results = CollectProduce(produce.get(), ctx.symbol_table_, dba);
-  check_result(0, results);
-
-  const int plan_id = 42;
-  master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_);
-
-  Parameters params;
-  std::vector<query::Symbol> symbols{ctx.symbol_table_[*return_n_r],
-                                     ctx.symbol_table_[*return_m], p_sym};
-  auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba,
-                                               int worker_id) {
-    return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id,
-                                                     params, symbols, false, 3);
-  };
-  auto future_w1_results = remote_pull(dba, 1);
-  auto future_w2_results = remote_pull(dba, 2);
-  check_result(1, future_w1_results.get().frames);
-  check_result(2, future_w2_results.get().frames);
-}
-
 TEST_F(DistributedGraphDbTest, BuildIndexDistributed) {
   storage::Label label;
   storage::Property property;
@@ -334,85 +176,3 @@ TEST_F(DistributedGraphDbTest, WorkerOwnedDbAccessors) {
   VertexAccessor v_in_w2{v_ga, dba_w2};
   EXPECT_EQ(v_in_w2.PropsAt(prop).Value<int64_t>(), 42);
 }
-
-TEST_F(DistributedGraphDbTest, Synchronize) {
-  auto from = InsertVertex(worker(1));
-  auto to = InsertVertex(worker(2));
-  InsertEdge(from, to, "et");
-
-  // Query: MATCH (n)--(m) SET m.prop = 2 RETURN n.prop
-  // This query ensures that a remote update gets applied and the local stuff
-  // gets reconstructed.
-  auto &db = master();
-  GraphDbAccessor dba{db};
-  Context ctx{dba};
-  SymbolGenerator symbol_generator{ctx.symbol_table_};
-  AstTreeStorage storage;
-  // MATCH
-  auto n = MakeScanAll(storage, ctx.symbol_table_, "n");
-  auto r_m =
-      MakeExpand(storage, ctx.symbol_table_, n.op_, n.sym_, "r",
-                 EdgeAtom::Direction::BOTH, {}, "m", false, GraphView::OLD);
-
-  // SET
-  auto literal = LITERAL(42);
-  auto prop = PROPERTY_PAIR("prop");
-  auto m_p = PROPERTY_LOOKUP("m", prop);
-  ctx.symbol_table_[*m_p->expression_] = r_m.node_sym_;
-  auto set_m_p = std::make_shared<plan::SetProperty>(r_m.op_, m_p, literal);
-
-  const int plan_id = 42;
-  master().plan_dispatcher().DispatchPlan(plan_id, set_m_p, ctx.symbol_table_);
-
-  // Master-side PullRemote, Synchronize
-  auto pull_remote = std::make_shared<query::plan::PullRemote>(
-      nullptr, plan_id, std::vector<Symbol>{n.sym_});
-  auto synchronize =
-      std::make_shared<query::plan::Synchronize>(set_m_p, pull_remote, true);
-
-  // RETURN
-  auto n_p =
-      storage.Create<PropertyLookup>(storage.Create<Identifier>("n"), prop);
-  ctx.symbol_table_[*n_p->expression_] = n.sym_;
-  auto return_n_p = NEXPR("n.prop", n_p);
-  auto return_n_p_sym = ctx.symbol_table_.CreateSymbol("n.p", true);
-  ctx.symbol_table_[*return_n_p] = return_n_p_sym;
-  auto produce = MakeProduce(synchronize, return_n_p);
-
-  auto results = CollectProduce(produce.get(), ctx.symbol_table_, dba);
-  ASSERT_EQ(results.size(), 2);
-  ASSERT_EQ(results[0].size(), 1);
-  EXPECT_EQ(results[0][0].ValueInt(), 42);
-  ASSERT_EQ(results[1].size(), 1);
-  EXPECT_EQ(results[1][0].ValueInt(), 42);
-
-  // TODO test without advance command?
-}
-
-TEST_F(DistributedGraphDbTest, Create) {
-  // Query: UNWIND range(0, 1000) as x CREATE ()
-  auto &db = master();
-  GraphDbAccessor dba{db};
-  Context ctx{dba};
-  SymbolGenerator symbol_generator{ctx.symbol_table_};
-  AstTreeStorage storage;
-  auto range = FN("range", LITERAL(0), LITERAL(1000));
-  auto x = ctx.symbol_table_.CreateSymbol("x", true);
-  auto unwind = std::make_shared<plan::Unwind>(nullptr, range, x);
-  auto node = NODE("n");
-  ctx.symbol_table_[*node->identifier_] =
-      ctx.symbol_table_.CreateSymbol("n", true);
-  auto create = std::make_shared<query::plan::CreateNode>(unwind, node, true);
-  PullAll(create, dba, ctx.symbol_table_);
-  dba.Commit();
-
-  auto vertex_count = [](database::GraphDb &db) {
-    database::GraphDbAccessor dba{db};
-    auto vertices = dba.Vertices(false);
-    return std::distance(vertices.begin(), vertices.end());
-  };
-
-  EXPECT_GT(vertex_count(master()), 200);
-  EXPECT_GT(vertex_count(worker(1)), 200);
-  EXPECT_GT(vertex_count(worker(2)), 200);
-}
diff --git a/tests/unit/distributed_interpretation.cpp b/tests/unit/distributed_interpretation.cpp
index 6cbae6ae3..10379ce6f 100644
--- a/tests/unit/distributed_interpretation.cpp
+++ b/tests/unit/distributed_interpretation.cpp
@@ -1,5 +1,5 @@
-#include "gtest/gtest.h"
 #include "gmock/gmock.h"
+#include "gtest/gtest.h"
 
 #include "database/graph_db.hpp"
 #include "distributed_common.hpp"
@@ -10,46 +10,50 @@
 using namespace distributed;
 using namespace database;
 
-TEST_F(DistributedGraphDbTest, RemotePullTest) {
-  using Interpreter = query::Interpreter;
-  std::map<std::string, query::TypedValue> params = {};
+class DistributedInterpretationTest : public DistributedGraphDbTest {
+ protected:
+  auto Run(const std::string &query) {  // run on master, commit, return rows
+    GraphDbAccessor dba(master());
+    ResultStreamFaker result;
+    std::map<std::string, query::TypedValue> params;
+    query::Interpreter interpreter;
+    interpreter(query, dba, params, false).PullAll(result);
+    dba.Commit();
+    return result.GetResults();
+  }
+};
 
-  GraphDbAccessor dba(master());
+TEST_F(DistributedInterpretationTest, RemotePullTest) {
+  auto results = Run("OPTIONAL MATCH(n) UNWIND(RANGE(0, 20)) AS X RETURN 1");
+  ASSERT_EQ(results.size(), 3U * 21);  // master + 2 workers, 21 rows each
 
-  ResultStreamFaker result;
-  Interpreter interpreter_;
-  interpreter_("OPTIONAL MATCH(n) UNWIND(RANGE(0, 20)) AS X RETURN 1", dba,
-               params, false)
-      .PullAll(result);
-
-  // Three instances (master + 2 workers) with 21 result each.
-  uint expected_results = 3U * 21;
-  ASSERT_EQ(result.GetHeader().size(), 1U);
-  EXPECT_EQ(result.GetHeader()[0], "1");
-  ASSERT_EQ(result.GetResults().size(), expected_results);
-
-  for (uint i = 0; i < expected_results; ++i) {
-    ASSERT_EQ(result.GetResults()[i].size(), 1U);
-    ASSERT_EQ(result.GetResults()[i][0].Value<int64_t>(), 1);
+  for (const auto &result : results) {  // bind by reference: no per-row copy
+    ASSERT_EQ(result.size(), 1U);
+    ASSERT_EQ(result[0].ValueInt(), 1);
   }
 }
 
-TEST_F(DistributedGraphDbTest, RemotePullNoResultsTest) {
-  using Interpreter = query::Interpreter;
-  std::map<std::string, query::TypedValue> params = {};
-
-  GraphDbAccessor dba(master());
-
-  ResultStreamFaker result;
-  Interpreter interpreter_;
-  interpreter_("MATCH (n) RETURN n", dba, params, false).PullAll(result);
-
-  ASSERT_EQ(result.GetHeader().size(), 1U);
-  EXPECT_EQ(result.GetHeader()[0], "n");
-  ASSERT_EQ(result.GetResults().size(), 0U);
+TEST_F(DistributedInterpretationTest, RemotePullNoResultsTest) {
+  const auto rows = Run("MATCH (n) RETURN n");  // nothing was inserted
+  ASSERT_EQ(rows.size(), 0U);
 }
 
-TEST_F(DistributedGraphDbTest, RemoteExpandTest2) {
+TEST_F(DistributedInterpretationTest, CreateExpand) {
+  // 1 vertex on master, 2 on worker 1 and 3 on worker 2.
+  InsertVertex(master());
+  for (int i = 0; i < 2; ++i) InsertVertex(worker(1));
+  for (int i = 0; i < 3; ++i) InsertVertex(worker(2));
+
+  Run("MATCH (n) CREATE (n)-[:T]->(m) RETURN n");
+
+  // Every matched vertex gets a new neighbour on its own worker, so each
+  // per-worker count doubles.
+  EXPECT_EQ(VertexCount(master()), 2);
+  EXPECT_EQ(VertexCount(worker(1)), 4);
+  EXPECT_EQ(VertexCount(worker(2)), 6);
+}
+
+TEST_F(DistributedInterpretationTest, RemoteExpandTest2) {
   // Make a fully connected graph with vertices scattered across master and
   // worker storage.
   // Vertex count is low, because test gets exponentially slower. The expected
@@ -70,21 +74,15 @@ TEST_F(DistributedGraphDbTest, RemoteExpandTest2) {
   };
   std::vector<std::string> edge_types;
   edge_types.reserve(vertices.size() * vertices.size());
-  for (int i = 0; i < vertices.size(); ++i) {
-    for (int j = 0; j < vertices.size(); ++j) {
+  for (size_t i = 0; i < vertices.size(); ++i) {
+    for (size_t j = 0; j < vertices.size(); ++j) {
       auto edge_type = get_edge_type(i, j);
       edge_types.push_back(edge_type);
       InsertEdge(vertices[i], vertices[j], edge_type);
     }
   }
-  query::Interpreter interpret;
-  std::map<std::string, query::TypedValue> params;
-  GraphDbAccessor dba(master());
-  ResultStreamFaker result;
-  interpret("MATCH (n)-[r1]-(m)-[r2]-(l) RETURN type(r1), type(r2)", dba,
-            params, false)
-      .PullAll(result);
-  ASSERT_EQ(result.GetHeader().size(), 2U);
+
+  auto results = Run("MATCH (n)-[r1]-(m)-[r2]-(l) RETURN type(r1), type(r2)");
   // We expect the number of results to be:
   size_t expected_result_size =
       // pick (n)
@@ -96,11 +94,11 @@ TEST_F(DistributedGraphDbTest, RemoteExpandTest2) {
       (2 * vertices.size() - 1 - 1);
   std::vector<std::vector<std::string>> expected;
   expected.reserve(expected_result_size);
-  for (int n = 0; n < vertices.size(); ++n) {
-    for (int m = 0; m < vertices.size(); ++m) {
+  for (size_t n = 0; n < vertices.size(); ++n) {
+    for (size_t m = 0; m < vertices.size(); ++m) {
       std::vector<std::string> r1s{get_edge_type(n, m)};
       if (n != m) r1s.push_back(get_edge_type(m, n));
-      for (int l = 0; l < vertices.size(); ++l) {
+      for (size_t l = 0; l < vertices.size(); ++l) {
         std::vector<std::string> r2s{get_edge_type(m, l)};
         if (m != l) r2s.push_back(get_edge_type(l, m));
         for (const auto &r1 : r1s) {
@@ -113,10 +111,10 @@ TEST_F(DistributedGraphDbTest, RemoteExpandTest2) {
     }
   }
   ASSERT_EQ(expected.size(), expected_result_size);
-  ASSERT_EQ(result.GetResults().size(), expected_result_size);
+  ASSERT_EQ(results.size(), expected_result_size);
   std::vector<std::vector<std::string>> got;
-  got.reserve(result.GetResults().size());
-  for (const auto &res : result.GetResults()) {
+  got.reserve(results.size());
+  for (const auto &res : results) {
     std::vector<std::string> row;
     row.reserve(res.size());
     for (const auto &col : res) {
diff --git a/tests/unit/distributed_query_plan.cpp b/tests/unit/distributed_query_plan.cpp
new file mode 100644
index 000000000..839a74af6
--- /dev/null
+++ b/tests/unit/distributed_query_plan.cpp
@@ -0,0 +1,264 @@
+#include <memory>
+#include <thread>
+#include <unordered_set>
+
+#include "gtest/gtest.h"
+
+#include "database/graph_db.hpp"
+#include "distributed/coordination.hpp"
+#include "distributed/coordination_master.hpp"
+#include "distributed/coordination_worker.hpp"
+#include "distributed/plan_consumer.hpp"
+#include "distributed/plan_dispatcher.hpp"
+#include "distributed/remote_data_rpc_clients.hpp"
+#include "distributed/remote_data_rpc_server.hpp"
+#include "distributed/remote_pull_rpc_clients.hpp"
+#include "distributed_common.hpp"
+#include "io/network/endpoint.hpp"
+#include "query/frontend/ast/ast.hpp"
+#include "query/frontend/ast/cypher_main_visitor.hpp"
+#include "query/frontend/semantic/symbol_generator.hpp"
+#include "query/frontend/semantic/symbol_table.hpp"
+#include "query/interpreter.hpp"
+#include "query/plan/planner.hpp"
+#include "query/typed_value.hpp"
+#include "query_common.hpp"
+#include "query_plan_common.hpp"
+#include "transactions/engine_master.hpp"
+
+using namespace distributed;
+using namespace database;
+
+TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
+  GraphDbAccessor dba{master()};
+  Context ctx{dba};
+  SymbolGenerator symbol_generator{ctx.symbol_table_};
+  AstTreeStorage storage;
+
+  // Query plan for: UNWIND [42, true, "bla", 1, 2] as x RETURN x
+  using namespace query;
+  auto list =
+      LIST(LITERAL(42), LITERAL(true), LITERAL("bla"), LITERAL(1), LITERAL(2));
+  auto x = ctx.symbol_table_.CreateSymbol("x", true);
+  auto unwind = std::make_shared<plan::Unwind>(nullptr, list, x);
+  auto x_expr = IDENT("x");
+  ctx.symbol_table_[*x_expr] = x;
+  auto x_ne = NEXPR("x", x_expr);
+  ctx.symbol_table_[*x_ne] = ctx.symbol_table_.CreateSymbol("x_ne", true);
+  auto produce = MakeProduce(unwind, x_ne);
+
+  // Sanity check: run the plan locally; all 5 list elements are produced.
+  auto results = CollectProduce(produce.get(), ctx.symbol_table_, dba);
+  ASSERT_EQ(results.size(), 5);
+
+  const int plan_id = 42;
+  master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_);
+
+  Parameters params;
+  std::vector<query::Symbol> symbols{ctx.symbol_table_[*x_ne]};
+  auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba,
+                                               int worker_id) {
+    return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id,
+                                                     params, symbols, false, 3);
+  };  // NOTE(review): the trailing 3 looks like a batch size -- confirm
+  auto expect_first_batch = [](auto &batch) {  // first 3 of the 5 values
+    EXPECT_EQ(batch.pull_state,
+              distributed::RemotePullState::CURSOR_IN_PROGRESS);
+    ASSERT_EQ(batch.frames.size(), 3);
+    ASSERT_EQ(batch.frames[0].size(), 1);
+    EXPECT_EQ(batch.frames[0][0].ValueInt(), 42);
+    EXPECT_EQ(batch.frames[1][0].ValueBool(), true);
+    EXPECT_EQ(batch.frames[2][0].ValueString(), "bla");
+  };
+  auto expect_second_batch = [](auto &batch) {  // the remaining 2 values
+    EXPECT_EQ(batch.pull_state, distributed::RemotePullState::CURSOR_EXHAUSTED);
+    ASSERT_EQ(batch.frames.size(), 2);
+    ASSERT_EQ(batch.frames[0].size(), 1);
+    EXPECT_EQ(batch.frames[0][0].ValueInt(), 1);
+    EXPECT_EQ(batch.frames[1][0].ValueInt(), 2);
+  };
+
+  GraphDbAccessor dba_1{master()};  // pulls via dba_1/dba_2 interleave below
+  GraphDbAccessor dba_2{master()};
+  for (int worker_id : {1, 2}) {
+    // TODO flor, proper test async here. For now each pull is awaited inline.
+    auto tx1_batch1 = remote_pull(dba_1, worker_id).get();
+    expect_first_batch(tx1_batch1);
+    auto tx2_batch1 = remote_pull(dba_2, worker_id).get();
+    expect_first_batch(tx2_batch1);
+    auto tx2_batch2 = remote_pull(dba_2, worker_id).get();
+    expect_second_batch(tx2_batch2);
+    auto tx1_batch2 = remote_pull(dba_1, worker_id).get();
+    expect_second_batch(tx1_batch2);
+  }
+}
+
+TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
+  // Create some data on the master and both workers. Each edge (3 of them) and
+  // vertex (6 of them) will be uniquely identified with their worker id and
+  // sequence ID, so we can check we retrieved all.
+  storage::Property prop;
+  {
+    GraphDbAccessor dba{master()};
+    prop = dba.Property("prop");
+    auto create_data = [prop](GraphDbAccessor &dba, int worker_id) {
+      auto v1 = dba.InsertVertex();
+      v1.PropsSet(prop, worker_id * 10);
+      auto v2 = dba.InsertVertex();
+      v2.PropsSet(prop, worker_id * 10 + 1);
+      auto e12 = dba.InsertEdge(v1, v2, dba.EdgeType("et"));
+      e12.PropsSet(prop, worker_id * 10 + 2);
+    };
+    create_data(dba, 0);
+    GraphDbAccessor dba_w1{worker(1), dba.transaction_id()};
+    create_data(dba_w1, 1);
+    GraphDbAccessor dba_w2{worker(2), dba.transaction_id()};
+    create_data(dba_w2, 2);
+    dba.Commit();
+  }
+
+  GraphDbAccessor dba{master()};
+  Context ctx{dba};
+  SymbolGenerator symbol_generator{ctx.symbol_table_};
+  AstTreeStorage storage;
+
+  // Query plan for: MATCH p = (n)-[r]->(m) return [n, r], m, p
+  // Use this query to test graph elements are transferred correctly in
+  // collections too.
+  auto n = MakeScanAll(storage, ctx.symbol_table_, "n");
+  auto r_m =
+      MakeExpand(storage, ctx.symbol_table_, n.op_, n.sym_, "r",
+                 EdgeAtom::Direction::OUT, {}, "m", false, GraphView::OLD);
+  auto p_sym = ctx.symbol_table_.CreateSymbol("p", true);
+  auto p = std::make_shared<query::plan::ConstructNamedPath>(
+      r_m.op_, p_sym,
+      std::vector<Symbol>{n.sym_, r_m.edge_sym_, r_m.node_sym_});
+  auto return_n = IDENT("n");
+  ctx.symbol_table_[*return_n] = n.sym_;
+  auto return_r = IDENT("r");
+  ctx.symbol_table_[*return_r] = r_m.edge_sym_;
+  auto return_n_r = NEXPR("[n, r]", LIST(return_n, return_r));
+  ctx.symbol_table_[*return_n_r] = ctx.symbol_table_.CreateSymbol("", true);
+  auto return_m = NEXPR("m", IDENT("m"));
+  ctx.symbol_table_[*return_m->expression_] = r_m.node_sym_;
+  ctx.symbol_table_[*return_m] = ctx.symbol_table_.CreateSymbol("", true);
+  auto return_p = NEXPR("p", IDENT("p"));
+  ctx.symbol_table_[*return_p->expression_] = p_sym;
+  ctx.symbol_table_[*return_p] = ctx.symbol_table_.CreateSymbol("", true);
+  auto produce = MakeProduce(p, return_n_r, return_m, return_p);
+
+  auto check_result = [prop](  // expects exactly one row: [n, r], m, p
+      int worker_id,
+      const std::vector<std::vector<query::TypedValue>> &frames) {
+    int offset = worker_id * 10;  // matches the values set in create_data
+    ASSERT_EQ(frames.size(), 1);
+    auto &row = frames[0];
+    ASSERT_EQ(row.size(), 3);
+    auto &list = row[0].ValueList();
+    ASSERT_EQ(list.size(), 2);
+    ASSERT_EQ(list[0].ValueVertex().PropsAt(prop).Value<int64_t>(), offset);
+    ASSERT_EQ(list[1].ValueEdge().PropsAt(prop).Value<int64_t>(), offset + 2);
+    ASSERT_EQ(row[1].ValueVertex().PropsAt(prop).Value<int64_t>(), offset + 1);
+    auto &path = row[2].ValuePath();
+    ASSERT_EQ(path.size(), 1);
+    ASSERT_EQ(path.vertices()[0].PropsAt(prop).Value<int64_t>(), offset);
+    ASSERT_EQ(path.edges()[0].PropsAt(prop).Value<int64_t>(), offset + 2);
+    ASSERT_EQ(path.vertices()[1].PropsAt(prop).Value<int64_t>(), offset + 1);
+  };
+
+  // Test that the plan works locally (master data was created with id 0).
+  auto results = CollectProduce(produce.get(), ctx.symbol_table_, dba);
+  check_result(0, results);
+
+  const int plan_id = 42;
+  master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_);
+
+  Parameters params;
+  std::vector<query::Symbol> symbols{ctx.symbol_table_[*return_n_r],
+                                     ctx.symbol_table_[*return_m], p_sym};
+  auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba,
+                                               int worker_id) {
+    return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id,
+                                                     params, symbols, false, 3);
+  };
+  auto future_w1_results = remote_pull(dba, 1);  // issue both pulls first
+  auto future_w2_results = remote_pull(dba, 2);
+  check_result(1, future_w1_results.get().frames);
+  check_result(2, future_w2_results.get().frames);
+}
+
+TEST_F(DistributedGraphDbTest, Synchronize) {
+  auto from = InsertVertex(worker(1));
+  auto to = InsertVertex(worker(2));
+  InsertEdge(from, to, "et");
+
+  // Query: MATCH (n)--(m) SET m.prop = 42 RETURN n.prop
+  // This query ensures that a remote update gets applied and the local stuff
+  // gets reconstructed.
+  auto &db = master();
+  GraphDbAccessor dba{db};
+  Context ctx{dba};
+  SymbolGenerator symbol_generator{ctx.symbol_table_};
+  AstTreeStorage storage;
+  // MATCH
+  auto n = MakeScanAll(storage, ctx.symbol_table_, "n");
+  auto r_m =
+      MakeExpand(storage, ctx.symbol_table_, n.op_, n.sym_, "r",
+                 EdgeAtom::Direction::BOTH, {}, "m", false, GraphView::OLD);
+
+  // SET
+  auto literal = LITERAL(42);
+  auto prop = PROPERTY_PAIR("prop");
+  auto m_p = PROPERTY_LOOKUP("m", prop);
+  ctx.symbol_table_[*m_p->expression_] = r_m.node_sym_;
+  auto set_m_p = std::make_shared<plan::SetProperty>(r_m.op_, m_p, literal);
+
+  const int plan_id = 42;
+  master().plan_dispatcher().DispatchPlan(plan_id, set_m_p, ctx.symbol_table_);
+
+  // Master-side PullRemote, Synchronize
+  auto pull_remote = std::make_shared<query::plan::PullRemote>(
+      nullptr, plan_id, std::vector<Symbol>{n.sym_});
+  auto synchronize =
+      std::make_shared<query::plan::Synchronize>(set_m_p, pull_remote, true);
+
+  // RETURN
+  auto n_p =
+      storage.Create<PropertyLookup>(storage.Create<Identifier>("n"), prop);
+  ctx.symbol_table_[*n_p->expression_] = n.sym_;
+  auto return_n_p = NEXPR("n.prop", n_p);
+  auto return_n_p_sym = ctx.symbol_table_.CreateSymbol("n.p", true);
+  ctx.symbol_table_[*return_n_p] = return_n_p_sym;
+  auto produce = MakeProduce(synchronize, return_n_p);
+
+  auto results = CollectProduce(produce.get(), ctx.symbol_table_, dba);
+  ASSERT_EQ(results.size(), 2);  // one row per endpoint of the single edge
+  ASSERT_EQ(results[0].size(), 1);
+  EXPECT_EQ(results[0][0].ValueInt(), 42);
+  ASSERT_EQ(results[1].size(), 1);
+  EXPECT_EQ(results[1][0].ValueInt(), 42);
+
+  // TODO test without advance command?
+}
+
+TEST_F(DistributedGraphDbTest, Create) {
+  // Query: UNWIND range(0, 1000) as x CREATE (n)
+  auto &db = master();
+  GraphDbAccessor dba{db};
+  Context ctx{dba};
+  SymbolGenerator symbol_generator{ctx.symbol_table_};
+  AstTreeStorage storage;
+  auto range = FN("range", LITERAL(0), LITERAL(1000));
+  auto x = ctx.symbol_table_.CreateSymbol("x", true);
+  auto unwind = std::make_shared<plan::Unwind>(nullptr, range, x);
+  auto node = NODE("n");
+  ctx.symbol_table_[*node->identifier_] =
+      ctx.symbol_table_.CreateSymbol("n", true);
+  auto create = std::make_shared<query::plan::CreateNode>(unwind, node, true);
+  PullAll(create, dba, ctx.symbol_table_);
+  dba.Commit();
+
+  EXPECT_GT(VertexCount(master()), 200);  // loose: placement presumably random
+  EXPECT_GT(VertexCount(worker(1)), 200);
+  EXPECT_GT(VertexCount(worker(2)), 200);
+}