From 11b2b83c968a33b51dad8697b7cdd30348da4087 Mon Sep 17 00:00:00 2001 From: Mislav Bradac <mislav.bradac@memgraph.io> Date: Thu, 5 Oct 2017 21:19:32 +0200 Subject: [PATCH] Make gc more aggressive and optimise is_not_visible function Reviewers: florijan, mferencevic Reviewed By: florijan Subscribers: buda, pullbot Differential Revision: https://phabricator.memgraph.io/D849 --- .ycm_extra_conf.py | 2 +- src/database/graph_db.cpp | 67 ++++++++++++++++++++---------- src/mvcc/record.hpp | 68 ++++++++++++++++++++++--------- src/storage/deferred_deleter.hpp | 40 ++++++++++-------- src/storage/garbage_collector.hpp | 16 +++++--- src/transactions/commit_log.hpp | 17 ++++---- src/transactions/engine.hpp | 29 ++++--------- src/utils/scheduler.hpp | 26 ++++++------ tests/unit/deferred_deleter.cpp | 30 +++++++------- tests/unit/graph_db.cpp | 23 +++++------ 10 files changed, 183 insertions(+), 135 deletions(-) diff --git a/.ycm_extra_conf.py b/.ycm_extra_conf.py index d8651ea40..43644062e 100644 --- a/.ycm_extra_conf.py +++ b/.ycm_extra_conf.py @@ -28,7 +28,7 @@ BASE_FLAGS = [ '-I./libs', '-I./libs/rapidcheck/include', '-I./libs/antlr4/runtime/Cpp/runtime/src', - '-I./build/libs/gflags/include', + '-I./libs/gflags/src/gflags-build/include', '-I./experimental/distributed/src', '-I./experimental/distributed/libs/cereal/include', '-I./libs/postgresql/include' diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 49ad4afae..aa01ef2e3 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -9,6 +9,7 @@ #include "durability/recovery.hpp" #include "storage/edge.hpp" #include "storage/garbage_collector.hpp" +#include "utils/timer.hpp" DEFINE_int32(gc_cycle_sec, 30, "Amount of time between starts of two cleaning cycles in seconds. " @@ -96,30 +97,53 @@ void GraphDb::RecoverDatabase(const fs::path &snapshot_db_dir) { void GraphDb::CollectGarbage() { // main garbage collection logic // see wiki documentation for logic explanation - const auto snapshot = this->tx_engine_.GcSnapshot(); + LOG(INFO) << "Garbage collector started"; + const auto snapshot = tx_engine_.GcSnapshot(); { // This can be run concurrently - this->gc_vertices_.Run(snapshot, this->tx_engine_); - this->gc_edges_.Run(snapshot, this->tx_engine_); + utils::Timer x; + gc_vertices_.Run(snapshot, tx_engine_); + gc_edges_.Run(snapshot, tx_engine_); + VLOG(1) << "Garbage collector mvcc phase time: " << x.Elapsed().count(); } // This has to be run sequentially after gc because gc modifies // version_lists and changes the oldest visible record, on which Refresh // depends. { // This can be run concurrently - this->labels_index_.Refresh(snapshot, this->tx_engine_); - this->edge_types_index_.Refresh(snapshot, this->tx_engine_); - this->label_property_index_.Refresh(snapshot, this->tx_engine_); + utils::Timer x; + labels_index_.Refresh(snapshot, tx_engine_); + edge_types_index_.Refresh(snapshot, tx_engine_); + label_property_index_.Refresh(snapshot, tx_engine_); + VLOG(1) << "Garbage collector index phase time: " << x.Elapsed().count(); } - // we free expired objects with snapshot.back(), which is - // the ID of the oldest active transaction (or next active, if there - // are no currently active). that's legal because that was the - // last possible transaction that could have obtained pointers - // to those records - this->edge_record_deleter_.FreeExpiredObjects(snapshot.back()); - this->vertex_record_deleter_.FreeExpiredObjects(snapshot.back()); - this->edge_version_list_deleter_.FreeExpiredObjects(snapshot.back()); - this->vertex_version_list_deleter_.FreeExpiredObjects(snapshot.back()); + { + // We free expired objects with snapshot.back(), which is + // the ID of the oldest active transaction (or next active, if there + // are no currently active). That's legal because that was the + // last possible transaction that could have obtained pointers + // to those records. New snapshot can be used, different than one used for + // first two phases of gc. + utils::Timer x; + const auto snapshot = tx_engine_.GcSnapshot(); + edge_record_deleter_.FreeExpiredObjects(snapshot.back()); + vertex_record_deleter_.FreeExpiredObjects(snapshot.back()); + edge_version_list_deleter_.FreeExpiredObjects(snapshot.back()); + vertex_version_list_deleter_.FreeExpiredObjects(snapshot.back()); + VLOG(1) << "Garbage collector deferred deletion phase time: " + << x.Elapsed().count(); + } + + LOG(INFO) << "Garbage collector finished"; + VLOG(2) << "gc snapshot: " << snapshot; + VLOG(2) << "edge_record_deleter_ size: " << edge_record_deleter_.Count(); + VLOG(2) << "vertex record deleter_ size: " << vertex_record_deleter_.Count(); + VLOG(2) << "edge_version_list_deleter_ size: " + << edge_version_list_deleter_.Count(); + VLOG(2) << "vertex_version_list_deleter_ size: " + << vertex_version_list_deleter_.Count(); + VLOG(2) << "vertices_ size: " << vertices_.access().size(); + VLOG(2) << "edges_ size: " << edges_.access().size(); } GraphDb::~GraphDb() { @@ -149,13 +173,12 @@ GraphDb::~GraphDb() { // Delete vertices and edges which weren't collected before, also deletes // records inside version list - for (auto &vertex : this->vertices_.access()) delete vertex; - for (auto &edge : this->edges_.access()) delete edge; + for (auto &vertex : vertices_.access()) delete vertex; + for (auto &edge : edges_.access()) delete edge; // Free expired records with the maximal possible id from all the deleters. - this->edge_record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId()); - this->vertex_record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId()); - this->edge_version_list_deleter_.FreeExpiredObjects(tx::Transaction::MaxId()); - this->vertex_version_list_deleter_.FreeExpiredObjects( - tx::Transaction::MaxId()); + edge_record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId()); + vertex_record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId()); + edge_version_list_deleter_.FreeExpiredObjects(tx::Transaction::MaxId()); + vertex_version_list_deleter_.FreeExpiredObjects(tx::Transaction::MaxId()); } diff --git a/src/mvcc/record.hpp b/src/mvcc/record.hpp index ec3ef3049..949cd946a 100644 --- a/src/mvcc/record.hpp +++ b/src/mvcc/record.hpp @@ -97,8 +97,9 @@ class Record : public Version<T> { // snapshot (consequently also not in the snapshots of // newer transactions) return (exp_id != 0 && exp_id < snapshot.back() && - engine.clog().is_committed(exp_id) && !snapshot.contains(exp_id)) || - engine.clog().is_aborted(tx_.cre); + committed(Hints::kExp, exp_id, engine) && + !snapshot.contains(exp_id)) || + cre_aborted(engine); } // TODO: Test this @@ -185,7 +186,7 @@ class Record : public Version<T> { // of the record to the current command in the running transaction. CreExp<tx::command_id_t> cmd_; - Hints hints_; + mutable Hints hints_; /** Fetch the (transaction, command) expiration before the check * because they can be concurrently modified by multiple transactions. * Do it in a loop to ensure that command is consistent with transaction. @@ -212,6 +213,8 @@ class Record : public Version<T> { * @param id - id to check if it's commited and visible * @return true if the id is commited and visible for the transaction t. */ + // TODO: Rename this function. Its semantics is different than of function + // below, but it has a same name. bool committed(uint8_t mask, tx::transaction_id_t id, const tx::Transaction &t) { debug_assert(mask == Hints::kCre || mask == Hints::kExp, @@ -232,8 +235,7 @@ class Record : public Version<T> { } /** - * @brief - Check if the transaction with the given `id` - * is committed. + * @brief - Check if the transaction with the given `id` is committed. * * @param mask - Hint bits mask (either Hints::kCre or Hints::kExp). * @param id - id to check if commited @@ -241,29 +243,55 @@ class Record : public Version<T> { * statuses * @return true if it's commited, false otherwise */ - bool committed(uint8_t mask, tx::transaction_id_t id, tx::Engine &engine) { + bool committed(uint8_t mask, tx::transaction_id_t id, + const tx::Engine &engine) const { debug_assert(mask == Hints::kCre || mask == Hints::kExp, "Mask must be either kCre or kExp"); - // if hints are set, return if id is committed + // If hints are set, return if id is committed. if (hints_.Get(mask)) return hints_.Get(Hints::kCmt & mask); - // if hints are not set consult the commit log - if (engine.clog().is_committed(id)) { + // If hints are not set consult the commit log. + auto info = engine.clog().fetch_info(id); + if (info.is_committed()) { hints_.Set(Hints::kCmt & mask); return true; } + if (info.is_aborted() && mask == Hints::kCre) { + // We can't set hints for aborted if mask is kExp because of a + // race-condition that can occurr when tx.exp gets changed by some + // transaction. + // + // This is not a problem with hints.cre.X because only one transaction + // ever creates a record + hints_.Set(Hints::kAbt & mask); + } - // we can't set_aborted hints because of a race-condition that - // can occurr when tx.exp gets changed by some transaction. - // to be correct, tx.exp and hints.exp.set_aborted should be - // atomic. - // - // this is not a problem with hints.cre.X because - // only one transaction ever creates a record - // - // it's also not a problem with hints.exp.set_committed - // because only one transaction ever can expire a record - // and commit + return false; + } + + /** + * @brief - Check if tx_.cre is aborted. If you need to check for exp + * transaction do it manually by looking at commit log. This function can't do + * that for you since hints can't be used for exp transaction (reason is + * described in function above). + * + * @param engine - engine instance with information about transaction + * statuses + * @return true if it's aborted, false otherwise + */ + bool cre_aborted(const tx::Engine &engine) const { + // If hints are set, return if id is committed. + if (hints_.Get(Hints::kCre)) return hints_.Get(Hints::kAbt & Hints::kCre); + + // If hints are not set consult the commit log. + auto info = engine.clog().fetch_info(tx_.cre); + if (info.is_aborted()) { + hints_.Set(Hints::kAbt & Hints::kCre); + return true; + } + if (info.is_committed()) { + hints_.Set(Hints::kCmt & Hints::kCre); + } return false; } }; diff --git a/src/storage/deferred_deleter.hpp b/src/storage/deferred_deleter.hpp index a11602cc0..a696bd7cb 100644 --- a/src/storage/deferred_deleter.hpp +++ b/src/storage/deferred_deleter.hpp @@ -2,6 +2,7 @@ #include <malloc.h> +#include <limits> #include <list> #include "mvcc/record.hpp" @@ -16,6 +17,16 @@ template <typename T> class DeferredDeleter { public: + /** + * @brief - keep track of what object was deleted at which time. + */ + struct DeletedObject { + const T *object; + const tx::transaction_id_t deleted_at; + DeletedObject(const T *object, tx::transaction_id_t deleted_at) + : object(object), deleted_at(deleted_at) {} + }; + /** * @brief - check if everything is freed */ @@ -26,18 +37,21 @@ class DeferredDeleter { /** * @brief - Add objects to this deleter. This method assumes that it will - * always be called with a non-decreasing sequence of `last_transaction`. + * always be called with a non-decreasing sequence of `deleted_at`. * @param objects - vector of objects to add * @param last_transaction - nothing newer or equal to it can see these * objects */ - void AddObjects(const std::vector<T *> &objects, - tx::transaction_id_t last_transaction) { - debug_assert( - objects_.size() == 0 || objects_.back().deleted_at <= last_transaction, - "Transaction ids are not non-decreasing."); - for (auto object : objects) - objects_.emplace_back(DeletedObject(object, last_transaction)); + void AddObjects(const std::vector<DeletedObject> &objects) { + auto previous_tx_id = objects_.empty() + ? std::numeric_limits<tx::transaction_id_t>::min() + : objects_.back().deleted_at; + for (auto object : objects) { + CHECK(previous_tx_id <= object.deleted_at) + << "deleted_at must be non-decreasing"; + previous_tx_id = object.deleted_at; + objects_.push_back(object); + } } /** @@ -65,16 +79,6 @@ class DeferredDeleter { size_t Count() { return objects_.size(); } private: - /** - * @brief - keep track of what object was deleted at which time. - */ - struct DeletedObject { - const T *object; - const tx::transaction_id_t deleted_at; - DeletedObject(T *object, tx::transaction_id_t deleted_at) - : object(object), deleted_at(deleted_at) {} - }; - // Ascendingly sorted list of deleted objects by `deleted_at`. std::list<DeletedObject> objects_; }; diff --git a/src/storage/garbage_collector.hpp b/src/storage/garbage_collector.hpp index 6d7733cbd..712c807f9 100644 --- a/src/storage/garbage_collector.hpp +++ b/src/storage/garbage_collector.hpp @@ -5,6 +5,7 @@ #include "data_structures/concurrent/skiplist.hpp" #include "mvcc/version_list.hpp" #include "storage/deferred_deleter.hpp" +#include "storage/deferred_deleter.hpp" #include "transactions/engine.hpp" /** @@ -32,17 +33,20 @@ class GarbageCollector { void Run(const tx::Snapshot &snapshot, const tx::Engine &engine) { auto collection_accessor = this->skiplist_.access(); uint64_t count = 0; - std::vector<T *> deleted_records; - std::vector<mvcc::VersionList<T> *> deleted_version_lists; + std::vector<typename DeferredDeleter<T>::DeletedObject> deleted_records; + std::vector<typename DeferredDeleter<mvcc::VersionList<T>>::DeletedObject> + deleted_version_lists; for (auto version_list : collection_accessor) { // If the version_list is empty, i.e. there is nothing else to be read // from it we can delete it. auto ret = version_list->GcDeleted(snapshot, engine); if (ret.first) { - deleted_version_lists.push_back(version_list); + deleted_version_lists.emplace_back(version_list, + engine.LockFreeCount()); count += collection_accessor.remove(version_list); } - if (ret.second != nullptr) deleted_records.push_back(ret.second); + if (ret.second != nullptr) + deleted_records.emplace_back(ret.second, engine.LockFreeCount()); } DLOG_IF(INFO, count > 0) << "GC started cleaning with snapshot: " << snapshot; @@ -50,10 +54,10 @@ class GarbageCollector { // Add records to deleter, with the id larger or equal than the last active // transaction. - record_deleter_.AddObjects(deleted_records, engine.Count()); + record_deleter_.AddObjects(deleted_records); // Add version_lists to deleter, with the id larger or equal than the last // active transaction. - version_list_deleter_.AddObjects(deleted_version_lists, engine.Count()); + version_list_deleter_.AddObjects(deleted_version_lists); } private: diff --git a/src/transactions/commit_log.hpp b/src/transactions/commit_log.hpp index 590987656..92b6d4480 100644 --- a/src/transactions/commit_log.hpp +++ b/src/transactions/commit_log.hpp @@ -31,27 +31,30 @@ class CommitLog { void set_aborted(transaction_id_t id) { log.set(2 * id + 1); } - private: - struct Info { + class Info { + public: enum Status { ACTIVE = 0, // 00 COMMITTED = 1, // 01 ABORTED = 2, // 10 }; + Info(uint8_t flags) : flags_(flags) {} - bool is_active() const { return flags == ACTIVE; } + bool is_active() const { return flags_ == ACTIVE; } - bool is_committed() const { return flags & COMMITTED; } + bool is_committed() const { return flags_ & COMMITTED; } - bool is_aborted() const { return flags & ABORTED; } + bool is_aborted() const { return flags_ & ABORTED; } - operator uint8_t() const { return flags; } + operator uint8_t() const { return flags_; } - uint8_t flags; + private: + uint8_t flags_; }; Info fetch_info(transaction_id_t id) const { return Info{log.at(2 * id, 2)}; } + private: DynamicBitset<uint8_t, 32768> log; }; } diff --git a/src/transactions/engine.hpp b/src/transactions/engine.hpp index 8e76ed848..21c9b102e 100644 --- a/src/transactions/engine.hpp +++ b/src/transactions/engine.hpp @@ -32,19 +32,6 @@ class Engine : Lockable<SpinLock> { static constexpr auto kMaxCommandId = std::numeric_limits<decltype(std::declval<Transaction>().cid())>::max(); - template <class T> - class SimpleCounter { - public: - SimpleCounter(T initial) : counter(initial) {} - - T next() { return ++counter; } - - T count() const { return counter; } - - private: - T counter; - }; - public: /** Begins a transaction and returns a pointer to * it's object. @@ -56,7 +43,7 @@ class Engine : Lockable<SpinLock> { Transaction *Begin() { auto guard = this->acquire_unique(); - transaction_id_t id{counter_.next()}; + transaction_id_t id{++counter_}; auto t = new Transaction(id, active_, *this); active_.insert(id); @@ -105,7 +92,7 @@ class Engine : Lockable<SpinLock> { // No active transactions. if (active_.size() == 0) { auto snapshot_copy = active_; - snapshot_copy.insert(counter_.count() + 1); + snapshot_copy.insert(counter_ + 1); return snapshot_copy; } @@ -133,11 +120,16 @@ class Engine : Lockable<SpinLock> { Finalize(t); } + /** Returns transaction id of last transaction without taking a lock. New + * transactions can be created or destroyed during call of this function. + */ + auto LockFreeCount() const { return counter_.load(); } + /** The total number of transactions that have executed since the creation of * this engine */ auto Count() const { auto guard = this->acquire_unique(); - return counter_.count(); + return counter_.load(); } /** The count of currently active transactions */ @@ -170,10 +162,6 @@ class Engine : Lockable<SpinLock> { store_.del(t.id_); } - // Transaction counter. contains the number of transactions ever created till - // now. - SimpleCounter<transaction_id_t> counter_{0}; - // A snapshot of currently active transactions. Snapshot active_; @@ -187,5 +175,6 @@ class Engine : Lockable<SpinLock> { // garbage collected and we are sure that we will not be having problems with // lifetimes of each object. ConcurrentMap<transaction_id_t, transaction_id_t> lock_graph_; + std::atomic<transaction_id_t> counter_{0}; }; } diff --git a/src/utils/scheduler.hpp b/src/utils/scheduler.hpp index eb818677e..4004dfc6a 100644 --- a/src/utils/scheduler.hpp +++ b/src/utils/scheduler.hpp @@ -9,17 +9,15 @@ #include "utils/assert.hpp" /** - * Class used to run scheduled function execution. Class is templated with - * mutex class TMutex which is used to synchronize threads. Default template - * value is std::mutex. + * Class used to run scheduled function execution. */ class Scheduler { public: Scheduler() {} /** * @param pause - Duration between two function executions. If function is - * still running when it should be ran again, it will not be ran and next - * start time will be increased to current time plus pause. + * still running when it should be ran again, it will run right after it + * finishes its previous run. * @param f - Function * @Tparam TRep underlying arithmetic type in duration * @Tparam TPeriod duration in seconds between two ticks @@ -29,22 +27,24 @@ class Scheduler { const std::function<void()> &f) { debug_assert(is_working_ == false, "Thread already running."); debug_assert(pause > std::chrono::seconds(0), "Pause is invalid."); + is_working_ = true; thread_ = std::thread([this, pause, f]() { auto start_time = std::chrono::system_clock::now(); - for (;;) { - if (!is_working_.load()) break; + while (is_working_) { f(); std::unique_lock<std::mutex> lk(mutex_); - auto now = std::chrono::system_clock::now(); - while (now >= start_time) start_time += pause; - - condition_variable_.wait_for( - lk, start_time - now, [&] { return is_working_.load() == false; }); - lk.unlock(); + start_time += pause; + if (start_time > now) { + condition_variable_.wait_for(lk, start_time - now, [&] { + return is_working_.load() == false; + }); + } else { + start_time = now; + } } }); } diff --git a/tests/unit/deferred_deleter.cpp b/tests/unit/deferred_deleter.cpp index 528a13d4c..5e5144d6b 100644 --- a/tests/unit/deferred_deleter.cpp +++ b/tests/unit/deferred_deleter.cpp @@ -10,13 +10,13 @@ TEST(DeferredDeleter, AddObjects) { DeferredDeleter<Vertex> deleter; for (int i = 0; i < 10; ++i) { - std::vector<Vertex *> V; - V.push_back(new Vertex()); - V.push_back(new Vertex()); -deleter.AddObjects(V, 5); -EXPECT_EQ(deleter.Count(), (i + 1) * 2); -} -deleter.FreeExpiredObjects(tx::Transaction::MaxId()); + std::vector<DeferredDeleter<Vertex>::DeletedObject> v; + v.emplace_back(new Vertex, 5); + v.emplace_back(new Vertex, 5); + deleter.AddObjects(v); + EXPECT_EQ(deleter.Count(), (i + 1) * 2); + } + deleter.FreeExpiredObjects(tx::Transaction::MaxId()); } // Check that the deleter can't be destroyed while it still has objects. @@ -24,10 +24,10 @@ TEST(DeferredDeleter, Destructor) { std::atomic<int> count{0}; DeferredDeleter<DestrCountRec> *deleter = new DeferredDeleter<DestrCountRec>; for (int i = 0; i < 10; ++i) { - std::vector<DestrCountRec *> V; - V.push_back(new DestrCountRec(count)); - V.push_back(new DestrCountRec(count)); - deleter->AddObjects(V, 5); + std::vector<DeferredDeleter<DestrCountRec>::DeletedObject> v; + v.emplace_back(new DestrCountRec(count), 5); + v.emplace_back(new DestrCountRec(count), 5); + deleter->AddObjects(v); EXPECT_EQ(deleter->Count(), (i + 1) * 2); } EXPECT_EQ(0, count); @@ -40,11 +40,11 @@ TEST(DeferredDeleter, Destructor) { // Check if deleter frees objects. TEST(DeferredDeleter, FreeExpiredObjects) { DeferredDeleter<DestrCountRec> deleter; - std::vector<DestrCountRec *> V; + std::vector<DeferredDeleter<DestrCountRec>::DeletedObject> v; std::atomic<int> count{0}; - V.push_back(new DestrCountRec(count)); - V.push_back(new DestrCountRec(count)); - deleter.AddObjects(V, 5); + v.emplace_back(new DestrCountRec(count), 5); + v.emplace_back(new DestrCountRec(count), 5); + deleter.AddObjects(v); deleter.FreeExpiredObjects(5); EXPECT_EQ(deleter.Count(), 2); diff --git a/tests/unit/graph_db.cpp b/tests/unit/graph_db.cpp index 5d9671a19..6fda3c523 100644 --- a/tests/unit/graph_db.cpp +++ b/tests/unit/graph_db.cpp @@ -7,37 +7,34 @@ #include "database/graph_db_datatypes.hpp" #include "database/indexes/label_property_index.hpp" -class GraphDbTest : public testing::Test { - protected: +DECLARE_int32(gc_cycle_sec); + +TEST(GraphDbTest, GarbageCollectIndices) { + FLAGS_gc_cycle_sec = -1; GraphDb graph_db{"default", fs::path()}; std::unique_ptr<GraphDbAccessor> dba = std::make_unique<GraphDbAccessor>(graph_db); - void Commit() { + auto commit = [&] { dba->Commit(); - auto dba2 = std::make_unique<GraphDbAccessor>(graph_db); - dba.swap(dba2); - } -}; - -TEST_F(GraphDbTest, GarbageCollectIndices) { + dba = std::make_unique<GraphDbAccessor>(graph_db); + }; auto label = dba->Label("label"); auto property = dba->Property("property"); dba->BuildIndex(label, property); - Commit(); + commit(); auto vertex = dba->InsertVertex(); vertex.add_label(label); vertex.PropsSet(property, 42); - Commit(); + commit(); EXPECT_EQ(dba->VerticesCount(label, property), 1); auto vertex_transferred = dba->Transfer(vertex); dba->RemoveVertex(vertex_transferred.value()); EXPECT_EQ(dba->VerticesCount(label, property), 1); - Commit(); + commit(); EXPECT_EQ(dba->VerticesCount(label, property), 1); graph_db.CollectGarbage(); EXPECT_EQ(dba->VerticesCount(label, property), 0); - }