diff --git a/.arcconfig b/.arcconfig
new file mode 100644
index 000000000..92aa20d62
--- /dev/null
+++ b/.arcconfig
@@ -0,0 +1,4 @@
+{
+    "project_id" : "memgraph",
+    "conduit_uri" : "https://phabricator.tomicevic.com"
+}
diff --git a/cypher/.gitignore b/cypher/.gitignore
index 0d539c1af..915128399 100644
--- a/cypher/.gitignore
+++ b/cypher/.gitignore
@@ -1,6 +1,7 @@
 *.o
 cypher.cpp
 cypher.h
+cypher.hpp
 parser
 cypher.out
 parser
diff --git a/data_structures/bitset/bitblock.hpp b/data_structures/bitset/bitblock.hpp
deleted file mode 100644
index 52adc1f1c..000000000
--- a/data_structures/bitset/bitblock.hpp
+++ /dev/null
@@ -1,79 +0,0 @@
-#ifndef MEMGRAPH_DATA_STRUCTURES_BITBLOCK_HPP
-#define MEMGRAPH_DATA_STRUCTURES_BITBLOCK_HPP
-
-#include
-#include
-#include
-#include
-
-template <class block_t, size_t N>
-struct BitBlock
-{
-    BitBlock() : block(0) {}
-
-    static constexpr size_t bits = sizeof(block_t) * 8;
-    static constexpr size_t size = bits / N;
-
-    // e.g. if N = 2,
-    // mask = 11111111 >> 6 = 00000011
-    static constexpr block_t mask = (block_t)(-1) >> (bits - N);
-
-    block_t at(size_t n)
-    {
-        assert(n < size);
-
-        block_t b = block.load(std::memory_order_relaxed);
-        return (b >> n * N) & mask;
-    }
-
-    // caution! this method assumes that the value on the sub-block n is 0..0!
-    void set(size_t n, block_t value)
-    {
-        assert(n < size);
-        assert(value < (1UL << N));
-        block_t b, new_value;
-
-        while(true)
-        {
-            b = block.load(std::memory_order_relaxed);
-            new_value = b | (value << n * N);
-
-            if(block.compare_exchange_weak(b, new_value,
-                std::memory_order_release,
-                std::memory_order_relaxed))
-            {
-                break;
-            }
-
-            // reduces contention and works better than pure while
-            usleep(250);
-        }
-    }
-
-    void clear(size_t n)
-    {
-        assert(n < size);
-        block_t b, new_value;
-
-        while(true)
-        {
-            b = block.load(std::memory_order_relaxed);
-            new_value = b & ~(mask << n * N);
-
-            if(block.compare_exchange_weak(b, new_value,
-                std::memory_order_release,
-                std::memory_order_relaxed))
-            {
-                break;
-            }
-
-            // reduces contention and works better than pure while
-            usleep(250);
-        }
-    }
-
-    std::atomic<block_t> block;
-};
-
-#endif
diff --git a/data_structures/bitset/dynamic_bitset.hpp b/data_structures/bitset/dynamic_bitset.hpp
index cc2cee77d..b41fc4b75 100644
--- a/data_structures/bitset/dynamic_bitset.hpp
+++ b/data_structures/bitset/dynamic_bitset.hpp
@@ -1,59 +1,148 @@
 #ifndef MEMGRAPH_DATA_STRUCTURES_BITSET_DYNAMIC_BITSET_HPP
 #define MEMGRAPH_DATA_STRUCTURES_BITSET_DYNAMIC_BITSET_HPP
 
+#include
 #include
+#include
 #include
+#include
 #include
 
-#include "sync/spinlock.hpp"
-#include "bitblock.hpp"
+#include "threading/sync/lockable.hpp"
+#include "threading/sync/spinlock.hpp"
 
-template <class block_t, size_t N, class lock_t>
-class DynamicBitset
+template <class block_t, size_t chunk_size>
+class DynamicBitset : Lockable
 {
-    using Block = BitBlock<block_t, N>;
+    struct Block
+    {
+        Block(Block&) = delete;
+        Block(Block&&) = delete;
+
+        static constexpr size_t size = sizeof(block_t) * 8;
+
+        constexpr block_t bitmask(size_t group_size)
+        {
+            return (block_t)(-1) >> (size - group_size);
+        }
+
+        block_t at(size_t k, size_t n, std::memory_order order)
+        {
+            assert(k + n - 1 < size);
+            return (block.load(order) >> k) & bitmask(n);
+        }
+
+        void set(size_t k, size_t n, std::memory_order order)
+        {
+            assert(k + n - 1 < size);
+            block.fetch_or(bitmask(n) << k, order);
+        }
+
+        void clear(size_t k, size_t n, std::memory_order order)
+        {
+            assert(k + n - 1 < size);
+            block.fetch_and(~(bitmask(n) << k), order);
+        }
+
+        std::atomic<block_t> block {0};
+    };
+
+    struct Chunk
+    {
+        Chunk() : next(nullptr)
+        {
+            static_assert(chunk_size % sizeof(block_t) == 0,
+                "chunk size not divisible by block size");
+        }
+
+        Chunk(Chunk&) = delete;
+        Chunk(Chunk&&) = delete;
+
+        ~Chunk()
+        {
+            delete next.load();
+        }
+
+        static constexpr size_t n_blocks = chunk_size / sizeof(block_t);
+        static constexpr size_t size = n_blocks * Block::size;
+
+        block_t at(size_t k, size_t n, std::memory_order order)
+        {
+            return blocks[k / Block::size].at(k % Block::size, n, order);
+        }
+
+        void set(size_t k, size_t n, std::memory_order order)
+        {
+            blocks[k / Block::size].set(k % Block::size, n, order);
+        }
+
+        void clear(size_t k, size_t n, std::memory_order order)
+        {
+            blocks[k / Block::size].clear(k % Block::size, n, order);
+        }
+
+        Block blocks[n_blocks];
+        std::atomic<Chunk*> next;
+    };
 
 public:
-    DynamicBitset(size_t n) : data(container_size(n)) {}
-
-    void resize(size_t n)
+    block_t at(size_t k, size_t n = 1)
     {
-        auto guard = acquire();
-        data.resize(container_size(n));
+        auto& chunk = find_chunk(k);
+        return chunk.at(k, n, std::memory_order_seq_cst);
     }
 
-    size_t size() const
+    void set(size_t k, size_t n = 1)
     {
-        return data.size();
+        auto& chunk = find_chunk(k);
+        chunk.set(k, n, std::memory_order_seq_cst);
     }
 
-    block_t at(size_t n)
+    void clear(size_t k, size_t n = 1)
     {
-        return data[n / Block::size].at(n % Block::size);
-    }
-
-    void set(size_t n, block_t value)
-    {
-        data[n / Block::size].set(n % Block::size, value);
+        auto& chunk = find_chunk(k);
+        chunk.clear(k, n, std::memory_order_seq_cst);
     }
 
 private:
-
-    std::unique_lock<lock_t> acquire()
+    Chunk& find_chunk(size_t& k)
     {
-        return std::unique_lock<lock_t>(lock);
+        Chunk* chunk = head.load();
+        Chunk* next = nullptr;
+
+        // while i'm not in the right chunk
+        // (my index is bigger than the size of this chunk)
+        while(k >= Chunk::size)
+        {
+            next = chunk->next.load();
+
+            // if a next chunk exists, switch to it and decrement my
+            // index by the size of the current chunk
+            if(next != nullptr)
+            {
+                chunk = next;
+                k -= Chunk::size;
+                continue;
+            }
+
+            // the next chunk does not exist and we need it. take an exclusive
+            // lock to prevent others that also want to create a new chunk
+            // from creating it
+            auto guard = acquire();
+
+            // double-check locking. if the chunk exists now, some other thread
+            // has just created it, continue searching for my chunk
+            if(chunk->next.load() != nullptr)
+                continue;
+
+            chunk->next.store(new Chunk());
+        }
+
+        assert(chunk != nullptr);
+        return *chunk;
     }
 
-    size_t container_size(size_t num_elems)
-    {
-        return (num_elems + N - 1) / N;
-    }
-
-    std::vector<Block> data;
-
-    lock_t lock;
+    std::atomic<Chunk*> head { new Chunk() };
 };
 
 #endif
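Reviewer note: a minimal usage sketch of the new DynamicBitset, assuming the Lockable base from threading/sync/lockable.hpp, a 1024-byte chunk (8192 bits per chunk), and the head chunk being allocated on construction as fixed above. The include path and template arguments are illustrative only.

#include <cstdint>
#include <iostream>

#include "data_structures/bitset/dynamic_bitset.hpp"

int main()
{
    // block type uint64_t, 1024-byte chunks; further chunks are
    // appended lazily as higher indices are touched
    DynamicBitset<uint64_t, 1024> bitset;

    bitset.set(3);          // set a single bit in the first chunk
    bitset.set(9000, 2);    // set a group of two bits, forcing a second chunk

    std::cout << bitset.at(3) << " " << bitset.at(9000, 2) << std::endl;  // 1 3

    bitset.clear(3);
    std::cout << bitset.at(3) << std::endl;                               // 0

    return 0;
}
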
diff --git a/data_structures/list/lockfree_list.hpp b/data_structures/list/lockfree_list.hpp
index 96185904d..c7c0b4895 100644
--- a/data_structures/list/lockfree_list.hpp
+++ b/data_structures/list/lockfree_list.hpp
@@ -2,107 +2,204 @@
 #include
-#include
 #include
 
+#include "threading/sync/lockable.hpp"
+
 namespace lockfree
 {
 
 template <class T>
-class List
+class List : Lockable
 {
-    struct Node
-    {
-        T item;
-        std::shared_ptr<Node> next;
-    };
-
 public:
     List() = default;
-
-    // the default destructor is recursive so it could blow the stack if the
-    // list is long enough. the head node is destructed first via a shared_ptr
-    // and then it automatically destructs the second node and the second
-    // destructs the third and so on. keep that in mind
-    ~List() = default;
 
     List(List&) = delete;
+    List(List&&) = delete;
+    void operator=(List&) = delete;
 
-    class iterator
+    class read_iterator
     {
     public:
-        iterator(std::shared_ptr<Node> ptr) : ptr(ptr) {}
+        read_iterator(T* curr) : curr(curr) {}
 
-        T& operator*() { return ptr->item; }
-        T* operator->() { return &ptr->item; }
+        T& operator*() { return *curr; }
+        T* operator->() { return curr; }
 
-        iterator& operator++()
+        operator T*() { return curr; }
+
+        read_iterator& operator++()
         {
-            if(ptr)
-                ptr = ptr->next;
+            // todo add curr->next to the hazard pointer list
+            // (synchronization with GC)
+            curr = curr->next.load();
             return *this;
         }
 
-        iterator& operator++(int)
+        read_iterator& operator++(int)
         {
-            operator++();
+            return operator++();
+        }
+
+    private:
+        T* curr;
+    };
+
+    class read_write_iterator
+    {
+        friend class List;
+
+    public:
+        read_write_iterator(T* prev, T* curr) : prev(prev), curr(curr) {}
+
+        T& operator*() { return *curr; }
+        T* operator->() { return curr; }
+
+        operator T*() { return curr; }
+
+        read_write_iterator& operator++()
+        {
+            // todo add curr->next to the hazard pointer list
+            // (synchronization with GC)
+
+            prev = curr;
+            curr = curr->next.load();
             return *this;
         }
-
+
+        read_write_iterator& operator++(int)
+        {
+            return operator++();
+        }
+
     private:
-        std::shared_ptr<Node> ptr;
+        T* prev;
+        T* curr;
     };
 
-    iterator begin() { return iterator(std::move(head.load())); }
-    iterator end() { return iterator(nullptr); }
-
-    auto find(T item)
+    read_iterator begin()
     {
-        auto p = head.load();
-
-        while(p && p->item != item)
-            p = p->next;
-
-        return iterator(std::move(p));
+        return read_iterator(head.load());
     }
 
-    iterator push_front(T item)
+    read_write_iterator rw_begin()
     {
-        auto p = std::make_shared<Node>();
-        p->item = item;
-        p->next = std::move(head.load());
-
-        while(!head.compare_exchange_weak(p->next, p))
-            usleep(sleep_time);
-
-        return iterator(p);
+        return read_write_iterator(nullptr, head.load());
     }
 
-    iterator pop_front()
+    void push_front(T* node)
     {
-        auto p = head.load();
-        auto q = p;
+        // we want to push an item to front of a list like this
+        // HEAD --> [1] --> [2] --> [3] --> ...
+
+        // read the value of head atomically and set the node's next pointer
+        // to point to the same location as head
 
-        while(p && !head.compare_exchange_weak(p, p->next))
+        // HEAD --------> [1] --> [2] --> [3] --> ...
+        //                 |
+        //                 |
+        // NODE ------+
+
+        T* h = node->next = head.load();
+
+        // atomically do: if the value of node->next is equal to current value
+        // of head, make the head to point to the node.
+        // if this fails (another thread has just made progress), update the
+        // value of node->next to the current value of head and retry again
+        // until you succeed
+
+        // HEAD ----|CAS|----------> [1] --> [2] --> [3] --> ...
+        //            |               |
+        //            |               v
+        //            +-------|CAS|---> NODE ---+
+
+        while(!head.compare_exchange_weak(h, node))
         {
-            q = p;
+            node->next.store(h);
             usleep(sleep_time);
         }
 
-        return iterator(q);
+        // the final state of the list after compare-and-swap looks like this
+
+        // HEAD          [1] --> [2] --> [3] --> ...
+        //   |            |
+        //   |            |
+        //   +---> NODE --+
+    }
+
+    bool remove(read_write_iterator& it)
+    {
+        // acquire an exclusive guard.
+        // we only care about push_front and iterator performance so we can
+        // tradeoff some remove speed for better reads and inserts. remove is
+        // used exclusively by the GC thread(s) so it can be slower
+        auto guard = acquire();
+
+        // even though concurrent removes are synchronized, we need to worry
+        // about concurrent reads (solved by using atomics) and concurrent
+        // inserts to head (VERY dangerous, suffers from ABA problem, solved
+        // by simply not deleting the head node until it gets pushed further
+        // down the list)
+
+        // check if we're deleting the head node. we can't do that because of
+        // the ABA problem so just return false for now. the logic behind this
+        // is that this node will move further down the list next time the
+        // garbage collector traverses this list and therefore it will become
+        // deletable
+        if(it.prev == nullptr)
+            return false;
+
+        // HEAD --> ... --> [i] --> [i + 1] --> [i + 2] --> ...
+        //
+        //                  prev     curr        next
+
+        auto prev = it.prev;
+        auto curr = it.curr;
+        auto next = curr->next.load(std::memory_order_acquire);
+
+        // effectively remove the curr node from the list
+
+        //                   +---------------------+
+        //                   |                     |
+        //                   |                     v
+        // HEAD --> ... --> [i]     [i + 1] --> [i + 2] --> ...
+        //
+        //                  prev     curr        next
+
+        prev->next.store(next, std::memory_order_release);
+
+        // TODO curr is now removed from the list so no iterators will be able
+        // to reach it at this point, but we still need to check the hazard
+        // pointers and wait until everyone who currently holds a reference to
+        // it has stopped using it before we can physically delete it
+
+        // while(hp.find(reinterpret_cast<uintptr_t>(curr)))
+        //     sleep(sleep_time);
+
+        return true;
     }
 
-    // TODO think about how to make this lock free and safe from ABA
-    // this can easily be thread safe if there is ONLY ONE concurrent
-    // remove operation
-    //bool remove(T item);
-
 private:
-    std::atomic<std::shared_ptr<Node>> head { nullptr };
+    std::atomic<T*> head { nullptr };
 };
 
-};
+template <class T>
+bool operator==(typename List<T>::read_iterator& a,
+                typename List<T>::read_iterator& b)
+{
+    return static_cast<T*>(a) == static_cast<T*>(b);
+}
+
+template <class T>
+bool operator!=(typename List<T>::read_iterator& a,
+                typename List<T>::read_iterator& b)
+{
+    return !operator==(a, b);
+}
+
+}
 
 #endif
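Reviewer note: the reworked list is intrusive — it never allocates nodes and expects the element type to carry an atomic next pointer. A minimal sketch of the intended usage; the IntListNode type and sleep_time availability are assumptions for the example.

#include <atomic>
#include <iostream>

#include "data_structures/list/lockfree_list.hpp"

// the element type provides the atomic next pointer the list links through
struct IntListNode
{
    IntListNode(int value) : value(value) {}

    int value;
    std::atomic<IntListNode*> next {nullptr};
};

int main()
{
    lockfree::List<IntListNode> list;

    // the caller allocates nodes; push_front only links them in
    list.push_front(new IntListNode(1));
    list.push_front(new IntListNode(2));

    // read_iterator walks the list by following the atomic next pointers;
    // the conversion to IntListNode* makes the nullptr check work
    for(auto it = list.begin(); it != nullptr; ++it)
        std::cout << it->value << std::endl;   // prints 2, then 1

    return 0;   // nodes intentionally leaked in this sketch
}
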
diff --git a/data_structures/list/lockfree_list2.hpp b/data_structures/list/lockfree_list2.hpp
deleted file mode 100644
index a9d762cff..000000000
--- a/data_structures/list/lockfree_list2.hpp
+++ /dev/null
@@ -1,71 +0,0 @@
-#ifndef MEMGRAPH_DATA_STRUCTURES_LOCKFREE_LIST2_HPP
-#define MEMGRAPH_DATA_STRUCTURES_LOCKFREE_LIST2_HPP
-
-#include
-
-#include "threading/sync/spinlock.hpp"
-#include "threading/sync/lockable.hpp"
-
-namespace lockfree
-{
-
-template <class T>
-class List2 : Lockable
-{
-    struct Node { T value; std::atomic<Node*> next; };
-
-    class iterator
-    {
-        iterator(Node* node) : node(node) {}
-
-        T& operator*() const { return *node; }
-        T* operator->() const { return node; }
-
-        bool end() const
-        {
-            return node == nullptr;
-        }
-
-        iterator& operator++()
-        {
-            node = node->next.load(std::memory_order_relaxed);
-            return *this;
-        }
-
-        iterator& operator++(int)
-        {
-            return operator++();
-        }
-
-    protected:
-        Node* node;
-    };
-
-public:
-
-    ~List2()
-    {
-        Node* next, node = head.load(std::memory_order_relaxed);
-
-        for(; node != nullptr; node = next)
-        {
-            next = node->next;
-            delete node;
-        }
-    }
-
-    void insert(T value)
-    {
-        auto guard = acquire();
-        head.store(new Node { value, head });
-    }
-
-    iterator begin() { return iterator(head.load()); }
-
-private:
-    std::atomic<Node*> head;
-};
-
-}
-
-#endif
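Reviewer note: DynamicBitset and the reworked List both inherit from Lockable and call acquire() on their slow paths (chunk creation, remove). The actual threading/sync/lockable.hpp is not part of this diff; the sketch below only mirrors the acquire() helper removed from dynamic_bitset.hpp and is an assumption about that base, not its real definition.

#include <mutex>

#include "threading/sync/spinlock.hpp"

// assumed shape of the Lockable base: derived classes call acquire()
// to get a scoped, exclusive guard over an internal lock
template <class lock_t = SpinLock>
class LockableSketch
{
protected:
    std::unique_lock<lock_t> acquire()
    {
        return std::unique_lock<lock_t>(lock);
    }

private:
    lock_t lock;
};
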
diff --git a/promise.cpp b/promise.cpp
new file mode 100644
index 000000000..8df711aef
--- /dev/null
+++ b/promise.cpp
@@ -0,0 +1,115 @@
+#include
+#include
+
+#include "server/uv/uv.hpp"
+#include "server/http/http.hpp"
+
+template <class T, class... Args>
+class Promise
+{
+public:
+    Promise(std::function<T(Args...)> f, Args&&... args)
+    {
+        result = f(std::forward<Args>(args)...);
+    }
+
+    template <class R>
+    Promise<R, T> then(std::function<R(T)> f)
+    {
+        return Promise<R, T>(f, std::forward<T>(result));
+    }
+
+    T get()
+    {
+        return result;
+    }
+
+private:
+
+    T result;
+    std::atomic<bool> completed;
+};
+
+class TaskPool
+{
+    template <class R, class... Args>
+    class Task
+    {
+    public:
+        using task_t = Task<R, Args...>;
+        using work_f = std::function<R(Args...)>;
+        using after_work_f = std::function<void(R)>;
+
+        Task(work_f work, after_work_f callback)
+            : work(work), callback(callback)
+        {
+            req.data = this;
+        }
+
+        void launch(uv::UvLoop& loop)
+        {
+            uv_queue_work(loop, &req, work_cb, after_work_cb);
+        }
+
+    private:
+        std::function<R(Args...)> work;
+        std::function<void(R)> callback;
+
+        R result;
+
+        uv_work_t req;
+
+        static void work_cb(uv_work_t* req)
+        {
+            auto& task = *reinterpret_cast<task_t*>(req->data);
+        }
+
+        static void after_work_cb(uv_work_t* req, int)
+        {
+            auto task = reinterpret_cast<task_t*>(req->data);
+            delete task;
+        }
+    };
+
+public:
+    TaskPool(uv::UvLoop& loop) : loop(loop) {}
+
+    template <class R, class... Args>
+    void launch(std::function<R(Args...)> func,
+                std::function<void(R)> callback)
+    {
+        auto task = new Task<R, Args...>(func, callback);
+        task->launch(loop);
+    }
+
+private:
+    uv::UvLoop& loop;
+};
+
+
+
+int main(void)
+{
+    uv::UvLoop loop;
+    TaskPool tp(loop);
+
+    tp.launch([](void) -> int { return 3; },
+              [](int x) -> void { std::cout << x << std::endl; });
+
+//    http::HttpServer server(loop);
+//
+//    http::Ipv4 ip("0.0.0.0", 3400);
+//
+//    server.listen(ip, [](http::Request& req, http::Response& res) {
+//
+//        res.send(req.url);
+//    });
+//
+//    loop.run(uv::UvLoop::Mode::Default);
+
+    return 0;
+}
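Reviewer note: Promise is declared above but never exercised in main(). A small sketch of the intended chaining, assuming the template parameters as reconstructed above (T is the result type, Args... the argument types). Note that this implementation runs the function eagerly in the constructor rather than deferring it, and that the explicit then<int> argument is needed because a lambda cannot be deduced through a std::function parameter.

#include <functional>
#include <iostream>

// assumes the Promise template from promise.cpp is visible here
int promise_demo()
{
    // computes 21 * 2 immediately; the result is stored inside the promise
    Promise<int, int> doubled([](int x) { return x * 2; }, 21);

    // then() feeds the stored result into the next function and
    // returns a new, already completed promise
    auto plus_one = doubled.then<int>([](int x) { return x + 1; });

    std::cout << plus_one.get() << std::endl;   // prints 43

    return plus_one.get();
}
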
diff --git a/threading/sync/timed_spinlock.hpp b/threading/sync/timed_spinlock.hpp
index 5758843b6..77eb6aa3d 100644
--- a/threading/sync/timed_spinlock.hpp
+++ b/threading/sync/timed_spinlock.hpp
@@ -12,6 +12,7 @@ class LockExpiredError : public std::runtime_error
     using runtime_error::runtime_error;
 };
 
+template <size_t microseconds>
 class TimedSpinLock
 {
 public:
@@ -36,7 +37,7 @@ public:
             if(clock::now() - start > expiration)
                 throw LockExpiredError("This lock has expired");
 
-            usleep(250);
+            usleep(microseconds);
         }
     }
 
diff --git a/threading/task.hpp b/threading/task.hpp
deleted file mode 100644
index d5be264c3..000000000
--- a/threading/task.hpp
+++ /dev/null
@@ -1,12 +0,0 @@
-#ifndef MEMGRAPH_THREADING_TASK_HPP
-#define MEMGRAPH_THREADING_TASK_HPP
-
-class Task
-{
-public:
-
-private:
-    //std::function<
-};
-
-#endif
diff --git a/utils/mark_ref.hpp b/utils/mark_ref.hpp
new file mode 100644
index 000000000..b98d5c731
--- /dev/null
+++ b/utils/mark_ref.hpp
@@ -0,0 +1,45 @@
+#ifndef MEMGRAPH_UTILS_MARK_REF_HPP
+#define MEMGRAPH_UTILS_MARK_REF_HPP
+
+#include
+
+template <class T>
+struct MarkRef
+{
+    MarkRef() = default;
+    MarkRef(MarkRef&) = default;
+    MarkRef(MarkRef&&) = default;
+
+    bool is_marked() const
+    {
+        return ptr & 0x1L;
+    }
+
+    bool set_mark()
+    {
+        return ptr |= 0x1L;
+    }
+
+    bool clear_mark()
+    {
+        return ptr &= ~0x1L;
+    }
+
+    T* get() const
+    {
+        return reinterpret_cast<T*>(ptr & ~0x1L);
+    }
+
+    T& operator*() { return *get(); }
+    T* operator->() { return get(); }
+
+    uintptr_t ptr;
+};
+
+//template <class T, class... Args>
+//MarkRef<T> make_markref(Args&&... args)
+//{
+//
+//}
+
+#endif
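Reviewer note: MarkRef packs a mark bit into the lowest bit of a (suitably aligned) pointer, which is the usual trick for flagging a node as logically deleted before it is physically unlinked. A small usage sketch; the Node type and the raw assignment through ptr are illustration only, since the header does not yet provide a constructor from T*.

#include <cstdint>
#include <iostream>

#include "utils/mark_ref.hpp"

struct Node { int value; };

int main()
{
    Node node { 42 };

    MarkRef<Node> ref;
    ref.ptr = reinterpret_cast<uintptr_t>(&node);   // store the raw pointer

    ref.set_mark();                                 // tag as logically deleted
    std::cout << ref.is_marked() << std::endl;      // 1

    // the mark bit is stripped before the pointer is dereferenced
    std::cout << ref->value << std::endl;           // 42

    ref.clear_mark();
    std::cout << ref.is_marked() << std::endl;      // 0

    return 0;
}
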