From 1556d78d159df27f1f5058229e44be8cfc7e2b91 Mon Sep 17 00:00:00 2001 From: Dominik Gleich Date: Wed, 20 Dec 2017 12:48:19 +0100 Subject: [PATCH] Update snapshot format Summary: Set vertex/edge generator id from recovery Add tests Reviewers: florijan Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1061 --- src/database/graph_db.cpp | 11 ++--- src/database/graph_db.hpp | 3 ++ src/durability/recovery.cpp | 42 ++++++++++------- src/durability/recovery.hpp | 2 +- src/durability/snapshooter.cpp | 29 +++++++----- src/durability/snapshooter.hpp | 8 ++-- src/durability/version.hpp | 4 +- src/storage/gid.hpp | 30 +++++++++--- tests/unit/durability.cpp | 79 ++++++++++++++++++++++++++++++-- tools/src/mg_import_csv/main.cpp | 12 ++++- 10 files changed, 167 insertions(+), 53 deletions(-) diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 340449542..6b944556f 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -101,13 +101,11 @@ void GraphDb::Shutdown() { void GraphDb::StartSnapshooting() { if (config_.durability_enabled) { auto create_snapshot = [this]() -> void { - GraphDbAccessor db_accessor(*this); - if (!durability::MakeSnapshot(db_accessor, + if (!durability::MakeSnapshot(*this, fs::path(config_.durability_directory), config_.snapshot_max_retained)) { LOG(WARNING) << "Durability: snapshot creation failed"; } - db_accessor.Commit(); }; snapshot_creator_.Run(std::chrono::seconds(config_.snapshot_cycle_sec), create_snapshot); @@ -178,11 +176,10 @@ GraphDb::~GraphDb() { // Create last database snapshot if (config_.snapshot_on_exit == true) { - GraphDbAccessor db_accessor(*this); LOG(INFO) << "Creating snapshot on shutdown..." 
<< std::endl; - const bool status = durability::MakeSnapshot( - db_accessor, fs::path(config_.durability_directory), - config_.snapshot_max_retained); + const bool status = + durability::MakeSnapshot(*this, fs::path(config_.durability_directory), + config_.snapshot_max_retained); if (status) { std::cout << "Snapshot created successfully." << std::endl; } else { diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 43c5f78be..c7d71a2b0 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -103,6 +103,9 @@ class GraphDb { void CollectGarbage(); + gid::GidGenerator &VertexGenerator() { return vertex_generator_; } + gid::GidGenerator &EdgeGenerator() { return edge_generator_; } + /** When this is false, no new transactions should be created. */ std::atomic is_accepting_transactions_{true}; diff --git a/src/durability/recovery.cpp b/src/durability/recovery.cpp index 4120bcf01..428ba92de 100644 --- a/src/durability/recovery.cpp +++ b/src/durability/recovery.cpp @@ -52,8 +52,7 @@ struct RecoveryData { return false; \ } -bool RecoverSnapshot(const fs::path &snapshot_file, - GraphDbAccessor &db_accessor, +bool RecoverSnapshot(const fs::path &snapshot_file, GraphDb &db, RecoveryData &recovery_data) { HashedFileReader reader; communication::bolt::Decoder decoder(reader); @@ -77,6 +76,16 @@ bool RecoverSnapshot(const fs::path &snapshot_file, RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int) && dv.ValueInt() == durability::kVersion); + // Vertex and edge generator ids + RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int)); + uint64_t vertex_generator_cnt = dv.ValueInt(); + db.VertexGenerator().SetId( + std::max(db.VertexGenerator().LocalCount(), vertex_generator_cnt)); + RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int)); + uint64_t edge_generator_cnt = dv.ValueInt(); + db.EdgeGenerator().SetId( + std::max(db.EdgeGenerator().LocalCount(), edge_generator_cnt)); + RETURN_IF_NOT(decoder.ReadValue(&dv, 
DecodedValue::Type::Int)); recovery_data.snapshooter_tx_id = dv.ValueInt(); // Transaction snapshot of the transaction that created the snapshot. @@ -98,16 +107,17 @@ bool RecoverSnapshot(const fs::path &snapshot_file, property.ValueString()); } + GraphDbAccessor dba(db); for (int64_t i = 0; i < vertex_count; ++i) { DecodedValue vertex_dv; RETURN_IF_NOT(decoder.ReadValue(&vertex_dv, DecodedValue::Type::Vertex)); auto &vertex = vertex_dv.ValueVertex(); - auto vertex_accessor = db_accessor.InsertVertex(vertex.id); + auto vertex_accessor = dba.InsertVertex(vertex.id); for (const auto &label : vertex.labels) { - vertex_accessor.add_label(db_accessor.Label(label)); + vertex_accessor.add_label(dba.Label(label)); } for (const auto &property_pair : vertex.properties) { - vertex_accessor.PropsSet(db_accessor.Property(property_pair.first), + vertex_accessor.PropsSet(dba.Property(property_pair.first), query::TypedValue(property_pair.second)); } vertices.insert({vertex.id, vertex_accessor}); @@ -119,12 +129,11 @@ bool RecoverSnapshot(const fs::path &snapshot_file, auto it_from = vertices.find(edge.from); auto it_to = vertices.find(edge.to); RETURN_IF_NOT(it_from != vertices.end() && it_to != vertices.end()); - auto edge_accessor = - db_accessor.InsertEdge(it_from->second, it_to->second, - db_accessor.EdgeType(edge.type), edge.id); + auto edge_accessor = dba.InsertEdge(it_from->second, it_to->second, + dba.EdgeType(edge.type), edge.id); for (const auto &property_pair : edge.properties) - edge_accessor.PropsSet(db_accessor.Property(property_pair.first), + edge_accessor.PropsSet(dba.Property(property_pair.first), query::TypedValue(property_pair.second)); } @@ -132,8 +141,12 @@ bool RecoverSnapshot(const fs::path &snapshot_file, // hash. 
reader.ReadType(vertex_count); reader.ReadType(edge_count); - if (!reader.Close()) return false; - return reader.hash() == hash; + if (!reader.Close() || reader.hash() != hash) { + dba.Abort(); + return false; + } + dba.Commit(); + return true; } #undef RETURN_IF_NOT @@ -266,22 +279,19 @@ bool Recover(const fs::path &durability_dir, GraphDb &db) { snapshot_files.emplace_back(file); std::sort(snapshot_files.rbegin(), snapshot_files.rend()); for (auto &snapshot_file : snapshot_files) { - GraphDbAccessor db_accessor{db}; LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file; - if (!RecoverSnapshot(snapshot_file, db_accessor, recovery_data)) { - db_accessor.Abort(); + if (!RecoverSnapshot(snapshot_file, db, recovery_data)) { recovery_data.Clear(); LOG(WARNING) << "Snapshot recovery failed, trying older snapshot..."; continue; } else { LOG(INFO) << "Snapshot recovery successful."; - db_accessor.Commit(); break; } } // Write-ahead-log recovery. - GraphDbAccessor db_accessor{db}; + GraphDbAccessor db_accessor(db); // WAL recovery does not have to be complete for the recovery to be // considered successful. For the time being ignore the return value, // consider a better system. 
diff --git a/src/durability/recovery.hpp b/src/durability/recovery.hpp index 45a038310..192cc6795 100644 --- a/src/durability/recovery.hpp +++ b/src/durability/recovery.hpp @@ -27,4 +27,4 @@ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count, */ bool Recover(const std::experimental::filesystem::path &durability_dir, GraphDb &db); -} +} // namespace durability diff --git a/src/durability/snapshooter.cpp b/src/durability/snapshooter.cpp index 9c7479c4f..ae769c821 100644 --- a/src/durability/snapshooter.cpp +++ b/src/durability/snapshooter.cpp @@ -16,7 +16,7 @@ namespace fs = std::experimental::filesystem; namespace durability { namespace { -bool Encode(const fs::path &snapshot_file, GraphDbAccessor &db_accessor_) { +bool Encode(const fs::path &snapshot_file, GraphDb &db, GraphDbAccessor &dba) { try { HashedFileWriter buffer(snapshot_file); communication::bolt::BaseEncoder encoder(buffer); @@ -26,14 +26,19 @@ bool Encode(const fs::path &snapshot_file, GraphDbAccessor &db_accessor_) { durability::kMagicNumber.size()); encoder.WriteInt(durability::kVersion); + // Write the vertex and edge id generator counters, used to recover the + // generators' internal states. + encoder.WriteInt(db.VertexGenerator().LocalCount()); + encoder.WriteInt(db.EdgeGenerator().LocalCount()); + // Write the ID of the transaction doing the snapshot. - encoder.WriteInt(db_accessor_.transaction_id()); + encoder.WriteInt(dba.transaction_id()); // Write the transaction snapshot into the snapshot. It's used when // recovering from the combination of snapshot and write-ahead-log. { std::vector tx_snapshot; - for (int64_t tx : db_accessor_.transaction().snapshot()) + for (int64_t tx : dba.transaction().snapshot()) tx_snapshot.emplace_back(tx); encoder.WriteList(tx_snapshot); } @@ -41,18 +46,18 @@ bool Encode(const fs::path &snapshot_file, GraphDbAccessor &db_accessor_) { // Write label+property indexes as list ["label", "property", ...]
{ std::vector index_vec; - for (const auto &key : db_accessor_.GetIndicesKeys()) { - index_vec.emplace_back(db_accessor_.LabelName(key.label_)); - index_vec.emplace_back(db_accessor_.PropertyName(key.property_)); + for (const auto &key : dba.GetIndicesKeys()) { + index_vec.emplace_back(dba.LabelName(key.label_)); + index_vec.emplace_back(dba.PropertyName(key.property_)); } encoder.WriteList(index_vec); } - for (const auto &vertex : db_accessor_.Vertices(false)) { + for (const auto &vertex : dba.Vertices(false)) { encoder.WriteVertex(vertex); vertex_num++; } - for (const auto &edge : db_accessor_.Edges(false)) { + for (const auto &edge : dba.Edges(false)) { encoder.WriteEdge(edge); edge_num++; } @@ -110,14 +115,16 @@ fs::path MakeSnapshotPath(const fs::path &durability_dir) { return durability_dir / kSnapshotDir / date_str; } -bool MakeSnapshot(GraphDbAccessor &db_accessor_, const fs::path &durability_dir, +bool MakeSnapshot(GraphDb &db, const fs::path &durability_dir, const int snapshot_max_retained) { if (!EnsureDir(durability_dir / kSnapshotDir)) return false; const auto snapshot_file = MakeSnapshotPath(durability_dir); if (fs::exists(snapshot_file)) return false; - if (Encode(snapshot_file, db_accessor_)) { + GraphDbAccessor dba(db); + if (Encode(snapshot_file, db, dba)) { RemoveOldSnapshots(durability_dir / kSnapshotDir, snapshot_max_retained); - RemoveOldWals(durability_dir / kWalDir, db_accessor_.transaction()); + RemoveOldWals(durability_dir / kWalDir, dba.transaction()); + dba.Commit(); return true; } else { std::error_code error_code; // Just for exception suppression. 
diff --git a/src/durability/snapshooter.hpp b/src/durability/snapshooter.hpp index 85d168ebe..47b0c4a4f 100644 --- a/src/durability/snapshooter.hpp +++ b/src/durability/snapshooter.hpp @@ -2,7 +2,7 @@ #include -class GraphDbAccessor; +#include "database/graph_db.hpp" namespace durability { using path = std::experimental::filesystem::path; @@ -14,10 +14,10 @@ path MakeSnapshotPath(const path &durability_dir); /** * Make snapshot and save it in snapshots folder. Returns true if successful. - * @param db_accessor- GraphDbAccessor used to access elements of GraphDb. + * @param db - database for which we are creating a snapshot * @param durability_dir - directory where durability data is stored. * @param snapshot_max_retained - maximum number of snapshots to retain. */ -bool MakeSnapshot(GraphDbAccessor &db_accessor, const path &durability_dir, +bool MakeSnapshot(GraphDb &db, const path &durability_dir, int snapshot_max_retained); -} +} // namespace durability diff --git a/src/durability/version.hpp b/src/durability/version.hpp index 5d1cc2169..93498ffed 100644 --- a/src/durability/version.hpp +++ b/src/durability/version.hpp @@ -8,5 +8,5 @@ namespace durability { constexpr std::array kMagicNumber{{'M', 'G', 's', 'n'}}; // The current default version of snapshot and WAL enconding / decoding. -constexpr int64_t kVersion{3}; -} +constexpr int64_t kVersion{4}; +} // namespace durability diff --git a/src/storage/gid.hpp b/src/storage/gid.hpp index 027be632e..b6e457ffb 100644 --- a/src/storage/gid.hpp +++ b/src/storage/gid.hpp @@ -34,8 +34,12 @@ static inline Gid Create(uint64_t worker_id, uint64_t local_id) { } /** - * @brief - Threadsafe generation of new global ids which belong to the - * worker_id machine + * Threadsafe generation of new global ids which belong to the + * worker_id machine. Never call SetId after calling Next without an Id you are + * sure is going to be used for gid, i.e. SetId should only be called before + * first Next call. 
We want to make sure that every id that we generate is + * not smaller than the id set by SetId; we can ensure that by not allowing + * calls to SetId after a Next call that generated a new id (incremented the internal id counter). */ class GidGenerator { public: @@ -45,15 +49,29 @@ class GidGenerator { * @param local_id - force local id instead of generating a new one */ gid::Gid Next(std::experimental::optional local_id) { - auto id = local_id ? *local_id : id_++; + auto id = local_id ? *local_id : next_local_id_++; if (local_id) { - utils::EnsureAtomicGe(id_, id + 1); + utils::EnsureAtomicGe(next_local_id_, id + 1); + } else { + generated_id_ = true; } return gid::Create(worker_id_, id); } + /// Returns number of locally generated ids + uint64_t LocalCount() const { return next_local_id_; }; + + // Sets a new id from which every new gid will be generated, should only be + // set before first Next is called + void SetId(uint64_t id) { + DCHECK(!generated_id_) + << "Id should be set only before first id is generated"; + next_local_id_ = id; + } + private: + bool generated_id_{false}; int worker_id_; - std::atomic id_{0}; + std::atomic next_local_id_{0}; }; -}; // namespace gid +} // namespace gid diff --git a/tests/unit/durability.cpp b/tests/unit/durability.cpp index 67389e7b5..ae74afc6d 100644 --- a/tests/unit/durability.cpp +++ b/tests/unit/durability.cpp @@ -148,7 +148,7 @@ class DbGenerator { } }; -/** Returns true if the given databases have the same contents (indices, +/** Checks if the given databases have the same contents (indices, * vertices and edges).
*/ void CompareDbs(GraphDb &a, GraphDb &b) { GraphDbAccessor dba_a(a); @@ -288,10 +288,8 @@ class Durability : public ::testing::Test { } void MakeSnapshot(GraphDb &db, int snapshot_max_retained = -1) { - GraphDbAccessor dba(db); ASSERT_TRUE( - durability::MakeSnapshot(dba, durability_dir_, snapshot_max_retained)); - dba.Commit(); + durability::MakeSnapshot(db, durability_dir_, snapshot_max_retained)); } void SetUp() override { @@ -418,6 +416,12 @@ TEST_F(Durability, SnapshotEncoding) { communication::bolt::DecodedValue dv; decoder.ReadValue(&dv); ASSERT_EQ(dv.ValueInt(), durability::kVersion); + // Number of generated vertex ids. + decoder.ReadValue(&dv); + ASSERT_TRUE(dv.IsInt()); + // Number of generated edge ids. + decoder.ReadValue(&dv); + ASSERT_TRUE(dv.IsInt()); // Transaction ID. decoder.ReadValue(&dv); ASSERT_TRUE(dv.IsInt()); @@ -494,6 +498,73 @@ TEST_F(Durability, SnapshotRecovery) { } } +TEST_F(Durability, SnapshotNoVerticesIdRecovery) { + GraphDb db{DbConfig()}; + MakeDb(db, 10); + + // Erase all vertices, this should cause snapshot to not have any more + // vertices which should make it not change any id after snapshot recovery, + // but we still have to make sure that the id for generators is recovered + { + GraphDbAccessor dba(db); + for (auto vertex : dba.Vertices(false)) dba.RemoveVertex(vertex); + dba.Commit(); + } + + MakeSnapshot(db); + { + auto recovered_config = DbConfig(); + recovered_config.db_recover_on_startup = true; + GraphDb recovered{recovered_config}; + EXPECT_EQ(db.VertexGenerator().LocalCount(), + recovered.VertexGenerator().LocalCount()); + EXPECT_EQ(db.EdgeGenerator().LocalCount(), + recovered.EdgeGenerator().LocalCount()); + } +} + +TEST_F(Durability, SnapshotAndWalIdRecovery) { + auto config = DbConfig(); + config.durability_enabled = true; + GraphDb db{config}; + MakeDb(db, 300); + MakeSnapshot(db); + MakeDb(db, 300); + // Sleep to ensure the WAL gets flushed. 
+ std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ASSERT_EQ(DirFiles(snapshot_dir_).size(), 1); + EXPECT_GT(DirFiles(wal_dir_).size(), 1); + { + auto recovered_config = DbConfig(); + recovered_config.db_recover_on_startup = true; + GraphDb recovered{recovered_config}; + EXPECT_EQ(db.VertexGenerator().LocalCount(), + recovered.VertexGenerator().LocalCount()); + EXPECT_EQ(db.EdgeGenerator().LocalCount(), + recovered.EdgeGenerator().LocalCount()); + } +} + +TEST_F(Durability, OnlyWalIdRecovery) { + auto config = DbConfig(); + config.durability_enabled = true; + GraphDb db{config}; + MakeDb(db, 300); + // Sleep to ensure the WAL gets flushed. + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ASSERT_EQ(DirFiles(snapshot_dir_).size(), 0); + EXPECT_GT(DirFiles(wal_dir_).size(), 1); + { + auto recovered_config = DbConfig(); + recovered_config.db_recover_on_startup = true; + GraphDb recovered{recovered_config}; + EXPECT_EQ(db.VertexGenerator().LocalCount(), + recovered.VertexGenerator().LocalCount()); + EXPECT_EQ(db.EdgeGenerator().LocalCount(), + recovered.EdgeGenerator().LocalCount()); + } +} + TEST_F(Durability, WalRecovery) { auto config = DbConfig(); config.durability_enabled = true; diff --git a/tools/src/mg_import_csv/main.cpp b/tools/src/mg_import_csv/main.cpp index fab4eebd6..181ec9f0f 100644 --- a/tools/src/mg_import_csv/main.cpp +++ b/tools/src/mg_import_csv/main.cpp @@ -342,6 +342,14 @@ void Convert(const std::vector &nodes, encoder.WriteRAW(durability::kMagicNumber.data(), durability::kMagicNumber.size()); encoder.WriteTypedValue(durability::kVersion); + + // The following two entries indicate the starting points for generating new + // Vertex/Edge IDs in the DB. They are only important when there are + // vertices/edges that were moved to another worker (in distributed + // Memgraph), so it's safe to set them to 0 in snapshot generation. 
+ encoder.WriteInt(0); // Internal Id of vertex generator + encoder.WriteInt(0); // Internal Id of edge generator + encoder.WriteInt(0); // Id of transaction that is snapshooting. encoder.WriteList({}); // Transactional snapshot. encoder.WriteList({}); // Label + property indexes. @@ -373,8 +381,8 @@ std::string GetOutputPath() { // If we have the 'out' flag, use that. if (!utils::Trim(FLAGS_out).empty()) return FLAGS_out; // Without the 'out', fall back to reading the memgraph configuration for - // durability_directory. Hopefully, memgraph configuration doesn't contain other - // flags which are defined in this file. + // durability_directory. Hopefully, memgraph configuration doesn't contain + // other flags which are defined in this file. LoadConfig(); // Without durability_directory, we have to require 'out' flag. if (utils::Trim(FLAGS_durability_directory).empty())