Add vertex batches to RecoveryInfo

This commit is contained in:
János Benjamin Antal 2023-04-13 16:26:36 +02:00
parent c2f3a92eca
commit eebb38c62b
2 changed files with 31 additions and 15 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -12,6 +12,7 @@
#pragma once
#include <algorithm>
#include <optional>
#include <set>
#include <utility>
#include <vector>
@ -29,6 +30,8 @@ struct RecoveryInfo {
// last timestamp read from a WAL file
std::optional<uint64_t> last_commit_timestamp;
std::vector<std::pair<Gid /*first vertex gid*/, uint64_t /*batch size*/>> vertex_batches;
};
/// Structure used to track indices and constraints during recovery.

View File

@ -357,10 +357,18 @@ uint64_t LoadPartialVertices(const std::filesystem::path &path, utils::SkipList<
}
// Returns the number of edges recovered
struct LoadPartialConnectivityResult {
uint64_t edge_count;
uint64_t highest_edge_id;
Gid first_vertex_gid;
};
template <typename TEdgeTypeFromIdFunc>
std::pair<uint64_t /*edge count*/, uint64_t /*highest edge gid*/> LoadPartialConnectivity(
const std::filesystem::path &path, utils::SkipList<Vertex> &vertices, utils::SkipList<Edge> &edges,
const uint64_t from_offset, const uint64_t vertices_count, const Config::Items items, const bool snapshot_has_edges,
LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::path &path,
utils::SkipList<Vertex> &vertices, utils::SkipList<Edge> &edges,
const uint64_t from_offset, const uint64_t vertices_count,
const Config::Items items, const bool snapshot_has_edges,
TEdgeTypeFromIdFunc get_edge_type_from_id) {
Decoder snapshot;
snapshot.Initialize(path, kSnapshotMagic);
@ -370,7 +378,7 @@ std::pair<uint64_t /*edge count*/, uint64_t /*highest edge gid*/> LoadPartialCon
auto edge_acc = edges.access();
// Read the first gid to find the necessary iterator in vertices
const auto start_vertex_gid = std::invoke([&]() mutable {
const auto first_vertex_gid = std::invoke([&]() mutable {
{
auto marker = snapshot.ReadMarker();
if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!");
@ -383,7 +391,7 @@ std::pair<uint64_t /*edge count*/, uint64_t /*highest edge gid*/> LoadPartialCon
uint64_t edge_count{0};
uint64_t highest_edge_gid{0};
auto vertex_it = vertex_acc.find(start_vertex_gid);
auto vertex_it = vertex_acc.find(first_vertex_gid);
if (vertex_it == vertex_acc.end()) {
throw RecoveryFailure("Invalid snapshot data!");
}
@ -501,7 +509,7 @@ std::pair<uint64_t /*edge count*/, uint64_t /*highest edge gid*/> LoadPartialCon
++vertex_it;
}
spdlog::info("Partial connectivities are recovered.");
return std::make_pair(edge_count, highest_edge_gid);
return {edge_count, highest_edge_gid, first_vertex_gid};
}
template <typename TFunc>
@ -1120,19 +1128,24 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
// Recover vertices (in/out edges).
spdlog::info("Recover connectivity.");
ret.vertex_batches.reserve(vertex_batches.size());
for (const auto batch : vertex_batches) {
ret.vertex_batches.push_back(std::make_pair(Gid::FromUint(0), batch.count));
}
std::atomic<uint64_t> highest_edge_gid{0};
RecoverOnMultipleThreads(
config.durability.recovery_thread_count,
[path, vertices, edges, edge_count, &highest_edge_gid, items = config.items, snapshot_has_edges,
&get_edge_type_from_id](const size_t /*batch_index*/, const BatchInfo &batch) {
const auto [number_of_recovered_edges, highest_edge_gid_in_batch] = LoadPartialConnectivity(
path, *vertices, *edges, batch.offset, batch.count, items, snapshot_has_edges, get_edge_type_from_id);
edge_count->fetch_add(number_of_recovered_edges);
[path, vertices, edges, edge_count, items = config.items, snapshot_has_edges, &get_edge_type_from_id,
&highest_edge_gid, &ret](const size_t batch_index, const BatchInfo &batch) {
const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items,
snapshot_has_edges, get_edge_type_from_id);
edge_count->fetch_add(result.edge_count);
auto known_highest_edge_gid = highest_edge_gid.load();
while (known_highest_edge_gid < highest_edge_gid_in_batch) {
highest_edge_gid.compare_exchange_weak(known_highest_edge_gid, highest_edge_gid_in_batch);
while (known_highest_edge_gid < result.highest_edge_id) {
highest_edge_gid.compare_exchange_weak(known_highest_edge_gid, result.highest_edge_id);
}
ret.vertex_batches[batch_index].first = result.first_vertex_gid;
},
vertex_batches);