Fix recovering edge count
This commit is contained in:
parent
50b98a5a96
commit
76a0ff1dc4
@ -205,10 +205,9 @@ std::vector<BatchInfo> ReadBatchInfos(Decoder &snapshot) {
|
||||
return infos;
|
||||
}
|
||||
|
||||
// Returns the gid of the last recovered edge
|
||||
template <typename TFunc>
|
||||
uint64_t LoadPartialEdges(const std::filesystem::path &path, utils::SkipList<Edge> &edges, const uint64_t from_offset,
|
||||
const uint64_t edges_count, const Config::Items items, TFunc get_property_from_id) {
|
||||
void LoadPartialEdges(const std::filesystem::path &path, utils::SkipList<Edge> &edges, const uint64_t from_offset,
|
||||
const uint64_t edges_count, const Config::Items items, TFunc get_property_from_id) {
|
||||
Decoder snapshot;
|
||||
snapshot.Initialize(path, kSnapshotMagic);
|
||||
|
||||
@ -266,8 +265,6 @@ uint64_t LoadPartialEdges(const std::filesystem::path &path, utils::SkipList<Edg
|
||||
}
|
||||
}
|
||||
spdlog::info("Partial edges are recovered.");
|
||||
|
||||
return last_edge_gid;
|
||||
}
|
||||
|
||||
// Returns the gid of the last recovered vertex
|
||||
@ -361,10 +358,10 @@ uint64_t LoadPartialVertices(const std::filesystem::path &path, utils::SkipList<
|
||||
|
||||
// Returns the number of edges recovered and the highest edge gid encountered
|
||||
template <typename TEdgeTypeFromIdFunc>
|
||||
uint64_t LoadPartialConnectivity(const std::filesystem::path &path, utils::SkipList<Vertex> &vertices,
|
||||
utils::SkipList<Edge> &edges, const uint64_t from_offset,
|
||||
const uint64_t vertices_count, const Config::Items items,
|
||||
const bool snapshot_has_edges, TEdgeTypeFromIdFunc get_edge_type_from_id) {
|
||||
std::pair<uint64_t /*edge count*/, uint64_t /*highest edge gid*/> LoadPartialConnectivity(
|
||||
const std::filesystem::path &path, utils::SkipList<Vertex> &vertices, utils::SkipList<Edge> &edges,
|
||||
const uint64_t from_offset, const uint64_t vertices_count, const Config::Items items, const bool snapshot_has_edges,
|
||||
TEdgeTypeFromIdFunc get_edge_type_from_id) {
|
||||
Decoder snapshot;
|
||||
snapshot.Initialize(path, kSnapshotMagic);
|
||||
if (!snapshot.SetPosition(from_offset)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||
@ -385,6 +382,7 @@ uint64_t LoadPartialConnectivity(const std::filesystem::path &path, utils::SkipL
|
||||
});
|
||||
|
||||
uint64_t edge_count{0};
|
||||
uint64_t highest_edge_gid{0};
|
||||
auto vertex_it = vertex_acc.find(start_vertex_gid);
|
||||
if (vertex_it == vertex_acc.end()) {
|
||||
throw RecoveryFailure("Invalid snapshot data!");
|
||||
@ -436,6 +434,7 @@ uint64_t LoadPartialConnectivity(const std::filesystem::path &path, utils::SkipL
|
||||
for (uint64_t j = 0; j < *in_size; ++j) {
|
||||
auto edge_gid = snapshot.ReadUint();
|
||||
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||
highest_edge_gid = std::max(highest_edge_gid, *edge_gid);
|
||||
|
||||
auto from_gid = snapshot.ReadUint();
|
||||
if (!from_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||
@ -494,15 +493,15 @@ uint64_t LoadPartialConnectivity(const std::filesystem::path &path, utils::SkipL
|
||||
}
|
||||
}
|
||||
vertex.out_edges.emplace_back(get_edge_type_from_id(*edge_type), &*to_vertex, edge_ref);
|
||||
// Increment edge count. We only increment the count here because the
|
||||
// information is duplicated in in_edges.
|
||||
edge_count++;
|
||||
}
|
||||
// Increment edge count. We only increment the count here because the
|
||||
// information is duplicated in in_edges.
|
||||
edge_count++;
|
||||
}
|
||||
++vertex_it;
|
||||
}
|
||||
spdlog::info("Partial connectivities are recovered.");
|
||||
return edge_count;
|
||||
return std::make_pair(edge_count, highest_edge_gid);
|
||||
}
|
||||
|
||||
RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
|
||||
@ -1046,7 +1045,6 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
||||
|
||||
// Reset current edge count.
|
||||
edge_count->store(0, std::memory_order_release);
|
||||
uint64_t last_edge_gid{0};
|
||||
|
||||
{
|
||||
spdlog::info("Recovering edges.");
|
||||
@ -1061,21 +1059,17 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
||||
std::atomic<uint64_t> batch_counter = 0;
|
||||
|
||||
for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) {
|
||||
threads.emplace_back([path, edges, &edge_batches, &batch_counter, items = config.items, &get_property_from_id,
|
||||
&last_edge_gid]() mutable {
|
||||
while (true) {
|
||||
const auto batch_index = batch_counter++;
|
||||
if (batch_index >= edge_batches.size()) {
|
||||
return;
|
||||
}
|
||||
const auto &batch = edge_batches[batch_index];
|
||||
const auto last_edge_gid_in_batch =
|
||||
threads.emplace_back(
|
||||
[path, edges, &edge_batches, &batch_counter, items = config.items, &get_property_from_id]() {
|
||||
while (true) {
|
||||
const auto batch_index = batch_counter++;
|
||||
if (batch_index >= edge_batches.size()) {
|
||||
return;
|
||||
}
|
||||
const auto &batch = edge_batches[batch_index];
|
||||
LoadPartialEdges(path, *edges, batch.offset, batch.count, items, get_property_from_id);
|
||||
if (batch_index == edge_batches.size() - 1) {
|
||||
last_edge_gid = last_edge_gid_in_batch;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
spdlog::info("Edges are recovered.");
|
||||
@ -1096,7 +1090,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
||||
|
||||
for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) {
|
||||
threads.emplace_back([path, vertices, &vertex_batches, &batch_counter, &get_label_from_id,
|
||||
&get_property_from_id, &last_vertex_gid]() mutable {
|
||||
&get_property_from_id, &last_vertex_gid]() {
|
||||
while (true) {
|
||||
const auto batch_index = batch_counter++;
|
||||
if (batch_index >= vertex_batches.size()) {
|
||||
@ -1116,23 +1110,28 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
||||
|
||||
// Recover vertices (in/out edges).
|
||||
spdlog::info("Recover connectivity.");
|
||||
std::atomic<uint64_t> highest_edge_gid{0};
|
||||
{
|
||||
std::vector<std::jthread> threads;
|
||||
threads.reserve(config.durability.recovery_thread_count);
|
||||
std::atomic<uint64_t> batch_counter = 0;
|
||||
|
||||
for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) {
|
||||
threads.emplace_back([path, vertices, edges, edge_count, &vertex_batches, &batch_counter, items = config.items,
|
||||
snapshot_has_edges, &get_edge_type_from_id]() mutable {
|
||||
threads.emplace_back([path, vertices, edges, edge_count, &highest_edge_gid, &vertex_batches, &batch_counter,
|
||||
items = config.items, snapshot_has_edges, &get_edge_type_from_id]() {
|
||||
while (true) {
|
||||
const auto batch_index = batch_counter++;
|
||||
if (batch_index >= vertex_batches.size()) {
|
||||
return;
|
||||
}
|
||||
const auto &batch = vertex_batches[batch_index];
|
||||
const auto number_of_recovered_edges = LoadPartialConnectivity(
|
||||
const auto [number_of_recovered_edges, highest_edge_gid_in_batch] = LoadPartialConnectivity(
|
||||
path, *vertices, *edges, batch.offset, batch.count, items, snapshot_has_edges, get_edge_type_from_id);
|
||||
edge_count->fetch_add(number_of_recovered_edges);
|
||||
auto known_highest_edge_gid = highest_edge_gid.load();
|
||||
while (known_highest_edge_gid < highest_edge_gid_in_batch) {
|
||||
highest_edge_gid.compare_exchange_weak(known_highest_edge_gid, highest_edge_gid_in_batch);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -1140,7 +1139,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
||||
spdlog::info("Connectivity is recovered.");
|
||||
|
||||
// Set initial values for edge/vertex ID generators.
|
||||
ret.next_edge_id = last_edge_gid + 1;
|
||||
ret.next_edge_id = highest_edge_gid + 1;
|
||||
ret.next_vertex_id = last_vertex_gid + 1;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user