From 4b712595d9514d80dd2bfd3c89ce733d7986f1b5 Mon Sep 17 00:00:00 2001 From: Dominik Gleich Date: Tue, 16 May 2017 15:54:40 +0200 Subject: [PATCH] Global thread pool added. Summary: Global thread pool implementation. Reviewers: buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D369 --- .../concurrent/skiplist_gc.hpp | 26 ++++++++----- src/threading/global_pool.hpp | 32 ++++++++++++++++ src/threading/pool.hpp | 3 +- tests/unit/skiplist_gc.cpp | 38 ++++++++++--------- 4 files changed, 70 insertions(+), 29 deletions(-) create mode 100644 src/threading/global_pool.hpp diff --git a/src/data_structures/concurrent/skiplist_gc.hpp b/src/data_structures/concurrent/skiplist_gc.hpp index f2a2f0ea1..6eb287035 100644 --- a/src/data_structures/concurrent/skiplist_gc.hpp +++ b/src/data_structures/concurrent/skiplist_gc.hpp @@ -8,7 +8,7 @@ #include "logging/loggable.hpp" #include "memory/freelist.hpp" #include "memory/lazy_gc.hpp" -#include "threading/pool.hpp" +#include "threading/global_pool.hpp" #include "threading/sync/spinlock.hpp" #include "utils/assert.hpp" @@ -41,10 +41,17 @@ class SkiplistGC : public LazyGC, lock_t>, } if (local_freelist->size() > 0) { - thread_pool_.run(std::bind( - [this](std::shared_ptr> local_freelist) { - logger.trace("GC started"); - logger.trace("Local list size: {}", local_freelist->size()); + // We need to use a Global thread pool, otherwise we run into problems + // because each skiplist would use it's own thread pool and that would + // invoke too many threads. + GlobalPool::getSingletonInstance()->run(std::bind( + [](std::shared_ptr> local_freelist) { + // Comment logger out for now because we can't send an instance of + // this to global pool because this SkipListGc instance could be + // destroyed before the GC starts and as such will SEGFAULT. + + // logger.trace("GC started"); + // logger.trace("Local list size: {}", local_freelist->size()); long long destroyed = 0; // destroy all elements from local_freelist for (auto element : *local_freelist) { @@ -52,12 +59,12 @@ class SkiplistGC : public LazyGC, lock_t>, T::destroy(element); destroyed++; } else { - logger.warn( - "Unmarked node appeared in the collection ready for " - "destruction."); + // logger.warn( + // "Unmarked node appeared in the collection ready for " + // "destruction."); } } - logger.trace("Number of destroyed elements: {}", destroyed); + // logger.trace("Number of destroyed elements: {}", destroyed); }, local_freelist)); } @@ -68,5 +75,4 @@ class SkiplistGC : public LazyGC, lock_t>, private: // We use FreeList since it's thread-safe. FreeList freelist_; - Pool thread_pool_; }; diff --git a/src/threading/global_pool.hpp b/src/threading/global_pool.hpp new file mode 100644 index 000000000..a1a3b10ec --- /dev/null +++ b/src/threading/global_pool.hpp @@ -0,0 +1,32 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "threading/pool.hpp" + +/** + * @brief - Singleton class which implements thread pool. + */ +class GlobalPool { + public: + // Guaranteed by the C++11 standard to be thread-safe. + static GlobalPool *getSingletonInstance() { + static GlobalPool instance; + return &instance; + } + + void run(Pool::task_t f) { thread_pool_.run(f); } + + GlobalPool(const GlobalPool &) = delete; + GlobalPool(const GlobalPool &&) = delete; + GlobalPool operator=(const GlobalPool &) = delete; + GlobalPool operator=(const GlobalPool &&) = delete; + + private: + GlobalPool() {} + Pool thread_pool_; +}; diff --git a/src/threading/pool.hpp b/src/threading/pool.hpp index f577b1ca7..fd1dde46f 100644 --- a/src/threading/pool.hpp +++ b/src/threading/pool.hpp @@ -14,9 +14,8 @@ * pool. */ class Pool : Lockable { - using task_t = std::function; - public: + using task_t = std::function; using sptr = std::shared_ptr; explicit Pool(size_t n = std::thread::hardware_concurrency()) : alive(true) { diff --git a/tests/unit/skiplist_gc.cpp b/tests/unit/skiplist_gc.cpp index fe4fca714..3b3a80ec9 100644 --- a/tests/unit/skiplist_gc.cpp +++ b/tests/unit/skiplist_gc.cpp @@ -30,8 +30,9 @@ class FakeItem { TEST(SkipListGC, TripleScopeGC) { SkipList skiplist; - std::atomic count{0}; - auto item = FakeItem(count, 1); + std::atomic *count = new std::atomic{0}; + + auto item = FakeItem(*count, 1); { auto access_1 = skiplist.access(); { @@ -41,25 +42,25 @@ TEST(SkipListGC, TripleScopeGC) { access_1.insert(item); // add with 1 access_2.remove(item); // remove with 2 std::this_thread::sleep_for(std::chrono::milliseconds(100)); - EXPECT_EQ(count, 0); + EXPECT_EQ(*count, 0); } std::this_thread::sleep_for(std::chrono::milliseconds(100)); - EXPECT_EQ(count, 0); + EXPECT_EQ(*count, 0); } std::this_thread::sleep_for(std::chrono::milliseconds(100)); - EXPECT_EQ(count, 0); + EXPECT_EQ(*count, 0); } // scope end - GC called for (int i = 0; i < 10; ++i) { - if (count != 0) break; + if (*count != 0) break; std::this_thread::sleep_for(std::chrono::milliseconds(100)); } - EXPECT_EQ(count, 1); + EXPECT_EQ(*count, 1); } TEST(SkipListGC, BlockedGCNoGC) { SkipList skiplist; - std::atomic count{0}; - auto item = FakeItem(count, 1); + std::atomic *count = new std::atomic{0}; + auto item = FakeItem(*count, 1); auto blocking_access = skiplist.access(); { auto access = skiplist.access(); @@ -67,34 +68,37 @@ TEST(SkipListGC, BlockedGCNoGC) { access.remove(item); } // scope end - GC still isn't called because of blocking_access std::this_thread::sleep_for(std::chrono::milliseconds(100)); - EXPECT_EQ(count, 0); + EXPECT_EQ(*count, 0); } TEST(SkipListGC, NotInScopeGC) { SkipList skiplist; - std::atomic count{0}; - auto item = FakeItem(count, 1); + std::atomic *count = new std::atomic{0}; + auto item = FakeItem(*count, 1); { auto access = skiplist.access(); access.insert(item); access.remove(item); } // scope end - GC called for (int i = 0; i < 10; ++i) { - if (count != 0) break; + if (*count == 1) break; std::this_thread::sleep_for(std::chrono::milliseconds(100)); } - EXPECT_EQ(count, 1); + // If this count is not 1 that means we are still doing GC in the background + // and might crash the test if we try to modify count variable after it's been + // deallocated. + ASSERT_EQ(*count, 1); } TEST(SkipListGC, StillInScopeNoGC) { SkipList skiplist; - std::atomic count{0}; - auto item = FakeItem(count, 1); + std::atomic *count = new std::atomic{0}; + auto item = FakeItem(*count, 1); auto access = skiplist.access(); access.insert(item); access.remove(item); std::this_thread::sleep_for(std::chrono::milliseconds(100)); - EXPECT_EQ(count, 0); + EXPECT_EQ(*count, 0); } int main(int argc, char **argv) {