Merge branch 'master' into dev

This commit is contained in:
Marko Budiselic 2016-06-05 09:29:16 +02:00
commit 38b6040529
10 changed files with 116 additions and 490 deletions

View File

@ -1,58 +0,0 @@
#pragma once
#include <cstdlib>
#include <atomic>
#include "utils/mark_ref.hpp"
namespace lockfree
{
template <class K, class T>
class SkipList
{
public:
struct Node
{
using ref_t = MarkRef<Node>;
K* key;
T* item;
const uint8_t height;
static Node* create(uint8_t height, K* key, T* item)
{
auto size = sizeof(Node) + height * sizeof(std::atomic<ref_t>);
auto node = static_cast<Node*>(std::malloc(size));
return new (node) Node(height, key, item);
}
static void destroy(Node* node)
{
node->~SkipNode();
free(node);
}
private:
Node(uint8_t height, K* key, T* item)
: key(key), item(item), height(height)
{
for(uint8_t i = 0; i < height; ++i)
new (&tower[i]) std::atomic<ref_t>(nullptr);
}
// this creates an array of the size zero. we can't put any sensible
// value here since we don't know what size it will be untill the
// node is allocated. we could make it a SkipNode** but then we would
// have two memory allocations, one for node and one for the forward
// list. this way we avoid expensive malloc/free calls and also cache
// thrashing when following a pointer on the heap
std::atomic<ref_t> tower[0];
};
//void list_search(const K& key,
};
}

View File

@ -1,22 +0,0 @@
#pragma once
#include "utils/random/xorshift.hpp"
template <class randomizer_t>
size_t new_height(int max_height)
{
// get 64 random bits (coin tosses)
uint64_t rand = xorshift::next();
size_t height = 1;
// for every head (1) increase the tower height by one until the tail (0)
// comes. this gives the following probabilities for tower heights:
//
// 1/2 1/4 1/8 1/16 1/32 1/64 ...
// 1 2 3 4 5 6 ...
//
while(max_height-- && ((rand >>= 1) & 1))
height++;
return height;
}

View File

@ -1,253 +0,0 @@
#pragma once
#include <algorithm>
#include <cstdlib>
#include <array>
#include "new_height.hpp"
#include "skipnode.hpp"
// concurrent skiplist based on the implementation described in
// "A Provably Correct Scalable Concurrent Skip List"
// https://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
template <class K,
class T,
size_t MAX_HEIGHT = 24,
class compare=std::less<K>,
class lock_type=SpinLock>
class SkipList
{
using Node = SkipNode<K, T, lock_type>;
public:
SkipList()
: size_(0),
header(Node::create(MAX_HEIGHT, nullptr, nullptr)) {}
~SkipList()
{
for(Node* current = header.load(); current;)
{
Node* next = current->forward(0);
Node::destroy(current);
current = next;
}
}
size_t size() const
{
return size_.load();
}
uint8_t height() const
{
return MAX_HEIGHT;
}
//private:
bool greater(const K* const key, const Node* node)
{
return node && compare()(*node->key, *key);
}
bool less(const K* const key, const Node* node)
{
return (node == nullptr) || compare()(*key, *node->key);
}
size_t increment_size(size_t delta)
{
return size_.fetch_add(delta) + delta;
}
int find_path(Node* from,
int start_level,
const K* const key,
Node* preds[],
Node* succs[])
{
int lfound = -1;
Node* pred = from;
for(int level = start_level; level >= 0; --level)
{
Node* node = pred->forward(level);
while(greater(key, node))
pred = node, node = pred->forward(level);
if(lfound == -1 && !less(key, node))
lfound = level;
preds[level] = pred;
succs[level] = node;
}
return lfound;
}
Node* find(const K* const key)
{
Node* pred = header.load();
Node* node = nullptr;
uint8_t level = pred->height;
bool found = false;
while(!found)
{
// descend down first, facebook says it works better xD but make
// some tests when you have time to determine the best strategy
for(; level > 0 &&
less(key, node = pred->forward(level - 1)); --level) {}
if(level == 0)
return nullptr;
--level;
while(greater(key, node))
pred = node, node = node->forward(level);
found = !less(key, node);
}
return node;
}
template <bool ADDING>
bool lock_nodes(uint8_t height,
std::unique_lock<lock_type> guards[MAX_HEIGHT],
Node* preds[MAX_HEIGHT],
Node* succs[MAX_HEIGHT])
{
Node *prepred, *pred, *succ = nullptr;
bool valid = true;
for(int level = 0; valid && level < height; ++level)
{
pred = preds[level], succ = succs[level];
if(pred != prepred)
guards[level] = pred->guard(), prepred = pred;
valid = !pred->marked() && pred->forward(level) == succ;
if(ADDING)
valid = valid && (succ == nullptr || !succ->marked());
}
return valid;
}
bool insert(K* key, T* item)
{
Node *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
while(true)
{
auto head = header.load();
auto lfound = find_path(head, MAX_HEIGHT - 1, key, preds, succs);
if(lfound != -1)
{
auto found = succs[lfound];
if(!found->marked())
{
while(!found->fully_linked()) {}
return false;
}
continue;
}
auto node_height = new_height(MAX_HEIGHT);
std::unique_lock<lock_type> guards[MAX_HEIGHT];
// try to acquire the locks for predecessors up to the height of
// the new node. release the locks and try again if someone else
// has the locks
if(!lock_nodes<true>(node_height, guards, preds, succs))
continue;
// you have the locks, create a new node
auto new_node = Node::create(node_height, key, item);
// link the predecessors and successors, e.g.
//
// 4 HEAD ... P ------------------------> S ... NULL
// 3 HEAD ... ... P -----> NEW ---------> S ... NULL
// 2 HEAD ... ... P -----> NEW -----> S ... ... NULL
// 1 HEAD ... ... ... P -> NEW -> S ... ... ... NULL
for(uint8_t level = 0; level < node_height; ++level)
{
new_node->forward(level, succs[level]);
preds[level]->forward(level, new_node);
}
new_node->set_fully_linked();
increment_size(1);
return true;
}
}
bool ok_delete(Node* node, int level)
{
return node->fully_linked()
&& node->height - 1 == level
&& !node->marked();
}
bool remove(const K* const key)
{
Node* node = nullptr;
std::unique_lock<lock_type> node_guard;
bool marked = false;
int node_height = 0;
Node* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
while(true)
{
auto head = header.load();
auto lfound = find_path(head, MAX_HEIGHT - 1, key, preds, succs);
if(!marked && (lfound == -1 || !ok_delete(succs[lfound], lfound)))
return false;
if(!marked)
{
node = succs[lfound];
node_height = node->height;
node_guard = node->guard();
if(node->marked())
return false;
node->set_marked();
}
std::unique_lock<lock_type> guards[MAX_HEIGHT];
if(!lock_nodes<false>(node_height, guards, preds, succs))
continue;
for(int level = node_height - 1; level >= 0; --level)
preds[level]->forward(level, node->forward(level));
increment_size(-1);
break;
}
// TODO recyclee(node);
return true;
}
std::atomic<size_t> size_;
std::atomic<Node*> header;
};

View File

@ -9,6 +9,7 @@
#include "utils/random/fast_binomial.hpp"
#include "memory/lazy_gc.hpp"
#include "utils/placeholder.hpp"
/* @brief Concurrent lock-based skiplist with fine grained locking
*
@ -145,11 +146,41 @@ public:
public:
friend class SkipList;
value_type data;
const uint8_t height;
Flags flags;
const K& key() const
{
return kv_pair().first;
}
T& value()
{
return kv_pair().second;
}
const T& value() const
{
return kv_pair().second;
}
value_type& kv_pair()
{
return data.get();
}
const value_type& kv_pair() const
{
return data.get();
}
static Node* sentinel(uint8_t height)
{
// we have raw memory and we need to construct an object
// of type Node on it
return new (allocate(height)) Node(height);
}
static Node* create(const K& key, const T& item, uint8_t height)
{
return create({key, item}, height);
@ -162,15 +193,7 @@ public:
static Node* create(value_type&& data, uint8_t height)
{
// [ Node ][Node*][Node*][Node*]...[Node*]
// | | | | |
// | 0 1 2 height-1
// |----------------||-----------------------------|
// space for Node space for tower pointers
// structure right after the Node
// structure
auto size = sizeof(Node) + height * sizeof(std::atomic<Node*>);
auto node = static_cast<Node*>(std::malloc(size));
auto node = allocate(height);
// we have raw memory and we need to construct an object
// of type Node on it
@ -194,8 +217,7 @@ public:
}
private:
Node(value_type&& data, uint8_t height)
: data(std::move(data)), height(height)
Node(uint8_t height) : height(height)
{
// here we assume, that the memory for N towers (N = height) has
// been allocated right after the Node structure so we need to
@ -204,12 +226,35 @@ public:
new (&tower[i]) std::atomic<Node*> {nullptr};
}
Node(value_type&& data, uint8_t height) : Node(height)
{
this->data.set(std::forward<value_type>(data));
}
~Node()
{
for(auto i = 0; i < height; ++i)
tower[i].~atomic();
}
static Node* allocate(uint8_t height)
{
// [ Node ][Node*][Node*][Node*]...[Node*]
// | | | | |
// | 0 1 2 height-1
// |----------------||-----------------------------|
// space for Node space for tower pointers
// structure right after the Node
// structure
auto size = sizeof(Node) + height * sizeof(std::atomic<Node*>);
auto node = static_cast<Node*>(std::malloc(size));
return node;
}
Placeholder<value_type> data;
// this creates an array of the size zero. we can't put any sensible
// value here since we don't know what size it will be untill the
// node is allocated. we could make it a Node** but then we would
@ -234,19 +279,19 @@ public:
value_type& operator*()
{
assert(node != nullptr);
return node->data;
return node->kv_pair();
}
value_type* operator->()
{
assert(node != nullptr);
return &node->data;
return &node->kv_pair();
}
operator value_type&()
{
assert(node != nullptr);
return node->data;
return node->kv_pair();
}
It& operator++()
@ -307,7 +352,7 @@ public:
Iterator(const Iterator&) = default;
};
SkipList() : header(Node::create(K(), std::move(T(0)), H)) {}
SkipList() : header(Node::sentinel(H)) {}
friend class Accessor;
@ -447,12 +492,12 @@ private:
bool greater(const K& key, const Node* const node)
{
return node && key > node->data.first;
return node && key > node->key();
}
bool less(const K& key, const Node* const node)
{
return (node == nullptr) || key < node->data.first;
return (node == nullptr) || key < node->key();
}
ConstIterator find(const K& key) const

View File

@ -1,135 +0,0 @@
#pragma once
#include <cstdlib>
#include <atomic>
#include <mutex>
#include "threading/sync/spinlock.hpp"
// concurrent skiplist node based on the implementation described in
// "A Provably Correct Scalable Concurrent Skip List"
// https://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
template <class K,
class T,
class lock_type=SpinLock>
struct SkipNode
{
using Node = SkipNode<K, T, lock_type>;
enum flags {
MARKED = 1,
FULLY_LINKED = 1 << 1
};
// key against the value is sorted in the skiplist. must be comparable
K* key;
// item on the heap this node points
T* item;
const uint8_t height;
// use this for creating new nodes. DON'T use the constructor (it's
// private anyway)
static SkipNode* create(int height, K* key, T* item)
{
size_t size = sizeof(Node) + height * sizeof(std::atomic<Node*>);
auto* node = static_cast<SkipNode*>(malloc(size));
new (node) Node(height, key, item);
return node;
}
// acquire an exclusive guard on this node, use for concurrent access
std::unique_lock<lock_type> guard()
{
return std::unique_lock<lock_type>(lock);
}
// use this for destroying nodes after you don't need them any more
static void destroy(Node* node)
{
node->~SkipNode();
free(node);
}
bool marked() const
{
return fget() & MARKED;
}
void set_marked()
{
fset(fget() | MARKED);
}
bool fully_linked() const
{
return fget() & FULLY_LINKED;
}
void set_fully_linked()
{
fset(fget() | FULLY_LINKED);
}
Node* forward(uint8_t level)
{
return forward_[level].load(std::memory_order_consume);
}
void forward(uint8_t level, Node* next)
{
forward_[level].store(next, std::memory_order_release);
}
private:
SkipNode(uint8_t height, K* key, T* item)
: key(key), item(item), height(height)
{
// set the flags to zero at the beginning
fset(0);
// we need to explicitly call the placement new operator over memory
// allocated for forward_ pointers, see the notes below
for (uint8_t i = 0; i < height; ++i)
new (&forward_[i]) std::atomic<Node*>(nullptr);
}
~SkipNode()
{
for (uint8_t i = 0; i < height; ++i)
forward_[i].~atomic();
}
uint8_t fget() const
{
// do an atomic load of the flags. if you need to use this value
// more than one time in a function it's a good idea to store it
// in a stack variable (non atomic) to optimize for performance
return flags.load(std::memory_order_consume);
}
void fset(uint8_t value)
{
// atomically set new flags
flags.store(value, std::memory_order_release);
}
std::atomic<uint8_t> flags;
lock_type lock;
// this creates an array of the size zero locally inside the SkipNode
// struct. we can't put any sensible value here since we don't know
// what size it will be untill the skipnode is allocated. we could make
// it a SkipNode** but then we would have two memory allocations, one for
// SkipNode and one for the forward list and malloc calls are expensive!
// we're gonna cheat here. we'll make this a zero length list and then
// allocate enough memory for the SkipNode struct to store more than zero
// elements (precisely *height* elements). c++ does not check bounds so we
// can access anything we want!
std::atomic<Node*> forward_[0];
};

View File

@ -0,0 +1,40 @@
#include <iostream>
#include "data_structures/skiplist/skiplist.hpp"
#include "storage/indexes/keys/unique_key.hpp"
using std::cout;
using std::endl;
using skiplist_t = SkipList<UniqueKeyAsc<int>, int>;
void print_skiplist(const skiplist_t::Accessor& skiplist)
{
cout << "---- skiplist now has: ";
for(auto& kv : skiplist)
cout << "(" << kv.first << ", " << kv.second << ") ";
cout << "----" << endl;
}
int main(void)
{
skiplist_t skiplist;
auto accessor = skiplist.access();
// this has to be here since UniqueKey<> class takes references!
int keys[] = {0, 1, 2, 3, 4, 5, 6, 7, 8};
accessor.insert_unique(keys[1], 10);
accessor.insert_unique(keys[2], 20);
accessor.insert_unique(keys[7], 70);
accessor.insert_unique(keys[4], 40);
accessor.insert_unique(keys[8], 80);
accessor.insert_unique(keys[3], 30);
print_skiplist(accessor);
return 0;
}

View File

@ -103,7 +103,7 @@ public:
}
R3(R3&) = delete;
R3(R3&& other)
{
this->routes = std::move(other.routes);

View File

@ -32,6 +32,9 @@ private:
const K& key;
};
template <class K, class SortOrder>
constexpr SortOrder UniqueKey<K, SortOrder>::sort_order;
template <class K>
using UniqueKeyAsc = UniqueKey<K, Ascending<K>>;

View File

@ -1,7 +1,7 @@
#pragma once
template <class T>
class Ascending
struct Ascending
{
constexpr bool operator()(const T& lhs, const T& rhs) const
{
@ -10,7 +10,7 @@ class Ascending
};
template <class T>
class Descending
struct Descending
{
constexpr bool operator()(const T& lhs, const T& rhs) const
{

View File

@ -24,6 +24,12 @@ public:
return *data._M_ptr();
}
const T& get() const noexcept
{
assert(initialized);
return *data._M_ptr();
}
void set(const T& item)
{
new (data._M_addr()) T(item);