From 8c2ba7d56465c682b29609fbf39a3908119048da Mon Sep 17 00:00:00 2001
From: Ivan Paljak <ivan.paljak@memgraph.io>
Date: Fri, 24 Aug 2018 10:43:27 +0200
Subject: [PATCH] Fix issue with wal_file initialization

Reviewers: msantl

Reviewed By: msantl

Subscribers: teon.banek, pullbot

Differential Revision: https://phabricator.memgraph.io/D1559
---
 src/database/distributed_graph_db.cpp |  4 ++--
 src/database/graph_db.cpp             |  2 +-
 src/durability/wal.cpp                | 31 +++++++++++++++------------
 src/durability/wal.hpp                | 10 ++++-----
 4 files changed, 25 insertions(+), 22 deletions(-)

diff --git a/src/database/distributed_graph_db.cpp b/src/database/distributed_graph_db.cpp
index 99238d2eb..60575aa01 100644
--- a/src/database/distributed_graph_db.cpp
+++ b/src/database/distributed_graph_db.cpp
@@ -631,7 +631,7 @@ Master::Master(Config config)
   }
 
   if (impl_->config_.durability_enabled) {
-    impl_->wal_.Enable();
+    impl_->wal_.Init();
     snapshot_creator_ = std::make_unique<utils::Scheduler>();
     snapshot_creator_->Run(
         "Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec),
@@ -916,7 +916,7 @@ Worker::Worker(Config config)
   }
 
   if (impl_->config_.durability_enabled) {
-    impl_->wal_.Enable();
+    impl_->wal_.Init();
   }
 
   // Start transaction killer.
diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp
index 7eb05a237..12545d70f 100644
--- a/src/database/graph_db.cpp
+++ b/src/database/graph_db.cpp
@@ -238,7 +238,7 @@ SingleNode::SingleNode(Config config)
                       "data loss, Memgraph has stored those files into a "
                       ".backup directory inside durability directory";
     }
-    impl_->wal_.Enable();
+    impl_->wal_.Init();
     snapshot_creator_ = std::make_unique<utils::Scheduler>();
     snapshot_creator_->Run(
         "Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec),
diff --git a/src/durability/wal.cpp b/src/durability/wal.cpp
index 86b234a4b..29b508bac 100644
--- a/src/durability/wal.cpp
+++ b/src/durability/wal.cpp
@@ -26,16 +26,14 @@ WriteAheadLog::WriteAheadLog(
     : deltas_{FLAGS_wal_buffer_size}, wal_file_{worker_id, durability_dir} {
   if (durability_enabled) {
     utils::CheckDir(durability_dir);
-    scheduler_.Run("WAL",
-                   std::chrono::milliseconds(FLAGS_wal_flush_interval_millis),
-                   [this]() { wal_file_.Flush(deltas_); });
   }
 }
 
 WriteAheadLog::~WriteAheadLog() {
-  // TODO review : scheduler.Stop() legal if it wasn't started?
-  scheduler_.Stop();
-  if (enabled_) wal_file_.Flush(deltas_);
+  if (enabled_) {
+    scheduler_.Stop();
+    wal_file_.Flush(deltas_);
+  }
 }
 
 WriteAheadLog::WalFile::WalFile(
@@ -43,9 +41,7 @@ WriteAheadLog::WalFile::WalFile(
     : worker_id_(worker_id), wal_dir_{durability_dir / kWalDir} {}
 
 WriteAheadLog::WalFile::~WalFile() {
-  if (current_wal_file_ != std::experimental::nullopt &&
-      !current_wal_file_->empty())
-    writer_.Close();
+  if (!current_wal_file_.empty()) writer_.Close();
 }
 
 void WriteAheadLog::WalFile::Init() {
@@ -55,11 +51,11 @@ void WriteAheadLog::WalFile::Init() {
   } else {
     current_wal_file_ = WalFilenameForTransactionId(wal_dir_, worker_id_);
     try {
-      writer_.Open(*current_wal_file_);
+      writer_.Open(current_wal_file_);
       encoder_.WriteInt(durability::kVersion);
     } catch (std::ios_base::failure &) {
       LOG(ERROR) << "Failed to open write-ahead log file: "
-                 << *current_wal_file_;
+                 << current_wal_file_;
       current_wal_file_ = std::experimental::filesystem::path();
     }
   }
@@ -69,8 +65,7 @@ void WriteAheadLog::WalFile::Init() {
 
 void WriteAheadLog::WalFile::Flush(RingBuffer<database::StateDelta> &buffer) {
   std::lock_guard<std::mutex> flush_lock(flush_mutex_);
-  if (current_wal_file_ == std::experimental::nullopt) Init();
-  if (current_wal_file_->empty()) {
+  if (current_wal_file_.empty()) {
     LOG(ERROR) << "Write-ahead log file uninitialized, discarding data.";
     buffer.clear();
     return;
@@ -100,11 +95,19 @@ void WriteAheadLog::WalFile::Flush(RingBuffer<database::StateDelta> &buffer) {
 void WriteAheadLog::WalFile::RotateFile() {
   writer_.Close();
   std::experimental::filesystem::rename(
-      *current_wal_file_,
+      current_wal_file_,
       WalFilenameForTransactionId(wal_dir_, worker_id_, latest_tx_));
   Init();
 }
 
+void WriteAheadLog::Init() {
+  enabled_ = true;
+  wal_file_.Init();
+  scheduler_.Run("WAL",
+                 std::chrono::milliseconds(FLAGS_wal_flush_interval_millis),
+                 [this]() { wal_file_.Flush(deltas_); });
+}
+
 void WriteAheadLog::Emplace(database::StateDelta &&delta) {
   if (enabled_ && FLAGS_wal_flush_interval_millis >= 0)
     deltas_.emplace(std::move(delta));
diff --git a/src/durability/wal.hpp b/src/durability/wal.hpp
index a39b8625b..f4f4006cc 100644
--- a/src/durability/wal.hpp
+++ b/src/durability/wal.hpp
@@ -33,9 +33,10 @@ class WriteAheadLog {
                 bool durability_enabled);
   ~WriteAheadLog();
 
-  /** Enables the WAL. Called at the end of GraphDb construction, after
-   * (optional) recovery. */
-  void Enable() { enabled_ = true; }
+  /** Initializes the WAL. Called at the end of GraphDb construction, after
+   * (optional) recovery. Also responsible for initializing the wal_file.
+   */
+  void Init();
 
   /// Emplaces the given DeltaState onto the buffer, if the WAL is enabled.
   void Emplace(database::StateDelta &&delta);
@@ -71,8 +72,7 @@ class WriteAheadLog {
 
     // The file to which the WAL flushes data. The path is fixed, the file gets
     // moved when the WAL gets rotated.
-    std::experimental::optional<std::experimental::filesystem::path>
-        current_wal_file_;
+    std::experimental::filesystem::path current_wal_file_;
 
     // Number of deltas in the current wal file.
     int current_wal_file_delta_count_{0};