From add801a80a0e753db50c134ad2d5d4420b006633 Mon Sep 17 00:00:00 2001
From: florijan <florijan@memgraph.io>
Date: Thu, 19 Oct 2017 13:03:05 +0200
Subject: [PATCH] Ring buffer added

Summary: Locked version. There are some benchmarks; it seems the lock won't be the bottleneck in the WAL (DB ops causing WAL delta insertions into it will be slower, and flushing the WAL will be slower).

Reviewers: buda, mislav.bradac, dgleich

Reviewed By: mislav.bradac

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D919
---
 src/data_structures/ring_buffer.hpp           | 65 ++++++++++++++
 .../benchmark/data_structures/ring_buffer.cpp | 29 ++++++
 tests/unit/ring_buffer.cpp                    | 90 +++++++++++++++++++
 3 files changed, 184 insertions(+)
 create mode 100644 src/data_structures/ring_buffer.hpp
 create mode 100644 tests/benchmark/data_structures/ring_buffer.cpp
 create mode 100644 tests/unit/ring_buffer.cpp

diff --git a/src/data_structures/ring_buffer.hpp b/src/data_structures/ring_buffer.hpp
new file mode 100644
index 000000000..4d6574673
--- /dev/null
+++ b/src/data_structures/ring_buffer.hpp
@@ -0,0 +1,65 @@
+#pragma once
+
+#include <atomic>
+#include <chrono>
+#include <experimental/optional>
+#include <mutex>
+#include <thread>
+#include <utility>
+
+#include "glog/logging.h"
+
+#include "threading/sync/spinlock.hpp"
+
+/**
+ * A thread-safe ring buffer. Multi-producer, multi-consumer. Producers get
+ * blocked if the buffer is full. Consumers are returned a nullopt.
+ *
+ * @tparam TElement - type of element the buffer tracks.
+ */
+template <typename TElement, int capacity>
+class RingBuffer {
+ public:
+  RingBuffer() = default;
+  RingBuffer(const RingBuffer &) = delete;  // non-copyable: owns a SpinLock
+  RingBuffer(RingBuffer &&) = delete;       // non-movable for the same reason
+  RingBuffer &operator=(const RingBuffer &) = delete;
+  RingBuffer &operator=(RingBuffer &&) = delete;
+
+  template <typename... TArgs>
+  void emplace(TArgs &&... args) {  // blocks (sleep-retry loop) while full
+    while (true) {
+      {
+        std::lock_guard<SpinLock> guard(lock_);
+        if (size_ < capacity) {
+          buffer_[write_pos_++] = TElement(std::forward<TArgs>(args)...);
+          write_pos_ %= capacity;  // wrap the circular write index
+          size_++;
+          return;
+        }
+      }
+
+      // Log a warning approximately once per second if buffer is full.
+      LOG_EVERY_N(WARNING, 4000) << "RingBuffer full: worker waiting";
+      // Sleep time determined using tests/benchmark/data_structures/ring_buffer.cpp
+      std::this_thread::sleep_for(std::chrono::microseconds(250));
+    }
+  }
+
+  std::experimental::optional<TElement> pop() {  // nullopt when empty
+    std::lock_guard<SpinLock> guard(lock_);
+    if (size_ == 0) return std::experimental::nullopt;
+    size_--;
+    std::experimental::optional<TElement> result(
+        std::move(buffer_[read_pos_++]));  // move out; slot left moved-from
+    read_pos_ %= capacity;  // wrap the circular read index
+    return result;
+  }
+
+ private:
+  TElement buffer_[capacity];  // fixed-size circular storage
+  SpinLock lock_;              // guards all state below
+  int read_pos_{0};            // index of the next element to pop
+  int write_pos_{0};           // index of the next slot to write
+  int size_{0};                // current number of stored elements
+};
diff --git a/tests/benchmark/data_structures/ring_buffer.cpp b/tests/benchmark/data_structures/ring_buffer.cpp
new file mode 100644
index 000000000..fc92216f9
--- /dev/null
+++ b/tests/benchmark/data_structures/ring_buffer.cpp
@@ -0,0 +1,29 @@
+#include <benchmark/benchmark.h>
+#include <benchmark/benchmark_api.h>
+#include <glog/logging.h>
+#include <iostream>
+
+#include "data_structures/ring_buffer.hpp"
+
+class RingBufferMultiThreaded : public benchmark::Fixture {
+ protected:
+  RingBuffer<int, 1024> buffer;  // shared by all benchmark threads
+};
+
+BENCHMARK_DEFINE_F(RingBufferMultiThreaded, MT)(benchmark::State &st) {
+  while (st.KeepRunning()) {
+    buffer.emplace(42);  // one produce + one consume per iteration
+    buffer.pop();
+  }
+}
+
+BENCHMARK_REGISTER_F(RingBufferMultiThreaded, MT)->Threads(1);
+BENCHMARK_REGISTER_F(RingBufferMultiThreaded, MT)->Threads(4);
+BENCHMARK_REGISTER_F(RingBufferMultiThreaded, MT)->Threads(16);
+BENCHMARK_REGISTER_F(RingBufferMultiThreaded, MT)->Threads(64);
+
+int main(int argc, char **argv) {
+  ::benchmark::Initialize(&argc, argv);
+  ::benchmark::RunSpecifiedBenchmarks();
+  return 0;
+}
diff --git a/tests/unit/ring_buffer.cpp b/tests/unit/ring_buffer.cpp
new file mode 100644
index 000000000..df3e23db7
--- /dev/null
+++ b/tests/unit/ring_buffer.cpp
@@ -0,0 +1,90 @@
+#include <thread>
+#include <unordered_set>
+
+#include "gtest/gtest.h"
+
+#include "data_structures/ring_buffer.hpp"
+#include "threading/sync/spinlock.hpp"
+
+TEST(RingBuffer, MultithreadedUsage) {
+  auto test_f = [](int producer_count, int elems_per_producer,
+                   int producer_sleep_ms, int consumer_count,
+                   int consumer_sleep_ms) {
+
+    std::unordered_set<int> consumed;  // every value popped, across consumers
+    SpinLock consumed_lock;            // guards `consumed`
+    RingBuffer<int, 20> buffer;        // small capacity to force blocking
+
+    std::vector<std::thread> producers;
+    for (int i = 0; i < producer_count; i++)
+      producers.emplace_back(
+          [i, elems_per_producer, producer_sleep_ms, &buffer]() {
+            for (int j = 0; j < elems_per_producer; j++) {
+              std::this_thread::sleep_for(
+                  std::chrono::milliseconds(producer_sleep_ms));
+              buffer.emplace(j + i * elems_per_producer);  // unique per value
+            }
+          });
+
+    std::vector<std::thread> consumers;
+    size_t elem_total_count = producer_count * elems_per_producer;
+    for (int i = 0; i < consumer_count; i++)
+      consumers.emplace_back([elem_total_count, consumer_sleep_ms, &buffer,
+                              &consumed, &consumed_lock]() {
+        while (true) {
+          std::this_thread::sleep_for(
+              std::chrono::milliseconds(consumer_sleep_ms));
+          std::lock_guard<SpinLock> guard(consumed_lock);
+          if (consumed.size() == elem_total_count) break;  // all seen: stop
+          auto value = buffer.pop();
+          if (value) consumed.emplace(*value);
+        }
+      });
+
+    for (auto &producer : producers) producer.join();
+    for (auto &consumer : consumers) consumer.join();
+
+    return !buffer.pop() && consumed.size() == elem_total_count;  // drained
+  };
+
+  // Many slow producers, many fast consumers.
+  EXPECT_TRUE(test_f(10, 200, 3, 10, 0));
+
+  // Many fast producers, many slow consumers.
+  EXPECT_TRUE(test_f(10, 200, 0, 10, 3));
+
+  // One slower producer, many consumers.
+  EXPECT_TRUE(test_f(1, 500, 3, 10, 0));
+
+  // Many producers, one slow consumer.
+  EXPECT_TRUE(test_f(10, 200, 0, 1, 3));
+}
+
+TEST(RingBuffer, ComplexValues) {
+  RingBuffer<std::vector<int>, 10> buffer;
+  std::vector<int> element;
+  for (int i = 0 ; i < 5 ; i++) {  // push growing vectors {0}, {0,1}, ...
+    element.emplace_back(i);
+    buffer.emplace(element);
+  }
+
+  element.clear();
+  for (int i = 0 ; i < 5 ; i++) {  // rebuild and expect FIFO order out
+    element.emplace_back(i);
+    EXPECT_EQ(*buffer.pop(), element);
+  }
+
+  EXPECT_FALSE(buffer.pop());  // buffer must now be empty
+}
+
+TEST(RingBuffer, NonCopyable) {
+  RingBuffer<std::unique_ptr<std::string>, 10> buffer;  // move-only element
+  buffer.emplace(new std::string("string"));
+  buffer.emplace(new std::string("kifla"));
+
+  EXPECT_EQ(**buffer.pop(), "string");  // FIFO order preserved
+  EXPECT_EQ(**buffer.pop(), "kifla");
+  EXPECT_FALSE(buffer.pop());
+
+  std::unique_ptr<std::string> a(new std::string("bla"));  // NOTE(review): unused — candidate for removal
+}