finished implementing a basic lock free skiplist

This commit is contained in:
Dominik Tomičević 2015-06-28 11:43:52 +02:00
parent 255079fc84
commit 7bbfe867be
7 changed files with 365 additions and 109 deletions

View File

@ -7,7 +7,7 @@ size_t new_height(int max_height)
{
// get 64 random bits (coin tosses)
uint64_t rand = xorshift::next();
size_t height = 0;
size_t height = 1;
// for every head (1) increase the tower height by one until the tail (0)
// comes. this gives the following probabilities for tower heights:

View File

@ -8,75 +8,252 @@
#include "new_height.hpp"
#include "skipnode.hpp"
template <class K, class T>
// concurrent skiplist based on the implementation described in
// "A Provably Correct Scalable Concurrent Skip List"
// https://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
template <class K,
class T,
size_t MAX_HEIGHT = 24,
class compare=std::less<K>,
class lock_type=SpinLock>
class SkipList
{
using Node = SkipNode<K, T>;
using Node = SkipNode<K, T, lock_type>;
public:
SkipList(size_t max_height);
SkipList()
: size_(0),
header(Node::create(MAX_HEIGHT, nullptr, nullptr)) {}
T* get(const K* const key);
void put(const K* key, T* item);
void del(const K* const key);
~SkipList()
{
for(Node* current = header.load(std::memory_order_relaxed); current;)
{
Node* next = current->forward(0);
Node::destroy(current);
current = next;
}
}
private:
size_t level;
Node* header;
size_t size() const
{
return size_.load(std::memory_order_relaxed);
}
uint8_t height() const
{
return MAX_HEIGHT;
}
//private:
bool greater(const K* const key, const Node* node)
{
return node && compare()(*node->key, *key);
}
bool less(const K* const key, const Node* node)
{
return (node == nullptr) || compare()(*key, *node->key);
}
size_t increment_size(size_t delta)
{
return size_.fetch_add(delta, std::memory_order_relaxed) + delta;
}
int find_path(Node* from,
int start_level,
const K* const key,
Node* preds[],
Node* succs[])
{
int lfound = -1;
Node* pred = from;
for(int level = start_level; level >= 0; --level)
{
Node* node = pred->forward(level);
while(greater(key, node))
pred = node, node = pred->forward(level);
if(lfound == -1 && !less(key, node))
lfound = level;
preds[level] = pred;
succs[level] = node; // TODO what's FB doing here?
}
//std::cout << "lfound = " << lfound << std::endl;
return lfound;
}
Node* find(const K* const key)
{
Node* pred = header.load(std::memory_order_consume);
Node* node = nullptr;
uint8_t level = pred->height;
bool found = false;
while(!found)
{
// descend down first, facebook says it works better xD but make
// some tests when you have time to determine the best strategy
for(; level > 0 &&
less(key, node = pred->forward(level - 1)); --level) {}
if(level == 0)
return nullptr;
--level;
while(greater(key, node))
pred = node, node = node->forward(level);
found = !less(key, node);
}
return node;
}
template <bool ADDING>
bool lock_nodes(uint8_t height,
std::unique_lock<lock_type> guards[MAX_HEIGHT],
Node* preds[MAX_HEIGHT],
Node* succs[MAX_HEIGHT])
{
Node *prepred, *pred, *succ = nullptr;
bool valid = true;
for(int level = 0; valid && level < height; ++level)
{
pred = preds[level], succ = succs[level];
if(pred != prepred)
guards[level] = pred->guard(), prepred = pred;
valid = !pred->marked() && pred->forward(level) == succ;
if(ADDING)
valid = valid && (succ == nullptr || !succ->marked());
}
return valid;
}
bool insert(K* key, T* item)
{
Node *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
while(true)
{
auto head = header.load(std::memory_order_consume);
auto lfound = find_path(head, MAX_HEIGHT - 1, key, preds, succs);
if(lfound != -1)
{
auto found = succs[lfound];
if(!found->marked())
{
while(!found->fully_linked()) {}
return false;
}
continue;
}
auto node_height = new_height(MAX_HEIGHT);
std::unique_lock<lock_type> guards[MAX_HEIGHT];
// try to acquire the locks for predecessors up to the height of
// the new node. release the locks and try again if someone else
// has the locks
if(!lock_nodes<true>(node_height, guards, preds, succs))
continue;
// you have the locks, create a new node
auto new_node = Node::create(node_height, key, item);
// link the predecessors and successors, e.g.
//
// 4 HEAD ... P ------------------------> S ... NULL
// 3 HEAD ... ... P -----> NEW ---------> S ... NULL
// 2 HEAD ... ... P -----> NEW -----> S ... ... NULL
// 1 HEAD ... ... ... P -> NEW -> S ... ... ... NULL
for(uint8_t level = 0; level < node_height; ++level)
{
new_node->forward(level, succs[level]);
preds[level]->forward(level, new_node);
}
new_node->set_fully_linked();
increment_size(1);
return true;
}
}
bool ok_delete(Node* node, int level)
{
return node->fully_linked()
&& node->height - 1 == level
&& !node->marked();
}
bool remove(const K* const key)
{
Node* node = nullptr;
std::unique_lock<lock_type> node_guard;
bool marked = false;
int node_height = 0;
Node* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
while(true)
{
auto head = header.load(std::memory_order_consume);
auto lfound = find_path(head, MAX_HEIGHT - 1, key, preds, succs);
if(!marked && (lfound == -1 || !ok_delete(succs[lfound], lfound)))
return false;
if(!marked)
{
node = succs[lfound];
node_height = node->height;
node_guard = node->guard();
if(node->marked())
return false;
node->set_marked();
}
std::unique_lock<lock_type> guards[MAX_HEIGHT];
if(!lock_nodes<false>(node_height, guards, preds, succs))
continue;
for(int level = node_height - 1; level >= 0; --level)
preds[level]->forward(level, node->forward(level));
increment_size(-1);
break;
}
// TODO recyclee(node);
return true;
}
std::atomic<size_t> size_;
std::atomic<Node*> header;
};
template <class K, class T>
SkipList<K, T>::SkipList(size_t level)
: level(level)
{
header = new Node(level);
auto sentinel = new Node();
for(int i = 0; i < level; ++i)
header->forward[i] = sentinel;
}
template <class K, class T>
T* SkipList<K, T>::get(const K* const key)
{
Node* current = header;
for(int i = level - 1; i >= 0; --i)
{
Node* next = current->forward[i];
while(next->key != nullptr && *next->key < *key)
current = current->forward[i];
}
return current->item;
}
template <class K, class T>
void SkipList<K, T>::put(const K* key, T* item)
{
auto height = new_height(level);
auto node = new Node(key, item, height);
// needed to update higher level forward pointers
int trace[level];
Node* current = header;
for(int i = level - 1; i >= 0; --i)
{
Node* next = current->forward[i];
while(next->key != nullptr && *next->key < *key)
current = current->forward[i];
}
}
template <class K, class T>
void SkipList<K, T>::del(const K* const key)
{
}
#endif

View File

@ -3,6 +3,7 @@
#include <cstdlib>
#include <atomic>
#include <mutex>
#include "utils/sync/spinlock.hpp"
@ -10,27 +11,116 @@
// "A Provably Correct Scalable Concurrent Skip List"
// https://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
template <class K, class T>
template <class K,
class T,
class lock_type=SpinLock>
struct SkipNode
{
using Node = SkipNode<K, T, lock_type>;
enum flags {
MARKED_FOR_REMOVAL = 1,
FULLY_LINKED = 1 << 1
MARKED = 1,
FULLY_LINKED = 1 << 1
};
static SkipNode* create();
static SkipNode* destroy();
private:
SkipNode();
~SkipNode();
// key against the value is sorted in the skiplist. must be comparable
K* key;
// item on the heap this node points
T* item;
const uint8_t height;
// use this for creating new nodes. DON'T use the constructor (it's
// private anyway)
static SkipNode* create(int height, K* key, T* item)
{
size_t size = sizeof(Node) + height * sizeof(std::atomic<Node*>);
auto* node = static_cast<SkipNode*>(malloc(size));
new (node) Node(height, key, item);
return node;
}
// acquire an exclusive guard on this node, use for concurrent access
std::unique_lock<lock_type> guard()
{
return std::unique_lock<lock_type>(lock);
}
// use this for destroying nodes after you don't need them any more
static void destroy(Node* node)
{
node->~SkipNode();
free(node);
}
bool marked() const
{
return fget() & MARKED;
}
void set_marked()
{
fset(fget() | MARKED);
}
bool fully_linked() const
{
return fget() & FULLY_LINKED;
}
void set_fully_linked()
{
fset(fget() | FULLY_LINKED);
}
Node* forward(uint8_t level)
{
return forward_[level].load(std::memory_order_consume);
}
void forward(uint8_t level, Node* next)
{
forward_[level].store(next, std::memory_order_release);
}
private:
SkipNode(uint8_t height, K* key, T* item)
: key(key), item(item), height(height)
{
// set the flags to zero at the beginning
fset(0);
// we need to explicitly call the placement new operator over memory
// allocated for forward_ pointers, see the notes below
for (uint8_t i = 0; i < height; ++i)
new (&forward_[i]) std::atomic<Node*>(nullptr);
}
~SkipNode()
{
for (uint8_t i = 0; i < height; ++i)
forward_[i].~atomic();
}
uint8_t fget() const
{
// do an atomic load of the flags. if you need to use this value
// more than one time in a function it's a good idea to store it
// in a stack variable (non atomic) to optimize for performance
return flags.load(std::memory_order_consume);
}
void fset(uint8_t value)
{
// atomically set new flags
flags.store(value, std::memory_order_release);
}
std::atomic<uint8_t> flags;
const uint8_t level;
SpinLock lock;
lock_type lock;
// this creates an array of the size zero locally inside the SkipNode
// struct. we can't put any sensible value here since we don't know
@ -40,28 +130,9 @@ private:
// we're gonna cheat here. we'll make this a zero length list and then
// allocate enough memory for the SkipNode struct to store more than zero
// elements (precisely *level* elements). c++ does not check bounds so we
// elements (precisely *height* elements). c++ does not check bounds so we
// can access anything we want!
std::atomic<SkipNode<K, T>*> forward[0];
std::atomic<Node*> forward_[0];
};
template <class K, class T>
SkipNode<K, T>::SkipNode(size_t level)
{
forward = new SkipNode*[level];
}
template <class K, class T>
SkipNode<K, T>::SkipNode(K* key, T* item, size_t level)
: key(key), item(item)
{
forward = new SkipNode*[level];
}
template <class K, class T>
SkipNode<K, T>::~SkipNode()
{
delete forward;
}
#endif

View File

@ -23,10 +23,10 @@ TEST_CASE("New height distribution must be approx. 1/2 1/4 1/8 ...")
// generate a tower and put it in a box with his same-height brothers
for(int i = 0; i < N; ++i)
heights[new_height(max_height)]++;
heights[new_height(max_height) - 1]++;
// evaluate the number of towers in all of the boxes
for(int i = 0; i < max_height; ++i)
for(int i = 1; i < max_height; ++i)
{
// compute how much towers should be in this box
int x = N / (2 << i);

View File

@ -1,17 +1,18 @@
#include <thread>
#include <chrono>
#include <vector>
#include <mutex>
#include <atomic>
#include "catch.hpp"
#include "utils/sync/spinlock.hpp"
TEST_CASE("a thread can acquire and release the lock", "[spinlock]")
{
SpinLock lock;
lock.acquire();
// i have a lock
lock.release();
{
std::unique_lock<SpinLock> lock;
// I HAS A LOCK!
}
REQUIRE(true);
}
@ -24,14 +25,15 @@ void test_lock()
{
using namespace std::literals;
lock.acquire();
x++;
{
std::unique_lock<SpinLock> guard(lock);
x++;
REQUIRE(x < 2);
std::this_thread::sleep_for(25ms);
std::this_thread::sleep_for(25ms);
x--;
lock.release();
REQUIRE(x < 2);
x--;
}
}
TEST_CASE("only one thread at a time can own the lock", "[spinlock]")

View File

@ -22,6 +22,10 @@ namespace xorshift
void init()
{
// TODO
// not sure if this thread local means anything for other threads
// fix this!!!!
// use a slow, more complex rnd generator to initialize a fast one
// make sure to call this before requesting any random numbers!
std::random_device rd;

View File

@ -17,6 +17,8 @@ private:
void SpinLock::lock()
{
// TODO add asm pause and counter first before sleeping
// might be faster, but test this and see
while(lock_flag.test_and_set(std::memory_order_acquire))
usleep(250);
}