diff --git a/src/data_structures/bitset/dynamic_bitset.hpp b/src/data_structures/bitset/dynamic_bitset.hpp deleted file mode 100644 index 5a0275fa5..000000000 --- a/src/data_structures/bitset/dynamic_bitset.hpp +++ /dev/null @@ -1,218 +0,0 @@ -#pragma once - -#include - -#include "glog/logging.h" - -/** - * A sequentially ordered non-unique lock-free concurrent collection of bits. - * - * Grows dynamically to accomodate the maximum set-bit position. Does not - * dynamically decrease in size. - * - * Bits can be set, retrieved and cleared in groups. Note that all group - * operations fail if the group spans multiple basic storage units. For example, - * if basic storage is in 8 bits, calling at(5, 2) is legal, but calling at(7, - * 2) is not. - * - * @tparam block_t - Basic storage type. Must have lock-free atomic. Should - * probably always be uint_8. - * @tparam chunk_size - number of bits in one chunk - */ -template -class DynamicBitset { - // Basic storage unit. - struct Block { - Block() = default; - - Block(const Block &) = delete; - Block(Block &&) = delete; - Block &operator=(const Block &) = delete; - Block &operator=(Block &&) = delete; - - // The number of bits in one Block. - static constexpr size_t kSize = sizeof(block_t) * 8; - - block_t at(size_t k, size_t n) const { - DCHECK(k + n - 1 < kSize) << "Invalid index."; - return (block_.load() >> k) & bitmask(n); - } - - void set(size_t k, size_t n) { - DCHECK(k + n - 1 < kSize) << "Invalid index."; - block_.fetch_or(bitmask(n) << k); - } - - void clear(size_t k, size_t n) { - DCHECK(k + n - 1 < kSize) << "Invalid index."; - block_.fetch_and(~(bitmask(n) << k)); - } - - private: - std::atomic block_{0}; - - constexpr block_t bitmask(size_t group_size) const { - return (block_t)(-1) >> (kSize - group_size); - } - }; - - struct Chunk { - Chunk(Chunk *next, int64_t chunk_id) : next_(next), chunk_id_(chunk_id) { - static_assert(chunk_size % Block::kSize == 0, - "chunk size not divisible by block size"); - } - - Chunk(const Chunk &) = delete; - Chunk(Chunk &&) = delete; - Chunk &operator=(const Chunk &) = delete; - Chunk &operator=(Chunk &&) = delete; - - // The number of bits in one chunk. - static constexpr size_t kSize = chunk_size; - // The number of blocks_ in one chunk. - static constexpr size_t kNumBlocks = chunk_size / Block::kSize; - - block_t at(size_t k, size_t n) const { - return blocks_[k / Block::kSize].at(k % Block::kSize, n); - } - - void set(size_t k, size_t n) { - blocks_[k / Block::kSize].set(k % Block::kSize, n); - } - - void clear(size_t k, size_t n) { - blocks_[k / Block::kSize].clear(k % Block::kSize, n); - } - - // Range of the bits stored in this chunk is [low, high>. - int64_t low() const { return chunk_id_ * kSize; } - - int64_t high() const { return (chunk_id_ + 1) * kSize; } - - Block blocks_[kNumBlocks]; - std::atomic next_; - const int64_t chunk_id_; - }; - - public: - DynamicBitset() {} - - // Can't move nor copy a DynamicBitset because of atomic head and locking. - DynamicBitset(const DynamicBitset &) = delete; - DynamicBitset(DynamicBitset &&) = delete; - DynamicBitset &operator=(const DynamicBitset &) = delete; - DynamicBitset &operator=(DynamicBitset &&) = delete; - - ~DynamicBitset() { - auto now = head_.load(); - while (now != nullptr) { - auto next = now->next_.load(); - delete now; - now = next; - } - } - - /** - * Gets the block of bit starting at bit k and containing the the following n - * bits. The bit index with k in this bitset is zeroth bit in the returned - * value. - */ - block_t at(size_t k, size_t n) const { - if (k >= head_.load()->high()) return 0; - - const auto &chunk = FindChunk(k); - return chunk.at(k, n); - } - - /** Returns k-th bit's value. */ - bool at(size_t k) const { return at(k, 1); } - - /** - * Set all the bits in the group of size `n`, starting from bit `k`. - */ - void set(size_t k, size_t n = 1) { - auto &chunk = FindOrCreateChunk(k); - return chunk.set(k, n); - } - - /** - * Clears all the bits in the group of size `n`, starting from bit `k`. - */ - void clear(size_t k, size_t n = 1) { - // If desired bit is out of bounds, it's already clear. - if (k >= head_.load()->high()) return; - - auto &chunk = FindOrCreateChunk(k); - return chunk.clear(k, n); - } - - /** - * Deletes all blocks which contain all positions lower than pos, assumes that - * there doesn't exist a pointer to those blocks, i.e. it's safe to delete - * them - */ - void delete_prefix(size_t pos) { - // Never delete head as that might invalidate the whole structure which - // depends on head being available - Chunk *last = head_.load(); - Chunk *chunk = last->next_; - - // High is exclusive endpoint of interval - while (chunk != nullptr && chunk->high() > pos) { - last = chunk; - chunk = chunk->next_; - } - - if (chunk != nullptr) { - // Unlink from last - last->next_ = nullptr; - // Deletes chunks - Chunk *next; - while (chunk) { - next = chunk->next_; - delete chunk; - chunk = next; - } - } - } - - private: - // Finds the chunk to which k-th bit belongs fails if k is out of bounds. - const Chunk &FindChunk(size_t &k) const { - DCHECK(k < head_.load()->high()) << "Index out of bounds"; - Chunk *chunk = head_; - - while (k < chunk->low()) { - chunk = chunk->next_; - CHECK(chunk != nullptr) << "chunk is nullptr"; - } - k -= chunk->low(); - return *chunk; - } - - /** - * Finds the chunk to which k-th bit belong. If k is out of bounds, the chunk - * for it is created. - */ - Chunk &FindOrCreateChunk(size_t &k) { - Chunk *head = head_; - - while (k >= head->high()) { - // The next chunk does not exist and we need it, so we will try to create - // it. - Chunk *new_head = new Chunk(head, head->chunk_id_ + 1); - if (!head_.compare_exchange_strong(head, new_head)) { - // Other thread updated head_ before us, so we need to delete new_head. - head = head_; - delete new_head; - continue; - } - } - - // Now we are sure chunk exists and we can call find function. - // const_cast is used to avoid code duplication. - return const_cast(FindChunk(k)); - } - - std::atomic head_{new Chunk(nullptr, 0)}; -}; diff --git a/src/data_structures/concurrent/concurrent_map.hpp b/src/data_structures/concurrent/concurrent_map.hpp deleted file mode 100644 index fc9b21834..000000000 --- a/src/data_structures/concurrent/concurrent_map.hpp +++ /dev/null @@ -1,89 +0,0 @@ -#pragma once - -#include "data_structures/concurrent/skiplist.hpp" -#include "utils/total_ordering.hpp" - -/// Thread-safe map intended for high concurrent throughput. -/// -/// @tparam TKey is a type of key. -/// @tparam TValue is a type of data. -template -class ConcurrentMap { - /// At item in the concurrent map. A pair of that compares on - /// the first value (key). Comparable to another Item, or only to the TKey. - class Item : public utils::TotalOrdering, - public utils::TotalOrdering, - public utils::TotalOrdering, - public std::pair { - public: - using std::pair::pair; - - friend constexpr bool operator<(const Item &lhs, const Item &rhs) { - return lhs.first < rhs.first; - } - - friend constexpr bool operator==(const Item &lhs, const Item &rhs) { - return lhs.first == rhs.first; - } - - friend constexpr bool operator<(const TKey &lhs, const Item &rhs) { - return lhs < rhs.first; - } - - friend constexpr bool operator==(const TKey &lhs, const Item &rhs) { - return lhs == rhs.first; - } - - friend constexpr bool operator<(const Item &lhs, const TKey &rhs) { - return lhs.first < rhs; - } - - friend constexpr bool operator==(const Item &lhs, const TKey &rhs) { - return lhs.first == rhs; - } - }; - - using Iterator = typename SkipList::Iterator; - - public: - ConcurrentMap() {} - - template - class Accessor : public SkipList::template Accessor { - public: - friend class ConcurrentMap; - - using SkipList::template Accessor::Accessor; - - std::pair insert(const TKey &key, const TValue &data) { - return SkipList::template Accessor::insert( - Item(key, data)); - } - - std::pair insert(const TKey &key, TValue &&data) { - return SkipList::template Accessor::insert( - Item(key, std::move(data))); - } - - std::pair insert(TKey &&key, TValue &&data) { - return SkipList::template Accessor::insert( - Item(std::forward(key), std::forward(data))); - } - - template - std::pair emplace(const TKey &key, - std::tuple first_args, - std::tuple second_args) { - return SkipList::template Accessor::emplace( - key, std::piecewise_construct, - std::forward>(first_args), - std::forward>(second_args)); - } - }; - - auto access() { return Accessor>(&skiplist); } - auto access() const { return Accessor>(&skiplist); } - - private: - SkipList skiplist; -}; diff --git a/src/data_structures/concurrent/push_queue.hpp b/src/data_structures/concurrent/push_queue.hpp deleted file mode 100644 index 1840b65e5..000000000 --- a/src/data_structures/concurrent/push_queue.hpp +++ /dev/null @@ -1,180 +0,0 @@ -#pragma once - -#include -#include - -#include "glog/logging.h" - -/** @brief A queue with lock-free concurrent push and - * single-threaded deletion. - * - * Deletions are done via the iterator. Currently only - * tail-deletion is supported, but individual element - * deletion should be possible to implement. - * Iteration can also be concurrent, as long as nobody - * is deleting. - * - * @tparam TElement Type of element stored in the - * queue. - */ -template -class ConcurrentPushQueue { - private: - // The queue is implemented as as singly-linked list - // and this is one list element - class Node { - public: - // constructor that accepts only arguments for - // creating the element. it can accept both rvalue and lvalue - // refs to an element, or arguments for emplace creation of - // an element - template - explicit Node(TArgs &&... args) : element_(std::forward(args)...) {} - - // element itself and pointer to next Node - TElement element_; - Node *next_{nullptr}; - }; - - /** @brief Iterator over the queue. - * - * Exposes standard forward-iterator ops, plus the - * delete_tail() function. - * - * @tparam TQueue - either const or non-const ConcurrentPushQueue - */ - template - struct Iterator { - public: - Iterator(TQueue &queue, Node *current) : queue_(queue), current_(current) {} - - // copying, moving and destroying is all fine - - bool operator==(const Iterator &rhs) const { - return rhs.current_ == this->current_; - } - - bool operator!=(const Iterator &rhs) const { return !(*this == rhs); } - - Iterator &operator++() { - DCHECK(current_ != nullptr) << "Prefix increment on invalid iterator"; - previous_ = current_; - current_ = current_->next_; - return *this; - } - - Iterator operator++(int) { - DCHECK(current_ != nullptr) << "Postfix increment on invalid iterator"; - Iterator rval(queue_, current_); - previous_ = current_; - current_ = current_->next_; - return rval; - } - - TElement &operator*() { - DCHECK(current_ != nullptr) - << "Dereferencing operator on invalid iterator"; - return current_->element_; - } - TElement *operator->() { - DCHECK(current_ != nullptr) << "Arrow operator on invalid iterator"; - return ¤t_->element_; - } - - /** @brief Deletes the current element of the iterator, - * and all subsequent elements. - * - * After this op this iterator is not valid and is equal - * to push_queue.end(). Invalidates all the iterators - * that were equal to this or further away from the head. - * - * @return The number of deleted elements. - */ - size_t delete_tail() { - // compare and swap queue head - // if it succeeds, it means that the current node of this - // iterator was head, and we have just replaced head with nullptr - // if it fails, it means the current node was not head, and - // we got the new_head - auto new_head = current_; - bool was_head = - queue_.get().head_.compare_exchange_strong(new_head, nullptr); - // if we have no previous_ (iterator wasn't incremented and thinks it's - // head), but in reality we weren't head, then we need to find the actual - // previous_ node, so we can cut the tail off - if (!previous_ && !was_head) { - previous_ = new_head; - while (previous_->next_ != current_) previous_ = previous_->next_; - } - - // cut the tail off - if (previous_) previous_->next_ = nullptr; - - // delete all from current to the end, track deletion count - size_t deleted = 0; - previous_ = current_; - while (current_) { - previous_ = current_; - current_ = current_->next_; - delete previous_; - deleted++; - } - - // update the size of the queue and return - queue_.get().size_ -= deleted; - return deleted; - } - - private: - // the queue we're iterating over - // use a reference wrapper because it facilitates move and copy - std::reference_wrapper queue_; - // the current node of this iterator - Node *current_{nullptr}; - // the previous node of this iterator - // used for deleting the current node - Node *previous_{nullptr}; - }; - - public: - ConcurrentPushQueue() = default; - ~ConcurrentPushQueue() { begin().delete_tail(); } - - // copy and move ops are disabled due to atomic member variable - - /// Pushes an element to the queue. - // TODO: review - this is at the same time push and emplace, - // how should it be called? - template - void push(TArgs &&... args) { - auto node = new Node(std::forward(args)...); - while (!head_.compare_exchange_strong(node->next_, node)) continue; - size_++; - }; - - // non-const iterators - auto begin() { return Iterator(*this, head_.load()); } - auto end() { return Iterator(*this, nullptr); } - - // const iterators - auto cbegin() const { - return Iterator(*this, head_.load()); - } - auto cend() const { - return Iterator(*this, nullptr); - } - auto begin() const { - return Iterator(*this, head_.load()); - } - auto end() const { - return Iterator(*this, nullptr); - } - - auto size() const { return size_.load(); } - - private: - // head of the queue (last-added element) - std::atomic head_{nullptr}; - // number of elements in the queue - std::atomic size_{0}; -}; diff --git a/src/data_structures/concurrent/skiplist.hpp b/src/data_structures/concurrent/skiplist.hpp deleted file mode 100644 index 3f01e4196..000000000 --- a/src/data_structures/concurrent/skiplist.hpp +++ /dev/null @@ -1,1030 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -#include "glog/logging.h" - -#include "data_structures/concurrent/skiplist_gc.hpp" -#include "utils/crtp.hpp" -#include "utils/placeholder.hpp" -#include "utils/random/fast_binomial.hpp" -#include "utils/thread/sync.hpp" - -/** - * computes the height for the new node from the interval [1...H] - * with p(k) = (1/2)^k for all k from the interval - */ -static thread_local utils::random::FastBinomial<> rnd; - -/** @brief Concurrent lock-based skiplist with fine grained locking - * - * From Wikipedia: - * "A skip list is a data structure that allows fast search within an - * ordered sequence of elements. Fast search is made possible by - * maintaining a linked hierarchy of subsequences, each skipping over - * fewer elements. Searching starts in the sparsest subsequence until - * two consecutive elements have been found, one smaller and one - * larger than or equal to the element searched for." - * - * [_]---------------->[+]----------->[_] - * [_]->[+]----------->[+]------>[+]->[_] - * [_]->[+]------>[+]->[+]------>[+]->[_] - * [_]->[+]->[+]->[+]->[+]->[+]->[+]->[_] - * head 1 2 4 5 8 9 nil - * - * The logarithmic properties are maintained by randomizing the height for - * every new node using the binomial distribution - * p(k) = (1/2)^k for k in [1...H]. - * - * The implementation is based on the work described in the paper - * "A Provably Correct Scalable Concurrent Skip List" - * URL: https://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf - * - * The proposed implementation is in Java so the authors don't worry about - * garbage collection, but obviously we have to. This implementation uses - * lazy garbage collection. When all clients stop using the skiplist, we can - * be sure that all logically removed nodes are not visible to anyone so - * we can safely remove them. The idea of counting active clients implies - * the use of a intermediary structure (called Accessor) when accessing the - * skiplist. - * - * The implementation has an interface which closely resembles the functions - * with arguments and returned types frequently used by the STL. - * - * Example usage: - * Skiplist skiplist; - * - * { - * auto accessor = skiplist.access(); - * - * // inserts item into the skiplist and returns - * // pair. iterator points to the newly created - * // node and the boolean member evaluates to true denoting that the - * // insertion was successful - * accessor.insert(item1); - * - * // nothing gets inserted because item1 already exist in the skiplist - * // returned iterator points to the existing element and the return - * // boolean evaluates to false denoting the failed insertion - * accessor.insert(item1); - * - * // returns an iterator to the element item1 - * auto it = accessor.find(item1); - * - * // returns an empty iterator. it == accessor.end() - * auto it = accessor.find(item2); - * - * // iterate over all items - * for(auto it = accessor.begin(); it != accessor.end(); ++it) - * cout << *it << endl; - * - * // range based for loops also work - * for(auto& e : accessor) - * cout << e << endl; - * - * accessor.remove(item1); // returns true - * accessor.remove(item1); // returns false because key1 doesn't exist - * } - * - * // accessor out of scope, garbage collection might occur - * - * For detailed operations available, please refer to the Accessor class - * inside the public section of the SkipList class. - * - * @tparam T Type to use as the item - * @tparam H Maximum node height. Determines the effective number of nodes - * the skiplist can hold in order to preserve it's log2 properties - * @tparam lock_t Lock type used when locking is needed during the creation - * and deletion of nodes. - */ -template -class SkipList : private utils::Lockable { - public: - /** @brief Wrapper class for flags used in the implementation - * - * MARKED flag is used to logically delete a node. - * FULLY_LINKED is used to mark the node as fully inserted, i.e. linked - * at all layers in the skiplist up to the node height - */ - struct Flags { - enum node_flags : uint8_t { - MARKED = 0x01, - FULLY_LINKED = 0x10, - }; - - bool is_marked() const { return flags.load() & MARKED; } - - void set_marked() { flags.fetch_or(MARKED); } - - bool is_fully_linked() const { return flags.load() & FULLY_LINKED; } - - /** Waits until the these flags don't get the "fully linked" status. */ - void wait_fully_linked() const { - while (!is_fully_linked()) usleep(250); - } - - void set_fully_linked() { flags.fetch_or(FULLY_LINKED); } - - private: - std::atomic flags{0}; - }; - - class Node : utils::Lockable { - public: - friend class SkipList; - - const uint8_t height; - Flags flags; - - T &value() { return data.get(); } - - const T &value() const { return data.get(); } - - static Node *sentinel(uint8_t height) { - // we have raw memory and we need to construct an object - // of type Node on it - return new (allocate(height)) Node(height); - } - - /** - * Creates a new node for the given item. - * - * @param item - The item that is being inserted into the skiplist. - * @param height - Node height. - * @tparam TItem - This function is templatized so it can accept - * a universal reference to the item. TItem should be the same - * as T. - */ - template - static Node *create(TItem &&item, uint8_t height) { - auto node = allocate(height); - - // we have raw memory and we need to construct an object - // of type Node on it - return new (node) Node(std::forward(item), height); - } - - template - static Node *emplace(uint8_t height, Args &&... args) { - auto node = allocate(height); - - // we have raw memory and we need to construct an object - // of type Node on it - return new (node) Node(height, std::forward(args)...); - } - - bool ok_delete(int level) const { - return flags.is_fully_linked() && height - 1 == level && - !flags.is_marked(); - } - - static void destroy(Node *node) { - node->~Node(); - std::free(node); - } - - Node *forward(size_t level) const { return tower[level].load(); } - - void forward(size_t level, Node *next) { tower[level].store(next); } - - private: - explicit Node(uint8_t height) : height(height) { - // here we assume, that the memory for N towers (N = height) has - // been allocated right after the Node structure so we need to - // initialize that memory - for (auto i = 0; i < height; ++i) - new (&tower[i]) std::atomic{nullptr}; - } - - template - Node(uint8_t height, Args &&... args) : Node(height) { - this->data.emplace(std::forward(args)...); - } - - Node(const T &data, uint8_t height) : Node(height) { this->data.set(data); } - - Node(T &&data, uint8_t height) : Node(height) { - this->data.set(std::move(data)); - } - - ~Node() { - for (auto i = 0; i < height; ++i) tower[i].~atomic(); - } - - static Node *allocate(uint8_t height) { - // [ Node ][Node*][Node*][Node*]...[Node*] - // | | | | | - // | 0 1 2 height-1 - // |----------------||-----------------------------| - // space for Node space for tower pointers - // structure right after the Node - // structure - auto size = sizeof(Node) + height * sizeof(std::atomic); - auto node = static_cast(std::malloc(size)); - - return node; - } - - utils::Placeholder data; - - /** - * this creates an array of the size zero. we can't put any sensible - * value here since we don't know what size it will be untill the - * node is allocated. we could make it a Node** but then we would - * have two memory allocations, one for node and one for the forward - * list. this way we avoid expensive malloc/free calls and also cache - * thrashing when following a pointer on the heap - */ - std::atomic tower[0]; - }; - - public: - template - class IteratorBase : public utils::Crtp { - protected: - explicit IteratorBase(Node *node) : node(node) {} - - Node *node{nullptr}; - - public: - IteratorBase() = default; - IteratorBase(const IteratorBase &) = default; - - const T &operator*() const { - DCHECK(node != nullptr) << "Node is nullptr."; - return node->value(); - } - - const T *operator->() const { - DCHECK(node != nullptr) << "Node is nullptr."; - return &node->value(); - } - - T &operator*() { - DCHECK(node != nullptr) << "Node is nullptr."; - return node->value(); - } - - T *operator->() { - DCHECK(node != nullptr) << "Node is nullptr."; - return &node->value(); - } - - operator T &() { - DCHECK(node != nullptr) << "Node is nullptr."; - return node->value(); - } - - It &operator++() { - DCHECK(node != nullptr) << "Node is nullptr."; - node = node->forward(0); - return this->derived(); - } - - bool has_next() { - DCHECK(node != nullptr) << "Node is nullptr."; - return node->forward(0) != nullptr; - } - - It &operator++(int) { return operator++(); } - - friend bool operator==(const It &a, const It &b) { - return a.node == b.node; - } - - friend bool operator!=(const It &a, const It &b) { return !(a == b); } - }; - - class ConstIterator : public IteratorBase { - friend class SkipList; - explicit ConstIterator(Node *node) : IteratorBase(node) {} - - public: - ConstIterator() = default; - ConstIterator(const ConstIterator &) = default; - - const T &operator*() const { - return IteratorBase::operator*(); - } - - const T *operator->() const { - return IteratorBase::operator->(); - } - - operator const T &() { return IteratorBase::operator T &(); } - }; - - class Iterator : public IteratorBase { - friend class SkipList; - explicit Iterator(Node *node) : IteratorBase(node) {} - - public: - Iterator() = default; - Iterator(const Iterator &) = default; - }; - - /** - @class ReverseIterator - @author Sandi Fatic - - @brief - ReverseIterator is used to iterate the skiplist in backwards. The current - implementation complexity is M*C*Log(N) where M is the number of elements - we are iterating, N the size of the skiplist and some constant C. The time - is better then using find M times. Look the benchmark for this class. - - The performance of the reverse iterator is similar to M finds, but faster - because it stores the preds and shortens the search space. - - @todo - Research possible better and faster more optimized traversals. - */ - class ReverseIterator : public utils::Crtp { - friend class SkipList; - - explicit ReverseIterator(Node *node) : node_(node) {} - - public: - ReverseIterator(SkipList *skiplist, Node *node, Node *preds[]) - : skiplist_(skiplist), node_(node) { - for (int i = 0; i < H; i++) preds_[i] = preds[i]; - if (node_ == skiplist_->header) node_ = node_->forward(0); - } - - T &operator*() { - DCHECK(node_ != nullptr) << "Node is nullptr."; - return node_->value(); - } - - T *operator->() { - DCHECK(node_ != nullptr) << "Node is nullptr."; - return &node_->value(); - } - - operator T &() { - DCHECK(node_ != nullptr) << "Node is nullptr."; - return node_->value(); - } - - ReverseIterator &operator++() { - DCHECK(node_ != nullptr) << "Node is nullptr."; - do { - next(); - } while (node_->flags.is_marked()); - return this->derived(); - } - - friend bool operator==(const ReverseIterator &a, const ReverseIterator &b) { - return a.node_ == b.node_; - } - - ReverseIterator &operator++(int) { return operator++(); } - - /** - @brief - The next() function generates the previous element in skiplist if exists. - - It uses the stored preds to find the previous element and updates the - preds to optimize the search. - */ - void next() { - int level_found = -1, curr = 0; - auto prev = preds_[0]->value(); - - Node *pred = preds_[curr]; - - // finds the level from which to start the search - while (curr < H) { - curr++; - if (pred != preds_[curr]) break; - } - - // goes on level up if possible for better performance - // not always the optimal but benchmarks are better - if (curr + 1 < H) curr++; - - while (level_found == -1 && curr < H) { - Node *pred = preds_[curr]; - for (int level = curr; level >= 0; --level) { - Node *node = pred->forward(level); - - while (greater(prev, node)) { - pred = node, node = pred->forward(level); - } - - if (level_found == -1 && !less(prev, node)) level_found = level; - - preds_[level] = pred; - } - curr++; - } - - node_ = preds_[0]; - if (node_ == skiplist_->header) node_ = node_->forward(0); - } - - bool has_next() { return node_ != skiplist_->header; } - - private: - SkipList *skiplist_; - Node *node_{nullptr}; - Node *preds_[H]; - }; - - SkipList() : header(Node::sentinel(H)) {} - - ~SkipList() { - // Someone could be using this map through an Accessor. - Node *now = header; - header = nullptr; - - while (now != nullptr) { - Node *next = now->forward(0); - Node::destroy(now); - now = next; - } - } - - friend class Accessor; - - template - class Accessor { - friend class SkipList; - - // The iterator type that will be return from non-const iterator returning - // methods (begin, end, find...). Must be ConstIterator if the SkipList is - // const, even if the Accessor is not const. - using IteratorT = typename std::conditional::value, - ConstIterator, Iterator>::type; - - protected: - explicit Accessor(TSkipList *skiplist) - : skiplist(skiplist), status_(skiplist->gc.CreateNewAccessor()) { - DCHECK(skiplist != nullptr) << "Skiplist is nullptr."; - } - - public: - Accessor(const Accessor &) = delete; - - // cppcheck-suppress uninitMemberVar - Accessor(Accessor &&other) noexcept - : skiplist(other.skiplist), status_(other.status_) { - other.skiplist = nullptr; - } - - ~Accessor() { - if (skiplist == nullptr) return; - - status_.alive_ = false; - } - - IteratorT begin() { return skiplist->begin(); } - ConstIterator begin() const { return skiplist->cbegin(); } - ConstIterator cbegin() const { return skiplist->cbegin(); } - - IteratorT end() { return skiplist->end(); } - ConstIterator end() const { return skiplist->cend(); } - ConstIterator cend() const { return skiplist->cend(); } - - ReverseIterator rbegin() { return skiplist->rbegin(); } - ReverseIterator rend() { return skiplist->rend(); } - - std::pair insert(const T &item) { - return skiplist->insert(preds, succs, item); - } - - std::pair insert(T &&item) { - return skiplist->insert(preds, succs, std::move(item)); - } - - template - std::pair emplace(K &key, Args &&... args) { - return skiplist->emplace(preds, succs, key, std::forward(args)...); - } - - template - IteratorT find(const K &item) { - return skiplist->find(item); - } - - template - ConstIterator find(const K &item) const { - return static_cast(*skiplist).find(item); - } - - template - std::pair reverse(const K &item) { - return skiplist->reverse(item); - } - - /** - * Returns an iterator pointing to the element equal to - * item, or the first larger element. - * - * @param item An item that is comparable to skiplist element type. - * @tparam TItem item type - */ - template - IteratorT find_or_larger(const TItem &item) { - return skiplist->template find_or_larger(item); - } - - /** - * Returns an iterator pointing to the element equal to - * item, or the first larger element. - * - * @param item An item that is comparable to skiplist element type. - * @tparam TItem item type - */ - template - ConstIterator find_or_larger(const TItem &item) const { - return static_cast(*skiplist) - .find_or_larger(item); - } - - /** - * Position and count estimation. Gives estimates - * on the position of the given item in this skiplist, and - * the number of identical items according to 'less'. - * - * If `item` is not contained in the skiplist, - * then the position where it would be inserted is returned - * as the position estimate, and 0 as count estimate. - * - * Position and count detection works by iterating over the - * list at a certain level. These levels can be tuned as - * a performance vs precision optimization. Lower levels mean - * higher precision, higher levels mean better performance. - * TODO: tune the levels once benchmarks are available. - * - * @param item The item for which the position is estimated. - * @param less Comparison function. It must be partially - * consistent with natural comparison of Skiplist elements: - * if `less` indicates that X is less than - * Y, then natural comparison must indicate the same. The - * reverse does not have to hold. - * @param greater Comparsion function, analogue to less. - * @param position_level_reduction - Defines at which level - * item position is estimated. Position level is defined - * as log2(skiplist->size()) - position_level_reduction. - * @param count_max_level - Defines the max level at which - * item count is estimated. - * @tparam TItem - type of item skiplist elements are compared - * to. Does not have to be the same type as skiplist element. - * @tparam TLess - less comparison function type. - * @tparam TEqual - equality comparison function type. - * @return A pair of ints where the first element is the estimated - * position of item, and the second is the estimated number - * of items that are the same according to `less`. - */ - template , - typename TEqual = std::equal_to> - std::pair position_and_count( - const TItem &item, TLess less = TLess(), TEqual equal = TEqual(), - int position_level_reduction = 10, int count_max_level = 3) { - // the level at which position will be sought - int position_level = std::max( - 0, static_cast(std::lround(std::log2(skiplist->size()))) - - position_level_reduction); - - Node *pred = skiplist->header; - Node *succ = nullptr; - - int position = 0; - for (int i = position_level; i >= 0; i--) { - // count how many towers we pass on this level, - // used for calculating item position - int tower_count = 0; - - // on the current height (i) find the last tower - // whose value is lesser than item, store it in pred - // while succ will be either skiplist end or the - // first element greater than item - succ = pred->forward(i); - while (succ && - !(less(item, succ->value()) || equal(item, succ->value()))) { - pred = succ; - succ = succ->forward(i); - tower_count++; - } - - // in the succs field we'll keep track of successors - // that are equal to item, or nullptr otherwise - succs[i] = (!succ || less(item, succ->value())) ? nullptr : succ; - - position += (1 << i) * tower_count; - } - - // if succ is nullptr, then item is greater than all elements in the list - if (succ == nullptr) return std::make_pair(size(), 0); - - // now we need to estimate the count of elements equal to item - // we'll do that by looking for the first element that is greater - // than item, and counting how far we have to look - - // first find the rightmost (highest) succ that has value == item - int count_level = 0; - for (int i = position_level; i >= 0; i--) - if (succs[i]) { - count_level = i; - break; - } - count_level = std::min(count_level, count_max_level); - succ = succs[count_level]; - - // it is possible that succ just became null (even though before - // it wasn't). that happens when item is lesser then all list elems - if (!succ) return std::make_pair(0, 0); - - // now expand to the right as long as element value == item - // at the same time accumulate count - int count = 1 << count_level; - for (; count_level >= 0; count_level--) { - Node *next = succ->forward(count_level); - while (next && !less(item, next->value())) { - succ = next; - next = next->forward(count_level); - count += 1 << count_level; - } - } - - return std::make_pair(position, count); - } - - template - bool contains(const K &item) const { - return this->find(item) != this->end(); - } - - template - bool remove(const K &item) { - return skiplist->remove(item, preds, succs); - } - - size_t size() const { return skiplist->size(); } - - template - size_t distance(const K &first, const K &second) { - return skiplist->distance(first, second); - } - - private: - TSkipList *skiplist; - Node *preds[H], *succs[H]; - typename SkipListGC::AccessorStatus &status_; - }; - - Accessor access() { return Accessor(this); } - - Accessor access() const { - return Accessor(this); - } - - private: - using guard_t = std::unique_lock; - - Iterator begin() { return Iterator(header->forward(0)); } - - ConstIterator begin() const { return ConstIterator(header->forward(0)); } - - ConstIterator cbegin() const { return ConstIterator(header->forward(0)); } - - Iterator end() { return Iterator(); } - - ReverseIterator rend() { return ReverseIterator(header->forward(0)); } - - ReverseIterator rbegin() { - Node *begin = header; - Node *preds[H]; - for (int level = H - 1; level >= 0; level--) { - preds[level] = begin; - while (begin->forward(level) != nullptr) { - begin = begin->forward(level); - preds[level] = begin; - } - } - return ReverseIterator(this, begin, preds); - } - - ConstIterator end() const { return ConstIterator(); } - - ConstIterator cend() const { return ConstIterator(); } - - size_t size() const { return count.load(); } - - template - static bool greater(const K &item, const Node *const node) { - return node && item > node->value(); - } - - template - static bool less(const K &item, const Node *const node) { - return (node == nullptr) || item < node->value(); - } - - /** - * Returns first occurence of item if there exists one. - */ - template - ConstIterator find(const K &item) const { - return const_cast(this)->find_node(item); - } - - /** - * Returns first occurence of item if there exists one. - */ - template - Iterator find(const K &item) { - return find_node(item); - } - - template - std::pair reverse(const K &item) { - auto it = find(item); - if (it == end()) { - return std::make_pair(rend(), false); - } - - // TODO why are preds created here and not reused from accessor? - Node *preds[H]; - find_path(item, preds); - return std::make_pair(ReverseIterator(this, preds[0], preds), true); - } - - template - It find_node(const K &item) const { - auto it = find_or_larger(item); - if (it.node == nullptr || item == *it) { - return std::move(it); - } else { - return It(); - } - } - - /** - * Returns iterator on searched element or the first larger element. - */ - template - It find_or_larger(const K &item) const { - Node *node = nullptr, *pred = header; - int h = static_cast(pred->height) - 1; - - while (true) { - // try to descend down first while the next key on this layer overshoots - // or the next key is marked for deletion - for (; h >= 0 && less(item, node = pred->forward(h)); --h) { - } - - // if we overshoot at every layer, item doesn't exist - if (h < 0) return It(node); - - // the item is farther to the right, continue going right as long - // as the key is greater than the current node's key - while (greater(item, node)) pred = node, node = node->forward(h); - - // check if we have a hit. if not, we need to descend down again - if (!less(item, node)) { - if (!node->flags.is_marked()) return It(node); - return It(nullptr); - } - } - } - - /** - * Finds the location in the skiplist for the given item. - * Fills up the predecessor and successor nodes if pointers - * are given. - * - * @param item - the item for which to find the location. - * @param preds - An array of predecessor nodes. Filled up with - * towers that would link to the new tower. If nullptr, it is - * ignored. - * @param succs - Like preds, for successor nodes. - * @tparam K - type of item that must be comparable to the - * type of item stored in the skiplist. - * @return - The height of the node already present in the - * skiplist, that matches the given item (is equal to it). - * Returns -1 if there is no matching item in the skiplist. - */ - template - int find_path(const K &item, Node *preds[] = nullptr, - Node *succs[] = nullptr) const { - int level_found = -1; - Node *pred = header; - - for (int level = H - 1; level >= 0; --level) { - Node *node = pred->forward(level); - - while (greater(item, node)) pred = node, node = pred->forward(level); - - if (level_found == -1 && !less(item, node)) level_found = level; - - if (preds != nullptr) preds[level] = pred; - if (succs != nullptr) succs[level] = node; - } - - return level_found; - } - - /** - @brief - Distance method approximates the distance between two elements. - - General idea of the skiplist distance function is to find the preds which - are the same for two elements and based on that calculate the distance. - With every pred which is the same the skiplist should be 2 times smaller. - - @todo - Current implementation is trivial. The mistake is quite big (up to x8) - times in some cases. - */ - template - size_t distance(const K &first, const K &second) { - auto skiplist_size = size(); - - // finds the max level of the skiplist based on the size (simple math). - auto level = static_cast(std::round(std::log2(skiplist_size))); - - // TODO - // inconsistent design, it seems that Accessor is trying to reuse nodes - // and pass the same ones to SkipList functions, why is this function - // doing it differently? - // also, why is 32 hardcoded? - Node *first_preds[32]; - Node *second_preds[32]; - - find_path(first, first_preds, nullptr); - find_path(second, second_preds, nullptr); - - for (int i = level; i >= 0; i--) - if (first_preds[i] == second_preds[i]) skiplist_size /= 2; - - return skiplist_size; - } - - template - static bool lock_nodes(uint8_t height, guard_t guards[], Node *preds[], - Node *succs[]) { - Node *prepred = nullptr; - bool valid = true; - - for (int level = 0; valid && level < height; ++level) { - Node *pred = preds[level], *succ = succs[level]; - - if (pred != prepred) - guards[level] = pred->acquire_unique(), prepred = pred; - - valid = !pred->flags.is_marked() && pred->forward(level) == succ; - - if (ADDING) - valid = valid && (succ == nullptr || !succ->flags.is_marked()); - } - - return valid; - } - - /** - * Insert an element into the skiplist. - * - * @param preds - Predecessor nodes - * @param succs - Successor nodes - * @param data - Item to insert into the skiplist - * @tparam TItem - Item type. Must be the same as skiplist item - * type . This function is templatized so it can accept - * universal references and keep the code dry. - */ - template - std::pair insert(Node *preds[], Node *succs[], TItem &&data) { - while (true) { - auto level = find_path(data, preds, succs); - - if (level != -1) { - auto found = succs[level]; - - if (found->flags.is_marked()) continue; - found->flags.wait_fully_linked(); - - return {Iterator{succs[level]}, false}; - } - - auto height = rnd(H); - guard_t guards[H]; - - // try to acquire the locks for predecessors up to the height of - // the new node. release the locks and try again if someone else - // has the locks - if (!lock_nodes(height, guards, preds, succs)) continue; - - return {insert_here(Node::create(std::forward(data), height), - preds, succs, height), - true}; - } - } - - /** - * Insert unique data - * - * NOTE: This is almost all duplicate code from insert. - */ - template - std::pair emplace(Node *preds[], Node *succs[], K &key, - Args &&... args) { - while (true) { - auto level = find_path(key, preds, succs); - - if (level != -1) { - auto found = succs[level]; - - if (found->flags.is_marked()) continue; - found->flags.wait_fully_linked(); - - return {Iterator{succs[level]}, false}; - } - - auto height = rnd(H); - guard_t guards[H]; - - // try to acquire the locks for predecessors up to the height of - // the new node. release the locks and try again if someone else - // has the locks - if (!lock_nodes(height, guards, preds, succs)) continue; - - return {insert_here(Node::emplace(height, std::forward(args)...), - preds, succs, height), - true}; - } - } - - /** - * Inserts data to specified locked location. - */ - Iterator insert_here(Node *new_node, Node *preds[], Node *succs[], - int height) { - // link the predecessors and successors, e.g. - // - // 4 HEAD ... P ------------------------> S ... NULL - // 3 HEAD ... ... P -----> NEW ---------> S ... NULL - // 2 HEAD ... ... P -----> NEW -----> S ... ... NULL - // 1 HEAD ... ... ... P -> NEW -> S ... ... ... NULL - for (uint8_t level = 0; level < height; ++level) { - new_node->forward(level, succs[level]); - preds[level]->forward(level, new_node); - } - - new_node->flags.set_fully_linked(); - count++; - - return Iterator{new_node}; - } - - /** - * Removes item found with fp with arguments skiplist, preds and succs. - */ - template - bool remove(const K &item, Node *preds[], Node *succs[]) { - Node *node = nullptr; - guard_t node_guard; - bool marked = false; - int height = 0; - - while (true) { - auto level = find_path(item, preds, succs); - - if (!marked && (level == -1 || !succs[level]->ok_delete(level))) - return false; - - if (!marked) { - node = succs[level]; - height = node->height; - node_guard = node->acquire_unique(); - - if (node->flags.is_marked()) return false; - - node->flags.set_marked(); - marked = true; - } - - guard_t guards[H]; - - if (!lock_nodes(height, guards, preds, succs)) continue; - - for (int level = height - 1; level >= 0; --level) - preds[level]->forward(level, node->forward(level)); - - gc.Collect(node); - - count--; - return true; - } - } - - /** - * number of elements - */ - std::atomic count{0}; - Node *header; - mutable SkipListGC gc; -}; diff --git a/src/data_structures/concurrent/skiplist_gc.cpp b/src/data_structures/concurrent/skiplist_gc.cpp deleted file mode 100644 index 6af50d6ba..000000000 --- a/src/data_structures/concurrent/skiplist_gc.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include "skiplist_gc.hpp" - -#include -#include "utils/flag_validation.hpp" - -DEFINE_VALIDATED_HIDDEN_int32( - skiplist_gc_interval, 10, - "Interval of how often does skiplist gc run in seconds. To " - "disable set to -1.", - FLAG_IN_RANGE(-1, std::numeric_limits::max())); diff --git a/src/data_structures/concurrent/skiplist_gc.hpp b/src/data_structures/concurrent/skiplist_gc.hpp deleted file mode 100644 index 40b882ccd..000000000 --- a/src/data_structures/concurrent/skiplist_gc.hpp +++ /dev/null @@ -1,173 +0,0 @@ -#pragma once - -#include - -#include -#include -#include -#include - -#include -#include - -#include "data_structures/concurrent/push_queue.hpp" - -#include "utils/executor.hpp" -#include "utils/thread/sync.hpp" - -DECLARE_int32(skiplist_gc_interval); - -/** - * @brief Garbage collects nodes. - * We are doing garbage collection by keeping track of alive accessors which - * were requested from the parent skiplist. When some prefix [id, id+n] of - * accessors becomes dead we try to empty the collection of (accessors_id, - * entry*) with the id of that last dead accessor. Each entry is added to - * collection after it has been re-linked and can't be seen by any accessors - * created after that time and that marks the safe time for deleting entry. - * @Tparam TNode - type of underlying pointer to objects which will be - * collected. - */ -template -class SkipListGC { - public: - explicit SkipListGC() { - if (GetExecutor()) { - executor_job_id_ = - GetExecutor()->RegisterJob([this]() { GarbageCollect(); }); - } - } - - ~SkipListGC() { - // We have to unregister the job because otherwise Executor might access - // some member variables of this class after it has been destructed. - if (GetExecutor()) GetExecutor()->UnRegisterJob(executor_job_id_); - for (auto it = deleted_queue_.begin(); it != deleted_queue_.end(); ++it) { - TNode::destroy(it->second); - } - deleted_queue_.begin().delete_tail(); - } - - /** - * @brief - Returns instance of executor shared between all SkipLists. - */ - auto &GetExecutor() { - // TODO: Even though executor is static, there are multiple instance: - // one for each TNode param type. We probably don't want that kind of - // behavior. Static variables with nontrivial destructor create subtle bugs - // because of their order of destruction. For example of one bug take a look - // at documentation in database/dbms.hpp. Rethink ownership and lifetime of - // executor. - static auto executor = ConstructExecutor(); - - return executor; - } - - SkipListGC(const SkipListGC &other) = delete; - SkipListGC(SkipListGC &&other) = delete; - SkipListGC operator=(const SkipListGC &other) = delete; - SkipListGC operator=(SkipListGC &&other) = delete; - - /** - * @brief - Keep track of each accessor with it's status, so we know which - * ones are alive and which ones are dead. - */ - struct AccessorStatus { - int64_t id_{-1}; - bool alive_{false}; - }; - - /** - * @brief - Creates a new accessors and returns reference to it's status. This - * method is thread-safe. - */ - AccessorStatus &CreateNewAccessor() { - std::unique_lock lock(mutex_); - accessors_.push_back({++last_accessor_id_, true}); - return accessors_.back(); - } - - /** - * @brief - Destroys objects which were previously collected and can be safely - * removed. This method is not thread-safe. - */ - void GarbageCollect() { - std::unique_lock lock(mutex_); - auto last_dead_accessor = accessors_.end(); - for (auto it = accessors_.begin(); it != accessors_.end(); ++it) { - if (it->alive_) break; - last_dead_accessor = it; - } - // We didn't find any dead accessor and that means we are not sure that we - // can delete anything. - if (last_dead_accessor == accessors_.end()) return; - // We don't need lock anymore because we are not modifying this structure - // anymore, or accessing it any further down. - const int64_t safe_id = last_dead_accessor->id_; - accessors_.erase(accessors_.begin(), ++last_dead_accessor); - lock.unlock(); - - // We can only modify this in a not-thread safe way because we are the only - // thread ever accessing it here, i.e. there is at most one thread doing - // this GarbageCollection. - - auto oldest_not_deletable = deleted_queue_.begin(); - bool delete_all = true; - for (auto it = oldest_not_deletable; it != deleted_queue_.end(); ++it) { - if (it->first > safe_id) { - oldest_not_deletable = it; - delete_all = false; - } - } - - // deleted_list is already empty, nothing to delete here. - if (oldest_not_deletable == deleted_queue_.end()) return; - - // In case we didn't find anything that we can't delete we shouldn't - // increment this because that would mean we skip over the first record - // which is ready for destruction. - if (!delete_all) ++oldest_not_deletable; - int64_t destroyed = 0; - for (auto it = oldest_not_deletable; it != deleted_queue_.end(); ++it) { - TNode::destroy(it->second); - ++destroyed; - } - oldest_not_deletable.delete_tail(); - if (destroyed) DLOG(INFO) << "Number of destroyed elements: " << destroyed; - } - - /** - * @brief - Collect object for garbage collection. Call to this method means - * that no new accessor can possibly access the object by iterating over some - * storage. - */ - void Collect(TNode *object) { - // We can afford some inaccuary here - it's possible that some new accessor - // incremented the last_accessor_id after we enter this method and as such - // we might be a bit pessimistic here. - deleted_queue_.push(last_accessor_id_.load(), object); - } - - private: - /// Constructs executor depending on flag - has to be done this way because of - /// C++14 - auto ConstructExecutor() { - std::unique_ptr executor; - if (FLAGS_skiplist_gc_interval != -1) { - executor = std::make_unique( - std::chrono::seconds(FLAGS_skiplist_gc_interval)); - } - return executor; - } - - int64_t executor_job_id_{-1}; - std::mutex mutex_; - - // List of accesssors from begin to end by an increasing id. - std::list accessors_; - std::atomic last_accessor_id_{0}; - - // List of pairs of accessor_ids and pointers to entries which should be - // destroyed sorted approximately descendingly by id. - ConcurrentPushQueue> deleted_queue_; -}; diff --git a/src/utils/atomic.hpp b/src/utils/atomic.hpp deleted file mode 100644 index 5b727635f..000000000 --- a/src/utils/atomic.hpp +++ /dev/null @@ -1,23 +0,0 @@ -#pragma once - -#include - -namespace utils { - -/** - * Ensures that the given atomic is greater or equal to the given value. If it - * already satisfies that predicate, it's not modified. - * - * @param atomic - The atomic variable to ensure on. - * @param value - The minimal value the atomic must have after this call. - * @tparam TValue - Type of value. - */ -template -void EnsureAtomicGe(std::atomic &atomic, TValue value) { - while (true) { - auto current = atomic.load(); - if (current >= value) break; - if (atomic.compare_exchange_weak(current, value)) break; - } -} -} // namespace utils diff --git a/src/utils/crtp.hpp b/src/utils/crtp.hpp deleted file mode 100644 index cd4cc2bb8..000000000 --- a/src/utils/crtp.hpp +++ /dev/null @@ -1,15 +0,0 @@ -#pragma once - -// a helper class for implementing static casting to a derived class using the -// curiously recurring template pattern - -namespace utils { - -template -struct Crtp { - Derived &derived() { return *static_cast(this); } - - const Derived &derived() const { return *static_cast(this); } -}; - -} // namespace utils diff --git a/src/utils/executor.hpp b/src/utils/executor.hpp deleted file mode 100644 index fa176b8e7..000000000 --- a/src/utils/executor.hpp +++ /dev/null @@ -1,99 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "utils/scheduler.hpp" - -namespace utils { - -/** - * @brief - Provides execution of jobs in job queue on one thread with 'pause' - * time between two consecutives starts. - */ -class Executor { - public: - template - explicit Executor(const std::chrono::duration pause) { - DCHECK(pause > std::chrono::seconds(0)) - << "Duration between executions should be reasonable"; - scheduler_.Run("Executor", pause, std::bind(&Executor::Execute, this)); - } - - ~Executor() { - // Be sure to first stop scheduler because otherwise we might destroy the - // mutex before the scheduler and that might cause problems since mutex is - // used in Execute method passed to scheduler along with jobs vector. - scheduler_.Stop(); - } - - Executor(Executor &&e) = delete; - Executor &operator=(Executor &&) = delete; - Executor(const Executor &e) = delete; - Executor &operator=(const Executor &) = delete; - - /** - * @brief - Add function to job queue. - */ - int64_t RegisterJob(const std::function &f) { - { - std::unique_lock lock(update_mutex_); - id_job_pairs_.emplace_back(std::make_pair(++count_, f)); - return id_job_pairs_.back().first; - } - } - - /** - * @brief - Remove id from job queue. - */ - void UnRegisterJob(const int64_t id) { - { - // First wait for execute lock and then for the update lock because - // execute lock will be unavailable for longer and there is no point in - // blocking other threads with update lock. - std::unique_lock execute_lock(execute_mutex_); - std::unique_lock update_lock(update_mutex_); - - for (auto id_job_pair_it = id_job_pairs_.begin(); - id_job_pair_it != id_job_pairs_.end(); ++id_job_pair_it) { - if (id_job_pair_it->first == id) { - id_job_pairs_.erase(id_job_pair_it); - return; - } - } - } - } - - private: - /** - * @brief - Execute method executes jobs from id_job_pairs vector. - * The reason for doing double locking is the following: we don't want to - * block creation of new jobs since that will slow down all of memgraph so we - * use a special lock for job update. Execute lock is here so that we can - * guarantee that after some job is unregistered it's also stopped. - */ - void Execute() { - std::unique_lock execute_lock(execute_mutex_); - std::vector>> id_job_pairs; - - // Acquire newest current version of jobs but being careful not to access - // the vector in corrupt state. - { - std::unique_lock update_lock(update_mutex_); - id_job_pairs = id_job_pairs_; - } - - for (auto id_job_pair : id_job_pairs) { - id_job_pair.second(); - } - } - - int64_t count_{0}; - std::mutex execute_mutex_; - std::mutex update_mutex_; - Scheduler scheduler_; - std::vector>> id_job_pairs_; -}; - -} // namespace utils diff --git a/src/utils/placeholder.hpp b/src/utils/placeholder.hpp deleted file mode 100644 index ee5ce7cb4..000000000 --- a/src/utils/placeholder.hpp +++ /dev/null @@ -1,97 +0,0 @@ -#pragma once - -#include - -#include "glog/logging.h" - -#include - -namespace utils { - -/** - * @class Placeholder - * - * @brief - * Placeholder is used to allocate memory for an object on heap providing - * methods for setting and getting the object and making sure that the - * object is initialized. - * - * @tparam T type of object to be wrapped in the placeholder - */ - -template -class Placeholder { - public: - Placeholder() = default; - - Placeholder(Placeholder &) = delete; - Placeholder(Placeholder &&) = delete; - - /** - * The destructor automatically calls the wrapped objects destructor. - */ - ~Placeholder() { - if (initialized) get().~T(); - }; - - /** - * @return returns true if object is set in memory otherwise false. - */ - bool is_initialized() { return initialized; } - - T &get() noexcept { - DCHECK(initialized) << "Placeholder object not initialized"; - return *data._M_ptr(); - } - - /** - * @return const reference to object. - */ - const T &get() const noexcept { - DCHECK(initialized) << "Placeholder object not initialized"; - return *data._M_ptr(); - } - - /** - * Sets item in allocated memory and sets the initialized flag. - * - * @param T& item reference to the item initialized in allocated memory - */ - void set(const T &item) { - DCHECK(!initialized) << "Placeholder object already initialized"; - new (data._M_addr()) T(item); - initialized = true; - } - - /** - * Moves item to allocated memory and sets initialized flag.1 - * - * @param T&& rvalue reference to the item which is moved to allocated memory - */ - void set(T &&item) { - DCHECK(!initialized) << "Placeholder object already initialized"; - new (data._M_addr()) T(std::move(item)); - initialized = true; - } - - /** - * Emplaces item to allocated memory and calls the Constructor with specified - * arguments. - * - * @tparam Args type of arguments to be passed to the objects constructor. - * @param Parameters passed to the objects constructor. - */ - template - void emplace(Args &&... args) { - DCHECK(!initialized) << "Placeholder object already initialized"; - new (data._M_addr()) T(args...); - initialized = true; - } - - private: - // libstd aligned buffer struct - __gnu_cxx::__aligned_buffer data; - bool initialized = false; -}; - -} // namespace utils diff --git a/src/utils/random/fast_binomial.hpp b/src/utils/random/fast_binomial.hpp deleted file mode 100644 index 4c23644e2..000000000 --- a/src/utils/random/fast_binomial.hpp +++ /dev/null @@ -1,61 +0,0 @@ -#pragma once - -#include "utils/likely.hpp" -#include "utils/random/xorshift128plus.hpp" - -namespace utils::random { - -template -class FastBinomial { - // fast binomial draws coin tosses from a single generated random number - // let's draw a random 4 bit number and count trailing ones - // - // 1 0000 -> 1 = - // 2 0001 -> 2 == 8 x = p = 8/16 = 1/2 - // 3 0010 -> 1 = 4 x == p = 4/16 = 1/4 p_total = 15/16 - // 4 0011 -> 3 === 2 x === p = 2/16 = 1/8 - // 5 0100 -> 1 = 1 x ==== p = 1/16 = 1/16 - // 6 0101 -> 2 == -------------------------- - // 7 0110 -> 1 = 1 x ===== p = 1/16 invalid value, retry! - // 8 0111 -> 4 ==== - // 9 1000 -> 1 = - // 10 1001 -> 2 == - // 11 1010 -> 1 = - // 12 1011 -> 3 === - // 13 1100 -> 1 = - // 14 1101 -> 2 == - // 15 1110 -> 1 = - // ------------------ - // 16 1111 -> 5 ===== - - public: - /** - * Return random number X between 1 and tparam N with probability 2^-X. - */ - unsigned operator()(const int n) { - while (true) { - // couting trailing ones is equal to counting trailing zeros - // since the probability for both is 1/2 and we're going to - // count zeros because they are easier to work with - - // generate a random number - auto x = random() & mask(n); - - // if we have all zeros, then we have an invalid case and we - // need to generate again, we have this every (1/2)^N times - // so therefore we could say it's very unlikely to happen for - // large N. e.g. N = 32; p = 2.328 * 10^-10 - if (UNLIKELY(!x)) continue; - - // ctzl = count trailing zeros from long - // ^ ^ ^ ^ - return __builtin_ctzl(x) + 1; - } - } - - private: - uint64_t mask(const int n) { return (1ULL << n) - 1; } - R random; -}; - -} // namespace utils::random diff --git a/src/utils/random/xorshift128plus.hpp b/src/utils/random/xorshift128plus.hpp deleted file mode 100644 index b869b43b7..000000000 --- a/src/utils/random/xorshift128plus.hpp +++ /dev/null @@ -1,60 +0,0 @@ -#pragma once - -#include -#include - -namespace utils::random { - -/* Xorshift algorithm (plus variant) - * - * This is the fastest generator passing BigCrush without systematic failures, - * but due to the relatively short period it is acceptable only for - * applications with a mild amount of parallelism, otherwise, use a - * xorshift1024* generator. - */ -struct Xorshift128plus { - public: - Xorshift128plus() { - // use a slow, more complex rnd generator to initialize a fast one - // make sure to call this before requesting any random numbers! - - // NOTE: Valgird complanis to next instruction - std::random_device rd; - std::mt19937_64 gen(rd()); - // std::mt19937_64 gen(time(0)); - - std::uniform_int_distribution dist; - - // the number generated by MT can be full of zeros and xorshift - // doesn't like this so we use MurmurHash3 64bit finalizer to - // make it less biased - s[0] = avalance(dist(gen)); - s[1] = avalance(dist(gen)); - } - - uint64_t operator()() { - uint64_t s1 = s[0]; - const uint64_t s0 = s[1]; - - s[0] = s0; - s1 ^= s1 << 23; - - return (s[1] = (s1 ^ s0 ^ (s1 >> 17) ^ (s0 >> 26))) + s0; - } - - private: - uint64_t s[2]; - - uint64_t avalance(uint64_t s) { - // MurmurHash3 finalizer - s ^= s >> 33; - s *= 0xff51afd7ed558ccd; - s ^= s >> 33; - s *= 0xc4ceb9fe1a85ec53; - s ^= s >> 33; - - return s; - } -}; - -} // namespace utils::random diff --git a/src/utils/thread/sync.hpp b/src/utils/thread/sync.hpp deleted file mode 100644 index c5b8de8fc..000000000 --- a/src/utils/thread/sync.hpp +++ /dev/null @@ -1,47 +0,0 @@ -/// @file -#pragma once - -#include - -#include "utils/exceptions.hpp" -#include "utils/spin_lock.hpp" - -namespace utils { - -/// Improves contention in spinlocks by hinting the processor that we're in a -/// spinlock and not doing much. -inline void CpuRelax() { - // if IBMPower - // HMT_very_low() - // http://stackoverflow.com/questions/5425506/equivalent-of-x86-pause-instruction-for-ppc - asm("PAUSE"); -} - -class LockTimeoutException : public BasicException { - public: - using BasicException::BasicException; -}; - -/// Lockable is used as an custom implementation of a mutex mechanism. -/// -/// It is implemented as a wrapper around std::lock_guard and std::unique_guard -/// with a default lock called Spinlock. -/// -/// @tparam lock_t type of lock to be used (default = Spinlock) -template -class Lockable { - public: - using lock_type = lock_t; - - std::lock_guard acquire_guard() const { - return std::lock_guard(lock); - } - - std::unique_lock acquire_unique() const { - return std::unique_lock(lock); - } - - mutable lock_t lock; -}; - -} // namespace utils diff --git a/src/utils/total_ordering.hpp b/src/utils/total_ordering.hpp deleted file mode 100644 index 177424c19..000000000 --- a/src/utils/total_ordering.hpp +++ /dev/null @@ -1,37 +0,0 @@ -#pragma once - -namespace utils { - -/** - * Implements all the logical comparison operators based on '==' - * and '<' operators. - * - * @tparam TLhs First operand type. - * @tparam TRhs Second operand type. Defaults to the same type - * as first operand. - * @tparam TReturn Return type, defaults to bool. - */ -template -struct TotalOrdering { - protected: - ~TotalOrdering() {} - - public: - friend constexpr TReturn operator!=(const TLhs &a, const TRhs &b) { - return !(a == b); - } - - friend constexpr TReturn operator<=(const TLhs &a, const TRhs &b) { - return a < b || a == b; - } - - friend constexpr TReturn operator>(const TLhs &a, const TRhs &b) { - return !(a <= b); - } - - friend constexpr TReturn operator>=(const TLhs &a, const TRhs &b) { - return !(a < b); - } -}; - -} // namespace utils diff --git a/tests/concurrent/CMakeLists.txt b/tests/concurrent/CMakeLists.txt index 0e0dc26ff..24d176042 100644 --- a/tests/concurrent/CMakeLists.txt +++ b/tests/concurrent/CMakeLists.txt @@ -26,9 +26,6 @@ target_link_libraries(${test_prefix}network_server mg-communication) add_concurrent_test(network_session_leak.cpp) target_link_libraries(${test_prefix}network_session_leak mg-communication) -add_concurrent_test(push_queue.cpp) -target_link_libraries(${test_prefix}push_queue glog gflags Threads::Threads) - add_concurrent_test(stack.cpp) target_link_libraries(${test_prefix}stack mg-utils) diff --git a/tests/concurrent/push_queue.cpp b/tests/concurrent/push_queue.cpp deleted file mode 100644 index 37785611a..000000000 --- a/tests/concurrent/push_queue.cpp +++ /dev/null @@ -1,191 +0,0 @@ -#include -#include - -#include "gmock/gmock.h" -#include "gtest/gtest.h" - -#include "data_structures/concurrent/push_queue.hpp" - -class IntQueue : public ::testing::Test { - protected: - ConcurrentPushQueue cpq; - - void AddElems(int count) { - for (int i = 0; i < count; i++) cpq.push(i); - } - - int CountIterations() { - int rval = 0; - for ([[gnu::unused]] int x : cpq) rval++; - return rval; - } -}; - -TEST_F(IntQueue, EmptyQueueAndPush) { - ConcurrentPushQueue cpq; - EXPECT_EQ(cpq.size(), 0); - cpq.push(1); - EXPECT_EQ(cpq.size(), 1); - cpq.push(1); - EXPECT_EQ(cpq.size(), 2); -} - -TEST_F(IntQueue, Size) { - AddElems(10); - EXPECT_EQ(cpq.size(), 10); -} - -TEST_F(IntQueue, ConcurrentPush) { - // perform 1000 insertions in 50 concurrent threads - std::vector threads; - for (int i = 0; i < 50; i++) - threads.emplace_back([&]() { - for (int i = 0; i < 1000; i++) cpq.push(i); - }); - - for (auto &thread : threads) thread.join(); - EXPECT_EQ(cpq.size(), 50000); - EXPECT_EQ(CountIterations(), 50000); -} - -TEST_F(IntQueue, IteratorBegin) { - AddElems(5); - auto it = cpq.begin(); - EXPECT_EQ(*it, 4); -} - -TEST_F(IntQueue, IteratorPrefixIncrement) { - AddElems(3); - auto it = cpq.begin(); - EXPECT_EQ(*(++it), 1); - EXPECT_EQ(*it, 1); -} - -TEST_F(IntQueue, IteratorPostfixIncrement) { - AddElems(3); - auto it = cpq.begin(); - EXPECT_EQ(*it++, 2); - EXPECT_EQ(*it, 1); -} - -TEST_F(IntQueue, IteratorEquality) { - AddElems(5); - auto it = cpq.begin(); - EXPECT_EQ(it, cpq.begin()); - it++; - EXPECT_NE(it, cpq.begin()); -} - -TEST_F(IntQueue, IteratorEnd) { - AddElems(5); - auto it = cpq.begin(); - EXPECT_NE(it, cpq.end()); - for (int i = 0; i < 5; i++) it++; - EXPECT_EQ(it, cpq.end()); -} - -TEST_F(IntQueue, IteratorCopy) { - AddElems(5); - auto it = cpq.begin(); - auto it_copy = it; - EXPECT_EQ(it, it_copy); - it++; - EXPECT_NE(it, it_copy); - EXPECT_EQ(*it, 3); - EXPECT_EQ(*it_copy, 4); -} - -TEST_F(IntQueue, IteratorMove) { - AddElems(5); - auto it = cpq.begin(); - it++; - EXPECT_EQ(*it, 3); - decltype(it) it_moved = std::move(it); - EXPECT_EQ(*it_moved++, 3); - EXPECT_EQ(*it_moved, 2); -} - -TEST_F(IntQueue, IteratorDeleteTail) { - AddElems(13); - ASSERT_EQ(cpq.size(), 13); - ASSERT_EQ(CountIterations(), 13); - auto it = cpq.begin(); - for (int i = 0; i < 5; i++) it++; - EXPECT_EQ(it.delete_tail(), 8); - EXPECT_EQ(it, cpq.end()); - - ASSERT_EQ(cpq.size(), 5); - ASSERT_EQ(CountIterations(), 5); - auto it2 = cpq.begin(); - EXPECT_EQ(it2.delete_tail(), 5); - EXPECT_EQ(it2, cpq.end()); - EXPECT_EQ(cpq.size(), 0); - EXPECT_EQ(CountIterations(), 0); - EXPECT_EQ(cpq.begin(), cpq.end()); -} - -TEST_F(IntQueue, IteratorDeleteTailConcurrent) { - // we will be deleting the whole queue (tail-delete on head) - // while other threads are pushing to the queue. - // we'll also ensure that head gets updated in the queue, - // and delete_tail from an iterator that believes it's the head - size_t kElems = 500; - size_t kThreads = 100; - size_t kMillis = 2; - std::vector threads; - for (size_t i = 0; i < kThreads; i++) - threads.emplace_back([&]() { - for (size_t i = 0; i < kElems; i++) { - cpq.push(i); - std::this_thread::sleep_for(std::chrono::milliseconds(kMillis)); - } - }); - - size_t deletions = 0; - while (deletions < kThreads * kElems) { - auto head_it = cpq.begin(); - // sleep till some thread inserts - std::this_thread::sleep_for(std::chrono::milliseconds(5 * kMillis)); - deletions += head_it.delete_tail(); - } - - // those threads should be done, but join anyway to avoid aborts - // due to thread object destruction - for (auto &thread : threads) thread.join(); - - EXPECT_EQ(cpq.size(), 0); - EXPECT_EQ(CountIterations(), 0); -} - -TEST(ConcurrentPushQueue, RvalueLvalueElements) { - ConcurrentPushQueue cpq; - cpq.push(std::string("rvalue")); - std::string lvalue("lvalue"); - cpq.push(lvalue); - std::vector expected; - for (auto &elem : cpq) expected.emplace_back(elem); - EXPECT_THAT(expected, ::testing::ElementsAre("lvalue", "rvalue")); -} - -TEST(ConcurrentPushQueue, Emplace) { - // test with atomic because it's not copy/move constructable - ConcurrentPushQueue> cpq; - cpq.push(3); - EXPECT_EQ(cpq.size(), 1); - EXPECT_EQ(cpq.begin()->load(), 3); - cpq.begin().delete_tail(); - EXPECT_EQ(cpq.size(), 0); -} - -TEST_F(IntQueue, ConstQueue) { - AddElems(5); - - auto const_queue_accepting = [](const ConcurrentPushQueue &const_queue) { - EXPECT_EQ(const_queue.size(), 5); - int count = 0; - for ([[gnu::unused]] int x : const_queue) count++; - EXPECT_EQ(count, 5); - }; - - const_queue_accepting(cpq); -} diff --git a/tests/macro_benchmark/clients/common.hpp b/tests/macro_benchmark/clients/common.hpp index a02c29583..32e91aaec 100644 --- a/tests/macro_benchmark/clients/common.hpp +++ b/tests/macro_benchmark/clients/common.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -10,7 +11,7 @@ #include "communication/bolt/v1/value.hpp" #include "utils/algorithm.hpp" #include "utils/exceptions.hpp" -#include "utils/thread/sync.hpp" +#include "utils/spin_lock.hpp" #include "utils/timer.hpp" using communication::ClientContext; @@ -77,9 +78,8 @@ std::pair ExecuteNTimesTillSuccess( } utils::Timer t; std::chrono::microseconds to_sleep(rand_dist_(pseudo_rand_gen_)); - while (t.Elapsed() < to_sleep) { - utils::CpuRelax(); - } + while (t.Elapsed() < to_sleep) + ; } } } diff --git a/tests/macro_benchmark/clients/query_client.cpp b/tests/macro_benchmark/clients/query_client.cpp index 1081eb597..f514238f9 100644 --- a/tests/macro_benchmark/clients/query_client.cpp +++ b/tests/macro_benchmark/clients/query_client.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -6,8 +7,8 @@ #include #include "utils/algorithm.hpp" +#include "utils/spin_lock.hpp" #include "utils/string.hpp" -#include "utils/thread/sync.hpp" #include "utils/timer.hpp" #include "common.hpp" diff --git a/tests/manual/CMakeLists.txt b/tests/manual/CMakeLists.txt index 9cb2a44b1..85e5a0c04 100644 --- a/tests/manual/CMakeLists.txt +++ b/tests/manual/CMakeLists.txt @@ -24,9 +24,6 @@ target_link_libraries(${test_prefix}antlr_sigsegv gtest gtest_main add_manual_test(antlr_tree_pretty_print.cpp) target_link_libraries(${test_prefix}antlr_tree_pretty_print antlr_opencypher_parser_lib) -add_manual_test(binomial.cpp) -target_link_libraries(${test_prefix}binomial mg-utils) - add_manual_test(bolt_client.cpp) target_link_libraries(${test_prefix}bolt_client mg-communication) @@ -63,7 +60,3 @@ target_link_libraries(${test_prefix}ssl_client mg-communication) add_manual_test(ssl_server.cpp) target_link_libraries(${test_prefix}ssl_server mg-communication) - -add_manual_test(xorshift.cpp) -target_link_libraries(${test_prefix}xorshift mg-utils) - diff --git a/tests/manual/binomial.cpp b/tests/manual/binomial.cpp deleted file mode 100644 index dc1518ba5..000000000 --- a/tests/manual/binomial.cpp +++ /dev/null @@ -1,61 +0,0 @@ -/* Plots the distribution histogram of the fast_binomial algorithm - * (spoiler alert: it's pleasingly (1/2)^N all the way :D) - */ -#include -#include -#include -#include -#include - -#include -#include - -#include "utils/random/fast_binomial.hpp" - -static constexpr unsigned B = 24; -static thread_local utils::random::FastBinomial<> rnd; - -static constexpr unsigned M = 4; -static constexpr size_t N = 1ULL << 34; -static constexpr size_t per_thread_iters = N / M; - -std::array, B> buckets; - -void generate() { - for (size_t i = 0; i < per_thread_iters; ++i) - buckets[rnd(B) - 1].fetch_add(1); -} - -int main(void) { - struct winsize w; - ioctl(STDOUT_FILENO, TIOCGWINSZ, &w); - - auto bar_len = w.ws_col - 20; - - std::array threads; - - for (auto &bucket : buckets) bucket.store(0); - - for (auto &t : threads) t = std::thread([]() { generate(); }); - - for (auto &t : threads) t.join(); - - auto max = std::accumulate( - buckets.begin(), buckets.end(), (uint64_t)0, - [](auto &acc, auto &x) { return std::max(acc, x.load()); }); - - std::cout << std::fixed; - - for (size_t i = 0; i < buckets.size(); ++i) { - auto x = buckets[i].load(); - auto rel = bar_len * x / max; - - std::cout << std::setw(2) << i + 1 << " "; - - for (size_t i = 0; i < rel; ++i) std::cout << "="; - - std::cout << " " << 100 * (double)x / N << "%" << std::endl; - } - - return 0; -} diff --git a/tests/manual/xorshift.cpp b/tests/manual/xorshift.cpp deleted file mode 100644 index ce4efd591..000000000 --- a/tests/manual/xorshift.cpp +++ /dev/null @@ -1,61 +0,0 @@ -/* Plots the distribution histogram of the xorshift algorithm - * (spoiler alert: it's pleasingly uniform all the way :D) - */ -#include -#include -#include -#include - -#include "glog/logging.h" - -#include -#include - -#include "utils/random/xorshift128plus.hpp" - -static thread_local utils::random::Xorshift128plus rnd; -static constexpr unsigned B = 1 << 10; -static constexpr uint64_t K = (uint64_t)(-1) / B; - -static constexpr unsigned M = 4; -static constexpr size_t N = 1ULL << 34; -static constexpr size_t per_thread_iters = N / M; - -std::array, B> buckets; - -void generate() { - for (size_t i = 0; i < per_thread_iters; ++i) buckets[rnd() / K].fetch_add(1); -} - -int main(void) { - struct winsize w; - ioctl(STDOUT_FILENO, TIOCGWINSZ, &w); - - auto bar_len = w.ws_col - 20; - - std::array threads; - - for (auto &bucket : buckets) bucket.store(0); - - for (auto &t : threads) t = std::thread([]() { generate(); }); - - for (auto &t : threads) t.join(); - - auto max = std::accumulate( - buckets.begin(), buckets.end(), 0u, - [](auto &acc, auto &x) { return std::max(acc, x.load()); }); - DCHECK(max != 0u) << "max is 0."; - - std::cout << std::fixed; - - for (auto &bucket : buckets) { - auto x = bucket.load(); - auto rel = bar_len * x / max; - - for (size_t i = 0; i < rel; ++i) std::cout << "="; - - std::cout << " " << 100.0 * x / N * B - 100 << "%" << std::endl; - } - - return 0; -} diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 188dc1d72..6ba297cdd 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -158,9 +158,6 @@ target_link_libraries(${test_prefix}utils_algorithm mg-utils) add_unit_test(utils_exceptions.cpp) target_link_libraries(${test_prefix}utils_exceptions mg-utils) -add_unit_test(utils_executor.cpp) -target_link_libraries(${test_prefix}utils_executor mg-utils) - add_unit_test(utils_file.cpp) target_link_libraries(${test_prefix}utils_file mg-utils) diff --git a/tests/unit/ring_buffer.cpp b/tests/unit/ring_buffer.cpp index d3b746b6d..8dc02bd49 100644 --- a/tests/unit/ring_buffer.cpp +++ b/tests/unit/ring_buffer.cpp @@ -1,10 +1,11 @@ +#include #include #include #include "gtest/gtest.h" #include "data_structures/ring_buffer.hpp" -#include "utils/thread/sync.hpp" +#include "utils/spin_lock.hpp" TEST(RingBuffer, MultithreadedUsage) { auto test_f = [](int producer_count, int elems_per_producer, diff --git a/tests/unit/utils_executor.cpp b/tests/unit/utils_executor.cpp deleted file mode 100644 index a2a0ab8cd..000000000 --- a/tests/unit/utils_executor.cpp +++ /dev/null @@ -1,45 +0,0 @@ -#include -#include - -#include "gtest/gtest.h" - -#include "utils/executor.hpp" - -TEST(Executor, Run) { - std::atomic count{0}; - { - utils::Executor exec(std::chrono::milliseconds(500)); - // Be sure executor is sleeping. - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - - exec.RegisterJob([&count]() { ++count; }); - exec.RegisterJob([&count]() { ++count; }); - exec.RegisterJob([&count]() { ++count; }); - - // Be sure executor execute thread is triggered - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - } - EXPECT_EQ(count, 3); -} - -TEST(Executor, RunUnregister) { - std::atomic count1{0}; - std::atomic count2{0}; - { - utils::Executor exec(std::chrono::milliseconds(500)); - // Be sure executor is sleeping. - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - auto job = exec.RegisterJob([&count1]() { ++count1; }); - exec.RegisterJob([&count2]() { ++count2; }); - - // Be sure executor execute thread is triggered - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - - exec.UnRegisterJob(job); - - // Be sure executor execute thread is triggered - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - } - EXPECT_EQ(count1, 1); - EXPECT_EQ(count2, 2); -}