fixed mpsc queue

This commit is contained in:
Dominik Tomičević 2015-10-06 21:02:47 +02:00
parent cebcf7cc09
commit b551ab7f27
2 changed files with 114 additions and 66 deletions

View File

@ -1,5 +1,5 @@
#ifndef MEMGRAPH_DATA_STRUCTURES_QUEUE_MPSC_QUEUE
#define MEMGRAPH_DATA_STRUCTURES_QUEUE_MPSC_QUEUE
#ifndef MEMGRAPH_DATA_STRUCTURES_QUEUE_MPSC_QUEUE_HPP
#define MEMGRAPH_DATA_STRUCTURES_QUEUE_MPSC_QUEUE_HPP
#include <atomic>
#include <memory>
@ -36,6 +36,9 @@ class MpscQueue
{
struct Node
{
Node(Node* next, std::unique_ptr<T>&& item)
: next(next), item(std::forward<std::unique_ptr<T>>(item)) {}
std::atomic<Node*> next;
std::unique_ptr<T> item;
};
@ -43,9 +46,18 @@ class MpscQueue
public:
MpscQueue()
{
std::atomic_init(&stub.next, nullptr);
std::atomic_init(&head, &stub);
tail = &stub;
auto stub = new Node(nullptr, nullptr);
head.store(stub);
tail = stub;
}
~MpscQueue()
{
// purge all elements from the queue
while(pop()) {}
// we are left with a stub, delete that
delete tail;
}
MpscQueue(MpscQueue&) = delete;
@ -58,9 +70,9 @@ public:
* @param item std::unique_ptr<T> An item to push into the queue
* @return void
*/
void push(std::unique_ptr<T> item)
void push(std::unique_ptr<T>&& item)
{
push(new Node { nullptr, std::move(item) });
push(new Node(nullptr, std::forward<std::unique_ptr<T>>(item)));
}
/** @brief Pops a node from the queue.
@ -72,68 +84,26 @@ public:
*/
std::unique_ptr<T> pop()
{
auto last = tail;
auto tail = this->tail;
// serialization point wrt producers
auto next = tail->next.load(std::memory_order_acquire);
// there was a time when there weren't any items in the queue so a
// stub was pushed
// ... [STUB] <-- T
if (last == &stub)
if(next)
{
// check if this still is the only item in the queue
// H --> [STUB] <-- T
if(next == nullptr)
return nullptr;
// if it's not, the queue looks something like this
// H --> [n] <- ... <- [1] <- [STUB] <- T
// remove the stub from the queue
// H --> [n] <- ... <- [1] <--+--[STUB] +-- T
// remove the last stub from the queue
// make [2] the next stub and return it's data
//
// H --> [n] <- ... <- [2] <--+--[STUB] +-- T
// | |
// +-----------+
tail = next;
last = next;
next = next->next.load(std::memory_order_acquire);
}
this->tail = next;
if(next)
{
// node [1] has next, swap tail to next
// H --> [3] <- [2] <- [1] <- T
tail = next;
// delete the stub node
// H --> [n] <- ... <- [STUB] <-- T
delete tail;
// using unique ptr so that last will be deleted on return :D
auto last_unique = std::unique_ptr<Node>(last);
// move the item from the last node to the caller
return std::move(last->item);
}
auto first = head.load(std::memory_order_acquire);
// no need to push the stub if more elements exist in the queue
// H --> [n] <- ... <- [1] <-- T
if(tail != first)
return nullptr;
// the state looks like this
// H --> [1] <-- T
// so we need to push the stub in order to pop [1]
// H --> [STUB] <- [1] <-- T
push(&stub);
next = tail->next;
if(next)
{
// remove the item [1] from the queue
// H --> [n] <- ... <- [STUB] <--+--[1] +-- T
// | |
// +--------+
tail = next;
return tail;
return std::move(next->item);
}
return nullptr;
@ -142,7 +112,6 @@ public:
private:
std::atomic<Node*> head;
Node* tail;
Node stub;
/** @brief Pushes a new node into the queue.
*
@ -154,14 +123,23 @@ private:
void push(Node* node)
{
// initial state
// H --> [3] <- [2] <- [1] <-- T
auto old = head.exchange(node);
// H --> [3] <- [2] <- [STUB] <-- T
// serialization point wrt producers, acquire-release
auto old = head.exchange(node, std::memory_order_acq_rel);
// after exchange
// H --> [4] [3] <- [2] <- [1] <-- T
// H --> [4] [3] <- [2] <- [STUB] <-- T
// this is the window of inconsistency, if the producer is blocked
// here, the consumer is also blocked. but this window is extremely
// small, it's followed by a store operation which is a
// serialization point wrt consumer
// old holds a pointer to node [3] and we need to link the [3] to a
// newly created node [4] using release semantics
// serialization point wrt consumer, release
old->next.store(node, std::memory_order_release);
// finally, we have a queue like this

70
mpsc.cpp Normal file
View File

@ -0,0 +1,70 @@
#include <iostream>
#include <vector>
#include <thread>
#include <atomic>
#include "data_structures/queue/mpsc_queue.hpp"
static constexpr int N = 1000;
std::atomic<bool> alive { true };
using q_t = lockfree::MpscQueue<std::string>;
void produce(q_t& q)
{
std::hash<std::thread::id> hasher;
for(int i = 0; i < N; ++i)
{
auto str = std::to_string(i) + " " +
std::to_string(hasher(std::this_thread::get_id()));
q.push(std::make_unique<std::string>(str));
}
}
void consume(q_t& q)
{
using namespace std::chrono_literals;
int i = 0;
while(true)
{
auto message = q.pop();
if(message != nullptr)
{
std::cerr << (i++) << ": " << *message << std::endl;
continue;
}
if(!alive)
return;
std::this_thread::sleep_for(10ms);
}
}
int main(void)
{
constexpr int THREADS = 256;
q_t q;
auto consumer = std::thread([&q]() { consume(q); });
std::vector<std::thread> threads;
for(int i = 0; i < THREADS; ++i)
threads.push_back(std::thread([&q]() { produce(q); }));
for(auto& thread : threads)
thread.join();
std::cout << "FINISHED" << std::endl;
alive.store(false);
consumer.join();
return 0;
}