Add rpc client pool
Summary: See above. The unit test creates two clients on demand so I guess it works. Reviewers: mferencevic, florijan, teon.banek Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1136
This commit is contained in:
parent
010ed52622
commit
6fa06f7ea5
48
src/communication/rpc/client_pool.hpp
Normal file
48
src/communication/rpc/client_pool.hpp
Normal file
@ -0,0 +1,48 @@
|
||||
#pragma once
|
||||
|
||||
#include <mutex>
|
||||
#include <stack>
|
||||
|
||||
#include "communication/rpc/client.hpp"
|
||||
|
||||
namespace communication::rpc {
|
||||
|
||||
/**
|
||||
* A simple client pool that creates new RPC clients on demand. Useful when you
|
||||
* want to send RPCs to the same server from multiple threads without them
|
||||
* blocking each other.
|
||||
*/
|
||||
class ClientPool {
|
||||
public:
|
||||
ClientPool(const io::network::Endpoint &endpoint, const std::string &name)
|
||||
: endpoint_(endpoint), name_(name) {}
|
||||
|
||||
template <typename TRequestResponse, typename... Args>
|
||||
std::unique_ptr<typename TRequestResponse::Response> Call(Args &&... args) {
|
||||
std::unique_ptr<Client> client;
|
||||
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
if (unused_clients_.empty()) {
|
||||
client = std::make_unique<Client>(endpoint_, name_);
|
||||
} else {
|
||||
client = std::move(unused_clients_.top());
|
||||
unused_clients_.pop();
|
||||
}
|
||||
lock.unlock();
|
||||
|
||||
auto resp = client->Call<TRequestResponse>(std::forward<Args>(args)...);
|
||||
|
||||
lock.lock();
|
||||
unused_clients_.push(std::move(client));
|
||||
return resp;
|
||||
};
|
||||
|
||||
private:
|
||||
io::network::Endpoint endpoint_;
|
||||
std::string name_;
|
||||
|
||||
std::mutex mutex_;
|
||||
std::stack<std::unique_ptr<Client>> unused_clients_;
|
||||
};
|
||||
|
||||
} // namespace communication::rpc
|
@ -11,9 +11,9 @@
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "communication/rpc/client.hpp"
|
||||
#include "communication/rpc/client_pool.hpp"
|
||||
#include "communication/rpc/messages.hpp"
|
||||
#include "communication/rpc/server.hpp"
|
||||
#include "gtest/gtest.h"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
using namespace communication::rpc;
|
||||
@ -91,3 +91,55 @@ TEST(Rpc, Abort) {
|
||||
|
||||
thread.join();
|
||||
}
|
||||
|
||||
TEST(Rpc, ClientPool) {
|
||||
System server_system({"127.0.0.1", 0});
|
||||
Server server(server_system, "main", 4);
|
||||
server.Register<Sum>([](const SumReq &request) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
return std::make_unique<SumRes>(request.x + request.y);
|
||||
});
|
||||
std::this_thread::sleep_for(100ms);
|
||||
|
||||
|
||||
Client client(server_system.endpoint(), "main");
|
||||
|
||||
/* these calls should take more than 400ms because we're using a regular
|
||||
* client */
|
||||
auto get_sum_client = [&client](int x, int y) {
|
||||
auto sum = client.Call<Sum>(x, y);
|
||||
ASSERT_TRUE(sum != nullptr);
|
||||
EXPECT_EQ(sum->sum, x + y);
|
||||
};
|
||||
|
||||
utils::Timer t1;
|
||||
std::vector<std::thread> threads;
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
threads.emplace_back(get_sum_client, 2 * i, 2 * i + 1);
|
||||
}
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
threads[i].join();
|
||||
}
|
||||
threads.clear();
|
||||
|
||||
EXPECT_GE(t1.Elapsed(), 400ms);
|
||||
|
||||
ClientPool pool(server_system.endpoint(), "main");
|
||||
|
||||
/* these calls shouldn't take much more that 100ms because they execute in
|
||||
* parallel */
|
||||
auto get_sum = [&pool](int x, int y) {
|
||||
auto sum = pool.Call<Sum>(x, y);
|
||||
ASSERT_TRUE(sum != nullptr);
|
||||
EXPECT_EQ(sum->sum, x + y);
|
||||
};
|
||||
|
||||
utils::Timer t2;
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
threads.emplace_back(get_sum, 2 * i, 2 * i + 1);
|
||||
}
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
threads[i].join();
|
||||
}
|
||||
EXPECT_LE(t2.Elapsed(), 200ms);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user