Implement full durability mode

Summary:
This diff introduces a new flags
* `--synchronous-commit`

The `--synchronous-commit` tells the WAL when should the deltas be flushed to
the disk drive. By default this is off and the WAL flushes deltas every `N`
milliseconds. If it's turned on, on every transaction end, commit or abort, the
WAL will first flush the deltas and only after that will return from ending a
transaction.

Reviewers: buda, vkasljevic, mferencevic, teon.banek, ipaljak

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1542
This commit is contained in:
Ivan Paljak 2018-08-24 10:43:27 +02:00 committed by Matija Santl
parent bff56bcf89
commit d106aff88f
10 changed files with 194 additions and 97 deletions

View File

@ -72,8 +72,7 @@
# are used: snapshots store the total current database state while write-ahead
# logs store small changes incrementally. They are used in tandem to provide
# fast and storage-efficient persistence. Some aspects of snapshot taking
# are configurable, while write-ahead logging is pre-configured for optimal
# performance.
# and write-ahead logging are configurable.
--durability-enabled=true
# Path to the directory where snapshots and write-ahead log files will be stored.
@ -93,6 +92,15 @@
# for new ones. If set to -1, the number of kept snapshots is unlimited.
--snapshot-max-retained=3
# Specifies whether WAL updates should be written on disk immediately after a
# transaction finishes. Setting this parameter to false does introduce risk of
# database inconsistency because an operating system or hardware crash might
# lead to missing transactions in the write-ahead log, but the database will
# handle this as if those transactions never happened. Turning
# synchronous-commit off can be a useful trade-off between exact durability and
# performance.
--synchronous-commit=false
## Logging
# Path to where the log should be stored.

View File

@ -16,9 +16,17 @@ characteristics.
Ensuring that the log is written before the transaction is committed can
slow down the database. For that reason this guarantee is most often
configurable in databases. In Memgraph it is at the moment not
guaranteed, nor configurable. The WAL is flushed to the disk
periodically and transactions do not wait for this to complete.
configurable in databases.
Memgraph offers two options for the WAL. The default option, where the WAL is
flushed to the disk periodically and transactions do not wait for this to
complete, introduces the risk of database inconsistency because an operating
system or hardware crash might lead to missing transactions in the WAL. Memgraph
will handle this as if those transactions never happened. The second option,
called synchronous commit, will instruct Memgraph to wait for the WAL to be
flushed to the disk when a transactions completes and the transaction will wait
for this to complete. This option can be turned on with the
`--synchronous-commit` command line flag.
### Format

View File

@ -32,6 +32,11 @@ DEFINE_string(properties_on_disk, "",
"Property names of properties which will be stored on available "
"disk. Property names have to be separated with comma (,).");
// Full durability.
DEFINE_bool(synchronous_commit, false,
"Should a transaction end wait for WAL records to be written to "
"disk before the transaction finishes.");
#ifndef MG_COMMUNITY
// Distributed master/worker flags.
DEFINE_VALIDATED_HIDDEN_int32(worker_id, 0,
@ -76,6 +81,7 @@ database::Config::Config()
snapshot_cycle_sec{FLAGS_snapshot_cycle_sec},
snapshot_max_retained{FLAGS_snapshot_max_retained},
snapshot_on_exit{FLAGS_snapshot_on_exit},
synchronous_commit{FLAGS_synchronous_commit},
// Misc flags.
gc_cycle_sec{FLAGS_gc_cycle_sec},
query_execution_time_sec{FLAGS_query_execution_time_sec},

View File

@ -537,7 +537,8 @@ class Master {
std::make_unique<Storage>(config_.worker_id, config_.properties_on_disk);
durability::WriteAheadLog wal_{config_.worker_id,
config_.durability_directory,
config_.durability_enabled};
config_.durability_enabled,
config_.synchronous_commit};
// Shared implementations for all RecordAccessor in this Db.
DistributedEdgeAccessor edge_accessor_{config_.worker_id, &data_manager_,
&updates_clients_};
@ -837,7 +838,8 @@ class Worker {
std::make_unique<Storage>(config_.worker_id, config_.properties_on_disk);
durability::WriteAheadLog wal_{config_.worker_id,
config_.durability_directory,
config_.durability_enabled};
config_.durability_enabled,
config_.synchronous_commit};
// Shared implementations for all RecordAccessor in this Db.
DistributedEdgeAccessor edge_accessor_{config_.worker_id, &data_manager_,
&updates_clients_};

View File

@ -180,9 +180,9 @@ class SingleNode {
Config config_;
std::unique_ptr<Storage> storage_ =
std::make_unique<Storage>(config_.worker_id, config_.properties_on_disk);
durability::WriteAheadLog wal_{config_.worker_id,
config_.durability_directory,
config_.durability_enabled};
durability::WriteAheadLog wal_{
config_.worker_id, config_.durability_directory,
config_.durability_enabled, config_.synchronous_commit};
tx::SingleNodeEngine tx_engine_{&wal_};
std::unique_ptr<StorageGcSingleNode> storage_gc_ =

View File

@ -28,6 +28,7 @@ struct Config {
int snapshot_cycle_sec;
int snapshot_max_retained;
int snapshot_on_exit;
bool synchronous_commit;
// Misc flags.
int gc_cycle_sec;

View File

@ -19,19 +19,21 @@ DEFINE_VALIDATED_HIDDEN_int32(wal_buffer_size, 4096,
FLAG_IN_RANGE(1, 1 << 30));
namespace durability {
WriteAheadLog::WriteAheadLog(
int worker_id, const std::experimental::filesystem::path &durability_dir,
bool durability_enabled)
: deltas_{FLAGS_wal_buffer_size}, wal_file_{worker_id, durability_dir} {
if (durability_enabled) {
bool durability_enabled, bool synchronous_commit)
: deltas_{FLAGS_wal_buffer_size},
wal_file_{worker_id, durability_dir},
durability_enabled_(durability_enabled),
synchronous_commit_(synchronous_commit) {
if (durability_enabled_) {
utils::CheckDir(durability_dir);
}
}
WriteAheadLog::~WriteAheadLog() {
if (enabled_) {
scheduler_.Stop();
if (durability_enabled_) {
if (!synchronous_commit_) scheduler_.Stop();
wal_file_.Flush(deltas_);
}
}
@ -101,23 +103,53 @@ void WriteAheadLog::WalFile::RotateFile() {
}
void WriteAheadLog::Init() {
enabled_ = true;
wal_file_.Init();
scheduler_.Run("WAL",
std::chrono::milliseconds(FLAGS_wal_flush_interval_millis),
[this]() { wal_file_.Flush(deltas_); });
}
void WriteAheadLog::Emplace(database::StateDelta &&delta) {
if (enabled_ && FLAGS_wal_flush_interval_millis >= 0)
deltas_.emplace(std::move(delta));
if (durability_enabled_) {
enabled_ = true;
wal_file_.Init();
if (!synchronous_commit_) {
scheduler_.Run("WAL",
std::chrono::milliseconds(FLAGS_wal_flush_interval_millis),
[this]() { wal_file_.Flush(deltas_); });
}
}
}
void WriteAheadLog::Emplace(const database::StateDelta &delta) {
if (enabled_ && FLAGS_wal_flush_interval_millis >= 0) deltas_.emplace(delta);
if (durability_enabled_ && enabled_) {
deltas_.emplace(delta);
if (synchronous_commit_ && IsStateDeltaTransactionEnd(delta)) {
wal_file_.Flush(deltas_);
}
}
}
bool WriteAheadLog::IsStateDeltaTransactionEnd(
const database::StateDelta &delta) {
switch (delta.type) {
case database::StateDelta::Type::TRANSACTION_COMMIT:
case database::StateDelta::Type::TRANSACTION_ABORT:
return true;
case database::StateDelta::Type::TRANSACTION_BEGIN:
case database::StateDelta::Type::CREATE_VERTEX:
case database::StateDelta::Type::CREATE_EDGE:
case database::StateDelta::Type::ADD_OUT_EDGE:
case database::StateDelta::Type::REMOVE_OUT_EDGE:
case database::StateDelta::Type::ADD_IN_EDGE:
case database::StateDelta::Type::REMOVE_IN_EDGE:
case database::StateDelta::Type::SET_PROPERTY_VERTEX:
case database::StateDelta::Type::SET_PROPERTY_EDGE:
case database::StateDelta::Type::ADD_LABEL:
case database::StateDelta::Type::REMOVE_LABEL:
case database::StateDelta::Type::REMOVE_VERTEX:
case database::StateDelta::Type::REMOVE_EDGE:
case database::StateDelta::Type::BUILD_INDEX:
return false;
}
}
void WriteAheadLog::Flush() {
if (enabled_) wal_file_.Flush(deltas_);
if (enabled_) {
wal_file_.Flush(deltas_);
}
}
} // namespace durability

View File

@ -3,7 +3,6 @@
#include <chrono>
#include <cstdint>
#include <experimental/filesystem>
#include <experimental/optional>
#include <gflags/gflags.h>
#include <glog/logging.h>
@ -19,67 +18,65 @@
namespace durability {
/** A database StateDelta log for durability. Buffers and periodically
* serializes small-granulation database deltas (StateDelta).
*
* The order is not deterministic in a multithreaded scenario (multiple DB
* transactions). This is fine, the recovery process should be immune to this
* indeterminism.
*/
/// A database StateDelta log for durability. Buffers and periodically
/// serializes small-granulation database deltas (StateDelta).
///
/// The order is not deterministic in a multithreaded scenario (multiple DB
/// transactions). This is fine, the recovery process should be immune to this
/// indeterminism.
class WriteAheadLog {
public:
WriteAheadLog(int worker_id,
const std::experimental::filesystem::path &durability_dir,
bool durability_enabled);
bool durability_enabled, bool synchronous_commit);
~WriteAheadLog();
/** Initializes the WAL. Called at the end of GraphDb construction, after
* (optional) recovery. Also responsible for initializing the wal_file.
*/
/// Initializes the WAL. Called at the end of GraphDb construction, after
/// (optional) recovery. Also responsible for initializing the wal_file.
void Init();
/// Emplaces the given DeltaState onto the buffer, if the WAL is enabled.
void Emplace(database::StateDelta &&delta);
/// Emplaces the given DeltaState onto the buffer, if the WAL is enabled.
/// If the WAL is configured to work in synchronous commit mode, emplace will
/// flush the buffers if a delta represents a transaction end.
void Emplace(const database::StateDelta &delta);
/// Flushes every delta currently in the ring buffer
/// Flushes every delta currently in the ring buffer.
/// This method should only be called from tests.
void Flush();
private:
/** Groups the logic of WAL file handling (flushing, naming, rotating) */
/// Groups the logic of WAL file handling (flushing, naming, rotating)
class WalFile {
public:
WalFile(int worker_id, const std::experimental::filesystem::path &wal__dir);
~WalFile();
/** Initializes the WAL file. Must be called before first flush. Can be
* called after Flush() to re-initialize stuff. */
/// Initializes the WAL file. Must be called before first flush. Can be
/// called after Flush() to re-initialize stuff.
void Init();
/** Flushes all the deltas in the buffer to the WAL file. If necessary
* rotates the file. */
/// Flushes all the deltas in the buffer to the WAL file. If necessary
/// rotates the file.
void Flush(RingBuffer<database::StateDelta> &buffer);
private:
// Mutex used for flushing wal data
/// Mutex used for flushing wal data
std::mutex flush_mutex_;
int worker_id_;
const std::experimental::filesystem::path wal_dir_;
HashedFileWriter writer_;
communication::bolt::BaseEncoder<HashedFileWriter> encoder_{writer_};
// The file to which the WAL flushes data. The path is fixed, the file gets
// moved when the WAL gets rotated.
/// The file to which the WAL flushes data. The path is fixed, the file gets
/// moved when the WAL gets rotated.
std::experimental::filesystem::path current_wal_file_;
// Number of deltas in the current wal file.
/// Number of deltas in the current wal file.
int current_wal_file_delta_count_{0};
// The latest transaction whose delta is recorded in the current WAL file.
// Zero indicates that no deltas have so far been written to the current WAL
// file.
/// The latest transaction whose delta is recorded in the current WAL file.
/// Zero indicates that no deltas have so far been written to the current
/// WAL file.
tx::TransactionId latest_tx_{0};
void RotateFile();
@ -88,7 +85,16 @@ class WriteAheadLog {
RingBuffer<database::StateDelta> deltas_;
utils::Scheduler scheduler_;
WalFile wal_file_;
// Used for disabling the WAL during DB recovery.
/// Used for disabling the durability feature of the DB.
bool durability_enabled_{false};
/// Used for disabling the WAL during DB recovery.
bool enabled_{false};
/// Should every WAL write be synced with the underlying storage.
bool synchronous_commit_{false};
/// Checks whether the given state delta represents a transaction end,
/// TRANSACTION_COMMIT and TRANSACTION_ABORT.
bool IsStateDeltaTransactionEnd(const database::StateDelta &delta);
};
} // namespace durability

View File

@ -31,16 +31,16 @@ class DistributedDurability : public DistributedGraphDbTest {
});
}
void RestartWithWal() {
void RestartWithWal(bool synchronous_commit) {
DistributedGraphDbTest::ShutDown();
Initialize([](database::Config config) {
Initialize([synchronous_commit](database::Config config) {
config.durability_enabled = true;
config.synchronous_commit = synchronous_commit;
return config;
});
}
void FlushAllWal() {
// TODO(buda): Extend this when we have a fully durable mode
master().wal().Flush();
worker(1).wal().Flush();
worker(2).wal().Flush();
@ -177,20 +177,42 @@ void CheckDeltas(fs::path wal_dir, database::StateDelta::Type op) {
}
}
TEST_F(DistributedDurability, WriteCommittedTx) {
RestartWithWal();
auto dba = master().Access();
dba->Commit();
FlushAllWal();
CheckDeltas(tmp_dir_ / durability::kWalDir,
database::StateDelta::Type::TRANSACTION_COMMIT);
TEST_F(DistributedDurability, WalWrite) {
{
CleanDurability();
RestartWithWal(false);
auto dba = master().Access();
dba->Abort();
FlushAllWal();
CheckDeltas(tmp_dir_ / durability::kWalDir,
database::StateDelta::Type::TRANSACTION_ABORT);
}
{
CleanDurability();
RestartWithWal(false);
auto dba = master().Access();
dba->Abort();
FlushAllWal();
CheckDeltas(tmp_dir_ / durability::kWalDir,
database::StateDelta::Type::TRANSACTION_ABORT);
}
}
TEST_F(DistributedDurability, WriteAbortedTx) {
RestartWithWal();
auto dba = master().Access();
dba->Abort();
FlushAllWal();
CheckDeltas(tmp_dir_ / durability::kWalDir,
database::StateDelta::Type::TRANSACTION_ABORT);
TEST_F(DistributedDurability, WalSynchronizedWrite) {
{
CleanDurability();
RestartWithWal(true);
auto dba = master().Access();
dba->Commit();
CheckDeltas(tmp_dir_ / durability::kWalDir,
database::StateDelta::Type::TRANSACTION_COMMIT);
}
{
CleanDurability();
RestartWithWal(true);
auto dba = master().Access();
dba->Abort();
CheckDeltas(tmp_dir_ / durability::kWalDir,
database::StateDelta::Type::TRANSACTION_ABORT);
}
}

View File

@ -249,13 +249,12 @@ fs::path GetLastFile(fs::path dir) {
return *std::max_element(files.begin(), files.end());
}
void MakeDb(durability::WriteAheadLog &wal, database::GraphDbAccessor &dba,
int scale, std::vector<int> indices = {}) {
void MakeDb(database::GraphDbAccessor &dba, int scale,
std::vector<int> indices = {}) {
DbGenerator generator{dba};
for (int i = 0; i < scale; i++) generator.InsertVertex();
for (int i = 0; i < scale * 2; i++) generator.InsertEdge();
for (int i = 0; i < scale / 2; i++) generator.InsertCycleEdge();
wal.Flush();
for (int i = 0; i < scale * 3; i++) {
generator.SetVertexProperty();
@ -278,7 +277,7 @@ void MakeDb(durability::WriteAheadLog &wal, database::GraphDbAccessor &dba,
void MakeDb(database::GraphDb &db, int scale, std::vector<int> indices = {}) {
auto dba = db.Access();
MakeDb(db.wal(), *dba, scale, indices);
MakeDb(*dba, scale, indices);
dba->Commit();
}
@ -606,22 +605,35 @@ TEST_F(Durability, OnlyWalIdRecovery) {
}
TEST_F(Durability, WalRecovery) {
auto config = DbConfig();
config.durability_enabled = true;
database::SingleNode db{config};
MakeDb(db, 300, {0, 1, 2});
MakeDb(db, 300);
MakeDb(db, 300, {3, 4});
auto modify_config = [](database::Config config, bool durability_enabled,
bool synchronous_commit) {
config.durability_enabled = durability_enabled;
config.synchronous_commit = synchronous_commit;
return config;
};
db.wal().Flush();
ASSERT_EQ(DirFiles(snapshot_dir_).size(), 0);
EXPECT_GT(DirFiles(wal_dir_).size(), 1);
for (auto &synchronous_commit : {false, true}) {
CleanDurability();
auto config = modify_config(DbConfig(), true, synchronous_commit);
database::SingleNode db{config};
MakeDb(db, 100, {0, 1, 2});
MakeDb(db, 100);
MakeDb(db, 100, {3, 4});
{
auto recovered_config = DbConfig();
recovered_config.db_recover_on_startup = true;
database::SingleNode recovered{recovered_config};
CompareDbs(db, recovered);
// When synchronous_commit is true, we don't need to flush the WAL.
if (!synchronous_commit) {
db.wal().Flush();
}
ASSERT_EQ(DirFiles(snapshot_dir_).size(), 0);
EXPECT_GT(DirFiles(wal_dir_).size(), 1);
{
auto recovered_config = DbConfig();
recovered_config.db_recover_on_startup = true;
database::SingleNode recovered{recovered_config};
CompareDbs(db, recovered);
}
}
}
@ -655,16 +667,16 @@ TEST_F(Durability, SnapshotAndWalRecoveryAfterComplexTxSituation) {
// The first transaction modifies and commits.
auto dba_1 = db.Access();
MakeDb(db.wal(), *dba_1, 100);
MakeDb(*dba_1, 100);
dba_1->Commit();
// The second transaction will commit after snapshot.
auto dba_2 = db.Access();
MakeDb(db.wal(), *dba_2, 100);
MakeDb(*dba_2, 100);
// The third transaction modifies and commits.
auto dba_3 = db.Access();
MakeDb(db.wal(), *dba_3, 100);
MakeDb(*dba_3, 100);
dba_3->Commit();
MakeSnapshot(0, db); // Snapshooter takes the fourth transaction.
@ -672,12 +684,12 @@ TEST_F(Durability, SnapshotAndWalRecoveryAfterComplexTxSituation) {
// The fifth transaction starts and commits after snapshot.
auto dba_5 = db.Access();
MakeDb(db.wal(), *dba_5, 100);
MakeDb(*dba_5, 100);
dba_5->Commit();
// The sixth transaction will not commit at all.
auto dba_6 = db.Access();
MakeDb(db.wal(), *dba_6, 100);
MakeDb(*dba_6, 100);
auto VisibleVertexCount = [](database::GraphDb &db) {
auto dba = db.Access();