From 9652f8e26585275cd64bc33d41862c30876974bc Mon Sep 17 00:00:00 2001
From: Matej Ferencevic <matej.ferencevic@memgraph.io>
Date: Fri, 13 Sep 2019 13:26:59 +0200
Subject: [PATCH] Add `clear` method to SkipList

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2385
---
 src/utils/skip_list.hpp  | 86 +++++++++++++++++++++++++++++-----------
 tests/unit/skip_list.cpp | 55 +++++++++++++++++++++++++
 2 files changed, 117 insertions(+), 24 deletions(-)

diff --git a/src/utils/skip_list.hpp b/src/utils/skip_list.hpp
index 414fd7e1d..c0b901c39 100644
--- a/src/utils/skip_list.hpp
+++ b/src/utils/skip_list.hpp
@@ -187,24 +187,12 @@ class SkipListGc final {
   SkipListGc(SkipListGc &&other) = delete;
   SkipListGc &operator=(SkipListGc &&other) = delete;
 
-  ~SkipListGc() {
-    Block *head = head_.load(std::memory_order_acquire);
-    while (head != nullptr) {
-      Allocator<Block> block_allocator(memory_);
-      Block *prev = head->prev.load(std::memory_order_acquire);
-      head->~Block();
-      block_allocator.deallocate(head, 1);
-      head = prev;
-    }
-    std::optional<TDeleted> item;
-    while ((item = deleted_.Pop())) {
-      size_t bytes = SkipListNodeSize(*item->second);
-      item->second->~TNode();
-      memory_->Deallocate(item->second, bytes);
-    }
-  }
+  ~SkipListGc() { Clear(); }
 
   uint64_t AllocateId() {
+#ifndef NDEBUG
+    alive_accessors_.fetch_add(1, std::memory_order_acq_rel);
+#endif
     return accessor_id_.fetch_add(1, std::memory_order_acq_rel);
   }
 
@@ -238,6 +226,9 @@ class SkipListGc final {
         break;
       }
     }
+#ifndef NDEBUG
+    alive_accessors_.fetch_add(-1, std::memory_order_acq_rel);
+#endif
   }
 
   void Collect(TNode *node) {
@@ -311,6 +302,36 @@ class SkipListGc final {
 
   MemoryResource *GetMemoryResource() const { return memory_; }
 
+  void Clear() {
+#ifndef NDEBUG
+    CHECK(alive_accessors_ == 0)
+        << "The SkipList can't be cleared while there are existing accessors!";
+#endif
+    // Delete all allocated blocks.
+    Block *head = head_.load(std::memory_order_acquire);
+    while (head != nullptr) {
+      Allocator<Block> block_allocator(memory_);
+      Block *prev = head->prev.load(std::memory_order_acquire);
+      head->~Block();
+      block_allocator.deallocate(head, 1);
+      head = prev;
+    }
+
+    // Delete all items that have to be garbage collected.
+    std::optional<TDeleted> item;
+    while ((item = deleted_.Pop())) {
+      size_t bytes = SkipListNodeSize(*item->second);
+      item->second->~TNode();
+      memory_->Deallocate(item->second, bytes);
+    }
+
+    // Reset all variables.
+    accessor_id_ = 0;
+    head_ = nullptr;
+    tail_ = nullptr;
+    last_id_ = 0;
+  }
+
  private:
   MemoryResource *memory_;
   SpinLock lock_;
@@ -319,6 +340,9 @@ class SkipListGc final {
   std::atomic<Block *> tail_{nullptr};
   uint64_t last_id_{0};
   TStack deleted_;
+#ifndef NDEBUG
+  std::atomic<uint64_t> alive_accessors_{0};
+#endif
 };
 
 /// Concurrent skip list. It is mostly lock-free and fine-grained locking is
@@ -824,14 +848,9 @@ class SkipList final {
 
   ~SkipList() {
     if (head_ != nullptr) {
-      TNode *curr = head_->nexts[0].load(std::memory_order_acquire);
-      while (curr != nullptr) {
-        TNode *succ = curr->nexts[0].load(std::memory_order_acquire);
-        size_t bytes = SkipListNodeSize(*curr);
-        curr->~TNode();
-        GetMemoryResource()->Deallocate(curr, bytes);
-        curr = succ;
-      }
+      // Remove all items from the list.
+      clear();
+
       // We need to free the `head_` node manually because we didn't call its
       // constructor (see the note in the `SkipList` constructor). We mustn't
       // call the `TObj` destructor because we didn't call its constructor.
@@ -851,6 +870,25 @@ class SkipList final {
 
   MemoryResource *GetMemoryResource() const { return gc_.GetMemoryResource(); }
 
+  /// This function removes all elements from the list.
+  /// NOTE: The function *isn't* thread-safe. It must be called while there are
+  /// no more active accessors using the list.
+  void clear() {
+    TNode *curr = head_->nexts[0].load(std::memory_order_acquire);
+    while (curr != nullptr) {
+      TNode *succ = curr->nexts[0].load(std::memory_order_acquire);
+      size_t bytes = SkipListNodeSize(*curr);
+      curr->~TNode();
+      GetMemoryResource()->Deallocate(curr, bytes);
+      curr = succ;
+    }
+    for (int layer = 0; layer < kSkipListMaxHeight; ++layer) {
+      head_->nexts[layer] = nullptr;
+    }
+    size_ = 0;
+    gc_.Clear();
+  }
+
  private:
   template <typename TKey>
   int find_node(const TKey &key, TNode *preds[], TNode *succs[]) const {
diff --git a/tests/unit/skip_list.cpp b/tests/unit/skip_list.cpp
index f9f82cbbc..00e1b215c 100644
--- a/tests/unit/skip_list.cpp
+++ b/tests/unit/skip_list.cpp
@@ -346,6 +346,61 @@ TEST(SkipList, Move) {
   }
 }
 
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+TEST(SkipList, Clear) {
+  utils::SkipList<int64_t> list;
+
+  {
+    auto acc = list.access();
+    for (int64_t i = -1000; i <= 1000; ++i) {
+      acc.insert(i);
+    }
+    ASSERT_EQ(acc.size(), 2001);
+  }
+
+  {
+    auto acc = list.access();
+    int64_t val = -1000;
+    for (auto &item : acc) {
+      ASSERT_EQ(item, val);
+      ++val;
+    }
+    ASSERT_EQ(val, 1001);
+    ASSERT_EQ(acc.size(), 2001);
+  }
+
+  list.clear();
+
+  {
+    auto acc = list.access();
+    uint64_t count = 0;
+    for (auto it = acc.begin(); it != acc.end(); ++it) {
+      ++count;
+    }
+    ASSERT_EQ(count, 0);
+    ASSERT_EQ(acc.size(), 0);
+  }
+
+  {
+    auto acc = list.access();
+    for (int64_t i = -1000; i <= 1000; ++i) {
+      acc.insert(i);
+    }
+    ASSERT_EQ(acc.size(), 2001);
+  }
+
+  {
+    auto acc = list.access();
+    int64_t val = -1000;
+    for (auto &item : acc) {
+      ASSERT_EQ(item, val);
+      ++val;
+    }
+    ASSERT_EQ(val, 1001);
+    ASSERT_EQ(acc.size(), 2001);
+  }
+}
+
 struct Inception {
   uint64_t id;
   utils::SkipList<uint64_t> data;