Implement WAL writing for storage v2
Reviewers: teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2515
This commit is contained in:
parent
4879686ef4
commit
13ba9cc23e
@ -21,16 +21,26 @@ struct Config {
|
||||
} items;
|
||||
|
||||
struct Durability {
|
||||
enum class SnapshotType { NONE, PERIODIC };
|
||||
enum class SnapshotWalMode {
|
||||
DISABLED,
|
||||
PERIODIC_SNAPSHOT,
|
||||
PERIODIC_SNAPSHOT_WITH_WAL
|
||||
};
|
||||
|
||||
std::filesystem::path storage_directory{"storage"};
|
||||
|
||||
bool recover_on_startup{false};
|
||||
|
||||
SnapshotType snapshot_type{SnapshotType::NONE};
|
||||
SnapshotWalMode snapshot_wal_mode{SnapshotWalMode::DISABLED};
|
||||
|
||||
std::chrono::milliseconds snapshot_interval{std::chrono::minutes(2)};
|
||||
uint64_t snapshot_retention_count{3};
|
||||
|
||||
uint64_t wal_file_size_kibibytes{20 * 1024};
|
||||
uint64_t wal_file_flush_every_n_tx{100000};
|
||||
|
||||
bool snapshot_on_exit{false};
|
||||
|
||||
} durability;
|
||||
};
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include "storage/v2/durability.hpp"
|
||||
|
||||
#include <algorithm>
|
||||
#include <tuple>
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
|
||||
@ -1265,6 +1266,7 @@ Durability::Durability(Config::Durability config,
|
||||
constraints_(constraints),
|
||||
items_(items),
|
||||
snapshot_directory_(config_.storage_directory / kSnapshotDirectory),
|
||||
wal_directory_(config_.storage_directory / kWalDirectory),
|
||||
uuid_(utils::GenerateUUID()) {}
|
||||
|
||||
std::optional<Durability::RecoveryInfo> Durability::Initialize(
|
||||
@ -1275,7 +1277,8 @@ std::optional<Durability::RecoveryInfo> Durability::Initialize(
|
||||
if (config_.recover_on_startup) {
|
||||
ret = RecoverData();
|
||||
}
|
||||
if (config_.snapshot_type == Config::Durability::SnapshotType::PERIODIC ||
|
||||
if (config_.snapshot_wal_mode !=
|
||||
Config::Durability::SnapshotWalMode::DISABLED ||
|
||||
config_.snapshot_on_exit) {
|
||||
// Create the directory initially to crash the database in case of
|
||||
// permission errors. This is done early to crash the database on startup
|
||||
@ -1283,7 +1286,13 @@ std::optional<Durability::RecoveryInfo> Durability::Initialize(
|
||||
// could be an unpleasant surprise).
|
||||
utils::EnsureDirOrDie(snapshot_directory_);
|
||||
}
|
||||
if (config_.snapshot_type == Config::Durability::SnapshotType::PERIODIC) {
|
||||
if (config_.snapshot_wal_mode ==
|
||||
Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL) {
|
||||
// Same reasoning as above.
|
||||
utils::EnsureDirOrDie(wal_directory_);
|
||||
}
|
||||
if (config_.snapshot_wal_mode !=
|
||||
Config::Durability::SnapshotWalMode::DISABLED) {
|
||||
snapshot_runner_.Run("Snapshot", config_.snapshot_interval, [this] {
|
||||
execute_with_transaction_(
|
||||
[this](Transaction *transaction) { CreateSnapshot(transaction); });
|
||||
@ -1293,7 +1302,9 @@ std::optional<Durability::RecoveryInfo> Durability::Initialize(
|
||||
}
|
||||
|
||||
void Durability::Finalize() {
|
||||
if (config_.snapshot_type == Config::Durability::SnapshotType::PERIODIC) {
|
||||
wal_file_ = std::nullopt;
|
||||
if (config_.snapshot_wal_mode !=
|
||||
Config::Durability::SnapshotWalMode::DISABLED) {
|
||||
snapshot_runner_.Stop();
|
||||
}
|
||||
if (config_.snapshot_on_exit) {
|
||||
@ -1302,6 +1313,171 @@ void Durability::Finalize() {
|
||||
}
|
||||
}
|
||||
|
||||
void Durability::AppendToWal(const Transaction &transaction,
|
||||
uint64_t final_commit_timestamp) {
|
||||
if (!InitializeWalFile()) return;
|
||||
// Traverse deltas and append them to the WAL file.
|
||||
// A single transaction will always be contained in a single WAL file.
|
||||
auto current_commit_timestamp =
|
||||
transaction.commit_timestamp->load(std::memory_order_acq_rel);
|
||||
// Helper lambda that traverses the delta chain on order to find the first
|
||||
// delta that should be processed and then appends all discovered deltas.
|
||||
auto find_and_apply_deltas = [&](const auto *delta, auto *parent,
|
||||
auto filter) {
|
||||
while (true) {
|
||||
auto older = delta->next.load(std::memory_order_acq_rel);
|
||||
if (older == nullptr ||
|
||||
older->timestamp->load(std::memory_order_acq_rel) !=
|
||||
current_commit_timestamp)
|
||||
break;
|
||||
delta = older;
|
||||
}
|
||||
while (true) {
|
||||
if (filter(delta->action)) {
|
||||
wal_file_->AppendDelta(*delta, parent, final_commit_timestamp);
|
||||
}
|
||||
auto prev = delta->prev.Get();
|
||||
if (prev.type != PreviousPtr::Type::DELTA) break;
|
||||
delta = prev.delta;
|
||||
}
|
||||
};
|
||||
|
||||
// The deltas are ordered correctly in the `transaction.deltas` buffer, but we
|
||||
// don't traverse them in that order. That is because for each delta we need
|
||||
// information about the vertex or edge they belong to and that information
|
||||
// isn't stored in the deltas themselves. In order to find out information
|
||||
// about the corresponding vertex or edge it is necessary to traverse the
|
||||
// delta chain for each delta until a vertex or edge is encountered. This
|
||||
// operation is very expensive as the chain grows.
|
||||
// Instead, we traverse the edges until we find a vertex or edge and traverse
|
||||
// their delta chains. This approach has a drawback because we lose the
|
||||
// correct order of the operations. Because of that, we need to traverse the
|
||||
// deltas several times and we have to manually ensure that the stored deltas
|
||||
// will be ordered correctly.
|
||||
|
||||
// 1. Process all Vertex deltas and store all operations that create vertices
|
||||
// and modify vertex data.
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||
find_and_apply_deltas(&delta, prev.vertex, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
return true;
|
||||
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
// 2. Process all Vertex deltas and store all operations that create edges.
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||
find_and_apply_deltas(&delta, prev.vertex, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
return true;
|
||||
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
// 3. Process all Edge deltas and store all operations that modify edge data.
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
if (prev.type != PreviousPtr::Type::EDGE) continue;
|
||||
find_and_apply_deltas(&delta, prev.edge, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
return true;
|
||||
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
// 4. Process all Vertex deltas and store all operations that delete edges.
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||
find_and_apply_deltas(&delta, prev.vertex, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
return true;
|
||||
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
// 5. Process all Vertex deltas and store all operations that delete vertices.
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||
find_and_apply_deltas(&delta, prev.vertex, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
return true;
|
||||
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Add a delta that indicates that the transaction is fully written to the WAL
|
||||
// file.
|
||||
wal_file_->AppendTransactionEnd(final_commit_timestamp);
|
||||
|
||||
FinalizeWalFile();
|
||||
}
|
||||
|
||||
void Durability::AppendToWal(StorageGlobalOperation operation, LabelId label,
|
||||
std::optional<PropertyId> property,
|
||||
uint64_t final_commit_timestamp) {
|
||||
if (!InitializeWalFile()) return;
|
||||
wal_file_->AppendOperation(operation, label, property,
|
||||
final_commit_timestamp);
|
||||
FinalizeWalFile();
|
||||
}
|
||||
|
||||
void Durability::CreateSnapshot(Transaction *transaction) {
|
||||
// Ensure that the storage directory exists.
|
||||
utils::EnsureDirOrDie(snapshot_directory_);
|
||||
@ -1545,37 +1721,92 @@ void Durability::CreateSnapshot(Transaction *transaction) {
|
||||
LOG(INFO) << "Snapshot creation successful!";
|
||||
|
||||
// Ensure exactly `snapshot_retention_count` snapshots exist.
|
||||
std::vector<std::filesystem::path> old_snapshot_files;
|
||||
std::error_code error_code;
|
||||
for (auto &item :
|
||||
std::filesystem::directory_iterator(snapshot_directory_, error_code)) {
|
||||
if (!item.is_regular_file()) continue;
|
||||
if (item.path() == path) continue;
|
||||
try {
|
||||
auto info = ReadSnapshotInfo(item.path());
|
||||
if (info.uuid == uuid_) {
|
||||
old_snapshot_files.push_back(item.path());
|
||||
std::vector<std::pair<uint64_t, std::filesystem::path>> old_snapshot_files;
|
||||
{
|
||||
std::error_code error_code;
|
||||
for (const auto &item :
|
||||
std::filesystem::directory_iterator(snapshot_directory_, error_code)) {
|
||||
if (!item.is_regular_file()) continue;
|
||||
if (item.path() == path) continue;
|
||||
try {
|
||||
auto info = ReadSnapshotInfo(item.path());
|
||||
if (info.uuid != uuid_) continue;
|
||||
old_snapshot_files.emplace_back(info.start_timestamp, item.path());
|
||||
} catch (const RecoveryFailure &e) {
|
||||
LOG(WARNING) << "Found a corrupt snapshot file " << item.path()
|
||||
<< " because of: " << e.what();
|
||||
continue;
|
||||
}
|
||||
} catch (const RecoveryFailure &e) {
|
||||
LOG(WARNING) << "Found a corrupt snapshot file " << item.path()
|
||||
<< " because of: " << e.what();
|
||||
continue;
|
||||
}
|
||||
LOG_IF(ERROR, error_code)
|
||||
<< "Couldn't ensure that exactly " << config_.snapshot_retention_count
|
||||
<< " snapshots exist because an error occurred: "
|
||||
<< error_code.message() << "!";
|
||||
std::sort(old_snapshot_files.begin(), old_snapshot_files.end());
|
||||
if (old_snapshot_files.size() > config_.snapshot_retention_count - 1) {
|
||||
auto num_to_erase =
|
||||
old_snapshot_files.size() - (config_.snapshot_retention_count - 1);
|
||||
for (size_t i = 0; i < num_to_erase; ++i) {
|
||||
const auto &[start_timestamp, snapshot_path] = old_snapshot_files[i];
|
||||
if (!utils::DeleteFile(snapshot_path)) {
|
||||
LOG(WARNING) << "Couldn't delete snapshot file " << snapshot_path
|
||||
<< "!";
|
||||
}
|
||||
}
|
||||
old_snapshot_files.erase(old_snapshot_files.begin(),
|
||||
old_snapshot_files.begin() + num_to_erase);
|
||||
}
|
||||
}
|
||||
if (error_code) {
|
||||
LOG(ERROR) << "Couldn't ensure that exactly "
|
||||
<< config_.snapshot_retention_count
|
||||
<< " snapshots exist because an error occurred: "
|
||||
<< error_code.message() << "!";
|
||||
}
|
||||
if (old_snapshot_files.size() >= config_.snapshot_retention_count) {
|
||||
std::sort(old_snapshot_files.begin(), old_snapshot_files.end());
|
||||
for (size_t i = 0;
|
||||
i <= old_snapshot_files.size() - config_.snapshot_retention_count;
|
||||
++i) {
|
||||
const auto &path = old_snapshot_files[i];
|
||||
if (!utils::DeleteFile(path)) {
|
||||
LOG(WARNING) << "Couldn't delete snapshot file " << path << "!";
|
||||
|
||||
// Ensure that only the absolutely necessary WAL files exist.
|
||||
if (old_snapshot_files.size() == config_.snapshot_retention_count - 1 &&
|
||||
utils::DirExists(wal_directory_)) {
|
||||
std::vector<std::tuple<uint64_t, uint64_t, uint64_t, std::filesystem::path>>
|
||||
wal_files;
|
||||
std::error_code error_code;
|
||||
for (const auto &item :
|
||||
std::filesystem::directory_iterator(wal_directory_, error_code)) {
|
||||
if (!item.is_regular_file()) continue;
|
||||
try {
|
||||
auto info = ReadWalInfo(item.path());
|
||||
if (info.uuid != uuid_) continue;
|
||||
wal_files.emplace_back(info.seq_num, info.from_timestamp,
|
||||
info.to_timestamp, item.path());
|
||||
} catch (const RecoveryFailure &e) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
LOG_IF(ERROR, error_code)
|
||||
<< "Couldn't ensure that only the absolutely necessary WAL files exist "
|
||||
"because an error occurred: "
|
||||
<< error_code.message() << "!";
|
||||
std::sort(wal_files.begin(), wal_files.end());
|
||||
uint64_t snapshot_start_timestamp = transaction->start_timestamp;
|
||||
if (!old_snapshot_files.empty()) {
|
||||
snapshot_start_timestamp = old_snapshot_files.begin()->first;
|
||||
}
|
||||
std::optional<uint64_t> pos = 0;
|
||||
for (uint64_t i = 0; i < wal_files.size(); ++i) {
|
||||
const auto &[seq_num, from_timestamp, to_timestamp, wal_path] =
|
||||
wal_files[i];
|
||||
if (to_timestamp <= snapshot_start_timestamp) {
|
||||
pos = i;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (pos && *pos > 0) {
|
||||
// We need to leave at least one WAL file that contains deltas that were
|
||||
// created before the oldest snapshot. Because we always leave at least
|
||||
// one WAL file that contains deltas before the snapshot, this correctly
|
||||
// handles the edge case when that one file is the current WAL file that
|
||||
// is being appended to.
|
||||
for (uint64_t i = 0; i < *pos; ++i) {
|
||||
const auto &[seq_num, from_timestamp, to_timestamp, wal_path] =
|
||||
wal_files[i];
|
||||
if (!utils::DeleteFile(wal_path)) {
|
||||
LOG(WARNING) << "Couldn't delete WAL file " << wal_path << "!";
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2018,4 +2249,27 @@ Durability::RecoveryInfo Durability::LoadSnapshot(
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool Durability::InitializeWalFile() {
|
||||
if (config_.snapshot_wal_mode !=
|
||||
Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL)
|
||||
return false;
|
||||
if (!wal_file_) {
|
||||
wal_file_.emplace(wal_directory_, uuid_, items_, name_id_mapper_,
|
||||
wal_seq_num_++);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void Durability::FinalizeWalFile() {
|
||||
++wal_unsynced_transactions_;
|
||||
if (wal_unsynced_transactions_ >= config_.wal_file_flush_every_n_tx) {
|
||||
wal_file_->Sync();
|
||||
wal_unsynced_transactions_ = 0;
|
||||
}
|
||||
if (wal_file_->GetSize() / 1024 >= config_.wal_file_size_kibibytes) {
|
||||
wal_file_ = std::nullopt;
|
||||
wal_unsynced_transactions_ = 0;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace storage
|
||||
|
@ -3,10 +3,12 @@
|
||||
#include <cstdint>
|
||||
#include <filesystem>
|
||||
#include <functional>
|
||||
#include <list>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
|
||||
#include "storage/v2/config.hpp"
|
||||
#include "storage/v2/constraints.hpp"
|
||||
@ -353,6 +355,13 @@ class Durability final {
|
||||
|
||||
void Finalize();
|
||||
|
||||
void AppendToWal(const Transaction &transaction,
|
||||
uint64_t final_commit_timestamp);
|
||||
|
||||
void AppendToWal(StorageGlobalOperation operation, LabelId label,
|
||||
std::optional<PropertyId> property,
|
||||
uint64_t final_commit_timestamp);
|
||||
|
||||
private:
|
||||
void CreateSnapshot(Transaction *transaction);
|
||||
|
||||
@ -360,6 +369,10 @@ class Durability final {
|
||||
|
||||
RecoveryInfo LoadSnapshot(const std::filesystem::path &path);
|
||||
|
||||
bool InitializeWalFile();
|
||||
|
||||
void FinalizeWalFile();
|
||||
|
||||
Config::Durability config_;
|
||||
|
||||
utils::SkipList<Vertex> *vertices_;
|
||||
@ -373,10 +386,17 @@ class Durability final {
|
||||
execute_with_transaction_;
|
||||
|
||||
std::filesystem::path snapshot_directory_;
|
||||
std::filesystem::path wal_directory_;
|
||||
|
||||
utils::Scheduler snapshot_runner_;
|
||||
|
||||
// UUID used to distinguish snapshots and to link snapshots to WALs
|
||||
std::string uuid_;
|
||||
// Sequence number used to keep track of the chain of WALs.
|
||||
uint64_t wal_seq_num_{0};
|
||||
|
||||
std::optional<WalFile> wal_file_;
|
||||
uint64_t wal_unsynced_transactions_{0};
|
||||
};
|
||||
|
||||
} // namespace storage
|
||||
|
@ -669,6 +669,15 @@ Storage::Accessor::Commit() {
|
||||
std::unique_lock<utils::SpinLock> engine_guard(storage_->engine_lock_);
|
||||
commit_timestamp = storage_->timestamp_++;
|
||||
|
||||
// Write transaction to WAL while holding the engine lock to make sure
|
||||
// that committed transactions are sorted by the commit timestamp in the
|
||||
// WAL files. We supply the new commit timestamp to the function so that
|
||||
// it knows what will be the final commit timestamp. The WAL must be
|
||||
// written before actually committing the transaction (before setting the
|
||||
// commit timestamp) so that no other transaction can see the
|
||||
// modifications before they are written to disk.
|
||||
storage_->durability_.AppendToWal(transaction_, commit_timestamp);
|
||||
|
||||
// Take committed_transactions lock while holding the engine lock to
|
||||
// make sure that committed transactions are sorted by the commit
|
||||
// timestamp in the list.
|
||||
@ -912,6 +921,79 @@ EdgeTypeId Storage::NameToEdgeType(const std::string &name) {
|
||||
return EdgeTypeId::FromUint(name_id_mapper_.NameToId(name));
|
||||
}
|
||||
|
||||
bool Storage::CreateIndex(LabelId label) {
|
||||
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
|
||||
if (!indices_.label_index.CreateIndex(label, vertices_.access()))
|
||||
return false;
|
||||
// Here it is safe to use `timestamp_` as the final commit timestamp of this
|
||||
// operation even though this operation isn't transactional. The `timestamp_`
|
||||
// variable holds the next timestamp that will be used. Because the above
|
||||
// `storage_guard` ensures that no transactions are currently active, the
|
||||
// value of `timestamp_` is guaranteed to be used as a start timestamp for the
|
||||
// next regular transaction after this operation. This prevents collisions of
|
||||
// commit timestamps between non-transactional operations and transactional
|
||||
// operations.
|
||||
durability_.AppendToWal(StorageGlobalOperation::LABEL_INDEX_CREATE, label,
|
||||
std::nullopt, timestamp_);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Storage::CreateIndex(LabelId label, PropertyId property) {
|
||||
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
|
||||
if (!indices_.label_property_index.CreateIndex(label, property,
|
||||
vertices_.access()))
|
||||
return false;
|
||||
// For a description why using `timestamp_` is correct, see
|
||||
// `CreateIndex(LabelId label)`.
|
||||
durability_.AppendToWal(StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE,
|
||||
label, property, timestamp_);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Storage::DropIndex(LabelId label) {
|
||||
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
|
||||
if (!indices_.label_index.DropIndex(label)) return false;
|
||||
// For a description why using `timestamp_` is correct, see
|
||||
// `CreateIndex(LabelId label)`.
|
||||
durability_.AppendToWal(StorageGlobalOperation::LABEL_INDEX_DROP, label,
|
||||
std::nullopt, timestamp_);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Storage::DropIndex(LabelId label, PropertyId property) {
|
||||
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
|
||||
if (!indices_.label_property_index.DropIndex(label, property)) return false;
|
||||
// For a description why using `timestamp_` is correct, see
|
||||
// `CreateIndex(LabelId label)`.
|
||||
durability_.AppendToWal(StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP,
|
||||
label, property, timestamp_);
|
||||
return true;
|
||||
}
|
||||
|
||||
utils::BasicResult<ExistenceConstraintViolation, bool>
|
||||
Storage::CreateExistenceConstraint(LabelId label, PropertyId property) {
|
||||
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
|
||||
auto ret = ::storage::CreateExistenceConstraint(&constraints_, label,
|
||||
property, vertices_.access());
|
||||
if (ret.HasError() || !ret.GetValue()) return ret;
|
||||
// For a description why using `timestamp_` is correct, see
|
||||
// `CreateIndex(LabelId label)`.
|
||||
durability_.AppendToWal(StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE,
|
||||
label, property, timestamp_);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Storage::DropExistenceConstraint(LabelId label, PropertyId property) {
|
||||
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
|
||||
if (!::storage::DropExistenceConstraint(&constraints_, label, property))
|
||||
return false;
|
||||
// For a description why using `timestamp_` is correct, see
|
||||
// `CreateIndex(LabelId label)`.
|
||||
durability_.AppendToWal(StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP,
|
||||
label, property, timestamp_);
|
||||
return true;
|
||||
}
|
||||
|
||||
VerticesIterable Storage::Accessor::Vertices(LabelId label, View view) {
|
||||
return VerticesIterable(
|
||||
storage_->indices_.label_index.Vertices(label, view, &transaction_));
|
||||
|
@ -299,27 +299,14 @@ class Storage final {
|
||||
EdgeTypeId NameToEdgeType(const std::string &name);
|
||||
|
||||
/// @throw std::bad_alloc
|
||||
bool CreateIndex(LabelId label) {
|
||||
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
|
||||
return indices_.label_index.CreateIndex(label, vertices_.access());
|
||||
}
|
||||
bool CreateIndex(LabelId label);
|
||||
|
||||
/// @throw std::bad_alloc
|
||||
bool CreateIndex(LabelId label, PropertyId property) {
|
||||
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
|
||||
return indices_.label_property_index.CreateIndex(label, property,
|
||||
vertices_.access());
|
||||
}
|
||||
bool CreateIndex(LabelId label, PropertyId property);
|
||||
|
||||
bool DropIndex(LabelId label) {
|
||||
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
|
||||
return indices_.label_index.DropIndex(label);
|
||||
}
|
||||
bool DropIndex(LabelId label);
|
||||
|
||||
bool DropIndex(LabelId label, PropertyId property) {
|
||||
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
|
||||
return indices_.label_property_index.DropIndex(label, property);
|
||||
}
|
||||
bool DropIndex(LabelId label, PropertyId property);
|
||||
|
||||
bool LabelIndexExists(LabelId label) const {
|
||||
return indices_.label_index.IndexExists(label);
|
||||
@ -342,18 +329,11 @@ class Storage final {
|
||||
/// @throw std::bad_alloc
|
||||
/// @throw std::length_error
|
||||
utils::BasicResult<ExistenceConstraintViolation, bool>
|
||||
CreateExistenceConstraint(LabelId label, PropertyId property) {
|
||||
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
|
||||
return ::storage::CreateExistenceConstraint(&constraints_, label, property,
|
||||
vertices_.access());
|
||||
}
|
||||
CreateExistenceConstraint(LabelId label, PropertyId property);
|
||||
|
||||
/// Removes a unique constraint. Returns true if the constraint was removed,
|
||||
/// and false if it doesn't exist.
|
||||
bool DropExistenceConstraint(LabelId label, PropertyId property) {
|
||||
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
|
||||
return ::storage::DropExistenceConstraint(&constraints_, label, property);
|
||||
}
|
||||
bool DropExistenceConstraint(LabelId label, PropertyId property);
|
||||
|
||||
ConstraintsInfo ListAllConstraints() const {
|
||||
return {ListExistenceConstraints(constraints_)};
|
||||
|
@ -50,6 +50,10 @@ for test in tests:
|
||||
if name.startswith("benchmark") or name.startswith("concurrent"):
|
||||
prefix = "TIMEOUT=600 "
|
||||
|
||||
# larger timeout for storage_v2_durability unit test
|
||||
if name.endswith("storage_v2_durability"):
|
||||
prefix = "TIMEOUT=300 "
|
||||
|
||||
outfile_paths = []
|
||||
if name.startswith("unit"):
|
||||
dirname = dirname.replace("/build_debug/", "/build_coverage/")
|
||||
|
@ -1,25 +1,39 @@
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <csignal>
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <sys/wait.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <filesystem>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
|
||||
#include "storage/v2/durability.hpp"
|
||||
#include "storage/v2/storage.hpp"
|
||||
#include "utils/file.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
using testing::Contains;
|
||||
using testing::UnorderedElementsAre;
|
||||
|
||||
class DurabilityTest : public ::testing::TestWithParam<bool> {
|
||||
private:
|
||||
protected:
|
||||
const uint64_t kNumBaseVertices = 1000;
|
||||
const uint64_t kNumBaseEdges = 10000;
|
||||
const uint64_t kNumExtendedVertices = 100;
|
||||
const uint64_t kNumExtendedEdges = 1000;
|
||||
|
||||
// We don't want to flush the WAL while we are doing operations because the
|
||||
// flushing adds a large overhead that slows down execution.
|
||||
const uint64_t kFlushWalEvery = (kNumBaseVertices + kNumBaseEdges +
|
||||
kNumExtendedVertices + kNumExtendedEdges) *
|
||||
2;
|
||||
|
||||
public:
|
||||
DurabilityTest()
|
||||
: base_vertex_gids_(kNumBaseVertices),
|
||||
@ -445,14 +459,11 @@ class DurabilityTest : public ::testing::TestWithParam<bool> {
|
||||
}
|
||||
|
||||
std::vector<std::filesystem::path> GetSnapshotsList() {
|
||||
std::vector<std::filesystem::path> ret;
|
||||
for (auto &item : std::filesystem::directory_iterator(
|
||||
storage_directory / storage::kSnapshotDirectory)) {
|
||||
ret.push_back(item.path());
|
||||
}
|
||||
std::sort(ret.begin(), ret.end());
|
||||
std::reverse(ret.begin(), ret.end());
|
||||
return ret;
|
||||
return GetFilesList(storage_directory / storage::kSnapshotDirectory);
|
||||
}
|
||||
|
||||
std::vector<std::filesystem::path> GetWalsList() {
|
||||
return GetFilesList(storage_directory / storage::kWalDirectory);
|
||||
}
|
||||
|
||||
std::filesystem::path storage_directory{
|
||||
@ -460,6 +471,18 @@ class DurabilityTest : public ::testing::TestWithParam<bool> {
|
||||
"MG_test_unit_storage_v2_durability"};
|
||||
|
||||
private:
|
||||
std::vector<std::filesystem::path> GetFilesList(
|
||||
const std::filesystem::path &path) {
|
||||
std::vector<std::filesystem::path> ret;
|
||||
std::error_code ec; // For exception suppression.
|
||||
for (auto &item : std::filesystem::directory_iterator(path, ec)) {
|
||||
ret.push_back(item.path());
|
||||
}
|
||||
std::sort(ret.begin(), ret.end());
|
||||
std::reverse(ret.begin(), ret.end());
|
||||
return ret;
|
||||
}
|
||||
|
||||
void Clear() {
|
||||
if (!std::filesystem::exists(storage_directory)) return;
|
||||
std::filesystem::remove_all(storage_directory);
|
||||
@ -491,6 +514,9 @@ TEST_P(DurabilityTest, SnapshotOnExit) {
|
||||
VerifyExtendedDataset(&store);
|
||||
}
|
||||
|
||||
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
||||
ASSERT_EQ(GetWalsList().size(), 0);
|
||||
|
||||
// Recover snapshot.
|
||||
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
@ -515,13 +541,16 @@ TEST_P(DurabilityTest, SnapshotPeriodic) {
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_type =
|
||||
storage::Config::Durability::SnapshotType::PERIODIC,
|
||||
.snapshot_wal_mode = storage::Config::Durability::
|
||||
SnapshotWalMode::PERIODIC_SNAPSHOT,
|
||||
.snapshot_interval = std::chrono::milliseconds(2000)}});
|
||||
CreateBaseDataset(&store, GetParam());
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
||||
}
|
||||
|
||||
ASSERT_GE(GetSnapshotsList().size(), 1);
|
||||
ASSERT_EQ(GetWalsList().size(), 0);
|
||||
|
||||
// Recover snapshot.
|
||||
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
@ -545,8 +574,8 @@ TEST_P(DurabilityTest, SnapshotFallback) {
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_type =
|
||||
storage::Config::Durability::SnapshotType::PERIODIC,
|
||||
.snapshot_wal_mode = storage::Config::Durability::
|
||||
SnapshotWalMode::PERIODIC_SNAPSHOT,
|
||||
.snapshot_interval = std::chrono::milliseconds(2000)}});
|
||||
CreateBaseDataset(&store, GetParam());
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
||||
@ -554,6 +583,9 @@ TEST_P(DurabilityTest, SnapshotFallback) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
||||
}
|
||||
|
||||
ASSERT_GE(GetSnapshotsList().size(), 2);
|
||||
ASSERT_EQ(GetWalsList().size(), 0);
|
||||
|
||||
// Destroy last snapshot.
|
||||
{
|
||||
auto snapshots = GetSnapshotsList();
|
||||
@ -589,13 +621,29 @@ TEST_P(DurabilityTest, SnapshotFallback) {
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_P(DurabilityTest, SnapshotRetention) {
|
||||
// Create unrelated snapshot.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_on_exit = true}});
|
||||
auto acc = store.Access();
|
||||
for (uint64_t i = 0; i < 1000; ++i) {
|
||||
acc.CreateVertex();
|
||||
}
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
|
||||
ASSERT_GE(GetSnapshotsList().size(), 1);
|
||||
ASSERT_EQ(GetWalsList().size(), 0);
|
||||
|
||||
// Create snapshot.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_type =
|
||||
storage::Config::Durability::SnapshotType::PERIODIC,
|
||||
.snapshot_wal_mode = storage::Config::Durability::
|
||||
SnapshotWalMode::PERIODIC_SNAPSHOT,
|
||||
.snapshot_interval = std::chrono::milliseconds(2000),
|
||||
.snapshot_retention_count = 3}});
|
||||
CreateBaseDataset(&store, GetParam());
|
||||
@ -603,13 +651,24 @@ TEST_P(DurabilityTest, SnapshotRetention) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
|
||||
}
|
||||
|
||||
// Verify that exactly 3 snapshots exist.
|
||||
ASSERT_EQ(GetSnapshotsList().size(), 1 + 3);
|
||||
ASSERT_EQ(GetWalsList().size(), 0);
|
||||
|
||||
// Verify that exactly 3 snapshots and 1 unrelated snapshot exist.
|
||||
{
|
||||
auto snapshots = GetSnapshotsList();
|
||||
ASSERT_EQ(snapshots.size(), 3);
|
||||
for (const auto &path : snapshots) {
|
||||
ASSERT_EQ(snapshots.size(), 1 + 3);
|
||||
std::string uuid;
|
||||
for (size_t i = 0; i < snapshots.size(); ++i) {
|
||||
const auto &path = snapshots[i];
|
||||
// This shouldn't throw.
|
||||
storage::ReadSnapshotInfo(path);
|
||||
auto info = storage::ReadSnapshotInfo(path);
|
||||
if (i == 0) uuid = info.uuid;
|
||||
if (i < snapshots.size() - 1) {
|
||||
ASSERT_EQ(info.uuid, uuid);
|
||||
} else {
|
||||
ASSERT_NE(info.uuid, uuid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -645,6 +704,9 @@ TEST_F(DurabilityTest,
|
||||
VerifyExtendedDataset(&store);
|
||||
}
|
||||
|
||||
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
||||
ASSERT_EQ(GetWalsList().size(), 0);
|
||||
|
||||
// Recover snapshot.
|
||||
storage::Storage store({.items = {.properties_on_edges = true},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
@ -746,6 +808,9 @@ TEST_F(DurabilityTest,
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
||||
ASSERT_EQ(GetWalsList().size(), 0);
|
||||
|
||||
// Recover snapshot.
|
||||
storage::Storage store({.items = {.properties_on_edges = false},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
@ -762,3 +827,468 @@ TEST_F(DurabilityTest,
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_P(DurabilityTest, WalBasic) {
|
||||
// Create WALs.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = storage::Config::Durability::
|
||||
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
.snapshot_interval = std::chrono::minutes(20),
|
||||
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
||||
CreateBaseDataset(&store, GetParam());
|
||||
CreateExtendedDataset(&store);
|
||||
}
|
||||
|
||||
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
||||
ASSERT_GE(GetWalsList().size(), 1);
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_P(DurabilityTest, WalCreateInSingleTransaction) {
|
||||
// NOLINTNEXTLINE(readability-isolate-declaration)
|
||||
storage::Gid gid_v1, gid_v2, gid_e1, gid_v3;
|
||||
|
||||
// Create WALs.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = storage::Config::Durability::
|
||||
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
.snapshot_interval = std::chrono::minutes(20),
|
||||
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
||||
auto acc = store.Access();
|
||||
auto v1 = acc.CreateVertex();
|
||||
gid_v1 = v1.Gid();
|
||||
auto v2 = acc.CreateVertex();
|
||||
gid_v2 = v2.Gid();
|
||||
auto e1 = acc.CreateEdge(&v1, &v2, store.NameToEdgeType("e1"));
|
||||
ASSERT_TRUE(e1.HasValue());
|
||||
gid_e1 = e1->Gid();
|
||||
ASSERT_TRUE(v1.AddLabel(store.NameToLabel("l11")).HasValue());
|
||||
ASSERT_TRUE(v1.AddLabel(store.NameToLabel("l12")).HasValue());
|
||||
ASSERT_TRUE(v1.AddLabel(store.NameToLabel("l13")).HasValue());
|
||||
if (GetParam()) {
|
||||
ASSERT_TRUE(e1->SetProperty(store.NameToProperty("test"),
|
||||
storage::PropertyValue("nandare"))
|
||||
.HasValue());
|
||||
}
|
||||
ASSERT_TRUE(v2.AddLabel(store.NameToLabel("l21")).HasValue());
|
||||
ASSERT_TRUE(v2.SetProperty(store.NameToProperty("hello"),
|
||||
storage::PropertyValue("world"))
|
||||
.HasValue());
|
||||
auto v3 = acc.CreateVertex();
|
||||
gid_v3 = v3.Gid();
|
||||
ASSERT_TRUE(
|
||||
v3.SetProperty(store.NameToProperty("v3"), storage::PropertyValue(42))
|
||||
.HasValue());
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
|
||||
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
||||
ASSERT_GE(GetWalsList().size(), 1);
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_P(DurabilityTest, WalCreateAndRemoveEverything) {
|
||||
// Create WALs.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = storage::Config::Durability::
|
||||
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
.snapshot_interval = std::chrono::minutes(20),
|
||||
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
||||
CreateBaseDataset(&store, GetParam());
|
||||
CreateExtendedDataset(&store);
|
||||
auto indices = store.ListAllIndices();
|
||||
for (auto index : indices.label) {
|
||||
ASSERT_TRUE(store.DropIndex(index));
|
||||
}
|
||||
for (auto index : indices.label_property) {
|
||||
ASSERT_TRUE(store.DropIndex(index.first, index.second));
|
||||
}
|
||||
auto constraints = store.ListAllConstraints();
|
||||
for (auto constraint : constraints.existence) {
|
||||
ASSERT_TRUE(
|
||||
store.DropExistenceConstraint(constraint.first, constraint.second));
|
||||
}
|
||||
auto acc = store.Access();
|
||||
for (auto vertex : acc.Vertices(storage::View::OLD)) {
|
||||
ASSERT_TRUE(acc.DetachDeleteVertex(&vertex).HasValue());
|
||||
}
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
|
||||
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
||||
ASSERT_GE(GetWalsList().size(), 1);
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_P(DurabilityTest, WalTransactionOrdering) {
|
||||
// NOLINTNEXTLINE(readability-isolate-declaration)
|
||||
storage::Gid gid1, gid2, gid3;
|
||||
|
||||
// Create WAL.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = storage::Config::Durability::
|
||||
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
.snapshot_interval = std::chrono::minutes(20),
|
||||
.wal_file_flush_every_n_tx = kFlushWalEvery,
|
||||
.wal_file_size_kibibytes = 100000}});
|
||||
auto acc1 = store.Access();
|
||||
auto acc2 = store.Access();
|
||||
|
||||
// Create vertex in transaction 2.
|
||||
{
|
||||
auto vertex2 = acc2.CreateVertex();
|
||||
gid2 = vertex2.Gid();
|
||||
ASSERT_TRUE(vertex2
|
||||
.SetProperty(store.NameToProperty("id"),
|
||||
storage::PropertyValue(2))
|
||||
.HasValue());
|
||||
}
|
||||
|
||||
auto acc3 = store.Access();
|
||||
|
||||
// Create vertex in transaction 3.
|
||||
{
|
||||
auto vertex3 = acc3.CreateVertex();
|
||||
gid3 = vertex3.Gid();
|
||||
ASSERT_TRUE(vertex3
|
||||
.SetProperty(store.NameToProperty("id"),
|
||||
storage::PropertyValue(3))
|
||||
.HasValue());
|
||||
}
|
||||
|
||||
// Create vertex in transaction 1.
|
||||
{
|
||||
auto vertex1 = acc1.CreateVertex();
|
||||
gid1 = vertex1.Gid();
|
||||
ASSERT_TRUE(vertex1
|
||||
.SetProperty(store.NameToProperty("id"),
|
||||
storage::PropertyValue(1))
|
||||
.HasValue());
|
||||
}
|
||||
|
||||
// Commit transaction 3, then 1, then 2.
|
||||
ASSERT_FALSE(acc3.Commit().HasError());
|
||||
ASSERT_FALSE(acc1.Commit().HasError());
|
||||
ASSERT_FALSE(acc2.Commit().HasError());
|
||||
}
|
||||
|
||||
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
||||
ASSERT_EQ(GetWalsList().size(), 1);
|
||||
|
||||
// Verify WAL data.
|
||||
{
|
||||
auto path = GetWalsList().front();
|
||||
auto info = storage::ReadWalInfo(path);
|
||||
storage::Decoder wal;
|
||||
wal.Initialize(path, storage::kWalMagic);
|
||||
wal.SetPosition(info.offset_deltas);
|
||||
ASSERT_EQ(info.num_deltas, 9);
|
||||
std::vector<std::pair<uint64_t, storage::WalDeltaData>> data;
|
||||
for (uint64_t i = 0; i < info.num_deltas; ++i) {
|
||||
auto timestamp = storage::ReadWalDeltaHeader(&wal);
|
||||
data.emplace_back(timestamp, storage::ReadWalDeltaData(&wal));
|
||||
}
|
||||
// Verify timestamps.
|
||||
ASSERT_EQ(data[1].first, data[0].first);
|
||||
ASSERT_EQ(data[2].first, data[1].first);
|
||||
ASSERT_GT(data[3].first, data[2].first);
|
||||
ASSERT_EQ(data[4].first, data[3].first);
|
||||
ASSERT_EQ(data[5].first, data[4].first);
|
||||
ASSERT_GT(data[6].first, data[5].first);
|
||||
ASSERT_EQ(data[7].first, data[6].first);
|
||||
ASSERT_EQ(data[8].first, data[7].first);
|
||||
// Verify transaction 3.
|
||||
ASSERT_EQ(data[0].second.type, storage::WalDeltaData::Type::VERTEX_CREATE);
|
||||
ASSERT_EQ(data[0].second.vertex_create_delete.gid, gid3);
|
||||
ASSERT_EQ(data[1].second.type,
|
||||
storage::WalDeltaData::Type::VERTEX_SET_PROPERTY);
|
||||
ASSERT_EQ(data[1].second.vertex_edge_set_property.gid, gid3);
|
||||
ASSERT_EQ(data[1].second.vertex_edge_set_property.property, "id");
|
||||
ASSERT_EQ(data[1].second.vertex_edge_set_property.value,
|
||||
storage::PropertyValue(3));
|
||||
ASSERT_EQ(data[2].second.type,
|
||||
storage::WalDeltaData::Type::TRANSACTION_END);
|
||||
// Verify transaction 1.
|
||||
ASSERT_EQ(data[3].second.type, storage::WalDeltaData::Type::VERTEX_CREATE);
|
||||
ASSERT_EQ(data[3].second.vertex_create_delete.gid, gid1);
|
||||
ASSERT_EQ(data[4].second.type,
|
||||
storage::WalDeltaData::Type::VERTEX_SET_PROPERTY);
|
||||
ASSERT_EQ(data[4].second.vertex_edge_set_property.gid, gid1);
|
||||
ASSERT_EQ(data[4].second.vertex_edge_set_property.property, "id");
|
||||
ASSERT_EQ(data[4].second.vertex_edge_set_property.value,
|
||||
storage::PropertyValue(1));
|
||||
ASSERT_EQ(data[5].second.type,
|
||||
storage::WalDeltaData::Type::TRANSACTION_END);
|
||||
// Verify transaction 2.
|
||||
ASSERT_EQ(data[6].second.type, storage::WalDeltaData::Type::VERTEX_CREATE);
|
||||
ASSERT_EQ(data[6].second.vertex_create_delete.gid, gid2);
|
||||
ASSERT_EQ(data[7].second.type,
|
||||
storage::WalDeltaData::Type::VERTEX_SET_PROPERTY);
|
||||
ASSERT_EQ(data[7].second.vertex_edge_set_property.gid, gid2);
|
||||
ASSERT_EQ(data[7].second.vertex_edge_set_property.property, "id");
|
||||
ASSERT_EQ(data[7].second.vertex_edge_set_property.value,
|
||||
storage::PropertyValue(2));
|
||||
ASSERT_EQ(data[8].second.type,
|
||||
storage::WalDeltaData::Type::TRANSACTION_END);
|
||||
}
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_P(DurabilityTest, WalCreateAndRemoveOnlyBaseDataset) {
|
||||
// Create WALs.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = storage::Config::Durability::
|
||||
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
.snapshot_interval = std::chrono::minutes(20),
|
||||
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
||||
CreateBaseDataset(&store, GetParam());
|
||||
CreateExtendedDataset(&store);
|
||||
auto label_indexed = store.NameToLabel("base_indexed");
|
||||
auto label_unindexed = store.NameToLabel("base_unindexed");
|
||||
auto acc = store.Access();
|
||||
for (auto vertex : acc.Vertices(storage::View::OLD)) {
|
||||
auto has_indexed = vertex.HasLabel(label_indexed, storage::View::OLD);
|
||||
ASSERT_TRUE(has_indexed.HasValue());
|
||||
auto has_unindexed = vertex.HasLabel(label_unindexed, storage::View::OLD);
|
||||
if (!*has_indexed && !*has_unindexed) continue;
|
||||
ASSERT_TRUE(acc.DetachDeleteVertex(&vertex).HasValue());
|
||||
}
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
|
||||
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
||||
ASSERT_GE(GetWalsList().size(), 1);
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_P(DurabilityTest, WalDeathResilience) {
|
||||
pid_t pid = fork();
|
||||
if (pid == 0) {
|
||||
// Create WALs.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = storage::Config::Durability::
|
||||
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
.snapshot_interval = std::chrono::minutes(20),
|
||||
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
||||
// Create one million vertices.
|
||||
for (uint64_t i = 0; i < 1000000; ++i) {
|
||||
auto acc = store.Access();
|
||||
acc.CreateVertex();
|
||||
CHECK(!acc.Commit().HasError()) << "Couldn't commit transaction!";
|
||||
}
|
||||
}
|
||||
} else if (pid > 0) {
|
||||
// Wait for WALs to be created.
|
||||
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||
int status;
|
||||
EXPECT_EQ(waitpid(pid, &status, WNOHANG), 0);
|
||||
EXPECT_EQ(kill(pid, SIGKILL), 0);
|
||||
EXPECT_EQ(waitpid(pid, &status, 0), pid);
|
||||
EXPECT_NE(status, 0);
|
||||
} else {
|
||||
LOG(FATAL) << "Couldn't create process to execute test!";
|
||||
}
|
||||
|
||||
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
||||
ASSERT_GE(GetWalsList().size(), 1);
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_P(DurabilityTest, WalAllOperationsInSingleTransaction) {
|
||||
// Create WALs
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = storage::Config::Durability::
|
||||
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
.snapshot_interval = std::chrono::minutes(20),
|
||||
.wal_file_size_kibibytes = 1,
|
||||
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
||||
auto acc = store.Access();
|
||||
auto vertex1 = acc.CreateVertex();
|
||||
auto vertex2 = acc.CreateVertex();
|
||||
ASSERT_TRUE(vertex1.AddLabel(acc.NameToLabel("nandare")).HasValue());
|
||||
ASSERT_TRUE(vertex2
|
||||
.SetProperty(acc.NameToProperty("haihai"),
|
||||
storage::PropertyValue(42))
|
||||
.HasValue());
|
||||
ASSERT_TRUE(vertex1.RemoveLabel(acc.NameToLabel("nandare")).HasValue());
|
||||
auto edge1 = acc.CreateEdge(&vertex1, &vertex2, acc.NameToEdgeType("et1"));
|
||||
ASSERT_TRUE(edge1.HasValue());
|
||||
ASSERT_TRUE(
|
||||
vertex2
|
||||
.SetProperty(acc.NameToProperty("haihai"), storage::PropertyValue())
|
||||
.HasValue());
|
||||
auto vertex3 = acc.CreateVertex();
|
||||
auto edge2 = acc.CreateEdge(&vertex3, &vertex3, acc.NameToEdgeType("et2"));
|
||||
ASSERT_TRUE(edge2.HasValue());
|
||||
if (GetParam()) {
|
||||
ASSERT_TRUE(edge2
|
||||
->SetProperty(acc.NameToProperty("meaning"),
|
||||
storage::PropertyValue(true))
|
||||
.HasValue());
|
||||
ASSERT_TRUE(edge1
|
||||
->SetProperty(acc.NameToProperty("hello"),
|
||||
storage::PropertyValue("world"))
|
||||
.HasValue());
|
||||
ASSERT_TRUE(edge2
|
||||
->SetProperty(acc.NameToProperty("meaning"),
|
||||
storage::PropertyValue())
|
||||
.HasValue());
|
||||
}
|
||||
ASSERT_TRUE(vertex3.AddLabel(acc.NameToLabel("test")).HasValue());
|
||||
ASSERT_TRUE(vertex3
|
||||
.SetProperty(acc.NameToProperty("nonono"),
|
||||
storage::PropertyValue(-1))
|
||||
.HasValue());
|
||||
ASSERT_TRUE(
|
||||
vertex3
|
||||
.SetProperty(acc.NameToProperty("nonono"), storage::PropertyValue())
|
||||
.HasValue());
|
||||
if (GetParam()) {
|
||||
ASSERT_TRUE(edge1
|
||||
->SetProperty(acc.NameToProperty("hello"),
|
||||
storage::PropertyValue())
|
||||
.HasValue());
|
||||
}
|
||||
ASSERT_TRUE(vertex3.RemoveLabel(acc.NameToLabel("test")).HasValue());
|
||||
ASSERT_TRUE(acc.DetachDeleteVertex(&vertex1).HasValue());
|
||||
ASSERT_TRUE(acc.DeleteEdge(&*edge2).HasValue());
|
||||
ASSERT_TRUE(acc.DeleteVertex(&vertex2).HasValue());
|
||||
ASSERT_TRUE(acc.DeleteVertex(&vertex3).HasValue());
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
|
||||
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
||||
ASSERT_GE(GetWalsList().size(), 1);
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_P(DurabilityTest, WalAndSnapshot) {
|
||||
// Create snapshot and WALs.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = storage::Config::Durability::
|
||||
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
.snapshot_interval = std::chrono::milliseconds(2000),
|
||||
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
||||
CreateBaseDataset(&store, GetParam());
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
|
||||
CreateExtendedDataset(&store);
|
||||
}
|
||||
|
||||
ASSERT_GE(GetSnapshotsList().size(), 1);
|
||||
ASSERT_GE(GetWalsList().size(), 1);
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_P(DurabilityTest, WalAndSnapshotAppendToExistingSnapshot) {
|
||||
// Create snapshot.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_on_exit = true}});
|
||||
CreateBaseDataset(&store, GetParam());
|
||||
}
|
||||
|
||||
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
||||
ASSERT_EQ(GetWalsList().size(), 0);
|
||||
|
||||
// Recover snapshot.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.recover_on_startup = true}});
|
||||
VerifyBaseDataset(&store, GetParam(), false);
|
||||
}
|
||||
|
||||
// Recover snapshot and create WALs.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.recover_on_startup = true,
|
||||
.snapshot_wal_mode = storage::Config::Durability::
|
||||
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
.snapshot_interval = std::chrono::minutes(20),
|
||||
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
||||
CreateExtendedDataset(&store);
|
||||
}
|
||||
|
||||
ASSERT_EQ(GetSnapshotsList().size(), 1);
|
||||
ASSERT_GE(GetWalsList().size(), 1);
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST_P(DurabilityTest, WalAndSnapshotWalRetention) {
|
||||
// Create unrelated WALs.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = storage::Config::Durability::
|
||||
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
.snapshot_interval = std::chrono::minutes(20),
|
||||
.wal_file_size_kibibytes = 1,
|
||||
.wal_file_flush_every_n_tx = kFlushWalEvery}});
|
||||
auto acc = store.Access();
|
||||
for (uint64_t i = 0; i < 1000; ++i) {
|
||||
acc.CreateVertex();
|
||||
}
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
}
|
||||
|
||||
ASSERT_EQ(GetSnapshotsList().size(), 0);
|
||||
ASSERT_GE(GetWalsList().size(), 1);
|
||||
|
||||
uint64_t unrelated_wals = GetWalsList().size();
|
||||
|
||||
uint64_t items_created = 0;
|
||||
|
||||
// Create snapshot and WALs.
|
||||
{
|
||||
storage::Storage store(
|
||||
{.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.snapshot_wal_mode = storage::Config::Durability::
|
||||
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
|
||||
.snapshot_interval = std::chrono::seconds(2),
|
||||
.wal_file_size_kibibytes = 1,
|
||||
.wal_file_flush_every_n_tx = 1}});
|
||||
utils::Timer timer;
|
||||
// Allow at least 6 snapshots to be created.
|
||||
while (timer.Elapsed().count() < 13.0) {
|
||||
auto acc = store.Access();
|
||||
acc.CreateVertex();
|
||||
ASSERT_FALSE(acc.Commit().HasError());
|
||||
++items_created;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT_EQ(GetSnapshotsList().size(), 3);
|
||||
ASSERT_GE(GetWalsList().size(), unrelated_wals + 1);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user