Fix error handling in case of corrupted snapshot file
This commit is contained in:
parent
76a0ff1dc4
commit
151e1cf62b
@ -504,6 +504,36 @@ std::pair<uint64_t /*edge count*/, uint64_t /*highest edge gid*/> LoadPartialCon
|
||||
return std::make_pair(edge_count, highest_edge_gid);
|
||||
}
|
||||
|
||||
template <typename TFunc>
|
||||
void RecoverOnMultipleThreads(const size_t thread_count, const TFunc &func, const std::vector<BatchInfo> &batches) {
|
||||
utils::Synchronized<std::optional<RecoveryFailure>, utils::SpinLock> maybe_error{};
|
||||
{
|
||||
std::vector<std::jthread> threads;
|
||||
threads.reserve(thread_count);
|
||||
std::atomic<uint64_t> batch_counter = 0;
|
||||
|
||||
for (auto i{0U}; i < thread_count; ++i) {
|
||||
threads.emplace_back([&func, &batches, &maybe_error, &batch_counter]() {
|
||||
while (!maybe_error.Lock()->has_value()) {
|
||||
const auto batch_index = batch_counter++;
|
||||
if (batch_index >= batches.size()) {
|
||||
return;
|
||||
}
|
||||
const auto &batch = batches[batch_index];
|
||||
try {
|
||||
func(batch_index, batch);
|
||||
} catch (RecoveryFailure &failure) {
|
||||
*maybe_error.Lock() = std::move(failure);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
if (maybe_error.Lock()->has_value()) {
|
||||
throw RecoveryFailure((*maybe_error.Lock())->what());
|
||||
}
|
||||
}
|
||||
|
||||
RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
|
||||
utils::SkipList<Edge> *edges,
|
||||
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
|
||||
@ -1050,27 +1080,18 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
||||
spdlog::info("Recovering edges.");
|
||||
// Recover edges.
|
||||
if (snapshot_has_edges) {
|
||||
std::vector<std::jthread> threads;
|
||||
threads.reserve(config.durability.recovery_thread_count);
|
||||
if (!snapshot.SetPosition(info.offset_edge_batches)) {
|
||||
throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||
}
|
||||
const auto edge_batches = ReadBatchInfos(snapshot);
|
||||
std::atomic<uint64_t> batch_counter = 0;
|
||||
|
||||
for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) {
|
||||
threads.emplace_back(
|
||||
[path, edges, &edge_batches, &batch_counter, items = config.items, &get_property_from_id]() {
|
||||
while (true) {
|
||||
const auto batch_index = batch_counter++;
|
||||
if (batch_index >= edge_batches.size()) {
|
||||
return;
|
||||
}
|
||||
const auto &batch = edge_batches[batch_index];
|
||||
LoadPartialEdges(path, *edges, batch.offset, batch.count, items, get_property_from_id);
|
||||
}
|
||||
});
|
||||
}
|
||||
RecoverOnMultipleThreads(
|
||||
config.durability.recovery_thread_count,
|
||||
[path, edges, items = config.items, &get_property_from_id](const size_t /*batch_index*/,
|
||||
const BatchInfo &batch) {
|
||||
LoadPartialEdges(path, *edges, batch.offset, batch.count, items, get_property_from_id);
|
||||
},
|
||||
edge_batches);
|
||||
}
|
||||
spdlog::info("Edges are recovered.");
|
||||
|
||||
@ -1083,59 +1104,38 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
||||
}
|
||||
|
||||
const auto vertex_batches = ReadBatchInfos(snapshot);
|
||||
{
|
||||
std::vector<std::jthread> threads;
|
||||
threads.reserve(config.durability.recovery_thread_count);
|
||||
std::atomic<uint64_t> batch_counter = 0;
|
||||
|
||||
for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) {
|
||||
threads.emplace_back([path, vertices, &vertex_batches, &batch_counter, &get_label_from_id,
|
||||
&get_property_from_id, &last_vertex_gid]() {
|
||||
while (true) {
|
||||
const auto batch_index = batch_counter++;
|
||||
if (batch_index >= vertex_batches.size()) {
|
||||
return;
|
||||
}
|
||||
const auto &batch = vertex_batches[batch_index];
|
||||
const auto last_vertex_gid_in_batch = LoadPartialVertices(path, *vertices, batch.offset, batch.count,
|
||||
get_label_from_id, get_property_from_id);
|
||||
if (batch_index == vertex_batches.size() - 1) {
|
||||
last_vertex_gid = last_vertex_gid_in_batch;
|
||||
}
|
||||
RecoverOnMultipleThreads(
|
||||
config.durability.recovery_thread_count,
|
||||
[path, vertices, &vertex_batches, &get_label_from_id, &get_property_from_id, &last_vertex_gid](
|
||||
const size_t batch_index, const BatchInfo &batch) {
|
||||
const auto last_vertex_gid_in_batch =
|
||||
LoadPartialVertices(path, *vertices, batch.offset, batch.count, get_label_from_id, get_property_from_id);
|
||||
if (batch_index == vertex_batches.size() - 1) {
|
||||
last_vertex_gid = last_vertex_gid_in_batch;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
},
|
||||
vertex_batches);
|
||||
|
||||
spdlog::info("Vertices are recovered.");
|
||||
|
||||
// Recover vertices (in/out edges).
|
||||
spdlog::info("Recover connectivity.");
|
||||
std::atomic<uint64_t> highest_edge_gid{0};
|
||||
{
|
||||
std::vector<std::jthread> threads;
|
||||
threads.reserve(config.durability.recovery_thread_count);
|
||||
std::atomic<uint64_t> batch_counter = 0;
|
||||
|
||||
for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) {
|
||||
threads.emplace_back([path, vertices, edges, edge_count, &highest_edge_gid, &vertex_batches, &batch_counter,
|
||||
items = config.items, snapshot_has_edges, &get_edge_type_from_id]() {
|
||||
while (true) {
|
||||
const auto batch_index = batch_counter++;
|
||||
if (batch_index >= vertex_batches.size()) {
|
||||
return;
|
||||
}
|
||||
const auto &batch = vertex_batches[batch_index];
|
||||
const auto [number_of_recovered_edges, highest_edge_gid_in_batch] = LoadPartialConnectivity(
|
||||
path, *vertices, *edges, batch.offset, batch.count, items, snapshot_has_edges, get_edge_type_from_id);
|
||||
edge_count->fetch_add(number_of_recovered_edges);
|
||||
auto known_highest_edge_gid = highest_edge_gid.load();
|
||||
while (known_highest_edge_gid < highest_edge_gid_in_batch) {
|
||||
highest_edge_gid.compare_exchange_weak(known_highest_edge_gid, highest_edge_gid_in_batch);
|
||||
}
|
||||
RecoverOnMultipleThreads(
|
||||
config.durability.recovery_thread_count,
|
||||
[path, vertices, edges, edge_count, &highest_edge_gid, items = config.items, snapshot_has_edges,
|
||||
&get_edge_type_from_id](const size_t /*batch_index*/, const BatchInfo &batch) {
|
||||
const auto [number_of_recovered_edges, highest_edge_gid_in_batch] = LoadPartialConnectivity(
|
||||
path, *vertices, *edges, batch.offset, batch.count, items, snapshot_has_edges, get_edge_type_from_id);
|
||||
edge_count->fetch_add(number_of_recovered_edges);
|
||||
auto known_highest_edge_gid = highest_edge_gid.load();
|
||||
while (known_highest_edge_gid < highest_edge_gid_in_batch) {
|
||||
highest_edge_gid.compare_exchange_weak(known_highest_edge_gid, highest_edge_gid_in_batch);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
},
|
||||
vertex_batches);
|
||||
|
||||
spdlog::info("Connectivity is recovered.");
|
||||
|
||||
// Set initial values for edge/vertex ID generators.
|
||||
|
Loading…
Reference in New Issue
Block a user