diff --git a/src/storage/v2/config.hpp b/src/storage/v2/config.hpp index e0cecf709..88880d32a 100644 --- a/src/storage/v2/config.hpp +++ b/src/storage/v2/config.hpp @@ -1,6 +1,8 @@ #pragma once #include <chrono> +#include <cstdint> +#include <filesystem> namespace storage { @@ -17,6 +19,19 @@ struct Config { struct Items { bool properties_on_edges{true}; } items; + + struct Durability { + enum class SnapshotType { NONE, PERIODIC }; + + std::filesystem::path storage_directory{"storage"}; + + bool recover_on_startup{false}; + + SnapshotType snapshot_type{SnapshotType::NONE}; + std::chrono::milliseconds snapshot_interval{std::chrono::minutes(2)}; + uint64_t snapshot_retention_count{3}; + bool snapshot_on_exit{false}; + } durability; }; } // namespace storage diff --git a/src/storage/v2/durability.cpp b/src/storage/v2/durability.cpp index 8744371e6..7c601e872 100644 --- a/src/storage/v2/durability.cpp +++ b/src/storage/v2/durability.cpp @@ -1,31 +1,29 @@ #include "storage/v2/durability.hpp" -#include "utils/endian.hpp" +#include <algorithm> +#include <unordered_map> +#include <unordered_set> -namespace storage::durability { +#include "storage/v2/edge_accessor.hpp" +#include "storage/v2/edge_ref.hpp" +#include "storage/v2/mvcc.hpp" +#include "storage/v2/vertex_accessor.hpp" +#include "utils/endian.hpp" +#include "utils/file.hpp" +#include "utils/timestamp.hpp" +#include "utils/uuid.hpp" + +namespace storage { + +////////////////////////// +// Encoder implementation. +////////////////////////// namespace { -std::optional<Marker> CastToMarker(uint8_t value) { - for (auto marker : kMarkersAll) { - if (static_cast<uint8_t>(marker) == value) { - return marker; - } - } - return std::nullopt; -} - void WriteSize(Encoder *encoder, uint64_t size) { size = utils::HostToLittleEndian(size); encoder->Write(reinterpret_cast<const uint8_t *>(&size), sizeof(size)); } - -std::optional<uint64_t> ReadSize(Decoder *decoder) { - uint64_t size; - if (!decoder->Read(reinterpret_cast<uint8_t *>(&size), sizeof(size))) - return std::nullopt; - size = utils::LittleEndianToHost(size); - return size; -} } // namespace void Encoder::Initialize(const std::filesystem::path &path, @@ -130,6 +128,29 @@ void Encoder::Finalize() { file_.Close(); } +////////////////////////// +// Decoder implementation. +////////////////////////// + +namespace { +std::optional<Marker> CastToMarker(uint8_t value) { + for (auto marker : kMarkersAll) { + if (static_cast<uint8_t>(marker) == value) { + return marker; + } + } + return std::nullopt; +} + +std::optional<uint64_t> ReadSize(Decoder *decoder) { + uint64_t size; + if (!decoder->Read(reinterpret_cast<uint8_t *>(&size), sizeof(size))) + return std::nullopt; + size = utils::LittleEndianToHost(size); + return size; +} +} // namespace + std::optional<uint64_t> Decoder::Initialize(const std::filesystem::path &path, const std::string &magic) { if (!file_.Open(path)) return std::nullopt; @@ -374,4 +395,922 @@ bool Decoder::SetPosition(uint64_t position) { return !!file_.SetPosition(utils::InputFile::Position::SET, position); } -} // namespace storage::durability +///////////////////////////// +// Durability implementation. +///////////////////////////// + +namespace { +// Magic values written to the start of a snapshot/WAL file to identify it. +const std::string kSnapshotMagic{"MGsn"}; +const std::string kWalMagic{"MGwl"}; + +// The current version of snapshot and WAL encoding / decoding. +// IMPORTANT: Please bump this version for every snapshot and/or WAL format +// change!!! +const uint64_t kVersion{12}; + +// Snapshot format: +// +// 1) Magic string (non-encoded) +// +// 2) Snapshot version (non-encoded, little-endian) +// +// 3) Section offsets: +// * offset to the first edge in the snapshot (`0` if properties on edges +// are disabled) +// * offset to the first vertex in the snapshot +// * offset to the indices section +// * offset to the constraints section +// * offset to the mapper section +// * offset to the metadata section +// +// 4) Encoded edges (if properties on edges are enabled); each edge is written +// in the following format: +// * gid +// * properties +// +// 5) Encoded vertices; each vertex is written in the following format: +// * gid +// * labels +// * properties +// * in edges +// * edge gid +// * from vertex gid +// * edge type +// * out edges +// * edge gid +// * to vertex gid +// * edge type +// +// 6) Indices +// * label indices +// * label +// * label+property indices +// * label +// * property +// +// 7) Constraints +// * existence constraints +// * label +// * property +// +// 8) Name to ID mapper data +// * id to name mappings +// * id +// * name +// +// 9) Metadata +// * storage UUID +// * snapshot transaction start timestamp (required when recovering +// from snapshot combined with WAL to determine what deltas need to be +// applied) +// * number of edges +// * number of vertices + +// This is the prefix used for Snapshot and WAL filenames. It is a timestamp +// format that equals to: YYYYmmddHHMMSSffffff +const std::string kTimestampFormat = + "{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}{:06d}"; + +// Generates the name for a snapshot in a well-defined sortable format with the +// start timestamp appended to the file name. +std::string MakeSnapshotName(uint64_t start_timestamp) { + std::string date_str = utils::Timestamp::Now().ToString(kTimestampFormat); + return date_str + "_timestamp_" + std::to_string(start_timestamp); +} +} // namespace + +// Function used to read information about the snapshot file. +SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path) { + // Check magic and version. + Decoder snapshot; + auto version = snapshot.Initialize(path, kSnapshotMagic); + if (!version) + throw RecoveryFailure("Couldn't read snapshot magic and/or version!"); + if (*version != kVersion) throw RecoveryFailure("Invalid snapshot version!"); + + // Prepare return value. + SnapshotInfo info; + + // Read offsets. + { + auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_OFFSETS) + throw RecoveryFailure("Invalid snapshot data!"); + + auto snapshot_size = snapshot.GetSize(); + if (!snapshot_size) + throw RecoveryFailure("Couldn't read data from snapshot!"); + + auto read_offset = [&snapshot, snapshot_size] { + auto maybe_offset = snapshot.ReadUint(); + if (!maybe_offset) throw RecoveryFailure("Invalid snapshot format!"); + auto offset = *maybe_offset; + if (offset > *snapshot_size) + throw RecoveryFailure("Invalid snapshot format!"); + return offset; + }; + + info.offset_edges = read_offset(); + info.offset_vertices = read_offset(); + info.offset_indices = read_offset(); + info.offset_constraints = read_offset(); + info.offset_mapper = read_offset(); + info.offset_metadata = read_offset(); + } + + // Read metadata. + { + if (!snapshot.SetPosition(info.offset_metadata)) + throw RecoveryFailure("Couldn't read data from snapshot!"); + + auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_METADATA) + throw RecoveryFailure("Invalid snapshot data!"); + + auto maybe_uuid = snapshot.ReadString(); + if (!maybe_uuid) throw RecoveryFailure("Invalid snapshot data!"); + info.uuid = std::move(*maybe_uuid); + + auto maybe_timestamp = snapshot.ReadUint(); + if (!maybe_timestamp) throw RecoveryFailure("Invalid snapshot data!"); + info.start_timestamp = *maybe_timestamp; + + auto maybe_edges = snapshot.ReadUint(); + if (!maybe_edges) throw RecoveryFailure("Invalid snapshot data!"); + info.edges_count = *maybe_edges; + + auto maybe_vertices = snapshot.ReadUint(); + if (!maybe_vertices) throw RecoveryFailure("Invalid snapshot data!"); + info.vertices_count = *maybe_vertices; + } + + return info; +} + +Durability::Durability(Config::Durability config, + utils::SkipList<Vertex> *vertices, + utils::SkipList<Edge> *edges, + NameIdMapper *name_id_mapper, Indices *indices, + Constraints *constraints, Config::Items items) + : config_(config), + vertices_(vertices), + edges_(edges), + name_id_mapper_(name_id_mapper), + indices_(indices), + constraints_(constraints), + items_(items), + snapshot_directory_(config_.storage_directory / kSnapshotDirectory), + uuid_(utils::GenerateUUID()) {} + +std::optional<Durability::RecoveryInfo> Durability::Initialize( + std::function<void(std::function<void(Transaction *)>)> + execute_with_transaction) { + execute_with_transaction_ = execute_with_transaction; + std::optional<Durability::RecoveryInfo> ret; + if (config_.recover_on_startup) { + ret = RecoverData(); + } + if (config_.snapshot_type == Config::Durability::SnapshotType::PERIODIC || + config_.snapshot_on_exit) { + // Create the directory initially to crash the database in case of + // permission errors. This is done early to crash the database on startup + // instead of crashing the database for the first time during runtime (which + // could be an unpleasant surprise). + utils::EnsureDirOrDie(snapshot_directory_); + } + if (config_.snapshot_type == Config::Durability::SnapshotType::PERIODIC) { + snapshot_runner_.Run("Snapshot", config_.snapshot_interval, [this] { + execute_with_transaction_( + [this](Transaction *transaction) { CreateSnapshot(transaction); }); + }); + } + return ret; +} + +void Durability::Finalize() { + if (config_.snapshot_type == Config::Durability::SnapshotType::PERIODIC) { + snapshot_runner_.Stop(); + } + if (config_.snapshot_on_exit) { + execute_with_transaction_( + [this](Transaction *transaction) { CreateSnapshot(transaction); }); + } +} + +void Durability::CreateSnapshot(Transaction *transaction) { + // Ensure that the storage directory exists. + utils::EnsureDirOrDie(snapshot_directory_); + + // Create snapshot file. + auto path = + snapshot_directory_ / MakeSnapshotName(transaction->start_timestamp); + LOG(INFO) << "Starting snapshot creation to " << path; + Encoder snapshot; + snapshot.Initialize(path, kSnapshotMagic, kVersion); + + // Write placeholder offsets. + uint64_t offset_offsets = 0; + uint64_t offset_edges = 0; + uint64_t offset_vertices = 0; + uint64_t offset_indices = 0; + uint64_t offset_constraints = 0; + uint64_t offset_mapper = 0; + uint64_t offset_metadata = 0; + { + snapshot.WriteMarker(Marker::SECTION_OFFSETS); + offset_offsets = snapshot.GetPosition(); + snapshot.WriteUint(offset_edges); + snapshot.WriteUint(offset_vertices); + snapshot.WriteUint(offset_indices); + snapshot.WriteUint(offset_constraints); + snapshot.WriteUint(offset_mapper); + snapshot.WriteUint(offset_metadata); + } + + // Object counters. + uint64_t edges_count = 0; + uint64_t vertices_count = 0; + + // Mapper data. + std::unordered_set<uint64_t> used_ids; + auto write_mapping = [&snapshot, &used_ids](auto mapping) { + used_ids.insert(mapping.AsUint()); + snapshot.WriteUint(mapping.AsUint()); + }; + + // Store all edges. + if (items_.properties_on_edges) { + offset_edges = snapshot.GetPosition(); + auto acc = edges_->access(); + for (auto &edge : acc) { + // The edge visibility check must be done here manually because we don't + // allow direct access to the edges through the public API. + bool is_visible = true; + Delta *delta = nullptr; + { + std::lock_guard<utils::SpinLock> guard(edge.lock); + is_visible = !edge.deleted; + delta = edge.delta; + } + ApplyDeltasForRead(transaction, delta, View::OLD, + [&is_visible](const Delta &delta) { + switch (delta.action) { + case Delta::Action::ADD_LABEL: + case Delta::Action::REMOVE_LABEL: + case Delta::Action::SET_PROPERTY: + case Delta::Action::ADD_IN_EDGE: + case Delta::Action::ADD_OUT_EDGE: + case Delta::Action::REMOVE_IN_EDGE: + case Delta::Action::REMOVE_OUT_EDGE: + break; + case Delta::Action::RECREATE_OBJECT: { + is_visible = true; + break; + } + case Delta::Action::DELETE_OBJECT: { + is_visible = false; + break; + } + } + }); + if (!is_visible) continue; + EdgeRef edge_ref(&edge); + // Here we create an edge accessor that we will use to get the + // properties of the edge. The accessor is created with an invalid + // type and invalid from/to pointers because we don't know them here, + // but that isn't an issue because we won't use that part of the API + // here. + auto ea = EdgeAccessor{edge_ref, EdgeTypeId::FromUint(0UL), + nullptr, nullptr, + transaction, indices_, + items_}; + + // Get edge data. + auto maybe_props = ea.Properties(View::OLD); + CHECK(maybe_props.HasValue()) << "Invalid database state!"; + + // Store the edge. + { + snapshot.WriteMarker(Marker::SECTION_EDGE); + snapshot.WriteUint(edge.gid.AsUint()); + const auto &props = maybe_props.GetValue(); + snapshot.WriteUint(props.size()); + for (const auto &item : props) { + write_mapping(item.first); + snapshot.WritePropertyValue(item.second); + } + } + + ++edges_count; + } + } + + // Store all vertices. + { + offset_vertices = snapshot.GetPosition(); + auto acc = vertices_->access(); + for (auto &vertex : acc) { + // The visibility check is implemented for vertices so we use it here. + auto va = VertexAccessor::Create(&vertex, transaction, indices_, items_, + View::OLD); + if (!va) continue; + + // Get vertex data. + // TODO (mferencevic): All of these functions could be written into a + // single function so that we traverse the undo deltas only once. + auto maybe_labels = va->Labels(View::OLD); + CHECK(maybe_labels.HasValue()) << "Invalid database state!"; + auto maybe_props = va->Properties(View::OLD); + CHECK(maybe_props.HasValue()) << "Invalid database state!"; + auto maybe_in_edges = va->InEdges({}, View::OLD); + CHECK(maybe_in_edges.HasValue()) << "Invalid database state!"; + auto maybe_out_edges = va->OutEdges({}, View::OLD); + CHECK(maybe_out_edges.HasValue()) << "Invalid database state!"; + + // Store the vertex. + { + snapshot.WriteMarker(Marker::SECTION_VERTEX); + snapshot.WriteUint(vertex.gid.AsUint()); + const auto &labels = maybe_labels.GetValue(); + snapshot.WriteUint(labels.size()); + for (const auto &item : labels) { + write_mapping(item); + } + const auto &props = maybe_props.GetValue(); + snapshot.WriteUint(props.size()); + for (const auto &item : props) { + write_mapping(item.first); + snapshot.WritePropertyValue(item.second); + } + const auto &in_edges = maybe_in_edges.GetValue(); + snapshot.WriteUint(in_edges.size()); + for (const auto &item : in_edges) { + snapshot.WriteUint(item.Gid().AsUint()); + snapshot.WriteUint(item.FromVertex().Gid().AsUint()); + write_mapping(item.EdgeType()); + } + const auto &out_edges = maybe_out_edges.GetValue(); + snapshot.WriteUint(out_edges.size()); + for (const auto &item : out_edges) { + snapshot.WriteUint(item.Gid().AsUint()); + snapshot.WriteUint(item.ToVertex().Gid().AsUint()); + write_mapping(item.EdgeType()); + } + } + + ++vertices_count; + } + } + + // Write indices. + { + offset_indices = snapshot.GetPosition(); + snapshot.WriteMarker(Marker::SECTION_INDICES); + + // Write label indices. + { + auto label = indices_->label_index.ListIndices(); + snapshot.WriteUint(label.size()); + for (const auto &item : label) { + write_mapping(item); + } + } + + // Write label+property indices. + { + auto label_property = indices_->label_property_index.ListIndices(); + snapshot.WriteUint(label_property.size()); + for (const auto &item : label_property) { + write_mapping(item.first); + write_mapping(item.second); + } + } + } + + // Write constraints. + { + offset_constraints = snapshot.GetPosition(); + snapshot.WriteMarker(Marker::SECTION_CONSTRAINTS); + + // Write existence constraints. + { + auto existence = ListExistenceConstraints(*constraints_); + snapshot.WriteUint(existence.size()); + for (const auto &item : existence) { + write_mapping(item.first); + write_mapping(item.second); + } + } + } + + // Write mapper data. + { + offset_mapper = snapshot.GetPosition(); + snapshot.WriteMarker(Marker::SECTION_MAPPER); + snapshot.WriteUint(used_ids.size()); + for (auto item : used_ids) { + snapshot.WriteUint(item); + snapshot.WriteString(name_id_mapper_->IdToName(item)); + } + } + + // Write metadata. + { + offset_metadata = snapshot.GetPosition(); + snapshot.WriteMarker(Marker::SECTION_METADATA); + snapshot.WriteString(uuid_); + snapshot.WriteUint(transaction->start_timestamp); + snapshot.WriteUint(edges_count); + snapshot.WriteUint(vertices_count); + } + + // Write true offsets. + { + snapshot.SetPosition(offset_offsets); + snapshot.WriteUint(offset_edges); + snapshot.WriteUint(offset_vertices); + snapshot.WriteUint(offset_indices); + snapshot.WriteUint(offset_constraints); + snapshot.WriteUint(offset_mapper); + snapshot.WriteUint(offset_metadata); + } + + // Finalize snapshot file. + snapshot.Finalize(); + LOG(INFO) << "Snapshot creation successful!"; + + // Ensure exactly `snapshot_retention_count` snapshots exist. + std::vector<std::filesystem::path> old_snapshot_files; + std::error_code error_code; + for (auto &item : + std::filesystem::directory_iterator(snapshot_directory_, error_code)) { + if (!item.is_regular_file()) continue; + if (item.path() == path) continue; + try { + auto info = ReadSnapshotInfo(item.path()); + if (info.uuid == uuid_) { + old_snapshot_files.push_back(item.path()); + } + } catch (const RecoveryFailure &e) { + LOG(WARNING) << "Found a corrupt snapshot file " << item.path() + << " because of: " << e.what(); + continue; + } + } + if (error_code) { + LOG(ERROR) << "Couldn't ensure that exactly " + << config_.snapshot_retention_count + << " snapshots exist because an error occurred: " + << error_code.message() << "!"; + } + if (old_snapshot_files.size() >= config_.snapshot_retention_count) { + std::sort(old_snapshot_files.begin(), old_snapshot_files.end()); + for (size_t i = 0; + i <= old_snapshot_files.size() - config_.snapshot_retention_count; + ++i) { + const auto &path = old_snapshot_files[i]; + if (!utils::DeleteFile(path)) { + LOG(WARNING) << "Couldn't delete snapshot file " << path << "!"; + } + } + } +} + +std::optional<Durability::RecoveryInfo> Durability::RecoverData() { + if (!utils::DirExists(snapshot_directory_)) return std::nullopt; + + // Array of all discovered snapshots, ordered by name. + std::vector<std::filesystem::path> snapshot_files; + std::error_code error_code; + for (auto &item : + std::filesystem::directory_iterator(snapshot_directory_, error_code)) { + if (!item.is_regular_file()) continue; + try { + ReadSnapshotInfo(item.path()); + snapshot_files.push_back(item.path()); + } catch (const RecoveryFailure &) { + continue; + } + } + CHECK(!error_code) << "Couldn't recover data because an error occurred: " + << error_code.message() << "!"; + std::sort(snapshot_files.begin(), snapshot_files.end()); + for (auto it = snapshot_files.rbegin(); it != snapshot_files.rend(); ++it) { + const auto &path = *it; + LOG(INFO) << "Starting snapshot recovery from " << path; + try { + auto info = LoadSnapshot(path); + LOG(INFO) << "Snapshot recovery successful!"; + return info; + } catch (const RecoveryFailure &e) { + LOG(WARNING) << "Couldn't recover snapshot from " << path + << " because of: " << e.what(); + continue; + } + } + return std::nullopt; +} + +Durability::RecoveryInfo Durability::LoadSnapshot( + const std::filesystem::path &path) { + Durability::RecoveryInfo ret; + + Decoder snapshot; + auto version = snapshot.Initialize(path, kSnapshotMagic); + if (!version) + throw RecoveryFailure("Couldn't read snapshot magic and/or version!"); + if (*version != kVersion) throw RecoveryFailure("Invalid snapshot version!"); + + // Cleanup of loaded data in case of failure. + bool success = false; + utils::OnScopeExit cleanup([this, &success] { + if (!success) { + edges_->clear(); + vertices_->clear(); + indices_->label_index.Clear(); + indices_->label_property_index.Clear(); + constraints_->existence_constraints.clear(); + } + }); + + // Read snapshot info. + auto info = ReadSnapshotInfo(path); + + // Check for edges. + bool snapshot_has_edges = info.offset_edges != 0; + + // Set storage UUID. + uuid_ = info.uuid; + + // Recover mapper. + std::unordered_map<uint64_t, uint64_t> snapshot_id_map; + { + if (!snapshot.SetPosition(info.offset_mapper)) + throw RecoveryFailure("Couldn't read data from snapshot!"); + + auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_MAPPER) + throw RecoveryFailure("Invalid snapshot data!"); + + auto size = snapshot.ReadUint(); + if (!size) throw RecoveryFailure("Invalid snapshot data!"); + + for (uint64_t i = 0; i < *size; ++i) { + auto id = snapshot.ReadUint(); + if (!id) throw RecoveryFailure("Invalid snapshot data!"); + auto name = snapshot.ReadString(); + if (!name) throw RecoveryFailure("Invalid snapshot data!"); + auto my_id = name_id_mapper_->NameToId(*name); + snapshot_id_map.emplace(*id, my_id); + } + } + auto get_label_from_id = [&snapshot_id_map](uint64_t snapshot_id) { + auto it = snapshot_id_map.find(snapshot_id); + if (it == snapshot_id_map.end()) + throw RecoveryFailure("Invalid snapshot data!"); + return LabelId::FromUint(it->second); + }; + auto get_property_from_id = [&snapshot_id_map](uint64_t snapshot_id) { + auto it = snapshot_id_map.find(snapshot_id); + if (it == snapshot_id_map.end()) + throw RecoveryFailure("Invalid snapshot data!"); + return PropertyId::FromUint(it->second); + }; + auto get_edge_type_from_id = [&snapshot_id_map](uint64_t snapshot_id) { + auto it = snapshot_id_map.find(snapshot_id); + if (it == snapshot_id_map.end()) + throw RecoveryFailure("Invalid snapshot data!"); + return EdgeTypeId::FromUint(it->second); + }; + + { + // Recover edges. + auto edge_acc = edges_->access(); + uint64_t last_edge_gid = 0; + if (snapshot_has_edges) { + if (!snapshot.SetPosition(info.offset_edges)) + throw RecoveryFailure("Couldn't read data from snapshot!"); + for (uint64_t i = 0; i < info.edges_count; ++i) { + { + auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_EDGE) + throw RecoveryFailure("Invalid snapshot data!"); + } + + if (items_.properties_on_edges) { + // Insert edge. + auto gid = snapshot.ReadUint(); + if (!gid) throw RecoveryFailure("Invalid snapshot data!"); + if (i > 0 && *gid <= last_edge_gid) + throw RecoveryFailure("Invalid snapshot data!"); + last_edge_gid = *gid; + auto [it, inserted] = + edge_acc.insert(Edge{Gid::FromUint(*gid), nullptr}); + if (!inserted) + throw RecoveryFailure("The edge must be inserted here!"); + + // Recover properties. + { + auto props_size = snapshot.ReadUint(); + if (!props_size) throw RecoveryFailure("Invalid snapshot data!"); + auto &props = it->properties; + for (uint64_t j = 0; j < *props_size; ++j) { + auto key = snapshot.ReadUint(); + if (!key) throw RecoveryFailure("Invalid snapshot data!"); + auto value = snapshot.ReadPropertyValue(); + if (!value) throw RecoveryFailure("Invalid snapshot data!"); + props.emplace(get_property_from_id(*key), std::move(*value)); + } + } + } else { + // Read edge GID. + auto gid = snapshot.ReadUint(); + if (!gid) throw RecoveryFailure("Invalid snapshot data!"); + if (i > 0 && *gid <= last_edge_gid) + throw RecoveryFailure("Invalid snapshot data!"); + last_edge_gid = *gid; + + // Read properties. + { + auto props_size = snapshot.ReadUint(); + if (!props_size) throw RecoveryFailure("Invalid snapshot data!"); + if (*props_size != 0) + throw RecoveryFailure( + "The snapshot has properties on edges, but the storage is " + "configured without properties on edges!"); + } + } + } + } + + // Recover vertices (labels and properties). + if (!snapshot.SetPosition(info.offset_vertices)) + throw RecoveryFailure("Couldn't read data from snapshot!"); + auto vertex_acc = vertices_->access(); + uint64_t last_vertex_gid = 0; + for (uint64_t i = 0; i < info.vertices_count; ++i) { + { + auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_VERTEX) + throw RecoveryFailure("Invalid snapshot data!"); + } + + // Insert vertex. + auto gid = snapshot.ReadUint(); + if (!gid) throw RecoveryFailure("Invalid snapshot data!"); + if (i > 0 && *gid <= last_vertex_gid) { + throw RecoveryFailure("Invalid snapshot data!"); + } + last_vertex_gid = *gid; + auto [it, inserted] = + vertex_acc.insert(Vertex{Gid::FromUint(*gid), nullptr}); + if (!inserted) throw RecoveryFailure("The vertex must be inserted here!"); + + // Recover labels. + { + auto labels_size = snapshot.ReadUint(); + if (!labels_size) throw RecoveryFailure("Invalid snapshot data!"); + auto &labels = it->labels; + labels.reserve(*labels_size); + for (uint64_t j = 0; j < *labels_size; ++j) { + auto label = snapshot.ReadUint(); + if (!label) throw RecoveryFailure("Invalid snapshot data!"); + labels.emplace_back(get_label_from_id(*label)); + } + } + + // Recover properties. + { + auto props_size = snapshot.ReadUint(); + if (!props_size) throw RecoveryFailure("Invalid snapshot data!"); + auto &props = it->properties; + for (uint64_t j = 0; j < *props_size; ++j) { + auto key = snapshot.ReadUint(); + if (!key) throw RecoveryFailure("Invalid snapshot data!"); + auto value = snapshot.ReadPropertyValue(); + if (!value) throw RecoveryFailure("Invalid snapshot data!"); + props.emplace(get_property_from_id(*key), std::move(*value)); + } + } + + // Skip in edges. + { + auto in_size = snapshot.ReadUint(); + if (!in_size) throw RecoveryFailure("Invalid snapshot data!"); + for (uint64_t j = 0; j < *in_size; ++j) { + auto edge_gid = snapshot.ReadUint(); + if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!"); + auto from_gid = snapshot.ReadUint(); + if (!from_gid) throw RecoveryFailure("Invalid snapshot data!"); + auto edge_type = snapshot.ReadUint(); + if (!edge_type) throw RecoveryFailure("Invalid snapshot data!"); + } + } + + // Skip out edges. + auto out_size = snapshot.ReadUint(); + for (uint64_t j = 0; j < *out_size; ++j) { + auto edge_gid = snapshot.ReadUint(); + if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!"); + auto to_gid = snapshot.ReadUint(); + if (!to_gid) throw RecoveryFailure("Invalid snapshot data!"); + auto edge_type = snapshot.ReadUint(); + if (!edge_type) throw RecoveryFailure("Invalid snapshot data!"); + } + } + + // Recover vertices (in/out edges). + if (!snapshot.SetPosition(info.offset_vertices)) + throw RecoveryFailure("Couldn't read data from snapshot!"); + for (auto &vertex : vertex_acc) { + { + auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_VERTEX) + throw RecoveryFailure("Invalid snapshot data!"); + } + + // Check vertex. + auto gid = snapshot.ReadUint(); + if (!gid) throw RecoveryFailure("Invalid snapshot data!"); + if (gid != vertex.gid.AsUint()) + throw RecoveryFailure("Invalid snapshot data!"); + + // Skip labels. + { + auto labels_size = snapshot.ReadUint(); + if (!labels_size) throw RecoveryFailure("Invalid snapshot data!"); + for (uint64_t j = 0; j < *labels_size; ++j) { + auto label = snapshot.ReadUint(); + if (!label) throw RecoveryFailure("Invalid snapshot data!"); + } + } + + // Skip properties. + { + auto props_size = snapshot.ReadUint(); + if (!props_size) throw RecoveryFailure("Invalid snapshot data!"); + for (uint64_t j = 0; j < *props_size; ++j) { + auto key = snapshot.ReadUint(); + if (!key) throw RecoveryFailure("Invalid snapshot data!"); + auto value = snapshot.SkipPropertyValue(); + if (!value) throw RecoveryFailure("Invalid snapshot data!"); + } + } + + // Recover in edges. + { + auto in_size = snapshot.ReadUint(); + if (!in_size) throw RecoveryFailure("Invalid snapshot data!"); + vertex.in_edges.reserve(*in_size); + for (uint64_t j = 0; j < *in_size; ++j) { + auto edge_gid = snapshot.ReadUint(); + if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!"); + last_edge_gid = std::max(last_edge_gid, *edge_gid); + + auto from_gid = snapshot.ReadUint(); + if (!from_gid) throw RecoveryFailure("Invalid snapshot data!"); + auto edge_type = snapshot.ReadUint(); + if (!edge_type) throw RecoveryFailure("Invalid snapshot data!"); + + auto from_vertex = vertex_acc.find(Gid::FromUint(*from_gid)); + if (from_vertex == vertex_acc.end()) + throw RecoveryFailure("Invalid from vertex!"); + + EdgeRef edge_ref(Gid::FromUint(*edge_gid)); + if (items_.properties_on_edges) { + if (snapshot_has_edges) { + auto edge = edge_acc.find(Gid::FromUint(*edge_gid)); + if (edge == edge_acc.end()) + throw RecoveryFailure("Invalid edge!"); + edge_ref = EdgeRef(&*edge); + } else { + auto [edge, inserted] = + edge_acc.insert(Edge{Gid::FromUint(*edge_gid), nullptr}); + edge_ref = EdgeRef(&*edge); + } + } + vertex.in_edges.emplace_back(get_edge_type_from_id(*edge_type), + &*from_vertex, edge_ref); + } + } + + // Recover out edges. + { + auto out_size = snapshot.ReadUint(); + if (!out_size) throw RecoveryFailure("Invalid snapshot data!"); + vertex.out_edges.reserve(*out_size); + for (uint64_t j = 0; j < *out_size; ++j) { + auto edge_gid = snapshot.ReadUint(); + if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!"); + last_edge_gid = std::max(last_edge_gid, *edge_gid); + + auto to_gid = snapshot.ReadUint(); + if (!to_gid) throw RecoveryFailure("Invalid snapshot data!"); + auto edge_type = snapshot.ReadUint(); + if (!edge_type) throw RecoveryFailure("Invalid snapshot data!"); + + auto to_vertex = vertex_acc.find(Gid::FromUint(*to_gid)); + if (to_vertex == vertex_acc.end()) + throw RecoveryFailure("Invalid to vertex!"); + + EdgeRef edge_ref(Gid::FromUint(*edge_gid)); + if (items_.properties_on_edges) { + if (snapshot_has_edges) { + auto edge = edge_acc.find(Gid::FromUint(*edge_gid)); + if (edge == edge_acc.end()) + throw RecoveryFailure("Invalid edge!"); + edge_ref = EdgeRef(&*edge); + } else { + auto [edge, inserted] = + edge_acc.insert(Edge{Gid::FromUint(*edge_gid), nullptr}); + edge_ref = EdgeRef(&*edge); + } + } + vertex.out_edges.emplace_back(get_edge_type_from_id(*edge_type), + &*to_vertex, edge_ref); + } + } + } + + // Set initial values for edge/vertex ID generators. + ret.next_edge_id = last_edge_gid + 1; + ret.next_vertex_id = last_vertex_gid + 1; + } + + // Recover indices. + { + if (!snapshot.SetPosition(info.offset_indices)) + throw RecoveryFailure("Couldn't read data from snapshot!"); + + auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_INDICES) + throw RecoveryFailure("Invalid snapshot data!"); + + // Recover label indices. + { + auto size = snapshot.ReadUint(); + if (!size) throw RecoveryFailure("Invalid snapshot data!"); + for (uint64_t i = 0; i < *size; ++i) { + auto label = snapshot.ReadUint(); + if (!label) throw RecoveryFailure("Invalid snapshot data!"); + if (!indices_->label_index.CreateIndex(get_label_from_id(*label), + vertices_->access())) + throw RecoveryFailure("Couldn't recover label index!"); + } + } + + // Recover label+property indices. + { + auto size = snapshot.ReadUint(); + if (!size) throw RecoveryFailure("Invalid snapshot data!"); + for (uint64_t i = 0; i < *size; ++i) { + auto label = snapshot.ReadUint(); + if (!label) throw RecoveryFailure("Invalid snapshot data!"); + auto property = snapshot.ReadUint(); + if (!property) throw RecoveryFailure("Invalid snapshot data!"); + if (!indices_->label_property_index.CreateIndex( + get_label_from_id(*label), get_property_from_id(*property), + vertices_->access())) + throw RecoveryFailure("Couldn't recover label+property index!"); + } + } + } + + // Recover constraints. + { + if (!snapshot.SetPosition(info.offset_constraints)) + throw RecoveryFailure("Couldn't read data from snapshot!"); + + auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_CONSTRAINTS) + throw RecoveryFailure("Invalid snapshot data!"); + + // Recover existence constraints. + { + auto size = snapshot.ReadUint(); + if (!size) throw RecoveryFailure("Invalid snapshot data!"); + for (uint64_t i = 0; i < *size; ++i) { + auto label = snapshot.ReadUint(); + if (!label) throw RecoveryFailure("Invalid snapshot data!"); + auto property = snapshot.ReadUint(); + if (!property) throw RecoveryFailure("Invalid snapshot data!"); + auto ret = CreateExistenceConstraint( + constraints_, get_label_from_id(*label), + get_property_from_id(*property), vertices_->access()); + if (!ret.HasValue() || !*ret) + throw RecoveryFailure("Couldn't recover existence constraint!"); + } + } + } + + // Recover timestamp. + ret.next_timestamp = info.start_timestamp + 1; + + // Set success flag (to disable cleanup). + success = true; + + return ret; +} + +} // namespace storage diff --git a/src/storage/v2/durability.hpp b/src/storage/v2/durability.hpp index 1d27eb4d6..a51ff44f0 100644 --- a/src/storage/v2/durability.hpp +++ b/src/storage/v2/durability.hpp @@ -2,15 +2,29 @@ #include <cstdint> #include <filesystem> +#include <functional> #include <optional> #include <string> #include <string_view> #include <type_traits> +#include "storage/v2/config.hpp" +#include "storage/v2/constraints.hpp" +#include "storage/v2/edge.hpp" +#include "storage/v2/indices.hpp" +#include "storage/v2/name_id_mapper.hpp" #include "storage/v2/property_value.hpp" +#include "storage/v2/transaction.hpp" +#include "storage/v2/vertex.hpp" +#include "utils/exceptions.hpp" #include "utils/file.hpp" +#include "utils/scheduler.hpp" +#include "utils/skip_list.hpp" -namespace storage::durability { +namespace storage { + +static const std::string kSnapshotDirectory{"snapshots"}; +static const std::string kWalDirectory{"wal"}; static_assert(std::is_same_v<uint8_t, unsigned char>); @@ -110,4 +124,74 @@ class Decoder final { utils::InputFile file_; }; -} // namespace storage::durability +/// Exception used to handle errors during recovery. +class RecoveryFailure : public utils::BasicException { + using utils::BasicException::BasicException; +}; + +/// Structure used to hold information about a snapshot. +struct SnapshotInfo { + uint64_t offset_edges; + uint64_t offset_vertices; + uint64_t offset_indices; + uint64_t offset_constraints; + uint64_t offset_mapper; + uint64_t offset_metadata; + + std::string uuid; + uint64_t start_timestamp; + uint64_t edges_count; + uint64_t vertices_count; +}; + +/// Function used to read information about the snapshot file. +/// @throw RecoveryFailure +SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path); + +/// Durability class that is used to provide full durability functionality to +/// the storage. +class Durability final { + public: + struct RecoveryInfo { + uint64_t next_vertex_id; + uint64_t next_edge_id; + uint64_t next_timestamp; + }; + + Durability(Config::Durability config, utils::SkipList<Vertex> *vertices, + utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper, + Indices *indices, Constraints *constraints, Config::Items items); + + std::optional<RecoveryInfo> Initialize( + std::function<void(std::function<void(Transaction *)>)> + execute_with_transaction); + + void Finalize(); + + private: + void CreateSnapshot(Transaction *transaction); + + std::optional<RecoveryInfo> RecoverData(); + + RecoveryInfo LoadSnapshot(const std::filesystem::path &path); + + Config::Durability config_; + + utils::SkipList<Vertex> *vertices_; + utils::SkipList<Edge> *edges_; + NameIdMapper *name_id_mapper_; + Indices *indices_; + Constraints *constraints_; + Config::Items items_; + + std::function<void(std::function<void(Transaction *)>)> + execute_with_transaction_; + + std::filesystem::path snapshot_directory_; + utils::Scheduler snapshot_runner_; + + // UUID used to distinguish snapshots and to link snapshots to WALs + std::string uuid_; +}; + +} // namespace storage diff --git a/src/storage/v2/edge.hpp b/src/storage/v2/edge.hpp index ca8d7df4b..8e0fd7ae6 100644 --- a/src/storage/v2/edge.hpp +++ b/src/storage/v2/edge.hpp @@ -14,7 +14,7 @@ struct Vertex; struct Edge { Edge(Gid gid, Delta *delta) : gid(gid), deleted(false), delta(delta) { - CHECK(delta->action == Delta::Action::DELETE_OBJECT) + CHECK(delta == nullptr || delta->action == Delta::Action::DELETE_OBJECT) << "Edge must be created with an initial DELETE_OBJECT delta!"; } diff --git a/src/storage/v2/indices.hpp b/src/storage/v2/indices.hpp index 52db5c884..5af28f5a4 100644 --- a/src/storage/v2/indices.hpp +++ b/src/storage/v2/indices.hpp @@ -118,6 +118,8 @@ class LabelIndex { return it->second.size(); } + void Clear() { index_.clear(); } + private: std::map<LabelId, utils::SkipList<Entry>> index_; Indices *indices_; @@ -248,6 +250,8 @@ class LabelPropertyIndex { const std::optional<utils::Bound<PropertyValue>> &lower, const std::optional<utils::Bound<PropertyValue>> &upper) const; + void Clear() { index_.clear(); } + private: std::map<std::pair<LabelId, PropertyId>, utils::SkipList<Entry>> index_; Indices *indices_; diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index 66764bc75..17a9a5346 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -1,5 +1,6 @@ #include "storage/v2/storage.hpp" +#include <algorithm> #include <memory> #include <gflags/gflags.h> @@ -291,7 +292,29 @@ bool VerticesIterable::Iterator::operator==(const Iterator &other) const { } } -Storage::Storage(Config config) : indices_(config.items), config_(config) { +Storage::Storage(Config config) + : indices_(config.items), + config_(config), + durability_(config.durability, &vertices_, &edges_, &name_id_mapper_, + &indices_, &constraints_, config.items) { + auto info = durability_.Initialize([this](auto callback) { + // Take master RW lock (for reading). + std::shared_lock<utils::RWLock> storage_guard(main_lock_); + + // Create the transaction used to create the snapshot. + auto transaction = CreateTransaction(); + + // Create snapshot. + callback(&transaction); + + // Finalize snapshot transaction. + commit_log_.MarkFinished(transaction.start_timestamp); + }); + if (info) { + vertex_id_ = info->next_vertex_id; + edge_id_ = info->next_edge_id; + timestamp_ = std::max(timestamp_, info->next_timestamp); + } if (config_.gc.type == Config::Gc::Type::PERIODIC) { gc_runner_.Run("Storage GC", config_.gc.interval, [this] { this->CollectGarbage(); }); @@ -302,6 +325,7 @@ Storage::~Storage() { if (config_.gc.type == Config::Gc::Type::PERIODIC) { gc_runner_.Stop(); } + durability_.Finalize(); } Storage::Accessor::Accessor(Storage *storage) diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 80fee4478..f18cdea4b 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -6,6 +6,7 @@ #include "storage/v2/commit_log.hpp" #include "storage/v2/config.hpp" #include "storage/v2/constraints.hpp" +#include "storage/v2/durability.hpp" #include "storage/v2/edge.hpp" #include "storage/v2/edge_accessor.hpp" #include "storage/v2/indices.hpp" @@ -418,6 +419,8 @@ class Storage final { // Edges that are logically deleted and wait to be removed from the main // storage. utils::Synchronized<std::list<Gid>, utils::SpinLock> deleted_edges_; + + Durability durability_; }; } // namespace storage diff --git a/src/storage/v2/vertex.hpp b/src/storage/v2/vertex.hpp index ddb1e331e..496f4b188 100644 --- a/src/storage/v2/vertex.hpp +++ b/src/storage/v2/vertex.hpp @@ -15,7 +15,7 @@ namespace storage { struct Vertex { Vertex(Gid gid, Delta *delta) : gid(gid), deleted(false), delta(delta) { - CHECK(delta->action == Delta::Action::DELETE_OBJECT) + CHECK(delta == nullptr || delta->action == Delta::Action::DELETE_OBJECT) << "Vertex must be created with an initial DELETE_OBJECT delta!"; } diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index babf65947..5b7e71321 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -344,6 +344,9 @@ target_link_libraries(${test_prefix}storage_v2_constraints mg-storage-v2) add_unit_test(storage_v2_decoder_encoder.cpp) target_link_libraries(${test_prefix}storage_v2_decoder_encoder mg-storage-v2) +add_unit_test(storage_v2_durability.cpp) +target_link_libraries(${test_prefix}storage_v2_durability mg-storage-v2) + add_unit_test(storage_v2_edge.cpp) target_link_libraries(${test_prefix}storage_v2_edge mg-storage-v2) diff --git a/tests/unit/storage_v2_decoder_encoder.cpp b/tests/unit/storage_v2_decoder_encoder.cpp index b73fb5c59..2643c63f7 100644 --- a/tests/unit/storage_v2_decoder_encoder.cpp +++ b/tests/unit/storage_v2_decoder_encoder.cpp @@ -36,9 +36,9 @@ class DecoderEncoderTest : public ::testing::Test { // NOLINTNEXTLINE(hicpp-special-member-functions) TEST_F(DecoderEncoderTest, ReadMarker) { { - storage::durability::Encoder encoder; + storage::Encoder encoder; encoder.Initialize(storage_file, kTestMagic, kTestVersion); - for (const auto &item : storage::durability::kMarkersAll) { + for (const auto &item : storage::kMarkersAll) { encoder.WriteMarker(item); } { @@ -48,11 +48,11 @@ TEST_F(DecoderEncoderTest, ReadMarker) { encoder.Finalize(); } { - storage::durability::Decoder decoder; + storage::Decoder decoder; auto version = decoder.Initialize(storage_file, kTestMagic); ASSERT_TRUE(version); ASSERT_EQ(*version, kTestVersion); - for (const auto &item : storage::durability::kMarkersAll) { + for (const auto &item : storage::kMarkersAll) { auto decoded = decoder.ReadMarker(); ASSERT_TRUE(decoded); ASSERT_EQ(*decoded, item); @@ -70,7 +70,7 @@ TEST_F(DecoderEncoderTest, ReadMarker) { TEST_F(DecoderEncoderTest, Read##name) { \ std::vector<type> dataset{__VA_ARGS__}; \ { \ - storage::durability::Encoder encoder; \ + storage::Encoder encoder; \ encoder.Initialize(storage_file, kTestMagic, kTestVersion); \ for (const auto &item : dataset) { \ encoder.Write##name(item); \ @@ -82,7 +82,7 @@ TEST_F(DecoderEncoderTest, ReadMarker) { encoder.Finalize(); \ } \ { \ - storage::durability::Decoder decoder; \ + storage::Decoder decoder; \ auto version = decoder.Initialize(storage_file, kTestMagic); \ ASSERT_TRUE(version); \ ASSERT_EQ(*version, kTestVersion); \ @@ -131,7 +131,7 @@ GENERATE_READ_TEST( TEST_F(DecoderEncoderTest, Skip##name) { \ std::vector<type> dataset{__VA_ARGS__}; \ { \ - storage::durability::Encoder encoder; \ + storage::Encoder encoder; \ encoder.Initialize(storage_file, kTestMagic, kTestVersion); \ for (const auto &item : dataset) { \ encoder.Write##name(item); \ @@ -143,7 +143,7 @@ GENERATE_READ_TEST( encoder.Finalize(); \ } \ { \ - storage::durability::Decoder decoder; \ + storage::Decoder decoder; \ auto version = decoder.Initialize(storage_file, kTestMagic); \ ASSERT_TRUE(version); \ ASSERT_EQ(*version, kTestVersion); \ @@ -177,7 +177,7 @@ GENERATE_SKIP_TEST( #define GENERATE_PARTIAL_READ_TEST(name, value) \ TEST_F(DecoderEncoderTest, PartialRead##name) { \ { \ - storage::durability::Encoder encoder; \ + storage::Encoder encoder; \ encoder.Initialize(storage_file, kTestMagic, kTestVersion); \ encoder.Write##name(value); \ encoder.Finalize(); \ @@ -195,7 +195,7 @@ GENERATE_SKIP_TEST( ofile.Write(&byte, sizeof(byte)); \ ofile.Sync(); \ } \ - storage::durability::Decoder decoder; \ + storage::Decoder decoder; \ auto version = decoder.Initialize(alternate_file, kTestMagic); \ if (i < kTestMagic.size() + sizeof(kTestVersion)) { \ ASSERT_FALSE(version); \ @@ -215,7 +215,7 @@ GENERATE_SKIP_TEST( } // NOLINTNEXTLINE(hicpp-special-member-functions) -GENERATE_PARTIAL_READ_TEST(Marker, storage::durability::Marker::SECTION_VERTEX); +GENERATE_PARTIAL_READ_TEST(Marker, storage::Marker::SECTION_VERTEX); // NOLINTNEXTLINE(hicpp-special-member-functions) GENERATE_PARTIAL_READ_TEST(Bool, false); @@ -243,7 +243,7 @@ GENERATE_PARTIAL_READ_TEST( #define GENERATE_PARTIAL_SKIP_TEST(name, value) \ TEST_F(DecoderEncoderTest, PartialSkip##name) { \ { \ - storage::durability::Encoder encoder; \ + storage::Encoder encoder; \ encoder.Initialize(storage_file, kTestMagic, kTestVersion); \ encoder.Write##name(value); \ encoder.Finalize(); \ @@ -261,7 +261,7 @@ GENERATE_PARTIAL_READ_TEST( ofile.Write(&byte, sizeof(byte)); \ ofile.Sync(); \ } \ - storage::durability::Decoder decoder; \ + storage::Decoder decoder; \ auto version = decoder.Initialize(alternate_file, kTestMagic); \ if (i < kTestMagic.size() + sizeof(kTestVersion)) { \ ASSERT_FALSE(version); \ @@ -294,7 +294,7 @@ GENERATE_PARTIAL_SKIP_TEST( // NOLINTNEXTLINE(hicpp-special-member-functions) TEST_F(DecoderEncoderTest, PropertyValueInvalidMarker) { { - storage::durability::Encoder encoder; + storage::Encoder encoder; encoder.Initialize(storage_file, kTestMagic, kTestVersion); encoder.WritePropertyValue(storage::PropertyValue(123L)); encoder.Finalize(); @@ -302,51 +302,50 @@ TEST_F(DecoderEncoderTest, PropertyValueInvalidMarker) { { utils::OutputFile file; file.Open(storage_file, utils::OutputFile::Mode::OVERWRITE_EXISTING); - for (auto marker : storage::durability::kMarkersAll) { + for (auto marker : storage::kMarkersAll) { bool valid_marker; switch (marker) { - case storage::durability::Marker::TYPE_NULL: - case storage::durability::Marker::TYPE_BOOL: - case storage::durability::Marker::TYPE_INT: - case storage::durability::Marker::TYPE_DOUBLE: - case storage::durability::Marker::TYPE_STRING: - case storage::durability::Marker::TYPE_LIST: - case storage::durability::Marker::TYPE_MAP: - case storage::durability::Marker::TYPE_PROPERTY_VALUE: + case storage::Marker::TYPE_NULL: + case storage::Marker::TYPE_BOOL: + case storage::Marker::TYPE_INT: + case storage::Marker::TYPE_DOUBLE: + case storage::Marker::TYPE_STRING: + case storage::Marker::TYPE_LIST: + case storage::Marker::TYPE_MAP: + case storage::Marker::TYPE_PROPERTY_VALUE: valid_marker = true; break; - case storage::durability::Marker::SECTION_VERTEX: - case storage::durability::Marker::SECTION_EDGE: - case storage::durability::Marker::SECTION_MAPPER: - case storage::durability::Marker::SECTION_METADATA: - case storage::durability::Marker::SECTION_INDICES: - case storage::durability::Marker::SECTION_CONSTRAINTS: - case storage::durability::Marker::SECTION_OFFSETS: - case storage::durability::Marker::VALUE_FALSE: - case storage::durability::Marker::VALUE_TRUE: + case storage::Marker::SECTION_VERTEX: + case storage::Marker::SECTION_EDGE: + case storage::Marker::SECTION_MAPPER: + case storage::Marker::SECTION_METADATA: + case storage::Marker::SECTION_INDICES: + case storage::Marker::SECTION_CONSTRAINTS: + case storage::Marker::SECTION_OFFSETS: + case storage::Marker::VALUE_FALSE: + case storage::Marker::VALUE_TRUE: valid_marker = false; break; } // We only run this test with invalid markers. if (valid_marker) continue; { - file.SetPosition( - utils::OutputFile::Position::RELATIVE_TO_END, - -(sizeof(uint64_t) + sizeof(storage::durability::Marker))); + file.SetPosition(utils::OutputFile::Position::RELATIVE_TO_END, + -(sizeof(uint64_t) + sizeof(storage::Marker))); auto byte = static_cast<uint8_t>(marker); file.Write(&byte, sizeof(byte)); file.Sync(); } { - storage::durability::Decoder decoder; + storage::Decoder decoder; auto version = decoder.Initialize(storage_file, kTestMagic); ASSERT_TRUE(version); ASSERT_EQ(*version, kTestVersion); ASSERT_FALSE(decoder.SkipPropertyValue()); } { - storage::durability::Decoder decoder; + storage::Decoder decoder; auto version = decoder.Initialize(storage_file, kTestMagic); ASSERT_TRUE(version); ASSERT_EQ(*version, kTestVersion); @@ -355,22 +354,21 @@ TEST_F(DecoderEncoderTest, PropertyValueInvalidMarker) { } { { - file.SetPosition( - utils::OutputFile::Position::RELATIVE_TO_END, - -(sizeof(uint64_t) + sizeof(storage::durability::Marker))); + file.SetPosition(utils::OutputFile::Position::RELATIVE_TO_END, + -(sizeof(uint64_t) + sizeof(storage::Marker))); uint8_t byte = 1; file.Write(&byte, sizeof(byte)); file.Sync(); } { - storage::durability::Decoder decoder; + storage::Decoder decoder; auto version = decoder.Initialize(storage_file, kTestMagic); ASSERT_TRUE(version); ASSERT_EQ(*version, kTestVersion); ASSERT_FALSE(decoder.SkipPropertyValue()); } { - storage::durability::Decoder decoder; + storage::Decoder decoder; auto version = decoder.Initialize(storage_file, kTestMagic); ASSERT_TRUE(version); ASSERT_EQ(*version, kTestVersion); @@ -383,13 +381,13 @@ TEST_F(DecoderEncoderTest, PropertyValueInvalidMarker) { // NOLINTNEXTLINE(hicpp-special-member-functions) TEST_F(DecoderEncoderTest, DecoderPosition) { { - storage::durability::Encoder encoder; + storage::Encoder encoder; encoder.Initialize(storage_file, kTestMagic, kTestVersion); encoder.WriteBool(true); encoder.Finalize(); } { - storage::durability::Decoder decoder; + storage::Decoder decoder; auto version = decoder.Initialize(storage_file, kTestMagic); ASSERT_TRUE(version); ASSERT_EQ(*version, kTestVersion); @@ -409,7 +407,7 @@ TEST_F(DecoderEncoderTest, DecoderPosition) { // NOLINTNEXTLINE(hicpp-special-member-functions) TEST_F(DecoderEncoderTest, EncoderPosition) { { - storage::durability::Encoder encoder; + storage::Encoder encoder; encoder.Initialize(storage_file, kTestMagic, kTestVersion); encoder.WriteBool(false); encoder.SetPosition(kTestMagic.size() + sizeof(kTestVersion)); @@ -418,7 +416,7 @@ TEST_F(DecoderEncoderTest, EncoderPosition) { encoder.Finalize(); } { - storage::durability::Decoder decoder; + storage::Decoder decoder; auto version = decoder.Initialize(storage_file, kTestMagic); ASSERT_TRUE(version); ASSERT_EQ(*version, kTestVersion); diff --git a/tests/unit/storage_v2_durability.cpp b/tests/unit/storage_v2_durability.cpp new file mode 100644 index 000000000..303f92aee --- /dev/null +++ b/tests/unit/storage_v2_durability.cpp @@ -0,0 +1,764 @@ +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <algorithm> +#include <chrono> +#include <filesystem> +#include <thread> + +#include "storage/v2/durability.hpp" +#include "storage/v2/storage.hpp" +#include "utils/file.hpp" + +using testing::Contains; +using testing::UnorderedElementsAre; + +class DurabilityTest : public ::testing::TestWithParam<bool> { + private: + const uint64_t kNumBaseVertices = 1000; + const uint64_t kNumBaseEdges = 10000; + const uint64_t kNumExtendedVertices = 100; + const uint64_t kNumExtendedEdges = 1000; + + public: + DurabilityTest() + : base_vertex_gids_(kNumBaseVertices), + base_edge_gids_(kNumBaseEdges), + extended_vertex_gids_(kNumExtendedVertices), + extended_edge_gids_(kNumExtendedEdges) {} + + void SetUp() override { Clear(); } + + void TearDown() override { Clear(); } + + void CreateBaseDataset(storage::Storage *store, bool properties_on_edges) { + auto label_indexed = store->NameToLabel("base_indexed"); + auto label_unindexed = store->NameToLabel("base_unindexed"); + auto property_id = store->NameToProperty("id"); + auto et1 = store->NameToEdgeType("base_et1"); + auto et2 = store->NameToEdgeType("base_et2"); + + // Create label index. + ASSERT_TRUE(store->CreateIndex(label_unindexed)); + + // Create label+property index. + ASSERT_TRUE(store->CreateIndex(label_indexed, property_id)); + + // Create existence constraint. + ASSERT_FALSE(store->CreateExistenceConstraint(label_unindexed, property_id) + .HasError()); + + // Create vertices. + for (uint64_t i = 0; i < kNumBaseVertices; ++i) { + auto acc = store->Access(); + auto vertex = acc.CreateVertex(); + base_vertex_gids_[i] = vertex.Gid(); + if (i < kNumBaseVertices / 2) { + ASSERT_TRUE(vertex.AddLabel(label_indexed).HasValue()); + } else { + ASSERT_TRUE(vertex.AddLabel(label_unindexed).HasValue()); + } + if (i < kNumBaseVertices / 3 || i >= kNumBaseVertices / 2) { + ASSERT_TRUE(vertex + .SetProperty(property_id, storage::PropertyValue( + static_cast<int64_t>(i))) + .HasValue()); + } + ASSERT_FALSE(acc.Commit().HasError()); + } + + // Create edges. + for (uint64_t i = 0; i < kNumBaseEdges; ++i) { + auto acc = store->Access(); + auto vertex1 = acc.FindVertex( + base_vertex_gids_[(i / 2) % kNumBaseVertices], storage::View::OLD); + ASSERT_TRUE(vertex1); + auto vertex2 = acc.FindVertex( + base_vertex_gids_[(i / 3) % kNumBaseVertices], storage::View::OLD); + ASSERT_TRUE(vertex2); + storage::EdgeTypeId et; + if (i < kNumBaseEdges / 2) { + et = et1; + } else { + et = et2; + } + auto edge = acc.CreateEdge(&*vertex1, &*vertex2, et); + ASSERT_TRUE(edge.HasValue()); + base_edge_gids_[i] = edge->Gid(); + if (properties_on_edges) { + ASSERT_TRUE(edge->SetProperty(property_id, storage::PropertyValue( + static_cast<int64_t>(i))) + .HasValue()); + } else { + auto ret = edge->SetProperty( + property_id, storage::PropertyValue(static_cast<int64_t>(i))); + ASSERT_TRUE(ret.HasError()); + ASSERT_EQ(ret.GetError(), storage::Error::PROPERTIES_DISABLED); + } + ASSERT_FALSE(acc.Commit().HasError()); + } + } + + void VerifyBaseDataset(storage::Storage *store, bool properties_on_edges, + bool extended_dataset_exists) { + auto label_indexed = store->NameToLabel("base_indexed"); + auto label_unindexed = store->NameToLabel("base_unindexed"); + auto property_id = store->NameToProperty("id"); + auto et1 = store->NameToEdgeType("base_et1"); + auto et2 = store->NameToEdgeType("base_et2"); + + // Verify indices info. + { + auto info = store->ListAllIndices(); + if (extended_dataset_exists) { + ASSERT_THAT(info.label, Contains(label_unindexed)); + ASSERT_THAT(info.label_property, + Contains(std::make_pair(label_indexed, property_id))); + } else { + ASSERT_THAT(info.label, UnorderedElementsAre(label_unindexed)); + ASSERT_THAT(info.label_property, UnorderedElementsAre(std::make_pair( + label_indexed, property_id))); + } + } + + // Verify constraints info. + { + auto info = store->ListAllConstraints(); + if (extended_dataset_exists) { + ASSERT_THAT(info.existence, + Contains(std::make_pair(label_unindexed, property_id))); + } else { + ASSERT_THAT(info.existence, UnorderedElementsAre(std::make_pair( + label_unindexed, property_id))); + } + } + + // Create storage accessor. + auto acc = store->Access(); + + // Verify vertices. + for (uint64_t i = 0; i < kNumBaseVertices; ++i) { + auto vertex = acc.FindVertex(base_vertex_gids_[i], storage::View::OLD); + ASSERT_TRUE(vertex); + auto labels = vertex->Labels(storage::View::OLD); + ASSERT_TRUE(labels.HasValue()); + if (i < kNumBaseVertices / 2) { + ASSERT_THAT(*labels, UnorderedElementsAre(label_indexed)); + } else { + ASSERT_THAT(*labels, UnorderedElementsAre(label_unindexed)); + } + auto properties = vertex->Properties(storage::View::OLD); + ASSERT_TRUE(properties.HasValue()); + if (i < kNumBaseVertices / 3 || i >= kNumBaseVertices / 2) { + ASSERT_EQ(properties->size(), 1); + ASSERT_EQ((*properties)[property_id], + storage::PropertyValue(static_cast<int64_t>(i))); + } else { + ASSERT_EQ(properties->size(), 0); + } + } + + // Verify edges. + for (uint64_t i = 0; i < kNumBaseEdges; ++i) { + auto find_edge = + [&](const auto &edges) -> std::optional<storage::EdgeAccessor> { + for (const auto &edge : edges) { + if (edge.Gid() == base_edge_gids_[i]) { + return edge; + } + } + return std::nullopt; + }; + + { + auto vertex1 = acc.FindVertex( + base_vertex_gids_[(i / 2) % kNumBaseVertices], storage::View::OLD); + ASSERT_TRUE(vertex1); + auto out_edges = vertex1->OutEdges({}, storage::View::OLD); + ASSERT_TRUE(out_edges.HasValue()); + auto edge1 = find_edge(*out_edges); + ASSERT_TRUE(edge1); + if (i < kNumBaseEdges / 2) { + ASSERT_EQ(edge1->EdgeType(), et1); + } else { + ASSERT_EQ(edge1->EdgeType(), et2); + } + auto properties = edge1->Properties(storage::View::OLD); + ASSERT_TRUE(properties.HasValue()); + if (properties_on_edges) { + ASSERT_EQ(properties->size(), 1); + ASSERT_EQ((*properties)[property_id], + storage::PropertyValue(static_cast<int64_t>(i))); + } else { + ASSERT_EQ(properties->size(), 0); + } + } + + { + auto vertex2 = acc.FindVertex( + base_vertex_gids_[(i / 3) % kNumBaseVertices], storage::View::OLD); + ASSERT_TRUE(vertex2); + auto in_edges = vertex2->InEdges({}, storage::View::OLD); + ASSERT_TRUE(in_edges.HasValue()); + auto edge2 = find_edge(*in_edges); + ASSERT_TRUE(edge2); + if (i < kNumBaseEdges / 2) { + ASSERT_EQ(edge2->EdgeType(), et1); + } else { + ASSERT_EQ(edge2->EdgeType(), et2); + } + auto properties = edge2->Properties(storage::View::OLD); + ASSERT_TRUE(properties.HasValue()); + if (properties_on_edges) { + ASSERT_EQ(properties->size(), 1); + ASSERT_EQ((*properties)[property_id], + storage::PropertyValue(static_cast<int64_t>(i))); + } else { + ASSERT_EQ(properties->size(), 0); + } + } + } + + // Verify label indices. + { + std::vector<storage::VertexAccessor> vertices; + vertices.reserve(kNumBaseVertices / 2); + for (auto vertex : acc.Vertices(label_unindexed, storage::View::OLD)) { + vertices.push_back(vertex); + } + ASSERT_EQ(vertices.size(), kNumBaseVertices / 2); + std::sort(vertices.begin(), vertices.end(), + [](const auto &a, const auto &b) { return a.Gid() < b.Gid(); }); + for (uint64_t i = 0; i < kNumBaseVertices / 2; ++i) { + ASSERT_EQ(vertices[i].Gid(), + base_vertex_gids_[kNumBaseVertices / 2 + i]); + } + } + + // Verify label+property index. + { + std::vector<storage::VertexAccessor> vertices; + vertices.reserve(kNumBaseVertices / 3); + for (auto vertex : + acc.Vertices(label_indexed, property_id, storage::View::OLD)) { + vertices.push_back(vertex); + } + ASSERT_EQ(vertices.size(), kNumBaseVertices / 3); + std::sort(vertices.begin(), vertices.end(), + [](const auto &a, const auto &b) { return a.Gid() < b.Gid(); }); + for (uint64_t i = 0; i < kNumBaseVertices / 3; ++i) { + ASSERT_EQ(vertices[i].Gid(), base_vertex_gids_[i]); + } + } + } + + void CreateExtendedDataset(storage::Storage *store) { + auto label_indexed = store->NameToLabel("extended_indexed"); + auto label_unused = store->NameToLabel("extended_unused"); + auto property_count = store->NameToProperty("count"); + auto et3 = store->NameToEdgeType("extended_et3"); + auto et4 = store->NameToEdgeType("extended_et4"); + + // Create label index. + ASSERT_TRUE(store->CreateIndex(label_unused)); + + // Create label+property index. + ASSERT_TRUE(store->CreateIndex(label_indexed, property_count)); + + // Create existence constraint. + ASSERT_FALSE(store->CreateExistenceConstraint(label_unused, property_count) + .HasError()); + + // Create vertices. + for (uint64_t i = 0; i < kNumExtendedVertices; ++i) { + auto acc = store->Access(); + auto vertex = acc.CreateVertex(); + extended_vertex_gids_[i] = vertex.Gid(); + if (i < kNumExtendedVertices / 2) { + ASSERT_TRUE(vertex.AddLabel(label_indexed).HasValue()); + } + if (i < kNumExtendedVertices / 3 || i >= kNumExtendedVertices / 2) { + ASSERT_TRUE( + vertex + .SetProperty(property_count, storage::PropertyValue("nandare")) + .HasValue()); + } + ASSERT_FALSE(acc.Commit().HasError()); + } + + // Create edges. + for (uint64_t i = 0; i < kNumExtendedEdges; ++i) { + auto acc = store->Access(); + auto vertex1 = + acc.FindVertex(extended_vertex_gids_[(i / 5) % kNumExtendedVertices], + storage::View::OLD); + ASSERT_TRUE(vertex1); + auto vertex2 = + acc.FindVertex(extended_vertex_gids_[(i / 6) % kNumExtendedVertices], + storage::View::OLD); + ASSERT_TRUE(vertex2); + storage::EdgeTypeId et; + if (i < kNumExtendedEdges / 4) { + et = et3; + } else { + et = et4; + } + auto edge = acc.CreateEdge(&*vertex1, &*vertex2, et); + ASSERT_TRUE(edge.HasValue()); + extended_edge_gids_[i] = edge->Gid(); + ASSERT_FALSE(acc.Commit().HasError()); + } + } + + void VerifyExtendedDataset(storage::Storage *store) { + auto label_indexed = store->NameToLabel("extended_indexed"); + auto label_unused = store->NameToLabel("extended_unused"); + auto property_count = store->NameToProperty("count"); + auto et3 = store->NameToEdgeType("extended_et3"); + auto et4 = store->NameToEdgeType("extended_et4"); + + // Verify indices info. + { + auto info = store->ListAllIndices(); + auto base_label_indexed = store->NameToLabel("base_indexed"); + auto base_label_unindexed = store->NameToLabel("base_unindexed"); + auto base_property_id = store->NameToProperty("id"); + ASSERT_THAT(info.label, + UnorderedElementsAre(base_label_unindexed, label_unused)); + ASSERT_THAT(info.label_property, + UnorderedElementsAre( + std::make_pair(base_label_indexed, base_property_id), + std::make_pair(label_indexed, property_count))); + } + + // Verify constraints info. + { + auto info = store->ListAllConstraints(); + auto base_label_unindexed = store->NameToLabel("base_unindexed"); + auto base_property_id = store->NameToProperty("id"); + ASSERT_THAT(info.existence, + UnorderedElementsAre( + std::make_pair(base_label_unindexed, base_property_id), + std::make_pair(label_unused, property_count))); + } + + // Create storage accessor. + auto acc = store->Access(); + + // Verify vertices. + for (uint64_t i = 0; i < kNumExtendedVertices; ++i) { + auto vertex = + acc.FindVertex(extended_vertex_gids_[i], storage::View::OLD); + ASSERT_TRUE(vertex); + auto labels = vertex->Labels(storage::View::OLD); + ASSERT_TRUE(labels.HasValue()); + if (i < kNumExtendedVertices / 2) { + ASSERT_THAT(*labels, UnorderedElementsAre(label_indexed)); + } + auto properties = vertex->Properties(storage::View::OLD); + ASSERT_TRUE(properties.HasValue()); + if (i < kNumExtendedVertices / 3 || i >= kNumExtendedVertices / 2) { + ASSERT_EQ(properties->size(), 1); + ASSERT_EQ((*properties)[property_count], + storage::PropertyValue("nandare")); + } else { + ASSERT_EQ(properties->size(), 0); + } + } + + // Verify edges. + for (uint64_t i = 0; i < kNumExtendedEdges; ++i) { + auto find_edge = + [&](const auto &edges) -> std::optional<storage::EdgeAccessor> { + for (const auto &edge : edges) { + if (edge.Gid() == extended_edge_gids_[i]) { + return edge; + } + } + return std::nullopt; + }; + + { + auto vertex1 = acc.FindVertex( + extended_vertex_gids_[(i / 5) % kNumExtendedVertices], + storage::View::OLD); + ASSERT_TRUE(vertex1); + auto out_edges = vertex1->OutEdges({}, storage::View::OLD); + ASSERT_TRUE(out_edges.HasValue()); + auto edge1 = find_edge(*out_edges); + ASSERT_TRUE(edge1); + if (i < kNumExtendedEdges / 4) { + ASSERT_EQ(edge1->EdgeType(), et3); + } else { + ASSERT_EQ(edge1->EdgeType(), et4); + } + auto properties = edge1->Properties(storage::View::OLD); + ASSERT_TRUE(properties.HasValue()); + ASSERT_EQ(properties->size(), 0); + } + + { + auto vertex2 = acc.FindVertex( + extended_vertex_gids_[(i / 6) % kNumExtendedVertices], + storage::View::OLD); + ASSERT_TRUE(vertex2); + auto in_edges = vertex2->InEdges({}, storage::View::OLD); + ASSERT_TRUE(in_edges.HasValue()); + auto edge2 = find_edge(*in_edges); + ASSERT_TRUE(edge2); + if (i < kNumExtendedEdges / 4) { + ASSERT_EQ(edge2->EdgeType(), et3); + } else { + ASSERT_EQ(edge2->EdgeType(), et4); + } + auto properties = edge2->Properties(storage::View::OLD); + ASSERT_TRUE(properties.HasValue()); + ASSERT_EQ(properties->size(), 0); + } + } + + // Verify label indices. + { + std::vector<storage::VertexAccessor> vertices; + vertices.reserve(kNumExtendedVertices / 2); + for (auto vertex : acc.Vertices(label_unused, storage::View::OLD)) { + vertices.push_back(vertex); + } + ASSERT_EQ(vertices.size(), 0); + } + + // Verify label+property index. + { + std::vector<storage::VertexAccessor> vertices; + vertices.reserve(kNumExtendedVertices / 3); + for (auto vertex : + acc.Vertices(label_indexed, property_count, storage::View::OLD)) { + vertices.push_back(vertex); + } + ASSERT_EQ(vertices.size(), kNumExtendedVertices / 3); + std::sort(vertices.begin(), vertices.end(), + [](const auto &a, const auto &b) { return a.Gid() < b.Gid(); }); + for (uint64_t i = 0; i < kNumExtendedVertices / 3; ++i) { + ASSERT_EQ(vertices[i].Gid(), extended_vertex_gids_[i]); + } + } + } + + std::vector<std::filesystem::path> GetSnapshotsList() { + std::vector<std::filesystem::path> ret; + for (auto &item : std::filesystem::directory_iterator( + storage_directory / storage::kSnapshotDirectory)) { + ret.push_back(item.path()); + } + std::sort(ret.begin(), ret.end()); + std::reverse(ret.begin(), ret.end()); + return ret; + } + + std::filesystem::path storage_directory{ + std::filesystem::temp_directory_path() / + "MG_test_unit_storage_v2_durability"}; + + private: + void Clear() { + if (!std::filesystem::exists(storage_directory)) return; + std::filesystem::remove_all(storage_directory); + } + + std::vector<storage::Gid> base_vertex_gids_; + std::vector<storage::Gid> base_edge_gids_; + std::vector<storage::Gid> extended_vertex_gids_; + std::vector<storage::Gid> extended_edge_gids_; +}; + +INSTANTIATE_TEST_CASE_P(EdgesWithProperties, DurabilityTest, + ::testing::Values(true)); +INSTANTIATE_TEST_CASE_P(EdgesWithoutProperties, DurabilityTest, + ::testing::Values(false)); + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_P(DurabilityTest, SnapshotOnExit) { + // Create snapshot. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_on_exit = true}}); + CreateBaseDataset(&store, GetParam()); + VerifyBaseDataset(&store, GetParam(), false); + CreateExtendedDataset(&store); + VerifyBaseDataset(&store, GetParam(), true); + VerifyExtendedDataset(&store); + } + + // Recover snapshot. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyBaseDataset(&store, GetParam(), true); + VerifyExtendedDataset(&store); + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } +} + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_P(DurabilityTest, SnapshotPeriodic) { + // Create snapshot. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_type = + storage::Config::Durability::SnapshotType::PERIODIC, + .snapshot_interval = std::chrono::milliseconds(2000)}}); + CreateBaseDataset(&store, GetParam()); + std::this_thread::sleep_for(std::chrono::milliseconds(2500)); + } + + // Recover snapshot. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyBaseDataset(&store, GetParam(), false); + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } +} + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_P(DurabilityTest, SnapshotFallback) { + // Create snapshot. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_type = + storage::Config::Durability::SnapshotType::PERIODIC, + .snapshot_interval = std::chrono::milliseconds(2000)}}); + CreateBaseDataset(&store, GetParam()); + std::this_thread::sleep_for(std::chrono::milliseconds(2500)); + CreateExtendedDataset(&store); + std::this_thread::sleep_for(std::chrono::milliseconds(2500)); + } + + // Destroy last snapshot. + { + auto snapshots = GetSnapshotsList(); + ASSERT_GE(snapshots.size(), 2); + + auto info = storage::ReadSnapshotInfo(*snapshots.begin()); + + LOG(INFO) << "Destroying snapshot " << *snapshots.begin(); + utils::OutputFile file; + file.Open(*snapshots.begin(), utils::OutputFile::Mode::OVERWRITE_EXISTING); + file.SetPosition(utils::OutputFile::Position::SET, info.offset_vertices); + auto value = static_cast<uint8_t>(storage::Marker::TYPE_MAP); + file.Write(&value, sizeof(value)); + file.Sync(); + file.Close(); + } + + // Recover snapshot. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyBaseDataset(&store, GetParam(), false); + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } +} + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_P(DurabilityTest, SnapshotRetention) { + // Create snapshot. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_type = + storage::Config::Durability::SnapshotType::PERIODIC, + .snapshot_interval = std::chrono::milliseconds(2000), + .snapshot_retention_count = 3}}); + CreateBaseDataset(&store, GetParam()); + // Allow approximately 5 snapshots to be created. + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + } + + // Verify that exactly 3 snapshots exist. + { + auto snapshots = GetSnapshotsList(); + ASSERT_EQ(snapshots.size(), 3); + for (const auto &path : snapshots) { + // This shouldn't throw. + storage::ReadSnapshotInfo(path); + } + } + + // Recover snapshot. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyBaseDataset(&store, GetParam(), false); + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } +} + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_F(DurabilityTest, + SnapshotWithoutPropertiesOnEdgesRecoveryWithPropertiesOnEdges) { + // Create snapshot. + { + storage::Storage store( + {.items = {.properties_on_edges = false}, + .durability = {.storage_directory = storage_directory, + .snapshot_on_exit = true}}); + CreateBaseDataset(&store, false); + VerifyBaseDataset(&store, false, false); + CreateExtendedDataset(&store); + VerifyBaseDataset(&store, false, true); + VerifyExtendedDataset(&store); + } + + // Recover snapshot. + storage::Storage store({.items = {.properties_on_edges = true}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyBaseDataset(&store, false, true); + VerifyExtendedDataset(&store); + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } +} + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_F(DurabilityTest, + SnapshotWithPropertiesOnEdgesRecoveryWithoutPropertiesOnEdges) { + // Create snapshot. + { + storage::Storage store( + {.items = {.properties_on_edges = true}, + .durability = {.storage_directory = storage_directory, + .snapshot_on_exit = true}}); + CreateBaseDataset(&store, true); + VerifyBaseDataset(&store, true, false); + CreateExtendedDataset(&store); + VerifyBaseDataset(&store, true, true); + VerifyExtendedDataset(&store); + } + + // Recover snapshot. + storage::Storage store({.items = {.properties_on_edges = false}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + { + std::vector<storage::VertexAccessor> vertices; + auto acc = store.Access(); + for (auto vertex : acc.Vertices(storage::View::OLD)) { + vertices.push_back(vertex); + } + ASSERT_EQ(vertices.size(), 0); + } + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } +} + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_F(DurabilityTest, + SnapshotWithPropertiesOnEdgesButUnusedRecoveryWithoutPropertiesOnEdges) { + // Create snapshot. + { + storage::Storage store( + {.items = {.properties_on_edges = true}, + .durability = {.storage_directory = storage_directory, + .snapshot_on_exit = true}}); + CreateBaseDataset(&store, true); + VerifyBaseDataset(&store, true, false); + CreateExtendedDataset(&store); + VerifyBaseDataset(&store, true, true); + VerifyExtendedDataset(&store); + // Remove properties from edges. + { + auto acc = store.Access(); + for (auto vertex : acc.Vertices(storage::View::OLD)) { + auto in_edges = vertex.InEdges({}, storage::View::OLD); + ASSERT_TRUE(in_edges.HasValue()); + for (auto edge : *in_edges) { + // TODO (mferencevic): Replace with `ClearProperties()` + auto props = edge.Properties(storage::View::NEW); + ASSERT_TRUE(props.HasValue()); + for (const auto &prop : *props) { + ASSERT_TRUE(edge.SetProperty(prop.first, storage::PropertyValue()) + .HasValue()); + } + } + auto out_edges = vertex.InEdges({}, storage::View::OLD); + ASSERT_TRUE(out_edges.HasValue()); + for (auto edge : *out_edges) { + // TODO (mferencevic): Replace with `ClearProperties()` + auto props = edge.Properties(storage::View::NEW); + ASSERT_TRUE(props.HasValue()); + for (const auto &prop : *props) { + ASSERT_TRUE(edge.SetProperty(prop.first, storage::PropertyValue()) + .HasValue()); + } + } + } + ASSERT_FALSE(acc.Commit().HasError()); + } + } + + // Recover snapshot. + storage::Storage store({.items = {.properties_on_edges = false}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyBaseDataset(&store, false, true); + VerifyExtendedDataset(&store); + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } +}