From eebb38c62b8f317445f5f78560d393125ad76bfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Thu, 13 Apr 2023 16:26:36 +0200 Subject: [PATCH] Add vertex batches to `RecoveryInfo` --- src/storage/v2/durability/metadata.hpp | 5 +++- src/storage/v2/durability/snapshot.cpp | 41 +++++++++++++++++--------- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/src/storage/v2/durability/metadata.hpp b/src/storage/v2/durability/metadata.hpp index 2986212be..3c66f7ec8 100644 --- a/src/storage/v2/durability/metadata.hpp +++ b/src/storage/v2/durability/metadata.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -12,6 +12,7 @@ #pragma once #include +#include #include #include #include @@ -29,6 +30,8 @@ struct RecoveryInfo { // last timestamp read from a WAL file std::optional last_commit_timestamp; + + std::vector> vertex_batches; }; /// Structure used to track indices and constraints during recovery. diff --git a/src/storage/v2/durability/snapshot.cpp b/src/storage/v2/durability/snapshot.cpp index 4d2781df1..eaddb37b9 100644 --- a/src/storage/v2/durability/snapshot.cpp +++ b/src/storage/v2/durability/snapshot.cpp @@ -357,11 +357,19 @@ uint64_t LoadPartialVertices(const std::filesystem::path &path, utils::SkipList< } // Returns the number of edges recovered + +struct LoadPartialConnectivityResult { + uint64_t edge_count; + uint64_t highest_edge_id; + Gid first_vertex_gid; +}; + template -std::pair LoadPartialConnectivity( - const std::filesystem::path &path, utils::SkipList &vertices, utils::SkipList &edges, - const uint64_t from_offset, const uint64_t vertices_count, const Config::Items items, const bool snapshot_has_edges, - TEdgeTypeFromIdFunc get_edge_type_from_id) { +LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::path &path, + utils::SkipList &vertices, utils::SkipList &edges, + const uint64_t from_offset, const uint64_t vertices_count, + const Config::Items items, const bool snapshot_has_edges, + TEdgeTypeFromIdFunc get_edge_type_from_id) { Decoder snapshot; snapshot.Initialize(path, kSnapshotMagic); if (!snapshot.SetPosition(from_offset)) throw RecoveryFailure("Couldn't read data from snapshot!"); @@ -370,7 +378,7 @@ std::pair LoadPartialCon auto edge_acc = edges.access(); // Read the first gid to find the necessary iterator in vertices - const auto start_vertex_gid = std::invoke([&]() mutable { + const auto first_vertex_gid = std::invoke([&]() mutable { { auto marker = snapshot.ReadMarker(); if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!"); @@ -383,7 +391,7 @@ std::pair LoadPartialCon uint64_t edge_count{0}; uint64_t highest_edge_gid{0}; - auto vertex_it = vertex_acc.find(start_vertex_gid); + auto vertex_it = vertex_acc.find(first_vertex_gid); if (vertex_it == vertex_acc.end()) { throw RecoveryFailure("Invalid snapshot data!"); } @@ -501,7 +509,7 @@ std::pair LoadPartialCon ++vertex_it; } spdlog::info("Partial connectivities are recovered."); - return std::make_pair(edge_count, highest_edge_gid); + return {edge_count, highest_edge_gid, first_vertex_gid}; } template @@ -1120,19 +1128,24 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis // Recover vertices (in/out edges). spdlog::info("Recover connectivity."); + ret.vertex_batches.reserve(vertex_batches.size()); + for (const auto batch : vertex_batches) { + ret.vertex_batches.push_back(std::make_pair(Gid::FromUint(0), batch.count)); + } std::atomic highest_edge_gid{0}; RecoverOnMultipleThreads( config.durability.recovery_thread_count, - [path, vertices, edges, edge_count, &highest_edge_gid, items = config.items, snapshot_has_edges, - &get_edge_type_from_id](const size_t /*batch_index*/, const BatchInfo &batch) { - const auto [number_of_recovered_edges, highest_edge_gid_in_batch] = LoadPartialConnectivity( - path, *vertices, *edges, batch.offset, batch.count, items, snapshot_has_edges, get_edge_type_from_id); - edge_count->fetch_add(number_of_recovered_edges); + [path, vertices, edges, edge_count, items = config.items, snapshot_has_edges, &get_edge_type_from_id, + &highest_edge_gid, &ret](const size_t batch_index, const BatchInfo &batch) { + const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items, + snapshot_has_edges, get_edge_type_from_id); + edge_count->fetch_add(result.edge_count); auto known_highest_edge_gid = highest_edge_gid.load(); - while (known_highest_edge_gid < highest_edge_gid_in_batch) { - highest_edge_gid.compare_exchange_weak(known_highest_edge_gid, highest_edge_gid_in_batch); + while (known_highest_edge_gid < result.highest_edge_id) { + highest_edge_gid.compare_exchange_weak(known_highest_edge_gid, result.highest_edge_id); } + ret.vertex_batches[batch_index].first = result.first_vertex_gid; }, vertex_batches);