diff --git a/.ycm_extra_conf.py b/.ycm_extra_conf.py index 3a773a4d1..70398c84a 100644 --- a/.ycm_extra_conf.py +++ b/.ycm_extra_conf.py @@ -24,7 +24,8 @@ BASE_FLAGS = [ '-I./libs/googletest/googletest/include', '-I./libs/googletest/googlemock/include', '-I./libs/benchmark/include', - '-I./libs/antlr4/runtime/Cpp/runtime/src' + '-I./libs/antlr4/runtime/Cpp/runtime/src', + '-I./build/libs/gflags/include' ] SOURCE_EXTENSIONS = [ diff --git a/CMakeLists.txt b/CMakeLists.txt index 9c24514ac..6b2cb7275 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -57,6 +57,7 @@ add_custom_target(clean_all # threading find_package(Threads REQUIRED) + # optional readline find_package(Readline REQUIRED) if (READLINE_FOUND) @@ -367,6 +368,7 @@ set(memgraph_src_files ${src_dir}/logging/log.cpp ${src_dir}/database/graph_db.cpp ${src_dir}/database/graph_db_accessor.cpp + ${src_dir}/data_structures/concurrent/skiplist_gc.cpp ${src_dir}/query/stripper.cpp ${src_dir}/query/console.cpp ${src_dir}/query/frontend/ast/cypher_main_visitor.cpp @@ -382,19 +384,19 @@ set(memgraph_src_files # STATIC library used by memgraph executables add_library(memgraph_lib STATIC ${memgraph_src_files}) -target_link_libraries(memgraph_lib stdc++fs) +target_link_libraries(memgraph_lib stdc++fs gflags) add_dependencies(memgraph_lib generate_opencypher_parser generate_plan_compiler_flags) # executables that require memgraph_lib should link MEMGRAPH_ALL_LIBS to link all dependant libraries -set(MEMGRAPH_ALL_LIBS gflags memgraph_lib stdc++fs Threads::Threads fmt yaml-cpp antlr_opencypher_parser_lib dl) +set(MEMGRAPH_ALL_LIBS memgraph_lib stdc++fs Threads::Threads fmt yaml-cpp antlr_opencypher_parser_lib dl) if (READLINE_FOUND) list(APPEND MEMGRAPH_ALL_LIBS ${READLINE_LIBRARY}) endif() # ----------------------------------------------------------------------------- # STATIC PIC library used by query engine -add_library(memgraph_pic STATIC ${memgraph_src_files}) -target_link_libraries(memgraph_pic stdc++fs) +add_library(memgraph_pic STATIC) +target_link_libraries(memgraph_pic stdc++fs gflags) add_dependencies(memgraph_pic generate_opencypher_parser generate_plan_compiler_flags) set_property(TARGET memgraph_pic PROPERTY POSITION_INDEPENDENT_CODE TRUE) @@ -414,7 +416,7 @@ endif() # ----------------------------------------------------------------------------- execute_process( - COMMAND ./recursive_include --roots ${src_dir} ${libs_dir} --start ${src_dir}/query/plan_template_cpp --copy ${CMAKE_BINARY_DIR}/include + COMMAND ./recursive_include --roots ${src_dir} ${libs_dir} ${CMAKE_BINARY_DIR}/libs/gflags/include --start ${src_dir}/query/plan_template_cpp --copy ${CMAKE_BINARY_DIR}/include WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/cmake ) diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt index b2aa6d6dd..80dc828ea 100644 --- a/libs/CMakeLists.txt +++ b/libs/CMakeLists.txt @@ -17,9 +17,11 @@ add_subdirectory(fmt) add_subdirectory(googletest) # setup google flags -set(BUILD_gflags_LIB ON) +set(GFLAGS_BUILD_gflags_nothreads_LIB OFF) +set(GFLAGS_BUILD_gflags_LIB ON) add_subdirectory(gflags) + # setup yaml cpp # disable tests because yaml doesn't have MASTER_PROJECT flag like fmt has # to override an option use option :) diff --git a/src/data_structures/concurrent/skiplist.hpp b/src/data_structures/concurrent/skiplist.hpp index 2c8220081..cd149fd31 100644 --- a/src/data_structures/concurrent/skiplist.hpp +++ b/src/data_structures/concurrent/skiplist.hpp @@ -581,23 +581,23 @@ class SkipList : private Lockable { class Accessor { friend class SkipList; - Accessor(SkipList *skiplist) : skiplist(skiplist) { + Accessor(SkipList *skiplist) + : skiplist(skiplist), status_(skiplist->gc.CreateNewAccessor()) { debug_assert(skiplist != nullptr, "Skiplist is nullptr."); - - skiplist->gc.AddRef(); } public: Accessor(const Accessor &) = delete; - Accessor(Accessor &&other) : skiplist(other.skiplist) { + Accessor(Accessor &&other) + : skiplist(other.skiplist), status_(other.status_) { other.skiplist = nullptr; } ~Accessor() { if (skiplist == nullptr) return; - skiplist->gc.ReleaseRef(); + status_.alive_ = false; } Iterator begin() { return skiplist->begin(); } @@ -698,6 +698,7 @@ class SkipList : private Lockable { private: SkipList *skiplist; Node *preds[H], *succs[H]; + typename SkipListGC::AccessorStatus &status_; }; Accessor access() { return Accessor(this); } @@ -1139,7 +1140,6 @@ class SkipList : private Lockable { for (int level = height - 1; level >= 0; --level) preds[level]->forward(level, node->forward(level)); - // TODO: review and test gc.Collect(node); count.fetch_sub(1); @@ -1152,5 +1152,5 @@ class SkipList : private Lockable { */ std::atomic count{0}; Node *header; - SkiplistGC gc; + SkipListGC gc; }; diff --git a/src/data_structures/concurrent/skiplist_gc.cpp b/src/data_structures/concurrent/skiplist_gc.cpp new file mode 100644 index 000000000..a5982951e --- /dev/null +++ b/src/data_structures/concurrent/skiplist_gc.cpp @@ -0,0 +1,6 @@ +#include "skiplist_gc.hpp" +#include "gflags/gflags.h" + +DEFINE_int32(skiplist_gc_interval, 10, + "Interval of how often does skiplist gc run in seconds. To " + "disable set to 0."); diff --git a/src/data_structures/concurrent/skiplist_gc.hpp b/src/data_structures/concurrent/skiplist_gc.hpp index 6eb287035..014db3901 100644 --- a/src/data_structures/concurrent/skiplist_gc.hpp +++ b/src/data_structures/concurrent/skiplist_gc.hpp @@ -1,78 +1,165 @@ #pragma once -// TODO: remove from here and from the project -#include -#include -#include +#include +#include +#include +#include + +#include "gflags/gflags.h" + +#include "data_structures/concurrent/concurrent_list.hpp" #include "logging/loggable.hpp" -#include "memory/freelist.hpp" -#include "memory/lazy_gc.hpp" -#include "threading/global_pool.hpp" + #include "threading/sync/spinlock.hpp" -#include "utils/assert.hpp" +#include "utils/executioner.hpp" -template -class SkiplistGC : public LazyGC, lock_t>, - public Loggable { +DECLARE_int32(skiplist_gc_interval); + +/** + * @brief Garbage collects nodes. + * We are doing garbage collection by keeping track of alive accessors which + * were requested from the parent skiplist. When some prefix [id, id+n] of + * accessors becomes dead we try to empty the collection of (accessors_id, + * entry*) with the id of that last dead accessor. Each entry is added to + * collection after it has been re-linked and can't be seen by any accessors + * created after that time and that marks the safe time for deleting entry. + * @Tparam TNode - type of underlying pointer to objects which will be + * collected. + */ +template +class SkipListGC : public Loggable { public: - SkiplistGC() : Loggable("SkiplistGC") {} + explicit SkipListGC() : Loggable("SkipListGc") { + executor_job_id_ = GetExecutioner().RegisterJob( + std::bind(&SkipListGC::GarbageCollect, this)); + } - /** - * ReleaseRef method should be called by some thread which finishes access to - * skiplist. If thread reference_count_ becomes zero, all objects in the - * local_freelist are going to be deleted. The only problem with this approach - * is that GC may never be called, but for now we can deal with that. - */ - void ReleaseRef() { - // This has to be a shared_ptr since std::function requires that the - // callable object be copy-constructable. - std::shared_ptr> local_freelist = - std::make_shared>(); - - // take freelist if there is no more threads - { - auto lock = this->acquire_unique(); - debug_assert(this->reference_count_ > 0, "Count is equal to zero."); - --this->reference_count_; - if (this->reference_count_ == 0) { - freelist_.swap(*local_freelist); - } - } - - if (local_freelist->size() > 0) { - // We need to use a Global thread pool, otherwise we run into problems - // because each skiplist would use it's own thread pool and that would - // invoke too many threads. - GlobalPool::getSingletonInstance()->run(std::bind( - [](std::shared_ptr> local_freelist) { - // Comment logger out for now because we can't send an instance of - // this to global pool because this SkipListGc instance could be - // destroyed before the GC starts and as such will SEGFAULT. - - // logger.trace("GC started"); - // logger.trace("Local list size: {}", local_freelist->size()); - long long destroyed = 0; - // destroy all elements from local_freelist - for (auto element : *local_freelist) { - if (element->flags.is_marked()) { - T::destroy(element); - destroyed++; - } else { - // logger.warn( - // "Unmarked node appeared in the collection ready for " - // "destruction."); - } - } - // logger.trace("Number of destroyed elements: {}", destroyed); - }, - local_freelist)); + ~SkipListGC() { + // We have to unregister the job because otherwise Executioner might access + // some member variables of this class after it has been destructed. + GetExecutioner().UnRegisterJob(executor_job_id_); + for (auto it = deleted_list_.begin(); it != deleted_list_.end(); ++it) { + TNode::destroy(it->second); + it.remove(); } } - void Collect(T *node) { freelist_.add(node); } + /** + * @brief - Returns instance of executioner shared between all SkipLists. + */ + auto &GetExecutioner() { + static Executioner executioner( + (std::chrono::seconds(FLAGS_skiplist_gc_interval))); + + return executioner; + } + + SkipListGC(const SkipListGC &other) = delete; + SkipListGC(SkipListGC &&other) = delete; + SkipListGC operator=(const SkipListGC &other) = delete; + SkipListGC operator=(SkipListGC &&other) = delete; + + /** + * @brief - Keep track of each accessor with it's status, so we know which + * ones are alive and which ones are dead. + */ + struct AccessorStatus { + AccessorStatus(const int64_t id, bool alive) : id_(id), alive_(alive) {} + + AccessorStatus(AccessorStatus &&other) = default; + + AccessorStatus(const AccessorStatus &other) = delete; + AccessorStatus operator=(const AccessorStatus &other) = delete; + AccessorStatus operator=(AccessorStatus &&other) = delete; + + const int64_t id_{-1}; + bool alive_{false}; + }; + + /** + * @brief - Creates a new accessors and returns reference to it's status. This + * method is thread-safe. + */ + AccessorStatus &CreateNewAccessor() { + std::unique_lock lock(mutex_); + accessors_.emplace_back(++last_accessor_id_, true); + return accessors_.back(); + } + + /** + * @brief - Destroys objects which were previously collected and can be safely + * removed. This method is not thread-safe. + */ + void GarbageCollect() { + std::unique_lock lock(mutex_); + auto last_dead_accessor = accessors_.end(); + for (auto it = accessors_.begin(); it != accessors_.end(); ++it) { + if (it->alive_) break; + last_dead_accessor = it; + } + // We didn't find any dead accessor and that means we are not sure that we + // can delete anything. + if (last_dead_accessor == accessors_.end()) return; + // We don't need lock anymore because we are not modifying this structure + // anymore, or accessing it any further down. + const int64_t safe_id = last_dead_accessor->id_; + accessors_.erase(accessors_.begin(), ++last_dead_accessor); + lock.unlock(); + + // We can only modify this in a not-thread safe way because we are the only + // thread ever accessing it here, i.e. there is at most one thread doing + // this GarbageCollection. + auto oldest_not_deletable = deleted_list_.begin(); + bool delete_all = true; + for (auto it = deleted_list_.begin(); it != deleted_list_.end(); ++it) { + if (it->first > safe_id) { + // We have to increase iterator manually because the copy assignment + // operator is deleted. + while (oldest_not_deletable != it) ++oldest_not_deletable; + delete_all = false; + } + } + + // deleted_list is already empty, nothing to delete here. + if (oldest_not_deletable == deleted_list_.end()) return; + + // In case we didn't find anything that we can't delete we shouldn't + // increment this because that would mean we skip over the first record + // which is ready for destruction. + if (!delete_all) ++oldest_not_deletable; + int64_t destroyed = 0; + for (auto &it = oldest_not_deletable; it != deleted_list_.end(); ++it) { + TNode::destroy(it->second); + it.remove(); + ++destroyed; + } + if (destroyed) logger.trace("Number of destroyed elements: {}", destroyed); + } + + /** + * @brief - Collect object for garbage collection. Call to this method means + * that no new accessor can possibly access the object by iterating over some + * storage. + */ + void Collect(TNode *object) { + // We can afford some inaccuary here - it's possible that some new accessor + // incremented the last_accessor_id after we enter this method and as such + // we might be a bit pessimistic here. + deleted_list_.begin().push( + std::make_pair(last_accessor_id_.load(), object)); + } private: - // We use FreeList since it's thread-safe. - FreeList freelist_; + int64_t executor_job_id_{-1}; + std::mutex mutex_; + std::mutex singleton_mutex_; + + // List of accesssors from begin to end by an increasing id. + std::list accessors_; + std::atomic last_accessor_id_{0}; + + // List of pairs of accessor_ids and pointers to entries which should be + // destroyed sorted approximately descendingly by id. + ConcurrentList> deleted_list_; }; diff --git a/src/query/plan_compiler.hpp b/src/query/plan_compiler.hpp index a4c866e79..20ba36405 100644 --- a/src/query/plan_compiler.hpp +++ b/src/query/plan_compiler.hpp @@ -34,10 +34,10 @@ class PlanCompiler : public Loggable { #ifdef HARDCODED_OUTPUT_STREAM "-DHARDCODED_OUTPUT_STREAM", #endif - in_file, // input file - "-o", out_file, // ouput file - include_dirs, link_dirs, "-lmemgraph_pic", - "-shared -fPIC"}, // shared library flags + in_file, // input file + "-o", out_file, // ouput file + include_dirs, link_dirs, //"-lmemgraph_pic", + "-shared -fPIC"}, // shared library flags " "); logger.debug("compile command -> {}", compile_command); diff --git a/src/query/plan_template_cpp b/src/query/plan_template_cpp index a9229c995..a9f0b7a63 100644 --- a/src/query/plan_template_cpp +++ b/src/query/plan_template_cpp @@ -1,6 +1,8 @@ #include #include +#include "gflags/gflags.h" +#include "gflags/gflags_declare.h" #include "data_structures/bitset/static_bitset.hpp" #include "communication/bolt/v1/encoder/result_stream.hpp" #include "io/network/socket.hpp" diff --git a/src/utils/executioner.hpp b/src/utils/executioner.hpp new file mode 100644 index 000000000..f6e54487e --- /dev/null +++ b/src/utils/executioner.hpp @@ -0,0 +1,89 @@ +#pragma once + +#include +#include +#include + +#include "utils/scheduler.hpp" + +/** + * @brief - Provides execution of jobs in job queue on one thread with 'pause' + * time between two consecutives starts. + */ +class Executioner { + public: + template + Executioner(const std::chrono::duration pause) { + if (pause != pause.zero()) + scheduler_.Run(pause, std::bind(&Executioner::Execute, this)); + } + + ~Executioner() { + // Be sure to first stop scheduler because otherwise we might destroy the + // mutex before the scheduler and that might cause problems since mutex is + // used in Execute method passed to scheduler along with jobs vector. + scheduler_.Stop(); + } + + /** + * @brief - Add function to job queue. + */ + int64_t RegisterJob(const std::function &f) { + { + std::unique_lock lock(update_mutex_); + id_job_pairs_.emplace_back(std::make_pair(++count_, f)); + return id_job_pairs_.back().first; + } + } + + /** + * @brief - Remove id from job queue. + */ + void UnRegisterJob(const int64_t id) { + { + // First wait for execute lock and then for the update lock because + // execute lock will be unavailable for longer and there is no point in + // blocking other threads with update lock. + std::unique_lock execute_lock(execute_mutex_); + std::unique_lock update_lock(update_mutex_); + + for (auto id_job_pair_it = id_job_pairs_.begin(); + id_job_pair_it != id_job_pairs_.end(); ++id_job_pair_it) { + if (id_job_pair_it->first == id) { + id_job_pairs_.erase(id_job_pair_it); + return; + } + } + } + } + + private: + /** + * @brief - Execute method executes jobs from id_job_pairs vector. + * The reason for doing double locking is the following: we don't want to + * block creation of new jobs since that will slow down all of memgraph so we + * use a special lock for job update. Execute lock is here so that we can + * guarantee that after some job is unregistered it's also stopped. + */ + void Execute() { + std::unique_lock execute_lock(execute_mutex_); + std::vector>> id_job_pairs; + + // Acquire newest current version of jobs but being careful not to access + // the vector in corrupt state. + { + std::unique_lock update_lock(update_mutex_); + id_job_pairs = id_job_pairs_; + } + + for (auto id_job_pair : id_job_pairs) { + id_job_pair.second(); + } + } + + int64_t count_{0}; + std::mutex execute_mutex_; + std::mutex update_mutex_; + Scheduler scheduler_; + std::vector>> id_job_pairs_; +}; diff --git a/src/utils/scheduler.hpp b/src/utils/scheduler.hpp index 9e5a6295d..9a3447d05 100644 --- a/src/utils/scheduler.hpp +++ b/src/utils/scheduler.hpp @@ -1,3 +1,5 @@ +#pragma once + #include #include #include @@ -27,6 +29,7 @@ class Scheduler { void Run(const std::chrono::duration &pause, const std::function &f) { debug_assert(is_working_ == false, "Thread already running."); + debug_assert(pause > std::chrono::seconds(0), "Pause is invalid."); is_working_ = true; thread_ = std::thread([this, pause, f]() { auto start_time = std::chrono::system_clock::now(); diff --git a/tests/integration/query_engine.cpp b/tests/integration/query_engine.cpp index 22e2d2b8f..95b31ea38 100644 --- a/tests/integration/query_engine.cpp +++ b/tests/integration/query_engine.cpp @@ -1,5 +1,7 @@ #define HARDCODED_OUTPUT_STREAM +#include "gflags/gflags.h" + #include "config/config.hpp" #include "dbms/dbms.hpp" #include "query_engine_common.hpp" diff --git a/tests/unit/dbms_recovery.cpp b/tests/unit/dbms_recovery.cpp index afd599f25..ae8386c2d 100644 --- a/tests/unit/dbms_recovery.cpp +++ b/tests/unit/dbms_recovery.cpp @@ -69,14 +69,14 @@ void RecoverDbms() { std::vector edges; int vertex_count = 0; - for (auto const &vertex : dba->vertices()) { + for (auto const &vertex : dba->vertices(false)) { vertices.push_back(vertex); vertex_count++; } EXPECT_EQ(vertex_count, 3); int edge_count = 0; - for (auto const &edge : dba->edges()) { + for (auto const &edge : dba->edges(false)) { EXPECT_NE(vertices.end(), std::find(vertices.begin(), vertices.end(), edge.to())); EXPECT_NE(vertices.end(), diff --git a/tests/unit/executioner.cpp b/tests/unit/executioner.cpp new file mode 100644 index 000000000..01c3ef694 --- /dev/null +++ b/tests/unit/executioner.cpp @@ -0,0 +1,60 @@ +#include +#include + +#include "gtest/gtest.h" + +#include "utils/executioner.hpp" + +TEST(Executioner, DontRun) { + std::atomic count{0}; + { + Executioner exec(std::chrono::seconds(0)); + // Be sure executioner is sleeping. + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + exec.RegisterJob([&count]() { ++count; }); + // Try to wait to test if executioner might somehow become awake and execute + // the job. + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + EXPECT_EQ(count, 0); +} + +TEST(Executioner, Run) { + std::atomic count{0}; + { + Executioner exec(std::chrono::milliseconds(500)); + // Be sure executioner is sleeping. + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + exec.RegisterJob([&count]() { ++count; }); + exec.RegisterJob([&count]() { ++count; }); + exec.RegisterJob([&count]() { ++count; }); + + // Be sure executioner execute thread is triggered + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + EXPECT_EQ(count, 3); +} + +TEST(Executioner, RunUnregister) { + std::atomic count1{0}; + std::atomic count2{0}; + { + Executioner exec(std::chrono::milliseconds(500)); + // Be sure executioner is sleeping. + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + auto job = exec.RegisterJob([&count1]() { ++count1; }); + exec.RegisterJob([&count2]() { ++count2; }); + + // Be sure executioner execute thread is triggered + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + exec.UnRegisterJob(job); + + // Be sure executioner execute thread is triggered + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + EXPECT_EQ(count1, 1); + EXPECT_EQ(count2, 2); +} diff --git a/tests/unit/recovery.cpp b/tests/unit/recovery.cpp index 0e4767ab4..058ec2b59 100644 --- a/tests/unit/recovery.cpp +++ b/tests/unit/recovery.cpp @@ -125,9 +125,9 @@ TEST_F(RecoveryTest, TestEncoding) { buffer.Close(); permanent_assert(static_cast(to.size()) == 2, - "There should be two edges."); + "There should be two edges."); permanent_assert(static_cast(from.size()) == 2, - "There should be two edges."); + "There should be two edges."); EXPECT_EQ(buffer.hash(), summary.hash_); EXPECT_NE(edge_types.end(), @@ -161,14 +161,14 @@ TEST_F(RecoveryTest, TestEncodingAndDecoding) { auto dba = dbms_recover.active(); int64_t vertex_count = 0; - for (const auto &vertex : dba->vertices()) { + for (const auto &vertex : dba->vertices(false)) { vertices.push_back(vertex); vertex_count++; } EXPECT_EQ(vertex_count, 3); int64_t edge_count = 0; - for (const auto &edge : dba->edges()) { + for (const auto &edge : dba->edges(false)) { EXPECT_NE(vertices.end(), std::find(vertices.begin(), vertices.end(), edge.to())); EXPECT_NE(vertices.end(), @@ -177,7 +177,7 @@ TEST_F(RecoveryTest, TestEncodingAndDecoding) { edge_count++; } permanent_assert(static_cast(edges.size()) == 2, - "There should be two edges."); + "There should be two edges."); EXPECT_EQ(edge_count, 2); EXPECT_TRUE(edges[0].to() == edges[1].to()); @@ -201,7 +201,7 @@ TEST_F(RecoveryTest, TestEncodingAndRecovering) { auto dba_get = dbms_recover.active(); int64_t vertex_count = 0; - for (const auto &vertex : dba_get->vertices()) { + for (const auto &vertex : dba_get->vertices(false)) { EXPECT_EQ(vertex.labels().size(), 1); EXPECT_TRUE(vertex.has_label(dba_get->label("label"))); query::TypedValue prop = @@ -213,7 +213,7 @@ TEST_F(RecoveryTest, TestEncodingAndRecovering) { EXPECT_EQ(vertex_count, 1000); int64_t edge_count = 0; - for (const auto &edge : dba_get->edges()) { + for (const auto &edge : dba_get->edges(false)) { EXPECT_EQ(edge.edge_type(), dba_get->edge_type("type")); query::TypedValue prop = query::TypedValue(edge.PropsAt(dba_get->property("prop"))); diff --git a/tests/unit/skiplist_gc.cpp b/tests/unit/skiplist_gc.cpp index 3b3a80ec9..6068176d0 100644 --- a/tests/unit/skiplist_gc.cpp +++ b/tests/unit/skiplist_gc.cpp @@ -1,10 +1,11 @@ +#include "gflags/gflags.h" #include "gtest/gtest.h" #include #include #include -#include "data_structures/concurrent/skiplist.hpp" +#include "data_structures/concurrent/skiplist_gc.hpp" #include "logging/streams/stderr.hpp" /** @@ -23,82 +24,103 @@ class FakeItem { return this->value > item.value; } + static void destroy(FakeItem *item) { delete item; } + private: std::atomic &count; int value; }; -TEST(SkipListGC, TripleScopeGC) { - SkipList skiplist; - std::atomic *count = new std::atomic{0}; +DECLARE_int32(skiplist_gc_interval); - auto item = FakeItem(*count, 1); +TEST(SkipListGC, CreateNewAccessors) { + FLAGS_skiplist_gc_interval = 0; + SkipListGC gc; + auto &accessor1 = gc.CreateNewAccessor(); + auto &accessor2 = gc.CreateNewAccessor(); + auto &accessor3 = gc.CreateNewAccessor(); + + EXPECT_EQ(accessor1.id_, 1); + EXPECT_EQ(accessor2.id_, 2); + EXPECT_EQ(accessor3.id_, 3); + + accessor1.alive_ = false; + accessor2.alive_ = false; + accessor3.alive_ = false; +} + +TEST(SkipListGC, DeleteItem) { + FLAGS_skiplist_gc_interval = 0; + SkipListGC gc; + auto &accessor1 = gc.CreateNewAccessor(); + std::atomic count{0}; + auto item1 = new FakeItem(count, 1); + gc.Collect(item1); + + // Kill the accesssor + accessor1.alive_ = false; + gc.GarbageCollect(); + EXPECT_EQ(count, 1); +} + +TEST(SkipListGC, DontDeleteItem) { + FLAGS_skiplist_gc_interval = 0; + SkipListGC gc; + auto &accessor1 = gc.CreateNewAccessor(); + auto &accessor2 = gc.CreateNewAccessor(); + std::atomic count{0}; + auto item1 = new FakeItem(count, 1); + gc.Collect(item1); + + // Kill the accesssor + accessor2.alive_ = false; + + // Nothing deleted because accessor1 is blocking. + gc.GarbageCollect(); + EXPECT_EQ(count, 0); + + // Accessor 1 is not blocking anymore. + accessor1.alive_ = false; + gc.GarbageCollect(); + EXPECT_EQ(count, 1); +} + +TEST(SkipListGC, Destructor) { + FLAGS_skiplist_gc_interval = 0; + std::atomic count{0}; + auto item1 = new FakeItem(count, 1); { - auto access_1 = skiplist.access(); - { - auto access_2 = skiplist.access(); - { - auto access_3 = skiplist.access(); - access_1.insert(item); // add with 1 - access_2.remove(item); // remove with 2 - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - EXPECT_EQ(*count, 0); - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - EXPECT_EQ(*count, 0); - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - EXPECT_EQ(*count, 0); - } // scope end - GC called - for (int i = 0; i < 10; ++i) { - if (*count != 0) break; - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + SkipListGC gc; + gc.Collect(item1); + EXPECT_EQ(count, 0); } - EXPECT_EQ(*count, 1); + EXPECT_EQ(count, 1); } -TEST(SkipListGC, BlockedGCNoGC) { - SkipList skiplist; - std::atomic *count = new std::atomic{0}; - auto item = FakeItem(*count, 1); - auto blocking_access = skiplist.access(); - { - auto access = skiplist.access(); - access.insert(item); - access.remove(item); - } // scope end - GC still isn't called because of blocking_access - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - EXPECT_EQ(*count, 0); -} +TEST(SkipListGC, MultipleDeletes) { + FLAGS_skiplist_gc_interval = 0; + SkipListGC gc; + std::atomic count{0}; + auto &accessor1 = gc.CreateNewAccessor(); + auto item1 = new FakeItem(count, 1); + gc.Collect(item1); -TEST(SkipListGC, NotInScopeGC) { - SkipList skiplist; - std::atomic *count = new std::atomic{0}; - auto item = FakeItem(*count, 1); - { - auto access = skiplist.access(); - access.insert(item); - access.remove(item); - } // scope end - GC called - for (int i = 0; i < 10; ++i) { - if (*count == 1) break; - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - // If this count is not 1 that means we are still doing GC in the background - // and might crash the test if we try to modify count variable after it's been - // deallocated. - ASSERT_EQ(*count, 1); -} + auto &accessor2 = gc.CreateNewAccessor(); + auto item2 = new FakeItem(count, 1); + gc.Collect(item2); -TEST(SkipListGC, StillInScopeNoGC) { - SkipList skiplist; - std::atomic *count = new std::atomic{0}; - auto item = FakeItem(*count, 1); - auto access = skiplist.access(); - access.insert(item); - access.remove(item); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - EXPECT_EQ(*count, 0); + auto &accessor3 = gc.CreateNewAccessor(); + auto item3 = new FakeItem(count, 1); + gc.Collect(item3); + + accessor1.alive_ = false; + accessor2.alive_ = false; + gc.GarbageCollect(); + EXPECT_EQ(count, 2); + + accessor3.alive_ = false; + gc.GarbageCollect(); + EXPECT_EQ(count, 3); } int main(int argc, char **argv) {