Test GarbageCollector in skiplist.
Summary: Tests the skiplist GC. Reviewers: florijan, mferencevic, buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D145
This commit is contained in:
parent
596b015e99
commit
afbec0a26e
@ -570,7 +570,7 @@ class SkipList : private Lockable<lock_t> {
|
||||
Accessor(SkipList *skiplist) : skiplist(skiplist) {
|
||||
debug_assert(skiplist != nullptr, "Skiplist is nullptr.");
|
||||
|
||||
skiplist->gc.add_ref();
|
||||
skiplist->gc.AddRef();
|
||||
}
|
||||
|
||||
public:
|
||||
@ -583,7 +583,7 @@ class SkipList : private Lockable<lock_t> {
|
||||
~Accessor() {
|
||||
if (skiplist == nullptr) return;
|
||||
|
||||
skiplist->gc.release_ref();
|
||||
skiplist->gc.ReleaseRef();
|
||||
}
|
||||
|
||||
Iterator begin() { return skiplist->begin(); }
|
||||
@ -1122,7 +1122,7 @@ class SkipList : private Lockable<lock_t> {
|
||||
preds[level]->forward(level, node->forward(level));
|
||||
|
||||
// TODO: review and test
|
||||
gc.collect(node);
|
||||
gc.Collect(node);
|
||||
|
||||
count.fetch_sub(1);
|
||||
return true;
|
||||
|
@ -1,11 +1,14 @@
|
||||
#pragma once
|
||||
|
||||
// TODO: remove from here and from the project
|
||||
#include <functional>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
|
||||
#include "logging/loggable.hpp"
|
||||
#include "memory/freelist.hpp"
|
||||
#include "memory/lazy_gc.hpp"
|
||||
#include "threading/pool.hpp"
|
||||
#include "threading/sync/spinlock.hpp"
|
||||
#include "utils/assert.hpp"
|
||||
|
||||
@ -15,43 +18,55 @@ class SkiplistGC : public LazyGC<SkiplistGC<T, lock_t>, lock_t>,
|
||||
public:
|
||||
SkiplistGC() : Loggable("SkiplistGC") {}
|
||||
|
||||
// release_ref method should be called by a thread
|
||||
// when the thread finish it job over object
|
||||
// which has to be lazy cleaned
|
||||
// if thread counter becames zero, all objects in the local_freelist
|
||||
// are going to be deleted
|
||||
// the only problem with this approach is that
|
||||
// GC may never be called, but for now we can deal with that
|
||||
void release_ref() {
|
||||
std::vector<T *> local_freelist;
|
||||
/**
|
||||
* ReleaseRef method should be called by some thread which finishes access to
|
||||
* skiplist. If thread reference_count_ becomes zero, all objects in the
|
||||
* local_freelist are going to be deleted. The only problem with this approach
|
||||
* is that GC may never be called, but for now we can deal with that.
|
||||
*/
|
||||
void ReleaseRef() {
|
||||
// This has to be a shared_ptr since std::function requires that the
|
||||
// callable object be copy-constructable.
|
||||
std::shared_ptr<std::vector<T *>> local_freelist =
|
||||
std::make_shared<std::vector<T *>>();
|
||||
|
||||
// take freelist if there is no more threads
|
||||
{
|
||||
auto lock = this->acquire_unique();
|
||||
debug_assert(this->count > 0, "Count is equal to zero.");
|
||||
--this->count;
|
||||
if (this->count == 0) {
|
||||
freelist.swap(local_freelist);
|
||||
debug_assert(this->reference_count_ > 0, "Count is equal to zero.");
|
||||
--this->reference_count_;
|
||||
if (this->reference_count_ == 0) {
|
||||
freelist_.swap(*local_freelist);
|
||||
}
|
||||
}
|
||||
|
||||
if (local_freelist.size() > 0) {
|
||||
logger.trace("GC started");
|
||||
logger.trace("Local list size: {}", local_freelist.size());
|
||||
long long counter = 0;
|
||||
// destroy all elements from local_freelist
|
||||
for (auto element : local_freelist) {
|
||||
if (element->flags.is_marked()) {
|
||||
T::destroy(element);
|
||||
counter++;
|
||||
}
|
||||
}
|
||||
logger.trace("Number of destroyed elements: {}", counter);
|
||||
if (local_freelist->size() > 0) {
|
||||
thread_pool_.run(std::bind(
|
||||
[this](std::shared_ptr<std::vector<T *>> local_freelist) {
|
||||
logger.trace("GC started");
|
||||
logger.trace("Local list size: {}", local_freelist->size());
|
||||
long long destroyed = 0;
|
||||
// destroy all elements from local_freelist
|
||||
for (auto element : *local_freelist) {
|
||||
if (element->flags.is_marked()) {
|
||||
T::destroy(element);
|
||||
destroyed++;
|
||||
} else {
|
||||
logger.warn(
|
||||
"Unmarked node appeared in the collection ready for "
|
||||
"destruction.");
|
||||
}
|
||||
}
|
||||
logger.trace("Number of destroyed elements: {}", destroyed);
|
||||
},
|
||||
local_freelist));
|
||||
}
|
||||
}
|
||||
|
||||
void collect(T *node) { freelist.add(node); }
|
||||
void Collect(T *node) { freelist_.add(node); }
|
||||
|
||||
private:
|
||||
FreeList<T> freelist;
|
||||
// We use FreeList since it's thread-safe.
|
||||
FreeList<T *> freelist_;
|
||||
Pool thread_pool_;
|
||||
};
|
||||
|
@ -8,9 +8,9 @@
|
||||
template <class T, class lock_t = SpinLock>
|
||||
class FreeList : Lockable<lock_t> {
|
||||
public:
|
||||
void swap(std::vector<T *> &dst) { std::swap(data, dst); }
|
||||
void swap(std::vector<T> &dst) { std::swap(data, dst); }
|
||||
|
||||
void add(T *element) {
|
||||
void add(T element) {
|
||||
auto lock = this->acquire_unique();
|
||||
data.emplace_back(element);
|
||||
}
|
||||
@ -18,5 +18,5 @@ class FreeList : Lockable<lock_t> {
|
||||
size_t size() const { return data.size(); }
|
||||
|
||||
private:
|
||||
std::vector<T *> data;
|
||||
std::vector<T> data;
|
||||
};
|
||||
|
@ -10,15 +10,15 @@
|
||||
template <class Derived, class lock_t = SpinLock>
|
||||
class LazyGC : public Crtp<Derived>, public Lockable<lock_t> {
|
||||
public:
|
||||
// add_ref method should be called by a thread
|
||||
// AddRef method should be called by a thread
|
||||
// when the thread has to do something over
|
||||
// object which has to be lazy cleaned when
|
||||
// the thread finish it job
|
||||
void add_ref() {
|
||||
void AddRef() {
|
||||
auto lock = this->acquire_unique();
|
||||
++count;
|
||||
++reference_count_;
|
||||
}
|
||||
|
||||
protected:
|
||||
size_t count{0};
|
||||
size_t reference_count_{0};
|
||||
};
|
||||
|
@ -8,6 +8,11 @@
|
||||
|
||||
#include "threading/sync/lockable.hpp"
|
||||
|
||||
/**
|
||||
* ThreadPool which will invoke maximum concurrent number of threads for the
|
||||
* used hardware and will schedule tasks on thread as they are added to the
|
||||
* pool.
|
||||
*/
|
||||
class Pool : Lockable<std::mutex> {
|
||||
using task_t = std::function<void()>;
|
||||
|
||||
@ -31,6 +36,10 @@ class Pool : Lockable<std::mutex> {
|
||||
for (auto& thread : threads) thread.join();
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs an asynchronous task.
|
||||
* @param f - task to run.
|
||||
*/
|
||||
void run(task_t f) {
|
||||
{
|
||||
auto lock = acquire_unique();
|
||||
@ -64,6 +73,7 @@ class Pool : Lockable<std::mutex> {
|
||||
tasks.pop();
|
||||
}
|
||||
|
||||
// Start the execution of task.
|
||||
task();
|
||||
}
|
||||
}
|
||||
|
105
tests/unit/skiplist_gc.cpp
Normal file
105
tests/unit/skiplist_gc.cpp
Normal file
@ -0,0 +1,105 @@
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
|
||||
#include "data_structures/concurrent/skiplist.hpp"
|
||||
#include "logging/streams/stderr.hpp"
|
||||
|
||||
/**
|
||||
* FakeItem class which increments a variable in the destructor.
|
||||
* Used to keep track of the number of destroyed elements in GC.
|
||||
*/
|
||||
class FakeItem {
|
||||
public:
|
||||
FakeItem(std::atomic<int> &count, int value) : count(count), value(value) {}
|
||||
~FakeItem() { count.fetch_add(1); }
|
||||
|
||||
bool operator<(const FakeItem &item) const {
|
||||
return this->value < item.value;
|
||||
}
|
||||
bool operator>(const FakeItem &item) const {
|
||||
return this->value > item.value;
|
||||
}
|
||||
|
||||
private:
|
||||
std::atomic<int> &count;
|
||||
int value;
|
||||
};
|
||||
|
||||
TEST(SkipListGC, TripleScopeGC) {
|
||||
SkipList<FakeItem> skiplist;
|
||||
std::atomic<int> count{0};
|
||||
auto item = FakeItem(count, 1);
|
||||
{
|
||||
auto access_1 = skiplist.access();
|
||||
{
|
||||
auto access_2 = skiplist.access();
|
||||
{
|
||||
auto access_3 = skiplist.access();
|
||||
access_1.insert(item); // add with 1
|
||||
access_2.remove(item); // remove with 2
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
EXPECT_EQ(count, 0);
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
EXPECT_EQ(count, 0);
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
EXPECT_EQ(count, 0);
|
||||
} // scope end - GC called
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
if (count != 0) break;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
EXPECT_EQ(count, 1);
|
||||
}
|
||||
|
||||
TEST(SkipListGC, BlockedGCNoGC) {
|
||||
SkipList<FakeItem> skiplist;
|
||||
std::atomic<int> count{0};
|
||||
auto item = FakeItem(count, 1);
|
||||
auto blocking_access = skiplist.access();
|
||||
{
|
||||
auto access = skiplist.access();
|
||||
access.insert(item);
|
||||
access.remove(item);
|
||||
} // scope end - GC still isn't called because of blocking_access
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
EXPECT_EQ(count, 0);
|
||||
}
|
||||
|
||||
TEST(SkipListGC, NotInScopeGC) {
|
||||
SkipList<FakeItem> skiplist;
|
||||
std::atomic<int> count{0};
|
||||
auto item = FakeItem(count, 1);
|
||||
{
|
||||
auto access = skiplist.access();
|
||||
access.insert(item);
|
||||
access.remove(item);
|
||||
} // scope end - GC called
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
if (count != 0) break;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
EXPECT_EQ(count, 1);
|
||||
}
|
||||
|
||||
TEST(SkipListGC, StillInScopeNoGC) {
|
||||
SkipList<FakeItem> skiplist;
|
||||
std::atomic<int> count{0};
|
||||
auto item = FakeItem(count, 1);
|
||||
auto access = skiplist.access();
|
||||
access.insert(item);
|
||||
access.remove(item);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
EXPECT_EQ(count, 0);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
logging::init_sync();
|
||||
logging::log->pipe(std::make_unique<Stderr>());
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
Loading…
Reference in New Issue
Block a user