diff --git a/src/distributed/remote_pull_rpc_clients.hpp b/src/distributed/remote_pull_rpc_clients.hpp
index 22e70804d..1ef29ecfe 100644
--- a/src/distributed/remote_pull_rpc_clients.hpp
+++ b/src/distributed/remote_pull_rpc_clients.hpp
@@ -16,35 +16,52 @@ namespace distributed {
  * batches and are therefore accompanied with an enum indicator of the state of
  * remote execution. */
 class RemotePullRpcClients {
+  using Client = communication::rpc::Client;
+
  public:
   RemotePullRpcClients(Coordination &coordination)
       : clients_(coordination, kRemotePullProduceRpcName) {}
 
-  RemotePullResData RemotePull(tx::transaction_id_t tx_id, int worker_id,
-                               int64_t plan_id, const Parameters &params,
-                               const std::vector<query::Symbol> &symbols,
-                               int batch_size = kDefaultBatchSize) {
-    return std::move(clients_.GetClient(worker_id)
-                         .Call<RemotePullRpc>(RemotePullReqData{
-                             tx_id, plan_id, params, symbols, batch_size})
-                         ->member);
+  /// Calls a remote pull asynchroniously. IMPORTANT: take care not to call this
+  /// function for the same (tx_id, worker_id, plan_id) before the previous call
+  /// has ended.
+  std::future<RemotePullResData> RemotePull(
+      tx::transaction_id_t tx_id, int worker_id, int64_t plan_id,
+      const Parameters &params, const std::vector<query::Symbol> &symbols,
+      int batch_size = kDefaultBatchSize) {
+    return clients_.ExecuteOnWorker<RemotePullResData>(
+        worker_id,
+        [tx_id, plan_id, &params, &symbols, batch_size](Client &client) {
+          return client
+              .Call<RemotePullRpc>(RemotePullReqData{tx_id, plan_id, params,
+                                                     symbols, batch_size})
+              ->member;
+        });
   }
 
   auto GetWorkerIds() { return clients_.GetWorkerIds(); }
 
-  // Notifies all workers that the given transaction/plan is done. Otherwise the
+  // Notifies a worker that the given transaction/plan is done. Otherwise the
   // server is left with potentially unconsumed Cursors that never get deleted.
   //
   // TODO - maybe this needs to be done with hooks into the transactional
   // engine, so that the Worker discards it's stuff when the relevant
   // transaction are done.
-  //
-  // TODO - this will maybe need a per-worker granularity.
-  void EndRemotePull(tx::transaction_id_t tx_id, int64_t plan_id) {
-    auto futures = clients_.ExecuteOnWorkers<void>(
-        0, [tx_id, plan_id](communication::rpc::Client &client) {
-          client.Call<EndRemotePullRpc>(EndRemotePullReqData{tx_id, plan_id});
+  std::future<void> EndRemotePull(int worker_id, tx::transaction_id_t tx_id,
+                                  int64_t plan_id) {
+    return clients_.ExecuteOnWorker<void>(
+        worker_id, [tx_id, plan_id](Client &client) {
+          return client.Call<EndRemotePullRpc>(
+              EndRemotePullReqData{tx_id, plan_id});
         });
+  }
+
+  void EndAllRemotePulls(tx::transaction_id_t tx_id, int64_t plan_id) {
+    std::vector<std::future<void>> futures;
+    for (auto worker_id : clients_.GetWorkerIds()) {
+      if (worker_id == 0) continue;
+      futures.emplace_back(EndRemotePull(worker_id, tx_id, plan_id));
+    }
     for (auto &future : futures) future.wait();
   }
 
diff --git a/src/distributed/rpc_worker_clients.hpp b/src/distributed/rpc_worker_clients.hpp
index b459148af..f54b5c062 100644
--- a/src/distributed/rpc_worker_clients.hpp
+++ b/src/distributed/rpc_worker_clients.hpp
@@ -36,15 +36,21 @@ class RpcWorkerClients {
 
   auto GetWorkerIds() { return coordination_.GetWorkerIds(); }
 
-  /**
-   * Promises to execute function on workers rpc clients.
-   * @Tparam TResult - deduced automatically from method
-   * @param skip_worker_id - worker which to skip (set to -1 to avoid skipping)
-   * @param execute - Method which takes an rpc client and returns a result for
-   * it
-   * @return list of futures filled with function 'execute' results when applied
-   * to rpc clients
-   */
+  /** Asynchroniously executes the given function on the rpc client for the
+   * given worker id. Returns an `std::future` of the given `execute` function's
+   * return type. */
+  template <typename TResult>
+  auto ExecuteOnWorker(
+      int worker_id,
+      std::function<TResult(communication::rpc::Client &)> execute) {
+    auto &client = GetClient(worker_id);
+    return std::async(std::launch::async,
+                      [execute, &client]() { return execute(client); });
+  }
+
+  /** Asynchroniously executes the `execute` function on all worker rpc clients
+   * except the one whose id is `skip_worker_id`. Returns a vectore of futures
+   * contaning the results of the `execute` function. */
   template <typename TResult>
   auto ExecuteOnWorkers(
       int skip_worker_id,
@@ -52,11 +58,7 @@ class RpcWorkerClients {
     std::vector<std::future<TResult>> futures;
     for (auto &worker_id : coordination_.GetWorkerIds()) {
       if (worker_id == skip_worker_id) continue;
-      auto &client = GetClient(worker_id);
-
-      futures.emplace_back(std::async(std::launch::async, [execute, &client]() {
-        return execute(client);
-      }));
+      futures.emplace_back(std::move(ExecuteOnWorker(worker_id, execute)));
     }
     return futures;
   }
diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp
index 7bd2f6933..89a928731 100644
--- a/src/query/plan/operator.cpp
+++ b/src/query/plan/operator.cpp
@@ -321,10 +321,10 @@ std::unique_ptr<Cursor> ScanAllByLabelPropertyRange::MakeCursor(
                                   context.symbol_table_, db, graph_view_);
     auto convert = [&evaluator](const auto &bound)
         -> std::experimental::optional<utils::Bound<PropertyValue>> {
-          if (!bound) return std::experimental::nullopt;
-          return std::experimental::make_optional(utils::Bound<PropertyValue>(
-              bound.value().value()->Accept(evaluator), bound.value().type()));
-        };
+      if (!bound) return std::experimental::nullopt;
+      return std::experimental::make_optional(utils::Bound<PropertyValue>(
+          bound.value().value()->Accept(evaluator), bound.value().type()));
+    };
     return db.Vertices(label_, property_, convert(lower_bound()),
                        convert(upper_bound()), graph_view_ == GraphView::NEW);
   };
@@ -2583,8 +2583,8 @@ PullRemote::PullRemoteCursor::PullRemoteCursor(const PullRemote &self,
 
 void PullRemote::PullRemoteCursor::EndRemotePull() {
   if (remote_pull_ended_) return;
-  db_.db().remote_pull_clients().EndRemotePull(db_.transaction().id_,
-                                               self_.plan_id());
+  db_.db().remote_pull_clients().EndAllRemotePulls(db_.transaction().id_,
+                                                self_.plan_id());
   remote_pull_ended_ = true;
 }
 
@@ -2597,7 +2597,7 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) {
     last_pulled_worker_ = (last_pulled_worker_ + 1) % worker_ids_.size();
     auto remote_results = db_.db().remote_pull_clients().RemotePull(
         db_.transaction().id_, worker_ids_[last_pulled_worker_],
-        self_.plan_id(), context.parameters_, self_.symbols());
+        self_.plan_id(), context.parameters_, self_.symbols()).get();
 
     auto get_results = [&]() {
       for (auto &result : remote_results.frames) {
diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp
index 78c9ca93a..4ec8a7942 100644
--- a/tests/unit/distributed_graph_db.cpp
+++ b/tests/unit/distributed_graph_db.cpp
@@ -260,10 +260,12 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
   const int plan_id = 42;
   master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_);
 
-  auto remote_pull = [this, plan_id, &ctx, &x_ne](tx::transaction_id_t tx_id,
-                                                  int worker_id) {
-    return master().remote_pull_clients().RemotePull(
-        tx_id, worker_id, plan_id, Parameters(), {ctx.symbol_table_[*x_ne]}, 3);
+  Parameters params;
+  std::vector<query::Symbol> symbols{ctx.symbol_table_[*x_ne]};
+  auto remote_pull = [this, plan_id, &params, &symbols](
+      tx::transaction_id_t tx_id, int worker_id) {
+    return master().remote_pull_clients().RemotePull(tx_id, worker_id, plan_id,
+                                                     params, symbols, 3);
   };
   auto expect_first_batch = [](auto &batch) {
     EXPECT_EQ(batch.pull_state,
@@ -275,8 +277,7 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
     EXPECT_EQ(batch.frames[2][0].ValueString(), "bla");
   };
   auto expect_second_batch = [](auto &batch) {
-    EXPECT_EQ(batch.pull_state,
-              distributed::RemotePullState::CURSOR_EXHAUSTED);
+    EXPECT_EQ(batch.pull_state, distributed::RemotePullState::CURSOR_EXHAUSTED);
     ASSERT_EQ(batch.frames.size(), 2);
     ASSERT_EQ(batch.frames[0].size(), 1);
     EXPECT_EQ(batch.frames[0][0].ValueInt(), 1);
@@ -286,21 +287,21 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
   database::GraphDbAccessor dba_1{master()};
   database::GraphDbAccessor dba_2{master()};
   for (int worker_id : {1, 2}) {
-    auto tx1_batch1 = remote_pull(dba_1.transaction_id(), worker_id);
+    auto tx1_batch1 = remote_pull(dba_1.transaction_id(), worker_id).get();
     expect_first_batch(tx1_batch1);
-    auto tx2_batch1 = remote_pull(dba_2.transaction_id(), worker_id);
+    auto tx2_batch1 = remote_pull(dba_2.transaction_id(), worker_id).get();
     expect_first_batch(tx2_batch1);
-    auto tx2_batch2 = remote_pull(dba_2.transaction_id(), worker_id);
+    auto tx2_batch2 = remote_pull(dba_2.transaction_id(), worker_id).get();
     expect_second_batch(tx2_batch2);
-    auto tx1_batch2 = remote_pull(dba_1.transaction_id(), worker_id);
+    auto tx1_batch2 = remote_pull(dba_1.transaction_id(), worker_id).get();
     expect_second_batch(tx1_batch2);
   }
-  master().remote_pull_clients().EndRemotePull(dba_1.transaction_id(), plan_id);
-  master().remote_pull_clients().EndRemotePull(dba_2.transaction_id(), plan_id);
+  master().remote_pull_clients().EndAllRemotePulls(dba_1.transaction_id(),
+                                                   plan_id);
+  master().remote_pull_clients().EndAllRemotePulls(dba_2.transaction_id(),
+                                                   plan_id);
 }
 
-// TODO EndRemotePull test
-
 TEST_F(DistributedGraphDbTest, BuildIndexDistributed) {
   using GraphDbAccessor = database::GraphDbAccessor;
   storage::Label label;