Concurrent::PushQueue added

Summary:
PushQueue with concurrent lock-free pushing, and single-threaded deletion. Iteration without modification can also be concurrent. Deletion should NOT be concurrent with iteration and other deletions, but can be concurrent with pushing.

There is no const iteraton at the moment, we can add it when necessary. Also I've not handled std::iterator_traits, might be fun getting into that :D

Reviewers: buda, dgleich, mislav.bradac

Reviewed By: dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D420
This commit is contained in:
florijan 2017-06-16 15:30:25 +02:00
parent 366c2d4520
commit ad148692c3
5 changed files with 392 additions and 21 deletions

View File

@ -0,0 +1,180 @@
#include <atomic>
#include <mutex>
#include "utils/assert.hpp"
/** @brief A queue with lock-free concurrent push and
* single-threaded deletion.
*
* Deletions are done via the iterator. Currently only
* tail-deletion is supported, but individual element
* deletion should be possible to implement.
* Iteration can also be concurrent, as long as nobody
* is deleting.
*
* @tparam TElement Type of element stored in the
* queue.
*/
template <typename TElement>
class ConcurrentPushQueue {
private:
// The queue is implemented as as singly-linked list
// and this is one list element
class Node {
public:
// constructor that accepts only arguments for
// creating the element. it can accept both rvalue and lvalue
// refs to an element, or arguments for emplace creation of
// an element
template <typename... TArgs>
Node(TArgs &&... args) : element_(std::forward<TArgs>(args)...) {}
// element itself and pointer to next Node
TElement element_;
Node *next_{nullptr};
};
/** @brief Iterator over the queue.
*
* Exposes standard forward-iterator ops, plus the
* delete_tail() function.
*
* @tparam TQueue - either const or non-const ConcurrentPushQueue
*/
template <typename TQueue>
struct Iterator {
public:
Iterator(TQueue &queue, Node *current) : queue_(queue), current_(current) {}
// copying, moving and destroying is all fine
bool operator==(const Iterator &rhs) const {
return rhs.current_ == this->current_;
}
bool operator!=(const Iterator &rhs) const { return !(*this == rhs); }
Iterator &operator++() {
debug_assert(current_ != nullptr, "Prefix increment on invalid iterator");
previous_ = current_;
current_ = current_->next_;
return *this;
}
Iterator operator++(int) {
debug_assert(current_ != nullptr,
"Postfix increment on invalid iterator");
Iterator rval(queue_, current_);
previous_ = current_;
current_ = current_->next_;
return rval;
}
TElement &operator*() {
debug_assert(current_ != nullptr,
"Dereferencing operator on invalid iterator");
return current_->element_;
}
TElement *operator->() {
debug_assert(current_ != nullptr, "Arrow operator on invalid iterator");
return &current_->element_;
}
/** @brief Deletes the current element of the iterator,
* and all subsequent elements.
*
* After this op this iterator is not valid and is equal
* to push_queue.end(). Invalidates all the iterators
* that were equal to this or further away from the head.
*
* @return The number of deleted elements.
*/
size_t delete_tail() {
// compare and swap queue head
// if it succeeds, it means that the current node of this
// iterator was head, and we have just replaced head with nullptr
// if it fails, it means the current node was not head, and
// we got the new_head
auto new_head = current_;
bool was_head =
queue_.get().head_.compare_exchange_strong(new_head, nullptr);
// if we have no previous_ (iterator wasn't incremented and thinks it's
// head), but in reality we weren't head, then we need to find the actual
// previous_ node, so we can cut the tail off
if (!previous_ && !was_head) {
previous_ = new_head;
while (previous_->next_ != current_) previous_ = previous_->next_;
}
// cut the tail off
if (previous_) previous_->next_ = nullptr;
// delete all from current to the end, track deletion count
size_t deleted = 0;
previous_ = current_;
while (current_) {
previous_ = current_;
current_ = current_->next_;
delete previous_;
deleted++;
}
// update the size of the queue and return
queue_.get().size_ -= deleted;
return deleted;
}
private:
// the queue we're iterating over
// use a reference wrapper because it facilitates move and copy
std::reference_wrapper<TQueue> queue_;
// the current node of this iterator
Node *current_{nullptr};
// the previous node of this iterator
// used for deleting the current node
Node *previous_{nullptr};
};
public:
ConcurrentPushQueue() = default;
~ConcurrentPushQueue() { begin().delete_tail(); }
// copy and move ops are disabled due to atomic member variable
/// Pushes an element to the queue.
// TODO: review - this is at the same time push and emplace,
// how should it be called?
template <typename... TArgs>
void push(TArgs &&... args) {
auto node = new Node(std::forward<TArgs>(args)...);
while (!head_.compare_exchange_strong(node->next_, node))
;
size_++;
};
// non-const iterators
auto begin() { return Iterator<ConcurrentPushQueue>(*this, head_.load()); }
auto end() { return Iterator<ConcurrentPushQueue>(*this, nullptr); }
// const iterators
auto cbegin() const {
return Iterator<const ConcurrentPushQueue>(*this, head_.load());
}
auto cend() const {
return Iterator<const ConcurrentPushQueue>(*this, nullptr);
}
auto begin() const {
return Iterator<const ConcurrentPushQueue>(*this, head_.load());
}
auto end() const {
return Iterator<const ConcurrentPushQueue>(*this, nullptr);
}
auto size() const { return size_.load(); }
private:
// head of the queue (last-added element)
std::atomic<Node *> head_{nullptr};
// number of elements in the queue
std::atomic<size_t> size_{0};
};

View File

@ -5,6 +5,7 @@
#include <memory>
#include "utils/assert.hpp"
#include "utils/crtp.hpp"
#include "utils/placeholder.hpp"
#include "utils/random/fast_binomial.hpp"

View File

@ -8,7 +8,7 @@
#include "gflags/gflags.h"
#include "data_structures/concurrent/concurrent_list.hpp"
#include "data_structures/concurrent/push_queue.hpp"
#include "logging/loggable.hpp"
#include "threading/sync/spinlock.hpp"
@ -39,10 +39,10 @@ class SkipListGC : public Loggable {
// We have to unregister the job because otherwise Executioner might access
// some member variables of this class after it has been destructed.
GetExecutioner().UnRegisterJob(executor_job_id_);
for (auto it = deleted_list_.begin(); it != deleted_list_.end(); ++it) {
for (auto it = deleted_queue_.begin(); it != deleted_queue_.end(); ++it) {
TNode::destroy(it->second);
it.remove();
}
deleted_queue_.begin().delete_tail();
}
/**
@ -111,36 +111,28 @@ class SkipListGC : public Loggable {
// thread ever accessing it here, i.e. there is at most one thread doing
// this GarbageCollection.
// find the oldest not deletable record
// since we can't copy a concurrent list iterator, we must use two
// separate ones, while ensuring they point to the same record
// in the beginning of the search-loop
auto it = deleted_list_.begin();
auto oldest_not_deletable = deleted_list_.begin();
while (oldest_not_deletable != it) oldest_not_deletable++;
// we need a bool to track if oldest_not_deletable should get
// deleted too (we have not skipped any record due to safe_id condition)
auto oldest_not_deletable = deleted_queue_.begin();
bool delete_all = true;
for (; it != deleted_list_.end(); ++it) {
for (auto it = oldest_not_deletable; it != deleted_queue_.end(); ++it) {
if (it->first > safe_id) {
while (oldest_not_deletable != it) ++oldest_not_deletable;
oldest_not_deletable = it;
delete_all = false;
}
}
// deleted_list is already empty, nothing to delete here.
if (oldest_not_deletable == deleted_list_.end()) return;
if (oldest_not_deletable == deleted_queue_.end()) return;
// In case we didn't find anything that we can't delete we shouldn't
// increment this because that would mean we skip over the first record
// which is ready for destruction.
if (!delete_all) ++oldest_not_deletable;
int64_t destroyed = 0;
for (auto &it = oldest_not_deletable; it != deleted_list_.end(); ++it) {
for (auto it = oldest_not_deletable; it != deleted_queue_.end(); ++it) {
TNode::destroy(it->second);
it.remove();
++destroyed;
}
oldest_not_deletable.delete_tail();
if (destroyed) logger.trace("Number of destroyed elements: {}", destroyed);
}
@ -153,8 +145,7 @@ class SkipListGC : public Loggable {
// We can afford some inaccuary here - it's possible that some new accessor
// incremented the last_accessor_id after we enter this method and as such
// we might be a bit pessimistic here.
deleted_list_.begin().push(
std::make_pair(last_accessor_id_.load(), object));
deleted_queue_.push(last_accessor_id_.load(), object);
}
private:
@ -168,5 +159,5 @@ class SkipListGC : public Loggable {
// List of pairs of accessor_ids and pointers to entries which should be
// destroyed sorted approximately descendingly by id.
ConcurrentList<std::pair<int64_t, TNode *>> deleted_list_;
ConcurrentPushQueue<std::pair<int64_t, TNode *>> deleted_queue_;
};

View File

@ -28,7 +28,7 @@ foreach(test_cpp ${test_type_cpps})
# link libraries
target_link_libraries(${target_name} memgraph_lib)
# gtest
target_link_libraries(${target_name} gtest gtest_main)
target_link_libraries(${target_name} gtest gtest_main gmock)
# register test
add_test(${target_name} ${exec_name})

View File

@ -0,0 +1,199 @@
#include <thread>
#include <vector>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "data_structures/concurrent/push_queue.hpp"
class IntQueue : public ::testing::Test {
protected:
ConcurrentPushQueue<int> cpq;
void AddElems(int count) {
for (int i = 0; i < count; i++) cpq.push(i);
}
int CountIterations() {
int rval = 0;
for ([[gnu::unused]] int x : cpq) rval++;
return rval;
}
};
TEST_F(IntQueue, EmptyQueueAndPush) {
ConcurrentPushQueue<int> cpq;
EXPECT_EQ(cpq.size(), 0);
cpq.push(1);
EXPECT_EQ(cpq.size(), 1);
cpq.push(1);
EXPECT_EQ(cpq.size(), 2);
}
TEST_F(IntQueue, Size) {
AddElems(10);
EXPECT_EQ(cpq.size(), 10);
}
TEST_F(IntQueue, ConcurrentPush) {
// perform 1000 insertions in 50 concurrent threads
std::vector<std::thread> threads;
for (int i = 0; i < 50; i++)
threads.emplace_back([&]() {
for (int i = 0; i < 1000; i++) cpq.push(i);
});
for (auto &thread : threads) thread.join();
EXPECT_EQ(cpq.size(), 50000);
EXPECT_EQ(CountIterations(), 50000);
}
TEST_F(IntQueue, IteratorBegin) {
AddElems(5);
auto it = cpq.begin();
EXPECT_EQ(*it, 4);
}
TEST_F(IntQueue, IteratorPrefixIncrement) {
AddElems(3);
auto it = cpq.begin();
EXPECT_EQ(*(++it), 1);
EXPECT_EQ(*it, 1);
++it;
++it;
EXPECT_DEATH(++it, "Prefix");
}
TEST_F(IntQueue, IteratorPostfixIncrement) {
AddElems(3);
auto it = cpq.begin();
EXPECT_EQ(*it++, 2);
EXPECT_EQ(*it, 1);
it++;
it++;
EXPECT_DEATH(it++, "Postfix");
}
TEST_F(IntQueue, IteratorEquality) {
AddElems(5);
auto it = cpq.begin();
EXPECT_EQ(it, cpq.begin());
it++;
EXPECT_NE(it, cpq.begin());
}
TEST_F(IntQueue, IteratorEnd) {
AddElems(5);
auto it = cpq.begin();
EXPECT_NE(it, cpq.end());
for (int i = 0; i < 5; i++) it++;
EXPECT_EQ(it, cpq.end());
EXPECT_DEATH(*it, "Dereferencing");
}
TEST_F(IntQueue, IteratorCopy) {
AddElems(5);
auto it = cpq.begin();
auto it_copy = it;
EXPECT_EQ(it, it_copy);
it++;
EXPECT_NE(it, it_copy);
EXPECT_EQ(*it, 3);
EXPECT_EQ(*it_copy, 4);
}
TEST_F(IntQueue, IteratorMove) {
AddElems(5);
auto it = cpq.begin();
it++;
EXPECT_EQ(*it, 3);
decltype(it) it_moved = std::move(it);
EXPECT_EQ(*it_moved++, 3);
EXPECT_EQ(*it_moved, 2);
}
TEST_F(IntQueue, IteratorDeleteTail) {
AddElems(13);
ASSERT_EQ(cpq.size(), 13);
ASSERT_EQ(CountIterations(), 13);
auto it = cpq.begin();
for (int i = 0; i < 5; i++) it++;
EXPECT_EQ(it.delete_tail(), 8);
EXPECT_EQ(it, cpq.end());
ASSERT_EQ(cpq.size(), 5);
ASSERT_EQ(CountIterations(), 5);
auto it2 = cpq.begin();
EXPECT_EQ(it2.delete_tail(), 5);
EXPECT_EQ(it2, cpq.end());
EXPECT_EQ(cpq.size(), 0);
EXPECT_EQ(CountIterations(), 0);
EXPECT_EQ(cpq.begin(), cpq.end());
}
TEST_F(IntQueue, IteratorDeleteTailConcurrent) {
// we will be deleting the whole queue (tail-delete on head)
// while other threads are pushing to the queue.
// we'll also ensure that head gets updated in the queue,
// and delete_tail from an iterator that believes it's the head
size_t kElems = 500;
size_t kThreads = 100;
size_t kMillis = 2;
std::vector<std::thread> threads;
for (size_t i = 0; i < kThreads; i++)
threads.emplace_back([&]() {
for (size_t i = 0; i < kElems; i++) {
cpq.push(i);
std::this_thread::sleep_for(std::chrono::milliseconds(kMillis));
}
});
size_t deletions = 0;
while (deletions < kThreads * kElems) {
auto head_it = cpq.begin();
// sleep till some thread inserts
std::this_thread::sleep_for(std::chrono::milliseconds(5 * kMillis));
deletions += head_it.delete_tail();
}
// those threads should be done, but join anyway to avoid aborts
// due to thread object destruction
for (auto &thread : threads) thread.join();
EXPECT_EQ(cpq.size(), 0);
EXPECT_EQ(CountIterations(), 0);
}
TEST(ConcurrentPushQueue, RvalueLvalueElements) {
ConcurrentPushQueue<std::string> cpq;
cpq.push(std::string("rvalue"));
std::string lvalue("lvalue");
cpq.push(lvalue);
std::vector<std::string> expected;
for (auto &elem : cpq)
expected.emplace_back(elem);
EXPECT_THAT(expected, ::testing::ElementsAre("lvalue", "rvalue"));
}
TEST(ConcurrentPushQueue, Emplace) {
// test with atomic because it's not copy/move constructable
ConcurrentPushQueue<std::atomic<int>> cpq;
cpq.push(3);
EXPECT_EQ(cpq.size(), 1);
EXPECT_EQ(cpq.begin()->load(), 3);
cpq.begin().delete_tail();
EXPECT_EQ(cpq.size(), 0);
}
TEST_F(IntQueue, ConstQueue) {
AddElems(5);
auto const_queue_accepting = [](const ConcurrentPushQueue<int> &const_queue) {
EXPECT_EQ(const_queue.size(), 5);
int count = 0;
for ([[gnu::unused]] int x : const_queue) count++;
EXPECT_EQ(count, 5);
};
const_queue_accepting(cpq);
}