Use ThreadPool for async RPC

Reviewers: teon.banek, mtomic, mferencevic

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1287
This commit is contained in:
florijan 2018-03-09 10:26:51 +01:00
parent 4675032e8d
commit c35d7fbec6
5 changed files with 132 additions and 93 deletions

View File

@ -9,6 +9,8 @@
#include "distributed/index_rpc_messages.hpp"
#include "storage/types.hpp"
#include "transactions/transaction.hpp"
#include "threading/thread_pool.hpp"
#include "utils/future.hpp"
namespace distributed {
@ -17,7 +19,9 @@ namespace distributed {
* Thread safe. */
class RpcWorkerClients {
public:
RpcWorkerClients(Coordination &coordination) : coordination_(coordination) {}
RpcWorkerClients(Coordination &coordination)
: coordination_(coordination),
thread_pool_(std::thread::hardware_concurrency()) {}
RpcWorkerClients(const RpcWorkerClients &) = delete;
RpcWorkerClients(RpcWorkerClients &&) = delete;
@ -45,9 +49,7 @@ class RpcWorkerClients {
int worker_id,
std::function<TResult(communication::rpc::ClientPool &)> execute) {
auto &client_pool = GetClientPool(worker_id);
return utils::make_future(
std::async(std::launch::async,
[execute, &client_pool]() { return execute(client_pool); }));
return thread_pool_.Run(execute, std::ref(client_pool));
}
/** Asynchroniously executes the `execute` function on all worker rpc clients
@ -70,6 +72,7 @@ class RpcWorkerClients {
Coordination &coordination_;
std::unordered_map<int, communication::rpc::ClientPool> client_pools_;
std::mutex lock_;
threading::ThreadPool thread_pool_;
};
/** Wrapper class around a RPC call to build indices.
@ -94,4 +97,5 @@ class IndexRpcClients {
private:
RpcWorkerClients &clients_;
};
} // namespace distributed

View File

@ -1,87 +0,0 @@
#pragma once
#include <atomic>
#include <condition_variable>
#include <future>
#include <mutex>
#include <queue>
#include "threading/sync/lockable.hpp"
/**
* ThreadPool which will invoke maximum concurrent number of threads for the
* used hardware and will schedule tasks on thread as they are added to the
* pool.
*/
class Pool : Lockable<std::mutex> {
public:
using task_t = std::function<void()>;
using sptr = std::shared_ptr<Pool>;
explicit Pool(size_t n = std::thread::hardware_concurrency()) : alive(true) {
threads.reserve(n);
for (size_t i = 0; i < n; ++i)
threads.emplace_back([this]() -> void { loop(); });
}
Pool(Pool &) = delete;
Pool(Pool &&) = delete;
~Pool() {
{
// We need to hold the lock before we notify threads because condition
// variable wait for could read the value of alive as true, then receive a
// notification and then continue waiting since it read the value as true,
// that's why we have to force notification to occur while the condition
// variable doesn't have a lock.
auto lock = acquire_unique();
alive.store(false, std::memory_order_seq_cst);
cond.notify_all();
}
for (auto &thread : threads) thread.join();
}
/**
* Runs an asynchronous task.
* @param f - task to run.
*/
void run(task_t f) {
{
auto lock = acquire_unique();
tasks.push(f);
}
cond.notify_one();
}
private:
std::vector<std::thread> threads;
std::queue<task_t> tasks;
std::atomic<bool> alive;
std::mutex mutex;
std::condition_variable cond;
void loop() {
while (true) {
task_t task;
{
auto lock = acquire_unique();
cond.wait(lock,
[this] { return !this->alive || !this->tasks.empty(); });
if (!alive && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
// Start the execution of task.
task();
}
}
};

View File

@ -0,0 +1,83 @@
#pragma once
/// @file
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>
#include "glog/logging.h"
#include "utils/future.hpp"
namespace threading {
/// A thread pool for asynchronous task execution. Supports tasks that produce
/// return values by returning `utils::Future` objects.
class ThreadPool {
public:
/// Creates a thread pool with the given number of threads.
explicit ThreadPool(size_t threads) {
for (size_t i = 0; i < threads; ++i)
workers_.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mutex_);
cvar_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) return;
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
});
}
ThreadPool(const ThreadPool &) = delete;
ThreadPool(ThreadPool &&) = delete;
ThreadPool &operator=(const ThreadPool &) = delete;
ThreadPool &operator=(ThreadPool &&) = delete;
/// Runs the given callable with the given args, asynchronously. This function
/// immediately returns an `utils::Future` with the result, to be
/// consumed when ready.
template <class TCallable, class... TArgs>
auto Run(TCallable &&callable, TArgs &&... args) {
auto task = std::make_shared<
std::packaged_task<std::result_of_t<TCallable(TArgs...)>()>>(std::bind(
std::forward<TCallable>(callable), std::forward<TArgs>(args)...));
auto res = utils::make_future(task->get_future());
std::unique_lock<std::mutex> lock(mutex_);
CHECK(!stop_) << "ThreadPool::Run called on stopped ThreadPool.";
tasks_.emplace([task]() { (*task)(); });
lock.unlock();
cvar_.notify_one();
return res;
}
~ThreadPool() {
std::unique_lock<std::mutex> lock(mutex_);
stop_ = true;
lock.unlock();
cvar_.notify_all();
for (std::thread &worker : workers_) {
if (worker.joinable()) worker.join();
}
}
private:
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex mutex_;
std::condition_variable cvar_;
bool stop_{false};
};
} // namespace threading

View File

@ -1,5 +1,3 @@
#include <future>
#include "boost/archive/binary_iarchive.hpp"
#include "boost/archive/binary_oarchive.hpp"
#include "boost/serialization/export.hpp"

View File

@ -0,0 +1,41 @@
#include <chrono>
#include <memory>
#include <unordered_set>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "threading/thread_pool.hpp"
#include "utils/future.hpp"
#include "utils/timer.hpp"
TEST(ThreadPool, RunMany) {
threading::ThreadPool tp(10);
const int kResults = 10000;
std::vector<utils::Future<int>> results;
for (int i = 0; i < kResults; ++i) {
results.emplace_back(tp.Run([i]() { return i; }));
}
std::unordered_set<int> result_set;
for (auto &result : results) result_set.insert(result.get());
EXPECT_EQ(result_set.size(), kResults);
}
TEST(ThreadPool, EnsureParallel) {
using namespace std::chrono_literals;
const int kSize = 10;
threading::ThreadPool tp(kSize);
std::vector<utils::Future<void>> results;
utils::Timer t;
for (int i = 0; i < kSize; ++i) {
results.emplace_back(tp.Run([]() { std::this_thread::sleep_for(50ms); }));
}
for (auto &res : results) res.wait();
auto elapsed = t.Elapsed();
EXPECT_GE(elapsed, 30ms);
EXPECT_LE(elapsed, 200ms);
}