diff --git a/src/data_structures/ring_buffer.hpp b/src/data_structures/ring_buffer.hpp new file mode 100644 index 000000000..4d6574673 --- /dev/null +++ b/src/data_structures/ring_buffer.hpp @@ -0,0 +1,65 @@ +#pragma once + +#include <atomic> +#include <chrono> +#include <experimental/optional> +#include <mutex> +#include <thread> +#include <utility> + +#include "glog/logging.h" + +#include "threading/sync/spinlock.hpp" + +/** + * A thread-safe ring buffer. Multi-producer, multi-consumer. Producers get + * blocked if the buffer is full. Consumers get returnd a nullopt. + * + * @tparam TElement - type of element the buffer tracks. + */ +template <typename TElement, int capacity> +class RingBuffer { + public: + RingBuffer() = default; + RingBuffer(const RingBuffer &) = delete; + RingBuffer(RingBuffer &&) = delete; + RingBuffer &operator=(const RingBuffer &) = delete; + RingBuffer &operator=(RingBuffer &&) = delete; + + template <typename... TArgs> + void emplace(TArgs &&... args) { + while (true) { + { + std::lock_guard<SpinLock> guard(lock_); + if (size_ < capacity) { + buffer_[write_pos_++] = TElement(std::forward<TArgs>(args)...); + write_pos_ %= capacity; + size_++; + return; + } + } + + // Log a warning approximately once per second if buffer is full. + LOG_EVERY_N(WARNING, 4000) << "RingBuffer full: worker waiting"; + // Sleep time determined using tests/benchmark/ring_buffer.cpp + std::this_thread::sleep_for(std::chrono::microseconds(250)); + } + } + + std::experimental::optional<TElement> pop() { + std::lock_guard<SpinLock> guard(lock_); + if (size_ == 0) return std::experimental::nullopt; + size_--; + std::experimental::optional<TElement> result( + std::move(buffer_[read_pos_++])); + read_pos_ %= capacity; + return result; + } + + private: + TElement buffer_[capacity]; + SpinLock lock_; + int read_pos_{0}; + int write_pos_{0}; + int size_{0}; +}; diff --git a/tests/benchmark/data_structures/ring_buffer.cpp b/tests/benchmark/data_structures/ring_buffer.cpp new file mode 100644 index 000000000..fc92216f9 --- /dev/null +++ b/tests/benchmark/data_structures/ring_buffer.cpp @@ -0,0 +1,29 @@ +#include <benchmark/benchmark.h> +#include <benchmark/benchmark_api.h> +#include <glog/logging.h> +#include <iostream> + +#include "data_structures/ring_buffer.hpp" + +class RingBufferMultiThreaded : public benchmark::Fixture { + protected: + RingBuffer<int, 1024> buffer; +}; + +BENCHMARK_DEFINE_F(RingBufferMultiThreaded, MT)(benchmark::State &st) { + while (st.KeepRunning()) { + buffer.emplace(42); + buffer.pop(); + } +} + +BENCHMARK_REGISTER_F(RingBufferMultiThreaded, MT)->Threads(1); +BENCHMARK_REGISTER_F(RingBufferMultiThreaded, MT)->Threads(4); +BENCHMARK_REGISTER_F(RingBufferMultiThreaded, MT)->Threads(16); +BENCHMARK_REGISTER_F(RingBufferMultiThreaded, MT)->Threads(64); + +int main(int argc, char **argv) { + ::benchmark::Initialize(&argc, argv); + ::benchmark::RunSpecifiedBenchmarks(); + return 0; +} diff --git a/tests/unit/ring_buffer.cpp b/tests/unit/ring_buffer.cpp new file mode 100644 index 000000000..df3e23db7 --- /dev/null +++ b/tests/unit/ring_buffer.cpp @@ -0,0 +1,90 @@ +#include <thread> +#include <unordered_set> + +#include "gtest/gtest.h" + +#include "data_structures/ring_buffer.hpp" +#include "threading/sync/spinlock.hpp" + +TEST(RingBuffer, MultithreadedUsage) { + auto test_f = [](int producer_count, int elems_per_producer, + int producer_sleep_ms, int consumer_count, + int consumer_sleep_ms) { + + std::unordered_set<int> consumed; + SpinLock consumed_lock; + RingBuffer<int, 20> buffer; + + std::vector<std::thread> producers; + for (int i = 0; i < producer_count; i++) + producers.emplace_back( + [i, elems_per_producer, producer_sleep_ms, &buffer]() { + for (int j = 0; j < elems_per_producer; j++) { + std::this_thread::sleep_for( + std::chrono::milliseconds(producer_sleep_ms)); + buffer.emplace(j + i * elems_per_producer); + } + }); + + std::vector<std::thread> consumers; + size_t elem_total_count = producer_count * elems_per_producer; + for (int i = 0; i < consumer_count; i++) + consumers.emplace_back([elem_total_count, consumer_sleep_ms, &buffer, + &consumed, &consumed_lock]() { + while (true) { + std::this_thread::sleep_for( + std::chrono::milliseconds(consumer_sleep_ms)); + std::lock_guard<SpinLock> guard(consumed_lock); + if (consumed.size() == elem_total_count) break; + auto value = buffer.pop(); + if (value) consumed.emplace(*value); + } + }); + + for (auto &producer : producers) producer.join(); + for (auto &consumer : consumers) consumer.join(); + + return !buffer.pop() && consumed.size() == elem_total_count; + }; + + // Many slow producers, many fast consumers. + EXPECT_TRUE(test_f(10, 200, 3, 10, 0)); + + // Many fast producers, many slow consumers. + EXPECT_TRUE(test_f(10, 200, 0, 10, 3)); + + // One slower producer, many consumers. + EXPECT_TRUE(test_f(1, 500, 3, 10, 0)); + + // Many producers, one slow consumer. + EXPECT_TRUE(test_f(10, 200, 0, 1, 3)); +} + +TEST(RingBuffer, ComplexValues) { + RingBuffer<std::vector<int>, 10> buffer; + std::vector<int> element; + for (int i = 0 ; i < 5 ; i++) { + element.emplace_back(i); + buffer.emplace(element); + } + + element.clear(); + for (int i = 0 ; i < 5 ; i++) { + element.emplace_back(i); + EXPECT_EQ(*buffer.pop(), element); + } + + EXPECT_FALSE(buffer.pop()); +} + +TEST(RingBuffer, NonCopyable) { + RingBuffer<std::unique_ptr<std::string>, 10> buffer; + buffer.emplace(new std::string("string")); + buffer.emplace(new std::string("kifla")); + + EXPECT_EQ(**buffer.pop(), "string"); + EXPECT_EQ(**buffer.pop(), "kifla"); + EXPECT_FALSE(buffer.pop()); + + std::unique_ptr<std::string> a(new std::string("bla")); +}