diff --git a/src/storage/v2/durability/snapshot.cpp b/src/storage/v2/durability/snapshot.cpp index b7f0f2c5a..695c1aaf2 100644 --- a/src/storage/v2/durability/snapshot.cpp +++ b/src/storage/v2/durability/snapshot.cpp @@ -486,6 +486,475 @@ void LoadPartialConnectivity(const std::filesystem::path &path, utils::SkipList< spdlog::info("Partial connectivities are recovered."); } +RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils::SkipList *vertices, + utils::SkipList *edges, + std::deque> *epoch_history, + NameIdMapper *name_id_mapper, std::atomic *edge_count, + Config::Items items) { + RecoveryInfo ret; + RecoveredIndicesAndConstraints indices_constraints; + + Decoder snapshot; + auto version = snapshot.Initialize(path, kSnapshotMagic); + if (!version) throw RecoveryFailure("Couldn't read snapshot magic and/or version!"); + if (*version != 14U) throw RecoveryFailure(fmt::format("Expected snapshot version is 14, but got {}", *version)); + + // Cleanup of loaded data in case of failure. + bool success = false; + utils::OnScopeExit cleanup([&] { + if (!success) { + edges->clear(); + vertices->clear(); + epoch_history->clear(); + } + }); + + // Read snapshot info. + const auto info = ReadSnapshotInfo(path); + spdlog::info("Recovering {} vertices and {} edges.", info.vertices_count, info.edges_count); + // Check for edges. + bool snapshot_has_edges = info.offset_edges != 0; + + // Recover mapper. + std::unordered_map snapshot_id_map; + { + spdlog::info("Recovering mapper metadata."); + if (!snapshot.SetPosition(info.offset_mapper)) throw RecoveryFailure("Couldn't read data from snapshot!"); + + auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_MAPPER) throw RecoveryFailure("Invalid snapshot data!"); + + auto size = snapshot.ReadUint(); + if (!size) throw RecoveryFailure("Invalid snapshot data!"); + + for (uint64_t i = 0; i < *size; ++i) { + auto id = snapshot.ReadUint(); + if (!id) throw RecoveryFailure("Invalid snapshot data!"); + auto name = snapshot.ReadString(); + if (!name) throw RecoveryFailure("Invalid snapshot data!"); + auto my_id = name_id_mapper->NameToId(*name); + snapshot_id_map.emplace(*id, my_id); + SPDLOG_TRACE("Mapping \"{}\"from snapshot id {} to actual id {}.", *name, *id, my_id); + } + } + auto get_label_from_id = [&snapshot_id_map](uint64_t snapshot_id) { + auto it = snapshot_id_map.find(snapshot_id); + if (it == snapshot_id_map.end()) throw RecoveryFailure("Invalid snapshot data!"); + return LabelId::FromUint(it->second); + }; + auto get_property_from_id = [&snapshot_id_map](uint64_t snapshot_id) { + auto it = snapshot_id_map.find(snapshot_id); + if (it == snapshot_id_map.end()) throw RecoveryFailure("Invalid snapshot data!"); + return PropertyId::FromUint(it->second); + }; + auto get_edge_type_from_id = [&snapshot_id_map](uint64_t snapshot_id) { + auto it = snapshot_id_map.find(snapshot_id); + if (it == snapshot_id_map.end()) throw RecoveryFailure("Invalid snapshot data!"); + return EdgeTypeId::FromUint(it->second); + }; + + // Reset current edge count. + edge_count->store(0, std::memory_order_release); + + { + // Recover edges. + auto edge_acc = edges->access(); + uint64_t last_edge_gid = 0; + if (snapshot_has_edges) { + spdlog::info("Recovering {} edges.", info.edges_count); + if (!snapshot.SetPosition(info.offset_edges)) throw RecoveryFailure("Couldn't read data from snapshot!"); + for (uint64_t i = 0; i < info.edges_count; ++i) { + { + const auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_EDGE) throw RecoveryFailure("Invalid snapshot data!"); + } + + if (items.properties_on_edges) { + // Insert edge. + auto gid = snapshot.ReadUint(); + if (!gid) throw RecoveryFailure("Invalid snapshot data!"); + if (i > 0 && *gid <= last_edge_gid) throw RecoveryFailure("Invalid snapshot data!"); + last_edge_gid = *gid; + spdlog::debug("Recovering edge {} with properties.", *gid); + auto [it, inserted] = edge_acc.insert(Edge{Gid::FromUint(*gid), nullptr}); + if (!inserted) throw RecoveryFailure("The edge must be inserted here!"); + + // Recover properties. + { + auto props_size = snapshot.ReadUint(); + if (!props_size) throw RecoveryFailure("Invalid snapshot data!"); + auto &props = it->properties; + for (uint64_t j = 0; j < *props_size; ++j) { + auto key = snapshot.ReadUint(); + if (!key) throw RecoveryFailure("Invalid snapshot data!"); + auto value = snapshot.ReadPropertyValue(); + if (!value) throw RecoveryFailure("Invalid snapshot data!"); + SPDLOG_TRACE("Recovered property \"{}\" with value \"{}\" for edge {}.", + name_id_mapper->IdToName(snapshot_id_map.at(*key)), *value, *gid); + props.SetProperty(get_property_from_id(*key), *value); + } + } + } else { + // Read edge GID. + auto gid = snapshot.ReadUint(); + if (!gid) throw RecoveryFailure("Invalid snapshot data!"); + if (i > 0 && *gid <= last_edge_gid) throw RecoveryFailure("Invalid snapshot data!"); + last_edge_gid = *gid; + + spdlog::debug("Ensuring edge {} doesn't have any properties.", *gid); + // Read properties. + { + auto props_size = snapshot.ReadUint(); + if (!props_size) throw RecoveryFailure("Invalid snapshot data!"); + if (*props_size != 0) + throw RecoveryFailure( + "The snapshot has properties on edges, but the storage is " + "configured without properties on edges!"); + } + } + } + spdlog::info("Edges are recovered."); + } + + // Recover vertices (labels and properties). + if (!snapshot.SetPosition(info.offset_vertices)) throw RecoveryFailure("Couldn't read data from snapshot!"); + auto vertex_acc = vertices->access(); + uint64_t last_vertex_gid = 0; + spdlog::info("Recovering {} vertices.", info.vertices_count); + for (uint64_t i = 0; i < info.vertices_count; ++i) { + { + auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!"); + } + + // Insert vertex. + auto gid = snapshot.ReadUint(); + if (!gid) throw RecoveryFailure("Invalid snapshot data!"); + if (i > 0 && *gid <= last_vertex_gid) { + throw RecoveryFailure("Invalid snapshot data!"); + } + last_vertex_gid = *gid; + spdlog::debug("Recovering vertex {}.", *gid); + auto [it, inserted] = vertex_acc.insert(Vertex{Gid::FromUint(*gid), nullptr}); + if (!inserted) throw RecoveryFailure("The vertex must be inserted here!"); + + // Recover labels. + spdlog::trace("Recovering labels for vertex {}.", *gid); + { + auto labels_size = snapshot.ReadUint(); + if (!labels_size) throw RecoveryFailure("Invalid snapshot data!"); + auto &labels = it->labels; + labels.reserve(*labels_size); + for (uint64_t j = 0; j < *labels_size; ++j) { + auto label = snapshot.ReadUint(); + if (!label) throw RecoveryFailure("Invalid snapshot data!"); + SPDLOG_TRACE("Recovered label \"{}\" for vertex {}.", name_id_mapper->IdToName(snapshot_id_map.at(*label)), + *gid); + labels.emplace_back(get_label_from_id(*label)); + } + } + + // Recover properties. + spdlog::trace("Recovering properties for vertex {}.", *gid); + { + auto props_size = snapshot.ReadUint(); + if (!props_size) throw RecoveryFailure("Invalid snapshot data!"); + auto &props = it->properties; + for (uint64_t j = 0; j < *props_size; ++j) { + auto key = snapshot.ReadUint(); + if (!key) throw RecoveryFailure("Invalid snapshot data!"); + auto value = snapshot.ReadPropertyValue(); + if (!value) throw RecoveryFailure("Invalid snapshot data!"); + SPDLOG_TRACE("Recovered property \"{}\" with value \"{}\" for vertex {}.", + name_id_mapper->IdToName(snapshot_id_map.at(*key)), *value, *gid); + props.SetProperty(get_property_from_id(*key), *value); + } + } + + // Skip in edges. + { + auto in_size = snapshot.ReadUint(); + if (!in_size) throw RecoveryFailure("Invalid snapshot data!"); + for (uint64_t j = 0; j < *in_size; ++j) { + auto edge_gid = snapshot.ReadUint(); + if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!"); + auto from_gid = snapshot.ReadUint(); + if (!from_gid) throw RecoveryFailure("Invalid snapshot data!"); + auto edge_type = snapshot.ReadUint(); + if (!edge_type) throw RecoveryFailure("Invalid snapshot data!"); + } + } + + // Skip out edges. + auto out_size = snapshot.ReadUint(); + if (!out_size) throw RecoveryFailure("Invalid snapshot data!"); + for (uint64_t j = 0; j < *out_size; ++j) { + auto edge_gid = snapshot.ReadUint(); + if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!"); + auto to_gid = snapshot.ReadUint(); + if (!to_gid) throw RecoveryFailure("Invalid snapshot data!"); + auto edge_type = snapshot.ReadUint(); + if (!edge_type) throw RecoveryFailure("Invalid snapshot data!"); + } + } + spdlog::info("Vertices are recovered."); + + // Recover vertices (in/out edges). + spdlog::info("Recovering connectivity."); + if (!snapshot.SetPosition(info.offset_vertices)) throw RecoveryFailure("Couldn't read data from snapshot!"); + for (auto &vertex : vertex_acc) { + { + auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!"); + } + + spdlog::trace("Recovering connectivity for vertex {}.", vertex.gid.AsUint()); + // Check vertex. + auto gid = snapshot.ReadUint(); + if (!gid) throw RecoveryFailure("Invalid snapshot data!"); + if (gid != vertex.gid.AsUint()) throw RecoveryFailure("Invalid snapshot data!"); + + // Skip labels. + { + auto labels_size = snapshot.ReadUint(); + if (!labels_size) throw RecoveryFailure("Invalid snapshot data!"); + for (uint64_t j = 0; j < *labels_size; ++j) { + auto label = snapshot.ReadUint(); + if (!label) throw RecoveryFailure("Invalid snapshot data!"); + } + } + + // Skip properties. + { + auto props_size = snapshot.ReadUint(); + if (!props_size) throw RecoveryFailure("Invalid snapshot data!"); + for (uint64_t j = 0; j < *props_size; ++j) { + auto key = snapshot.ReadUint(); + if (!key) throw RecoveryFailure("Invalid snapshot data!"); + auto value = snapshot.SkipPropertyValue(); + if (!value) throw RecoveryFailure("Invalid snapshot data!"); + } + } + + // Recover in edges. + { + spdlog::trace("Recovering inbound edges for vertex {}.", vertex.gid.AsUint()); + auto in_size = snapshot.ReadUint(); + if (!in_size) throw RecoveryFailure("Invalid snapshot data!"); + vertex.in_edges.reserve(*in_size); + for (uint64_t j = 0; j < *in_size; ++j) { + auto edge_gid = snapshot.ReadUint(); + if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!"); + last_edge_gid = std::max(last_edge_gid, *edge_gid); + + auto from_gid = snapshot.ReadUint(); + if (!from_gid) throw RecoveryFailure("Invalid snapshot data!"); + auto edge_type = snapshot.ReadUint(); + if (!edge_type) throw RecoveryFailure("Invalid snapshot data!"); + + auto from_vertex = vertex_acc.find(Gid::FromUint(*from_gid)); + if (from_vertex == vertex_acc.end()) throw RecoveryFailure("Invalid from vertex!"); + + EdgeRef edge_ref(Gid::FromUint(*edge_gid)); + if (items.properties_on_edges) { + if (snapshot_has_edges) { + auto edge = edge_acc.find(Gid::FromUint(*edge_gid)); + if (edge == edge_acc.end()) throw RecoveryFailure("Invalid edge!"); + edge_ref = EdgeRef(&*edge); + } else { + auto [edge, inserted] = edge_acc.insert(Edge{Gid::FromUint(*edge_gid), nullptr}); + edge_ref = EdgeRef(&*edge); + } + } + SPDLOG_TRACE("Recovered inbound edge {} with label \"{}\" from vertex {}.", *edge_gid, + name_id_mapper->IdToName(snapshot_id_map.at(*edge_type)), from_vertex->gid.AsUint()); + vertex.in_edges.emplace_back(get_edge_type_from_id(*edge_type), &*from_vertex, edge_ref); + } + } + + // Recover out edges. + { + spdlog::trace("Recovering outbound edges for vertex {}.", vertex.gid.AsUint()); + auto out_size = snapshot.ReadUint(); + if (!out_size) throw RecoveryFailure("Invalid snapshot data!"); + vertex.out_edges.reserve(*out_size); + for (uint64_t j = 0; j < *out_size; ++j) { + auto edge_gid = snapshot.ReadUint(); + if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!"); + last_edge_gid = std::max(last_edge_gid, *edge_gid); + + auto to_gid = snapshot.ReadUint(); + if (!to_gid) throw RecoveryFailure("Invalid snapshot data!"); + auto edge_type = snapshot.ReadUint(); + if (!edge_type) throw RecoveryFailure("Invalid snapshot data!"); + + auto to_vertex = vertex_acc.find(Gid::FromUint(*to_gid)); + if (to_vertex == vertex_acc.end()) throw RecoveryFailure("Invalid to vertex!"); + + EdgeRef edge_ref(Gid::FromUint(*edge_gid)); + if (items.properties_on_edges) { + if (snapshot_has_edges) { + auto edge = edge_acc.find(Gid::FromUint(*edge_gid)); + if (edge == edge_acc.end()) throw RecoveryFailure("Invalid edge!"); + edge_ref = EdgeRef(&*edge); + } else { + auto [edge, inserted] = edge_acc.insert(Edge{Gid::FromUint(*edge_gid), nullptr}); + edge_ref = EdgeRef(&*edge); + } + } + SPDLOG_TRACE("Recovered outbound edge {} with label \"{}\" to vertex {}.", *edge_gid, + name_id_mapper->IdToName(snapshot_id_map.at(*edge_type)), to_vertex->gid.AsUint()); + vertex.out_edges.emplace_back(get_edge_type_from_id(*edge_type), &*to_vertex, edge_ref); + } + // Increment edge count. We only increment the count here because the + // information is duplicated in in_edges. + edge_count->fetch_add(*out_size, std::memory_order_acq_rel); + } + } + spdlog::info("Connectivity is recovered."); + + // Set initial values for edge/vertex ID generators. + ret.next_edge_id = last_edge_gid + 1; + ret.next_vertex_id = last_vertex_gid + 1; + } + + // Recover indices. + { + spdlog::info("Recovering metadata of indices."); + if (!snapshot.SetPosition(info.offset_indices)) throw RecoveryFailure("Couldn't read data from snapshot!"); + + auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_INDICES) throw RecoveryFailure("Invalid snapshot data!"); + + // Recover label indices. + { + auto size = snapshot.ReadUint(); + if (!size) throw RecoveryFailure("Invalid snapshot data!"); + spdlog::info("Recovering metadata of {} label indices.", *size); + for (uint64_t i = 0; i < *size; ++i) { + auto label = snapshot.ReadUint(); + if (!label) throw RecoveryFailure("Invalid snapshot data!"); + AddRecoveredIndexConstraint(&indices_constraints.indices.label, get_label_from_id(*label), + "The label index already exists!"); + SPDLOG_TRACE("Recovered metadata of label index for :{}", name_id_mapper->IdToName(snapshot_id_map.at(*label))); + } + spdlog::info("Metadata of label indices are recovered."); + } + + // Recover label+property indices. + { + auto size = snapshot.ReadUint(); + if (!size) throw RecoveryFailure("Invalid snapshot data!"); + spdlog::info("Recovering metadata of {} label+property indices.", *size); + for (uint64_t i = 0; i < *size; ++i) { + auto label = snapshot.ReadUint(); + if (!label) throw RecoveryFailure("Invalid snapshot data!"); + auto property = snapshot.ReadUint(); + if (!property) throw RecoveryFailure("Invalid snapshot data!"); + AddRecoveredIndexConstraint(&indices_constraints.indices.label_property, + {get_label_from_id(*label), get_property_from_id(*property)}, + "The label+property index already exists!"); + SPDLOG_TRACE("Recovered metadata of label+property index for :{}({})", + name_id_mapper->IdToName(snapshot_id_map.at(*label)), + name_id_mapper->IdToName(snapshot_id_map.at(*property))); + } + spdlog::info("Metadata of label+property indices are recovered."); + } + spdlog::info("Metadata of indices are recovered."); + } + + // Recover constraints. + { + spdlog::info("Recovering metadata of constraints."); + if (!snapshot.SetPosition(info.offset_constraints)) throw RecoveryFailure("Couldn't read data from snapshot!"); + + auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_CONSTRAINTS) throw RecoveryFailure("Invalid snapshot data!"); + + // Recover existence constraints. + { + auto size = snapshot.ReadUint(); + if (!size) throw RecoveryFailure("Invalid snapshot data!"); + spdlog::info("Recovering metadata of {} existence constraints.", *size); + for (uint64_t i = 0; i < *size; ++i) { + auto label = snapshot.ReadUint(); + if (!label) throw RecoveryFailure("Invalid snapshot data!"); + auto property = snapshot.ReadUint(); + if (!property) throw RecoveryFailure("Invalid snapshot data!"); + AddRecoveredIndexConstraint(&indices_constraints.constraints.existence, + {get_label_from_id(*label), get_property_from_id(*property)}, + "The existence constraint already exists!"); + SPDLOG_TRACE("Recovered metadata of existence constraint for :{}({})", + name_id_mapper->IdToName(snapshot_id_map.at(*label)), + name_id_mapper->IdToName(snapshot_id_map.at(*property))); + } + spdlog::info("Metadata of existence constraints are recovered."); + } + + // Recover unique constraints. + // Snapshot version should be checked since unique constraints were + // implemented in later versions of snapshot. + if (*version >= kUniqueConstraintVersion) { + auto size = snapshot.ReadUint(); + if (!size) throw RecoveryFailure("Invalid snapshot data!"); + spdlog::info("Recovering metadata of {} unique constraints.", *size); + for (uint64_t i = 0; i < *size; ++i) { + auto label = snapshot.ReadUint(); + if (!label) throw RecoveryFailure("Invalid snapshot data!"); + auto properties_count = snapshot.ReadUint(); + if (!properties_count) throw RecoveryFailure("Invalid snapshot data!"); + std::set properties; + for (uint64_t j = 0; j < *properties_count; ++j) { + auto property = snapshot.ReadUint(); + if (!property) throw RecoveryFailure("Invalid snapshot data!"); + properties.insert(get_property_from_id(*property)); + } + AddRecoveredIndexConstraint(&indices_constraints.constraints.unique, {get_label_from_id(*label), properties}, + "The unique constraint already exists!"); + SPDLOG_TRACE("Recovered metadata of unique constraints for :{}", + name_id_mapper->IdToName(snapshot_id_map.at(*label))); + } + spdlog::info("Metadata of unique constraints are recovered."); + } + spdlog::info("Metadata of constraints are recovered."); + } + + spdlog::info("Recovering metadata."); + // Recover epoch history + { + if (!snapshot.SetPosition(info.offset_epoch_history)) throw RecoveryFailure("Couldn't read data from snapshot!"); + + const auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_EPOCH_HISTORY) throw RecoveryFailure("Invalid snapshot data!"); + + const auto history_size = snapshot.ReadUint(); + if (!history_size) { + throw RecoveryFailure("Invalid snapshot data!"); + } + + for (int i = 0; i < *history_size; ++i) { + auto maybe_epoch_id = snapshot.ReadString(); + if (!maybe_epoch_id) { + throw RecoveryFailure("Invalid snapshot data!"); + } + const auto maybe_last_commit_timestamp = snapshot.ReadUint(); + if (!maybe_last_commit_timestamp) { + throw RecoveryFailure("Invalid snapshot data!"); + } + epoch_history->emplace_back(std::move(*maybe_epoch_id), *maybe_last_commit_timestamp); + } + } + + spdlog::info("Metadata recovered."); + // Recover timestamp. + ret.next_timestamp = info.start_timestamp + 1; + + // Set success flag (to disable cleanup). + success = true; + + return {info, ret, std::move(indices_constraints)}; +} + RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList *vertices, utils::SkipList *edges, std::deque> *epoch_history, @@ -494,9 +963,13 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis RecoveredIndicesAndConstraints indices_constraints; Decoder snapshot; - auto version = snapshot.Initialize(path, kSnapshotMagic); + const auto version = snapshot.Initialize(path, kSnapshotMagic); if (!version) throw RecoveryFailure("Couldn't read snapshot magic and/or version!"); + if (!IsVersionSupported(*version)) throw RecoveryFailure(fmt::format("Invalid snapshot version {}", *version)); + if (*version == 14U) { + return LoadSnapshotVersion14(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config.items); + } // Cleanup of loaded data in case of failure. bool success = false;