Handle durability versions on start-up

Reviewers: msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1539
This commit is contained in:
Ivan Paljak 2018-08-21 15:31:15 +02:00
parent da9dc10373
commit 0249a280f8
7 changed files with 192 additions and 24 deletions

View File

@ -203,17 +203,18 @@ SingleNode::SingleNode(Config config)
utils::CheckDir(impl_->config_.durability_directory);
// Durability recovery.
{
if (impl_->config_.db_recover_on_startup) {
CHECK(durability::VersionConsistency(impl_->config_.durability_directory))
<< "Contents of durability directory are not compatible with the "
"current version of Memgraph binary!";
// What we recover.
std::experimental::optional<durability::RecoveryInfo> recovery_info;
durability::RecoveryData recovery_data;
// Recover only if necessary.
if (impl_->config_.db_recover_on_startup) {
recovery_info = durability::RecoverOnlySnapshot(
impl_->config_.durability_directory, this, &recovery_data,
std::experimental::nullopt, 0);
}
recovery_info = durability::RecoverOnlySnapshot(
impl_->config_.durability_directory, this, &recovery_data,
std::experimental::nullopt, 0);
// Post-recovery setup and checking.
if (recovery_info) {
@ -226,6 +227,17 @@ SingleNode::SingleNode(Config config)
}
if (impl_->config_.durability_enabled) {
// move any existing snapshots or wal files to a deprecated folder.
if (!impl_->config_.db_recover_on_startup &&
durability::ContainsDurabilityFiles(
impl_->config_.durability_directory)) {
durability::MoveToBackup(impl_->config_.durability_directory);
LOG(WARNING) << "Since Memgraph was not supposed to recover on startup "
"and durability is enabled, your current durability "
"files will likely be overriden. To prevent important "
"data loss, Memgraph has stored those files into a "
".backup directory inside durability directory";
}
impl_->wal_.Enable();
snapshot_creator_ = std::make_unique<utils::Scheduler>();
snapshot_creator_->Run(
@ -340,8 +352,8 @@ GraphDbAccessor *GetAccessor(
std::unique_ptr<GraphDbAccessor>> &accessors,
const tx::TransactionId &tx_id) {
auto found = accessors.find(tx_id);
CHECK(found != accessors.end())
<< "Accessor does not exist for transaction: " << tx_id;
CHECK(found != accessors.end()) << "Accessor does not exist for transaction: "
<< tx_id;
return found->second.get();
}

View File

@ -8,6 +8,7 @@
namespace durability {
const std::string kSnapshotDir = "snapshots";
const std::string kWalDir = "wal";
const std::string kBackupDir = ".backup";
/// Returns the transaction id contained in the file name. If the filename is
/// not a parseable WAL file name, nullopt is returned. If the filename

View File

@ -17,11 +17,13 @@
#include "storage/address_types.hpp"
#include "transactions/type.hpp"
#include "utils/algorithm.hpp"
#include "utils/file.hpp"
namespace fs = std::experimental::filesystem;
namespace durability {
using communication::bolt::Value;
bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
int64_t &edge_count, uint64_t &hash) {
auto pos = buffer.Tellg();
@ -34,6 +36,68 @@ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
return r_val;
}
bool VersionConsistency(const fs::path &durability_dir) {
const auto snapshot_dir = durability_dir / kSnapshotDir;
if (fs::exists(snapshot_dir) && fs::is_directory(snapshot_dir)) {
for (auto &file : fs::directory_iterator(snapshot_dir)) {
HashedFileReader reader;
SnapshotDecoder<HashedFileReader> decoder(reader);
// This is ok because we are only trying to detect version
// inconsistencies.
if (!reader.Open(fs::path(file))) continue;
auto magic_number = durability::kMagicNumber;
if (!reader.Read(magic_number.data(), magic_number.size())) continue;
Value dv;
if (!decoder.ReadValue(&dv, Value::Type::Int) ||
dv.ValueInt() != durability::kVersion)
return false;
}
}
const auto wal_dir = durability_dir / kWalDir;
if (fs::exists(snapshot_dir) && fs::is_directory(wal_dir)) {
for (auto &file : fs::directory_iterator(wal_dir)) {
HashedFileReader reader;
communication::bolt::Decoder<HashedFileReader> decoder(reader);
if (!reader.Open(fs::path(file))) continue;
Value dv;
if (!decoder.ReadValue(&dv, Value::Type::Int) ||
dv.ValueInt() != durability::kVersion)
return false;
}
}
return true;
}
bool ContainsDurabilityFiles(const fs::path &durability_dir) {
for (auto &durability_type : {kSnapshotDir, kWalDir}) {
const auto dtype_dir = durability_dir / durability_type;
if (fs::exists(dtype_dir) && fs::is_directory(dtype_dir) &&
!fs::is_empty(dtype_dir))
return true;
}
return false;
}
void MoveToBackup(const fs::path &durability_dir) {
const auto backup_dir = durability_dir / kBackupDir;
utils::CheckDir(backup_dir);
utils::CheckDir(backup_dir / kSnapshotDir);
utils::CheckDir(backup_dir / kWalDir);
for (auto &durability_type : {kSnapshotDir, kWalDir}) {
const auto dtype_dir = durability_dir / durability_type;
if (!fs::exists(dtype_dir) || !fs::is_directory(dtype_dir)) continue;
for (auto &file : fs::directory_iterator(dtype_dir)) {
const auto filename = fs::path(file).filename();
fs::rename(file, backup_dir / durability_type / filename);
}
}
}
namespace {
using communication::bolt::Value;
@ -62,7 +126,6 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
durability::ReadSnapshotSummary(reader, vertex_count, edge_count, hash));
Value dv;
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int) &&
dv.ValueInt() == durability::kVersion);
@ -260,12 +323,10 @@ bool ApplyOverDeltas(
communication::bolt::Decoder<HashedFileReader> decoder(wal_reader);
// check version
Value dv;
if (!decoder.ReadValue(&dv, Value::Type::Int) ||
dv.ValueInt() != durability::kVersion) {
dv.ValueInt() != durability::kVersion)
return false;
}
while (true) {
auto delta = database::StateDelta::Decode(wal_reader, decoder);

View File

@ -107,6 +107,34 @@ struct RecoveryData {
bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
int64_t &edge_count, uint64_t &hash);
/**
* Checks version consistency within the durability directory.
*
* @param durability_dir - Path to durability directory.
* @return - True if snapshot and WAL versions are compatible with
* ` current memgraph binary.
*/
bool VersionConsistency(
const std::experimental::filesystem::path &durability_dir);
/**
* Checks whether the durability directory contains snapshot
* or write-ahead log file.
*
* @param durability_dir - Path to durability directory.
* @return - True if durability directory contains either a snapshot
* or WAL file.
*/
bool ContainsDurabilityFiles(
const std::experimental::filesystem::path &durabilty_dir);
/**
* Backup snapshots and WAL files to a backup folder.
*
* @param durability_dir - Path to durability directory.
*/
void MoveToBackup(const std::experimental::filesystem::path &durability_dir);
/**
* Recovers database from the latest possible snapshot. If recovering fails,
* false is returned and db_accessor aborts transaction, else true is returned

View File

@ -26,7 +26,6 @@ WriteAheadLog::WriteAheadLog(
: deltas_{FLAGS_wal_buffer_size}, wal_file_{worker_id, durability_dir} {
if (durability_enabled) {
utils::CheckDir(durability_dir);
wal_file_.Init();
scheduler_.Run("WAL",
std::chrono::milliseconds(FLAGS_wal_flush_interval_millis),
[this]() { wal_file_.Flush(deltas_); });
@ -44,7 +43,9 @@ WriteAheadLog::WalFile::WalFile(
: worker_id_(worker_id), wal_dir_{durability_dir / kWalDir} {}
WriteAheadLog::WalFile::~WalFile() {
if (!current_wal_file_.empty()) writer_.Close();
if (current_wal_file_ != std::experimental::nullopt &&
!current_wal_file_->empty())
writer_.Close();
}
void WriteAheadLog::WalFile::Init() {
@ -54,11 +55,11 @@ void WriteAheadLog::WalFile::Init() {
} else {
current_wal_file_ = WalFilenameForTransactionId(wal_dir_, worker_id_);
try {
writer_.Open(current_wal_file_);
writer_.Open(*current_wal_file_);
encoder_.WriteInt(durability::kVersion);
} catch (std::ios_base::failure &) {
LOG(ERROR) << "Failed to open write-ahead log file: "
<< current_wal_file_;
<< *current_wal_file_;
current_wal_file_ = std::experimental::filesystem::path();
}
}
@ -68,7 +69,8 @@ void WriteAheadLog::WalFile::Init() {
void WriteAheadLog::WalFile::Flush(RingBuffer<database::StateDelta> &buffer) {
std::lock_guard<std::mutex> flush_lock(flush_mutex_);
if (current_wal_file_.empty()) {
if (current_wal_file_ == std::experimental::nullopt) Init();
if (current_wal_file_->empty()) {
LOG(ERROR) << "Write-ahead log file uninitialized, discarding data.";
buffer.clear();
return;
@ -98,7 +100,7 @@ void WriteAheadLog::WalFile::Flush(RingBuffer<database::StateDelta> &buffer) {
void WriteAheadLog::WalFile::RotateFile() {
writer_.Close();
std::experimental::filesystem::rename(
current_wal_file_,
*current_wal_file_,
WalFilenameForTransactionId(wal_dir_, worker_id_, latest_tx_));
Init();
}

View File

@ -71,7 +71,8 @@ class WriteAheadLog {
// The file to which the WAL flushes data. The path is fixed, the file gets
// moved when the WAL gets rotated.
std::experimental::filesystem::path current_wal_file_;
std::experimental::optional<std::experimental::filesystem::path>
current_wal_file_;
// Number of deltas in the current wal file.
int current_wal_file_delta_count_{0};

View File

@ -288,6 +288,7 @@ class Durability : public ::testing::Test {
fs::path durability_dir_;
fs::path snapshot_dir_;
fs::path wal_dir_;
fs::path backup_dir_;
void CleanDurability() {
if (fs::exists(tmp_dir_)) fs::remove_all(tmp_dir_);
@ -303,6 +304,16 @@ class Durability : public ::testing::Test {
return config;
}
auto DbConfig(bool durability_enabled, bool db_recover_on_startup) {
database::Config config;
config.durability_enabled = durability_enabled;
config.db_recover_on_startup = db_recover_on_startup;
config.snapshot_on_exit = false;
config.durability_directory = durability_dir_;
return config;
}
void MakeSnapshot(int worker_id, database::GraphDb &db,
int snapshot_max_retained = -1) {
auto dba = db.Access();
@ -314,6 +325,7 @@ class Durability : public ::testing::Test {
durability_dir_ = tmp_dir_ / utils::RandomString(24);
snapshot_dir_ = durability_dir_ / durability::kSnapshotDir;
wal_dir_ = durability_dir_ / durability::kWalDir;
backup_dir_ = durability_dir_ / durability::kBackupDir;
FLAGS_wal_rotate_deltas_count = 1000;
FLAGS_durability_directory = "MG_test_unit_durability";
CleanDurability();
@ -837,9 +849,8 @@ TEST_F(Durability, SequentialRecovery) {
return threads;
};
auto make_updates = [&run_updates, this](database::SingleNode &db,
bool snapshot_during,
bool snapshot_after) {
auto make_updates = [&run_updates, this](
database::SingleNode &db, bool snapshot_during, bool snapshot_after) {
std::atomic<bool> keep_running{true};
auto update_theads = run_updates(db, keep_running);
std::this_thread::sleep_for(25ms);
@ -881,3 +892,55 @@ TEST_F(Durability, SequentialRecovery) {
}
}
}
TEST_F(Durability, ContainsDurabilityFilesSnapshot) {
ASSERT_FALSE(durability::ContainsDurabilityFiles(durability_dir_));
{
database::SingleNode db{DbConfig()};
auto dba = db.Access();
auto v0 = dba->InsertVertex();
MakeSnapshot(0, db);
}
ASSERT_TRUE(durability::ContainsDurabilityFiles(durability_dir_));
}
TEST_F(Durability, ContainsDurabilityFilesWal) {
ASSERT_FALSE(durability::ContainsDurabilityFiles(durability_dir_));
{
database::SingleNode db{DbConfig(true, false)};
auto dba = db.Access();
auto v0 = dba->InsertVertex();
dba->Commit();
db.wal().Flush();
}
ASSERT_TRUE(durability::ContainsDurabilityFiles(durability_dir_));
}
TEST_F(Durability, MoveToBackupSnapshot) {
ASSERT_FALSE(durability::ContainsDurabilityFiles(backup_dir_));
{
database::SingleNode db{DbConfig()};
auto dba = db.Access();
auto v0 = dba->InsertVertex();
MakeSnapshot(0, db);
}
// durability-enabled=true, db-recover-on-startup=false
database::SingleNode db{DbConfig(true, false)};
ASSERT_TRUE(durability::ContainsDurabilityFiles(backup_dir_));
}
TEST_F(Durability, MoveToBackupWal) {
ASSERT_FALSE(durability::ContainsDurabilityFiles(backup_dir_));
{
database::SingleNode db{DbConfig(true, false)};
auto dba = db.Access();
auto v0 = dba->InsertVertex();
dba->Commit();
db.wal().Flush();
}
// durability-enabled=true, db-recover-on-startup=false
database::SingleNode db{DbConfig(true, false)};
ASSERT_TRUE(durability::ContainsDurabilityFiles(backup_dir_));
}