From 31aded2daeddb29827fd9955e035afb3a886802d Mon Sep 17 00:00:00 2001 From: Matija Santl Date: Wed, 24 Jan 2018 09:54:38 +0100 Subject: [PATCH] Add test for `PlanDispatcher` and `PlanConsumer` Summary: Added test for `PlanDispatcher` and `PlanConsumer`. This diff also contains a fix for the async rpc call on all clients.h Reviewers: florijan, teon.banek Reviewed By: florijan Subscribers: pullbot, dgleich Differential Revision: https://phabricator.memgraph.io/D1135 --- src/distributed/plan_dispatcher.cpp | 4 ++++ src/distributed/rpc_worker_clients.hpp | 6 +++--- tests/unit/distributed_graph_db.cpp | 30 ++++++++++++++++++++++++++ 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/src/distributed/plan_dispatcher.cpp b/src/distributed/plan_dispatcher.cpp index 1a5ce756f..ca4c77f7a 100644 --- a/src/distributed/plan_dispatcher.cpp +++ b/src/distributed/plan_dispatcher.cpp @@ -15,6 +15,10 @@ void PlanDispatcher::DispatchPlan( client.Call(300ms, plan_id, plan, symbol_table); CHECK(result) << "Failed to dispatch plan to worker"; }); + + for (auto &future : futures) { + future.wait(); + } } } // namespace distributed diff --git a/src/distributed/rpc_worker_clients.hpp b/src/distributed/rpc_worker_clients.hpp index 5e0a26f35..75079f556 100644 --- a/src/distributed/rpc_worker_clients.hpp +++ b/src/distributed/rpc_worker_clients.hpp @@ -58,9 +58,9 @@ class RpcWorkerClients { if (worker_id == skip_worker_id) continue; auto &client = GetClient(worker_id); - futures.emplace_back( - std::async(std::launch::async, - [&execute, &client]() { return execute(client); })); + futures.emplace_back(std::async(std::launch::async, [execute, &client]() { + return execute(client); + })); } return futures; } diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp index 3a1a285f5..7b201ffe4 100644 --- a/tests/unit/distributed_graph_db.cpp +++ b/tests/unit/distributed_graph_db.cpp @@ -8,9 +8,12 @@ #include "distributed/coordination.hpp" #include "distributed/coordination_master.hpp" #include "distributed/coordination_worker.hpp" +#include "distributed/plan_consumer.hpp" +#include "distributed/plan_dispatcher.hpp" #include "distributed/remote_data_rpc_clients.hpp" #include "distributed/remote_data_rpc_server.hpp" #include "io/network/endpoint.hpp" +#include "query_plan_common.hpp" #include "transactions/engine_master.hpp" template @@ -201,3 +204,30 @@ TEST_F(DistributedGraphDbTest, RemoteDataGetting) { EXPECT_EQ(e1_in_w2.PropsAt(w2_dba.Property("p3")).Value(), true); } } + +TEST_F(DistributedGraphDbTest, DispatchPlan) { + auto kRPCWaitTime = 600ms; + int64_t plan_id = 5; + SymbolTable symbol_table; + AstTreeStorage storage; + + auto scan_all = MakeScanAll(storage, symbol_table, "n"); + + master().plan_dispatcher().DispatchPlan(plan_id, scan_all.op_, symbol_table); + std::this_thread::sleep_for(kRPCWaitTime); + + { + auto cached = worker1().plan_consumer().PlanForId(plan_id); + EXPECT_NE(dynamic_cast(cached.first.get()), + nullptr); + EXPECT_EQ(cached.second.max_position(), symbol_table.max_position()); + EXPECT_EQ(cached.second.table(), symbol_table.table()); + } + { + auto cached = worker2().plan_consumer().PlanForId(plan_id); + EXPECT_NE(dynamic_cast(cached.first.get()), + nullptr); + EXPECT_EQ(cached.second.max_position(), symbol_table.max_position()); + EXPECT_EQ(cached.second.table(), symbol_table.table()); + } +}