diff --git a/src/database/distributed_graph_db.cpp b/src/database/distributed_graph_db.cpp index 99238d2eb..60575aa01 100644 --- a/src/database/distributed_graph_db.cpp +++ b/src/database/distributed_graph_db.cpp @@ -631,7 +631,7 @@ Master::Master(Config config) } if (impl_->config_.durability_enabled) { - impl_->wal_.Enable(); + impl_->wal_.Init(); snapshot_creator_ = std::make_unique(); snapshot_creator_->Run( "Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec), @@ -916,7 +916,7 @@ Worker::Worker(Config config) } if (impl_->config_.durability_enabled) { - impl_->wal_.Enable(); + impl_->wal_.Init(); } // Start transaction killer. diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 7eb05a237..12545d70f 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -238,7 +238,7 @@ SingleNode::SingleNode(Config config) "data loss, Memgraph has stored those files into a " ".backup directory inside durability directory"; } - impl_->wal_.Enable(); + impl_->wal_.Init(); snapshot_creator_ = std::make_unique(); snapshot_creator_->Run( "Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec), diff --git a/src/durability/wal.cpp b/src/durability/wal.cpp index 86b234a4b..29b508bac 100644 --- a/src/durability/wal.cpp +++ b/src/durability/wal.cpp @@ -26,16 +26,14 @@ WriteAheadLog::WriteAheadLog( : deltas_{FLAGS_wal_buffer_size}, wal_file_{worker_id, durability_dir} { if (durability_enabled) { utils::CheckDir(durability_dir); - scheduler_.Run("WAL", - std::chrono::milliseconds(FLAGS_wal_flush_interval_millis), - [this]() { wal_file_.Flush(deltas_); }); } } WriteAheadLog::~WriteAheadLog() { - // TODO review : scheduler.Stop() legal if it wasn't started? - scheduler_.Stop(); - if (enabled_) wal_file_.Flush(deltas_); + if (enabled_) { + scheduler_.Stop(); + wal_file_.Flush(deltas_); + } } WriteAheadLog::WalFile::WalFile( @@ -43,9 +41,7 @@ WriteAheadLog::WalFile::WalFile( : worker_id_(worker_id), wal_dir_{durability_dir / kWalDir} {} WriteAheadLog::WalFile::~WalFile() { - if (current_wal_file_ != std::experimental::nullopt && - !current_wal_file_->empty()) - writer_.Close(); + if (!current_wal_file_.empty()) writer_.Close(); } void WriteAheadLog::WalFile::Init() { @@ -55,11 +51,11 @@ void WriteAheadLog::WalFile::Init() { } else { current_wal_file_ = WalFilenameForTransactionId(wal_dir_, worker_id_); try { - writer_.Open(*current_wal_file_); + writer_.Open(current_wal_file_); encoder_.WriteInt(durability::kVersion); } catch (std::ios_base::failure &) { LOG(ERROR) << "Failed to open write-ahead log file: " - << *current_wal_file_; + << current_wal_file_; current_wal_file_ = std::experimental::filesystem::path(); } } @@ -69,8 +65,7 @@ void WriteAheadLog::WalFile::Init() { void WriteAheadLog::WalFile::Flush(RingBuffer &buffer) { std::lock_guard flush_lock(flush_mutex_); - if (current_wal_file_ == std::experimental::nullopt) Init(); - if (current_wal_file_->empty()) { + if (current_wal_file_.empty()) { LOG(ERROR) << "Write-ahead log file uninitialized, discarding data."; buffer.clear(); return; @@ -100,11 +95,19 @@ void WriteAheadLog::WalFile::Flush(RingBuffer &buffer) { void WriteAheadLog::WalFile::RotateFile() { writer_.Close(); std::experimental::filesystem::rename( - *current_wal_file_, + current_wal_file_, WalFilenameForTransactionId(wal_dir_, worker_id_, latest_tx_)); Init(); } +void WriteAheadLog::Init() { + enabled_ = true; + wal_file_.Init(); + scheduler_.Run("WAL", + std::chrono::milliseconds(FLAGS_wal_flush_interval_millis), + [this]() { wal_file_.Flush(deltas_); }); +} + void WriteAheadLog::Emplace(database::StateDelta &&delta) { if (enabled_ && FLAGS_wal_flush_interval_millis >= 0) deltas_.emplace(std::move(delta)); diff --git a/src/durability/wal.hpp b/src/durability/wal.hpp index a39b8625b..f4f4006cc 100644 --- a/src/durability/wal.hpp +++ b/src/durability/wal.hpp @@ -33,9 +33,10 @@ class WriteAheadLog { bool durability_enabled); ~WriteAheadLog(); - /** Enables the WAL. Called at the end of GraphDb construction, after - * (optional) recovery. */ - void Enable() { enabled_ = true; } + /** Initializes the WAL. Called at the end of GraphDb construction, after + * (optional) recovery. Also responsible for initializing the wal_file. + */ + void Init(); /// Emplaces the given DeltaState onto the buffer, if the WAL is enabled. void Emplace(database::StateDelta &&delta); @@ -71,8 +72,7 @@ class WriteAheadLog { // The file to which the WAL flushes data. The path is fixed, the file gets // moved when the WAL gets rotated. - std::experimental::optional - current_wal_file_; + std::experimental::filesystem::path current_wal_file_; // Number of deltas in the current wal file. int current_wal_file_delta_count_{0};