Implement WAL loading for storage v2
Reviewers: teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2521
This commit is contained in:
parent
b5482a8169
commit
875a4a8629
@ -809,6 +809,32 @@ WalDeltaData ReadSkipWalDeltaData(Decoder *wal) {
|
||||
|
||||
return delta;
|
||||
}
|
||||
|
||||
// Helper function used to insert indices/constraints into the recovered
|
||||
// indices/constraints object.
|
||||
// @throw RecoveryFailure
|
||||
template <typename TObj>
|
||||
void AddRecoveredIndexConstraint(std::vector<TObj> *list, TObj obj,
|
||||
const char *error_message) {
|
||||
auto it = std::find(list->begin(), list->end(), obj);
|
||||
if (it == list->end()) {
|
||||
list->push_back(obj);
|
||||
} else {
|
||||
throw RecoveryFailure(error_message);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename TObj>
|
||||
void RemoveRecoveredIndexConstraint(std::vector<TObj> *list, TObj obj,
|
||||
const char *error_message) {
|
||||
auto it = std::find(list->begin(), list->end(), obj);
|
||||
if (it != list->end()) {
|
||||
std::swap(*it, list->back());
|
||||
list->pop_back();
|
||||
} else {
|
||||
throw RecoveryFailure(error_message);
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
||||
// Function used to read information about the snapshot file.
|
||||
@ -1783,7 +1809,7 @@ void Durability::CreateSnapshot(Transaction *transaction) {
|
||||
std::sort(wal_files.begin(), wal_files.end());
|
||||
uint64_t snapshot_start_timestamp = transaction->start_timestamp;
|
||||
if (!old_snapshot_files.empty()) {
|
||||
snapshot_start_timestamp = old_snapshot_files.begin()->first;
|
||||
snapshot_start_timestamp = old_snapshot_files.front().first;
|
||||
}
|
||||
std::optional<uint64_t> pos = 0;
|
||||
for (uint64_t i = 0; i < wal_files.size(); ++i) {
|
||||
@ -1815,41 +1841,192 @@ void Durability::CreateSnapshot(Transaction *transaction) {
|
||||
std::optional<Durability::RecoveryInfo> Durability::RecoverData() {
|
||||
if (!utils::DirExists(snapshot_directory_)) return std::nullopt;
|
||||
|
||||
// Helper lambda used to recover all discovered indices and constraints. The
|
||||
// indices and constraints must be recovered after the data recovery is done
|
||||
// to ensure that the indices and constraints are consistent at the end of the
|
||||
// recovery process.
|
||||
auto recover_indices_and_constraints = [this](
|
||||
const auto &indices_constraints) {
|
||||
// Recover label indices.
|
||||
for (const auto &item : indices_constraints.indices.label) {
|
||||
if (!indices_->label_index.CreateIndex(item, vertices_->access()))
|
||||
throw RecoveryFailure("The label index must be created here!");
|
||||
}
|
||||
|
||||
// Recover label+property indices.
|
||||
for (const auto &item : indices_constraints.indices.label_property) {
|
||||
if (!indices_->label_property_index.CreateIndex(item.first, item.second,
|
||||
vertices_->access()))
|
||||
throw RecoveryFailure("The label+property index must be created here!");
|
||||
}
|
||||
|
||||
// Recover existence constraints.
|
||||
for (const auto &item : indices_constraints.constraints.existence) {
|
||||
auto ret = CreateExistenceConstraint(constraints_, item.first,
|
||||
item.second, vertices_->access());
|
||||
if (ret.HasError() || !ret.GetValue())
|
||||
throw RecoveryFailure("The existence constraint must be created here!");
|
||||
}
|
||||
};
|
||||
|
||||
// Array of all discovered snapshots, ordered by name.
|
||||
std::vector<std::filesystem::path> snapshot_files;
|
||||
std::vector<std::pair<std::filesystem::path, std::string>> snapshot_files;
|
||||
std::error_code error_code;
|
||||
for (auto &item :
|
||||
for (const auto &item :
|
||||
std::filesystem::directory_iterator(snapshot_directory_, error_code)) {
|
||||
if (!item.is_regular_file()) continue;
|
||||
try {
|
||||
ReadSnapshotInfo(item.path());
|
||||
snapshot_files.push_back(item.path());
|
||||
auto info = ReadSnapshotInfo(item.path());
|
||||
snapshot_files.emplace_back(item.path(), info.uuid);
|
||||
} catch (const RecoveryFailure &) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
CHECK(!error_code) << "Couldn't recover data because an error occurred: "
|
||||
<< error_code.message() << "!";
|
||||
std::sort(snapshot_files.begin(), snapshot_files.end());
|
||||
for (auto it = snapshot_files.rbegin(); it != snapshot_files.rend(); ++it) {
|
||||
const auto &path = *it;
|
||||
LOG(INFO) << "Starting snapshot recovery from " << path;
|
||||
|
||||
RecoveryInfo recovery_info;
|
||||
RecoveredIndicesAndConstraints indices_constraints;
|
||||
std::optional<uint64_t> snapshot_timestamp;
|
||||
if (!snapshot_files.empty()) {
|
||||
std::sort(snapshot_files.begin(), snapshot_files.end());
|
||||
// UUID used for durability is the UUID of the last snapshot file.
|
||||
uuid_ = snapshot_files.back().second;
|
||||
std::optional<Durability::RecoveredSnapshot> recovered_snapshot;
|
||||
for (auto it = snapshot_files.rbegin(); it != snapshot_files.rend(); ++it) {
|
||||
const auto &[path, uuid] = *it;
|
||||
if (uuid != uuid_) {
|
||||
LOG(WARNING) << "The snapshot file " << path
|
||||
<< " isn't related to the latest snapshot file!";
|
||||
continue;
|
||||
}
|
||||
LOG(INFO) << "Starting snapshot recovery from " << path;
|
||||
try {
|
||||
recovered_snapshot = LoadSnapshot(path);
|
||||
LOG(INFO) << "Snapshot recovery successful!";
|
||||
break;
|
||||
} catch (const RecoveryFailure &e) {
|
||||
LOG(WARNING) << "Couldn't recover snapshot from " << path
|
||||
<< " because of: " << e.what();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
CHECK(recovered_snapshot)
|
||||
<< "The database is configured to recover on startup, but couldn't "
|
||||
"recover using any of the specified snapshots! Please inspect them "
|
||||
"and restart the database.";
|
||||
recovery_info = recovered_snapshot->recovery_info;
|
||||
indices_constraints = std::move(recovered_snapshot->indices_constraints);
|
||||
snapshot_timestamp = recovered_snapshot->snapshot_info.start_timestamp;
|
||||
if (!utils::DirExists(wal_directory_)) {
|
||||
recover_indices_and_constraints(indices_constraints);
|
||||
return recovered_snapshot->recovery_info;
|
||||
}
|
||||
} else {
|
||||
if (!utils::DirExists(wal_directory_)) return std::nullopt;
|
||||
// Array of all discovered WAL files, ordered by name.
|
||||
std::vector<std::pair<std::filesystem::path, std::string>> wal_files;
|
||||
for (const auto &item :
|
||||
std::filesystem::directory_iterator(wal_directory_, error_code)) {
|
||||
if (!item.is_regular_file()) continue;
|
||||
try {
|
||||
auto info = ReadWalInfo(item.path());
|
||||
wal_files.emplace_back(item.path(), info.uuid);
|
||||
} catch (const RecoveryFailure &e) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
CHECK(!error_code) << "Couldn't recover data because an error occurred: "
|
||||
<< error_code.message() << "!";
|
||||
if (wal_files.empty()) return std::nullopt;
|
||||
std::sort(wal_files.begin(), wal_files.end());
|
||||
// UUID used for durability is the UUID of the last WAL file.
|
||||
uuid_ = wal_files.back().second;
|
||||
}
|
||||
|
||||
// Array of all discovered WAL files, ordered by sequence number.
|
||||
std::vector<std::tuple<uint64_t, uint64_t, uint64_t, std::filesystem::path>>
|
||||
wal_files;
|
||||
for (const auto &item :
|
||||
std::filesystem::directory_iterator(wal_directory_, error_code)) {
|
||||
if (!item.is_regular_file()) continue;
|
||||
try {
|
||||
auto info = LoadSnapshot(path);
|
||||
LOG(INFO) << "Snapshot recovery successful!";
|
||||
return info;
|
||||
auto info = ReadWalInfo(item.path());
|
||||
if (info.uuid != uuid_) continue;
|
||||
wal_files.emplace_back(info.seq_num, info.from_timestamp,
|
||||
info.to_timestamp, item.path());
|
||||
} catch (const RecoveryFailure &e) {
|
||||
LOG(WARNING) << "Couldn't recover snapshot from " << path
|
||||
<< " because of: " << e.what();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
return std::nullopt;
|
||||
CHECK(!error_code) << "Couldn't recover data because an error occurred: "
|
||||
<< error_code.message() << "!";
|
||||
// By this point we should have recovered from a snapshot, or we should have
|
||||
// found some WAL files to recover from in the above `else`. This is just a
|
||||
// sanity check to circumvent the following case: The database didn't recover
|
||||
// from a snapshot, the above `else` triggered to find the recovery UUID from
|
||||
// a WAL file. The above `else` has an early exit in case there are no WAL
|
||||
// files. Because we reached this point there must have been some WAL files
|
||||
// and we must have some WAL files after this second WAL directory iteration.
|
||||
CHECK(snapshot_timestamp || !wal_files.empty())
|
||||
<< "The database didn't recover from a snapshot and didn't find any WAL "
|
||||
"files that match the last WAL file!";
|
||||
|
||||
if (!wal_files.empty()) {
|
||||
std::sort(wal_files.begin(), wal_files.end());
|
||||
{
|
||||
const auto &[seq_num, from_timestamp, to_timestamp, path] = wal_files[0];
|
||||
if (seq_num != 0) {
|
||||
// We don't have all WAL files. We need to see whether we need them all.
|
||||
if (!snapshot_timestamp) {
|
||||
// We didn't recover from a snapshot and we must have all WAL files
|
||||
// starting from the first one (seq_num == 0) to be able to recover
|
||||
// data from them.
|
||||
LOG(FATAL) << "There are missing prefix WAL files and data can't be "
|
||||
"recovered without them!";
|
||||
} else if (to_timestamp >= *snapshot_timestamp) {
|
||||
// We recovered from a snapshot and we must have at least one WAL file
|
||||
// whose all deltas were created before the snapshot in order to
|
||||
// verify that nothing is missing from the beginning of the WAL chain.
|
||||
LOG(FATAL) << "You must have at least one WAL file that contains "
|
||||
"deltas that were created before the snapshot file!";
|
||||
}
|
||||
}
|
||||
}
|
||||
std::optional<uint64_t> previous_seq_num;
|
||||
for (const auto &[seq_num, from_timestamp, to_timestamp, path] :
|
||||
wal_files) {
|
||||
if (previous_seq_num && *previous_seq_num + 1 != seq_num) {
|
||||
LOG(FATAL) << "You are missing a WAL file with the sequence number "
|
||||
<< *previous_seq_num + 1 << "!";
|
||||
}
|
||||
previous_seq_num = seq_num;
|
||||
try {
|
||||
auto info = LoadWal(path, &indices_constraints, snapshot_timestamp);
|
||||
recovery_info.next_vertex_id =
|
||||
std::max(recovery_info.next_vertex_id, info.next_vertex_id);
|
||||
recovery_info.next_edge_id =
|
||||
std::max(recovery_info.next_edge_id, info.next_edge_id);
|
||||
recovery_info.next_timestamp =
|
||||
std::max(recovery_info.next_timestamp, info.next_timestamp);
|
||||
} catch (const RecoveryFailure &e) {
|
||||
LOG(FATAL) << "Couldn't recover WAL deltas from " << path
|
||||
<< " because of: " << e.what();
|
||||
}
|
||||
}
|
||||
// The sequence number needs to be recovered even though `LoadWal` didn't
|
||||
// load any deltas from that file.
|
||||
wal_seq_num_ = *previous_seq_num + 1;
|
||||
}
|
||||
|
||||
recover_indices_and_constraints(indices_constraints);
|
||||
return recovery_info;
|
||||
}
|
||||
|
||||
Durability::RecoveryInfo Durability::LoadSnapshot(
|
||||
Durability::RecoveredSnapshot Durability::LoadSnapshot(
|
||||
const std::filesystem::path &path) {
|
||||
Durability::RecoveryInfo ret;
|
||||
RecoveredIndicesAndConstraints indices_constraints;
|
||||
|
||||
Decoder snapshot;
|
||||
auto version = snapshot.Initialize(path, kSnapshotMagic);
|
||||
@ -1863,9 +2040,6 @@ Durability::RecoveryInfo Durability::LoadSnapshot(
|
||||
if (!success) {
|
||||
edges_->clear();
|
||||
vertices_->clear();
|
||||
indices_->label_index.Clear();
|
||||
indices_->label_property_index.Clear();
|
||||
constraints_->existence_constraints.clear();
|
||||
}
|
||||
});
|
||||
|
||||
@ -1875,9 +2049,6 @@ Durability::RecoveryInfo Durability::LoadSnapshot(
|
||||
// Check for edges.
|
||||
bool snapshot_has_edges = info.offset_edges != 0;
|
||||
|
||||
// Set storage UUID.
|
||||
uuid_ = info.uuid;
|
||||
|
||||
// Recover mapper.
|
||||
std::unordered_map<uint64_t, uint64_t> snapshot_id_map;
|
||||
{
|
||||
@ -2190,9 +2361,9 @@ Durability::RecoveryInfo Durability::LoadSnapshot(
|
||||
for (uint64_t i = 0; i < *size; ++i) {
|
||||
auto label = snapshot.ReadUint();
|
||||
if (!label) throw RecoveryFailure("Invalid snapshot data!");
|
||||
if (!indices_->label_index.CreateIndex(get_label_from_id(*label),
|
||||
vertices_->access()))
|
||||
throw RecoveryFailure("Couldn't recover label index!");
|
||||
AddRecoveredIndexConstraint(&indices_constraints.indices.label,
|
||||
get_label_from_id(*label),
|
||||
"The label index already exists!");
|
||||
}
|
||||
}
|
||||
|
||||
@ -2205,10 +2376,10 @@ Durability::RecoveryInfo Durability::LoadSnapshot(
|
||||
if (!label) throw RecoveryFailure("Invalid snapshot data!");
|
||||
auto property = snapshot.ReadUint();
|
||||
if (!property) throw RecoveryFailure("Invalid snapshot data!");
|
||||
if (!indices_->label_property_index.CreateIndex(
|
||||
get_label_from_id(*label), get_property_from_id(*property),
|
||||
vertices_->access()))
|
||||
throw RecoveryFailure("Couldn't recover label+property index!");
|
||||
AddRecoveredIndexConstraint(
|
||||
&indices_constraints.indices.label_property,
|
||||
{get_label_from_id(*label), get_property_from_id(*property)},
|
||||
"The label+property index already exists!");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2231,11 +2402,10 @@ Durability::RecoveryInfo Durability::LoadSnapshot(
|
||||
if (!label) throw RecoveryFailure("Invalid snapshot data!");
|
||||
auto property = snapshot.ReadUint();
|
||||
if (!property) throw RecoveryFailure("Invalid snapshot data!");
|
||||
auto ret = CreateExistenceConstraint(
|
||||
constraints_, get_label_from_id(*label),
|
||||
get_property_from_id(*property), vertices_->access());
|
||||
if (!ret.HasValue() || !*ret)
|
||||
throw RecoveryFailure("Couldn't recover existence constraint!");
|
||||
AddRecoveredIndexConstraint(
|
||||
&indices_constraints.constraints.existence,
|
||||
{get_label_from_id(*label), get_property_from_id(*property)},
|
||||
"The existence constraint already exists!");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2246,6 +2416,299 @@ Durability::RecoveryInfo Durability::LoadSnapshot(
|
||||
// Set success flag (to disable cleanup).
|
||||
success = true;
|
||||
|
||||
return {info, ret, std::move(indices_constraints)};
|
||||
}
|
||||
|
||||
/// Applies the deltas stored in the WAL file at `path` to the storage.
///
/// Data deltas (vertices, edges, labels, properties) are applied directly to
/// the vertex/edge containers. Index and constraint operations are only
/// recorded in `indices_constraints`; nothing is created or dropped here.
///
/// When `snapshot_timestamp` is set, a file whose `to_timestamp` is not greater
/// than it is skipped entirely (a default `RecoveryInfo` is returned), and
/// inside a loaded file every delta whose timestamp is not greater than it is
/// skipped.
///
/// @return the id/timestamp counters implied by the applied deltas.
/// @throw RecoveryFailure
Durability::RecoveryInfo Durability::LoadWal(
    const std::filesystem::path &path,
    RecoveredIndicesAndConstraints *indices_constraints,
    std::optional<uint64_t> snapshot_timestamp) {
  using EdgeLink = std::tuple<EdgeTypeId, Vertex *, EdgeRef>;

  Durability::RecoveryInfo recovery;

  Decoder wal;
  auto version = wal.Initialize(path, kWalMagic);
  if (!version)
    throw RecoveryFailure("Couldn't read WAL magic and/or version!");
  if (*version != kVersion) throw RecoveryFailure("Invalid WAL version!");

  // Read wal info.
  const auto wal_info = ReadWalInfo(path);

  // The whole file is covered by the snapshot, there is nothing to apply.
  if (snapshot_timestamp && wal_info.to_timestamp <= *snapshot_timestamp)
    return recovery;

  // Recover deltas.
  wal.SetPosition(wal_info.offset_deltas);
  uint64_t applied_count = 0;
  auto edge_acc = edges_->access();
  auto vertex_acc = vertices_->access();

  // Name -> id translation helpers. They are always invoked as separate
  // statements so that the mapper is queried in the same order in which the
  // names are listed in the delta.
  const auto to_label = [this](const auto &name) {
    return LabelId::FromUint(name_id_mapper_->NameToId(name));
  };
  const auto to_property = [this](const auto &name) {
    return PropertyId::FromUint(name_id_mapper_->NameToId(name));
  };
  const auto to_edge_type = [this](const auto &name) {
    return EdgeTypeId::FromUint(name_id_mapper_->NameToId(name));
  };

  // Looks up a vertex that has to exist; `error_message` is thrown otherwise.
  const auto find_vertex_or_throw = [&vertex_acc](const auto &gid,
                                                  const char *error_message) {
    auto found = vertex_acc.find(gid);
    if (found == vertex_acc.end()) throw RecoveryFailure(error_message);
    return found;
  };

  // Adds `link` to `links`; the link must not be there already.
  const auto attach_link = [](auto &links, const EdgeLink &link,
                              const char *error_message) {
    if (std::find(links.begin(), links.end(), link) != links.end())
      throw RecoveryFailure(error_message);
    links.push_back(link);
  };

  // Removes `link` from `links` (swap-and-pop); the link must be there.
  const auto detach_link = [](auto &links, const EdgeLink &link,
                              const char *error_message) {
    auto position = std::find(links.begin(), links.end(), link);
    if (position == links.end()) throw RecoveryFailure(error_message);
    std::swap(*position, links.back());
    links.pop_back();
  };

  // Sets `property_id` to `value` in `properties`. A null value removes the
  // property (and is a no-op if the property isn't there).
  const auto store_property = [](auto &properties, const auto &property_id,
                                 auto &value) {
    auto existing = properties.find(property_id);
    if (existing == properties.end()) {
      if (!value.IsNull()) properties.emplace(property_id, std::move(value));
    } else if (value.IsNull()) {
      properties.erase(existing);
    } else {
      existing->second = std::move(value);
    }
  };

  for (uint64_t i = 0; i < wal_info.num_deltas; ++i) {
    // Read WAL delta header to find out the delta timestamp.
    const auto timestamp = ReadWalDeltaHeader(&wal);

    // Deltas that are already contained in the snapshot are skipped.
    if (snapshot_timestamp && timestamp <= *snapshot_timestamp) {
      SkipWalDeltaData(&wal);
      continue;
    }

    // This delta should be loaded.
    auto delta = ReadWalDeltaData(&wal);
    switch (delta.type) {
      case WalDeltaData::Type::VERTEX_CREATE: {
        const auto &gid = delta.vertex_create_delete.gid;
        auto [vertex, inserted] = vertex_acc.insert(Vertex{gid, nullptr});
        if (!inserted)
          throw RecoveryFailure("The vertex must be inserted here!");
        recovery.next_vertex_id =
            std::max(recovery.next_vertex_id, gid.AsUint() + 1);
        break;
      }

      case WalDeltaData::Type::VERTEX_DELETE: {
        const auto &gid = delta.vertex_create_delete.gid;
        auto vertex = find_vertex_or_throw(gid, "The vertex doesn't exist!");
        const bool has_edges =
            !vertex->in_edges.empty() || !vertex->out_edges.empty();
        if (has_edges)
          throw RecoveryFailure(
              "The vertex can't be deleted because it still has edges!");
        if (!vertex_acc.remove(gid))
          throw RecoveryFailure("The vertex must be removed here!");
        break;
      }

      case WalDeltaData::Type::VERTEX_ADD_LABEL:
      case WalDeltaData::Type::VERTEX_REMOVE_LABEL: {
        auto vertex = find_vertex_or_throw(delta.vertex_add_remove_label.gid,
                                           "The vertex doesn't exist!");
        const auto label_id = to_label(delta.vertex_add_remove_label.label);
        auto &labels = vertex->labels;
        auto position = std::find(labels.begin(), labels.end(), label_id);
        const bool has_label = position != labels.end();

        if (delta.type == WalDeltaData::Type::VERTEX_ADD_LABEL) {
          if (has_label)
            throw RecoveryFailure("The vertex already has the label!");
          labels.push_back(label_id);
        } else {
          if (!has_label)
            throw RecoveryFailure("The vertex doesn't have the label!");
          std::swap(*position, labels.back());
          labels.pop_back();
        }
        break;
      }

      case WalDeltaData::Type::VERTEX_SET_PROPERTY: {
        auto vertex = find_vertex_or_throw(delta.vertex_edge_set_property.gid,
                                           "The vertex doesn't exist!");
        const auto property_id =
            to_property(delta.vertex_edge_set_property.property);
        store_property(vertex->properties, property_id,
                       delta.vertex_edge_set_property.value);
        break;
      }

      case WalDeltaData::Type::EDGE_CREATE: {
        auto from_vertex =
            find_vertex_or_throw(delta.edge_create_delete.from_vertex,
                                 "The from vertex doesn't exist!");
        auto to_vertex =
            find_vertex_or_throw(delta.edge_create_delete.to_vertex,
                                 "The to vertex doesn't exist!");

        auto edge_gid = delta.edge_create_delete.gid;
        const auto edge_type_id =
            to_edge_type(delta.edge_create_delete.edge_type);
        // Without properties on edges the edge is referenced only by its gid;
        // otherwise an edge object is created and referenced by pointer.
        EdgeRef edge_ref(edge_gid);
        if (items_.properties_on_edges) {
          auto [edge, inserted] = edge_acc.insert(Edge{edge_gid, nullptr});
          if (!inserted)
            throw RecoveryFailure("The edge must be inserted here!");
          edge_ref = EdgeRef(&*edge);
        }

        attach_link(from_vertex->out_edges,
                    EdgeLink{edge_type_id, &*to_vertex, edge_ref},
                    "The from vertex already has this edge!");
        attach_link(to_vertex->in_edges,
                    EdgeLink{edge_type_id, &*from_vertex, edge_ref},
                    "The to vertex already has this edge!");

        recovery.next_edge_id =
            std::max(recovery.next_edge_id, edge_gid.AsUint() + 1);
        break;
      }

      case WalDeltaData::Type::EDGE_DELETE: {
        auto from_vertex =
            find_vertex_or_throw(delta.edge_create_delete.from_vertex,
                                 "The from vertex doesn't exist!");
        auto to_vertex =
            find_vertex_or_throw(delta.edge_create_delete.to_vertex,
                                 "The to vertex doesn't exist!");

        auto edge_gid = delta.edge_create_delete.gid;
        const auto edge_type_id =
            to_edge_type(delta.edge_create_delete.edge_type);
        EdgeRef edge_ref(edge_gid);
        if (items_.properties_on_edges) {
          auto edge = edge_acc.find(edge_gid);
          if (edge == edge_acc.end())
            throw RecoveryFailure("The edge doesn't exist!");
          edge_ref = EdgeRef(&*edge);
        }

        detach_link(from_vertex->out_edges,
                    EdgeLink{edge_type_id, &*to_vertex, edge_ref},
                    "The from vertex doesn't have this edge!");
        detach_link(to_vertex->in_edges,
                    EdgeLink{edge_type_id, &*from_vertex, edge_ref},
                    "The to vertex doesn't have this edge!");

        // The edge object is removed only after both links are gone.
        if (items_.properties_on_edges) {
          if (!edge_acc.remove(edge_gid))
            throw RecoveryFailure("The edge must be removed here!");
        }
        break;
      }

      case WalDeltaData::Type::EDGE_SET_PROPERTY: {
        if (!items_.properties_on_edges)
          throw RecoveryFailure(
              "The WAL has properties on edges, but the storage is "
              "configured without properties on edges!");
        auto edge = edge_acc.find(delta.vertex_edge_set_property.gid);
        if (edge == edge_acc.end())
          throw RecoveryFailure("The edge doesn't exist!");
        const auto property_id =
            to_property(delta.vertex_edge_set_property.property);
        store_property(edge->properties, property_id,
                       delta.vertex_edge_set_property.value);
        break;
      }

      case WalDeltaData::Type::TRANSACTION_END:
        break;

      case WalDeltaData::Type::LABEL_INDEX_CREATE: {
        const auto label_id = to_label(delta.operation_label.label);
        AddRecoveredIndexConstraint(&indices_constraints->indices.label,
                                    label_id,
                                    "The label index already exists!");
        break;
      }

      case WalDeltaData::Type::LABEL_INDEX_DROP: {
        const auto label_id = to_label(delta.operation_label.label);
        RemoveRecoveredIndexConstraint(&indices_constraints->indices.label,
                                       label_id,
                                       "The label index doesn't exist!");
        break;
      }

      case WalDeltaData::Type::LABEL_PROPERTY_INDEX_CREATE: {
        const auto label_id = to_label(delta.operation_label_property.label);
        const auto property_id =
            to_property(delta.operation_label_property.property);
        AddRecoveredIndexConstraint(
            &indices_constraints->indices.label_property,
            {label_id, property_id},
            "The label property index already exists!");
        break;
      }

      case WalDeltaData::Type::LABEL_PROPERTY_INDEX_DROP: {
        const auto label_id = to_label(delta.operation_label_property.label);
        const auto property_id =
            to_property(delta.operation_label_property.property);
        RemoveRecoveredIndexConstraint(
            &indices_constraints->indices.label_property,
            {label_id, property_id},
            "The label property index doesn't exist!");
        break;
      }

      case WalDeltaData::Type::EXISTENCE_CONSTRAINT_CREATE: {
        const auto label_id = to_label(delta.operation_label_property.label);
        const auto property_id =
            to_property(delta.operation_label_property.property);
        AddRecoveredIndexConstraint(
            &indices_constraints->constraints.existence,
            {label_id, property_id},
            "The existence constraint already exists!");
        break;
      }

      case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: {
        const auto label_id = to_label(delta.operation_label_property.label);
        const auto property_id =
            to_property(delta.operation_label_property.property);
        RemoveRecoveredIndexConstraint(
            &indices_constraints->constraints.existence,
            {label_id, property_id},
            "The existence constraint doesn't exist!");
        break;
      }
    }

    recovery.next_timestamp = std::max(recovery.next_timestamp, timestamp + 1);
    ++applied_count;
  }

  LOG(INFO) << "Applied " << applied_count << " deltas from WAL " << path;

  return recovery;
}
|
||||
|
||||
|
@ -298,6 +298,18 @@ enum class StorageGlobalOperation {
|
||||
EXISTENCE_CONSTRAINT_DROP,
|
||||
};
|
||||
|
||||
/// Structure used to track indices and constraints during recovery.
|
||||
struct RecoveredIndicesAndConstraints {
|
||||
struct {
|
||||
std::vector<LabelId> label;
|
||||
std::vector<std::pair<LabelId, PropertyId>> label_property;
|
||||
} indices;
|
||||
|
||||
struct {
|
||||
std::vector<std::pair<LabelId, PropertyId>> existence;
|
||||
} constraints;
|
||||
};
|
||||
|
||||
/// WalFile class used to append deltas and operations to the WAL file.
|
||||
class WalFile {
|
||||
public:
|
||||
@ -340,9 +352,15 @@ class WalFile {
|
||||
class Durability final {
|
||||
public:
|
||||
/// Counters that the storage has to continue from after a recovery.
///
/// All members are zero-initialized so that a default constructed object is
/// the neutral element when the results of several recovery steps are merged
/// using `std::max` (and so that "nothing was loaded" is a well defined
/// value).
struct RecoveryInfo {
  /// One past the largest vertex id seen during recovery.
  uint64_t next_vertex_id{0};
  /// One past the largest edge id seen during recovery.
  uint64_t next_edge_id{0};
  /// One past the largest timestamp seen during recovery.
  uint64_t next_timestamp{0};
};
|
||||
|
||||
struct RecoveredSnapshot {
|
||||
SnapshotInfo snapshot_info;
|
||||
RecoveryInfo recovery_info;
|
||||
RecoveredIndicesAndConstraints indices_constraints;
|
||||
};
|
||||
|
||||
Durability(Config::Durability config, utils::SkipList<Vertex> *vertices,
|
||||
@ -367,7 +385,11 @@ class Durability final {
|
||||
|
||||
std::optional<RecoveryInfo> RecoverData();
|
||||
|
||||
RecoveryInfo LoadSnapshot(const std::filesystem::path &path);
|
||||
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path);
|
||||
|
||||
RecoveryInfo LoadWal(const std::filesystem::path &path,
|
||||
RecoveredIndicesAndConstraints *indices_constraints,
|
||||
std::optional<uint64_t> snapshot_timestamp);
|
||||
|
||||
bool InitializeWalFile();
|
||||
|
||||
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user