Remove old WAL files after snapshot
Summary: Once a snapshot is successfully written, delete WAL files which are no longer necessary for recovery. Note that this prohibits recovering the WAL from any except the last snapshot. Reviewers: buda, mislav.bradac, dgleich Reviewed By: dgleich Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1000
This commit is contained in:
parent
a7f9255c17
commit
d5bcf9a7d7
@ -1,10 +1,15 @@
|
||||
#pragma once
|
||||
|
||||
#include <experimental/filesystem>
|
||||
#include <experimental/optional>
|
||||
#include <string>
|
||||
|
||||
#include "glog/logging.h"
|
||||
|
||||
#include "transactions/type.hpp"
|
||||
#include "utils/datetime/timestamp.hpp"
|
||||
#include "utils/string.hpp"
|
||||
|
||||
namespace durability {
|
||||
const std::string kSnapshotDir = "snapshots";
|
||||
const std::string kWalDir = "wal";
|
||||
@ -23,4 +28,55 @@ inline void CheckDurabilityDir(const std::string &durability_dir) {
|
||||
<< durability_dir << "'.";
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the transaction id contained in the file name. If the filename is
|
||||
/// not a parseable WAL file name, nullopt is returned. If the filename
|
||||
/// represents the "current" WAL file, then the maximum possible transaction ID
|
||||
/// is returned because that's appropriate for the recovery logic (the current
|
||||
/// WAL does not yet have a maximum transaction ID and can't be discarded by
|
||||
/// the recovery regardless of the snapshot from which the transaction starts).
|
||||
inline std::experimental::optional<tx::transaction_id_t>
|
||||
TransactionIdFromWalFilename(const std::string &name) {
|
||||
auto nullopt = std::experimental::nullopt;
|
||||
// Get the max_transaction_id from the file name that has format
|
||||
// "XXXXX__max_transaction_<MAX_TRANS_ID>"
|
||||
auto file_name_split = utils::RSplit(name, "__", 1);
|
||||
if (file_name_split.size() != 2) {
|
||||
LOG(WARNING) << "Unable to parse WAL file name: " << name;
|
||||
return nullopt;
|
||||
}
|
||||
if (file_name_split[1] == "current")
|
||||
return std::numeric_limits<tx::transaction_id_t>::max();
|
||||
file_name_split = utils::RSplit(file_name_split[1], "_", 1);
|
||||
if (file_name_split.size() != 2) {
|
||||
LOG(WARNING) << "Unable to parse WAL file name: " << name;
|
||||
return nullopt;
|
||||
}
|
||||
auto &tx_id_str = file_name_split[1];
|
||||
try {
|
||||
return std::stoll(tx_id_str);
|
||||
} catch (std::invalid_argument &) {
|
||||
LOG(WARNING) << "Unable to parse WAL file name tx ID: " << tx_id_str;
|
||||
return nullopt;
|
||||
} catch (std::out_of_range &) {
|
||||
LOG(WARNING) << "WAL file name tx ID too large: " << tx_id_str;
|
||||
return nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
/// Generates a file path for a write-ahead log file. If given a transaction ID
|
||||
/// the file name will contain it. Otherwise the file path is for the "current"
|
||||
/// WAL file for which the max tx id is still unknown.
|
||||
inline auto WalFilenameForTransactionId(
|
||||
const std::experimental::filesystem::path &wal_dir,
|
||||
std::experimental::optional<tx::transaction_id_t> tx_id =
|
||||
std::experimental::nullopt) {
|
||||
auto file_name = Timestamp::now().to_iso8601();
|
||||
if (tx_id) {
|
||||
file_name += "__max_transaction_" + std::to_string(*tx_id);
|
||||
} else {
|
||||
file_name += "__current";
|
||||
}
|
||||
return wal_dir / file_name;
|
||||
}
|
||||
}
|
||||
|
@ -200,28 +200,6 @@ void ApplyOp(const WriteAheadLog::Op &op, GraphDbAccessor &dba) {
|
||||
}
|
||||
}
|
||||
|
||||
// Returns the transaction id contained in the file name. If the filename is not
|
||||
// a parseable WAL file name, nullopt is returned. If the filename represents
|
||||
// the "current" WAL file, then the maximum possible transaction ID is returned.
|
||||
std::experimental::optional<tx::transaction_id_t> TransactionIdFromWalFilename(
|
||||
const std::string &name) {
|
||||
// Get the max_transaction_id from the file name that has format
|
||||
// "XXXXX__max_transaction_<MAX_TRANS_ID>"
|
||||
auto file_name_split = utils::RSplit(name, "__", 1);
|
||||
if (file_name_split.size() != 2) {
|
||||
LOG(WARNING) << "Unable to parse WAL file name: " << name;
|
||||
return std::experimental::nullopt;
|
||||
}
|
||||
if (file_name_split[1] == "current")
|
||||
return std::numeric_limits<tx::transaction_id_t>::max();
|
||||
file_name_split = utils::RSplit(file_name_split[1], "_", 1);
|
||||
if (file_name_split.size() != 2) {
|
||||
LOG(WARNING) << "Unable to parse WAL file name: " << name;
|
||||
return std::experimental::nullopt;
|
||||
}
|
||||
return std::stoi(file_name_split[1]);
|
||||
}
|
||||
|
||||
// TODO - finer-grained recovery feedback could be useful here.
|
||||
bool RecoverWal(const fs::path &wal_dir, GraphDbAccessor &db_accessor,
|
||||
RecoveryData &recovery_data) {
|
||||
|
@ -70,20 +70,37 @@ bool Encode(const fs::path &snapshot_file, GraphDbAccessor &db_accessor_) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void MaintainMaxRetainedFiles(const fs::path &snapshot_dir,
|
||||
int snapshot_max_retained) {
|
||||
if (snapshot_max_retained == -1) return;
|
||||
// Removes snaposhot files so that only `max_retained` latest ones are kept. If
|
||||
// `max_retained == -1`, all the snapshots are retained.
|
||||
void RemoveOldSnapshots(const fs::path &snapshot_dir, int max_retained) {
|
||||
if (max_retained == -1) return;
|
||||
std::vector<fs::path> files;
|
||||
for (auto &file : fs::directory_iterator(snapshot_dir))
|
||||
files.push_back(file.path());
|
||||
if (static_cast<int>(files.size()) <= snapshot_max_retained) return;
|
||||
if (static_cast<int>(files.size()) <= max_retained) return;
|
||||
sort(files.begin(), files.end());
|
||||
for (size_t i = 0U; i < files.size() - snapshot_max_retained; ++i) {
|
||||
for (int i = 0; i < static_cast<int>(files.size()) - max_retained; ++i) {
|
||||
if (!fs::remove(files[i])) {
|
||||
LOG(ERROR) << "Error while removing file: " << files[i];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Removes write-ahead log files that are no longer necessary (they don't get
|
||||
// used when recovering from the latest snapshot.
|
||||
void RemoveOldWals(const fs::path &wal_dir,
|
||||
const tx::Transaction &snapshot_transaction) {
|
||||
if (!fs::exists(wal_dir)) return;
|
||||
// We can remove all the WAL files that will not be used when restoring from
|
||||
// the snapshot created in the given transaction.
|
||||
auto min_trans_id = snapshot_transaction.snapshot().empty()
|
||||
? snapshot_transaction.id_
|
||||
: snapshot_transaction.snapshot().front();
|
||||
for (auto &wal_file : fs::directory_iterator(wal_dir)) {
|
||||
auto tx_id = TransactionIdFromWalFilename(wal_file.path().filename());
|
||||
if (tx_id && tx_id.value() < min_trans_id) fs::remove(wal_file);
|
||||
}
|
||||
}
|
||||
} // annonnymous namespace
|
||||
|
||||
fs::path MakeSnapshotPath(const fs::path &durability_dir) {
|
||||
@ -107,8 +124,8 @@ bool MakeSnapshot(GraphDbAccessor &db_accessor_, const fs::path &durability_dir,
|
||||
const auto snapshot_file = MakeSnapshotPath(durability_dir);
|
||||
if (fs::exists(snapshot_file)) return false;
|
||||
if (Encode(snapshot_file, db_accessor_)) {
|
||||
MaintainMaxRetainedFiles(durability_dir / kSnapshotDir,
|
||||
snapshot_max_retained);
|
||||
RemoveOldSnapshots(durability_dir / kSnapshotDir, snapshot_max_retained);
|
||||
RemoveOldWals(durability_dir / kWalDir, db_accessor_.transaction());
|
||||
return true;
|
||||
} else {
|
||||
std::error_code error_code; // Just for exception suppression.
|
||||
|
@ -2,7 +2,6 @@
|
||||
|
||||
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
||||
#include "durability/paths.hpp"
|
||||
#include "utils/datetime/timestamp.hpp"
|
||||
#include "utils/flag_validation.hpp"
|
||||
|
||||
DEFINE_HIDDEN_int32(
|
||||
@ -261,20 +260,13 @@ WriteAheadLog::WalFile::~WalFile() {
|
||||
if (!current_wal_file_.empty()) writer_.Close();
|
||||
}
|
||||
|
||||
namespace {
|
||||
auto MakeFilePath(const std::experimental::filesystem::path &wal_dir,
|
||||
const std::string &suffix) {
|
||||
return wal_dir / (Timestamp::now().to_iso8601() + suffix);
|
||||
}
|
||||
}
|
||||
|
||||
void WriteAheadLog::WalFile::Init() {
|
||||
if (!std::experimental::filesystem::exists(wal_dir_) &&
|
||||
!std::experimental::filesystem::create_directories(wal_dir_)) {
|
||||
LOG(ERROR) << "Can't write to WAL directory: " << wal_dir_;
|
||||
current_wal_file_ = std::experimental::filesystem::path();
|
||||
} else {
|
||||
current_wal_file_ = MakeFilePath(wal_dir_, "__current");
|
||||
current_wal_file_ = WalFilenameForTransactionId(wal_dir_);
|
||||
try {
|
||||
writer_.Open(current_wal_file_);
|
||||
} catch (std::ios_base::failure &) {
|
||||
@ -317,9 +309,7 @@ void WriteAheadLog::WalFile::Flush(RingBuffer<Op> &buffer) {
|
||||
void WriteAheadLog::WalFile::RotateFile() {
|
||||
writer_.Close();
|
||||
std::experimental::filesystem::rename(
|
||||
current_wal_file_,
|
||||
MakeFilePath(wal_dir_,
|
||||
"__max_transaction_" + std::to_string(latest_tx_)));
|
||||
current_wal_file_, WalFilenameForTransactionId(wal_dir_, latest_tx_));
|
||||
Init();
|
||||
}
|
||||
|
||||
|
@ -636,6 +636,24 @@ TEST_F(Durability, SnapshotRetention) {
|
||||
};
|
||||
}
|
||||
|
||||
TEST_F(Durability, WalRetention) {
|
||||
FLAGS_wal_rotate_ops_count = 100;
|
||||
auto config = DbConfig();
|
||||
config.durability_enabled = true;
|
||||
GraphDb db{config};
|
||||
MakeDb(db, 100);
|
||||
MakeSnapshot(db);
|
||||
MakeDb(db, 100);
|
||||
EXPECT_EQ(DirFiles(kSnapshotDir).size(), 1);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
// 1 current WAL file, plus retained ones
|
||||
EXPECT_GT(DirFiles(kWalDir).size(), 1);
|
||||
MakeSnapshot(db);
|
||||
// only 1 current WAL file
|
||||
EXPECT_EQ(DirFiles(kSnapshotDir).size(), 2);
|
||||
EXPECT_EQ(DirFiles(kWalDir).size(), 1);
|
||||
}
|
||||
|
||||
TEST_F(Durability, SnapshotOnExit) {
|
||||
{
|
||||
auto config = DbConfig();
|
||||
|
Loading…
Reference in New Issue
Block a user