diff --git a/src/storage/v2/durability.cpp b/src/storage/v2/durability.cpp index 795a7ad66..681a74cbb 100644 --- a/src/storage/v2/durability.cpp +++ b/src/storage/v2/durability.cpp @@ -809,6 +809,32 @@ WalDeltaData ReadSkipWalDeltaData(Decoder *wal) { return delta; } + +// Helper function used to insert indices/constraints into the recovered +// indices/constraints object. +// @throw RecoveryFailure +template <typename TObj> +void AddRecoveredIndexConstraint(std::vector<TObj> *list, TObj obj, + const char *error_message) { + auto it = std::find(list->begin(), list->end(), obj); + if (it == list->end()) { + list->push_back(obj); + } else { + throw RecoveryFailure(error_message); + } +} + +template <typename TObj> +void RemoveRecoveredIndexConstraint(std::vector<TObj> *list, TObj obj, + const char *error_message) { + auto it = std::find(list->begin(), list->end(), obj); + if (it != list->end()) { + std::swap(*it, list->back()); + list->pop_back(); + } else { + throw RecoveryFailure(error_message); + } +} } // namespace // Function used to read information about the snapshot file. @@ -1783,7 +1809,7 @@ void Durability::CreateSnapshot(Transaction *transaction) { std::sort(wal_files.begin(), wal_files.end()); uint64_t snapshot_start_timestamp = transaction->start_timestamp; if (!old_snapshot_files.empty()) { - snapshot_start_timestamp = old_snapshot_files.begin()->first; + snapshot_start_timestamp = old_snapshot_files.front().first; } std::optional<uint64_t> pos = 0; for (uint64_t i = 0; i < wal_files.size(); ++i) { @@ -1815,41 +1841,192 @@ void Durability::CreateSnapshot(Transaction *transaction) { std::optional<Durability::RecoveryInfo> Durability::RecoverData() { if (!utils::DirExists(snapshot_directory_)) return std::nullopt; + // Helper lambda used to recover all discovered indices and constraints. The + // indices and constraints must be recovered after the data recovery is done + // to ensure that the indices and constraints are consistent at the end of the + // recovery process. + auto recover_indices_and_constraints = [this]( + const auto &indices_constraints) { + // Recover label indices. + for (const auto &item : indices_constraints.indices.label) { + if (!indices_->label_index.CreateIndex(item, vertices_->access())) + throw RecoveryFailure("The label index must be created here!"); + } + + // Recover label+property indices. + for (const auto &item : indices_constraints.indices.label_property) { + if (!indices_->label_property_index.CreateIndex(item.first, item.second, + vertices_->access())) + throw RecoveryFailure("The label+property index must be created here!"); + } + + // Recover existence constraints. + for (const auto &item : indices_constraints.constraints.existence) { + auto ret = CreateExistenceConstraint(constraints_, item.first, + item.second, vertices_->access()); + if (ret.HasError() || !ret.GetValue()) + throw RecoveryFailure("The existence constraint must be created here!"); + } + }; + // Array of all discovered snapshots, ordered by name. - std::vector<std::filesystem::path> snapshot_files; + std::vector<std::pair<std::filesystem::path, std::string>> snapshot_files; std::error_code error_code; - for (auto &item : + for (const auto &item : std::filesystem::directory_iterator(snapshot_directory_, error_code)) { if (!item.is_regular_file()) continue; try { - ReadSnapshotInfo(item.path()); - snapshot_files.push_back(item.path()); + auto info = ReadSnapshotInfo(item.path()); + snapshot_files.emplace_back(item.path(), info.uuid); } catch (const RecoveryFailure &) { continue; } } CHECK(!error_code) << "Couldn't recover data because an error occurred: " << error_code.message() << "!"; - std::sort(snapshot_files.begin(), snapshot_files.end()); - for (auto it = snapshot_files.rbegin(); it != snapshot_files.rend(); ++it) { - const auto &path = *it; - LOG(INFO) << "Starting snapshot recovery from " << path; + + RecoveryInfo recovery_info; + RecoveredIndicesAndConstraints indices_constraints; + std::optional<uint64_t> snapshot_timestamp; + if (!snapshot_files.empty()) { + std::sort(snapshot_files.begin(), snapshot_files.end()); + // UUID used for durability is the UUID of the last snapshot file. + uuid_ = snapshot_files.back().second; + std::optional<Durability::RecoveredSnapshot> recovered_snapshot; + for (auto it = snapshot_files.rbegin(); it != snapshot_files.rend(); ++it) { + const auto &[path, uuid] = *it; + if (uuid != uuid_) { + LOG(WARNING) << "The snapshot file " << path + << " isn't related to the latest snapshot file!"; + continue; + } + LOG(INFO) << "Starting snapshot recovery from " << path; + try { + recovered_snapshot = LoadSnapshot(path); + LOG(INFO) << "Snapshot recovery successful!"; + break; + } catch (const RecoveryFailure &e) { + LOG(WARNING) << "Couldn't recover snapshot from " << path + << " because of: " << e.what(); + continue; + } + } + CHECK(recovered_snapshot) + << "The database is configured to recover on startup, but couldn't " + "recover using any of the specified snapshots! Please inspect them " + "and restart the database."; + recovery_info = recovered_snapshot->recovery_info; + indices_constraints = std::move(recovered_snapshot->indices_constraints); + snapshot_timestamp = recovered_snapshot->snapshot_info.start_timestamp; + if (!utils::DirExists(wal_directory_)) { + recover_indices_and_constraints(indices_constraints); + return recovered_snapshot->recovery_info; + } + } else { + if (!utils::DirExists(wal_directory_)) return std::nullopt; + // Array of all discovered WAL files, ordered by name. + std::vector<std::pair<std::filesystem::path, std::string>> wal_files; + for (const auto &item : + std::filesystem::directory_iterator(wal_directory_, error_code)) { + if (!item.is_regular_file()) continue; + try { + auto info = ReadWalInfo(item.path()); + wal_files.emplace_back(item.path(), info.uuid); + } catch (const RecoveryFailure &e) { + continue; + } + } + CHECK(!error_code) << "Couldn't recover data because an error occurred: " + << error_code.message() << "!"; + if (wal_files.empty()) return std::nullopt; + std::sort(wal_files.begin(), wal_files.end()); + // UUID used for durability is the UUID of the last WAL file. + uuid_ = wal_files.back().second; + } + + // Array of all discovered WAL files, ordered by sequence number. + std::vector<std::tuple<uint64_t, uint64_t, uint64_t, std::filesystem::path>> + wal_files; + for (const auto &item : + std::filesystem::directory_iterator(wal_directory_, error_code)) { + if (!item.is_regular_file()) continue; try { - auto info = LoadSnapshot(path); - LOG(INFO) << "Snapshot recovery successful!"; - return info; + auto info = ReadWalInfo(item.path()); + if (info.uuid != uuid_) continue; + wal_files.emplace_back(info.seq_num, info.from_timestamp, + info.to_timestamp, item.path()); } catch (const RecoveryFailure &e) { - LOG(WARNING) << "Couldn't recover snapshot from " << path - << " because of: " << e.what(); continue; } } - return std::nullopt; + CHECK(!error_code) << "Couldn't recover data because an error occurred: " + << error_code.message() << "!"; + // By this point we should have recovered from a snapshot, or we should have + // found some WAL files to recover from in the above `else`. This is just a + // sanity check to circumvent the following case: The database didn't recover + // from a snapshot, the above `else` triggered to find the recovery UUID from + // a WAL file. The above `else` has an early exit in case there are no WAL + // files. Because we reached this point there must have been some WAL files + // and we must have some WAL files after this second WAL directory iteration. + CHECK(snapshot_timestamp || !wal_files.empty()) + << "The database didn't recover from a snapshot and didn't find any WAL " + "files that match the last WAL file!"; + + if (!wal_files.empty()) { + std::sort(wal_files.begin(), wal_files.end()); + { + const auto &[seq_num, from_timestamp, to_timestamp, path] = wal_files[0]; + if (seq_num != 0) { + // We don't have all WAL files. We need to see whether we need them all. + if (!snapshot_timestamp) { + // We didn't recover from a snapshot and we must have all WAL files + // starting from the first one (seq_num == 0) to be able to recover + // data from them. + LOG(FATAL) << "There are missing prefix WAL files and data can't be " + "recovered without them!"; + } else if (to_timestamp >= *snapshot_timestamp) { + // We recovered from a snapshot and we must have at least one WAL file + // whose all deltas were created before the snapshot in order to + // verify that nothing is missing from the beginning of the WAL chain. + LOG(FATAL) << "You must have at least one WAL file that contains " + "deltas that were created before the snapshot file!"; + } + } + } + std::optional<uint64_t> previous_seq_num; + for (const auto &[seq_num, from_timestamp, to_timestamp, path] : + wal_files) { + if (previous_seq_num && *previous_seq_num + 1 != seq_num) { + LOG(FATAL) << "You are missing a WAL file with the sequence number " + << *previous_seq_num + 1 << "!"; + } + previous_seq_num = seq_num; + try { + auto info = LoadWal(path, &indices_constraints, snapshot_timestamp); + recovery_info.next_vertex_id = + std::max(recovery_info.next_vertex_id, info.next_vertex_id); + recovery_info.next_edge_id = + std::max(recovery_info.next_edge_id, info.next_edge_id); + recovery_info.next_timestamp = + std::max(recovery_info.next_timestamp, info.next_timestamp); + } catch (const RecoveryFailure &e) { + LOG(FATAL) << "Couldn't recover WAL deltas from " << path + << " because of: " << e.what(); + } + } + // The sequence number needs to be recovered even though `LoadWal` didn't + // load any deltas from that file. + wal_seq_num_ = *previous_seq_num + 1; + } + + recover_indices_and_constraints(indices_constraints); + return recovery_info; } -Durability::RecoveryInfo Durability::LoadSnapshot( +Durability::RecoveredSnapshot Durability::LoadSnapshot( const std::filesystem::path &path) { Durability::RecoveryInfo ret; + RecoveredIndicesAndConstraints indices_constraints; Decoder snapshot; auto version = snapshot.Initialize(path, kSnapshotMagic); @@ -1863,9 +2040,6 @@ Durability::RecoveryInfo Durability::LoadSnapshot( if (!success) { edges_->clear(); vertices_->clear(); - indices_->label_index.Clear(); - indices_->label_property_index.Clear(); - constraints_->existence_constraints.clear(); } }); @@ -1875,9 +2049,6 @@ Durability::RecoveryInfo Durability::LoadSnapshot( // Check for edges. bool snapshot_has_edges = info.offset_edges != 0; - // Set storage UUID. - uuid_ = info.uuid; - // Recover mapper. std::unordered_map<uint64_t, uint64_t> snapshot_id_map; { @@ -2190,9 +2361,9 @@ Durability::RecoveryInfo Durability::LoadSnapshot( for (uint64_t i = 0; i < *size; ++i) { auto label = snapshot.ReadUint(); if (!label) throw RecoveryFailure("Invalid snapshot data!"); - if (!indices_->label_index.CreateIndex(get_label_from_id(*label), - vertices_->access())) - throw RecoveryFailure("Couldn't recover label index!"); + AddRecoveredIndexConstraint(&indices_constraints.indices.label, + get_label_from_id(*label), + "The label index already exists!"); } } @@ -2205,10 +2376,10 @@ Durability::RecoveryInfo Durability::LoadSnapshot( if (!label) throw RecoveryFailure("Invalid snapshot data!"); auto property = snapshot.ReadUint(); if (!property) throw RecoveryFailure("Invalid snapshot data!"); - if (!indices_->label_property_index.CreateIndex( - get_label_from_id(*label), get_property_from_id(*property), - vertices_->access())) - throw RecoveryFailure("Couldn't recover label+property index!"); + AddRecoveredIndexConstraint( + &indices_constraints.indices.label_property, + {get_label_from_id(*label), get_property_from_id(*property)}, + "The label+property index already exists!"); } } } @@ -2231,11 +2402,10 @@ Durability::RecoveryInfo Durability::LoadSnapshot( if (!label) throw RecoveryFailure("Invalid snapshot data!"); auto property = snapshot.ReadUint(); if (!property) throw RecoveryFailure("Invalid snapshot data!"); - auto ret = CreateExistenceConstraint( - constraints_, get_label_from_id(*label), - get_property_from_id(*property), vertices_->access()); - if (!ret.HasValue() || !*ret) - throw RecoveryFailure("Couldn't recover existence constraint!"); + AddRecoveredIndexConstraint( + &indices_constraints.constraints.existence, + {get_label_from_id(*label), get_property_from_id(*property)}, + "The existence constraint already exists!"); } } } @@ -2246,6 +2416,299 @@ Durability::RecoveryInfo Durability::LoadSnapshot( // Set success flag (to disable cleanup). success = true; + return {info, ret, std::move(indices_constraints)}; +} + +Durability::RecoveryInfo Durability::LoadWal( + const std::filesystem::path &path, + RecoveredIndicesAndConstraints *indices_constraints, + std::optional<uint64_t> snapshot_timestamp) { + Durability::RecoveryInfo ret; + + Decoder wal; + auto version = wal.Initialize(path, kWalMagic); + if (!version) + throw RecoveryFailure("Couldn't read WAL magic and/or version!"); + if (*version != kVersion) throw RecoveryFailure("Invalid WAL version!"); + + // Read wal info. + auto info = ReadWalInfo(path); + + // Check timestamp. + if (snapshot_timestamp && info.to_timestamp <= *snapshot_timestamp) + return ret; + + // Recover deltas. + wal.SetPosition(info.offset_deltas); + uint64_t deltas_applied = 0; + auto edge_acc = edges_->access(); + auto vertex_acc = vertices_->access(); + for (uint64_t i = 0; i < info.num_deltas; ++i) { + // Read WAL delta header to find out the delta timestamp. + auto timestamp = ReadWalDeltaHeader(&wal); + + if (!snapshot_timestamp || timestamp > *snapshot_timestamp) { + // This delta should be loaded. + auto delta = ReadWalDeltaData(&wal); + switch (delta.type) { + case WalDeltaData::Type::VERTEX_CREATE: { + auto [vertex, inserted] = vertex_acc.insert( + Vertex{delta.vertex_create_delete.gid, nullptr}); + if (!inserted) + throw RecoveryFailure("The vertex must be inserted here!"); + + ret.next_vertex_id = std::max( + ret.next_vertex_id, delta.vertex_create_delete.gid.AsUint() + 1); + + break; + } + case WalDeltaData::Type::VERTEX_DELETE: { + auto vertex = vertex_acc.find(delta.vertex_create_delete.gid); + if (vertex == vertex_acc.end()) + throw RecoveryFailure("The vertex doesn't exist!"); + if (!vertex->in_edges.empty() || !vertex->out_edges.empty()) + throw RecoveryFailure( + "The vertex can't be deleted because it still has edges!"); + + if (!vertex_acc.remove(delta.vertex_create_delete.gid)) + throw RecoveryFailure("The vertex must be removed here!"); + + break; + } + case WalDeltaData::Type::VERTEX_ADD_LABEL: + case WalDeltaData::Type::VERTEX_REMOVE_LABEL: { + auto vertex = vertex_acc.find(delta.vertex_add_remove_label.gid); + if (vertex == vertex_acc.end()) + throw RecoveryFailure("The vertex doesn't exist!"); + + auto label_id = LabelId::FromUint( + name_id_mapper_->NameToId(delta.vertex_add_remove_label.label)); + auto it = + std::find(vertex->labels.begin(), vertex->labels.end(), label_id); + + if (delta.type == WalDeltaData::Type::VERTEX_ADD_LABEL) { + if (it != vertex->labels.end()) + throw RecoveryFailure("The vertex already has the label!"); + vertex->labels.push_back(label_id); + } else { + if (it == vertex->labels.end()) + throw RecoveryFailure("The vertex doesn't have the label!"); + std::swap(*it, vertex->labels.back()); + vertex->labels.pop_back(); + } + + break; + } + case WalDeltaData::Type::VERTEX_SET_PROPERTY: { + auto vertex = vertex_acc.find(delta.vertex_edge_set_property.gid); + if (vertex == vertex_acc.end()) + throw RecoveryFailure("The vertex doesn't exist!"); + + auto property_id = PropertyId::FromUint(name_id_mapper_->NameToId( + delta.vertex_edge_set_property.property)); + auto &property_value = delta.vertex_edge_set_property.value; + + auto it = vertex->properties.find(property_id); + if (it != vertex->properties.end()) { + if (property_value.IsNull()) { + // remove the property + vertex->properties.erase(it); + } else { + // set the value + it->second = std::move(property_value); + } + } else if (!property_value.IsNull()) { + vertex->properties.emplace(property_id, std::move(property_value)); + } + + break; + } + case WalDeltaData::Type::EDGE_CREATE: { + auto from_vertex = + vertex_acc.find(delta.edge_create_delete.from_vertex); + if (from_vertex == vertex_acc.end()) + throw RecoveryFailure("The from vertex doesn't exist!"); + auto to_vertex = vertex_acc.find(delta.edge_create_delete.to_vertex); + if (to_vertex == vertex_acc.end()) + throw RecoveryFailure("The to vertex doesn't exist!"); + + auto edge_gid = delta.edge_create_delete.gid; + auto edge_type_id = EdgeTypeId::FromUint( + name_id_mapper_->NameToId(delta.edge_create_delete.edge_type)); + EdgeRef edge_ref(edge_gid); + if (items_.properties_on_edges) { + auto [edge, inserted] = edge_acc.insert(Edge{edge_gid, nullptr}); + if (!inserted) + throw RecoveryFailure("The edge must be inserted here!"); + edge_ref = EdgeRef(&*edge); + } + { + std::tuple<EdgeTypeId, Vertex *, EdgeRef> link{ + edge_type_id, &*to_vertex, edge_ref}; + auto it = std::find(from_vertex->out_edges.begin(), + from_vertex->out_edges.end(), link); + if (it != from_vertex->out_edges.end()) + throw RecoveryFailure("The from vertex already has this edge!"); + from_vertex->out_edges.push_back(link); + } + { + std::tuple<EdgeTypeId, Vertex *, EdgeRef> link{ + edge_type_id, &*from_vertex, edge_ref}; + auto it = std::find(to_vertex->in_edges.begin(), + to_vertex->in_edges.end(), link); + if (it != to_vertex->in_edges.end()) + throw RecoveryFailure("The to vertex already has this edge!"); + to_vertex->in_edges.push_back(link); + } + + ret.next_edge_id = std::max(ret.next_edge_id, edge_gid.AsUint() + 1); + + break; + } + case WalDeltaData::Type::EDGE_DELETE: { + auto from_vertex = + vertex_acc.find(delta.edge_create_delete.from_vertex); + if (from_vertex == vertex_acc.end()) + throw RecoveryFailure("The from vertex doesn't exist!"); + auto to_vertex = vertex_acc.find(delta.edge_create_delete.to_vertex); + if (to_vertex == vertex_acc.end()) + throw RecoveryFailure("The to vertex doesn't exist!"); + + auto edge_gid = delta.edge_create_delete.gid; + auto edge_type_id = EdgeTypeId::FromUint( + name_id_mapper_->NameToId(delta.edge_create_delete.edge_type)); + EdgeRef edge_ref(edge_gid); + if (items_.properties_on_edges) { + auto edge = edge_acc.find(edge_gid); + if (edge == edge_acc.end()) + throw RecoveryFailure("The edge doesn't exist!"); + edge_ref = EdgeRef(&*edge); + } + { + std::tuple<EdgeTypeId, Vertex *, EdgeRef> link{ + edge_type_id, &*to_vertex, edge_ref}; + auto it = std::find(from_vertex->out_edges.begin(), + from_vertex->out_edges.end(), link); + if (it == from_vertex->out_edges.end()) + throw RecoveryFailure("The from vertex doesn't have this edge!"); + std::swap(*it, from_vertex->out_edges.back()); + from_vertex->out_edges.pop_back(); + } + { + std::tuple<EdgeTypeId, Vertex *, EdgeRef> link{ + edge_type_id, &*from_vertex, edge_ref}; + auto it = std::find(to_vertex->in_edges.begin(), + to_vertex->in_edges.end(), link); + if (it == to_vertex->in_edges.end()) + throw RecoveryFailure("The to vertex doesn't have this edge!"); + std::swap(*it, to_vertex->in_edges.back()); + to_vertex->in_edges.pop_back(); + } + if (items_.properties_on_edges) { + if (!edge_acc.remove(edge_gid)) + throw RecoveryFailure("The edge must be removed here!"); + } + + break; + } + case WalDeltaData::Type::EDGE_SET_PROPERTY: { + if (!items_.properties_on_edges) + throw RecoveryFailure( + "The WAL has properties on edges, but the storage is " + "configured without properties on edges!"); + auto edge = edge_acc.find(delta.vertex_edge_set_property.gid); + if (edge == edge_acc.end()) + throw RecoveryFailure("The edge doesn't exist!"); + auto property_id = PropertyId::FromUint(name_id_mapper_->NameToId( + delta.vertex_edge_set_property.property)); + auto &property_value = delta.vertex_edge_set_property.value; + auto it = edge->properties.find(property_id); + if (it != edge->properties.end()) { + if (property_value.IsNull()) { + // remove the property + edge->properties.erase(it); + } else { + // set the value + it->second = std::move(property_value); + } + } else if (!property_value.IsNull()) { + edge->properties.emplace(property_id, std::move(property_value)); + } + break; + } + case WalDeltaData::Type::TRANSACTION_END: + break; + case WalDeltaData::Type::LABEL_INDEX_CREATE: { + auto label_id = LabelId::FromUint( + name_id_mapper_->NameToId(delta.operation_label.label)); + AddRecoveredIndexConstraint(&indices_constraints->indices.label, + label_id, + "The label index already exists!"); + break; + } + case WalDeltaData::Type::LABEL_INDEX_DROP: { + auto label_id = LabelId::FromUint( + name_id_mapper_->NameToId(delta.operation_label.label)); + RemoveRecoveredIndexConstraint(&indices_constraints->indices.label, + label_id, + "The label index doesn't exist!"); + break; + } + case WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE: { + auto label_id = LabelId::FromUint( + name_id_mapper_->NameToId(delta.operation_label_property.label)); + auto property_id = PropertyId::FromUint(name_id_mapper_->NameToId( + delta.operation_label_property.property)); + AddRecoveredIndexConstraint( + &indices_constraints->indices.label_property, + {label_id, property_id}, + "The label property index already exists!"); + break; + } + case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP: { + auto label_id = LabelId::FromUint( + name_id_mapper_->NameToId(delta.operation_label_property.label)); + auto property_id = PropertyId::FromUint(name_id_mapper_->NameToId( + delta.operation_label_property.property)); + RemoveRecoveredIndexConstraint( + &indices_constraints->indices.label_property, + {label_id, property_id}, + "The label property index doesn't exist!"); + break; + } + case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE: { + auto label_id = LabelId::FromUint( + name_id_mapper_->NameToId(delta.operation_label_property.label)); + auto property_id = PropertyId::FromUint(name_id_mapper_->NameToId( + delta.operation_label_property.property)); + AddRecoveredIndexConstraint( + &indices_constraints->constraints.existence, + {label_id, property_id}, + "The existence constraint already exists!"); + break; + } + case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: { + auto label_id = LabelId::FromUint( + name_id_mapper_->NameToId(delta.operation_label_property.label)); + auto property_id = PropertyId::FromUint(name_id_mapper_->NameToId( + delta.operation_label_property.property)); + RemoveRecoveredIndexConstraint( + &indices_constraints->constraints.existence, + {label_id, property_id}, + "The existence constraint doesn't exist!"); + break; + } + } + ret.next_timestamp = std::max(ret.next_timestamp, timestamp + 1); + ++deltas_applied; + } else { + // This delta should be skipped. + SkipWalDeltaData(&wal); + } + } + + LOG(INFO) << "Applied " << deltas_applied << " deltas from WAL " << path; + return ret; } diff --git a/src/storage/v2/durability.hpp b/src/storage/v2/durability.hpp index 701616292..c213c2885 100644 --- a/src/storage/v2/durability.hpp +++ b/src/storage/v2/durability.hpp @@ -298,6 +298,18 @@ enum class StorageGlobalOperation { EXISTENCE_CONSTRAINT_DROP, }; +/// Structure used to track indices and constraints during recovery. +struct RecoveredIndicesAndConstraints { + struct { + std::vector<LabelId> label; + std::vector<std::pair<LabelId, PropertyId>> label_property; + } indices; + + struct { + std::vector<std::pair<LabelId, PropertyId>> existence; + } constraints; +}; + /// WalFile class used to append deltas and operations to the WAL file. class WalFile { public: @@ -340,9 +352,15 @@ class WalFile { class Durability final { public: struct RecoveryInfo { - uint64_t next_vertex_id; - uint64_t next_edge_id; - uint64_t next_timestamp; + uint64_t next_vertex_id{0}; + uint64_t next_edge_id{0}; + uint64_t next_timestamp{0}; + }; + + struct RecoveredSnapshot { + SnapshotInfo snapshot_info; + RecoveryInfo recovery_info; + RecoveredIndicesAndConstraints indices_constraints; }; Durability(Config::Durability config, utils::SkipList<Vertex> *vertices, @@ -367,7 +385,11 @@ class Durability final { std::optional<RecoveryInfo> RecoverData(); - RecoveryInfo LoadSnapshot(const std::filesystem::path &path); + RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path); + + RecoveryInfo LoadWal(const std::filesystem::path &path, + RecoveredIndicesAndConstraints *indices_constraints, + std::optional<uint64_t> snapshot_timestamp); bool InitializeWalFile(); diff --git a/tests/unit/storage_v2_durability.cpp b/tests/unit/storage_v2_durability.cpp index 934157601..c769b081b 100644 --- a/tests/unit/storage_v2_durability.cpp +++ b/tests/unit/storage_v2_durability.cpp @@ -34,12 +34,28 @@ class DurabilityTest : public ::testing::TestWithParam<bool> { kNumExtendedVertices + kNumExtendedEdges) * 2; + enum class DatasetType { + ONLY_BASE, + ONLY_BASE_WITH_EXTENDED_INDICES_AND_CONSTRAINTS, + ONLY_EXTENDED, + ONLY_EXTENDED_WITH_BASE_INDICES_AND_CONSTRAINTS, + BASE_WITH_EXTENDED, + }; + public: DurabilityTest() - : base_vertex_gids_(kNumBaseVertices), - base_edge_gids_(kNumBaseEdges), - extended_vertex_gids_(kNumExtendedVertices), - extended_edge_gids_(kNumExtendedEdges) {} + : base_vertex_gids_( + kNumBaseVertices, + storage::Gid::FromUint(std::numeric_limits<uint64_t>::max())), + base_edge_gids_( + kNumBaseEdges, + storage::Gid::FromUint(std::numeric_limits<uint64_t>::max())), + extended_vertex_gids_( + kNumExtendedVertices, + storage::Gid::FromUint(std::numeric_limits<uint64_t>::max())), + extended_edge_gids_( + kNumExtendedEdges, + storage::Gid::FromUint(std::numeric_limits<uint64_t>::max())) {} void SetUp() override { Clear(); } @@ -113,160 +129,8 @@ class DurabilityTest : public ::testing::TestWithParam<bool> { } } - void VerifyBaseDataset(storage::Storage *store, bool properties_on_edges, - bool extended_dataset_exists) { - auto label_indexed = store->NameToLabel("base_indexed"); - auto label_unindexed = store->NameToLabel("base_unindexed"); - auto property_id = store->NameToProperty("id"); - auto et1 = store->NameToEdgeType("base_et1"); - auto et2 = store->NameToEdgeType("base_et2"); - - // Verify indices info. - { - auto info = store->ListAllIndices(); - if (extended_dataset_exists) { - ASSERT_THAT(info.label, Contains(label_unindexed)); - ASSERT_THAT(info.label_property, - Contains(std::make_pair(label_indexed, property_id))); - } else { - ASSERT_THAT(info.label, UnorderedElementsAre(label_unindexed)); - ASSERT_THAT(info.label_property, UnorderedElementsAre(std::make_pair( - label_indexed, property_id))); - } - } - - // Verify constraints info. - { - auto info = store->ListAllConstraints(); - if (extended_dataset_exists) { - ASSERT_THAT(info.existence, - Contains(std::make_pair(label_unindexed, property_id))); - } else { - ASSERT_THAT(info.existence, UnorderedElementsAre(std::make_pair( - label_unindexed, property_id))); - } - } - - // Create storage accessor. - auto acc = store->Access(); - - // Verify vertices. - for (uint64_t i = 0; i < kNumBaseVertices; ++i) { - auto vertex = acc.FindVertex(base_vertex_gids_[i], storage::View::OLD); - ASSERT_TRUE(vertex); - auto labels = vertex->Labels(storage::View::OLD); - ASSERT_TRUE(labels.HasValue()); - if (i < kNumBaseVertices / 2) { - ASSERT_THAT(*labels, UnorderedElementsAre(label_indexed)); - } else { - ASSERT_THAT(*labels, UnorderedElementsAre(label_unindexed)); - } - auto properties = vertex->Properties(storage::View::OLD); - ASSERT_TRUE(properties.HasValue()); - if (i < kNumBaseVertices / 3 || i >= kNumBaseVertices / 2) { - ASSERT_EQ(properties->size(), 1); - ASSERT_EQ((*properties)[property_id], - storage::PropertyValue(static_cast<int64_t>(i))); - } else { - ASSERT_EQ(properties->size(), 0); - } - } - - // Verify edges. - for (uint64_t i = 0; i < kNumBaseEdges; ++i) { - auto find_edge = - [&](const auto &edges) -> std::optional<storage::EdgeAccessor> { - for (const auto &edge : edges) { - if (edge.Gid() == base_edge_gids_[i]) { - return edge; - } - } - return std::nullopt; - }; - - { - auto vertex1 = acc.FindVertex( - base_vertex_gids_[(i / 2) % kNumBaseVertices], storage::View::OLD); - ASSERT_TRUE(vertex1); - auto out_edges = vertex1->OutEdges({}, storage::View::OLD); - ASSERT_TRUE(out_edges.HasValue()); - auto edge1 = find_edge(*out_edges); - ASSERT_TRUE(edge1); - if (i < kNumBaseEdges / 2) { - ASSERT_EQ(edge1->EdgeType(), et1); - } else { - ASSERT_EQ(edge1->EdgeType(), et2); - } - auto properties = edge1->Properties(storage::View::OLD); - ASSERT_TRUE(properties.HasValue()); - if (properties_on_edges) { - ASSERT_EQ(properties->size(), 1); - ASSERT_EQ((*properties)[property_id], - storage::PropertyValue(static_cast<int64_t>(i))); - } else { - ASSERT_EQ(properties->size(), 0); - } - } - - { - auto vertex2 = acc.FindVertex( - base_vertex_gids_[(i / 3) % kNumBaseVertices], storage::View::OLD); - ASSERT_TRUE(vertex2); - auto in_edges = vertex2->InEdges({}, storage::View::OLD); - ASSERT_TRUE(in_edges.HasValue()); - auto edge2 = find_edge(*in_edges); - ASSERT_TRUE(edge2); - if (i < kNumBaseEdges / 2) { - ASSERT_EQ(edge2->EdgeType(), et1); - } else { - ASSERT_EQ(edge2->EdgeType(), et2); - } - auto properties = edge2->Properties(storage::View::OLD); - ASSERT_TRUE(properties.HasValue()); - if (properties_on_edges) { - ASSERT_EQ(properties->size(), 1); - ASSERT_EQ((*properties)[property_id], - storage::PropertyValue(static_cast<int64_t>(i))); - } else { - ASSERT_EQ(properties->size(), 0); - } - } - } - - // Verify label indices. - { - std::vector<storage::VertexAccessor> vertices; - vertices.reserve(kNumBaseVertices / 2); - for (auto vertex : acc.Vertices(label_unindexed, storage::View::OLD)) { - vertices.push_back(vertex); - } - ASSERT_EQ(vertices.size(), kNumBaseVertices / 2); - std::sort(vertices.begin(), vertices.end(), - [](const auto &a, const auto &b) { return a.Gid() < b.Gid(); }); - for (uint64_t i = 0; i < kNumBaseVertices / 2; ++i) { - ASSERT_EQ(vertices[i].Gid(), - base_vertex_gids_[kNumBaseVertices / 2 + i]); - } - } - - // Verify label+property index. - { - std::vector<storage::VertexAccessor> vertices; - vertices.reserve(kNumBaseVertices / 3); - for (auto vertex : - acc.Vertices(label_indexed, property_id, storage::View::OLD)) { - vertices.push_back(vertex); - } - ASSERT_EQ(vertices.size(), kNumBaseVertices / 3); - std::sort(vertices.begin(), vertices.end(), - [](const auto &a, const auto &b) { return a.Gid() < b.Gid(); }); - for (uint64_t i = 0; i < kNumBaseVertices / 3; ++i) { - ASSERT_EQ(vertices[i].Gid(), base_vertex_gids_[i]); - } - } - } - - void CreateExtendedDataset(storage::Storage *store) { + void CreateExtendedDataset(storage::Storage *store, + bool single_transaction = false) { auto label_indexed = store->NameToLabel("extended_indexed"); auto label_unused = store->NameToLabel("extended_unused"); auto property_count = store->NameToProperty("count"); @@ -283,10 +147,14 @@ class DurabilityTest : public ::testing::TestWithParam<bool> { ASSERT_FALSE(store->CreateExistenceConstraint(label_unused, property_count) .HasError()); + // Storage accessor. + std::optional<storage::Storage::Accessor> acc; + if (single_transaction) acc.emplace(store->Access()); + // Create vertices. for (uint64_t i = 0; i < kNumExtendedVertices; ++i) { - auto acc = store->Access(); - auto vertex = acc.CreateVertex(); + if (!single_transaction) acc.emplace(store->Access()); + auto vertex = acc->CreateVertex(); extended_vertex_gids_[i] = vertex.Gid(); if (i < kNumExtendedVertices / 2) { ASSERT_TRUE(vertex.AddLabel(label_indexed).HasValue()); @@ -297,19 +165,19 @@ class DurabilityTest : public ::testing::TestWithParam<bool> { .SetProperty(property_count, storage::PropertyValue("nandare")) .HasValue()); } - ASSERT_FALSE(acc.Commit().HasError()); + if (!single_transaction) ASSERT_FALSE(acc->Commit().HasError()); } // Create edges. for (uint64_t i = 0; i < kNumExtendedEdges; ++i) { - auto acc = store->Access(); + if (!single_transaction) acc.emplace(store->Access()); auto vertex1 = - acc.FindVertex(extended_vertex_gids_[(i / 5) % kNumExtendedVertices], - storage::View::OLD); + acc->FindVertex(extended_vertex_gids_[(i / 5) % kNumExtendedVertices], + storage::View::NEW); ASSERT_TRUE(vertex1); auto vertex2 = - acc.FindVertex(extended_vertex_gids_[(i / 6) % kNumExtendedVertices], - storage::View::OLD); + acc->FindVertex(extended_vertex_gids_[(i / 6) % kNumExtendedVertices], + storage::View::NEW); ASSERT_TRUE(vertex2); storage::EdgeTypeId et; if (i < kNumExtendedEdges / 4) { @@ -317,16 +185,25 @@ class DurabilityTest : public ::testing::TestWithParam<bool> { } else { et = et4; } - auto edge = acc.CreateEdge(&*vertex1, &*vertex2, et); + auto edge = acc->CreateEdge(&*vertex1, &*vertex2, et); ASSERT_TRUE(edge.HasValue()); extended_edge_gids_[i] = edge->Gid(); - ASSERT_FALSE(acc.Commit().HasError()); + if (!single_transaction) ASSERT_FALSE(acc->Commit().HasError()); } + + if (single_transaction) ASSERT_FALSE(acc->Commit().HasError()); } - void VerifyExtendedDataset(storage::Storage *store) { - auto label_indexed = store->NameToLabel("extended_indexed"); - auto label_unused = store->NameToLabel("extended_unused"); + void VerifyDataset(storage::Storage *store, DatasetType type, + bool properties_on_edges) { + auto base_label_indexed = store->NameToLabel("base_indexed"); + auto base_label_unindexed = store->NameToLabel("base_unindexed"); + auto property_id = store->NameToProperty("id"); + auto et1 = store->NameToEdgeType("base_et1"); + auto et2 = store->NameToEdgeType("base_et2"); + + auto extended_label_indexed = store->NameToLabel("extended_indexed"); + auto extended_label_unused = store->NameToLabel("extended_unused"); auto property_count = store->NameToProperty("count"); auto et3 = store->NameToEdgeType("extended_et3"); auto et4 = store->NameToEdgeType("extended_et4"); @@ -334,126 +211,366 @@ class DurabilityTest : public ::testing::TestWithParam<bool> { // Verify indices info. { auto info = store->ListAllIndices(); - auto base_label_indexed = store->NameToLabel("base_indexed"); - auto base_label_unindexed = store->NameToLabel("base_unindexed"); - auto base_property_id = store->NameToProperty("id"); - ASSERT_THAT(info.label, - UnorderedElementsAre(base_label_unindexed, label_unused)); - ASSERT_THAT(info.label_property, - UnorderedElementsAre( - std::make_pair(base_label_indexed, base_property_id), - std::make_pair(label_indexed, property_count))); + switch (type) { + case DatasetType::ONLY_BASE: + ASSERT_THAT(info.label, UnorderedElementsAre(base_label_unindexed)); + ASSERT_THAT(info.label_property, + UnorderedElementsAre( + std::make_pair(base_label_indexed, property_id))); + break; + case DatasetType::ONLY_EXTENDED: + ASSERT_THAT(info.label, UnorderedElementsAre(extended_label_unused)); + ASSERT_THAT( + info.label_property, + UnorderedElementsAre( + std::make_pair(base_label_indexed, property_id), + std::make_pair(extended_label_indexed, property_count))); + break; + case DatasetType::ONLY_BASE_WITH_EXTENDED_INDICES_AND_CONSTRAINTS: + case DatasetType::ONLY_EXTENDED_WITH_BASE_INDICES_AND_CONSTRAINTS: + case DatasetType::BASE_WITH_EXTENDED: + ASSERT_THAT(info.label, UnorderedElementsAre(base_label_unindexed, + extended_label_unused)); + ASSERT_THAT( + info.label_property, + UnorderedElementsAre( + std::make_pair(base_label_indexed, property_id), + std::make_pair(extended_label_indexed, property_count))); + break; + } } // Verify constraints info. { auto info = store->ListAllConstraints(); - auto base_label_unindexed = store->NameToLabel("base_unindexed"); - auto base_property_id = store->NameToProperty("id"); - ASSERT_THAT(info.existence, - UnorderedElementsAre( - std::make_pair(base_label_unindexed, base_property_id), - std::make_pair(label_unused, property_count))); + switch (type) { + case DatasetType::ONLY_BASE: + ASSERT_THAT(info.existence, UnorderedElementsAre(std::make_pair( + base_label_unindexed, property_id))); + break; + case DatasetType::ONLY_EXTENDED: + ASSERT_THAT(info.existence, + UnorderedElementsAre(std::make_pair(extended_label_unused, + property_count))); + break; + case DatasetType::ONLY_BASE_WITH_EXTENDED_INDICES_AND_CONSTRAINTS: + case DatasetType::ONLY_EXTENDED_WITH_BASE_INDICES_AND_CONSTRAINTS: + case DatasetType::BASE_WITH_EXTENDED: + ASSERT_THAT( + info.existence, + UnorderedElementsAre( + std::make_pair(base_label_unindexed, property_id), + std::make_pair(extended_label_unused, property_count))); + break; + } + } + + bool have_base_dataset = false; + bool have_extended_dataset = false; + switch (type) { + case DatasetType::ONLY_BASE: + case DatasetType::ONLY_BASE_WITH_EXTENDED_INDICES_AND_CONSTRAINTS: + have_base_dataset = true; + break; + case DatasetType::ONLY_EXTENDED: + case DatasetType::ONLY_EXTENDED_WITH_BASE_INDICES_AND_CONSTRAINTS: + have_extended_dataset = true; + break; + case DatasetType::BASE_WITH_EXTENDED: + have_base_dataset = true; + have_extended_dataset = true; + break; } // Create storage accessor. auto acc = store->Access(); - // Verify vertices. - for (uint64_t i = 0; i < kNumExtendedVertices; ++i) { - auto vertex = - acc.FindVertex(extended_vertex_gids_[i], storage::View::OLD); - ASSERT_TRUE(vertex); - auto labels = vertex->Labels(storage::View::OLD); - ASSERT_TRUE(labels.HasValue()); - if (i < kNumExtendedVertices / 2) { - ASSERT_THAT(*labels, UnorderedElementsAre(label_indexed)); + // Verify base dataset. + if (have_base_dataset) { + // Verify vertices. + for (uint64_t i = 0; i < kNumBaseVertices; ++i) { + auto vertex = acc.FindVertex(base_vertex_gids_[i], storage::View::OLD); + ASSERT_TRUE(vertex); + auto labels = vertex->Labels(storage::View::OLD); + ASSERT_TRUE(labels.HasValue()); + if (i < kNumBaseVertices / 2) { + ASSERT_THAT(*labels, UnorderedElementsAre(base_label_indexed)); + } else { + ASSERT_THAT(*labels, UnorderedElementsAre(base_label_unindexed)); + } + auto properties = vertex->Properties(storage::View::OLD); + ASSERT_TRUE(properties.HasValue()); + if (i < kNumBaseVertices / 3 || i >= kNumBaseVertices / 2) { + ASSERT_EQ(properties->size(), 1); + ASSERT_EQ((*properties)[property_id], + storage::PropertyValue(static_cast<int64_t>(i))); + } else { + ASSERT_EQ(properties->size(), 0); + } } - auto properties = vertex->Properties(storage::View::OLD); - ASSERT_TRUE(properties.HasValue()); - if (i < kNumExtendedVertices / 3 || i >= kNumExtendedVertices / 2) { - ASSERT_EQ(properties->size(), 1); - ASSERT_EQ((*properties)[property_count], - storage::PropertyValue("nandare")); - } else { - ASSERT_EQ(properties->size(), 0); - } - } - // Verify edges. - for (uint64_t i = 0; i < kNumExtendedEdges; ++i) { - auto find_edge = - [&](const auto &edges) -> std::optional<storage::EdgeAccessor> { - for (const auto &edge : edges) { - if (edge.Gid() == extended_edge_gids_[i]) { - return edge; + // Verify edges. + for (uint64_t i = 0; i < kNumBaseEdges; ++i) { + auto find_edge = + [&](const auto &edges) -> std::optional<storage::EdgeAccessor> { + for (const auto &edge : edges) { + if (edge.Gid() == base_edge_gids_[i]) { + return edge; + } + } + return std::nullopt; + }; + + { + auto vertex1 = + acc.FindVertex(base_vertex_gids_[(i / 2) % kNumBaseVertices], + storage::View::OLD); + ASSERT_TRUE(vertex1); + auto out_edges = vertex1->OutEdges({}, storage::View::OLD); + ASSERT_TRUE(out_edges.HasValue()); + auto edge1 = find_edge(*out_edges); + ASSERT_TRUE(edge1); + if (i < kNumBaseEdges / 2) { + ASSERT_EQ(edge1->EdgeType(), et1); + } else { + ASSERT_EQ(edge1->EdgeType(), et2); + } + auto properties = edge1->Properties(storage::View::OLD); + ASSERT_TRUE(properties.HasValue()); + if (properties_on_edges) { + ASSERT_EQ(properties->size(), 1); + ASSERT_EQ((*properties)[property_id], + storage::PropertyValue(static_cast<int64_t>(i))); + } else { + ASSERT_EQ(properties->size(), 0); } } - return std::nullopt; - }; - { - auto vertex1 = acc.FindVertex( - extended_vertex_gids_[(i / 5) % kNumExtendedVertices], - storage::View::OLD); - ASSERT_TRUE(vertex1); - auto out_edges = vertex1->OutEdges({}, storage::View::OLD); - ASSERT_TRUE(out_edges.HasValue()); - auto edge1 = find_edge(*out_edges); - ASSERT_TRUE(edge1); - if (i < kNumExtendedEdges / 4) { - ASSERT_EQ(edge1->EdgeType(), et3); - } else { - ASSERT_EQ(edge1->EdgeType(), et4); + { + auto vertex2 = + acc.FindVertex(base_vertex_gids_[(i / 3) % kNumBaseVertices], + storage::View::OLD); + ASSERT_TRUE(vertex2); + auto in_edges = vertex2->InEdges({}, storage::View::OLD); + ASSERT_TRUE(in_edges.HasValue()); + auto edge2 = find_edge(*in_edges); + ASSERT_TRUE(edge2); + if (i < kNumBaseEdges / 2) { + ASSERT_EQ(edge2->EdgeType(), et1); + } else { + ASSERT_EQ(edge2->EdgeType(), et2); + } + auto properties = edge2->Properties(storage::View::OLD); + ASSERT_TRUE(properties.HasValue()); + if (properties_on_edges) { + ASSERT_EQ(properties->size(), 1); + ASSERT_EQ((*properties)[property_id], + storage::PropertyValue(static_cast<int64_t>(i))); + } else { + ASSERT_EQ(properties->size(), 0); + } } - auto properties = edge1->Properties(storage::View::OLD); - ASSERT_TRUE(properties.HasValue()); - ASSERT_EQ(properties->size(), 0); } + // Verify label indices. { - auto vertex2 = acc.FindVertex( - extended_vertex_gids_[(i / 6) % kNumExtendedVertices], - storage::View::OLD); - ASSERT_TRUE(vertex2); - auto in_edges = vertex2->InEdges({}, storage::View::OLD); - ASSERT_TRUE(in_edges.HasValue()); - auto edge2 = find_edge(*in_edges); - ASSERT_TRUE(edge2); - if (i < kNumExtendedEdges / 4) { - ASSERT_EQ(edge2->EdgeType(), et3); - } else { - ASSERT_EQ(edge2->EdgeType(), et4); + std::vector<storage::VertexAccessor> vertices; + vertices.reserve(kNumBaseVertices / 2); + for (auto vertex : + acc.Vertices(base_label_unindexed, storage::View::OLD)) { + vertices.push_back(vertex); + } + ASSERT_EQ(vertices.size(), kNumBaseVertices / 2); + std::sort( + vertices.begin(), vertices.end(), + [](const auto &a, const auto &b) { return a.Gid() < b.Gid(); }); + for (uint64_t i = 0; i < kNumBaseVertices / 2; ++i) { + ASSERT_EQ(vertices[i].Gid(), + base_vertex_gids_[kNumBaseVertices / 2 + i]); + } + } + + // Verify label+property index. + { + std::vector<storage::VertexAccessor> vertices; + vertices.reserve(kNumBaseVertices / 3); + for (auto vertex : acc.Vertices(base_label_indexed, property_id, + storage::View::OLD)) { + vertices.push_back(vertex); + } + ASSERT_EQ(vertices.size(), kNumBaseVertices / 3); + std::sort( + vertices.begin(), vertices.end(), + [](const auto &a, const auto &b) { return a.Gid() < b.Gid(); }); + for (uint64_t i = 0; i < kNumBaseVertices / 3; ++i) { + ASSERT_EQ(vertices[i].Gid(), base_vertex_gids_[i]); + } + } + } else { + // Verify vertices. + for (uint64_t i = 0; i < kNumBaseVertices; ++i) { + auto vertex = acc.FindVertex(base_vertex_gids_[i], storage::View::OLD); + ASSERT_FALSE(vertex); + } + + if (type == + DatasetType::ONLY_EXTENDED_WITH_BASE_INDICES_AND_CONSTRAINTS) { + // Verify label indices. + { + uint64_t count = 0; + auto iterable = + acc.Vertices(base_label_unindexed, storage::View::OLD); + for (auto it = iterable.begin(); it != iterable.end(); ++it) { + ++count; + } + ASSERT_EQ(count, 0); + } + + // Verify label+property index. + { + uint64_t count = 0; + auto iterable = + acc.Vertices(base_label_indexed, property_id, storage::View::OLD); + for (auto it = iterable.begin(); it != iterable.end(); ++it) { + ++count; + } + ASSERT_EQ(count, 0); } - auto properties = edge2->Properties(storage::View::OLD); - ASSERT_TRUE(properties.HasValue()); - ASSERT_EQ(properties->size(), 0); } } - // Verify label indices. - { - std::vector<storage::VertexAccessor> vertices; - vertices.reserve(kNumExtendedVertices / 2); - for (auto vertex : acc.Vertices(label_unused, storage::View::OLD)) { - vertices.push_back(vertex); + // Verify extended dataset. + if (have_extended_dataset) { + // Verify vertices. + for (uint64_t i = 0; i < kNumExtendedVertices; ++i) { + auto vertex = + acc.FindVertex(extended_vertex_gids_[i], storage::View::OLD); + ASSERT_TRUE(vertex); + auto labels = vertex->Labels(storage::View::OLD); + ASSERT_TRUE(labels.HasValue()); + if (i < kNumExtendedVertices / 2) { + ASSERT_THAT(*labels, UnorderedElementsAre(extended_label_indexed)); + } + auto properties = vertex->Properties(storage::View::OLD); + ASSERT_TRUE(properties.HasValue()); + if (i < kNumExtendedVertices / 3 || i >= kNumExtendedVertices / 2) { + ASSERT_EQ(properties->size(), 1); + ASSERT_EQ((*properties)[property_count], + storage::PropertyValue("nandare")); + } else { + ASSERT_EQ(properties->size(), 0); + } } - ASSERT_EQ(vertices.size(), 0); - } - // Verify label+property index. - { - std::vector<storage::VertexAccessor> vertices; - vertices.reserve(kNumExtendedVertices / 3); - for (auto vertex : - acc.Vertices(label_indexed, property_count, storage::View::OLD)) { - vertices.push_back(vertex); + // Verify edges. + for (uint64_t i = 0; i < kNumExtendedEdges; ++i) { + auto find_edge = + [&](const auto &edges) -> std::optional<storage::EdgeAccessor> { + for (const auto &edge : edges) { + if (edge.Gid() == extended_edge_gids_[i]) { + return edge; + } + } + return std::nullopt; + }; + + { + auto vertex1 = acc.FindVertex( + extended_vertex_gids_[(i / 5) % kNumExtendedVertices], + storage::View::OLD); + ASSERT_TRUE(vertex1); + auto out_edges = vertex1->OutEdges({}, storage::View::OLD); + ASSERT_TRUE(out_edges.HasValue()); + auto edge1 = find_edge(*out_edges); + ASSERT_TRUE(edge1); + if (i < kNumExtendedEdges / 4) { + ASSERT_EQ(edge1->EdgeType(), et3); + } else { + ASSERT_EQ(edge1->EdgeType(), et4); + } + auto properties = edge1->Properties(storage::View::OLD); + ASSERT_TRUE(properties.HasValue()); + ASSERT_EQ(properties->size(), 0); + } + + { + auto vertex2 = acc.FindVertex( + extended_vertex_gids_[(i / 6) % kNumExtendedVertices], + storage::View::OLD); + ASSERT_TRUE(vertex2); + auto in_edges = vertex2->InEdges({}, storage::View::OLD); + ASSERT_TRUE(in_edges.HasValue()); + auto edge2 = find_edge(*in_edges); + ASSERT_TRUE(edge2); + if (i < kNumExtendedEdges / 4) { + ASSERT_EQ(edge2->EdgeType(), et3); + } else { + ASSERT_EQ(edge2->EdgeType(), et4); + } + auto properties = edge2->Properties(storage::View::OLD); + ASSERT_TRUE(properties.HasValue()); + ASSERT_EQ(properties->size(), 0); + } } - ASSERT_EQ(vertices.size(), kNumExtendedVertices / 3); - std::sort(vertices.begin(), vertices.end(), - [](const auto &a, const auto &b) { return a.Gid() < b.Gid(); }); - for (uint64_t i = 0; i < kNumExtendedVertices / 3; ++i) { - ASSERT_EQ(vertices[i].Gid(), extended_vertex_gids_[i]); + + // Verify label indices. + { + std::vector<storage::VertexAccessor> vertices; + vertices.reserve(kNumExtendedVertices / 2); + for (auto vertex : + acc.Vertices(extended_label_unused, storage::View::OLD)) { + vertices.push_back(vertex); + } + ASSERT_EQ(vertices.size(), 0); + } + + // Verify label+property index. + { + std::vector<storage::VertexAccessor> vertices; + vertices.reserve(kNumExtendedVertices / 3); + for (auto vertex : acc.Vertices(extended_label_indexed, property_count, + storage::View::OLD)) { + vertices.push_back(vertex); + } + ASSERT_EQ(vertices.size(), kNumExtendedVertices / 3); + std::sort( + vertices.begin(), vertices.end(), + [](const auto &a, const auto &b) { return a.Gid() < b.Gid(); }); + for (uint64_t i = 0; i < kNumExtendedVertices / 3; ++i) { + ASSERT_EQ(vertices[i].Gid(), extended_vertex_gids_[i]); + } + } + } else { + // Verify vertices. + for (uint64_t i = 0; i < kNumExtendedVertices; ++i) { + auto vertex = + acc.FindVertex(extended_vertex_gids_[i], storage::View::OLD); + ASSERT_FALSE(vertex); + } + + if (type == + DatasetType::ONLY_BASE_WITH_EXTENDED_INDICES_AND_CONSTRAINTS) { + // Verify label indices. + { + uint64_t count = 0; + auto iterable = + acc.Vertices(extended_label_unused, storage::View::OLD); + for (auto it = iterable.begin(); it != iterable.end(); ++it) { + ++count; + } + ASSERT_EQ(count, 0); + } + + // Verify label+property index. + { + uint64_t count = 0; + auto iterable = acc.Vertices(extended_label_indexed, property_count, + storage::View::OLD); + for (auto it = iterable.begin(); it != iterable.end(); ++it) { + ++count; + } + ASSERT_EQ(count, 0); + } } } } @@ -494,6 +611,46 @@ class DurabilityTest : public ::testing::TestWithParam<bool> { std::vector<storage::Gid> extended_edge_gids_; }; +void DestroySnapshot(const std::filesystem::path &path) { + auto info = storage::ReadSnapshotInfo(path); + LOG(INFO) << "Destroying snapshot " << path; + utils::OutputFile file; + file.Open(path, utils::OutputFile::Mode::OVERWRITE_EXISTING); + file.SetPosition(utils::OutputFile::Position::SET, info.offset_vertices); + auto value = static_cast<uint8_t>(storage::Marker::TYPE_MAP); + file.Write(&value, sizeof(value)); + file.Sync(); + file.Close(); +} + +void DestroyWalFirstDelta(const std::filesystem::path &path) { + auto info = storage::ReadWalInfo(path); + LOG(INFO) << "Destroying WAL " << path; + utils::OutputFile file; + file.Open(path, utils::OutputFile::Mode::OVERWRITE_EXISTING); + file.SetPosition(utils::OutputFile::Position::SET, info.offset_deltas); + auto value = static_cast<uint8_t>(storage::Marker::TYPE_MAP); + file.Write(&value, sizeof(value)); + file.Sync(); + file.Close(); +} + +void DestroyWalSuffix(const std::filesystem::path &path) { + auto info = storage::ReadWalInfo(path); + LOG(INFO) << "Destroying WAL " << path; + utils::OutputFile file; + file.Open(path, utils::OutputFile::Mode::OVERWRITE_EXISTING); + ASSERT_LT( + info.offset_deltas, + file.SetPosition(utils::OutputFile::Position::RELATIVE_TO_END, -100)); + uint8_t value = 0; + for (size_t i = 0; i < 100; ++i) { + file.Write(&value, sizeof(value)); + } + file.Sync(); + file.Close(); +} + INSTANTIATE_TEST_CASE_P(EdgesWithProperties, DurabilityTest, ::testing::Values(true)); INSTANTIATE_TEST_CASE_P(EdgesWithoutProperties, DurabilityTest, @@ -508,10 +665,9 @@ TEST_P(DurabilityTest, SnapshotOnExit) { .durability = {.storage_directory = storage_directory, .snapshot_on_exit = true}}); CreateBaseDataset(&store, GetParam()); - VerifyBaseDataset(&store, GetParam(), false); + VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam()); CreateExtendedDataset(&store); - VerifyBaseDataset(&store, GetParam(), true); - VerifyExtendedDataset(&store); + VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam()); } ASSERT_EQ(GetSnapshotsList().size(), 1); @@ -521,8 +677,7 @@ TEST_P(DurabilityTest, SnapshotOnExit) { storage::Storage store({.items = {.properties_on_edges = GetParam()}, .durability = {.storage_directory = storage_directory, .recover_on_startup = true}}); - VerifyBaseDataset(&store, GetParam(), true); - VerifyExtendedDataset(&store); + VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam()); // Try to use the storage. { @@ -555,7 +710,7 @@ TEST_P(DurabilityTest, SnapshotPeriodic) { storage::Storage store({.items = {.properties_on_edges = GetParam()}, .durability = {.storage_directory = storage_directory, .recover_on_startup = true}}); - VerifyBaseDataset(&store, GetParam(), false); + VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam()); // Try to use the storage. { @@ -590,24 +745,14 @@ TEST_P(DurabilityTest, SnapshotFallback) { { auto snapshots = GetSnapshotsList(); ASSERT_GE(snapshots.size(), 2); - - auto info = storage::ReadSnapshotInfo(*snapshots.begin()); - - LOG(INFO) << "Destroying snapshot " << *snapshots.begin(); - utils::OutputFile file; - file.Open(*snapshots.begin(), utils::OutputFile::Mode::OVERWRITE_EXISTING); - file.SetPosition(utils::OutputFile::Position::SET, info.offset_vertices); - auto value = static_cast<uint8_t>(storage::Marker::TYPE_MAP); - file.Write(&value, sizeof(value)); - file.Sync(); - file.Close(); + DestroySnapshot(*snapshots.begin()); } // Recover snapshot. storage::Storage store({.items = {.properties_on_edges = GetParam()}, .durability = {.storage_directory = storage_directory, .recover_on_startup = true}}); - VerifyBaseDataset(&store, GetParam(), false); + VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam()); // Try to use the storage. { @@ -619,6 +764,75 @@ TEST_P(DurabilityTest, SnapshotFallback) { } } +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_P(DurabilityTest, SnapshotEverythingCorrupt) { + // Create unrelated snapshot. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_on_exit = true}}); + auto acc = store.Access(); + for (uint64_t i = 0; i < 1000; ++i) { + acc.CreateVertex(); + } + ASSERT_FALSE(acc.Commit().HasError()); + } + + ASSERT_EQ(GetSnapshotsList().size(), 1); + ASSERT_EQ(GetWalsList().size(), 0); + + // Get unrelated UUID. + std::string unrelated_uuid; + { + auto snapshots = GetSnapshotsList(); + ASSERT_EQ(snapshots.size(), 1); + auto info = storage::ReadSnapshotInfo(*snapshots.begin()); + unrelated_uuid = info.uuid; + } + + // Create snapshot. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_wal_mode = storage::Config::Durability:: + SnapshotWalMode::PERIODIC_SNAPSHOT, + .snapshot_interval = std::chrono::milliseconds(2000)}}); + CreateBaseDataset(&store, GetParam()); + std::this_thread::sleep_for(std::chrono::milliseconds(2500)); + CreateExtendedDataset(&store); + std::this_thread::sleep_for(std::chrono::milliseconds(2500)); + } + + ASSERT_GE(GetSnapshotsList().size(), 2); + ASSERT_EQ(GetWalsList().size(), 0); + + // Destroy all current snapshots. + { + auto snapshots = GetSnapshotsList(); + ASSERT_GE(snapshots.size(), 2); + for (const auto &snapshot : snapshots) { + auto info = storage::ReadSnapshotInfo(snapshot); + if (info.uuid == unrelated_uuid) { + LOG(INFO) << "Skipping snapshot " << snapshot; + continue; + } + DestroySnapshot(snapshot); + } + } + + // Recover snapshot. + ASSERT_DEATH( + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + }, + ""); +} + // NOLINTNEXTLINE(hicpp-special-member-functions) TEST_P(DurabilityTest, SnapshotRetention) { // Create unrelated snapshot. @@ -676,7 +890,62 @@ TEST_P(DurabilityTest, SnapshotRetention) { storage::Storage store({.items = {.properties_on_edges = GetParam()}, .durability = {.storage_directory = storage_directory, .recover_on_startup = true}}); - VerifyBaseDataset(&store, GetParam(), false); + VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam()); + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } +} + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_P(DurabilityTest, SnapshotMixedUUID) { + // Create snapshot. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_on_exit = true}}); + CreateBaseDataset(&store, GetParam()); + VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam()); + CreateExtendedDataset(&store); + VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam()); + } + + ASSERT_EQ(GetSnapshotsList().size(), 1); + ASSERT_EQ(GetWalsList().size(), 0); + + // Recover snapshot. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam()); + } + + // Create another snapshot. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_on_exit = true}}); + CreateBaseDataset(&store, GetParam()); + VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam()); + } + + ASSERT_EQ(GetSnapshotsList().size(), 2); + ASSERT_EQ(GetWalsList().size(), 0); + + // Recover snapshot. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam()); // Try to use the storage. { @@ -698,10 +967,9 @@ TEST_F(DurabilityTest, .durability = {.storage_directory = storage_directory, .snapshot_on_exit = true}}); CreateBaseDataset(&store, false); - VerifyBaseDataset(&store, false, false); + VerifyDataset(&store, DatasetType::ONLY_BASE, false); CreateExtendedDataset(&store); - VerifyBaseDataset(&store, false, true); - VerifyExtendedDataset(&store); + VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, false); } ASSERT_EQ(GetSnapshotsList().size(), 1); @@ -711,8 +979,7 @@ TEST_F(DurabilityTest, storage::Storage store({.items = {.properties_on_edges = true}, .durability = {.storage_directory = storage_directory, .recover_on_startup = true}}); - VerifyBaseDataset(&store, false, true); - VerifyExtendedDataset(&store); + VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, false); // Try to use the storage. { @@ -734,33 +1001,23 @@ TEST_F(DurabilityTest, .durability = {.storage_directory = storage_directory, .snapshot_on_exit = true}}); CreateBaseDataset(&store, true); - VerifyBaseDataset(&store, true, false); + VerifyDataset(&store, DatasetType::ONLY_BASE, true); CreateExtendedDataset(&store); - VerifyBaseDataset(&store, true, true); - VerifyExtendedDataset(&store); + VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, true); } + ASSERT_EQ(GetSnapshotsList().size(), 1); + ASSERT_EQ(GetWalsList().size(), 0); + // Recover snapshot. - storage::Storage store({.items = {.properties_on_edges = false}, - .durability = {.storage_directory = storage_directory, - .recover_on_startup = true}}); - { - std::vector<storage::VertexAccessor> vertices; - auto acc = store.Access(); - for (auto vertex : acc.Vertices(storage::View::OLD)) { - vertices.push_back(vertex); - } - ASSERT_EQ(vertices.size(), 0); - } - - // Try to use the storage. - { - auto acc = store.Access(); - auto vertex = acc.CreateVertex(); - auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); - ASSERT_TRUE(edge.HasValue()); - ASSERT_FALSE(acc.Commit().HasError()); - } + ASSERT_DEATH( + { + storage::Storage store( + {.items = {.properties_on_edges = false}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + }, + ""); } // NOLINTNEXTLINE(hicpp-special-member-functions) @@ -773,10 +1030,9 @@ TEST_F(DurabilityTest, .durability = {.storage_directory = storage_directory, .snapshot_on_exit = true}}); CreateBaseDataset(&store, true); - VerifyBaseDataset(&store, true, false); + VerifyDataset(&store, DatasetType::ONLY_BASE, true); CreateExtendedDataset(&store); - VerifyBaseDataset(&store, true, true); - VerifyExtendedDataset(&store); + VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, true); // Remove properties from edges. { auto acc = store.Access(); @@ -815,8 +1071,7 @@ TEST_F(DurabilityTest, storage::Storage store({.items = {.properties_on_edges = false}, .durability = {.storage_directory = storage_directory, .recover_on_startup = true}}); - VerifyBaseDataset(&store, false, true); - VerifyExtendedDataset(&store); + VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, false); // Try to use the storage. { @@ -845,6 +1100,79 @@ TEST_P(DurabilityTest, WalBasic) { ASSERT_EQ(GetSnapshotsList().size(), 0); ASSERT_GE(GetWalsList().size(), 1); + + // Recover WALs. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam()); + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } +} + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_P(DurabilityTest, WalAppendToExisting) { + // Create WALs. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_wal_mode = storage::Config::Durability:: + SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + .snapshot_interval = std::chrono::minutes(20), + .wal_file_flush_every_n_tx = kFlushWalEvery}}); + CreateBaseDataset(&store, GetParam()); + } + + ASSERT_EQ(GetSnapshotsList().size(), 0); + ASSERT_GE(GetWalsList().size(), 1); + + // Recover WALs. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam()); + } + + // Recover WALs and create more WALs. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true, + .snapshot_wal_mode = storage::Config::Durability:: + SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + .snapshot_interval = std::chrono::minutes(20), + .wal_file_flush_every_n_tx = kFlushWalEvery}}); + CreateExtendedDataset(&store); + } + + ASSERT_EQ(GetSnapshotsList().size(), 0); + ASSERT_GE(GetWalsList().size(), 2); + + // Recover WALs. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam()); + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } } // NOLINTNEXTLINE(hicpp-special-member-functions) @@ -891,6 +1219,104 @@ TEST_P(DurabilityTest, WalCreateInSingleTransaction) { ASSERT_EQ(GetSnapshotsList().size(), 0); ASSERT_GE(GetWalsList().size(), 1); + + // Recover WALs. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + { + auto indices = store.ListAllIndices(); + ASSERT_EQ(indices.label.size(), 0); + ASSERT_EQ(indices.label_property.size(), 0); + auto constraints = store.ListAllConstraints(); + ASSERT_EQ(constraints.existence.size(), 0); + auto acc = store.Access(); + { + auto v1 = acc.FindVertex(gid_v1, storage::View::OLD); + ASSERT_TRUE(v1); + auto labels = v1->Labels(storage::View::OLD); + ASSERT_TRUE(labels.HasValue()); + ASSERT_THAT(*labels, UnorderedElementsAre(store.NameToLabel("l11"), + store.NameToLabel("l12"), + store.NameToLabel("l13"))); + auto props = v1->Properties(storage::View::OLD); + ASSERT_TRUE(props.HasValue()); + ASSERT_EQ(props->size(), 0); + auto in_edges = v1->InEdges({}, storage::View::OLD); + ASSERT_TRUE(in_edges.HasValue()); + ASSERT_EQ(in_edges->size(), 0); + auto out_edges = v1->OutEdges({}, storage::View::OLD); + ASSERT_TRUE(out_edges.HasValue()); + ASSERT_EQ(out_edges->size(), 1); + const auto &edge = (*out_edges)[0]; + ASSERT_EQ(edge.Gid(), gid_e1); + auto edge_props = edge.Properties(storage::View::OLD); + ASSERT_TRUE(edge_props.HasValue()); + if (GetParam()) { + ASSERT_THAT(*edge_props, UnorderedElementsAre(std::make_pair( + store.NameToProperty("test"), + storage::PropertyValue("nandare")))); + } else { + ASSERT_EQ(edge_props->size(), 0); + } + } + { + auto v2 = acc.FindVertex(gid_v2, storage::View::OLD); + ASSERT_TRUE(v2); + auto labels = v2->Labels(storage::View::OLD); + ASSERT_TRUE(labels.HasValue()); + ASSERT_THAT(*labels, UnorderedElementsAre(store.NameToLabel("l21"))); + auto props = v2->Properties(storage::View::OLD); + ASSERT_TRUE(props.HasValue()); + ASSERT_THAT(*props, UnorderedElementsAre( + std::make_pair(store.NameToProperty("hello"), + storage::PropertyValue("world")))); + auto in_edges = v2->InEdges({}, storage::View::OLD); + ASSERT_TRUE(in_edges.HasValue()); + ASSERT_EQ(in_edges->size(), 1); + const auto &edge = (*in_edges)[0]; + ASSERT_EQ(edge.Gid(), gid_e1); + auto edge_props = edge.Properties(storage::View::OLD); + ASSERT_TRUE(edge_props.HasValue()); + if (GetParam()) { + ASSERT_THAT(*edge_props, UnorderedElementsAre(std::make_pair( + store.NameToProperty("test"), + storage::PropertyValue("nandare")))); + } else { + ASSERT_EQ(edge_props->size(), 0); + } + auto out_edges = v2->OutEdges({}, storage::View::OLD); + ASSERT_TRUE(out_edges.HasValue()); + ASSERT_EQ(out_edges->size(), 0); + } + { + auto v3 = acc.FindVertex(gid_v3, storage::View::OLD); + ASSERT_TRUE(v3); + auto labels = v3->Labels(storage::View::OLD); + ASSERT_TRUE(labels.HasValue()); + ASSERT_EQ(labels->size(), 0); + auto props = v3->Properties(storage::View::OLD); + ASSERT_TRUE(props.HasValue()); + ASSERT_THAT(*props, + UnorderedElementsAre(std::make_pair( + store.NameToProperty("v3"), storage::PropertyValue(42)))); + auto in_edges = v3->InEdges({}, storage::View::OLD); + ASSERT_TRUE(in_edges.HasValue()); + ASSERT_EQ(in_edges->size(), 0); + auto out_edges = v3->OutEdges({}, storage::View::OLD); + ASSERT_TRUE(out_edges.HasValue()); + ASSERT_EQ(out_edges->size(), 0); + } + } + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } } // NOLINTNEXTLINE(hicpp-special-member-functions) @@ -927,6 +1353,34 @@ TEST_P(DurabilityTest, WalCreateAndRemoveEverything) { ASSERT_EQ(GetSnapshotsList().size(), 0); ASSERT_GE(GetWalsList().size(), 1); + + // Recover WALs. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + { + auto indices = store.ListAllIndices(); + ASSERT_EQ(indices.label.size(), 0); + ASSERT_EQ(indices.label_property.size(), 0); + auto constraints = store.ListAllConstraints(); + ASSERT_EQ(constraints.existence.size(), 0); + auto acc = store.Access(); + uint64_t count = 0; + auto iterable = acc.Vertices(storage::View::OLD); + for (auto it = iterable.begin(); it != iterable.end(); ++it) { + ++count; + } + ASSERT_EQ(count, 0); + } + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } } // NOLINTNEXTLINE(hicpp-special-member-functions) @@ -1044,6 +1498,36 @@ TEST_P(DurabilityTest, WalTransactionOrdering) { ASSERT_EQ(data[8].second.type, storage::WalDeltaData::Type::TRANSACTION_END); } + + // Recover WALs. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + { + auto acc = store.Access(); + for (auto [gid, id] : std::vector<std::pair<storage::Gid, int64_t>>{ + {gid1, 1}, {gid2, 2}, {gid3, 3}}) { + auto vertex = acc.FindVertex(gid, storage::View::OLD); + ASSERT_TRUE(vertex); + auto labels = vertex->Labels(storage::View::OLD); + ASSERT_TRUE(labels.HasValue()); + ASSERT_EQ(labels->size(), 0); + auto props = vertex->Properties(storage::View::OLD); + ASSERT_TRUE(props.HasValue()); + ASSERT_EQ(props->size(), 1); + ASSERT_EQ(props->at(store.NameToProperty("id")), + storage::PropertyValue(id)); + } + } + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } } // NOLINTNEXTLINE(hicpp-special-member-functions) @@ -1074,6 +1558,23 @@ TEST_P(DurabilityTest, WalCreateAndRemoveOnlyBaseDataset) { ASSERT_EQ(GetSnapshotsList().size(), 0); ASSERT_GE(GetWalsList().size(), 1); + + // Recover WALs. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyDataset(&store, + DatasetType::ONLY_EXTENDED_WITH_BASE_INDICES_AND_CONSTRAINTS, + GetParam()); + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } } // NOLINTNEXTLINE(hicpp-special-member-functions) @@ -1110,6 +1611,264 @@ TEST_P(DurabilityTest, WalDeathResilience) { ASSERT_EQ(GetSnapshotsList().size(), 0); ASSERT_GE(GetWalsList().size(), 1); + + // Recover WALs and create more WALs. + const uint64_t kExtraItems = 1000; + uint64_t count = 0; + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_wal_mode = storage::Config::Durability:: + SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + .snapshot_interval = std::chrono::minutes(20), + .wal_file_flush_every_n_tx = kFlushWalEvery, + .recover_on_startup = true}}); + { + auto acc = store.Access(); + auto iterable = acc.Vertices(storage::View::OLD); + for (auto it = iterable.begin(); it != iterable.end(); ++it) { + ++count; + } + ASSERT_GT(count, 0); + } + + { + auto acc = store.Access(); + for (uint64_t i = 0; i < kExtraItems; ++i) { + acc.CreateVertex(); + } + ASSERT_FALSE(acc.Commit().HasError()); + } + } + + ASSERT_EQ(GetSnapshotsList().size(), 0); + ASSERT_GE(GetWalsList().size(), 2); + + // Recover WALs. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + { + uint64_t current = 0; + auto acc = store.Access(); + auto iterable = acc.Vertices(storage::View::OLD); + for (auto it = iterable.begin(); it != iterable.end(); ++it) { + ++current; + } + ASSERT_EQ(count + kExtraItems, current); + } + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } +} + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_P(DurabilityTest, WalMissingSecond) { + // Create unrelated WALs. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_wal_mode = storage::Config::Durability:: + SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + .snapshot_interval = std::chrono::minutes(20), + .wal_file_size_kibibytes = 1, + .wal_file_flush_every_n_tx = kFlushWalEvery}}); + auto acc = store.Access(); + for (uint64_t i = 0; i < 1000; ++i) { + acc.CreateVertex(); + } + ASSERT_FALSE(acc.Commit().HasError()); + } + + ASSERT_EQ(GetSnapshotsList().size(), 0); + ASSERT_GE(GetWalsList().size(), 1); + + uint64_t unrelated_wals = GetWalsList().size(); + + // Create WALs. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_wal_mode = storage::Config::Durability:: + SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + .snapshot_interval = std::chrono::minutes(20), + .wal_file_size_kibibytes = 1, + .wal_file_flush_every_n_tx = kFlushWalEvery}}); + const uint64_t kNumVertices = 1000; + std::vector<storage::Gid> gids; + gids.reserve(kNumVertices); + for (uint64_t i = 0; i < kNumVertices; ++i) { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + gids.push_back(vertex.Gid()); + ASSERT_FALSE(acc.Commit().HasError()); + } + for (uint64_t i = 0; i < kNumVertices; ++i) { + auto acc = store.Access(); + auto vertex = acc.FindVertex(gids[i], storage::View::OLD); + ASSERT_TRUE(vertex); + ASSERT_TRUE(vertex + ->SetProperty(store.NameToProperty("nandare"), + storage::PropertyValue("haihaihai!")) + .HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } + } + + ASSERT_EQ(GetSnapshotsList().size(), 0); + ASSERT_GE(GetWalsList().size(), 2); + + // Remove second WAL. + { + auto wals = GetWalsList(); + ASSERT_GT(wals.size(), unrelated_wals + 2); + const auto &wal_file = wals[wals.size() - unrelated_wals - 2]; + LOG(INFO) << "Deleting WAL file " << wal_file; + ASSERT_TRUE(std::filesystem::remove(wal_file)); + } + + // Recover WALs. + ASSERT_DEATH( + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + }, + ""); +} + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_P(DurabilityTest, WalCorruptSecond) { + // Create unrelated WALs. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_wal_mode = storage::Config::Durability:: + SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + .snapshot_interval = std::chrono::minutes(20), + .wal_file_size_kibibytes = 1, + .wal_file_flush_every_n_tx = kFlushWalEvery}}); + auto acc = store.Access(); + for (uint64_t i = 0; i < 1000; ++i) { + acc.CreateVertex(); + } + ASSERT_FALSE(acc.Commit().HasError()); + } + + ASSERT_EQ(GetSnapshotsList().size(), 0); + ASSERT_GE(GetWalsList().size(), 1); + + uint64_t unrelated_wals = GetWalsList().size(); + + // Create WALs. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_wal_mode = storage::Config::Durability:: + SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + .snapshot_interval = std::chrono::minutes(20), + .wal_file_size_kibibytes = 1, + .wal_file_flush_every_n_tx = kFlushWalEvery}}); + const uint64_t kNumVertices = 1000; + std::vector<storage::Gid> gids; + gids.reserve(kNumVertices); + for (uint64_t i = 0; i < kNumVertices; ++i) { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + gids.push_back(vertex.Gid()); + ASSERT_FALSE(acc.Commit().HasError()); + } + for (uint64_t i = 0; i < kNumVertices; ++i) { + auto acc = store.Access(); + auto vertex = acc.FindVertex(gids[i], storage::View::OLD); + ASSERT_TRUE(vertex); + ASSERT_TRUE(vertex + ->SetProperty(store.NameToProperty("nandare"), + storage::PropertyValue("haihaihai!")) + .HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } + } + + ASSERT_EQ(GetSnapshotsList().size(), 0); + ASSERT_GE(GetWalsList().size(), 2); + + // Destroy second WAL. + { + auto wals = GetWalsList(); + ASSERT_GT(wals.size(), unrelated_wals + 2); + const auto &wal_file = wals[wals.size() - unrelated_wals - 2]; + DestroyWalFirstDelta(wal_file); + } + + // Recover WALs. + ASSERT_DEATH( + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + }, + ""); +} + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_P(DurabilityTest, WalCorruptLastTransaction) { + // Create WALs + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_wal_mode = storage::Config::Durability:: + SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + .snapshot_interval = std::chrono::minutes(20), + .wal_file_size_kibibytes = 1, + .wal_file_flush_every_n_tx = kFlushWalEvery}}); + CreateBaseDataset(&store, GetParam()); + CreateExtendedDataset(&store, /* single_transaction = */ true); + } + + ASSERT_EQ(GetSnapshotsList().size(), 0); + ASSERT_GE(GetWalsList().size(), 2); + + // Destroy last transaction in the latest WAL. + { + auto wals = GetWalsList(); + ASSERT_GE(wals.size(), 2); + const auto &wal_file = wals.front(); + DestroyWalSuffix(wal_file); + } + + // Recover WALs. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + // The extended dataset shouldn't be recovered because its WAL transaction was + // corrupt. + VerifyDataset(&store, + DatasetType::ONLY_BASE_WITH_EXTENDED_INDICES_AND_CONSTRAINTS, + GetParam()); + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } } // NOLINTNEXTLINE(hicpp-special-member-functions) @@ -1181,6 +1940,29 @@ TEST_P(DurabilityTest, WalAllOperationsInSingleTransaction) { ASSERT_EQ(GetSnapshotsList().size(), 0); ASSERT_GE(GetWalsList().size(), 1); + + // Recover WALs. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + { + auto acc = store.Access(); + uint64_t count = 0; + auto iterable = acc.Vertices(storage::View::OLD); + for (auto it = iterable.begin(); it != iterable.end(); ++it) { + ++count; + } + ASSERT_EQ(count, 0); + } + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } } // NOLINTNEXTLINE(hicpp-special-member-functions) @@ -1201,6 +1983,21 @@ TEST_P(DurabilityTest, WalAndSnapshot) { ASSERT_GE(GetSnapshotsList().size(), 1); ASSERT_GE(GetWalsList().size(), 1); + + // Recover snapshot and WALs. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam()); + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } } // NOLINTNEXTLINE(hicpp-special-member-functions) @@ -1223,7 +2020,7 @@ TEST_P(DurabilityTest, WalAndSnapshotAppendToExistingSnapshot) { {.items = {.properties_on_edges = GetParam()}, .durability = {.storage_directory = storage_directory, .recover_on_startup = true}}); - VerifyBaseDataset(&store, GetParam(), false); + VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam()); } // Recover snapshot and create WALs. @@ -1241,6 +2038,120 @@ TEST_P(DurabilityTest, WalAndSnapshotAppendToExistingSnapshot) { ASSERT_EQ(GetSnapshotsList().size(), 1); ASSERT_GE(GetWalsList().size(), 1); + + // Recover snapshot and WALs. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam()); + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } +} + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_P(DurabilityTest, WalAndSnapshotAppendToExistingSnapshotAndWal) { + // Create snapshot. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_on_exit = true}}); + CreateBaseDataset(&store, GetParam()); + } + + ASSERT_EQ(GetSnapshotsList().size(), 1); + ASSERT_EQ(GetWalsList().size(), 0); + + // Recover snapshot. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyDataset(&store, DatasetType::ONLY_BASE, GetParam()); + } + + // Recover snapshot and create WALs. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true, + .snapshot_wal_mode = storage::Config::Durability:: + SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + .snapshot_interval = std::chrono::minutes(20), + .wal_file_flush_every_n_tx = kFlushWalEvery}}); + CreateExtendedDataset(&store); + } + + ASSERT_EQ(GetSnapshotsList().size(), 1); + ASSERT_GE(GetWalsList().size(), 1); + + // Recover snapshot and WALs and create more WALs. + storage::Gid vertex_gid; + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true, + .snapshot_wal_mode = storage::Config::Durability:: + SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + .snapshot_interval = std::chrono::minutes(20), + .wal_file_flush_every_n_tx = kFlushWalEvery}}); + VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam()); + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + vertex_gid = vertex.Gid(); + if (GetParam()) { + ASSERT_TRUE(vertex + .SetProperty(store.NameToProperty("meaning"), + storage::PropertyValue(42)) + .HasValue()); + } + ASSERT_FALSE(acc.Commit().HasError()); + } + + ASSERT_EQ(GetSnapshotsList().size(), 1); + ASSERT_GE(GetWalsList().size(), 2); + + // Recover snapshot and WALs. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam()); + { + auto acc = store.Access(); + auto vertex = acc.FindVertex(vertex_gid, storage::View::OLD); + ASSERT_TRUE(vertex); + auto labels = vertex->Labels(storage::View::OLD); + ASSERT_TRUE(labels.HasValue()); + ASSERT_EQ(labels->size(), 0); + auto props = vertex->Properties(storage::View::OLD); + ASSERT_TRUE(props.HasValue()); + if (GetParam()) { + ASSERT_THAT(*props, UnorderedElementsAre( + std::make_pair(store.NameToProperty("meaning"), + storage::PropertyValue(42)))); + } else { + ASSERT_EQ(props->size(), 0); + } + } + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } } // NOLINTNEXTLINE(hicpp-special-member-functions) @@ -1291,4 +2202,93 @@ TEST_P(DurabilityTest, WalAndSnapshotWalRetention) { ASSERT_EQ(GetSnapshotsList().size(), 3); ASSERT_GE(GetWalsList().size(), unrelated_wals + 1); + + auto snapshots = GetSnapshotsList(); + ASSERT_EQ(snapshots.size(), 3); + + for (uint64_t i = 0; i < snapshots.size(); ++i) { + LOG(INFO) << "Recovery attempt " << i; + + // Recover and verify data. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + auto acc = store.Access(); + for (uint64_t j = 0; j < items_created; ++j) { + auto vertex = + acc.FindVertex(storage::Gid::FromUint(j), storage::View::OLD); + ASSERT_TRUE(vertex); + } + } + + // Destroy current snapshot. + DestroySnapshot(snapshots[i]); + } + + // Recover data after all of the snapshots have been destroyed. The recovery + // shouldn't be possible because the initial WALs are already deleted. + ASSERT_DEATH( + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + }, + ""); +} + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_P(DurabilityTest, SnapshotAndWalMixedUUID) { + // Create unrelated snapshot and WALs. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_wal_mode = storage::Config::Durability:: + SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + .snapshot_interval = std::chrono::seconds(2)}}); + auto acc = store.Access(); + for (uint64_t i = 0; i < 1000; ++i) { + acc.CreateVertex(); + } + ASSERT_FALSE(acc.Commit().HasError()); + std::this_thread::sleep_for(std::chrono::milliseconds(2500)); + } + + ASSERT_GE(GetSnapshotsList().size(), 1); + ASSERT_GE(GetWalsList().size(), 1); + + // Create snapshot and WALs. + { + storage::Storage store( + {.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .snapshot_wal_mode = storage::Config::Durability:: + SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, + .snapshot_interval = std::chrono::seconds(2)}}); + CreateBaseDataset(&store, GetParam()); + std::this_thread::sleep_for(std::chrono::milliseconds(2500)); + CreateExtendedDataset(&store); + std::this_thread::sleep_for(std::chrono::milliseconds(2500)); + } + + ASSERT_GE(GetSnapshotsList().size(), 2); + ASSERT_GE(GetWalsList().size(), 2); + + // Recover snapshot and WALs. + storage::Storage store({.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, + .recover_on_startup = true}}); + VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam()); + + // Try to use the storage. + { + auto acc = store.Access(); + auto vertex = acc.CreateVertex(); + auto edge = acc.CreateEdge(&vertex, &vertex, store.NameToEdgeType("et")); + ASSERT_TRUE(edge.HasValue()); + ASSERT_FALSE(acc.Commit().HasError()); + } }