diff --git a/src/durability/hashed_file_reader.hpp b/src/durability/hashed_file_reader.hpp index 5d84f7326..3e1daa3f1 100644 --- a/src/durability/hashed_file_reader.hpp +++ b/src/durability/hashed_file_reader.hpp @@ -66,6 +66,9 @@ class HashedFileReader { /** Returns the hash of the data read so far from the stream. */ uint64_t hash() const { return hasher_.hash(); } + /** Checks whether the end of file is reached. */ + bool EndOfFile() const { return input_stream_.eof(); } + private: Hasher hasher_; std::ifstream input_stream_; diff --git a/src/durability/recovery.cpp b/src/durability/recovery.cpp index 697590635..e3e48cf25 100644 --- a/src/durability/recovery.cpp +++ b/src/durability/recovery.cpp @@ -37,32 +37,27 @@ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count, } bool VersionConsistency(const fs::path &durability_dir) { - const auto snapshot_dir = durability_dir / kSnapshotDir; - if (fs::exists(snapshot_dir) && fs::is_directory(snapshot_dir)) { - for (auto &file : fs::directory_iterator(snapshot_dir)) { + for (const auto &durability_type : {kSnapshotDir, kWalDir}) { + auto recovery_dir = durability_dir / durability_type; + if (!fs::exists(recovery_dir) || !fs::is_directory(recovery_dir)) continue; + + for (const auto &file : fs::directory_iterator(recovery_dir)) { HashedFileReader reader; SnapshotDecoder decoder(reader); - // This is ok because we are only trying to detect version - // inconsistencies. + // The following checks are ok because we are only trying to detect + // version inconsistencies. if (!reader.Open(fs::path(file))) continue; - auto magic_number = durability::kMagicNumber; + std::array target_magic_number = + (durability_type == kSnapshotDir) ? durability::kSnapshotMagic + : durability::kWalMagic; + std::array magic_number; if (!reader.Read(magic_number.data(), magic_number.size())) continue; + if (magic_number != target_magic_number) continue; - Value dv; - if (!decoder.ReadValue(&dv, Value::Type::Int) || - dv.ValueInt() != durability::kVersion) - return false; - } - } + if (reader.EndOfFile()) continue; - const auto wal_dir = durability_dir / kWalDir; - if (fs::exists(snapshot_dir) && fs::is_directory(wal_dir)) { - for (auto &file : fs::directory_iterator(wal_dir)) { - HashedFileReader reader; - communication::bolt::Decoder decoder(reader); - if (!reader.Open(fs::path(file))) continue; Value dv; if (!decoder.ReadValue(&dv, Value::Type::Int) || dv.ValueInt() != durability::kVersion) @@ -74,25 +69,25 @@ bool VersionConsistency(const fs::path &durability_dir) { } bool ContainsDurabilityFiles(const fs::path &durability_dir) { - for (auto &durability_type : {kSnapshotDir, kWalDir}) { - const auto dtype_dir = durability_dir / durability_type; - if (fs::exists(dtype_dir) && fs::is_directory(dtype_dir) && - !fs::is_empty(dtype_dir)) + for (const auto &durability_type : {kSnapshotDir, kWalDir}) { + auto recovery_dir = durability_dir / durability_type; + if (fs::exists(recovery_dir) && fs::is_directory(recovery_dir) && + !fs::is_empty(recovery_dir)) return true; } return false; } void MoveToBackup(const fs::path &durability_dir) { - const auto backup_dir = durability_dir / kBackupDir; + auto backup_dir = durability_dir / kBackupDir; utils::CheckDir(backup_dir); utils::CheckDir(backup_dir / kSnapshotDir); utils::CheckDir(backup_dir / kWalDir); - for (auto &durability_type : {kSnapshotDir, kWalDir}) { - const auto dtype_dir = durability_dir / durability_type; - if (!fs::exists(dtype_dir) || !fs::is_directory(dtype_dir)) continue; - for (auto &file : fs::directory_iterator(dtype_dir)) { - const auto filename = fs::path(file).filename(); + for (const auto &durability_type : {kSnapshotDir, kWalDir}) { + auto recovery_dir = durability_dir / durability_type; + if (!fs::exists(recovery_dir) || !fs::is_directory(recovery_dir)) continue; + for (const auto &file : fs::directory_iterator(recovery_dir)) { + auto filename = fs::path(file).filename(); fs::rename(file, backup_dir / durability_type / filename); } } @@ -114,9 +109,9 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db, RETURN_IF_NOT(reader.Open(snapshot_file)); - auto magic_number = durability::kMagicNumber; + auto magic_number = durability::kSnapshotMagic; reader.Read(magic_number.data(), magic_number.size()); - RETURN_IF_NOT(magic_number == durability::kMagicNumber); + RETURN_IF_NOT(magic_number == durability::kSnapshotMagic); // Read the vertex and edge count, and the hash, from the end of the snapshot. int64_t vertex_count; @@ -323,6 +318,10 @@ bool ApplyOverDeltas( communication::bolt::Decoder decoder(wal_reader); + auto magic_number = durability::kWalMagic; + wal_reader.Read(magic_number.data(), magic_number.size()); + if (magic_number != durability::kWalMagic) return false; + Value dv; if (!decoder.ReadValue(&dv, Value::Type::Int) || dv.ValueInt() != durability::kVersion) diff --git a/src/durability/snapshooter.cpp b/src/durability/snapshooter.cpp index 5f1240d5d..fd3e2bc38 100644 --- a/src/durability/snapshooter.cpp +++ b/src/durability/snapshooter.cpp @@ -27,8 +27,8 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db, SnapshotEncoder encoder(buffer); int64_t vertex_num = 0, edge_num = 0; - encoder.WriteRAW(durability::kMagicNumber.data(), - durability::kMagicNumber.size()); + encoder.WriteRAW(durability::kSnapshotMagic.data(), + durability::kSnapshotMagic.size()); encoder.WriteInt(durability::kVersion); // Writes the worker id to snapshot, used to guarantee consistent cluster diff --git a/src/durability/version.hpp b/src/durability/version.hpp index 0385695bf..549ffc212 100644 --- a/src/durability/version.hpp +++ b/src/durability/version.hpp @@ -11,7 +11,8 @@ namespace durability { -constexpr std::array kMagicNumber{{'M', 'G', 's', 'n'}}; +constexpr std::array kSnapshotMagic{{'M', 'G', 's', 'n'}}; +constexpr std::array kWalMagic{{'M', 'G', 'w', 'l'}}; // The current default version of snapshot and WAL encoding / decoding. constexpr int64_t kVersion{6}; diff --git a/src/durability/wal.cpp b/src/durability/wal.cpp index a91be806c..afa520934 100644 --- a/src/durability/wal.cpp +++ b/src/durability/wal.cpp @@ -54,6 +54,8 @@ void WriteAheadLog::WalFile::Init() { current_wal_file_ = WalFilenameForTransactionId(wal_dir_, worker_id_); try { writer_.Open(current_wal_file_); + encoder_.WriteRAW(durability::kWalMagic.data(), + durability::kWalMagic.size()); encoder_.WriteInt(durability::kVersion); } catch (std::ios_base::failure &) { LOG(ERROR) << "Failed to open write-ahead log file: " diff --git a/tests/manual/snapshot_explorer.cpp b/tests/manual/snapshot_explorer.cpp index 085296813..16ae63fa5 100644 --- a/tests/manual/snapshot_explorer.cpp +++ b/tests/manual/snapshot_explorer.cpp @@ -32,9 +32,9 @@ int main(int argc, char *argv[]) { CHECK(reader.Open(snapshot_path)) << "Couldn't open snapshot file!"; - auto magic_number = durability::kMagicNumber; + auto magic_number = durability::kSnapshotMagic; reader.Read(magic_number.data(), magic_number.size()); - CHECK(magic_number == durability::kMagicNumber) << "Magic number mismatch"; + CHECK(magic_number == durability::kSnapshotMagic) << "Magic number mismatch"; int64_t vertex_count, edge_count; uint64_t hash; diff --git a/tests/manual/snapshot_generation/snapshot_writer.hpp b/tests/manual/snapshot_generation/snapshot_writer.hpp index 790a9d7d4..d253faf6b 100644 --- a/tests/manual/snapshot_generation/snapshot_writer.hpp +++ b/tests/manual/snapshot_generation/snapshot_writer.hpp @@ -25,8 +25,8 @@ class SnapshotWriter { uint64_t vertex_generator_local_count = 0, uint64_t edge_generator_local_count = 0) : worker_id_(worker_id), buffer_(path) { - encoder_.WriteRAW(durability::kMagicNumber.data(), - durability::kMagicNumber.size()); + encoder_.WriteRAW(durability::kSnapshotMagic.data(), + durability::kSnapshotMagic.size()); encoder_.WriteValue(durability::kVersion); encoder_.WriteInt(worker_id_); encoder_.WriteInt(vertex_generator_local_count); diff --git a/tests/manual/wal_explorer.cpp b/tests/manual/wal_explorer.cpp index 1fdd8dac4..3caee6564 100644 --- a/tests/manual/wal_explorer.cpp +++ b/tests/manual/wal_explorer.cpp @@ -8,6 +8,7 @@ #include "database/state_delta.hpp" #include "durability/hashed_file_reader.hpp" #include "durability/recovery.hpp" +#include "durability/version.hpp" #include "durability/wal.hpp" #include "transactions/type.hpp" @@ -64,6 +65,15 @@ int main(int argc, char *argv[]) { CHECK(wal_reader.Open(wal_path)) << "Couldn't open wal file!"; communication::bolt::Decoder decoder(wal_reader); + + auto magic_number = durability::kWalMagic; + wal_reader.Read(magic_number.data(), magic_number.size()); + CHECK(magic_number == durability::kWalMagic) << "Wal magic number mismatch"; + + communication::bolt::Value dv; + decoder.ReadValue(&dv); + CHECK(dv.ValueInt() == durability::kVersion) << "Wal version mismatch"; + tx::TransactionId max_observed_tx_id{0}; tx::TransactionId min_observed_tx_id{std::numeric_limits::max()}; diff --git a/tests/unit/distributed_durability.cpp b/tests/unit/distributed_durability.cpp index e1333a9da..a554ebe36 100644 --- a/tests/unit/distributed_durability.cpp +++ b/tests/unit/distributed_durability.cpp @@ -157,6 +157,11 @@ void CheckDeltas(fs::path wal_dir, database::StateDelta::Type op) { communication::bolt::Decoder decoder{reader}; std::vector deltas; + // check magic number + auto magic_number = durability::kWalMagic; + reader.Read(magic_number.data(), magic_number.size()); + ASSERT_EQ(magic_number, durability::kWalMagic); + // check version communication::bolt::Value dv; decoder.ReadValue(&dv); diff --git a/tests/unit/durability.cpp b/tests/unit/durability.cpp index 9f0120f48..fd6ea0248 100644 --- a/tests/unit/durability.cpp +++ b/tests/unit/durability.cpp @@ -365,6 +365,11 @@ TEST_F(Durability, WalEncoding) { communication::bolt::Decoder decoder{reader}; std::vector deltas; + // check magic number + auto magic_number = durability::kWalMagic; + reader.Read(magic_number.data(), magic_number.size()); + ASSERT_EQ(magic_number, durability::kWalMagic); + // check version communication::bolt::Value dv; decoder.ReadValue(&dv); @@ -448,9 +453,9 @@ TEST_F(Durability, SnapshotEncoding) { ASSERT_EQ(vertex_count, 3); ASSERT_EQ(edge_count, 2); - auto magic_number = durability::kMagicNumber; + auto magic_number = durability::kSnapshotMagic; buffer.Read(magic_number.data(), magic_number.size()); - ASSERT_EQ(magic_number, durability::kMagicNumber); + ASSERT_EQ(magic_number, durability::kSnapshotMagic); communication::bolt::Value dv; decoder.ReadValue(&dv); diff --git a/tools/src/mg_import_csv/main.cpp b/tools/src/mg_import_csv/main.cpp index 972aa2f64..05b6db78b 100644 --- a/tools/src/mg_import_csv/main.cpp +++ b/tools/src/mg_import_csv/main.cpp @@ -409,8 +409,8 @@ void Convert(const std::vector &nodes, // 5) All nodes, sequentially, but not encoded as a list. // 6) All relationships, sequentially, but not encoded as a list. // 7) Summary with node count, relationship count and hash digest. - encoder.WriteRAW(durability::kMagicNumber.data(), - durability::kMagicNumber.size()); + encoder.WriteRAW(durability::kSnapshotMagic.data(), + durability::kSnapshotMagic.size()); encoder.WriteValue(durability::kVersion); encoder.WriteInt(0); // Worker Id - for this use case it's okay to set to 0