Prepare transactional engine for distributed

Summary:
The current idea is that the same MG binary can be used for single-node,
distributed master and distributed worker. The transactional engine in
the single-node and distributed master is the same: it determines the
transactional time and exposes all the "global" functionalities. In the
distributed worker the "global" functions must contact the master.

Reviewers: dgleich, mislav.bradac, buda

Reviewed By: dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1013
This commit is contained in:
florijan 2017-11-29 16:03:42 +01:00
parent 2fbe967465
commit ce3638b25e
28 changed files with 515 additions and 351 deletions

View File

@ -222,7 +222,8 @@ set(memgraph_src_files
${src_dir}/storage/record_accessor.cpp
${src_dir}/storage/vertex_accessor.cpp
${src_dir}/threading/thread.cpp
${src_dir}/transactions/transaction.cpp
${src_dir}/transactions/engine_master.cpp
${src_dir}/transactions/engine_worker.cpp
${src_dir}/utils/watchdog.cpp
)
# -----------------------------------------------------------------------------

View File

@ -11,12 +11,14 @@
#include "durability/snapshooter.hpp"
#include "storage/edge.hpp"
#include "storage/garbage_collector.hpp"
#include "transactions/engine_master.hpp"
#include "utils/timer.hpp"
namespace fs = std::experimental::filesystem;
GraphDb::GraphDb(GraphDb::Config config)
: config_(config),
tx_engine_(new tx::MasterEngine()),
gc_vertices_(vertices_, vertex_record_deleter_,
vertex_version_list_deleter_),
gc_edges_(edges_, edge_record_deleter_, edge_version_list_deleter_),
@ -41,7 +43,7 @@ GraphDb::GraphDb(GraphDb::Config config)
std::chrono::seconds(
std::max(1, std::min(5, config.query_execution_time_sec / 4))),
[this]() {
tx_engine_.ForEachActiveTransaction([this](tx::Transaction &t) {
tx_engine_->LocalForEachActiveTransaction([this](tx::Transaction &t) {
if (t.creation_time() +
std::chrono::seconds(config_.query_execution_time_sec) <
std::chrono::steady_clock::now()) {
@ -54,7 +56,8 @@ GraphDb::GraphDb(GraphDb::Config config)
void GraphDb::Shutdown() {
is_accepting_transactions_ = false;
tx_engine_.ForEachActiveTransaction([](auto &t) { t.set_should_abort(); });
tx_engine_->LocalForEachActiveTransaction(
[](auto &t) { t.set_should_abort(); });
}
void GraphDb::StartSnapshooting() {
@ -77,12 +80,12 @@ void GraphDb::CollectGarbage() {
// main garbage collection logic
// see wiki documentation for logic explanation
LOG(INFO) << "Garbage collector started";
const auto snapshot = tx_engine_.GcSnapshot();
const auto snapshot = tx_engine_->GlobalGcSnapshot();
{
// This can be run concurrently
utils::Timer x;
gc_vertices_.Run(snapshot, tx_engine_);
gc_edges_.Run(snapshot, tx_engine_);
gc_vertices_.Run(snapshot, *tx_engine_);
gc_edges_.Run(snapshot, *tx_engine_);
VLOG(1) << "Garbage collector mvcc phase time: " << x.Elapsed().count();
}
// This has to be run sequentially after gc because gc modifies
@ -91,8 +94,8 @@ void GraphDb::CollectGarbage() {
{
// This can be run concurrently
utils::Timer x;
labels_index_.Refresh(snapshot, tx_engine_);
label_property_index_.Refresh(snapshot, tx_engine_);
labels_index_.Refresh(snapshot, *tx_engine_);
label_property_index_.Refresh(snapshot, *tx_engine_);
VLOG(1) << "Garbage collector index phase time: " << x.Elapsed().count();
}
{
@ -103,7 +106,7 @@ void GraphDb::CollectGarbage() {
// to those records. New snapshot can be used, different than one used for
// first two phases of gc.
utils::Timer x;
const auto snapshot = tx_engine_.GcSnapshot();
const auto snapshot = tx_engine_->GlobalGcSnapshot();
edge_record_deleter_.FreeExpiredObjects(snapshot.back());
vertex_record_deleter_.FreeExpiredObjects(snapshot.back());
edge_version_list_deleter_.FreeExpiredObjects(snapshot.back());

View File

@ -1,5 +1,7 @@
#pragma once
#include <memory>
#include "cppitertools/filter.hpp"
#include "cppitertools/imap.hpp"
@ -88,11 +90,13 @@ class GraphDb {
Config config_;
/** transaction engine related to this database */
tx::Engine tx_engine_;
/** Transaction engine related to this database. Master instance if this
* GraphDb is a single-node deployment, or the master in a distributed system.
* Otherwise a WorkerEngine instance. */
std::unique_ptr<tx::Engine> tx_engine_;
std::atomic<int64_t> next_vertex_id_{0};
std::atomic<int64_t> next_edge_id{0};
std::atomic<int64_t> next_edge_id_{0};
// main storage for the graph
ConcurrentMap<int64_t, mvcc::VersionList<Vertex> *> vertices_;

View File

@ -1,14 +1,16 @@
#include "glog/logging.h"
#include "database/creation_exception.hpp"
#include "database/graph_db_accessor.hpp"
#include "storage/edge.hpp"
#include "storage/edge_accessor.hpp"
#include "storage/vertex.hpp"
#include "storage/vertex_accessor.hpp"
#include "utils/atomic.hpp"
#include "utils/on_scope_exit.hpp"
GraphDbAccessor::GraphDbAccessor(GraphDb &db)
: db_(db), transaction_(db.tx_engine_.Begin()) {
: db_(db), transaction_(MasterEngine().Begin()) {
db_.wal_.TxBegin(transaction_->id_);
}
@ -24,13 +26,13 @@ tx::transaction_id_t GraphDbAccessor::transaction_id() const {
void GraphDbAccessor::AdvanceCommand() {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
transaction_->engine_.Advance(transaction_->id_);
MasterEngine().Advance(transaction_->id_);
}
void GraphDbAccessor::Commit() {
DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction.";
auto tid = transaction_->id_;
transaction_->Commit();
MasterEngine().Commit(*transaction_);
db_.wal_.TxCommit(tid);
commited_ = true;
}
@ -38,7 +40,7 @@ void GraphDbAccessor::Commit() {
void GraphDbAccessor::Abort() {
DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction.";
auto tid = transaction_->id_;
transaction_->Abort();
MasterEngine().Abort(*transaction_);
db_.wal_.TxAbort(tid);
aborted_ = true;
}
@ -56,11 +58,7 @@ VertexAccessor GraphDbAccessor::InsertVertex(
auto id = opt_id ? *opt_id : db_.next_vertex_id_++;
if (opt_id) {
while (true) {
auto next_id = db_.next_vertex_id_.load();
if (next_id > id) break;
if (db_.next_vertex_id_.compare_exchange_strong(next_id, id + 1)) break;
}
utils::EnsureAtomicGe(db_.next_vertex_id_, id + 1);
}
auto vertex_vlist = new mvcc::VersionList<Vertex>(*transaction_, id);
@ -118,12 +116,12 @@ void GraphDbAccessor::BuildIndex(const GraphDbTypes::Label &label,
// happened earlier. We have to first wait for every transaction that
// happend before, or a bit later than CreateIndex to end.
{
auto wait_transactions = db_.tx_engine_.ActiveTransactions();
auto wait_transactions = db_.tx_engine_->GlobalActiveTransactions();
auto active_index_creation_transactions =
db_.index_build_tx_in_progress_.access();
for (auto id : wait_transactions) {
if (active_index_creation_transactions.contains(id)) continue;
while (db_.tx_engine_.IsActive(id)) {
while (db_.tx_engine_->GlobalIsActive(id)) {
// Active index creation set could only now start containing that id,
// since that thread could have not written to the set set and to avoid
// dead-lock we need to make sure we keep track of that
@ -278,13 +276,9 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
VertexAccessor &from, VertexAccessor &to, GraphDbTypes::EdgeType edge_type,
std::experimental::optional<int64_t> opt_id) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
auto id = opt_id ? *opt_id : db_.next_edge_id++;
auto id = opt_id ? *opt_id : db_.next_edge_id_++;
if (opt_id) {
while (true) {
auto next_id = db_.next_edge_id.load();
if (next_id > id) break;
if (db_.next_edge_id.compare_exchange_strong(next_id, id + 1)) break;
}
utils::EnsureAtomicGe(db_.next_edge_id_, id + 1);
}
auto edge_vlist = new mvcc::VersionList<Edge>(*transaction_, id, from.vlist_,

View File

@ -15,6 +15,7 @@
#include "graph_db.hpp"
#include "storage/edge_accessor.hpp"
#include "storage/vertex_accessor.hpp"
#include "transactions/engine_master.hpp"
#include "transactions/transaction.hpp"
#include "utils/bound.hpp"
@ -493,24 +494,16 @@ class GraphDbAccessor {
/** Returns the id of this accessor's transaction */
tx::transaction_id_t transaction_id() const;
/**
* Advances transaction's command id by 1.
*/
/** Advances transaction's command id by 1. */
void AdvanceCommand();
/**
* Commit transaction.
*/
/** Commit transaction. */
void Commit();
/**
* Abort transaction.
*/
/** Abort transaction. */
void Abort();
/**
* Return true if transaction is hinted to abort.
*/
/** Return true if transaction is hinted to abort. */
bool should_abort() const;
/** Returns the transaction of this accessor */
@ -573,6 +566,15 @@ class GraphDbAccessor {
!accessor.new_->is_expired_by(*transaction_));
}
/** Casts the DB's engine to MasterEngine and returns it. If the DB's engine
* is
* RemoteEngine, this function will crash MG. */
tx::MasterEngine &MasterEngine() {
auto *local_engine = dynamic_cast<tx::MasterEngine *>(db_.tx_engine_.get());
DCHECK(local_engine) << "Asked for MasterEngine on distributed worker";
return *local_engine;
}
GraphDb &db_;
/** The current transaction */

View File

@ -251,7 +251,7 @@ class Record : public Version<T> {
if (hints_.Get(mask)) return hints_.Get(Hints::kCmt & mask);
// If hints are not set consult the commit log.
auto info = engine.FetchInfo(id);
auto info = engine.Info(id);
if (info.is_committed()) {
hints_.Set(Hints::kCmt & mask);
return true;
@ -284,7 +284,7 @@ class Record : public Version<T> {
if (hints_.Get(Hints::kCre)) return hints_.Get(Hints::kAbt & Hints::kCre);
// If hints are not set consult the commit log.
auto info = engine.FetchInfo(tx_.cre);
auto info = engine.Info(tx_.cre);
if (info.is_aborted()) {
hints_.Set(Hints::kAbt & Hints::kCre);
return true;

View File

@ -46,11 +46,11 @@ class GarbageCollector {
// from it we can delete it.
auto ret = vlist->GcDeleted(snapshot, engine);
if (ret.first) {
deleted_version_lists.emplace_back(vlist, engine.LockFreeCount());
deleted_version_lists.emplace_back(vlist, engine.LocalLast());
count += collection_accessor.remove(id_vlist.first);
}
if (ret.second != nullptr)
deleted_records.emplace_back(ret.second, engine.LockFreeCount());
deleted_records.emplace_back(ret.second, engine.LocalLast());
}
DLOG_IF(INFO, count > 0) << "GC started cleaning with snapshot: "
<< snapshot;

View File

@ -55,8 +55,8 @@ LockStatus RecordLock::Lock(const tx::Transaction &tx, tx::Engine &engine) {
tx::transaction_id_t owner = owner_;
if (owner_ == tx.id_) return LockStatus::AlreadyHeld;
// Insert edge into lock_graph.
auto accessor = engine.lock_graph().access();
// Insert edge into local lock_graph.
auto accessor = engine.local_lock_graph().access();
auto it = accessor.insert(tx.id_, owner).first;
auto abort_oldest_tx_in_lock_cycle = [&tx, &accessor, &engine]() {
@ -67,7 +67,7 @@ LockStatus RecordLock::Lock(const tx::Transaction &tx, tx::Engine &engine) {
// oldest one.
auto oldest = FindOldestTxInLockCycle(tx.id_, accessor);
if (oldest) {
engine.ForEachActiveTransaction([&](tx::Transaction &t) {
engine.LocalForEachActiveTransaction([&](tx::Transaction &t) {
if (t.id_ == oldest) {
t.set_should_abort();
}

View File

@ -1,76 +1,29 @@
#pragma once
#include <atomic>
#include <limits>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>
#include "glog/logging.h"
#include "threading/sync/spinlock.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/transaction.hpp"
#include "utils/exceptions.hpp"
#include "transactions/type.hpp"
namespace tx {
/** Indicates an error in transaction handling (currently
* only command id overflow). */
class TransactionError : public utils::BasicException {
public:
using utils::BasicException::BasicException;
};
/** Database transaction egine.
/**
* Database transaction engine. Used for managing transactions and the related
* information such as transaction snapshots and the transaction state info.
*
* Used for managing transactions and the related information
* such as transaction snapshots and the commit log.
* This is an abstract base class for implementing a single-node transactional
* engine (MasterEngine), an engine for the master in a distributed system (also
* MasterEngine), and for the worker in a distributed system (WorkerEngine).
*
* Methods in this class are often prefixed with "Global" or "Local", depending
* on the guarantees that they need to satisfy. These guarantee requirements are
* determined by the users of a particular method.
*/
class Engine {
public:
Engine() = default;
virtual ~Engine() = default;
/** Begins a transaction and returns a pointer to
* it's object.
*
* The transaction object is owned by this engine.
* It will be released when the transaction gets
* committted or aborted.
*/
Transaction *Begin() {
std::lock_guard<SpinLock> guard(lock_);
transaction_id_t id{++counter_};
auto t = new Transaction(id, active_, *this);
active_.insert(id);
store_.emplace(id, t);
return t;
}
/** Advances the command on the transaction with the
* given id.
*
* @param id - Transation id. That transaction must
* be currently active.
*/
void Advance(transaction_id_t id) {
std::lock_guard<SpinLock> guard(lock_);
auto it = store_.find(id);
DCHECK(it != store_.end())
<< "Transaction::advance on non-existing transaction";
Transaction *t = it->second.get();
if (t->cid_ == std::numeric_limits<command_id_t>::max())
throw TransactionError(
"Reached maximum number of commands in this "
"transaction.");
t->cid_++;
}
/** Returns the commit log Info about the given transaction. */
virtual CommitLog::Info Info(transaction_id_t tx) const = 0;
/** Returns the snapshot relevant to garbage collection of database records.
*
@ -83,85 +36,33 @@ class Engine {
* that was committed) by a transaction older then the older currently active.
* We need the full snapshot to prevent overlaps (see general GC
* documentation).
*
* The returned snapshot must be for the globally oldest active transaction.
* If we only looked at locally known transactions, it would be possible to
* delete something that and older active transaction can still see.
*/
Snapshot GcSnapshot() {
std::lock_guard<SpinLock> guard(lock_);
virtual Snapshot GlobalGcSnapshot() = 0;
// No active transactions.
if (active_.size() == 0) {
auto snapshot_copy = active_;
snapshot_copy.insert(counter_ + 1);
return snapshot_copy;
}
// There are active transactions.
auto snapshot_copy = store_.find(active_.front())->second->snapshot();
snapshot_copy.insert(active_.front());
return snapshot_copy;
}
/**
* Returns active transactions.
*/
Snapshot ActiveTransactions() {
std::lock_guard<SpinLock> guard(lock_);
Snapshot active_transactions = active_;
return active_transactions;
}
/** Comits the given transaction. Deletes the transaction object, it's not
* valid after this function executes. */
void Commit(const Transaction &t) {
std::lock_guard<SpinLock> guard(lock_);
clog_.set_committed(t.id_);
active_.remove(t.id_);
store_.erase(store_.find(t.id_));
}
/** Aborts the given transaction. Deletes the transaction object, it's not
* valid after this function executes. */
void Abort(const Transaction &t) {
std::lock_guard<SpinLock> guard(lock_);
clog_.set_aborted(t.id_);
active_.remove(t.id_);
store_.erase(store_.find(t.id_));
}
/** Returns transaction id of last transaction without taking a lock. New
* transactions can be created or destroyed during call of this function.
*/
auto LockFreeCount() const { return counter_.load(); }
/** Returns active transactions. */
virtual Snapshot GlobalActiveTransactions() = 0;
/** Returns true if the transaction with the given ID is currently active. */
bool IsActive(transaction_id_t tx) const { return clog_.is_active(tx); }
virtual bool GlobalIsActive(transaction_id_t tx) const = 0;
/** Calls function f on each active transaction. */
void ForEachActiveTransaction(std::function<void(Transaction &)> f) {
std::lock_guard<SpinLock> guard(lock_);
for (auto transaction : active_) {
f(*store_.find(transaction)->second);
}
}
/** Returns the ID of last locally known transaction. */
virtual tx::transaction_id_t LocalLast() const = 0;
/** Returns the commit log Info about the given transaction. */
auto FetchInfo(transaction_id_t tx) const { return clog_.fetch_info(tx); }
/** Calls function f on each locally active transaction. */
virtual void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) = 0;
auto &lock_graph() { return lock_graph_; }
const auto &lock_graph() const { return lock_graph_; }
auto &local_lock_graph() { return local_lock_graph_; }
const auto &local_lock_graph() const { return local_lock_graph_; }
private:
std::atomic<transaction_id_t> counter_{0};
CommitLog clog_;
std::unordered_map<transaction_id_t, std::unique_ptr<Transaction>> store_;
Snapshot active_;
SpinLock lock_;
// For each active transaction we store a transaction that holds a lock that
// mentioned transaction is also trying to acquire. We can think of this
// data structure as a graph: key being a start node of directed edges and
// value being an end node of that edge. ConcurrentMap is used since it is
// garbage collected and we are sure that we will not be having problems with
// lifetimes of each object.
ConcurrentMap<transaction_id_t, transaction_id_t> lock_graph_;
// Map lock dependencies. Each entry maps (tx_that_wants_lock,
// tx_that_holds_lock). Used for local deadlock resolution.
// TODO consider global deadlock resolution.
ConcurrentMap<transaction_id_t, transaction_id_t> local_lock_graph_;
};
} // namespace tx

View File

@ -0,0 +1,90 @@
#include <limits>
#include <mutex>
#include "glog/logging.h"
#include "transactions/engine_master.hpp"
namespace tx {
Transaction *MasterEngine::Begin() {
std::lock_guard<SpinLock> guard(lock_);
transaction_id_t id{++counter_};
auto t = new Transaction(id, active_, *this);
active_.insert(id);
store_.emplace(id, t);
return t;
}
void MasterEngine::Advance(transaction_id_t id) {
std::lock_guard<SpinLock> guard(lock_);
auto it = store_.find(id);
DCHECK(it != store_.end())
<< "Transaction::advance on non-existing transaction";
Transaction *t = it->second.get();
if (t->cid_ == std::numeric_limits<command_id_t>::max())
throw TransactionError(
"Reached maximum number of commands in this "
"transaction.");
t->cid_++;
}
void MasterEngine::Commit(const Transaction &t) {
std::lock_guard<SpinLock> guard(lock_);
clog_.set_committed(t.id_);
active_.remove(t.id_);
store_.erase(store_.find(t.id_));
}
void MasterEngine::Abort(const Transaction &t) {
std::lock_guard<SpinLock> guard(lock_);
clog_.set_aborted(t.id_);
active_.remove(t.id_);
store_.erase(store_.find(t.id_));
}
CommitLog::Info MasterEngine::Info(transaction_id_t tx) const {
return clog_.fetch_info(tx);
}
Snapshot MasterEngine::GlobalGcSnapshot() {
std::lock_guard<SpinLock> guard(lock_);
// No active transactions.
if (active_.size() == 0) {
auto snapshot_copy = active_;
snapshot_copy.insert(counter_ + 1);
return snapshot_copy;
}
// There are active transactions.
auto snapshot_copy = store_.find(active_.front())->second->snapshot();
snapshot_copy.insert(active_.front());
return snapshot_copy;
}
Snapshot MasterEngine::GlobalActiveTransactions() {
std::lock_guard<SpinLock> guard(lock_);
Snapshot active_transactions = active_;
return active_transactions;
}
bool MasterEngine::GlobalIsActive(transaction_id_t tx) const {
return clog_.is_active(tx);
}
tx::transaction_id_t MasterEngine::LocalLast() const { return counter_.load(); }
void MasterEngine::LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) {
std::lock_guard<SpinLock> guard(lock_);
for (auto transaction : active_) {
f(*store_.find(transaction)->second);
}
}
} // namespace tx

View File

@ -0,0 +1,71 @@
#pragma once
#include <atomic>
#include <memory>
#include <unordered_map>
#include "threading/sync/spinlock.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/engine.hpp"
#include "transactions/transaction.hpp"
#include "utils/exceptions.hpp"
namespace tx {
/** Indicates an error in transaction handling (currently
* only command id overflow). */
class TransactionError : public utils::BasicException {
public:
using utils::BasicException::BasicException;
};
/**
* A transaction engine that contains everything necessary for transactional
* handling. Used for single-node Memgraph deployments and for the master in a
* distributed system.
*/
class MasterEngine : public Engine {
public:
/**
* Begins a transaction and returns a pointer to
* it's object.
*
* The transaction object is owned by this engine.
* It will be released when the transaction gets
* committted or aborted.
*/
Transaction *Begin();
/**
* Advances the command on the transaction with the
* given id.
*
* @param id - Transation id. That transaction must
* be currently active.
*/
void Advance(transaction_id_t id);
/** Comits the given transaction. Deletes the transaction object, it's not
* valid after this function executes. */
void Commit(const Transaction &t);
/** Aborts the given transaction. Deletes the transaction object, it's not
* valid after this function executes. */
void Abort(const Transaction &t);
CommitLog::Info Info(transaction_id_t tx) const override;
Snapshot GlobalGcSnapshot() override;
Snapshot GlobalActiveTransactions() override;
bool GlobalIsActive(transaction_id_t tx) const override;
tx::transaction_id_t LocalLast() const override;
void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) override;
private:
std::atomic<transaction_id_t> counter_{0};
CommitLog clog_;
std::unordered_map<transaction_id_t, std::unique_ptr<Transaction>> store_;
Snapshot active_;
SpinLock lock_;
};
} // namespace tx

View File

@ -0,0 +1,61 @@
#include "glog/logging.h"
#include "transactions/engine_worker.hpp"
#include "utils/atomic.hpp"
namespace tx {
Transaction *WorkerEngine::LocalBegin(transaction_id_t tx_id) {
auto accessor = active_.access();
auto found = accessor.find(tx_id);
if (found != accessor.end()) return found->second;
Snapshot snapshot = rpc_.Call<Snapshot>("Snapshot", tx_id);
auto *tx = new Transaction(tx_id, snapshot, *this);
auto insertion = accessor.insert(tx_id, tx);
if (!insertion.second) {
delete tx;
tx = insertion.first->second;
}
utils::EnsureAtomicGe(local_last_, tx_id);
return tx;
}
CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
auto info = clog_.fetch_info(tid);
// If we don't know the transaction to be commited nor aborted, ask the
// master about it and update the local commit log.
if (!(info.is_aborted() || info.is_committed())) {
// @review: this version of Call is just used because Info has no default
// constructor.
info = rpc_.Call<CommitLog::Info>("Info", info, tid);
DCHECK(info.is_committed() || info.is_aborted())
<< "It is expected that the transaction is not running anymore. This "
"function should be used only after the snapshot of the current "
"transaction is checked (in MVCC).";
if (info.is_committed()) clog_.set_committed(tid);
if (info.is_aborted()) clog_.set_aborted(tid);
}
return info;
}
Snapshot WorkerEngine::GlobalGcSnapshot() {
return rpc_.Call<Snapshot>("Snapshot");
}
Snapshot WorkerEngine::GlobalActiveTransactions() {
return rpc_.Call<Snapshot>("Active");
}
bool WorkerEngine::GlobalIsActive(transaction_id_t tid) const {
return rpc_.Call<bool>("GlobalIsActive", tid);
}
tx::transaction_id_t WorkerEngine::LocalLast() const { return local_last_; }
void WorkerEngine::LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) {
for (auto pair : active_.access()) f(*pair.second);
}
} // namespace tx

View File

@ -0,0 +1,52 @@
#pragma once
#include <atomic>
#include <memory>
#include "data_structures/concurrent/concurrent_map.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/engine.hpp"
#include "transactions/transaction.hpp"
namespace tx {
/** A transactional engine for the worker in a distributed system. */
class WorkerEngine : public Engine {
// Mock class for RPC.
// TODO Replace with the real thing, once available.
class Rpc {
public:
template <typename TReturn, typename... TArgs>
TReturn Call(const std::string &, TArgs &&...) {
return TReturn{};
}
template <typename TReturn, typename... TArgs>
TReturn Call(const std::string &, TReturn default_return, TArgs &&...) {
return default_return;
}
};
public:
WorkerEngine(Rpc &rpc) : rpc_(rpc) {}
Transaction *LocalBegin(transaction_id_t tx_id);
CommitLog::Info Info(transaction_id_t tid) const override;
Snapshot GlobalGcSnapshot() override;
Snapshot GlobalActiveTransactions() override;
bool GlobalIsActive(transaction_id_t tid) const override;
tx::transaction_id_t LocalLast() const override;
void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) override;
private:
// Communication with the transactional engine on the master.
Rpc &rpc_;
// Local caches.
ConcurrentMap<transaction_id_t, Transaction *> active_;
std::atomic<transaction_id_t> local_last_;
// Mutable because just getting info can cause a cache fill.
mutable CommitLog clog_;
};
} // namespace tx

View File

@ -1,18 +0,0 @@
#include "transactions/transaction.hpp"
#include "transactions/engine.hpp"
namespace tx {
Transaction::Transaction(transaction_id_t id, const Snapshot &snapshot,
Engine &engine)
: id_(id), engine_(engine), snapshot_(snapshot) {}
void Transaction::TakeLock(RecordLock &lock) const {
locks_.Take(&lock, *this, engine_);
}
void Transaction::Commit() { engine_.Commit(*this); }
void Transaction::Abort() { engine_.Abort(*this); }
}

View File

@ -26,10 +26,12 @@ class Transaction {
}
private:
friend class Engine;
friend class MasterEngine;
friend class WorkerEngine;
// The constructor is private, only the Engine ever uses it.
Transaction(transaction_id_t id, const Snapshot &snapshot, Engine &engine);
Transaction(transaction_id_t id, const Snapshot &snapshot, Engine &engine)
: id_(id), engine_(engine), snapshot_(snapshot) {}
// A transaction can't be moved nor copied. it's owned by the transaction
// engine, and it's lifetime is managed by it.
@ -39,10 +41,6 @@ class Transaction {
Transaction &operator=(Transaction &&) = delete;
public:
/** Acquires the lock over the given RecordLock, preventing other transactions
* from doing the same */
void TakeLock(RecordLock &lock) const;
/** Commits this transaction. After this call this transaction object is no
* longer valid for use (it gets deleted by the engine that owns it). */
void Commit();
@ -51,6 +49,10 @@ class Transaction {
* longer valid for use (it gets deleted by the engine that owns it). */
void Abort();
/** Acquires the lock over the given RecordLock, preventing other transactions
* from doing the same */
void TakeLock(RecordLock &lock) const { locks_.Take(&lock, *this, engine_); }
/** Transaction's id. Unique in the engine that owns it */
const transaction_id_t id_;

23
src/utils/atomic.hpp Normal file
View File

@ -0,0 +1,23 @@
#pragma once
#include <atomic>
namespace utils {
/**
* Ensures that the given atomic is greater or equal to the given value. If it
* already satisfies that predicate, it's not modified.
*
* @param atomic - The atomic variable to ensure on.
* @param value - The minimal value the atomic must have after this call.
* @tparam TValue - Type of value.
*/
template <typename TValue>
void EnsureAtomicGe(std::atomic<TValue> &atomic, TValue value) {
while (true) {
auto current = atomic.load();
if (current >= value) break;
if (atomic.compare_exchange_strong(current, value)) break;
}
}
} // namespace utils

View File

@ -4,6 +4,7 @@
#include "mvcc/record.hpp"
#include "mvcc/version_list.hpp"
#include "transactions/engine_master.hpp"
class Prop : public mvcc::Record<Prop> {
public:
@ -18,11 +19,11 @@ class Prop : public mvcc::Record<Prop> {
void MvccMix(benchmark::State &state) {
while (state.KeepRunning()) {
state.PauseTiming();
tx::Engine engine;
tx::MasterEngine engine;
auto t1 = engine.Begin();
mvcc::VersionList<Prop> version_list(*t1, 0);
t1->Commit();
engine.Commit(*t1);
auto t2 = engine.Begin();
state.ResumeTiming();
@ -33,7 +34,7 @@ void MvccMix(benchmark::State &state) {
version_list.find(*t2);
state.PauseTiming();
t2->Abort();
engine.Abort(*t2);
auto t3 = engine.Begin();
state.ResumeTiming();
@ -48,8 +49,8 @@ void MvccMix(benchmark::State &state) {
}
state.PauseTiming();
t3->Commit();
t4->Commit();
engine.Commit(*t3);
engine.Commit(*t4);
state.ResumeTiming();
}
}

View File

@ -1,47 +0,0 @@
#include <thread>
#include <vector>
#include "glog/logging.h"
#include "transactions/engine.hpp"
int main() {
// (try to) test correctness of the transaction life cycle
constexpr int THREADS = 16;
constexpr int TRANSACTIONS = 10;
tx::Engine engine;
std::vector<uint64_t> sums;
sums.resize(THREADS);
auto f = [&engine, &sums](int idx, int n) {
uint64_t sum = 0;
for (int i = 0; i < n; ++i) {
auto t = engine.Begin();
sum += t->id_;
engine.Commit(*t);
}
sums[idx] = sum;
};
std::vector<std::thread> threads;
for (int i = 0; i < THREADS; ++i)
threads.push_back(std::thread(f, i, TRANSACTIONS));
for (auto &thread : threads) thread.join();
uint64_t sum_computed = 0;
for (int i = 0; i < THREADS; ++i) sum_computed += sums[i];
uint64_t sum_actual = 0;
for (uint64_t i = 1; i <= THREADS * TRANSACTIONS; ++i) sum_actual += i;
std::cout << sum_computed << " " << sum_actual << std::endl;
CHECK(sum_computed == sum_actual) << "sums have to be the same";
return 0;
}

View File

@ -4,6 +4,7 @@
#include "database/graph_db_accessor.hpp"
#include "database/graph_db_datatypes.hpp"
#include "storage/vertex.hpp"
#include "transactions/engine_master.hpp"
#include "mvcc_gc_common.hpp"
@ -14,10 +15,10 @@ TEST(LabelsIndex, UniqueInsert) {
KeyIndex<GraphDbTypes::Label, Vertex> index;
GraphDb db;
GraphDbAccessor dba(db);
tx::Engine engine;
tx::MasterEngine engine;
auto t1 = engine.Begin();
mvcc::VersionList<Vertex> vlist(*t1, 0);
t1->Commit();
engine.Commit(*t1);
auto t2 = engine.Begin();
vlist.find(*t2)->labels_.push_back(dba.Label("1"));
@ -30,7 +31,7 @@ TEST(LabelsIndex, UniqueInsert) {
vlist.find(*t2)->labels_.push_back(dba.Label("3"));
index.Update(dba.Label("3"), &vlist, vlist.find(*t2));
t2->Commit();
engine.Commit(*t2);
EXPECT_EQ(index.Count(dba.Label("1")), 1);
EXPECT_EQ(index.Count(dba.Label("2")), 1);
@ -42,7 +43,7 @@ TEST(LabelsIndex, UniqueFilter) {
GraphDb db;
KeyIndex<GraphDbTypes::Label, Vertex> index;
GraphDbAccessor dba(db);
tx::Engine engine;
tx::MasterEngine engine;
auto t1 = engine.Begin();
mvcc::VersionList<Vertex> vlist1(*t1, 0);
@ -57,14 +58,14 @@ TEST(LabelsIndex, UniqueFilter) {
vlist2.find(*t1)->labels_.push_back(label1);
index.Update(label1, &vlist1, r1v1);
index.Update(label1, &vlist2, r1v2);
t1->Commit();
engine.Commit(*t1);
auto t2 = engine.Begin();
auto r2v1 = vlist1.update(*t2);
auto r2v2 = vlist2.update(*t2);
index.Update(label1, &vlist1, r2v1);
index.Update(label1, &vlist2, r2v2);
t2->Commit();
engine.Commit(*t2);
auto t3 = engine.Begin();
std::vector<mvcc::VersionList<Vertex> *> expected = {&vlist1, &vlist2};
@ -82,7 +83,7 @@ TEST(LabelsIndex, Refresh) {
KeyIndex<GraphDbTypes::Label, Vertex> index;
GraphDb db;
GraphDbAccessor access(db);
tx::Engine engine;
tx::MasterEngine engine;
// add two vertices to database
auto t1 = engine.Begin();
@ -100,7 +101,7 @@ TEST(LabelsIndex, Refresh) {
v2r1->labels_.push_back(label);
index.Update(label, &vlist1, v1r1);
index.Update(label, &vlist2, v2r1);
t1->Commit();
engine.Commit(*t1);
auto t2 = engine.Begin();
auto v1r2 = vlist1.update(*t2);
@ -111,7 +112,7 @@ TEST(LabelsIndex, Refresh) {
index.Refresh(GcSnapshot(engine, t2), engine);
EXPECT_EQ(index.Count(label), 4);
t2->Commit();
engine.Commit(*t2);
EXPECT_EQ(index.Count(label), 4);
index.Refresh(GcSnapshot(engine, nullptr), engine);
EXPECT_EQ(index.Count(label), 2);

View File

@ -4,6 +4,7 @@
#include "database/graph_db_accessor.hpp"
#include "database/graph_db_datatypes.hpp"
#include "database/indexes/label_property_index.hpp"
#include "transactions/engine_master.hpp"
#include "mvcc_gc_common.hpp"
@ -43,7 +44,7 @@ class LabelPropertyIndexComplexTest : public ::testing::Test {
LabelPropertyIndex index;
LabelPropertyIndex::Key *key;
tx::Engine engine;
tx::MasterEngine engine;
tx::Transaction *t{nullptr};
mvcc::VersionList<Vertex> *vlist;
@ -142,11 +143,11 @@ TEST_F(LabelPropertyIndexComplexTest, UniqueInsert) {
// Check if index filters duplicates.
TEST_F(LabelPropertyIndexComplexTest, UniqueFilter) {
index.UpdateOnLabelProperty(vlist, vertex);
t->Commit();
engine.Commit(*t);
auto t2 = engine.Begin();
auto vertex2 = vlist->update(*t2);
t2->Commit();
engine.Commit(*t2);
index.UpdateOnLabelProperty(vlist, vertex2);
EXPECT_EQ(index.Count(*key), 2);
@ -154,7 +155,7 @@ TEST_F(LabelPropertyIndexComplexTest, UniqueFilter) {
auto t3 = engine.Begin();
auto iter = index.GetVlists(*key, *t3, false);
EXPECT_EQ(std::distance(iter.begin(), iter.end()), 1);
t3->Commit();
engine.Commit(*t3);
}
// Remove label and check if index vertex is not returned now.
@ -184,7 +185,7 @@ TEST_F(LabelPropertyIndexComplexTest, RemoveProperty) {
// Refresh with a vertex that looses its labels and properties.
TEST_F(LabelPropertyIndexComplexTest, Refresh) {
index.UpdateOnLabelProperty(vlist, vertex);
t->Commit();
engine.Commit(*t);
EXPECT_EQ(index.Count(*key), 1);
vertex->labels_.clear();
vertex->properties_.clear();

View File

@ -5,18 +5,18 @@
#include "mvcc/version.hpp"
#include "mvcc/version_list.hpp"
#include "threading/sync/lock_timeout_exception.hpp"
#include "transactions/engine.hpp"
#include "transactions/transaction.cpp"
#include "transactions/engine_master.hpp"
#include "transactions/transaction.hpp"
#include "mvcc_gc_common.hpp"
TEST(MVCC, Deadlock) {
tx::Engine engine;
tx::MasterEngine engine;
auto t0 = engine.Begin();
mvcc::VersionList<Prop> version_list1(*t0, 0);
mvcc::VersionList<Prop> version_list2(*t0, 1);
t0->Commit();
engine.Commit(*t0);
auto t1 = engine.Begin();
auto t2 = engine.Begin();
@ -31,14 +31,14 @@ TEST(MVCC, Deadlock) {
TEST(MVCC, UpdateDontDelete) {
std::atomic<int> count{0};
{
tx::Engine engine;
tx::MasterEngine engine;
auto t1 = engine.Begin();
mvcc::VersionList<DestrCountRec> version_list(*t1, 0, count);
t1->Commit();
engine.Commit(*t1);
auto t2 = engine.Begin();
version_list.update(*t2);
t2->Abort();
engine.Abort(*t2);
EXPECT_EQ(count, 0);
auto t3 = engine.Begin();
@ -48,14 +48,14 @@ TEST(MVCC, UpdateDontDelete) {
EXPECT_EQ(count, 0);
// TODO Gleich: why don't we also test that remove doesn't delete?
t3->Commit();
engine.Commit(*t3);
}
EXPECT_EQ(count, 3);
}
// Check that we get the oldest record.
TEST(MVCC, Oldest) {
tx::Engine engine;
tx::MasterEngine engine;
auto t1 = engine.Begin();
mvcc::VersionList<Prop> version_list(*t1, 0);
auto first = version_list.Oldest();

View File

@ -5,8 +5,8 @@
#include "mvcc/version.hpp"
#include "mvcc/version_list.hpp"
#include "threading/sync/lock_timeout_exception.hpp"
#include "transactions/engine.hpp"
#include "transactions/transaction.cpp"
#include "transactions/engine_master.hpp"
#include "transactions/transaction.hpp"
class TestClass : public mvcc::Record<TestClass> {
public:
@ -52,13 +52,13 @@ class Mvcc : public ::testing::Test {
engine.Advance(t1->id_);
id1 = t1->id_;
v1 = version_list.find(*t1);
t1->Commit();
engine.Commit(*t1);
t2 = engine.Begin();
id2 = t2->id_;
}
// variable where number of versions is stored
int version_list_size = 0;
tx::Engine engine;
tx::MasterEngine engine;
tx::Transaction *t1 = engine.Begin();
mvcc::VersionList<TestClass> version_list{*t1, 0, version_list_size};
TestClass *v1 = nullptr;
@ -74,10 +74,10 @@ class Mvcc : public ::testing::Test {
#define T4_FIND __attribute__((unused)) auto v4 = version_list.find(*t4)
#define T2_UPDATE __attribute__((unused)) auto v2 = version_list.update(*t2)
#define T3_UPDATE __attribute__((unused)) auto v3 = version_list.update(*t3)
#define T2_COMMIT t2->Commit();
#define T3_COMMIT t3->Commit();
#define T2_ABORT t2->Abort();
#define T3_ABORT t3->Abort();
#define T2_COMMIT engine.Commit(*t2);
#define T3_COMMIT engine.Commit(*t3);
#define T2_ABORT engine.Abort(*t2);
#define T3_ABORT engine.Abort(*t3);
#define T3_BEGIN \
auto t3 = engine.Begin(); \
__attribute__((unused)) int id3 = t3->id_

View File

@ -12,13 +12,13 @@
#include "mvcc/version_list.hpp"
#include "storage/garbage_collector.hpp"
#include "storage/vertex.hpp"
#include "transactions/engine.hpp"
#include "transactions/engine_master.hpp"
#include "mvcc_gc_common.hpp"
class MvccGcTest : public ::testing::Test {
protected:
tx::Engine engine;
tx::MasterEngine engine;
private:
tx::Transaction *t0 = engine.Begin();
@ -29,16 +29,16 @@ class MvccGcTest : public ::testing::Test {
record_destruction_count};
std::vector<tx::Transaction *> transactions{t0};
void SetUp() override { t0->Commit(); }
void SetUp() override { engine.Commit(*t0); }
void MakeUpdates(int update_count, bool commit) {
for (int i = 0; i < update_count; i++) {
auto t = engine.Begin();
version_list.update(*t);
if (commit)
t->Commit();
engine.Commit(*t);
else
t->Abort();
engine.Abort(*t);
}
}
@ -50,7 +50,7 @@ class MvccGcTest : public ::testing::Test {
TEST_F(MvccGcTest, RemoveAndAbort) {
auto t = engine.Begin();
version_list.remove(version_list.find(*t), *t);
t->Abort();
engine.Abort(*t);
auto ret = GcDeleted();
EXPECT_EQ(ret.first, false);
EXPECT_EQ(ret.second, nullptr);
@ -74,7 +74,7 @@ TEST_F(MvccGcTest, UpdateAndAbort) {
TEST_F(MvccGcTest, RemoveAndCommit) {
auto t = engine.Begin();
version_list.remove(version_list.find(*t), *t);
t->Commit();
engine.Commit(*t);
auto ret = GcDeleted();
EXPECT_EQ(ret.first, true);
EXPECT_NE(ret.second, nullptr);
@ -102,7 +102,7 @@ TEST_F(MvccGcTest, OldestTransactionSnapshot) {
auto t1 = engine.Begin();
auto t2 = engine.Begin();
version_list.remove(version_list.find(*t1), *t1);
t1->Commit();
engine.Commit(*t1);
auto ret = GcDeleted(t2);
EXPECT_EQ(ret.first, false);
@ -116,7 +116,7 @@ TEST_F(MvccGcTest, OldestTransactionSnapshot) {
*/
TEST(GarbageCollector, GcClean) {
ConcurrentMap<int64_t, mvcc::VersionList<DestrCountRec> *> collection;
tx::Engine engine;
tx::MasterEngine engine;
DeferredDeleter<DestrCountRec> deleter;
DeferredDeleter<mvcc::VersionList<DestrCountRec>> vlist_deleter;
GarbageCollector<decltype(collection), DestrCountRec> gc(collection, deleter,
@ -129,7 +129,7 @@ TEST(GarbageCollector, GcClean) {
new mvcc::VersionList<DestrCountRec>(*t1, 0, record_destruction_count);
auto access = collection.access();
access.insert(0, vl);
t1->Commit();
engine.Commit(*t1);
// run garbage collection that has nothing co collect
gc.Run(GcSnapshot(engine, nullptr), engine);
@ -140,7 +140,7 @@ TEST(GarbageCollector, GcClean) {
// delete the only record in the version-list in transaction t2
auto t2 = engine.Begin();
vl->remove(vl->find(*t2), *t2);
t2->Commit();
engine.Commit(*t2);
gc.Run(GcSnapshot(engine, nullptr), engine);
// check that we destroyed the record

View File

@ -1,6 +1,7 @@
#pragma once
#include "mvcc/record.hpp"
#include "transactions/engine_master.hpp"
/**
* @brief - Empty class which inherits from mvcc:Record.
@ -28,12 +29,12 @@ class DestrCountRec : public mvcc::Record<DestrCountRec> {
// helper function for creating a GC snapshot
// if given a nullptr it makes a GC snapshot like there
// are no active transactions
auto GcSnapshot(tx::Engine &engine, tx::Transaction *t) {
auto GcSnapshot(tx::MasterEngine &engine, tx::Transaction *t) {
if (t != nullptr) {
tx::Snapshot gc_snap = t->snapshot();
gc_snap.insert(t->id_);
return gc_snap;
} else {
return engine.GcSnapshot();
return engine.GlobalGcSnapshot();
}
}

View File

@ -1,47 +0,0 @@
#include "gtest/gtest.h"
#include <vector>
#include "transactions/engine.hpp"
#include "transactions/transaction.hpp"
TEST(Engine, GcSnapshot) {
tx::Engine engine;
ASSERT_EQ(engine.GcSnapshot(), tx::Snapshot({1}));
std::vector<tx::Transaction *> transactions;
// create transactions and check the GC snapshot
for (int i = 0; i < 5; ++i) {
transactions.push_back(engine.Begin());
EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({1}));
}
// commit transactions in the middle, expect
// the GcSnapshot did not change
transactions[1]->Commit();
EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({1}));
transactions[2]->Commit();
EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({1}));
// have the first three transactions committed
transactions[0]->Commit();
EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({1, 2, 3, 4}));
// commit all
transactions[3]->Commit();
transactions[4]->Commit();
EXPECT_EQ(engine.GcSnapshot(), tx::Snapshot({6}));
}
TEST(Engine, Advance) {
tx::Engine engine;
auto t0 = engine.Begin();
auto t1 = engine.Begin();
EXPECT_EQ(t0->cid(), 1);
engine.Advance(t0->id_);
EXPECT_EQ(t0->cid(), 2);
engine.Advance(t0->id_);
EXPECT_EQ(t0->cid(), 3);
EXPECT_EQ(t1->cid(), 1);
}

View File

@ -0,0 +1,66 @@
#include "gtest/gtest.h"
#include <thread>
#include <vector>
#include "data_structures/concurrent/concurrent_set.hpp"
#include "transactions/engine_master.hpp"
#include "transactions/transaction.hpp"
TEST(Engine, GcSnapshot) {
tx::MasterEngine engine;
ASSERT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1}));
std::vector<tx::Transaction *> transactions;
// create transactions and check the GC snapshot
for (int i = 0; i < 5; ++i) {
transactions.push_back(engine.Begin());
EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1}));
}
// commit transactions in the middle, expect
// the GcSnapshot did not change
engine.Commit(*transactions[1]);
EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1}));
engine.Commit(*transactions[2]);
EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1}));
// have the first three transactions committed
engine.Commit(*transactions[0]);
EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1, 2, 3, 4}));
// commit all
engine.Commit(*transactions[3]);
engine.Commit(*transactions[4]);
EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({6}));
}
TEST(Engine, Advance) {
tx::MasterEngine engine;
auto t0 = engine.Begin();
auto t1 = engine.Begin();
EXPECT_EQ(t0->cid(), 1);
engine.Advance(t0->id_);
EXPECT_EQ(t0->cid(), 2);
engine.Advance(t0->id_);
EXPECT_EQ(t0->cid(), 3);
EXPECT_EQ(t1->cid(), 1);
}
TEST(Engine, ConcurrentBegin) {
tx::MasterEngine engine;
std::vector<std::thread> threads;
ConcurrentSet<tx::transaction_id_t> tx_ids;
for (int i = 0; i < 10; ++i) {
threads.emplace_back(
[&tx_ids, &engine, accessor = tx_ids.access() ]() mutable {
for (int j = 0; j < 100; ++j) {
auto t = engine.Begin();
accessor.insert(t->id_);
}
});
}
for (auto &t : threads) t.join();
EXPECT_EQ(tx_ids.access().size(), 1000);
}

View File

@ -13,7 +13,8 @@ add_executable(mg_import_csv
${memgraph_src_dir}/storage/property_value.cpp
${memgraph_src_dir}/storage/record_accessor.cpp
${memgraph_src_dir}/storage/vertex_accessor.cpp
${memgraph_src_dir}/transactions/transaction.cpp
${memgraph_src_dir}/transactions/engine_master.cpp
${memgraph_src_dir}/transactions/engine_worker.cpp
)
# Strip the executable in release build.

View File

@ -16,7 +16,8 @@ add_executable(mg_recovery_check
${memgraph_src_dir}/storage/property_value.cpp
${memgraph_src_dir}/storage/record_accessor.cpp
${memgraph_src_dir}/storage/vertex_accessor.cpp
${memgraph_src_dir}/transactions/transaction.cpp
${memgraph_src_dir}/transactions/engine_master.cpp
${memgraph_src_dir}/transactions/engine_worker.cpp
)
target_link_libraries(mg_recovery_check stdc++fs Threads::Threads fmt glog