fixed threadpool

This commit is contained in:
Dominik Tomičević 2015-09-19 15:45:01 +02:00
parent 6939ee9769
commit ab33fce01b
4 changed files with 41 additions and 90 deletions

View File

@ -3,77 +3,53 @@
#include <mutex> #include <mutex>
#include <atomic> #include <atomic>
#include <condition_variable>
#include <future> #include <future>
#include <queue>
#include <condition_variable>
#include "data_structures/queue/slqueue.hpp" #include "sync/lockable.hpp"
#include "threading/sync/lockable.hpp"
#include "worker.hpp"
#include "data_structures/queue/slqueue.hpp"
class Pool : Lockable<std::mutex> class Pool : Lockable<std::mutex>
{ {
using task_t = std::function<void()>; using task_t = std::function<void()>;
public: public:
Pool(size_t n = std::thread::hardware_concurrency()) Pool(size_t n = std::thread::hardware_concurrency()) : alive(true)
: alive(true)
{ {
start(n); threads.reserve(n);
for(size_t i = 0; i < n; ++i)
threads.emplace_back([this]()->void { loop(); });
} }
Pool(Pool&) = delete;
Pool(Pool&&) = delete;
~Pool() ~Pool()
{ {
alive.store(false, std::memory_order_release); alive.store(false, std::memory_order_release);
cond.notify_all(); cond.notify_all();
for(auto& worker : workers) for(auto& thread : threads)
worker.join(); thread.join();
}
size_t size()
{
return workers.size();
} }
template <class F, class... Args> template <class F, class... Args>
void execute(F&& f, Args&&... args) void run(F&& f, Args&&... args)
{ {
{ {
auto guard = acquire(); auto lock = acquire();
tasks.emplace([f, args...]() { f(args...); });
tasks.emplace([&f, &args...]() {
f(std::forward<Args>(args)...);
});
} }
cond.notify_one(); cond.notify_one();
} }
template <class F, class... Args>
auto execute_with_result(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using ret_t = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<ret_t()>>
(std::bind(f, args...));
auto result = task->get_future();
{
auto guard = acquire();
tasks.emplace([task]() { (*task)(); });
}
cond.notify_one();
return result;
}
private: private:
Pool(const Pool&) = delete; std::vector<std::thread> threads;
Pool(Pool&&) = delete; std::queue<task_t> tasks;
std::vector<Worker<task_t>> workers;
spinlock::Queue<task_t> tasks;
std::atomic<bool> alive; std::atomic<bool> alive;
std::mutex mutex; std::mutex mutex;
@ -81,29 +57,27 @@ private:
void loop() void loop()
{ {
task_t task;
while(true) while(true)
{ {
while(tasks.pop(task)) task_t task;
task();
auto guard = acquire(); {
auto lock = acquire();
cond.wait(guard, [this] { cond.wait(lock, [this] {
return !this->alive || !this->tasks.empty(); return !this->alive || !this->tasks.empty();
}); });
if(!alive && tasks.empty()) if(!alive && tasks.empty())
return; return;
task = std::move(tasks.front());
tasks.pop();
}
task();
} }
} }
void start(size_t n)
{
for(size_t i = 0; i < n; ++i)
workers.emplace_back([this]()->void { this->loop(); });
}
}; };
#endif #endif

View File

@ -10,8 +10,6 @@ public:
void lock() void lock()
{ {
// TODO add asm_pause and counter first before sleeping
// might be faster, but test this and see
while(lock_flag.test_and_set(std::memory_order_acquire)) while(lock_flag.test_and_set(std::memory_order_acquire))
usleep(250); usleep(250);
} }

View File

@ -4,16 +4,17 @@
int main(void) int main(void)
{ {
std::cout << "hardware_concurrency " << std::thread::hardware_concurrency() << std::endl; auto size = 7;
auto size = 2048;
auto N = 1000000; auto N = 1000000;
Pool pool(size); Pool pool(size);
for(int i = 0; i < N; ++i) for(int i = 0; i < N; ++i)
pool.execute([size](int) { pool.run([](int) {
std::this_thread::sleep_for(std::chrono::milliseconds(5)); int sum = 0;
for(int i = 0; i < 2000; ++i)
sum += i % 7;
}, i); }, i);
return 0; return 0;

View File

@ -1,22 +0,0 @@
#ifndef MEMGRAPH_THREADING_WORKER_HPP
#define MEMGRAPH_THREADING_WORKER_HPP
#include <thread>
template <class F, class... Args>
class Worker
{
public:
Worker(F&& f, Args&&... args)
: thread(f, args...) {}
void join()
{
thread.join();
}
private:
std::thread thread;
};
#endif