From 46b8a91a2f01f6d3225a2a3a1119e3c9880d0e92 Mon Sep 17 00:00:00 2001 From: Dominik Gleich Date: Wed, 21 Feb 2018 13:03:04 +0100 Subject: [PATCH] Add rpc_worker_clients tests Reviewers: florijan Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1214 --- tests/unit/rpc_worker_clients.cpp | 117 ++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 tests/unit/rpc_worker_clients.cpp diff --git a/tests/unit/rpc_worker_clients.cpp b/tests/unit/rpc_worker_clients.cpp new file mode 100644 index 000000000..f02b1abf5 --- /dev/null +++ b/tests/unit/rpc_worker_clients.cpp @@ -0,0 +1,117 @@ +#include + +#include "boost/archive/binary_iarchive.hpp" +#include "boost/archive/binary_oarchive.hpp" +#include "boost/serialization/export.hpp" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include "communication/rpc/messages.hpp" +#include "communication/rpc/server.hpp" +#include "distributed/coordination_master.hpp" +#include "distributed/coordination_worker.hpp" +#include "distributed/rpc_worker_clients.hpp" +#include "distributed/serialization.hpp" +#include "io/network/endpoint.hpp" + +namespace distributed { + +RPC_NO_MEMBER_MESSAGE(IncrementCounterReq); +RPC_NO_MEMBER_MESSAGE(IncrementCounterRes); + +using IncrementCounterRpc = + communication::rpc::RequestResponse; +}; // namespace distributed + +BOOST_CLASS_EXPORT(distributed::IncrementCounterReq); +BOOST_CLASS_EXPORT(distributed::IncrementCounterRes); + +class RpcWorkerClientsTest : public ::testing::Test { + protected: + const io::network::Endpoint kLocalHost{"127.0.0.1", 0}; + const int kWorkerCount = 2; + const std::string kRpcName = "test_rpc"; + void SetUp() override { + for (int i = 1; i <= kWorkerCount; ++i) { + workers_system_.emplace_back( + std::make_unique(kLocalHost)); + + workers_coord_.emplace_back( + std::make_unique( + *workers_system_.back(), master_system_.endpoint())); + + workers_coord_.back()->RegisterWorker(i); + workers_rpc_server_.emplace_back( + std::make_unique(*workers_system_.back(), + kRpcName)); + workers_rpc_server_.back()->Register( + [this, i](const distributed::IncrementCounterReq &) { + workers_cnt_[i]++; + return std::make_unique(); + }); + } + } + + void TearDown() override { + std::vector wait_on_shutdown; + for (auto &worker : workers_coord_) + wait_on_shutdown.emplace_back([&worker]() { worker->WaitForShutdown(); }); + + std::this_thread::sleep_for(300ms); + + // Starts server shutdown and notifies the workers + master_coord_ = std::experimental::nullopt; + for (auto &worker : wait_on_shutdown) worker.join(); + } + + std::vector> workers_system_; + std::vector> workers_coord_; + std::vector> workers_rpc_server_; + std::unordered_map workers_cnt_; + + communication::rpc::System master_system_{kLocalHost}; + std::experimental::optional master_coord_{ + master_system_}; + + distributed::RpcWorkerClients rpc_workers_{*master_coord_, kRpcName}; +}; + +TEST_F(RpcWorkerClientsTest, GetWorkerIds) { + EXPECT_THAT(rpc_workers_.GetWorkerIds(), testing::UnorderedElementsAreArray( + master_coord_->GetWorkerIds())); +} + +TEST_F(RpcWorkerClientsTest, GetClientPool) { + auto &pool1 = rpc_workers_.GetClientPool(1); + auto &pool2 = rpc_workers_.GetClientPool(2); + EXPECT_NE(&pool1, &pool2); + EXPECT_EQ(&pool1, &rpc_workers_.GetClientPool(1)); +} + +TEST_F(RpcWorkerClientsTest, ExecuteOnWorker) { + auto execute = [](auto &client) -> void { + ASSERT_NE(client.template Call(), + nullptr); + }; + + rpc_workers_.ExecuteOnWorker(1, execute).get(); + EXPECT_EQ(workers_cnt_[0], 0); + EXPECT_EQ(workers_cnt_[1], 1); + EXPECT_EQ(workers_cnt_[2], 0); +} + +TEST_F(RpcWorkerClientsTest, ExecuteOnWorkers) { + auto execute = [](auto &client) -> void { + ASSERT_NE(client.template Call(), + nullptr); + }; + + // Skip master + for (auto &future : rpc_workers_.ExecuteOnWorkers(0, execute)) + future.get(); + + EXPECT_EQ(workers_cnt_[0], 0); + EXPECT_EQ(workers_cnt_[1], 1); + EXPECT_EQ(workers_cnt_[2], 1); +}