diff --git a/src/data_structures/concurrent/skiplist.hpp b/src/data_structures/concurrent/skiplist.hpp index 9d7f91773..d68d96a5b 100644 --- a/src/data_structures/concurrent/skiplist.hpp +++ b/src/data_structures/concurrent/skiplist.hpp @@ -570,7 +570,7 @@ class SkipList : private Lockable { Accessor(SkipList *skiplist) : skiplist(skiplist) { debug_assert(skiplist != nullptr, "Skiplist is nullptr."); - skiplist->gc.add_ref(); + skiplist->gc.AddRef(); } public: @@ -583,7 +583,7 @@ class SkipList : private Lockable { ~Accessor() { if (skiplist == nullptr) return; - skiplist->gc.release_ref(); + skiplist->gc.ReleaseRef(); } Iterator begin() { return skiplist->begin(); } @@ -1122,7 +1122,7 @@ class SkipList : private Lockable { preds[level]->forward(level, node->forward(level)); // TODO: review and test - gc.collect(node); + gc.Collect(node); count.fetch_sub(1); return true; diff --git a/src/data_structures/concurrent/skiplist_gc.hpp b/src/data_structures/concurrent/skiplist_gc.hpp index d3dd1f700..f2a2f0ea1 100644 --- a/src/data_structures/concurrent/skiplist_gc.hpp +++ b/src/data_structures/concurrent/skiplist_gc.hpp @@ -1,11 +1,14 @@ #pragma once // TODO: remove from here and from the project +#include #include +#include #include "logging/loggable.hpp" #include "memory/freelist.hpp" #include "memory/lazy_gc.hpp" +#include "threading/pool.hpp" #include "threading/sync/spinlock.hpp" #include "utils/assert.hpp" @@ -15,43 +18,55 @@ class SkiplistGC : public LazyGC, lock_t>, public: SkiplistGC() : Loggable("SkiplistGC") {} - // release_ref method should be called by a thread - // when the thread finish it job over object - // which has to be lazy cleaned - // if thread counter becames zero, all objects in the local_freelist - // are going to be deleted - // the only problem with this approach is that - // GC may never be called, but for now we can deal with that - void release_ref() { - std::vector local_freelist; + /** + * ReleaseRef method should be called by some thread which finishes access to + * skiplist. If thread reference_count_ becomes zero, all objects in the + * local_freelist are going to be deleted. The only problem with this approach + * is that GC may never be called, but for now we can deal with that. + */ + void ReleaseRef() { + // This has to be a shared_ptr since std::function requires that the + // callable object be copy-constructable. + std::shared_ptr> local_freelist = + std::make_shared>(); // take freelist if there is no more threads { auto lock = this->acquire_unique(); - debug_assert(this->count > 0, "Count is equal to zero."); - --this->count; - if (this->count == 0) { - freelist.swap(local_freelist); + debug_assert(this->reference_count_ > 0, "Count is equal to zero."); + --this->reference_count_; + if (this->reference_count_ == 0) { + freelist_.swap(*local_freelist); } } - if (local_freelist.size() > 0) { - logger.trace("GC started"); - logger.trace("Local list size: {}", local_freelist.size()); - long long counter = 0; - // destroy all elements from local_freelist - for (auto element : local_freelist) { - if (element->flags.is_marked()) { - T::destroy(element); - counter++; - } - } - logger.trace("Number of destroyed elements: {}", counter); + if (local_freelist->size() > 0) { + thread_pool_.run(std::bind( + [this](std::shared_ptr> local_freelist) { + logger.trace("GC started"); + logger.trace("Local list size: {}", local_freelist->size()); + long long destroyed = 0; + // destroy all elements from local_freelist + for (auto element : *local_freelist) { + if (element->flags.is_marked()) { + T::destroy(element); + destroyed++; + } else { + logger.warn( + "Unmarked node appeared in the collection ready for " + "destruction."); + } + } + logger.trace("Number of destroyed elements: {}", destroyed); + }, + local_freelist)); } } - void collect(T *node) { freelist.add(node); } + void Collect(T *node) { freelist_.add(node); } private: - FreeList freelist; + // We use FreeList since it's thread-safe. + FreeList freelist_; + Pool thread_pool_; }; diff --git a/src/memory/freelist.hpp b/src/memory/freelist.hpp index 5dbf2add9..6cd0c8c32 100644 --- a/src/memory/freelist.hpp +++ b/src/memory/freelist.hpp @@ -8,9 +8,9 @@ template class FreeList : Lockable { public: - void swap(std::vector &dst) { std::swap(data, dst); } + void swap(std::vector &dst) { std::swap(data, dst); } - void add(T *element) { + void add(T element) { auto lock = this->acquire_unique(); data.emplace_back(element); } @@ -18,5 +18,5 @@ class FreeList : Lockable { size_t size() const { return data.size(); } private: - std::vector data; + std::vector data; }; diff --git a/src/memory/lazy_gc.hpp b/src/memory/lazy_gc.hpp index 8c239400b..eae9c0d06 100644 --- a/src/memory/lazy_gc.hpp +++ b/src/memory/lazy_gc.hpp @@ -10,15 +10,15 @@ template class LazyGC : public Crtp, public Lockable { public: - // add_ref method should be called by a thread + // AddRef method should be called by a thread // when the thread has to do something over // object which has to be lazy cleaned when // the thread finish it job - void add_ref() { + void AddRef() { auto lock = this->acquire_unique(); - ++count; + ++reference_count_; } protected: - size_t count{0}; + size_t reference_count_{0}; }; diff --git a/src/threading/pool.hpp b/src/threading/pool.hpp index 2e55e5b3e..ea21c326a 100644 --- a/src/threading/pool.hpp +++ b/src/threading/pool.hpp @@ -8,6 +8,11 @@ #include "threading/sync/lockable.hpp" +/** + * ThreadPool which will invoke maximum concurrent number of threads for the + * used hardware and will schedule tasks on thread as they are added to the + * pool. + */ class Pool : Lockable { using task_t = std::function; @@ -31,6 +36,10 @@ class Pool : Lockable { for (auto& thread : threads) thread.join(); } + /** + * Runs an asynchronous task. + * @param f - task to run. + */ void run(task_t f) { { auto lock = acquire_unique(); @@ -64,6 +73,7 @@ class Pool : Lockable { tasks.pop(); } + // Start the execution of task. task(); } } diff --git a/tests/unit/skiplist_gc.cpp b/tests/unit/skiplist_gc.cpp new file mode 100644 index 000000000..fe4fca714 --- /dev/null +++ b/tests/unit/skiplist_gc.cpp @@ -0,0 +1,105 @@ +#include "gtest/gtest.h" + +#include +#include +#include + +#include "data_structures/concurrent/skiplist.hpp" +#include "logging/streams/stderr.hpp" + +/** + * FakeItem class which increments a variable in the destructor. + * Used to keep track of the number of destroyed elements in GC. + */ +class FakeItem { + public: + FakeItem(std::atomic &count, int value) : count(count), value(value) {} + ~FakeItem() { count.fetch_add(1); } + + bool operator<(const FakeItem &item) const { + return this->value < item.value; + } + bool operator>(const FakeItem &item) const { + return this->value > item.value; + } + + private: + std::atomic &count; + int value; +}; + +TEST(SkipListGC, TripleScopeGC) { + SkipList skiplist; + std::atomic count{0}; + auto item = FakeItem(count, 1); + { + auto access_1 = skiplist.access(); + { + auto access_2 = skiplist.access(); + { + auto access_3 = skiplist.access(); + access_1.insert(item); // add with 1 + access_2.remove(item); // remove with 2 + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + EXPECT_EQ(count, 0); + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + EXPECT_EQ(count, 0); + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + EXPECT_EQ(count, 0); + } // scope end - GC called + for (int i = 0; i < 10; ++i) { + if (count != 0) break; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + EXPECT_EQ(count, 1); +} + +TEST(SkipListGC, BlockedGCNoGC) { + SkipList skiplist; + std::atomic count{0}; + auto item = FakeItem(count, 1); + auto blocking_access = skiplist.access(); + { + auto access = skiplist.access(); + access.insert(item); + access.remove(item); + } // scope end - GC still isn't called because of blocking_access + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + EXPECT_EQ(count, 0); +} + +TEST(SkipListGC, NotInScopeGC) { + SkipList skiplist; + std::atomic count{0}; + auto item = FakeItem(count, 1); + { + auto access = skiplist.access(); + access.insert(item); + access.remove(item); + } // scope end - GC called + for (int i = 0; i < 10; ++i) { + if (count != 0) break; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + EXPECT_EQ(count, 1); +} + +TEST(SkipListGC, StillInScopeNoGC) { + SkipList skiplist; + std::atomic count{0}; + auto item = FakeItem(count, 1); + auto access = skiplist.access(); + access.insert(item); + access.remove(item); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + EXPECT_EQ(count, 0); +} + +int main(int argc, char **argv) { + logging::init_sync(); + logging::log->pipe(std::make_unique()); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}