utils/assert code, static array, skiplist tests and bug fix

This commit is contained in:
Marko Budiselic 2016-06-16 19:07:49 +01:00
parent 425be0acad
commit 83f094a15f
9 changed files with 245 additions and 50 deletions

View File

@ -8,8 +8,8 @@
#include "threading/sync/spinlock.hpp"
#include "utils/random/fast_binomial.hpp"
#include "memory/lazy_gc.hpp"
#include "utils/placeholder.hpp"
#include "skiplist_gc.hpp"
/* @brief Concurrent lock-based skiplist with fine grained locking
*
@ -94,7 +94,7 @@
* and deletion of nodes.
*/
template <class K, class T, size_t H=32, class lock_t=SpinLock>
class SkipList : LazyGC<SkipList<K, T, H, lock_t>>, Lockable<lock_t>
class SkipList : private Lockable<lock_t>
{
public:
// computes the height for the new node from the interval [1...H]
@ -363,7 +363,8 @@ public:
Accessor(SkipList* skiplist) : skiplist(skiplist)
{
assert(skiplist != nullptr);
// addref
skiplist->gc.add_ref();
}
public:
@ -379,7 +380,7 @@ public:
if(skiplist == nullptr)
return;
// releaseref
skiplist->gc.release_ref();
}
Iterator begin()
@ -437,6 +438,11 @@ public:
return skiplist->remove(key, preds, succs);
}
size_t size() const
{
return skiplist->size();
}
private:
SkipList* skiplist;
Node* preds[H], *succs[H];
@ -487,7 +493,7 @@ private:
size_t size() const
{
return count.load(std::memory_order_acquire);
return count.load();
}
bool greater(const K& key, const Node* const node)
@ -627,7 +633,7 @@ private:
}
new_node->flags.set_fully_linked();
count.fetch_add(1, std::memory_order_relaxed);
count.fetch_add(1);
return {Iterator {new_node}, true};
}
@ -651,8 +657,8 @@ private:
{
auto level = find_path(header, H - 1, key, preds, succs);
if(!marked && (level == -1 || !ok_delete(succs[level], level)))
return false;
if(!marked && (level == -1 || !ok_delete(succs[level], level)))
return false;
if(!marked)
{
@ -664,6 +670,7 @@ private:
return false;
node->flags.set_marked();
marked = true;
}
guard_t guards[H];
@ -674,24 +681,18 @@ private:
for(int level = height - 1; level >= 0; --level)
preds[level]->forward(level, node->forward(level));
// TODO recycle(node)
count.fetch_sub(1, std::memory_order_relaxed);
// TODO: review and test
gc.collect(node);
count.fetch_sub(1);
return true;
}
}
guard_t gc_lock_acquire()
{
return this->acquire_unique();
}
void vacuum()
{
}
// number of elements
std::atomic<size_t> count {0};
Node* header;
SkiplistGC<Node> gc;
};
template <class K, class T, size_t H, class lock_t>

View File

@ -0,0 +1,41 @@
#pragma once
#include "memory/freelist.hpp"
#include "memory/lazy_gc.hpp"
#include "threading/sync/spinlock.hpp"
template <class T, class lock_t = SpinLock>
class SkiplistGC : public LazyGC<SkiplistGC<T, lock_t>, lock_t>
{
public:
// release_ref method should be called by a thread
// when the thread finish it job over object
// which has to be lazy cleaned
// if thread counter becames zero, all objects in the local_freelist
// are going to be deleted
// the only problem with this approach is that
// GC may never be called, but for now we can deal with that
void release_ref()
{
std::vector<T *> local_freelist;
// take freelist if there is no more threads
{
auto lock = this->acquire_unique();
--this->count;
if (this->count == 0) {
freelist.swap(local_freelist);
}
}
// destroy all elements from local_freelist
for (auto element : local_freelist) {
if (element->flags.is_marked()) T::destroy(element);
}
}
void collect(T *node) { freelist.add(node); }
private:
FreeList<T> freelist;
};

View File

@ -0,0 +1,65 @@
#pragma once
#include "utils/assert.hpp"
// data structure namespace short ds
// TODO: document strategy related to namespace naming
// (namespace names should be short but eazy to memorize)
namespace ds
{
// static array is data structure which size (capacity) can be known at compile
// time
// this data structure isn't concurrent
template <typename T, size_t N>
class static_array
{
public:
// default constructor
static_array() {}
// explicit constructor which populates the data array with
// initial values, array structure after initialization
// is N * [initial_value]
explicit static_array(const T &initial_value)
{
for (size_t i = 0; i < size(); ++i) {
data[i] = initial_value;
}
}
// returns array size
size_t size() const { return N; }
// returns element reference on specific index
T &operator[](size_t index)
{
runtime_assert(index < N, "Index " << index << " must be less than "
<< N);
return data[index];
}
// returns const element reference on specific index
const T &operator[](size_t index) const
{
runtime_assert(index < N, "Index " << index << " must be less than "
<< N);
return data[index];
}
// returns begin iterator
T *begin() { return &data[0]; }
// returns const begin iterator
const T *begin() const { return &data[0]; }
// returns end iterator
T *end() { return &data[N]; }
// returns const end iterator
const T *end() const { return &data[N]; }
private:
T data[N];
};
}

22
src/memory/freelist.hpp Normal file
View File

@ -0,0 +1,22 @@
#pragma once
#include <vector>
#include "threading/sync/lockable.hpp"
#include "threading/sync/spinlock.hpp"
template <class T, class lock_t = SpinLock>
class FreeList : Lockable<lock_t>
{
public:
void swap(std::vector<T *> &dst) { std::swap(data, dst); }
void add(T *element)
{
auto lock = this->acquire_unique();
data.emplace_back(element);
}
private:
std::vector<T *> data;
};

View File

@ -2,42 +2,23 @@
#include <atomic>
#include "utils/crtp.hpp"
#include "threading/sync/lockable.hpp"
#include "utils/crtp.hpp"
template <class Derived>
class LazyGC : public Crtp<Derived>
template <class Derived, class lock_t = SpinLock>
class LazyGC : public Crtp<Derived>, public Lockable<lock_t>
{
public:
// add_ref method should be called by a thread
// when the thread has to do something over
// object which has to be lazy cleaned when
// the thread finish it job
void add_ref()
{
/* ref_count.fetch_add(1, std::memory_order_relaxed); */
}
void release_ref()
{
/* // get refcount and subtract atomically */
/* auto count = ref_count.fetch_sub(1, std::memory_order_acq_rel); */
/* // fetch_sub first returns and then subtrarcts so the refcount is */
/* // zero when fetch_sub returns 1 */
/* if(count != 1) */
/* return; */
/* if(!dirty.load(std::memory_order_acquire)) */
/* return; */
/* auto guard = this->derived().gc_lock_acquire(); */
/* if(!dirty.load(std::memory_order_acquire)) */
/* return; */
/* this->derived().vacuum(); */
/* dirty.store(false, std::memory_order_release); */
auto lock = this->acquire_unique();
++count;
}
protected:
std::atomic<int> ref_count {0};
std::atomic<bool> dirty {false};
size_t count{0};
};

View File

@ -1,6 +1,7 @@
#pragma once
#include <mutex>
#include "spinlock.hpp"
template <class lock_t = SpinLock>

View File

@ -14,7 +14,7 @@ void assert_error_handler(const char *file_name, unsigned line_number,
// this is a good place to put your debug breakpoint
// and add some other destination for error message
#ifdef THROW_EXCEPTION_ON_ERROR
throw BasicException(std::string(message));
throw BasicException(message);
#else
std::cerr << message << " in file " << file_name << " #" << line_number
<< std::endl;

View File

@ -0,0 +1,84 @@
#include <chrono>
#include <iostream>
#include <thread>
#include "data_structures/skiplist/skiplist.hpp"
#include "data_structures/static_array.hpp"
#include "utils/assert.hpp"
using std::cout;
using std::endl;
using skiplist_t = SkipList<int, int>;
using namespace std::chrono_literals;
#define THREADS_NO 16
constexpr size_t elems_per_thread = 100000;
int main()
{
ds::static_array<std::thread, THREADS_NO> threads;
skiplist_t skiplist;
// put THREADS_NO * elems_per_thread items to the skiplist
for (size_t thread_i = 0; thread_i < THREADS_NO; ++thread_i) {
threads[thread_i] = std::thread(
[&skiplist](size_t start, size_t end) {
auto accessor = skiplist.access();
for (size_t elem_i = start; elem_i < end; ++elem_i) {
accessor.insert_unique(elem_i, elem_i);
}
},
thread_i * elems_per_thread,
thread_i * elems_per_thread + elems_per_thread);
}
// wait all threads
for (auto &thread : threads) {
thread.join();
}
// get skiplist size
{
auto accessor = skiplist.access();
permanent_assert(accessor.size() == THREADS_NO * elems_per_thread,
"all elements in skiplist");
}
for (size_t thread_i = 0; thread_i < THREADS_NO; ++thread_i) {
threads[thread_i] = std::thread(
[&skiplist](size_t start, size_t end) {
auto accessor = skiplist.access();
for (size_t elem_i = start; elem_i < end; ++elem_i) {
permanent_assert(accessor.remove(elem_i) == true, "");
}
},
thread_i * elems_per_thread,
thread_i * elems_per_thread + elems_per_thread);
}
// wait all threads
for (auto &thread : threads) {
thread.join();
}
// check size
{
auto accessor = skiplist.access();
permanent_assert(accessor.size() == 0, "Size should be 0, but size is " << accessor.size());
}
// check count
{
size_t iterator_counter = 0;
auto accessor = skiplist.access();
for (auto elem : accessor) {
++iterator_counter;
cout << elem.first << " ";
}
permanent_assert(iterator_counter == 0, "deleted elements");
}
// TODO: test GC and memory
return 0;
}