Handle next vertex and edge id properly
This commit is contained in:
parent
f7b577eeb6
commit
f2e44e497b
@ -205,9 +205,10 @@ std::vector<BatchInfo> ReadBatchInfos(Decoder &snapshot) {
|
|||||||
return infos;
|
return infos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns the gid of the last recovered edge
|
||||||
template <typename TFunc>
|
template <typename TFunc>
|
||||||
void LoadPartialEdges(const std::filesystem::path &path, utils::SkipList<Edge> &edges, const uint64_t from_offset,
|
uint64_t LoadPartialEdges(const std::filesystem::path &path, utils::SkipList<Edge> &edges, const uint64_t from_offset,
|
||||||
const uint64_t edges_count, const Config::Items items, TFunc get_property_from_id) {
|
const uint64_t edges_count, const Config::Items items, TFunc get_property_from_id) {
|
||||||
Decoder snapshot;
|
Decoder snapshot;
|
||||||
snapshot.Initialize(path, kSnapshotMagic);
|
snapshot.Initialize(path, kSnapshotMagic);
|
||||||
|
|
||||||
@ -265,12 +266,15 @@ void LoadPartialEdges(const std::filesystem::path &path, utils::SkipList<Edge> &
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
spdlog::info("Partial edges are recovered.");
|
spdlog::info("Partial edges are recovered.");
|
||||||
|
|
||||||
|
return last_edge_gid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns the gid of the last recovered vertex
|
||||||
template <typename TLabelFromIdFunc, typename TPropertyFromIdFunc>
|
template <typename TLabelFromIdFunc, typename TPropertyFromIdFunc>
|
||||||
void LoadPartialVertices(const std::filesystem::path &path, utils::SkipList<Vertex> &vertices,
|
uint64_t LoadPartialVertices(const std::filesystem::path &path, utils::SkipList<Vertex> &vertices,
|
||||||
const uint64_t from_offset, const uint64_t vertices_count, const NameIdMapper &name_id_mapper,
|
const uint64_t from_offset, const uint64_t vertices_count,
|
||||||
TLabelFromIdFunc get_label_from_id, TPropertyFromIdFunc get_property_from_id) {
|
TLabelFromIdFunc get_label_from_id, TPropertyFromIdFunc get_property_from_id) {
|
||||||
Decoder snapshot;
|
Decoder snapshot;
|
||||||
snapshot.Initialize(path, kSnapshotMagic);
|
snapshot.Initialize(path, kSnapshotMagic);
|
||||||
if (!snapshot.SetPosition(from_offset)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
if (!snapshot.SetPosition(from_offset)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||||
@ -351,6 +355,8 @@ void LoadPartialVertices(const std::filesystem::path &path, utils::SkipList<Vert
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
spdlog::info("Partial vertices are recovered.");
|
spdlog::info("Partial vertices are recovered.");
|
||||||
|
|
||||||
|
return last_vertex_gid;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename TEdgeTypeFromIdFunc>
|
template <typename TEdgeTypeFromIdFunc>
|
||||||
@ -1029,6 +1035,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
|||||||
|
|
||||||
// Reset current edge count.
|
// Reset current edge count.
|
||||||
edge_count->store(0, std::memory_order_release);
|
edge_count->store(0, std::memory_order_release);
|
||||||
|
uint64_t last_edge_gid{0};
|
||||||
|
|
||||||
{
|
{
|
||||||
spdlog::info("Recovering edges.");
|
spdlog::info("Recovering edges.");
|
||||||
@ -1043,26 +1050,33 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
|||||||
std::atomic<uint64_t> batch_counter = 0;
|
std::atomic<uint64_t> batch_counter = 0;
|
||||||
|
|
||||||
for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) {
|
for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) {
|
||||||
threads.emplace_back(
|
threads.emplace_back([path, edges, &edge_batches, &batch_counter, items = config.items, &get_property_from_id,
|
||||||
[path, edges, &edge_batches, &batch_counter, items = config.items, &get_property_from_id]() mutable {
|
&last_edge_gid]() mutable {
|
||||||
while (true) {
|
while (true) {
|
||||||
const auto batch_index = batch_counter++;
|
const auto batch_index = batch_counter++;
|
||||||
if (batch_index >= edge_batches.size()) {
|
if (batch_index >= edge_batches.size()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const auto &batch = edge_batches[batch_index];
|
const auto &batch = edge_batches[batch_index];
|
||||||
|
const auto last_edge_gid_in_batch =
|
||||||
LoadPartialEdges(path, *edges, batch.offset, batch.count, items, get_property_from_id);
|
LoadPartialEdges(path, *edges, batch.offset, batch.count, items, get_property_from_id);
|
||||||
}
|
if (batch_index == edge_batches.size() - 1) {
|
||||||
});
|
last_edge_gid = last_edge_gid_in_batch;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
spdlog::info("Edges are recovered.");
|
spdlog::info("Edges are recovered.");
|
||||||
|
|
||||||
// Recover vertices (labels and properties).
|
// Recover vertices (labels and properties).
|
||||||
spdlog::info("Recovering vertices.", info.vertices_count);
|
spdlog::info("Recovering vertices.", info.vertices_count);
|
||||||
|
uint64_t last_vertex_gid{0};
|
||||||
|
|
||||||
if (!snapshot.SetPosition(info.offset_vertex_batches)) {
|
if (!snapshot.SetPosition(info.offset_vertex_batches)) {
|
||||||
throw RecoveryFailure("Couldn't read data from snapshot!");
|
throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||||
}
|
}
|
||||||
|
|
||||||
const auto vertex_batches = ReadBatchInfos(snapshot);
|
const auto vertex_batches = ReadBatchInfos(snapshot);
|
||||||
{
|
{
|
||||||
std::vector<std::jthread> threads;
|
std::vector<std::jthread> threads;
|
||||||
@ -1070,16 +1084,19 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
|||||||
std::atomic<uint64_t> batch_counter = 0;
|
std::atomic<uint64_t> batch_counter = 0;
|
||||||
|
|
||||||
for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) {
|
for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) {
|
||||||
threads.emplace_back([path, vertices, &vertex_batches, &batch_counter, name_id_mapper, &get_label_from_id,
|
threads.emplace_back([path, vertices, &vertex_batches, &batch_counter, &get_label_from_id,
|
||||||
&get_property_from_id]() mutable {
|
&get_property_from_id, &last_vertex_gid]() mutable {
|
||||||
while (true) {
|
while (true) {
|
||||||
const auto batch_index = batch_counter++;
|
const auto batch_index = batch_counter++;
|
||||||
if (batch_index >= vertex_batches.size()) {
|
if (batch_index >= vertex_batches.size()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const auto &batch = vertex_batches[batch_index];
|
const auto &batch = vertex_batches[batch_index];
|
||||||
LoadPartialVertices(path, *vertices, batch.offset, batch.count, *name_id_mapper, get_label_from_id,
|
const auto last_vertex_gid_in_batch = LoadPartialVertices(path, *vertices, batch.offset, batch.count,
|
||||||
get_property_from_id);
|
get_label_from_id, get_property_from_id);
|
||||||
|
if (batch_index == vertex_batches.size() - 1) {
|
||||||
|
last_vertex_gid = last_vertex_gid_in_batch;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -1111,9 +1128,8 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
|||||||
spdlog::info("Connectivity is recovered.");
|
spdlog::info("Connectivity is recovered.");
|
||||||
|
|
||||||
// Set initial values for edge/vertex ID generators.
|
// Set initial values for edge/vertex ID generators.
|
||||||
// TODO(antaljanosbenjamin): Figure out proper values
|
ret.next_edge_id = last_edge_gid + 1;
|
||||||
ret.next_edge_id = info.edges_count + 2;
|
ret.next_vertex_id = last_vertex_gid + 1;
|
||||||
ret.next_vertex_id = info.vertices_count + 2;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Recover indices.
|
// Recover indices.
|
||||||
|
Loading…
Reference in New Issue
Block a user