diff --git a/src/storage/v2/durability/snapshot.cpp b/src/storage/v2/durability/snapshot.cpp index d4bc54fec..a29a4450d 100644 --- a/src/storage/v2/durability/snapshot.cpp +++ b/src/storage/v2/durability/snapshot.cpp @@ -24,9 +24,12 @@ #include "storage/v2/mvcc.hpp" #include "storage/v2/vertex.hpp" #include "storage/v2/vertex_accessor.hpp" +#include "utils/concepts.hpp" #include "utils/file_locker.hpp" #include "utils/logging.hpp" #include "utils/message.hpp" +#include "utils/spin_lock.hpp" +#include "utils/synchronized.hpp" namespace memgraph::storage::durability { @@ -44,6 +47,8 @@ namespace memgraph::storage::durability { // * offset to the constraints section // * offset to the mapper section // * offset to the metadata section +// * offset to the offset-count pair of the first edge batch (`0` if properties on edges are disabled) +// * offset to the offset-count pair of the first vertex batch // // 4) Encoded edges (if properties on edges are enabled); each edge is written // in the following format: @@ -90,10 +95,22 @@ namespace memgraph::storage::durability { // applied) // * number of edges // * number of vertices +// TODO(antaljanosbenjamin): extend with batch infos // // IMPORTANT: When changing snapshot encoding/decoding bump the snapshot/WAL // version in `version.hpp`. +struct EdgeBatchInfo { + uint64_t offset; + uint64_t count; +}; + +struct VertexBatchInfo { + uint64_t offset; + uint64_t count; + uint64_t first_gid; +}; + // Function used to read information about the snapshot file. SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path) { // Check magic and version. @@ -128,6 +145,8 @@ SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path) { info.offset_mapper = read_offset(); info.offset_epoch_history = read_offset(); info.offset_metadata = read_offset(); + info.offset_edge_batches = read_offset(); + info.offset_vertex_batches = read_offset(); } // Read metadata. @@ -161,108 +180,42 @@ SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path) { return info; } -// n is 0-indexed -uint64_t GetNthEdgeStartOffset(Decoder &snapshot, const uint64_t n) { - for (uint64_t i = 0; i < n; ++i) { - { - const auto marker = snapshot.ReadMarker(); - if (!marker || *marker != Marker::SECTION_EDGE) throw RecoveryFailure("Invalid snapshot data!"); - } +template +concept IsBatchInfo = utils::SameAsAnyOf; - // Skip edge. - auto gid = snapshot.ReadUint(); - if (!gid) throw RecoveryFailure("Invalid snapshot data!"); - - // Recover properties. - { - auto props_size = snapshot.ReadUint(); - if (!props_size) throw RecoveryFailure("Invalid snapshot data!"); - for (uint64_t j = 0; j < *props_size; ++j) { - auto key = snapshot.ReadUint(); - if (!key) throw RecoveryFailure("Invalid snapshot data!"); - auto value = snapshot.SkipPropertyValue(); - if (!value) throw RecoveryFailure("Invalid snapshot data!"); - } - } +template +std::vector ReadBatchInfos(Decoder &snapshot) { + std::vector infos; + const auto infos_size = snapshot.ReadUint(); + if (!infos_size.has_value()) { + throw RecoveryFailure("Invalid snapshot data!"); } - const auto offset = snapshot.GetPosition(); - MG_ASSERT(offset.has_value(), "Unexpected"); - return *offset; -} + infos.reserve(*infos_size); + TBatchInfo info; -// n is 0-indexed -std::pair GetNthVertexStartOffsetAndGid(Decoder &snapshot, const uint64_t n) { - for (uint64_t i = 0; i < n; ++i) { - { - auto marker = snapshot.ReadMarker(); - if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!"); + for (auto i{0U}; i < *infos_size; ++i) { + const auto offset = snapshot.ReadUint(); + if (!offset.has_value()) { + throw RecoveryFailure("Invalid snapshot data!"); } + info.offset = *offset; - // Insert vertex. - auto gid = snapshot.ReadUint(); - if (!gid) throw RecoveryFailure("Invalid snapshot data!"); + const auto count = snapshot.ReadUint(); + if (!count.has_value()) { + throw RecoveryFailure("Invalid snapshot data!"); + } + info.count = *count; - // Skip labels. - { - auto labels_size = snapshot.ReadUint(); - if (!labels_size) throw RecoveryFailure("Invalid snapshot data!"); - for (uint64_t j = 0; j < *labels_size; ++j) { - auto label = snapshot.ReadUint(); - if (!label) throw RecoveryFailure("Invalid snapshot data!"); + if constexpr (std::same_as) { + const auto first_vertex_gid = snapshot.ReadUint(); + if (!first_vertex_gid.has_value()) { + throw RecoveryFailure("Invalid snapshot data!"); } + info.first_gid = *first_vertex_gid; } - - // Skip properties. - { - auto props_size = snapshot.ReadUint(); - if (!props_size) throw RecoveryFailure("Invalid snapshot data!"); - for (uint64_t j = 0; j < *props_size; ++j) { - auto key = snapshot.ReadUint(); - if (!key) throw RecoveryFailure("Invalid snapshot data!"); - auto value = snapshot.SkipPropertyValue(); - if (!value) throw RecoveryFailure("Invalid snapshot data!"); - } - } - - // Skip in edges. - { - auto in_size = snapshot.ReadUint(); - if (!in_size) throw RecoveryFailure("Invalid snapshot data!"); - for (uint64_t j = 0; j < *in_size; ++j) { - auto edge_gid = snapshot.ReadUint(); - if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!"); - auto from_gid = snapshot.ReadUint(); - if (!from_gid) throw RecoveryFailure("Invalid snapshot data!"); - auto edge_type = snapshot.ReadUint(); - if (!edge_type) throw RecoveryFailure("Invalid snapshot data!"); - } - } - - // Skip out edges. - auto out_size = snapshot.ReadUint(); - if (!out_size) throw RecoveryFailure("Invalid snapshot data!"); - for (uint64_t j = 0; j < *out_size; ++j) { - auto edge_gid = snapshot.ReadUint(); - if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!"); - auto to_gid = snapshot.ReadUint(); - if (!to_gid) throw RecoveryFailure("Invalid snapshot data!"); - auto edge_type = snapshot.ReadUint(); - if (!edge_type) throw RecoveryFailure("Invalid snapshot data!"); - } + infos.push_back(info); } - - const auto offset = snapshot.GetPosition(); - MG_ASSERT(offset.has_value(), "Unexpected"); - { - auto marker = snapshot.ReadMarker(); - if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!"); - } - - // Insert vertex. - auto gid = snapshot.ReadUint(); - if (!gid) throw RecoveryFailure("Invalid snapshot data!"); - - return std::make_pair(*offset, Gid::FromUint(*gid)); + return infos; } template @@ -602,24 +555,29 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis // Reset current edge count. edge_count->store(0, std::memory_order_release); - static constexpr auto kThreadCount = 8; - spdlog::info("Recovering edges."); + static constexpr auto kThreadCount = 2U; { + spdlog::info("Recovering edges."); // Recover edges. - auto edge_acc = edges->access(); if (snapshot_has_edges) { - if (!snapshot.SetPosition(info.offset_edges)) throw RecoveryFailure("Couldn't read data from snapshot!"); - std::vector offsets; - offsets.push_back(info.offset_edges); - const auto edge_chunk_size = static_cast(info.edges_count / kThreadCount); - while (offsets.size() < kThreadCount) { - offsets.push_back(GetNthEdgeStartOffset(snapshot, edge_chunk_size)); - } std::vector threads; - threads.reserve(offsets.size()); - for (const auto offset : offsets) { - threads.emplace_back([path, edges, offset, edge_chunk_size, items, &get_property_from_id] { - LoadPartialEdges(path, *edges, offset, edge_chunk_size, items, get_property_from_id); + threads.reserve(kThreadCount); + if (!snapshot.SetPosition(info.offset_edge_batches)) { + throw RecoveryFailure("Couldn't read data from snapshot!"); + } + const auto edge_batches = ReadBatchInfos(snapshot); + std::atomic batch_counter = 0; + + for (auto i{0U}; i < kThreadCount; ++i) { + threads.emplace_back([path, edges, &edge_batches, &batch_counter, items, &get_property_from_id]() mutable { + while (true) { + const auto batch_index = batch_counter++; + if (batch_index >= edge_batches.size()) { + return; + } + const auto &batch = edge_batches[batch_index]; + LoadPartialEdges(path, *edges, batch.offset, batch.count, items, get_property_from_id); + } }); } } @@ -627,24 +585,27 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis // Recover vertices (labels and properties). spdlog::info("Recovering vertices.", info.vertices_count); - if (!snapshot.SetPosition(info.offset_vertices)) throw RecoveryFailure("Couldn't read data from snapshot!"); - std::vector> vertices_offsets_and_gids; - const auto vertex_chunk_size = static_cast(info.vertices_count / kThreadCount); - { - vertices_offsets_and_gids.push_back(GetNthVertexStartOffsetAndGid(snapshot, 0)); - while (vertices_offsets_and_gids.size() < kThreadCount) { - snapshot.SetPosition(vertices_offsets_and_gids.back().first); - vertices_offsets_and_gids.push_back(GetNthVertexStartOffsetAndGid(snapshot, vertex_chunk_size)); - } + if (!snapshot.SetPosition(info.offset_vertex_batches)) { + throw RecoveryFailure("Couldn't read data from snapshot!"); } + const auto vertex_batches = ReadBatchInfos(snapshot); { std::vector threads; - threads.reserve(vertices_offsets_and_gids.size()); - for (const auto &offset_and_gid : vertices_offsets_and_gids) { - threads.emplace_back([path, vertices, offset = offset_and_gid.first, vertex_chunk_size, name_id_mapper, - &get_label_from_id, &get_property_from_id] { - LoadPartialVertices(path, *vertices, offset, vertex_chunk_size, *name_id_mapper, get_label_from_id, - get_property_from_id); + threads.reserve(kThreadCount); + std::atomic batch_counter = 0; + + for (auto i{0U}; i < kThreadCount; ++i) { + threads.emplace_back([path, vertices, &vertex_batches, &batch_counter, name_id_mapper, &get_label_from_id, + &get_property_from_id]() mutable { + while (true) { + const auto batch_index = batch_counter++; + if (batch_index >= vertex_batches.size()) { + return; + } + const auto &batch = vertex_batches[batch_index]; + LoadPartialVertices(path, *vertices, batch.offset, batch.count, *name_id_mapper, get_label_from_id, + get_property_from_id); + } }); } } @@ -654,13 +615,22 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis spdlog::info("Recover connectivity."); { std::vector threads; - threads.reserve(vertices_offsets_and_gids.size()); - for (const auto &offset_and_gid : vertices_offsets_and_gids) { - threads.emplace_back( - [path, vertices, edges, edge_count, &offset_and_gid, vertex_chunk_size, items, &get_edge_type_from_id] { - LoadPartialConnectivity(path, *vertices, *edges, *edge_count, offset_and_gid.first, offset_and_gid.second, - vertex_chunk_size, items, get_edge_type_from_id); - }); + threads.reserve(kThreadCount); + std::atomic batch_counter = 0; + + for (auto i{0U}; i < kThreadCount; ++i) { + threads.emplace_back([path, vertices, edges, edge_count, &vertex_batches, &batch_counter, items, + &get_edge_type_from_id]() mutable { + while (true) { + const auto batch_index = batch_counter++; + if (batch_index >= vertex_batches.size()) { + return; + } + const auto &batch = vertex_batches[batch_index]; + LoadPartialConnectivity(path, *vertices, *edges, *edge_count, batch.offset, Gid::FromUint(batch.first_gid), + batch.count, items, get_edge_type_from_id); + } + }); } } spdlog::info("Connectivity is recovered."); @@ -832,6 +802,8 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps uint64_t offset_mapper = 0; uint64_t offset_metadata = 0; uint64_t offset_epoch_history = 0; + uint64_t offset_edge_batches = 0; + uint64_t offset_vertex_batches = 0; { snapshot.WriteMarker(Marker::SECTION_OFFSETS); offset_offsets = snapshot.GetPosition(); @@ -842,6 +814,8 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps snapshot.WriteUint(offset_mapper); snapshot.WriteUint(offset_epoch_history); snapshot.WriteUint(offset_metadata); + snapshot.WriteUint(offset_edge_batches); + snapshot.WriteUint(offset_vertex_batches); } // Object counters. @@ -855,9 +829,13 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps snapshot.WriteUint(mapping.AsUint()); }; + std::vector edge_batch_infos; + static constexpr auto kDesiredEdgeCountPerBatch{1'000'000ULL}; + auto items_in_current_batch{0UL}; + offset_edges = snapshot.GetPosition(); + auto batch_start_offset{offset_edges}; // Store all edges. if (items.properties_on_edges) { - offset_edges = snapshot.GetPosition(); auto acc = edges->access(); for (auto &edge : acc) { // The edge visibility check must be done here manually because we don't @@ -916,14 +894,32 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps } ++edges_count; + ++items_in_current_batch; + if (items_in_current_batch == kDesiredEdgeCountPerBatch) { + edge_batch_infos.push_back(EdgeBatchInfo{batch_start_offset, items_in_current_batch}); + batch_start_offset = snapshot.GetPosition(); + items_in_current_batch = 0; + } } } + if (items_in_current_batch > 0) { + edge_batch_infos.push_back(EdgeBatchInfo{batch_start_offset, items_in_current_batch}); + } + + std::vector vertex_batch_infos; // Store all vertices. { + static constexpr auto kDesiredVertexCountPerBatch{1'000'000ULL}; + items_in_current_batch = 0; offset_vertices = snapshot.GetPosition(); + batch_start_offset = offset_vertices; + std::optional first_vertex_gid; auto acc = vertices->access(); for (auto &vertex : acc) { + if (!first_vertex_gid.has_value()) { + first_vertex_gid = vertex.gid.AsUint(); + } // The visibility check is implemented for vertices so we use it here. auto va = VertexAccessor::Create(&vertex, transaction, indices, constraints, items, View::OLD); if (!va) continue; @@ -972,6 +968,17 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps } ++vertices_count; + ++items_in_current_batch; + if (items_in_current_batch == kDesiredVertexCountPerBatch) { + vertex_batch_infos.push_back(VertexBatchInfo{batch_start_offset, items_in_current_batch, *first_vertex_gid}); + batch_start_offset = snapshot.GetPosition(); + items_in_current_batch = 0; + first_vertex_gid.reset(); + } + } + + if (items_in_current_batch > 0) { + vertex_batch_infos.push_back(VertexBatchInfo{batch_start_offset, items_in_current_batch, *first_vertex_gid}); } } @@ -1062,6 +1069,27 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps snapshot.WriteUint(vertices_count); } + // Write edge batches + { + offset_edge_batches = snapshot.GetPosition(); + snapshot.WriteUint(edge_batch_infos.size()); + for (const auto &batch_info : edge_batch_infos) { + snapshot.WriteUint(batch_info.offset); + snapshot.WriteUint(batch_info.count); + } + } + + // Write vertex batches + { + offset_vertex_batches = snapshot.GetPosition(); + snapshot.WriteUint(vertex_batch_infos.size()); + for (const auto &batch_info : vertex_batch_infos) { + snapshot.WriteUint(batch_info.offset); + snapshot.WriteUint(batch_info.count); + snapshot.WriteUint(batch_info.first_gid); + } + } + // Write true offsets. { snapshot.SetPosition(offset_offsets); @@ -1072,6 +1100,8 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps snapshot.WriteUint(offset_mapper); snapshot.WriteUint(offset_epoch_history); snapshot.WriteUint(offset_metadata); + snapshot.WriteUint(offset_edge_batches); + snapshot.WriteUint(offset_vertex_batches); } // Finalize snapshot file. diff --git a/src/storage/v2/durability/snapshot.hpp b/src/storage/v2/durability/snapshot.hpp index b1cfad63c..c13429e9a 100644 --- a/src/storage/v2/durability/snapshot.hpp +++ b/src/storage/v2/durability/snapshot.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -37,6 +37,8 @@ struct SnapshotInfo { uint64_t offset_mapper; uint64_t offset_epoch_history; uint64_t offset_metadata; + uint64_t offset_edge_batches; + uint64_t offset_vertex_batches; std::string uuid; std::string epoch_id; diff --git a/src/storage/v2/durability/version.hpp b/src/storage/v2/durability/version.hpp index 712bf322a..55aeff552 100644 --- a/src/storage/v2/durability/version.hpp +++ b/src/storage/v2/durability/version.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -20,7 +20,7 @@ namespace memgraph::storage::durability { // The current version of snapshot and WAL encoding / decoding. // IMPORTANT: Please bump this version for every snapshot and/or WAL format // change!!! -const uint64_t kVersion{14}; +const uint64_t kVersion{15}; const uint64_t kOldestSupportedVersion{14}; const uint64_t kUniqueConstraintVersion{13};