From 2a130e784e8b06af352a4a89ca15ca0cb16b5e83 Mon Sep 17 00:00:00 2001 From: Dominik Gleich Date: Thu, 18 Jan 2018 10:22:54 +0100 Subject: [PATCH] Worker id in snapshot/wal Summary: Adds worker id to snapshot and wal filename. Adds a new worker_id flag to be used for recovering a worker with a distributed snapshot. Adds worker_id field to snapshot to check for consistency. Reviewers: florijan Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1096 --- CHANGELOG.md | 1 + src/database/graph_db.cpp | 14 +++++++---- src/database/graph_db.hpp | 1 + src/durability/paths.cpp | 34 ++++++++++++++++---------- src/durability/paths.hpp | 13 ++++++---- src/durability/recovery.cpp | 4 ++++ src/durability/snapshooter.cpp | 13 ++++------ src/durability/snapshooter.hpp | 9 ++----- src/durability/version.hpp | 2 +- src/durability/wal.cpp | 13 +++++----- src/durability/wal.hpp | 9 +++---- tests/unit/durability.cpp | 41 ++++++++++++++++++++++++++++++-- tools/src/mg_import_csv/main.cpp | 7 +++++- 13 files changed, 111 insertions(+), 50 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d4b0f848..84ef5e0ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Next Release +* Snapshot format changed (not backward compatible). ## v0.9.0 diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index a4f7307b2..6552d4351 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -1,6 +1,5 @@ -#include "database/graph_db.hpp" - #include "communication/messaging/distributed.hpp" +#include "database/graph_db.hpp" #include "distributed/coordination_master.hpp" #include "distributed/coordination_worker.hpp" #include "durability/paths.hpp" @@ -75,7 +74,8 @@ class SingleNode : public Base { private: Storage storage_{0}; - durability::WriteAheadLog wal_{config_.durability_directory, + durability::WriteAheadLog wal_{config_.worker_id, + config_.durability_directory, config_.durability_enabled}; tx::SingleNodeEngine tx_engine_{&wal_}; StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec}; @@ -91,7 +91,8 @@ class Master : public Base { private: communication::messaging::System system_{config_.master_endpoint}; Storage storage_{0}; - durability::WriteAheadLog wal_{config_.durability_directory, + durability::WriteAheadLog wal_{config_.worker_id, + config_.durability_directory, config_.durability_enabled}; tx::MasterEngine tx_engine_{system_, &wal_}; StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec}; @@ -113,7 +114,8 @@ class Worker : public Base { tx::WorkerEngine tx_engine_{system_, config_.master_endpoint}; Storage storage_{config_.worker_id}; StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec}; - durability::WriteAheadLog wal_{config_.durability_directory, + durability::WriteAheadLog wal_{config_.worker_id, + config_.durability_directory, config_.durability_enabled}; TypemapPack typemap_pack_{system_, config_.master_endpoint}; @@ -144,6 +146,8 @@ GraphDb::~GraphDb() { if (impl_->config_.snapshot_on_exit) MakeSnapshot(); } +int GraphDb::WorkerId() const { return impl_->config_.worker_id; } + Storage &GraphDb::storage() { return impl_->storage(); } durability::WriteAheadLog &GraphDb::wal() { return impl_->wal(); } diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 2776dd591..122b46d4e 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -76,6 +76,7 @@ class GraphDb { storage::ConcurrentIdMapper &property_mapper(); database::Counters &counters(); void CollectGarbage(); + int WorkerId() const; protected: explicit GraphDb(std::unique_ptr impl); diff --git a/src/durability/paths.cpp b/src/durability/paths.cpp index d297d4611..be5098c70 100644 --- a/src/durability/paths.cpp +++ b/src/durability/paths.cpp @@ -1,3 +1,5 @@ +#include "durability/paths.hpp" + #include #include #include @@ -22,9 +24,9 @@ bool EnsureDir(const fs::path &dir) { void CheckDurabilityDir(const std::string &durability_dir) { namespace fs = std::experimental::filesystem; if (fs::exists(durability_dir)) { - CHECK(fs::is_directory(durability_dir)) << "The durability directory path '" - << durability_dir - << "' is not a directory!"; + CHECK(fs::is_directory(durability_dir)) + << "The durability directory path '" << durability_dir + << "' is not a directory!"; } else { bool success = EnsureDir(durability_dir); CHECK(success) << "Failed to create durability directory '" @@ -42,20 +44,20 @@ std::experimental::optional TransactionIdFromWalFilename( const std::string &name) { auto nullopt = std::experimental::nullopt; // Get the max_transaction_id from the file name that has format - // "XXXXX__max_transaction_" + // "XXXXX__max_transaction__worker_" auto file_name_split = utils::RSplit(name, "__", 1); if (file_name_split.size() != 2) { LOG(WARNING) << "Unable to parse WAL file name: " << name; return nullopt; } - if (file_name_split[1] == "current") + if (utils::StartsWith(file_name_split[1], "current")) return std::numeric_limits::max(); - file_name_split = utils::RSplit(file_name_split[1], "_", 1); - if (file_name_split.size() != 2) { + file_name_split = utils::Split(file_name_split[1], "_"); + if (file_name_split.size() != 5) { LOG(WARNING) << "Unable to parse WAL file name: " << name; return nullopt; } - auto &tx_id_str = file_name_split[1]; + auto &tx_id_str = file_name_split[2]; try { return std::stoll(tx_id_str); } catch (std::invalid_argument &) { @@ -67,19 +69,27 @@ std::experimental::optional TransactionIdFromWalFilename( } } +fs::path MakeSnapshotPath(const fs::path &durability_dir, const int worker_id) { + std::string date_str = + Timestamp(Timestamp::now()) + .to_string("{:04d}_{:02d}_{:02d}__{:02d}_{:02d}_{:02d}_{:05d}"); + auto file_name = date_str + "_worker_" + std::to_string(worker_id); + return durability_dir / kSnapshotDir / file_name; +} + /// Generates a file path for a write-ahead log file. If given a transaction ID /// the file name will contain it. Otherwise the file path is for the "current" /// WAL file for which the max tx id is still unknown. fs::path WalFilenameForTransactionId( - const std::experimental::filesystem::path &wal_dir, - std::experimental::optional tx_id = - std::experimental::nullopt) { + const std::experimental::filesystem::path &wal_dir, int worker_id, + std::experimental::optional tx_id) { auto file_name = Timestamp::now().to_iso8601(); if (tx_id) { file_name += "__max_transaction_" + std::to_string(*tx_id); } else { file_name += "__current"; } + file_name = file_name + "_Worker_" + std::to_string(worker_id); return wal_dir / file_name; } -} // namespace durability +} // namespace durability diff --git a/src/durability/paths.hpp b/src/durability/paths.hpp index 07796ed99..6790d84c4 100644 --- a/src/durability/paths.hpp +++ b/src/durability/paths.hpp @@ -25,11 +25,16 @@ void CheckDurabilityDir(const std::string &durability_dir); std::experimental::optional TransactionIdFromWalFilename( const std::string &name); -/// Generates a file path for a write-ahead log file. If given a transaction ID -/// the file name will contain it. Otherwise the file path is for the "current" -/// WAL file for which the max tx id is still unknown. +/** Generates a path for a DB snapshot in the given folder in a well-defined + * sortable format with worker id appended to the file name. */ +std::experimental::filesystem::path MakeSnapshotPath( + const std::experimental::filesystem::path &durability_dir, int worker_id); + +/// Generates a file path for a write-ahead log file of a specified worker. If +/// given a transaction ID the file name will contain it. Otherwise the file +/// path is for the "current" WAL file for which the max tx id is still unknown. std::experimental::filesystem::path WalFilenameForTransactionId( - const std::experimental::filesystem::path &wal_dir, + const std::experimental::filesystem::path &wal_dir, int worker_id, std::experimental::optional tx_id = std::experimental::nullopt); } // namespace durability diff --git a/src/durability/recovery.cpp b/src/durability/recovery.cpp index f534ab24a..6951d08a1 100644 --- a/src/durability/recovery.cpp +++ b/src/durability/recovery.cpp @@ -76,6 +76,10 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db, RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int) && dv.ValueInt() == durability::kVersion); + // Checks worker id was set correctly + RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int) && + dv.ValueInt() == db.WorkerId()); + // Vertex and edge generator ids RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int)); uint64_t vertex_generator_cnt = dv.ValueInt(); diff --git a/src/durability/snapshooter.cpp b/src/durability/snapshooter.cpp index 15d1ddb55..8940cff64 100644 --- a/src/durability/snapshooter.cpp +++ b/src/durability/snapshooter.cpp @@ -27,6 +27,10 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db, durability::kMagicNumber.size()); encoder.WriteInt(durability::kVersion); + // Writes the worker id to snapshot, used to guarantee consistent cluster + // state after recovery + encoder.WriteInt(db.WorkerId()); + // Write the number of generated vertex and edges, used to recover // generators internal states encoder.WriteInt(db.storage().VertexGenerator().LocalCount()); @@ -109,17 +113,10 @@ void RemoveOldWals(const fs::path &wal_dir, } } // namespace -fs::path MakeSnapshotPath(const fs::path &durability_dir) { - std::string date_str = - Timestamp(Timestamp::now()) - .to_string("{:04d}_{:02d}_{:02d}__{:02d}_{:02d}_{:02d}_{:05d}"); - return durability_dir / kSnapshotDir / date_str; -} - bool MakeSnapshot(database::GraphDb &db, const fs::path &durability_dir, const int snapshot_max_retained) { if (!EnsureDir(durability_dir / kSnapshotDir)) return false; - const auto snapshot_file = MakeSnapshotPath(durability_dir); + const auto snapshot_file = MakeSnapshotPath(durability_dir, db.WorkerId()); if (fs::exists(snapshot_file)) return false; database::GraphDbAccessor dba(db); if (Encode(snapshot_file, db, dba)) { diff --git a/src/durability/snapshooter.hpp b/src/durability/snapshooter.hpp index 33e52a171..09f5aef7a 100644 --- a/src/durability/snapshooter.hpp +++ b/src/durability/snapshooter.hpp @@ -5,12 +5,6 @@ #include "database/graph_db.hpp" namespace durability { -using path = std::experimental::filesystem::path; - -/** Generates a path for a DB snapshot in the given folder in a well-defined - * sortable format. */ -// TODO review - move to paths.hpp? -path MakeSnapshotPath(const path &durability_dir); /** * Make snapshot and save it in snapshots folder. Returns true if successful. @@ -18,6 +12,7 @@ path MakeSnapshotPath(const path &durability_dir); * @param durability_dir - directory where durability data is stored. * @param snapshot_max_retained - maximum number of snapshots to retain. */ -bool MakeSnapshot(database::GraphDb &db, const path &durability_dir, +bool MakeSnapshot(database::GraphDb &db, + const std::experimental::filesystem::path &durability_dir, int snapshot_max_retained); } // namespace durability diff --git a/src/durability/version.hpp b/src/durability/version.hpp index 93498ffed..9e0ecddae 100644 --- a/src/durability/version.hpp +++ b/src/durability/version.hpp @@ -8,5 +8,5 @@ namespace durability { constexpr std::array kMagicNumber{{'M', 'G', 's', 'n'}}; // The current default version of snapshot and WAL enconding / decoding. -constexpr int64_t kVersion{4}; +constexpr int64_t kVersion{5}; } // namespace durability diff --git a/src/durability/wal.cpp b/src/durability/wal.cpp index edc2de374..4c7b67e7e 100644 --- a/src/durability/wal.cpp +++ b/src/durability/wal.cpp @@ -20,9 +20,9 @@ DEFINE_VALIDATED_HIDDEN_int32(wal_buffer_size, 4096, namespace durability { WriteAheadLog::WriteAheadLog( - const std::experimental::filesystem::path &durability_dir, + int worker_id, const std::experimental::filesystem::path &durability_dir, bool durability_enabled) - : deltas_{FLAGS_wal_buffer_size}, wal_file_{durability_dir} { + : deltas_{FLAGS_wal_buffer_size}, wal_file_{worker_id, durability_dir} { if (durability_enabled) { CheckDurabilityDir(durability_dir); wal_file_.Init(); @@ -38,8 +38,8 @@ WriteAheadLog::~WriteAheadLog() { } WriteAheadLog::WalFile::WalFile( - const std::experimental::filesystem::path &durability_dir) - : wal_dir_{durability_dir / kWalDir} {} + int worker_id, const std::experimental::filesystem::path &durability_dir) + : worker_id_(worker_id), wal_dir_{durability_dir / kWalDir} {} WriteAheadLog::WalFile::~WalFile() { if (!current_wal_file_.empty()) writer_.Close(); @@ -50,7 +50,7 @@ void WriteAheadLog::WalFile::Init() { LOG(ERROR) << "Can't write to WAL directory: " << wal_dir_; current_wal_file_ = std::experimental::filesystem::path(); } else { - current_wal_file_ = WalFilenameForTransactionId(wal_dir_); + current_wal_file_ = WalFilenameForTransactionId(wal_dir_, worker_id_); try { writer_.Open(current_wal_file_); } catch (std::ios_base::failure &) { @@ -93,7 +93,8 @@ void WriteAheadLog::WalFile::Flush(RingBuffer &buffer) { void WriteAheadLog::WalFile::RotateFile() { writer_.Close(); std::experimental::filesystem::rename( - current_wal_file_, WalFilenameForTransactionId(wal_dir_, latest_tx_)); + current_wal_file_, + WalFilenameForTransactionId(wal_dir_, worker_id_, latest_tx_)); Init(); } diff --git a/src/durability/wal.hpp b/src/durability/wal.hpp index 356b5e82b..7b5900823 100644 --- a/src/durability/wal.hpp +++ b/src/durability/wal.hpp @@ -29,12 +29,12 @@ namespace durability { */ class WriteAheadLog { public: - WriteAheadLog(const std::experimental::filesystem::path &durability_dir, + WriteAheadLog(int worker_id, + const std::experimental::filesystem::path &durability_dir, bool durability_enabled); ~WriteAheadLog(); - /** Enables the WAL. Called at the end of database::GraphDb construction, - * after + /** Enables the WAL. Called at the end of GraphDb construction, after * (optional) recovery. */ void Enable() { enabled_ = true; } @@ -45,7 +45,7 @@ class WriteAheadLog { /** Groups the logic of WAL file handling (flushing, naming, rotating) */ class WalFile { public: - explicit WalFile(const std::experimental::filesystem::path &wal__dir); + WalFile(int worker_id, const std::experimental::filesystem::path &wal__dir); ~WalFile(); /** Initializes the WAL file. Must be called before first flush. Can be @@ -57,6 +57,7 @@ class WriteAheadLog { void Flush(RingBuffer &buffer); private: + int worker_id_; const std::experimental::filesystem::path wal_dir_; HashedFileWriter writer_; communication::bolt::PrimitiveEncoder encoder_{writer_}; diff --git a/tests/unit/durability.cpp b/tests/unit/durability.cpp index 460b40cc1..98be8afe5 100644 --- a/tests/unit/durability.cpp +++ b/tests/unit/durability.cpp @@ -422,6 +422,9 @@ TEST_F(Durability, SnapshotEncoding) { communication::bolt::DecodedValue dv; decoder.ReadValue(&dv); ASSERT_EQ(dv.ValueInt(), durability::kVersion); + // Worker id + decoder.ReadValue(&dv); + ASSERT_EQ(dv.ValueInt(), 0); // Number of generated vertex ids. decoder.ReadValue(&dv); ASSERT_TRUE(dv.IsInt()); @@ -478,8 +481,8 @@ TEST_F(Durability, SnapshotEncoding) { EXPECT_EQ(decoded_edges[gid1].type, "et1"); EXPECT_EQ(decoded_edges[gid1].properties.size(), 0); - // Vertex and edge counts are included in the hash. Re-read them to update the - // hash. + // Vertex and edge counts are included in the hash. Re-read them to update + // the hash. buffer.ReadType(vertex_count); buffer.ReadType(edge_count); buffer.Close(); @@ -736,3 +739,37 @@ TEST_F(Durability, SnapshotOnExit) { } EXPECT_EQ(DirFiles(snapshot_dir_).size(), 1); } + +TEST_F(Durability, WorkerIdRecovery) { + auto config = DbConfig(); + config.worker_id = 5; + database::SingleNode db{config}; + MakeDb(db, 100); + MakeSnapshot(db); + EXPECT_EQ(DirFiles(snapshot_dir_).size(), 1); + + // WorkerIds are equal and recovery should be sucessful + { + auto config = DbConfig(); + config.worker_id = 5; + config.db_recover_on_startup = true; + database::SingleNode recovered{config}; + EXPECT_EQ(recovered.WorkerId(), config.worker_id); + CompareDbs(db, recovered); + database::GraphDbAccessor dba(recovered); + EXPECT_NE(dba.VerticesCount(), 0); + EXPECT_NE(dba.EdgesCount(), 0); + } + + // WorkerIds are not equal and recovery should fail + { + auto config = DbConfig(); + config.worker_id = 10; + config.db_recover_on_startup = true; + database::SingleNode recovered{config}; + EXPECT_NE(recovered.WorkerId(), db.WorkerId()); + database::GraphDbAccessor dba(recovered); + EXPECT_EQ(dba.VerticesCount(), 0); + EXPECT_EQ(dba.EdgesCount(), 0); + } +} diff --git a/tools/src/mg_import_csv/main.cpp b/tools/src/mg_import_csv/main.cpp index 5a1c0aba2..e703297e7 100644 --- a/tools/src/mg_import_csv/main.cpp +++ b/tools/src/mg_import_csv/main.cpp @@ -11,6 +11,7 @@ #include "communication/bolt/v1/encoder/base_encoder.hpp" #include "config.hpp" #include "durability/hashed_file_writer.hpp" +#include "durability/paths.hpp" #include "durability/snapshooter.hpp" #include "durability/version.hpp" #include "utils/string.hpp" @@ -361,6 +362,9 @@ void Convert(const std::vector &nodes, durability::kMagicNumber.size()); encoder.WriteTypedValue(durability::kVersion); + encoder.WriteInt(0); // Worker Id - for this use case it's okay to set to 0 + // since we are using a single-node version of + // memgraph here // The following two entries indicate the starting points for generating new // Vertex/Edge IDs in the DB. They are only important when there are // vertices/edges that were moved to another worker (in distributed @@ -417,7 +421,8 @@ std::string GetOutputPath() { } catch (const std::experimental::filesystem::filesystem_error &error) { LOG(FATAL) << error.what(); } - return std::string(durability::MakeSnapshotPath(durability_dir)); + int worker_id = 0; + return std::string(durability::MakeSnapshotPath(durability_dir, worker_id)); } int main(int argc, char *argv[]) {