Remove leftover implementation required by the old storage

Reviewers: teon.banek, ipaljak

Reviewed By: teon.banek, ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2687
Matej Ferencevic 2020-02-25 16:31:58 +01:00
parent 565927631f
commit 0e95934719
24 changed files with 8 additions and 2516 deletions

View File

@@ -1,218 +0,0 @@
#pragma once
#include <atomic>
#include "glog/logging.h"
/**
* A sequentially ordered non-unique lock-free concurrent collection of bits.
*
* Grows dynamically to accommodate the maximum set-bit position. Does not
* dynamically decrease in size.
*
* Bits can be set, retrieved and cleared in groups. Note that all group
* operations fail if the group spans multiple basic storage units. For example,
* if the basic storage unit is 8 bits, calling at(5, 2) is legal, but calling at(7,
* 2) is not.
*
* @tparam block_t - Basic storage type. Must have lock-free atomic. Should
* probably always be uint8_t.
* @tparam chunk_size - number of bits in one chunk
*/
template <class block_t = uint8_t, size_t chunk_size = 32768>
class DynamicBitset {
// Basic storage unit.
struct Block {
Block() = default;
Block(const Block &) = delete;
Block(Block &&) = delete;
Block &operator=(const Block &) = delete;
Block &operator=(Block &&) = delete;
// The number of bits in one Block.
static constexpr size_t kSize = sizeof(block_t) * 8;
block_t at(size_t k, size_t n) const {
DCHECK(k + n - 1 < kSize) << "Invalid index.";
return (block_.load() >> k) & bitmask(n);
}
void set(size_t k, size_t n) {
DCHECK(k + n - 1 < kSize) << "Invalid index.";
block_.fetch_or(bitmask(n) << k);
}
void clear(size_t k, size_t n) {
DCHECK(k + n - 1 < kSize) << "Invalid index.";
block_.fetch_and(~(bitmask(n) << k));
}
private:
std::atomic<block_t> block_{0};
constexpr block_t bitmask(size_t group_size) const {
return (block_t)(-1) >> (kSize - group_size);
}
};
struct Chunk {
Chunk(Chunk *next, int64_t chunk_id) : next_(next), chunk_id_(chunk_id) {
static_assert(chunk_size % Block::kSize == 0,
"chunk size not divisible by block size");
}
Chunk(const Chunk &) = delete;
Chunk(Chunk &&) = delete;
Chunk &operator=(const Chunk &) = delete;
Chunk &operator=(Chunk &&) = delete;
// The number of bits in one chunk.
static constexpr size_t kSize = chunk_size;
// The number of blocks_ in one chunk.
static constexpr size_t kNumBlocks = chunk_size / Block::kSize;
block_t at(size_t k, size_t n) const {
return blocks_[k / Block::kSize].at(k % Block::kSize, n);
}
void set(size_t k, size_t n) {
blocks_[k / Block::kSize].set(k % Block::kSize, n);
}
void clear(size_t k, size_t n) {
blocks_[k / Block::kSize].clear(k % Block::kSize, n);
}
// Range of the bits stored in this chunk is [low, high).
int64_t low() const { return chunk_id_ * kSize; }
int64_t high() const { return (chunk_id_ + 1) * kSize; }
Block blocks_[kNumBlocks];
std::atomic<Chunk *> next_;
const int64_t chunk_id_;
};
public:
DynamicBitset() {}
// Can't move nor copy a DynamicBitset because of atomic head and locking.
DynamicBitset(const DynamicBitset &) = delete;
DynamicBitset(DynamicBitset &&) = delete;
DynamicBitset &operator=(const DynamicBitset &) = delete;
DynamicBitset &operator=(DynamicBitset &&) = delete;
~DynamicBitset() {
auto now = head_.load();
while (now != nullptr) {
auto next = now->next_.load();
delete now;
now = next;
}
}
/**
* Gets the group of n bits starting at bit k. Bit k of this bitset becomes the
* zeroth bit of the returned value.
*/
block_t at(size_t k, size_t n) const {
if (k >= head_.load()->high()) return 0;
const auto &chunk = FindChunk(k);
return chunk.at(k, n);
}
/** Returns k-th bit's value. */
bool at(size_t k) const { return at(k, 1); }
/**
* Set all the bits in the group of size `n`, starting from bit `k`.
*/
void set(size_t k, size_t n = 1) {
auto &chunk = FindOrCreateChunk(k);
return chunk.set(k, n);
}
/**
* Clears all the bits in the group of size `n`, starting from bit `k`.
*/
void clear(size_t k, size_t n = 1) {
// If desired bit is out of bounds, it's already clear.
if (k >= head_.load()->high()) return;
auto &chunk = FindOrCreateChunk(k);
return chunk.clear(k, n);
}
/**
* Deletes all chunks which contain only positions lower than pos. Assumes that
* no pointers into those chunks exist anymore, i.e. that it is safe to delete
* them.
*/
void delete_prefix(size_t pos) {
// Never delete head as that might invalidate the whole structure which
// depends on head being available
Chunk *last = head_.load();
Chunk *chunk = last->next_;
// High is exclusive endpoint of interval
while (chunk != nullptr && chunk->high() > pos) {
last = chunk;
chunk = chunk->next_;
}
if (chunk != nullptr) {
// Unlink from last
last->next_ = nullptr;
// Deletes chunks
Chunk *next;
while (chunk) {
next = chunk->next_;
delete chunk;
chunk = next;
}
}
}
private:
// Finds the chunk to which the k-th bit belongs; fails if k is out of bounds.
const Chunk &FindChunk(size_t &k) const {
DCHECK(k < head_.load()->high()) << "Index out of bounds";
Chunk *chunk = head_;
while (k < chunk->low()) {
chunk = chunk->next_;
CHECK(chunk != nullptr) << "chunk is nullptr";
}
k -= chunk->low();
return *chunk;
}
/**
* Finds the chunk to which the k-th bit belongs. If k is out of bounds, the
* chunk for it is created.
*/
Chunk &FindOrCreateChunk(size_t &k) {
Chunk *head = head_;
while (k >= head->high()) {
// The next chunk does not exist and we need it, so we will try to create
// it.
Chunk *new_head = new Chunk(head, head->chunk_id_ + 1);
if (!head_.compare_exchange_strong(head, new_head)) {
// Other thread updated head_ before us, so we need to delete new_head.
head = head_;
delete new_head;
continue;
}
}
// Now we are sure chunk exists and we can call find function.
// const_cast is used to avoid code duplication.
return const_cast<Chunk &>(FindChunk(k));
}
std::atomic<Chunk *> head_{new Chunk(nullptr, 0)};
};
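Not part of the removed sources: a minimal usage sketch of DynamicBitset, assuming the default uint8_t block type (so a group must not cross an 8-bit boundary):

#include <cassert>

void DynamicBitsetExample() {
  DynamicBitset<> bits;
  bits.set(3);                      // set a single bit
  bits.set(8, 4);                   // set bits 8..11 as one group (same block)
  assert(bits.at(3));
  assert(bits.at(8, 4) == 0b1111);  // bit 8 becomes bit 0 of the returned group
  bits.clear(8, 4);
  assert(bits.at(8, 4) == 0);
  // bits.at(7, 2) would trip the DCHECK because that group spans two blocks.
}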

View File

@@ -1,89 +0,0 @@
#pragma once
#include "data_structures/concurrent/skiplist.hpp"
#include "utils/total_ordering.hpp"
/// Thread-safe map intended for high concurrent throughput.
///
/// @tparam TKey is a type of key.
/// @tparam TValue is a type of data.
template <typename TKey, typename TValue>
class ConcurrentMap {
/// An item in the concurrent map. A pair of <TKey, TValue> that compares on
/// the first value (key). Comparable to another Item, or only to the TKey.
class Item : public utils::TotalOrdering<Item>,
public utils::TotalOrdering<TKey, Item>,
public utils::TotalOrdering<Item, TKey>,
public std::pair<const TKey, TValue> {
public:
using std::pair<const TKey, TValue>::pair;
friend constexpr bool operator<(const Item &lhs, const Item &rhs) {
return lhs.first < rhs.first;
}
friend constexpr bool operator==(const Item &lhs, const Item &rhs) {
return lhs.first == rhs.first;
}
friend constexpr bool operator<(const TKey &lhs, const Item &rhs) {
return lhs < rhs.first;
}
friend constexpr bool operator==(const TKey &lhs, const Item &rhs) {
return lhs == rhs.first;
}
friend constexpr bool operator<(const Item &lhs, const TKey &rhs) {
return lhs.first < rhs;
}
friend constexpr bool operator==(const Item &lhs, const TKey &rhs) {
return lhs.first == rhs;
}
};
using Iterator = typename SkipList<Item>::Iterator;
public:
ConcurrentMap() {}
template <typename TSkipList>
class Accessor : public SkipList<Item>::template Accessor<TSkipList> {
public:
friend class ConcurrentMap;
using SkipList<Item>::template Accessor<TSkipList>::Accessor;
std::pair<Iterator, bool> insert(const TKey &key, const TValue &data) {
return SkipList<Item>::template Accessor<TSkipList>::insert(
Item(key, data));
}
std::pair<Iterator, bool> insert(const TKey &key, TValue &&data) {
return SkipList<Item>::template Accessor<TSkipList>::insert(
Item(key, std::move(data)));
}
std::pair<Iterator, bool> insert(TKey &&key, TValue &&data) {
return SkipList<Item>::template Accessor<TSkipList>::insert(
Item(std::forward<TKey>(key), std::forward<TValue>(data)));
}
template <class... Args1, class... Args2>
std::pair<Iterator, bool> emplace(const TKey &key,
std::tuple<Args1...> first_args,
std::tuple<Args2...> second_args) {
return SkipList<Item>::template Accessor<TSkipList>::emplace(
key, std::piecewise_construct,
std::forward<std::tuple<Args1...>>(first_args),
std::forward<std::tuple<Args2...>>(second_args));
}
};
auto access() { return Accessor<SkipList<Item>>(&skiplist); }
auto access() const { return Accessor<const SkipList<Item>>(&skiplist); }
private:
SkipList<Item> skiplist;
};
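Not from this diff, a hypothetical usage sketch: every read and write goes through a short-lived Accessor obtained from access().

#include <string>

void ConcurrentMapExample() {
  ConcurrentMap<int, std::string> map;
  auto accessor = map.access();
  // insert returns the usual (iterator, was_inserted) pair.
  auto result = accessor.insert(1, std::string("one"));
  // result.first points to an Item, i.e. a std::pair<const int, std::string>;
  // a second insert with the same key would report was_inserted == false.
}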

View File

@@ -1,180 +0,0 @@
#pragma once
#include <atomic>
#include <mutex>
#include "glog/logging.h"
/** @brief A queue with lock-free concurrent push and
* single-threaded deletion.
*
* Deletions are done via the iterator. Currently only
* tail-deletion is supported, but individual element
* deletion should be possible to implement.
* Iteration can also be concurrent, as long as nobody
* is deleting.
*
* @tparam TElement Type of element stored in the
* queue.
*/
template <typename TElement>
class ConcurrentPushQueue {
private:
// The queue is implemented as a singly-linked list
// and this is one list element
class Node {
public:
// Constructor that accepts only arguments for
// creating the element. It can accept both rvalue and lvalue
// refs to an element, or arguments for emplace creation of
// an element
template <typename... TArgs>
explicit Node(TArgs &&... args) : element_(std::forward<TArgs>(args)...) {}
// element itself and pointer to next Node
TElement element_;
Node *next_{nullptr};
};
/** @brief Iterator over the queue.
*
* Exposes standard forward-iterator ops, plus the
* delete_tail() function.
*
* @tparam TQueue - either const or non-const ConcurrentPushQueue
*/
template <typename TQueue>
struct Iterator {
public:
Iterator(TQueue &queue, Node *current) : queue_(queue), current_(current) {}
// copying, moving and destroying is all fine
bool operator==(const Iterator &rhs) const {
return rhs.current_ == this->current_;
}
bool operator!=(const Iterator &rhs) const { return !(*this == rhs); }
Iterator &operator++() {
DCHECK(current_ != nullptr) << "Prefix increment on invalid iterator";
previous_ = current_;
current_ = current_->next_;
return *this;
}
Iterator operator++(int) {
DCHECK(current_ != nullptr) << "Postfix increment on invalid iterator";
Iterator rval(queue_, current_);
previous_ = current_;
current_ = current_->next_;
return rval;
}
TElement &operator*() {
DCHECK(current_ != nullptr)
<< "Dereferencing operator on invalid iterator";
return current_->element_;
}
TElement *operator->() {
DCHECK(current_ != nullptr) << "Arrow operator on invalid iterator";
return &current_->element_;
}
/** @brief Deletes the current element of the iterator,
* and all subsequent elements.
*
* After this op this iterator is not valid and is equal
* to push_queue.end(). Invalidates all the iterators
* that were equal to this or further away from the head.
*
* @return The number of deleted elements.
*/
size_t delete_tail() {
// compare and swap queue head
// if it succeeds, it means that the current node of this
// iterator was head, and we have just replaced head with nullptr
// if it fails, it means the current node was not head, and
// we got the new_head
auto new_head = current_;
bool was_head =
queue_.get().head_.compare_exchange_strong(new_head, nullptr);
// if we have no previous_ (iterator wasn't incremented and thinks it's
// head), but in reality we weren't head, then we need to find the actual
// previous_ node, so we can cut the tail off
if (!previous_ && !was_head) {
previous_ = new_head;
while (previous_->next_ != current_) previous_ = previous_->next_;
}
// cut the tail off
if (previous_) previous_->next_ = nullptr;
// delete all from current to the end, track deletion count
size_t deleted = 0;
previous_ = current_;
while (current_) {
previous_ = current_;
current_ = current_->next_;
delete previous_;
deleted++;
}
// update the size of the queue and return
queue_.get().size_ -= deleted;
return deleted;
}
private:
// the queue we're iterating over
// use a reference wrapper because it facilitates move and copy
std::reference_wrapper<TQueue> queue_;
// the current node of this iterator
Node *current_{nullptr};
// the previous node of this iterator
// used for deleting the current node
Node *previous_{nullptr};
};
public:
ConcurrentPushQueue() = default;
~ConcurrentPushQueue() { begin().delete_tail(); }
// copy and move ops are disabled due to atomic member variable
/// Pushes an element to the queue.
// TODO: review - this is at the same time push and emplace,
// how should it be called?
template <typename... TArgs>
void push(TArgs &&... args) {
auto node = new Node(std::forward<TArgs>(args)...);
while (!head_.compare_exchange_strong(node->next_, node)) continue;
size_++;
};
// non-const iterators
auto begin() { return Iterator<ConcurrentPushQueue>(*this, head_.load()); }
auto end() { return Iterator<ConcurrentPushQueue>(*this, nullptr); }
// const iterators
auto cbegin() const {
return Iterator<const ConcurrentPushQueue>(*this, head_.load());
}
auto cend() const {
return Iterator<const ConcurrentPushQueue>(*this, nullptr);
}
auto begin() const {
return Iterator<const ConcurrentPushQueue>(*this, head_.load());
}
auto end() const {
return Iterator<const ConcurrentPushQueue>(*this, nullptr);
}
auto size() const { return size_.load(); }
private:
// head of the queue (last-added element)
std::atomic<Node *> head_{nullptr};
// number of elements in the queue
std::atomic<size_t> size_{0};
};
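A brief usage sketch, illustrative only; the removed unit tests further down in this commit exercise the same operations in detail.

void PushQueueExample() {
  ConcurrentPushQueue<int> queue;
  queue.push(1);  // lock-free, may be called concurrently from many threads
  queue.push(2);
  for (int value : queue) {
    // Iterates from the most recently pushed element: 2, then 1.
    (void)value;
  }
  // delete_tail() from begin() drains the whole queue; returns 2 here.
  size_t deleted = queue.begin().delete_tail();
  (void)deleted;
}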

File diff suppressed because it is too large

View File

@@ -1,10 +0,0 @@
#include "skiplist_gc.hpp"
#include <iostream>
#include "utils/flag_validation.hpp"
DEFINE_VALIDATED_HIDDEN_int32(
skiplist_gc_interval, 10,
"Interval of how often does skiplist gc run in seconds. To "
"disable set to -1.",
FLAG_IN_RANGE(-1, std::numeric_limits<int32_t>::max()));

View File

@@ -1,173 +0,0 @@
#pragma once
#include <malloc.h>
#include <list>
#include <mutex>
#include <optional>
#include <utility>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "data_structures/concurrent/push_queue.hpp"
#include "utils/executor.hpp"
#include "utils/thread/sync.hpp"
DECLARE_int32(skiplist_gc_interval);
/**
* @brief Garbage collects nodes.
* Garbage collection is done by keeping track of the alive accessors which
* were requested from the parent skiplist. When some prefix [id, id+n] of the
* accessors becomes dead, we try to empty the collection of (accessor_id,
* entry*) pairs up to the id of that last dead accessor. Each entry is added
* to the collection after it has been re-linked and can no longer be seen by
* any accessor created after that time, which marks the safe time for deleting
* the entry.
* @tparam TNode - type of the objects whose pointers will be collected.
*/
template <class TNode>
class SkipListGC {
public:
explicit SkipListGC() {
if (GetExecutor()) {
executor_job_id_ =
GetExecutor()->RegisterJob([this]() { GarbageCollect(); });
}
}
~SkipListGC() {
// We have to unregister the job because otherwise Executor might access
// some member variables of this class after it has been destructed.
if (GetExecutor()) GetExecutor()->UnRegisterJob(executor_job_id_);
for (auto it = deleted_queue_.begin(); it != deleted_queue_.end(); ++it) {
TNode::destroy(it->second);
}
deleted_queue_.begin().delete_tail();
}
/**
* @brief - Returns instance of executor shared between all SkipLists.
*/
auto &GetExecutor() {
// TODO: Even though executor is static, there are multiple instance:
// one for each TNode param type. We probably don't want that kind of
// behavior. Static variables with nontrivial destructor create subtle bugs
// because of their order of destruction. For example of one bug take a look
// at documentation in database/dbms.hpp. Rethink ownership and lifetime of
// executor.
static auto executor = ConstructExecutor();
return executor;
}
SkipListGC(const SkipListGC &other) = delete;
SkipListGC(SkipListGC &&other) = delete;
SkipListGC operator=(const SkipListGC &other) = delete;
SkipListGC operator=(SkipListGC &&other) = delete;
/**
* @brief - Keep track of each accessor with its status, so we know which
* ones are alive and which ones are dead.
*/
struct AccessorStatus {
int64_t id_{-1};
bool alive_{false};
};
/**
* @brief - Creates a new accessor and returns a reference to its status. This
* method is thread-safe.
*/
AccessorStatus &CreateNewAccessor() {
std::unique_lock<std::mutex> lock(mutex_);
accessors_.push_back({++last_accessor_id_, true});
return accessors_.back();
}
/**
* @brief - Destroys objects which were previously collected and can be safely
* removed. This method is not thread-safe.
*/
void GarbageCollect() {
std::unique_lock<std::mutex> lock(mutex_);
auto last_dead_accessor = accessors_.end();
for (auto it = accessors_.begin(); it != accessors_.end(); ++it) {
if (it->alive_) break;
last_dead_accessor = it;
}
// We didn't find any dead accessor and that means we are not sure that we
// can delete anything.
if (last_dead_accessor == accessors_.end()) return;
// We don't need lock anymore because we are not modifying this structure
// anymore, or accessing it any further down.
const int64_t safe_id = last_dead_accessor->id_;
accessors_.erase(accessors_.begin(), ++last_dead_accessor);
lock.unlock();
// We can only modify this in a not-thread safe way because we are the only
// thread ever accessing it here, i.e. there is at most one thread doing
// this GarbageCollection.
auto oldest_not_deletable = deleted_queue_.begin();
bool delete_all = true;
for (auto it = oldest_not_deletable; it != deleted_queue_.end(); ++it) {
if (it->first > safe_id) {
oldest_not_deletable = it;
delete_all = false;
}
}
// deleted_list is already empty, nothing to delete here.
if (oldest_not_deletable == deleted_queue_.end()) return;
// In case we didn't find anything that we can't delete we shouldn't
// increment this because that would mean we skip over the first record
// which is ready for destruction.
if (!delete_all) ++oldest_not_deletable;
int64_t destroyed = 0;
for (auto it = oldest_not_deletable; it != deleted_queue_.end(); ++it) {
TNode::destroy(it->second);
++destroyed;
}
oldest_not_deletable.delete_tail();
if (destroyed) DLOG(INFO) << "Number of destroyed elements: " << destroyed;
}
/**
* @brief - Collect object for garbage collection. Call to this method means
* that no new accessor can possibly access the object by iterating over some
* storage.
*/
void Collect(TNode *object) {
// We can afford some inaccuracy here - it's possible that some new accessor
// incremented the last_accessor_id after we enter this method and as such
// we might be a bit pessimistic here.
deleted_queue_.push(last_accessor_id_.load(), object);
}
private:
/// Constructs executor depending on flag - has to be done this way because of
/// C++14
auto ConstructExecutor() {
std::unique_ptr<utils::Executor> executor;
if (FLAGS_skiplist_gc_interval != -1) {
executor = std::make_unique<utils::Executor>(
std::chrono::seconds(FLAGS_skiplist_gc_interval));
}
return executor;
}
int64_t executor_job_id_{-1};
std::mutex mutex_;
// List of accessors from begin to end, ordered by increasing id.
std::list<AccessorStatus> accessors_;
std::atomic<int64_t> last_accessor_id_{0};
// List of pairs of accessor_ids and pointers to entries which should be
// destroyed, sorted approximately in descending order of id.
ConcurrentPushQueue<std::pair<int64_t, TNode *>> deleted_queue_;
};
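A lifecycle sketch, not taken from this diff; FakeNode is a made-up stand-in for the real skiplist node type, which only needs a static destroy function:

struct FakeNode {
  static void destroy(FakeNode *node) { delete node; }
};

void SkipListGCExample() {
  SkipListGC<FakeNode> gc;
  auto &status = gc.CreateNewAccessor();  // a skiplist accessor registers itself
  auto *unlinked = new FakeNode();        // pretend the accessor re-linked around this node
  gc.Collect(unlinked);                   // collectable once every accessor alive now is dead
  status.alive_ = false;                  // the accessor marks itself dead when done
  gc.GarbageCollect();                    // normally triggered by utils::Executor
}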

View File

@@ -1,23 +0,0 @@
#pragma once
#include <atomic>
namespace utils {
/**
* Ensures that the given atomic is greater than or equal to the given value. If it
* already satisfies that predicate, it's not modified.
*
* @param atomic - The atomic variable to ensure on.
* @param value - The minimal value the atomic must have after this call.
* @tparam TValue - Type of value.
*/
template <typename TValue>
void EnsureAtomicGe(std::atomic<TValue> &atomic, TValue value) {
while (true) {
auto current = atomic.load();
if (current >= value) break;
if (atomic.compare_exchange_weak(current, value)) break;
}
}
} // namespace utils
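An illustrative use of the helper above (the names are made up): maintaining a high-water mark from multiple threads without ever lowering it.

#include <atomic>
#include <cstdint>

std::atomic<int64_t> max_seen_id{0};

void Observe(int64_t id) {
  utils::EnsureAtomicGe(max_seen_id, id);  // afterwards max_seen_id >= id
}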

View File

@@ -1,15 +0,0 @@
#pragma once
// a helper class for implementing static casting to a derived class using the
// curiously recurring template pattern
namespace utils {
template <class Derived>
struct Crtp {
Derived &derived() { return *static_cast<Derived *>(this); }
const Derived &derived() const { return *static_cast<const Derived *>(this); }
};
} // namespace utils
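An illustrative example (the names are invented): a mixin that calls into the concrete class without virtual dispatch.

#include <iostream>
#include <string>

template <class TDerived>
struct NamePrinter : utils::Crtp<TDerived> {
  void PrintName() { std::cout << this->derived().name() << std::endl; }
};

struct Widget : NamePrinter<Widget> {
  std::string name() const { return "widget"; }
};

// Widget{}.PrintName() prints "widget"; the call is resolved at compile time
// through the static_cast in Crtp::derived().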

View File

@@ -1,99 +0,0 @@
#pragma once
#include <algorithm>
#include <mutex>
#include <vector>
#include "utils/scheduler.hpp"
namespace utils {
/**
* @brief - Provides execution of jobs in job queue on one thread with 'pause'
* time between two consecutive starts.
*/
class Executor {
public:
template <typename TRep, typename TPeriod>
explicit Executor(const std::chrono::duration<TRep, TPeriod> pause) {
DCHECK(pause > std::chrono::seconds(0))
<< "Duration between executions should be reasonable";
scheduler_.Run("Executor", pause, std::bind(&Executor::Execute, this));
}
~Executor() {
// Be sure to first stop scheduler because otherwise we might destroy the
// mutex before the scheduler and that might cause problems since mutex is
// used in Execute method passed to scheduler along with jobs vector.
scheduler_.Stop();
}
Executor(Executor &&e) = delete;
Executor &operator=(Executor &&) = delete;
Executor(const Executor &e) = delete;
Executor &operator=(const Executor &) = delete;
/**
* @brief - Add function to job queue.
*/
int64_t RegisterJob(const std::function<void()> &f) {
{
std::unique_lock<std::mutex> lock(update_mutex_);
id_job_pairs_.emplace_back(std::make_pair(++count_, f));
return id_job_pairs_.back().first;
}
}
/**
* @brief - Remove id from job queue.
*/
void UnRegisterJob(const int64_t id) {
{
// First wait for execute lock and then for the update lock because
// execute lock will be unavailable for longer and there is no point in
// blocking other threads with update lock.
std::unique_lock<std::mutex> execute_lock(execute_mutex_);
std::unique_lock<std::mutex> update_lock(update_mutex_);
for (auto id_job_pair_it = id_job_pairs_.begin();
id_job_pair_it != id_job_pairs_.end(); ++id_job_pair_it) {
if (id_job_pair_it->first == id) {
id_job_pairs_.erase(id_job_pair_it);
return;
}
}
}
}
private:
/**
* @brief - Execute method executes jobs from id_job_pairs vector.
* The reason for doing double locking is the following: we don't want to
* block creation of new jobs since that will slow down all of memgraph so we
* use a special lock for job update. Execute lock is here so that we can
* guarantee that after some job is unregistered it's also stopped.
*/
void Execute() {
std::unique_lock<std::mutex> execute_lock(execute_mutex_);
std::vector<std::pair<int64_t, std::function<void()>>> id_job_pairs;
// Acquire newest current version of jobs but being careful not to access
// the vector in corrupt state.
{
std::unique_lock<std::mutex> update_lock(update_mutex_);
id_job_pairs = id_job_pairs_;
}
for (auto id_job_pair : id_job_pairs) {
id_job_pair.second();
}
}
int64_t count_{0};
std::mutex execute_mutex_;
std::mutex update_mutex_;
Scheduler scheduler_;
std::vector<std::pair<int64_t, std::function<void()>>> id_job_pairs_;
};
} // namespace utils

View File

@@ -1,97 +0,0 @@
#pragma once
#include <utility>
#include "glog/logging.h"
#include <ext/aligned_buffer.h>
namespace utils {
/**
* @class Placeholder
*
* @brief
* Placeholder is used to allocate memory for an object on heap providing
* methods for setting and getting the object and making sure that the
* object is initialized.
*
* @tparam T type of object to be wrapped in the placeholder
*/
template <class T>
class Placeholder {
public:
Placeholder() = default;
Placeholder(Placeholder &) = delete;
Placeholder(Placeholder &&) = delete;
/**
* The destructor automatically calls the wrapped object's destructor.
*/
~Placeholder() {
if (initialized) get().~T();
};
/**
* @return true if the object is set in memory, false otherwise.
*/
bool is_initialized() { return initialized; }
T &get() noexcept {
DCHECK(initialized) << "Placeholder object not initialized";
return *data._M_ptr();
}
/**
* @return const reference to object.
*/
const T &get() const noexcept {
DCHECK(initialized) << "Placeholder object not initialized";
return *data._M_ptr();
}
/**
* Sets item in allocated memory and sets the initialized flag.
*
* @param T& item reference to the item initialized in allocated memory
*/
void set(const T &item) {
DCHECK(!initialized) << "Placeholder object already initialized";
new (data._M_addr()) T(item);
initialized = true;
}
/**
* Moves item to allocated memory and sets the initialized flag.
*
* @param T&& rvalue reference to the item which is moved to allocated memory
*/
void set(T &&item) {
DCHECK(!initialized) << "Placeholder object already initialized";
new (data._M_addr()) T(std::move(item));
initialized = true;
}
/**
* Emplaces the item into the allocated memory, calling its constructor with
* the specified arguments.
*
* @tparam Args type of arguments to be passed to the objects constructor.
* @param args - arguments passed to the object's constructor.
*/
template <class... Args>
void emplace(Args &&... args) {
DCHECK(!initialized) << "Placeholder object already initialized";
new (data._M_addr()) T(std::forward<Args>(args)...);
initialized = true;
}
private:
// libstd aligned buffer struct
__gnu_cxx::__aligned_buffer<T> data;
bool initialized = false;
};
} // namespace utils
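An illustrative sketch of what Placeholder was used for: reserving properly aligned storage for a member that is constructed later.

#include <string>

struct Record {
  utils::Placeholder<std::string> name;
};

void PlaceholderExample() {
  Record record;
  // record.name.get() here would trip the DCHECK: not initialized yet.
  record.name.set(std::string("memgraph"));
  if (record.name.is_initialized()) {
    record.name.get() += "!";  // mutable access to the constructed object
  }
}  // ~Placeholder runs std::string's destructor because it was initialized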

View File

@@ -1,61 +0,0 @@
#pragma once
#include "utils/likely.hpp"
#include "utils/random/xorshift128plus.hpp"
namespace utils::random {
template <class R = Xorshift128plus>
class FastBinomial {
// fast binomial draws coin tosses from a single generated random number
// let's draw a random 4 bit number and count trailing ones
//
// 1 0000 -> 1 =
// 2 0001 -> 2 == 8 x = p = 8/16 = 1/2
// 3 0010 -> 1 = 4 x == p = 4/16 = 1/4 p_total = 15/16
// 4 0011 -> 3 === 2 x === p = 2/16 = 1/8
// 5 0100 -> 1 = 1 x ==== p = 1/16 = 1/16
// 6 0101 -> 2 == --------------------------
// 7 0110 -> 1 = 1 x ===== p = 1/16 invalid value, retry!
// 8 0111 -> 4 ====
// 9 1000 -> 1 =
// 10 1001 -> 2 ==
// 11 1010 -> 1 =
// 12 1011 -> 3 ===
// 13 1100 -> 1 =
// 14 1101 -> 2 ==
// 15 1110 -> 1 =
// ------------------
// 16 1111 -> 5 =====
public:
/**
* Returns a random number X between 1 and n (the argument) with probability 2^-X.
*/
unsigned operator()(const int n) {
while (true) {
// counting trailing ones is equal to counting trailing zeros
// since the probability for both is 1/2 and we're going to
// count zeros because they are easier to work with
// generate a random number
auto x = random() & mask(n);
// if we have all zeros, then we have an invalid case and we
// need to generate again, we have this every (1/2)^N times
// so therefore we could say it's very unlikely to happen for
// large N. e.g. N = 32; p = 2.328 * 10^-10
if (UNLIKELY(!x)) continue;
// ctzl = count trailing zeros from long
// ^ ^ ^ ^
return __builtin_ctzl(x) + 1;
}
}
private:
uint64_t mask(const int n) { return (1ULL << n) - 1; }
R random;
};
} // namespace utils::random
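Illustrative only: drawing skiplist-style tower heights, where height X is chosen with probability roughly 2^-X.

static thread_local utils::random::FastBinomial<> binomial;

unsigned RandomHeight() {
  // Returns a value in [1, 5]; P(height == k) is approximately 2^-k.
  return binomial(5);
}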

View File

@@ -1,60 +0,0 @@
#pragma once
#include <cstdlib>
#include <random>
namespace utils::random {
/* Xorshift algorithm (plus variant)
*
* This is the fastest generator passing BigCrush without systematic failures,
* but due to the relatively short period it is acceptable only for
* applications with a mild amount of parallelism; otherwise, use a
* xorshift1024* generator.
*/
struct Xorshift128plus {
public:
Xorshift128plus() {
// use a slow, more complex rnd generator to initialize a fast one
// make sure to call this before requesting any random numbers!
// NOTE: Valgrind complains about the next instruction
std::random_device rd;
std::mt19937_64 gen(rd());
// std::mt19937_64 gen(time(0));
std::uniform_int_distribution<unsigned long long> dist;
// the number generated by MT can be full of zeros and xorshift
// doesn't like this so we use MurmurHash3 64bit finalizer to
// make it less biased
s[0] = avalance(dist(gen));
s[1] = avalance(dist(gen));
}
uint64_t operator()() {
uint64_t s1 = s[0];
const uint64_t s0 = s[1];
s[0] = s0;
s1 ^= s1 << 23;
return (s[1] = (s1 ^ s0 ^ (s1 >> 17) ^ (s0 >> 26))) + s0;
}
private:
uint64_t s[2];
uint64_t avalance(uint64_t s) {
// MurmurHash3 finalizer
s ^= s >> 33;
s *= 0xff51afd7ed558ccd;
s ^= s >> 33;
s *= 0xc4ceb9fe1a85ec53;
s ^= s >> 33;
return s;
}
};
} // namespace utils::random
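A small usage sketch; the removed manual test xorshift.cpp further down builds a histogram with this same generator.

#include <cstdint>

static thread_local utils::random::Xorshift128plus rng;

uint64_t RandomIndex(uint64_t size) {
  // Modulo bias is negligible while size is much smaller than 2^64.
  return rng() % size;
}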

View File

@@ -1,47 +0,0 @@
/// @file
#pragma once
#include <mutex>
#include "utils/exceptions.hpp"
#include "utils/spin_lock.hpp"
namespace utils {
/// Improves spinlock behavior under contention by hinting to the processor
/// that we're in a spinlock and not doing much.
inline void CpuRelax() {
// if IBMPower
// HMT_very_low()
// http://stackoverflow.com/questions/5425506/equivalent-of-x86-pause-instruction-for-ppc
asm("PAUSE");
}
class LockTimeoutException : public BasicException {
public:
using BasicException::BasicException;
};
/// Lockable is used as a custom implementation of a mutex mechanism.
///
/// It is implemented as a wrapper around std::lock_guard and std::unique_lock
/// with a default lock type of SpinLock.
///
/// @tparam lock_t type of lock to be used (default = Spinlock)
template <class lock_t = SpinLock>
class Lockable {
public:
using lock_type = lock_t;
std::lock_guard<lock_t> acquire_guard() const {
return std::lock_guard<lock_t>(lock);
}
std::unique_lock<lock_t> acquire_unique() const {
return std::unique_lock<lock_t>(lock);
}
mutable lock_t lock;
};
} // namespace utils
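An illustrative sketch (assuming C++17, so the returned guards bind directly): a class protecting its state with the inherited default SpinLock, plus CpuRelax in a busy-wait loop.

#include <atomic>
#include <cstdint>

class Counter : public utils::Lockable<> {
 public:
  void Increment() {
    auto guard = acquire_guard();  // std::lock_guard over the default SpinLock
    ++value_;
  }

 private:
  int64_t value_{0};
};

void WaitUntilSet(const std::atomic<bool> &flag) {
  while (!flag.load()) utils::CpuRelax();  // spin politely
}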

View File

@@ -1,37 +0,0 @@
#pragma once
namespace utils {
/**
* Implements all the logical comparison operators based on '=='
* and '<' operators.
*
* @tparam TLhs First operand type.
* @tparam TRhs Second operand type. Defaults to the same type
* as first operand.
* @tparam TReturn Return type, defaults to bool.
*/
template <typename TLhs, typename TRhs = TLhs, typename TReturn = bool>
struct TotalOrdering {
protected:
~TotalOrdering() {}
public:
friend constexpr TReturn operator!=(const TLhs &a, const TRhs &b) {
return !(a == b);
}
friend constexpr TReturn operator<=(const TLhs &a, const TRhs &b) {
return a < b || a == b;
}
friend constexpr TReturn operator>(const TLhs &a, const TRhs &b) {
return !(a <= b);
}
friend constexpr TReturn operator>=(const TLhs &a, const TRhs &b) {
return !(a < b);
}
};
} // namespace utils
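An illustrative example mirroring how ConcurrentMap::Item uses this helper above: define '<' and '==' once and inherit the remaining comparisons.

#include <cstdint>

struct Version : utils::TotalOrdering<Version> {
  int64_t id{0};

  friend constexpr bool operator<(const Version &a, const Version &b) {
    return a.id < b.id;
  }
  friend constexpr bool operator==(const Version &a, const Version &b) {
    return a.id == b.id;
  }
};

// With just the two operators above, !=, <=, > and >= are all provided by
// utils::TotalOrdering<Version>.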

View File

@@ -26,9 +26,6 @@ target_link_libraries(${test_prefix}network_server mg-communication)
add_concurrent_test(network_session_leak.cpp)
target_link_libraries(${test_prefix}network_session_leak mg-communication)
add_concurrent_test(push_queue.cpp)
target_link_libraries(${test_prefix}push_queue glog gflags Threads::Threads)
add_concurrent_test(stack.cpp)
target_link_libraries(${test_prefix}stack mg-utils)

View File

@@ -1,191 +0,0 @@
#include <thread>
#include <vector>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "data_structures/concurrent/push_queue.hpp"
class IntQueue : public ::testing::Test {
protected:
ConcurrentPushQueue<int> cpq;
void AddElems(int count) {
for (int i = 0; i < count; i++) cpq.push(i);
}
int CountIterations() {
int rval = 0;
for ([[gnu::unused]] int x : cpq) rval++;
return rval;
}
};
TEST_F(IntQueue, EmptyQueueAndPush) {
ConcurrentPushQueue<int> cpq;
EXPECT_EQ(cpq.size(), 0);
cpq.push(1);
EXPECT_EQ(cpq.size(), 1);
cpq.push(1);
EXPECT_EQ(cpq.size(), 2);
}
TEST_F(IntQueue, Size) {
AddElems(10);
EXPECT_EQ(cpq.size(), 10);
}
TEST_F(IntQueue, ConcurrentPush) {
// perform 1000 insertions in 50 concurrent threads
std::vector<std::thread> threads;
for (int i = 0; i < 50; i++)
threads.emplace_back([&]() {
for (int i = 0; i < 1000; i++) cpq.push(i);
});
for (auto &thread : threads) thread.join();
EXPECT_EQ(cpq.size(), 50000);
EXPECT_EQ(CountIterations(), 50000);
}
TEST_F(IntQueue, IteratorBegin) {
AddElems(5);
auto it = cpq.begin();
EXPECT_EQ(*it, 4);
}
TEST_F(IntQueue, IteratorPrefixIncrement) {
AddElems(3);
auto it = cpq.begin();
EXPECT_EQ(*(++it), 1);
EXPECT_EQ(*it, 1);
}
TEST_F(IntQueue, IteratorPostfixIncrement) {
AddElems(3);
auto it = cpq.begin();
EXPECT_EQ(*it++, 2);
EXPECT_EQ(*it, 1);
}
TEST_F(IntQueue, IteratorEquality) {
AddElems(5);
auto it = cpq.begin();
EXPECT_EQ(it, cpq.begin());
it++;
EXPECT_NE(it, cpq.begin());
}
TEST_F(IntQueue, IteratorEnd) {
AddElems(5);
auto it = cpq.begin();
EXPECT_NE(it, cpq.end());
for (int i = 0; i < 5; i++) it++;
EXPECT_EQ(it, cpq.end());
}
TEST_F(IntQueue, IteratorCopy) {
AddElems(5);
auto it = cpq.begin();
auto it_copy = it;
EXPECT_EQ(it, it_copy);
it++;
EXPECT_NE(it, it_copy);
EXPECT_EQ(*it, 3);
EXPECT_EQ(*it_copy, 4);
}
TEST_F(IntQueue, IteratorMove) {
AddElems(5);
auto it = cpq.begin();
it++;
EXPECT_EQ(*it, 3);
decltype(it) it_moved = std::move(it);
EXPECT_EQ(*it_moved++, 3);
EXPECT_EQ(*it_moved, 2);
}
TEST_F(IntQueue, IteratorDeleteTail) {
AddElems(13);
ASSERT_EQ(cpq.size(), 13);
ASSERT_EQ(CountIterations(), 13);
auto it = cpq.begin();
for (int i = 0; i < 5; i++) it++;
EXPECT_EQ(it.delete_tail(), 8);
EXPECT_EQ(it, cpq.end());
ASSERT_EQ(cpq.size(), 5);
ASSERT_EQ(CountIterations(), 5);
auto it2 = cpq.begin();
EXPECT_EQ(it2.delete_tail(), 5);
EXPECT_EQ(it2, cpq.end());
EXPECT_EQ(cpq.size(), 0);
EXPECT_EQ(CountIterations(), 0);
EXPECT_EQ(cpq.begin(), cpq.end());
}
TEST_F(IntQueue, IteratorDeleteTailConcurrent) {
// we will be deleting the whole queue (tail-delete on head)
// while other threads are pushing to the queue.
// we'll also ensure that head gets updated in the queue,
// and delete_tail from an iterator that believes it's the head
size_t kElems = 500;
size_t kThreads = 100;
size_t kMillis = 2;
std::vector<std::thread> threads;
for (size_t i = 0; i < kThreads; i++)
threads.emplace_back([&]() {
for (size_t i = 0; i < kElems; i++) {
cpq.push(i);
std::this_thread::sleep_for(std::chrono::milliseconds(kMillis));
}
});
size_t deletions = 0;
while (deletions < kThreads * kElems) {
auto head_it = cpq.begin();
// sleep till some thread inserts
std::this_thread::sleep_for(std::chrono::milliseconds(5 * kMillis));
deletions += head_it.delete_tail();
}
// those threads should be done, but join anyway to avoid aborts
// due to thread object destruction
for (auto &thread : threads) thread.join();
EXPECT_EQ(cpq.size(), 0);
EXPECT_EQ(CountIterations(), 0);
}
TEST(ConcurrentPushQueue, RvalueLvalueElements) {
ConcurrentPushQueue<std::string> cpq;
cpq.push(std::string("rvalue"));
std::string lvalue("lvalue");
cpq.push(lvalue);
std::vector<std::string> expected;
for (auto &elem : cpq) expected.emplace_back(elem);
EXPECT_THAT(expected, ::testing::ElementsAre("lvalue", "rvalue"));
}
TEST(ConcurrentPushQueue, Emplace) {
// test with atomic because it's not copy/move constructable
ConcurrentPushQueue<std::atomic<int>> cpq;
cpq.push(3);
EXPECT_EQ(cpq.size(), 1);
EXPECT_EQ(cpq.begin()->load(), 3);
cpq.begin().delete_tail();
EXPECT_EQ(cpq.size(), 0);
}
TEST_F(IntQueue, ConstQueue) {
AddElems(5);
auto const_queue_accepting = [](const ConcurrentPushQueue<int> &const_queue) {
EXPECT_EQ(const_queue.size(), 5);
int count = 0;
for ([[gnu::unused]] int x : const_queue) count++;
EXPECT_EQ(count, 5);
};
const_queue_accepting(cpq);
}

View File

@@ -2,6 +2,7 @@
#include <chrono>
#include <map>
#include <mutex>
#include <optional>
#include <random>
#include <string>
@@ -10,7 +11,7 @@
#include "communication/bolt/v1/value.hpp"
#include "utils/algorithm.hpp"
#include "utils/exceptions.hpp"
#include "utils/thread/sync.hpp"
#include "utils/spin_lock.hpp"
#include "utils/timer.hpp"
using communication::ClientContext;
@@ -77,9 +78,8 @@ std::pair<communication::bolt::QueryData, int> ExecuteNTimesTillSuccess(
}
utils::Timer t;
std::chrono::microseconds to_sleep(rand_dist_(pseudo_rand_gen_));
while (t.Elapsed() < to_sleep) {
utils::CpuRelax();
}
while (t.Elapsed() < to_sleep)
;
}
}
}

View File

@@ -1,4 +1,5 @@
#include <fstream>
#include <mutex>
#include <thread>
#include <vector>
@@ -6,8 +7,8 @@
#include <glog/logging.h>
#include "utils/algorithm.hpp"
#include "utils/spin_lock.hpp"
#include "utils/string.hpp"
#include "utils/thread/sync.hpp"
#include "utils/timer.hpp"
#include "common.hpp"

View File

@@ -24,9 +24,6 @@ target_link_libraries(${test_prefix}antlr_sigsegv gtest gtest_main
add_manual_test(antlr_tree_pretty_print.cpp)
target_link_libraries(${test_prefix}antlr_tree_pretty_print antlr_opencypher_parser_lib)
add_manual_test(binomial.cpp)
target_link_libraries(${test_prefix}binomial mg-utils)
add_manual_test(bolt_client.cpp)
target_link_libraries(${test_prefix}bolt_client mg-communication)
@@ -63,7 +60,3 @@ target_link_libraries(${test_prefix}ssl_client mg-communication)
add_manual_test(ssl_server.cpp)
target_link_libraries(${test_prefix}ssl_server mg-communication)
add_manual_test(xorshift.cpp)
target_link_libraries(${test_prefix}xorshift mg-utils)

View File

@@ -1,61 +0,0 @@
/* Plots the distribution histogram of the fast_binomial algorithm
* (spoiler alert: it's pleasingly (1/2)^N all the way :D)
*/
#include <array>
#include <atomic>
#include <iomanip>
#include <iostream>
#include <numeric>
#include <thread>
#include <sys/ioctl.h>
#include <unistd.h>
#include "utils/random/fast_binomial.hpp"
static constexpr unsigned B = 24;
static thread_local utils::random::FastBinomial<> rnd;
static constexpr unsigned M = 4;
static constexpr size_t N = 1ULL << 34;
static constexpr size_t per_thread_iters = N / M;
std::array<std::atomic<uint64_t>, B> buckets;
void generate() {
for (size_t i = 0; i < per_thread_iters; ++i)
buckets[rnd(B) - 1].fetch_add(1);
}
int main(void) {
struct winsize w;
ioctl(STDOUT_FILENO, TIOCGWINSZ, &w);
auto bar_len = w.ws_col - 20;
std::array<std::thread, M> threads;
for (auto &bucket : buckets) bucket.store(0);
for (auto &t : threads) t = std::thread([]() { generate(); });
for (auto &t : threads) t.join();
auto max = std::accumulate(
buckets.begin(), buckets.end(), (uint64_t)0,
[](auto &acc, auto &x) { return std::max(acc, x.load()); });
std::cout << std::fixed;
for (size_t i = 0; i < buckets.size(); ++i) {
auto x = buckets[i].load();
auto rel = bar_len * x / max;
std::cout << std::setw(2) << i + 1 << " ";
for (size_t i = 0; i < rel; ++i) std::cout << "=";
std::cout << " " << 100 * (double)x / N << "%" << std::endl;
}
return 0;
}

View File

@@ -1,61 +0,0 @@
/* Plots the distribution histogram of the xorshift algorithm
* (spoiler alert: it's pleasingly uniform all the way :D)
*/
#include <array>
#include <atomic>
#include <iostream>
#include <numeric>
#include <thread>
#include "glog/logging.h"
#include <sys/ioctl.h>
#include <unistd.h>
#include "utils/random/xorshift128plus.hpp"
static thread_local utils::random::Xorshift128plus rnd;
static constexpr unsigned B = 1 << 10;
static constexpr uint64_t K = (uint64_t)(-1) / B;
static constexpr unsigned M = 4;
static constexpr size_t N = 1ULL << 34;
static constexpr size_t per_thread_iters = N / M;
std::array<std::atomic<unsigned>, B> buckets;
void generate() {
for (size_t i = 0; i < per_thread_iters; ++i) buckets[rnd() / K].fetch_add(1);
}
int main(void) {
struct winsize w;
ioctl(STDOUT_FILENO, TIOCGWINSZ, &w);
auto bar_len = w.ws_col - 20;
std::array<std::thread, M> threads;
for (auto &bucket : buckets) bucket.store(0);
for (auto &t : threads) t = std::thread([]() { generate(); });
for (auto &t : threads) t.join();
auto max = std::accumulate(
buckets.begin(), buckets.end(), 0u,
[](auto &acc, auto &x) { return std::max(acc, x.load()); });
DCHECK(max != 0u) << "max is 0.";
std::cout << std::fixed;
for (auto &bucket : buckets) {
auto x = bucket.load();
auto rel = bar_len * x / max;
for (size_t i = 0; i < rel; ++i) std::cout << "=";
std::cout << " " << 100.0 * x / N * B - 100 << "%" << std::endl;
}
return 0;
}

View File

@@ -158,9 +158,6 @@ target_link_libraries(${test_prefix}utils_algorithm mg-utils)
add_unit_test(utils_exceptions.cpp)
target_link_libraries(${test_prefix}utils_exceptions mg-utils)
add_unit_test(utils_executor.cpp)
target_link_libraries(${test_prefix}utils_executor mg-utils)
add_unit_test(utils_file.cpp)
target_link_libraries(${test_prefix}utils_file mg-utils)

View File

@@ -1,10 +1,11 @@
#include <mutex>
#include <thread>
#include <unordered_set>
#include "gtest/gtest.h"
#include "data_structures/ring_buffer.hpp"
#include "utils/thread/sync.hpp"
#include "utils/spin_lock.hpp"
TEST(RingBuffer, MultithreadedUsage) {
auto test_f = [](int producer_count, int elems_per_producer,

View File

@@ -1,45 +0,0 @@
#include <atomic>
#include <memory>
#include "gtest/gtest.h"
#include "utils/executor.hpp"
TEST(Executor, Run) {
std::atomic<int> count{0};
{
utils::Executor exec(std::chrono::milliseconds(500));
// Be sure executor is sleeping.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
exec.RegisterJob([&count]() { ++count; });
exec.RegisterJob([&count]() { ++count; });
exec.RegisterJob([&count]() { ++count; });
// Be sure executor execute thread is triggered
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
EXPECT_EQ(count, 3);
}
TEST(Executor, RunUnregister) {
std::atomic<int> count1{0};
std::atomic<int> count2{0};
{
utils::Executor exec(std::chrono::milliseconds(500));
// Be sure executor is sleeping.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
auto job = exec.RegisterJob([&count1]() { ++count1; });
exec.RegisterJob([&count2]() { ++count2; });
// Be sure executor execute thread is triggered
std::this_thread::sleep_for(std::chrono::milliseconds(500));
exec.UnRegisterJob(job);
// Be sure executor execute thread is triggered
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
EXPECT_EQ(count1, 1);
EXPECT_EQ(count2, 2);
}