From d106aff88fed0c1ecc2ba3b978cc4a41797b5031 Mon Sep 17 00:00:00 2001 From: Ivan Paljak Date: Fri, 24 Aug 2018 10:43:27 +0200 Subject: [PATCH] Implement full durability mode Summary: This diff introduces a new flag: * `--synchronous-commit` The `--synchronous-commit` flag tells the WAL when the deltas should be flushed to the disk drive. By default this is off and the WAL flushes deltas every `N` milliseconds. If it's turned on, on every transaction end, commit or abort, the WAL will first flush the deltas and only after that will return from ending a transaction. Reviewers: buda, vkasljevic, mferencevic, teon.banek, ipaljak Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1542 --- config/community.conf | 12 ++++- docs/dev/durability/wal.md | 14 ++++-- src/database/config.cpp | 6 +++ src/database/distributed_graph_db.cpp | 6 ++- src/database/graph_db.cpp | 6 +-- src/database/graph_db.hpp | 1 + src/durability/wal.cpp | 68 ++++++++++++++++++++------- src/durability/wal.hpp | 64 +++++++++++++------------ tests/unit/distributed_durability.cpp | 56 +++++++++++++++------- tests/unit/durability.cpp | 58 ++++++++++++++--------- 10 files changed, 194 insertions(+), 97 deletions(-) diff --git a/config/community.conf b/config/community.conf index 75ccb480b..e0b6fc7ff 100644 --- a/config/community.conf +++ b/config/community.conf @@ -72,8 +72,7 @@ # are used: snapshots store the total current database state while write-ahead # logs store small changes incrementally. They are used in tandem to provide # fast and storage-efficient persistence. Some aspects of snapshot taking -# are configurable, while write-ahead logging is pre-configured for optimal -# performance. +# and write-ahead logging are configurable. --durability-enabled=true # Path to the directory where snapshots and write-ahead log files will be stored. @@ -93,6 +92,15 @@ # for new ones. If set to -1, the number of kept snapshots is unlimited. 
--snapshot-max-retained=3 +# Specifies whether WAL updates should be written on disk immediately after a +# transaction finishes. Setting this parameter to false does introduce risk of +# database inconsistency because an operating system or hardware crash might +# lead to missing transactions in the write-ahead log, but the database will +# handle this as if those transactions never happened. Turning +# synchronous-commit off can be a useful trade-off between exact durability and +# performance. +--synchronous-commit=false + ## Logging # Path to where the log should be stored. diff --git a/docs/dev/durability/wal.md b/docs/dev/durability/wal.md index ca1ceeef3..a64603f52 100644 --- a/docs/dev/durability/wal.md +++ b/docs/dev/durability/wal.md @@ -16,9 +16,17 @@ characteristics. Ensuring that the log is written before the transaction is committed can slow down the database. For that reason this guarantee is most often -configurable in databases. In Memgraph it is at the moment not -guaranteed, nor configurable. The WAL is flushed to the disk -periodically and transactions do not wait for this to complete. +configurable in databases. + +Memgraph offers two options for the WAL. The default option, where the WAL is +flushed to the disk periodically and transactions do not wait for this to +complete, introduces the risk of database inconsistency because an operating +system or hardware crash might lead to missing transactions in the WAL. Memgraph +will handle this as if those transactions never happened. The second option, +called synchronous commit, will instruct Memgraph to wait for the WAL to be +flushed to the disk when a transaction completes and the transaction will wait +for this to complete. This option can be turned on with the +`--synchronous-commit` command line flag. 
### Format diff --git a/src/database/config.cpp b/src/database/config.cpp index 27353e2a6..b5a982c7d 100644 --- a/src/database/config.cpp +++ b/src/database/config.cpp @@ -32,6 +32,11 @@ DEFINE_string(properties_on_disk, "", "Property names of properties which will be stored on available " "disk. Property names have to be separated with comma (,)."); +// Full durability. +DEFINE_bool(synchronous_commit, false, + "Should a transaction end wait for WAL records to be written to " + "disk before the transaction finishes."); + #ifndef MG_COMMUNITY // Distributed master/worker flags. DEFINE_VALIDATED_HIDDEN_int32(worker_id, 0, @@ -76,6 +81,7 @@ database::Config::Config() snapshot_cycle_sec{FLAGS_snapshot_cycle_sec}, snapshot_max_retained{FLAGS_snapshot_max_retained}, snapshot_on_exit{FLAGS_snapshot_on_exit}, + synchronous_commit{FLAGS_synchronous_commit}, // Misc flags. gc_cycle_sec{FLAGS_gc_cycle_sec}, query_execution_time_sec{FLAGS_query_execution_time_sec}, diff --git a/src/database/distributed_graph_db.cpp b/src/database/distributed_graph_db.cpp index 432384605..354b60a0c 100644 --- a/src/database/distributed_graph_db.cpp +++ b/src/database/distributed_graph_db.cpp @@ -537,7 +537,8 @@ class Master { std::make_unique(config_.worker_id, config_.properties_on_disk); durability::WriteAheadLog wal_{config_.worker_id, config_.durability_directory, - config_.durability_enabled}; + config_.durability_enabled, + config_.synchronous_commit}; // Shared implementations for all RecordAccessor in this Db. DistributedEdgeAccessor edge_accessor_{config_.worker_id, &data_manager_, &updates_clients_}; @@ -837,7 +838,8 @@ class Worker { std::make_unique(config_.worker_id, config_.properties_on_disk); durability::WriteAheadLog wal_{config_.worker_id, config_.durability_directory, - config_.durability_enabled}; + config_.durability_enabled, + config_.synchronous_commit}; // Shared implementations for all RecordAccessor in this Db. 
DistributedEdgeAccessor edge_accessor_{config_.worker_id, &data_manager_, &updates_clients_}; diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 12545d70f..2a32f1f58 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -180,9 +180,9 @@ class SingleNode { Config config_; std::unique_ptr storage_ = std::make_unique(config_.worker_id, config_.properties_on_disk); - durability::WriteAheadLog wal_{config_.worker_id, - config_.durability_directory, - config_.durability_enabled}; + durability::WriteAheadLog wal_{ + config_.worker_id, config_.durability_directory, + config_.durability_enabled, config_.synchronous_commit}; tx::SingleNodeEngine tx_engine_{&wal_}; std::unique_ptr storage_gc_ = diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index cdf912160..97c007bc7 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -28,6 +28,7 @@ struct Config { int snapshot_cycle_sec; int snapshot_max_retained; int snapshot_on_exit; + bool synchronous_commit; // Misc flags. 
int gc_cycle_sec; diff --git a/src/durability/wal.cpp b/src/durability/wal.cpp index 29b508bac..a91be806c 100644 --- a/src/durability/wal.cpp +++ b/src/durability/wal.cpp @@ -19,19 +19,21 @@ DEFINE_VALIDATED_HIDDEN_int32(wal_buffer_size, 4096, FLAG_IN_RANGE(1, 1 << 30)); namespace durability { - WriteAheadLog::WriteAheadLog( int worker_id, const std::experimental::filesystem::path &durability_dir, - bool durability_enabled) - : deltas_{FLAGS_wal_buffer_size}, wal_file_{worker_id, durability_dir} { - if (durability_enabled) { + bool durability_enabled, bool synchronous_commit) + : deltas_{FLAGS_wal_buffer_size}, + wal_file_{worker_id, durability_dir}, + durability_enabled_(durability_enabled), + synchronous_commit_(synchronous_commit) { + if (durability_enabled_) { utils::CheckDir(durability_dir); } } WriteAheadLog::~WriteAheadLog() { - if (enabled_) { - scheduler_.Stop(); + if (durability_enabled_) { + if (!synchronous_commit_) scheduler_.Stop(); wal_file_.Flush(deltas_); } } @@ -101,23 +103,53 @@ void WriteAheadLog::WalFile::RotateFile() { } void WriteAheadLog::Init() { - enabled_ = true; - wal_file_.Init(); - scheduler_.Run("WAL", - std::chrono::milliseconds(FLAGS_wal_flush_interval_millis), - [this]() { wal_file_.Flush(deltas_); }); -} - -void WriteAheadLog::Emplace(database::StateDelta &&delta) { - if (enabled_ && FLAGS_wal_flush_interval_millis >= 0) - deltas_.emplace(std::move(delta)); + if (durability_enabled_) { + enabled_ = true; + wal_file_.Init(); + if (!synchronous_commit_) { + scheduler_.Run("WAL", + std::chrono::milliseconds(FLAGS_wal_flush_interval_millis), + [this]() { wal_file_.Flush(deltas_); }); + } + } } void WriteAheadLog::Emplace(const database::StateDelta &delta) { - if (enabled_ && FLAGS_wal_flush_interval_millis >= 0) deltas_.emplace(delta); + if (durability_enabled_ && enabled_) { + deltas_.emplace(delta); + if (synchronous_commit_ && IsStateDeltaTransactionEnd(delta)) { + wal_file_.Flush(deltas_); + } + } +} + +bool 
WriteAheadLog::IsStateDeltaTransactionEnd( + const database::StateDelta &delta) { + switch (delta.type) { + case database::StateDelta::Type::TRANSACTION_COMMIT: + case database::StateDelta::Type::TRANSACTION_ABORT: + return true; + case database::StateDelta::Type::TRANSACTION_BEGIN: + case database::StateDelta::Type::CREATE_VERTEX: + case database::StateDelta::Type::CREATE_EDGE: + case database::StateDelta::Type::ADD_OUT_EDGE: + case database::StateDelta::Type::REMOVE_OUT_EDGE: + case database::StateDelta::Type::ADD_IN_EDGE: + case database::StateDelta::Type::REMOVE_IN_EDGE: + case database::StateDelta::Type::SET_PROPERTY_VERTEX: + case database::StateDelta::Type::SET_PROPERTY_EDGE: + case database::StateDelta::Type::ADD_LABEL: + case database::StateDelta::Type::REMOVE_LABEL: + case database::StateDelta::Type::REMOVE_VERTEX: + case database::StateDelta::Type::REMOVE_EDGE: + case database::StateDelta::Type::BUILD_INDEX: + return false; + } } void WriteAheadLog::Flush() { - if (enabled_) wal_file_.Flush(deltas_); + if (enabled_) { + wal_file_.Flush(deltas_); + } } } // namespace durability diff --git a/src/durability/wal.hpp b/src/durability/wal.hpp index f4f4006cc..726b7f810 100644 --- a/src/durability/wal.hpp +++ b/src/durability/wal.hpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include @@ -19,67 +18,65 @@ namespace durability { -/** A database StateDelta log for durability. Buffers and periodically - * serializes small-granulation database deltas (StateDelta). - * - * The order is not deterministic in a multithreaded scenario (multiple DB - * transactions). This is fine, the recovery process should be immune to this - * indeterminism. - */ +/// A database StateDelta log for durability. Buffers and periodically +/// serializes small-granulation database deltas (StateDelta). +/// +/// The order is not deterministic in a multithreaded scenario (multiple DB +/// transactions). 
This is fine, the recovery process should be immune to this +/// indeterminism. class WriteAheadLog { public: WriteAheadLog(int worker_id, const std::experimental::filesystem::path &durability_dir, - bool durability_enabled); + bool durability_enabled, bool synchronous_commit); ~WriteAheadLog(); - /** Initializes the WAL. Called at the end of GraphDb construction, after - * (optional) recovery. Also responsible for initializing the wal_file. - */ + /// Initializes the WAL. Called at the end of GraphDb construction, after + /// (optional) recovery. Also responsible for initializing the wal_file. void Init(); /// Emplaces the given DeltaState onto the buffer, if the WAL is enabled. - void Emplace(database::StateDelta &&delta); - - /// Emplaces the given DeltaState onto the buffer, if the WAL is enabled. + /// If the WAL is configured to work in synchronous commit mode, emplace will + /// flush the buffers if a delta represents a transaction end. void Emplace(const database::StateDelta &delta); - /// Flushes every delta currently in the ring buffer + /// Flushes every delta currently in the ring buffer. + /// This method should only be called from tests. void Flush(); private: - /** Groups the logic of WAL file handling (flushing, naming, rotating) */ + /// Groups the logic of WAL file handling (flushing, naming, rotating) class WalFile { public: WalFile(int worker_id, const std::experimental::filesystem::path &wal__dir); ~WalFile(); - /** Initializes the WAL file. Must be called before first flush. Can be - * called after Flush() to re-initialize stuff. */ + /// Initializes the WAL file. Must be called before first flush. Can be + /// called after Flush() to re-initialize stuff. void Init(); - /** Flushes all the deltas in the buffer to the WAL file. If necessary - * rotates the file. */ + /// Flushes all the deltas in the buffer to the WAL file. If necessary + /// rotates the file. 
void Flush(RingBuffer &buffer); private: - // Mutex used for flushing wal data + /// Mutex used for flushing wal data std::mutex flush_mutex_; int worker_id_; const std::experimental::filesystem::path wal_dir_; HashedFileWriter writer_; communication::bolt::BaseEncoder encoder_{writer_}; - // The file to which the WAL flushes data. The path is fixed, the file gets - // moved when the WAL gets rotated. + /// The file to which the WAL flushes data. The path is fixed, the file gets + /// moved when the WAL gets rotated. std::experimental::filesystem::path current_wal_file_; - // Number of deltas in the current wal file. + /// Number of deltas in the current wal file. int current_wal_file_delta_count_{0}; - // The latest transaction whose delta is recorded in the current WAL file. - // Zero indicates that no deltas have so far been written to the current WAL - // file. + /// The latest transaction whose delta is recorded in the current WAL file. + /// Zero indicates that no deltas have so far been written to the current + /// WAL file. tx::TransactionId latest_tx_{0}; void RotateFile(); @@ -88,7 +85,16 @@ class WriteAheadLog { RingBuffer deltas_; utils::Scheduler scheduler_; WalFile wal_file_; - // Used for disabling the WAL during DB recovery. + + /// Used for disabling the durability feature of the DB. + bool durability_enabled_{false}; + /// Used for disabling the WAL during DB recovery. bool enabled_{false}; + /// Should every WAL write be synced with the underlying storage. + bool synchronous_commit_{false}; + + /// Checks whether the given state delta represents a transaction end, + /// TRANSACTION_COMMIT and TRANSACTION_ABORT. 
+ bool IsStateDeltaTransactionEnd(const database::StateDelta &delta); }; } // namespace durability diff --git a/tests/unit/distributed_durability.cpp b/tests/unit/distributed_durability.cpp index 334b597ec..e1333a9da 100644 --- a/tests/unit/distributed_durability.cpp +++ b/tests/unit/distributed_durability.cpp @@ -31,16 +31,16 @@ class DistributedDurability : public DistributedGraphDbTest { }); } - void RestartWithWal() { + void RestartWithWal(bool synchronous_commit) { DistributedGraphDbTest::ShutDown(); - Initialize([](database::Config config) { + Initialize([synchronous_commit](database::Config config) { config.durability_enabled = true; + config.synchronous_commit = synchronous_commit; return config; }); } void FlushAllWal() { - // TODO(buda): Extend this when we have a fully durable mode master().wal().Flush(); worker(1).wal().Flush(); worker(2).wal().Flush(); @@ -177,20 +177,42 @@ void CheckDeltas(fs::path wal_dir, database::StateDelta::Type op) { } } -TEST_F(DistributedDurability, WriteCommittedTx) { - RestartWithWal(); - auto dba = master().Access(); - dba->Commit(); - FlushAllWal(); - CheckDeltas(tmp_dir_ / durability::kWalDir, - database::StateDelta::Type::TRANSACTION_COMMIT); +TEST_F(DistributedDurability, WalWrite) { + { + CleanDurability(); + RestartWithWal(false); + auto dba = master().Access(); + dba->Commit(); + FlushAllWal(); + CheckDeltas(tmp_dir_ / durability::kWalDir, + database::StateDelta::Type::TRANSACTION_COMMIT); + } + { + CleanDurability(); + RestartWithWal(false); + auto dba = master().Access(); + dba->Abort(); + FlushAllWal(); + CheckDeltas(tmp_dir_ / durability::kWalDir, + database::StateDelta::Type::TRANSACTION_ABORT); + } } -TEST_F(DistributedDurability, WriteAbortedTx) { - RestartWithWal(); - auto dba = master().Access(); - dba->Abort(); - FlushAllWal(); - CheckDeltas(tmp_dir_ / durability::kWalDir, - database::StateDelta::Type::TRANSACTION_ABORT); +TEST_F(DistributedDurability, WalSynchronizedWrite) { + { + CleanDurability(); + 
RestartWithWal(true); + auto dba = master().Access(); + dba->Commit(); + CheckDeltas(tmp_dir_ / durability::kWalDir, + database::StateDelta::Type::TRANSACTION_COMMIT); + } + { + CleanDurability(); + RestartWithWal(true); + auto dba = master().Access(); + dba->Abort(); + CheckDeltas(tmp_dir_ / durability::kWalDir, + database::StateDelta::Type::TRANSACTION_ABORT); + } } diff --git a/tests/unit/durability.cpp b/tests/unit/durability.cpp index 1b158aea9..9f0120f48 100644 --- a/tests/unit/durability.cpp +++ b/tests/unit/durability.cpp @@ -249,13 +249,12 @@ fs::path GetLastFile(fs::path dir) { return *std::max_element(files.begin(), files.end()); } -void MakeDb(durability::WriteAheadLog &wal, database::GraphDbAccessor &dba, - int scale, std::vector indices = {}) { +void MakeDb(database::GraphDbAccessor &dba, int scale, + std::vector indices = {}) { DbGenerator generator{dba}; for (int i = 0; i < scale; i++) generator.InsertVertex(); for (int i = 0; i < scale * 2; i++) generator.InsertEdge(); for (int i = 0; i < scale / 2; i++) generator.InsertCycleEdge(); - wal.Flush(); for (int i = 0; i < scale * 3; i++) { generator.SetVertexProperty(); @@ -278,7 +277,7 @@ void MakeDb(durability::WriteAheadLog &wal, database::GraphDbAccessor &dba, void MakeDb(database::GraphDb &db, int scale, std::vector indices = {}) { auto dba = db.Access(); - MakeDb(db.wal(), *dba, scale, indices); + MakeDb(*dba, scale, indices); dba->Commit(); } @@ -606,22 +605,35 @@ TEST_F(Durability, OnlyWalIdRecovery) { } TEST_F(Durability, WalRecovery) { - auto config = DbConfig(); - config.durability_enabled = true; - database::SingleNode db{config}; - MakeDb(db, 300, {0, 1, 2}); - MakeDb(db, 300); - MakeDb(db, 300, {3, 4}); + auto modify_config = [](database::Config config, bool durability_enabled, + bool synchronous_commit) { + config.durability_enabled = durability_enabled; + config.synchronous_commit = synchronous_commit; + return config; + }; - db.wal().Flush(); - ASSERT_EQ(DirFiles(snapshot_dir_).size(), 
0); - EXPECT_GT(DirFiles(wal_dir_).size(), 1); + for (auto &synchronous_commit : {false, true}) { + CleanDurability(); + auto config = modify_config(DbConfig(), true, synchronous_commit); + database::SingleNode db{config}; + MakeDb(db, 100, {0, 1, 2}); + MakeDb(db, 100); + MakeDb(db, 100, {3, 4}); - { - auto recovered_config = DbConfig(); - recovered_config.db_recover_on_startup = true; - database::SingleNode recovered{recovered_config}; - CompareDbs(db, recovered); + // When synchronous_commit is true, we don't need to flush the WAL. + if (!synchronous_commit) { + db.wal().Flush(); + } + + ASSERT_EQ(DirFiles(snapshot_dir_).size(), 0); + EXPECT_GT(DirFiles(wal_dir_).size(), 1); + + { + auto recovered_config = DbConfig(); + recovered_config.db_recover_on_startup = true; + database::SingleNode recovered{recovered_config}; + CompareDbs(db, recovered); + } } } @@ -655,16 +667,16 @@ TEST_F(Durability, SnapshotAndWalRecoveryAfterComplexTxSituation) { // The first transaction modifies and commits. auto dba_1 = db.Access(); - MakeDb(db.wal(), *dba_1, 100); + MakeDb(*dba_1, 100); dba_1->Commit(); // The second transaction will commit after snapshot. auto dba_2 = db.Access(); - MakeDb(db.wal(), *dba_2, 100); + MakeDb(*dba_2, 100); // The third transaction modifies and commits. auto dba_3 = db.Access(); - MakeDb(db.wal(), *dba_3, 100); + MakeDb(*dba_3, 100); dba_3->Commit(); MakeSnapshot(0, db); // Snapshooter takes the fourth transaction. @@ -672,12 +684,12 @@ TEST_F(Durability, SnapshotAndWalRecoveryAfterComplexTxSituation) { // The fifth transaction starts and commits after snapshot. auto dba_5 = db.Access(); - MakeDb(db.wal(), *dba_5, 100); + MakeDb(*dba_5, 100); dba_5->Commit(); // The sixth transaction will not commit at all. auto dba_6 = db.Access(); - MakeDb(db.wal(), *dba_6, 100); + MakeDb(*dba_6, 100); auto VisibleVertexCount = [](database::GraphDb &db) { auto dba = db.Access();