Do not serialize first vertex ids in batches

This commit is contained in:
János Benjamin Antal 2023-04-11 18:43:36 +02:00
parent e2f6bdef80
commit f7b577eeb6

View File

@ -104,22 +104,15 @@ namespace memgraph::storage::durability {
// * vertex batch infos // * vertex batch infos
// * starting offset of the batch // * starting offset of the batch
// * number of vertices in the batch // * number of vertices in the batch
// * global id of the first vertex in the batch
// //
// IMPORTANT: When changing snapshot encoding/decoding bump the snapshot/WAL // IMPORTANT: When changing snapshot encoding/decoding bump the snapshot/WAL
// version in `version.hpp`. // version in `version.hpp`.
struct EdgeBatchInfo { struct BatchInfo {
uint64_t offset; uint64_t offset;
uint64_t count; uint64_t count;
}; };
struct VertexBatchInfo {
uint64_t offset;
uint64_t count;
uint64_t first_gid;
};
// Function used to read information about the snapshot file. // Function used to read information about the snapshot file.
SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path) { SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path) {
// Check magic and version. // Check magic and version.
@ -189,40 +182,25 @@ SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path) {
return info; return info;
} }
template <typename T> std::vector<BatchInfo> ReadBatchInfos(Decoder &snapshot) {
concept IsBatchInfo = utils::SameAsAnyOf<T, VertexBatchInfo, EdgeBatchInfo>; std::vector<BatchInfo> infos;
template <IsBatchInfo TBatchInfo>
std::vector<TBatchInfo> ReadBatchInfos(Decoder &snapshot) {
std::vector<TBatchInfo> infos;
const auto infos_size = snapshot.ReadUint(); const auto infos_size = snapshot.ReadUint();
if (!infos_size.has_value()) { if (!infos_size.has_value()) {
throw RecoveryFailure("Invalid snapshot data!"); throw RecoveryFailure("Invalid snapshot data!");
} }
infos.reserve(*infos_size); infos.reserve(*infos_size);
TBatchInfo info;
for (auto i{0U}; i < *infos_size; ++i) { for (auto i{0U}; i < *infos_size; ++i) {
const auto offset = snapshot.ReadUint(); const auto offset = snapshot.ReadUint();
if (!offset.has_value()) { if (!offset.has_value()) {
throw RecoveryFailure("Invalid snapshot data!"); throw RecoveryFailure("Invalid snapshot data!");
} }
info.offset = *offset;
const auto count = snapshot.ReadUint(); const auto count = snapshot.ReadUint();
if (!count.has_value()) { if (!count.has_value()) {
throw RecoveryFailure("Invalid snapshot data!"); throw RecoveryFailure("Invalid snapshot data!");
} }
info.count = *count; infos.push_back(BatchInfo{*offset, *count});
if constexpr (std::same_as<TBatchInfo, VertexBatchInfo>) {
const auto first_vertex_gid = snapshot.ReadUint();
if (!first_vertex_gid.has_value()) {
throw RecoveryFailure("Invalid snapshot data!");
}
info.first_gid = *first_vertex_gid;
}
infos.push_back(info);
} }
return infos; return infos;
} }
@ -378,8 +356,8 @@ void LoadPartialVertices(const std::filesystem::path &path, utils::SkipList<Vert
template <typename TEdgeTypeFromIdFunc> template <typename TEdgeTypeFromIdFunc>
void LoadPartialConnectivity(const std::filesystem::path &path, utils::SkipList<Vertex> &vertices, void LoadPartialConnectivity(const std::filesystem::path &path, utils::SkipList<Vertex> &vertices,
utils::SkipList<Edge> &edges, std::atomic<uint64_t> &edge_count, utils::SkipList<Edge> &edges, std::atomic<uint64_t> &edge_count,
const uint64_t from_offset, const Gid start_vertex_gid, const uint64_t vertices_count, const uint64_t from_offset, const uint64_t vertices_count, const Config::Items items,
const Config::Items items, TEdgeTypeFromIdFunc get_edge_type_from_id) { TEdgeTypeFromIdFunc get_edge_type_from_id) {
Decoder snapshot; Decoder snapshot;
snapshot.Initialize(path, kSnapshotMagic); snapshot.Initialize(path, kSnapshotMagic);
if (!snapshot.SetPosition(from_offset)) throw RecoveryFailure("Couldn't read data from snapshot!"); if (!snapshot.SetPosition(from_offset)) throw RecoveryFailure("Couldn't read data from snapshot!");
@ -387,12 +365,27 @@ void LoadPartialConnectivity(const std::filesystem::path &path, utils::SkipList<
auto vertex_acc = vertices.access(); auto vertex_acc = vertices.access();
auto edge_acc = edges.access(); auto edge_acc = edges.access();
// Read the first gid to find the necessary iterator in vertices
const auto start_vertex_gid = std::invoke([&]() mutable {
{
auto marker = snapshot.ReadMarker();
if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!");
}
auto gid = snapshot.ReadUint();
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
return Gid::FromUint(*gid);
});
auto vertex_it = vertex_acc.find(start_vertex_gid); auto vertex_it = vertex_acc.find(start_vertex_gid);
if (vertex_it == vertex_acc.end()) { if (vertex_it == vertex_acc.end()) {
throw RecoveryFailure("Invalid snapshot data!"); throw RecoveryFailure("Invalid snapshot data!");
} }
spdlog::info("Recovering connectivity for {} vertices.", vertices_count); spdlog::info("Recovering connectivity for {} vertices.", vertices_count);
if (!snapshot.SetPosition(from_offset)) throw RecoveryFailure("Couldn't read data from snapshot!");
for (uint64_t i = 0; i < vertices_count; ++i) { for (uint64_t i = 0; i < vertices_count; ++i) {
auto &vertex = *vertex_it; auto &vertex = *vertex_it;
{ {
@ -1046,7 +1039,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
if (!snapshot.SetPosition(info.offset_edge_batches)) { if (!snapshot.SetPosition(info.offset_edge_batches)) {
throw RecoveryFailure("Couldn't read data from snapshot!"); throw RecoveryFailure("Couldn't read data from snapshot!");
} }
const auto edge_batches = ReadBatchInfos<EdgeBatchInfo>(snapshot); const auto edge_batches = ReadBatchInfos(snapshot);
std::atomic<uint64_t> batch_counter = 0; std::atomic<uint64_t> batch_counter = 0;
for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) { for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) {
@ -1070,7 +1063,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
if (!snapshot.SetPosition(info.offset_vertex_batches)) { if (!snapshot.SetPosition(info.offset_vertex_batches)) {
throw RecoveryFailure("Couldn't read data from snapshot!"); throw RecoveryFailure("Couldn't read data from snapshot!");
} }
const auto vertex_batches = ReadBatchInfos<VertexBatchInfo>(snapshot); const auto vertex_batches = ReadBatchInfos(snapshot);
{ {
std::vector<std::jthread> threads; std::vector<std::jthread> threads;
threads.reserve(config.durability.recovery_thread_count); threads.reserve(config.durability.recovery_thread_count);
@ -1109,8 +1102,8 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
return; return;
} }
const auto &batch = vertex_batches[batch_index]; const auto &batch = vertex_batches[batch_index];
LoadPartialConnectivity(path, *vertices, *edges, *edge_count, batch.offset, Gid::FromUint(batch.first_gid), LoadPartialConnectivity(path, *vertices, *edges, *edge_count, batch.offset, batch.count, items,
batch.count, items, get_edge_type_from_id); get_edge_type_from_id);
} }
}); });
} }
@ -1311,7 +1304,7 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
snapshot.WriteUint(mapping.AsUint()); snapshot.WriteUint(mapping.AsUint());
}; };
std::vector<EdgeBatchInfo> edge_batch_infos; std::vector<BatchInfo> edge_batch_infos;
auto items_in_current_batch{0UL}; auto items_in_current_batch{0UL};
offset_edges = snapshot.GetPosition(); offset_edges = snapshot.GetPosition();
auto batch_start_offset{offset_edges}; auto batch_start_offset{offset_edges};
@ -1377,7 +1370,7 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
++edges_count; ++edges_count;
++items_in_current_batch; ++items_in_current_batch;
if (items_in_current_batch == config.durability.items_per_batch) { if (items_in_current_batch == config.durability.items_per_batch) {
edge_batch_infos.push_back(EdgeBatchInfo{batch_start_offset, items_in_current_batch}); edge_batch_infos.push_back(BatchInfo{batch_start_offset, items_in_current_batch});
batch_start_offset = snapshot.GetPosition(); batch_start_offset = snapshot.GetPosition();
items_in_current_batch = 0; items_in_current_batch = 0;
} }
@ -1385,21 +1378,17 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
} }
if (items_in_current_batch > 0) { if (items_in_current_batch > 0) {
edge_batch_infos.push_back(EdgeBatchInfo{batch_start_offset, items_in_current_batch}); edge_batch_infos.push_back(BatchInfo{batch_start_offset, items_in_current_batch});
} }
std::vector<VertexBatchInfo> vertex_batch_infos; std::vector<BatchInfo> vertex_batch_infos;
// Store all vertices. // Store all vertices.
{ {
items_in_current_batch = 0; items_in_current_batch = 0;
offset_vertices = snapshot.GetPosition(); offset_vertices = snapshot.GetPosition();
batch_start_offset = offset_vertices; batch_start_offset = offset_vertices;
std::optional<uint64_t> first_vertex_gid;
auto acc = vertices->access(); auto acc = vertices->access();
for (auto &vertex : acc) { for (auto &vertex : acc) {
if (!first_vertex_gid.has_value()) {
first_vertex_gid = vertex.gid.AsUint();
}
// The visibility check is implemented for vertices so we use it here. // The visibility check is implemented for vertices so we use it here.
auto va = VertexAccessor::Create(&vertex, transaction, indices, constraints, config.items, View::OLD); auto va = VertexAccessor::Create(&vertex, transaction, indices, constraints, config.items, View::OLD);
if (!va) continue; if (!va) continue;
@ -1450,15 +1439,14 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
++vertices_count; ++vertices_count;
++items_in_current_batch; ++items_in_current_batch;
if (items_in_current_batch == config.durability.items_per_batch) { if (items_in_current_batch == config.durability.items_per_batch) {
vertex_batch_infos.push_back(VertexBatchInfo{batch_start_offset, items_in_current_batch, *first_vertex_gid}); vertex_batch_infos.push_back(BatchInfo{batch_start_offset, items_in_current_batch});
batch_start_offset = snapshot.GetPosition(); batch_start_offset = snapshot.GetPosition();
items_in_current_batch = 0; items_in_current_batch = 0;
first_vertex_gid.reset();
} }
} }
if (items_in_current_batch > 0) { if (items_in_current_batch > 0) {
vertex_batch_infos.push_back(VertexBatchInfo{batch_start_offset, items_in_current_batch, *first_vertex_gid}); vertex_batch_infos.push_back(BatchInfo{batch_start_offset, items_in_current_batch});
} }
} }
@ -1549,25 +1537,24 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
snapshot.WriteUint(vertices_count); snapshot.WriteUint(vertices_count);
} }
// Write edge batches auto write_batch_infos = [&snapshot](const std::vector<BatchInfo> &batch_infos) {
{ snapshot.WriteUint(batch_infos.size());
offset_edge_batches = snapshot.GetPosition(); for (const auto &batch_info : batch_infos) {
snapshot.WriteUint(edge_batch_infos.size());
for (const auto &batch_info : edge_batch_infos) {
snapshot.WriteUint(batch_info.offset); snapshot.WriteUint(batch_info.offset);
snapshot.WriteUint(batch_info.count); snapshot.WriteUint(batch_info.count);
} }
};
// Write edge batches
{
offset_edge_batches = snapshot.GetPosition();
write_batch_infos(edge_batch_infos);
} }
// Write vertex batches // Write vertex batches
{ {
offset_vertex_batches = snapshot.GetPosition(); offset_vertex_batches = snapshot.GetPosition();
snapshot.WriteUint(vertex_batch_infos.size()); write_batch_infos(vertex_batch_infos);
for (const auto &batch_info : vertex_batch_infos) {
snapshot.WriteUint(batch_info.offset);
snapshot.WriteUint(batch_info.count);
snapshot.WriteUint(batch_info.first_gid);
}
} }
// Write true offsets. // Write true offsets.