Merge durability and storage

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2803
Matej Ferencevic 2020-07-24 18:26:36 +02:00
parent 4f9e9aeafe
commit 1513a455de
6 changed files with 406 additions and 447 deletions
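
In short: the standalone storage::durability::Durability class is dissolved.
Its state (snapshot/WAL directories, lock file, UUID, WAL sequence number,
the open WalFile) moves into Storage itself, and the recovery and snapshot
logic becomes free functions in the storage::durability namespace. A
before/after sketch of the ownership change, condensed from the diff below
(member lists abbreviated):

// Before: Storage delegated to a Durability object holding back-pointers
// into the storage internals.
class Storage {
  durability::Durability durability_;  // owned WAL file, UUID, lock file, ...
};

// After: Storage owns the durability state directly and calls free functions
// such as durability::RecoverData() and durability::CreateSnapshot().
class Storage {
  std::filesystem::path snapshot_directory_;
  std::filesystem::path wal_directory_;
  utils::OutputFile lock_file_handle_;
  std::string uuid_;             // links snapshots to WALs
  uint64_t wal_seq_num_{0};      // tracks the chain of WALs
  std::optional<durability::WalFile> wal_file_;
};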

View File

@@ -10,20 +10,15 @@
#include <algorithm>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "storage/v2/durability/paths.hpp"
#include "utils/uuid.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/wal.hpp"
namespace storage::durability {
namespace {
/// Verifies that the owner of the storage directory is the same user that
/// started the current process.
void VerifyStorageDirectoryOwnerAndProcessUserOrDie(
const std::filesystem::path &storage_directory) {
// Get the process user ID.
@@ -55,323 +50,46 @@ void VerifyStorageDirectoryOwnerAndProcessUserOrDie(
<< ". Please start the process as user " << user_directory << "!";
}
} // namespace
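
The body of the check is elided by the hunk above. For orientation, a hedged
sketch of how such a verification can look on POSIX (an illustration, not
the elided Memgraph implementation):

#include <sys/stat.h>
#include <unistd.h>
#include <string>

// Illustrative only: true when the storage directory is owned by the same
// user ID that the current process runs as.
bool StorageOwnerMatchesProcessUser(const std::string &storage_directory) {
  struct stat info;
  if (stat(storage_directory.c_str(), &info) != 0) return false;
  return info.st_uid == getuid();
}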
Durability::Durability(Config::Durability config,
utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges,
NameIdMapper *name_id_mapper,
std::atomic<uint64_t> *edge_count, Indices *indices,
Constraints *constraints, Config::Items items)
: config_(config),
vertices_(vertices),
edges_(edges),
name_id_mapper_(name_id_mapper),
edge_count_(edge_count),
indices_(indices),
constraints_(constraints),
items_(items),
storage_directory_(config_.storage_directory),
snapshot_directory_(config_.storage_directory / kSnapshotDirectory),
wal_directory_(config_.storage_directory / kWalDirectory),
lock_file_path_(config_.storage_directory / ".lock"),
uuid_(utils::GenerateUUID()) {}
std::optional<RecoveryInfo> Durability::Initialize(
std::function<void(std::function<void(Transaction *)>)>
execute_with_transaction) {
execute_with_transaction_ = execute_with_transaction;
if (config_.snapshot_wal_mode !=
Config::Durability::SnapshotWalMode::DISABLED ||
config_.snapshot_on_exit || config_.recover_on_startup) {
// Create the directory up front so that permission errors crash the
// database at startup instead of surfacing for the first time at some
// point during runtime (which could be an unpleasant surprise).
utils::EnsureDirOrDie(snapshot_directory_);
// Same reasoning as above.
utils::EnsureDirOrDie(wal_directory_);
// Verify that the user that started the process is the same user that is
// the owner of the storage directory.
VerifyStorageDirectoryOwnerAndProcessUserOrDie(storage_directory_);
// Create the lock file and open a handle to it. This will crash the
// database if it can't open the file for writing or if any other process
// is holding the file open.
lock_file_handle_.Open(lock_file_path_,
utils::OutputFile::Mode::OVERWRITE_EXISTING);
CHECK(lock_file_handle_.AcquireLock())
<< "Couldn't acquire lock on the storage directory "
<< storage_directory_
<< "!\nAnother Memgraph process is currently running with the same "
"storage directory, please stop it before starting this "
"process!";
}
std::optional<RecoveryInfo> ret;
if (config_.recover_on_startup) {
ret = RecoverData();
} else if (config_.snapshot_wal_mode !=
Config::Durability::SnapshotWalMode::DISABLED ||
config_.snapshot_on_exit) {
bool files_moved = false;
auto backup_root = config_.storage_directory / kBackupDirectory;
for (const auto &[path, dirname, what] :
{std::make_tuple(snapshot_directory_, kSnapshotDirectory, "snapshot"),
std::make_tuple(wal_directory_, kWalDirectory, "WAL")}) {
if (!utils::DirExists(path)) continue;
auto backup_curr = backup_root / dirname;
std::error_code error_code;
for (const auto &item :
std::filesystem::directory_iterator(path, error_code)) {
utils::EnsureDirOrDie(backup_root);
utils::EnsureDirOrDie(backup_curr);
std::error_code item_error_code;
std::filesystem::rename(
item.path(), backup_curr / item.path().filename(), item_error_code);
CHECK(!item_error_code)
<< "Couldn't move " << what << " file " << item.path()
<< " because of: " << item_error_code.message();
files_moved = true;
}
CHECK(!error_code) << "Couldn't backup " << what
<< " files because of: " << error_code.message();
}
LOG_IF(WARNING, files_moved)
<< "Since Memgraph wasn't supposed to recover on startup but "
"durability is enabled, your current durability files would likely "
"be overwritten. To prevent data loss, Memgraph has moved those "
"files into a .backup directory inside the storage directory.";
}
if (config_.snapshot_wal_mode !=
Config::Durability::SnapshotWalMode::DISABLED) {
snapshot_runner_.Run("Snapshot", config_.snapshot_interval, [this] {
execute_with_transaction_([this](Transaction *transaction) {
CreateSnapshot(transaction, snapshot_directory_, wal_directory_,
config_.snapshot_retention_count, vertices_, edges_,
name_id_mapper_, indices_, constraints_, items_, uuid_);
});
});
}
return ret;
}
void Durability::Finalize() {
wal_file_ = std::nullopt;
if (config_.snapshot_wal_mode !=
Config::Durability::SnapshotWalMode::DISABLED) {
snapshot_runner_.Stop();
}
if (config_.snapshot_on_exit) {
execute_with_transaction_([this](Transaction *transaction) {
CreateSnapshot(transaction, snapshot_directory_, wal_directory_,
config_.snapshot_retention_count, vertices_, edges_,
name_id_mapper_, indices_, constraints_, items_, uuid_);
});
}
}
void Durability::AppendToWal(const Transaction &transaction,
uint64_t final_commit_timestamp) {
if (!InitializeWalFile()) return;
// Traverse deltas and append them to the WAL file.
// A single transaction will always be contained in a single WAL file.
auto current_commit_timestamp =
transaction.commit_timestamp->load(std::memory_order_acquire);
// Helper lambda that traverses the delta chain in order to find the first
// delta that should be processed and then appends all discovered deltas.
auto find_and_apply_deltas = [&](const auto *delta, const auto &parent,
auto filter) {
while (true) {
auto older = delta->next.load(std::memory_order_acquire);
if (older == nullptr ||
older->timestamp->load(std::memory_order_acquire) !=
current_commit_timestamp)
break;
delta = older;
}
while (true) {
if (filter(delta->action)) {
wal_file_->AppendDelta(*delta, parent, final_commit_timestamp);
}
auto prev = delta->prev.Get();
if (prev.type != PreviousPtr::Type::DELTA) break;
delta = prev.delta;
}
};
// The deltas are ordered correctly in the `transaction.deltas` buffer, but we
// don't traverse them in that order. That is because for each delta we need
// information about the vertex or edge they belong to and that information
// isn't stored in the deltas themselves. In order to find out information
// about the corresponding vertex or edge it is necessary to traverse the
// delta chain for each delta until a vertex or edge is encountered. This
// operation is very expensive as the chain grows.
// Instead, we traverse the deltas until we find one that points directly at
// its vertex or edge and then process that object's whole delta chain. The
// drawback of this approach is that we lose the correct order of the
// operations, so we have to traverse the deltas several times and manually
// ensure that the stored deltas end up ordered correctly.
// 1. Process all Vertex deltas and store all operations that create vertices
// and modify vertex data.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
if (prev.type != PreviousPtr::Type::VERTEX) continue;
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
switch (action) {
case Delta::Action::DELETE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
return true;
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
return false;
}
});
}
// 2. Process all Vertex deltas and store all operations that create edges.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
if (prev.type != PreviousPtr::Type::VERTEX) continue;
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
switch (action) {
case Delta::Action::REMOVE_OUT_EDGE:
return true;
case Delta::Action::DELETE_OBJECT:
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
return false;
}
});
}
// 3. Process all Edge deltas and store all operations that modify edge data.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
if (prev.type != PreviousPtr::Type::EDGE) continue;
find_and_apply_deltas(&delta, *prev.edge, [](auto action) {
switch (action) {
case Delta::Action::SET_PROPERTY:
return true;
case Delta::Action::DELETE_OBJECT:
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
return false;
}
});
}
// 4. Process all Vertex deltas and store all operations that delete edges.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
if (prev.type != PreviousPtr::Type::VERTEX) continue;
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
switch (action) {
case Delta::Action::ADD_OUT_EDGE:
return true;
case Delta::Action::DELETE_OBJECT:
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
return false;
}
});
}
// 5. Process all Vertex deltas and store all operations that delete vertices.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
if (prev.type != PreviousPtr::Type::VERTEX) continue;
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
switch (action) {
case Delta::Action::RECREATE_OBJECT:
return true;
case Delta::Action::DELETE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
return false;
}
});
}
// Add an end marker that indicates that the transaction is fully written to
// the WAL file.
wal_file_->AppendTransactionEnd(final_commit_timestamp);
FinalizeWalFile();
}
void Durability::AppendToWal(StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties,
uint64_t final_commit_timestamp) {
if (!InitializeWalFile()) return;
wal_file_->AppendOperation(operation, label, properties,
final_commit_timestamp);
FinalizeWalFile();
}
std::optional<RecoveryInfo> Durability::RecoverData() {
if (!utils::DirExists(snapshot_directory_) &&
!utils::DirExists(wal_directory_))
std::optional<RecoveryInfo> RecoverData(
const std::filesystem::path &snapshot_directory,
const std::filesystem::path &wal_directory, std::string *uuid,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges,
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints, Config::Items items,
uint64_t *wal_seq_num) {
if (!utils::DirExists(snapshot_directory) && !utils::DirExists(wal_directory))
return std::nullopt;
// Helper lambda used to recover all discovered indices and constraints. They
// must be recovered after the data recovery is done so that the indices and
// constraints are consistent at the end of the recovery process.
auto recover_indices_and_constraints = [this](
const auto &indices_constraints) {
auto recover_indices_and_constraints = [&](const auto &indices_constraints) {
// Recover label indices.
for (const auto &item : indices_constraints.indices.label) {
if (!indices_->label_index.CreateIndex(item, vertices_->access()))
if (!indices->label_index.CreateIndex(item, vertices->access()))
throw RecoveryFailure("The label index must be created here!");
}
// Recover label+property indices.
for (const auto &item : indices_constraints.indices.label_property) {
if (!indices_->label_property_index.CreateIndex(item.first, item.second,
vertices_->access()))
if (!indices->label_property_index.CreateIndex(item.first, item.second,
vertices->access()))
throw RecoveryFailure("The label+property index must be created here!");
}
// Recover existence constraints.
for (const auto &item : indices_constraints.constraints.existence) {
auto ret = CreateExistenceConstraint(constraints_, item.first,
item.second, vertices_->access());
auto ret = CreateExistenceConstraint(constraints, item.first, item.second,
vertices->access());
if (ret.HasError() || !ret.GetValue())
throw RecoveryFailure("The existence constraint must be created here!");
}
// Recover unique constraints.
for (const auto &item : indices_constraints.constraints.unique) {
auto ret = constraints_->unique_constraints.CreateConstraint(
item.first, item.second, vertices_->access());
auto ret = constraints->unique_constraints.CreateConstraint(
item.first, item.second, vertices->access());
if (ret.HasError() ||
ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS)
throw RecoveryFailure("The unique constraint must be created here!");
@@ -381,9 +99,9 @@ std::optional<RecoveryInfo> Durability::RecoverData() {
// Array of all discovered snapshots, ordered by name.
std::vector<std::pair<std::filesystem::path, std::string>> snapshot_files;
std::error_code error_code;
if (utils::DirExists(snapshot_directory_)) {
if (utils::DirExists(snapshot_directory)) {
for (const auto &item :
std::filesystem::directory_iterator(snapshot_directory_, error_code)) {
std::filesystem::directory_iterator(snapshot_directory, error_code)) {
if (!item.is_regular_file()) continue;
try {
auto info = ReadSnapshotInfo(item.path());
@@ -402,19 +120,19 @@ std::optional<RecoveryInfo> Durability::RecoverData() {
if (!snapshot_files.empty()) {
std::sort(snapshot_files.begin(), snapshot_files.end());
// UUID used for durability is the UUID of the last snapshot file.
uuid_ = snapshot_files.back().second;
*uuid = snapshot_files.back().second;
std::optional<RecoveredSnapshot> recovered_snapshot;
for (auto it = snapshot_files.rbegin(); it != snapshot_files.rend(); ++it) {
const auto &[path, uuid] = *it;
if (uuid != uuid_) {
const auto &[path, file_uuid] = *it;
if (file_uuid != *uuid) {
LOG(WARNING) << "The snapshot file " << path
<< " isn't related to the latest snapshot file!";
continue;
}
LOG(INFO) << "Starting snapshot recovery from " << path;
try {
recovered_snapshot = LoadSnapshot(path, vertices_, edges_,
name_id_mapper_, edge_count_, items_);
recovered_snapshot = LoadSnapshot(path, vertices, edges, name_id_mapper,
edge_count, items);
LOG(INFO) << "Snapshot recovery successful!";
break;
} catch (const RecoveryFailure &e) {
@@ -430,16 +148,16 @@ std::optional<RecoveryInfo> Durability::RecoverData() {
recovery_info = recovered_snapshot->recovery_info;
indices_constraints = std::move(recovered_snapshot->indices_constraints);
snapshot_timestamp = recovered_snapshot->snapshot_info.start_timestamp;
if (!utils::DirExists(wal_directory_)) {
if (!utils::DirExists(wal_directory)) {
recover_indices_and_constraints(indices_constraints);
return recovered_snapshot->recovery_info;
}
} else {
if (!utils::DirExists(wal_directory_)) return std::nullopt;
if (!utils::DirExists(wal_directory)) return std::nullopt;
// Array of all discovered WAL files, ordered by name.
std::vector<std::pair<std::filesystem::path, std::string>> wal_files;
for (const auto &item :
std::filesystem::directory_iterator(wal_directory_, error_code)) {
std::filesystem::directory_iterator(wal_directory, error_code)) {
if (!item.is_regular_file()) continue;
try {
auto info = ReadWalInfo(item.path());
@@ -453,18 +171,18 @@ std::optional<RecoveryInfo> Durability::RecoverData() {
if (wal_files.empty()) return std::nullopt;
std::sort(wal_files.begin(), wal_files.end());
// UUID used for durability is the UUID of the last WAL file.
uuid_ = wal_files.back().second;
*uuid = wal_files.back().second;
}
// Array of all discovered WAL files, ordered by sequence number.
std::vector<std::tuple<uint64_t, uint64_t, uint64_t, std::filesystem::path>>
wal_files;
for (const auto &item :
std::filesystem::directory_iterator(wal_directory_, error_code)) {
std::filesystem::directory_iterator(wal_directory, error_code)) {
if (!item.is_regular_file()) continue;
try {
auto info = ReadWalInfo(item.path());
if (info.uuid != uuid_) continue;
if (info.uuid != *uuid) continue;
wal_files.emplace_back(info.seq_num, info.from_timestamp,
info.to_timestamp, item.path());
} catch (const RecoveryFailure &e) {
@@ -514,9 +232,8 @@ std::optional<RecoveryInfo> Durability::RecoverData() {
}
previous_seq_num = seq_num;
try {
auto info =
LoadWal(path, &indices_constraints, snapshot_timestamp, vertices_,
edges_, name_id_mapper_, edge_count_, items_);
auto info = LoadWal(path, &indices_constraints, snapshot_timestamp,
vertices, edges, name_id_mapper, edge_count, items);
recovery_info.next_vertex_id =
std::max(recovery_info.next_vertex_id, info.next_vertex_id);
recovery_info.next_edge_id =
@@ -530,34 +247,11 @@ std::optional<RecoveryInfo> Durability::RecoverData() {
}
// The sequence number needs to be recovered even if `LoadWal` didn't
// load any deltas from that file.
wal_seq_num_ = *previous_seq_num + 1;
*wal_seq_num = *previous_seq_num + 1;
}
recover_indices_and_constraints(indices_constraints);
return recovery_info;
}
bool Durability::InitializeWalFile() {
if (config_.snapshot_wal_mode !=
Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL)
return false;
if (!wal_file_) {
wal_file_.emplace(wal_directory_, uuid_, items_, name_id_mapper_,
wal_seq_num_++);
}
return true;
}
void Durability::FinalizeWalFile() {
++wal_unsynced_transactions_;
if (wal_unsynced_transactions_ >= config_.wal_file_flush_every_n_tx) {
wal_file_->Sync();
wal_unsynced_transactions_ = 0;
}
if (wal_file_->GetSize() / 1024 >= config_.wal_file_size_kibibytes) {
wal_file_ = std::nullopt;
wal_unsynced_transactions_ = 0;
}
}
} // namespace storage::durability
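
An aside on the lock file used above (and, after this change, in the Storage
constructor below): the single-instance guarantee comes from an OS-level
advisory lock on the .lock file, not from the file's mere existence. A
minimal POSIX sketch of the same idea, assuming nothing about Memgraph's
utils::OutputFile beyond that it wraps something similar:

#include <fcntl.h>
#include <sys/file.h>
#include <unistd.h>

// Returns the locked fd on success, -1 if another process holds the lock.
int AcquireStorageLock(const char *lock_path) {
  int fd = open(lock_path, O_CREAT | O_WRONLY, 0644);
  if (fd == -1) return -1;
  // Non-blocking exclusive lock; the kernel releases it when the process
  // exits, so a crash never leaves a stale lock behind.
  if (flock(fd, LOCK_EX | LOCK_NB) == -1) {
    close(fd);
    return -1;
  }
  return fd;
}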

View File

@@ -3,83 +3,35 @@
#include <atomic>
#include <cstdint>
#include <filesystem>
#include <functional>
#include <optional>
#include <string>
#include "storage/v2/durability/metadata.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/config.hpp"
#include "storage/v2/constraints.hpp"
#include "storage/v2/durability/metadata.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/indices.hpp"
#include "storage/v2/name_id_mapper.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex.hpp"
#include "utils/scheduler.hpp"
#include "utils/skip_list.hpp"
namespace storage::durability {
/// Durability class that is used to provide full durability functionality to
/// the storage.
class Durability final {
public:
Durability(Config::Durability config, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper,
std::atomic<uint64_t> *edge_count, Indices *indices,
Constraints *constraints, Config::Items items);
/// Verifies that the owner of the storage directory is the same user that
/// started the current process. If the verification fails, the process is
/// killed (`CHECK` failure).
void VerifyStorageDirectoryOwnerAndProcessUserOrDie(
const std::filesystem::path &storage_directory);
std::optional<RecoveryInfo> Initialize(
std::function<void(std::function<void(Transaction *)>)>
execute_with_transaction);
void Finalize();
void AppendToWal(const Transaction &transaction,
uint64_t final_commit_timestamp);
void AppendToWal(StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties,
uint64_t final_commit_timestamp);
private:
std::optional<RecoveryInfo> RecoverData();
bool InitializeWalFile();
void FinalizeWalFile();
Config::Durability config_;
utils::SkipList<Vertex> *vertices_;
utils::SkipList<Edge> *edges_;
NameIdMapper *name_id_mapper_;
std::atomic<uint64_t> *edge_count_;
Indices *indices_;
Constraints *constraints_;
Config::Items items_;
std::function<void(std::function<void(Transaction *)>)>
execute_with_transaction_;
std::filesystem::path storage_directory_;
std::filesystem::path snapshot_directory_;
std::filesystem::path wal_directory_;
std::filesystem::path lock_file_path_;
utils::OutputFile lock_file_handle_;
utils::Scheduler snapshot_runner_;
// UUID used to distinguish snapshots and to link snapshots to WALs
std::string uuid_;
// Sequence number used to keep track of the chain of WALs.
uint64_t wal_seq_num_{0};
std::optional<WalFile> wal_file_;
uint64_t wal_unsynced_transactions_{0};
};
/// Recovers data from a snapshot and/or WAL files.
/// @throw RecoveryFailure
/// @throw std::bad_alloc
std::optional<RecoveryInfo> RecoverData(
const std::filesystem::path &snapshot_directory,
const std::filesystem::path &wal_directory, std::string *uuid,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges,
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints, Config::Items items,
uint64_t *wal_seq_num);
} // namespace storage::durability
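
For reference, a condensed sketch of the caller side of RecoverData (the real
call site is the Storage constructor in the diff below; all names come from
the declaration above, with the enclosing object's members stood in by local
variables):

// RecoverData uses out-parameters: it overwrites *uuid with the UUID of the
// newest snapshot (or of the newest WAL when there is no snapshot) and
// advances *wal_seq_num past the last recovered WAL file.
auto info = durability::RecoverData(
    snapshot_directory, wal_directory, &uuid, &vertices, &edges, &edge_count,
    &name_id_mapper, &indices, &constraints, items, &wal_seq_num);
if (info) {
  // Resume the ID and timestamp counters where the recovered data left off.
  next_vertex_id = info->next_vertex_id;
  next_edge_id = info->next_edge_id;
  next_timestamp = info->next_timestamp;
}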

View File

@@ -10,6 +10,7 @@ namespace storage::durability {
static const std::string kSnapshotDirectory{"snapshots"};
static const std::string kWalDirectory{"wal"};
static const std::string kBackupDirectory{".backup"};
static const std::string kLockFile{".lock"};
// This is the prefix used for Snapshot and WAL filenames. It is a timestamp
// in the format YYYYmmddHHMMSSffffff.
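
For illustration, one way to produce such a prefix with std::chrono (the
helper Memgraph actually uses is not part of this diff):

#include <chrono>
#include <ctime>
#include <iomanip>
#include <sstream>
#include <string>

// Illustrative only: formats the current time as YYYYmmddHHMMSSffffff.
std::string MakeTimestampPrefix() {
  namespace chrono = std::chrono;
  auto now = chrono::system_clock::now();
  auto time = chrono::system_clock::to_time_t(now);
  auto micros = chrono::duration_cast<chrono::microseconds>(
                    now.time_since_epoch()) % 1'000'000;
  std::ostringstream stream;
  stream << std::put_time(std::localtime(&time), "%Y%m%d%H%M%S")
         << std::setfill('0') << std::setw(6) << micros.count();
  return stream.str();
}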

View File

@@ -6,8 +6,12 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/durability/paths.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/mvcc.hpp"
#include "utils/stat.hpp"
#include "utils/uuid.hpp"
namespace storage {
@@ -293,25 +297,105 @@ bool VerticesIterable::Iterator::operator==(const Iterator &other) const {
Storage::Storage(Config config)
: indices_(&constraints_, config.items),
config_(config),
durability_(config.durability, &vertices_, &edges_, &name_id_mapper_,
&edge_count_, &indices_, &constraints_, config.items) {
auto info = durability_.Initialize([this](auto callback) {
// Take master RW lock (for reading).
std::shared_lock<utils::RWLock> storage_guard(main_lock_);
snapshot_directory_(config_.durability.storage_directory /
durability::kSnapshotDirectory),
wal_directory_(config_.durability.storage_directory /
durability::kWalDirectory),
lock_file_path_(config_.durability.storage_directory /
durability::kLockFile),
uuid_(utils::GenerateUUID()) {
if (config_.durability.snapshot_wal_mode !=
Config::Durability::SnapshotWalMode::DISABLED ||
config_.durability.snapshot_on_exit ||
config_.durability.recover_on_startup) {
// Create the directory up front so that permission errors crash the
// database at startup instead of surfacing for the first time at some
// point during runtime (which could be an unpleasant surprise).
utils::EnsureDirOrDie(snapshot_directory_);
// Same reasoning as above.
utils::EnsureDirOrDie(wal_directory_);
// Create the transaction used to create the snapshot.
auto transaction = CreateTransaction();
// Verify that the user that started the process is the same user that is
// the owner of the storage directory.
durability::VerifyStorageDirectoryOwnerAndProcessUserOrDie(
config_.durability.storage_directory);
// Create snapshot.
callback(&transaction);
// Create the lock file and open a handle to it. This will crash the
// database if it can't open the file for writing or if any other process
// is holding the file open.
lock_file_handle_.Open(lock_file_path_,
utils::OutputFile::Mode::OVERWRITE_EXISTING);
CHECK(lock_file_handle_.AcquireLock())
<< "Couldn't acquire lock on the storage directory "
<< config_.durability.storage_directory
<< "!\nAnother Memgraph process is currently running with the same "
"storage directory, please stop it before starting this "
"process!";
}
if (config_.durability.recover_on_startup) {
auto info = durability::RecoverData(
snapshot_directory_, wal_directory_, &uuid_, &vertices_, &edges_,
&edge_count_, &name_id_mapper_, &indices_, &constraints_, config_.items,
&wal_seq_num_);
if (info) {
vertex_id_ = info->next_vertex_id;
edge_id_ = info->next_edge_id;
timestamp_ = std::max(timestamp_, info->next_timestamp);
}
} else if (config_.durability.snapshot_wal_mode !=
Config::Durability::SnapshotWalMode::DISABLED ||
config_.durability.snapshot_on_exit) {
bool files_moved = false;
auto backup_root =
config_.durability.storage_directory / durability::kBackupDirectory;
for (const auto &[path, dirname, what] :
{std::make_tuple(snapshot_directory_, durability::kSnapshotDirectory,
"snapshot"),
std::make_tuple(wal_directory_, durability::kWalDirectory, "WAL")}) {
if (!utils::DirExists(path)) continue;
auto backup_curr = backup_root / dirname;
std::error_code error_code;
for (const auto &item :
std::filesystem::directory_iterator(path, error_code)) {
utils::EnsureDirOrDie(backup_root);
utils::EnsureDirOrDie(backup_curr);
std::error_code item_error_code;
std::filesystem::rename(
item.path(), backup_curr / item.path().filename(), item_error_code);
CHECK(!item_error_code)
<< "Couldn't move " << what << " file " << item.path()
<< " because of: " << item_error_code.message();
files_moved = true;
}
CHECK(!error_code) << "Couldn't backup " << what
<< " files because of: " << error_code.message();
}
LOG_IF(WARNING, files_moved)
<< "Since Memgraph wasn't supposed to recover on startup but "
"durability is enabled, your current durability files would likely "
"be overwritten. To prevent data loss, Memgraph has moved those "
"files into a .backup directory inside the storage directory.";
}
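// Note (added for clarity, not in the original source): after this backup
// step the stale files live under <storage_directory>/.backup/snapshots/ and
// <storage_directory>/.backup/wal/, i.e. kBackupDirectory joined with the
// per-kind directory names from paths.hpp.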
if (config_.durability.snapshot_wal_mode !=
Config::Durability::SnapshotWalMode::DISABLED) {
snapshot_runner_.Run(
"Snapshot", config_.durability.snapshot_interval, [this] {
// Take master RW lock (for reading).
std::shared_lock<utils::RWLock> storage_guard(main_lock_);
// Finalize snapshot transaction.
commit_log_.MarkFinished(transaction.start_timestamp);
});
if (info) {
vertex_id_ = info->next_vertex_id;
edge_id_ = info->next_edge_id;
timestamp_ = std::max(timestamp_, info->next_timestamp);
// Create the transaction used to create the snapshot.
auto transaction = CreateTransaction();
// Create snapshot.
durability::CreateSnapshot(
&transaction, snapshot_directory_, wal_directory_,
config_.durability.snapshot_retention_count, &vertices_, &edges_,
&name_id_mapper_, &indices_, &constraints_, config_.items, uuid_);
// Finalize snapshot transaction.
commit_log_.MarkFinished(transaction.start_timestamp);
});
}
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
gc_runner_.Run("Storage GC", config_.gc.interval,
@@ -323,7 +407,27 @@ Storage::~Storage() {
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
gc_runner_.Stop();
}
durability_.Finalize();
wal_file_ = std::nullopt;
if (config_.durability.snapshot_wal_mode !=
Config::Durability::SnapshotWalMode::DISABLED) {
snapshot_runner_.Stop();
}
if (config_.durability.snapshot_on_exit) {
// Take master RW lock (for reading).
std::shared_lock<utils::RWLock> storage_guard(main_lock_);
// Create the transaction used to create the snapshot.
auto transaction = CreateTransaction();
// Create snapshot.
durability::CreateSnapshot(
&transaction, snapshot_directory_, wal_directory_,
config_.durability.snapshot_retention_count, &vertices_, &edges_,
&name_id_mapper_, &indices_, &constraints_, config_.items, uuid_);
// Finalize snapshot transaction.
commit_log_.MarkFinished(transaction.start_timestamp);
}
}
Storage::Accessor::Accessor(Storage *storage)
@@ -716,7 +820,7 @@ utils::BasicResult<ConstraintViolation, void> Storage::Accessor::Commit() {
// written before actually committing the transaction (before setting
// the commit timestamp) so that no other transaction can see the
// modifications before they are written to disk.
storage_->durability_.AppendToWal(transaction_, commit_timestamp);
storage_->AppendToWal(transaction_, commit_timestamp);
// Take committed_transactions lock while holding the engine lock to
// make sure that committed transactions are sorted by the commit
@@ -968,9 +1072,8 @@ bool Storage::CreateIndex(LabelId label) {
// next regular transaction after this operation. This prevents collisions of
// commit timestamps between non-transactional operations and transactional
// operations.
durability_.AppendToWal(
durability::StorageGlobalOperation::LABEL_INDEX_CREATE, label, {},
timestamp_);
AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_CREATE, label, {},
timestamp_);
return true;
}
@@ -981,9 +1084,8 @@ bool Storage::CreateIndex(LabelId label, PropertyId property) {
return false;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
durability_.AppendToWal(
durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE, label,
{property}, timestamp_);
AppendToWal(durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE,
label, {property}, timestamp_);
return true;
}
@@ -992,8 +1094,8 @@ bool Storage::DropIndex(LabelId label) {
if (!indices_.label_index.DropIndex(label)) return false;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
durability_.AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_DROP,
label, {}, timestamp_);
AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_DROP, label, {},
timestamp_);
return true;
}
@@ -1002,9 +1104,8 @@ bool Storage::DropIndex(LabelId label, PropertyId property) {
if (!indices_.label_property_index.DropIndex(label, property)) return false;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
durability_.AppendToWal(
durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP, label,
{property}, timestamp_);
AppendToWal(durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP,
label, {property}, timestamp_);
return true;
}
@@ -1022,9 +1123,8 @@ Storage::CreateExistenceConstraint(LabelId label, PropertyId property) {
if (ret.HasError() || !ret.GetValue()) return ret;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
durability_.AppendToWal(
durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE, label,
{property}, timestamp_);
AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE,
label, {property}, timestamp_);
return true;
}
@@ -1034,9 +1134,8 @@ bool Storage::DropExistenceConstraint(LabelId label, PropertyId property) {
return false;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
durability_.AppendToWal(
durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP, label,
{property}, timestamp_);
AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP,
label, {property}, timestamp_);
return true;
}
@@ -1052,9 +1151,8 @@ Storage::CreateUniqueConstraint(LabelId label,
}
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
durability_.AppendToWal(
durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE, label,
properties, timestamp_);
AppendToWal(durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE,
label, properties, timestamp_);
return UniqueConstraints::CreationStatus::SUCCESS;
}
@@ -1067,9 +1165,8 @@ UniqueConstraints::DeletionStatus Storage::DropUniqueConstraint(
}
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
durability_.AppendToWal(
durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP, label,
properties, timestamp_);
AppendToWal(durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP, label,
properties, timestamp_);
return UniqueConstraints::DeletionStatus::SUCCESS;
}
@@ -1361,4 +1458,194 @@ void Storage::CollectGarbage() {
}
}
bool Storage::InitializeWalFile() {
if (config_.durability.snapshot_wal_mode !=
Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL)
return false;
if (!wal_file_) {
wal_file_.emplace(wal_directory_, uuid_, config_.items, &name_id_mapper_,
wal_seq_num_++);
}
return true;
}
void Storage::FinalizeWalFile() {
++wal_unsynced_transactions_;
if (wal_unsynced_transactions_ >=
config_.durability.wal_file_flush_every_n_tx) {
wal_file_->Sync();
wal_unsynced_transactions_ = 0;
}
if (wal_file_->GetSize() / 1024 >=
config_.durability.wal_file_size_kibibytes) {
wal_file_ = std::nullopt;
wal_unsynced_transactions_ = 0;
}
}
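// Note (added for clarity, not in the original source): taken together,
// InitializeWalFile and FinalizeWalFile implement lazy WAL creation with
// batched syncs and size-based rotation. The file is created on first use,
// synced once every `wal_file_flush_every_n_tx` transactions, and closed
// after it crosses `wal_file_size_kibibytes` so that the next transaction
// starts a fresh WAL file with an incremented sequence number.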
void Storage::AppendToWal(const Transaction &transaction,
uint64_t final_commit_timestamp) {
if (!InitializeWalFile()) return;
// Traverse deltas and append them to the WAL file.
// A single transaction will always be contained in a single WAL file.
auto current_commit_timestamp =
transaction.commit_timestamp->load(std::memory_order_acquire);
// Helper lambda that traverses the delta chain in order to find the first
// delta that should be processed and then appends all discovered deltas.
auto find_and_apply_deltas = [&](const auto *delta, const auto &parent,
auto filter) {
while (true) {
auto older = delta->next.load(std::memory_order_acquire);
if (older == nullptr ||
older->timestamp->load(std::memory_order_acquire) !=
current_commit_timestamp)
break;
delta = older;
}
while (true) {
if (filter(delta->action)) {
wal_file_->AppendDelta(*delta, parent, final_commit_timestamp);
}
auto prev = delta->prev.Get();
if (prev.type != PreviousPtr::Type::DELTA) break;
delta = prev.delta;
}
};
// The deltas are ordered correctly in the `transaction.deltas` buffer, but we
// don't traverse them in that order. That is because for each delta we need
// information about the vertex or edge they belong to and that information
// isn't stored in the deltas themselves. In order to find out information
// about the corresponding vertex or edge it is necessary to traverse the
// delta chain for each delta until a vertex or edge is encountered. This
// operation is very expensive as the chain grows.
// Instead, we traverse the deltas until we find one that points directly at
// its vertex or edge and then process that object's whole delta chain. The
// drawback of this approach is that we lose the correct order of the
// operations, so we have to traverse the deltas several times and manually
// ensure that the stored deltas end up ordered correctly.
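// Worked example (added for clarity, not in the original source): a
// transaction that creates vertices v1 and v2, adds edge e = (v1)->(v2) and
// sets a property on e leaves undo deltas DELETE_OBJECT on v1 and v2,
// REMOVE_OUT_EDGE on v1, REMOVE_IN_EDGE on v2 and SET_PROPERTY on e. Pass 1
// below therefore emits the vertex creations, pass 2 the edge creation and
// pass 3 the edge property change, which keeps the WAL replayable front to
// back.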
// 1. Process all Vertex deltas and store all operations that create vertices
// and modify vertex data.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
if (prev.type != PreviousPtr::Type::VERTEX) continue;
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
switch (action) {
case Delta::Action::DELETE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
return true;
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
return false;
}
});
}
// 2. Process all Vertex deltas and store all operations that create edges.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
if (prev.type != PreviousPtr::Type::VERTEX) continue;
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
switch (action) {
case Delta::Action::REMOVE_OUT_EDGE:
return true;
case Delta::Action::DELETE_OBJECT:
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
return false;
}
});
}
// 3. Process all Edge deltas and store all operations that modify edge data.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
if (prev.type != PreviousPtr::Type::EDGE) continue;
find_and_apply_deltas(&delta, *prev.edge, [](auto action) {
switch (action) {
case Delta::Action::SET_PROPERTY:
return true;
case Delta::Action::DELETE_OBJECT:
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
return false;
}
});
}
// 4. Process all Vertex deltas and store all operations that delete edges.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
if (prev.type != PreviousPtr::Type::VERTEX) continue;
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
switch (action) {
case Delta::Action::ADD_OUT_EDGE:
return true;
case Delta::Action::DELETE_OBJECT:
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
return false;
}
});
}
// 5. Process all Vertex deltas and store all operations that delete vertices.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
if (prev.type != PreviousPtr::Type::VERTEX) continue;
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
switch (action) {
case Delta::Action::RECREATE_OBJECT:
return true;
case Delta::Action::DELETE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
return false;
}
});
}
// Add an end marker that indicates that the transaction is fully written to
// the WAL file.
wal_file_->AppendTransactionEnd(final_commit_timestamp);
FinalizeWalFile();
}
void Storage::AppendToWal(durability::StorageGlobalOperation operation,
LabelId label, const std::set<PropertyId> &properties,
uint64_t final_commit_timestamp) {
if (!InitializeWalFile()) return;
wal_file_->AppendOperation(operation, label, properties,
final_commit_timestamp);
FinalizeWalFile();
}
} // namespace storage

View File

@@ -1,12 +1,13 @@
#pragma once
#include <filesystem>
#include <optional>
#include <shared_mutex>
#include "storage/v2/commit_log.hpp"
#include "storage/v2/config.hpp"
#include "storage/v2/constraints.hpp"
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/edge_accessor.hpp"
#include "storage/v2/indices.hpp"
@@ -385,6 +386,15 @@ class Storage final {
/// @throw std::bad_alloc
void CollectGarbage();
bool InitializeWalFile();
void FinalizeWalFile();
void AppendToWal(const Transaction &transaction,
uint64_t final_commit_timestamp);
void AppendToWal(durability::StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties,
uint64_t final_commit_timestamp);
// Main storage lock.
//
// Accessors take a shared lock when starting, so it is possible to block
@@ -442,7 +452,21 @@
// storage.
utils::Synchronized<std::list<Gid>, utils::SpinLock> deleted_edges_;
durability::Durability durability_;
// Durability
std::filesystem::path snapshot_directory_;
std::filesystem::path wal_directory_;
std::filesystem::path lock_file_path_;
utils::OutputFile lock_file_handle_;
utils::Scheduler snapshot_runner_;
// UUID used to distinguish snapshots and to link snapshots to WALs
std::string uuid_;
// Sequence number used to keep track of the chain of WALs.
uint64_t wal_seq_num_{0};
std::optional<durability::WalFile> wal_file_;
uint64_t wal_unsynced_transactions_{0};
};
} // namespace storage
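
A minimal usage sketch under stated assumptions: the durability-related
Config fields are the ones referenced in the constructor above, the
Access()/CreateVertex()/Commit() accessor API follows the methods visible in
this diff, and the storage path is hypothetical:

#include "storage/v2/storage.hpp"

int main() {
  storage::Config config;
  config.durability.storage_directory = "/var/lib/memgraph";  // hypothetical
  config.durability.recover_on_startup = true;
  config.durability.snapshot_wal_mode =
      storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL;
  // Recovery, lock-file acquisition and the snapshot scheduler all run in
  // the constructor now that durability is merged into Storage.
  storage::Storage db(config);
  auto acc = db.Access();
  acc.CreateVertex();
  // Commit appends the transaction's deltas to the WAL via AppendToWal().
  if (acc.Commit().HasError()) return 1;
  return 0;
}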

View File

@@ -14,6 +14,7 @@
#include <thread>
#include "storage/v2/durability/paths.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/version.hpp"
#include "storage/v2/storage.hpp"
#include "utils/file.hpp"