Parallelize edge recovery
This commit is contained in:
parent
f5a49ed29f
commit
0516e93060
@ -1,4 +1,4 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -10,7 +10,9 @@
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "storage/v2/durability/snapshot.hpp"
|
||||
#include <thread>
|
||||
|
||||
#include "spdlog/spdlog.h"
|
||||
#include "storage/v2/durability/exceptions.hpp"
|
||||
#include "storage/v2/durability/paths.hpp"
|
||||
#include "storage/v2/durability/serialization.hpp"
|
||||
@ -157,6 +159,100 @@ SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path) {
|
||||
return info;
|
||||
}
|
||||
|
||||
// n is 0-indexed
|
||||
uint64_t GetNthEdgeStartOffset(Decoder &snapshot, const uint64_t n) {
|
||||
for (uint64_t i = 0; i < n; ++i) {
|
||||
{
|
||||
const auto marker = snapshot.ReadMarker();
|
||||
if (!marker || *marker != Marker::SECTION_EDGE) throw RecoveryFailure("Invalid snapshot data!");
|
||||
}
|
||||
|
||||
// Skip edge.
|
||||
auto gid = snapshot.ReadUint();
|
||||
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||
|
||||
// Recover properties.
|
||||
{
|
||||
auto props_size = snapshot.ReadUint();
|
||||
if (!props_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||
for (uint64_t j = 0; j < *props_size; ++j) {
|
||||
auto key = snapshot.ReadUint();
|
||||
if (!key) throw RecoveryFailure("Invalid snapshot data!");
|
||||
auto value = snapshot.SkipPropertyValue();
|
||||
if (!value) throw RecoveryFailure("Invalid snapshot data!");
|
||||
}
|
||||
}
|
||||
}
|
||||
const auto offset = snapshot.GetPosition();
|
||||
MG_ASSERT(offset.has_value(), "Unexpected");
|
||||
return *offset;
|
||||
}
|
||||
|
||||
template <typename TFunc>
|
||||
void LoadPartialEdges(const std::filesystem::path &path, utils::SkipList<Edge> &edges, const uint64_t from_position,
|
||||
const uint64_t edges_count, const NameIdMapper &name_id_mapper, const Config::Items items,
|
||||
TFunc get_property_from_id) {
|
||||
Decoder snapshot;
|
||||
snapshot.Initialize(path, kSnapshotMagic);
|
||||
|
||||
// Recover edges.
|
||||
auto edge_acc = edges.access();
|
||||
uint64_t last_edge_gid = 0;
|
||||
spdlog::info("Recovering {} edges.", edges_count);
|
||||
if (!snapshot.SetPosition(from_position)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||
for (uint64_t i = 0; i < edges_count; ++i) {
|
||||
{
|
||||
const auto marker = snapshot.ReadMarker();
|
||||
if (!marker || *marker != Marker::SECTION_EDGE) throw RecoveryFailure("Invalid snapshot data!");
|
||||
}
|
||||
|
||||
if (items.properties_on_edges) {
|
||||
// Insert edge.
|
||||
auto gid = snapshot.ReadUint();
|
||||
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||
if (i > 0 && *gid <= last_edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||
last_edge_gid = *gid;
|
||||
spdlog::debug("Recovering edge {} with properties.", *gid);
|
||||
auto [it, inserted] = edge_acc.insert(Edge{Gid::FromUint(*gid), nullptr});
|
||||
if (!inserted) throw RecoveryFailure("The edge must be inserted here!");
|
||||
|
||||
// Recover properties.
|
||||
{
|
||||
auto props_size = snapshot.ReadUint();
|
||||
if (!props_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||
auto &props = it->properties;
|
||||
for (uint64_t j = 0; j < *props_size; ++j) {
|
||||
auto key = snapshot.ReadUint();
|
||||
if (!key) throw RecoveryFailure("Invalid snapshot data!");
|
||||
auto value = snapshot.ReadPropertyValue();
|
||||
if (!value) throw RecoveryFailure("Invalid snapshot data!");
|
||||
SPDLOG_TRACE("Recovered property \"{}\" with value \"{}\" for edge {}.",
|
||||
name_id_mapper.IdToName(snapshot_id_map.at(*key)), *value, *gid);
|
||||
props.SetProperty(get_property_from_id(*key), *value);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Read edge GID.
|
||||
auto gid = snapshot.ReadUint();
|
||||
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||
if (i > 0 && *gid <= last_edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||
last_edge_gid = *gid;
|
||||
|
||||
spdlog::debug("Ensuring edge {} doesn't have any properties.", *gid);
|
||||
// Read properties.
|
||||
{
|
||||
auto props_size = snapshot.ReadUint();
|
||||
if (!props_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||
if (*props_size != 0)
|
||||
throw RecoveryFailure(
|
||||
"The snapshot has properties on edges, but the storage is "
|
||||
"configured without properties on edges!");
|
||||
}
|
||||
}
|
||||
}
|
||||
spdlog::info("Partial edges are recovered.");
|
||||
}
|
||||
|
||||
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
|
||||
utils::SkipList<Edge> *edges,
|
||||
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
|
||||
@ -226,65 +322,29 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
||||
// Reset current edge count.
|
||||
edge_count->store(0, std::memory_order_release);
|
||||
|
||||
spdlog::info("Recovering edges.");
|
||||
{
|
||||
// Recover edges.
|
||||
auto edge_acc = edges->access();
|
||||
uint64_t last_edge_gid = 0;
|
||||
if (snapshot_has_edges) {
|
||||
spdlog::info("Recovering {} edges.", info.edges_count);
|
||||
if (!snapshot.SetPosition(info.offset_edges)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||
for (uint64_t i = 0; i < info.edges_count; ++i) {
|
||||
{
|
||||
const auto marker = snapshot.ReadMarker();
|
||||
if (!marker || *marker != Marker::SECTION_EDGE) throw RecoveryFailure("Invalid snapshot data!");
|
||||
}
|
||||
|
||||
if (items.properties_on_edges) {
|
||||
// Insert edge.
|
||||
auto gid = snapshot.ReadUint();
|
||||
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||
if (i > 0 && *gid <= last_edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||
last_edge_gid = *gid;
|
||||
spdlog::debug("Recovering edge {} with properties.", *gid);
|
||||
auto [it, inserted] = edge_acc.insert(Edge{Gid::FromUint(*gid), nullptr});
|
||||
if (!inserted) throw RecoveryFailure("The edge must be inserted here!");
|
||||
|
||||
// Recover properties.
|
||||
{
|
||||
auto props_size = snapshot.ReadUint();
|
||||
if (!props_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||
auto &props = it->properties;
|
||||
for (uint64_t j = 0; j < *props_size; ++j) {
|
||||
auto key = snapshot.ReadUint();
|
||||
if (!key) throw RecoveryFailure("Invalid snapshot data!");
|
||||
auto value = snapshot.ReadPropertyValue();
|
||||
if (!value) throw RecoveryFailure("Invalid snapshot data!");
|
||||
SPDLOG_TRACE("Recovered property \"{}\" with value \"{}\" for edge {}.",
|
||||
name_id_mapper->IdToName(snapshot_id_map.at(*key)), *value, *gid);
|
||||
props.SetProperty(get_property_from_id(*key), *value);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Read edge GID.
|
||||
auto gid = snapshot.ReadUint();
|
||||
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||
if (i > 0 && *gid <= last_edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||
last_edge_gid = *gid;
|
||||
|
||||
spdlog::debug("Ensuring edge {} doesn't have any properties.", *gid);
|
||||
// Read properties.
|
||||
{
|
||||
auto props_size = snapshot.ReadUint();
|
||||
if (!props_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||
if (*props_size != 0)
|
||||
throw RecoveryFailure(
|
||||
"The snapshot has properties on edges, but the storage is "
|
||||
"configured without properties on edges!");
|
||||
}
|
||||
}
|
||||
std::vector<uint64_t> offsets;
|
||||
offsets.push_back(info.offset_edges);
|
||||
constexpr auto kEdgeChunkCount = 8;
|
||||
const auto edge_chunk_size = static_cast<uint64_t>(info.edges_count / kEdgeChunkCount);
|
||||
while (offsets.size() < kEdgeChunkCount) {
|
||||
offsets.push_back(GetNthEdgeStartOffset(snapshot, edge_chunk_size));
|
||||
}
|
||||
std::vector<std::jthread> threads;
|
||||
threads.reserve(offsets.size());
|
||||
for (const auto offset : offsets) {
|
||||
threads.emplace_back([path, edges, offset, edge_chunk_size, name_id_mapper, items, &get_property_from_id] {
|
||||
LoadPartialEdges(path, *edges, offset, edge_chunk_size, *name_id_mapper, items, get_property_from_id);
|
||||
});
|
||||
}
|
||||
spdlog::info("Edges are recovered.");
|
||||
}
|
||||
spdlog::info("Edges are recovered.");
|
||||
|
||||
// Recover vertices (labels and properties).
|
||||
if (!snapshot.SetPosition(info.offset_vertices)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||
@ -369,6 +429,68 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
||||
}
|
||||
spdlog::info("Vertices are recovered.");
|
||||
|
||||
// Recover vertices (labels and properties).
|
||||
if (!snapshot.SetPosition(info.offset_vertices)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||
for (uint64_t i = 0; i < info.vertices_count; ++i) {
|
||||
{
|
||||
auto marker = snapshot.ReadMarker();
|
||||
if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!");
|
||||
}
|
||||
|
||||
// Insert vertex.
|
||||
auto gid = snapshot.ReadUint();
|
||||
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||
|
||||
// Skip labels.
|
||||
{
|
||||
auto labels_size = snapshot.ReadUint();
|
||||
if (!labels_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||
for (uint64_t j = 0; j < *labels_size; ++j) {
|
||||
auto label = snapshot.ReadUint();
|
||||
if (!label) throw RecoveryFailure("Invalid snapshot data!");
|
||||
}
|
||||
}
|
||||
|
||||
// Skip properties.
|
||||
{
|
||||
auto props_size = snapshot.ReadUint();
|
||||
if (!props_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||
for (uint64_t j = 0; j < *props_size; ++j) {
|
||||
auto key = snapshot.ReadUint();
|
||||
if (!key) throw RecoveryFailure("Invalid snapshot data!");
|
||||
auto value = snapshot.SkipPropertyValue();
|
||||
if (!value) throw RecoveryFailure("Invalid snapshot data!");
|
||||
}
|
||||
}
|
||||
|
||||
// Skip in edges.
|
||||
{
|
||||
auto in_size = snapshot.ReadUint();
|
||||
if (!in_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||
for (uint64_t j = 0; j < *in_size; ++j) {
|
||||
auto edge_gid = snapshot.ReadUint();
|
||||
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||
auto from_gid = snapshot.ReadUint();
|
||||
if (!from_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||
auto edge_type = snapshot.ReadUint();
|
||||
if (!edge_type) throw RecoveryFailure("Invalid snapshot data!");
|
||||
}
|
||||
}
|
||||
|
||||
// Skip out edges.
|
||||
auto out_size = snapshot.ReadUint();
|
||||
if (!out_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||
for (uint64_t j = 0; j < *out_size; ++j) {
|
||||
auto edge_gid = snapshot.ReadUint();
|
||||
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||
auto to_gid = snapshot.ReadUint();
|
||||
if (!to_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||
auto edge_type = snapshot.ReadUint();
|
||||
if (!edge_type) throw RecoveryFailure("Invalid snapshot data!");
|
||||
}
|
||||
}
|
||||
spdlog::info("Vertices are recovered twice.");
|
||||
|
||||
// Recover vertices (in/out edges).
|
||||
spdlog::info("Recovering connectivity.");
|
||||
if (!snapshot.SetPosition(info.offset_vertices)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||
|
Loading…
Reference in New Issue
Block a user