Make WAL recovery linear

Summary:
This is a proposal on how the WAL recovery process can be implemented so
that Deltas aren't accumulated, but instead applied in the same order
they are written to the WAL.

I *believe* that the only additional requirement on the system is
atomic transaction Begin/Commit/Abort. By atomic I mean that they are
present in the WAL in exactly the same order as in the transaction
engine, to ensure the same commitability of original and recovery
transactions.

This could be a requirement for HA recovery. It is desirable that WAL
and HA log become the same thing, and the recovery process too.

Reviewers: mtomic, dgleich, mislav.bradac

Reviewed By: mislav.bradac

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1068
This commit is contained in:
florijan 2017-12-20 10:50:06 +01:00
parent 9cacf970cc
commit 1ee6d6d05e
6 changed files with 63 additions and 97 deletions

View File

@ -22,7 +22,7 @@ namespace fs = std::experimental::filesystem;
properties_ = std::make_unique<type<GraphDbTypes::Property>>(__VA_ARGS__);
GraphDb::GraphDb(Config config) : GraphDb(config, 0) {
tx_engine_ = std::make_unique<tx::MasterEngine>();
tx_engine_ = std::make_unique<tx::MasterEngine>(&wal_);
INIT_MAPPERS(storage::SingleNodeConcurrentIdMapper);
Start();
}
@ -30,7 +30,9 @@ GraphDb::GraphDb(Config config) : GraphDb(config, 0) {
GraphDb::GraphDb(communication::messaging::System &system,
distributed::MasterCoordination &master, Config config)
: GraphDb(config, 0) {
tx_engine_ = std::make_unique<tx::MasterEngine>(system);
auto tx_engine = std::make_unique<tx::MasterEngine>(&wal_);
tx_engine->StartServer(system);
tx_engine_ = std::move(tx_engine);
INIT_MAPPERS(storage::MasterConcurrentIdMapper, system);
get_endpoint_ = [&master](int worker_id) {
return master.GetEndpoint(worker_id);

View File

@ -1,6 +1,7 @@
#include "glog/logging.h"
#include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp"
#include "storage/edge.hpp"
#include "storage/edge_accessor.hpp"
#include "storage/vertex.hpp"
@ -9,9 +10,7 @@
#include "utils/on_scope_exit.hpp"
GraphDbAccessor::GraphDbAccessor(GraphDb &db)
: db_(db), transaction_(MasterEngine().Begin()) {
db_.wal_.Emplace(database::StateDelta::TxBegin(transaction_->id_));
}
: db_(db), transaction_(MasterEngine().Begin()) {}
GraphDbAccessor::~GraphDbAccessor() {
if (!commited_ && !aborted_) {
@ -30,17 +29,13 @@ void GraphDbAccessor::AdvanceCommand() {
void GraphDbAccessor::Commit() {
DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction.";
auto tid = transaction_->id_;
MasterEngine().Commit(*transaction_);
db_.wal_.Emplace(database::StateDelta::TxCommit(tid));
commited_ = true;
}
void GraphDbAccessor::Abort() {
DCHECK(!commited_ && !aborted_) << "Already aborted or commited transaction.";
auto tid = transaction_->id_;
MasterEngine().Abort(*transaction_);
db_.wal_.Emplace(database::StateDelta::TxAbort(tid));
aborted_ = true;
}

View File

@ -6,7 +6,6 @@
#include "durability/hashed_file_writer.hpp"
#include "storage/gid.hpp"
#include "storage/property_value.hpp"
#include "transactions/transaction.hpp"
namespace database {
/** Describes single change to the database state. Used for durability (WAL) and

View File

@ -11,7 +11,7 @@
#include "durability/wal.hpp"
#include "query/typed_value.hpp"
#include "transactions/type.hpp"
#include "utils/string.hpp"
#include "utils/algorithm.hpp"
namespace durability {
@ -152,7 +152,7 @@ bool RecoverSnapshot(const fs::path &snapshot_file, GraphDb &db,
#undef RETURN_IF_NOT
// TODO - finer-grained recovery feedback could be useful here.
bool RecoverWal(const fs::path &wal_dir, GraphDbAccessor &db_accessor,
bool RecoverWal(const fs::path &wal_dir, GraphDb &db,
RecoveryData &recovery_data) {
// Get paths to all the WAL files and sort them (on date).
std::vector<fs::path> wal_files;
@ -161,64 +161,32 @@ bool RecoverWal(const fs::path &wal_dir, GraphDbAccessor &db_accessor,
wal_files.emplace_back(wal_file);
std::sort(wal_files.begin(), wal_files.end());
// Track which transaction should be recovered next.
tx::transaction_id_t next_to_recover = recovery_data.snapshooter_tx_id + 1;
// Some transactions that come after the first to recover need to be skipped
// (if they committed before the snapshot, and are not in the snapshot's tx
// snapshot).
std::set<tx::transaction_id_t> to_skip;
if (!recovery_data.snapshooter_tx_snapshot.empty()) {
std::set<tx::transaction_id_t> txs{
recovery_data.snapshooter_tx_snapshot.begin(),
recovery_data.snapshooter_tx_snapshot.end()};
next_to_recover = *txs.begin();
for (tx::transaction_id_t i = next_to_recover;
i < recovery_data.snapshooter_tx_id; ++i)
if (txs.find(i) == txs.end()) to_skip.emplace(i);
// We don't try to recover the snapshooter transaction.
to_skip.emplace(recovery_data.snapshooter_tx_id);
}
// A buffer for the WAL transaction deltas. Accumulate and apply them in the
// right transactional sequence.
std::map<tx::transaction_id_t, std::vector<database::StateDelta>> deltas;
// Track which transactions were aborted/committed in the WAL.
std::set<tx::transaction_id_t> aborted;
std::set<tx::transaction_id_t> committed;
auto apply_all_possible = [&]() {
while (true) {
// Remove old deltas from memory.
for (auto it = deltas.begin(); it != deltas.end();) {
if (it->first < next_to_recover)
it = deltas.erase(it);
else
++it;
}
// Check if we can apply skip/apply the next transaction.
if (to_skip.find(next_to_recover) != to_skip.end())
next_to_recover++;
else if (utils::Contains(aborted, next_to_recover)) {
next_to_recover++;
} else if (utils::Contains(committed, next_to_recover)) {
auto found = deltas.find(next_to_recover);
if (found != deltas.end())
for (const auto &delta : found->second) delta.Apply(db_accessor);
next_to_recover++;
} else
break;
}
// Track which transaction should be recovered first, and define logic for
// which transactions should be skipped in recovery.
auto &tx_sn = recovery_data.snapshooter_tx_snapshot;
// When the snapshooter's tx snapshot is empty, recovery starts right after the
// snapshooter transaction. Otherwise it starts from the smallest transaction
// id in that snapshot. std::min_element is required here: std::min applied to
// the two iterators compares the iterators themselves rather than the ids they
// point to, so it does not yield the smallest id in general.
auto first_to_recover = tx_sn.empty()
? recovery_data.snapshooter_tx_id + 1
: *std::min_element(tx_sn.begin(), tx_sn.end());
// A transaction is skipped if it is older than the first one to recover, or if
// it is older than the snapshooter and not contained in the snapshooter's tx
// snapshot.
auto should_skip = [&tx_sn, &recovery_data,
first_to_recover](tx::transaction_id_t tx_id) {
return tx_id < first_to_recover ||
(tx_id < recovery_data.snapshooter_tx_id &&
!utils::Contains(tx_sn, tx_id));
};
// Read all the WAL files whose max_tx_id is not smaller then
// min_tx_to_recover
std::unordered_map<tx::transaction_id_t, GraphDbAccessor> accessors;
auto get_accessor =
[&accessors](tx::transaction_id_t tx_id) -> GraphDbAccessor & {
auto found = accessors.find(tx_id);
CHECK(found != accessors.end())
<< "Accessor does not exist for transaction";
return found->second;
};
// Read all the WAL files whose max_tx_id is not smaller than
// min_tx_to_recover.
for (auto &wal_file : wal_files) {
auto wal_file_tx_id = TransactionIdFromWalFilename(wal_file.filename());
if (!wal_file_tx_id || *wal_file_tx_id < next_to_recover) continue;
if (!wal_file_tx_id || *wal_file_tx_id < first_to_recover) continue;
HashedFileReader wal_reader;
if (!wal_reader.Open(wal_file)) return false;
@ -226,39 +194,31 @@ bool RecoverWal(const fs::path &wal_dir, GraphDbAccessor &db_accessor,
while (true) {
auto delta = database::StateDelta::Decode(wal_reader, decoder);
if (!delta) break;
if (should_skip(delta->transaction_id())) continue;
switch (delta->type()) {
case database::StateDelta::Type::TRANSACTION_BEGIN:
DCHECK(deltas.find(delta->transaction_id()) == deltas.end())
DCHECK(accessors.find(delta->transaction_id()) == accessors.end())
<< "Double transaction start";
if (to_skip.find(delta->transaction_id()) == to_skip.end())
deltas.emplace(delta->transaction_id(),
std::vector<database::StateDelta>{});
accessors.emplace(delta->transaction_id(), db);
break;
case database::StateDelta::Type::TRANSACTION_ABORT: {
auto it = deltas.find(delta->transaction_id());
if (it != deltas.end()) deltas.erase(it);
aborted.emplace(delta->transaction_id());
apply_all_possible();
case database::StateDelta::Type::TRANSACTION_ABORT:
get_accessor(delta->transaction_id()).Abort();
accessors.erase(accessors.find(delta->transaction_id()));
break;
}
case database::StateDelta::Type::TRANSACTION_COMMIT:
committed.emplace(delta->transaction_id());
apply_all_possible();
get_accessor(delta->transaction_id()).Commit();
accessors.erase(accessors.find(delta->transaction_id()));
break;
case database::StateDelta::Type::BUILD_INDEX: {
case database::StateDelta::Type::BUILD_INDEX:
// TODO index building might still be problematic in HA
recovery_data.indexes.emplace_back(delta->IndexName());
break;
}
default: {
auto it = deltas.find(delta->transaction_id());
if (it != deltas.end()) it->second.emplace_back(*delta);
}
default:
delta->Apply(get_accessor(delta->transaction_id()));
}
} // reading all deltas in a single wal file
} // reading all wal files
apply_all_possible();
// TODO when implementing proper error handling return one of the following:
// - WAL fully recovered
// - WAL partially recovered
@ -291,12 +251,10 @@ bool Recover(const fs::path &durability_dir, GraphDb &db) {
}
// Write-ahead-log recovery.
GraphDbAccessor db_accessor(db);
// WAL recovery does not have to be complete for the recovery to be
// considered successful. For the time being ignore the return value,
// consider a better system.
RecoverWal(durability_dir / kWalDir, db_accessor, recovery_data);
db_accessor.Commit();
RecoverWal(durability_dir / kWalDir, db, recovery_data);
// Index recovery.
GraphDbAccessor db_accessor_indices{db};

View File

@ -3,14 +3,13 @@
#include "glog/logging.h"
#include "database/state_delta.hpp"
#include "transactions/engine_master.hpp"
#include "transactions/engine_rpc_messages.hpp"
namespace tx {
MasterEngine::MasterEngine(communication::messaging::System &system) {
StartServer(system);
}
MasterEngine::MasterEngine(durability::WriteAheadLog *wal) : wal_(wal) {}
MasterEngine::~MasterEngine() {
if (rpc_server_) StopServer();
@ -21,10 +20,11 @@ Transaction *MasterEngine::Begin() {
transaction_id_t id{++counter_};
auto t = new Transaction(id, active_, *this);
active_.insert(id);
store_.emplace(id, t);
if (wal_) {
wal_->Emplace(database::StateDelta::TxBegin(id));
}
return t;
}
@ -48,6 +48,9 @@ void MasterEngine::Commit(const Transaction &t) {
std::lock_guard<SpinLock> guard(lock_);
clog_.set_committed(t.id_);
active_.remove(t.id_);
if (wal_) {
wal_->Emplace(database::StateDelta::TxCommit(t.id_));
}
store_.erase(store_.find(t.id_));
}
@ -55,6 +58,9 @@ void MasterEngine::Abort(const Transaction &t) {
std::lock_guard<SpinLock> guard(lock_);
clog_.set_aborted(t.id_);
active_.remove(t.id_);
if (wal_) {
wal_->Emplace(database::StateDelta::TxAbort(t.id_));
}
store_.erase(store_.find(t.id_));
}

View File

@ -6,6 +6,7 @@
#include "communication/messaging/distributed.hpp"
#include "communication/rpc/rpc.hpp"
#include "durability/wal.hpp"
#include "threading/sync/spinlock.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/engine.hpp"
@ -28,9 +29,11 @@ class TransactionError : public utils::BasicException {
*/
class MasterEngine : public Engine {
public:
MasterEngine() = default;
/** Constructs a master engine and calls StartServer() */
MasterEngine(communication::messaging::System &system);
/**
* @param wal - Optional. If present, the Engine will write tx
* Begin/Commit/Abort atomically (while under lock).
*/
MasterEngine(durability::WriteAheadLog *wal = nullptr);
/** Stops the tx server if it's running. */
~MasterEngine();
@ -82,6 +85,9 @@ class MasterEngine : public Engine {
std::unordered_map<transaction_id_t, std::unique_ptr<Transaction>> store_;
Snapshot active_;
SpinLock lock_;
// Optional. If present, the Engine will write tx Begin/Commit/Abort
// atomically (while under lock).
durability::WriteAheadLog *wal_{nullptr};
// Optional RPC server, only used in distributed, not in single_node.
std::experimental::optional<communication::rpc::Server> rpc_server_;