Split storage durability implementation into multiple files

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2760
This commit is contained in:
Matej Ferencevic 2020-04-17 10:20:47 +02:00
parent fba5d75bf4
commit 857de23687
20 changed files with 2150 additions and 1987 deletions

View File

@ -1,6 +1,9 @@
set(storage_v2_src_files
constraints.cpp
durability.cpp
durability/durability.cpp
durability/serialization.cpp
durability/snapshot.cpp
durability/wal.cpp
edge_accessor.cpp
indices.cpp
property_store.cpp

View File

@ -1,446 +0,0 @@
#pragma once
#include <cstdint>
#include <filesystem>
#include <functional>
#include <list>
#include <optional>
#include <string>
#include <string_view>
#include <type_traits>
#include <utility>
#include "storage/v2/config.hpp"
#include "storage/v2/constraints.hpp"
#include "storage/v2/delta.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/indices.hpp"
#include "storage/v2/name_id_mapper.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex.hpp"
#include "utils/exceptions.hpp"
#include "utils/file.hpp"
#include "utils/scheduler.hpp"
#include "utils/skip_list.hpp"
namespace storage {
static const std::string kSnapshotDirectory{"snapshots"};
static const std::string kWalDirectory{"wal"};
static const std::string kBackupDirectory{".backup"};
// Magic values written to the start of a snapshot/WAL file to identify it.
const std::string kSnapshotMagic{"MGsn"};
const std::string kWalMagic{"MGwl"};
static_assert(std::is_same_v<uint8_t, unsigned char>);
/// Markers that are used to indicate crucial parts of the snapshot/WAL.
/// IMPORTANT: Don't forget to update the list of all markers `kMarkersAll` when
/// you add a new Marker.
enum class Marker : uint8_t {
TYPE_NULL = 0x10,
TYPE_BOOL = 0x11,
TYPE_INT = 0x12,
TYPE_DOUBLE = 0x13,
TYPE_STRING = 0x14,
TYPE_LIST = 0x15,
TYPE_MAP = 0x16,
TYPE_PROPERTY_VALUE = 0x17,
SECTION_VERTEX = 0x20,
SECTION_EDGE = 0x21,
SECTION_MAPPER = 0x22,
SECTION_METADATA = 0x23,
SECTION_INDICES = 0x24,
SECTION_CONSTRAINTS = 0x25,
SECTION_DELTA = 0x26,
SECTION_OFFSETS = 0x42,
DELTA_VERTEX_CREATE = 0x50,
DELTA_VERTEX_DELETE = 0x51,
DELTA_VERTEX_ADD_LABEL = 0x52,
DELTA_VERTEX_REMOVE_LABEL = 0x53,
DELTA_VERTEX_SET_PROPERTY = 0x54,
DELTA_EDGE_CREATE = 0x55,
DELTA_EDGE_DELETE = 0x56,
DELTA_EDGE_SET_PROPERTY = 0x57,
DELTA_TRANSACTION_END = 0x58,
DELTA_LABEL_INDEX_CREATE = 0x59,
DELTA_LABEL_INDEX_DROP = 0x5a,
DELTA_LABEL_PROPERTY_INDEX_CREATE = 0x5b,
DELTA_LABEL_PROPERTY_INDEX_DROP = 0x5c,
DELTA_EXISTENCE_CONSTRAINT_CREATE = 0x5d,
DELTA_EXISTENCE_CONSTRAINT_DROP = 0x5e,
DELTA_UNIQUE_CONSTRAINT_CREATE = 0x5f,
DELTA_UNIQUE_CONSTRAINT_DROP = 0x60,
VALUE_FALSE = 0x00,
VALUE_TRUE = 0xff,
};
/// List of all available markers.
/// IMPORTANT: Don't forget to update this list when you add a new Marker.
static const Marker kMarkersAll[] = {
Marker::TYPE_NULL,
Marker::TYPE_BOOL,
Marker::TYPE_INT,
Marker::TYPE_DOUBLE,
Marker::TYPE_STRING,
Marker::TYPE_LIST,
Marker::TYPE_MAP,
Marker::TYPE_PROPERTY_VALUE,
Marker::SECTION_VERTEX,
Marker::SECTION_EDGE,
Marker::SECTION_MAPPER,
Marker::SECTION_METADATA,
Marker::SECTION_INDICES,
Marker::SECTION_CONSTRAINTS,
Marker::SECTION_DELTA,
Marker::SECTION_OFFSETS,
Marker::DELTA_VERTEX_CREATE,
Marker::DELTA_VERTEX_DELETE,
Marker::DELTA_VERTEX_ADD_LABEL,
Marker::DELTA_VERTEX_REMOVE_LABEL,
Marker::DELTA_VERTEX_SET_PROPERTY,
Marker::DELTA_EDGE_CREATE,
Marker::DELTA_EDGE_DELETE,
Marker::DELTA_EDGE_SET_PROPERTY,
Marker::DELTA_TRANSACTION_END,
Marker::DELTA_LABEL_INDEX_CREATE,
Marker::DELTA_LABEL_INDEX_DROP,
Marker::DELTA_LABEL_PROPERTY_INDEX_CREATE,
Marker::DELTA_LABEL_PROPERTY_INDEX_DROP,
Marker::DELTA_EXISTENCE_CONSTRAINT_CREATE,
Marker::DELTA_EXISTENCE_CONSTRAINT_DROP,
Marker::DELTA_UNIQUE_CONSTRAINT_CREATE,
Marker::DELTA_UNIQUE_CONSTRAINT_DROP,
Marker::VALUE_FALSE,
Marker::VALUE_TRUE,
};
/// Encoder that is used to generate a snapshot/WAL.
class Encoder final {
public:
void Initialize(const std::filesystem::path &path,
const std::string_view &magic, uint64_t version);
// Main write function, the only one that is allowed to write to the `file_`
// directly.
void Write(const uint8_t *data, uint64_t size);
void WriteMarker(Marker marker);
void WriteBool(bool value);
void WriteUint(uint64_t value);
void WriteDouble(double value);
void WriteString(const std::string_view &value);
void WritePropertyValue(const PropertyValue &value);
uint64_t GetPosition();
void SetPosition(uint64_t position);
void Sync();
void Finalize();
private:
utils::OutputFile file_;
};
/// Decoder that is used to read a generated snapshot/WAL.
class Decoder final {
public:
std::optional<uint64_t> Initialize(const std::filesystem::path &path,
const std::string &magic);
// Main read functions, the only one that are allowed to read from the `file_`
// directly.
bool Read(uint8_t *data, size_t size);
bool Peek(uint8_t *data, size_t size);
std::optional<Marker> PeekMarker();
std::optional<Marker> ReadMarker();
std::optional<bool> ReadBool();
std::optional<uint64_t> ReadUint();
std::optional<double> ReadDouble();
std::optional<std::string> ReadString();
std::optional<PropertyValue> ReadPropertyValue();
bool SkipString();
bool SkipPropertyValue();
std::optional<uint64_t> GetSize();
std::optional<uint64_t> GetPosition();
bool SetPosition(uint64_t position);
private:
utils::InputFile file_;
};
/// Exception used to handle errors during recovery.
class RecoveryFailure : public utils::BasicException {
using utils::BasicException::BasicException;
};
/// Structure used to hold information about a snapshot.
struct SnapshotInfo {
uint64_t offset_edges;
uint64_t offset_vertices;
uint64_t offset_indices;
uint64_t offset_constraints;
uint64_t offset_mapper;
uint64_t offset_metadata;
std::string uuid;
uint64_t start_timestamp;
uint64_t edges_count;
uint64_t vertices_count;
};
/// Function used to read information about the snapshot file.
/// @throw RecoveryFailure
SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path);
/// Structure used to hold information about a WAL.
struct WalInfo {
uint64_t offset_metadata;
uint64_t offset_deltas;
std::string uuid;
uint64_t seq_num;
uint64_t from_timestamp;
uint64_t to_timestamp;
uint64_t num_deltas;
};
/// Function used to read information about the WAL file.
/// @throw RecoveryFailure
WalInfo ReadWalInfo(const std::filesystem::path &path);
/// Structure used to return loaded WAL delta data.
struct WalDeltaData {
enum class Type {
VERTEX_CREATE,
VERTEX_DELETE,
VERTEX_ADD_LABEL,
VERTEX_REMOVE_LABEL,
VERTEX_SET_PROPERTY,
EDGE_CREATE,
EDGE_DELETE,
EDGE_SET_PROPERTY,
TRANSACTION_END,
LABEL_INDEX_CREATE,
LABEL_INDEX_DROP,
LABEL_PROPERTY_INDEX_CREATE,
LABEL_PROPERTY_INDEX_DROP,
EXISTENCE_CONSTRAINT_CREATE,
EXISTENCE_CONSTRAINT_DROP,
UNIQUE_CONSTRAINT_CREATE,
UNIQUE_CONSTRAINT_DROP,
};
Type type{Type::TRANSACTION_END};
struct {
Gid gid;
} vertex_create_delete;
struct {
Gid gid;
std::string label;
} vertex_add_remove_label;
struct {
Gid gid;
std::string property;
PropertyValue value;
} vertex_edge_set_property;
struct {
Gid gid;
std::string edge_type;
Gid from_vertex;
Gid to_vertex;
} edge_create_delete;
struct {
std::string label;
} operation_label;
struct {
std::string label;
std::string property;
} operation_label_property;
struct {
std::string label;
std::set<std::string> properties;
} operation_label_properties;
};
bool operator==(const WalDeltaData &a, const WalDeltaData &b);
bool operator!=(const WalDeltaData &a, const WalDeltaData &b);
/// Function used to read the WAL delta header. The function returns the delta
/// timestamp.
/// @throw RecoveryFailure
uint64_t ReadWalDeltaHeader(Decoder *wal);
/// Function used to read the current WAL delta data. The function returns the
/// read delta data. The WAL delta header must be read before calling this
/// function.
/// @throw RecoveryFailure
WalDeltaData ReadWalDeltaData(Decoder *wal);
/// Function used to skip the current WAL delta data. The function returns the
/// skipped delta type. The WAL delta header must be read before calling this
/// function.
/// @throw RecoveryFailure
WalDeltaData::Type SkipWalDeltaData(Decoder *wal);
/// Enum used to indicate a global database operation that isn't transactional.
enum class StorageGlobalOperation {
LABEL_INDEX_CREATE,
LABEL_INDEX_DROP,
LABEL_PROPERTY_INDEX_CREATE,
LABEL_PROPERTY_INDEX_DROP,
EXISTENCE_CONSTRAINT_CREATE,
EXISTENCE_CONSTRAINT_DROP,
UNIQUE_CONSTRAINT_CREATE,
UNIQUE_CONSTRAINT_DROP,
};
/// Structure used to track indices and constraints during recovery.
struct RecoveredIndicesAndConstraints {
struct {
std::vector<LabelId> label;
std::vector<std::pair<LabelId, PropertyId>> label_property;
} indices;
struct {
std::vector<std::pair<LabelId, PropertyId>> existence;
std::vector<std::pair<LabelId, std::set<PropertyId>>> unique;
} constraints;
};
/// WalFile class used to append deltas and operations to the WAL file.
class WalFile {
public:
WalFile(const std::filesystem::path &wal_directory, const std::string &uuid,
Config::Items items, NameIdMapper *name_id_mapper, uint64_t seq_num);
WalFile(const WalFile &) = delete;
WalFile(WalFile &&) = delete;
WalFile &operator=(const WalFile &) = delete;
WalFile &operator=(WalFile &&) = delete;
~WalFile();
void AppendDelta(const Delta &delta, const Vertex &vertex,
uint64_t timestamp);
void AppendDelta(const Delta &delta, const Edge &edge, uint64_t timestamp);
void AppendTransactionEnd(uint64_t timestamp);
void AppendOperation(StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties,
uint64_t timestamp);
void Sync();
uint64_t GetSize();
private:
void UpdateStats(uint64_t timestamp);
Config::Items items_;
NameIdMapper *name_id_mapper_;
Encoder wal_;
std::filesystem::path path_;
uint64_t from_timestamp_;
uint64_t to_timestamp_;
uint64_t count_;
};
/// Durability class that is used to provide full durability functionality to
/// the storage.
class Durability final {
public:
struct RecoveryInfo {
uint64_t next_vertex_id{0};
uint64_t next_edge_id{0};
uint64_t next_timestamp{0};
};
struct RecoveredSnapshot {
SnapshotInfo snapshot_info;
RecoveryInfo recovery_info;
RecoveredIndicesAndConstraints indices_constraints;
};
Durability(Config::Durability config, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper,
std::atomic<uint64_t> *edge_count, Indices *indices,
Constraints *constraints, Config::Items items);
std::optional<RecoveryInfo> Initialize(
std::function<void(std::function<void(Transaction *)>)>
execute_with_transaction);
void Finalize();
void AppendToWal(const Transaction &transaction,
uint64_t final_commit_timestamp);
void AppendToWal(StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties,
uint64_t final_commit_timestamp);
private:
void CreateSnapshot(Transaction *transaction);
std::optional<RecoveryInfo> RecoverData();
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path);
RecoveryInfo LoadWal(const std::filesystem::path &path,
RecoveredIndicesAndConstraints *indices_constraints,
std::optional<uint64_t> snapshot_timestamp);
bool InitializeWalFile();
void FinalizeWalFile();
Config::Durability config_;
utils::SkipList<Vertex> *vertices_;
utils::SkipList<Edge> *edges_;
NameIdMapper *name_id_mapper_;
std::atomic<uint64_t> *edge_count_;
Indices *indices_;
Constraints *constraints_;
Config::Items items_;
std::function<void(std::function<void(Transaction *)>)>
execute_with_transaction_;
std::filesystem::path storage_directory_;
std::filesystem::path snapshot_directory_;
std::filesystem::path wal_directory_;
std::filesystem::path lock_file_path_;
utils::OutputFile lock_file_handle_;
utils::Scheduler snapshot_runner_;
// UUID used to distinguish snapshots and to link snapshots to WALs
std::string uuid_;
// Sequence number used to keep track of the chain of WALs.
uint64_t wal_seq_num_{0};
std::optional<WalFile> wal_file_;
uint64_t wal_unsynced_transactions_{0};
};
} // namespace storage

View File

@ -0,0 +1,105 @@
#pragma once
#include <atomic>
#include <cstdint>
#include <filesystem>
#include <functional>
#include <optional>
#include <string>
#include "storage/v2/durability/metadata.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/config.hpp"
#include "storage/v2/constraints.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/indices.hpp"
#include "storage/v2/name_id_mapper.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex.hpp"
#include "utils/scheduler.hpp"
#include "utils/skip_list.hpp"
namespace storage::durability {
/// Durability class that is used to provide full durability functionality to
/// the storage.
class Durability final {
public:
struct RecoveryInfo {
uint64_t next_vertex_id{0};
uint64_t next_edge_id{0};
uint64_t next_timestamp{0};
};
struct RecoveredSnapshot {
SnapshotInfo snapshot_info;
RecoveryInfo recovery_info;
RecoveredIndicesAndConstraints indices_constraints;
};
Durability(Config::Durability config, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper,
std::atomic<uint64_t> *edge_count, Indices *indices,
Constraints *constraints, Config::Items items);
std::optional<RecoveryInfo> Initialize(
std::function<void(std::function<void(Transaction *)>)>
execute_with_transaction);
void Finalize();
void AppendToWal(const Transaction &transaction,
uint64_t final_commit_timestamp);
void AppendToWal(StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties,
uint64_t final_commit_timestamp);
private:
void CreateSnapshot(Transaction *transaction);
std::optional<RecoveryInfo> RecoverData();
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path);
RecoveryInfo LoadWal(const std::filesystem::path &path,
RecoveredIndicesAndConstraints *indices_constraints,
std::optional<uint64_t> snapshot_timestamp);
bool InitializeWalFile();
void FinalizeWalFile();
Config::Durability config_;
utils::SkipList<Vertex> *vertices_;
utils::SkipList<Edge> *edges_;
NameIdMapper *name_id_mapper_;
std::atomic<uint64_t> *edge_count_;
Indices *indices_;
Constraints *constraints_;
Config::Items items_;
std::function<void(std::function<void(Transaction *)>)>
execute_with_transaction_;
std::filesystem::path storage_directory_;
std::filesystem::path snapshot_directory_;
std::filesystem::path wal_directory_;
std::filesystem::path lock_file_path_;
utils::OutputFile lock_file_handle_;
utils::Scheduler snapshot_runner_;
// UUID used to distinguish snapshots and to link snapshots to WALs
std::string uuid_;
// Sequence number used to keep track of the chain of WALs.
uint64_t wal_seq_num_{0};
std::optional<WalFile> wal_file_;
uint64_t wal_unsynced_transactions_{0};
};
} // namespace storage::durability

View File

@ -0,0 +1,12 @@
#pragma once
#include "utils/exceptions.hpp"
namespace storage::durability {
/// Exception used to handle errors during recovery.
class RecoveryFailure : public utils::BasicException {
using utils::BasicException::BasicException;
};
} // namespace storage::durability

View File

@ -0,0 +1,91 @@
#pragma once
#include <cstdint>
namespace storage::durability {
/// Markers that are used to indicate crucial parts of the snapshot/WAL.
/// IMPORTANT: Don't forget to update the list of all markers `kMarkersAll` when
/// you add a new Marker.
enum class Marker : uint8_t {
TYPE_NULL = 0x10,
TYPE_BOOL = 0x11,
TYPE_INT = 0x12,
TYPE_DOUBLE = 0x13,
TYPE_STRING = 0x14,
TYPE_LIST = 0x15,
TYPE_MAP = 0x16,
TYPE_PROPERTY_VALUE = 0x17,
SECTION_VERTEX = 0x20,
SECTION_EDGE = 0x21,
SECTION_MAPPER = 0x22,
SECTION_METADATA = 0x23,
SECTION_INDICES = 0x24,
SECTION_CONSTRAINTS = 0x25,
SECTION_DELTA = 0x26,
SECTION_OFFSETS = 0x42,
DELTA_VERTEX_CREATE = 0x50,
DELTA_VERTEX_DELETE = 0x51,
DELTA_VERTEX_ADD_LABEL = 0x52,
DELTA_VERTEX_REMOVE_LABEL = 0x53,
DELTA_VERTEX_SET_PROPERTY = 0x54,
DELTA_EDGE_CREATE = 0x55,
DELTA_EDGE_DELETE = 0x56,
DELTA_EDGE_SET_PROPERTY = 0x57,
DELTA_TRANSACTION_END = 0x58,
DELTA_LABEL_INDEX_CREATE = 0x59,
DELTA_LABEL_INDEX_DROP = 0x5a,
DELTA_LABEL_PROPERTY_INDEX_CREATE = 0x5b,
DELTA_LABEL_PROPERTY_INDEX_DROP = 0x5c,
DELTA_EXISTENCE_CONSTRAINT_CREATE = 0x5d,
DELTA_EXISTENCE_CONSTRAINT_DROP = 0x5e,
DELTA_UNIQUE_CONSTRAINT_CREATE = 0x5f,
DELTA_UNIQUE_CONSTRAINT_DROP = 0x60,
VALUE_FALSE = 0x00,
VALUE_TRUE = 0xff,
};
/// List of all available markers.
/// IMPORTANT: Don't forget to update this list when you add a new Marker.
static const Marker kMarkersAll[] = {
Marker::TYPE_NULL,
Marker::TYPE_BOOL,
Marker::TYPE_INT,
Marker::TYPE_DOUBLE,
Marker::TYPE_STRING,
Marker::TYPE_LIST,
Marker::TYPE_MAP,
Marker::TYPE_PROPERTY_VALUE,
Marker::SECTION_VERTEX,
Marker::SECTION_EDGE,
Marker::SECTION_MAPPER,
Marker::SECTION_METADATA,
Marker::SECTION_INDICES,
Marker::SECTION_CONSTRAINTS,
Marker::SECTION_DELTA,
Marker::SECTION_OFFSETS,
Marker::DELTA_VERTEX_CREATE,
Marker::DELTA_VERTEX_DELETE,
Marker::DELTA_VERTEX_ADD_LABEL,
Marker::DELTA_VERTEX_REMOVE_LABEL,
Marker::DELTA_VERTEX_SET_PROPERTY,
Marker::DELTA_EDGE_CREATE,
Marker::DELTA_EDGE_DELETE,
Marker::DELTA_EDGE_SET_PROPERTY,
Marker::DELTA_TRANSACTION_END,
Marker::DELTA_LABEL_INDEX_CREATE,
Marker::DELTA_LABEL_INDEX_DROP,
Marker::DELTA_LABEL_PROPERTY_INDEX_CREATE,
Marker::DELTA_LABEL_PROPERTY_INDEX_DROP,
Marker::DELTA_EXISTENCE_CONSTRAINT_CREATE,
Marker::DELTA_EXISTENCE_CONSTRAINT_DROP,
Marker::DELTA_UNIQUE_CONSTRAINT_CREATE,
Marker::DELTA_UNIQUE_CONSTRAINT_DROP,
Marker::VALUE_FALSE,
Marker::VALUE_TRUE,
};
} // namespace storage::durability

View File

@ -0,0 +1,55 @@
#pragma once
#include <algorithm>
#include <set>
#include <utility>
#include <vector>
#include "storage/v2/durability/exceptions.hpp"
#include "storage/v2/id_types.hpp"
namespace storage::durability {
/// Structure used to track indices and constraints during recovery.
struct RecoveredIndicesAndConstraints {
struct {
std::vector<LabelId> label;
std::vector<std::pair<LabelId, PropertyId>> label_property;
} indices;
struct {
std::vector<std::pair<LabelId, PropertyId>> existence;
std::vector<std::pair<LabelId, std::set<PropertyId>>> unique;
} constraints;
};
// Helper function used to insert indices/constraints into the recovered
// indices/constraints object.
// @throw RecoveryFailure
template <typename TObj>
void AddRecoveredIndexConstraint(std::vector<TObj> *list, TObj obj,
const char *error_message) {
auto it = std::find(list->begin(), list->end(), obj);
if (it == list->end()) {
list->push_back(obj);
} else {
throw RecoveryFailure(error_message);
}
}
// Helper function used to remove indices/constraints from the recovered
// indices/constraints object.
// @throw RecoveryFailure
template <typename TObj>
void RemoveRecoveredIndexConstraint(std::vector<TObj> *list, TObj obj,
const char *error_message) {
auto it = std::find(list->begin(), list->end(), obj);
if (it != list->end()) {
std::swap(*it, list->back());
list->pop_back();
} else {
throw RecoveryFailure(error_message);
}
}
} // namespace storage::durability

View File

@ -0,0 +1,41 @@
#pragma once
#include <cstdint>
#include <string>
#include "utils/timestamp.hpp"
namespace storage::durability {
static const std::string kSnapshotDirectory{"snapshots"};
static const std::string kWalDirectory{"wal"};
static const std::string kBackupDirectory{".backup"};
// This is the prefix used for Snapshot and WAL filenames. It is a timestamp
// format that equals to: YYYYmmddHHMMSSffffff
const std::string kTimestampFormat =
"{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}{:06d}";
// Generates the name for a snapshot in a well-defined sortable format with the
// start timestamp appended to the file name.
inline std::string MakeSnapshotName(uint64_t start_timestamp) {
std::string date_str = utils::Timestamp::Now().ToString(kTimestampFormat);
return date_str + "_timestamp_" + std::to_string(start_timestamp);
}
// Generates the name for a WAL file in a well-defined sortable format.
inline std::string MakeWalName() {
std::string date_str = utils::Timestamp::Now().ToString(kTimestampFormat);
return date_str + "_current";
}
// Generates the name for a WAL file in a well-defined sortable format with the
// range of timestamps contained [from, to] appended to the name.
inline std::string RemakeWalName(const std::string &current_name,
uint64_t from_timestamp,
uint64_t to_timestamp) {
return current_name.substr(0, current_name.size() - 8) + "_from_" +
std::to_string(from_timestamp) + "_to_" + std::to_string(to_timestamp);
}
} // namespace storage::durability

View File

@ -0,0 +1,425 @@
#include "storage/v2/durability/serialization.hpp"
#include "utils/endian.hpp"
namespace storage::durability {
//////////////////////////
// Encoder implementation.
//////////////////////////
namespace {
void WriteSize(Encoder *encoder, uint64_t size) {
size = utils::HostToLittleEndian(size);
encoder->Write(reinterpret_cast<const uint8_t *>(&size), sizeof(size));
}
} // namespace
void Encoder::Initialize(const std::filesystem::path &path,
const std::string_view &magic, uint64_t version) {
file_.Open(path, utils::OutputFile::Mode::OVERWRITE_EXISTING);
Write(reinterpret_cast<const uint8_t *>(magic.data()), magic.size());
auto version_encoded = utils::HostToLittleEndian(version);
Write(reinterpret_cast<const uint8_t *>(&version_encoded),
sizeof(version_encoded));
}
void Encoder::Write(const uint8_t *data, uint64_t size) {
file_.Write(data, size);
}
void Encoder::WriteMarker(Marker marker) {
auto value = static_cast<uint8_t>(marker);
Write(&value, sizeof(value));
}
void Encoder::WriteBool(bool value) {
WriteMarker(Marker::TYPE_BOOL);
if (value) {
WriteMarker(Marker::VALUE_TRUE);
} else {
WriteMarker(Marker::VALUE_FALSE);
}
}
void Encoder::WriteUint(uint64_t value) {
value = utils::HostToLittleEndian(value);
WriteMarker(Marker::TYPE_INT);
Write(reinterpret_cast<const uint8_t *>(&value), sizeof(value));
}
void Encoder::WriteDouble(double value) {
auto value_uint = utils::MemcpyCast<uint64_t>(value);
value_uint = utils::HostToLittleEndian(value_uint);
WriteMarker(Marker::TYPE_DOUBLE);
Write(reinterpret_cast<const uint8_t *>(&value_uint), sizeof(value_uint));
}
void Encoder::WriteString(const std::string_view &value) {
WriteMarker(Marker::TYPE_STRING);
WriteSize(this, value.size());
Write(reinterpret_cast<const uint8_t *>(value.data()), value.size());
}
void Encoder::WritePropertyValue(const PropertyValue &value) {
WriteMarker(Marker::TYPE_PROPERTY_VALUE);
switch (value.type()) {
case PropertyValue::Type::Null: {
WriteMarker(Marker::TYPE_NULL);
break;
}
case PropertyValue::Type::Bool: {
WriteBool(value.ValueBool());
break;
}
case PropertyValue::Type::Int: {
WriteUint(utils::MemcpyCast<uint64_t>(value.ValueInt()));
break;
}
case PropertyValue::Type::Double: {
WriteDouble(value.ValueDouble());
break;
}
case PropertyValue::Type::String: {
WriteString(value.ValueString());
break;
}
case PropertyValue::Type::List: {
const auto &list = value.ValueList();
WriteMarker(Marker::TYPE_LIST);
WriteSize(this, list.size());
for (const auto &item : list) {
WritePropertyValue(item);
}
break;
}
case PropertyValue::Type::Map: {
const auto &map = value.ValueMap();
WriteMarker(Marker::TYPE_MAP);
WriteSize(this, map.size());
for (const auto &item : map) {
WriteString(item.first);
WritePropertyValue(item.second);
}
break;
}
}
}
uint64_t Encoder::GetPosition() { return file_.GetPosition(); }
void Encoder::SetPosition(uint64_t position) {
file_.SetPosition(utils::OutputFile::Position::SET, position);
}
void Encoder::Sync() { file_.Sync(); }
void Encoder::Finalize() {
file_.Sync();
file_.Close();
}
//////////////////////////
// Decoder implementation.
//////////////////////////
namespace {
std::optional<Marker> CastToMarker(uint8_t value) {
for (auto marker : kMarkersAll) {
if (static_cast<uint8_t>(marker) == value) {
return marker;
}
}
return std::nullopt;
}
std::optional<uint64_t> ReadSize(Decoder *decoder) {
uint64_t size;
if (!decoder->Read(reinterpret_cast<uint8_t *>(&size), sizeof(size)))
return std::nullopt;
size = utils::LittleEndianToHost(size);
return size;
}
} // namespace
std::optional<uint64_t> Decoder::Initialize(const std::filesystem::path &path,
const std::string &magic) {
if (!file_.Open(path)) return std::nullopt;
std::string file_magic(magic.size(), '\0');
if (!Read(reinterpret_cast<uint8_t *>(file_magic.data()), file_magic.size()))
return std::nullopt;
if (file_magic != magic) return std::nullopt;
uint64_t version_encoded;
if (!Read(reinterpret_cast<uint8_t *>(&version_encoded),
sizeof(version_encoded)))
return std::nullopt;
return utils::LittleEndianToHost(version_encoded);
}
bool Decoder::Read(uint8_t *data, size_t size) {
return file_.Read(data, size);
}
bool Decoder::Peek(uint8_t *data, size_t size) {
return file_.Peek(data, size);
}
std::optional<Marker> Decoder::PeekMarker() {
uint8_t value;
if (!Peek(&value, sizeof(value))) return std::nullopt;
auto marker = CastToMarker(value);
if (!marker) return std::nullopt;
return *marker;
}
std::optional<Marker> Decoder::ReadMarker() {
uint8_t value;
if (!Read(&value, sizeof(value))) return std::nullopt;
auto marker = CastToMarker(value);
if (!marker) return std::nullopt;
return *marker;
}
std::optional<bool> Decoder::ReadBool() {
auto marker = ReadMarker();
if (!marker || *marker != Marker::TYPE_BOOL) return std::nullopt;
auto value = ReadMarker();
if (!value || (*value != Marker::VALUE_FALSE && *value != Marker::VALUE_TRUE))
return std::nullopt;
return *value == Marker::VALUE_TRUE;
}
std::optional<uint64_t> Decoder::ReadUint() {
auto marker = ReadMarker();
if (!marker || *marker != Marker::TYPE_INT) return std::nullopt;
uint64_t value;
if (!Read(reinterpret_cast<uint8_t *>(&value), sizeof(value)))
return std::nullopt;
value = utils::LittleEndianToHost(value);
return value;
}
std::optional<double> Decoder::ReadDouble() {
auto marker = ReadMarker();
if (!marker || *marker != Marker::TYPE_DOUBLE) return std::nullopt;
uint64_t value_int;
if (!Read(reinterpret_cast<uint8_t *>(&value_int), sizeof(value_int)))
return std::nullopt;
value_int = utils::LittleEndianToHost(value_int);
auto value = utils::MemcpyCast<double>(value_int);
return value;
}
std::optional<std::string> Decoder::ReadString() {
auto marker = ReadMarker();
if (!marker || *marker != Marker::TYPE_STRING) return std::nullopt;
auto size = ReadSize(this);
if (!size) return std::nullopt;
std::string value(*size, '\0');
if (!Read(reinterpret_cast<uint8_t *>(value.data()), *size))
return std::nullopt;
return value;
}
std::optional<PropertyValue> Decoder::ReadPropertyValue() {
auto pv_marker = ReadMarker();
if (!pv_marker || *pv_marker != Marker::TYPE_PROPERTY_VALUE)
return std::nullopt;
auto marker = PeekMarker();
if (!marker) return std::nullopt;
switch (*marker) {
case Marker::TYPE_NULL: {
auto inner_marker = ReadMarker();
if (!inner_marker || *inner_marker != Marker::TYPE_NULL)
return std::nullopt;
return PropertyValue();
}
case Marker::TYPE_BOOL: {
auto value = ReadBool();
if (!value) return std::nullopt;
return PropertyValue(*value);
}
case Marker::TYPE_INT: {
auto value = ReadUint();
if (!value) return std::nullopt;
return PropertyValue(utils::MemcpyCast<int64_t>(*value));
}
case Marker::TYPE_DOUBLE: {
auto value = ReadDouble();
if (!value) return std::nullopt;
return PropertyValue(*value);
}
case Marker::TYPE_STRING: {
auto value = ReadString();
if (!value) return std::nullopt;
return PropertyValue(std::move(*value));
}
case Marker::TYPE_LIST: {
auto inner_marker = ReadMarker();
if (!inner_marker || *inner_marker != Marker::TYPE_LIST)
return std::nullopt;
auto size = ReadSize(this);
if (!size) return std::nullopt;
std::vector<PropertyValue> value;
value.reserve(*size);
for (uint64_t i = 0; i < *size; ++i) {
auto item = ReadPropertyValue();
if (!item) return std::nullopt;
value.emplace_back(std::move(*item));
}
return PropertyValue(std::move(value));
}
case Marker::TYPE_MAP: {
auto inner_marker = ReadMarker();
if (!inner_marker || *inner_marker != Marker::TYPE_MAP)
return std::nullopt;
auto size = ReadSize(this);
if (!size) return std::nullopt;
std::map<std::string, PropertyValue> value;
for (uint64_t i = 0; i < *size; ++i) {
auto key = ReadString();
if (!key) return std::nullopt;
auto item = ReadPropertyValue();
if (!item) return std::nullopt;
value.emplace(std::move(*key), std::move(*item));
}
return PropertyValue(std::move(value));
}
case Marker::TYPE_PROPERTY_VALUE:
case Marker::SECTION_VERTEX:
case Marker::SECTION_EDGE:
case Marker::SECTION_MAPPER:
case Marker::SECTION_METADATA:
case Marker::SECTION_INDICES:
case Marker::SECTION_CONSTRAINTS:
case Marker::SECTION_DELTA:
case Marker::SECTION_OFFSETS:
case Marker::DELTA_VERTEX_CREATE:
case Marker::DELTA_VERTEX_DELETE:
case Marker::DELTA_VERTEX_ADD_LABEL:
case Marker::DELTA_VERTEX_REMOVE_LABEL:
case Marker::DELTA_VERTEX_SET_PROPERTY:
case Marker::DELTA_EDGE_CREATE:
case Marker::DELTA_EDGE_DELETE:
case Marker::DELTA_EDGE_SET_PROPERTY:
case Marker::DELTA_TRANSACTION_END:
case Marker::DELTA_LABEL_INDEX_CREATE:
case Marker::DELTA_LABEL_INDEX_DROP:
case Marker::DELTA_LABEL_PROPERTY_INDEX_CREATE:
case Marker::DELTA_LABEL_PROPERTY_INDEX_DROP:
case Marker::DELTA_EXISTENCE_CONSTRAINT_CREATE:
case Marker::DELTA_EXISTENCE_CONSTRAINT_DROP:
case Marker::DELTA_UNIQUE_CONSTRAINT_CREATE:
case Marker::DELTA_UNIQUE_CONSTRAINT_DROP:
case Marker::VALUE_FALSE:
case Marker::VALUE_TRUE:
return std::nullopt;
}
}
bool Decoder::SkipString() {
auto marker = ReadMarker();
if (!marker || *marker != Marker::TYPE_STRING) return false;
auto maybe_size = ReadSize(this);
if (!maybe_size) return false;
const uint64_t kBufferSize = 262144;
uint8_t buffer[kBufferSize];
uint64_t size = *maybe_size;
while (size > 0) {
uint64_t to_read = size < kBufferSize ? size : kBufferSize;
if (!Read(reinterpret_cast<uint8_t *>(&buffer), to_read)) return false;
size -= to_read;
}
return true;
}
bool Decoder::SkipPropertyValue() {
auto pv_marker = ReadMarker();
if (!pv_marker || *pv_marker != Marker::TYPE_PROPERTY_VALUE) return false;
auto marker = PeekMarker();
if (!marker) return false;
switch (*marker) {
case Marker::TYPE_NULL: {
auto inner_marker = ReadMarker();
return inner_marker && *inner_marker == Marker::TYPE_NULL;
}
case Marker::TYPE_BOOL: {
return !!ReadBool();
}
case Marker::TYPE_INT: {
return !!ReadUint();
}
case Marker::TYPE_DOUBLE: {
return !!ReadDouble();
}
case Marker::TYPE_STRING: {
return SkipString();
}
case Marker::TYPE_LIST: {
auto inner_marker = ReadMarker();
if (!inner_marker || *inner_marker != Marker::TYPE_LIST) return false;
auto size = ReadSize(this);
if (!size) return false;
for (uint64_t i = 0; i < *size; ++i) {
if (!SkipPropertyValue()) return false;
}
return true;
}
case Marker::TYPE_MAP: {
auto inner_marker = ReadMarker();
if (!inner_marker || *inner_marker != Marker::TYPE_MAP) return false;
auto size = ReadSize(this);
if (!size) return false;
for (uint64_t i = 0; i < *size; ++i) {
if (!SkipString()) return false;
if (!SkipPropertyValue()) return false;
}
return true;
}
case Marker::TYPE_PROPERTY_VALUE:
case Marker::SECTION_VERTEX:
case Marker::SECTION_EDGE:
case Marker::SECTION_MAPPER:
case Marker::SECTION_METADATA:
case Marker::SECTION_INDICES:
case Marker::SECTION_CONSTRAINTS:
case Marker::SECTION_DELTA:
case Marker::SECTION_OFFSETS:
case Marker::DELTA_VERTEX_CREATE:
case Marker::DELTA_VERTEX_DELETE:
case Marker::DELTA_VERTEX_ADD_LABEL:
case Marker::DELTA_VERTEX_REMOVE_LABEL:
case Marker::DELTA_VERTEX_SET_PROPERTY:
case Marker::DELTA_EDGE_CREATE:
case Marker::DELTA_EDGE_DELETE:
case Marker::DELTA_EDGE_SET_PROPERTY:
case Marker::DELTA_TRANSACTION_END:
case Marker::DELTA_LABEL_INDEX_CREATE:
case Marker::DELTA_LABEL_INDEX_DROP:
case Marker::DELTA_LABEL_PROPERTY_INDEX_CREATE:
case Marker::DELTA_LABEL_PROPERTY_INDEX_DROP:
case Marker::DELTA_EXISTENCE_CONSTRAINT_CREATE:
case Marker::DELTA_EXISTENCE_CONSTRAINT_DROP:
case Marker::DELTA_UNIQUE_CONSTRAINT_CREATE:
case Marker::DELTA_UNIQUE_CONSTRAINT_DROP:
case Marker::VALUE_FALSE:
case Marker::VALUE_TRUE:
return false;
}
}
std::optional<uint64_t> Decoder::GetSize() { return file_.GetSize(); }
std::optional<uint64_t> Decoder::GetPosition() { return file_.GetPosition(); }
bool Decoder::SetPosition(uint64_t position) {
return !!file_.SetPosition(utils::InputFile::Position::SET, position);
}
} // namespace storage::durability

View File

@ -0,0 +1,74 @@
#pragma once
#include <cstdint>
#include <filesystem>
#include <string_view>
#include "storage/v2/config.hpp"
#include "storage/v2/durability/marker.hpp"
#include "storage/v2/name_id_mapper.hpp"
#include "storage/v2/property_value.hpp"
#include "utils/file.hpp"
namespace storage::durability {
/// Encoder that is used to generate a snapshot/WAL.
class Encoder final {
public:
void Initialize(const std::filesystem::path &path,
const std::string_view &magic, uint64_t version);
// Main write function, the only one that is allowed to write to the `file_`
// directly.
void Write(const uint8_t *data, uint64_t size);
void WriteMarker(Marker marker);
void WriteBool(bool value);
void WriteUint(uint64_t value);
void WriteDouble(double value);
void WriteString(const std::string_view &value);
void WritePropertyValue(const PropertyValue &value);
uint64_t GetPosition();
void SetPosition(uint64_t position);
void Sync();
void Finalize();
private:
utils::OutputFile file_;
};
/// Decoder that is used to read a generated snapshot/WAL.
class Decoder final {
public:
std::optional<uint64_t> Initialize(const std::filesystem::path &path,
const std::string &magic);
// Main read functions, the only one that are allowed to read from the `file_`
// directly.
bool Read(uint8_t *data, size_t size);
bool Peek(uint8_t *data, size_t size);
std::optional<Marker> PeekMarker();
std::optional<Marker> ReadMarker();
std::optional<bool> ReadBool();
std::optional<uint64_t> ReadUint();
std::optional<double> ReadDouble();
std::optional<std::string> ReadString();
std::optional<PropertyValue> ReadPropertyValue();
bool SkipString();
bool SkipPropertyValue();
std::optional<uint64_t> GetSize();
std::optional<uint64_t> GetPosition();
bool SetPosition(uint64_t position);
private:
utils::InputFile file_;
};
} // namespace storage::durability

View File

@ -0,0 +1,142 @@
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/exceptions.hpp"
#include "storage/v2/durability/serialization.hpp"
#include "storage/v2/durability/version.hpp"
namespace storage::durability {
// Snapshot format:
//
// 1) Magic string (non-encoded)
//
// 2) Snapshot version (non-encoded, little-endian)
//
// 3) Section offsets:
// * offset to the first edge in the snapshot (`0` if properties on edges
// are disabled)
// * offset to the first vertex in the snapshot
// * offset to the indices section
// * offset to the constraints section
// * offset to the mapper section
// * offset to the metadata section
//
// 4) Encoded edges (if properties on edges are enabled); each edge is written
// in the following format:
// * gid
// * properties
//
// 5) Encoded vertices; each vertex is written in the following format:
// * gid
// * labels
// * properties
// * in edges
// * edge gid
// * from vertex gid
// * edge type
// * out edges
// * edge gid
// * to vertex gid
// * edge type
//
// 6) Indices
// * label indices
// * label
// * label+property indices
// * label
// * property
//
// 7) Constraints
// * existence constraints
// * label
// * property
// * unique constraints (from version 13)
// * label
// * properties
//
// 8) Name to ID mapper data
// * id to name mappings
// * id
// * name
//
// 9) Metadata
// * storage UUID
// * snapshot transaction start timestamp (required when recovering
// from snapshot combined with WAL to determine what deltas need to be
// applied)
// * number of edges
// * number of vertices
//
// IMPORTANT: When changing snapshot encoding/decoding bump the snapshot/WAL
// version in `version.hpp`.
// Function used to read information about the snapshot file.
SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path) {
// Check magic and version.
Decoder snapshot;
auto version = snapshot.Initialize(path, kSnapshotMagic);
if (!version)
throw RecoveryFailure("Couldn't read snapshot magic and/or version!");
if (!IsVersionSupported(*version))
throw RecoveryFailure("Invalid snapshot version!");
// Prepare return value.
SnapshotInfo info;
// Read offsets.
{
auto marker = snapshot.ReadMarker();
if (!marker || *marker != Marker::SECTION_OFFSETS)
throw RecoveryFailure("Invalid snapshot data!");
auto snapshot_size = snapshot.GetSize();
if (!snapshot_size)
throw RecoveryFailure("Couldn't read data from snapshot!");
auto read_offset = [&snapshot, snapshot_size] {
auto maybe_offset = snapshot.ReadUint();
if (!maybe_offset) throw RecoveryFailure("Invalid snapshot format!");
auto offset = *maybe_offset;
if (offset > *snapshot_size)
throw RecoveryFailure("Invalid snapshot format!");
return offset;
};
info.offset_edges = read_offset();
info.offset_vertices = read_offset();
info.offset_indices = read_offset();
info.offset_constraints = read_offset();
info.offset_mapper = read_offset();
info.offset_metadata = read_offset();
}
// Read metadata.
{
if (!snapshot.SetPosition(info.offset_metadata))
throw RecoveryFailure("Couldn't read data from snapshot!");
auto marker = snapshot.ReadMarker();
if (!marker || *marker != Marker::SECTION_METADATA)
throw RecoveryFailure("Invalid snapshot data!");
auto maybe_uuid = snapshot.ReadString();
if (!maybe_uuid) throw RecoveryFailure("Invalid snapshot data!");
info.uuid = std::move(*maybe_uuid);
auto maybe_timestamp = snapshot.ReadUint();
if (!maybe_timestamp) throw RecoveryFailure("Invalid snapshot data!");
info.start_timestamp = *maybe_timestamp;
auto maybe_edges = snapshot.ReadUint();
if (!maybe_edges) throw RecoveryFailure("Invalid snapshot data!");
info.edges_count = *maybe_edges;
auto maybe_vertices = snapshot.ReadUint();
if (!maybe_vertices) throw RecoveryFailure("Invalid snapshot data!");
info.vertices_count = *maybe_vertices;
}
return info;
}
} // namespace storage::durability

View File

@ -0,0 +1,28 @@
#pragma once
#include <cstdint>
#include <filesystem>
#include <string>
namespace storage::durability {
/// Structure used to hold information about a snapshot.
struct SnapshotInfo {
uint64_t offset_edges;
uint64_t offset_vertices;
uint64_t offset_indices;
uint64_t offset_constraints;
uint64_t offset_mapper;
uint64_t offset_metadata;
std::string uuid;
uint64_t start_timestamp;
uint64_t edges_count;
uint64_t vertices_count;
};
/// Function used to read information about the snapshot file.
/// @throw RecoveryFailure
SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path);
} // namespace storage::durability

View File

@ -0,0 +1,28 @@
#pragma once
#include <cstdint>
#include <string>
#include <type_traits>
namespace storage::durability {
// The current version of snapshot and WAL encoding / decoding.
// IMPORTANT: Please bump this version for every snapshot and/or WAL format
// change!!!
const uint64_t kVersion{13};
const uint64_t kOldestSupportedVersion{12};
const uint64_t kUniqueConstraintVersion{13};
// Magic values written to the start of a snapshot/WAL file to identify it.
const std::string kSnapshotMagic{"MGsn"};
const std::string kWalMagic{"MGwl"};
static_assert(std::is_same_v<uint8_t, unsigned char>);
// Checks whether the loaded snapshot/WAL version is supported.
inline bool IsVersionSupported(uint64_t version) {
return version >= kOldestSupportedVersion && version <= kVersion;
}
} // namespace storage::durability

View File

@ -0,0 +1,733 @@
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/delta.hpp"
#include "storage/v2/durability/exceptions.hpp"
#include "storage/v2/durability/paths.hpp"
#include "storage/v2/durability/version.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/vertex.hpp"
namespace storage::durability {
// WAL format:
//
// 1) Magic string (non-encoded)
//
// 2) WAL version (non-encoded, little-endian)
//
// 3) Section offsets:
// * offset to the metadata section
// * offset to the first delta in the WAL
//
// 4) Metadata
// * storage UUID
// * sequence number (number indicating the sequence position of this WAL
// file)
//
// 5) Encoded deltas; each delta is written in the following format:
// * commit timestamp
// * action (only one of the actions below are encoded)
// * vertex create, vertex delete
// * gid
// * vertex add label, vertex remove label
// * gid
// * label name
// * vertex set property
// * gid
// * property name
// * property value
// * edge create, edge delete
// * gid
// * edge type name
// * from vertex gid
// * to vertex gid
// * edge set property
// * gid
// * property name
// * property value
// * transaction end (marks that the whole transaction is
// stored in the WAL file)
// * label index create, label index drop
// * label name
// * label property index create, label property index drop,
// existence constraint create, existence constraint drop
// * label name
// * property name
// * unique constraint create, unique constraint drop
// * label name
// * property names
//
// IMPORTANT: When changing WAL encoding/decoding bump the snapshot/WAL version
// in `version.hpp`.
namespace {
Marker OperationToMarker(StorageGlobalOperation operation) {
switch (operation) {
case StorageGlobalOperation::LABEL_INDEX_CREATE:
return Marker::DELTA_LABEL_INDEX_CREATE;
case StorageGlobalOperation::LABEL_INDEX_DROP:
return Marker::DELTA_LABEL_INDEX_DROP;
case StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE:
return Marker::DELTA_LABEL_PROPERTY_INDEX_CREATE;
case StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP:
return Marker::DELTA_LABEL_PROPERTY_INDEX_DROP;
case StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE:
return Marker::DELTA_EXISTENCE_CONSTRAINT_CREATE;
case StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP:
return Marker::DELTA_EXISTENCE_CONSTRAINT_DROP;
case StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE:
return Marker::DELTA_UNIQUE_CONSTRAINT_CREATE;
case StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP:
return Marker::DELTA_UNIQUE_CONSTRAINT_DROP;
}
}
Marker VertexActionToMarker(Delta::Action action) {
// When converting a Delta to a WAL delta the logic is inverted. That is
// because the Delta's represent undo actions and we want to store redo
// actions.
switch (action) {
case Delta::Action::DELETE_OBJECT:
return Marker::DELTA_VERTEX_CREATE;
case Delta::Action::RECREATE_OBJECT:
return Marker::DELTA_VERTEX_DELETE;
case Delta::Action::SET_PROPERTY:
return Marker::DELTA_VERTEX_SET_PROPERTY;
case Delta::Action::ADD_LABEL:
return Marker::DELTA_VERTEX_REMOVE_LABEL;
case Delta::Action::REMOVE_LABEL:
return Marker::DELTA_VERTEX_ADD_LABEL;
case Delta::Action::ADD_IN_EDGE:
return Marker::DELTA_EDGE_DELETE;
case Delta::Action::ADD_OUT_EDGE:
return Marker::DELTA_EDGE_DELETE;
case Delta::Action::REMOVE_IN_EDGE:
return Marker::DELTA_EDGE_CREATE;
case Delta::Action::REMOVE_OUT_EDGE:
return Marker::DELTA_EDGE_CREATE;
}
}
// This function convertes a Marker to a WalDeltaData::Type. It checks for the
// validity of the marker and throws if an invalid marker is specified.
// @throw RecoveryFailure
WalDeltaData::Type MarkerToWalDeltaDataType(Marker marker) {
switch (marker) {
case Marker::DELTA_VERTEX_CREATE:
return WalDeltaData::Type::VERTEX_CREATE;
case Marker::DELTA_VERTEX_DELETE:
return WalDeltaData::Type::VERTEX_DELETE;
case Marker::DELTA_VERTEX_ADD_LABEL:
return WalDeltaData::Type::VERTEX_ADD_LABEL;
case Marker::DELTA_VERTEX_REMOVE_LABEL:
return WalDeltaData::Type::VERTEX_REMOVE_LABEL;
case Marker::DELTA_EDGE_CREATE:
return WalDeltaData::Type::EDGE_CREATE;
case Marker::DELTA_EDGE_DELETE:
return WalDeltaData::Type::EDGE_DELETE;
case Marker::DELTA_VERTEX_SET_PROPERTY:
return WalDeltaData::Type::VERTEX_SET_PROPERTY;
case Marker::DELTA_EDGE_SET_PROPERTY:
return WalDeltaData::Type::EDGE_SET_PROPERTY;
case Marker::DELTA_TRANSACTION_END:
return WalDeltaData::Type::TRANSACTION_END;
case Marker::DELTA_LABEL_INDEX_CREATE:
return WalDeltaData::Type::LABEL_INDEX_CREATE;
case Marker::DELTA_LABEL_INDEX_DROP:
return WalDeltaData::Type::LABEL_INDEX_DROP;
case Marker::DELTA_LABEL_PROPERTY_INDEX_CREATE:
return WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE;
case Marker::DELTA_LABEL_PROPERTY_INDEX_DROP:
return WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP;
case Marker::DELTA_EXISTENCE_CONSTRAINT_CREATE:
return WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE;
case Marker::DELTA_EXISTENCE_CONSTRAINT_DROP:
return WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP;
case Marker::DELTA_UNIQUE_CONSTRAINT_CREATE:
return WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE;
case Marker::DELTA_UNIQUE_CONSTRAINT_DROP:
return WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP;
case Marker::TYPE_NULL:
case Marker::TYPE_BOOL:
case Marker::TYPE_INT:
case Marker::TYPE_DOUBLE:
case Marker::TYPE_STRING:
case Marker::TYPE_LIST:
case Marker::TYPE_MAP:
case Marker::TYPE_PROPERTY_VALUE:
case Marker::SECTION_VERTEX:
case Marker::SECTION_EDGE:
case Marker::SECTION_MAPPER:
case Marker::SECTION_METADATA:
case Marker::SECTION_INDICES:
case Marker::SECTION_CONSTRAINTS:
case Marker::SECTION_DELTA:
case Marker::SECTION_OFFSETS:
case Marker::VALUE_FALSE:
case Marker::VALUE_TRUE:
throw RecoveryFailure("Invalid WAL data!");
}
}
bool IsWalDeltaDataTypeTransactionEnd(WalDeltaData::Type type) {
switch (type) {
// These delta actions are all found inside transactions so they don't
// indicate a transaction end.
case WalDeltaData::Type::VERTEX_CREATE:
case WalDeltaData::Type::VERTEX_DELETE:
case WalDeltaData::Type::VERTEX_ADD_LABEL:
case WalDeltaData::Type::VERTEX_REMOVE_LABEL:
case WalDeltaData::Type::EDGE_CREATE:
case WalDeltaData::Type::EDGE_DELETE:
case WalDeltaData::Type::VERTEX_SET_PROPERTY:
case WalDeltaData::Type::EDGE_SET_PROPERTY:
return false;
// This delta explicitly indicates that a transaction is done.
case WalDeltaData::Type::TRANSACTION_END:
return true;
// These operations aren't transactional and they are encoded only using
// a single delta, so they each individually mark the end of their
// 'transaction'.
case WalDeltaData::Type::LABEL_INDEX_CREATE:
case WalDeltaData::Type::LABEL_INDEX_DROP:
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE:
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP:
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE:
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP:
case WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE:
case WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP:
return true;
}
}
// Function used to either read or skip the current WAL delta data. The WAL
// delta header must be read before calling this function. If the delta data is
// read then the data returned is valid, if the delta data is skipped then the
// returned data is not guaranteed to be set (it could be empty) and shouldn't
// be used.
// @throw RecoveryFailure
template <bool read_data>
WalDeltaData ReadSkipWalDeltaData(Decoder *wal) {
WalDeltaData delta;
auto action = wal->ReadMarker();
if (!action) throw RecoveryFailure("Invalid WAL data!");
delta.type = MarkerToWalDeltaDataType(*action);
switch (delta.type) {
case WalDeltaData::Type::VERTEX_CREATE:
case WalDeltaData::Type::VERTEX_DELETE: {
auto gid = wal->ReadUint();
if (!gid) throw RecoveryFailure("Invalid WAL data!");
delta.vertex_create_delete.gid = Gid::FromUint(*gid);
break;
}
case WalDeltaData::Type::VERTEX_ADD_LABEL:
case WalDeltaData::Type::VERTEX_REMOVE_LABEL: {
auto gid = wal->ReadUint();
if (!gid) throw RecoveryFailure("Invalid WAL data!");
delta.vertex_add_remove_label.gid = Gid::FromUint(*gid);
if constexpr (read_data) {
auto label = wal->ReadString();
if (!label) throw RecoveryFailure("Invalid WAL data!");
delta.vertex_add_remove_label.label = std::move(*label);
} else {
if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!");
}
break;
}
case WalDeltaData::Type::VERTEX_SET_PROPERTY:
case WalDeltaData::Type::EDGE_SET_PROPERTY: {
auto gid = wal->ReadUint();
if (!gid) throw RecoveryFailure("Invalid WAL data!");
delta.vertex_edge_set_property.gid = Gid::FromUint(*gid);
if constexpr (read_data) {
auto property = wal->ReadString();
if (!property) throw RecoveryFailure("Invalid WAL data!");
delta.vertex_edge_set_property.property = std::move(*property);
auto value = wal->ReadPropertyValue();
if (!value) throw RecoveryFailure("Invalid WAL data!");
delta.vertex_edge_set_property.value = std::move(*value);
} else {
if (!wal->SkipString() || !wal->SkipPropertyValue())
throw RecoveryFailure("Invalid WAL data!");
}
break;
}
case WalDeltaData::Type::EDGE_CREATE:
case WalDeltaData::Type::EDGE_DELETE: {
auto gid = wal->ReadUint();
if (!gid) throw RecoveryFailure("Invalid WAL data!");
delta.edge_create_delete.gid = Gid::FromUint(*gid);
if constexpr (read_data) {
auto edge_type = wal->ReadString();
if (!edge_type) throw RecoveryFailure("Invalid WAL data!");
delta.edge_create_delete.edge_type = std::move(*edge_type);
} else {
if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!");
}
auto from_gid = wal->ReadUint();
if (!from_gid) throw RecoveryFailure("Invalid WAL data!");
delta.edge_create_delete.from_vertex = Gid::FromUint(*from_gid);
auto to_gid = wal->ReadUint();
if (!to_gid) throw RecoveryFailure("Invalid WAL data!");
delta.edge_create_delete.to_vertex = Gid::FromUint(*to_gid);
break;
}
case WalDeltaData::Type::TRANSACTION_END:
break;
case WalDeltaData::Type::LABEL_INDEX_CREATE:
case WalDeltaData::Type::LABEL_INDEX_DROP: {
if constexpr (read_data) {
auto label = wal->ReadString();
if (!label) throw RecoveryFailure("Invalid WAL data!");
delta.operation_label.label = std::move(*label);
} else {
if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!");
}
break;
}
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE:
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP:
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE:
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: {
if constexpr (read_data) {
auto label = wal->ReadString();
if (!label) throw RecoveryFailure("Invalid WAL data!");
delta.operation_label_property.label = std::move(*label);
auto property = wal->ReadString();
if (!property) throw RecoveryFailure("Invalid WAL data!");
delta.operation_label_property.property = std::move(*property);
} else {
if (!wal->SkipString() || !wal->SkipString())
throw RecoveryFailure("Invalid WAL data!");
}
break;
}
case WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE:
case WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP: {
if constexpr (read_data) {
auto label = wal->ReadString();
if (!label) throw RecoveryFailure("Invalid WAL data!");
delta.operation_label_properties.label = std::move(*label);
auto properties_count = wal->ReadUint();
if (!properties_count) throw RecoveryFailure("Invalid WAL data!");
for (uint64_t i = 0; i < *properties_count; ++i) {
auto property = wal->ReadString();
if (!property) throw RecoveryFailure("Invalid WAL data!");
delta.operation_label_properties.properties.emplace(
std::move(*property));
}
} else {
if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!");
auto properties_count = wal->ReadUint();
if (!properties_count) throw RecoveryFailure("Invalid WAL data!");
for (uint64_t i = 0; i < *properties_count; ++i) {
if (!wal->SkipString()) throw RecoveryFailure("Invalid WAL data!");
}
}
}
}
return delta;
}
} // namespace
// Function used to read information about the WAL file.
WalInfo ReadWalInfo(const std::filesystem::path &path) {
// Check magic and version.
Decoder wal;
auto version = wal.Initialize(path, kWalMagic);
if (!version)
throw RecoveryFailure("Couldn't read WAL magic and/or version!");
if (!IsVersionSupported(*version))
throw RecoveryFailure("Invalid WAL version!");
// Prepare return value.
WalInfo info;
// Read offsets.
{
auto marker = wal.ReadMarker();
if (!marker || *marker != Marker::SECTION_OFFSETS)
throw RecoveryFailure("Invalid WAL data!");
auto wal_size = wal.GetSize();
if (!wal_size) throw RecoveryFailure("Invalid WAL data!");
auto read_offset = [&wal, wal_size] {
auto maybe_offset = wal.ReadUint();
if (!maybe_offset) throw RecoveryFailure("Invalid WAL format!");
auto offset = *maybe_offset;
if (offset > *wal_size) throw RecoveryFailure("Invalid WAL format!");
return offset;
};
info.offset_metadata = read_offset();
info.offset_deltas = read_offset();
}
// Read metadata.
{
wal.SetPosition(info.offset_metadata);
auto marker = wal.ReadMarker();
if (!marker || *marker != Marker::SECTION_METADATA)
throw RecoveryFailure("Invalid WAL data!");
auto maybe_uuid = wal.ReadString();
if (!maybe_uuid) throw RecoveryFailure("Invalid WAL data!");
info.uuid = std::move(*maybe_uuid);
auto maybe_seq_num = wal.ReadUint();
if (!maybe_seq_num) throw RecoveryFailure("Invalid WAL data!");
info.seq_num = *maybe_seq_num;
}
// Read deltas.
info.num_deltas = 0;
auto validate_delta = [&wal]() -> std::optional<std::pair<uint64_t, bool>> {
try {
auto timestamp = ReadWalDeltaHeader(&wal);
auto type = SkipWalDeltaData(&wal);
return {{timestamp, IsWalDeltaDataTypeTransactionEnd(type)}};
} catch (const RecoveryFailure &) {
return std::nullopt;
}
};
auto size = wal.GetSize();
// Here we read the whole file and determine the number of valid deltas. A
// delta is valid only if all of its data can be successfully read. This
// allows us to recover data from WAL files that are corrupt at the end (eg.
// because of power loss) but are still valid at the beginning. While reading
// the deltas we only count deltas which are a part of a fully valid
// transaction (indicated by a TRANSACTION_END delta or any other
// non-transactional operation).
std::optional<uint64_t> current_timestamp;
uint64_t num_deltas = 0;
while (wal.GetPosition() != size) {
auto ret = validate_delta();
if (!ret) break;
auto [timestamp, is_end_of_transaction] = *ret;
if (!current_timestamp) current_timestamp = timestamp;
if (*current_timestamp != timestamp) break;
++num_deltas;
if (is_end_of_transaction) {
if (info.num_deltas == 0) {
info.from_timestamp = timestamp;
info.to_timestamp = timestamp;
}
if (timestamp < info.from_timestamp || timestamp < info.to_timestamp)
break;
info.to_timestamp = timestamp;
info.num_deltas += num_deltas;
current_timestamp = std::nullopt;
num_deltas = 0;
}
}
if (info.num_deltas == 0) throw RecoveryFailure("Invalid WAL data!");
return info;
}
bool operator==(const WalDeltaData &a, const WalDeltaData &b) {
if (a.type != b.type) return false;
switch (a.type) {
case WalDeltaData::Type::VERTEX_CREATE:
case WalDeltaData::Type::VERTEX_DELETE:
return a.vertex_create_delete.gid == b.vertex_create_delete.gid;
case WalDeltaData::Type::VERTEX_ADD_LABEL:
case WalDeltaData::Type::VERTEX_REMOVE_LABEL:
return a.vertex_add_remove_label.gid == b.vertex_add_remove_label.gid &&
a.vertex_add_remove_label.label == b.vertex_add_remove_label.label;
case WalDeltaData::Type::VERTEX_SET_PROPERTY:
case WalDeltaData::Type::EDGE_SET_PROPERTY:
return a.vertex_edge_set_property.gid == b.vertex_edge_set_property.gid &&
a.vertex_edge_set_property.property ==
b.vertex_edge_set_property.property &&
a.vertex_edge_set_property.value ==
b.vertex_edge_set_property.value;
case WalDeltaData::Type::EDGE_CREATE:
case WalDeltaData::Type::EDGE_DELETE:
return a.edge_create_delete.gid == b.edge_create_delete.gid &&
a.edge_create_delete.edge_type == b.edge_create_delete.edge_type &&
a.edge_create_delete.from_vertex ==
b.edge_create_delete.from_vertex &&
a.edge_create_delete.to_vertex == b.edge_create_delete.to_vertex;
case WalDeltaData::Type::TRANSACTION_END:
return true;
case WalDeltaData::Type::LABEL_INDEX_CREATE:
case WalDeltaData::Type::LABEL_INDEX_DROP:
return a.operation_label.label == b.operation_label.label;
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE:
case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP:
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE:
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP:
return a.operation_label_property.label ==
b.operation_label_property.label &&
a.operation_label_property.property ==
b.operation_label_property.property;
case WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE:
case WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP:
return a.operation_label_properties.label ==
b.operation_label_properties.label &&
a.operation_label_properties.properties ==
b.operation_label_properties.properties;
}
}
bool operator!=(const WalDeltaData &a, const WalDeltaData &b) {
return !(a == b);
}
// Function used to read the WAL delta header. The function returns the delta
// timestamp.
uint64_t ReadWalDeltaHeader(Decoder *wal) {
auto marker = wal->ReadMarker();
if (!marker || *marker != Marker::SECTION_DELTA)
throw RecoveryFailure("Invalid WAL data!");
auto timestamp = wal->ReadUint();
if (!timestamp) throw RecoveryFailure("Invalid WAL data!");
return *timestamp;
}
// Function used to read the current WAL delta data. The WAL delta header must
// be read before calling this function.
WalDeltaData ReadWalDeltaData(Decoder *wal) {
return ReadSkipWalDeltaData<true>(wal);
}
// Function used to skip the current WAL delta data. The WAL delta header must
// be read before calling this function.
WalDeltaData::Type SkipWalDeltaData(Decoder *wal) {
auto delta = ReadSkipWalDeltaData<false>(wal);
return delta.type;
}
WalFile::WalFile(const std::filesystem::path &wal_directory,
const std::string &uuid, Config::Items items,
NameIdMapper *name_id_mapper, uint64_t seq_num)
: items_(items),
name_id_mapper_(name_id_mapper),
path_(wal_directory / MakeWalName()),
from_timestamp_(0),
to_timestamp_(0),
count_(0) {
// Ensure that the storage directory exists.
utils::EnsureDirOrDie(wal_directory);
// Initialize the WAL file.
wal_.Initialize(path_, kWalMagic, kVersion);
// Write placeholder offsets.
uint64_t offset_offsets = 0;
uint64_t offset_metadata = 0;
uint64_t offset_deltas = 0;
wal_.WriteMarker(Marker::SECTION_OFFSETS);
offset_offsets = wal_.GetPosition();
wal_.WriteUint(offset_metadata);
wal_.WriteUint(offset_deltas);
// Write metadata.
offset_metadata = wal_.GetPosition();
wal_.WriteMarker(Marker::SECTION_METADATA);
wal_.WriteString(uuid);
wal_.WriteUint(seq_num);
// Write final offsets.
offset_deltas = wal_.GetPosition();
wal_.SetPosition(offset_offsets);
wal_.WriteUint(offset_metadata);
wal_.WriteUint(offset_deltas);
wal_.SetPosition(offset_deltas);
// Sync the initial data.
wal_.Sync();
}
WalFile::~WalFile() {
if (count_ != 0) {
// Finalize file.
wal_.Finalize();
// Rename file.
std::filesystem::path new_path(path_);
new_path.replace_filename(
RemakeWalName(path_.filename(), from_timestamp_, to_timestamp_));
// If the rename fails it isn't a crucial situation. The renaming is done
// only to make the directory structure of the WAL files easier to read
// manually.
utils::RenamePath(path_, new_path);
} else {
// Remove empty WAL file.
utils::DeleteFile(path_);
}
}
void WalFile::AppendDelta(const Delta &delta, const Vertex &vertex,
uint64_t timestamp) {
// When converting a Delta to a WAL delta the logic is inverted. That is
// because the Delta's represent undo actions and we want to store redo
// actions.
wal_.WriteMarker(Marker::SECTION_DELTA);
wal_.WriteUint(timestamp);
std::lock_guard<utils::SpinLock> guard(vertex.lock);
switch (delta.action) {
case Delta::Action::DELETE_OBJECT:
case Delta::Action::RECREATE_OBJECT: {
wal_.WriteMarker(VertexActionToMarker(delta.action));
wal_.WriteUint(vertex.gid.AsUint());
break;
}
case Delta::Action::SET_PROPERTY: {
wal_.WriteMarker(Marker::DELTA_VERTEX_SET_PROPERTY);
wal_.WriteUint(vertex.gid.AsUint());
wal_.WriteString(name_id_mapper_->IdToName(delta.property.key.AsUint()));
// The property value is the value that is currently stored in the
// vertex.
// TODO (mferencevic): Mitigate the memory allocation introduced here
// (with the `GetProperty` call). It is the only memory allocation in the
// entire WAL file writing logic.
wal_.WritePropertyValue(
vertex.properties.GetProperty(delta.property.key));
break;
}
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL: {
wal_.WriteMarker(VertexActionToMarker(delta.action));
wal_.WriteUint(vertex.gid.AsUint());
wal_.WriteString(name_id_mapper_->IdToName(delta.label.AsUint()));
break;
}
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_OUT_EDGE: {
wal_.WriteMarker(VertexActionToMarker(delta.action));
if (items_.properties_on_edges) {
wal_.WriteUint(delta.vertex_edge.edge.ptr->gid.AsUint());
} else {
wal_.WriteUint(delta.vertex_edge.edge.gid.AsUint());
}
wal_.WriteString(
name_id_mapper_->IdToName(delta.vertex_edge.edge_type.AsUint()));
wal_.WriteUint(vertex.gid.AsUint());
wal_.WriteUint(delta.vertex_edge.vertex->gid.AsUint());
break;
}
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
// These actions are already encoded in the *_OUT_EDGE actions. This
// function should never be called for this type of deltas.
LOG(FATAL) << "Invalid delta action!";
}
UpdateStats(timestamp);
}
void WalFile::AppendDelta(const Delta &delta, const Edge &edge,
uint64_t timestamp) {
// When converting a Delta to a WAL delta the logic is inverted. That is
// because the Delta's represent undo actions and we want to store redo
// actions.
wal_.WriteMarker(Marker::SECTION_DELTA);
wal_.WriteUint(timestamp);
std::lock_guard<utils::SpinLock> guard(edge.lock);
switch (delta.action) {
case Delta::Action::SET_PROPERTY: {
wal_.WriteMarker(Marker::DELTA_EDGE_SET_PROPERTY);
wal_.WriteUint(edge.gid.AsUint());
wal_.WriteString(name_id_mapper_->IdToName(delta.property.key.AsUint()));
// The property value is the value that is currently stored in the
// edge.
// TODO (mferencevic): Mitigate the memory allocation introduced here
// (with the `GetProperty` call). It is the only memory allocation in the
// entire WAL file writing logic.
wal_.WritePropertyValue(edge.properties.GetProperty(delta.property.key));
break;
}
case Delta::Action::DELETE_OBJECT:
case Delta::Action::RECREATE_OBJECT:
// These actions are already encoded in vertex *_OUT_EDGE actions. Also,
// these deltas don't contain any information about the from vertex, to
// vertex or edge type so they are useless. This function should never
// be called for this type of deltas.
LOG(FATAL) << "Invalid delta action!";
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
// These deltas shouldn't appear for edges.
LOG(FATAL) << "Invalid database state!";
}
UpdateStats(timestamp);
}
void WalFile::AppendTransactionEnd(uint64_t timestamp) {
wal_.WriteMarker(Marker::SECTION_DELTA);
wal_.WriteUint(timestamp);
wal_.WriteMarker(Marker::DELTA_TRANSACTION_END);
UpdateStats(timestamp);
}
void WalFile::AppendOperation(StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties,
uint64_t timestamp) {
wal_.WriteMarker(Marker::SECTION_DELTA);
wal_.WriteUint(timestamp);
switch (operation) {
case StorageGlobalOperation::LABEL_INDEX_CREATE:
case StorageGlobalOperation::LABEL_INDEX_DROP: {
CHECK(properties.empty()) << "Invalid function call!";
wal_.WriteMarker(OperationToMarker(operation));
wal_.WriteString(name_id_mapper_->IdToName(label.AsUint()));
break;
}
case StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE:
case StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP:
case StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE:
case StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP: {
CHECK(properties.size() == 1) << "Invalid function call!";
wal_.WriteMarker(OperationToMarker(operation));
wal_.WriteString(name_id_mapper_->IdToName(label.AsUint()));
wal_.WriteString(
name_id_mapper_->IdToName((*properties.begin()).AsUint()));
break;
}
case StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE:
case StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP: {
CHECK(!properties.empty()) << "Invalid function call!";
wal_.WriteMarker(OperationToMarker(operation));
wal_.WriteString(name_id_mapper_->IdToName(label.AsUint()));
wal_.WriteUint(properties.size());
for (const auto &property : properties) {
wal_.WriteString(name_id_mapper_->IdToName(property.AsUint()));
}
break;
}
}
UpdateStats(timestamp);
}
void WalFile::Sync() { wal_.Sync(); }
uint64_t WalFile::GetSize() { return wal_.GetPosition(); }
void WalFile::UpdateStats(uint64_t timestamp) {
if (count_ == 0) from_timestamp_ = timestamp;
to_timestamp_ = timestamp;
count_ += 1;
}
} // namespace storage::durability

View File

@ -0,0 +1,163 @@
#pragma once
#include <cstdint>
#include <filesystem>
#include <set>
#include <string>
#include "storage/v2/delta.hpp"
#include "storage/v2/durability/serialization.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/property_value.hpp"
namespace storage::durability {
/// Structure used to hold information about a WAL.
struct WalInfo {
uint64_t offset_metadata;
uint64_t offset_deltas;
std::string uuid;
uint64_t seq_num;
uint64_t from_timestamp;
uint64_t to_timestamp;
uint64_t num_deltas;
};
/// Structure used to return loaded WAL delta data.
struct WalDeltaData {
enum class Type {
VERTEX_CREATE,
VERTEX_DELETE,
VERTEX_ADD_LABEL,
VERTEX_REMOVE_LABEL,
VERTEX_SET_PROPERTY,
EDGE_CREATE,
EDGE_DELETE,
EDGE_SET_PROPERTY,
TRANSACTION_END,
LABEL_INDEX_CREATE,
LABEL_INDEX_DROP,
LABEL_PROPERTY_INDEX_CREATE,
LABEL_PROPERTY_INDEX_DROP,
EXISTENCE_CONSTRAINT_CREATE,
EXISTENCE_CONSTRAINT_DROP,
UNIQUE_CONSTRAINT_CREATE,
UNIQUE_CONSTRAINT_DROP,
};
Type type{Type::TRANSACTION_END};
struct {
Gid gid;
} vertex_create_delete;
struct {
Gid gid;
std::string label;
} vertex_add_remove_label;
struct {
Gid gid;
std::string property;
PropertyValue value;
} vertex_edge_set_property;
struct {
Gid gid;
std::string edge_type;
Gid from_vertex;
Gid to_vertex;
} edge_create_delete;
struct {
std::string label;
} operation_label;
struct {
std::string label;
std::string property;
} operation_label_property;
struct {
std::string label;
std::set<std::string> properties;
} operation_label_properties;
};
bool operator==(const WalDeltaData &a, const WalDeltaData &b);
bool operator!=(const WalDeltaData &a, const WalDeltaData &b);
/// Enum used to indicate a global database operation that isn't transactional.
enum class StorageGlobalOperation {
LABEL_INDEX_CREATE,
LABEL_INDEX_DROP,
LABEL_PROPERTY_INDEX_CREATE,
LABEL_PROPERTY_INDEX_DROP,
EXISTENCE_CONSTRAINT_CREATE,
EXISTENCE_CONSTRAINT_DROP,
UNIQUE_CONSTRAINT_CREATE,
UNIQUE_CONSTRAINT_DROP,
};
/// Function used to read information about the WAL file.
/// @throw RecoveryFailure
WalInfo ReadWalInfo(const std::filesystem::path &path);
/// Function used to read the WAL delta header. The function returns the delta
/// timestamp.
/// @throw RecoveryFailure
uint64_t ReadWalDeltaHeader(Decoder *wal);
/// Function used to read the current WAL delta data. The function returns the
/// read delta data. The WAL delta header must be read before calling this
/// function.
/// @throw RecoveryFailure
WalDeltaData ReadWalDeltaData(Decoder *wal);
/// Function used to skip the current WAL delta data. The function returns the
/// skipped delta type. The WAL delta header must be read before calling this
/// function.
/// @throw RecoveryFailure
WalDeltaData::Type SkipWalDeltaData(Decoder *wal);
/// WalFile class used to append deltas and operations to the WAL file.
class WalFile {
public:
WalFile(const std::filesystem::path &wal_directory, const std::string &uuid,
Config::Items items, NameIdMapper *name_id_mapper, uint64_t seq_num);
WalFile(const WalFile &) = delete;
WalFile(WalFile &&) = delete;
WalFile &operator=(const WalFile &) = delete;
WalFile &operator=(WalFile &&) = delete;
~WalFile();
void AppendDelta(const Delta &delta, const Vertex &vertex,
uint64_t timestamp);
void AppendDelta(const Delta &delta, const Edge &edge, uint64_t timestamp);
void AppendTransactionEnd(uint64_t timestamp);
void AppendOperation(StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties,
uint64_t timestamp);
void Sync();
uint64_t GetSize();
private:
void UpdateStats(uint64_t timestamp);
Config::Items items_;
NameIdMapper *name_id_mapper_;
Encoder wal_;
std::filesystem::path path_;
uint64_t from_timestamp_;
uint64_t to_timestamp_;
uint64_t count_;
};
} // namespace storage::durability

View File

@ -968,8 +968,9 @@ bool Storage::CreateIndex(LabelId label) {
// next regular transaction after this operation. This prevents collisions of
// commit timestamps between non-transactional operations and transactional
// operations.
durability_.AppendToWal(StorageGlobalOperation::LABEL_INDEX_CREATE, label, {},
timestamp_);
durability_.AppendToWal(
durability::StorageGlobalOperation::LABEL_INDEX_CREATE, label, {},
timestamp_);
return true;
}
@ -980,8 +981,9 @@ bool Storage::CreateIndex(LabelId label, PropertyId property) {
return false;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
durability_.AppendToWal(StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE,
label, {property}, timestamp_);
durability_.AppendToWal(
durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE, label,
{property}, timestamp_);
return true;
}
@ -990,8 +992,8 @@ bool Storage::DropIndex(LabelId label) {
if (!indices_.label_index.DropIndex(label)) return false;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
durability_.AppendToWal(StorageGlobalOperation::LABEL_INDEX_DROP, label, {},
timestamp_);
durability_.AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_DROP,
label, {}, timestamp_);
return true;
}
@ -1000,8 +1002,9 @@ bool Storage::DropIndex(LabelId label, PropertyId property) {
if (!indices_.label_property_index.DropIndex(label, property)) return false;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
durability_.AppendToWal(StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP,
label, {property}, timestamp_);
durability_.AppendToWal(
durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP, label,
{property}, timestamp_);
return true;
}
@ -1019,8 +1022,9 @@ Storage::CreateExistenceConstraint(LabelId label, PropertyId property) {
if (ret.HasError() || !ret.GetValue()) return ret;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
durability_.AppendToWal(StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE,
label, {property}, timestamp_);
durability_.AppendToWal(
durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE, label,
{property}, timestamp_);
return true;
}
@ -1030,8 +1034,9 @@ bool Storage::DropExistenceConstraint(LabelId label, PropertyId property) {
return false;
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
durability_.AppendToWal(StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP,
label, {property}, timestamp_);
durability_.AppendToWal(
durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP, label,
{property}, timestamp_);
return true;
}
@ -1047,8 +1052,9 @@ Storage::CreateUniqueConstraint(LabelId label,
}
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
durability_.AppendToWal(StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE,
label, properties, timestamp_);
durability_.AppendToWal(
durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE, label,
properties, timestamp_);
return UniqueConstraints::CreationStatus::SUCCESS;
}
@ -1061,8 +1067,9 @@ UniqueConstraints::DeletionStatus Storage::DropUniqueConstraint(
}
// For a description why using `timestamp_` is correct, see
// `CreateIndex(LabelId label)`.
durability_.AppendToWal(StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP, label,
properties, timestamp_);
durability_.AppendToWal(
durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP, label,
properties, timestamp_);
return UniqueConstraints::DeletionStatus::SUCCESS;
}

View File

@ -6,7 +6,7 @@
#include "storage/v2/commit_log.hpp"
#include "storage/v2/config.hpp"
#include "storage/v2/constraints.hpp"
#include "storage/v2/durability.hpp"
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/edge_accessor.hpp"
#include "storage/v2/indices.hpp"
@ -442,7 +442,7 @@ class Storage final {
// storage.
utils::Synchronized<std::list<Gid>, utils::SpinLock> deleted_edges_;
Durability durability_;
durability::Durability durability_;
};
} // namespace storage

View File

@ -3,7 +3,7 @@
#include <filesystem>
#include <limits>
#include "storage/v2/durability.hpp"
#include "storage/v2/durability/serialization.hpp"
static const std::string kTestMagic{"MGtest"};
static const uint64_t kTestVersion{1};
@ -36,9 +36,9 @@ class DecoderEncoderTest : public ::testing::Test {
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_F(DecoderEncoderTest, ReadMarker) {
{
storage::Encoder encoder;
storage::durability::Encoder encoder;
encoder.Initialize(storage_file, kTestMagic, kTestVersion);
for (const auto &item : storage::kMarkersAll) {
for (const auto &item : storage::durability::kMarkersAll) {
encoder.WriteMarker(item);
}
{
@ -48,11 +48,11 @@ TEST_F(DecoderEncoderTest, ReadMarker) {
encoder.Finalize();
}
{
storage::Decoder decoder;
storage::durability::Decoder decoder;
auto version = decoder.Initialize(storage_file, kTestMagic);
ASSERT_TRUE(version);
ASSERT_EQ(*version, kTestVersion);
for (const auto &item : storage::kMarkersAll) {
for (const auto &item : storage::durability::kMarkersAll) {
auto decoded = decoder.ReadMarker();
ASSERT_TRUE(decoded);
ASSERT_EQ(*decoded, item);
@ -70,7 +70,7 @@ TEST_F(DecoderEncoderTest, ReadMarker) {
TEST_F(DecoderEncoderTest, Read##name) { \
std::vector<type> dataset{__VA_ARGS__}; \
{ \
storage::Encoder encoder; \
storage::durability::Encoder encoder; \
encoder.Initialize(storage_file, kTestMagic, kTestVersion); \
for (const auto &item : dataset) { \
encoder.Write##name(item); \
@ -82,7 +82,7 @@ TEST_F(DecoderEncoderTest, ReadMarker) {
encoder.Finalize(); \
} \
{ \
storage::Decoder decoder; \
storage::durability::Decoder decoder; \
auto version = decoder.Initialize(storage_file, kTestMagic); \
ASSERT_TRUE(version); \
ASSERT_EQ(*version, kTestVersion); \
@ -131,7 +131,7 @@ GENERATE_READ_TEST(
TEST_F(DecoderEncoderTest, Skip##name) { \
std::vector<type> dataset{__VA_ARGS__}; \
{ \
storage::Encoder encoder; \
storage::durability::Encoder encoder; \
encoder.Initialize(storage_file, kTestMagic, kTestVersion); \
for (const auto &item : dataset) { \
encoder.Write##name(item); \
@ -143,7 +143,7 @@ GENERATE_READ_TEST(
encoder.Finalize(); \
} \
{ \
storage::Decoder decoder; \
storage::durability::Decoder decoder; \
auto version = decoder.Initialize(storage_file, kTestMagic); \
ASSERT_TRUE(version); \
ASSERT_EQ(*version, kTestVersion); \
@ -177,7 +177,7 @@ GENERATE_SKIP_TEST(
#define GENERATE_PARTIAL_READ_TEST(name, value) \
TEST_F(DecoderEncoderTest, PartialRead##name) { \
{ \
storage::Encoder encoder; \
storage::durability::Encoder encoder; \
encoder.Initialize(storage_file, kTestMagic, kTestVersion); \
encoder.Write##name(value); \
encoder.Finalize(); \
@ -195,7 +195,7 @@ GENERATE_SKIP_TEST(
ofile.Write(&byte, sizeof(byte)); \
ofile.Sync(); \
} \
storage::Decoder decoder; \
storage::durability::Decoder decoder; \
auto version = decoder.Initialize(alternate_file, kTestMagic); \
if (i < kTestMagic.size() + sizeof(kTestVersion)) { \
ASSERT_FALSE(version); \
@ -215,7 +215,7 @@ GENERATE_SKIP_TEST(
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
GENERATE_PARTIAL_READ_TEST(Marker, storage::Marker::SECTION_VERTEX);
GENERATE_PARTIAL_READ_TEST(Marker, storage::durability::Marker::SECTION_VERTEX);
// NOLINTNEXTLINE(hicpp-special-member-functions)
GENERATE_PARTIAL_READ_TEST(Bool, false);
@ -243,7 +243,7 @@ GENERATE_PARTIAL_READ_TEST(
#define GENERATE_PARTIAL_SKIP_TEST(name, value) \
TEST_F(DecoderEncoderTest, PartialSkip##name) { \
{ \
storage::Encoder encoder; \
storage::durability::Encoder encoder; \
encoder.Initialize(storage_file, kTestMagic, kTestVersion); \
encoder.Write##name(value); \
encoder.Finalize(); \
@ -261,7 +261,7 @@ GENERATE_PARTIAL_READ_TEST(
ofile.Write(&byte, sizeof(byte)); \
ofile.Sync(); \
} \
storage::Decoder decoder; \
storage::durability::Decoder decoder; \
auto version = decoder.Initialize(alternate_file, kTestMagic); \
if (i < kTestMagic.size() + sizeof(kTestVersion)) { \
ASSERT_FALSE(version); \
@ -294,7 +294,7 @@ GENERATE_PARTIAL_SKIP_TEST(
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_F(DecoderEncoderTest, PropertyValueInvalidMarker) {
{
storage::Encoder encoder;
storage::durability::Encoder encoder;
encoder.Initialize(storage_file, kTestMagic, kTestVersion);
encoder.WritePropertyValue(storage::PropertyValue(123L));
encoder.Finalize();
@ -302,68 +302,69 @@ TEST_F(DecoderEncoderTest, PropertyValueInvalidMarker) {
{
utils::OutputFile file;
file.Open(storage_file, utils::OutputFile::Mode::OVERWRITE_EXISTING);
for (auto marker : storage::kMarkersAll) {
for (auto marker : storage::durability::kMarkersAll) {
bool valid_marker;
switch (marker) {
case storage::Marker::TYPE_NULL:
case storage::Marker::TYPE_BOOL:
case storage::Marker::TYPE_INT:
case storage::Marker::TYPE_DOUBLE:
case storage::Marker::TYPE_STRING:
case storage::Marker::TYPE_LIST:
case storage::Marker::TYPE_MAP:
case storage::Marker::TYPE_PROPERTY_VALUE:
case storage::durability::Marker::TYPE_NULL:
case storage::durability::Marker::TYPE_BOOL:
case storage::durability::Marker::TYPE_INT:
case storage::durability::Marker::TYPE_DOUBLE:
case storage::durability::Marker::TYPE_STRING:
case storage::durability::Marker::TYPE_LIST:
case storage::durability::Marker::TYPE_MAP:
case storage::durability::Marker::TYPE_PROPERTY_VALUE:
valid_marker = true;
break;
case storage::Marker::SECTION_VERTEX:
case storage::Marker::SECTION_EDGE:
case storage::Marker::SECTION_MAPPER:
case storage::Marker::SECTION_METADATA:
case storage::Marker::SECTION_INDICES:
case storage::Marker::SECTION_CONSTRAINTS:
case storage::Marker::SECTION_DELTA:
case storage::Marker::SECTION_OFFSETS:
case storage::Marker::DELTA_VERTEX_CREATE:
case storage::Marker::DELTA_VERTEX_DELETE:
case storage::Marker::DELTA_VERTEX_ADD_LABEL:
case storage::Marker::DELTA_VERTEX_REMOVE_LABEL:
case storage::Marker::DELTA_VERTEX_SET_PROPERTY:
case storage::Marker::DELTA_EDGE_CREATE:
case storage::Marker::DELTA_EDGE_DELETE:
case storage::Marker::DELTA_EDGE_SET_PROPERTY:
case storage::Marker::DELTA_TRANSACTION_END:
case storage::Marker::DELTA_LABEL_INDEX_CREATE:
case storage::Marker::DELTA_LABEL_INDEX_DROP:
case storage::Marker::DELTA_LABEL_PROPERTY_INDEX_CREATE:
case storage::Marker::DELTA_LABEL_PROPERTY_INDEX_DROP:
case storage::Marker::DELTA_EXISTENCE_CONSTRAINT_CREATE:
case storage::Marker::DELTA_EXISTENCE_CONSTRAINT_DROP:
case storage::Marker::DELTA_UNIQUE_CONSTRAINT_CREATE:
case storage::Marker::DELTA_UNIQUE_CONSTRAINT_DROP:
case storage::Marker::VALUE_FALSE:
case storage::Marker::VALUE_TRUE:
case storage::durability::Marker::SECTION_VERTEX:
case storage::durability::Marker::SECTION_EDGE:
case storage::durability::Marker::SECTION_MAPPER:
case storage::durability::Marker::SECTION_METADATA:
case storage::durability::Marker::SECTION_INDICES:
case storage::durability::Marker::SECTION_CONSTRAINTS:
case storage::durability::Marker::SECTION_DELTA:
case storage::durability::Marker::SECTION_OFFSETS:
case storage::durability::Marker::DELTA_VERTEX_CREATE:
case storage::durability::Marker::DELTA_VERTEX_DELETE:
case storage::durability::Marker::DELTA_VERTEX_ADD_LABEL:
case storage::durability::Marker::DELTA_VERTEX_REMOVE_LABEL:
case storage::durability::Marker::DELTA_VERTEX_SET_PROPERTY:
case storage::durability::Marker::DELTA_EDGE_CREATE:
case storage::durability::Marker::DELTA_EDGE_DELETE:
case storage::durability::Marker::DELTA_EDGE_SET_PROPERTY:
case storage::durability::Marker::DELTA_TRANSACTION_END:
case storage::durability::Marker::DELTA_LABEL_INDEX_CREATE:
case storage::durability::Marker::DELTA_LABEL_INDEX_DROP:
case storage::durability::Marker::DELTA_LABEL_PROPERTY_INDEX_CREATE:
case storage::durability::Marker::DELTA_LABEL_PROPERTY_INDEX_DROP:
case storage::durability::Marker::DELTA_EXISTENCE_CONSTRAINT_CREATE:
case storage::durability::Marker::DELTA_EXISTENCE_CONSTRAINT_DROP:
case storage::durability::Marker::DELTA_UNIQUE_CONSTRAINT_CREATE:
case storage::durability::Marker::DELTA_UNIQUE_CONSTRAINT_DROP:
case storage::durability::Marker::VALUE_FALSE:
case storage::durability::Marker::VALUE_TRUE:
valid_marker = false;
break;
}
// We only run this test with invalid markers.
if (valid_marker) continue;
{
file.SetPosition(utils::OutputFile::Position::RELATIVE_TO_END,
-(sizeof(uint64_t) + sizeof(storage::Marker)));
file.SetPosition(
utils::OutputFile::Position::RELATIVE_TO_END,
-(sizeof(uint64_t) + sizeof(storage::durability::Marker)));
auto byte = static_cast<uint8_t>(marker);
file.Write(&byte, sizeof(byte));
file.Sync();
}
{
storage::Decoder decoder;
storage::durability::Decoder decoder;
auto version = decoder.Initialize(storage_file, kTestMagic);
ASSERT_TRUE(version);
ASSERT_EQ(*version, kTestVersion);
ASSERT_FALSE(decoder.SkipPropertyValue());
}
{
storage::Decoder decoder;
storage::durability::Decoder decoder;
auto version = decoder.Initialize(storage_file, kTestMagic);
ASSERT_TRUE(version);
ASSERT_EQ(*version, kTestVersion);
@ -372,21 +373,22 @@ TEST_F(DecoderEncoderTest, PropertyValueInvalidMarker) {
}
{
{
file.SetPosition(utils::OutputFile::Position::RELATIVE_TO_END,
-(sizeof(uint64_t) + sizeof(storage::Marker)));
file.SetPosition(
utils::OutputFile::Position::RELATIVE_TO_END,
-(sizeof(uint64_t) + sizeof(storage::durability::Marker)));
uint8_t byte = 1;
file.Write(&byte, sizeof(byte));
file.Sync();
}
{
storage::Decoder decoder;
storage::durability::Decoder decoder;
auto version = decoder.Initialize(storage_file, kTestMagic);
ASSERT_TRUE(version);
ASSERT_EQ(*version, kTestVersion);
ASSERT_FALSE(decoder.SkipPropertyValue());
}
{
storage::Decoder decoder;
storage::durability::Decoder decoder;
auto version = decoder.Initialize(storage_file, kTestMagic);
ASSERT_TRUE(version);
ASSERT_EQ(*version, kTestVersion);
@ -399,13 +401,13 @@ TEST_F(DecoderEncoderTest, PropertyValueInvalidMarker) {
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_F(DecoderEncoderTest, DecoderPosition) {
{
storage::Encoder encoder;
storage::durability::Encoder encoder;
encoder.Initialize(storage_file, kTestMagic, kTestVersion);
encoder.WriteBool(true);
encoder.Finalize();
}
{
storage::Decoder decoder;
storage::durability::Decoder decoder;
auto version = decoder.Initialize(storage_file, kTestMagic);
ASSERT_TRUE(version);
ASSERT_EQ(*version, kTestVersion);
@ -425,7 +427,7 @@ TEST_F(DecoderEncoderTest, DecoderPosition) {
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_F(DecoderEncoderTest, EncoderPosition) {
{
storage::Encoder encoder;
storage::durability::Encoder encoder;
encoder.Initialize(storage_file, kTestMagic, kTestVersion);
encoder.WriteBool(false);
encoder.SetPosition(kTestMagic.size() + sizeof(kTestVersion));
@ -434,7 +436,7 @@ TEST_F(DecoderEncoderTest, EncoderPosition) {
encoder.Finalize();
}
{
storage::Decoder decoder;
storage::durability::Decoder decoder;
auto version = decoder.Initialize(storage_file, kTestMagic);
ASSERT_TRUE(version);
ASSERT_EQ(*version, kTestVersion);

View File

@ -13,7 +13,8 @@
#include <iostream>
#include <thread>
#include "storage/v2/durability.hpp"
#include "storage/v2/durability/paths.hpp"
#include "storage/v2/durability/version.hpp"
#include "storage/v2/storage.hpp"
#include "utils/file.hpp"
#include "utils/timer.hpp"
@ -621,21 +622,24 @@ class DurabilityTest : public ::testing::TestWithParam<bool> {
}
std::vector<std::filesystem::path> GetSnapshotsList() {
return GetFilesList(storage_directory / storage::kSnapshotDirectory);
return GetFilesList(storage_directory /
storage::durability::kSnapshotDirectory);
}
std::vector<std::filesystem::path> GetBackupSnapshotsList() {
return GetFilesList(storage_directory / storage::kBackupDirectory /
storage::kSnapshotDirectory);
return GetFilesList(storage_directory /
storage::durability::kBackupDirectory /
storage::durability::kSnapshotDirectory);
}
std::vector<std::filesystem::path> GetWalsList() {
return GetFilesList(storage_directory / storage::kWalDirectory);
return GetFilesList(storage_directory / storage::durability::kWalDirectory);
}
std::vector<std::filesystem::path> GetBackupWalsList() {
return GetFilesList(storage_directory / storage::kBackupDirectory /
storage::kWalDirectory);
return GetFilesList(storage_directory /
storage::durability::kBackupDirectory /
storage::durability::kWalDirectory);
}
void RestoreBackups() {
@ -643,15 +647,16 @@ class DurabilityTest : public ::testing::TestWithParam<bool> {
auto backup_snapshots = GetBackupSnapshotsList();
for (const auto &item : backup_snapshots) {
std::filesystem::rename(
item,
storage_directory / storage::kSnapshotDirectory / item.filename());
item, storage_directory / storage::durability::kSnapshotDirectory /
item.filename());
}
}
{
auto backup_wals = GetBackupWalsList();
for (const auto &item : backup_wals) {
std::filesystem::rename(
item, storage_directory / storage::kWalDirectory / item.filename());
std::filesystem::rename(item, storage_directory /
storage::durability::kWalDirectory /
item.filename());
}
}
}
@ -685,31 +690,31 @@ class DurabilityTest : public ::testing::TestWithParam<bool> {
};
void DestroySnapshot(const std::filesystem::path &path) {
auto info = storage::ReadSnapshotInfo(path);
auto info = storage::durability::ReadSnapshotInfo(path);
LOG(INFO) << "Destroying snapshot " << path;
utils::OutputFile file;
file.Open(path, utils::OutputFile::Mode::OVERWRITE_EXISTING);
file.SetPosition(utils::OutputFile::Position::SET, info.offset_vertices);
auto value = static_cast<uint8_t>(storage::Marker::TYPE_MAP);
auto value = static_cast<uint8_t>(storage::durability::Marker::TYPE_MAP);
file.Write(&value, sizeof(value));
file.Sync();
file.Close();
}
void DestroyWalFirstDelta(const std::filesystem::path &path) {
auto info = storage::ReadWalInfo(path);
auto info = storage::durability::ReadWalInfo(path);
LOG(INFO) << "Destroying WAL " << path;
utils::OutputFile file;
file.Open(path, utils::OutputFile::Mode::OVERWRITE_EXISTING);
file.SetPosition(utils::OutputFile::Position::SET, info.offset_deltas);
auto value = static_cast<uint8_t>(storage::Marker::TYPE_MAP);
auto value = static_cast<uint8_t>(storage::durability::Marker::TYPE_MAP);
file.Write(&value, sizeof(value));
file.Sync();
file.Close();
}
void DestroyWalSuffix(const std::filesystem::path &path) {
auto info = storage::ReadWalInfo(path);
auto info = storage::durability::ReadWalInfo(path);
LOG(INFO) << "Destroying WAL " << path;
utils::OutputFile file;
file.Open(path, utils::OutputFile::Mode::OVERWRITE_EXISTING);
@ -868,7 +873,7 @@ TEST_P(DurabilityTest, SnapshotEverythingCorrupt) {
{
auto snapshots = GetSnapshotsList();
ASSERT_EQ(snapshots.size(), 1);
auto info = storage::ReadSnapshotInfo(*snapshots.begin());
auto info = storage::durability::ReadSnapshotInfo(*snapshots.begin());
unrelated_uuid = info.uuid;
}
@ -904,7 +909,7 @@ TEST_P(DurabilityTest, SnapshotEverythingCorrupt) {
auto snapshots = GetSnapshotsList();
ASSERT_GE(snapshots.size(), 2);
for (const auto &snapshot : snapshots) {
auto info = storage::ReadSnapshotInfo(snapshot);
auto info = storage::durability::ReadSnapshotInfo(snapshot);
if (info.uuid == unrelated_uuid) {
LOG(INFO) << "Skipping snapshot " << snapshot;
continue;
@ -973,7 +978,7 @@ TEST_P(DurabilityTest, SnapshotRetention) {
for (size_t i = 0; i < snapshots.size(); ++i) {
const auto &path = snapshots[i];
// This shouldn't throw.
auto info = storage::ReadSnapshotInfo(path);
auto info = storage::durability::ReadSnapshotInfo(path);
if (i == 0) uuid = info.uuid;
if (i < snapshots.size() - 1) {
ASSERT_EQ(info.uuid, uuid);
@ -1655,15 +1660,15 @@ TEST_P(DurabilityTest, WalTransactionOrdering) {
// Verify WAL data.
{
auto path = GetWalsList().front();
auto info = storage::ReadWalInfo(path);
storage::Decoder wal;
wal.Initialize(path, storage::kWalMagic);
auto info = storage::durability::ReadWalInfo(path);
storage::durability::Decoder wal;
wal.Initialize(path, storage::durability::kWalMagic);
wal.SetPosition(info.offset_deltas);
ASSERT_EQ(info.num_deltas, 9);
std::vector<std::pair<uint64_t, storage::WalDeltaData>> data;
std::vector<std::pair<uint64_t, storage::durability::WalDeltaData>> data;
for (uint64_t i = 0; i < info.num_deltas; ++i) {
auto timestamp = storage::ReadWalDeltaHeader(&wal);
data.emplace_back(timestamp, storage::ReadWalDeltaData(&wal));
auto timestamp = storage::durability::ReadWalDeltaHeader(&wal);
data.emplace_back(timestamp, storage::durability::ReadWalDeltaData(&wal));
}
// Verify timestamps.
ASSERT_EQ(data[1].first, data[0].first);
@ -1675,38 +1680,41 @@ TEST_P(DurabilityTest, WalTransactionOrdering) {
ASSERT_EQ(data[7].first, data[6].first);
ASSERT_EQ(data[8].first, data[7].first);
// Verify transaction 3.
ASSERT_EQ(data[0].second.type, storage::WalDeltaData::Type::VERTEX_CREATE);
ASSERT_EQ(data[0].second.type,
storage::durability::WalDeltaData::Type::VERTEX_CREATE);
ASSERT_EQ(data[0].second.vertex_create_delete.gid, gid3);
ASSERT_EQ(data[1].second.type,
storage::WalDeltaData::Type::VERTEX_SET_PROPERTY);
storage::durability::WalDeltaData::Type::VERTEX_SET_PROPERTY);
ASSERT_EQ(data[1].second.vertex_edge_set_property.gid, gid3);
ASSERT_EQ(data[1].second.vertex_edge_set_property.property, "id");
ASSERT_EQ(data[1].second.vertex_edge_set_property.value,
storage::PropertyValue(3));
ASSERT_EQ(data[2].second.type,
storage::WalDeltaData::Type::TRANSACTION_END);
storage::durability::WalDeltaData::Type::TRANSACTION_END);
// Verify transaction 1.
ASSERT_EQ(data[3].second.type, storage::WalDeltaData::Type::VERTEX_CREATE);
ASSERT_EQ(data[3].second.type,
storage::durability::WalDeltaData::Type::VERTEX_CREATE);
ASSERT_EQ(data[3].second.vertex_create_delete.gid, gid1);
ASSERT_EQ(data[4].second.type,
storage::WalDeltaData::Type::VERTEX_SET_PROPERTY);
storage::durability::WalDeltaData::Type::VERTEX_SET_PROPERTY);
ASSERT_EQ(data[4].second.vertex_edge_set_property.gid, gid1);
ASSERT_EQ(data[4].second.vertex_edge_set_property.property, "id");
ASSERT_EQ(data[4].second.vertex_edge_set_property.value,
storage::PropertyValue(1));
ASSERT_EQ(data[5].second.type,
storage::WalDeltaData::Type::TRANSACTION_END);
storage::durability::WalDeltaData::Type::TRANSACTION_END);
// Verify transaction 2.
ASSERT_EQ(data[6].second.type, storage::WalDeltaData::Type::VERTEX_CREATE);
ASSERT_EQ(data[6].second.type,
storage::durability::WalDeltaData::Type::VERTEX_CREATE);
ASSERT_EQ(data[6].second.vertex_create_delete.gid, gid2);
ASSERT_EQ(data[7].second.type,
storage::WalDeltaData::Type::VERTEX_SET_PROPERTY);
storage::durability::WalDeltaData::Type::VERTEX_SET_PROPERTY);
ASSERT_EQ(data[7].second.vertex_edge_set_property.gid, gid2);
ASSERT_EQ(data[7].second.vertex_edge_set_property.property, "id");
ASSERT_EQ(data[7].second.vertex_edge_set_property.value,
storage::PropertyValue(2));
ASSERT_EQ(data[8].second.type,
storage::WalDeltaData::Type::TRANSACTION_END);
storage::durability::WalDeltaData::Type::TRANSACTION_END);
}
// Recover WALs.

View File

@ -6,32 +6,39 @@
#include <filesystem>
#include <string_view>
#include "storage/v2/durability.hpp"
#include "storage/v2/durability/exceptions.hpp"
#include "storage/v2/durability/version.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/name_id_mapper.hpp"
#include "utils/file.hpp"
#include "utils/uuid.hpp"
// Helper function used to convert between enum types.
storage::WalDeltaData::Type StorageGlobalOperationToWalDeltaDataType(
storage::StorageGlobalOperation operation) {
storage::durability::WalDeltaData::Type
StorageGlobalOperationToWalDeltaDataType(
storage::durability::StorageGlobalOperation operation) {
switch (operation) {
case storage::StorageGlobalOperation::LABEL_INDEX_CREATE:
return storage::WalDeltaData::Type::LABEL_INDEX_CREATE;
case storage::StorageGlobalOperation::LABEL_INDEX_DROP:
return storage::WalDeltaData::Type::LABEL_INDEX_DROP;
case storage::StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE:
return storage::WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE;
case storage::StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP:
return storage::WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP;
case storage::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE:
return storage::WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE;
case storage::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP:
return storage::WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP;
case storage::StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE:
return storage::WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE;
case storage::StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP:
return storage::WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP;
case storage::durability::StorageGlobalOperation::LABEL_INDEX_CREATE:
return storage::durability::WalDeltaData::Type::LABEL_INDEX_CREATE;
case storage::durability::StorageGlobalOperation::LABEL_INDEX_DROP:
return storage::durability::WalDeltaData::Type::LABEL_INDEX_DROP;
case storage::durability::StorageGlobalOperation::
LABEL_PROPERTY_INDEX_CREATE:
return storage::durability::WalDeltaData::Type::
LABEL_PROPERTY_INDEX_CREATE;
case storage::durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP:
return storage::durability::WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP;
case storage::durability::StorageGlobalOperation::
EXISTENCE_CONSTRAINT_CREATE:
return storage::durability::WalDeltaData::Type::
EXISTENCE_CONSTRAINT_CREATE;
case storage::durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP:
return storage::durability::WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP;
case storage::durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE:
return storage::durability::WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE;
case storage::durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP:
return storage::durability::WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP;
}
}
@ -52,8 +59,8 @@ class DeltaGenerator final {
auto &it = gen_->vertices_.emplace_back(gid, delta);
delta->prev.Set(&it);
{
storage::WalDeltaData data;
data.type = storage::WalDeltaData::Type::VERTEX_CREATE;
storage::durability::WalDeltaData data;
data.type = storage::durability::WalDeltaData::Type::VERTEX_CREATE;
data.vertex_create_delete.gid = gid;
data_.push_back(data);
}
@ -64,8 +71,8 @@ class DeltaGenerator final {
storage::CreateAndLinkDelta(&transaction_, &*vertex,
storage::Delta::RecreateObjectTag());
{
storage::WalDeltaData data;
data.type = storage::WalDeltaData::Type::VERTEX_DELETE;
storage::durability::WalDeltaData data;
data.type = storage::durability::WalDeltaData::Type::VERTEX_DELETE;
data.vertex_create_delete.gid = vertex->gid;
data_.push_back(data);
}
@ -77,8 +84,8 @@ class DeltaGenerator final {
storage::CreateAndLinkDelta(&transaction_, &*vertex,
storage::Delta::RemoveLabelTag(), label_id);
{
storage::WalDeltaData data;
data.type = storage::WalDeltaData::Type::VERTEX_ADD_LABEL;
storage::durability::WalDeltaData data;
data.type = storage::durability::WalDeltaData::Type::VERTEX_ADD_LABEL;
data.vertex_add_remove_label.gid = vertex->gid;
data.vertex_add_remove_label.label = label;
data_.push_back(data);
@ -92,8 +99,9 @@ class DeltaGenerator final {
storage::CreateAndLinkDelta(&transaction_, &*vertex,
storage::Delta::AddLabelTag(), label_id);
{
storage::WalDeltaData data;
data.type = storage::WalDeltaData::Type::VERTEX_REMOVE_LABEL;
storage::durability::WalDeltaData data;
data.type =
storage::durability::WalDeltaData::Type::VERTEX_REMOVE_LABEL;
data.vertex_add_remove_label.gid = vertex->gid;
data.vertex_add_remove_label.label = label;
data_.push_back(data);
@ -111,8 +119,9 @@ class DeltaGenerator final {
old_value);
props.SetProperty(property_id, value);
{
storage::WalDeltaData data;
data.type = storage::WalDeltaData::Type::VERTEX_SET_PROPERTY;
storage::durability::WalDeltaData data;
data.type =
storage::durability::WalDeltaData::Type::VERTEX_SET_PROPERTY;
data.vertex_edge_set_property.gid = vertex->gid;
data.vertex_edge_set_property.property = property;
// We don't store the property value here. That is because the storage
@ -143,7 +152,8 @@ class DeltaGenerator final {
if (gen_->valid_) {
gen_->UpdateStats(commit_timestamp, transaction_.deltas.size() + 1);
for (auto &data : data_) {
if (data.type == storage::WalDeltaData::Type::VERTEX_SET_PROPERTY) {
if (data.type ==
storage::durability::WalDeltaData::Type::VERTEX_SET_PROPERTY) {
// We need to put the final property value into the SET_PROPERTY
// delta.
auto vertex =
@ -158,8 +168,8 @@ class DeltaGenerator final {
}
gen_->data_.emplace_back(commit_timestamp, data);
}
storage::WalDeltaData data{
.type = storage::WalDeltaData::Type::TRANSACTION_END};
storage::durability::WalDeltaData data{
.type = storage::durability::WalDeltaData::Type::TRANSACTION_END};
gen_->data_.emplace_back(commit_timestamp, data);
}
} else {
@ -170,10 +180,11 @@ class DeltaGenerator final {
private:
DeltaGenerator *gen_;
storage::Transaction transaction_;
std::vector<storage::WalDeltaData> data_;
std::vector<storage::durability::WalDeltaData> data_;
};
using DataT = std::vector<std::pair<uint64_t, storage::WalDeltaData>>;
using DataT =
std::vector<std::pair<uint64_t, storage::durability::WalDeltaData>>;
DeltaGenerator(const std::filesystem::path &data_directory,
bool properties_on_edges, uint64_t seq_num)
@ -191,7 +202,7 @@ class DeltaGenerator final {
valid_ = false;
}
void AppendOperation(storage::StorageGlobalOperation operation,
void AppendOperation(storage::durability::StorageGlobalOperation operation,
const std::string &label,
const std::set<std::string> properties = {}) {
auto label_id = storage::LabelId::FromUint(mapper_.NameToId(label));
@ -203,21 +214,27 @@ class DeltaGenerator final {
wal_file_.AppendOperation(operation, label_id, property_ids, timestamp_);
if (valid_) {
UpdateStats(timestamp_, 1);
storage::WalDeltaData data;
storage::durability::WalDeltaData data;
data.type = StorageGlobalOperationToWalDeltaDataType(operation);
switch (operation) {
case storage::StorageGlobalOperation::LABEL_INDEX_CREATE:
case storage::StorageGlobalOperation::LABEL_INDEX_DROP:
case storage::durability::StorageGlobalOperation::LABEL_INDEX_CREATE:
case storage::durability::StorageGlobalOperation::LABEL_INDEX_DROP:
data.operation_label.label = label;
break;
case storage::StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE:
case storage::StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP:
case storage::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE:
case storage::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP:
case storage::durability::StorageGlobalOperation::
LABEL_PROPERTY_INDEX_CREATE:
case storage::durability::StorageGlobalOperation::
LABEL_PROPERTY_INDEX_DROP:
case storage::durability::StorageGlobalOperation::
EXISTENCE_CONSTRAINT_CREATE:
case storage::durability::StorageGlobalOperation::
EXISTENCE_CONSTRAINT_DROP:
data.operation_label_property.label = label;
data.operation_label_property.property = *properties.begin();
case storage::StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE:
case storage::StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP:
case storage::durability::StorageGlobalOperation::
UNIQUE_CONSTRAINT_CREATE:
case storage::durability::StorageGlobalOperation::
UNIQUE_CONSTRAINT_DROP:
data.operation_label_properties.label = label;
data.operation_label_properties.properties = properties;
}
@ -227,7 +244,7 @@ class DeltaGenerator final {
uint64_t GetPosition() { return wal_file_.GetSize(); }
storage::WalInfo GetInfo() {
storage::durability::WalInfo GetInfo() {
return {.offset_metadata = 0,
.offset_deltas = 0,
.uuid = uuid_,
@ -257,7 +274,7 @@ class DeltaGenerator final {
std::list<storage::Vertex> vertices_;
storage::NameIdMapper mapper_;
storage::WalFile wal_file_;
storage::durability::WalFile wal_file_;
DataT data_;
@ -276,10 +293,12 @@ class DeltaGenerator final {
}
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define OPERATION(op, ...) \
gen.AppendOperation(storage::StorageGlobalOperation::op, __VA_ARGS__)
#define OPERATION(op, ...) \
gen.AppendOperation(storage::durability::StorageGlobalOperation::op, \
__VA_ARGS__)
void AssertWalInfoEqual(const storage::WalInfo &a, const storage::WalInfo &b) {
void AssertWalInfoEqual(const storage::durability::WalInfo &a,
const storage::durability::WalInfo &b) {
ASSERT_EQ(a.uuid, b.uuid);
ASSERT_EQ(a.seq_num, b.seq_num);
ASSERT_EQ(a.from_timestamp, b.from_timestamp);
@ -289,14 +308,15 @@ void AssertWalInfoEqual(const storage::WalInfo &a, const storage::WalInfo &b) {
void AssertWalDataEqual(const DeltaGenerator::DataT &data,
const std::filesystem::path &path) {
auto info = storage::ReadWalInfo(path);
storage::Decoder wal;
wal.Initialize(path, storage::kWalMagic);
auto info = storage::durability::ReadWalInfo(path);
storage::durability::Decoder wal;
wal.Initialize(path, storage::durability::kWalMagic);
wal.SetPosition(info.offset_deltas);
DeltaGenerator::DataT current;
for (uint64_t i = 0; i < info.num_deltas; ++i) {
auto timestamp = storage::ReadWalDeltaHeader(&wal);
current.emplace_back(timestamp, storage::ReadWalDeltaData(&wal));
auto timestamp = storage::durability::ReadWalDeltaHeader(&wal);
current.emplace_back(timestamp,
storage::durability::ReadWalDeltaData(&wal));
}
ASSERT_EQ(data.size(), current.size());
ASSERT_EQ(data, current);
@ -344,28 +364,29 @@ TEST_P(WalFileTest, EmptyFile) {
}
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define GENERATE_SIMPLE_TEST(name, ops) \
TEST_P(WalFileTest, name) { \
storage::WalInfo info; \
DeltaGenerator::DataT data; \
\
{ \
DeltaGenerator gen(storage_directory, GetParam(), 5); \
ops; \
info = gen.GetInfo(); \
data = gen.GetData(); \
} \
\
auto wal_files = GetFilesList(); \
ASSERT_EQ(wal_files.size(), 1); \
\
if (info.num_deltas == 0) { \
ASSERT_THROW(storage::ReadWalInfo(wal_files.front()), \
storage::RecoveryFailure); \
} else { \
AssertWalInfoEqual(info, storage::ReadWalInfo(wal_files.front())); \
AssertWalDataEqual(data, wal_files.front()); \
} \
#define GENERATE_SIMPLE_TEST(name, ops) \
TEST_P(WalFileTest, name) { \
storage::durability::WalInfo info; \
DeltaGenerator::DataT data; \
\
{ \
DeltaGenerator gen(storage_directory, GetParam(), 5); \
ops; \
info = gen.GetInfo(); \
data = gen.GetData(); \
} \
\
auto wal_files = GetFilesList(); \
ASSERT_EQ(wal_files.size(), 1); \
\
if (info.num_deltas == 0) { \
ASSERT_THROW(storage::durability::ReadWalInfo(wal_files.front()), \
storage::durability::RecoveryFailure); \
} else { \
AssertWalInfoEqual(info, \
storage::durability::ReadWalInfo(wal_files.front())); \
AssertWalDataEqual(data, wal_files.front()); \
} \
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
@ -525,7 +546,7 @@ GENERATE_SIMPLE_TEST(InvalidTransactionOrdering, {
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_P(WalFileTest, InvalidMarker) {
storage::WalInfo info;
storage::durability::WalInfo info;
{
DeltaGenerator gen(storage_directory, GetParam(), 5);
@ -537,12 +558,12 @@ TEST_P(WalFileTest, InvalidMarker) {
ASSERT_EQ(wal_files.size(), 1);
const auto &wal_file = wal_files.front();
auto final_info = storage::ReadWalInfo(wal_file);
auto final_info = storage::durability::ReadWalInfo(wal_file);
AssertWalInfoEqual(info, final_info);
size_t i = 0;
for (auto marker : storage::kMarkersAll) {
if (marker == storage::Marker::SECTION_DELTA) continue;
for (auto marker : storage::durability::kMarkersAll) {
if (marker == storage::durability::Marker::SECTION_DELTA) continue;
auto current_file = storage_directory / fmt::format("temporary_{}", i);
ASSERT_TRUE(std::filesystem::copy_file(wal_file, current_file));
utils::OutputFile file;
@ -553,14 +574,15 @@ TEST_P(WalFileTest, InvalidMarker) {
file.Write(&value, sizeof(value));
file.Sync();
file.Close();
ASSERT_THROW(storage::ReadWalInfo(current_file), storage::RecoveryFailure);
ASSERT_THROW(storage::durability::ReadWalInfo(current_file),
storage::durability::RecoveryFailure);
++i;
}
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_P(WalFileTest, PartialData) {
std::vector<std::pair<uint64_t, storage::WalInfo>> infos;
std::vector<std::pair<uint64_t, storage::durability::WalInfo>> infos;
{
DeltaGenerator gen(storage_directory, GetParam(), 5);
@ -591,7 +613,8 @@ TEST_P(WalFileTest, PartialData) {
ASSERT_EQ(wal_files.size(), 1);
const auto &wal_file = wal_files.front();
AssertWalInfoEqual(infos.back().second, storage::ReadWalInfo(wal_file));
AssertWalInfoEqual(infos.back().second,
storage::durability::ReadWalInfo(wal_file));
auto current_file = storage_directory / "temporary";
utils::InputFile infile;
@ -600,11 +623,12 @@ TEST_P(WalFileTest, PartialData) {
uint64_t pos = 0;
for (size_t i = 0; i < infile.GetSize(); ++i) {
if (i < infos.front().first) {
ASSERT_THROW(storage::ReadWalInfo(current_file),
storage::RecoveryFailure);
ASSERT_THROW(storage::durability::ReadWalInfo(current_file),
storage::durability::RecoveryFailure);
} else {
if (i >= infos[pos + 1].first) ++pos;
AssertWalInfoEqual(infos[pos].second, storage::ReadWalInfo(current_file));
AssertWalInfoEqual(infos[pos].second,
storage::durability::ReadWalInfo(current_file));
}
{
utils::OutputFile outfile;
@ -618,5 +642,5 @@ TEST_P(WalFileTest, PartialData) {
}
ASSERT_EQ(pos, infos.size() - 2);
AssertWalInfoEqual(infos[infos.size() - 1].second,
storage::ReadWalInfo(current_file));
storage::durability::ReadWalInfo(current_file));
}