memgraph/src/data_structures/queue/mpsc_queue.hpp

150 lines
4.1 KiB
C++
Raw Normal View History

#pragma once
2015-10-06 05:24:38 +08:00
#include <atomic>
#include <memory>
namespace lockfree
{
/** @brief Multiple-Producer Single-Consumer Queue
* A wait-free (*) multiple-producer single-consumer queue.
*
* features:
* - wait-free
* - fast producers (only one atomic XCHG and and one atomic store with
* release semantics)
* - extremely fast consumer (only atomic loads with acquire semantics on
* the fast path and atomic loads + atomic XCHG on the slow path)
* - no need for order reversion -> pop() is always O(1)
* - ABA free
*
* great for using in loggers, garbage collectors etc.
*
* (*) there is a small window of inconsistency from the lock free design
* see the url below for details
* URL: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
*
* mine is not intrusive for better modularity, but with slightly worse
* performance because it needs to do two memory allocations instead of
* one
*
* @tparam T Type of the items to store in the queue
*/
template <class T>
class MpscQueue
{
struct Node
{
2015-10-07 03:02:47 +08:00
Node(Node* next, std::unique_ptr<T>&& item)
: next(next), item(std::forward<std::unique_ptr<T>>(item)) {}
2015-10-06 05:24:38 +08:00
std::atomic<Node*> next;
std::unique_ptr<T> item;
};
public:
MpscQueue()
{
2015-10-07 03:02:47 +08:00
auto stub = new Node(nullptr, nullptr);
head.store(stub);
tail = stub;
}
~MpscQueue()
{
// purge all elements from the queue
while(pop()) {}
// we are left with a stub, delete that
delete tail;
2015-10-06 05:24:38 +08:00
}
MpscQueue(MpscQueue&) = delete;
MpscQueue(MpscQueue&&) = delete;
/** @brief Pushes an item into the queue.
*
* Pushes an item into the front of the queue.
*
* @param item std::unique_ptr<T> An item to push into the queue
* @return void
*/
2015-10-07 03:02:47 +08:00
void push(std::unique_ptr<T>&& item)
2015-10-06 05:24:38 +08:00
{
2015-10-07 03:02:47 +08:00
push(new Node(nullptr, std::forward<std::unique_ptr<T>>(item)));
2015-10-06 05:24:38 +08:00
}
/** @brief Pops a node from the queue.
*
* Pops and returns a node from the back of the queue.
*
* @return std::unique_ptr<T> A pointer to the node popped from the
* queue, nullptr if nothing was popped
*/
std::unique_ptr<T> pop()
{
2015-10-07 03:02:47 +08:00
auto tail = this->tail;
// serialization point wrt producers
2015-10-06 05:24:38 +08:00
auto next = tail->next.load(std::memory_order_acquire);
2015-10-07 03:02:47 +08:00
if(next)
2015-10-06 05:24:38 +08:00
{
2015-10-07 03:02:47 +08:00
// remove the last stub from the queue
// make [2] the next stub and return it's data
//
// H --> [n] <- ... <- [2] <--+--[STUB] +-- T
2015-10-06 05:24:38 +08:00
// | |
// +-----------+
2015-10-07 03:02:47 +08:00
this->tail = next;
2015-10-06 05:24:38 +08:00
2015-10-07 03:02:47 +08:00
// delete the stub node
// H --> [n] <- ... <- [STUB] <-- T
delete tail;
2015-10-06 05:24:38 +08:00
2015-10-07 03:02:47 +08:00
return std::move(next->item);
2015-10-06 05:24:38 +08:00
}
return nullptr;
}
private:
std::atomic<Node*> head;
Node* tail;
/** @brief Pushes a new node into the queue.
*
* Pushes a new node containing the item into the front of the queue.
*
* @param node Node* A pointer to node you want to push into the queue
* @return void
*/
void push(Node* node)
{
// initial state
2015-10-07 03:02:47 +08:00
// H --> [3] <- [2] <- [STUB] <-- T
// serialization point wrt producers, acquire-release
auto old = head.exchange(node, std::memory_order_acq_rel);
2015-10-06 05:24:38 +08:00
// after exchange
2015-10-07 03:02:47 +08:00
// H --> [4] [3] <- [2] <- [STUB] <-- T
// this is the window of inconsistency, if the producer is blocked
// here, the consumer is also blocked. but this window is extremely
// small, it's followed by a store operation which is a
// serialization point wrt consumer
2015-10-06 05:24:38 +08:00
// old holds a pointer to node [3] and we need to link the [3] to a
// newly created node [4] using release semantics
2015-10-07 03:02:47 +08:00
// serialization point wrt consumer, release
2015-10-06 05:24:38 +08:00
old->next.store(node, std::memory_order_release);
// finally, we have a queue like this
// H --> [4] <- [3] <- [2] <- [1] <-- T
}
};
}