From ebdee4e5098a6190ce40d26acf42782f65760a8d Mon Sep 17 00:00:00 2001
From: Dominik Gleich <dominik.gleich@memgraph.io>
Date: Wed, 7 Jun 2017 10:15:08 +0200
Subject: [PATCH] Skiplist garbage collector rework.

Summary:
Drawing:
https://drive.google.com/open?id=0B-W7PQZqMD9hcG04b0lKaGZGOWM

Reviewers: mislav.bradac, buda, florijan

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D395
---
 .ycm_extra_conf.py                            |   3 +-
 CMakeLists.txt                                |  12 +-
 libs/CMakeLists.txt                           |   4 +-
 src/data_structures/concurrent/skiplist.hpp   |  14 +-
 .../concurrent/skiplist_gc.cpp                |   6 +
 .../concurrent/skiplist_gc.hpp                | 215 ++++++++++++------
 src/query/plan_compiler.hpp                   |   8 +-
 src/query/plan_template_cpp                   |   2 +
 src/utils/executioner.hpp                     |  89 ++++++++
 src/utils/scheduler.hpp                       |   3 +
 tests/integration/query_engine.cpp            |   2 +
 tests/unit/dbms_recovery.cpp                  |   4 +-
 tests/unit/executioner.cpp                    |  60 +++++
 tests/unit/recovery.cpp                       |  14 +-
 tests/unit/skiplist_gc.cpp                    | 152 +++++++------
 15 files changed, 432 insertions(+), 156 deletions(-)
 create mode 100644 src/data_structures/concurrent/skiplist_gc.cpp
 create mode 100644 src/utils/executioner.hpp
 create mode 100644 tests/unit/executioner.cpp

diff --git a/.ycm_extra_conf.py b/.ycm_extra_conf.py
index 3a773a4d1..70398c84a 100644
--- a/.ycm_extra_conf.py
+++ b/.ycm_extra_conf.py
@@ -24,7 +24,8 @@ BASE_FLAGS = [
     '-I./libs/googletest/googletest/include',
     '-I./libs/googletest/googlemock/include',
     '-I./libs/benchmark/include',
-    '-I./libs/antlr4/runtime/Cpp/runtime/src'
+    '-I./libs/antlr4/runtime/Cpp/runtime/src',
+    '-I./build/libs/gflags/include'
 ]
 
 SOURCE_EXTENSIONS = [
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9c24514ac..6b2cb7275 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -57,6 +57,7 @@ add_custom_target(clean_all
 
 # threading
 find_package(Threads REQUIRED)
+
 # optional readline
 find_package(Readline REQUIRED)
 if (READLINE_FOUND)
@@ -367,6 +368,7 @@ set(memgraph_src_files
     ${src_dir}/logging/log.cpp
     ${src_dir}/database/graph_db.cpp
     ${src_dir}/database/graph_db_accessor.cpp
+    ${src_dir}/data_structures/concurrent/skiplist_gc.cpp
     ${src_dir}/query/stripper.cpp
     ${src_dir}/query/console.cpp
     ${src_dir}/query/frontend/ast/cypher_main_visitor.cpp
@@ -382,19 +384,19 @@ set(memgraph_src_files
 
 # STATIC library used by memgraph executables
 add_library(memgraph_lib STATIC ${memgraph_src_files})
-target_link_libraries(memgraph_lib stdc++fs)
+target_link_libraries(memgraph_lib stdc++fs gflags)
 add_dependencies(memgraph_lib generate_opencypher_parser
                               generate_plan_compiler_flags)
 # executables that require memgraph_lib should link MEMGRAPH_ALL_LIBS to link all dependant libraries
-set(MEMGRAPH_ALL_LIBS gflags memgraph_lib stdc++fs Threads::Threads fmt yaml-cpp antlr_opencypher_parser_lib dl)
+set(MEMGRAPH_ALL_LIBS memgraph_lib stdc++fs Threads::Threads fmt yaml-cpp antlr_opencypher_parser_lib dl)
 if (READLINE_FOUND)
     list(APPEND MEMGRAPH_ALL_LIBS ${READLINE_LIBRARY})
 endif()
 # -----------------------------------------------------------------------------
 
 # STATIC PIC library used by query engine
-add_library(memgraph_pic STATIC ${memgraph_src_files})
-target_link_libraries(memgraph_pic stdc++fs)
+add_library(memgraph_pic STATIC)
+target_link_libraries(memgraph_pic stdc++fs gflags)
 add_dependencies(memgraph_pic generate_opencypher_parser
                               generate_plan_compiler_flags)
 set_property(TARGET memgraph_pic PROPERTY POSITION_INDEPENDENT_CODE TRUE)
@@ -414,7 +416,7 @@ endif()
 # -----------------------------------------------------------------------------
 
 execute_process(
-    COMMAND ./recursive_include --roots ${src_dir} ${libs_dir} --start ${src_dir}/query/plan_template_cpp --copy ${CMAKE_BINARY_DIR}/include
+    COMMAND ./recursive_include --roots ${src_dir} ${libs_dir} ${CMAKE_BINARY_DIR}/libs/gflags/include --start ${src_dir}/query/plan_template_cpp --copy ${CMAKE_BINARY_DIR}/include
     WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/cmake
 )
 
diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt
index b2aa6d6dd..80dc828ea 100644
--- a/libs/CMakeLists.txt
+++ b/libs/CMakeLists.txt
@@ -17,9 +17,11 @@ add_subdirectory(fmt)
 add_subdirectory(googletest)
 
 # setup google flags
-set(BUILD_gflags_LIB ON)
+set(GFLAGS_BUILD_gflags_nothreads_LIB OFF)
+set(GFLAGS_BUILD_gflags_LIB ON)
 add_subdirectory(gflags)
 
+
 # setup yaml cpp
 # disable tests because yaml doesn't have MASTER_PROJECT flag like fmt has
 # to override an option use option :)
diff --git a/src/data_structures/concurrent/skiplist.hpp b/src/data_structures/concurrent/skiplist.hpp
index 2c8220081..cd149fd31 100644
--- a/src/data_structures/concurrent/skiplist.hpp
+++ b/src/data_structures/concurrent/skiplist.hpp
@@ -581,23 +581,23 @@ class SkipList : private Lockable<lock_t> {
   class Accessor {
     friend class SkipList;
 
-    Accessor(SkipList *skiplist) : skiplist(skiplist) {
+    Accessor(SkipList *skiplist)
+        : skiplist(skiplist), status_(skiplist->gc.CreateNewAccessor()) {
       debug_assert(skiplist != nullptr, "Skiplist is nullptr.");
-
-      skiplist->gc.AddRef();
     }
 
    public:
     Accessor(const Accessor &) = delete;
 
-    Accessor(Accessor &&other) : skiplist(other.skiplist) {
+    Accessor(Accessor &&other)
+        : skiplist(other.skiplist), status_(other.status_) {
       other.skiplist = nullptr;
     }
 
     ~Accessor() {
       if (skiplist == nullptr) return;
 
-      skiplist->gc.ReleaseRef();
+      status_.alive_ = false;
     }
 
     Iterator begin() { return skiplist->begin(); }
@@ -698,6 +698,7 @@ class SkipList : private Lockable<lock_t> {
    private:
     SkipList *skiplist;
     Node *preds[H], *succs[H];
+    typename SkipListGC<Node>::AccessorStatus &status_;
   };
 
   Accessor access() { return Accessor(this); }
@@ -1139,7 +1140,6 @@ class SkipList : private Lockable<lock_t> {
       for (int level = height - 1; level >= 0; --level)
         preds[level]->forward(level, node->forward(level));
 
-      // TODO: review and test
       gc.Collect(node);
 
       count.fetch_sub(1);
@@ -1152,5 +1152,5 @@ class SkipList : private Lockable<lock_t> {
    */
   std::atomic<size_t> count{0};
   Node *header;
-  SkiplistGC<Node> gc;
+  SkipListGC<Node> gc;
 };
diff --git a/src/data_structures/concurrent/skiplist_gc.cpp b/src/data_structures/concurrent/skiplist_gc.cpp
new file mode 100644
index 000000000..a5982951e
--- /dev/null
+++ b/src/data_structures/concurrent/skiplist_gc.cpp
@@ -0,0 +1,6 @@
+#include "skiplist_gc.hpp"
+#include "gflags/gflags.h"
+
+DEFINE_int32(skiplist_gc_interval, 10,
+             "Interval of how often does skiplist gc run in seconds. To "
+             "disable set to 0.");
diff --git a/src/data_structures/concurrent/skiplist_gc.hpp b/src/data_structures/concurrent/skiplist_gc.hpp
index 6eb287035..014db3901 100644
--- a/src/data_structures/concurrent/skiplist_gc.hpp
+++ b/src/data_structures/concurrent/skiplist_gc.hpp
@@ -1,78 +1,165 @@
 #pragma once
 
-// TODO: remove from here and from the project
-#include <functional>
-#include <iostream>
-#include <thread>
+#include <malloc.h>
 
+#include <list>
+#include <mutex>
+#include <utility>
+
+#include "gflags/gflags.h"
+
+#include "data_structures/concurrent/concurrent_list.hpp"
 #include "logging/loggable.hpp"
-#include "memory/freelist.hpp"
-#include "memory/lazy_gc.hpp"
-#include "threading/global_pool.hpp"
+
 #include "threading/sync/spinlock.hpp"
-#include "utils/assert.hpp"
+#include "utils/executioner.hpp"
 
-template <class T, class lock_t = SpinLock>
-class SkiplistGC : public LazyGC<SkiplistGC<T, lock_t>, lock_t>,
-                   public Loggable {
+DECLARE_int32(skiplist_gc_interval);
+
+/**
+ * @brief Garbage collects nodes.
+ * We are doing garbage collection by keeping track of alive accessors which
+ * were requested from the parent skiplist. When some prefix [id, id+n] of
+ * accessors becomes dead we try to empty the collection of (accessors_id,
+ * entry*) with the id of that last dead accessor. Each entry is added to
+ * collection after it has been re-linked and can't be seen by any accessors
+ * created after that time and that marks the safe time for deleting entry.
+ * @Tparam TNode - type of underlying pointer to objects which will be
+ * collected.
+ */
+template <class TNode>
+class SkipListGC : public Loggable {
  public:
-  SkiplistGC() : Loggable("SkiplistGC") {}
+  explicit SkipListGC() : Loggable("SkipListGc") {
+    executor_job_id_ = GetExecutioner().RegisterJob(
+        std::bind(&SkipListGC::GarbageCollect, this));
+  }
 
-  /**
-   * ReleaseRef method should be called by some thread which finishes access to
-   * skiplist. If thread reference_count_ becomes zero, all objects in the
-   * local_freelist are going to be deleted. The only problem with this approach
-   * is that GC may never be called, but for now we can deal with that.
-   */
-  void ReleaseRef() {
-    // This has to be a shared_ptr since std::function requires that the
-    // callable object be copy-constructable.
-    std::shared_ptr<std::vector<T *>> local_freelist =
-        std::make_shared<std::vector<T *>>();
-
-    // take freelist if there is no more threads
-    {
-      auto lock = this->acquire_unique();
-      debug_assert(this->reference_count_ > 0, "Count is equal to zero.");
-      --this->reference_count_;
-      if (this->reference_count_ == 0) {
-        freelist_.swap(*local_freelist);
-      }
-    }
-
-    if (local_freelist->size() > 0) {
-      // We need to use a Global thread pool, otherwise we run into problems
-      // because each skiplist would use it's own thread pool and that would
-      // invoke too many threads.
-      GlobalPool::getSingletonInstance()->run(std::bind(
-          [](std::shared_ptr<std::vector<T *>> local_freelist) {
-            // Comment logger out for now because we can't send an instance of
-            // this to global pool because this SkipListGc instance could be
-            // destroyed before the GC starts and as such will SEGFAULT.
-
-            // logger.trace("GC started");
-            // logger.trace("Local list size: {}", local_freelist->size());
-            long long destroyed = 0;
-            // destroy all elements from local_freelist
-            for (auto element : *local_freelist) {
-              if (element->flags.is_marked()) {
-                T::destroy(element);
-                destroyed++;
-              } else {
-                //    logger.warn(
-                //       "Unmarked node appeared in the collection ready for "
-                //      "destruction.");
-              }
-            }
-            //  logger.trace("Number of destroyed elements: {}", destroyed);
-          },
-          local_freelist));
+  ~SkipListGC() {
+    // We have to unregister the job because otherwise Executioner might access
+    // some member variables of this class after it has been destructed.
+    GetExecutioner().UnRegisterJob(executor_job_id_);
+    for (auto it = deleted_list_.begin(); it != deleted_list_.end(); ++it) {
+      TNode::destroy(it->second);
+      it.remove();
     }
   }
 
-  void Collect(T *node) { freelist_.add(node); }
+  /**
+   * @brief - Returns instance of executioner shared between all SkipLists.
+   */
+  auto &GetExecutioner() {
+    static Executioner executioner(
+        (std::chrono::seconds(FLAGS_skiplist_gc_interval)));
+
+    return executioner;
+  }
+
+  SkipListGC(const SkipListGC &other) = delete;
+  SkipListGC(SkipListGC &&other) = delete;
+  SkipListGC operator=(const SkipListGC &other) = delete;
+  SkipListGC operator=(SkipListGC &&other) = delete;
+
+  /**
+   * @brief - Keep track of each accessor with it's status, so we know which
+   * ones are alive and which ones are dead.
+   */
+  struct AccessorStatus {
+    AccessorStatus(const int64_t id, bool alive) : id_(id), alive_(alive) {}
+
+    AccessorStatus(AccessorStatus &&other) = default;
+
+    AccessorStatus(const AccessorStatus &other) = delete;
+    AccessorStatus operator=(const AccessorStatus &other) = delete;
+    AccessorStatus operator=(AccessorStatus &&other) = delete;
+
+    const int64_t id_{-1};
+    bool alive_{false};
+  };
+
+  /**
+   * @brief - Creates a new accessors and returns reference to it's status. This
+   * method is thread-safe.
+   */
+  AccessorStatus &CreateNewAccessor() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    accessors_.emplace_back(++last_accessor_id_, true);
+    return accessors_.back();
+  }
+
+  /**
+   * @brief - Destroys objects which were previously collected and can be safely
+   * removed. This method is not thread-safe.
+   */
+  void GarbageCollect() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    auto last_dead_accessor = accessors_.end();
+    for (auto it = accessors_.begin(); it != accessors_.end(); ++it) {
+      if (it->alive_) break;
+      last_dead_accessor = it;
+    }
+    // We didn't find any dead accessor and that means we are not sure that we
+    // can delete anything.
+    if (last_dead_accessor == accessors_.end()) return;
+    // We don't need lock anymore because we are not modifying this structure
+    // anymore, or accessing it any further down.
+    const int64_t safe_id = last_dead_accessor->id_;
+    accessors_.erase(accessors_.begin(), ++last_dead_accessor);
+    lock.unlock();
+
+    // We can only modify this in a not-thread safe way because we are the only
+    // thread ever accessing it here, i.e. there is at most one thread doing
+    // this GarbageCollection.
+    auto oldest_not_deletable = deleted_list_.begin();
+    bool delete_all = true;
+    for (auto it = deleted_list_.begin(); it != deleted_list_.end(); ++it) {
+      if (it->first > safe_id) {
+        // We have to increase iterator manually because the copy assignment
+        // operator is deleted.
+        while (oldest_not_deletable != it) ++oldest_not_deletable;
+        delete_all = false;
+      }
+    }
+
+    // deleted_list is already empty, nothing to delete here.
+    if (oldest_not_deletable == deleted_list_.end()) return;
+
+    // In case we didn't find anything that we can't delete we shouldn't
+    // increment this because that would mean we skip over the first record
+    // which is ready for destruction.
+    if (!delete_all) ++oldest_not_deletable;
+    int64_t destroyed = 0;
+    for (auto &it = oldest_not_deletable; it != deleted_list_.end(); ++it) {
+      TNode::destroy(it->second);
+      it.remove();
+      ++destroyed;
+    }
+    if (destroyed) logger.trace("Number of destroyed elements: {}", destroyed);
+  }
+
+  /**
+   * @brief - Collect object for garbage collection. Call to this method means
+   * that no new accessor can possibly access the object by iterating over some
+   * storage.
+   */
+  void Collect(TNode *object) {
+    // We can afford some inaccuary here - it's possible that some new accessor
+    // incremented the last_accessor_id after we enter this method and as such
+    // we might be a bit pessimistic here.
+    deleted_list_.begin().push(
+        std::make_pair(last_accessor_id_.load(), object));
+  }
 
  private:
-  // We use FreeList since it's thread-safe.
-  FreeList<T *> freelist_;
+  int64_t executor_job_id_{-1};
+  std::mutex mutex_;
+  std::mutex singleton_mutex_;
+
+  // List of accesssors from begin to end by an increasing id.
+  std::list<AccessorStatus> accessors_;
+  std::atomic<int64_t> last_accessor_id_{0};
+
+  // List of pairs of accessor_ids and pointers to entries which should be
+  // destroyed sorted approximately descendingly by id.
+  ConcurrentList<std::pair<int64_t, TNode *>> deleted_list_;
 };
diff --git a/src/query/plan_compiler.hpp b/src/query/plan_compiler.hpp
index a4c866e79..20ba36405 100644
--- a/src/query/plan_compiler.hpp
+++ b/src/query/plan_compiler.hpp
@@ -34,10 +34,10 @@ class PlanCompiler : public Loggable {
 #ifdef HARDCODED_OUTPUT_STREAM
                      "-DHARDCODED_OUTPUT_STREAM",
 #endif
-                     in_file,         // input file
-                     "-o", out_file,  // ouput file
-                     include_dirs, link_dirs, "-lmemgraph_pic",
-                     "-shared -fPIC"},  // shared library flags
+                     in_file,                  // input file
+                     "-o", out_file,           // ouput file
+                     include_dirs, link_dirs,  //"-lmemgraph_pic",
+                     "-shared -fPIC"},         // shared library flags
                     " ");
 
     logger.debug("compile command -> {}", compile_command);
diff --git a/src/query/plan_template_cpp b/src/query/plan_template_cpp
index a9229c995..a9f0b7a63 100644
--- a/src/query/plan_template_cpp
+++ b/src/query/plan_template_cpp
@@ -1,6 +1,8 @@
 #include <iostream>
 #include <string>
 
+#include "gflags/gflags.h"
+#include "gflags/gflags_declare.h"
 #include "data_structures/bitset/static_bitset.hpp"
 #include "communication/bolt/v1/encoder/result_stream.hpp"
 #include "io/network/socket.hpp"
diff --git a/src/utils/executioner.hpp b/src/utils/executioner.hpp
new file mode 100644
index 000000000..f6e54487e
--- /dev/null
+++ b/src/utils/executioner.hpp
@@ -0,0 +1,89 @@
+#pragma once
+
+#include <algorithm>
+#include <mutex>
+#include <vector>
+
+#include "utils/scheduler.hpp"
+
+/**
+ * @brief - Provides execution of jobs in job queue on one thread with 'pause'
+ * time between two consecutives starts.
+ */
+class Executioner {
+ public:
+  template <typename TRep, typename TPeriod>
+  Executioner(const std::chrono::duration<TRep, TPeriod> pause) {
+    if (pause != pause.zero())
+      scheduler_.Run(pause, std::bind(&Executioner::Execute, this));
+  }
+
+  ~Executioner() {
+    // Be sure to first stop scheduler because otherwise we might destroy the
+    // mutex before the scheduler and that might cause problems since mutex is
+    // used in Execute method passed to scheduler along with jobs vector.
+    scheduler_.Stop();
+  }
+
+  /**
+   * @brief - Add function to job queue.
+   */
+  int64_t RegisterJob(const std::function<void()> &f) {
+    {
+      std::unique_lock<std::mutex> lock(update_mutex_);
+      id_job_pairs_.emplace_back(std::make_pair(++count_, f));
+      return id_job_pairs_.back().first;
+    }
+  }
+
+  /**
+   * @brief - Remove id from job queue.
+   */
+  void UnRegisterJob(const int64_t id) {
+    {
+      // First wait for execute lock and then for the update lock because
+      // execute lock will be unavailable for longer and there is no point in
+      // blocking other threads with update lock.
+      std::unique_lock<std::mutex> execute_lock(execute_mutex_);
+      std::unique_lock<std::mutex> update_lock(update_mutex_);
+
+      for (auto id_job_pair_it = id_job_pairs_.begin();
+           id_job_pair_it != id_job_pairs_.end(); ++id_job_pair_it) {
+        if (id_job_pair_it->first == id) {
+          id_job_pairs_.erase(id_job_pair_it);
+          return;
+        }
+      }
+    }
+  }
+
+ private:
+  /**
+   * @brief - Execute method executes jobs from id_job_pairs vector.
+   * The reason for doing double locking is the following: we don't want to
+   * block creation of new jobs since that will slow down all of memgraph so we
+   * use a special lock for job update. Execute lock is here so that we can
+   * guarantee that after some job is unregistered it's also stopped.
+   */
+  void Execute() {
+    std::unique_lock<std::mutex> execute_lock(execute_mutex_);
+    std::vector<std::pair<int, std::function<void()>>> id_job_pairs;
+
+    // Acquire newest current version of jobs but being careful not to access
+    // the vector in corrupt state.
+    {
+      std::unique_lock<std::mutex> update_lock(update_mutex_);
+      id_job_pairs = id_job_pairs_;
+    }
+
+    for (auto id_job_pair : id_job_pairs) {
+      id_job_pair.second();
+    }
+  }
+
+  int64_t count_{0};
+  std::mutex execute_mutex_;
+  std::mutex update_mutex_;
+  Scheduler<std::mutex> scheduler_;
+  std::vector<std::pair<int, std::function<void()>>> id_job_pairs_;
+};
diff --git a/src/utils/scheduler.hpp b/src/utils/scheduler.hpp
index 9e5a6295d..9a3447d05 100644
--- a/src/utils/scheduler.hpp
+++ b/src/utils/scheduler.hpp
@@ -1,3 +1,5 @@
+#pragma once
+
 #include <atomic>
 #include <chrono>
 #include <condition_variable>
@@ -27,6 +29,7 @@ class Scheduler {
   void Run(const std::chrono::duration<TRep, TPeriod> &pause,
            const std::function<void()> &f) {
     debug_assert(is_working_ == false, "Thread already running.");
+    debug_assert(pause > std::chrono::seconds(0), "Pause is invalid.");
     is_working_ = true;
     thread_ = std::thread([this, pause, f]() {
       auto start_time = std::chrono::system_clock::now();
diff --git a/tests/integration/query_engine.cpp b/tests/integration/query_engine.cpp
index 22e2d2b8f..95b31ea38 100644
--- a/tests/integration/query_engine.cpp
+++ b/tests/integration/query_engine.cpp
@@ -1,5 +1,7 @@
 #define HARDCODED_OUTPUT_STREAM
 
+#include "gflags/gflags.h"
+
 #include "config/config.hpp"
 #include "dbms/dbms.hpp"
 #include "query_engine_common.hpp"
diff --git a/tests/unit/dbms_recovery.cpp b/tests/unit/dbms_recovery.cpp
index afd599f25..ae8386c2d 100644
--- a/tests/unit/dbms_recovery.cpp
+++ b/tests/unit/dbms_recovery.cpp
@@ -69,14 +69,14 @@ void RecoverDbms() {
   std::vector<EdgeAccessor> edges;
 
   int vertex_count = 0;
-  for (auto const &vertex : dba->vertices()) {
+  for (auto const &vertex : dba->vertices(false)) {
     vertices.push_back(vertex);
     vertex_count++;
   }
   EXPECT_EQ(vertex_count, 3);
 
   int edge_count = 0;
-  for (auto const &edge : dba->edges()) {
+  for (auto const &edge : dba->edges(false)) {
     EXPECT_NE(vertices.end(),
               std::find(vertices.begin(), vertices.end(), edge.to()));
     EXPECT_NE(vertices.end(),
diff --git a/tests/unit/executioner.cpp b/tests/unit/executioner.cpp
new file mode 100644
index 000000000..01c3ef694
--- /dev/null
+++ b/tests/unit/executioner.cpp
@@ -0,0 +1,60 @@
+#include <atomic>
+#include <memory>
+
+#include "gtest/gtest.h"
+
+#include "utils/executioner.hpp"
+
+TEST(Executioner, DontRun) {
+  std::atomic<int> count{0};
+  {
+    Executioner exec(std::chrono::seconds(0));
+    // Be sure executioner is sleeping.
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+    exec.RegisterJob([&count]() { ++count; });
+    // Try to wait to test if executioner might somehow become awake and execute
+    // the job.
+    std::this_thread::sleep_for(std::chrono::milliseconds(500));
+  }
+  EXPECT_EQ(count, 0);
+}
+
+TEST(Executioner, Run) {
+  std::atomic<int> count{0};
+  {
+    Executioner exec(std::chrono::milliseconds(500));
+    // Be sure executioner is sleeping.
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+    exec.RegisterJob([&count]() { ++count; });
+    exec.RegisterJob([&count]() { ++count; });
+    exec.RegisterJob([&count]() { ++count; });
+
+    // Be sure executioner execute thread is triggered
+    std::this_thread::sleep_for(std::chrono::milliseconds(500));
+  }
+  EXPECT_EQ(count, 3);
+}
+
+TEST(Executioner, RunUnregister) {
+  std::atomic<int> count1{0};
+  std::atomic<int> count2{0};
+  {
+    Executioner exec(std::chrono::milliseconds(500));
+    // Be sure executioner is sleeping.
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    auto job = exec.RegisterJob([&count1]() { ++count1; });
+    exec.RegisterJob([&count2]() { ++count2; });
+
+    // Be sure executioner execute thread is triggered
+    std::this_thread::sleep_for(std::chrono::milliseconds(500));
+
+    exec.UnRegisterJob(job);
+
+    // Be sure executioner execute thread is triggered
+    std::this_thread::sleep_for(std::chrono::milliseconds(500));
+  }
+  EXPECT_EQ(count1, 1);
+  EXPECT_EQ(count2, 2);
+}
diff --git a/tests/unit/recovery.cpp b/tests/unit/recovery.cpp
index 0e4767ab4..058ec2b59 100644
--- a/tests/unit/recovery.cpp
+++ b/tests/unit/recovery.cpp
@@ -125,9 +125,9 @@ TEST_F(RecoveryTest, TestEncoding) {
   buffer.Close();
 
   permanent_assert(static_cast<int>(to.size()) == 2,
-                    "There should be two edges.");
+                   "There should be two edges.");
   permanent_assert(static_cast<int>(from.size()) == 2,
-                    "There should be two edges.");
+                   "There should be two edges.");
 
   EXPECT_EQ(buffer.hash(), summary.hash_);
   EXPECT_NE(edge_types.end(),
@@ -161,14 +161,14 @@ TEST_F(RecoveryTest, TestEncodingAndDecoding) {
 
   auto dba = dbms_recover.active();
   int64_t vertex_count = 0;
-  for (const auto &vertex : dba->vertices()) {
+  for (const auto &vertex : dba->vertices(false)) {
     vertices.push_back(vertex);
     vertex_count++;
   }
   EXPECT_EQ(vertex_count, 3);
 
   int64_t edge_count = 0;
-  for (const auto &edge : dba->edges()) {
+  for (const auto &edge : dba->edges(false)) {
     EXPECT_NE(vertices.end(),
               std::find(vertices.begin(), vertices.end(), edge.to()));
     EXPECT_NE(vertices.end(),
@@ -177,7 +177,7 @@ TEST_F(RecoveryTest, TestEncodingAndDecoding) {
     edge_count++;
   }
   permanent_assert(static_cast<int>(edges.size()) == 2,
-                    "There should be two edges.");
+                   "There should be two edges.");
 
   EXPECT_EQ(edge_count, 2);
   EXPECT_TRUE(edges[0].to() == edges[1].to());
@@ -201,7 +201,7 @@ TEST_F(RecoveryTest, TestEncodingAndRecovering) {
 
   auto dba_get = dbms_recover.active();
   int64_t vertex_count = 0;
-  for (const auto &vertex : dba_get->vertices()) {
+  for (const auto &vertex : dba_get->vertices(false)) {
     EXPECT_EQ(vertex.labels().size(), 1);
     EXPECT_TRUE(vertex.has_label(dba_get->label("label")));
     query::TypedValue prop =
@@ -213,7 +213,7 @@ TEST_F(RecoveryTest, TestEncodingAndRecovering) {
   EXPECT_EQ(vertex_count, 1000);
 
   int64_t edge_count = 0;
-  for (const auto &edge : dba_get->edges()) {
+  for (const auto &edge : dba_get->edges(false)) {
     EXPECT_EQ(edge.edge_type(), dba_get->edge_type("type"));
     query::TypedValue prop =
         query::TypedValue(edge.PropsAt(dba_get->property("prop")));
diff --git a/tests/unit/skiplist_gc.cpp b/tests/unit/skiplist_gc.cpp
index 3b3a80ec9..6068176d0 100644
--- a/tests/unit/skiplist_gc.cpp
+++ b/tests/unit/skiplist_gc.cpp
@@ -1,10 +1,11 @@
+#include "gflags/gflags.h"
 #include "gtest/gtest.h"
 
 #include <chrono>
 #include <memory>
 #include <thread>
 
-#include "data_structures/concurrent/skiplist.hpp"
+#include "data_structures/concurrent/skiplist_gc.hpp"
 #include "logging/streams/stderr.hpp"
 
 /**
@@ -23,82 +24,103 @@ class FakeItem {
     return this->value > item.value;
   }
 
+  static void destroy(FakeItem *item) { delete item; }
+
  private:
   std::atomic<int> &count;
   int value;
 };
 
-TEST(SkipListGC, TripleScopeGC) {
-  SkipList<FakeItem> skiplist;
-  std::atomic<int> *count = new std::atomic<int>{0};
+DECLARE_int32(skiplist_gc_interval);
 
-  auto item = FakeItem(*count, 1);
+TEST(SkipListGC, CreateNewAccessors) {
+  FLAGS_skiplist_gc_interval = 0;
+  SkipListGC<FakeItem> gc;
+  auto &accessor1 = gc.CreateNewAccessor();
+  auto &accessor2 = gc.CreateNewAccessor();
+  auto &accessor3 = gc.CreateNewAccessor();
+
+  EXPECT_EQ(accessor1.id_, 1);
+  EXPECT_EQ(accessor2.id_, 2);
+  EXPECT_EQ(accessor3.id_, 3);
+
+  accessor1.alive_ = false;
+  accessor2.alive_ = false;
+  accessor3.alive_ = false;
+}
+
+TEST(SkipListGC, DeleteItem) {
+  FLAGS_skiplist_gc_interval = 0;
+  SkipListGC<FakeItem> gc;
+  auto &accessor1 = gc.CreateNewAccessor();
+  std::atomic<int> count{0};
+  auto item1 = new FakeItem(count, 1);
+  gc.Collect(item1);
+
+  // Kill the accesssor
+  accessor1.alive_ = false;
+  gc.GarbageCollect();
+  EXPECT_EQ(count, 1);
+}
+
+TEST(SkipListGC, DontDeleteItem) {
+  FLAGS_skiplist_gc_interval = 0;
+  SkipListGC<FakeItem> gc;
+  auto &accessor1 = gc.CreateNewAccessor();
+  auto &accessor2 = gc.CreateNewAccessor();
+  std::atomic<int> count{0};
+  auto item1 = new FakeItem(count, 1);
+  gc.Collect(item1);
+
+  // Kill the accesssor
+  accessor2.alive_ = false;
+
+  // Nothing deleted because accessor1 is blocking.
+  gc.GarbageCollect();
+  EXPECT_EQ(count, 0);
+
+  // Accessor 1 is not blocking anymore.
+  accessor1.alive_ = false;
+  gc.GarbageCollect();
+  EXPECT_EQ(count, 1);
+}
+
+TEST(SkipListGC, Destructor) {
+  FLAGS_skiplist_gc_interval = 0;
+  std::atomic<int> count{0};
+  auto item1 = new FakeItem(count, 1);
   {
-    auto access_1 = skiplist.access();
-    {
-      auto access_2 = skiplist.access();
-      {
-        auto access_3 = skiplist.access();
-        access_1.insert(item);  // add with 1
-        access_2.remove(item);  // remove with 2
-        std::this_thread::sleep_for(std::chrono::milliseconds(100));
-        EXPECT_EQ(*count, 0);
-      }
-      std::this_thread::sleep_for(std::chrono::milliseconds(100));
-      EXPECT_EQ(*count, 0);
-    }
-    std::this_thread::sleep_for(std::chrono::milliseconds(100));
-    EXPECT_EQ(*count, 0);
-  }  // scope end - GC called
-  for (int i = 0; i < 10; ++i) {
-    if (*count != 0) break;
-    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    SkipListGC<FakeItem> gc;
+    gc.Collect(item1);
+    EXPECT_EQ(count, 0);
   }
-  EXPECT_EQ(*count, 1);
+  EXPECT_EQ(count, 1);
 }
 
-TEST(SkipListGC, BlockedGCNoGC) {
-  SkipList<FakeItem> skiplist;
-  std::atomic<int> *count = new std::atomic<int>{0};
-  auto item = FakeItem(*count, 1);
-  auto blocking_access = skiplist.access();
-  {
-    auto access = skiplist.access();
-    access.insert(item);
-    access.remove(item);
-  }  // scope end - GC still isn't called because of blocking_access
-  std::this_thread::sleep_for(std::chrono::milliseconds(100));
-  EXPECT_EQ(*count, 0);
-}
+TEST(SkipListGC, MultipleDeletes) {
+  FLAGS_skiplist_gc_interval = 0;
+  SkipListGC<FakeItem> gc;
+  std::atomic<int> count{0};
+  auto &accessor1 = gc.CreateNewAccessor();
+  auto item1 = new FakeItem(count, 1);
+  gc.Collect(item1);
 
-TEST(SkipListGC, NotInScopeGC) {
-  SkipList<FakeItem> skiplist;
-  std::atomic<int> *count = new std::atomic<int>{0};
-  auto item = FakeItem(*count, 1);
-  {
-    auto access = skiplist.access();
-    access.insert(item);
-    access.remove(item);
-  }  // scope end - GC called
-  for (int i = 0; i < 10; ++i) {
-    if (*count == 1) break;
-    std::this_thread::sleep_for(std::chrono::milliseconds(100));
-  }
-  // If this count is not 1 that means we are still doing GC in the background
-  // and might crash the test if we try to modify count variable after it's been
-  // deallocated.
-  ASSERT_EQ(*count, 1);
-}
+  auto &accessor2 = gc.CreateNewAccessor();
+  auto item2 = new FakeItem(count, 1);
+  gc.Collect(item2);
 
-TEST(SkipListGC, StillInScopeNoGC) {
-  SkipList<FakeItem> skiplist;
-  std::atomic<int> *count = new std::atomic<int>{0};
-  auto item = FakeItem(*count, 1);
-  auto access = skiplist.access();
-  access.insert(item);
-  access.remove(item);
-  std::this_thread::sleep_for(std::chrono::milliseconds(100));
-  EXPECT_EQ(*count, 0);
+  auto &accessor3 = gc.CreateNewAccessor();
+  auto item3 = new FakeItem(count, 1);
+  gc.Collect(item3);
+
+  accessor1.alive_ = false;
+  accessor2.alive_ = false;
+  gc.GarbageCollect();
+  EXPECT_EQ(count, 2);
+
+  accessor3.alive_ = false;
+  gc.GarbageCollect();
+  EXPECT_EQ(count, 3);
 }
 
 int main(int argc, char **argv) {