From a417ef36f100f6abae203322ee372b7b123436bd Mon Sep 17 00:00:00 2001
From: Teon Banek <teon.banek@memgraph.io>
Date: Tue, 18 Jun 2019 14:51:28 +0200
Subject: [PATCH] Use MemoryResource in SkipList

Summary:
This is a preparation step in case we want to have a custom allocator in
SkipList. For example, pool based allocator for SkipListNode.
Introduction of MemoryResource and removal of `calloc` has reduced the
performance a bit according to micro benchmarks. This performance hit is
not visible on benchmarks which do more concurrent operations.

Reviewers: mferencevic, mtomic

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2140
---
 src/utils/skip_list.hpp              | 99 ++++++++++++++++++++--------
 tests/benchmark/skip_list_vs_stl.cpp |  4 +-
 2 files changed, 74 insertions(+), 29 deletions(-)

diff --git a/src/utils/skip_list.hpp b/src/utils/skip_list.hpp
index 678dea561..07305fb4e 100644
--- a/src/utils/skip_list.hpp
+++ b/src/utils/skip_list.hpp
@@ -13,6 +13,7 @@
 
 #include "utils/bound.hpp"
 #include "utils/linux.hpp"
+#include "utils/memory.hpp"
 #include "utils/on_scope_exit.hpp"
 #include "utils/spin_lock.hpp"
 #include "utils/stack.hpp"
@@ -82,11 +83,28 @@ struct SkipListNode {
   SpinLock lock;
   std::atomic<bool> marked;
   std::atomic<bool> fully_linked;
+  static_assert(std::numeric_limits<uint8_t>::max() >= kSkipListMaxHeight,
+                "Maximum height doesn't fit in uint8_t");
   uint8_t height;
   // uint8_t PAD;
   std::atomic<SkipListNode<TObj> *> nexts[0];
 };
 
+/// Maximum size of a single SkipListNode instance.
+///
+/// This can be used to tune the pool allocator for SkipListNode instances.
+template <typename TObj>
+constexpr size_t MaxSkipListNodeSize() {
+  return sizeof(SkipListNode<TObj>) +
+         kSkipListMaxHeight * sizeof(std::atomic<SkipListNode<TObj> *>);
+}
+
+/// Get the size in bytes of the given SkipListNode instance.
+template <typename TObj>
+size_t SkipListNodeSize(const SkipListNode<TObj> &node) {
+  return sizeof(node) + node.height * sizeof(std::atomic<SkipListNode<TObj> *>);
+}
+
 /// The skip list doesn't have built-in reclamation of removed nodes (objects).
 /// This class handles all operations necessary to remove the nodes safely.
 ///
@@ -133,7 +151,13 @@ class SkipListGc final {
     std::lock_guard<SpinLock> guard(lock_);
     Block *curr_head = head_.load(std::memory_order_acquire);
     if (curr_head == head) {
-      Block *block = new (calloc(1, sizeof(Block))) Block();
+      // Construct through allocator so it propagates if needed.
+      Allocator<Block> block_allocator(memory_);
+      Block *block = block_allocator.allocate(1);
+      // `calloc` would be faster, but the API has no such call.
+      memset(block, 0, sizeof(Block));
+      // Block constructor should not throw.
+      block_allocator.construct(block);
       block->prev.store(curr_head, std::memory_order_release);
       block->succ.store(nullptr, std::memory_order_release);
       block->first_id = last_id_;
@@ -151,7 +175,7 @@ class SkipListGc final {
   }
 
  public:
-  SkipListGc() {
+  explicit SkipListGc(MemoryResource *memory) : memory_(memory) {
     static_assert(sizeof(Block) % kLinuxPageSize == 0,
                   "It is recommended that you set the kSkipListGcBlockSize "
                   "constant so that the size of SkipListGc::Block is a "
@@ -166,15 +190,17 @@ class SkipListGc final {
   ~SkipListGc() {
     Block *head = head_.load(std::memory_order_acquire);
     while (head != nullptr) {
+      Allocator<Block> block_allocator(memory_);
       Block *prev = head->prev.load(std::memory_order_acquire);
       head->~Block();
-      free(head);
+      block_allocator.deallocate(head, 1);
       head = prev;
     }
     std::optional<TDeleted> item;
     while ((item = deleted_.Pop())) {
+      size_t bytes = SkipListNodeSize(*item->second);
       item->second->~TNode();
-      free(item->second);
+      memory_->Deallocate(item->second, bytes);
     }
   }
 
@@ -256,13 +282,14 @@ class SkipListGc final {
       // thread doesn't have a pointer to the block that it got from reading
       // `head_`. We bail out here, this block will be freed next time.
       if (remove_block && next != nullptr) {
+        Allocator<Block> block_allocator(memory_);
         CHECK(tail == tail_.load(std::memory_order_acquire))
             << "Can't remove SkipListGc block that is in the middle!";
         next->prev.store(nullptr, std::memory_order_release);
         tail_.store(next, std::memory_order_release);
         // Destroy the block.
         tail->~Block();
-        free(tail);
+        block_allocator.deallocate(tail, 1);
       }
       tail = next;
     }
@@ -270,8 +297,9 @@ class SkipListGc final {
     std::optional<TDeleted> item;
     while ((item = deleted_.Pop())) {
       if (item->first < last_dead) {
+        size_t bytes = SkipListNodeSize(*item->second);
         item->second->~TNode();
-        free(item->second);
+        memory_->Deallocate(item->second, bytes);
       } else {
         leftover.Push(*item);
       }
@@ -281,7 +309,10 @@ class SkipListGc final {
     }
   }
 
+  MemoryResource *GetMemoryResource() const { return memory_; }
+
  private:
+  MemoryResource *memory_;
   SpinLock lock_;
   std::atomic<uint64_t> accessor_id_{0};
   std::atomic<Block *> head_{nullptr};
@@ -457,6 +488,9 @@ class SkipList final {
   using TNode = SkipListNode<TObj>;
 
  public:
+  /// Allocator type so that STL containers are aware that we need one.
+  using allocator_type = Allocator<TNode>;
+
   class ConstIterator;
 
   class Iterator final {
@@ -749,35 +783,40 @@ class SkipList final {
     uint64_t id_{0};
   };
 
-  SkipList() {
+  explicit SkipList(MemoryResource *memory = NewDeleteResource())
+      : gc_(memory) {
     static_assert(kSkipListMaxHeight <= 32,
                   "The SkipList height must be less or equal to 32!");
-    // Here we use `calloc` instead of `malloc` to ensure that the memory is
-    // filled with zeros before we call the constructor. We don't use `malloc` +
-    // `memset` because `calloc` is smarter:
-    // https://stackoverflow.com/questions/2688466/why-mallocmemset-is-slower-than-calloc/2688522#2688522
-    // Also, here we don't call the `SkipListNode` constructor so that the
-    // `TObj` doesn't have to have a default constructor. The initialization of
-    // the `height` and `lock` variables is done manually.
+    void *ptr = memory->Allocate(MaxSkipListNodeSize<TObj>());
+    // `calloc` would be faster, but the API has no such call.
+    memset(ptr, 0, MaxSkipListNodeSize<TObj>());
+    // Here we don't call the `SkipListNode` constructor so that the `TObj`
+    // doesn't have to have a default constructor. The initialization of the
+    // `height` and `lock` variables is done manually.
     // NOTE: The `head_` node doesn't have a valid `TObj` (because we didn't
     // call the constructor), so you mustn't perform any comparisons using its
     // value.
-    head_ = reinterpret_cast<TNode *>(calloc(
-        1, sizeof(TNode) + kSkipListMaxHeight * sizeof(std::atomic<TNode *>)));
+    head_ = static_cast<TNode *>(ptr);
     head_->height = kSkipListMaxHeight;
     new (&head_->lock) utils::SpinLock();
   }
 
   SkipList(SkipList &&other) noexcept
-      : head_(other.head_), size_(other.size_.load()) {
+      : head_(other.head_),
+        gc_(other.GetMemoryResource()),
+        size_(other.size_.load()) {
     other.head_ = nullptr;
   }
+
   SkipList &operator=(SkipList &&other) noexcept {
+    CHECK(other.GetMemoryResource() == GetMemoryResource())
+        << "Move assignment with different MemoryResource is not supported";
     TNode *head = head_;
     while (head != nullptr) {
       TNode *succ = head->nexts[0].load(std::memory_order_acquire);
+      size_t bytes = SkipListNodeSize(*head);
       head->~TNode();
-      free(head);
+      GetMemoryResource()->Deallocate(head, bytes);
       head = succ;
     }
     head_ = other.head_;
@@ -794,15 +833,16 @@ class SkipList final {
       TNode *curr = head_->nexts[0].load(std::memory_order_acquire);
       while (curr != nullptr) {
         TNode *succ = curr->nexts[0].load(std::memory_order_acquire);
+        size_t bytes = SkipListNodeSize(*curr);
         curr->~TNode();
-        free(curr);
+        GetMemoryResource()->Deallocate(curr, bytes);
         curr = succ;
       }
       // We need to free the `head_` node manually because we didn't call its
       // constructor (see the note in the `SkipList` constructor). We mustn't
       // call the `TObj` destructor because we didn't call its constructor.
       head_->lock.~SpinLock();
-      free(head_);
+      GetMemoryResource()->Deallocate(head_, SkipListNodeSize(*head_));
     }
   }
 
@@ -815,6 +855,8 @@ class SkipList final {
   /// atomic operation.
   uint64_t size() const { return size_.load(std::memory_order_acquire); }
 
+  MemoryResource *GetMemoryResource() const { return gc_.GetMemoryResource(); }
+
  private:
   template <typename TKey>
   int find_node(const TKey &key, TNode *preds[], TNode *succs[]) const {
@@ -876,13 +918,15 @@ class SkipList final {
 
       if (!valid) continue;
 
-      // Here we use `calloc` instead of `malloc` to ensure that the memory is
-      // filled with zeros before we call the constructor. We don't use `malloc`
-      // + `memset` because `calloc` is smarter:
-      // https://stackoverflow.com/questions/2688466/why-mallocmemset-is-slower-than-calloc/2688522#2688522
-      TNode *new_node = new (
-          calloc(1, sizeof(TNode) + top_layer * sizeof(std::atomic<TNode *>)))
-          TNode(top_layer, std::forward<TObjUniv>(object));
+      size_t node_bytes =
+          sizeof(TNode) + top_layer * sizeof(std::atomic<TNode *>);
+      void *ptr = GetMemoryResource()->Allocate(node_bytes);
+      // `calloc` would be faster, but the API has no such call.
+      memset(ptr, 0, node_bytes);
+      auto *new_node = static_cast<TNode *>(ptr);
+      // Construct through allocator so it propagates if needed.
+      Allocator<TNode> allocator(GetMemoryResource());
+      allocator.construct(new_node, top_layer, std::forward<TObjUniv>(object));
 
       // The paper is also wrong here. It states that the loop should go up to
       // `top_layer` which is wrong.
@@ -1118,6 +1162,7 @@ class SkipList final {
 
  private:
   TNode *head_{nullptr};
+  // gc_ also stores the only copy of `MemoryResource *`, to save space.
   mutable SkipListGc<TObj> gc_;
 
   std::mt19937 gen_{std::random_device{}()};
diff --git a/tests/benchmark/skip_list_vs_stl.cpp b/tests/benchmark/skip_list_vs_stl.cpp
index 07fbc2562..c42ccb046 100644
--- a/tests/benchmark/skip_list_vs_stl.cpp
+++ b/tests/benchmark/skip_list_vs_stl.cpp
@@ -19,7 +19,7 @@ class SkipListSetInsertFixture : public benchmark::Fixture {
  protected:
   void SetUp(const benchmark::State &state) override {
     if (state.thread_index == 0) {
-      list = {};
+      list = utils::SkipList<uint64_t>();
     }
   }
 
@@ -178,7 +178,7 @@ class SkipListMapInsertFixture : public benchmark::Fixture {
  protected:
   void SetUp(const benchmark::State &state) override {
     if (state.thread_index == 0) {
-      list = {};
+      list = utils::SkipList<MapObject>();;
     }
   }