Refactor GC to use scheduler.

Reviewers: buda, matej.gradicek

Reviewed By: matej.gradicek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D247
This commit is contained in:
Dominik Gleich 2017-04-10 15:44:36 +02:00
parent 9855621c9e
commit fd2780155a
6 changed files with 48 additions and 94 deletions

View File

@ -29,7 +29,7 @@ constexpr const char *MAX_RETAINED_SNAPSHOTS = "max_retained_snapshots";
constexpr const char *INTERPRET = "interpret";
// -- all possible Memgraph's keys --
inline size_t to_int(std::string &&s) { return stoull(s); }
inline size_t to_int(std::string &s) { return stoull(s); }
// TODO: move this to register args because it doesn't make sense to convert
// str to bool for every lookup
inline bool to_bool(std::string &s) {

View File

@ -1,6 +1,8 @@
#include "database/graph_db.hpp"
#include <functional>
#include "config/config.hpp"
#include "database/creation_exception.hpp"
#include "database/graph_db.hpp"
#include "logging/logger.hpp"
#include "storage/edge.hpp"
#include "storage/garbage_collector.hpp"
@ -12,11 +14,17 @@ GraphDb::GraphDb(const std::string &name, bool import_snapshot)
: name_(name),
gc_vertices_(&vertices_, &tx_engine),
gc_edges_(&edges_, &tx_engine) {
const std::string timeStr = CONFIG(config::CLEANING_CYCLE_SEC);
const std::string time_str = CONFIG(config::CLEANING_CYCLE_SEC);
int pause = DEFAULT_CLEANING_CYCLE_SEC;
if (!timeStr.empty()) pause = stoll(timeStr);
this->gc_edges_.Run(std::chrono::seconds(pause));
this->gc_vertices_.Run(std::chrono::seconds(pause));
if (!time_str.empty()) pause = CONFIG_INTEGER(config::CLEANING_CYCLE_SEC);
// Pause of -1 means we shouldn't run the GC.
if (pause != -1) {
gc_vertices_scheduler_.Run(
std::chrono::seconds(pause),
std::bind(&GarbageCollector<Vertex>::Run, gc_vertices_));
gc_edges_scheduler_.Run(std::chrono::seconds(pause),
std::bind(&GarbageCollector<Edge>::Run, gc_edges_));
}
// if (import_snapshot)
// snap_engine.import();
}

View File

@ -13,6 +13,7 @@
#include "storage/vertex.hpp"
#include "transactions/engine.hpp"
#include "utils/pass_key.hpp"
#include "utils/scheduler.hpp"
// TODO: Maybe split this in another layer between Db and Dbms. Where the new
// layer would hold SnapshotEngine and his kind of concept objects. Some
@ -69,4 +70,8 @@ class GraphDb {
// indexes
KeyIndex<GraphDbTypes::Label, Vertex> labels_index_;
KeyIndex<GraphDbTypes::EdgeType, Edge> edge_types_index_;
// Schedulers
Scheduler<std::mutex> gc_vertices_scheduler_;
Scheduler<std::mutex> gc_edges_scheduler_;
};

View File

@ -1,9 +1,5 @@
#pragma once
#include <chrono>
#include <condition_variable>
#include <thread>
#include "data_structures/concurrent/skiplist.hpp"
#include "logging/loggable.hpp"
#include "mvcc/id.hpp"
@ -23,61 +19,30 @@ class GarbageCollector : public Loggable {
permanent_assert(engine != nullptr, "Engine can't be nullptr.");
};
~GarbageCollector() {
destruction_.store(true);
{
std::unique_lock<std::mutex> lk(mutex_);
condition_variable_.notify_one();
}
if (run_thread_.joinable()) run_thread_.join();
}
/**
*@brief - Runs garbage collector. Starts a new thread which garbage collects
*in the background.
*@param pause - How long to sleep between successive garbage collector
*thread. If this parameter is -1 the GC will not run.
*@brief - Runs garbage collector.
*/
void Run(const std::chrono::seconds &pause) {
if (pause == std::chrono::seconds(-1)) return;
// Invoke new thread which will do the GC work.
run_thread_ = std::thread([this, pause]() {
for (;;) {
// If the whole class is being destructed we should end this thread.
if (this->destruction_.load(std::memory_order_seq_cst)) break;
auto accessor = this->skiplist_->access();
uint64_t count = 0;
// Acquire id of either the oldest active transaction, or the id of a
// transaction that will be assigned next. We should make sure that we
// get count before we ask for active transactions since some
// transaction could possibly increase the count while we ask for
// oldest_active transaction.
const auto next_id = engine_->count() + 1;
const auto id = this->engine_->oldest_active().get_or(next_id);
if (logger.Initialized())
logger.trace("Gc started cleaning everything deleted before {}", id);
for (auto x : accessor) {
// If the mvcc is empty, i.e. there is nothing else to be read from it
// we can delete it.
if (x->GcDeleted(id)) count += accessor.remove(x);
}
if (logger.Initialized()) logger.trace("Destroyed: {}", count);
std::unique_lock<std::mutex> lk(mutex_);
condition_variable_.wait_for(lk, std::chrono::seconds(pause), [&] {
return this->destruction_ == true;
});
lk.unlock();
}
});
void Run() {
auto accessor = this->skiplist_->access();
uint64_t count = 0;
// Acquire id of either the oldest active transaction, or the id of a
// transaction that will be assigned next. We should make sure that we
// get count before we ask for active transactions since some
// transaction could possibly increase the count while we ask for
// oldest_active transaction.
const auto next_id = engine_->count() + 1;
const auto id = this->engine_->oldest_active().get_or(next_id);
if (logger.Initialized())
logger.trace("Gc started cleaning everything deleted before {}", id);
for (auto x : accessor) {
// If the mvcc is empty, i.e. there is nothing else to be read from it
// we can delete it.
if (x->GcDeleted(id)) count += accessor.remove(x);
}
if (logger.Initialized()) logger.trace("Destroyed: {}", count);
}
private:
SkipList<mvcc::VersionList<T> *> *skiplist_{nullptr}; // Not owned.
tx::Engine *engine_{nullptr}; // Not owned.
std::thread run_thread_;
std::atomic<bool> destruction_;
std::mutex mutex_;
std::condition_variable condition_variable_;
};

View File

@ -56,24 +56,24 @@ class Config {
load_configuration(homedir);
// environment variable configuratoin
if (const char* env_path = std::getenv(Definition::env_config_key))
if (const char *env_path = std::getenv(Definition::env_config_key))
load_configuration(env_path);
}
public:
static Config<Definition>& instance() {
static Config<Definition> &instance() {
static Config<Definition> config;
return config;
}
void register_args(int argc, char** argv) {
void register_args(int argc, char **argv) {
REGISTER_ARGS(argc, argv);
for (const auto& argument : Definition::arguments) {
for (const auto &argument : Definition::arguments) {
dict[argument] = GET_ARG("--" + argument, dict[argument]).get_string();
}
}
std::string& operator[](const char* key) { return dict[key]; }
std::string &operator[](const char *key) { return dict[key]; }
};
}

View File

@ -64,11 +64,10 @@ TEST(VersionList, GcDeleted) {
* Test integration of garbage collector with MVCC GC. Delete mvcc's which are
* empty (not visible from any future transaction) from the skiplist.
*/
TEST(GarbageCollector, WaitAndClean) {
TEST(GarbageCollector, GcClean) {
SkipList<mvcc::VersionList<Prop> *> skiplist;
tx::Engine engine;
GarbageCollector<Prop> gc(&skiplist, &engine);
gc.Run(std::chrono::seconds(1));
auto t1 = engine.begin();
std::atomic<int> count;
@ -76,41 +75,18 @@ TEST(GarbageCollector, WaitAndClean) {
auto access = skiplist.access();
access.insert(vl);
gc.Run();
t1->commit();
gc.Run();
auto t2 = engine.begin();
EXPECT_EQ(vl->remove(*t2), true);
t2->commit();
std::this_thread::sleep_for(std::chrono::seconds(3));
gc.Run();
EXPECT_EQ(access.size(), (size_t)0);
}
/**
* Same as above, but, the GarbageCollector will never be run because the time
* between garbage collections is set to -1.
*/
TEST(GarbageCollector, WaitAndDontClean) {
SkipList<mvcc::VersionList<Prop> *> skiplist;
tx::Engine engine;
GarbageCollector<Prop> gc(&skiplist, &engine);
gc.Run(std::chrono::seconds(-1)); // Never run GC. This test is identical to
// the top one except GC is never run.
auto t1 = engine.begin();
std::atomic<int> count;
auto vl = new mvcc::VersionList<Prop>(*t1, count);
auto access = skiplist.access();
access.insert(vl);
t1->commit();
auto t2 = engine.begin();
EXPECT_EQ(vl->remove(*t2), true);
t2->commit();
std::this_thread::sleep_for(std::chrono::seconds(3));
EXPECT_EQ(access.size(), (size_t)1);
}
int main(int argc, char **argv) {
::logging::init_async();
::logging::log->pipe(std::make_unique<Stdout>());