Make gc more aggressive and optimise is_not_visible function

Reviewers: florijan, mferencevic

Reviewed By: florijan

Subscribers: buda, pullbot

Differential Revision: https://phabricator.memgraph.io/D849
This commit is contained in:
Mislav Bradac 2017-10-05 21:19:32 +02:00
parent d0fb4d7242
commit 11b2b83c96
10 changed files with 183 additions and 135 deletions

View File

@ -28,7 +28,7 @@ BASE_FLAGS = [
'-I./libs',
'-I./libs/rapidcheck/include',
'-I./libs/antlr4/runtime/Cpp/runtime/src',
'-I./build/libs/gflags/include',
'-I./libs/gflags/src/gflags-build/include',
'-I./experimental/distributed/src',
'-I./experimental/distributed/libs/cereal/include',
'-I./libs/postgresql/include'

View File

@ -9,6 +9,7 @@
#include "durability/recovery.hpp"
#include "storage/edge.hpp"
#include "storage/garbage_collector.hpp"
#include "utils/timer.hpp"
DEFINE_int32(gc_cycle_sec, 30,
"Amount of time between starts of two cleaning cycles in seconds. "
@ -96,30 +97,53 @@ void GraphDb::RecoverDatabase(const fs::path &snapshot_db_dir) {
void GraphDb::CollectGarbage() {
// main garbage collection logic
// see wiki documentation for logic explanation
const auto snapshot = this->tx_engine_.GcSnapshot();
LOG(INFO) << "Garbage collector started";
const auto snapshot = tx_engine_.GcSnapshot();
{
// This can be run concurrently
this->gc_vertices_.Run(snapshot, this->tx_engine_);
this->gc_edges_.Run(snapshot, this->tx_engine_);
utils::Timer x;
gc_vertices_.Run(snapshot, tx_engine_);
gc_edges_.Run(snapshot, tx_engine_);
VLOG(1) << "Garbage collector mvcc phase time: " << x.Elapsed().count();
}
// This has to be run sequentially after gc because gc modifies
// version_lists and changes the oldest visible record, on which Refresh
// depends.
{
// This can be run concurrently
this->labels_index_.Refresh(snapshot, this->tx_engine_);
this->edge_types_index_.Refresh(snapshot, this->tx_engine_);
this->label_property_index_.Refresh(snapshot, this->tx_engine_);
utils::Timer x;
labels_index_.Refresh(snapshot, tx_engine_);
edge_types_index_.Refresh(snapshot, tx_engine_);
label_property_index_.Refresh(snapshot, tx_engine_);
VLOG(1) << "Garbage collector index phase time: " << x.Elapsed().count();
}
// we free expired objects with snapshot.back(), which is
// the ID of the oldest active transaction (or next active, if there
// are no currently active). that's legal because that was the
// last possible transaction that could have obtained pointers
// to those records
this->edge_record_deleter_.FreeExpiredObjects(snapshot.back());
this->vertex_record_deleter_.FreeExpiredObjects(snapshot.back());
this->edge_version_list_deleter_.FreeExpiredObjects(snapshot.back());
this->vertex_version_list_deleter_.FreeExpiredObjects(snapshot.back());
{
// We free expired objects with snapshot.back(), which is
// the ID of the oldest active transaction (or next active, if there
// are no currently active). That's legal because that was the
// last possible transaction that could have obtained pointers
// to those records. New snapshot can be used, different than one used for
// first two phases of gc.
utils::Timer x;
const auto snapshot = tx_engine_.GcSnapshot();
edge_record_deleter_.FreeExpiredObjects(snapshot.back());
vertex_record_deleter_.FreeExpiredObjects(snapshot.back());
edge_version_list_deleter_.FreeExpiredObjects(snapshot.back());
vertex_version_list_deleter_.FreeExpiredObjects(snapshot.back());
VLOG(1) << "Garbage collector deferred deletion phase time: "
<< x.Elapsed().count();
}
LOG(INFO) << "Garbage collector finished";
VLOG(2) << "gc snapshot: " << snapshot;
VLOG(2) << "edge_record_deleter_ size: " << edge_record_deleter_.Count();
VLOG(2) << "vertex record deleter_ size: " << vertex_record_deleter_.Count();
VLOG(2) << "edge_version_list_deleter_ size: "
<< edge_version_list_deleter_.Count();
VLOG(2) << "vertex_version_list_deleter_ size: "
<< vertex_version_list_deleter_.Count();
VLOG(2) << "vertices_ size: " << vertices_.access().size();
VLOG(2) << "edges_ size: " << edges_.access().size();
}
GraphDb::~GraphDb() {
@ -149,13 +173,12 @@ GraphDb::~GraphDb() {
// Delete vertices and edges which weren't collected before, also deletes
// records inside version list
for (auto &vertex : this->vertices_.access()) delete vertex;
for (auto &edge : this->edges_.access()) delete edge;
for (auto &vertex : vertices_.access()) delete vertex;
for (auto &edge : edges_.access()) delete edge;
// Free expired records with the maximal possible id from all the deleters.
this->edge_record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());
this->vertex_record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());
this->edge_version_list_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());
this->vertex_version_list_deleter_.FreeExpiredObjects(
tx::Transaction::MaxId());
edge_record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());
vertex_record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());
edge_version_list_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());
vertex_version_list_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());
}

View File

@ -97,8 +97,9 @@ class Record : public Version<T> {
// snapshot (consequently also not in the snapshots of
// newer transactions)
return (exp_id != 0 && exp_id < snapshot.back() &&
engine.clog().is_committed(exp_id) && !snapshot.contains(exp_id)) ||
engine.clog().is_aborted(tx_.cre);
committed(Hints::kExp, exp_id, engine) &&
!snapshot.contains(exp_id)) ||
cre_aborted(engine);
}
// TODO: Test this
@ -185,7 +186,7 @@ class Record : public Version<T> {
// of the record to the current command in the running transaction.
CreExp<tx::command_id_t> cmd_;
Hints hints_;
mutable Hints hints_;
/** Fetch the (transaction, command) expiration before the check
* because they can be concurrently modified by multiple transactions.
* Do it in a loop to ensure that command is consistent with transaction.
@ -212,6 +213,8 @@ class Record : public Version<T> {
* @param id - id to check if it's commited and visible
* @return true if the id is commited and visible for the transaction t.
*/
// TODO: Rename this function. Its semantics is different than of function
// below, but it has a same name.
bool committed(uint8_t mask, tx::transaction_id_t id,
const tx::Transaction &t) {
debug_assert(mask == Hints::kCre || mask == Hints::kExp,
@ -232,8 +235,7 @@ class Record : public Version<T> {
}
/**
* @brief - Check if the transaction with the given `id`
* is committed.
* @brief - Check if the transaction with the given `id` is committed.
*
* @param mask - Hint bits mask (either Hints::kCre or Hints::kExp).
* @param id - id to check if commited
@ -241,29 +243,55 @@ class Record : public Version<T> {
* statuses
* @return true if it's commited, false otherwise
*/
bool committed(uint8_t mask, tx::transaction_id_t id, tx::Engine &engine) {
bool committed(uint8_t mask, tx::transaction_id_t id,
const tx::Engine &engine) const {
debug_assert(mask == Hints::kCre || mask == Hints::kExp,
"Mask must be either kCre or kExp");
// if hints are set, return if id is committed
// If hints are set, return if id is committed.
if (hints_.Get(mask)) return hints_.Get(Hints::kCmt & mask);
// if hints are not set consult the commit log
if (engine.clog().is_committed(id)) {
// If hints are not set consult the commit log.
auto info = engine.clog().fetch_info(id);
if (info.is_committed()) {
hints_.Set(Hints::kCmt & mask);
return true;
}
if (info.is_aborted() && mask == Hints::kCre) {
// We can't set hints for aborted if mask is kExp because of a
// race-condition that can occurr when tx.exp gets changed by some
// transaction.
//
// This is not a problem with hints.cre.X because only one transaction
// ever creates a record
hints_.Set(Hints::kAbt & mask);
}
// we can't set_aborted hints because of a race-condition that
// can occurr when tx.exp gets changed by some transaction.
// to be correct, tx.exp and hints.exp.set_aborted should be
// atomic.
//
// this is not a problem with hints.cre.X because
// only one transaction ever creates a record
//
// it's also not a problem with hints.exp.set_committed
// because only one transaction ever can expire a record
// and commit
return false;
}
/**
* @brief - Check if tx_.cre is aborted. If you need to check for exp
* transaction do it manually by looking at commit log. This function can't do
* that for you since hints can't be used for exp transaction (reason is
* described in function above).
*
* @param engine - engine instance with information about transaction
* statuses
* @return true if it's aborted, false otherwise
*/
bool cre_aborted(const tx::Engine &engine) const {
// If hints are set, return if id is committed.
if (hints_.Get(Hints::kCre)) return hints_.Get(Hints::kAbt & Hints::kCre);
// If hints are not set consult the commit log.
auto info = engine.clog().fetch_info(tx_.cre);
if (info.is_aborted()) {
hints_.Set(Hints::kAbt & Hints::kCre);
return true;
}
if (info.is_committed()) {
hints_.Set(Hints::kCmt & Hints::kCre);
}
return false;
}
};

View File

@ -2,6 +2,7 @@
#include <malloc.h>
#include <limits>
#include <list>
#include "mvcc/record.hpp"
@ -16,6 +17,16 @@
template <typename T>
class DeferredDeleter {
public:
/**
* @brief - keep track of what object was deleted at which time.
*/
struct DeletedObject {
const T *object;
const tx::transaction_id_t deleted_at;
DeletedObject(const T *object, tx::transaction_id_t deleted_at)
: object(object), deleted_at(deleted_at) {}
};
/**
* @brief - check if everything is freed
*/
@ -26,18 +37,21 @@ class DeferredDeleter {
/**
* @brief - Add objects to this deleter. This method assumes that it will
* always be called with a non-decreasing sequence of `last_transaction`.
* always be called with a non-decreasing sequence of `deleted_at`.
* @param objects - vector of objects to add
* @param last_transaction - nothing newer or equal to it can see these
* objects
*/
void AddObjects(const std::vector<T *> &objects,
tx::transaction_id_t last_transaction) {
debug_assert(
objects_.size() == 0 || objects_.back().deleted_at <= last_transaction,
"Transaction ids are not non-decreasing.");
for (auto object : objects)
objects_.emplace_back(DeletedObject(object, last_transaction));
void AddObjects(const std::vector<DeletedObject> &objects) {
auto previous_tx_id = objects_.empty()
? std::numeric_limits<tx::transaction_id_t>::min()
: objects_.back().deleted_at;
for (auto object : objects) {
CHECK(previous_tx_id <= object.deleted_at)
<< "deleted_at must be non-decreasing";
previous_tx_id = object.deleted_at;
objects_.push_back(object);
}
}
/**
@ -65,16 +79,6 @@ class DeferredDeleter {
size_t Count() { return objects_.size(); }
private:
/**
* @brief - keep track of what object was deleted at which time.
*/
struct DeletedObject {
const T *object;
const tx::transaction_id_t deleted_at;
DeletedObject(T *object, tx::transaction_id_t deleted_at)
: object(object), deleted_at(deleted_at) {}
};
// Ascendingly sorted list of deleted objects by `deleted_at`.
std::list<DeletedObject> objects_;
};

View File

@ -5,6 +5,7 @@
#include "data_structures/concurrent/skiplist.hpp"
#include "mvcc/version_list.hpp"
#include "storage/deferred_deleter.hpp"
#include "storage/deferred_deleter.hpp"
#include "transactions/engine.hpp"
/**
@ -32,17 +33,20 @@ class GarbageCollector {
void Run(const tx::Snapshot &snapshot, const tx::Engine &engine) {
auto collection_accessor = this->skiplist_.access();
uint64_t count = 0;
std::vector<T *> deleted_records;
std::vector<mvcc::VersionList<T> *> deleted_version_lists;
std::vector<typename DeferredDeleter<T>::DeletedObject> deleted_records;
std::vector<typename DeferredDeleter<mvcc::VersionList<T>>::DeletedObject>
deleted_version_lists;
for (auto version_list : collection_accessor) {
// If the version_list is empty, i.e. there is nothing else to be read
// from it we can delete it.
auto ret = version_list->GcDeleted(snapshot, engine);
if (ret.first) {
deleted_version_lists.push_back(version_list);
deleted_version_lists.emplace_back(version_list,
engine.LockFreeCount());
count += collection_accessor.remove(version_list);
}
if (ret.second != nullptr) deleted_records.push_back(ret.second);
if (ret.second != nullptr)
deleted_records.emplace_back(ret.second, engine.LockFreeCount());
}
DLOG_IF(INFO, count > 0) << "GC started cleaning with snapshot: "
<< snapshot;
@ -50,10 +54,10 @@ class GarbageCollector {
// Add records to deleter, with the id larger or equal than the last active
// transaction.
record_deleter_.AddObjects(deleted_records, engine.Count());
record_deleter_.AddObjects(deleted_records);
// Add version_lists to deleter, with the id larger or equal than the last
// active transaction.
version_list_deleter_.AddObjects(deleted_version_lists, engine.Count());
version_list_deleter_.AddObjects(deleted_version_lists);
}
private:

View File

@ -31,27 +31,30 @@ class CommitLog {
void set_aborted(transaction_id_t id) { log.set(2 * id + 1); }
private:
struct Info {
class Info {
public:
enum Status {
ACTIVE = 0, // 00
COMMITTED = 1, // 01
ABORTED = 2, // 10
};
Info(uint8_t flags) : flags_(flags) {}
bool is_active() const { return flags == ACTIVE; }
bool is_active() const { return flags_ == ACTIVE; }
bool is_committed() const { return flags & COMMITTED; }
bool is_committed() const { return flags_ & COMMITTED; }
bool is_aborted() const { return flags & ABORTED; }
bool is_aborted() const { return flags_ & ABORTED; }
operator uint8_t() const { return flags; }
operator uint8_t() const { return flags_; }
uint8_t flags;
private:
uint8_t flags_;
};
Info fetch_info(transaction_id_t id) const { return Info{log.at(2 * id, 2)}; }
private:
DynamicBitset<uint8_t, 32768> log;
};
}

View File

@ -32,19 +32,6 @@ class Engine : Lockable<SpinLock> {
static constexpr auto kMaxCommandId =
std::numeric_limits<decltype(std::declval<Transaction>().cid())>::max();
template <class T>
class SimpleCounter {
public:
SimpleCounter(T initial) : counter(initial) {}
T next() { return ++counter; }
T count() const { return counter; }
private:
T counter;
};
public:
/** Begins a transaction and returns a pointer to
* it's object.
@ -56,7 +43,7 @@ class Engine : Lockable<SpinLock> {
Transaction *Begin() {
auto guard = this->acquire_unique();
transaction_id_t id{counter_.next()};
transaction_id_t id{++counter_};
auto t = new Transaction(id, active_, *this);
active_.insert(id);
@ -105,7 +92,7 @@ class Engine : Lockable<SpinLock> {
// No active transactions.
if (active_.size() == 0) {
auto snapshot_copy = active_;
snapshot_copy.insert(counter_.count() + 1);
snapshot_copy.insert(counter_ + 1);
return snapshot_copy;
}
@ -133,11 +120,16 @@ class Engine : Lockable<SpinLock> {
Finalize(t);
}
/** Returns transaction id of last transaction without taking a lock. New
* transactions can be created or destroyed during call of this function.
*/
auto LockFreeCount() const { return counter_.load(); }
/** The total number of transactions that have executed since the creation of
* this engine */
auto Count() const {
auto guard = this->acquire_unique();
return counter_.count();
return counter_.load();
}
/** The count of currently active transactions */
@ -170,10 +162,6 @@ class Engine : Lockable<SpinLock> {
store_.del(t.id_);
}
// Transaction counter. contains the number of transactions ever created till
// now.
SimpleCounter<transaction_id_t> counter_{0};
// A snapshot of currently active transactions.
Snapshot active_;
@ -187,5 +175,6 @@ class Engine : Lockable<SpinLock> {
// garbage collected and we are sure that we will not be having problems with
// lifetimes of each object.
ConcurrentMap<transaction_id_t, transaction_id_t> lock_graph_;
std::atomic<transaction_id_t> counter_{0};
};
}

View File

@ -9,17 +9,15 @@
#include "utils/assert.hpp"
/**
* Class used to run scheduled function execution. Class is templated with
* mutex class TMutex which is used to synchronize threads. Default template
* value is std::mutex.
* Class used to run scheduled function execution.
*/
class Scheduler {
public:
Scheduler() {}
/**
* @param pause - Duration between two function executions. If function is
* still running when it should be ran again, it will not be ran and next
* start time will be increased to current time plus pause.
* still running when it should be ran again, it will run right after it
* finishes its previous run.
* @param f - Function
* @Tparam TRep underlying arithmetic type in duration
* @Tparam TPeriod duration in seconds between two ticks
@ -29,22 +27,24 @@ class Scheduler {
const std::function<void()> &f) {
debug_assert(is_working_ == false, "Thread already running.");
debug_assert(pause > std::chrono::seconds(0), "Pause is invalid.");
is_working_ = true;
thread_ = std::thread([this, pause, f]() {
auto start_time = std::chrono::system_clock::now();
for (;;) {
if (!is_working_.load()) break;
while (is_working_) {
f();
std::unique_lock<std::mutex> lk(mutex_);
auto now = std::chrono::system_clock::now();
while (now >= start_time) start_time += pause;
condition_variable_.wait_for(
lk, start_time - now, [&] { return is_working_.load() == false; });
lk.unlock();
start_time += pause;
if (start_time > now) {
condition_variable_.wait_for(lk, start_time - now, [&] {
return is_working_.load() == false;
});
} else {
start_time = now;
}
}
});
}

View File

@ -10,13 +10,13 @@
TEST(DeferredDeleter, AddObjects) {
DeferredDeleter<Vertex> deleter;
for (int i = 0; i < 10; ++i) {
std::vector<Vertex *> V;
V.push_back(new Vertex());
V.push_back(new Vertex());
deleter.AddObjects(V, 5);
EXPECT_EQ(deleter.Count(), (i + 1) * 2);
}
deleter.FreeExpiredObjects(tx::Transaction::MaxId());
std::vector<DeferredDeleter<Vertex>::DeletedObject> v;
v.emplace_back(new Vertex, 5);
v.emplace_back(new Vertex, 5);
deleter.AddObjects(v);
EXPECT_EQ(deleter.Count(), (i + 1) * 2);
}
deleter.FreeExpiredObjects(tx::Transaction::MaxId());
}
// Check that the deleter can't be destroyed while it still has objects.
@ -24,10 +24,10 @@ TEST(DeferredDeleter, Destructor) {
std::atomic<int> count{0};
DeferredDeleter<DestrCountRec> *deleter = new DeferredDeleter<DestrCountRec>;
for (int i = 0; i < 10; ++i) {
std::vector<DestrCountRec *> V;
V.push_back(new DestrCountRec(count));
V.push_back(new DestrCountRec(count));
deleter->AddObjects(V, 5);
std::vector<DeferredDeleter<DestrCountRec>::DeletedObject> v;
v.emplace_back(new DestrCountRec(count), 5);
v.emplace_back(new DestrCountRec(count), 5);
deleter->AddObjects(v);
EXPECT_EQ(deleter->Count(), (i + 1) * 2);
}
EXPECT_EQ(0, count);
@ -40,11 +40,11 @@ TEST(DeferredDeleter, Destructor) {
// Check if deleter frees objects.
TEST(DeferredDeleter, FreeExpiredObjects) {
DeferredDeleter<DestrCountRec> deleter;
std::vector<DestrCountRec *> V;
std::vector<DeferredDeleter<DestrCountRec>::DeletedObject> v;
std::atomic<int> count{0};
V.push_back(new DestrCountRec(count));
V.push_back(new DestrCountRec(count));
deleter.AddObjects(V, 5);
v.emplace_back(new DestrCountRec(count), 5);
v.emplace_back(new DestrCountRec(count), 5);
deleter.AddObjects(v);
deleter.FreeExpiredObjects(5);
EXPECT_EQ(deleter.Count(), 2);

View File

@ -7,37 +7,34 @@
#include "database/graph_db_datatypes.hpp"
#include "database/indexes/label_property_index.hpp"
class GraphDbTest : public testing::Test {
protected:
DECLARE_int32(gc_cycle_sec);
TEST(GraphDbTest, GarbageCollectIndices) {
FLAGS_gc_cycle_sec = -1;
GraphDb graph_db{"default", fs::path()};
std::unique_ptr<GraphDbAccessor> dba =
std::make_unique<GraphDbAccessor>(graph_db);
void Commit() {
auto commit = [&] {
dba->Commit();
auto dba2 = std::make_unique<GraphDbAccessor>(graph_db);
dba.swap(dba2);
}
};
TEST_F(GraphDbTest, GarbageCollectIndices) {
dba = std::make_unique<GraphDbAccessor>(graph_db);
};
auto label = dba->Label("label");
auto property = dba->Property("property");
dba->BuildIndex(label, property);
Commit();
commit();
auto vertex = dba->InsertVertex();
vertex.add_label(label);
vertex.PropsSet(property, 42);
Commit();
commit();
EXPECT_EQ(dba->VerticesCount(label, property), 1);
auto vertex_transferred = dba->Transfer(vertex);
dba->RemoveVertex(vertex_transferred.value());
EXPECT_EQ(dba->VerticesCount(label, property), 1);
Commit();
commit();
EXPECT_EQ(dba->VerticesCount(label, property), 1);
graph_db.CollectGarbage();
EXPECT_EQ(dba->VerticesCount(label, property), 0);
}