Global thread pool added.
Summary: Global thread pool implementation. Reviewers: buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D369
This commit is contained in:
parent
23ab2b41a3
commit
4b712595d9
@ -8,7 +8,7 @@
|
||||
#include "logging/loggable.hpp"
|
||||
#include "memory/freelist.hpp"
|
||||
#include "memory/lazy_gc.hpp"
|
||||
#include "threading/pool.hpp"
|
||||
#include "threading/global_pool.hpp"
|
||||
#include "threading/sync/spinlock.hpp"
|
||||
#include "utils/assert.hpp"
|
||||
|
||||
@ -41,10 +41,17 @@ class SkiplistGC : public LazyGC<SkiplistGC<T, lock_t>, lock_t>,
|
||||
}
|
||||
|
||||
if (local_freelist->size() > 0) {
|
||||
thread_pool_.run(std::bind(
|
||||
[this](std::shared_ptr<std::vector<T *>> local_freelist) {
|
||||
logger.trace("GC started");
|
||||
logger.trace("Local list size: {}", local_freelist->size());
|
||||
// We need to use a Global thread pool, otherwise we run into problems
|
||||
// because each skiplist would use it's own thread pool and that would
|
||||
// invoke too many threads.
|
||||
GlobalPool::getSingletonInstance()->run(std::bind(
|
||||
[](std::shared_ptr<std::vector<T *>> local_freelist) {
|
||||
// Comment logger out for now because we can't send an instance of
|
||||
// this to global pool because this SkipListGc instance could be
|
||||
// destroyed before the GC starts and as such will SEGFAULT.
|
||||
|
||||
// logger.trace("GC started");
|
||||
// logger.trace("Local list size: {}", local_freelist->size());
|
||||
long long destroyed = 0;
|
||||
// destroy all elements from local_freelist
|
||||
for (auto element : *local_freelist) {
|
||||
@ -52,12 +59,12 @@ class SkiplistGC : public LazyGC<SkiplistGC<T, lock_t>, lock_t>,
|
||||
T::destroy(element);
|
||||
destroyed++;
|
||||
} else {
|
||||
logger.warn(
|
||||
"Unmarked node appeared in the collection ready for "
|
||||
"destruction.");
|
||||
// logger.warn(
|
||||
// "Unmarked node appeared in the collection ready for "
|
||||
// "destruction.");
|
||||
}
|
||||
}
|
||||
logger.trace("Number of destroyed elements: {}", destroyed);
|
||||
// logger.trace("Number of destroyed elements: {}", destroyed);
|
||||
},
|
||||
local_freelist));
|
||||
}
|
||||
@ -68,5 +75,4 @@ class SkiplistGC : public LazyGC<SkiplistGC<T, lock_t>, lock_t>,
|
||||
private:
|
||||
// We use FreeList since it's thread-safe.
|
||||
FreeList<T *> freelist_;
|
||||
Pool thread_pool_;
|
||||
};
|
||||
|
32
src/threading/global_pool.hpp
Normal file
32
src/threading/global_pool.hpp
Normal file
@ -0,0 +1,32 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <future>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
|
||||
#include "threading/pool.hpp"
|
||||
|
||||
/**
|
||||
* @brief - Singleton class which implements thread pool.
|
||||
*/
|
||||
class GlobalPool {
|
||||
public:
|
||||
// Guaranteed by the C++11 standard to be thread-safe.
|
||||
static GlobalPool *getSingletonInstance() {
|
||||
static GlobalPool instance;
|
||||
return &instance;
|
||||
}
|
||||
|
||||
void run(Pool::task_t f) { thread_pool_.run(f); }
|
||||
|
||||
GlobalPool(const GlobalPool &) = delete;
|
||||
GlobalPool(const GlobalPool &&) = delete;
|
||||
GlobalPool operator=(const GlobalPool &) = delete;
|
||||
GlobalPool operator=(const GlobalPool &&) = delete;
|
||||
|
||||
private:
|
||||
GlobalPool() {}
|
||||
Pool thread_pool_;
|
||||
};
|
@ -14,9 +14,8 @@
|
||||
* pool.
|
||||
*/
|
||||
class Pool : Lockable<std::mutex> {
|
||||
using task_t = std::function<void()>;
|
||||
|
||||
public:
|
||||
using task_t = std::function<void()>;
|
||||
using sptr = std::shared_ptr<Pool>;
|
||||
|
||||
explicit Pool(size_t n = std::thread::hardware_concurrency()) : alive(true) {
|
||||
|
@ -30,8 +30,9 @@ class FakeItem {
|
||||
|
||||
TEST(SkipListGC, TripleScopeGC) {
|
||||
SkipList<FakeItem> skiplist;
|
||||
std::atomic<int> count{0};
|
||||
auto item = FakeItem(count, 1);
|
||||
std::atomic<int> *count = new std::atomic<int>{0};
|
||||
|
||||
auto item = FakeItem(*count, 1);
|
||||
{
|
||||
auto access_1 = skiplist.access();
|
||||
{
|
||||
@ -41,25 +42,25 @@ TEST(SkipListGC, TripleScopeGC) {
|
||||
access_1.insert(item); // add with 1
|
||||
access_2.remove(item); // remove with 2
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
EXPECT_EQ(count, 0);
|
||||
EXPECT_EQ(*count, 0);
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
EXPECT_EQ(count, 0);
|
||||
EXPECT_EQ(*count, 0);
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
EXPECT_EQ(count, 0);
|
||||
EXPECT_EQ(*count, 0);
|
||||
} // scope end - GC called
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
if (count != 0) break;
|
||||
if (*count != 0) break;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
EXPECT_EQ(count, 1);
|
||||
EXPECT_EQ(*count, 1);
|
||||
}
|
||||
|
||||
TEST(SkipListGC, BlockedGCNoGC) {
|
||||
SkipList<FakeItem> skiplist;
|
||||
std::atomic<int> count{0};
|
||||
auto item = FakeItem(count, 1);
|
||||
std::atomic<int> *count = new std::atomic<int>{0};
|
||||
auto item = FakeItem(*count, 1);
|
||||
auto blocking_access = skiplist.access();
|
||||
{
|
||||
auto access = skiplist.access();
|
||||
@ -67,34 +68,37 @@ TEST(SkipListGC, BlockedGCNoGC) {
|
||||
access.remove(item);
|
||||
} // scope end - GC still isn't called because of blocking_access
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
EXPECT_EQ(count, 0);
|
||||
EXPECT_EQ(*count, 0);
|
||||
}
|
||||
|
||||
TEST(SkipListGC, NotInScopeGC) {
|
||||
SkipList<FakeItem> skiplist;
|
||||
std::atomic<int> count{0};
|
||||
auto item = FakeItem(count, 1);
|
||||
std::atomic<int> *count = new std::atomic<int>{0};
|
||||
auto item = FakeItem(*count, 1);
|
||||
{
|
||||
auto access = skiplist.access();
|
||||
access.insert(item);
|
||||
access.remove(item);
|
||||
} // scope end - GC called
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
if (count != 0) break;
|
||||
if (*count == 1) break;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
EXPECT_EQ(count, 1);
|
||||
// If this count is not 1 that means we are still doing GC in the background
|
||||
// and might crash the test if we try to modify count variable after it's been
|
||||
// deallocated.
|
||||
ASSERT_EQ(*count, 1);
|
||||
}
|
||||
|
||||
TEST(SkipListGC, StillInScopeNoGC) {
|
||||
SkipList<FakeItem> skiplist;
|
||||
std::atomic<int> count{0};
|
||||
auto item = FakeItem(count, 1);
|
||||
std::atomic<int> *count = new std::atomic<int>{0};
|
||||
auto item = FakeItem(*count, 1);
|
||||
auto access = skiplist.access();
|
||||
access.insert(item);
|
||||
access.remove(item);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
EXPECT_EQ(count, 0);
|
||||
EXPECT_EQ(*count, 0);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
|
Loading…
Reference in New Issue
Block a user