Add snapshot based recovery capabilites

This commit is contained in:
gvolfing 2024-03-15 15:59:01 +01:00
parent 4f6d8c10ec
commit 9825bdb41b
8 changed files with 98 additions and 27 deletions

View File

@ -284,8 +284,8 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle
try {
spdlog::debug("Loading snapshot");
auto recovered_snapshot = storage::durability::LoadSnapshot(
*maybe_snapshot_path, &storage->vertices_, &storage->edges_, &storage->repl_storage_state_.history,
storage->name_id_mapper_.get(), &storage->edge_count_, storage->config_);
*maybe_snapshot_path, &storage->vertices_, &storage->edges_, &storage->edges_metadata_,
&storage->repl_storage_state_.history, storage->name_id_mapper_.get(), &storage->edge_count_, storage->config_);
spdlog::debug("Snapshot loaded successfully");
// If this step is present it should always be the first step of
// the recovery so we use the UUID we read from snasphost

View File

@ -332,9 +332,15 @@ int main(int argc, char **argv) {
.durability_directory = FLAGS_data_directory + "/rocksdb_durability",
.wal_directory = FLAGS_data_directory + "/rocksdb_wal"},
.salient.items = {.properties_on_edges = FLAGS_storage_properties_on_edges,
.enable_edges_metadata = FLAGS_storage_enable_edges_metadata,
.enable_edges_metadata =
FLAGS_storage_properties_on_edges ? FLAGS_storage_enable_edges_metadata : false,
.enable_schema_metadata = FLAGS_storage_enable_schema_metadata},
.salient.storage_mode = memgraph::flags::ParseStorageMode()};
if (!FLAGS_storage_properties_on_edges && FLAGS_storage_enable_edges_metadata) {
spdlog::warn(
"Properties on edges were not enabled, hence edges metadata will also be disabled. If you wish to utilize "
"extra metadata on edges, enable properties on edges as well.");
}
spdlog::info("config recover on startup {}, flags {} {}", db_config.durability.recover_on_startup,
FLAGS_storage_recover_on_startup, FLAGS_data_recovery_on_startup);
memgraph::utils::Scheduler jemalloc_purge_scheduler;

View File

@ -279,6 +279,7 @@ std::optional<ParallelizedSchemaCreationInfo> GetParallelExecInfoIndices(const R
std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges,
utils::SkipList<EdgeMetadata> *edges_metadata,
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints, const Config &config,
uint64_t *wal_seq_num) {
@ -313,7 +314,8 @@ std::optional<RecoveryInfo> Recovery::RecoverData(std::string *uuid, Replication
}
spdlog::info("Starting snapshot recovery from {}.", path);
try {
recovered_snapshot = LoadSnapshot(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config);
recovered_snapshot =
LoadSnapshot(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count, config);
spdlog::info("Snapshot recovery successful!");
break;
} catch (const RecoveryFailure &e) {

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -133,6 +133,7 @@ struct Recovery {
/// @throw std::bad_alloc
std::optional<RecoveryInfo> RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges,
utils::SkipList<EdgeMetadata> *edges_metadata,
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints, const Config &config,
uint64_t *wal_seq_num);

View File

@ -425,6 +425,7 @@ struct LoadPartialConnectivityResult {
template <typename TEdgeTypeFromIdFunc>
LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::path &path,
utils::SkipList<Vertex> &vertices, utils::SkipList<Edge> &edges,
utils::SkipList<EdgeMetadata> &edges_metadata,
const uint64_t from_offset, const uint64_t vertices_count,
const SalientConfig::Items items, const bool snapshot_has_edges,
TEdgeTypeFromIdFunc get_edge_type_from_id) {
@ -435,6 +436,7 @@ LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::pat
auto vertex_acc = vertices.access();
auto edge_acc = edges.access();
auto edge_metadata_acc = edges_metadata.access();
// Read the first gid to find the necessary iterator in vertices
const auto first_vertex_gid = std::invoke([&]() mutable {
@ -578,6 +580,9 @@ LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::pat
auto [edge, inserted] = edge_acc.insert(Edge{Gid::FromUint(*edge_gid), nullptr});
edge_ref = EdgeRef(&*edge);
}
if (items.enable_edges_metadata) {
edge_metadata_acc.insert(EdgeMetadata{Gid::FromUint(*edge_gid), &vertex});
}
}
vertex.out_edges.emplace_back(get_edge_type_from_id(*edge_type), &*to_vertex, edge_ref);
// Increment edge count. We only increment the count here because the
@ -624,7 +629,7 @@ void RecoverOnMultipleThreads(size_t thread_count, const TFunc &func, const std:
}
RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges,
utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count,
SalientConfig::Items items) {
@ -642,6 +647,7 @@ RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils
if (!success) {
edges->clear();
vertices->clear();
edges_metadata->clear();
epoch_history->clear();
}
});
@ -1096,7 +1102,7 @@ RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils
}
RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges,
utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count,
const Config &config) {
@ -1116,6 +1122,7 @@ RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils
if (!success) {
edges->clear();
vertices->clear();
edges_metadata->clear();
epoch_history->clear();
}
});
@ -1224,10 +1231,10 @@ RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils
RecoverOnMultipleThreads(
config.durability.recovery_thread_count,
[path, vertices, edges, edge_count, items = config.salient.items, snapshot_has_edges, &get_edge_type_from_id,
&highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items,
snapshot_has_edges, get_edge_type_from_id);
[path, vertices, edges, edges_metadata, edge_count, items = config.salient.items, snapshot_has_edges,
&get_edge_type_from_id, &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
const auto result = LoadPartialConnectivity(path, *vertices, *edges, *edges_metadata, batch.offset,
batch.count, items, snapshot_has_edges, get_edge_type_from_id);
edge_count->fetch_add(result.edge_count);
auto known_highest_edge_gid = highest_edge_gid.load();
while (known_highest_edge_gid < result.highest_edge_id) {
@ -1385,7 +1392,7 @@ RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils
}
RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges,
utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count,
const Config &config) {
@ -1405,6 +1412,7 @@ RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils
if (!success) {
edges->clear();
vertices->clear();
edges_metadata->clear();
epoch_history->clear();
}
});
@ -1513,10 +1521,10 @@ RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils
RecoverOnMultipleThreads(
config.durability.recovery_thread_count,
[path, vertices, edges, edge_count, items = config.salient.items, snapshot_has_edges, &get_edge_type_from_id,
&highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items,
snapshot_has_edges, get_edge_type_from_id);
[path, vertices, edges, edges_metadata, edge_count, items = config.salient.items, snapshot_has_edges,
&get_edge_type_from_id, &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
const auto result = LoadPartialConnectivity(path, *vertices, *edges, *edges_metadata, batch.offset,
batch.count, items, snapshot_has_edges, get_edge_type_from_id);
edge_count->fetch_add(result.edge_count);
auto known_highest_edge_gid = highest_edge_gid.load();
while (known_highest_edge_gid < result.highest_edge_id) {
@ -1728,7 +1736,7 @@ RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils
}
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges,
utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, const Config &config) {
RecoveryInfo recovery_info;
@ -1740,14 +1748,16 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
if (!IsVersionSupported(*version)) throw RecoveryFailure(fmt::format("Invalid snapshot version {}", *version));
if (*version == 14U) {
return LoadSnapshotVersion14(path, vertices, edges, epoch_history, name_id_mapper, edge_count,
return LoadSnapshotVersion14(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count,
config.salient.items);
}
if (*version == 15U) {
return LoadSnapshotVersion15(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config);
return LoadSnapshotVersion15(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count,
config);
}
if (*version == 16U) {
return LoadSnapshotVersion16(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config);
return LoadSnapshotVersion16(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count,
config);
}
// Cleanup of loaded data in case of failure.
@ -1756,6 +1766,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
if (!success) {
edges->clear();
vertices->clear();
edges_metadata->clear();
epoch_history->clear();
}
});
@ -1864,10 +1875,10 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
RecoverOnMultipleThreads(
config.durability.recovery_thread_count,
[path, vertices, edges, edge_count, items = config.salient.items, snapshot_has_edges, &get_edge_type_from_id,
&highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items,
snapshot_has_edges, get_edge_type_from_id);
[path, vertices, edges, edges_metadata, edge_count, items = config.salient.items, snapshot_has_edges,
&get_edge_type_from_id, &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) {
const auto result = LoadPartialConnectivity(path, *vertices, *edges, *edges_metadata, batch.offset,
batch.count, items, snapshot_has_edges, get_edge_type_from_id);
edge_count->fetch_add(result.edge_count);
auto known_highest_edge_gid = highest_edge_gid.load();
while (known_highest_edge_gid < result.highest_edge_id) {

View File

@ -64,7 +64,7 @@ SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path);
/// Function used to load the snapshot data into the storage.
/// @throw RecoveryFailure
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges,
utils::SkipList<Edge> *edges, utils::SkipList<EdgeMetadata> *edges_metadata,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, const Config &config);

View File

@ -102,7 +102,7 @@ InMemoryStorage::InMemoryStorage(Config config)
config_.durability.storage_directory);
}
if (config_.durability.recover_on_startup) {
auto info = recovery_.RecoverData(&uuid_, repl_storage_state_, &vertices_, &edges_, &edge_count_,
auto info = recovery_.RecoverData(&uuid_, repl_storage_state_, &vertices_, &edges_, &edges_metadata_, &edge_count_,
name_id_mapper_.get(), &indices_, &constraints_, config_, &wal_seq_num_);
if (info) {
vertex_id_ = info->next_vertex_id;

View File

@ -2966,6 +2966,7 @@ TEST_P(DurabilityTest, ConstraintsRecoveryFunctionSetting) {
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
memgraph::utils::SkipList<memgraph::storage::Vertex> vertices;
memgraph::utils::SkipList<memgraph::storage::Edge> edges;
memgraph::utils::SkipList<memgraph::storage::EdgeMetadata> edges_metadata;
std::unique_ptr<memgraph::storage::NameIdMapper> name_id_mapper = std::make_unique<memgraph::storage::NameIdMapper>();
std::atomic<uint64_t> edge_count{0};
uint64_t wal_seq_num{0};
@ -2979,7 +2980,7 @@ TEST_P(DurabilityTest, ConstraintsRecoveryFunctionSetting) {
config.durability.storage_directory / memgraph::storage::durability::kWalDirectory};
// Recover snapshot.
const auto info = recovery.RecoverData(&uuid, repl_storage_state, &vertices, &edges, &edge_count,
const auto info = recovery.RecoverData(&uuid, repl_storage_state, &vertices, &edges, &edges_metadata, &edge_count,
name_id_mapper.get(), &indices, &constraints, config, &wal_seq_num);
MG_ASSERT(info.has_value(), "Info doesn't have value present");
@ -3045,3 +3046,53 @@ TEST_P(DurabilityTest, EdgeTypeIndexRecovered) {
ASSERT_FALSE(acc->Commit().HasError());
}
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_P(DurabilityTest, EdgeMetadataRecovered) {
if (GetParam() == false) {
return;
}
// Create snapshot.
{
memgraph::storage::Config config{.salient.items = {.properties_on_edges = GetParam()},
.durability = {.storage_directory = storage_directory, .snapshot_on_exit = true}};
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
memgraph::dbms::Database db{config, repl_state};
CreateBaseDataset(db.storage(), GetParam());
VerifyDataset(db.storage(), DatasetType::ONLY_BASE, GetParam());
}
ASSERT_EQ(GetSnapshotsList().size(), 1);
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
ASSERT_EQ(GetWalsList().size(), 0);
ASSERT_EQ(GetBackupWalsList().size(), 0);
// Recover snapshot.
memgraph::storage::Config config{.salient.items = {.properties_on_edges = GetParam(), .enable_edges_metadata = true},
.durability = {.storage_directory = storage_directory, .recover_on_startup = true}};
memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)};
memgraph::dbms::Database db{config, repl_state};
VerifyDataset(db.storage(), DatasetType::ONLY_BASE, GetParam());
// Check if data has been loaded correctly.
{
auto acc = db.Access();
for (auto i{0U}; i < kNumBaseEdges; ++i) {
auto edge = acc->FindEdge(memgraph::storage::Gid::FromUint(0), memgraph::storage::View::OLD);
ASSERT_TRUE(edge.has_value());
}
auto edge = acc->FindEdge(memgraph::storage::Gid::FromUint(kNumBaseEdges), memgraph::storage::View::OLD);
ASSERT_FALSE(edge.has_value());
auto vertex = acc->CreateVertex();
auto new_edge = acc->CreateEdge(&vertex, &vertex, db.storage()->NameToEdgeType("et"));
ASSERT_TRUE(new_edge.HasValue());
edge = acc->FindEdge(memgraph::storage::Gid::FromUint(kNumBaseEdges + 1), memgraph::storage::View::OLD);
ASSERT_FALSE(edge.has_value());
ASSERT_FALSE(acc->Commit().HasError());
}
}