Load verex labels and properties parallel
This commit is contained in:
parent
0516e93060
commit
076b035f7b
@ -21,6 +21,7 @@
|
|||||||
#include "storage/v2/edge_accessor.hpp"
|
#include "storage/v2/edge_accessor.hpp"
|
||||||
#include "storage/v2/edge_ref.hpp"
|
#include "storage/v2/edge_ref.hpp"
|
||||||
#include "storage/v2/mvcc.hpp"
|
#include "storage/v2/mvcc.hpp"
|
||||||
|
#include "storage/v2/vertex.hpp"
|
||||||
#include "storage/v2/vertex_accessor.hpp"
|
#include "storage/v2/vertex_accessor.hpp"
|
||||||
#include "utils/file_locker.hpp"
|
#include "utils/file_locker.hpp"
|
||||||
#include "utils/logging.hpp"
|
#include "utils/logging.hpp"
|
||||||
@ -188,8 +189,83 @@ uint64_t GetNthEdgeStartOffset(Decoder &snapshot, const uint64_t n) {
|
|||||||
return *offset;
|
return *offset;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// n is 0-indexed
|
||||||
|
std::pair<uint64_t /*offsest*/, uint64_t /*gid*/> GetNthVertexStartOffsetAndGid(Decoder &snapshot, const uint64_t n) {
|
||||||
|
for (uint64_t i = 0; i < n; ++i) {
|
||||||
|
{
|
||||||
|
auto marker = snapshot.ReadMarker();
|
||||||
|
if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert vertex.
|
||||||
|
auto gid = snapshot.ReadUint();
|
||||||
|
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
|
||||||
|
// Skip labels.
|
||||||
|
{
|
||||||
|
auto labels_size = snapshot.ReadUint();
|
||||||
|
if (!labels_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
for (uint64_t j = 0; j < *labels_size; ++j) {
|
||||||
|
auto label = snapshot.ReadUint();
|
||||||
|
if (!label) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip properties.
|
||||||
|
{
|
||||||
|
auto props_size = snapshot.ReadUint();
|
||||||
|
if (!props_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
for (uint64_t j = 0; j < *props_size; ++j) {
|
||||||
|
auto key = snapshot.ReadUint();
|
||||||
|
if (!key) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto value = snapshot.SkipPropertyValue();
|
||||||
|
if (!value) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip in edges.
|
||||||
|
{
|
||||||
|
auto in_size = snapshot.ReadUint();
|
||||||
|
if (!in_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
for (uint64_t j = 0; j < *in_size; ++j) {
|
||||||
|
auto edge_gid = snapshot.ReadUint();
|
||||||
|
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto from_gid = snapshot.ReadUint();
|
||||||
|
if (!from_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto edge_type = snapshot.ReadUint();
|
||||||
|
if (!edge_type) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip out edges.
|
||||||
|
auto out_size = snapshot.ReadUint();
|
||||||
|
if (!out_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
for (uint64_t j = 0; j < *out_size; ++j) {
|
||||||
|
auto edge_gid = snapshot.ReadUint();
|
||||||
|
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto to_gid = snapshot.ReadUint();
|
||||||
|
if (!to_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto edge_type = snapshot.ReadUint();
|
||||||
|
if (!edge_type) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto offset = snapshot.GetPosition();
|
||||||
|
MG_ASSERT(offset.has_value(), "Unexpected");
|
||||||
|
{
|
||||||
|
auto marker = snapshot.ReadMarker();
|
||||||
|
if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert vertex.
|
||||||
|
auto gid = snapshot.ReadUint();
|
||||||
|
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
|
||||||
|
return std::make_pair(*offset, *gid);
|
||||||
|
}
|
||||||
|
|
||||||
template <typename TFunc>
|
template <typename TFunc>
|
||||||
void LoadPartialEdges(const std::filesystem::path &path, utils::SkipList<Edge> &edges, const uint64_t from_position,
|
void LoadPartialEdges(const std::filesystem::path &path, utils::SkipList<Edge> &edges, const uint64_t from_offset,
|
||||||
const uint64_t edges_count, const NameIdMapper &name_id_mapper, const Config::Items items,
|
const uint64_t edges_count, const NameIdMapper &name_id_mapper, const Config::Items items,
|
||||||
TFunc get_property_from_id) {
|
TFunc get_property_from_id) {
|
||||||
Decoder snapshot;
|
Decoder snapshot;
|
||||||
@ -199,7 +275,7 @@ void LoadPartialEdges(const std::filesystem::path &path, utils::SkipList<Edge> &
|
|||||||
auto edge_acc = edges.access();
|
auto edge_acc = edges.access();
|
||||||
uint64_t last_edge_gid = 0;
|
uint64_t last_edge_gid = 0;
|
||||||
spdlog::info("Recovering {} edges.", edges_count);
|
spdlog::info("Recovering {} edges.", edges_count);
|
||||||
if (!snapshot.SetPosition(from_position)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
if (!snapshot.SetPosition(from_offset)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||||
for (uint64_t i = 0; i < edges_count; ++i) {
|
for (uint64_t i = 0; i < edges_count; ++i) {
|
||||||
{
|
{
|
||||||
const auto marker = snapshot.ReadMarker();
|
const auto marker = snapshot.ReadMarker();
|
||||||
@ -253,6 +329,187 @@ void LoadPartialEdges(const std::filesystem::path &path, utils::SkipList<Edge> &
|
|||||||
spdlog::info("Partial edges are recovered.");
|
spdlog::info("Partial edges are recovered.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename TLabelFromIdFunc, typename TPropertyFromIdFunc>
|
||||||
|
void LoadPartialVertices(const std::filesystem::path &path, utils::SkipList<Vertex> &vertices,
|
||||||
|
const uint64_t from_offset, const uint64_t vertices_count, const NameIdMapper &name_id_mapper,
|
||||||
|
TLabelFromIdFunc get_label_from_id, TPropertyFromIdFunc get_property_from_id) {
|
||||||
|
Decoder snapshot;
|
||||||
|
snapshot.Initialize(path, kSnapshotMagic);
|
||||||
|
if (!snapshot.SetPosition(from_offset)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||||
|
|
||||||
|
auto vertex_acc = vertices.access();
|
||||||
|
uint64_t last_vertex_gid = 0;
|
||||||
|
spdlog::info("Recovering {} vertices.", vertices_count);
|
||||||
|
for (uint64_t i = 0; i < vertices_count; ++i) {
|
||||||
|
{
|
||||||
|
auto marker = snapshot.ReadMarker();
|
||||||
|
if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert vertex.
|
||||||
|
auto gid = snapshot.ReadUint();
|
||||||
|
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
if (i > 0 && *gid <= last_vertex_gid) {
|
||||||
|
throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
last_vertex_gid = *gid;
|
||||||
|
spdlog::debug("Recovering vertex {}.", *gid);
|
||||||
|
auto [it, inserted] = vertex_acc.insert(Vertex{Gid::FromUint(*gid), nullptr});
|
||||||
|
if (!inserted) throw RecoveryFailure("The vertex must be inserted here!");
|
||||||
|
|
||||||
|
// Recover labels.
|
||||||
|
spdlog::trace("Recovering labels for vertex {}.", *gid);
|
||||||
|
{
|
||||||
|
auto labels_size = snapshot.ReadUint();
|
||||||
|
if (!labels_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto &labels = it->labels;
|
||||||
|
labels.reserve(*labels_size);
|
||||||
|
for (uint64_t j = 0; j < *labels_size; ++j) {
|
||||||
|
auto label = snapshot.ReadUint();
|
||||||
|
if (!label) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
SPDLOG_TRACE("Recovered label \"{}\" for vertex {}.", name_id_mapper.IdToName(snapshot_id_map.at(*label)),
|
||||||
|
*gid);
|
||||||
|
labels.emplace_back(get_label_from_id(*label));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recover properties.
|
||||||
|
spdlog::trace("Recovering properties for vertex {}.", *gid);
|
||||||
|
{
|
||||||
|
auto props_size = snapshot.ReadUint();
|
||||||
|
if (!props_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto &props = it->properties;
|
||||||
|
for (uint64_t j = 0; j < *props_size; ++j) {
|
||||||
|
auto key = snapshot.ReadUint();
|
||||||
|
if (!key) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto value = snapshot.ReadPropertyValue();
|
||||||
|
if (!value) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
SPDLOG_TRACE("Recovered property \"{}\" with value \"{}\" for vertex {}.",
|
||||||
|
name_id_mapper.IdToName(snapshot_id_map.at(*key)), *value, *gid);
|
||||||
|
props.SetProperty(get_property_from_id(*key), *value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip in edges.
|
||||||
|
{
|
||||||
|
auto in_size = snapshot.ReadUint();
|
||||||
|
if (!in_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
for (uint64_t j = 0; j < *in_size; ++j) {
|
||||||
|
auto edge_gid = snapshot.ReadUint();
|
||||||
|
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto from_gid = snapshot.ReadUint();
|
||||||
|
if (!from_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto edge_type = snapshot.ReadUint();
|
||||||
|
if (!edge_type) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip out edges.
|
||||||
|
auto out_size = snapshot.ReadUint();
|
||||||
|
if (!out_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
for (uint64_t j = 0; j < *out_size; ++j) {
|
||||||
|
auto edge_gid = snapshot.ReadUint();
|
||||||
|
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto to_gid = snapshot.ReadUint();
|
||||||
|
if (!to_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto edge_type = snapshot.ReadUint();
|
||||||
|
if (!edge_type) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
spdlog::info("Partial vertices are recovered.");
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename TLabelFromIdFunc, typename TPropertyFromIdFunc>
|
||||||
|
void LoadPartialConnectivity(const std::filesystem::path &path, utils::SkipList<Vertex> &vertices,
|
||||||
|
const uint64_t from_offset, const uint64_t vertices_count,
|
||||||
|
const NameIdMapper &name_id_mapper, TLabelFromIdFunc get_label_from_id,
|
||||||
|
TPropertyFromIdFunc get_property_from_id) {
|
||||||
|
Decoder snapshot;
|
||||||
|
snapshot.Initialize(path, kSnapshotMagic);
|
||||||
|
if (!snapshot.SetPosition(from_offset)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||||
|
|
||||||
|
auto vertex_acc = vertices.access();
|
||||||
|
uint64_t last_vertex_gid = 0;
|
||||||
|
spdlog::info("Recovering {} vertices.", vertices_count);
|
||||||
|
for (uint64_t i = 0; i < vertices_count; ++i) {
|
||||||
|
{
|
||||||
|
auto marker = snapshot.ReadMarker();
|
||||||
|
if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert vertex.
|
||||||
|
auto gid = snapshot.ReadUint();
|
||||||
|
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
if (i > 0 && *gid <= last_vertex_gid) {
|
||||||
|
throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
last_vertex_gid = *gid;
|
||||||
|
spdlog::debug("Recovering vertex {}.", *gid);
|
||||||
|
auto [it, inserted] = vertex_acc.insert(Vertex{Gid::FromUint(*gid), nullptr});
|
||||||
|
if (!inserted) throw RecoveryFailure("The vertex must be inserted here!");
|
||||||
|
|
||||||
|
// Recover labels.
|
||||||
|
spdlog::trace("Recovering labels for vertex {}.", *gid);
|
||||||
|
{
|
||||||
|
auto labels_size = snapshot.ReadUint();
|
||||||
|
if (!labels_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto &labels = it->labels;
|
||||||
|
labels.reserve(*labels_size);
|
||||||
|
for (uint64_t j = 0; j < *labels_size; ++j) {
|
||||||
|
auto label = snapshot.ReadUint();
|
||||||
|
if (!label) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
SPDLOG_TRACE("Recovered label \"{}\" for vertex {}.", name_id_mapper.IdToName(snapshot_id_map.at(*label)),
|
||||||
|
*gid);
|
||||||
|
labels.emplace_back(get_label_from_id(*label));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recover properties.
|
||||||
|
spdlog::trace("Recovering properties for vertex {}.", *gid);
|
||||||
|
{
|
||||||
|
auto props_size = snapshot.ReadUint();
|
||||||
|
if (!props_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto &props = it->properties;
|
||||||
|
for (uint64_t j = 0; j < *props_size; ++j) {
|
||||||
|
auto key = snapshot.ReadUint();
|
||||||
|
if (!key) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto value = snapshot.ReadPropertyValue();
|
||||||
|
if (!value) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
SPDLOG_TRACE("Recovered property \"{}\" with value \"{}\" for vertex {}.",
|
||||||
|
name_id_mapper.IdToName(snapshot_id_map.at(*key)), *value, *gid);
|
||||||
|
props.SetProperty(get_property_from_id(*key), *value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip in edges.
|
||||||
|
{
|
||||||
|
auto in_size = snapshot.ReadUint();
|
||||||
|
if (!in_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
for (uint64_t j = 0; j < *in_size; ++j) {
|
||||||
|
auto edge_gid = snapshot.ReadUint();
|
||||||
|
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto from_gid = snapshot.ReadUint();
|
||||||
|
if (!from_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto edge_type = snapshot.ReadUint();
|
||||||
|
if (!edge_type) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip out edges.
|
||||||
|
auto out_size = snapshot.ReadUint();
|
||||||
|
if (!out_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
for (uint64_t j = 0; j < *out_size; ++j) {
|
||||||
|
auto edge_gid = snapshot.ReadUint();
|
||||||
|
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto to_gid = snapshot.ReadUint();
|
||||||
|
if (!to_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto edge_type = snapshot.ReadUint();
|
||||||
|
if (!edge_type) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
spdlog::info("Partial vertices are recovered.");
|
||||||
|
}
|
||||||
|
|
||||||
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
|
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
|
||||||
utils::SkipList<Edge> *edges,
|
utils::SkipList<Edge> *edges,
|
||||||
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
|
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
|
||||||
@ -322,6 +579,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
|||||||
// Reset current edge count.
|
// Reset current edge count.
|
||||||
edge_count->store(0, std::memory_order_release);
|
edge_count->store(0, std::memory_order_release);
|
||||||
|
|
||||||
|
static constexpr auto kThreadCount = 8;
|
||||||
spdlog::info("Recovering edges.");
|
spdlog::info("Recovering edges.");
|
||||||
{
|
{
|
||||||
// Recover edges.
|
// Recover edges.
|
||||||
@ -331,9 +589,8 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
|||||||
if (!snapshot.SetPosition(info.offset_edges)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
if (!snapshot.SetPosition(info.offset_edges)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||||
std::vector<uint64_t> offsets;
|
std::vector<uint64_t> offsets;
|
||||||
offsets.push_back(info.offset_edges);
|
offsets.push_back(info.offset_edges);
|
||||||
constexpr auto kEdgeChunkCount = 8;
|
const auto edge_chunk_size = static_cast<uint64_t>(info.edges_count / kThreadCount);
|
||||||
const auto edge_chunk_size = static_cast<uint64_t>(info.edges_count / kEdgeChunkCount);
|
while (offsets.size() < kThreadCount) {
|
||||||
while (offsets.size() < kEdgeChunkCount) {
|
|
||||||
offsets.push_back(GetNthEdgeStartOffset(snapshot, edge_chunk_size));
|
offsets.push_back(GetNthEdgeStartOffset(snapshot, edge_chunk_size));
|
||||||
}
|
}
|
||||||
std::vector<std::jthread> threads;
|
std::vector<std::jthread> threads;
|
||||||
@ -347,152 +604,33 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
|||||||
spdlog::info("Edges are recovered.");
|
spdlog::info("Edges are recovered.");
|
||||||
|
|
||||||
// Recover vertices (labels and properties).
|
// Recover vertices (labels and properties).
|
||||||
|
spdlog::info("Recovering vertices.", info.vertices_count);
|
||||||
if (!snapshot.SetPosition(info.offset_vertices)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
if (!snapshot.SetPosition(info.offset_vertices)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||||
auto vertex_acc = vertices->access();
|
std::vector<std::pair<uint64_t, uint64_t>> vertices_offsets_and_gids;
|
||||||
uint64_t last_vertex_gid = 0;
|
const auto vertex_chunk_size = static_cast<uint64_t>(info.vertices_count / kThreadCount);
|
||||||
spdlog::info("Recovering {} vertices.", info.vertices_count);
|
{
|
||||||
for (uint64_t i = 0; i < info.vertices_count; ++i) {
|
vertices_offsets_and_gids.push_back(GetNthVertexStartOffsetAndGid(snapshot, 0));
|
||||||
{
|
while (vertices_offsets_and_gids.size() < kThreadCount) {
|
||||||
auto marker = snapshot.ReadMarker();
|
snapshot.SetPosition(vertices_offsets_and_gids.back().first);
|
||||||
if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!");
|
vertices_offsets_and_gids.push_back(GetNthVertexStartOffsetAndGid(snapshot, vertex_chunk_size));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// Insert vertex.
|
{
|
||||||
auto gid = snapshot.ReadUint();
|
std::vector<std::jthread> threads;
|
||||||
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
|
threads.reserve(vertices_offsets_and_gids.size());
|
||||||
if (i > 0 && *gid <= last_vertex_gid) {
|
for (const auto &offset_and_gid : vertices_offsets_and_gids) {
|
||||||
throw RecoveryFailure("Invalid snapshot data!");
|
threads.emplace_back([path, vertices, offset = offset_and_gid.first, vertex_chunk_size, name_id_mapper,
|
||||||
}
|
&get_label_from_id, &get_property_from_id] {
|
||||||
last_vertex_gid = *gid;
|
LoadPartialVertices(path, *vertices, offset, vertex_chunk_size, *name_id_mapper, get_label_from_id,
|
||||||
spdlog::debug("Recovering vertex {}.", *gid);
|
get_property_from_id);
|
||||||
auto [it, inserted] = vertex_acc.insert(Vertex{Gid::FromUint(*gid), nullptr});
|
});
|
||||||
if (!inserted) throw RecoveryFailure("The vertex must be inserted here!");
|
|
||||||
|
|
||||||
// Recover labels.
|
|
||||||
spdlog::trace("Recovering labels for vertex {}.", *gid);
|
|
||||||
{
|
|
||||||
auto labels_size = snapshot.ReadUint();
|
|
||||||
if (!labels_size) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
auto &labels = it->labels;
|
|
||||||
labels.reserve(*labels_size);
|
|
||||||
for (uint64_t j = 0; j < *labels_size; ++j) {
|
|
||||||
auto label = snapshot.ReadUint();
|
|
||||||
if (!label) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
SPDLOG_TRACE("Recovered label \"{}\" for vertex {}.", name_id_mapper->IdToName(snapshot_id_map.at(*label)),
|
|
||||||
*gid);
|
|
||||||
labels.emplace_back(get_label_from_id(*label));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Recover properties.
|
|
||||||
spdlog::trace("Recovering properties for vertex {}.", *gid);
|
|
||||||
{
|
|
||||||
auto props_size = snapshot.ReadUint();
|
|
||||||
if (!props_size) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
auto &props = it->properties;
|
|
||||||
for (uint64_t j = 0; j < *props_size; ++j) {
|
|
||||||
auto key = snapshot.ReadUint();
|
|
||||||
if (!key) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
auto value = snapshot.ReadPropertyValue();
|
|
||||||
if (!value) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
SPDLOG_TRACE("Recovered property \"{}\" with value \"{}\" for vertex {}.",
|
|
||||||
name_id_mapper->IdToName(snapshot_id_map.at(*key)), *value, *gid);
|
|
||||||
props.SetProperty(get_property_from_id(*key), *value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Skip in edges.
|
|
||||||
{
|
|
||||||
auto in_size = snapshot.ReadUint();
|
|
||||||
if (!in_size) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
for (uint64_t j = 0; j < *in_size; ++j) {
|
|
||||||
auto edge_gid = snapshot.ReadUint();
|
|
||||||
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
auto from_gid = snapshot.ReadUint();
|
|
||||||
if (!from_gid) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
auto edge_type = snapshot.ReadUint();
|
|
||||||
if (!edge_type) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Skip out edges.
|
|
||||||
auto out_size = snapshot.ReadUint();
|
|
||||||
if (!out_size) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
for (uint64_t j = 0; j < *out_size; ++j) {
|
|
||||||
auto edge_gid = snapshot.ReadUint();
|
|
||||||
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
auto to_gid = snapshot.ReadUint();
|
|
||||||
if (!to_gid) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
auto edge_type = snapshot.ReadUint();
|
|
||||||
if (!edge_type) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
spdlog::info("Vertices are recovered.");
|
spdlog::info("Vertices are recovered.");
|
||||||
|
|
||||||
// Recover vertices (labels and properties).
|
|
||||||
if (!snapshot.SetPosition(info.offset_vertices)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
|
||||||
for (uint64_t i = 0; i < info.vertices_count; ++i) {
|
|
||||||
{
|
|
||||||
auto marker = snapshot.ReadMarker();
|
|
||||||
if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Insert vertex.
|
|
||||||
auto gid = snapshot.ReadUint();
|
|
||||||
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
|
|
||||||
// Skip labels.
|
|
||||||
{
|
|
||||||
auto labels_size = snapshot.ReadUint();
|
|
||||||
if (!labels_size) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
for (uint64_t j = 0; j < *labels_size; ++j) {
|
|
||||||
auto label = snapshot.ReadUint();
|
|
||||||
if (!label) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Skip properties.
|
|
||||||
{
|
|
||||||
auto props_size = snapshot.ReadUint();
|
|
||||||
if (!props_size) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
for (uint64_t j = 0; j < *props_size; ++j) {
|
|
||||||
auto key = snapshot.ReadUint();
|
|
||||||
if (!key) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
auto value = snapshot.SkipPropertyValue();
|
|
||||||
if (!value) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Skip in edges.
|
|
||||||
{
|
|
||||||
auto in_size = snapshot.ReadUint();
|
|
||||||
if (!in_size) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
for (uint64_t j = 0; j < *in_size; ++j) {
|
|
||||||
auto edge_gid = snapshot.ReadUint();
|
|
||||||
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
auto from_gid = snapshot.ReadUint();
|
|
||||||
if (!from_gid) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
auto edge_type = snapshot.ReadUint();
|
|
||||||
if (!edge_type) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Skip out edges.
|
|
||||||
auto out_size = snapshot.ReadUint();
|
|
||||||
if (!out_size) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
for (uint64_t j = 0; j < *out_size; ++j) {
|
|
||||||
auto edge_gid = snapshot.ReadUint();
|
|
||||||
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
auto to_gid = snapshot.ReadUint();
|
|
||||||
if (!to_gid) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
auto edge_type = snapshot.ReadUint();
|
|
||||||
if (!edge_type) throw RecoveryFailure("Invalid snapshot data!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
spdlog::info("Vertices are recovered twice.");
|
|
||||||
|
|
||||||
// Recover vertices (in/out edges).
|
// Recover vertices (in/out edges).
|
||||||
spdlog::info("Recovering connectivity.");
|
spdlog::info("Recovering connectivity.");
|
||||||
|
auto vertex_acc = vertices->access();
|
||||||
if (!snapshot.SetPosition(info.offset_vertices)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
if (!snapshot.SetPosition(info.offset_vertices)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||||
for (auto &vertex : vertex_acc) {
|
for (auto &vertex : vertex_acc) {
|
||||||
{
|
{
|
||||||
@ -607,7 +745,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
|||||||
|
|
||||||
// Set initial values for edge/vertex ID generators.
|
// Set initial values for edge/vertex ID generators.
|
||||||
ret.next_edge_id = last_edge_gid + 1;
|
ret.next_edge_id = last_edge_gid + 1;
|
||||||
ret.next_vertex_id = last_vertex_gid + 1;
|
ret.next_vertex_id = info.vertices_count + 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Recover indices.
|
// Recover indices.
|
||||||
|
Loading…
Reference in New Issue
Block a user