From f2e44e497b158f011f232fe5467ba9ea279c11b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Tue, 11 Apr 2023 18:43:46 +0200 Subject: [PATCH] Handle next vertex and edge id properly --- src/storage/v2/durability/snapshot.cpp | 60 ++++++++++++++++---------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/src/storage/v2/durability/snapshot.cpp b/src/storage/v2/durability/snapshot.cpp index afd2b4dcb..bf949d479 100644 --- a/src/storage/v2/durability/snapshot.cpp +++ b/src/storage/v2/durability/snapshot.cpp @@ -205,9 +205,10 @@ std::vector ReadBatchInfos(Decoder &snapshot) { return infos; } +// Returns the gid of the last recovered edge template -void LoadPartialEdges(const std::filesystem::path &path, utils::SkipList &edges, const uint64_t from_offset, - const uint64_t edges_count, const Config::Items items, TFunc get_property_from_id) { +uint64_t LoadPartialEdges(const std::filesystem::path &path, utils::SkipList &edges, const uint64_t from_offset, + const uint64_t edges_count, const Config::Items items, TFunc get_property_from_id) { Decoder snapshot; snapshot.Initialize(path, kSnapshotMagic); @@ -265,12 +266,15 @@ void LoadPartialEdges(const std::filesystem::path &path, utils::SkipList & } } spdlog::info("Partial edges are recovered."); + + return last_edge_gid; } +// Returns the gid of the last recovered vertex template -void LoadPartialVertices(const std::filesystem::path &path, utils::SkipList &vertices, - const uint64_t from_offset, const uint64_t vertices_count, const NameIdMapper &name_id_mapper, - TLabelFromIdFunc get_label_from_id, TPropertyFromIdFunc get_property_from_id) { +uint64_t LoadPartialVertices(const std::filesystem::path &path, utils::SkipList &vertices, + const uint64_t from_offset, const uint64_t vertices_count, + TLabelFromIdFunc get_label_from_id, TPropertyFromIdFunc get_property_from_id) { Decoder snapshot; snapshot.Initialize(path, kSnapshotMagic); if (!snapshot.SetPosition(from_offset)) throw RecoveryFailure("Couldn't read data from snapshot!"); @@ -351,6 +355,8 @@ void LoadPartialVertices(const std::filesystem::path &path, utils::SkipList @@ -1029,6 +1035,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis // Reset current edge count. edge_count->store(0, std::memory_order_release); + uint64_t last_edge_gid{0}; { spdlog::info("Recovering edges."); @@ -1043,26 +1050,33 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis std::atomic batch_counter = 0; for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) { - threads.emplace_back( - [path, edges, &edge_batches, &batch_counter, items = config.items, &get_property_from_id]() mutable { - while (true) { - const auto batch_index = batch_counter++; - if (batch_index >= edge_batches.size()) { - return; - } - const auto &batch = edge_batches[batch_index]; + threads.emplace_back([path, edges, &edge_batches, &batch_counter, items = config.items, &get_property_from_id, + &last_edge_gid]() mutable { + while (true) { + const auto batch_index = batch_counter++; + if (batch_index >= edge_batches.size()) { + return; + } + const auto &batch = edge_batches[batch_index]; + const auto last_edge_gid_in_batch = LoadPartialEdges(path, *edges, batch.offset, batch.count, items, get_property_from_id); - } - }); + if (batch_index == edge_batches.size() - 1) { + last_edge_gid = last_edge_gid_in_batch; + } + } + }); } } spdlog::info("Edges are recovered."); // Recover vertices (labels and properties). spdlog::info("Recovering vertices.", info.vertices_count); + uint64_t last_vertex_gid{0}; + if (!snapshot.SetPosition(info.offset_vertex_batches)) { throw RecoveryFailure("Couldn't read data from snapshot!"); } + const auto vertex_batches = ReadBatchInfos(snapshot); { std::vector threads; @@ -1070,16 +1084,19 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis std::atomic batch_counter = 0; for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) { - threads.emplace_back([path, vertices, &vertex_batches, &batch_counter, name_id_mapper, &get_label_from_id, - &get_property_from_id]() mutable { + threads.emplace_back([path, vertices, &vertex_batches, &batch_counter, &get_label_from_id, + &get_property_from_id, &last_vertex_gid]() mutable { while (true) { const auto batch_index = batch_counter++; if (batch_index >= vertex_batches.size()) { return; } const auto &batch = vertex_batches[batch_index]; - LoadPartialVertices(path, *vertices, batch.offset, batch.count, *name_id_mapper, get_label_from_id, - get_property_from_id); + const auto last_vertex_gid_in_batch = LoadPartialVertices(path, *vertices, batch.offset, batch.count, + get_label_from_id, get_property_from_id); + if (batch_index == vertex_batches.size() - 1) { + last_vertex_gid = last_vertex_gid_in_batch; + } } }); } @@ -1111,9 +1128,8 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis spdlog::info("Connectivity is recovered."); // Set initial values for edge/vertex ID generators. - // TODO(antaljanosbenjamin): Figure out proper values - ret.next_edge_id = info.edges_count + 2; - ret.next_vertex_id = info.vertices_count + 2; + ret.next_edge_id = last_edge_gid + 1; + ret.next_vertex_id = last_vertex_gid + 1; } // Recover indices.