From 0516e9306060b883a44e486b48af7e7c661e26e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Mon, 3 Apr 2023 14:58:28 +0200 Subject: [PATCH] Parallelize edge recovery --- src/storage/v2/durability/snapshot.cpp | 226 +++++++++++++++++++------ 1 file changed, 174 insertions(+), 52 deletions(-) diff --git a/src/storage/v2/durability/snapshot.cpp b/src/storage/v2/durability/snapshot.cpp index 16c7d017c..569d37cbf 100644 --- a/src/storage/v2/durability/snapshot.cpp +++ b/src/storage/v2/durability/snapshot.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -10,7 +10,9 @@ // licenses/APL.txt. #include "storage/v2/durability/snapshot.hpp" +#include +#include "spdlog/spdlog.h" #include "storage/v2/durability/exceptions.hpp" #include "storage/v2/durability/paths.hpp" #include "storage/v2/durability/serialization.hpp" @@ -157,6 +159,100 @@ SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path) { return info; } +// n is 0-indexed +uint64_t GetNthEdgeStartOffset(Decoder &snapshot, const uint64_t n) { + for (uint64_t i = 0; i < n; ++i) { + { + const auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_EDGE) throw RecoveryFailure("Invalid snapshot data!"); + } + + // Skip edge. + auto gid = snapshot.ReadUint(); + if (!gid) throw RecoveryFailure("Invalid snapshot data!"); + + // Recover properties. + { + auto props_size = snapshot.ReadUint(); + if (!props_size) throw RecoveryFailure("Invalid snapshot data!"); + for (uint64_t j = 0; j < *props_size; ++j) { + auto key = snapshot.ReadUint(); + if (!key) throw RecoveryFailure("Invalid snapshot data!"); + auto value = snapshot.SkipPropertyValue(); + if (!value) throw RecoveryFailure("Invalid snapshot data!"); + } + } + } + const auto offset = snapshot.GetPosition(); + MG_ASSERT(offset.has_value(), "Unexpected"); + return *offset; +} + +template +void LoadPartialEdges(const std::filesystem::path &path, utils::SkipList &edges, const uint64_t from_position, + const uint64_t edges_count, const NameIdMapper &name_id_mapper, const Config::Items items, + TFunc get_property_from_id) { + Decoder snapshot; + snapshot.Initialize(path, kSnapshotMagic); + + // Recover edges. + auto edge_acc = edges.access(); + uint64_t last_edge_gid = 0; + spdlog::info("Recovering {} edges.", edges_count); + if (!snapshot.SetPosition(from_position)) throw RecoveryFailure("Couldn't read data from snapshot!"); + for (uint64_t i = 0; i < edges_count; ++i) { + { + const auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_EDGE) throw RecoveryFailure("Invalid snapshot data!"); + } + + if (items.properties_on_edges) { + // Insert edge. + auto gid = snapshot.ReadUint(); + if (!gid) throw RecoveryFailure("Invalid snapshot data!"); + if (i > 0 && *gid <= last_edge_gid) throw RecoveryFailure("Invalid snapshot data!"); + last_edge_gid = *gid; + spdlog::debug("Recovering edge {} with properties.", *gid); + auto [it, inserted] = edge_acc.insert(Edge{Gid::FromUint(*gid), nullptr}); + if (!inserted) throw RecoveryFailure("The edge must be inserted here!"); + + // Recover properties. + { + auto props_size = snapshot.ReadUint(); + if (!props_size) throw RecoveryFailure("Invalid snapshot data!"); + auto &props = it->properties; + for (uint64_t j = 0; j < *props_size; ++j) { + auto key = snapshot.ReadUint(); + if (!key) throw RecoveryFailure("Invalid snapshot data!"); + auto value = snapshot.ReadPropertyValue(); + if (!value) throw RecoveryFailure("Invalid snapshot data!"); + SPDLOG_TRACE("Recovered property \"{}\" with value \"{}\" for edge {}.", + name_id_mapper.IdToName(snapshot_id_map.at(*key)), *value, *gid); + props.SetProperty(get_property_from_id(*key), *value); + } + } + } else { + // Read edge GID. + auto gid = snapshot.ReadUint(); + if (!gid) throw RecoveryFailure("Invalid snapshot data!"); + if (i > 0 && *gid <= last_edge_gid) throw RecoveryFailure("Invalid snapshot data!"); + last_edge_gid = *gid; + + spdlog::debug("Ensuring edge {} doesn't have any properties.", *gid); + // Read properties. + { + auto props_size = snapshot.ReadUint(); + if (!props_size) throw RecoveryFailure("Invalid snapshot data!"); + if (*props_size != 0) + throw RecoveryFailure( + "The snapshot has properties on edges, but the storage is " + "configured without properties on edges!"); + } + } + } + spdlog::info("Partial edges are recovered."); +} + RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList *vertices, utils::SkipList *edges, std::deque> *epoch_history, @@ -226,65 +322,29 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis // Reset current edge count. edge_count->store(0, std::memory_order_release); + spdlog::info("Recovering edges."); { // Recover edges. auto edge_acc = edges->access(); uint64_t last_edge_gid = 0; if (snapshot_has_edges) { - spdlog::info("Recovering {} edges.", info.edges_count); if (!snapshot.SetPosition(info.offset_edges)) throw RecoveryFailure("Couldn't read data from snapshot!"); - for (uint64_t i = 0; i < info.edges_count; ++i) { - { - const auto marker = snapshot.ReadMarker(); - if (!marker || *marker != Marker::SECTION_EDGE) throw RecoveryFailure("Invalid snapshot data!"); - } - - if (items.properties_on_edges) { - // Insert edge. - auto gid = snapshot.ReadUint(); - if (!gid) throw RecoveryFailure("Invalid snapshot data!"); - if (i > 0 && *gid <= last_edge_gid) throw RecoveryFailure("Invalid snapshot data!"); - last_edge_gid = *gid; - spdlog::debug("Recovering edge {} with properties.", *gid); - auto [it, inserted] = edge_acc.insert(Edge{Gid::FromUint(*gid), nullptr}); - if (!inserted) throw RecoveryFailure("The edge must be inserted here!"); - - // Recover properties. - { - auto props_size = snapshot.ReadUint(); - if (!props_size) throw RecoveryFailure("Invalid snapshot data!"); - auto &props = it->properties; - for (uint64_t j = 0; j < *props_size; ++j) { - auto key = snapshot.ReadUint(); - if (!key) throw RecoveryFailure("Invalid snapshot data!"); - auto value = snapshot.ReadPropertyValue(); - if (!value) throw RecoveryFailure("Invalid snapshot data!"); - SPDLOG_TRACE("Recovered property \"{}\" with value \"{}\" for edge {}.", - name_id_mapper->IdToName(snapshot_id_map.at(*key)), *value, *gid); - props.SetProperty(get_property_from_id(*key), *value); - } - } - } else { - // Read edge GID. - auto gid = snapshot.ReadUint(); - if (!gid) throw RecoveryFailure("Invalid snapshot data!"); - if (i > 0 && *gid <= last_edge_gid) throw RecoveryFailure("Invalid snapshot data!"); - last_edge_gid = *gid; - - spdlog::debug("Ensuring edge {} doesn't have any properties.", *gid); - // Read properties. - { - auto props_size = snapshot.ReadUint(); - if (!props_size) throw RecoveryFailure("Invalid snapshot data!"); - if (*props_size != 0) - throw RecoveryFailure( - "The snapshot has properties on edges, but the storage is " - "configured without properties on edges!"); - } - } + std::vector offsets; + offsets.push_back(info.offset_edges); + constexpr auto kEdgeChunkCount = 8; + const auto edge_chunk_size = static_cast(info.edges_count / kEdgeChunkCount); + while (offsets.size() < kEdgeChunkCount) { + offsets.push_back(GetNthEdgeStartOffset(snapshot, edge_chunk_size)); + } + std::vector threads; + threads.reserve(offsets.size()); + for (const auto offset : offsets) { + threads.emplace_back([path, edges, offset, edge_chunk_size, name_id_mapper, items, &get_property_from_id] { + LoadPartialEdges(path, *edges, offset, edge_chunk_size, *name_id_mapper, items, get_property_from_id); + }); } - spdlog::info("Edges are recovered."); } + spdlog::info("Edges are recovered."); // Recover vertices (labels and properties). if (!snapshot.SetPosition(info.offset_vertices)) throw RecoveryFailure("Couldn't read data from snapshot!"); @@ -369,6 +429,68 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis } spdlog::info("Vertices are recovered."); + // Recover vertices (labels and properties). + if (!snapshot.SetPosition(info.offset_vertices)) throw RecoveryFailure("Couldn't read data from snapshot!"); + for (uint64_t i = 0; i < info.vertices_count; ++i) { + { + auto marker = snapshot.ReadMarker(); + if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!"); + } + + // Insert vertex. + auto gid = snapshot.ReadUint(); + if (!gid) throw RecoveryFailure("Invalid snapshot data!"); + + // Skip labels. + { + auto labels_size = snapshot.ReadUint(); + if (!labels_size) throw RecoveryFailure("Invalid snapshot data!"); + for (uint64_t j = 0; j < *labels_size; ++j) { + auto label = snapshot.ReadUint(); + if (!label) throw RecoveryFailure("Invalid snapshot data!"); + } + } + + // Skip properties. + { + auto props_size = snapshot.ReadUint(); + if (!props_size) throw RecoveryFailure("Invalid snapshot data!"); + for (uint64_t j = 0; j < *props_size; ++j) { + auto key = snapshot.ReadUint(); + if (!key) throw RecoveryFailure("Invalid snapshot data!"); + auto value = snapshot.SkipPropertyValue(); + if (!value) throw RecoveryFailure("Invalid snapshot data!"); + } + } + + // Skip in edges. + { + auto in_size = snapshot.ReadUint(); + if (!in_size) throw RecoveryFailure("Invalid snapshot data!"); + for (uint64_t j = 0; j < *in_size; ++j) { + auto edge_gid = snapshot.ReadUint(); + if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!"); + auto from_gid = snapshot.ReadUint(); + if (!from_gid) throw RecoveryFailure("Invalid snapshot data!"); + auto edge_type = snapshot.ReadUint(); + if (!edge_type) throw RecoveryFailure("Invalid snapshot data!"); + } + } + + // Skip out edges. + auto out_size = snapshot.ReadUint(); + if (!out_size) throw RecoveryFailure("Invalid snapshot data!"); + for (uint64_t j = 0; j < *out_size; ++j) { + auto edge_gid = snapshot.ReadUint(); + if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!"); + auto to_gid = snapshot.ReadUint(); + if (!to_gid) throw RecoveryFailure("Invalid snapshot data!"); + auto edge_type = snapshot.ReadUint(); + if (!edge_type) throw RecoveryFailure("Invalid snapshot data!"); + } + } + spdlog::info("Vertices are recovered twice."); + // Recover vertices (in/out edges). spdlog::info("Recovering connectivity."); if (!snapshot.SetPosition(info.offset_vertices)) throw RecoveryFailure("Couldn't read data from snapshot!");