From 145c81376f796e0810420d5cc165fd08a35ac4be Mon Sep 17 00:00:00 2001
From: Matija Santl <matija.santl@memgraph.com>
Date: Thu, 31 Jan 2019 13:40:17 +0100
Subject: [PATCH] Add log compaction for Raft, pt. 2

Summary: Implemented snapshot replication and log compaction.

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1840
---
 .gitignore                                    |   2 +
 src/CMakeLists.txt                            |   2 +
 src/database/single_node_ha/graph_db.cpp      |  18 +-
 src/database/single_node_ha/graph_db.hpp      |   7 +-
 src/durability/single_node_ha/paths.cpp       |  11 +-
 src/durability/single_node_ha/paths.hpp       |   7 +-
 src/durability/single_node_ha/recovery.cpp    |  30 +-
 src/durability/single_node_ha/recovery.hpp    |  17 +-
 src/durability/single_node_ha/snapshooter.cpp |   5 +-
 src/durability/single_node_ha/snapshooter.hpp |   4 +-
 src/raft/config.hpp                           |   2 +-
 src/raft/coordination.cpp                     |   9 +-
 src/raft/coordination.hpp                     |  11 +-
 src/raft/raft_rpc_messages.lcp                |  50 ++-
 src/raft/raft_server.cpp                      | 351 +++++++++++++++---
 src/raft/raft_server.hpp                      |  57 ++-
 src/raft/snapshot_metadata.lcp                |  27 ++
 tests/feature_benchmark/ha/raft.json          |   2 +-
 tests/feature_benchmark/ha/runner.sh          |   2 +-
 tests/integration/ha_basic/raft.json          |   2 +-
 20 files changed, 469 insertions(+), 147 deletions(-)
 create mode 100644 src/raft/snapshot_metadata.lcp

diff --git a/.gitignore b/.gitignore
index 7bb06e392..435b236ee 100644
--- a/.gitignore
+++ b/.gitignore
@@ -92,6 +92,8 @@ src/raft/log_entry.capnp
 src/raft/log_entry.hpp
 src/raft/raft_rpc_messages.capnp
 src/raft/raft_rpc_messages.hpp
+src/raft/snapshot_metadata.capnp
+src/raft/snapshot_metadata.hpp
 src/stats/stats_rpc_messages.capnp
 src/stats/stats_rpc_messages.hpp
 src/storage/distributed/rpc/concurrent_id_mapper_rpc_messages.capnp
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 8d78e4699..9c129bc8b 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -305,6 +305,8 @@ add_lcp_single_node_ha(raft/raft_rpc_messages.lcp CAPNP_SCHEMA @0xa6c29b4287233b
 add_capnp_single_node_ha(raft/raft_rpc_messages.capnp)
 add_lcp_single_node_ha(raft/log_entry.lcp CAPNP_SCHEMA @0x96c07fe13850c22a)
 add_capnp_single_node_ha(raft/log_entry.capnp)
+add_lcp_single_node_ha(raft/snapshot_metadata.lcp CAPNP_SCHEMA @0xaa08e34991680f6c)
+add_capnp_single_node_ha(raft/snapshot_metadata.capnp)
 
 add_custom_target(generate_lcp_single_node_ha DEPENDS ${generated_lcp_single_node_ha_files})
 
diff --git a/src/database/single_node_ha/graph_db.cpp b/src/database/single_node_ha/graph_db.cpp
index 90a9b4572..40b96727d 100644
--- a/src/database/single_node_ha/graph_db.cpp
+++ b/src/database/single_node_ha/graph_db.cpp
@@ -39,19 +39,13 @@ void GraphDb::Start() {
 
 GraphDb::~GraphDb() {}
 
-bool GraphDb::AwaitShutdown(std::function<void(void)> call_before_shutdown) {
-  bool ret =
-      coordination_.AwaitShutdown([this, &call_before_shutdown]() -> bool {
-        is_accepting_transactions_ = false;
-        tx_engine_.LocalForEachActiveTransaction(
-            [](auto &t) { t.set_should_abort(); });
+void GraphDb::AwaitShutdown(std::function<void(void)> call_before_shutdown) {
+  coordination_.AwaitShutdown([this, &call_before_shutdown]() {
+    tx_engine_.LocalForEachActiveTransaction(
+        [](auto &t) { t.set_should_abort(); });
 
-        call_before_shutdown();
-
-        return true;
-      });
-
-  return ret;
+    call_before_shutdown();
+  });
 }
 
 void GraphDb::Shutdown() {
diff --git a/src/database/single_node_ha/graph_db.hpp b/src/database/single_node_ha/graph_db.hpp
index da9b2b8d0..9d4d3a7ab 100644
--- a/src/database/single_node_ha/graph_db.hpp
+++ b/src/database/single_node_ha/graph_db.hpp
@@ -96,7 +96,7 @@ class GraphDb {
   GraphDb &operator=(GraphDb &&) = delete;
 
   void Start();
-  bool AwaitShutdown(std::function<void(void)> call_before_shutdown);
+  void AwaitShutdown(std::function<void(void)> call_before_shutdown);
   void Shutdown();
 
   /// Create a new accessor by starting a new transaction.
@@ -122,9 +122,6 @@ class GraphDb {
   /// might end up with some stale transactions on the leader.
   void Reset();
 
-  /// When this is false, no new transactions should be created.
-  bool is_accepting_transactions() const { return is_accepting_transactions_; }
-
   /// Get live view of storage stats. Gets updated on RefreshStat.
   const Stat &GetStat() const { return stat_; }
 
@@ -146,8 +143,6 @@ class GraphDb {
  protected:
   Stat stat_;
 
-  std::atomic<bool> is_accepting_transactions_{true};
-
   utils::Scheduler transaction_killer_;
 
   Config config_;
diff --git a/src/durability/single_node_ha/paths.cpp b/src/durability/single_node_ha/paths.cpp
index 07bbe1c27..be1ef6cbe 100644
--- a/src/durability/single_node_ha/paths.cpp
+++ b/src/durability/single_node_ha/paths.cpp
@@ -14,13 +14,16 @@ namespace durability {
 
 namespace fs = std::experimental::filesystem;
 
-fs::path MakeSnapshotPath(const fs::path &durability_dir,
-                          tx::TransactionId tx_id) {
+std::string GetSnapshotFilename(tx::TransactionId tx_id) {
   std::string date_str =
       utils::Timestamp(utils::Timestamp::Now())
           .ToString("{:04d}_{:02d}_{:02d}__{:02d}_{:02d}_{:02d}_{:05d}");
-  auto file_name = date_str + "_tx_" + std::to_string(tx_id);
-  return durability_dir / kSnapshotDir / file_name;
+  return date_str + "_tx_" + std::to_string(tx_id);
+}
+
+fs::path MakeSnapshotPath(const fs::path &durability_dir,
+                          const std::string &snapshot_filename) {
+  return durability_dir / kSnapshotDir / snapshot_filename;
 }
 
 std::experimental::optional<tx::TransactionId>
diff --git a/src/durability/single_node_ha/paths.hpp b/src/durability/single_node_ha/paths.hpp
index 2f17de8b3..4e04b94cc 100644
--- a/src/durability/single_node_ha/paths.hpp
+++ b/src/durability/single_node_ha/paths.hpp
@@ -9,12 +9,15 @@ namespace durability {
 const std::string kSnapshotDir = "snapshots";
 const std::string kBackupDir = ".backup";
 
-/// Generates a path for a DB snapshot in the given folder in a well-defined
+/// Generates a filename for a DB snapshot in a well-defined
 /// sortable format with transaction from which the snapshot is created appended
 /// to the file name.
+std::string GetSnapshotFilename(tx::TransactionId tx_id);
+
+/// Generates a full path for a DB snapshot.
 std::experimental::filesystem::path MakeSnapshotPath(
     const std::experimental::filesystem::path &durability_dir,
-    tx::TransactionId tx_id);
+    const std::string &snapshot_filename);
 
 /// Returns the transaction id contained in the file name. If the filename is
 /// not a parseable snapshot file name, nullopt is returned.
diff --git a/src/durability/single_node_ha/recovery.cpp b/src/durability/single_node_ha/recovery.cpp
index 6bb3687cb..dc509b0b0 100644
--- a/src/durability/single_node_ha/recovery.cpp
+++ b/src/durability/single_node_ha/recovery.cpp
@@ -147,26 +147,24 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
 
 }  // anonymous namespace
 
-bool RecoverOnlySnapshot(const fs::path &durability_dir, database::GraphDb *db,
-                         RecoveryData *recovery_data) {
-  // Attempt to recover from snapshot files in reverse order (from newest
-  // backwards).
+bool RecoverSnapshot(database::GraphDb *db, RecoveryData *recovery_data,
+                     const fs::path &durability_dir,
+                     const std::string &snapshot_filename) {
   const auto snapshot_dir = durability_dir / kSnapshotDir;
-  std::vector<fs::path> snapshot_files;
-
-  if (fs::exists(snapshot_dir) && fs::is_directory(snapshot_dir))
-    for (auto &file : fs::directory_iterator(snapshot_dir))
-      snapshot_files.emplace_back(file);
-
-  if (snapshot_files.size() != 1) {
-    LOG(WARNING) << "Expected only one snapshot file for recovery!";
+  if (!fs::exists(snapshot_dir) || !fs::is_directory(snapshot_dir)) {
+    LOG(WARNING) << "Missing snapshot directory!";
     return false;
   }
 
-  auto snapshot_file = *snapshot_files.begin();
-  LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file;
-  if (!RecoverSnapshot(snapshot_file, db, recovery_data)) {
-    LOG(WARNING) << "Snapshot recovery failed";
+  const auto snapshot = snapshot_dir / snapshot_filename;
+  if (!fs::exists(snapshot)) {
+    LOG(WARNING) << "Missing snapshot file!";
+    return false;
+  }
+
+  LOG(INFO) << "Starting snapshot recovery from: " << snapshot;
+  if (!RecoverSnapshot(snapshot, db, recovery_data)) {
+    LOG(WARNING) << "Snapshot recovery failed.";
     return false;
   }
 
diff --git a/src/durability/single_node_ha/recovery.hpp b/src/durability/single_node_ha/recovery.hpp
index 05034415b..be8e72c98 100644
--- a/src/durability/single_node_ha/recovery.hpp
+++ b/src/durability/single_node_ha/recovery.hpp
@@ -44,17 +44,20 @@ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
                          int64_t &edge_count, uint64_t &hash);
 
 /**
- * Recovers database from the latest possible snapshot. If recovering fails,
- * false is returned and db_accessor aborts transaction, else true is returned
- * and transaction is commited.
+ * Recovers database from the given snapshot. If recovering fails, false is
+ * returned and db_accessor aborts transaction, else true is returned and
+ * transaction is committed.
  *
- * @param durability_dir - Path to durability directory.
  * @param db - The database to recover into.
+ * @param recovery_data - Struct that will contain additional recovery data.
+ * @param durability_dir - Path to durability directory.
+ * @param snapshot_filename - Snapshot filename.
  * @return - recovery info
  */
-bool RecoverOnlySnapshot(
-    const std::experimental::filesystem::path &durability_dir,
-    database::GraphDb *db, durability::RecoveryData *recovery_data);
+bool RecoverSnapshot(database::GraphDb *db,
+                     durability::RecoveryData *recovery_data,
+                     const std::experimental::filesystem::path &durability_dir,
+                     const std::string &snapshot_filename);
 
 void RecoverIndexes(database::GraphDb *db,
                     const std::vector<IndexRecoveryData> &indexes);
diff --git a/src/durability/single_node_ha/snapshooter.cpp b/src/durability/single_node_ha/snapshooter.cpp
index 8c041d174..f63eb0c44 100644
--- a/src/durability/single_node_ha/snapshooter.cpp
+++ b/src/durability/single_node_ha/snapshooter.cpp
@@ -94,10 +94,11 @@ void RemoveOldSnapshots(const fs::path &snapshot_dir, uint16_t keep) {
 }  // namespace
 
 bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba,
-                  const fs::path &durability_dir) {
+                  const fs::path &durability_dir,
+                  const std::string &snapshot_filename) {
   if (!utils::EnsureDir(durability_dir / kSnapshotDir)) return false;
   const auto snapshot_file =
-      MakeSnapshotPath(durability_dir, dba.transaction_id());
+      MakeSnapshotPath(durability_dir, snapshot_filename);
   if (fs::exists(snapshot_file)) return false;
   if (Encode(snapshot_file, db, dba)) {
     // Only keep the latest snapshot.
diff --git a/src/durability/single_node_ha/snapshooter.hpp b/src/durability/single_node_ha/snapshooter.hpp
index c8baacab0..b884d79ce 100644
--- a/src/durability/single_node_ha/snapshooter.hpp
+++ b/src/durability/single_node_ha/snapshooter.hpp
@@ -11,8 +11,10 @@ namespace durability {
 /// @param dba - db accessor with which we are creating a snapshot (reading
 ///              data)
 /// @param durability_dir - directory where durability data is stored.
+/// @param snapshot_filename - filename for the snapshot.
 bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba,
-                  const std::experimental::filesystem::path &durability_dir);
+                  const std::experimental::filesystem::path &durability_dir,
+                  const std::string &snapshot_filename);
 
 /// Remove all snapshots inside the snapshot durability directory.
 void RemoveAllSnapshots(
diff --git a/src/raft/config.hpp b/src/raft/config.hpp
index 2b8fd96d1..2ac793e17 100644
--- a/src/raft/config.hpp
+++ b/src/raft/config.hpp
@@ -19,7 +19,7 @@ struct Config {
   std::chrono::milliseconds election_timeout_min;
   std::chrono::milliseconds election_timeout_max;
   std::chrono::milliseconds heartbeat_interval;
-  uint64_t log_size_snapshot_threshold;
+  int64_t log_size_snapshot_threshold;
 
   static Config LoadFromFile(const std::string &raft_config_file) {
     if (!std::experimental::filesystem::exists(raft_config_file))
diff --git a/src/raft/coordination.cpp b/src/raft/coordination.cpp
index 5a318724e..68532cab2 100644
--- a/src/raft/coordination.cpp
+++ b/src/raft/coordination.cpp
@@ -96,22 +96,19 @@ bool Coordination::Start() {
   return true;
 }
 
-bool Coordination::AwaitShutdown(
-    std::function<bool(void)> call_before_shutdown) {
+void Coordination::AwaitShutdown(
+    std::function<void(void)> call_before_shutdown) {
   // Wait for a shutdown notification.
   while (alive_) {
     std::this_thread::sleep_for(std::chrono::milliseconds(100));
   }
 
   // Call the before shutdown callback.
-  bool ret = call_before_shutdown();
+  call_before_shutdown();
 
   // Shutdown our RPC server.
   server_.Shutdown();
   server_.AwaitShutdown();
-
-  // Return `true` if the `call_before_shutdown` succeeded.
-  return ret;
 }
 
 void Coordination::Shutdown() { alive_.store(false); }
diff --git a/src/raft/coordination.hpp b/src/raft/coordination.hpp
index 1aed88c17..e4ceefaf3 100644
--- a/src/raft/coordination.hpp
+++ b/src/raft/coordination.hpp
@@ -68,8 +68,8 @@ class Coordination final {
   template <typename TResult>
   auto ExecuteOnWorker(
       int worker_id,
-      std::function<TResult(int worker_id, communication::rpc::ClientPool &)>
-          execute) {
+      const std::function<TResult(int worker_id,
+                                  communication::rpc::ClientPool &)> &execute) {
     auto client_pool = GetClientPool(worker_id);
     return thread_pool_.Run(execute, worker_id, std::ref(*client_pool));
   }
@@ -79,8 +79,8 @@ class Coordination final {
   template <typename TResult>
   auto ExecuteOnWorkers(
       int skip_worker_id,
-      std::function<TResult(int worker_id, communication::rpc::ClientPool &)>
-          execute) {
+      const std::function<TResult(int worker_id,
+                                  communication::rpc::ClientPool &)> &execute) {
     std::vector<std::future<TResult>> futures;
     for (auto &worker_id : GetWorkerIds()) {
       if (worker_id == skip_worker_id) continue;
@@ -112,8 +112,7 @@ class Coordination final {
   /// Starts the coordination and its servers.
   bool Start();
 
-  bool AwaitShutdown(std::function<bool(void)> call_before_shutdown =
-                         []() -> bool { return true; });
+  void AwaitShutdown(std::function<void(void)> call_before_shutdown);
 
   /// Hints that the coordination should start shutting down the whole cluster.
   void Shutdown();
diff --git a/src/raft/raft_rpc_messages.lcp b/src/raft/raft_rpc_messages.lcp
index 779d6e1c8..42062cd11 100644
--- a/src/raft/raft_rpc_messages.lcp
+++ b/src/raft/raft_rpc_messages.lcp
@@ -1,18 +1,21 @@
 #>cpp
 #pragma once
 
+#include <cstring>
 #include <vector>
 
 #include "communication/rpc/messages.hpp"
 #include "raft/log_entry.hpp"
 #include "raft/raft_rpc_messages.capnp.h"
+#include "raft/snapshot_metadata.hpp"
 cpp<#
 
 (lcp:namespace raft)
 
 (lcp:capnp-namespace "raft")
 
-(lcp:capnp-import 'raft "/raft/log_entry.capnp")
+(lcp:capnp-import 'log "/raft/log_entry.capnp")
+(lcp:capnp-import 'snap "/raft/snapshot_metadata.capnp")
 
 (lcp:define-rpc request-vote
   (:request
@@ -31,9 +34,52 @@ cpp<#
      (term :uint64_t)
      (prev-log-index :uint64_t)
      (prev-log-term :uint64_t)
-     (entries "std::vector<raft::LogEntry>" :capnp-type "List(Raft.LogEntry)")))
+     (entries "std::vector<raft::LogEntry>" :capnp-type "List(Log.LogEntry)")))
   (:response
     ((success :bool)
      (term :uint64_t))))
 
+(lcp:define-rpc install-snapshot
+  (:request
+    ((leader-id :uint16_t)
+     (term :uint64_t)
+     (snapshot-metadata "raft::SnapshotMetadata" :capnp-type "Snap.SnapshotMetadata")
+     (data "std::unique_ptr<char[]>"
+           :initarg :move
+           :capnp-type "Data"
+           :capnp-init nil
+           :capnp-save (lambda (builder member capnp-name)
+                         #>cpp
+                         auto data_builder = ${builder}->initData(self.size);
+                         memcpy(data_builder.begin(), ${member}.get(), self.size);
+                         cpp<#)
+           :slk-save (lambda (member)
+                         #>cpp
+                         slk::Save(self.size, builder);
+                         for (uint32_t i = 0; i < self.size; ++i) {
+                           slk::Save(self.data[i], builder);
+                         }
+                         cpp<#)
+           :capnp-load (lambda (reader member capnp-name)
+                         (declare (ignore capnp-name))
+                         #>cpp
+                         auto data_reader = ${reader}.getData();
+                         self->size = data_reader.size();
+                         ${member}.reset(new char[self->size]);
+                         memcpy(${member}.get(), data_reader.begin(), self->size);
+                         cpp<#)
+           :slk-load (lambda (member)
+                         #>cpp
+                         slk::Load(&self->size, reader);
+                         self->data.reset(new char[self->size]);
+                         for (uint32_t i = 0; i < self->size; ++i) {
+                           uint8_t curr;
+                           slk::Load(&curr, reader);
+                           self->data[i] = curr;
+                         }
+                         cpp<#))
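+     ;; The size member is not serialized on its own (:dont-save t); it is
+     ;; handled within the custom (de)serialization of the data member.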
+     (size :uint32_t :dont-save t)))
+  (:response
+    ((term :uint64_t))))
+
 (lcp:pop-namespace) ;; raft
diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp
index 72751acac..8d4225410 100644
--- a/src/raft/raft_server.cpp
+++ b/src/raft/raft_server.cpp
@@ -2,7 +2,7 @@
 
 #include <kj/std/iostream.h>
 #include <chrono>
-#include <experimental/filesystem>
+#include <iostream>
 #include <memory>
 
 #include <fmt/format.h>
@@ -10,13 +10,13 @@
 #include <glog/logging.h>
 
 #include "database/graph_db_accessor.hpp"
+#include "durability/single_node_ha/paths.hpp"
 #include "durability/single_node_ha/recovery.hpp"
 #include "durability/single_node_ha/snapshooter.hpp"
 #include "raft/exceptions.hpp"
 #include "utils/exceptions.hpp"
 #include "utils/on_scope_exit.hpp"
 #include "utils/serialization.hpp"
-#include "utils/string.hpp"
 #include "utils/thread.hpp"
 
 namespace raft {
@@ -44,7 +44,7 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
       rlog_(std::make_unique<ReplicationLog>()),
       mode_(Mode::FOLLOWER),
       server_id_(server_id),
-      durability_dir_(durability_dir),
+      durability_dir_(fs::path(durability_dir)),
       db_recover_on_startup_(db_recover_on_startup),
       commit_index_(0),
       last_applied_(0),
@@ -54,14 +54,15 @@ void RaftServer::Start() {
   if (db_recover_on_startup_) {
     auto snapshot_metadata = GetSnapshotMetadata();
     if (snapshot_metadata) {
-      RecoverSnapshot();
+      RecoverSnapshot(snapshot_metadata->snapshot_filename);
 
-      last_applied_ = snapshot_metadata->second;
-      commit_index_ = snapshot_metadata->second;
+      last_applied_ = snapshot_metadata->last_included_index;
+      commit_index_ = snapshot_metadata->last_included_index;
     }
   } else {
     // We need to clear persisted data if we don't want any recovery.
     disk_storage_.DeletePrefix("");
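+    // The prefix delete also removed the persisted log size, so re-initialize
+    // it to an empty log.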
+    disk_storage_.Put(kLogSizeKey, "0");
     durability::RemoveAllSnapshots(durability_dir_);
   }
 
@@ -131,7 +132,7 @@ void RaftServer::Start() {
     // "If a server receives a request with a stale term, it rejects the
     // request"
     uint64_t current_term = CurrentTerm();
-    if (req.term < current_term) {
+    if (exiting_ || req.term < current_term) {
       AppendEntriesRes res(false, current_term);
       Save(res, res_builder);
       return;
@@ -157,11 +158,29 @@ void RaftServer::Start() {
     //     term, then they store the same command.
     //   - If two entries in different logs have the same index and term,
     //     then the logs are identical in all preceding entries.
-    if (LogSize() <= req.prev_log_index ||
-        GetLogEntry(req.prev_log_index).term != req.prev_log_term) {
+    auto snapshot_metadata = GetSnapshotMetadata();
+
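+    // The entry at prev_log_index might already be compacted into a snapshot,
+    // so check consistency against the snapshot metadata in that case.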
+    if (snapshot_metadata &&
+        snapshot_metadata->last_included_index == req.prev_log_index) {
+      if (req.prev_log_term != snapshot_metadata->last_included_term) {
+        AppendEntriesRes res(false, current_term);
+        Save(res, res_builder);
+        return;
+      }
+    } else if (snapshot_metadata &&
+               snapshot_metadata->last_included_index > req.prev_log_index) {
+      LOG(ERROR) << "Received entries that are already committed and have been "
+                    "compacted";
       AppendEntriesRes res(false, current_term);
       Save(res, res_builder);
       return;
+    } else {
+      if (LogSize() <= req.prev_log_index ||
+          GetLogEntry(req.prev_log_index).term != req.prev_log_term) {
+        AppendEntriesRes res(false, current_term);
+        Save(res, res_builder);
+        return;
+      }
     }
 
     // [Raft paper figure 2]
@@ -194,6 +213,92 @@ void RaftServer::Start() {
     Save(res, res_builder);
   });
 
+  coordination_->Register<InstallSnapshotRpc>(
+      [this](const auto &req_reader, auto *res_builder) {
+        // Acquire snapshot lock.
+        std::lock_guard<std::mutex> snapshot_guard(snapshot_lock_);
+        std::lock_guard<std::mutex> guard(lock_);
+
+        InstallSnapshotReq req;
+        Load(&req, req_reader);
+
+        uint64_t current_term = CurrentTerm();
+        if (exiting_ || req.term < current_term) {
+          InstallSnapshotRes res(current_term);
+          Save(res, res_builder);
+          return;
+        }
+
+        // Check if the current state matches the one in the snapshot.
+        if (req.snapshot_metadata.last_included_index == last_applied_ &&
+            req.snapshot_metadata.last_included_term == current_term) {
+          InstallSnapshotRes res(current_term);
+          Save(res, res_builder);
+          return;
+        }
+
+        VLOG(40) << "[InstallSnapshotRpc] Starting.";
+
+        if (req.term > current_term) {
+          VLOG(40) << "[InstallSnapshotRpc] Updating term.";
+          UpdateTerm(req.term);
+          if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER);
+        }
+
+        utils::OnScopeExit extend_election_timeout([this] {
+          SetNextElectionTimePoint();
+          election_change_.notify_all();
+        });
+
+        // Temporarily freeze the election timer while we handle the snapshot.
+        next_election_ = TimePoint::max();
+
+        VLOG(40) << "[InstallSnapshotRpc] Remove all snapshots.";
+        // Remove all previous snapshots
+        durability::RemoveAllSnapshots(durability_dir_);
+
+        const auto snapshot_path = durability::MakeSnapshotPath(
+            durability_dir_, req.snapshot_metadata.snapshot_filename);
+
+        // Save snapshot file
+        {
+          VLOG(40) << "[InstallSnapshotRpc] Saving received snapshot.";
+          std::ofstream output_stream;
+          output_stream.open(snapshot_path, std::ios::out | std::ios::binary);
+          output_stream.write(req.data.get(), req.size);
+          output_stream.flush();
+          output_stream.close();
+        }
+
+        // Discard all log entries. We keep the one at index 0.
+        VLOG(40) << "[InstallSnapshotRpc] Discarding logs.";
+        uint64_t log_size = LogSize();
+        for (uint64_t i = 1; i < log_size; ++i)
+          disk_storage_.Delete(LogEntryKey(i));
+
+        // Reset the database.
+        VLOG(40) << "[InstallSnapshotRpc] Reset database.";
+        db_->Reset();
+
+        // Apply the snapshot.
+        VLOG(40) << "[InstallSnapshotRpc] Recover from received snapshot.";
+        RecoverSnapshot(req.snapshot_metadata.snapshot_filename);
+
+        VLOG(40) << "[InstallSnapshotRpc] Persist snapshot metadata.";
+        PersistSnapshotMetadata(req.snapshot_metadata);
+
+        // Update the state to match the one from snapshot.
+        VLOG(40) << "[InstallSnapshotRpc] Update Raft state.";
+        last_applied_ = req.snapshot_metadata.last_included_index;
+        commit_index_ = req.snapshot_metadata.last_included_index;
+        disk_storage_.Put(
+            kLogSizeKey,
+            std::to_string(req.snapshot_metadata.last_included_index + 1));
+
+        InstallSnapshotRes res(CurrentTerm());
+        Save(res, res_builder);
+      });
+
   // start threads
 
   SetNextElectionTimePoint();
@@ -251,27 +356,41 @@ uint64_t RaftServer::LogSize() {
   return std::stoull(opt_value.value());
 }
 
-std::experimental::optional<std::pair<uint64_t, uint64_t>>
+std::experimental::optional<SnapshotMetadata>
 RaftServer::GetSnapshotMetadata() {
   auto opt_value = disk_storage_.Get(kSnapshotMetadataKey);
   if (opt_value == std::experimental::nullopt) {
     return std::experimental::nullopt;
   }
 
-  auto value = utils::Split(opt_value.value(), " ");
-  if (value.size() != 2) {
-    LOG(WARNING) << "Malformed snapshot metdata";
-    return std::experimental::nullopt;
-  }
-  return std::make_pair(std::stoull(value[0]), std::stoull(value[1]));
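+  // The metadata is stored as a serialized Cap'n Proto message; copy it into
+  // a binary stream and deserialize it back into a SnapshotMetadata object.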
+  ::capnp::MallocMessageBuilder message;
+  std::stringstream stream(std::ios_base::in | std::ios_base::out |
+                           std::ios_base::binary);
+  kj::std::StdInputStream std_stream(stream);
+  kj::BufferedInputStreamWrapper buffered_stream(std_stream);
+  stream << *opt_value;
+  readMessageCopy(buffered_stream, message);
+  capnp::SnapshotMetadata::Reader reader =
+      message.getRoot<capnp::SnapshotMetadata>().asReader();
+  SnapshotMetadata deserialized;
+  Load(&deserialized, reader);
+  return std::experimental::make_optional(deserialized);
 }
 
-void RaftServer::PersistSnapshotMetadata(uint64_t last_included_term,
-                                         uint64_t last_included_index) {
-  auto value = utils::Join(
-      {std::to_string(last_included_term), std::to_string(last_included_index)},
-      " ");
-  disk_storage_.Put(kSnapshotMetadataKey, value);
+void RaftServer::PersistSnapshotMetadata(
+    const SnapshotMetadata &snapshot_metadata) {
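+  // Serialize the metadata into an in-memory Cap'n Proto message and persist
+  // the raw bytes under a single key.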
+  std::stringstream stream(std::ios_base::in | std::ios_base::out |
+                           std::ios_base::binary);
+  {
+    ::capnp::MallocMessageBuilder message;
+    capnp::SnapshotMetadata::Builder builder =
+        message.initRoot<capnp::SnapshotMetadata>();
+    Save(snapshot_metadata, &builder);
+    kj::std::StdOutputStream std_stream(stream);
+    kj::BufferedOutputStreamWrapper buffered_stream(std_stream);
+    writeMessage(buffered_stream, message);
+  }
+  disk_storage_.Put(kSnapshotMetadataKey, stream.str());
 }
 
 void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
@@ -289,13 +408,9 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
   }
 
   uint64_t log_size = LogSize();
-  DCHECK(last_applied_ == log_size - 1) << "Everything from the leaders log "
-                                           "should be applied into our state "
-                                           "machine";
   rlog_->set_active(tx_id);
   LogEntry new_entry(CurrentTerm(), deltas);
 
-  ++last_applied_;
   disk_storage_.Put(LogEntryKey(log_size), SerializeLogEntry(new_entry));
   disk_storage_.Put(kLogSizeKey, std::to_string(log_size + 1));
 
@@ -402,14 +517,17 @@ void RaftServer::Transition(const Mode &new_mode) {
         ResetReplicationLog();
 
         // Re-apply raft log.
-        auto snapshot_metadata = GetSnapshotMetadata();
         uint64_t starting_index = 1;
+        auto snapshot_metadata = GetSnapshotMetadata();
         if (snapshot_metadata) {
-          RecoverSnapshot();
-          starting_index = snapshot_metadata->second + 1;
+          RecoverSnapshot(snapshot_metadata->snapshot_filename);
+          starting_index = snapshot_metadata->last_included_index + 1;
         }
-        for (uint64_t i = starting_index; i <= commit_index_; ++i)
+
+        for (uint64_t i = starting_index; i <= commit_index_; ++i) {
           delta_applier_->Apply(GetLogEntry(i).deltas);
+        }
+
         last_applied_ = commit_index_;
       }
 
@@ -538,17 +656,35 @@ void RaftServer::AdvanceCommitIndex() {
   }
 
   commit_index_ = new_commit_index;
-}
-
-void RaftServer::Recover() {
-  throw utils::NotYetImplemented("RaftServer Recover");
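+  // On the leader, committed entries are immediately considered applied, so
+  // last_applied_ advances together with commit_index_.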
+  last_applied_ = new_commit_index;
 }
 
 void RaftServer::SendEntries(uint16_t peer_id,
-                             std::unique_lock<std::mutex> &lock) {
+                             std::unique_lock<std::mutex> *lock) {
+  auto snapshot_metadata = GetSnapshotMetadata();
+
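+  // If the peer is lagging behind the last compacted log index, the entries
+  // it needs no longer exist in the log, so send it the snapshot instead.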
+  if (snapshot_metadata &&
+      snapshot_metadata->last_included_index >= next_index_[peer_id]) {
+    SendSnapshot(peer_id, *snapshot_metadata, lock);
+  } else {
+    SendLogEntries(peer_id, snapshot_metadata, lock);
+  }
+}
+
+void RaftServer::SendLogEntries(
+    uint16_t peer_id,
+    const std::experimental::optional<SnapshotMetadata> &snapshot_metadata,
+    std::unique_lock<std::mutex> *lock) {
   uint64_t request_term = CurrentTerm();
   uint64_t request_prev_log_index = next_index_[peer_id] - 1;
-  uint64_t request_prev_log_term = GetLogEntry(next_index_[peer_id] - 1).term;
+  uint64_t request_prev_log_term;
+
+  if (snapshot_metadata &&
+      snapshot_metadata->last_included_index == next_index_[peer_id] - 1) {
+    request_prev_log_term = snapshot_metadata->last_included_term;
+  } else {
+    request_prev_log_term = GetLogEntry(next_index_[peer_id] - 1).term;
+  }
 
   std::vector<LogEntry> request_entries;
   if (next_index_[peer_id] <= LogSize() - 1)
@@ -572,9 +708,9 @@ void RaftServer::SendEntries(uint16_t peer_id,
 
   VLOG(40) << "Entries size: " << request_entries.size();
 
-  lock.unlock();  // Release lock while waiting for response.
+  lock->unlock();  // Release lock while waiting for response.
   auto reply = peer_future.get();
-  lock.lock();
+  lock->lock();
 
   if (unreachable_peer) {
     next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval;
@@ -616,6 +752,76 @@ void RaftServer::SendEntries(uint16_t peer_id,
   state_changed_.notify_all();
 }
 
+void RaftServer::SendSnapshot(uint16_t peer_id,
+                              const SnapshotMetadata &snapshot_metadata,
+                              std::unique_lock<std::mutex> *lock) {
+  uint64_t request_term = CurrentTerm();
+  uint32_t snapshot_size = 0;
+  std::unique_ptr<char[]> snapshot;
+
+  {
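+    // Read the whole snapshot file into memory so it can be sent to the peer
+    // in a single InstallSnapshotRpc request.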
+    const auto snapshot_path = durability::MakeSnapshotPath(
+        durability_dir_, snapshot_metadata.snapshot_filename);
+
+    std::ifstream input_stream;
+    input_stream.open(snapshot_path, std::ios::in | std::ios::binary);
+    input_stream.seekg(0, std::ios::end);
+    snapshot_size = input_stream.tellg();
+
+    snapshot.reset(new char[snapshot_size]);
+
+    input_stream.seekg(0, std::ios::beg);
+    input_stream.read(snapshot.get(), snapshot_size);
+    input_stream.close();
+  }
+
+  VLOG(40) << "Snapshot size: " << snapshot_size << " bytes.";
+
+  bool unreachable_peer = false;
+  auto peer_future = coordination_->ExecuteOnWorker<InstallSnapshotRes>(
+      peer_id, [&](int worker_id, auto &client) {
+        try {
+          auto res = client.template Call<InstallSnapshotRpc>(
+              server_id_, request_term, snapshot_metadata, std::move(snapshot),
+              snapshot_size);
+          return res;
+        } catch (...) {
+          unreachable_peer = true;
+          return InstallSnapshotRes(request_term);
+        }
+      });
+
+  lock->unlock();
+  auto reply = peer_future.get();
+  lock->lock();
+
+  if (unreachable_peer) {
+    next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval;
+    return;
+  }
+
+  if (CurrentTerm() != request_term || exiting_) {
+    return;
+  }
+
+  if (OutOfSync(reply.term)) {
+    state_changed_.notify_all();
+    return;
+  }
+
+  if (reply.term != CurrentTerm()) {
+    VLOG(40) << "Server " << server_id_
+             << ": Ignoring stale InstallSnapshotRpc reply from " << peer_id;
+    return;
+  }
+
+  match_index_[peer_id] = snapshot_metadata.last_included_index;
+  next_index_[peer_id] = snapshot_metadata.last_included_index + 1;
+  next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval;
+
+  state_changed_.notify_all();
+}
+
 void RaftServer::ElectionThreadMain() {
   std::unique_lock<std::mutex> lock(lock_);
   while (!exiting_) {
@@ -713,9 +919,9 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
         case Mode::LEADER: {
           if (now >= next_heartbeat_[peer_id]) {
             VLOG(40) << "Server " << server_id_
-                     << ": Send AppendEntries RPC to server " << peer_id
+                     << ": Sending Entries RPC to server " << peer_id
                      << " (Term: " << CurrentTerm() << ")";
-            SendEntries(peer_id, lock);
+            SendEntries(peer_id, &lock);
             continue;
           }
           wait_until = next_heartbeat_[peer_id];
@@ -743,43 +949,59 @@ void RaftServer::NoOpIssuerThreadMain() {
 
 void RaftServer::SnapshotThread() {
   utils::ThreadSetName(fmt::format("RaftSnapshot"));
-  while (!exiting_) {
-    {
-      std::unique_lock<std::mutex> lock(lock_);
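+  // A threshold of -1 disables log compaction entirely.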
+  if (config_.log_size_snapshot_threshold == -1) return;
 
-      uint64_t uncompacted_log_size = LogSize();
+  while (true) {
+    {
+      // Acquire snapshot lock before we acquire the Raft lock. This should
+      // avoid the situation where we release the lock to start writing a
+      // snapshot but the `InstallSnapshotRpc` deletes it and we write wrong
+      // metadata.
+      std::lock_guard<std::mutex> snapshot_guard(snapshot_lock_);
+      std::unique_lock<std::mutex> lock(lock_);
+      if (exiting_) break;
+
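+      // Number of committed entries that have not yet been compacted into a
+      // snapshot.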
+      uint64_t committed_log_size = last_applied_;
       auto snapshot_metadata = GetSnapshotMetadata();
       if (snapshot_metadata) {
-        uncompacted_log_size -= snapshot_metadata->second;
+        committed_log_size -= snapshot_metadata->last_included_index;
       }
 
       // Compare the log size to the config
-      if (config_.log_size_snapshot_threshold < uncompacted_log_size) {
+      if (config_.log_size_snapshot_threshold < committed_log_size) {
+        VLOG(40) << "[LogCompaction] Starting log compaction.";
         // Create a DB accessor for snapshot creation
         std::unique_ptr<database::GraphDbAccessor> dba = db_->Access();
-        uint64_t current_term = CurrentTerm();
-        uint64_t last_applied = last_applied_;
+        uint64_t last_included_term = GetLogEntry(last_applied_).term;
+        uint64_t last_included_index = last_applied_;
+        std::string snapshot_filename =
+            durability::GetSnapshotFilename(dba->transaction_id());
 
         lock.unlock();
-        bool status =
-            durability::MakeSnapshot(*db_, *dba, fs::path(durability_dir_));
+        VLOG(40) << "[LogCompaction] Creating snapshot.";
+        bool status = durability::MakeSnapshot(*db_, *dba, durability_dir_,
+                                               snapshot_filename);
         lock.lock();
 
         if (status) {
           uint64_t log_compaction_start_index = 1;
           if (snapshot_metadata) {
-            log_compaction_start_index = snapshot_metadata->second + 1;
+            log_compaction_start_index =
+                snapshot_metadata->last_included_index + 1;
           }
 
-          PersistSnapshotMetadata(current_term, last_applied);
+          VLOG(40) << "[LogCompaction] Persisting snapshot metadata";
+          PersistSnapshotMetadata(
+              {last_included_term, last_included_index, snapshot_filename});
 
           // Log compaction.
-          // TODO (msantl): In order to handle log compaction correctly, we need
-          // to be able to send snapshots over the wire and implement additional
-          // logic to handle log entries that were compacted into a snapshot.
-          // for (int i = log_compaction_start_index; i <= last_applied_; ++i) {
-          // disk_storage_.Delete(LogEntryKey(i));
-          // }
+          VLOG(40) << "[LogCompaction] Compacting log from "
+                   << log_compaction_start_index << " to "
+                   << last_included_index;
+          for (uint64_t i = log_compaction_start_index; i <= last_included_index;
+               ++i) {
+            disk_storage_.Delete(LogEntryKey(i));
+          }
         }
 
         lock.unlock();
@@ -817,6 +1039,11 @@ bool RaftServer::HasMajortyVote() {
 std::pair<uint64_t, uint64_t> RaftServer::LastEntryData() {
   uint64_t log_size = LogSize();
   if (log_size == 0) return {0, 0};
+  auto snapshot_metadata = GetSnapshotMetadata();
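+  // The term of the last entry might only be available in the snapshot
+  // metadata if that entry has been compacted.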
+  if (snapshot_metadata &&
+      snapshot_metadata->last_included_index == log_size - 1) {
+    return {log_size, snapshot_metadata->last_included_term};
+  }
   return {log_size, GetLogEntry(log_size - 1).term};
 }
 
@@ -940,10 +1167,12 @@ void RaftServer::ResetReplicationLog() {
   rlog_ = std::make_unique<ReplicationLog>();
 }
 
-void RaftServer::RecoverSnapshot() {
+void RaftServer::RecoverSnapshot(const std::string &snapshot_filename) {
   durability::RecoveryData recovery_data;
-  CHECK(durability::RecoverOnlySnapshot(durability_dir_, db_, &recovery_data))
-      << "Failed to recover from snapshot";
+  bool recovery = durability::RecoverSnapshot(
+      db_, &recovery_data, durability_dir_, snapshot_filename);
+
+  CHECK(recovery) << "Failed to recover from snapshot.";
   durability::RecoverIndexes(db_, recovery_data.indexes);
 }
 
diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp
index 66725ca7a..376be8c33 100644
--- a/src/raft/raft_server.hpp
+++ b/src/raft/raft_server.hpp
@@ -3,6 +3,7 @@
 #pragma once
 
 #include <atomic>
+#include <experimental/filesystem>
 #include <mutex>
 #include <unordered_map>
 #include <vector>
@@ -15,6 +16,7 @@
 #include "raft/raft_interface.hpp"
 #include "raft/raft_rpc_messages.hpp"
 #include "raft/replication_log.hpp"
+#include "raft/snapshot_metadata.hpp"
 #include "storage/common/kvstore/kvstore.hpp"
 #include "transactions/type.hpp"
 #include "utils/scheduler.hpp"
@@ -86,12 +88,12 @@ class RaftServer final : public RaftInterface {
   uint64_t LogSize();
 
   /// Retrieves persisted snapshot metadata or nullopt if not present.
-  std::experimental::optional<std::pair<uint64_t, uint64_t>>
-  GetSnapshotMetadata();
+  /// Snapshot metadata is a triplet consisting of the last included term, the
+  /// last included log entry index and the snapshot filename.
+  std::experimental::optional<SnapshotMetadata> GetSnapshotMetadata();
 
   /// Persists snapshot metadata.
-  void PersistSnapshotMetadata(uint64_t last_included_term,
-                               uint64_t last_included_index);
+  void PersistSnapshotMetadata(const SnapshotMetadata &snapshot_metadata);
 
   /// Append to the log a list of batched state deltasa that are ready to be
   /// replicated.
@@ -148,7 +150,8 @@ class RaftServer final : public RaftInterface {
     RaftServer *raft_server_{nullptr};
   };
 
-  mutable std::mutex lock_;  ///< Guards all internal state.
+  mutable std::mutex lock_;           ///< Guards all internal state.
+  mutable std::mutex snapshot_lock_;  ///< Guards snapshot creation and removal.
 
   //////////////////////////////////////////////////////////////////////////////
   // volatile state on all servers
@@ -160,9 +163,10 @@ class RaftServer final : public RaftInterface {
   database::GraphDb *db_{nullptr};
   std::unique_ptr<ReplicationLog> rlog_{nullptr};
 
-  std::atomic<Mode> mode_;      ///< Server's current mode.
-  uint16_t server_id_;          ///< ID of the current server.
-  std::string durability_dir_;  ///< Durability directory.
+  std::atomic<Mode> mode_;  ///< Server's current mode.
+  uint16_t server_id_;      ///< ID of the current server.
+  std::experimental::filesystem::path
+      durability_dir_;          ///< Durability directory.
   bool db_recover_on_startup_;  ///< Flag indicating if recovery should happen
                                 ///< on startup.
   uint64_t commit_index_;       ///< Index of the highest known committed entry.
@@ -192,7 +196,7 @@ class RaftServer final : public RaftInterface {
                                             ///< no_op_issuer_thread that a new
                                             ///< leader has been elected.
 
-  bool exiting_ = false;  ///< True on server shutdown.
+  bool exiting_{false};  ///< True on server shutdown.
 
   //////////////////////////////////////////////////////////////////////////////
   // volatile state on followers and candidates
@@ -254,17 +258,34 @@ class RaftServer final : public RaftInterface {
   /// Tries to advance the commit index on a leader.
   void AdvanceCommitIndex();
 
-  /// Recovers from persistent storage. This function should be called from
-  /// the constructor before the server starts with normal operation.
-  void Recover();
-
-  /// Sends Entries to peer. This function should only be called in leader
-  /// mode.
+  /// Decides whether to send Log Entries or a Snapshot to the given peer.
   ///
   /// @param peer_id ID of the peer which receives entries.
   /// @param lock Lock from the peer thread (released while waiting for
   ///             response)
-  void SendEntries(uint16_t peer_id, std::unique_lock<std::mutex> &lock);
+  void SendEntries(uint16_t peer_id, std::unique_lock<std::mutex> *lock);
+
+  /// Sends Log Entries to peer. This function should only be called in leader
+  /// mode.
+  ///
+  /// @param peer_id ID of the peer which receives entries.
+  /// @param snapshot_metadata metadata of the last snapshot, if any.
+  /// @param lock Lock from the peer thread (released while waiting for
+  ///             response)
+  void SendLogEntries(
+      uint16_t peer_id,
+      const std::experimental::optional<SnapshotMetadata> &snapshot_metadata,
+      std::unique_lock<std::mutex> *lock);
+
+  /// Sends a Snapshot to the peer. This function should only be called in leader
+  /// mode.
+  ///
+  /// @param peer_id ID of the peer which receives entries.
+  /// @param snapshot_metadata metadata of the snapshot to send.
+  /// @param lock Lock from the peer thread (released while waiting for
+  ///             response)
+  void SendSnapshot(uint16_t peer_id, const SnapshotMetadata &snapshot_metadata,
+                    std::unique_lock<std::mutex> *lock);
 
   /// Main function of the `election_thread_`. It is responsible for
   /// transition to CANDIDATE mode when election timeout elapses.
@@ -365,8 +386,8 @@ class RaftServer final : public RaftInterface {
   /// Resets the replication log used to indicate the replication status.
   void ResetReplicationLog();
 
-  /// Recovers the latest snapshot that exists in the durability directory.
-  void RecoverSnapshot();
+  /// Recovers the given snapshot if it exists in the durability directory.
+  void RecoverSnapshot(const std::string &snapshot_filename);
 
   /// Start a new transaction with a NO-OP StateDelta.
   void NoOpCreate();
diff --git a/src/raft/snapshot_metadata.lcp b/src/raft/snapshot_metadata.lcp
new file mode 100644
index 000000000..26daad523
--- /dev/null
+++ b/src/raft/snapshot_metadata.lcp
@@ -0,0 +1,27 @@
+#>cpp
+#pragma once
+#include <string>
+
+#include "raft/snapshot_metadata.capnp.h"
+#include "utils/typeinfo.hpp"
+cpp<#
+
+(lcp:namespace raft)
+
+(lcp:capnp-namespace "raft")
+
+(lcp:define-struct snapshot-metadata ()
+  ((last-included-term :uint64_t)
+   (last-included-index :uint64_t)
+   (snapshot-filename "std::string"))
+   (:public #>cpp
+   SnapshotMetadata() = default;
+   SnapshotMetadata(uint64_t _last_included_term, uint64_t _last_included_index,
+                    const std::string &_snapshot_filename)
+       : last_included_term(_last_included_term),
+         last_included_index(_last_included_index),
+         snapshot_filename(_snapshot_filename) {}
+   cpp<#)
+  (:serialize (:slk) (:capnp)))
+
+(lcp:pop-namespace) ;; raft
diff --git a/tests/feature_benchmark/ha/raft.json b/tests/feature_benchmark/ha/raft.json
index bb4f343b1..9dc159ec4 100644
--- a/tests/feature_benchmark/ha/raft.json
+++ b/tests/feature_benchmark/ha/raft.json
@@ -2,5 +2,5 @@
   "election_timeout_min": 350,
   "election_timeout_max": 700,
   "heartbeat_interval": 100,
-  "log_size_snapshot_threshold": 100000
+  "log_size_snapshot_threshold": -1
 }
diff --git a/tests/feature_benchmark/ha/runner.sh b/tests/feature_benchmark/ha/runner.sh
index 6eecf81bd..1f3be8f7b 100755
--- a/tests/feature_benchmark/ha/runner.sh
+++ b/tests/feature_benchmark/ha/runner.sh
@@ -32,7 +32,7 @@ RESULTS="$DIR/.apollo_measurements"
 # Benchmark parameters
 DURATION=10
 
-## Startup
+# Startup
 declare -a HA_PIDS
 
 for server_id in 1 2 3
diff --git a/tests/integration/ha_basic/raft.json b/tests/integration/ha_basic/raft.json
index 43d4000f6..5983863d8 100644
--- a/tests/integration/ha_basic/raft.json
+++ b/tests/integration/ha_basic/raft.json
@@ -2,5 +2,5 @@
   "election_timeout_min": 200,
   "election_timeout_max": 500,
   "heartbeat_interval": 100,
-  "log_size_snapshot_threshold": 100000
+  "log_size_snapshot_threshold": -1
 }