Add rpc_worker_clients tests
Reviewers: florijan Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1214
This commit is contained in:
parent
ae95e9480c
commit
46b8a91a2f
117
tests/unit/rpc_worker_clients.cpp
Normal file
117
tests/unit/rpc_worker_clients.cpp
Normal file
@ -0,0 +1,117 @@
|
|||||||
|
#include <future>
|
||||||
|
|
||||||
|
#include "boost/archive/binary_iarchive.hpp"
|
||||||
|
#include "boost/archive/binary_oarchive.hpp"
|
||||||
|
#include "boost/serialization/export.hpp"
|
||||||
|
#include "gmock/gmock.h"
|
||||||
|
#include "gtest/gtest.h"
|
||||||
|
|
||||||
|
#include "communication/rpc/messages.hpp"
|
||||||
|
#include "communication/rpc/server.hpp"
|
||||||
|
#include "distributed/coordination_master.hpp"
|
||||||
|
#include "distributed/coordination_worker.hpp"
|
||||||
|
#include "distributed/rpc_worker_clients.hpp"
|
||||||
|
#include "distributed/serialization.hpp"
|
||||||
|
#include "io/network/endpoint.hpp"
|
||||||
|
|
||||||
|
namespace distributed {
|
||||||
|
|
||||||
|
RPC_NO_MEMBER_MESSAGE(IncrementCounterReq);
|
||||||
|
RPC_NO_MEMBER_MESSAGE(IncrementCounterRes);
|
||||||
|
|
||||||
|
using IncrementCounterRpc =
|
||||||
|
communication::rpc::RequestResponse<IncrementCounterReq,
|
||||||
|
IncrementCounterRes>;
|
||||||
|
}; // namespace distributed
|
||||||
|
|
||||||
|
BOOST_CLASS_EXPORT(distributed::IncrementCounterReq);
|
||||||
|
BOOST_CLASS_EXPORT(distributed::IncrementCounterRes);
|
||||||
|
|
||||||
|
class RpcWorkerClientsTest : public ::testing::Test {
|
||||||
|
protected:
|
||||||
|
const io::network::Endpoint kLocalHost{"127.0.0.1", 0};
|
||||||
|
const int kWorkerCount = 2;
|
||||||
|
const std::string kRpcName = "test_rpc";
|
||||||
|
void SetUp() override {
|
||||||
|
for (int i = 1; i <= kWorkerCount; ++i) {
|
||||||
|
workers_system_.emplace_back(
|
||||||
|
std::make_unique<communication::rpc::System>(kLocalHost));
|
||||||
|
|
||||||
|
workers_coord_.emplace_back(
|
||||||
|
std::make_unique<distributed::WorkerCoordination>(
|
||||||
|
*workers_system_.back(), master_system_.endpoint()));
|
||||||
|
|
||||||
|
workers_coord_.back()->RegisterWorker(i);
|
||||||
|
workers_rpc_server_.emplace_back(
|
||||||
|
std::make_unique<communication::rpc::Server>(*workers_system_.back(),
|
||||||
|
kRpcName));
|
||||||
|
workers_rpc_server_.back()->Register<distributed::IncrementCounterRpc>(
|
||||||
|
[this, i](const distributed::IncrementCounterReq &) {
|
||||||
|
workers_cnt_[i]++;
|
||||||
|
return std::make_unique<distributed::IncrementCounterRes>();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void TearDown() override {
|
||||||
|
std::vector<std::thread> wait_on_shutdown;
|
||||||
|
for (auto &worker : workers_coord_)
|
||||||
|
wait_on_shutdown.emplace_back([&worker]() { worker->WaitForShutdown(); });
|
||||||
|
|
||||||
|
std::this_thread::sleep_for(300ms);
|
||||||
|
|
||||||
|
// Starts server shutdown and notifies the workers
|
||||||
|
master_coord_ = std::experimental::nullopt;
|
||||||
|
for (auto &worker : wait_on_shutdown) worker.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<std::unique_ptr<communication::rpc::System>> workers_system_;
|
||||||
|
std::vector<std::unique_ptr<distributed::WorkerCoordination>> workers_coord_;
|
||||||
|
std::vector<std::unique_ptr<communication::rpc::Server>> workers_rpc_server_;
|
||||||
|
std::unordered_map<int, int> workers_cnt_;
|
||||||
|
|
||||||
|
communication::rpc::System master_system_{kLocalHost};
|
||||||
|
std::experimental::optional<distributed::MasterCoordination> master_coord_{
|
||||||
|
master_system_};
|
||||||
|
|
||||||
|
distributed::RpcWorkerClients rpc_workers_{*master_coord_, kRpcName};
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(RpcWorkerClientsTest, GetWorkerIds) {
|
||||||
|
EXPECT_THAT(rpc_workers_.GetWorkerIds(), testing::UnorderedElementsAreArray(
|
||||||
|
master_coord_->GetWorkerIds()));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(RpcWorkerClientsTest, GetClientPool) {
|
||||||
|
auto &pool1 = rpc_workers_.GetClientPool(1);
|
||||||
|
auto &pool2 = rpc_workers_.GetClientPool(2);
|
||||||
|
EXPECT_NE(&pool1, &pool2);
|
||||||
|
EXPECT_EQ(&pool1, &rpc_workers_.GetClientPool(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(RpcWorkerClientsTest, ExecuteOnWorker) {
|
||||||
|
auto execute = [](auto &client) -> void {
|
||||||
|
ASSERT_NE(client.template Call<distributed::IncrementCounterRpc>(),
|
||||||
|
nullptr);
|
||||||
|
};
|
||||||
|
|
||||||
|
rpc_workers_.ExecuteOnWorker<void>(1, execute).get();
|
||||||
|
EXPECT_EQ(workers_cnt_[0], 0);
|
||||||
|
EXPECT_EQ(workers_cnt_[1], 1);
|
||||||
|
EXPECT_EQ(workers_cnt_[2], 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(RpcWorkerClientsTest, ExecuteOnWorkers) {
|
||||||
|
auto execute = [](auto &client) -> void {
|
||||||
|
ASSERT_NE(client.template Call<distributed::IncrementCounterRpc>(),
|
||||||
|
nullptr);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Skip master
|
||||||
|
for (auto &future : rpc_workers_.ExecuteOnWorkers<void>(0, execute))
|
||||||
|
future.get();
|
||||||
|
|
||||||
|
EXPECT_EQ(workers_cnt_[0], 0);
|
||||||
|
EXPECT_EQ(workers_cnt_[1], 1);
|
||||||
|
EXPECT_EQ(workers_cnt_[2], 1);
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user