buda 2015-09-24 23:25:48 +02:00
commit e860c40dff
10 changed files with 441 additions and 251 deletions

.arcconfig Normal file (4 additions)

@ -0,0 +1,4 @@
{
"project_id" : "memgraph",
"conduit_uri" : "https://phabricator.tomicevic.com"
}

cypher/.gitignore vendored (1 addition)

@ -1,6 +1,7 @@
*.o
cypher.cpp
cypher.h
cypher.hpp
parser
cypher.out
parser


@ -1,79 +0,0 @@
#ifndef MEMGRAPH_DATA_STRUCTURES_BITBLOCK_HPP
#define MEMGRAPH_DATA_STRUCTURES_BITBLOCK_HPP
#include <cstdlib>
#include <cassert>
#include <atomic>
#include <unistd.h>
template<class block_t = uint8_t,
size_t N = 1>
struct BitBlock
{
BitBlock() : block(0) {}
static constexpr size_t bits = sizeof(block_t) * 8;
static constexpr size_t size = bits / N;
// e.g. if N = 2,
// mask = 11111111 >> 6 = 00000011
static constexpr block_t mask = (block_t)(-1) >> (bits - N);
block_t at(size_t n)
{
assert(n < size);
block_t b = block.load(std::memory_order_relaxed);
return (b >> n * N) & mask;
}
// caution! this method assumes that the value on the sub-block n is 0..0!
void set(size_t n, block_t value)
{
assert(n < size);
assert(value < (1UL << N));
block_t b, new_value;
while(true)
{
b = block.load(std::memory_order_relaxed);
new_value = b | (value << n * N);
if(block.compare_exchange_weak(b, new_value,
std::memory_order_release,
std::memory_order_relaxed))
{
break;
}
// reduces contention and works better than pure while
usleep(250);
}
}
void clear(size_t n)
{
assert(n < size);
block_t b, new_value;
while(true)
{
b = block.load(std::memory_order_relaxed);
new_value = b & ~(mask << n * N);
if(block.compare_exchange_weak(b, new_value,
std::memory_order_release,
std::memory_order_relaxed))
{
break;
}
// reduces contention and works better than pure while
usleep(250);
}
}
std::atomic<block_t> block;
};
#endif
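
For reference, a minimal usage sketch of the BitBlock being removed here (illustrative only; the include path is assumed, and the header itself is deleted by this commit):

#include <cassert>
#include "bitblock.hpp"   // assumed path to the header shown above

int main()
{
    // four 2-bit slots packed into a single uint8_t block
    BitBlock<uint8_t, 2> flags;
    flags.set(0, 3);             // slot 0 := 11
    flags.set(2, 1);             // slot 2 := 01
    assert(flags.at(0) == 3);
    flags.clear(2);              // slot 2 := 00
    return 0;
}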


@ -1,59 +1,148 @@
#ifndef MEMGRAPH_DATA_STRUCTURES_BITSET_DYNAMIC_BITSET_HPP
#define MEMGRAPH_DATA_STRUCTURES_BITSET_DYNAMIC_BITSET_HPP
#include <cassert>
#include <vector>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <unistd.h>
#include "sync/spinlock.hpp"
#include "bitblock.hpp"
#include "threading/sync/lockable.hpp"
#include "threading/sync/spinlock.hpp"
template <class block_t,
size_t N,
class lock_t>
class DynamicBitset
template <class block_t = uint8_t, size_t chunk_size = 32768>
class DynamicBitset : Lockable<SpinLock>
{
using Block = BitBlock<block_t, N>;
struct Block
{
Block() = default;
Block(Block&) = delete;
Block(Block&&) = delete;
static constexpr size_t size = sizeof(block_t) * 8;
constexpr block_t bitmask(size_t group_size)
{
return (block_t)(-1) >> (size - group_size);
}
block_t at(size_t k, size_t n, std::memory_order order)
{
assert(k + n - 1 < size);
return (block.load(order) >> k) & bitmask(n);
}
void set(size_t k, size_t n, std::memory_order order)
{
assert(k + n - 1 < size);
block.fetch_or(bitmask(n) << k, order);
}
void clear(size_t k, size_t n, std::memory_order order)
{
assert(k + n - 1 < size);
block.fetch_and(~(bitmask(n) << k), order);
}
std::atomic<block_t> block {0};
};
struct Chunk
{
Chunk() : next(nullptr)
{
static_assert(chunk_size % sizeof(block_t) == 0,
"chunk size not divisible by block size");
}
Chunk(Chunk&) = delete;
Chunk(Chunk&&) = delete;
~Chunk()
{
delete next;
}
static constexpr size_t size = chunk_size * Block::size;
static constexpr size_t n_blocks = chunk_size / sizeof(block_t);
block_t at(size_t k, size_t n, std::memory_order order)
{
return blocks[k / Block::size].at(k % Block::size, n, order);
}
void set(size_t k, size_t n, std::memory_order order)
{
blocks[k / Block::size].set(k % Block::size, n, order);
}
void clear(size_t k, size_t n, std::memory_order order)
{
blocks[k / Block::size].clear(k % Block::size, n, order);
}
Block blocks[n_blocks];
std::atomic<Chunk*> next;
};
public:
block_t at(size_t k, size_t n = 1)
{
auto& chunk = find_chunk(k);
return chunk.at(k, n, std::memory_order_seq_cst);
}
void set(size_t k, size_t n = 1)
{
auto& chunk = find_chunk(k);
chunk.set(k, n, std::memory_order_seq_cst);
}
void clear(size_t k, size_t n = 1)
{
auto& chunk = find_chunk(k);
chunk.clear(k, n, std::memory_order_seq_cst);
}
private:
Chunk& find_chunk(size_t& k)
{
Chunk* chunk = head.load();
Chunk* next = nullptr;
// while i'm not in the right chunk
// (my index is bigger than the size of this chunk)
while(k >= Chunk::size)
{
next = chunk->next.load();
// if a next chunk exists, switch to it and decrement the
// index by the size of the current chunk
if(next != nullptr)
{
chunk = next;
k -= Chunk::size;
continue;
}
// the next chunk does not exist and we need it. take an exclusive
// lock to prevent others that also want to create a new chunk
// from creating it
auto guard = acquire();
// double-check locking. if the chunk exists now, some other thread
// has just created it, continue searching for my chunk
if(chunk->next.load() != nullptr)
continue;
chunk->next.store(new Chunk());
}
assert(chunk != nullptr);
return *chunk;
}
// the first chunk is allocated eagerly so that find_chunk always has a
// valid head to walk from
std::atomic<Chunk*> head { new Chunk() };
};
#endif
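
A rough usage sketch of the new chunked bitset (illustrative only; the include path is assumed and the template defaults from above are used):

#include <cassert>
#include "dynamic_bitset.hpp"   // assumed path

int main()
{
    DynamicBitset<> bits;        // uint8_t blocks, 32768-byte chunks
    bits.set(5);                 // set a single bit
    bits.set(300000);            // an index past the first chunk allocates a new chunk on demand
    assert(bits.at(5) == 1);
    bits.clear(5);
    assert(bits.at(5) == 0);
    return 0;
}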


@ -2,107 +2,204 @@
#define MEMGRAPH_DATA_STRUCTURES_LIST_LOCKFREE_LIST_HPP
#include <atomic>
#include <memory>
#include <unistd.h>
#include "threading/sync/lockable.hpp"
namespace lockfree
{
template <class T, size_t sleep_time = 250>
class List : Lockable<SpinLock>
{
public:
List() = default;
~List() = default;
List(List&) = delete;
List(List&&) = delete;
void operator=(List&) = delete;
class read_iterator
{
public:
read_iterator(T* curr) : curr(curr) {}
T& operator*() { return *curr; }
T* operator->() { return curr; }
operator T*() { return curr; }
read_iterator& operator++()
{
// todo add curr->next to the hazard pointer list
// (synchronization with GC)
curr = curr->next.load();
return *this;
}
read_iterator& operator++(int)
{
return operator++();
}
private:
T* curr;
};
class read_write_iterator
{
friend class List<T, sleep_time>;
public:
read_write_iterator(T* prev, T* curr) : prev(prev), curr(curr) {}
T& operator*() { return *curr; }
T* operator->() { return curr; }
operator T*() { return curr; }
read_write_iterator& operator++()
{
// todo add curr->next to the hazard pointer list
// (synchronization with GC)
prev = curr;
curr = curr->next.load();
return *this;
}
read_write_iterator& operator++(int)
{
return operator++();
}
private:
T* prev;
T* curr;
};
read_iterator begin()
{
return read_iterator(head.load());
}
read_write_iterator rw_begin()
{
return read_write_iterator(nullptr, head.load());
}
void push_front(T* node)
{
// we want to push an item to the front of a list like this
// HEAD --> [1] --> [2] --> [3] --> ...
// read the value of head atomically and set the node's next pointer
// to point to the same location as head
// HEAD --------> [1] --> [2] --> [3] --> ...
//                 |
//                 |
// NODE -----------+
T* h = node->next = head.load();
// atomically do: if the value of node->next is equal to the current value
// of head, make head point to the node.
// if this fails (another thread has just made progress), update the
// value of node->next to the current value of head and retry
// until you succeed
// HEAD ----|CAS|----------> [1] --> [2] --> [3] --> ...
//   |        |               |
//   |        v               |
//   +------|CAS|---> NODE ---+
while(!head.compare_exchange_weak(h, node))
{
node->next.store(h);
usleep(sleep_time);
}
// the final state of the list after the compare-and-swap looks like this
// HEAD        [1] --> [2] --> [3] --> ...
//   |          |
//   |          |
//   +--> NODE -+
}
bool remove(read_write_iterator& it)
{
// acquire an exclusive guard.
// we only care about push_front and iterator performance so we can
// tradeoff some remove speed for better reads and inserts. remove is
// used exclusively by the GC thread(s) so it can be slower
auto guard = acquire();
// even though concurrent removes are synchronized, we need to worry
// about concurrent reads (solved by using atomics) and concurrent
// inserts to head (VERY dangerous, suffers from ABA problem, solved
// by simply not deleting the head node until it gets pushed further
// down the list)
// check if we're deleting the head node. we can't do that because of
// the ABA problem so just return false for now. the logic behind this
// is that this node will move further down the list next time the
// garbage collector traverses this list and therefore it will become
// deletable
if(it.prev == nullptr)
return false;
// HEAD --> ... --> [i] --> [i + 1] --> [i + 2] --> ...
//
// prev curr next
auto prev = it.prev;
auto curr = it.curr;
auto next = curr->next.load(std::memory_order_acquire);
// effectively remove the curr node from the list
// +---------------------+
// | |
// | v
// HEAD --> ... --> [i] [i + 1] --> [i + 2] --> ...
//
// prev curr next
prev->next.store(next, std::memory_order_release);
// TODO curr is now removed from the list so no iterators will be able
// to reach it at this point, but we still need to check the hazard
// pointers and wait until everyone who currently holds a reference to
// it has stopped using it before we can physically delete it
// while(hp.find(reinterpret_cast<uintptr_t>(curr)))
// sleep(sleep_time);
return true;
}
// TODO think about how to make this lock free and safe from ABA
// this can easily be thread safe if there is ONLY ONE concurrent
// remove operation
//bool remove(T item);
private:
std::atomic<T*> head { nullptr };
};
template <class T, size_t sleep_time>
bool operator==(typename List<T, sleep_time>::read_iterator& a,
typename List<T, sleep_time>::read_iterator& b)
{
return static_cast<T*>(a) == static_cast<T*>(b);
}
template <class T, size_t sleep_time>
bool operator!=(typename List<T, sleep_time>::read_iterator& a,
typename List<T, sleep_time>::read_iterator& b)
{
return !operator==(a, b);
}
}
#endif
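
A rough usage sketch of the reworked list (illustrative only; the include path and the Record type are assumptions). Note that the list is now intrusive: push_front links T* nodes directly, so the element type itself has to carry the atomic next pointer the iterators follow.

#include <atomic>
#include <iostream>
#include "lockfree_list.hpp"   // assumed path

struct Record
{
    Record(int value) : value(value) {}
    int value;
    std::atomic<Record*> next { nullptr };   // linked directly by the list
};

int main()
{
    lockfree::List<Record> list;
    list.push_front(new Record(1));
    list.push_front(new Record(2));

    // read_iterator converts to T*, so comparing against nullptr ends the walk
    for(auto it = list.begin(); it != nullptr; ++it)
        std::cout << it->value << std::endl;   // prints 2, then 1

    // nodes are deliberately not freed here; removal is meant to go
    // through remove() on the GC side
    return 0;
}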


@ -1,71 +0,0 @@
#ifndef MEMGRAPH_DATA_STRUCTURES_LOCKFREE_LIST2_HPP
#define MEMGRAPH_DATA_STRUCTURES_LOCKFREE_LIST2_HPP
#include <atomic>
#include "threading/sync/spinlock.hpp"
#include "threading/sync/lockable.hpp"
namespace lockfree
{
template <class T>
class List2 : Lockable<SpinLock>
{
struct Node { T value; std::atomic<Node*> next; };
class iterator
{
public:
iterator(Node* node) : node(node) {}
T& operator*() const { return node->value; }
T* operator->() const { return &node->value; }
bool end() const
{
return node == nullptr;
}
iterator& operator++()
{
node = node->next.load(std::memory_order_relaxed);
return *this;
}
iterator& operator++(int)
{
return operator++();
}
protected:
Node* node;
};
public:
~List2()
{
Node *next, *node = head.load(std::memory_order_relaxed);
for(; node != nullptr; node = next)
{
next = node->next;
delete node;
}
}
void insert(T value)
{
auto guard = acquire();
head.store(new Node { value, head.load() });
}
iterator begin() { return iterator(head.load()); }
private:
std::atomic<Node*> head;
};
}
#endif

promise.cpp Normal file (115 additions)

@ -0,0 +1,115 @@
#include <iostream>
#include <atomic>
#include "server/uv/uv.hpp"
#include "server/http/http.hpp"
template <class T, class... Args>
class Promise
{
public:
Promise(std::function<T(Args...)> f, Args&&... args)
{
result = f(std::forward<Args>(args)...);
}
template <class R>
Promise<R, T> then(std::function<R(T)> f)
{
return Promise<R, T>(f, std::forward<T>(result));
}
T get()
{
return result;
}
private:
T result;
std::atomic<bool> completed;
};
class TaskPool
{
template <class R, class... Args>
class Task
{
public:
using task_t = Task<R, Args...>;
using work_f = std::function<R(Args...)>;
using after_work_f = std::function<void(R)>;
Task(work_f work, after_work_f callback)
: work(work), callback(callback)
{
req.data = this;
}
void launch(uv::UvLoop& loop)
{
uv_queue_work(loop, &req, work_cb, after_work_cb);
}
private:
std::function<R(Args...)> work;
std::function<void(R&)> callback;
R result;
uv_work_t req;
static void work_cb(uv_work_t* req)
{
auto& task = *reinterpret_cast<task_t*>(req->data);
}
static void after_work_cb(uv_work_t* req, int)
{
auto task = reinterpret_cast<task_t*>(req->data);
delete task;
}
};
public:
TaskPool(uv::UvLoop& loop) : loop(loop) {}
template <class R, class...Args>
void launch(std::function<R(Args...)> func,
std::function<void(R&)> callback)
{
auto task = new Task<R, Args...>(func, callback);
task->launch(loop);
}
private:
uv::UvLoop& loop;
};
int main(void)
{
uv::UvLoop loop;
TaskPool tp(loop);
tp.launch([](void) -> int { return 3; },
[](int x) -> void { std::cout << x << std::endl; });
// http::HttpServer server(loop);
//
// http::Ipv4 ip("0.0.0.0", 3400);
//
// server.listen(ip, [](http::Request& req, http::Response& res) {
//
//
//
// res.send(req.url);
// });
//
// loop.run(uv::UvLoop::Mode::Default);
return 0;
}
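
For orientation, a minimal sketch of the plain libuv pattern that Task wraps: uv_queue_work runs the work callback on the thread pool and the after-work callback back on the loop thread (illustrative only; the Baton struct and its fields are made up for the example):

#include <iostream>
#include <uv.h>

struct Baton
{
    int input;
    int result;
    uv_work_t req;
};

static void work(uv_work_t* req)             // runs on the libuv thread pool
{
    auto* baton = static_cast<Baton*>(req->data);
    baton->result = baton->input * 2;
}

static void after_work(uv_work_t* req, int)  // runs back on the loop thread
{
    auto* baton = static_cast<Baton*>(req->data);
    std::cout << baton->result << std::endl;
    delete baton;
}

int main()
{
    auto* baton = new Baton { 3, 0, {} };
    baton->req.data = baton;
    uv_queue_work(uv_default_loop(), &baton->req, work, after_work);
    return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}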


@ -12,6 +12,7 @@ class LockExpiredError : public std::runtime_error
using runtime_error::runtime_error;
};
template <size_t microseconds = 250>
class TimedSpinLock
{
public:
@ -36,7 +37,7 @@ public:
if(clock::now() - start > expiration)
throw LockExpiredError("This lock has expired");
usleep(microseconds);
}
}


@ -1,12 +0,0 @@
#ifndef MEMGRAPH_THREADING_TASK_HPP
#define MEMGRAPH_THREADING_TASK_HPP
class Task
{
public:
private:
//std::function<
};
#endif

utils/mark_ref.hpp Normal file (45 additions)

@ -0,0 +1,45 @@
#ifndef MEMGRAPH_UTILS_MARK_REF_HPP
#define MEMGRAPH_UTILS_MARK_REF_HPP
#include <stdint.h>
template <class T>
struct MarkRef
{
MarkRef() = default;
MarkRef(MarkRef&) = default;
MarkRef(MarkRef&&) = default;
bool is_marked() const
{
return ptr & 0x1L;
}
bool set_mark()
{
return ptr |= 0x1L;
}
bool clear_mark()
{
return ptr &= ~0x1L;
}
T* get() const
{
return reinterpret_cast<T*>(ptr & ~0x1L);
}
T& operator*() { return *get(); }
T* operator->() { return get(); }
uintptr_t ptr {0};
};
//template <class T, class... Args>
//MarkRef<T> make_markref(Args&&... args)
//{
//
//}
#endif
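
A rough usage sketch of the mark bit (illustrative only; writing ptr directly stands in for the commented-out make_markref factory, which is not implemented yet):

#include <cassert>
#include "utils/mark_ref.hpp"

int main()
{
    int value = 42;

    MarkRef<int> ref;
    ref.ptr = reinterpret_cast<uintptr_t>(&value);

    ref.set_mark();                // tag the low bit, e.g. to flag logical deletion
    assert(ref.is_marked());
    assert(*ref == 42);            // get() masks the mark off before dereferencing

    ref.clear_mark();
    assert(!ref.is_marked());
    return 0;
}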