Make possible to load snapshots with old version
This commit is contained in:
parent
f4157fb1fd
commit
592b32aaf3
@ -486,6 +486,475 @@ void LoadPartialConnectivity(const std::filesystem::path &path, utils::SkipList<
|
|||||||
spdlog::info("Partial connectivities are recovered.");
|
spdlog::info("Partial connectivities are recovered.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
|
||||||
|
utils::SkipList<Edge> *edges,
|
||||||
|
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
|
||||||
|
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count,
|
||||||
|
Config::Items items) {
|
||||||
|
RecoveryInfo ret;
|
||||||
|
RecoveredIndicesAndConstraints indices_constraints;
|
||||||
|
|
||||||
|
Decoder snapshot;
|
||||||
|
auto version = snapshot.Initialize(path, kSnapshotMagic);
|
||||||
|
if (!version) throw RecoveryFailure("Couldn't read snapshot magic and/or version!");
|
||||||
|
if (*version != 14U) throw RecoveryFailure(fmt::format("Expected snapshot version is 14, but got {}", *version));
|
||||||
|
|
||||||
|
// Cleanup of loaded data in case of failure.
|
||||||
|
bool success = false;
|
||||||
|
utils::OnScopeExit cleanup([&] {
|
||||||
|
if (!success) {
|
||||||
|
edges->clear();
|
||||||
|
vertices->clear();
|
||||||
|
epoch_history->clear();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Read snapshot info.
|
||||||
|
const auto info = ReadSnapshotInfo(path);
|
||||||
|
spdlog::info("Recovering {} vertices and {} edges.", info.vertices_count, info.edges_count);
|
||||||
|
// Check for edges.
|
||||||
|
bool snapshot_has_edges = info.offset_edges != 0;
|
||||||
|
|
||||||
|
// Recover mapper.
|
||||||
|
std::unordered_map<uint64_t, uint64_t> snapshot_id_map;
|
||||||
|
{
|
||||||
|
spdlog::info("Recovering mapper metadata.");
|
||||||
|
if (!snapshot.SetPosition(info.offset_mapper)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||||
|
|
||||||
|
auto marker = snapshot.ReadMarker();
|
||||||
|
if (!marker || *marker != Marker::SECTION_MAPPER) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
|
||||||
|
auto size = snapshot.ReadUint();
|
||||||
|
if (!size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
|
||||||
|
for (uint64_t i = 0; i < *size; ++i) {
|
||||||
|
auto id = snapshot.ReadUint();
|
||||||
|
if (!id) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto name = snapshot.ReadString();
|
||||||
|
if (!name) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto my_id = name_id_mapper->NameToId(*name);
|
||||||
|
snapshot_id_map.emplace(*id, my_id);
|
||||||
|
SPDLOG_TRACE("Mapping \"{}\"from snapshot id {} to actual id {}.", *name, *id, my_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
auto get_label_from_id = [&snapshot_id_map](uint64_t snapshot_id) {
|
||||||
|
auto it = snapshot_id_map.find(snapshot_id);
|
||||||
|
if (it == snapshot_id_map.end()) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
return LabelId::FromUint(it->second);
|
||||||
|
};
|
||||||
|
auto get_property_from_id = [&snapshot_id_map](uint64_t snapshot_id) {
|
||||||
|
auto it = snapshot_id_map.find(snapshot_id);
|
||||||
|
if (it == snapshot_id_map.end()) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
return PropertyId::FromUint(it->second);
|
||||||
|
};
|
||||||
|
auto get_edge_type_from_id = [&snapshot_id_map](uint64_t snapshot_id) {
|
||||||
|
auto it = snapshot_id_map.find(snapshot_id);
|
||||||
|
if (it == snapshot_id_map.end()) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
return EdgeTypeId::FromUint(it->second);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Reset current edge count.
|
||||||
|
edge_count->store(0, std::memory_order_release);
|
||||||
|
|
||||||
|
{
|
||||||
|
// Recover edges.
|
||||||
|
auto edge_acc = edges->access();
|
||||||
|
uint64_t last_edge_gid = 0;
|
||||||
|
if (snapshot_has_edges) {
|
||||||
|
spdlog::info("Recovering {} edges.", info.edges_count);
|
||||||
|
if (!snapshot.SetPosition(info.offset_edges)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||||
|
for (uint64_t i = 0; i < info.edges_count; ++i) {
|
||||||
|
{
|
||||||
|
const auto marker = snapshot.ReadMarker();
|
||||||
|
if (!marker || *marker != Marker::SECTION_EDGE) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (items.properties_on_edges) {
|
||||||
|
// Insert edge.
|
||||||
|
auto gid = snapshot.ReadUint();
|
||||||
|
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
if (i > 0 && *gid <= last_edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
last_edge_gid = *gid;
|
||||||
|
spdlog::debug("Recovering edge {} with properties.", *gid);
|
||||||
|
auto [it, inserted] = edge_acc.insert(Edge{Gid::FromUint(*gid), nullptr});
|
||||||
|
if (!inserted) throw RecoveryFailure("The edge must be inserted here!");
|
||||||
|
|
||||||
|
// Recover properties.
|
||||||
|
{
|
||||||
|
auto props_size = snapshot.ReadUint();
|
||||||
|
if (!props_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto &props = it->properties;
|
||||||
|
for (uint64_t j = 0; j < *props_size; ++j) {
|
||||||
|
auto key = snapshot.ReadUint();
|
||||||
|
if (!key) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto value = snapshot.ReadPropertyValue();
|
||||||
|
if (!value) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
SPDLOG_TRACE("Recovered property \"{}\" with value \"{}\" for edge {}.",
|
||||||
|
name_id_mapper->IdToName(snapshot_id_map.at(*key)), *value, *gid);
|
||||||
|
props.SetProperty(get_property_from_id(*key), *value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Read edge GID.
|
||||||
|
auto gid = snapshot.ReadUint();
|
||||||
|
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
if (i > 0 && *gid <= last_edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
last_edge_gid = *gid;
|
||||||
|
|
||||||
|
spdlog::debug("Ensuring edge {} doesn't have any properties.", *gid);
|
||||||
|
// Read properties.
|
||||||
|
{
|
||||||
|
auto props_size = snapshot.ReadUint();
|
||||||
|
if (!props_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
if (*props_size != 0)
|
||||||
|
throw RecoveryFailure(
|
||||||
|
"The snapshot has properties on edges, but the storage is "
|
||||||
|
"configured without properties on edges!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
spdlog::info("Edges are recovered.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recover vertices (labels and properties).
|
||||||
|
if (!snapshot.SetPosition(info.offset_vertices)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||||
|
auto vertex_acc = vertices->access();
|
||||||
|
uint64_t last_vertex_gid = 0;
|
||||||
|
spdlog::info("Recovering {} vertices.", info.vertices_count);
|
||||||
|
for (uint64_t i = 0; i < info.vertices_count; ++i) {
|
||||||
|
{
|
||||||
|
auto marker = snapshot.ReadMarker();
|
||||||
|
if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert vertex.
|
||||||
|
auto gid = snapshot.ReadUint();
|
||||||
|
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
if (i > 0 && *gid <= last_vertex_gid) {
|
||||||
|
throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
last_vertex_gid = *gid;
|
||||||
|
spdlog::debug("Recovering vertex {}.", *gid);
|
||||||
|
auto [it, inserted] = vertex_acc.insert(Vertex{Gid::FromUint(*gid), nullptr});
|
||||||
|
if (!inserted) throw RecoveryFailure("The vertex must be inserted here!");
|
||||||
|
|
||||||
|
// Recover labels.
|
||||||
|
spdlog::trace("Recovering labels for vertex {}.", *gid);
|
||||||
|
{
|
||||||
|
auto labels_size = snapshot.ReadUint();
|
||||||
|
if (!labels_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto &labels = it->labels;
|
||||||
|
labels.reserve(*labels_size);
|
||||||
|
for (uint64_t j = 0; j < *labels_size; ++j) {
|
||||||
|
auto label = snapshot.ReadUint();
|
||||||
|
if (!label) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
SPDLOG_TRACE("Recovered label \"{}\" for vertex {}.", name_id_mapper->IdToName(snapshot_id_map.at(*label)),
|
||||||
|
*gid);
|
||||||
|
labels.emplace_back(get_label_from_id(*label));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recover properties.
|
||||||
|
spdlog::trace("Recovering properties for vertex {}.", *gid);
|
||||||
|
{
|
||||||
|
auto props_size = snapshot.ReadUint();
|
||||||
|
if (!props_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto &props = it->properties;
|
||||||
|
for (uint64_t j = 0; j < *props_size; ++j) {
|
||||||
|
auto key = snapshot.ReadUint();
|
||||||
|
if (!key) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto value = snapshot.ReadPropertyValue();
|
||||||
|
if (!value) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
SPDLOG_TRACE("Recovered property \"{}\" with value \"{}\" for vertex {}.",
|
||||||
|
name_id_mapper->IdToName(snapshot_id_map.at(*key)), *value, *gid);
|
||||||
|
props.SetProperty(get_property_from_id(*key), *value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip in edges.
|
||||||
|
{
|
||||||
|
auto in_size = snapshot.ReadUint();
|
||||||
|
if (!in_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
for (uint64_t j = 0; j < *in_size; ++j) {
|
||||||
|
auto edge_gid = snapshot.ReadUint();
|
||||||
|
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto from_gid = snapshot.ReadUint();
|
||||||
|
if (!from_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto edge_type = snapshot.ReadUint();
|
||||||
|
if (!edge_type) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip out edges.
|
||||||
|
auto out_size = snapshot.ReadUint();
|
||||||
|
if (!out_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
for (uint64_t j = 0; j < *out_size; ++j) {
|
||||||
|
auto edge_gid = snapshot.ReadUint();
|
||||||
|
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto to_gid = snapshot.ReadUint();
|
||||||
|
if (!to_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto edge_type = snapshot.ReadUint();
|
||||||
|
if (!edge_type) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
spdlog::info("Vertices are recovered.");
|
||||||
|
|
||||||
|
// Recover vertices (in/out edges).
|
||||||
|
spdlog::info("Recovering connectivity.");
|
||||||
|
if (!snapshot.SetPosition(info.offset_vertices)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||||
|
for (auto &vertex : vertex_acc) {
|
||||||
|
{
|
||||||
|
auto marker = snapshot.ReadMarker();
|
||||||
|
if (!marker || *marker != Marker::SECTION_VERTEX) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
|
||||||
|
spdlog::trace("Recovering connectivity for vertex {}.", vertex.gid.AsUint());
|
||||||
|
// Check vertex.
|
||||||
|
auto gid = snapshot.ReadUint();
|
||||||
|
if (!gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
if (gid != vertex.gid.AsUint()) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
|
||||||
|
// Skip labels.
|
||||||
|
{
|
||||||
|
auto labels_size = snapshot.ReadUint();
|
||||||
|
if (!labels_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
for (uint64_t j = 0; j < *labels_size; ++j) {
|
||||||
|
auto label = snapshot.ReadUint();
|
||||||
|
if (!label) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip properties.
|
||||||
|
{
|
||||||
|
auto props_size = snapshot.ReadUint();
|
||||||
|
if (!props_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
for (uint64_t j = 0; j < *props_size; ++j) {
|
||||||
|
auto key = snapshot.ReadUint();
|
||||||
|
if (!key) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto value = snapshot.SkipPropertyValue();
|
||||||
|
if (!value) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recover in edges.
|
||||||
|
{
|
||||||
|
spdlog::trace("Recovering inbound edges for vertex {}.", vertex.gid.AsUint());
|
||||||
|
auto in_size = snapshot.ReadUint();
|
||||||
|
if (!in_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
vertex.in_edges.reserve(*in_size);
|
||||||
|
for (uint64_t j = 0; j < *in_size; ++j) {
|
||||||
|
auto edge_gid = snapshot.ReadUint();
|
||||||
|
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
last_edge_gid = std::max(last_edge_gid, *edge_gid);
|
||||||
|
|
||||||
|
auto from_gid = snapshot.ReadUint();
|
||||||
|
if (!from_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto edge_type = snapshot.ReadUint();
|
||||||
|
if (!edge_type) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
|
||||||
|
auto from_vertex = vertex_acc.find(Gid::FromUint(*from_gid));
|
||||||
|
if (from_vertex == vertex_acc.end()) throw RecoveryFailure("Invalid from vertex!");
|
||||||
|
|
||||||
|
EdgeRef edge_ref(Gid::FromUint(*edge_gid));
|
||||||
|
if (items.properties_on_edges) {
|
||||||
|
if (snapshot_has_edges) {
|
||||||
|
auto edge = edge_acc.find(Gid::FromUint(*edge_gid));
|
||||||
|
if (edge == edge_acc.end()) throw RecoveryFailure("Invalid edge!");
|
||||||
|
edge_ref = EdgeRef(&*edge);
|
||||||
|
} else {
|
||||||
|
auto [edge, inserted] = edge_acc.insert(Edge{Gid::FromUint(*edge_gid), nullptr});
|
||||||
|
edge_ref = EdgeRef(&*edge);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SPDLOG_TRACE("Recovered inbound edge {} with label \"{}\" from vertex {}.", *edge_gid,
|
||||||
|
name_id_mapper->IdToName(snapshot_id_map.at(*edge_type)), from_vertex->gid.AsUint());
|
||||||
|
vertex.in_edges.emplace_back(get_edge_type_from_id(*edge_type), &*from_vertex, edge_ref);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recover out edges.
|
||||||
|
{
|
||||||
|
spdlog::trace("Recovering outbound edges for vertex {}.", vertex.gid.AsUint());
|
||||||
|
auto out_size = snapshot.ReadUint();
|
||||||
|
if (!out_size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
vertex.out_edges.reserve(*out_size);
|
||||||
|
for (uint64_t j = 0; j < *out_size; ++j) {
|
||||||
|
auto edge_gid = snapshot.ReadUint();
|
||||||
|
if (!edge_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
last_edge_gid = std::max(last_edge_gid, *edge_gid);
|
||||||
|
|
||||||
|
auto to_gid = snapshot.ReadUint();
|
||||||
|
if (!to_gid) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto edge_type = snapshot.ReadUint();
|
||||||
|
if (!edge_type) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
|
||||||
|
auto to_vertex = vertex_acc.find(Gid::FromUint(*to_gid));
|
||||||
|
if (to_vertex == vertex_acc.end()) throw RecoveryFailure("Invalid to vertex!");
|
||||||
|
|
||||||
|
EdgeRef edge_ref(Gid::FromUint(*edge_gid));
|
||||||
|
if (items.properties_on_edges) {
|
||||||
|
if (snapshot_has_edges) {
|
||||||
|
auto edge = edge_acc.find(Gid::FromUint(*edge_gid));
|
||||||
|
if (edge == edge_acc.end()) throw RecoveryFailure("Invalid edge!");
|
||||||
|
edge_ref = EdgeRef(&*edge);
|
||||||
|
} else {
|
||||||
|
auto [edge, inserted] = edge_acc.insert(Edge{Gid::FromUint(*edge_gid), nullptr});
|
||||||
|
edge_ref = EdgeRef(&*edge);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SPDLOG_TRACE("Recovered outbound edge {} with label \"{}\" to vertex {}.", *edge_gid,
|
||||||
|
name_id_mapper->IdToName(snapshot_id_map.at(*edge_type)), to_vertex->gid.AsUint());
|
||||||
|
vertex.out_edges.emplace_back(get_edge_type_from_id(*edge_type), &*to_vertex, edge_ref);
|
||||||
|
}
|
||||||
|
// Increment edge count. We only increment the count here because the
|
||||||
|
// information is duplicated in in_edges.
|
||||||
|
edge_count->fetch_add(*out_size, std::memory_order_acq_rel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
spdlog::info("Connectivity is recovered.");
|
||||||
|
|
||||||
|
// Set initial values for edge/vertex ID generators.
|
||||||
|
ret.next_edge_id = last_edge_gid + 1;
|
||||||
|
ret.next_vertex_id = last_vertex_gid + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recover indices.
|
||||||
|
{
|
||||||
|
spdlog::info("Recovering metadata of indices.");
|
||||||
|
if (!snapshot.SetPosition(info.offset_indices)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||||
|
|
||||||
|
auto marker = snapshot.ReadMarker();
|
||||||
|
if (!marker || *marker != Marker::SECTION_INDICES) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
|
||||||
|
// Recover label indices.
|
||||||
|
{
|
||||||
|
auto size = snapshot.ReadUint();
|
||||||
|
if (!size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
spdlog::info("Recovering metadata of {} label indices.", *size);
|
||||||
|
for (uint64_t i = 0; i < *size; ++i) {
|
||||||
|
auto label = snapshot.ReadUint();
|
||||||
|
if (!label) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
AddRecoveredIndexConstraint(&indices_constraints.indices.label, get_label_from_id(*label),
|
||||||
|
"The label index already exists!");
|
||||||
|
SPDLOG_TRACE("Recovered metadata of label index for :{}", name_id_mapper->IdToName(snapshot_id_map.at(*label)));
|
||||||
|
}
|
||||||
|
spdlog::info("Metadata of label indices are recovered.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recover label+property indices.
|
||||||
|
{
|
||||||
|
auto size = snapshot.ReadUint();
|
||||||
|
if (!size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
spdlog::info("Recovering metadata of {} label+property indices.", *size);
|
||||||
|
for (uint64_t i = 0; i < *size; ++i) {
|
||||||
|
auto label = snapshot.ReadUint();
|
||||||
|
if (!label) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto property = snapshot.ReadUint();
|
||||||
|
if (!property) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
AddRecoveredIndexConstraint(&indices_constraints.indices.label_property,
|
||||||
|
{get_label_from_id(*label), get_property_from_id(*property)},
|
||||||
|
"The label+property index already exists!");
|
||||||
|
SPDLOG_TRACE("Recovered metadata of label+property index for :{}({})",
|
||||||
|
name_id_mapper->IdToName(snapshot_id_map.at(*label)),
|
||||||
|
name_id_mapper->IdToName(snapshot_id_map.at(*property)));
|
||||||
|
}
|
||||||
|
spdlog::info("Metadata of label+property indices are recovered.");
|
||||||
|
}
|
||||||
|
spdlog::info("Metadata of indices are recovered.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recover constraints.
|
||||||
|
{
|
||||||
|
spdlog::info("Recovering metadata of constraints.");
|
||||||
|
if (!snapshot.SetPosition(info.offset_constraints)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||||
|
|
||||||
|
auto marker = snapshot.ReadMarker();
|
||||||
|
if (!marker || *marker != Marker::SECTION_CONSTRAINTS) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
|
||||||
|
// Recover existence constraints.
|
||||||
|
{
|
||||||
|
auto size = snapshot.ReadUint();
|
||||||
|
if (!size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
spdlog::info("Recovering metadata of {} existence constraints.", *size);
|
||||||
|
for (uint64_t i = 0; i < *size; ++i) {
|
||||||
|
auto label = snapshot.ReadUint();
|
||||||
|
if (!label) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto property = snapshot.ReadUint();
|
||||||
|
if (!property) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
AddRecoveredIndexConstraint(&indices_constraints.constraints.existence,
|
||||||
|
{get_label_from_id(*label), get_property_from_id(*property)},
|
||||||
|
"The existence constraint already exists!");
|
||||||
|
SPDLOG_TRACE("Recovered metadata of existence constraint for :{}({})",
|
||||||
|
name_id_mapper->IdToName(snapshot_id_map.at(*label)),
|
||||||
|
name_id_mapper->IdToName(snapshot_id_map.at(*property)));
|
||||||
|
}
|
||||||
|
spdlog::info("Metadata of existence constraints are recovered.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recover unique constraints.
|
||||||
|
// Snapshot version should be checked since unique constraints were
|
||||||
|
// implemented in later versions of snapshot.
|
||||||
|
if (*version >= kUniqueConstraintVersion) {
|
||||||
|
auto size = snapshot.ReadUint();
|
||||||
|
if (!size) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
spdlog::info("Recovering metadata of {} unique constraints.", *size);
|
||||||
|
for (uint64_t i = 0; i < *size; ++i) {
|
||||||
|
auto label = snapshot.ReadUint();
|
||||||
|
if (!label) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
auto properties_count = snapshot.ReadUint();
|
||||||
|
if (!properties_count) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
std::set<PropertyId> properties;
|
||||||
|
for (uint64_t j = 0; j < *properties_count; ++j) {
|
||||||
|
auto property = snapshot.ReadUint();
|
||||||
|
if (!property) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
properties.insert(get_property_from_id(*property));
|
||||||
|
}
|
||||||
|
AddRecoveredIndexConstraint(&indices_constraints.constraints.unique, {get_label_from_id(*label), properties},
|
||||||
|
"The unique constraint already exists!");
|
||||||
|
SPDLOG_TRACE("Recovered metadata of unique constraints for :{}",
|
||||||
|
name_id_mapper->IdToName(snapshot_id_map.at(*label)));
|
||||||
|
}
|
||||||
|
spdlog::info("Metadata of unique constraints are recovered.");
|
||||||
|
}
|
||||||
|
spdlog::info("Metadata of constraints are recovered.");
|
||||||
|
}
|
||||||
|
|
||||||
|
spdlog::info("Recovering metadata.");
|
||||||
|
// Recover epoch history
|
||||||
|
{
|
||||||
|
if (!snapshot.SetPosition(info.offset_epoch_history)) throw RecoveryFailure("Couldn't read data from snapshot!");
|
||||||
|
|
||||||
|
const auto marker = snapshot.ReadMarker();
|
||||||
|
if (!marker || *marker != Marker::SECTION_EPOCH_HISTORY) throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
|
||||||
|
const auto history_size = snapshot.ReadUint();
|
||||||
|
if (!history_size) {
|
||||||
|
throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < *history_size; ++i) {
|
||||||
|
auto maybe_epoch_id = snapshot.ReadString();
|
||||||
|
if (!maybe_epoch_id) {
|
||||||
|
throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
const auto maybe_last_commit_timestamp = snapshot.ReadUint();
|
||||||
|
if (!maybe_last_commit_timestamp) {
|
||||||
|
throw RecoveryFailure("Invalid snapshot data!");
|
||||||
|
}
|
||||||
|
epoch_history->emplace_back(std::move(*maybe_epoch_id), *maybe_last_commit_timestamp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
spdlog::info("Metadata recovered.");
|
||||||
|
// Recover timestamp.
|
||||||
|
ret.next_timestamp = info.start_timestamp + 1;
|
||||||
|
|
||||||
|
// Set success flag (to disable cleanup).
|
||||||
|
success = true;
|
||||||
|
|
||||||
|
return {info, ret, std::move(indices_constraints)};
|
||||||
|
}
|
||||||
|
|
||||||
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
|
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
|
||||||
utils::SkipList<Edge> *edges,
|
utils::SkipList<Edge> *edges,
|
||||||
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
|
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
|
||||||
@ -494,9 +963,13 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
|
|||||||
RecoveredIndicesAndConstraints indices_constraints;
|
RecoveredIndicesAndConstraints indices_constraints;
|
||||||
|
|
||||||
Decoder snapshot;
|
Decoder snapshot;
|
||||||
auto version = snapshot.Initialize(path, kSnapshotMagic);
|
const auto version = snapshot.Initialize(path, kSnapshotMagic);
|
||||||
if (!version) throw RecoveryFailure("Couldn't read snapshot magic and/or version!");
|
if (!version) throw RecoveryFailure("Couldn't read snapshot magic and/or version!");
|
||||||
|
|
||||||
if (!IsVersionSupported(*version)) throw RecoveryFailure(fmt::format("Invalid snapshot version {}", *version));
|
if (!IsVersionSupported(*version)) throw RecoveryFailure(fmt::format("Invalid snapshot version {}", *version));
|
||||||
|
if (*version == 14U) {
|
||||||
|
return LoadSnapshotVersion14(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config.items);
|
||||||
|
}
|
||||||
|
|
||||||
// Cleanup of loaded data in case of failure.
|
// Cleanup of loaded data in case of failure.
|
||||||
bool success = false;
|
bool success = false;
|
||||||
|
Loading…
Reference in New Issue
Block a user