Ring buffer added

Summary: Lock-based version. Benchmarks indicate the lock won't be the bottleneck in the WAL: the DB operations that generate WAL delta insertions will be slower, and so will flushing the WAL. (An illustrative usage sketch follows the commit metadata below.)

Reviewers: buda, mislav.bradac, dgleich

Reviewed By: mislav.bradac

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D919
florijan 2017-10-19 13:03:05 +02:00
parent 4460dd79f9
commit add801a80a
3 changed files with 184 additions and 0 deletions
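
To make the WAL usage described in the summary concrete, below is a minimal sketch of several producer threads inserting deltas while a single flusher thread drains them. The WalDelta struct, the thread and element counts, and the buffer capacity are hypothetical and not part of this commit; only the RingBuffer emplace/pop pattern comes from the header below.

#include <string>
#include <thread>
#include <vector>

#include "data_structures/ring_buffer.hpp"

// Hypothetical WAL delta record, used here purely for illustration.
struct WalDelta {
  int transaction_id;
  std::string operation;
};

int main() {
  RingBuffer<WalDelta, 1024> deltas;

  // DB workers act as producers; emplace() blocks while the buffer is full.
  std::vector<std::thread> workers;
  for (int worker_id = 0; worker_id < 4; ++worker_id)
    workers.emplace_back([worker_id, &deltas]() {
      for (int i = 0; i < 100; ++i)
        deltas.emplace(WalDelta{worker_id, "create_vertex"});
    });

  // A single flusher drains the buffer; pop() returns nullopt when it is empty.
  std::thread flusher([&deltas]() {
    int flushed = 0;
    while (flushed < 4 * 100) {
      if (auto delta = deltas.pop()) {
        // Here the delta would be appended to the WAL file.
        flushed++;
      }
    }
  });

  for (auto &worker : workers) worker.join();
  flusher.join();
  return 0;
}

The point of the sketch is that producers block inside emplace() once the buffer fills up, while the flusher simply retries when pop() returns nullopt, which matches the benchmark conclusion that the slow parts are the surrounding DB and flush work, not the lock.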

data_structures/ring_buffer.hpp

@@ -0,0 +1,65 @@
#pragma once
#include <atomic>
#include <chrono>
#include <experimental/optional>
#include <mutex>
#include <thread>
#include <utility>
#include "glog/logging.h"
#include "threading/sync/spinlock.hpp"
/**
 * A thread-safe ring buffer. Multi-producer, multi-consumer. Producers get
 * blocked if the buffer is full. Consumers get returned a nullopt if the
 * buffer is empty.
 *
 * @tparam TElement - type of element the buffer tracks.
 * @tparam capacity - maximum number of elements the buffer can hold.
 */
template <typename TElement, int capacity>
class RingBuffer {
public:
RingBuffer() = default;
RingBuffer(const RingBuffer &) = delete;
RingBuffer(RingBuffer &&) = delete;
RingBuffer &operator=(const RingBuffer &) = delete;
RingBuffer &operator=(RingBuffer &&) = delete;
template <typename... TArgs>
void emplace(TArgs &&... args) {
while (true) {
{
std::lock_guard<SpinLock> guard(lock_);
if (size_ < capacity) {
buffer_[write_pos_++] = TElement(std::forward<TArgs>(args)...);
write_pos_ %= capacity;
size_++;
return;
}
}
// Log a warning approximately once per second if buffer is full.
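// (4000 blocked iterations at 250 us of sleep each is roughly one second between warnings.)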
LOG_EVERY_N(WARNING, 4000) << "RingBuffer full: worker waiting";
// Sleep time determined using tests/benchmark/ring_buffer.cpp
std::this_thread::sleep_for(std::chrono::microseconds(250));
}
}
std::experimental::optional<TElement> pop() {
std::lock_guard<SpinLock> guard(lock_);
if (size_ == 0) return std::experimental::nullopt;
size_--;
std::experimental::optional<TElement> result(
std::move(buffer_[read_pos_++]));
read_pos_ %= capacity;
return result;
}
private:
TElement buffer_[capacity];
SpinLock lock_;
int read_pos_{0};
int write_pos_{0};
int size_{0};
};
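
As a quick illustration of the interface above (a sketch, not part of this commit): a single-threaded round trip shows pop() returning engaged optionals while elements remain and nullopt once the buffer is drained.

#include <iostream>

#include "data_structures/ring_buffer.hpp"

int main() {
  RingBuffer<int, 4> buffer;

  // With a capacity of 4 and only 3 elements, none of these calls block.
  for (int i = 0; i < 3; ++i) buffer.emplace(i);

  // pop() returns an engaged optional while elements remain...
  while (auto value = buffer.pop()) std::cout << *value << " ";  // prints: 0 1 2

  // ...and nullopt once the buffer is empty.
  std::cout << std::boolalpha << static_cast<bool>(buffer.pop()) << std::endl;  // prints: false
  return 0;
}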

tests/benchmark/ring_buffer.cpp

@@ -0,0 +1,29 @@
#include <benchmark/benchmark.h>
#include <benchmark/benchmark_api.h>
#include <glog/logging.h>
#include <iostream>
#include "data_structures/ring_buffer.hpp"
class RingBufferMultiThreaded : public benchmark::Fixture {
protected:
RingBuffer<int, 1024> buffer;
};
BENCHMARK_DEFINE_F(RingBufferMultiThreaded, MT)(benchmark::State &st) {
while (st.KeepRunning()) {
buffer.emplace(42);
buffer.pop();
}
}
BENCHMARK_REGISTER_F(RingBufferMultiThreaded, MT)->Threads(1);
BENCHMARK_REGISTER_F(RingBufferMultiThreaded, MT)->Threads(4);
BENCHMARK_REGISTER_F(RingBufferMultiThreaded, MT)->Threads(16);
BENCHMARK_REGISTER_F(RingBufferMultiThreaded, MT)->Threads(64);
int main(int argc, char **argv) {
::benchmark::Initialize(&argc, argv);
::benchmark::RunSpecifiedBenchmarks();
return 0;
}

View File

@@ -0,0 +1,90 @@
#include <chrono>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_set>
#include <vector>
#include "gtest/gtest.h"
#include "data_structures/ring_buffer.hpp"
#include "threading/sync/spinlock.hpp"
TEST(RingBuffer, MultithreadedUsage) {
auto test_f = [](int producer_count, int elems_per_producer,
int producer_sleep_ms, int consumer_count,
int consumer_sleep_ms) {
std::unordered_set<int> consumed;
SpinLock consumed_lock;
RingBuffer<int, 20> buffer;
std::vector<std::thread> producers;
for (int i = 0; i < producer_count; i++)
producers.emplace_back(
[i, elems_per_producer, producer_sleep_ms, &buffer]() {
for (int j = 0; j < elems_per_producer; j++) {
std::this_thread::sleep_for(
std::chrono::milliseconds(producer_sleep_ms));
buffer.emplace(j + i * elems_per_producer);
}
});
std::vector<std::thread> consumers;
size_t elem_total_count = producer_count * elems_per_producer;
for (int i = 0; i < consumer_count; i++)
consumers.emplace_back([elem_total_count, consumer_sleep_ms, &buffer,
&consumed, &consumed_lock]() {
while (true) {
std::this_thread::sleep_for(
std::chrono::milliseconds(consumer_sleep_ms));
std::lock_guard<SpinLock> guard(consumed_lock);
if (consumed.size() == elem_total_count) break;
auto value = buffer.pop();
if (value) consumed.emplace(*value);
}
});
for (auto &producer : producers) producer.join();
for (auto &consumer : consumers) consumer.join();
return !buffer.pop() && consumed.size() == elem_total_count;
};
// Many slow producers, many fast consumers.
EXPECT_TRUE(test_f(10, 200, 3, 10, 0));
// Many fast producers, many slow consumers.
EXPECT_TRUE(test_f(10, 200, 0, 10, 3));
// One slower producer, many consumers.
EXPECT_TRUE(test_f(1, 500, 3, 10, 0));
// Many producers, one slow consumer.
EXPECT_TRUE(test_f(10, 200, 0, 1, 3));
}
TEST(RingBuffer, ComplexValues) {
RingBuffer<std::vector<int>, 10> buffer;
std::vector<int> element;
for (int i = 0; i < 5; i++) {
element.emplace_back(i);
buffer.emplace(element);
}
element.clear();
for (int i = 0; i < 5; i++) {
element.emplace_back(i);
EXPECT_EQ(*buffer.pop(), element);
}
EXPECT_FALSE(buffer.pop());
}
TEST(RingBuffer, NonCopyable) {
RingBuffer<std::unique_ptr<std::string>, 10> buffer;
buffer.emplace(new std::string("string"));
buffer.emplace(new std::string("kifla"));
EXPECT_EQ(**buffer.pop(), "string");
EXPECT_EQ(**buffer.pop(), "kifla");
EXPECT_FALSE(buffer.pop());
}