From 3f30a6fad95cc0434ec445256b5ba31731a17c27 Mon Sep 17 00:00:00 2001 From: florijan Date: Tue, 30 Jan 2018 10:29:49 +0100 Subject: [PATCH] Add async remote pull RPC Summary: Remote pulls can now be async. Note that `RemotePullRpcClients::RemotePull` takes references to data structures which should not be temporary in the caller. Still, maybe safer to make copies? Changed `RpcWorkerClients` API to make that possible. Reviewers: dgleich, msantl, teon.banek Reviewed By: msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1150 --- src/distributed/remote_pull_rpc_clients.hpp | 47 ++++++++++++++------- src/distributed/rpc_worker_clients.hpp | 30 +++++++------ src/query/plan/operator.cpp | 14 +++--- tests/unit/distributed_graph_db.cpp | 29 +++++++------ 4 files changed, 70 insertions(+), 50 deletions(-) diff --git a/src/distributed/remote_pull_rpc_clients.hpp b/src/distributed/remote_pull_rpc_clients.hpp index 22e70804d..1ef29ecfe 100644 --- a/src/distributed/remote_pull_rpc_clients.hpp +++ b/src/distributed/remote_pull_rpc_clients.hpp @@ -16,35 +16,52 @@ namespace distributed { * batches and are therefore accompanied with an enum indicator of the state of * remote execution. */ class RemotePullRpcClients { + using Client = communication::rpc::Client; + public: RemotePullRpcClients(Coordination &coordination) : clients_(coordination, kRemotePullProduceRpcName) {} - RemotePullResData RemotePull(tx::transaction_id_t tx_id, int worker_id, - int64_t plan_id, const Parameters ¶ms, - const std::vector &symbols, - int batch_size = kDefaultBatchSize) { - return std::move(clients_.GetClient(worker_id) - .Call(RemotePullReqData{ - tx_id, plan_id, params, symbols, batch_size}) - ->member); + /// Calls a remote pull asynchroniously. IMPORTANT: take care not to call this + /// function for the same (tx_id, worker_id, plan_id) before the previous call + /// has ended. + std::future RemotePull( + tx::transaction_id_t tx_id, int worker_id, int64_t plan_id, + const Parameters ¶ms, const std::vector &symbols, + int batch_size = kDefaultBatchSize) { + return clients_.ExecuteOnWorker( + worker_id, + [tx_id, plan_id, ¶ms, &symbols, batch_size](Client &client) { + return client + .Call(RemotePullReqData{tx_id, plan_id, params, + symbols, batch_size}) + ->member; + }); } auto GetWorkerIds() { return clients_.GetWorkerIds(); } - // Notifies all workers that the given transaction/plan is done. Otherwise the + // Notifies a worker that the given transaction/plan is done. Otherwise the // server is left with potentially unconsumed Cursors that never get deleted. // // TODO - maybe this needs to be done with hooks into the transactional // engine, so that the Worker discards it's stuff when the relevant // transaction are done. - // - // TODO - this will maybe need a per-worker granularity. - void EndRemotePull(tx::transaction_id_t tx_id, int64_t plan_id) { - auto futures = clients_.ExecuteOnWorkers( - 0, [tx_id, plan_id](communication::rpc::Client &client) { - client.Call(EndRemotePullReqData{tx_id, plan_id}); + std::future EndRemotePull(int worker_id, tx::transaction_id_t tx_id, + int64_t plan_id) { + return clients_.ExecuteOnWorker( + worker_id, [tx_id, plan_id](Client &client) { + return client.Call( + EndRemotePullReqData{tx_id, plan_id}); }); + } + + void EndAllRemotePulls(tx::transaction_id_t tx_id, int64_t plan_id) { + std::vector> futures; + for (auto worker_id : clients_.GetWorkerIds()) { + if (worker_id == 0) continue; + futures.emplace_back(EndRemotePull(worker_id, tx_id, plan_id)); + } for (auto &future : futures) future.wait(); } diff --git a/src/distributed/rpc_worker_clients.hpp b/src/distributed/rpc_worker_clients.hpp index b459148af..f54b5c062 100644 --- a/src/distributed/rpc_worker_clients.hpp +++ b/src/distributed/rpc_worker_clients.hpp @@ -36,15 +36,21 @@ class RpcWorkerClients { auto GetWorkerIds() { return coordination_.GetWorkerIds(); } - /** - * Promises to execute function on workers rpc clients. - * @Tparam TResult - deduced automatically from method - * @param skip_worker_id - worker which to skip (set to -1 to avoid skipping) - * @param execute - Method which takes an rpc client and returns a result for - * it - * @return list of futures filled with function 'execute' results when applied - * to rpc clients - */ + /** Asynchroniously executes the given function on the rpc client for the + * given worker id. Returns an `std::future` of the given `execute` function's + * return type. */ + template + auto ExecuteOnWorker( + int worker_id, + std::function execute) { + auto &client = GetClient(worker_id); + return std::async(std::launch::async, + [execute, &client]() { return execute(client); }); + } + + /** Asynchroniously executes the `execute` function on all worker rpc clients + * except the one whose id is `skip_worker_id`. Returns a vectore of futures + * contaning the results of the `execute` function. */ template auto ExecuteOnWorkers( int skip_worker_id, @@ -52,11 +58,7 @@ class RpcWorkerClients { std::vector> futures; for (auto &worker_id : coordination_.GetWorkerIds()) { if (worker_id == skip_worker_id) continue; - auto &client = GetClient(worker_id); - - futures.emplace_back(std::async(std::launch::async, [execute, &client]() { - return execute(client); - })); + futures.emplace_back(std::move(ExecuteOnWorker(worker_id, execute))); } return futures; } diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 7bd2f6933..89a928731 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -321,10 +321,10 @@ std::unique_ptr ScanAllByLabelPropertyRange::MakeCursor( context.symbol_table_, db, graph_view_); auto convert = [&evaluator](const auto &bound) -> std::experimental::optional> { - if (!bound) return std::experimental::nullopt; - return std::experimental::make_optional(utils::Bound( - bound.value().value()->Accept(evaluator), bound.value().type())); - }; + if (!bound) return std::experimental::nullopt; + return std::experimental::make_optional(utils::Bound( + bound.value().value()->Accept(evaluator), bound.value().type())); + }; return db.Vertices(label_, property_, convert(lower_bound()), convert(upper_bound()), graph_view_ == GraphView::NEW); }; @@ -2583,8 +2583,8 @@ PullRemote::PullRemoteCursor::PullRemoteCursor(const PullRemote &self, void PullRemote::PullRemoteCursor::EndRemotePull() { if (remote_pull_ended_) return; - db_.db().remote_pull_clients().EndRemotePull(db_.transaction().id_, - self_.plan_id()); + db_.db().remote_pull_clients().EndAllRemotePulls(db_.transaction().id_, + self_.plan_id()); remote_pull_ended_ = true; } @@ -2597,7 +2597,7 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) { last_pulled_worker_ = (last_pulled_worker_ + 1) % worker_ids_.size(); auto remote_results = db_.db().remote_pull_clients().RemotePull( db_.transaction().id_, worker_ids_[last_pulled_worker_], - self_.plan_id(), context.parameters_, self_.symbols()); + self_.plan_id(), context.parameters_, self_.symbols()).get(); auto get_results = [&]() { for (auto &result : remote_results.frames) { diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp index 78c9ca93a..4ec8a7942 100644 --- a/tests/unit/distributed_graph_db.cpp +++ b/tests/unit/distributed_graph_db.cpp @@ -260,10 +260,12 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) { const int plan_id = 42; master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_); - auto remote_pull = [this, plan_id, &ctx, &x_ne](tx::transaction_id_t tx_id, - int worker_id) { - return master().remote_pull_clients().RemotePull( - tx_id, worker_id, plan_id, Parameters(), {ctx.symbol_table_[*x_ne]}, 3); + Parameters params; + std::vector symbols{ctx.symbol_table_[*x_ne]}; + auto remote_pull = [this, plan_id, ¶ms, &symbols]( + tx::transaction_id_t tx_id, int worker_id) { + return master().remote_pull_clients().RemotePull(tx_id, worker_id, plan_id, + params, symbols, 3); }; auto expect_first_batch = [](auto &batch) { EXPECT_EQ(batch.pull_state, @@ -275,8 +277,7 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) { EXPECT_EQ(batch.frames[2][0].ValueString(), "bla"); }; auto expect_second_batch = [](auto &batch) { - EXPECT_EQ(batch.pull_state, - distributed::RemotePullState::CURSOR_EXHAUSTED); + EXPECT_EQ(batch.pull_state, distributed::RemotePullState::CURSOR_EXHAUSTED); ASSERT_EQ(batch.frames.size(), 2); ASSERT_EQ(batch.frames[0].size(), 1); EXPECT_EQ(batch.frames[0][0].ValueInt(), 1); @@ -286,21 +287,21 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) { database::GraphDbAccessor dba_1{master()}; database::GraphDbAccessor dba_2{master()}; for (int worker_id : {1, 2}) { - auto tx1_batch1 = remote_pull(dba_1.transaction_id(), worker_id); + auto tx1_batch1 = remote_pull(dba_1.transaction_id(), worker_id).get(); expect_first_batch(tx1_batch1); - auto tx2_batch1 = remote_pull(dba_2.transaction_id(), worker_id); + auto tx2_batch1 = remote_pull(dba_2.transaction_id(), worker_id).get(); expect_first_batch(tx2_batch1); - auto tx2_batch2 = remote_pull(dba_2.transaction_id(), worker_id); + auto tx2_batch2 = remote_pull(dba_2.transaction_id(), worker_id).get(); expect_second_batch(tx2_batch2); - auto tx1_batch2 = remote_pull(dba_1.transaction_id(), worker_id); + auto tx1_batch2 = remote_pull(dba_1.transaction_id(), worker_id).get(); expect_second_batch(tx1_batch2); } - master().remote_pull_clients().EndRemotePull(dba_1.transaction_id(), plan_id); - master().remote_pull_clients().EndRemotePull(dba_2.transaction_id(), plan_id); + master().remote_pull_clients().EndAllRemotePulls(dba_1.transaction_id(), + plan_id); + master().remote_pull_clients().EndAllRemotePulls(dba_2.transaction_id(), + plan_id); } -// TODO EndRemotePull test - TEST_F(DistributedGraphDbTest, BuildIndexDistributed) { using GraphDbAccessor = database::GraphDbAccessor; storage::Label label;