diff --git a/src/durability/paths.hpp b/src/durability/paths.hpp index f79177e34..f1cd8fa29 100644 --- a/src/durability/paths.hpp +++ b/src/durability/paths.hpp @@ -1,10 +1,15 @@ #pragma once #include +#include #include #include "glog/logging.h" +#include "transactions/type.hpp" +#include "utils/datetime/timestamp.hpp" +#include "utils/string.hpp" + namespace durability { const std::string kSnapshotDir = "snapshots"; const std::string kWalDir = "wal"; @@ -23,4 +28,55 @@ inline void CheckDurabilityDir(const std::string &durability_dir) { << durability_dir << "'."; } } + +/// Returns the transaction id contained in the file name. If the filename is +/// not a parseable WAL file name, nullopt is returned. If the filename +/// represents the "current" WAL file, then the maximum possible transaction ID +/// is returned because that's appropriate for the recovery logic (the current +/// WAL does not yet have a maximum transaction ID and can't be discarded by +/// the recovery regardless of the snapshot from which the transaction starts). 
+inline std::experimental::optional +TransactionIdFromWalFilename(const std::string &name) { + auto nullopt = std::experimental::nullopt; + // Get the max_transaction_id from the file name that has format + // "XXXXX__max_transaction_" + auto file_name_split = utils::RSplit(name, "__", 1); + if (file_name_split.size() != 2) { + LOG(WARNING) << "Unable to parse WAL file name: " << name; + return nullopt; + } + if (file_name_split[1] == "current") + return std::numeric_limits::max(); + file_name_split = utils::RSplit(file_name_split[1], "_", 1); + if (file_name_split.size() != 2) { + LOG(WARNING) << "Unable to parse WAL file name: " << name; + return nullopt; + } + auto &tx_id_str = file_name_split[1]; + try { + return std::stoll(tx_id_str); + } catch (std::invalid_argument &) { + LOG(WARNING) << "Unable to parse WAL file name tx ID: " << tx_id_str; + return nullopt; + } catch (std::out_of_range &) { + LOG(WARNING) << "WAL file name tx ID too large: " << tx_id_str; + return nullopt; + } +} + +/// Generates a file path for a write-ahead log file. If given a transaction ID +/// the file name will contain it. Otherwise the file path is for the "current" +/// WAL file for which the max tx id is still unknown. +inline auto WalFilenameForTransactionId( + const std::experimental::filesystem::path &wal_dir, + std::experimental::optional tx_id = + std::experimental::nullopt) { + auto file_name = Timestamp::now().to_iso8601(); + if (tx_id) { + file_name += "__max_transaction_" + std::to_string(*tx_id); + } else { + file_name += "__current"; + } + return wal_dir / file_name; +} } diff --git a/src/durability/recovery.cpp b/src/durability/recovery.cpp index d35491787..04bf6541a 100644 --- a/src/durability/recovery.cpp +++ b/src/durability/recovery.cpp @@ -200,28 +200,6 @@ void ApplyOp(const WriteAheadLog::Op &op, GraphDbAccessor &dba) { } } -// Returns the transaction id contained in the file name. If the filename is not -// a parseable WAL file name, nullopt is returned. 
If the filename represents -// the "current" WAL file, then the maximum possible transaction ID is returned. -std::experimental::optional TransactionIdFromWalFilename( - const std::string &name) { - // Get the max_transaction_id from the file name that has format - // "XXXXX__max_transaction_" - auto file_name_split = utils::RSplit(name, "__", 1); - if (file_name_split.size() != 2) { - LOG(WARNING) << "Unable to parse WAL file name: " << name; - return std::experimental::nullopt; - } - if (file_name_split[1] == "current") - return std::numeric_limits::max(); - file_name_split = utils::RSplit(file_name_split[1], "_", 1); - if (file_name_split.size() != 2) { - LOG(WARNING) << "Unable to parse WAL file name: " << name; - return std::experimental::nullopt; - } - return std::stoi(file_name_split[1]); -} - // TODO - finer-grained recovery feedback could be useful here. bool RecoverWal(const fs::path &wal_dir, GraphDbAccessor &db_accessor, RecoveryData &recovery_data) { diff --git a/src/durability/snapshooter.cpp b/src/durability/snapshooter.cpp index a1da2de41..2d6f5863a 100644 --- a/src/durability/snapshooter.cpp +++ b/src/durability/snapshooter.cpp @@ -70,20 +70,37 @@ bool Encode(const fs::path &snapshot_file, GraphDbAccessor &db_accessor_) { return true; } -void MaintainMaxRetainedFiles(const fs::path &snapshot_dir, - int snapshot_max_retained) { - if (snapshot_max_retained == -1) return; +// Removes snapshot files so that only `max_retained` latest ones are kept. If +// `max_retained == -1`, all the snapshots are retained. 
+void RemoveOldSnapshots(const fs::path &snapshot_dir, int max_retained) { + if (max_retained == -1) return; std::vector files; for (auto &file : fs::directory_iterator(snapshot_dir)) files.push_back(file.path()); - if (static_cast(files.size()) <= snapshot_max_retained) return; + if (static_cast(files.size()) <= max_retained) return; sort(files.begin(), files.end()); - for (size_t i = 0U; i < files.size() - snapshot_max_retained; ++i) { + for (int i = 0; i < static_cast(files.size()) - max_retained; ++i) { if (!fs::remove(files[i])) { LOG(ERROR) << "Error while removing file: " << files[i]; } } } + +// Removes write-ahead log files that are no longer necessary (they don't get +// used when recovering from the latest snapshot). +void RemoveOldWals(const fs::path &wal_dir, + const tx::Transaction &snapshot_transaction) { + if (!fs::exists(wal_dir)) return; + // We can remove all the WAL files that will not be used when restoring from + // the snapshot created in the given transaction. + auto min_trans_id = snapshot_transaction.snapshot().empty() + ? snapshot_transaction.id_ + : snapshot_transaction.snapshot().front(); + for (auto &wal_file : fs::directory_iterator(wal_dir)) { + auto tx_id = TransactionIdFromWalFilename(wal_file.path().filename()); + if (tx_id && tx_id.value() < min_trans_id) fs::remove(wal_file); + } +} } // annonnymous namespace fs::path MakeSnapshotPath(const fs::path &durability_dir) { @@ -107,8 +124,8 @@ bool MakeSnapshot(GraphDbAccessor &db_accessor_, const fs::path &durability_dir, const auto snapshot_file = MakeSnapshotPath(durability_dir); if (fs::exists(snapshot_file)) return false; if (Encode(snapshot_file, db_accessor_)) { - MaintainMaxRetainedFiles(durability_dir / kSnapshotDir, - snapshot_max_retained); + RemoveOldSnapshots(durability_dir / kSnapshotDir, snapshot_max_retained); + RemoveOldWals(durability_dir / kWalDir, db_accessor_.transaction()); return true; } else { std::error_code error_code; // Just for exception suppression. 
diff --git a/src/durability/wal.cpp b/src/durability/wal.cpp index 2c63bada8..cdb09b551 100644 --- a/src/durability/wal.cpp +++ b/src/durability/wal.cpp @@ -2,7 +2,6 @@ #include "communication/bolt/v1/decoder/decoded_value.hpp" #include "durability/paths.hpp" -#include "utils/datetime/timestamp.hpp" #include "utils/flag_validation.hpp" DEFINE_HIDDEN_int32( @@ -261,20 +260,13 @@ WriteAheadLog::WalFile::~WalFile() { if (!current_wal_file_.empty()) writer_.Close(); } -namespace { -auto MakeFilePath(const std::experimental::filesystem::path &wal_dir, - const std::string &suffix) { - return wal_dir / (Timestamp::now().to_iso8601() + suffix); -} -} - void WriteAheadLog::WalFile::Init() { if (!std::experimental::filesystem::exists(wal_dir_) && !std::experimental::filesystem::create_directories(wal_dir_)) { LOG(ERROR) << "Can't write to WAL directory: " << wal_dir_; current_wal_file_ = std::experimental::filesystem::path(); } else { - current_wal_file_ = MakeFilePath(wal_dir_, "__current"); + current_wal_file_ = WalFilenameForTransactionId(wal_dir_); try { writer_.Open(current_wal_file_); } catch (std::ios_base::failure &) { @@ -317,9 +309,7 @@ void WriteAheadLog::WalFile::Flush(RingBuffer &buffer) { void WriteAheadLog::WalFile::RotateFile() { writer_.Close(); std::experimental::filesystem::rename( - current_wal_file_, - MakeFilePath(wal_dir_, - "__max_transaction_" + std::to_string(latest_tx_))); + current_wal_file_, WalFilenameForTransactionId(wal_dir_, latest_tx_)); Init(); } diff --git a/tests/unit/durability.cpp b/tests/unit/durability.cpp index d8c033794..dc254c197 100644 --- a/tests/unit/durability.cpp +++ b/tests/unit/durability.cpp @@ -636,6 +636,24 @@ TEST_F(Durability, SnapshotRetention) { }; } +TEST_F(Durability, WalRetention) { + FLAGS_wal_rotate_ops_count = 100; + auto config = DbConfig(); + config.durability_enabled = true; + GraphDb db{config}; + MakeDb(db, 100); + MakeSnapshot(db); + MakeDb(db, 100); + EXPECT_EQ(DirFiles(kSnapshotDir).size(), 1); + 
std::this_thread::sleep_for(std::chrono::milliseconds(50)); + // 1 current WAL file, plus retained ones + EXPECT_GT(DirFiles(kWalDir).size(), 1); + MakeSnapshot(db); + // only 1 current WAL file + EXPECT_EQ(DirFiles(kSnapshotDir).size(), 2); + EXPECT_EQ(DirFiles(kWalDir).size(), 1); +} + TEST_F(Durability, SnapshotOnExit) { { auto config = DbConfig();