From 83690e7269dfd844c8f2e1730a40cdc16a51c951 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dominik=20Tomic=CC=8Cevic=CC=81?= Date: Tue, 22 Sep 2015 01:04:13 +0200 Subject: [PATCH 1/5] implemented a lockfree list --- data_structures/list/lockfree_list.hpp | 211 ++++++++++++++++++------- 1 file changed, 154 insertions(+), 57 deletions(-) diff --git a/data_structures/list/lockfree_list.hpp b/data_structures/list/lockfree_list.hpp index 96185904d..c7c0b4895 100644 --- a/data_structures/list/lockfree_list.hpp +++ b/data_structures/list/lockfree_list.hpp @@ -2,107 +2,204 @@ #define MEMGRAPH_DATA_STRUCTURES_LIST_LOCKFREE_LIST_HPP #include -#include #include +#include "threading/sync/lockable.hpp" + namespace lockfree { template -class List +class List : Lockable { - struct Node - { - T item; - std::shared_ptr next; - }; - public: List() = default; - - // the default destructor is recursive so it could blow the stack if the - // list is long enough. the head node is destructed first via a shared_ptr - // and then it automatically destructs the second node and the second - // destructs the third and so on. keep that in mind - ~List() = default; List(List&) = delete; + List(List&&) = delete; + void operator=(List&) = delete; - class iterator + class read_iterator { public: - iterator(std::shared_ptr ptr) : ptr(ptr) {} + read_iterator(T* curr) : curr(curr) {} + + T& operator*() { return *curr; } + T* operator->() { return curr; } - T& operator*() { return ptr->item; } - T* operator->() { return &ptr->item; } + operator T*() { return curr; } - iterator& operator++() + read_iterator& operator++() { - if(ptr) - ptr = ptr->next; + // todo add curr->next to the hazard pointer list + // (synchronization with GC) + curr = curr->next.load(); return *this; } - iterator& operator++(int) + read_iterator& operator++(int) { - operator++(); + return operator++(); + } + + private: + T* curr; + }; + + class read_write_iterator + { + friend class List; + + public: + read_write_iterator(T* prev, T* curr) : prev(prev), curr(curr) {} + + T& operator*() { return *curr; } + T* operator->() { return curr; } + + operator T*() { return curr; } + + read_write_iterator& operator++() + { + // todo add curr->next to the hazard pointer list + // (synchronization with GC) + + prev = curr; + curr = curr->next.load(); return *this; } - + + read_write_iterator& operator++(int) + { + return operator++(); + } + private: - std::shared_ptr ptr; + T* prev, curr; }; - iterator begin() { return iterator(std::move(head.load())); } - iterator end() { return iterator(nullptr); } - - auto find(T item) + read_iterator begin() { - auto p = head.load(); - - while(p && p->item != item) - p = p->next; - - return iterator(std::move(p)); + return read_iterator(head.load()); } - iterator push_front(T item) + read_write_iterator rw_begin() { - auto p = std::make_shared(); - p->item = item; - p->next = std::move(head.load()); - - while(!head.compare_exchange_weak(p->next, p)) - usleep(sleep_time); - - return iterator(p); + return read_write_iterator(nullptr, head.load()); } - iterator pop_front() + void push_front(T* node) { - auto p = head.load(); - auto q = p; + // we want to push an item to front of a list like this + // HEAD --> [1] --> [2] --> [3] --> ... + + // read the value of head atomically and set the node's next pointer + // to point to the same location as head - while(p && !head.compare_exchange_weak(p, p->next)) + // HEAD --------> [1] --> [2] --> [3] --> ... + // | + // | + // NODE ------+ + + T* h = node->next = head.load(); + + // atomically do: if the value of node->next is equal to current value + // of head, make the head to point to the node. + // if this fails (another thread has just made progress), update the + // value of node->next to the current value of head and retry again + // until you succeed + + // HEAD ----|CAS|----------> [1] --> [2] --> [3] --> ... + // | | | + // | v | + // +-------|CAS|---> NODE ---+ + + while(!head.compare_exchange_weak(h, node)) { - q = p; + node->next.store(h); usleep(sleep_time); } - return iterator(q); + // the final state of the list after compare-and-swap looks like this + + // HEAD [1] --> [2] --> [3] --> ... + // | | + // | | + // +---> NODE ---+ + } + + bool remove(read_write_iterator& it) + { + // acquire an exclusive guard. + // we only care about push_front and iterator performance so we can + // we only care about push_front and iterator performance so we can + // tradeoff some remove speed for better reads and inserts. remove is + // used exclusively by the GC thread(s) so it can be slower + auto guard = acquire(); + + // even though concurrent removes are synchronized, we need to worry + // about concurrent reads (solved by using atomics) and concurrent + // inserts to head (VERY dangerous, suffers from ABA problem, solved + // by simply not deleting the head node until it gets pushed further + // down the list) + + // check if we're deleting the head node. we can't do that because of + // the ABA problem so just return false for now. the logic behind this + // is that this node will move further down the list next time the + // garbage collector traverses this list and therefore it will become + // deletable + if(it->prev == nullptr) + return false; + + // HEAD --> ... --> [i] --> [i + 1] --> [i + 2] --> ... + // + // prev curr next + + auto prev = it->prev; + auto curr = it->curr; + auto next = curr->next.load(std::memory_order_acquire); + + // effectively remove the curr node from the list + + // +---------------------+ + // | | + // | v + // HEAD --> ... --> [i] [i + 1] --> [i + 2] --> ... + // + // prev curr next + + prev->next.store(next, std::memory_order_release); + + // TODO curr is now removed from the list so no iterators will be able + // to reach it at this point, but we still need to check the hazard + // pointers and wait until everyone who currently holds a reference to + // it has stopped using it before we can physically delete it + + // while(hp.find(reinterpret_cast(curr))) + // sleep(sleep_time); + + return true; } - // TODO think about how to make this lock free and safe from ABA - // this can easily be thread safe if there is ONLY ONE concurrent - // remove operation - //bool remove(T item); - private: - std::atomic> head { nullptr }; + std::atomic head { nullptr }; }; -}; +template +bool operator==(typename List::read_iterator& a, + typename List::read_iterator& b) +{ + return a->curr == b->curr; +} + +template +bool operator!=(typename List::read_iterator& a, + typename List::read_iterator& b) +{ + return !operator==(a, b); +} + +} #endif From 9ae14c36c86451606d795b0957f176a1873a8516 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dominik=20Tomic=CC=8Cevic=CC=81?= Date: Tue, 22 Sep 2015 01:05:54 +0200 Subject: [PATCH 2/5] added arcconfig --- .arcconfig | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 .arcconfig diff --git a/.arcconfig b/.arcconfig new file mode 100644 index 000000000..92aa20d62 --- /dev/null +++ b/.arcconfig @@ -0,0 +1,4 @@ +{ + "project_id" : "memgraph", + "conduit_uri" : "https://phabricator.tomicevic.com" +} From 38ec6a10b18cf600acd4254f901c9c4b94475b40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dominik=20Tomic=CC=8Cevic=CC=81?= Date: Tue, 22 Sep 2015 01:10:47 +0200 Subject: [PATCH 3/5] implemented a lockfree list Summary: It's not the fastest for all operations but it's efficient on inserts to front and list traversals. removes are quite slow but it's not important because remove is only used by the garbage collector. Test Plan: Reason wisely Reviewers: buda, borko Differential Revision: https://phabricator.tomicevic.com/D1 --- cypher/.gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/cypher/.gitignore b/cypher/.gitignore index 0d539c1af..915128399 100644 --- a/cypher/.gitignore +++ b/cypher/.gitignore @@ -1,6 +1,7 @@ *.o cypher.cpp cypher.h +cypher.hpp parser cypher.out parser From ec9af001d62763311e811aa5f273a57e6cacd3c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dominik=20Tomic=CC=8Cevic=CC=81?= Date: Thu, 24 Sep 2015 01:14:00 +0200 Subject: [PATCH 4/5] lock free linked list implementation --- data_structures/list/lockfree_list2.hpp | 71 --------------- promise.cpp | 115 ++++++++++++++++++++++++ threading/sync/timed_spinlock.hpp | 3 +- threading/task.hpp | 12 --- utils/mark_ref.hpp | 45 ++++++++++ 5 files changed, 162 insertions(+), 84 deletions(-) delete mode 100644 data_structures/list/lockfree_list2.hpp create mode 100644 promise.cpp delete mode 100644 threading/task.hpp create mode 100644 utils/mark_ref.hpp diff --git a/data_structures/list/lockfree_list2.hpp b/data_structures/list/lockfree_list2.hpp deleted file mode 100644 index a9d762cff..000000000 --- a/data_structures/list/lockfree_list2.hpp +++ /dev/null @@ -1,71 +0,0 @@ -#ifndef MEMGRAPH_DATA_STRUCTURES_LOCKFREE_LIST2_HPP -#define MEMGRAPH_DATA_STRUCTURES_LOCKFREE_LIST2_HPP - -#include - -#include "threading/sync/spinlock.hpp" -#include "threading/sync/lockable.hpp" - -namespace lockfree -{ - -template -class List2 : Lockable -{ - struct Node { T value; std::atomic next; }; - - class iterator - { - iterator(Node* node) : node(node) {} - - T& operator*() const { return *node; } - T* operator->() const { return node; } - - bool end() const - { - return node == nullptr; - } - - iterator& operator++() - { - node = node->next.load(std::memory_order_relaxed); - return *this; - } - - iterator& operator++(int) - { - return operator++(); - } - - protected: - Node* node; - }; - -public: - - ~List2() - { - Node* next, node = head.load(std::memory_order_relaxed); - - for(; node != nullptr; node = next) - { - next = node->next; - delete node; - } - } - - void insert(T value) - { - auto guard = acquire(); - head.store(new Node { value, head }); - } - - iterator begin() { return iterator(head.load()); } - -private: - std::atomic head; -}; - -} - -#endif diff --git a/promise.cpp b/promise.cpp new file mode 100644 index 000000000..8df711aef --- /dev/null +++ b/promise.cpp @@ -0,0 +1,115 @@ +#include +#include + +#include "server/uv/uv.hpp" +#include "server/http/http.hpp" + +template +class Promise +{ +public: + Promise(std::function f, Args&&... args) + { + result = f(std::forward(args)...); + } + + template + Promise then(std::function f) + { + return Promise(f, std::forward(result)); + } + + T get() + { + return result; + } + +private: + + + T result; + std::atomic completed; +}; + +class TaskPool +{ + template + class Task + { + public: + using task_t = Task; + using work_f = std::function; + using after_work_f = std::function; + + Task(work_f work, after_work_f callback) + : work(work), callback(callback) + { + req.data = this; + } + + void launch(uv::UvLoop& loop) + { + uv_queue_work(loop, &req, work_cb, after_work_cb); + } + + private: + std::function work; + std::function callback; + + R result; + + uv_work_t req; + + static void work_cb(uv_work_t* req) + { + auto& task = *reinterpret_cast(req->data); + } + + static void after_work_cb(uv_work_t* req, int) + { + auto task = reinterpret_cast(req->data); + delete task; + } + }; + +public: + TaskPool(uv::UvLoop& loop) : loop(loop) {} + + template + void launch(std::function func, + std::function callback) + { + auto task = new Task(func, callback); + task->launch(loop); + } + +private: + uv::UvLoop& loop; +}; + + + +int main(void) +{ + uv::UvLoop loop; + TaskPool tp(loop); + + tp.launch([](void) -> int { return 3 }, + [](int x) -> void { std::cout << x << std::endl; }); + +// http::HttpServer server(loop); +// +// http::Ipv4 ip("0.0.0.0", 3400); +// +// server.listen(ip, [](http::Request& req, http::Response& res) { +// +// +// +// res.send(req.url); +// }); +// +// loop.run(uv::UvLoop::Mode::Default); + + + return 0; +} diff --git a/threading/sync/timed_spinlock.hpp b/threading/sync/timed_spinlock.hpp index 5758843b6..77eb6aa3d 100644 --- a/threading/sync/timed_spinlock.hpp +++ b/threading/sync/timed_spinlock.hpp @@ -12,6 +12,7 @@ class LockExpiredError : public std::runtime_error using runtime_error::runtime_error; }; +template class TimedSpinLock { public: @@ -36,7 +37,7 @@ public: if(clock::now() - start > expiration) throw LockExpiredError("This lock has expired"); - usleep(250); + usleep(microseconds); } } diff --git a/threading/task.hpp b/threading/task.hpp deleted file mode 100644 index d5be264c3..000000000 --- a/threading/task.hpp +++ /dev/null @@ -1,12 +0,0 @@ -#ifndef MEMGRAPH_THREADING_TASK_HPP -#define MEMGRAPH_THREADING_TASK_HPP - -class Task -{ -public: - -private: - //std::function< -}; - -#endif diff --git a/utils/mark_ref.hpp b/utils/mark_ref.hpp new file mode 100644 index 000000000..b98d5c731 --- /dev/null +++ b/utils/mark_ref.hpp @@ -0,0 +1,45 @@ +#ifndef MEMGRAPH_UTILS_MARK_REF_HPP +#define MEMGRAPH_UTILS_MARK_REF_HPP + +#include + +template +struct MarkRef +{ + MarkRef() = default; + MarkRef(MarkRef&) = default; + MarkRef(MarkRef&&) = default; + + bool is_marked() const + { + return ptr & 0x1L; + } + + bool set_mark() + { + return ptr |= 0x1L; + } + + bool clear_mark() + { + return ptr &= ~0x1L; + } + + T* get() const + { + return reinterpret_cast(ptr & ~0x1L); + } + + T& operator*() { return *get(); } + T* operator->() { return get(); } + + uintptr_t ptr; +}; + +//template +//MarkRef make_markref(Args&&... args) +//{ +// +//} + +#endif From 2727c26eb59fa9ea38b496d3752db2610ec53171 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dominik=20Tomic=CC=8Cevic=CC=81?= Date: Thu, 24 Sep 2015 18:36:16 +0200 Subject: [PATCH 5/5] implemented a lock free and (almost) wait free bitset --- data_structures/bitset/bitblock.hpp | 79 ----------- data_structures/bitset/dynamic_bitset.hpp | 151 +++++++++++++++++----- 2 files changed, 120 insertions(+), 110 deletions(-) delete mode 100644 data_structures/bitset/bitblock.hpp diff --git a/data_structures/bitset/bitblock.hpp b/data_structures/bitset/bitblock.hpp deleted file mode 100644 index 52adc1f1c..000000000 --- a/data_structures/bitset/bitblock.hpp +++ /dev/null @@ -1,79 +0,0 @@ -#ifndef MEMGRAPH_DATA_STRUCTURES_BITBLOCK_HPP -#define MEMGRAPH_DATA_STRUCTURES_BITBLOCK_HPP - -#include -#include -#include -#include - -template -struct BitBlock -{ - BitBlock() : block(0) {} - - static constexpr size_t bits = sizeof(block_t) * 8; - static constexpr size_t size = bits / N; - - // e.g. if N = 2, - // mask = 11111111 >> 6 = 00000011 - static constexpr block_t mask = (block_t)(-1) >> (bits - N); - - block_t at(size_t n) - { - assert(n < size); - - block_t b = block.load(std::memory_order_relaxed); - return (b >> n * N) & mask; - } - - // caution! this method assumes that the value on the sub-block n is 0..0! - void set(size_t n, block_t value) - { - assert(n < size); - assert(value < (1UL << N)); - block_t b, new_value; - - while(true) - { - b = block.load(std::memory_order_relaxed); - new_value = b | (value << n * N); - - if(block.compare_exchange_weak(b, new_value, - std::memory_order_release, - std::memory_order_relaxed)) - { - break; - } - - // reduces contention and works better than pure while - usleep(250); - } - } - - void clear(size_t n) - { - assert(n < size); - block_t b, new_value; - - while(true) - { - b = block.load(std::memory_order_relaxed); - new_value = b & ~(mask << n * N); - - if(block.compare_exchange_weak(b, new_value, - std::memory_order_release, - std::memory_order_relaxed)) - { - break; - } - - // reduces contention and works better than pure while - usleep(250); - } - } - - std::atomic block; -}; - -#endif diff --git a/data_structures/bitset/dynamic_bitset.hpp b/data_structures/bitset/dynamic_bitset.hpp index cc2cee77d..b41fc4b75 100644 --- a/data_structures/bitset/dynamic_bitset.hpp +++ b/data_structures/bitset/dynamic_bitset.hpp @@ -1,59 +1,148 @@ #ifndef MEMGRAPH_DATA_STRUCTURES_BITSET_DYNAMIC_BITSET_HPP #define MEMGRAPH_DATA_STRUCTURES_BITSET_DYNAMIC_BITSET_HPP +#include #include +#include #include +#include #include -#include "sync/spinlock.hpp" -#include "bitblock.hpp" +#include "threading/sync/lockable.hpp" +#include "threading/sync/spinlock.hpp" -template -class DynamicBitset +template +class DynamicBitset : Lockable { - using Block = BitBlock; + struct Block + { + Block(Block&) = delete; + Block(Block&&) = delete; + + static constexpr size_t size = sizeof(block_t) * 8; + + constexpr block_t bitmask(size_t group_size) + { + return (block_t)(-1) >> (size - group_size); + } + + block_t at(size_t k, size_t n, std::memory_order order) + { + assert(k + n - 1 < size); + return (block.load(order) >> k) & bitmask(n); + } + + void set(size_t k, size_t n, std::memory_order order) + { + assert(k + n - 1 < size); + block.fetch_or(bitmask(n) << k, order); + } + + void clear(size_t k, size_t n, std::memory_order order) + { + assert(k + n - 1 < size); + block.fetch_and(~(bitmask(n) << k), order); + } + + std::atomic block {0}; + }; + + struct Chunk + { + Chunk() : next(nullptr) + { + static_assert(chunk_size % sizeof(block_t) == 0, + "chunk size not divisible by block size"); + } + + Chunk(Chunk&) = delete; + Chunk(Chunk&&) = delete; + + ~Chunk() + { + delete next; + } + + static constexpr size_t size = chunk_size * Block::size; + static constexpr size_t n_blocks = chunk_size / sizeof(block_t); + + block_t at(size_t k, size_t n, std::memory_order order) + { + return blocks[k / Block::size].at(k % Block::size, n, order); + } + + void set(size_t k, size_t n, std::memory_order order) + { + blocks[k / Block::size].set(k % Block::size, n, order); + } + + void clear(size_t k, size_t n, std::memory_order order) + { + blocks[k / Block::size].clear(k % Block::size, n, order); + } + + Block blocks[n_blocks]; + std::atomic next; + }; public: - DynamicBitset(size_t n) : data(container_size(n)) {} - void resize(size_t n) + block_t at(size_t k, size_t n = 1) { - auto guard = acquire(); - data.resize(container_size(n)); + auto chunk = find_chunk(k); + chunk.at(k, n, std::memory_order_seq_cst); } - size_t size() const + void set(size_t k, size_t n = 1) { - return data.size(); + auto chunk = find_chunk(k); + chunk.set(k, n, std::memory_order_seq_cst); } - block_t at(size_t n) + void clear(size_t k, size_t n = 1) { - return data[n / Block::size].at(n % Block::size); - } - - void set(size_t n, block_t value) - { - data[n / Block::size].set(n % Block::size, value); + auto chunk = find_chunk(k); + chunk.clear(k, n, std::memory_order_seq_cst); } private: - - std::unique_lock acquire() + Chunk& find_chunk(size_t& k) { - return std::unique_lock(lock); + Chunk* chunk = head.load(), next = nullptr; + + // while i'm not in the right chunk + // (my index is bigger than the size of this chunk) + while(k >= Chunk::size) + { + next = chunk->next.load(); + + // if a next chunk exists, switch to it and decrement my + // pointer by the size of the current chunk + if(next != nullptr) + { + chunk = next; + k -= Chunk::size; + continue; + } + + // the next chunk does not exist and we need it. take an exclusive + // lock to prevent others that also want to create a new chunk + // from creating it + auto guard = acquire(); + + // double-check locking. if the chunk exists now, some other thread + // has just created it, continue searching for my chunk + if(chunk->next.load() != nullptr) + continue; + + chunk->next.store(new Chunk()); + } + + assert(chunk != nullptr); + return *chunk; } - size_t container_size(size_t num_elems) - { - return (num_elems + N - 1) / N; - } - - std::vector data; - - lock_t lock; + std::atomic head; }; #endif