From 9825bdb41bb916bd0c5cb9936348147d0dd6e240 Mon Sep 17 00:00:00 2001 From: gvolfing Date: Fri, 15 Mar 2024 15:59:01 +0100 Subject: [PATCH] Add snapshot based recovery capabilites --- src/dbms/inmemory/replication_handlers.cpp | 4 +- src/memgraph.cpp | 8 ++- src/storage/v2/durability/durability.cpp | 4 +- src/storage/v2/durability/durability.hpp | 3 +- src/storage/v2/durability/snapshot.cpp | 49 ++++++++++------- src/storage/v2/durability/snapshot.hpp | 2 +- src/storage/v2/inmemory/storage.cpp | 2 +- tests/unit/storage_v2_durability_inmemory.cpp | 53 ++++++++++++++++++- 8 files changed, 98 insertions(+), 27 deletions(-) diff --git a/src/dbms/inmemory/replication_handlers.cpp b/src/dbms/inmemory/replication_handlers.cpp index 3e4a31884..a500e3e69 100644 --- a/src/dbms/inmemory/replication_handlers.cpp +++ b/src/dbms/inmemory/replication_handlers.cpp @@ -284,8 +284,8 @@ void InMemoryReplicationHandlers::SnapshotHandler(dbms::DbmsHandler *dbms_handle try { spdlog::debug("Loading snapshot"); auto recovered_snapshot = storage::durability::LoadSnapshot( - *maybe_snapshot_path, &storage->vertices_, &storage->edges_, &storage->repl_storage_state_.history, - storage->name_id_mapper_.get(), &storage->edge_count_, storage->config_); + *maybe_snapshot_path, &storage->vertices_, &storage->edges_, &storage->edges_metadata_, + &storage->repl_storage_state_.history, storage->name_id_mapper_.get(), &storage->edge_count_, storage->config_); spdlog::debug("Snapshot loaded successfully"); // If this step is present it should always be the first step of // the recovery so we use the UUID we read from snasphost diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 3b58b964d..530818a35 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -332,9 +332,15 @@ int main(int argc, char **argv) { .durability_directory = FLAGS_data_directory + "/rocksdb_durability", .wal_directory = FLAGS_data_directory + "/rocksdb_wal"}, .salient.items = {.properties_on_edges = FLAGS_storage_properties_on_edges, - .enable_edges_metadata = FLAGS_storage_enable_edges_metadata, + .enable_edges_metadata = + FLAGS_storage_properties_on_edges ? FLAGS_storage_enable_edges_metadata : false, .enable_schema_metadata = FLAGS_storage_enable_schema_metadata}, .salient.storage_mode = memgraph::flags::ParseStorageMode()}; + if (!FLAGS_storage_properties_on_edges && FLAGS_storage_enable_edges_metadata) { + spdlog::warn( + "Properties on edges were not enabled, hence edges metadata will also be disabled. If you wish to utilize " + "extra metadata on edges, enable properties on edges as well."); + } spdlog::info("config recover on startup {}, flags {} {}", db_config.durability.recover_on_startup, FLAGS_storage_recover_on_startup, FLAGS_data_recovery_on_startup); memgraph::utils::Scheduler jemalloc_purge_scheduler; diff --git a/src/storage/v2/durability/durability.cpp b/src/storage/v2/durability/durability.cpp index fbbedbee5..5cfe7fca7 100644 --- a/src/storage/v2/durability/durability.cpp +++ b/src/storage/v2/durability/durability.cpp @@ -279,6 +279,7 @@ std::optional GetParallelExecInfoIndices(const R std::optional Recovery::RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state, utils::SkipList *vertices, utils::SkipList *edges, + utils::SkipList *edges_metadata, std::atomic *edge_count, NameIdMapper *name_id_mapper, Indices *indices, Constraints *constraints, const Config &config, uint64_t *wal_seq_num) { @@ -313,7 +314,8 @@ std::optional Recovery::RecoverData(std::string *uuid, Replication } spdlog::info("Starting snapshot recovery from {}.", path); try { - recovered_snapshot = LoadSnapshot(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config); + recovered_snapshot = + LoadSnapshot(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count, config); spdlog::info("Snapshot recovery successful!"); break; } catch (const RecoveryFailure &e) { diff --git a/src/storage/v2/durability/durability.hpp b/src/storage/v2/durability/durability.hpp index 97e2c7efc..5d61beebc 100644 --- a/src/storage/v2/durability/durability.hpp +++ b/src/storage/v2/durability/durability.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -133,6 +133,7 @@ struct Recovery { /// @throw std::bad_alloc std::optional RecoverData(std::string *uuid, ReplicationStorageState &repl_storage_state, utils::SkipList *vertices, utils::SkipList *edges, + utils::SkipList *edges_metadata, std::atomic *edge_count, NameIdMapper *name_id_mapper, Indices *indices, Constraints *constraints, const Config &config, uint64_t *wal_seq_num); diff --git a/src/storage/v2/durability/snapshot.cpp b/src/storage/v2/durability/snapshot.cpp index 5fea3dfa5..5b81df23b 100644 --- a/src/storage/v2/durability/snapshot.cpp +++ b/src/storage/v2/durability/snapshot.cpp @@ -425,6 +425,7 @@ struct LoadPartialConnectivityResult { template LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::path &path, utils::SkipList &vertices, utils::SkipList &edges, + utils::SkipList &edges_metadata, const uint64_t from_offset, const uint64_t vertices_count, const SalientConfig::Items items, const bool snapshot_has_edges, TEdgeTypeFromIdFunc get_edge_type_from_id) { @@ -435,6 +436,7 @@ LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::pat auto vertex_acc = vertices.access(); auto edge_acc = edges.access(); + auto edge_metadata_acc = edges_metadata.access(); // Read the first gid to find the necessary iterator in vertices const auto first_vertex_gid = std::invoke([&]() mutable { @@ -578,6 +580,9 @@ LoadPartialConnectivityResult LoadPartialConnectivity(const std::filesystem::pat auto [edge, inserted] = edge_acc.insert(Edge{Gid::FromUint(*edge_gid), nullptr}); edge_ref = EdgeRef(&*edge); } + if (items.enable_edges_metadata) { + edge_metadata_acc.insert(EdgeMetadata{Gid::FromUint(*edge_gid), &vertex}); + } } vertex.out_edges.emplace_back(get_edge_type_from_id(*edge_type), &*to_vertex, edge_ref); // Increment edge count. We only increment the count here because the @@ -624,7 +629,7 @@ void RecoverOnMultipleThreads(size_t thread_count, const TFunc &func, const std: } RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils::SkipList *vertices, - utils::SkipList *edges, + utils::SkipList *edges, utils::SkipList *edges_metadata, std::deque> *epoch_history, NameIdMapper *name_id_mapper, std::atomic *edge_count, SalientConfig::Items items) { @@ -642,6 +647,7 @@ RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils if (!success) { edges->clear(); vertices->clear(); + edges_metadata->clear(); epoch_history->clear(); } }); @@ -1096,7 +1102,7 @@ RecoveredSnapshot LoadSnapshotVersion14(const std::filesystem::path &path, utils } RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils::SkipList *vertices, - utils::SkipList *edges, + utils::SkipList *edges, utils::SkipList *edges_metadata, std::deque> *epoch_history, NameIdMapper *name_id_mapper, std::atomic *edge_count, const Config &config) { @@ -1116,6 +1122,7 @@ RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils if (!success) { edges->clear(); vertices->clear(); + edges_metadata->clear(); epoch_history->clear(); } }); @@ -1224,10 +1231,10 @@ RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils RecoverOnMultipleThreads( config.durability.recovery_thread_count, - [path, vertices, edges, edge_count, items = config.salient.items, snapshot_has_edges, &get_edge_type_from_id, - &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) { - const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items, - snapshot_has_edges, get_edge_type_from_id); + [path, vertices, edges, edges_metadata, edge_count, items = config.salient.items, snapshot_has_edges, + &get_edge_type_from_id, &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) { + const auto result = LoadPartialConnectivity(path, *vertices, *edges, *edges_metadata, batch.offset, + batch.count, items, snapshot_has_edges, get_edge_type_from_id); edge_count->fetch_add(result.edge_count); auto known_highest_edge_gid = highest_edge_gid.load(); while (known_highest_edge_gid < result.highest_edge_id) { @@ -1385,7 +1392,7 @@ RecoveredSnapshot LoadSnapshotVersion15(const std::filesystem::path &path, utils } RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils::SkipList *vertices, - utils::SkipList *edges, + utils::SkipList *edges, utils::SkipList *edges_metadata, std::deque> *epoch_history, NameIdMapper *name_id_mapper, std::atomic *edge_count, const Config &config) { @@ -1405,6 +1412,7 @@ RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils if (!success) { edges->clear(); vertices->clear(); + edges_metadata->clear(); epoch_history->clear(); } }); @@ -1513,10 +1521,10 @@ RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils RecoverOnMultipleThreads( config.durability.recovery_thread_count, - [path, vertices, edges, edge_count, items = config.salient.items, snapshot_has_edges, &get_edge_type_from_id, - &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) { - const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items, - snapshot_has_edges, get_edge_type_from_id); + [path, vertices, edges, edges_metadata, edge_count, items = config.salient.items, snapshot_has_edges, + &get_edge_type_from_id, &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) { + const auto result = LoadPartialConnectivity(path, *vertices, *edges, *edges_metadata, batch.offset, + batch.count, items, snapshot_has_edges, get_edge_type_from_id); edge_count->fetch_add(result.edge_count); auto known_highest_edge_gid = highest_edge_gid.load(); while (known_highest_edge_gid < result.highest_edge_id) { @@ -1728,7 +1736,7 @@ RecoveredSnapshot LoadSnapshotVersion16(const std::filesystem::path &path, utils } RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList *vertices, - utils::SkipList *edges, + utils::SkipList *edges, utils::SkipList *edges_metadata, std::deque> *epoch_history, NameIdMapper *name_id_mapper, std::atomic *edge_count, const Config &config) { RecoveryInfo recovery_info; @@ -1740,14 +1748,16 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis if (!IsVersionSupported(*version)) throw RecoveryFailure(fmt::format("Invalid snapshot version {}", *version)); if (*version == 14U) { - return LoadSnapshotVersion14(path, vertices, edges, epoch_history, name_id_mapper, edge_count, + return LoadSnapshotVersion14(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count, config.salient.items); } if (*version == 15U) { - return LoadSnapshotVersion15(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config); + return LoadSnapshotVersion15(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count, + config); } if (*version == 16U) { - return LoadSnapshotVersion16(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config); + return LoadSnapshotVersion16(path, vertices, edges, edges_metadata, epoch_history, name_id_mapper, edge_count, + config); } // Cleanup of loaded data in case of failure. @@ -1756,6 +1766,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis if (!success) { edges->clear(); vertices->clear(); + edges_metadata->clear(); epoch_history->clear(); } }); @@ -1864,10 +1875,10 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis RecoverOnMultipleThreads( config.durability.recovery_thread_count, - [path, vertices, edges, edge_count, items = config.salient.items, snapshot_has_edges, &get_edge_type_from_id, - &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) { - const auto result = LoadPartialConnectivity(path, *vertices, *edges, batch.offset, batch.count, items, - snapshot_has_edges, get_edge_type_from_id); + [path, vertices, edges, edges_metadata, edge_count, items = config.salient.items, snapshot_has_edges, + &get_edge_type_from_id, &highest_edge_gid, &recovery_info](const size_t batch_index, const BatchInfo &batch) { + const auto result = LoadPartialConnectivity(path, *vertices, *edges, *edges_metadata, batch.offset, + batch.count, items, snapshot_has_edges, get_edge_type_from_id); edge_count->fetch_add(result.edge_count); auto known_highest_edge_gid = highest_edge_gid.load(); while (known_highest_edge_gid < result.highest_edge_id) { diff --git a/src/storage/v2/durability/snapshot.hpp b/src/storage/v2/durability/snapshot.hpp index b8c224b3f..c93e5aea8 100644 --- a/src/storage/v2/durability/snapshot.hpp +++ b/src/storage/v2/durability/snapshot.hpp @@ -64,7 +64,7 @@ SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path); /// Function used to load the snapshot data into the storage. /// @throw RecoveryFailure RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList *vertices, - utils::SkipList *edges, + utils::SkipList *edges, utils::SkipList *edges_metadata, std::deque> *epoch_history, NameIdMapper *name_id_mapper, std::atomic *edge_count, const Config &config); diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index 7a615bf60..df9deafbc 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -102,7 +102,7 @@ InMemoryStorage::InMemoryStorage(Config config) config_.durability.storage_directory); } if (config_.durability.recover_on_startup) { - auto info = recovery_.RecoverData(&uuid_, repl_storage_state_, &vertices_, &edges_, &edge_count_, + auto info = recovery_.RecoverData(&uuid_, repl_storage_state_, &vertices_, &edges_, &edges_metadata_, &edge_count_, name_id_mapper_.get(), &indices_, &constraints_, config_, &wal_seq_num_); if (info) { vertex_id_ = info->next_vertex_id; diff --git a/tests/unit/storage_v2_durability_inmemory.cpp b/tests/unit/storage_v2_durability_inmemory.cpp index 7794f2ab9..87014d9f4 100644 --- a/tests/unit/storage_v2_durability_inmemory.cpp +++ b/tests/unit/storage_v2_durability_inmemory.cpp @@ -2966,6 +2966,7 @@ TEST_P(DurabilityTest, ConstraintsRecoveryFunctionSetting) { memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)}; memgraph::utils::SkipList vertices; memgraph::utils::SkipList edges; + memgraph::utils::SkipList edges_metadata; std::unique_ptr name_id_mapper = std::make_unique(); std::atomic edge_count{0}; uint64_t wal_seq_num{0}; @@ -2979,7 +2980,7 @@ TEST_P(DurabilityTest, ConstraintsRecoveryFunctionSetting) { config.durability.storage_directory / memgraph::storage::durability::kWalDirectory}; // Recover snapshot. - const auto info = recovery.RecoverData(&uuid, repl_storage_state, &vertices, &edges, &edge_count, + const auto info = recovery.RecoverData(&uuid, repl_storage_state, &vertices, &edges, &edges_metadata, &edge_count, name_id_mapper.get(), &indices, &constraints, config, &wal_seq_num); MG_ASSERT(info.has_value(), "Info doesn't have value present"); @@ -3045,3 +3046,53 @@ TEST_P(DurabilityTest, EdgeTypeIndexRecovered) { ASSERT_FALSE(acc->Commit().HasError()); } } + +// NOLINTNEXTLINE(hicpp-special-member-functions) +TEST_P(DurabilityTest, EdgeMetadataRecovered) { + if (GetParam() == false) { + return; + } + // Create snapshot. + { + memgraph::storage::Config config{.salient.items = {.properties_on_edges = GetParam()}, + .durability = {.storage_directory = storage_directory, .snapshot_on_exit = true}}; + memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)}; + memgraph::dbms::Database db{config, repl_state}; + CreateBaseDataset(db.storage(), GetParam()); + VerifyDataset(db.storage(), DatasetType::ONLY_BASE, GetParam()); + } + + ASSERT_EQ(GetSnapshotsList().size(), 1); + ASSERT_EQ(GetBackupSnapshotsList().size(), 0); + ASSERT_EQ(GetWalsList().size(), 0); + ASSERT_EQ(GetBackupWalsList().size(), 0); + + // Recover snapshot. + memgraph::storage::Config config{.salient.items = {.properties_on_edges = GetParam(), .enable_edges_metadata = true}, + .durability = {.storage_directory = storage_directory, .recover_on_startup = true}}; + memgraph::replication::ReplicationState repl_state{memgraph::storage::ReplicationStateRootPath(config)}; + memgraph::dbms::Database db{config, repl_state}; + VerifyDataset(db.storage(), DatasetType::ONLY_BASE, GetParam()); + + // Check if data has been loaded correctly. + { + auto acc = db.Access(); + + for (auto i{0U}; i < kNumBaseEdges; ++i) { + auto edge = acc->FindEdge(memgraph::storage::Gid::FromUint(0), memgraph::storage::View::OLD); + ASSERT_TRUE(edge.has_value()); + } + + auto edge = acc->FindEdge(memgraph::storage::Gid::FromUint(kNumBaseEdges), memgraph::storage::View::OLD); + ASSERT_FALSE(edge.has_value()); + + auto vertex = acc->CreateVertex(); + auto new_edge = acc->CreateEdge(&vertex, &vertex, db.storage()->NameToEdgeType("et")); + ASSERT_TRUE(new_edge.HasValue()); + + edge = acc->FindEdge(memgraph::storage::Gid::FromUint(kNumBaseEdges + 1), memgraph::storage::View::OLD); + ASSERT_FALSE(edge.has_value()); + + ASSERT_FALSE(acc->Commit().HasError()); + } +}