Commit log gc

Summary: Adds a commit log garbage collector, which clears old transactions from the commit log

Reviewers: florijan

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1310
Dominik Gleich 2018-04-03 16:51:23 +02:00
parent 0bcf2edeae
commit eb30ecb6a0
17 changed files with 373 additions and 20 deletions

View File

@ -10,6 +10,7 @@
#include "distributed/index_rpc_messages.hpp"
#include "distributed/plan_rpc_messages.hpp"
#include "distributed/pull_produce_rpc_messages.hpp"
#include "distributed/storage_gc_rpc_messages.hpp"
#include "distributed/transactional_cache_cleaner_rpc_messages.hpp"
#include "distributed/updates_rpc_messages.hpp"
#include "stats/stats_rpc_messages.hpp"
@ -117,6 +118,11 @@ BOOST_CLASS_EXPORT(distributed::RemoveInEdgeData);
BOOST_CLASS_EXPORT(distributed::RemoveInEdgeReq);
BOOST_CLASS_EXPORT(distributed::RemoveInEdgeRes);
// Storage Gc.
BOOST_CLASS_EXPORT(distributed::GcClearedStatusReq);
BOOST_CLASS_EXPORT(distributed::GcClearedStatusRes);
// Transactional Cache Cleaner.
BOOST_CLASS_EXPORT(distributed::WaitOnTransactionEndReq);
BOOST_CLASS_EXPORT(distributed::WaitOnTransactionEndRes);

View File

@ -148,6 +148,36 @@ class DynamicBitset {
return chunk.clear(k, n);
}
/**
* Deletes all blocks that contain only positions lower than `pos`. Assumes
* no pointers into those blocks exist, i.e. it is safe to delete
* them
*/
void delete_prefix(size_t pos) {
// Never delete the head, as that might invalidate the whole structure,
// which depends on the head being available
Chunk *last = head_.load();
Chunk *chunk = last->next_;
// high() is the exclusive endpoint of a chunk's interval
while (chunk != nullptr && chunk->high() > pos) {
last = chunk;
chunk = chunk->next_;
}
if (chunk != nullptr) {
// Unlink from last
last->next_ = nullptr;
// Deletes chunks
Chunk *next;
while (chunk) {
next = chunk->next_;
delete chunk;
chunk = next;
}
}
}
private:
// Finds the chunk to which the k-th bit belongs; fails if k is out of bounds.
const Chunk &FindChunk(size_t &k) const {
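The interval arithmetic behind delete_prefix can be restated in a few lines. The following is a minimal sketch, not part of the diff: ChunkIsDeletable is a hypothetical helper, while the exclusive-endpoint convention mirrors Chunk::high() above.

// Sketch only: ChunkIsDeletable is illustrative, not DynamicBitset API.
#include <cstddef>

// A non-head chunk covering positions [low, low + block_size) may be freed by
// delete_prefix(pos) exactly when its exclusive endpoint is <= pos, i.e. every
// position it holds is lower than pos.
bool ChunkIsDeletable(size_t low, size_t block_size, size_t pos) {
  size_t high = low + block_size;  // exclusive, mirrors Chunk::high()
  return high <= pos;
}

// Example with block_size = 8 (as in the unit tests below): delete_prefix(8)
// frees the chunk covering [0, 8) because 8 <= 8, but keeps the chunk covering
// [8, 16) because 16 > 8. The head chunk is never freed regardless.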

View File

@ -2,6 +2,9 @@
#include "communication/rpc/server.hpp"
#include "database/graph_db.hpp"
#include "database/storage_gc_master.hpp"
#include "database/storage_gc_single_node.hpp"
#include "database/storage_gc_worker.hpp"
#include "distributed/coordination_master.hpp"
#include "distributed/coordination_worker.hpp"
#include "distributed/data_manager.hpp"
@ -97,7 +100,7 @@ class SingleNode : public PrivateBase {
IMPL_GETTERS
tx::SingleNodeEngine tx_engine_{&wal_};
StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
StorageGcSingleNode storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
TypemapPack<SingleNodeConcurrentIdMapper> typemap_pack_;
database::SingleNodeCounters counters_;
std::vector<int> GetWorkerIds() const override { return {0}; }
@ -161,8 +164,9 @@ class Master : public PrivateBase {
config_.master_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
tx::MasterEngine tx_engine_{server_, rpc_worker_clients_, &wal_};
distributed::MasterCoordination coordination_{server_};
StorageGcMaster storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec,
server_, coordination_};
distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
TypemapPack<MasterConcurrentIdMapper> typemap_pack_{server_};
database::MasterCounters counters_{server_};
distributed::DataRpcServer data_server_{*this, server_};
@ -206,7 +210,9 @@ class Worker : public PrivateBase {
config_.master_endpoint};
distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
tx::WorkerEngine tx_engine_{rpc_worker_clients_.GetClientPool(0)};
StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
StorageGcWorker storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec,
rpc_worker_clients_.GetClientPool(0),
config_.worker_id};
TypemapPack<WorkerConcurrentIdMapper> typemap_pack_{
rpc_worker_clients_.GetClientPool(0)};
database::WorkerCounters counters_{rpc_worker_clients_.GetClientPool(0)};

View File

@ -25,7 +25,7 @@ class UpdatesRpcServer;
class UpdatesRpcClients;
class DataManager;
class IndexRpcClients;
}
} // namespace distributed
namespace database {

View File

@ -1,6 +1,7 @@
#pragma once
#include <chrono>
#include <queue>
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/storage.hpp"
@ -39,8 +40,8 @@ class StorageGc {
* tx::Engine. If `pause_sec` is greater than zero, the GC is triggered
* periodically. */
StorageGc(Storage &storage, tx::Engine &tx_engine, int pause_sec)
: storage_(storage),
tx_engine_(tx_engine),
: tx_engine_(tx_engine),
storage_(storage),
vertices_(storage.vertices_),
edges_(storage.edges_) {
if (pause_sec > 0)
@ -63,17 +64,19 @@ class StorageGc {
StorageGc &operator=(const StorageGc &) = delete;
StorageGc &operator=(StorageGc &&) = delete;
virtual void CollectCommitLogGarbage(tx::transaction_id_t oldest_active) = 0;
void CollectGarbage() {
// Main garbage collection logic.
// See the wiki documentation for an explanation of the algorithm.
LOG(INFO) << "Garbage collector started";
const auto snapshot = tx_engine_.GlobalGcSnapshot();
const auto snapshot_gc = tx_engine_.GlobalGcSnapshot();
{
// This can be run concurrently
utils::Timer x;
vertices_.gc_.Run(snapshot, tx_engine_);
edges_.gc_.Run(snapshot, tx_engine_);
vertices_.gc_.Run(snapshot_gc, tx_engine_);
edges_.gc_.Run(snapshot_gc, tx_engine_);
VLOG(21) << "Garbage collector mvcc phase time: " << x.Elapsed().count();
}
@ -83,8 +86,8 @@ class StorageGc {
{
// This can be run concurrently
utils::Timer x;
storage_.labels_index_.Refresh(snapshot, tx_engine_);
storage_.label_property_index_.Refresh(snapshot, tx_engine_);
storage_.labels_index_.Refresh(snapshot_gc, tx_engine_);
storage_.label_property_index_.Refresh(snapshot_gc, tx_engine_);
VLOG(21) << "Garbage collector index phase time: " << x.Elapsed().count();
}
{
@ -95,17 +98,19 @@ class StorageGc {
// to those records. A new snapshot can be used, different from the one used
// for the first two phases of gc.
utils::Timer x;
const auto snapshot = tx_engine_.GlobalGcSnapshot();
edges_.record_deleter_.FreeExpiredObjects(snapshot.back());
vertices_.record_deleter_.FreeExpiredObjects(snapshot.back());
edges_.version_list_deleter_.FreeExpiredObjects(snapshot.back());
vertices_.version_list_deleter_.FreeExpiredObjects(snapshot.back());
const auto snapshot_gc = tx_engine_.GlobalGcSnapshot();
edges_.record_deleter_.FreeExpiredObjects(snapshot_gc.back());
vertices_.record_deleter_.FreeExpiredObjects(snapshot_gc.back());
edges_.version_list_deleter_.FreeExpiredObjects(snapshot_gc.back());
vertices_.version_list_deleter_.FreeExpiredObjects(snapshot_gc.back());
VLOG(21) << "Garbage collector deferred deletion phase time: "
<< x.Elapsed().count();
}
LOG(INFO) << "Garbage collector finished";
VLOG(21) << "gc snapshot: " << snapshot;
CollectCommitLogGarbage(snapshot_gc.back());
gc_txid_ranges_.emplace(snapshot_gc.back(), tx_engine_.GlobalLast());
VLOG(21) << "gc snapshot: " << snapshot_gc;
VLOG(21) << "edge_record_deleter_ size: " << edges_.record_deleter_.Count();
VLOG(21) << "vertex record deleter_ size: "
<< vertices_.record_deleter_.Count();
@ -115,13 +120,37 @@ class StorageGc {
<< vertices_.version_list_deleter_.Count();
VLOG(21) << "vertices_ size: " << storage_.vertices_.access().size();
VLOG(21) << "edges_ size: " << storage_.edges_.access().size();
LOG(INFO) << "Garbage collector finished.";
}
protected:
// Finds the largest transaction id such that everything older than it is
// safe to delete: the gc hints were set for those transactions during the
// gc phase, and no transaction that was alive before the hints were set is
// still alive (otherwise such a transaction could still be waiting on a
// commit log query about one of the old transactions).
std::experimental::optional<tx::transaction_id_t> GetClogSafeTransaction(
tx::transaction_id_t oldest_active) {
std::experimental::optional<tx::transaction_id_t> safe_to_delete;
while (!gc_txid_ranges_.empty() &&
gc_txid_ranges_.front().second < oldest_active) {
safe_to_delete = gc_txid_ranges_.front().first;
gc_txid_ranges_.pop();
}
return safe_to_delete;
}
tx::Engine &tx_engine_;
private:
Storage &storage_;
tx::Engine &tx_engine_;
MvccDeleter<Vertex> vertices_;
MvccDeleter<Edge> edges_;
Scheduler scheduler_;
// History of <oldest active transaction, next transaction to be run> ranges
// that gc operated on in previous runs; used to clear the commit log.
std::queue<std::pair<tx::transaction_id_t, tx::transaction_id_t>>
gc_txid_ranges_;
};
} // namespace database
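The gc_txid_ranges_ bookkeeping is easiest to see with concrete numbers. Below is a self-contained sketch that restates the GetClogSafeTransaction loop outside the class; the free function name and the example ids are illustrative assumptions, not part of the diff.

// Sketch only: standalone restatement of the GetClogSafeTransaction loop.
#include <cstdint>
#include <experimental/optional>
#include <queue>
#include <utility>

using transaction_id_t = uint64_t;

std::experimental::optional<transaction_id_t> ClogSafeTransaction(
    std::queue<std::pair<transaction_id_t, transaction_id_t>> &gc_txid_ranges,
    transaction_id_t oldest_active) {
  std::experimental::optional<transaction_id_t> safe_to_delete;
  // A range <snapshot_back, global_last> recorded by an earlier gc run is
  // resolved once every transaction alive at that time has finished, i.e.
  // once oldest_active has moved past global_last.
  while (!gc_txid_ranges.empty() &&
         gc_txid_ranges.front().second < oldest_active) {
    safe_to_delete = gc_txid_ranges.front().first;
    gc_txid_ranges.pop();
  }
  return safe_to_delete;
}

// Example: two earlier gc runs recorded <10, 15> and <20, 25>. If the oldest
// active transaction is now 26, both ranges resolve, so everything older than
// transaction 20 is safe to clear from the commit log.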

View File

@ -0,0 +1,59 @@
#pragma once
#include <mutex>
#include "database/storage_gc.hpp"
#include "distributed/coordination_master.hpp"
#include "distributed/storage_gc_rpc_messages.hpp"
namespace database {
class StorageGcMaster : public StorageGc {
public:
using StorageGc::StorageGc;
StorageGcMaster(Storage &storage, tx::Engine &tx_engine, int pause_sec,
communication::rpc::Server &rpc_server,
distributed::MasterCoordination &coordination)
: StorageGc(storage, tx_engine, pause_sec),
rpc_server_(rpc_server),
coordination_(coordination) {
rpc_server_.Register<distributed::RanLocalGcRpc>(
[this](const distributed::GcClearedStatusReq &req) {
std::unique_lock<std::mutex> lock(worker_safe_transaction_mutex_);
worker_safe_transaction_[req.worker_id] = req.local_oldest_active;
return std::make_unique<distributed::GcClearedStatusRes>();
});
}
void CollectCommitLogGarbage(tx::transaction_id_t oldest_active) final {
// Each worker reports when, from its perspective, it is safe to delete every
// transaction older than oldest_active, i.e. no transaction with an id
// greater than or equal to oldest_active will ever query the commit log
// about the state of the transactions being deleted.
auto safe_transaction = GetClogSafeTransaction(oldest_active);
if (safe_transaction) {
tx::transaction_id_t min_safe = *safe_transaction;
{
std::unique_lock<std::mutex> lock(worker_safe_transaction_mutex_);
for (auto worker_id : coordination_.GetWorkerIds()) {
// Skip the master itself
if (worker_id == 0) continue;
min_safe = std::min(min_safe, worker_safe_transaction_[worker_id]);
}
}
// min_safe > 0 means every worker has reported back at least once
if (min_safe > 0) {
tx_engine_.GarbageCollectCommitLog(min_safe);
LOG(INFO) << "Clearing master commit log with tx:" << min_safe;
}
}
}
communication::rpc::Server &rpc_server_;
distributed::MasterCoordination &coordination_;
// Mapping from worker id to the oldest active transaction that is safe for
// deletion from that worker's perspective.
std::unordered_map<int, tx::transaction_id_t> worker_safe_transaction_;
std::mutex worker_safe_transaction_mutex_;
};
} // namespace database
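The master-side aggregation reduces to a minimum over per-worker reports, where a worker that has not reported yet counts as 0 and therefore blocks clearing. A compact sketch of that rule with made-up ids; MinSafeTransaction is a hypothetical helper, not part of StorageGcMaster.

// Sketch only: restates the min-over-workers rule from CollectCommitLogGarbage.
#include <algorithm>
#include <cstdint>
#include <unordered_map>
#include <vector>

using transaction_id_t = uint64_t;

transaction_id_t MinSafeTransaction(
    transaction_id_t master_safe,
    const std::unordered_map<int, transaction_id_t> &worker_safe,
    const std::vector<int> &worker_ids) {
  transaction_id_t min_safe = master_safe;
  for (int worker_id : worker_ids) {
    if (worker_id == 0) continue;  // the master's own id
    auto found = worker_safe.find(worker_id);
    // An unreported worker counts as 0, which keeps min_safe at 0 and
    // prevents the commit log from being cleared.
    min_safe = std::min(min_safe, found == worker_safe.end()
                                      ? transaction_id_t(0)
                                      : found->second);
  }
  return min_safe;
}

// Example: master_safe = 20, worker 1 reported 17, worker 2 reported 12 ->
// min_safe = 12, so the master clears everything older than transaction 12.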

View File

@ -0,0 +1,15 @@
#pragma once
#include "database/storage_gc.hpp"
namespace database {
class StorageGcSingleNode : public StorageGc {
public:
using StorageGc::StorageGc;
void CollectCommitLogGarbage(tx::transaction_id_t oldest_active) final {
auto safe_to_delete = GetClogSafeTransaction(oldest_active);
if (safe_to_delete) tx_engine_.GarbageCollectCommitLog(*safe_to_delete);
}
};
} // namespace database

View File

@ -0,0 +1,31 @@
#pragma once
#include "communication/rpc/client_pool.hpp"
#include "database/storage_gc.hpp"
#include "distributed/storage_gc_rpc_messages.hpp"
#include "transactions/transaction.hpp"
namespace database {
class StorageGcWorker : public StorageGc {
public:
StorageGcWorker(Storage &storage, tx::Engine &tx_engine, int pause_sec,
communication::rpc::ClientPool &master_client_pool,
int worker_id)
: StorageGc(storage, tx_engine, pause_sec),
master_client_pool_(master_client_pool),
worker_id_(worker_id) {}
void CollectCommitLogGarbage(tx::transaction_id_t oldest_active) final {
auto safe_to_delete = GetClogSafeTransaction(oldest_active);
if (safe_to_delete) {
master_client_pool_.Call<distributed::RanLocalGcRpc>(*safe_to_delete,
worker_id_);
tx_engine_.GarbageCollectCommitLog(*safe_to_delete);
}
}
communication::rpc::ClientPool &master_client_pool_;
int worker_id_;
};
} // namespace database

View File

@ -0,0 +1,39 @@
#pragma once
#include "boost/serialization/access.hpp"
#include "boost/serialization/base_object.hpp"
#include "communication/rpc/messages.hpp"
#include "io/network/endpoint.hpp"
#include "transactions/transaction.hpp"
namespace distributed {
using communication::rpc::Message;
using Endpoint = io::network::Endpoint;
struct GcClearedStatusReq : public Message {
GcClearedStatusReq() {}
GcClearedStatusReq(tx::transaction_id_t local_oldest_active, int worker_id)
: local_oldest_active(local_oldest_active), worker_id(worker_id) {}
tx::transaction_id_t local_oldest_active;
int worker_id;
private:
friend class boost::serialization::access;
template <class TArchive>
void serialize(TArchive &ar, unsigned int) {
ar &boost::serialization::base_object<Message>(*this);
ar &local_oldest_active;
ar &worker_id;
}
};
RPC_NO_MEMBER_MESSAGE(GcClearedStatusRes);
using RanLocalGcRpc =
communication::rpc::RequestResponse<GcClearedStatusReq, GcClearedStatusRes>;
} // namespace distributed

View File

@ -11,6 +11,7 @@ namespace tx {
// this class and this class doesn't acquire any lock on method calls.
class CommitLog {
public:
static constexpr int kBitsetBlockSize = 32768;
CommitLog() = default;
CommitLog(const CommitLog &) = delete;
CommitLog(CommitLog &&) = delete;
@ -33,6 +34,10 @@ class CommitLog {
void set_aborted(transaction_id_t id) { log.set(2 * id + 1); }
// Clears the commit log of bits associated with transactions whose id is
// lower than `id`.
void garbage_collect_older(transaction_id_t id) { log.delete_prefix(2 * id); }
class Info {
public:
Info() {} // Needed for serialization.
@ -66,7 +71,7 @@ class CommitLog {
Info fetch_info(transaction_id_t id) const { return Info{log.at(2 * id, 2)}; }
private:
DynamicBitset<uint8_t, 32768> log;
DynamicBitset<uint8_t, kBitsetBlockSize> log;
};
} // namespace tx
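The effect of garbage_collect_older follows from the 2-bits-per-transaction layout used by set_committed and set_aborted. Below is a small runnable sketch of that arithmetic with a made-up transaction id; only the block size comes from the diff above.

// Sketch only: illustrates which bitset blocks garbage_collect_older can drop.
#include <cstdint>
#include <iostream>

int main() {
  constexpr uint64_t kBitsetBlockSize = 32768;            // bits per block
  constexpr uint64_t kTxPerBlock = kBitsetBlockSize / 2;  // 2 bits per transaction
  const uint64_t tx_id = 20000;                           // made-up transaction id

  // garbage_collect_older(tx_id) calls log.delete_prefix(2 * tx_id); a block
  // is freed only if its exclusive upper bound is <= 2 * tx_id.
  const uint64_t prefix = 2 * tx_id;  // 40000
  std::cout << "transactions per block: " << kTxPerBlock << "\n"                  // 16384
            << "block [0, 32768) freed: " << (1 * kBitsetBlockSize <= prefix)     // 1
            << "\n"
            << "block [32768, 65536) freed: " << (2 * kBitsetBlockSize <= prefix) // 0
            << "\n";
  return 0;
}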

View File

@ -90,6 +90,9 @@ class Engine {
* the given id. */
virtual void EnsureNextIdGreater(transaction_id_t tx_id) = 0;
/** Garbage collects transactions older than tx_id from commit log. */
virtual void GarbageCollectCommitLog(transaction_id_t tx_id) = 0;
auto &local_lock_graph() { return local_lock_graph_; }
const auto &local_lock_graph() const { return local_lock_graph_; }

View File

@ -110,6 +110,10 @@ transaction_id_t SingleNodeEngine::LocalOldestActive() const {
return active_.empty() ? counter_ + 1 : active_.front();
}
void SingleNodeEngine::GarbageCollectCommitLog(transaction_id_t tx_id) {
clog_.garbage_collect_older(tx_id);
}
void SingleNodeEngine::LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) {
std::lock_guard<SpinLock> guard(lock_);

View File

@ -1,5 +1,6 @@
#pragma once
#include <atomic>
#include <experimental/optional>
#include <unordered_map>
@ -43,6 +44,7 @@ class SingleNodeEngine : public Engine {
std::function<void(Transaction &)> f) override;
Transaction *RunningTransaction(transaction_id_t tx_id) override;
void EnsureNextIdGreater(transaction_id_t tx_id) override;
void GarbageCollectCommitLog(transaction_id_t tx_id) override;
private:
transaction_id_t counter_{0};

View File

@ -187,4 +187,8 @@ void WorkerEngine::UpdateOldestActive(const Snapshot &snapshot,
void WorkerEngine::EnsureNextIdGreater(transaction_id_t tx_id) {
master_client_pool_.Call<EnsureNextIdGreaterRpc>(tx_id);
}
void WorkerEngine::GarbageCollectCommitLog(transaction_id_t tx_id) {
clog_.garbage_collect_older(tx_id);
}
} // namespace tx

View File

@ -43,6 +43,7 @@ class WorkerEngine : public Engine {
const Snapshot &snapshot);
void EnsureNextIdGreater(transaction_id_t tx_id) override;
void GarbageCollectCommitLog(tx::transaction_id_t tx_id) override;
/// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::TransactionalCacheCleaner`.

View File

@ -0,0 +1,78 @@
#include <gtest/gtest.h>
#include "distributed_common.hpp"
TEST_F(DistributedGraphDbTest, GarbageCollect) {
database::GraphDbAccessor dba{master()};
auto tx = dba.transaction_id();
dba.Commit();
// Create enough transactions that the commit log block containing `tx` can be cleared
for (int i = 0; i < tx::CommitLog::kBitsetBlockSize; ++i) {
database::GraphDbAccessor dba{master()};
}
master().CollectGarbage();
worker(1).CollectGarbage();
worker(2).CollectGarbage();
EXPECT_EQ(master().tx_engine().Info(tx).is_committed(), true);
database::GraphDbAccessor dba2{master()};
auto tx_last = dba2.transaction_id();
dba2.Commit();
worker(1).CollectGarbage();
worker(2).CollectGarbage();
master().CollectGarbage();
EXPECT_DEATH(master().tx_engine().Info(tx), "chunk is nullptr");
EXPECT_DEATH(worker(1).tx_engine().Info(tx), "chunk is nullptr");
EXPECT_DEATH(worker(2).tx_engine().Info(tx), "chunk is nullptr");
EXPECT_EQ(master().tx_engine().Info(tx_last).is_committed(), true);
EXPECT_EQ(worker(1).tx_engine().Info(tx_last).is_committed(), true);
EXPECT_EQ(worker(2).tx_engine().Info(tx_last).is_committed(), true);
}
TEST_F(DistributedGraphDbTest, GarbageCollectBlocked) {
database::GraphDbAccessor dba{master()};
auto tx = dba.transaction_id();
dba.Commit();
// Block garbage collection, because this transaction is still alive on the
// worker
database::GraphDbAccessor dba3{worker(1)};
// Create enough transactions that the commit log block containing `tx` could be cleared
for (int i = 0; i < tx::CommitLog::kBitsetBlockSize; ++i) {
database::GraphDbAccessor dba{master()};
}
// Query for a large id so that a new commit log block is created
master().tx_engine().Info(tx::CommitLog::kBitsetBlockSize);
master().CollectGarbage();
worker(1).CollectGarbage();
worker(2).CollectGarbage();
EXPECT_EQ(master().tx_engine().Info(tx).is_committed(), true);
database::GraphDbAccessor dba2{master()};
auto tx_last = dba2.transaction_id();
dba2.Commit();
worker(1).CollectGarbage();
worker(2).CollectGarbage();
master().CollectGarbage();
EXPECT_EQ(master().tx_engine().Info(tx).is_committed(), true);
EXPECT_EQ(worker(1).tx_engine().Info(tx).is_committed(), true);
EXPECT_EQ(worker(2).tx_engine().Info(tx).is_committed(), true);
EXPECT_EQ(master().tx_engine().Info(tx_last).is_committed(), true);
EXPECT_EQ(worker(1).tx_engine().Info(tx_last).is_committed(), true);
EXPECT_EQ(worker(2).tx_engine().Info(tx_last).is_committed(), true);
}
int main(int argc, char **argv) {
::testing::FLAGS_gtest_death_test_style = "threadsafe";
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -87,4 +87,45 @@ TEST(DynamicBitset, ConstBitset) {
dbs.set(17);
const_accepting(dbs);
}
TEST(DynamicBitSet, PrefixDeleteDontDeleteHead) {
DynamicBitset<uint8_t, 8> dbs;
dbs.set(7, 1);
dbs.delete_prefix(8);
EXPECT_EQ(dbs.at(7), 1);
}
// Checks that a block is not deleted when the prefix endpoint is off by one,
// i.e. one position short of the block's exclusive endpoint
TEST(DynamicBitSet, PrefixDeleteDeleteOneBlockOffByOne) {
DynamicBitset<uint8_t, 8> dbs;
dbs.set(7, 1);
// Extends number of blocks
dbs.set(10, 1);
dbs.delete_prefix(7);
EXPECT_EQ(dbs.at(7), 1);
}
TEST(DynamicBitSet, DeletePrefixDeleteOneBlock) {
DynamicBitset<uint8_t, 8> dbs;
dbs.set(7, 1);
// Extends number of blocks
dbs.set(10, 1);
dbs.delete_prefix(8);
EXPECT_DEATH(dbs.at(7), "chunk is nullptr");
EXPECT_EQ(dbs.at(10), 1);
}
TEST(DynamicBitSet, DeletePrefixDeleteMultipleBlocks) {
DynamicBitset<uint8_t, 8> dbs;
dbs.set(7, 1);
dbs.set(15, 1);
dbs.set(23, 1);
dbs.set(31, 1);
dbs.delete_prefix(30);
EXPECT_DEATH(dbs.at(7), "chunk is nullptr");
EXPECT_DEATH(dbs.at(15), "chunk is nullptr");
EXPECT_DEATH(dbs.at(23), "chunk is nullptr");
EXPECT_EQ(dbs.at(31), 1);
}
} // namespace