From 13ba9cc23e949e1f430eca39746ab85b040edbe2 Mon Sep 17 00:00:00 2001
From: Matej Ferencevic <matej.ferencevic@memgraph.io>
Date: Tue, 29 Oct 2019 15:35:57 +0100
Subject: [PATCH] Implement WAL writing for storage v2

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2515
---
 src/storage/v2/config.hpp            |  14 +-
 src/storage/v2/durability.cpp        | 316 +++++++++++++--
 src/storage/v2/durability.hpp        |  20 +
 src/storage/v2/storage.cpp           |  82 ++++
 src/storage/v2/storage.hpp           |  32 +-
 tests/apollo_runs.py                 |   4 +
 tests/unit/storage_v2_durability.cpp | 568 ++++++++++++++++++++++++++-
 7 files changed, 958 insertions(+), 78 deletions(-)
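
Taken together, the new durability options control when snapshots are taken and how the
write-ahead log is flushed and rotated. A minimal usage sketch mirroring the test setups
further below (the storage directory and all values here are illustrative placeholders,
not prescriptions from this patch):

    #include <chrono>

    #include "storage/v2/storage.hpp"

    int main() {
      // Periodic snapshots combined with a write-ahead log; values are illustrative.
      storage::Storage store(
          {.durability = {.storage_directory = "/tmp/MG_storage",
                          .recover_on_startup = true,
                          .snapshot_wal_mode = storage::Config::Durability::
                              SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
                          .snapshot_interval = std::chrono::minutes(5),
                          .snapshot_retention_count = 3,
                          .wal_file_size_kibibytes = 20 * 1024,
                          .wal_file_flush_every_n_tx = 100000}});
      auto acc = store.Access();
      acc.CreateVertex();
      return acc.Commit().HasError() ? 1 : 0;
    }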

diff --git a/src/storage/v2/config.hpp b/src/storage/v2/config.hpp
index 88880d32a..7f15c955d 100644
--- a/src/storage/v2/config.hpp
+++ b/src/storage/v2/config.hpp
@@ -21,16 +21,26 @@ struct Config {
   } items;
 
   struct Durability {
-    enum class SnapshotType { NONE, PERIODIC };
+    enum class SnapshotWalMode {
+      DISABLED,
+      PERIODIC_SNAPSHOT,
+      PERIODIC_SNAPSHOT_WITH_WAL
+    };
 
     std::filesystem::path storage_directory{"storage"};
 
     bool recover_on_startup{false};
 
-    SnapshotType snapshot_type{SnapshotType::NONE};
+    SnapshotWalMode snapshot_wal_mode{SnapshotWalMode::DISABLED};
+
     std::chrono::milliseconds snapshot_interval{std::chrono::minutes(2)};
     uint64_t snapshot_retention_count{3};
+
+    uint64_t wal_file_size_kibibytes{20 * 1024};
+    uint64_t wal_file_flush_every_n_tx{100000};
+
     bool snapshot_on_exit{false};
+
   } durability;
 };
 
diff --git a/src/storage/v2/durability.cpp b/src/storage/v2/durability.cpp
index 95ef2ba19..795a7ad66 100644
--- a/src/storage/v2/durability.cpp
+++ b/src/storage/v2/durability.cpp
@@ -1,6 +1,7 @@
 #include "storage/v2/durability.hpp"
 
 #include <algorithm>
+#include <tuple>
 #include <unordered_map>
 #include <unordered_set>
 
@@ -1265,6 +1266,7 @@ Durability::Durability(Config::Durability config,
       constraints_(constraints),
       items_(items),
       snapshot_directory_(config_.storage_directory / kSnapshotDirectory),
+      wal_directory_(config_.storage_directory / kWalDirectory),
       uuid_(utils::GenerateUUID()) {}
 
 std::optional<Durability::RecoveryInfo> Durability::Initialize(
@@ -1275,7 +1277,8 @@ std::optional<Durability::RecoveryInfo> Durability::Initialize(
   if (config_.recover_on_startup) {
     ret = RecoverData();
   }
-  if (config_.snapshot_type == Config::Durability::SnapshotType::PERIODIC ||
+  if (config_.snapshot_wal_mode !=
+          Config::Durability::SnapshotWalMode::DISABLED ||
       config_.snapshot_on_exit) {
     // Create the directory initially to crash the database in case of
     // permission errors. This is done early to crash the database on startup
@@ -1283,7 +1286,13 @@ std::optional<Durability::RecoveryInfo> Durability::Initialize(
     // could be an unpleasant surprise).
     utils::EnsureDirOrDie(snapshot_directory_);
   }
-  if (config_.snapshot_type == Config::Durability::SnapshotType::PERIODIC) {
+  if (config_.snapshot_wal_mode ==
+      Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL) {
+    // Same reasoning as above.
+    utils::EnsureDirOrDie(wal_directory_);
+  }
+  if (config_.snapshot_wal_mode !=
+      Config::Durability::SnapshotWalMode::DISABLED) {
     snapshot_runner_.Run("Snapshot", config_.snapshot_interval, [this] {
       execute_with_transaction_(
           [this](Transaction *transaction) { CreateSnapshot(transaction); });
@@ -1293,7 +1302,9 @@ std::optional<Durability::RecoveryInfo> Durability::Initialize(
 }
 
 void Durability::Finalize() {
-  if (config_.snapshot_type == Config::Durability::SnapshotType::PERIODIC) {
+  wal_file_ = std::nullopt;
+  if (config_.snapshot_wal_mode !=
+      Config::Durability::SnapshotWalMode::DISABLED) {
     snapshot_runner_.Stop();
   }
   if (config_.snapshot_on_exit) {
@@ -1302,6 +1313,171 @@ void Durability::Finalize() {
   }
 }
 
+void Durability::AppendToWal(const Transaction &transaction,
+                             uint64_t final_commit_timestamp) {
+  if (!InitializeWalFile()) return;
+  // Traverse deltas and append them to the WAL file.
+  // A single transaction will always be contained in a single WAL file.
+  auto current_commit_timestamp =
+      transaction.commit_timestamp->load(std::memory_order_acquire);
+  // Helper lambda that traverses the delta chain in order to find the first
+  // delta that should be processed and then appends all discovered deltas.
+  auto find_and_apply_deltas = [&](const auto *delta, auto *parent,
+                                   auto filter) {
+    while (true) {
+      auto older = delta->next.load(std::memory_order_acquire);
+      if (older == nullptr ||
+          older->timestamp->load(std::memory_order_acquire) !=
+              current_commit_timestamp)
+        break;
+      delta = older;
+    }
+    while (true) {
+      if (filter(delta->action)) {
+        wal_file_->AppendDelta(*delta, parent, final_commit_timestamp);
+      }
+      auto prev = delta->prev.Get();
+      if (prev.type != PreviousPtr::Type::DELTA) break;
+      delta = prev.delta;
+    }
+  };
+
+  // The deltas are ordered correctly in the `transaction.deltas` buffer, but we
+  // don't traverse them in that order. That is because for each delta we need
+  // information about the vertex or edge it belongs to and that information
+  // isn't stored in the deltas themselves. In order to find out information
+  // about the corresponding vertex or edge it is necessary to traverse the
+  // delta chain for each delta until a vertex or edge is encountered. This
+  // operation is very expensive as the chain grows.
+  // Instead, we start from each delta whose direct predecessor is a vertex or
+  // edge and traverse that object's whole delta chain. This approach has a
+  // drawback because we lose the correct order of the operations. Because of
+  // that, we need to traverse the deltas several times and we have to manually
+  // ensure that the stored deltas will be ordered correctly.
+
+  // 1. Process all Vertex deltas and store all operations that create vertices
+  // and modify vertex data.
+  for (const auto &delta : transaction.deltas) {
+    auto prev = delta.prev.Get();
+    if (prev.type != PreviousPtr::Type::VERTEX) continue;
+    find_and_apply_deltas(&delta, prev.vertex, [](auto action) {
+      switch (action) {
+        case Delta::Action::DELETE_OBJECT:
+        case Delta::Action::SET_PROPERTY:
+        case Delta::Action::ADD_LABEL:
+        case Delta::Action::REMOVE_LABEL:
+          return true;
+
+        case Delta::Action::RECREATE_OBJECT:
+        case Delta::Action::ADD_IN_EDGE:
+        case Delta::Action::ADD_OUT_EDGE:
+        case Delta::Action::REMOVE_IN_EDGE:
+        case Delta::Action::REMOVE_OUT_EDGE:
+          return false;
+      }
+    });
+  }
+  // 2. Process all Vertex deltas and store all operations that create edges.
+  for (const auto &delta : transaction.deltas) {
+    auto prev = delta.prev.Get();
+    if (prev.type != PreviousPtr::Type::VERTEX) continue;
+    find_and_apply_deltas(&delta, prev.vertex, [](auto action) {
+      switch (action) {
+        case Delta::Action::REMOVE_OUT_EDGE:
+          return true;
+
+        case Delta::Action::DELETE_OBJECT:
+        case Delta::Action::RECREATE_OBJECT:
+        case Delta::Action::SET_PROPERTY:
+        case Delta::Action::ADD_LABEL:
+        case Delta::Action::REMOVE_LABEL:
+        case Delta::Action::ADD_IN_EDGE:
+        case Delta::Action::ADD_OUT_EDGE:
+        case Delta::Action::REMOVE_IN_EDGE:
+          return false;
+      }
+    });
+  }
+  // 3. Process all Edge deltas and store all operations that modify edge data.
+  for (const auto &delta : transaction.deltas) {
+    auto prev = delta.prev.Get();
+    if (prev.type != PreviousPtr::Type::EDGE) continue;
+    find_and_apply_deltas(&delta, prev.edge, [](auto action) {
+      switch (action) {
+        case Delta::Action::SET_PROPERTY:
+          return true;
+
+        case Delta::Action::DELETE_OBJECT:
+        case Delta::Action::RECREATE_OBJECT:
+        case Delta::Action::ADD_LABEL:
+        case Delta::Action::REMOVE_LABEL:
+        case Delta::Action::ADD_IN_EDGE:
+        case Delta::Action::ADD_OUT_EDGE:
+        case Delta::Action::REMOVE_IN_EDGE:
+        case Delta::Action::REMOVE_OUT_EDGE:
+          return false;
+      }
+    });
+  }
+  // 4. Process all Vertex deltas and store all operations that delete edges.
+  for (const auto &delta : transaction.deltas) {
+    auto prev = delta.prev.Get();
+    if (prev.type != PreviousPtr::Type::VERTEX) continue;
+    find_and_apply_deltas(&delta, prev.vertex, [](auto action) {
+      switch (action) {
+        case Delta::Action::ADD_OUT_EDGE:
+          return true;
+
+        case Delta::Action::DELETE_OBJECT:
+        case Delta::Action::RECREATE_OBJECT:
+        case Delta::Action::SET_PROPERTY:
+        case Delta::Action::ADD_LABEL:
+        case Delta::Action::REMOVE_LABEL:
+        case Delta::Action::ADD_IN_EDGE:
+        case Delta::Action::REMOVE_IN_EDGE:
+        case Delta::Action::REMOVE_OUT_EDGE:
+          return false;
+      }
+    });
+  }
+  // 5. Process all Vertex deltas and store all operations that delete vertices.
+  for (const auto &delta : transaction.deltas) {
+    auto prev = delta.prev.Get();
+    if (prev.type != PreviousPtr::Type::VERTEX) continue;
+    find_and_apply_deltas(&delta, prev.vertex, [](auto action) {
+      switch (action) {
+        case Delta::Action::RECREATE_OBJECT:
+          return true;
+
+        case Delta::Action::DELETE_OBJECT:
+        case Delta::Action::SET_PROPERTY:
+        case Delta::Action::ADD_LABEL:
+        case Delta::Action::REMOVE_LABEL:
+        case Delta::Action::ADD_IN_EDGE:
+        case Delta::Action::ADD_OUT_EDGE:
+        case Delta::Action::REMOVE_IN_EDGE:
+        case Delta::Action::REMOVE_OUT_EDGE:
+          return false;
+      }
+    });
+  }
+
+  // Add a delta that indicates that the transaction is fully written to the WAL
+  // file.
+  wal_file_->AppendTransactionEnd(final_commit_timestamp);
+
+  FinalizeWalFile();
+}
+
+void Durability::AppendToWal(StorageGlobalOperation operation, LabelId label,
+                             std::optional<PropertyId> property,
+                             uint64_t final_commit_timestamp) {
+  if (!InitializeWalFile()) return;
+  wal_file_->AppendOperation(operation, label, property,
+                             final_commit_timestamp);
+  FinalizeWalFile();
+}
+
 void Durability::CreateSnapshot(Transaction *transaction) {
   // Ensure that the storage directory exists.
   utils::EnsureDirOrDie(snapshot_directory_);
@@ -1545,37 +1721,92 @@ void Durability::CreateSnapshot(Transaction *transaction) {
   LOG(INFO) << "Snapshot creation successful!";
 
   // Ensure exactly `snapshot_retention_count` snapshots exist.
-  std::vector<std::filesystem::path> old_snapshot_files;
-  std::error_code error_code;
-  for (auto &item :
-       std::filesystem::directory_iterator(snapshot_directory_, error_code)) {
-    if (!item.is_regular_file()) continue;
-    if (item.path() == path) continue;
-    try {
-      auto info = ReadSnapshotInfo(item.path());
-      if (info.uuid == uuid_) {
-        old_snapshot_files.push_back(item.path());
+  std::vector<std::pair<uint64_t, std::filesystem::path>> old_snapshot_files;
+  {
+    std::error_code error_code;
+    for (const auto &item :
+         std::filesystem::directory_iterator(snapshot_directory_, error_code)) {
+      if (!item.is_regular_file()) continue;
+      if (item.path() == path) continue;
+      try {
+        auto info = ReadSnapshotInfo(item.path());
+        if (info.uuid != uuid_) continue;
+        old_snapshot_files.emplace_back(info.start_timestamp, item.path());
+      } catch (const RecoveryFailure &e) {
+        LOG(WARNING) << "Found a corrupt snapshot file " << item.path()
+                     << " because of: " << e.what();
+        continue;
       }
-    } catch (const RecoveryFailure &e) {
-      LOG(WARNING) << "Found a corrupt snapshot file " << item.path()
-                   << " because of: " << e.what();
-      continue;
+    }
+    LOG_IF(ERROR, error_code)
+        << "Couldn't ensure that exactly " << config_.snapshot_retention_count
+        << " snapshots exist because an error occurred: "
+        << error_code.message() << "!";
+    std::sort(old_snapshot_files.begin(), old_snapshot_files.end());
+    if (old_snapshot_files.size() > config_.snapshot_retention_count - 1) {
+      auto num_to_erase =
+          old_snapshot_files.size() - (config_.snapshot_retention_count - 1);
+      for (size_t i = 0; i < num_to_erase; ++i) {
+        const auto &[start_timestamp, snapshot_path] = old_snapshot_files[i];
+        if (!utils::DeleteFile(snapshot_path)) {
+          LOG(WARNING) << "Couldn't delete snapshot file " << snapshot_path
+                       << "!";
+        }
+      }
+      old_snapshot_files.erase(old_snapshot_files.begin(),
+                               old_snapshot_files.begin() + num_to_erase);
     }
   }
-  if (error_code) {
-    LOG(ERROR) << "Couldn't ensure that exactly "
-               << config_.snapshot_retention_count
-               << " snapshots exist because an error occurred: "
-               << error_code.message() << "!";
-  }
-  if (old_snapshot_files.size() >= config_.snapshot_retention_count) {
-    std::sort(old_snapshot_files.begin(), old_snapshot_files.end());
-    for (size_t i = 0;
-         i <= old_snapshot_files.size() - config_.snapshot_retention_count;
-         ++i) {
-      const auto &path = old_snapshot_files[i];
-      if (!utils::DeleteFile(path)) {
-        LOG(WARNING) << "Couldn't delete snapshot file " << path << "!";
+
+  // Ensure that only the absolutely necessary WAL files exist.
+  if (old_snapshot_files.size() == config_.snapshot_retention_count - 1 &&
+      utils::DirExists(wal_directory_)) {
+    std::vector<std::tuple<uint64_t, uint64_t, uint64_t, std::filesystem::path>>
+        wal_files;
+    std::error_code error_code;
+    for (const auto &item :
+         std::filesystem::directory_iterator(wal_directory_, error_code)) {
+      if (!item.is_regular_file()) continue;
+      try {
+        auto info = ReadWalInfo(item.path());
+        if (info.uuid != uuid_) continue;
+        wal_files.emplace_back(info.seq_num, info.from_timestamp,
+                               info.to_timestamp, item.path());
+      } catch (const RecoveryFailure &e) {
+        continue;
+      }
+    }
+    LOG_IF(ERROR, error_code)
+        << "Couldn't ensure that only the absolutely necessary WAL files exist "
+           "because an error occurred: "
+        << error_code.message() << "!";
+    std::sort(wal_files.begin(), wal_files.end());
+    uint64_t snapshot_start_timestamp = transaction->start_timestamp;
+    if (!old_snapshot_files.empty()) {
+      snapshot_start_timestamp = old_snapshot_files.begin()->first;
+    }
+    std::optional<uint64_t> pos = 0;
+    for (uint64_t i = 0; i < wal_files.size(); ++i) {
+      const auto &[seq_num, from_timestamp, to_timestamp, wal_path] =
+          wal_files[i];
+      if (to_timestamp <= snapshot_start_timestamp) {
+        pos = i;
+      } else {
+        break;
+      }
+    }
+    if (pos && *pos > 0) {
+      // We need to leave at least one WAL file that contains deltas that were
+      // created before the oldest snapshot. Because we always leave at least
+      // one WAL file that contains deltas before the snapshot, this correctly
+      // handles the edge case when that one file is the current WAL file that
+      // is being appended to.
+      for (uint64_t i = 0; i < *pos; ++i) {
+        const auto &[seq_num, from_timestamp, to_timestamp, wal_path] =
+            wal_files[i];
+        if (!utils::DeleteFile(wal_path)) {
+          LOG(WARNING) << "Couldn't delete WAL file " << wal_path << "!";
+        }
       }
     }
   }
@@ -2018,4 +2249,27 @@ Durability::RecoveryInfo Durability::LoadSnapshot(
   return ret;
 }
 
+bool Durability::InitializeWalFile() {
+  if (config_.snapshot_wal_mode !=
+      Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL)
+    return false;
+  if (!wal_file_) {
+    wal_file_.emplace(wal_directory_, uuid_, items_, name_id_mapper_,
+                      wal_seq_num_++);
+  }
+  return true;
+}
+
+void Durability::FinalizeWalFile() {
+  ++wal_unsynced_transactions_;
+  if (wal_unsynced_transactions_ >= config_.wal_file_flush_every_n_tx) {
+    wal_file_->Sync();
+    wal_unsynced_transactions_ = 0;
+  }
+  if (wal_file_->GetSize() / 1024 >= config_.wal_file_size_kibibytes) {
+    wal_file_ = std::nullopt;
+    wal_unsynced_transactions_ = 0;
+  }
+}
+
 }  // namespace storage
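
The WAL retention logic in CreateSnapshot() keeps one WAL file that still reaches back
before the oldest retained snapshot and deletes everything older. A standalone sketch of
that cut-off computation (an illustrative model, not code from this patch; the struct and
function names are hypothetical, and the input is assumed to be sorted by sequence
number):

    #include <cstdint>
    #include <vector>

    // Hypothetical summary of a WAL file, mirroring what ReadWalInfo() provides.
    struct WalFileInfo {
      uint64_t seq_num;
      uint64_t from_timestamp;
      uint64_t to_timestamp;
    };

    // Mirrors the retention loop in CreateSnapshot(): every WAL file whose deltas
    // all precede the oldest retained snapshot may be deleted, except the newest
    // such file, which is kept so that at least one WAL file always reaches back
    // before that snapshot (this also covers the case when that file is the WAL
    // file currently being written).
    uint64_t CountDeletableWalFiles(const std::vector<WalFileInfo> &sorted_wals,
                                    uint64_t oldest_snapshot_start_timestamp) {
      uint64_t pos = 0;
      for (uint64_t i = 0; i < sorted_wals.size(); ++i) {
        if (sorted_wals[i].to_timestamp <= oldest_snapshot_start_timestamp) {
          pos = i;
        } else {
          break;
        }
      }
      return pos;  // files [0, pos) are safe to delete
    }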
diff --git a/src/storage/v2/durability.hpp b/src/storage/v2/durability.hpp
index 94155b9d7..701616292 100644
--- a/src/storage/v2/durability.hpp
+++ b/src/storage/v2/durability.hpp
@@ -3,10 +3,12 @@
 #include <cstdint>
 #include <filesystem>
 #include <functional>
+#include <list>
 #include <optional>
 #include <string>
 #include <string_view>
 #include <type_traits>
+#include <utility>
 
 #include "storage/v2/config.hpp"
 #include "storage/v2/constraints.hpp"
@@ -353,6 +355,13 @@ class Durability final {
 
   void Finalize();
 
+  void AppendToWal(const Transaction &transaction,
+                   uint64_t final_commit_timestamp);
+
+  void AppendToWal(StorageGlobalOperation operation, LabelId label,
+                   std::optional<PropertyId> property,
+                   uint64_t final_commit_timestamp);
+
  private:
   void CreateSnapshot(Transaction *transaction);
 
@@ -360,6 +369,10 @@ class Durability final {
 
   RecoveryInfo LoadSnapshot(const std::filesystem::path &path);
 
+  bool InitializeWalFile();
+
+  void FinalizeWalFile();
+
   Config::Durability config_;
 
   utils::SkipList<Vertex> *vertices_;
@@ -373,10 +386,17 @@ class Durability final {
       execute_with_transaction_;
 
   std::filesystem::path snapshot_directory_;
+  std::filesystem::path wal_directory_;
+
   utils::Scheduler snapshot_runner_;
 
   // UUID used to distinguish snapshots and to link snapshots to WALs
   std::string uuid_;
+  // Sequence number used to keep track of the chain of WALs.
+  uint64_t wal_seq_num_{0};
+
+  std::optional<WalFile> wal_file_;
+  uint64_t wal_unsynced_transactions_{0};
 };
 
 }  // namespace storage
diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp
index 17a9a5346..18816d529 100644
--- a/src/storage/v2/storage.cpp
+++ b/src/storage/v2/storage.cpp
@@ -669,6 +669,15 @@ Storage::Accessor::Commit() {
       std::unique_lock<utils::SpinLock> engine_guard(storage_->engine_lock_);
       commit_timestamp = storage_->timestamp_++;
 
+      // Write transaction to WAL while holding the engine lock to make sure
+      // that committed transactions are sorted by the commit timestamp in the
+      // WAL files. We supply the new commit timestamp to the function so that
+      // it knows what will be the final commit timestamp. The WAL must be
+      // written before actually committing the transaction (before setting the
+      // commit timestamp) so that no other transaction can see the
+      // modifications before they are written to disk.
+      storage_->durability_.AppendToWal(transaction_, commit_timestamp);
+
       // Take committed_transactions lock while holding the engine lock to
       // make sure that committed transactions are sorted by the commit
       // timestamp in the list.
@@ -912,6 +921,79 @@ EdgeTypeId Storage::NameToEdgeType(const std::string &name) {
   return EdgeTypeId::FromUint(name_id_mapper_.NameToId(name));
 }
 
+bool Storage::CreateIndex(LabelId label) {
+  std::unique_lock<utils::RWLock> storage_guard(main_lock_);
+  if (!indices_.label_index.CreateIndex(label, vertices_.access()))
+    return false;
+  // Here it is safe to use `timestamp_` as the final commit timestamp of this
+  // operation even though this operation isn't transactional. The `timestamp_`
+  // variable holds the next timestamp that will be used. Because the above
+  // `storage_guard` ensures that no transactions are currently active, the
+  // value of `timestamp_` is guaranteed to be used as a start timestamp for the
+  // next regular transaction after this operation. This prevents collisions of
+  // commit timestamps between non-transactional operations and transactional
+  // operations.
+  durability_.AppendToWal(StorageGlobalOperation::LABEL_INDEX_CREATE, label,
+                          std::nullopt, timestamp_);
+  return true;
+}
+
+bool Storage::CreateIndex(LabelId label, PropertyId property) {
+  std::unique_lock<utils::RWLock> storage_guard(main_lock_);
+  if (!indices_.label_property_index.CreateIndex(label, property,
+                                                 vertices_.access()))
+    return false;
+  // For a description why using `timestamp_` is correct, see
+  // `CreateIndex(LabelId label)`.
+  durability_.AppendToWal(StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE,
+                          label, property, timestamp_);
+  return true;
+}
+
+bool Storage::DropIndex(LabelId label) {
+  std::unique_lock<utils::RWLock> storage_guard(main_lock_);
+  if (!indices_.label_index.DropIndex(label)) return false;
+  // For a description why using `timestamp_` is correct, see
+  // `CreateIndex(LabelId label)`.
+  durability_.AppendToWal(StorageGlobalOperation::LABEL_INDEX_DROP, label,
+                          std::nullopt, timestamp_);
+  return true;
+}
+
+bool Storage::DropIndex(LabelId label, PropertyId property) {
+  std::unique_lock<utils::RWLock> storage_guard(main_lock_);
+  if (!indices_.label_property_index.DropIndex(label, property)) return false;
+  // For a description why using `timestamp_` is correct, see
+  // `CreateIndex(LabelId label)`.
+  durability_.AppendToWal(StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP,
+                          label, property, timestamp_);
+  return true;
+}
+
+utils::BasicResult<ExistenceConstraintViolation, bool>
+Storage::CreateExistenceConstraint(LabelId label, PropertyId property) {
+  std::unique_lock<utils::RWLock> storage_guard(main_lock_);
+  auto ret = ::storage::CreateExistenceConstraint(&constraints_, label,
+                                                  property, vertices_.access());
+  if (ret.HasError() || !ret.GetValue()) return ret;
+  // For a description why using `timestamp_` is correct, see
+  // `CreateIndex(LabelId label)`.
+  durability_.AppendToWal(StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE,
+                          label, property, timestamp_);
+  return true;
+}
+
+bool Storage::DropExistenceConstraint(LabelId label, PropertyId property) {
+  std::unique_lock<utils::RWLock> storage_guard(main_lock_);
+  if (!::storage::DropExistenceConstraint(&constraints_, label, property))
+    return false;
+  // For a description why using `timestamp_` is correct, see
+  // `CreateIndex(LabelId label)`.
+  durability_.AppendToWal(StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP,
+                          label, property, timestamp_);
+  return true;
+}
+
 VerticesIterable Storage::Accessor::Vertices(LabelId label, View view) {
   return VerticesIterable(
       storage_->indices_.label_index.Vertices(label, view, &transaction_));
diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp
index 0d59ecf32..e77a6b629 100644
--- a/src/storage/v2/storage.hpp
+++ b/src/storage/v2/storage.hpp
@@ -299,27 +299,14 @@ class Storage final {
   EdgeTypeId NameToEdgeType(const std::string &name);
 
   /// @throw std::bad_alloc
-  bool CreateIndex(LabelId label) {
-    std::unique_lock<utils::RWLock> storage_guard(main_lock_);
-    return indices_.label_index.CreateIndex(label, vertices_.access());
-  }
+  bool CreateIndex(LabelId label);
 
   /// @throw std::bad_alloc
-  bool CreateIndex(LabelId label, PropertyId property) {
-    std::unique_lock<utils::RWLock> storage_guard(main_lock_);
-    return indices_.label_property_index.CreateIndex(label, property,
-                                                     vertices_.access());
-  }
+  bool CreateIndex(LabelId label, PropertyId property);
 
-  bool DropIndex(LabelId label) {
-    std::unique_lock<utils::RWLock> storage_guard(main_lock_);
-    return indices_.label_index.DropIndex(label);
-  }
+  bool DropIndex(LabelId label);
 
-  bool DropIndex(LabelId label, PropertyId property) {
-    std::unique_lock<utils::RWLock> storage_guard(main_lock_);
-    return indices_.label_property_index.DropIndex(label, property);
-  }
+  bool DropIndex(LabelId label, PropertyId property);
 
   bool LabelIndexExists(LabelId label) const {
     return indices_.label_index.IndexExists(label);
@@ -342,18 +329,11 @@ class Storage final {
   /// @throw std::bad_alloc
   /// @throw std::length_error
   utils::BasicResult<ExistenceConstraintViolation, bool>
-  CreateExistenceConstraint(LabelId label, PropertyId property) {
-    std::unique_lock<utils::RWLock> storage_guard(main_lock_);
-    return ::storage::CreateExistenceConstraint(&constraints_, label, property,
-                                                vertices_.access());
-  }
+  CreateExistenceConstraint(LabelId label, PropertyId property);
 
   /// Removes a unique constraint. Returns true if the constraint was removed,
   /// and false if it doesn't exist.
-  bool DropExistenceConstraint(LabelId label, PropertyId property) {
-    std::unique_lock<utils::RWLock> storage_guard(main_lock_);
-    return ::storage::DropExistenceConstraint(&constraints_, label, property);
-  }
+  bool DropExistenceConstraint(LabelId label, PropertyId property);
 
   ConstraintsInfo ListAllConstraints() const {
     return {ListExistenceConstraints(constraints_)};
diff --git a/tests/apollo_runs.py b/tests/apollo_runs.py
index 9c2424d07..89f398777 100755
--- a/tests/apollo_runs.py
+++ b/tests/apollo_runs.py
@@ -50,6 +50,10 @@ for test in tests:
     if name.startswith("benchmark") or name.startswith("concurrent"):
         prefix = "TIMEOUT=600 "
 
+    # larger timeout for storage_v2_durability unit test
+    if name.endswith("storage_v2_durability"):
+        prefix = "TIMEOUT=300 "
+
     outfile_paths = []
     if name.startswith("unit"):
         dirname = dirname.replace("/build_debug/", "/build_coverage/")
diff --git a/tests/unit/storage_v2_durability.cpp b/tests/unit/storage_v2_durability.cpp
index 303f92aee..934157601 100644
--- a/tests/unit/storage_v2_durability.cpp
+++ b/tests/unit/storage_v2_durability.cpp
@@ -1,25 +1,39 @@
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
+#include <csignal>
+
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <unistd.h>
+
 #include <algorithm>
 #include <chrono>
 #include <filesystem>
+#include <iostream>
 #include <thread>
 
 #include "storage/v2/durability.hpp"
 #include "storage/v2/storage.hpp"
 #include "utils/file.hpp"
+#include "utils/timer.hpp"
 
 using testing::Contains;
 using testing::UnorderedElementsAre;
 
 class DurabilityTest : public ::testing::TestWithParam<bool> {
- private:
+ protected:
   const uint64_t kNumBaseVertices = 1000;
   const uint64_t kNumBaseEdges = 10000;
   const uint64_t kNumExtendedVertices = 100;
   const uint64_t kNumExtendedEdges = 1000;
 
+  // We don't want to flush the WAL while we are doing operations because the
+  // flushing adds a large overhead that slows down execution.
+  const uint64_t kFlushWalEvery = (kNumBaseVertices + kNumBaseEdges +
+                                   kNumExtendedVertices + kNumExtendedEdges) *
+                                  2;
+
  public:
   DurabilityTest()
       : base_vertex_gids_(kNumBaseVertices),
@@ -445,14 +459,11 @@ class DurabilityTest : public ::testing::TestWithParam<bool> {
   }
 
   std::vector<std::filesystem::path> GetSnapshotsList() {
-    std::vector<std::filesystem::path> ret;
-    for (auto &item : std::filesystem::directory_iterator(
-             storage_directory / storage::kSnapshotDirectory)) {
-      ret.push_back(item.path());
-    }
-    std::sort(ret.begin(), ret.end());
-    std::reverse(ret.begin(), ret.end());
-    return ret;
+    return GetFilesList(storage_directory / storage::kSnapshotDirectory);
+  }
+
+  std::vector<std::filesystem::path> GetWalsList() {
+    return GetFilesList(storage_directory / storage::kWalDirectory);
   }
 
   std::filesystem::path storage_directory{
@@ -460,6 +471,18 @@ class DurabilityTest : public ::testing::TestWithParam<bool> {
       "MG_test_unit_storage_v2_durability"};
 
  private:
+  std::vector<std::filesystem::path> GetFilesList(
+      const std::filesystem::path &path) {
+    std::vector<std::filesystem::path> ret;
+    std::error_code ec;  // For exception suppression.
+    for (auto &item : std::filesystem::directory_iterator(path, ec)) {
+      ret.push_back(item.path());
+    }
+    std::sort(ret.begin(), ret.end());
+    std::reverse(ret.begin(), ret.end());
+    return ret;
+  }
+
   void Clear() {
     if (!std::filesystem::exists(storage_directory)) return;
     std::filesystem::remove_all(storage_directory);
@@ -491,6 +514,9 @@ TEST_P(DurabilityTest, SnapshotOnExit) {
     VerifyExtendedDataset(&store);
   }
 
+  ASSERT_EQ(GetSnapshotsList().size(), 1);
+  ASSERT_EQ(GetWalsList().size(), 0);
+
   // Recover snapshot.
   storage::Storage store({.items = {.properties_on_edges = GetParam()},
                           .durability = {.storage_directory = storage_directory,
@@ -515,13 +541,16 @@ TEST_P(DurabilityTest, SnapshotPeriodic) {
     storage::Storage store(
         {.items = {.properties_on_edges = GetParam()},
          .durability = {.storage_directory = storage_directory,
-                        .snapshot_type =
-                            storage::Config::Durability::SnapshotType::PERIODIC,
+                        .snapshot_wal_mode = storage::Config::Durability::
+                            SnapshotWalMode::PERIODIC_SNAPSHOT,
                         .snapshot_interval = std::chrono::milliseconds(2000)}});
     CreateBaseDataset(&store, GetParam());
     std::this_thread::sleep_for(std::chrono::milliseconds(2500));
   }
 
+  ASSERT_GE(GetSnapshotsList().size(), 1);
+  ASSERT_EQ(GetWalsList().size(), 0);
+
   // Recover snapshot.
   storage::Storage store({.items = {.properties_on_edges = GetParam()},
                           .durability = {.storage_directory = storage_directory,
@@ -545,8 +574,8 @@ TEST_P(DurabilityTest, SnapshotFallback) {
     storage::Storage store(
         {.items = {.properties_on_edges = GetParam()},
          .durability = {.storage_directory = storage_directory,
-                        .snapshot_type =
-                            storage::Config::Durability::SnapshotType::PERIODIC,
+                        .snapshot_wal_mode = storage::Config::Durability::
+                            SnapshotWalMode::PERIODIC_SNAPSHOT,
                         .snapshot_interval = std::chrono::milliseconds(2000)}});
     CreateBaseDataset(&store, GetParam());
     std::this_thread::sleep_for(std::chrono::milliseconds(2500));
@@ -554,6 +583,9 @@ TEST_P(DurabilityTest, SnapshotFallback) {
     std::this_thread::sleep_for(std::chrono::milliseconds(2500));
   }
 
+  ASSERT_GE(GetSnapshotsList().size(), 2);
+  ASSERT_EQ(GetWalsList().size(), 0);
+
   // Destroy last snapshot.
   {
     auto snapshots = GetSnapshotsList();
@@ -589,13 +621,29 @@ TEST_P(DurabilityTest, SnapshotFallback) {
 
 // NOLINTNEXTLINE(hicpp-special-member-functions)
 TEST_P(DurabilityTest, SnapshotRetention) {
+  // Create unrelated snapshot.
+  {
+    storage::Storage store(
+        {.items = {.properties_on_edges = GetParam()},
+         .durability = {.storage_directory = storage_directory,
+                        .snapshot_on_exit = true}});
+    auto acc = store.Access();
+    for (uint64_t i = 0; i < 1000; ++i) {
+      acc.CreateVertex();
+    }
+    ASSERT_FALSE(acc.Commit().HasError());
+  }
+
+  ASSERT_GE(GetSnapshotsList().size(), 1);
+  ASSERT_EQ(GetWalsList().size(), 0);
+
   // Create snapshot.
   {
     storage::Storage store(
         {.items = {.properties_on_edges = GetParam()},
          .durability = {.storage_directory = storage_directory,
-                        .snapshot_type =
-                            storage::Config::Durability::SnapshotType::PERIODIC,
+                        .snapshot_wal_mode = storage::Config::Durability::
+                            SnapshotWalMode::PERIODIC_SNAPSHOT,
                         .snapshot_interval = std::chrono::milliseconds(2000),
                         .snapshot_retention_count = 3}});
     CreateBaseDataset(&store, GetParam());
@@ -603,13 +651,24 @@ TEST_P(DurabilityTest, SnapshotRetention) {
     std::this_thread::sleep_for(std::chrono::milliseconds(10000));
   }
 
-  // Verify that exactly 3 snapshots exist.
+  ASSERT_EQ(GetSnapshotsList().size(), 1 + 3);
+  ASSERT_EQ(GetWalsList().size(), 0);
+
+  // Verify that exactly 3 snapshots and 1 unrelated snapshot exist.
   {
     auto snapshots = GetSnapshotsList();
-    ASSERT_EQ(snapshots.size(), 3);
-    for (const auto &path : snapshots) {
+    ASSERT_EQ(snapshots.size(), 1 + 3);
+    std::string uuid;
+    for (size_t i = 0; i < snapshots.size(); ++i) {
+      const auto &path = snapshots[i];
       // This shouldn't throw.
-      storage::ReadSnapshotInfo(path);
+      auto info = storage::ReadSnapshotInfo(path);
+      if (i == 0) uuid = info.uuid;
+      if (i < snapshots.size() - 1) {
+        ASSERT_EQ(info.uuid, uuid);
+      } else {
+        ASSERT_NE(info.uuid, uuid);
+      }
     }
   }
 
@@ -645,6 +704,9 @@ TEST_F(DurabilityTest,
     VerifyExtendedDataset(&store);
   }
 
+  ASSERT_EQ(GetSnapshotsList().size(), 1);
+  ASSERT_EQ(GetWalsList().size(), 0);
+
   // Recover snapshot.
   storage::Storage store({.items = {.properties_on_edges = true},
                           .durability = {.storage_directory = storage_directory,
@@ -746,6 +808,9 @@ TEST_F(DurabilityTest,
     }
   }
 
+  ASSERT_EQ(GetSnapshotsList().size(), 1);
+  ASSERT_EQ(GetWalsList().size(), 0);
+
   // Recover snapshot.
   storage::Storage store({.items = {.properties_on_edges = false},
                           .durability = {.storage_directory = storage_directory,
@@ -762,3 +827,468 @@ TEST_F(DurabilityTest,
     ASSERT_FALSE(acc.Commit().HasError());
   }
 }
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+TEST_P(DurabilityTest, WalBasic) {
+  // Create WALs.
+  {
+    storage::Storage store(
+        {.items = {.properties_on_edges = GetParam()},
+         .durability = {.storage_directory = storage_directory,
+                        .snapshot_wal_mode = storage::Config::Durability::
+                            SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
+                        .snapshot_interval = std::chrono::minutes(20),
+                        .wal_file_flush_every_n_tx = kFlushWalEvery}});
+    CreateBaseDataset(&store, GetParam());
+    CreateExtendedDataset(&store);
+  }
+
+  ASSERT_EQ(GetSnapshotsList().size(), 0);
+  ASSERT_GE(GetWalsList().size(), 1);
+}
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+TEST_P(DurabilityTest, WalCreateInSingleTransaction) {
+  // NOLINTNEXTLINE(readability-isolate-declaration)
+  storage::Gid gid_v1, gid_v2, gid_e1, gid_v3;
+
+  // Create WALs.
+  {
+    storage::Storage store(
+        {.items = {.properties_on_edges = GetParam()},
+         .durability = {.storage_directory = storage_directory,
+                        .snapshot_wal_mode = storage::Config::Durability::
+                            SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
+                        .snapshot_interval = std::chrono::minutes(20),
+                        .wal_file_flush_every_n_tx = kFlushWalEvery}});
+    auto acc = store.Access();
+    auto v1 = acc.CreateVertex();
+    gid_v1 = v1.Gid();
+    auto v2 = acc.CreateVertex();
+    gid_v2 = v2.Gid();
+    auto e1 = acc.CreateEdge(&v1, &v2, store.NameToEdgeType("e1"));
+    ASSERT_TRUE(e1.HasValue());
+    gid_e1 = e1->Gid();
+    ASSERT_TRUE(v1.AddLabel(store.NameToLabel("l11")).HasValue());
+    ASSERT_TRUE(v1.AddLabel(store.NameToLabel("l12")).HasValue());
+    ASSERT_TRUE(v1.AddLabel(store.NameToLabel("l13")).HasValue());
+    if (GetParam()) {
+      ASSERT_TRUE(e1->SetProperty(store.NameToProperty("test"),
+                                  storage::PropertyValue("nandare"))
+                      .HasValue());
+    }
+    ASSERT_TRUE(v2.AddLabel(store.NameToLabel("l21")).HasValue());
+    ASSERT_TRUE(v2.SetProperty(store.NameToProperty("hello"),
+                               storage::PropertyValue("world"))
+                    .HasValue());
+    auto v3 = acc.CreateVertex();
+    gid_v3 = v3.Gid();
+    ASSERT_TRUE(
+        v3.SetProperty(store.NameToProperty("v3"), storage::PropertyValue(42))
+            .HasValue());
+    ASSERT_FALSE(acc.Commit().HasError());
+  }
+
+  ASSERT_EQ(GetSnapshotsList().size(), 0);
+  ASSERT_GE(GetWalsList().size(), 1);
+}
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+TEST_P(DurabilityTest, WalCreateAndRemoveEverything) {
+  // Create WALs.
+  {
+    storage::Storage store(
+        {.items = {.properties_on_edges = GetParam()},
+         .durability = {.storage_directory = storage_directory,
+                        .snapshot_wal_mode = storage::Config::Durability::
+                            SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
+                        .snapshot_interval = std::chrono::minutes(20),
+                        .wal_file_flush_every_n_tx = kFlushWalEvery}});
+    CreateBaseDataset(&store, GetParam());
+    CreateExtendedDataset(&store);
+    auto indices = store.ListAllIndices();
+    for (auto index : indices.label) {
+      ASSERT_TRUE(store.DropIndex(index));
+    }
+    for (auto index : indices.label_property) {
+      ASSERT_TRUE(store.DropIndex(index.first, index.second));
+    }
+    auto constraints = store.ListAllConstraints();
+    for (auto constraint : constraints.existence) {
+      ASSERT_TRUE(
+          store.DropExistenceConstraint(constraint.first, constraint.second));
+    }
+    auto acc = store.Access();
+    for (auto vertex : acc.Vertices(storage::View::OLD)) {
+      ASSERT_TRUE(acc.DetachDeleteVertex(&vertex).HasValue());
+    }
+    ASSERT_FALSE(acc.Commit().HasError());
+  }
+
+  ASSERT_EQ(GetSnapshotsList().size(), 0);
+  ASSERT_GE(GetWalsList().size(), 1);
+}
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+TEST_P(DurabilityTest, WalTransactionOrdering) {
+  // NOLINTNEXTLINE(readability-isolate-declaration)
+  storage::Gid gid1, gid2, gid3;
+
+  // Create WAL.
+  {
+    storage::Storage store(
+        {.items = {.properties_on_edges = GetParam()},
+         .durability = {.storage_directory = storage_directory,
+                        .snapshot_wal_mode = storage::Config::Durability::
+                            SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
+                        .snapshot_interval = std::chrono::minutes(20),
+                        .wal_file_flush_every_n_tx = kFlushWalEvery,
+                        .wal_file_size_kibibytes = 100000}});
+    auto acc1 = store.Access();
+    auto acc2 = store.Access();
+
+    // Create vertex in transaction 2.
+    {
+      auto vertex2 = acc2.CreateVertex();
+      gid2 = vertex2.Gid();
+      ASSERT_TRUE(vertex2
+                      .SetProperty(store.NameToProperty("id"),
+                                   storage::PropertyValue(2))
+                      .HasValue());
+    }
+
+    auto acc3 = store.Access();
+
+    // Create vertex in transaction 3.
+    {
+      auto vertex3 = acc3.CreateVertex();
+      gid3 = vertex3.Gid();
+      ASSERT_TRUE(vertex3
+                      .SetProperty(store.NameToProperty("id"),
+                                   storage::PropertyValue(3))
+                      .HasValue());
+    }
+
+    // Create vertex in transaction 1.
+    {
+      auto vertex1 = acc1.CreateVertex();
+      gid1 = vertex1.Gid();
+      ASSERT_TRUE(vertex1
+                      .SetProperty(store.NameToProperty("id"),
+                                   storage::PropertyValue(1))
+                      .HasValue());
+    }
+
+    // Commit transaction 3, then 1, then 2.
+    ASSERT_FALSE(acc3.Commit().HasError());
+    ASSERT_FALSE(acc1.Commit().HasError());
+    ASSERT_FALSE(acc2.Commit().HasError());
+  }
+
+  ASSERT_EQ(GetSnapshotsList().size(), 0);
+  ASSERT_EQ(GetWalsList().size(), 1);
+
+  // Verify WAL data.
+  {
+    auto path = GetWalsList().front();
+    auto info = storage::ReadWalInfo(path);
+    storage::Decoder wal;
+    wal.Initialize(path, storage::kWalMagic);
+    wal.SetPosition(info.offset_deltas);
+    ASSERT_EQ(info.num_deltas, 9);
+    std::vector<std::pair<uint64_t, storage::WalDeltaData>> data;
+    for (uint64_t i = 0; i < info.num_deltas; ++i) {
+      auto timestamp = storage::ReadWalDeltaHeader(&wal);
+      data.emplace_back(timestamp, storage::ReadWalDeltaData(&wal));
+    }
+    // Verify timestamps.
+    ASSERT_EQ(data[1].first, data[0].first);
+    ASSERT_EQ(data[2].first, data[1].first);
+    ASSERT_GT(data[3].first, data[2].first);
+    ASSERT_EQ(data[4].first, data[3].first);
+    ASSERT_EQ(data[5].first, data[4].first);
+    ASSERT_GT(data[6].first, data[5].first);
+    ASSERT_EQ(data[7].first, data[6].first);
+    ASSERT_EQ(data[8].first, data[7].first);
+    // Verify transaction 3.
+    ASSERT_EQ(data[0].second.type, storage::WalDeltaData::Type::VERTEX_CREATE);
+    ASSERT_EQ(data[0].second.vertex_create_delete.gid, gid3);
+    ASSERT_EQ(data[1].second.type,
+              storage::WalDeltaData::Type::VERTEX_SET_PROPERTY);
+    ASSERT_EQ(data[1].second.vertex_edge_set_property.gid, gid3);
+    ASSERT_EQ(data[1].second.vertex_edge_set_property.property, "id");
+    ASSERT_EQ(data[1].second.vertex_edge_set_property.value,
+              storage::PropertyValue(3));
+    ASSERT_EQ(data[2].second.type,
+              storage::WalDeltaData::Type::TRANSACTION_END);
+    // Verify transaction 1.
+    ASSERT_EQ(data[3].second.type, storage::WalDeltaData::Type::VERTEX_CREATE);
+    ASSERT_EQ(data[3].second.vertex_create_delete.gid, gid1);
+    ASSERT_EQ(data[4].second.type,
+              storage::WalDeltaData::Type::VERTEX_SET_PROPERTY);
+    ASSERT_EQ(data[4].second.vertex_edge_set_property.gid, gid1);
+    ASSERT_EQ(data[4].second.vertex_edge_set_property.property, "id");
+    ASSERT_EQ(data[4].second.vertex_edge_set_property.value,
+              storage::PropertyValue(1));
+    ASSERT_EQ(data[5].second.type,
+              storage::WalDeltaData::Type::TRANSACTION_END);
+    // Verify transaction 2.
+    ASSERT_EQ(data[6].second.type, storage::WalDeltaData::Type::VERTEX_CREATE);
+    ASSERT_EQ(data[6].second.vertex_create_delete.gid, gid2);
+    ASSERT_EQ(data[7].second.type,
+              storage::WalDeltaData::Type::VERTEX_SET_PROPERTY);
+    ASSERT_EQ(data[7].second.vertex_edge_set_property.gid, gid2);
+    ASSERT_EQ(data[7].second.vertex_edge_set_property.property, "id");
+    ASSERT_EQ(data[7].second.vertex_edge_set_property.value,
+              storage::PropertyValue(2));
+    ASSERT_EQ(data[8].second.type,
+              storage::WalDeltaData::Type::TRANSACTION_END);
+  }
+}
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+TEST_P(DurabilityTest, WalCreateAndRemoveOnlyBaseDataset) {
+  // Create WALs.
+  {
+    storage::Storage store(
+        {.items = {.properties_on_edges = GetParam()},
+         .durability = {.storage_directory = storage_directory,
+                        .snapshot_wal_mode = storage::Config::Durability::
+                            SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
+                        .snapshot_interval = std::chrono::minutes(20),
+                        .wal_file_flush_every_n_tx = kFlushWalEvery}});
+    CreateBaseDataset(&store, GetParam());
+    CreateExtendedDataset(&store);
+    auto label_indexed = store.NameToLabel("base_indexed");
+    auto label_unindexed = store.NameToLabel("base_unindexed");
+    auto acc = store.Access();
+    for (auto vertex : acc.Vertices(storage::View::OLD)) {
+      auto has_indexed = vertex.HasLabel(label_indexed, storage::View::OLD);
+      ASSERT_TRUE(has_indexed.HasValue());
+      auto has_unindexed = vertex.HasLabel(label_unindexed, storage::View::OLD);
+      ASSERT_TRUE(has_unindexed.HasValue());
+      if (!*has_indexed && !*has_unindexed) continue;
+      ASSERT_TRUE(acc.DetachDeleteVertex(&vertex).HasValue());
+    }
+    ASSERT_FALSE(acc.Commit().HasError());
+  }
+
+  ASSERT_EQ(GetSnapshotsList().size(), 0);
+  ASSERT_GE(GetWalsList().size(), 1);
+}
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+TEST_P(DurabilityTest, WalDeathResilience) {
+  pid_t pid = fork();
+  if (pid == 0) {
+    // Create WALs.
+    {
+      storage::Storage store(
+          {.items = {.properties_on_edges = GetParam()},
+           .durability = {.storage_directory = storage_directory,
+                          .snapshot_wal_mode = storage::Config::Durability::
+                              SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
+                          .snapshot_interval = std::chrono::minutes(20),
+                          .wal_file_flush_every_n_tx = kFlushWalEvery}});
+      // Create one million vertices.
+      for (uint64_t i = 0; i < 1000000; ++i) {
+        auto acc = store.Access();
+        acc.CreateVertex();
+        CHECK(!acc.Commit().HasError()) << "Couldn't commit transaction!";
+      }
+    }
+  } else if (pid > 0) {
+    // Wait for WALs to be created.
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+    int status;
+    EXPECT_EQ(waitpid(pid, &status, WNOHANG), 0);
+    EXPECT_EQ(kill(pid, SIGKILL), 0);
+    EXPECT_EQ(waitpid(pid, &status, 0), pid);
+    EXPECT_NE(status, 0);
+  } else {
+    LOG(FATAL) << "Couldn't create process to execute test!";
+  }
+
+  ASSERT_EQ(GetSnapshotsList().size(), 0);
+  ASSERT_GE(GetWalsList().size(), 1);
+}
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+TEST_P(DurabilityTest, WalAllOperationsInSingleTransaction) {
+  // Create WALs
+  {
+    storage::Storage store(
+        {.items = {.properties_on_edges = GetParam()},
+         .durability = {.storage_directory = storage_directory,
+                        .snapshot_wal_mode = storage::Config::Durability::
+                            SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
+                        .snapshot_interval = std::chrono::minutes(20),
+                        .wal_file_size_kibibytes = 1,
+                        .wal_file_flush_every_n_tx = kFlushWalEvery}});
+    auto acc = store.Access();
+    auto vertex1 = acc.CreateVertex();
+    auto vertex2 = acc.CreateVertex();
+    ASSERT_TRUE(vertex1.AddLabel(acc.NameToLabel("nandare")).HasValue());
+    ASSERT_TRUE(vertex2
+                    .SetProperty(acc.NameToProperty("haihai"),
+                                 storage::PropertyValue(42))
+                    .HasValue());
+    ASSERT_TRUE(vertex1.RemoveLabel(acc.NameToLabel("nandare")).HasValue());
+    auto edge1 = acc.CreateEdge(&vertex1, &vertex2, acc.NameToEdgeType("et1"));
+    ASSERT_TRUE(edge1.HasValue());
+    ASSERT_TRUE(
+        vertex2
+            .SetProperty(acc.NameToProperty("haihai"), storage::PropertyValue())
+            .HasValue());
+    auto vertex3 = acc.CreateVertex();
+    auto edge2 = acc.CreateEdge(&vertex3, &vertex3, acc.NameToEdgeType("et2"));
+    ASSERT_TRUE(edge2.HasValue());
+    if (GetParam()) {
+      ASSERT_TRUE(edge2
+                      ->SetProperty(acc.NameToProperty("meaning"),
+                                    storage::PropertyValue(true))
+                      .HasValue());
+      ASSERT_TRUE(edge1
+                      ->SetProperty(acc.NameToProperty("hello"),
+                                    storage::PropertyValue("world"))
+                      .HasValue());
+      ASSERT_TRUE(edge2
+                      ->SetProperty(acc.NameToProperty("meaning"),
+                                    storage::PropertyValue())
+                      .HasValue());
+    }
+    ASSERT_TRUE(vertex3.AddLabel(acc.NameToLabel("test")).HasValue());
+    ASSERT_TRUE(vertex3
+                    .SetProperty(acc.NameToProperty("nonono"),
+                                 storage::PropertyValue(-1))
+                    .HasValue());
+    ASSERT_TRUE(
+        vertex3
+            .SetProperty(acc.NameToProperty("nonono"), storage::PropertyValue())
+            .HasValue());
+    if (GetParam()) {
+      ASSERT_TRUE(edge1
+                      ->SetProperty(acc.NameToProperty("hello"),
+                                    storage::PropertyValue())
+                      .HasValue());
+    }
+    ASSERT_TRUE(vertex3.RemoveLabel(acc.NameToLabel("test")).HasValue());
+    ASSERT_TRUE(acc.DetachDeleteVertex(&vertex1).HasValue());
+    ASSERT_TRUE(acc.DeleteEdge(&*edge2).HasValue());
+    ASSERT_TRUE(acc.DeleteVertex(&vertex2).HasValue());
+    ASSERT_TRUE(acc.DeleteVertex(&vertex3).HasValue());
+    ASSERT_FALSE(acc.Commit().HasError());
+  }
+
+  ASSERT_EQ(GetSnapshotsList().size(), 0);
+  ASSERT_GE(GetWalsList().size(), 1);
+}
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+TEST_P(DurabilityTest, WalAndSnapshot) {
+  // Create snapshot and WALs.
+  {
+    storage::Storage store(
+        {.items = {.properties_on_edges = GetParam()},
+         .durability = {.storage_directory = storage_directory,
+                        .snapshot_wal_mode = storage::Config::Durability::
+                            SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
+                        .snapshot_interval = std::chrono::milliseconds(2000),
+                        .wal_file_flush_every_n_tx = kFlushWalEvery}});
+    CreateBaseDataset(&store, GetParam());
+    std::this_thread::sleep_for(std::chrono::milliseconds(2500));
+    CreateExtendedDataset(&store);
+  }
+
+  ASSERT_GE(GetSnapshotsList().size(), 1);
+  ASSERT_GE(GetWalsList().size(), 1);
+}
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+TEST_P(DurabilityTest, WalAndSnapshotAppendToExistingSnapshot) {
+  // Create snapshot.
+  {
+    storage::Storage store(
+        {.items = {.properties_on_edges = GetParam()},
+         .durability = {.storage_directory = storage_directory,
+                        .snapshot_on_exit = true}});
+    CreateBaseDataset(&store, GetParam());
+  }
+
+  ASSERT_EQ(GetSnapshotsList().size(), 1);
+  ASSERT_EQ(GetWalsList().size(), 0);
+
+  // Recover snapshot.
+  {
+    storage::Storage store(
+        {.items = {.properties_on_edges = GetParam()},
+         .durability = {.storage_directory = storage_directory,
+                        .recover_on_startup = true}});
+    VerifyBaseDataset(&store, GetParam(), false);
+  }
+
+  // Recover snapshot and create WALs.
+  {
+    storage::Storage store(
+        {.items = {.properties_on_edges = GetParam()},
+         .durability = {.storage_directory = storage_directory,
+                        .recover_on_startup = true,
+                        .snapshot_wal_mode = storage::Config::Durability::
+                            SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
+                        .snapshot_interval = std::chrono::minutes(20),
+                        .wal_file_flush_every_n_tx = kFlushWalEvery}});
+    CreateExtendedDataset(&store);
+  }
+
+  ASSERT_EQ(GetSnapshotsList().size(), 1);
+  ASSERT_GE(GetWalsList().size(), 1);
+}
+
+// NOLINTNEXTLINE(hicpp-special-member-functions)
+TEST_P(DurabilityTest, WalAndSnapshotWalRetention) {
+  // Create unrelated WALs.
+  {
+    storage::Storage store(
+        {.items = {.properties_on_edges = GetParam()},
+         .durability = {.storage_directory = storage_directory,
+                        .snapshot_wal_mode = storage::Config::Durability::
+                            SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
+                        .snapshot_interval = std::chrono::minutes(20),
+                        .wal_file_size_kibibytes = 1,
+                        .wal_file_flush_every_n_tx = kFlushWalEvery}});
+    auto acc = store.Access();
+    for (uint64_t i = 0; i < 1000; ++i) {
+      acc.CreateVertex();
+    }
+    ASSERT_FALSE(acc.Commit().HasError());
+  }
+
+  ASSERT_EQ(GetSnapshotsList().size(), 0);
+  ASSERT_GE(GetWalsList().size(), 1);
+
+  uint64_t unrelated_wals = GetWalsList().size();
+
+  uint64_t items_created = 0;
+
+  // Create snapshot and WALs.
+  {
+    storage::Storage store(
+        {.items = {.properties_on_edges = GetParam()},
+         .durability = {.storage_directory = storage_directory,
+                        .snapshot_wal_mode = storage::Config::Durability::
+                            SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
+                        .snapshot_interval = std::chrono::seconds(2),
+                        .wal_file_size_kibibytes = 1,
+                        .wal_file_flush_every_n_tx = 1}});
+    utils::Timer timer;
+    // Allow at least 6 snapshots to be created.
+    while (timer.Elapsed().count() < 13.0) {
+      auto acc = store.Access();
+      acc.CreateVertex();
+      ASSERT_FALSE(acc.Commit().HasError());
+      ++items_created;
+    }
+  }
+
+  ASSERT_EQ(GetSnapshotsList().size(), 3);
+  ASSERT_GE(GetWalsList().size(), unrelated_wals + 1);
+}