Add thread safe queue

Reviewers: dgleich, florijan

Reviewed By: dgleich

Differential Revision: https://phabricator.memgraph.io/D979
This commit is contained in:
Mislav Bradac 2017-11-13 16:43:47 +01:00
parent 85ef12def5
commit 5c0f378394
2 changed files with 204 additions and 0 deletions

View File

@ -0,0 +1,65 @@
#pragma once
#include <condition_variable>
#include <cstdint>
#include <experimental/optional>
#include <iostream>
#include <mutex>
#include <queue>
// Thread safe queue. Probably doesn't perform very well, but it works.
template <typename T>
class Queue {
public:
Queue() = default;
Queue(const Queue &) = delete;
Queue &operator=(const Queue &) = delete;
Queue(Queue &&) = delete;
Queue &operator=(Queue &&) = delete;
void Push(T x) {
std::unique_lock<std::mutex> guard(mutex_);
queue_.emplace(std::move(x));
guard.unlock();
cvar_.notify_one();
}
template <typename... Args>
void Emplace(Args &&... args) {
std::unique_lock<std::mutex> guard(mutex_);
queue_.emplace(std::forward<Args>(args)...);
guard.unlock();
cvar_.notify_one();
}
int64_t size() const {
std::unique_lock<std::mutex> guard(mutex_);
return queue_.size();
}
bool empty() const {
std::unique_lock<std::mutex> guard(mutex_);
return queue_.empty();
}
T AwaitPop() {
std::unique_lock<std::mutex> guard(mutex_);
cvar_.wait(guard, [this]() { return !queue_.empty(); });
auto x = std::move(queue_.front());
queue_.pop();
return x;
}
std::experimental::optional<T> MaybePop() {
std::unique_lock<std::mutex> guard(mutex_);
if (queue_.empty()) return std::experimental::nullopt;
auto x = std::move(queue_.front());
queue_.pop();
return x;
}
private:
std::queue<T> queue_;
std::condition_variable cvar_;
mutable std::mutex mutex_;
};

139
tests/unit/queue.cpp Normal file
View File

@ -0,0 +1,139 @@
#include <atomic>
#include <chrono>
#include <experimental/optional>
#include <string>
#include <thread>
#include <utility>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "data_structures/queue.hpp"
namespace {
using namespace std::literals::chrono_literals;
TEST(Queue, PushMaybePop) {
Queue<int> q;
q.Push(1);
EXPECT_EQ(*q.MaybePop(), 1);
EXPECT_EQ(q.MaybePop(), std::experimental::nullopt);
q.Push(2);
q.Push(3);
EXPECT_EQ(*q.MaybePop(), 2);
q.Push(4);
q.Push(5);
EXPECT_EQ(*q.MaybePop(), 3);
EXPECT_EQ(*q.MaybePop(), 4);
EXPECT_EQ(*q.MaybePop(), 5);
EXPECT_EQ(q.MaybePop(), std::experimental::nullopt);
}
TEST(Queue, Emplace) {
Queue<std::pair<std::string, int>> q;
q.Emplace("abc", 123);
EXPECT_THAT(*q.MaybePop(), testing::Pair("abc", 123));
}
TEST(Queue, Size) {
Queue<int> q;
EXPECT_EQ(q.size(), 0);
q.Push(1);
EXPECT_EQ(q.size(), 1);
q.Push(1);
EXPECT_EQ(q.size(), 2);
q.MaybePop();
EXPECT_EQ(q.size(), 1);
q.MaybePop();
EXPECT_EQ(q.size(), 0);
q.MaybePop();
EXPECT_EQ(q.size(), 0);
}
TEST(Queue, Empty) {
Queue<int> q;
EXPECT_TRUE(q.empty());
q.Push(1);
EXPECT_FALSE(q.empty());
q.MaybePop();
EXPECT_TRUE(q.empty());
}
TEST(Queue, AwaitPop) {
Queue<int> q;
std::thread t([&] {
q.Push(1);
q.Push(2);
std::this_thread::sleep_for(200ms);
q.Push(3);
q.Push(4);
});
EXPECT_EQ(q.AwaitPop(), 1);
EXPECT_EQ(q.AwaitPop(), 2);
EXPECT_EQ(q.AwaitPop(), 3);
EXPECT_EQ(q.AwaitPop(), 4);
t.join();
}
TEST(Queue, Concurrent) {
Queue<int> q;
const int kNumProducers = 10;
const int kNumConsumers = 10;
const int kNumElementsPerProducer = 300000;
std::vector<std::thread> producers;
std::atomic<int> next{0};
for (int i = 0; i < kNumProducers; ++i) {
producers.emplace_back([&] {
for (int i = 0; i < kNumElementsPerProducer; ++i) {
q.Push(next++);
}
});
}
std::vector<std::thread> consumers;
std::vector<int> retrieved[kNumConsumers];
std::atomic<int> num_retrieved{0};
for (int i = 0; i < kNumConsumers; ++i) {
consumers.emplace_back(
[&](int thread_id) {
while (true) {
int count = num_retrieved++;
if (count >= kNumProducers * kNumElementsPerProducer) break;
retrieved[thread_id].push_back(q.AwaitPop());
}
},
i);
}
for (auto &t : consumers) {
t.join();
}
for (auto &t : producers) {
t.join();
}
EXPECT_EQ(q.MaybePop(), std::experimental::nullopt);
std::set<int> all_elements;
for (auto &r : retrieved) {
all_elements.insert(r.begin(), r.end());
}
EXPECT_EQ(all_elements.size(), kNumProducers * kNumElementsPerProducer);
EXPECT_EQ(*all_elements.begin(), 0);
EXPECT_EQ(*all_elements.rbegin(),
kNumProducers * kNumElementsPerProducer - 1);
}
}