Add async remote pull RPC

Summary:
Remote pulls can now be async. Note that
`RemotePullRpcClients::RemotePull` takes references to data structures
which should not be temporary in the caller. Still, maybe safer to make
copies?

Changed `RpcWorkerClients` API to make that possible.

Reviewers: dgleich, msantl, teon.banek

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1150
This commit is contained in:
florijan 2018-01-30 10:29:49 +01:00
parent c37bb87ed8
commit 3f30a6fad9
4 changed files with 70 additions and 50 deletions

View File

@ -16,35 +16,52 @@ namespace distributed {
* batches and are therefore accompanied with an enum indicator of the state of * batches and are therefore accompanied with an enum indicator of the state of
* remote execution. */ * remote execution. */
class RemotePullRpcClients { class RemotePullRpcClients {
using Client = communication::rpc::Client;
public: public:
RemotePullRpcClients(Coordination &coordination) RemotePullRpcClients(Coordination &coordination)
: clients_(coordination, kRemotePullProduceRpcName) {} : clients_(coordination, kRemotePullProduceRpcName) {}
RemotePullResData RemotePull(tx::transaction_id_t tx_id, int worker_id, /// Calls a remote pull asynchroniously. IMPORTANT: take care not to call this
int64_t plan_id, const Parameters &params, /// function for the same (tx_id, worker_id, plan_id) before the previous call
const std::vector<query::Symbol> &symbols, /// has ended.
int batch_size = kDefaultBatchSize) { std::future<RemotePullResData> RemotePull(
return std::move(clients_.GetClient(worker_id) tx::transaction_id_t tx_id, int worker_id, int64_t plan_id,
.Call<RemotePullRpc>(RemotePullReqData{ const Parameters &params, const std::vector<query::Symbol> &symbols,
tx_id, plan_id, params, symbols, batch_size}) int batch_size = kDefaultBatchSize) {
->member); return clients_.ExecuteOnWorker<RemotePullResData>(
worker_id,
[tx_id, plan_id, &params, &symbols, batch_size](Client &client) {
return client
.Call<RemotePullRpc>(RemotePullReqData{tx_id, plan_id, params,
symbols, batch_size})
->member;
});
} }
auto GetWorkerIds() { return clients_.GetWorkerIds(); } auto GetWorkerIds() { return clients_.GetWorkerIds(); }
// Notifies all workers that the given transaction/plan is done. Otherwise the // Notifies a worker that the given transaction/plan is done. Otherwise the
// server is left with potentially unconsumed Cursors that never get deleted. // server is left with potentially unconsumed Cursors that never get deleted.
// //
// TODO - maybe this needs to be done with hooks into the transactional // TODO - maybe this needs to be done with hooks into the transactional
// engine, so that the Worker discards it's stuff when the relevant // engine, so that the Worker discards it's stuff when the relevant
// transaction are done. // transaction are done.
// std::future<void> EndRemotePull(int worker_id, tx::transaction_id_t tx_id,
// TODO - this will maybe need a per-worker granularity. int64_t plan_id) {
void EndRemotePull(tx::transaction_id_t tx_id, int64_t plan_id) { return clients_.ExecuteOnWorker<void>(
auto futures = clients_.ExecuteOnWorkers<void>( worker_id, [tx_id, plan_id](Client &client) {
0, [tx_id, plan_id](communication::rpc::Client &client) { return client.Call<EndRemotePullRpc>(
client.Call<EndRemotePullRpc>(EndRemotePullReqData{tx_id, plan_id}); EndRemotePullReqData{tx_id, plan_id});
}); });
}
void EndAllRemotePulls(tx::transaction_id_t tx_id, int64_t plan_id) {
std::vector<std::future<void>> futures;
for (auto worker_id : clients_.GetWorkerIds()) {
if (worker_id == 0) continue;
futures.emplace_back(EndRemotePull(worker_id, tx_id, plan_id));
}
for (auto &future : futures) future.wait(); for (auto &future : futures) future.wait();
} }

View File

@ -36,15 +36,21 @@ class RpcWorkerClients {
auto GetWorkerIds() { return coordination_.GetWorkerIds(); } auto GetWorkerIds() { return coordination_.GetWorkerIds(); }
/** /** Asynchroniously executes the given function on the rpc client for the
* Promises to execute function on workers rpc clients. * given worker id. Returns an `std::future` of the given `execute` function's
* @Tparam TResult - deduced automatically from method * return type. */
* @param skip_worker_id - worker which to skip (set to -1 to avoid skipping) template <typename TResult>
* @param execute - Method which takes an rpc client and returns a result for auto ExecuteOnWorker(
* it int worker_id,
* @return list of futures filled with function 'execute' results when applied std::function<TResult(communication::rpc::Client &)> execute) {
* to rpc clients auto &client = GetClient(worker_id);
*/ return std::async(std::launch::async,
[execute, &client]() { return execute(client); });
}
/** Asynchroniously executes the `execute` function on all worker rpc clients
* except the one whose id is `skip_worker_id`. Returns a vectore of futures
* contaning the results of the `execute` function. */
template <typename TResult> template <typename TResult>
auto ExecuteOnWorkers( auto ExecuteOnWorkers(
int skip_worker_id, int skip_worker_id,
@ -52,11 +58,7 @@ class RpcWorkerClients {
std::vector<std::future<TResult>> futures; std::vector<std::future<TResult>> futures;
for (auto &worker_id : coordination_.GetWorkerIds()) { for (auto &worker_id : coordination_.GetWorkerIds()) {
if (worker_id == skip_worker_id) continue; if (worker_id == skip_worker_id) continue;
auto &client = GetClient(worker_id); futures.emplace_back(std::move(ExecuteOnWorker(worker_id, execute)));
futures.emplace_back(std::async(std::launch::async, [execute, &client]() {
return execute(client);
}));
} }
return futures; return futures;
} }

View File

@ -321,10 +321,10 @@ std::unique_ptr<Cursor> ScanAllByLabelPropertyRange::MakeCursor(
context.symbol_table_, db, graph_view_); context.symbol_table_, db, graph_view_);
auto convert = [&evaluator](const auto &bound) auto convert = [&evaluator](const auto &bound)
-> std::experimental::optional<utils::Bound<PropertyValue>> { -> std::experimental::optional<utils::Bound<PropertyValue>> {
if (!bound) return std::experimental::nullopt; if (!bound) return std::experimental::nullopt;
return std::experimental::make_optional(utils::Bound<PropertyValue>( return std::experimental::make_optional(utils::Bound<PropertyValue>(
bound.value().value()->Accept(evaluator), bound.value().type())); bound.value().value()->Accept(evaluator), bound.value().type()));
}; };
return db.Vertices(label_, property_, convert(lower_bound()), return db.Vertices(label_, property_, convert(lower_bound()),
convert(upper_bound()), graph_view_ == GraphView::NEW); convert(upper_bound()), graph_view_ == GraphView::NEW);
}; };
@ -2583,8 +2583,8 @@ PullRemote::PullRemoteCursor::PullRemoteCursor(const PullRemote &self,
void PullRemote::PullRemoteCursor::EndRemotePull() { void PullRemote::PullRemoteCursor::EndRemotePull() {
if (remote_pull_ended_) return; if (remote_pull_ended_) return;
db_.db().remote_pull_clients().EndRemotePull(db_.transaction().id_, db_.db().remote_pull_clients().EndAllRemotePulls(db_.transaction().id_,
self_.plan_id()); self_.plan_id());
remote_pull_ended_ = true; remote_pull_ended_ = true;
} }
@ -2597,7 +2597,7 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) {
last_pulled_worker_ = (last_pulled_worker_ + 1) % worker_ids_.size(); last_pulled_worker_ = (last_pulled_worker_ + 1) % worker_ids_.size();
auto remote_results = db_.db().remote_pull_clients().RemotePull( auto remote_results = db_.db().remote_pull_clients().RemotePull(
db_.transaction().id_, worker_ids_[last_pulled_worker_], db_.transaction().id_, worker_ids_[last_pulled_worker_],
self_.plan_id(), context.parameters_, self_.symbols()); self_.plan_id(), context.parameters_, self_.symbols()).get();
auto get_results = [&]() { auto get_results = [&]() {
for (auto &result : remote_results.frames) { for (auto &result : remote_results.frames) {

View File

@ -260,10 +260,12 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
const int plan_id = 42; const int plan_id = 42;
master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_); master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_);
auto remote_pull = [this, plan_id, &ctx, &x_ne](tx::transaction_id_t tx_id, Parameters params;
int worker_id) { std::vector<query::Symbol> symbols{ctx.symbol_table_[*x_ne]};
return master().remote_pull_clients().RemotePull( auto remote_pull = [this, plan_id, &params, &symbols](
tx_id, worker_id, plan_id, Parameters(), {ctx.symbol_table_[*x_ne]}, 3); tx::transaction_id_t tx_id, int worker_id) {
return master().remote_pull_clients().RemotePull(tx_id, worker_id, plan_id,
params, symbols, 3);
}; };
auto expect_first_batch = [](auto &batch) { auto expect_first_batch = [](auto &batch) {
EXPECT_EQ(batch.pull_state, EXPECT_EQ(batch.pull_state,
@ -275,8 +277,7 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
EXPECT_EQ(batch.frames[2][0].ValueString(), "bla"); EXPECT_EQ(batch.frames[2][0].ValueString(), "bla");
}; };
auto expect_second_batch = [](auto &batch) { auto expect_second_batch = [](auto &batch) {
EXPECT_EQ(batch.pull_state, EXPECT_EQ(batch.pull_state, distributed::RemotePullState::CURSOR_EXHAUSTED);
distributed::RemotePullState::CURSOR_EXHAUSTED);
ASSERT_EQ(batch.frames.size(), 2); ASSERT_EQ(batch.frames.size(), 2);
ASSERT_EQ(batch.frames[0].size(), 1); ASSERT_EQ(batch.frames[0].size(), 1);
EXPECT_EQ(batch.frames[0][0].ValueInt(), 1); EXPECT_EQ(batch.frames[0][0].ValueInt(), 1);
@ -286,21 +287,21 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
database::GraphDbAccessor dba_1{master()}; database::GraphDbAccessor dba_1{master()};
database::GraphDbAccessor dba_2{master()}; database::GraphDbAccessor dba_2{master()};
for (int worker_id : {1, 2}) { for (int worker_id : {1, 2}) {
auto tx1_batch1 = remote_pull(dba_1.transaction_id(), worker_id); auto tx1_batch1 = remote_pull(dba_1.transaction_id(), worker_id).get();
expect_first_batch(tx1_batch1); expect_first_batch(tx1_batch1);
auto tx2_batch1 = remote_pull(dba_2.transaction_id(), worker_id); auto tx2_batch1 = remote_pull(dba_2.transaction_id(), worker_id).get();
expect_first_batch(tx2_batch1); expect_first_batch(tx2_batch1);
auto tx2_batch2 = remote_pull(dba_2.transaction_id(), worker_id); auto tx2_batch2 = remote_pull(dba_2.transaction_id(), worker_id).get();
expect_second_batch(tx2_batch2); expect_second_batch(tx2_batch2);
auto tx1_batch2 = remote_pull(dba_1.transaction_id(), worker_id); auto tx1_batch2 = remote_pull(dba_1.transaction_id(), worker_id).get();
expect_second_batch(tx1_batch2); expect_second_batch(tx1_batch2);
} }
master().remote_pull_clients().EndRemotePull(dba_1.transaction_id(), plan_id); master().remote_pull_clients().EndAllRemotePulls(dba_1.transaction_id(),
master().remote_pull_clients().EndRemotePull(dba_2.transaction_id(), plan_id); plan_id);
master().remote_pull_clients().EndAllRemotePulls(dba_2.transaction_id(),
plan_id);
} }
// TODO EndRemotePull test
TEST_F(DistributedGraphDbTest, BuildIndexDistributed) { TEST_F(DistributedGraphDbTest, BuildIndexDistributed) {
using GraphDbAccessor = database::GraphDbAccessor; using GraphDbAccessor = database::GraphDbAccessor;
storage::Label label; storage::Label label;