Skiplist garbage collector rework.
Summary: Drawing: https://drive.google.com/open?id=0B-W7PQZqMD9hcG04b0lKaGZGOWM Reviewers: mislav.bradac, buda, florijan Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D395
This commit is contained in:
parent
141d9b3bb8
commit
ebdee4e509
@ -24,7 +24,8 @@ BASE_FLAGS = [
|
||||
'-I./libs/googletest/googletest/include',
|
||||
'-I./libs/googletest/googlemock/include',
|
||||
'-I./libs/benchmark/include',
|
||||
'-I./libs/antlr4/runtime/Cpp/runtime/src'
|
||||
'-I./libs/antlr4/runtime/Cpp/runtime/src',
|
||||
'-I./build/libs/gflags/include'
|
||||
]
|
||||
|
||||
SOURCE_EXTENSIONS = [
|
||||
|
@ -57,6 +57,7 @@ add_custom_target(clean_all
|
||||
|
||||
# threading
|
||||
find_package(Threads REQUIRED)
|
||||
|
||||
# optional readline
|
||||
find_package(Readline REQUIRED)
|
||||
if (READLINE_FOUND)
|
||||
@ -367,6 +368,7 @@ set(memgraph_src_files
|
||||
${src_dir}/logging/log.cpp
|
||||
${src_dir}/database/graph_db.cpp
|
||||
${src_dir}/database/graph_db_accessor.cpp
|
||||
${src_dir}/data_structures/concurrent/skiplist_gc.cpp
|
||||
${src_dir}/query/stripper.cpp
|
||||
${src_dir}/query/console.cpp
|
||||
${src_dir}/query/frontend/ast/cypher_main_visitor.cpp
|
||||
@ -382,19 +384,19 @@ set(memgraph_src_files
|
||||
|
||||
# STATIC library used by memgraph executables
|
||||
add_library(memgraph_lib STATIC ${memgraph_src_files})
|
||||
target_link_libraries(memgraph_lib stdc++fs)
|
||||
target_link_libraries(memgraph_lib stdc++fs gflags)
|
||||
add_dependencies(memgraph_lib generate_opencypher_parser
|
||||
generate_plan_compiler_flags)
|
||||
# executables that require memgraph_lib should link MEMGRAPH_ALL_LIBS to link all dependant libraries
|
||||
set(MEMGRAPH_ALL_LIBS gflags memgraph_lib stdc++fs Threads::Threads fmt yaml-cpp antlr_opencypher_parser_lib dl)
|
||||
set(MEMGRAPH_ALL_LIBS memgraph_lib stdc++fs Threads::Threads fmt yaml-cpp antlr_opencypher_parser_lib dl)
|
||||
if (READLINE_FOUND)
|
||||
list(APPEND MEMGRAPH_ALL_LIBS ${READLINE_LIBRARY})
|
||||
endif()
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
# STATIC PIC library used by query engine
|
||||
add_library(memgraph_pic STATIC ${memgraph_src_files})
|
||||
target_link_libraries(memgraph_pic stdc++fs)
|
||||
add_library(memgraph_pic STATIC)
|
||||
target_link_libraries(memgraph_pic stdc++fs gflags)
|
||||
add_dependencies(memgraph_pic generate_opencypher_parser
|
||||
generate_plan_compiler_flags)
|
||||
set_property(TARGET memgraph_pic PROPERTY POSITION_INDEPENDENT_CODE TRUE)
|
||||
@ -414,7 +416,7 @@ endif()
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
execute_process(
|
||||
COMMAND ./recursive_include --roots ${src_dir} ${libs_dir} --start ${src_dir}/query/plan_template_cpp --copy ${CMAKE_BINARY_DIR}/include
|
||||
COMMAND ./recursive_include --roots ${src_dir} ${libs_dir} ${CMAKE_BINARY_DIR}/libs/gflags/include --start ${src_dir}/query/plan_template_cpp --copy ${CMAKE_BINARY_DIR}/include
|
||||
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/cmake
|
||||
)
|
||||
|
||||
|
@ -17,9 +17,11 @@ add_subdirectory(fmt)
|
||||
add_subdirectory(googletest)
|
||||
|
||||
# setup google flags
|
||||
set(BUILD_gflags_LIB ON)
|
||||
set(GFLAGS_BUILD_gflags_nothreads_LIB OFF)
|
||||
set(GFLAGS_BUILD_gflags_LIB ON)
|
||||
add_subdirectory(gflags)
|
||||
|
||||
|
||||
# setup yaml cpp
|
||||
# disable tests because yaml doesn't have MASTER_PROJECT flag like fmt has
|
||||
# to override an option use option :)
|
||||
|
@ -581,23 +581,23 @@ class SkipList : private Lockable<lock_t> {
|
||||
class Accessor {
|
||||
friend class SkipList;
|
||||
|
||||
Accessor(SkipList *skiplist) : skiplist(skiplist) {
|
||||
Accessor(SkipList *skiplist)
|
||||
: skiplist(skiplist), status_(skiplist->gc.CreateNewAccessor()) {
|
||||
debug_assert(skiplist != nullptr, "Skiplist is nullptr.");
|
||||
|
||||
skiplist->gc.AddRef();
|
||||
}
|
||||
|
||||
public:
|
||||
Accessor(const Accessor &) = delete;
|
||||
|
||||
Accessor(Accessor &&other) : skiplist(other.skiplist) {
|
||||
Accessor(Accessor &&other)
|
||||
: skiplist(other.skiplist), status_(other.status_) {
|
||||
other.skiplist = nullptr;
|
||||
}
|
||||
|
||||
~Accessor() {
|
||||
if (skiplist == nullptr) return;
|
||||
|
||||
skiplist->gc.ReleaseRef();
|
||||
status_.alive_ = false;
|
||||
}
|
||||
|
||||
Iterator begin() { return skiplist->begin(); }
|
||||
@ -698,6 +698,7 @@ class SkipList : private Lockable<lock_t> {
|
||||
private:
|
||||
SkipList *skiplist;
|
||||
Node *preds[H], *succs[H];
|
||||
typename SkipListGC<Node>::AccessorStatus &status_;
|
||||
};
|
||||
|
||||
Accessor access() { return Accessor(this); }
|
||||
@ -1139,7 +1140,6 @@ class SkipList : private Lockable<lock_t> {
|
||||
for (int level = height - 1; level >= 0; --level)
|
||||
preds[level]->forward(level, node->forward(level));
|
||||
|
||||
// TODO: review and test
|
||||
gc.Collect(node);
|
||||
|
||||
count.fetch_sub(1);
|
||||
@ -1152,5 +1152,5 @@ class SkipList : private Lockable<lock_t> {
|
||||
*/
|
||||
std::atomic<size_t> count{0};
|
||||
Node *header;
|
||||
SkiplistGC<Node> gc;
|
||||
SkipListGC<Node> gc;
|
||||
};
|
||||
|
6
src/data_structures/concurrent/skiplist_gc.cpp
Normal file
6
src/data_structures/concurrent/skiplist_gc.cpp
Normal file
@ -0,0 +1,6 @@
|
||||
#include "skiplist_gc.hpp"
|
||||
#include "gflags/gflags.h"
|
||||
|
||||
DEFINE_int32(skiplist_gc_interval, 10,
|
||||
"Interval of how often does skiplist gc run in seconds. To "
|
||||
"disable set to 0.");
|
@ -1,78 +1,165 @@
|
||||
#pragma once
|
||||
|
||||
// TODO: remove from here and from the project
|
||||
#include <functional>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
#include <malloc.h>
|
||||
|
||||
#include <list>
|
||||
#include <mutex>
|
||||
#include <utility>
|
||||
|
||||
#include "gflags/gflags.h"
|
||||
|
||||
#include "data_structures/concurrent/concurrent_list.hpp"
|
||||
#include "logging/loggable.hpp"
|
||||
#include "memory/freelist.hpp"
|
||||
#include "memory/lazy_gc.hpp"
|
||||
#include "threading/global_pool.hpp"
|
||||
|
||||
#include "threading/sync/spinlock.hpp"
|
||||
#include "utils/assert.hpp"
|
||||
#include "utils/executioner.hpp"
|
||||
|
||||
template <class T, class lock_t = SpinLock>
|
||||
class SkiplistGC : public LazyGC<SkiplistGC<T, lock_t>, lock_t>,
|
||||
public Loggable {
|
||||
DECLARE_int32(skiplist_gc_interval);
|
||||
|
||||
/**
|
||||
* @brief Garbage collects nodes.
|
||||
* We are doing garbage collection by keeping track of alive accessors which
|
||||
* were requested from the parent skiplist. When some prefix [id, id+n] of
|
||||
* accessors becomes dead we try to empty the collection of (accessors_id,
|
||||
* entry*) with the id of that last dead accessor. Each entry is added to
|
||||
* collection after it has been re-linked and can't be seen by any accessors
|
||||
* created after that time and that marks the safe time for deleting entry.
|
||||
* @Tparam TNode - type of underlying pointer to objects which will be
|
||||
* collected.
|
||||
*/
|
||||
template <class TNode>
|
||||
class SkipListGC : public Loggable {
|
||||
public:
|
||||
SkiplistGC() : Loggable("SkiplistGC") {}
|
||||
explicit SkipListGC() : Loggable("SkipListGc") {
|
||||
executor_job_id_ = GetExecutioner().RegisterJob(
|
||||
std::bind(&SkipListGC::GarbageCollect, this));
|
||||
}
|
||||
|
||||
/**
|
||||
* ReleaseRef method should be called by some thread which finishes access to
|
||||
* skiplist. If thread reference_count_ becomes zero, all objects in the
|
||||
* local_freelist are going to be deleted. The only problem with this approach
|
||||
* is that GC may never be called, but for now we can deal with that.
|
||||
*/
|
||||
void ReleaseRef() {
|
||||
// This has to be a shared_ptr since std::function requires that the
|
||||
// callable object be copy-constructable.
|
||||
std::shared_ptr<std::vector<T *>> local_freelist =
|
||||
std::make_shared<std::vector<T *>>();
|
||||
|
||||
// take freelist if there is no more threads
|
||||
{
|
||||
auto lock = this->acquire_unique();
|
||||
debug_assert(this->reference_count_ > 0, "Count is equal to zero.");
|
||||
--this->reference_count_;
|
||||
if (this->reference_count_ == 0) {
|
||||
freelist_.swap(*local_freelist);
|
||||
}
|
||||
}
|
||||
|
||||
if (local_freelist->size() > 0) {
|
||||
// We need to use a Global thread pool, otherwise we run into problems
|
||||
// because each skiplist would use it's own thread pool and that would
|
||||
// invoke too many threads.
|
||||
GlobalPool::getSingletonInstance()->run(std::bind(
|
||||
[](std::shared_ptr<std::vector<T *>> local_freelist) {
|
||||
// Comment logger out for now because we can't send an instance of
|
||||
// this to global pool because this SkipListGc instance could be
|
||||
// destroyed before the GC starts and as such will SEGFAULT.
|
||||
|
||||
// logger.trace("GC started");
|
||||
// logger.trace("Local list size: {}", local_freelist->size());
|
||||
long long destroyed = 0;
|
||||
// destroy all elements from local_freelist
|
||||
for (auto element : *local_freelist) {
|
||||
if (element->flags.is_marked()) {
|
||||
T::destroy(element);
|
||||
destroyed++;
|
||||
} else {
|
||||
// logger.warn(
|
||||
// "Unmarked node appeared in the collection ready for "
|
||||
// "destruction.");
|
||||
}
|
||||
}
|
||||
// logger.trace("Number of destroyed elements: {}", destroyed);
|
||||
},
|
||||
local_freelist));
|
||||
~SkipListGC() {
|
||||
// We have to unregister the job because otherwise Executioner might access
|
||||
// some member variables of this class after it has been destructed.
|
||||
GetExecutioner().UnRegisterJob(executor_job_id_);
|
||||
for (auto it = deleted_list_.begin(); it != deleted_list_.end(); ++it) {
|
||||
TNode::destroy(it->second);
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
|
||||
void Collect(T *node) { freelist_.add(node); }
|
||||
/**
|
||||
* @brief - Returns instance of executioner shared between all SkipLists.
|
||||
*/
|
||||
auto &GetExecutioner() {
|
||||
static Executioner executioner(
|
||||
(std::chrono::seconds(FLAGS_skiplist_gc_interval)));
|
||||
|
||||
return executioner;
|
||||
}
|
||||
|
||||
SkipListGC(const SkipListGC &other) = delete;
|
||||
SkipListGC(SkipListGC &&other) = delete;
|
||||
SkipListGC operator=(const SkipListGC &other) = delete;
|
||||
SkipListGC operator=(SkipListGC &&other) = delete;
|
||||
|
||||
/**
|
||||
* @brief - Keep track of each accessor with it's status, so we know which
|
||||
* ones are alive and which ones are dead.
|
||||
*/
|
||||
struct AccessorStatus {
|
||||
AccessorStatus(const int64_t id, bool alive) : id_(id), alive_(alive) {}
|
||||
|
||||
AccessorStatus(AccessorStatus &&other) = default;
|
||||
|
||||
AccessorStatus(const AccessorStatus &other) = delete;
|
||||
AccessorStatus operator=(const AccessorStatus &other) = delete;
|
||||
AccessorStatus operator=(AccessorStatus &&other) = delete;
|
||||
|
||||
const int64_t id_{-1};
|
||||
bool alive_{false};
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief - Creates a new accessors and returns reference to it's status. This
|
||||
* method is thread-safe.
|
||||
*/
|
||||
AccessorStatus &CreateNewAccessor() {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
accessors_.emplace_back(++last_accessor_id_, true);
|
||||
return accessors_.back();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief - Destroys objects which were previously collected and can be safely
|
||||
* removed. This method is not thread-safe.
|
||||
*/
|
||||
void GarbageCollect() {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
auto last_dead_accessor = accessors_.end();
|
||||
for (auto it = accessors_.begin(); it != accessors_.end(); ++it) {
|
||||
if (it->alive_) break;
|
||||
last_dead_accessor = it;
|
||||
}
|
||||
// We didn't find any dead accessor and that means we are not sure that we
|
||||
// can delete anything.
|
||||
if (last_dead_accessor == accessors_.end()) return;
|
||||
// We don't need lock anymore because we are not modifying this structure
|
||||
// anymore, or accessing it any further down.
|
||||
const int64_t safe_id = last_dead_accessor->id_;
|
||||
accessors_.erase(accessors_.begin(), ++last_dead_accessor);
|
||||
lock.unlock();
|
||||
|
||||
// We can only modify this in a not-thread safe way because we are the only
|
||||
// thread ever accessing it here, i.e. there is at most one thread doing
|
||||
// this GarbageCollection.
|
||||
auto oldest_not_deletable = deleted_list_.begin();
|
||||
bool delete_all = true;
|
||||
for (auto it = deleted_list_.begin(); it != deleted_list_.end(); ++it) {
|
||||
if (it->first > safe_id) {
|
||||
// We have to increase iterator manually because the copy assignment
|
||||
// operator is deleted.
|
||||
while (oldest_not_deletable != it) ++oldest_not_deletable;
|
||||
delete_all = false;
|
||||
}
|
||||
}
|
||||
|
||||
// deleted_list is already empty, nothing to delete here.
|
||||
if (oldest_not_deletable == deleted_list_.end()) return;
|
||||
|
||||
// In case we didn't find anything that we can't delete we shouldn't
|
||||
// increment this because that would mean we skip over the first record
|
||||
// which is ready for destruction.
|
||||
if (!delete_all) ++oldest_not_deletable;
|
||||
int64_t destroyed = 0;
|
||||
for (auto &it = oldest_not_deletable; it != deleted_list_.end(); ++it) {
|
||||
TNode::destroy(it->second);
|
||||
it.remove();
|
||||
++destroyed;
|
||||
}
|
||||
if (destroyed) logger.trace("Number of destroyed elements: {}", destroyed);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief - Collect object for garbage collection. Call to this method means
|
||||
* that no new accessor can possibly access the object by iterating over some
|
||||
* storage.
|
||||
*/
|
||||
void Collect(TNode *object) {
|
||||
// We can afford some inaccuary here - it's possible that some new accessor
|
||||
// incremented the last_accessor_id after we enter this method and as such
|
||||
// we might be a bit pessimistic here.
|
||||
deleted_list_.begin().push(
|
||||
std::make_pair(last_accessor_id_.load(), object));
|
||||
}
|
||||
|
||||
private:
|
||||
// We use FreeList since it's thread-safe.
|
||||
FreeList<T *> freelist_;
|
||||
int64_t executor_job_id_{-1};
|
||||
std::mutex mutex_;
|
||||
std::mutex singleton_mutex_;
|
||||
|
||||
// List of accesssors from begin to end by an increasing id.
|
||||
std::list<AccessorStatus> accessors_;
|
||||
std::atomic<int64_t> last_accessor_id_{0};
|
||||
|
||||
// List of pairs of accessor_ids and pointers to entries which should be
|
||||
// destroyed sorted approximately descendingly by id.
|
||||
ConcurrentList<std::pair<int64_t, TNode *>> deleted_list_;
|
||||
};
|
||||
|
@ -34,10 +34,10 @@ class PlanCompiler : public Loggable {
|
||||
#ifdef HARDCODED_OUTPUT_STREAM
|
||||
"-DHARDCODED_OUTPUT_STREAM",
|
||||
#endif
|
||||
in_file, // input file
|
||||
"-o", out_file, // ouput file
|
||||
include_dirs, link_dirs, "-lmemgraph_pic",
|
||||
"-shared -fPIC"}, // shared library flags
|
||||
in_file, // input file
|
||||
"-o", out_file, // ouput file
|
||||
include_dirs, link_dirs, //"-lmemgraph_pic",
|
||||
"-shared -fPIC"}, // shared library flags
|
||||
" ");
|
||||
|
||||
logger.debug("compile command -> {}", compile_command);
|
||||
|
@ -1,6 +1,8 @@
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
|
||||
#include "gflags/gflags.h"
|
||||
#include "gflags/gflags_declare.h"
|
||||
#include "data_structures/bitset/static_bitset.hpp"
|
||||
#include "communication/bolt/v1/encoder/result_stream.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
|
89
src/utils/executioner.hpp
Normal file
89
src/utils/executioner.hpp
Normal file
@ -0,0 +1,89 @@
|
||||
#pragma once
|
||||
|
||||
#include <algorithm>
|
||||
#include <mutex>
|
||||
#include <vector>
|
||||
|
||||
#include "utils/scheduler.hpp"
|
||||
|
||||
/**
|
||||
* @brief - Provides execution of jobs in job queue on one thread with 'pause'
|
||||
* time between two consecutives starts.
|
||||
*/
|
||||
class Executioner {
|
||||
public:
|
||||
template <typename TRep, typename TPeriod>
|
||||
Executioner(const std::chrono::duration<TRep, TPeriod> pause) {
|
||||
if (pause != pause.zero())
|
||||
scheduler_.Run(pause, std::bind(&Executioner::Execute, this));
|
||||
}
|
||||
|
||||
~Executioner() {
|
||||
// Be sure to first stop scheduler because otherwise we might destroy the
|
||||
// mutex before the scheduler and that might cause problems since mutex is
|
||||
// used in Execute method passed to scheduler along with jobs vector.
|
||||
scheduler_.Stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief - Add function to job queue.
|
||||
*/
|
||||
int64_t RegisterJob(const std::function<void()> &f) {
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(update_mutex_);
|
||||
id_job_pairs_.emplace_back(std::make_pair(++count_, f));
|
||||
return id_job_pairs_.back().first;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief - Remove id from job queue.
|
||||
*/
|
||||
void UnRegisterJob(const int64_t id) {
|
||||
{
|
||||
// First wait for execute lock and then for the update lock because
|
||||
// execute lock will be unavailable for longer and there is no point in
|
||||
// blocking other threads with update lock.
|
||||
std::unique_lock<std::mutex> execute_lock(execute_mutex_);
|
||||
std::unique_lock<std::mutex> update_lock(update_mutex_);
|
||||
|
||||
for (auto id_job_pair_it = id_job_pairs_.begin();
|
||||
id_job_pair_it != id_job_pairs_.end(); ++id_job_pair_it) {
|
||||
if (id_job_pair_it->first == id) {
|
||||
id_job_pairs_.erase(id_job_pair_it);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
/**
|
||||
* @brief - Execute method executes jobs from id_job_pairs vector.
|
||||
* The reason for doing double locking is the following: we don't want to
|
||||
* block creation of new jobs since that will slow down all of memgraph so we
|
||||
* use a special lock for job update. Execute lock is here so that we can
|
||||
* guarantee that after some job is unregistered it's also stopped.
|
||||
*/
|
||||
void Execute() {
|
||||
std::unique_lock<std::mutex> execute_lock(execute_mutex_);
|
||||
std::vector<std::pair<int, std::function<void()>>> id_job_pairs;
|
||||
|
||||
// Acquire newest current version of jobs but being careful not to access
|
||||
// the vector in corrupt state.
|
||||
{
|
||||
std::unique_lock<std::mutex> update_lock(update_mutex_);
|
||||
id_job_pairs = id_job_pairs_;
|
||||
}
|
||||
|
||||
for (auto id_job_pair : id_job_pairs) {
|
||||
id_job_pair.second();
|
||||
}
|
||||
}
|
||||
|
||||
int64_t count_{0};
|
||||
std::mutex execute_mutex_;
|
||||
std::mutex update_mutex_;
|
||||
Scheduler<std::mutex> scheduler_;
|
||||
std::vector<std::pair<int, std::function<void()>>> id_job_pairs_;
|
||||
};
|
@ -1,3 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
@ -27,6 +29,7 @@ class Scheduler {
|
||||
void Run(const std::chrono::duration<TRep, TPeriod> &pause,
|
||||
const std::function<void()> &f) {
|
||||
debug_assert(is_working_ == false, "Thread already running.");
|
||||
debug_assert(pause > std::chrono::seconds(0), "Pause is invalid.");
|
||||
is_working_ = true;
|
||||
thread_ = std::thread([this, pause, f]() {
|
||||
auto start_time = std::chrono::system_clock::now();
|
||||
|
@ -1,5 +1,7 @@
|
||||
#define HARDCODED_OUTPUT_STREAM
|
||||
|
||||
#include "gflags/gflags.h"
|
||||
|
||||
#include "config/config.hpp"
|
||||
#include "dbms/dbms.hpp"
|
||||
#include "query_engine_common.hpp"
|
||||
|
@ -69,14 +69,14 @@ void RecoverDbms() {
|
||||
std::vector<EdgeAccessor> edges;
|
||||
|
||||
int vertex_count = 0;
|
||||
for (auto const &vertex : dba->vertices()) {
|
||||
for (auto const &vertex : dba->vertices(false)) {
|
||||
vertices.push_back(vertex);
|
||||
vertex_count++;
|
||||
}
|
||||
EXPECT_EQ(vertex_count, 3);
|
||||
|
||||
int edge_count = 0;
|
||||
for (auto const &edge : dba->edges()) {
|
||||
for (auto const &edge : dba->edges(false)) {
|
||||
EXPECT_NE(vertices.end(),
|
||||
std::find(vertices.begin(), vertices.end(), edge.to()));
|
||||
EXPECT_NE(vertices.end(),
|
||||
|
60
tests/unit/executioner.cpp
Normal file
60
tests/unit/executioner.cpp
Normal file
@ -0,0 +1,60 @@
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "utils/executioner.hpp"
|
||||
|
||||
TEST(Executioner, DontRun) {
|
||||
std::atomic<int> count{0};
|
||||
{
|
||||
Executioner exec(std::chrono::seconds(0));
|
||||
// Be sure executioner is sleeping.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
|
||||
exec.RegisterJob([&count]() { ++count; });
|
||||
// Try to wait to test if executioner might somehow become awake and execute
|
||||
// the job.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
}
|
||||
EXPECT_EQ(count, 0);
|
||||
}
|
||||
|
||||
TEST(Executioner, Run) {
|
||||
std::atomic<int> count{0};
|
||||
{
|
||||
Executioner exec(std::chrono::milliseconds(500));
|
||||
// Be sure executioner is sleeping.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
|
||||
exec.RegisterJob([&count]() { ++count; });
|
||||
exec.RegisterJob([&count]() { ++count; });
|
||||
exec.RegisterJob([&count]() { ++count; });
|
||||
|
||||
// Be sure executioner execute thread is triggered
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
}
|
||||
EXPECT_EQ(count, 3);
|
||||
}
|
||||
|
||||
TEST(Executioner, RunUnregister) {
|
||||
std::atomic<int> count1{0};
|
||||
std::atomic<int> count2{0};
|
||||
{
|
||||
Executioner exec(std::chrono::milliseconds(500));
|
||||
// Be sure executioner is sleeping.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
auto job = exec.RegisterJob([&count1]() { ++count1; });
|
||||
exec.RegisterJob([&count2]() { ++count2; });
|
||||
|
||||
// Be sure executioner execute thread is triggered
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
|
||||
exec.UnRegisterJob(job);
|
||||
|
||||
// Be sure executioner execute thread is triggered
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
}
|
||||
EXPECT_EQ(count1, 1);
|
||||
EXPECT_EQ(count2, 2);
|
||||
}
|
@ -125,9 +125,9 @@ TEST_F(RecoveryTest, TestEncoding) {
|
||||
buffer.Close();
|
||||
|
||||
permanent_assert(static_cast<int>(to.size()) == 2,
|
||||
"There should be two edges.");
|
||||
"There should be two edges.");
|
||||
permanent_assert(static_cast<int>(from.size()) == 2,
|
||||
"There should be two edges.");
|
||||
"There should be two edges.");
|
||||
|
||||
EXPECT_EQ(buffer.hash(), summary.hash_);
|
||||
EXPECT_NE(edge_types.end(),
|
||||
@ -161,14 +161,14 @@ TEST_F(RecoveryTest, TestEncodingAndDecoding) {
|
||||
|
||||
auto dba = dbms_recover.active();
|
||||
int64_t vertex_count = 0;
|
||||
for (const auto &vertex : dba->vertices()) {
|
||||
for (const auto &vertex : dba->vertices(false)) {
|
||||
vertices.push_back(vertex);
|
||||
vertex_count++;
|
||||
}
|
||||
EXPECT_EQ(vertex_count, 3);
|
||||
|
||||
int64_t edge_count = 0;
|
||||
for (const auto &edge : dba->edges()) {
|
||||
for (const auto &edge : dba->edges(false)) {
|
||||
EXPECT_NE(vertices.end(),
|
||||
std::find(vertices.begin(), vertices.end(), edge.to()));
|
||||
EXPECT_NE(vertices.end(),
|
||||
@ -177,7 +177,7 @@ TEST_F(RecoveryTest, TestEncodingAndDecoding) {
|
||||
edge_count++;
|
||||
}
|
||||
permanent_assert(static_cast<int>(edges.size()) == 2,
|
||||
"There should be two edges.");
|
||||
"There should be two edges.");
|
||||
|
||||
EXPECT_EQ(edge_count, 2);
|
||||
EXPECT_TRUE(edges[0].to() == edges[1].to());
|
||||
@ -201,7 +201,7 @@ TEST_F(RecoveryTest, TestEncodingAndRecovering) {
|
||||
|
||||
auto dba_get = dbms_recover.active();
|
||||
int64_t vertex_count = 0;
|
||||
for (const auto &vertex : dba_get->vertices()) {
|
||||
for (const auto &vertex : dba_get->vertices(false)) {
|
||||
EXPECT_EQ(vertex.labels().size(), 1);
|
||||
EXPECT_TRUE(vertex.has_label(dba_get->label("label")));
|
||||
query::TypedValue prop =
|
||||
@ -213,7 +213,7 @@ TEST_F(RecoveryTest, TestEncodingAndRecovering) {
|
||||
EXPECT_EQ(vertex_count, 1000);
|
||||
|
||||
int64_t edge_count = 0;
|
||||
for (const auto &edge : dba_get->edges()) {
|
||||
for (const auto &edge : dba_get->edges(false)) {
|
||||
EXPECT_EQ(edge.edge_type(), dba_get->edge_type("type"));
|
||||
query::TypedValue prop =
|
||||
query::TypedValue(edge.PropsAt(dba_get->property("prop")));
|
||||
|
@ -1,10 +1,11 @@
|
||||
#include "gflags/gflags.h"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
|
||||
#include "data_structures/concurrent/skiplist.hpp"
|
||||
#include "data_structures/concurrent/skiplist_gc.hpp"
|
||||
#include "logging/streams/stderr.hpp"
|
||||
|
||||
/**
|
||||
@ -23,82 +24,103 @@ class FakeItem {
|
||||
return this->value > item.value;
|
||||
}
|
||||
|
||||
static void destroy(FakeItem *item) { delete item; }
|
||||
|
||||
private:
|
||||
std::atomic<int> &count;
|
||||
int value;
|
||||
};
|
||||
|
||||
TEST(SkipListGC, TripleScopeGC) {
|
||||
SkipList<FakeItem> skiplist;
|
||||
std::atomic<int> *count = new std::atomic<int>{0};
|
||||
DECLARE_int32(skiplist_gc_interval);
|
||||
|
||||
auto item = FakeItem(*count, 1);
|
||||
TEST(SkipListGC, CreateNewAccessors) {
|
||||
FLAGS_skiplist_gc_interval = 0;
|
||||
SkipListGC<FakeItem> gc;
|
||||
auto &accessor1 = gc.CreateNewAccessor();
|
||||
auto &accessor2 = gc.CreateNewAccessor();
|
||||
auto &accessor3 = gc.CreateNewAccessor();
|
||||
|
||||
EXPECT_EQ(accessor1.id_, 1);
|
||||
EXPECT_EQ(accessor2.id_, 2);
|
||||
EXPECT_EQ(accessor3.id_, 3);
|
||||
|
||||
accessor1.alive_ = false;
|
||||
accessor2.alive_ = false;
|
||||
accessor3.alive_ = false;
|
||||
}
|
||||
|
||||
TEST(SkipListGC, DeleteItem) {
|
||||
FLAGS_skiplist_gc_interval = 0;
|
||||
SkipListGC<FakeItem> gc;
|
||||
auto &accessor1 = gc.CreateNewAccessor();
|
||||
std::atomic<int> count{0};
|
||||
auto item1 = new FakeItem(count, 1);
|
||||
gc.Collect(item1);
|
||||
|
||||
// Kill the accesssor
|
||||
accessor1.alive_ = false;
|
||||
gc.GarbageCollect();
|
||||
EXPECT_EQ(count, 1);
|
||||
}
|
||||
|
||||
TEST(SkipListGC, DontDeleteItem) {
|
||||
FLAGS_skiplist_gc_interval = 0;
|
||||
SkipListGC<FakeItem> gc;
|
||||
auto &accessor1 = gc.CreateNewAccessor();
|
||||
auto &accessor2 = gc.CreateNewAccessor();
|
||||
std::atomic<int> count{0};
|
||||
auto item1 = new FakeItem(count, 1);
|
||||
gc.Collect(item1);
|
||||
|
||||
// Kill the accesssor
|
||||
accessor2.alive_ = false;
|
||||
|
||||
// Nothing deleted because accessor1 is blocking.
|
||||
gc.GarbageCollect();
|
||||
EXPECT_EQ(count, 0);
|
||||
|
||||
// Accessor 1 is not blocking anymore.
|
||||
accessor1.alive_ = false;
|
||||
gc.GarbageCollect();
|
||||
EXPECT_EQ(count, 1);
|
||||
}
|
||||
|
||||
TEST(SkipListGC, Destructor) {
|
||||
FLAGS_skiplist_gc_interval = 0;
|
||||
std::atomic<int> count{0};
|
||||
auto item1 = new FakeItem(count, 1);
|
||||
{
|
||||
auto access_1 = skiplist.access();
|
||||
{
|
||||
auto access_2 = skiplist.access();
|
||||
{
|
||||
auto access_3 = skiplist.access();
|
||||
access_1.insert(item); // add with 1
|
||||
access_2.remove(item); // remove with 2
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
EXPECT_EQ(*count, 0);
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
EXPECT_EQ(*count, 0);
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
EXPECT_EQ(*count, 0);
|
||||
} // scope end - GC called
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
if (*count != 0) break;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
SkipListGC<FakeItem> gc;
|
||||
gc.Collect(item1);
|
||||
EXPECT_EQ(count, 0);
|
||||
}
|
||||
EXPECT_EQ(*count, 1);
|
||||
EXPECT_EQ(count, 1);
|
||||
}
|
||||
|
||||
TEST(SkipListGC, BlockedGCNoGC) {
|
||||
SkipList<FakeItem> skiplist;
|
||||
std::atomic<int> *count = new std::atomic<int>{0};
|
||||
auto item = FakeItem(*count, 1);
|
||||
auto blocking_access = skiplist.access();
|
||||
{
|
||||
auto access = skiplist.access();
|
||||
access.insert(item);
|
||||
access.remove(item);
|
||||
} // scope end - GC still isn't called because of blocking_access
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
EXPECT_EQ(*count, 0);
|
||||
}
|
||||
TEST(SkipListGC, MultipleDeletes) {
|
||||
FLAGS_skiplist_gc_interval = 0;
|
||||
SkipListGC<FakeItem> gc;
|
||||
std::atomic<int> count{0};
|
||||
auto &accessor1 = gc.CreateNewAccessor();
|
||||
auto item1 = new FakeItem(count, 1);
|
||||
gc.Collect(item1);
|
||||
|
||||
TEST(SkipListGC, NotInScopeGC) {
|
||||
SkipList<FakeItem> skiplist;
|
||||
std::atomic<int> *count = new std::atomic<int>{0};
|
||||
auto item = FakeItem(*count, 1);
|
||||
{
|
||||
auto access = skiplist.access();
|
||||
access.insert(item);
|
||||
access.remove(item);
|
||||
} // scope end - GC called
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
if (*count == 1) break;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
// If this count is not 1 that means we are still doing GC in the background
|
||||
// and might crash the test if we try to modify count variable after it's been
|
||||
// deallocated.
|
||||
ASSERT_EQ(*count, 1);
|
||||
}
|
||||
auto &accessor2 = gc.CreateNewAccessor();
|
||||
auto item2 = new FakeItem(count, 1);
|
||||
gc.Collect(item2);
|
||||
|
||||
TEST(SkipListGC, StillInScopeNoGC) {
|
||||
SkipList<FakeItem> skiplist;
|
||||
std::atomic<int> *count = new std::atomic<int>{0};
|
||||
auto item = FakeItem(*count, 1);
|
||||
auto access = skiplist.access();
|
||||
access.insert(item);
|
||||
access.remove(item);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
EXPECT_EQ(*count, 0);
|
||||
auto &accessor3 = gc.CreateNewAccessor();
|
||||
auto item3 = new FakeItem(count, 1);
|
||||
gc.Collect(item3);
|
||||
|
||||
accessor1.alive_ = false;
|
||||
accessor2.alive_ = false;
|
||||
gc.GarbageCollect();
|
||||
EXPECT_EQ(count, 2);
|
||||
|
||||
accessor3.alive_ = false;
|
||||
gc.GarbageCollect();
|
||||
EXPECT_EQ(count, 3);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
|
Loading…
Reference in New Issue
Block a user