From 151e1cf62b64f303d9bfa14cfd9b9a16fc21f0a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Wed, 12 Apr 2023 17:06:28 +0200 Subject: [PATCH] Fix error handling in case of corrupted snapshot file --- src/storage/v2/durability/snapshot.cpp | 122 ++++++++++++------------- 1 file changed, 61 insertions(+), 61 deletions(-) diff --git a/src/storage/v2/durability/snapshot.cpp b/src/storage/v2/durability/snapshot.cpp index 11aa0fa2a..4d2781df1 100644 --- a/src/storage/v2/durability/snapshot.cpp +++ b/src/storage/v2/durability/snapshot.cpp @@ -504,6 +504,36 @@ std::pair LoadPartialCon return std::make_pair(edge_count, highest_edge_gid); } +template +void RecoverOnMultipleThreads(const size_t thread_count, const TFunc &func, const std::vector &batches) { + utils::Synchronized, utils::SpinLock> maybe_error{}; + { + std::vector threads; + threads.reserve(thread_count); + std::atomic batch_counter = 0; + + for (auto i{0U}; i < thread_count; ++i) { + threads.emplace_back([&func, &batches, &maybe_error, &batch_counter]() { + while (!maybe_error.Lock()->has_value()) { + const auto batch_index = batch_counter++; + if (batch_index >= batches.size()) { + return; + } + const auto &batch = batches[batch_index]; + try { + func(batch_index, batch); + } catch (RecoveryFailure &failure) { + *maybe_error.Lock() = std::move(failure); + } + } + }); + } + } + if (maybe_error.Lock()->has_value()) { + throw RecoveryFailure((*maybe_error.Lock())->what()); + } +} + RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils::SkipList *vertices, utils::SkipList *edges, std::deque> *epoch_history, @@ -1050,27 +1080,18 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis spdlog::info("Recovering edges."); // Recover edges. if (snapshot_has_edges) { - std::vector threads; - threads.reserve(config.durability.recovery_thread_count); if (!snapshot.SetPosition(info.offset_edge_batches)) { throw RecoveryFailure("Couldn't read data from snapshot!"); } const auto edge_batches = ReadBatchInfos(snapshot); - std::atomic batch_counter = 0; - for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) { - threads.emplace_back( - [path, edges, &edge_batches, &batch_counter, items = config.items, &get_property_from_id]() { - while (true) { - const auto batch_index = batch_counter++; - if (batch_index >= edge_batches.size()) { - return; - } - const auto &batch = edge_batches[batch_index]; - LoadPartialEdges(path, *edges, batch.offset, batch.count, items, get_property_from_id); - } - }); - } + RecoverOnMultipleThreads( + config.durability.recovery_thread_count, + [path, edges, items = config.items, &get_property_from_id](const size_t /*batch_index*/, + const BatchInfo &batch) { + LoadPartialEdges(path, *edges, batch.offset, batch.count, items, get_property_from_id); + }, + edge_batches); } spdlog::info("Edges are recovered."); @@ -1083,59 +1104,38 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis } const auto vertex_batches = ReadBatchInfos(snapshot); - { - std::vector threads; - threads.reserve(config.durability.recovery_thread_count); - std::atomic batch_counter = 0; - - for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) { - threads.emplace_back([path, vertices, &vertex_batches, &batch_counter, &get_label_from_id, - &get_property_from_id, &last_vertex_gid]() { - while (true) { - const auto batch_index = batch_counter++; - if (batch_index >= vertex_batches.size()) { - return; - } - const auto &batch = vertex_batches[batch_index]; - const auto last_vertex_gid_in_batch = LoadPartialVertices(path, *vertices, batch.offset, batch.count, - get_label_from_id, get_property_from_id); - if (batch_index == vertex_batches.size() - 1) { - last_vertex_gid = last_vertex_gid_in_batch; - } + RecoverOnMultipleThreads( + config.durability.recovery_thread_count, + [path, vertices, &vertex_batches, &get_label_from_id, &get_property_from_id, &last_vertex_gid]( + const size_t batch_index, const BatchInfo &batch) { + const auto last_vertex_gid_in_batch = + LoadPartialVertices(path, *vertices, batch.offset, batch.count, get_label_from_id, get_property_from_id); + if (batch_index == vertex_batches.size() - 1) { + last_vertex_gid = last_vertex_gid_in_batch; } - }); - } - } + }, + vertex_batches); + spdlog::info("Vertices are recovered."); // Recover vertices (in/out edges). spdlog::info("Recover connectivity."); std::atomic highest_edge_gid{0}; - { - std::vector threads; - threads.reserve(config.durability.recovery_thread_count); - std::atomic batch_counter = 0; - for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) { - threads.emplace_back([path, vertices, edges, edge_count, &highest_edge_gid, &vertex_batches, &batch_counter, - items = config.items, snapshot_has_edges, &get_edge_type_from_id]() { - while (true) { - const auto batch_index = batch_counter++; - if (batch_index >= vertex_batches.size()) { - return; - } - const auto &batch = vertex_batches[batch_index]; - const auto [number_of_recovered_edges, highest_edge_gid_in_batch] = LoadPartialConnectivity( - path, *vertices, *edges, batch.offset, batch.count, items, snapshot_has_edges, get_edge_type_from_id); - edge_count->fetch_add(number_of_recovered_edges); - auto known_highest_edge_gid = highest_edge_gid.load(); - while (known_highest_edge_gid < highest_edge_gid_in_batch) { - highest_edge_gid.compare_exchange_weak(known_highest_edge_gid, highest_edge_gid_in_batch); - } + RecoverOnMultipleThreads( + config.durability.recovery_thread_count, + [path, vertices, edges, edge_count, &highest_edge_gid, items = config.items, snapshot_has_edges, + &get_edge_type_from_id](const size_t /*batch_index*/, const BatchInfo &batch) { + const auto [number_of_recovered_edges, highest_edge_gid_in_batch] = LoadPartialConnectivity( + path, *vertices, *edges, batch.offset, batch.count, items, snapshot_has_edges, get_edge_type_from_id); + edge_count->fetch_add(number_of_recovered_edges); + auto known_highest_edge_gid = highest_edge_gid.load(); + while (known_highest_edge_gid < highest_edge_gid_in_batch) { + highest_edge_gid.compare_exchange_weak(known_highest_edge_gid, highest_edge_gid_in_batch); } - }); - } - } + }, + vertex_batches); + spdlog::info("Connectivity is recovered."); // Set initial values for edge/vertex ID generators.