From e3cf4d0df8f3074ad08aab834e6b77d2afd98d2d Mon Sep 17 00:00:00 2001
From: Ivan Paljak <ivan.paljak@memgraph.io>
Date: Wed, 6 Feb 2019 13:30:05 +0100
Subject: [PATCH] Make an in-memory copy of HA persistent storage pt. 1

Summary:
In Raft, we often need to access persistent state of the server
without modifying it. In order to speed up such operations, we
keep an in-memory copy of that state.

In this diff we make a copy of all persistent state except for
the log itself. Running our feature benchmark locally, we manage
to increase the throughput for cca 750 queries/s.

Reviewers: msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1843
---
 src/raft/raft_server.cpp | 209 +++++++++++++++++++--------------------
 src/raft/raft_server.hpp |  34 ++++---
 2 files changed, 122 insertions(+), 121 deletions(-)

diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp
index 8d4225410..d45634606 100644
--- a/src/raft/raft_server.cpp
+++ b/src/raft/raft_server.cpp
@@ -55,22 +55,23 @@ void RaftServer::Start() {
     auto snapshot_metadata = GetSnapshotMetadata();
     if (snapshot_metadata) {
       RecoverSnapshot(snapshot_metadata->snapshot_filename);
-
       last_applied_ = snapshot_metadata->last_included_index;
       commit_index_ = snapshot_metadata->last_included_index;
     }
   } else {
     // We need to clear persisted data if we don't want any recovery.
     disk_storage_.DeletePrefix("");
-    disk_storage_.Put(kLogSizeKey, "0");
     durability::RemoveAllSnapshots(durability_dir_);
   }
 
   // Persistent storage initialization
-  if (LogSize() == 0) {
-    UpdateTerm(0);
+  if (!disk_storage_.Get(kLogSizeKey)) {
+    SetCurrentTerm(0);
+    SetLogSize(0);
     LogEntry empty_log_entry(0, {});
     AppendLogEntries(0, 0, {empty_log_entry});
+  } else {
+    RecoverPersistentData();
   }
 
   // Peer state initialization
@@ -90,9 +91,8 @@ void RaftServer::Start() {
         // [Raft paper 5.1]
         // "If a server recieves a request with a stale term,
         // it rejects the request"
-        uint64_t current_term = CurrentTerm();
-        if (exiting_ || req.term < current_term) {
-          RequestVoteRes res(false, current_term);
+        if (exiting_ || req.term < current_term_) {
+          RequestVoteRes res(false, current_term_);
           Save(res, res_builder);
           return;
         }
@@ -100,8 +100,8 @@ void RaftServer::Start() {
         // [Raft paper figure 2]
         // If RPC request or response contains term T > currentTerm,
         // set currentTerm = T and convert to follower.
-        if (req.term > current_term) {
-          UpdateTerm(req.term);
+        if (req.term > current_term_) {
+          SetCurrentTerm(req.term);
           if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER);
         }
 
@@ -111,13 +111,12 @@ void RaftServer::Start() {
         // restriction on votes"
         // Restriction: "The voter denies its vote if its own log is more
         // up-to-date than that of the candidate"
-        std::experimental::optional<uint16_t> voted_for = VotedFor();
         auto last_entry_data = LastEntryData();
         bool grant_vote =
-            (!voted_for || voted_for.value() == req.candidate_id) &&
+            (!voted_for_ || voted_for_.value() == req.candidate_id) &&
             AtLeastUpToDate(req.last_log_index, req.last_log_term,
                             last_entry_data.first, last_entry_data.second);
-        RequestVoteRes res(grant_vote, current_term);
+        RequestVoteRes res(grant_vote, current_term_);
         if (grant_vote) SetNextElectionTimePoint();
         Save(res, res_builder);
       });
@@ -131,9 +130,8 @@ void RaftServer::Start() {
     // [Raft paper 5.1]
     // "If a server receives a request with a stale term, it rejects the
     // request"
-    uint64_t current_term = CurrentTerm();
-    if (exiting_ || req.term < current_term) {
-      AppendEntriesRes res(false, current_term);
+    if (exiting_ || req.term < current_term_) {
+      AppendEntriesRes res(false, current_term_);
       Save(res, res_builder);
       return;
     }
@@ -163,7 +161,7 @@ void RaftServer::Start() {
     if (snapshot_metadata &&
         snapshot_metadata->last_included_index == req.prev_log_index) {
       if (req.prev_log_term != snapshot_metadata->last_included_term) {
-        AppendEntriesRes res(false, current_term);
+        AppendEntriesRes res(false, current_term_);
         Save(res, res_builder);
         return;
       }
@@ -171,13 +169,13 @@ void RaftServer::Start() {
                snapshot_metadata->last_included_index > req.prev_log_index) {
       LOG(ERROR) << "Received entries that are already commited and have been "
                     "compacted";
-      AppendEntriesRes res(false, current_term);
+      AppendEntriesRes res(false, current_term_);
       Save(res, res_builder);
       return;
     } else {
-      if (LogSize() <= req.prev_log_index ||
+      if (log_size_ <= req.prev_log_index ||
           GetLogEntry(req.prev_log_index).term != req.prev_log_term) {
-        AppendEntriesRes res(false, current_term);
+        AppendEntriesRes res(false, current_term_);
         Save(res, res_builder);
         return;
       }
@@ -186,8 +184,8 @@ void RaftServer::Start() {
     // [Raft paper figure 2]
     // If RPC request or response contains term T > currentTerm,
     // set currentTerm = T and convert to follower.
-    if (req.term > current_term) {
-      UpdateTerm(req.term);
+    if (req.term > current_term_) {
+      SetCurrentTerm(req.term);
       if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER);
     }
 
@@ -196,20 +194,20 @@ void RaftServer::Start() {
     // [Raft paper 5.3]
     // "Once a follower learns that a log entry is committed, it applies
     // the entry to its state machine (in log order)
-    while (req.leader_commit > last_applied_ && last_applied_ + 1 < LogSize()) {
+    while (req.leader_commit > last_applied_ && last_applied_ + 1 < log_size_) {
       ++last_applied_;
       delta_applier_->Apply(GetLogEntry(last_applied_).deltas);
     }
 
     // Respond positively to a heartbeat.
     if (req.entries.empty()) {
-      AppendEntriesRes res(true, current_term);
+      AppendEntriesRes res(true, current_term_);
       Save(res, res_builder);
       if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER);
       return;
     }
 
-    AppendEntriesRes res(true, current_term);
+    AppendEntriesRes res(true, current_term_);
     Save(res, res_builder);
   });
 
@@ -222,26 +220,25 @@ void RaftServer::Start() {
         InstallSnapshotReq req;
         Load(&req, req_reader);
 
-        uint64_t current_term = CurrentTerm();
-        if (exiting_ || req.term < current_term) {
-          InstallSnapshotRes res(current_term);
+        if (exiting_ || req.term < current_term_) {
+          InstallSnapshotRes res(current_term_);
           Save(res, res_builder);
           return;
         }
 
         // Check if the current state matches the one in snapshot
         if (req.snapshot_metadata.last_included_index == last_applied_ &&
-            req.snapshot_metadata.last_included_term == current_term) {
-          InstallSnapshotRes res(current_term);
+            req.snapshot_metadata.last_included_term == current_term_) {
+          InstallSnapshotRes res(current_term_);
           Save(res, res_builder);
           return;
         }
 
         VLOG(40) << "[InstallSnapshotRpc] Starting.";
 
-        if (req.term > current_term) {
+        if (req.term > current_term_) {
           VLOG(40) << "[InstallSnapshotRpc] Updating term.";
-          UpdateTerm(req.term);
+          SetCurrentTerm(req.term);
           if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER);
         }
 
@@ -272,8 +269,7 @@ void RaftServer::Start() {
 
         // Discard the all logs. We keep the one at index 0.
         VLOG(40) << "[InstallSnapshotRpc] Discarding logs.";
-        uint64_t log_size = LogSize();
-        for (uint64_t i = 1; i < log_size; ++i)
+        for (uint64_t i = 1; i < log_size_; ++i)
           disk_storage_.Delete(LogEntryKey(i));
 
         // Reset the database.
@@ -291,11 +287,9 @@ void RaftServer::Start() {
         VLOG(40) << "[InstallSnapshotRpc] Update Raft state.";
         last_applied_ = req.snapshot_metadata.last_included_index;
         commit_index_ = req.snapshot_metadata.last_included_index;
-        disk_storage_.Put(
-            kLogSizeKey,
-            std::to_string(req.snapshot_metadata.last_included_index + 1));
+        SetLogSize(req.snapshot_metadata.last_included_index + 1);
 
-        InstallSnapshotRes res(CurrentTerm());
+        InstallSnapshotRes res(current_term_);
         Save(res, res_builder);
       });
 
@@ -333,27 +327,24 @@ void RaftServer::Shutdown() {
   if (snapshot_thread_.joinable()) snapshot_thread_.join();
 }
 
-uint64_t RaftServer::CurrentTerm() {
-  auto opt_value = disk_storage_.Get(kCurrentTermKey);
-  if (opt_value == std::experimental::nullopt)
-    throw MissingPersistentDataException(kCurrentTermKey);
-  return std::stoull(opt_value.value());
+void RaftServer::SetCurrentTerm(uint64_t new_current_term) {
+  current_term_ = new_current_term;
+  disk_storage_.Put(kCurrentTermKey, std::to_string(new_current_term));
+  SetVotedFor(std::experimental::nullopt);
 }
 
-std::experimental::optional<uint16_t> RaftServer::VotedFor() {
-  auto opt_value = disk_storage_.Get(kVotedForKey);
-  if (opt_value == std::experimental::nullopt)
-    return std::experimental::nullopt;
-  return {std::stoul(opt_value.value())};
+void RaftServer::SetVotedFor(
+    std::experimental::optional<uint16_t> new_voted_for) {
+  voted_for_ = new_voted_for;
+  if (new_voted_for)
+    disk_storage_.Put(kVotedForKey, std::to_string(new_voted_for.value()));
+  else
+    disk_storage_.Delete(kVotedForKey);
 }
 
-uint64_t RaftServer::LogSize() {
-  auto opt_value = disk_storage_.Get(kLogSizeKey);
-  if (opt_value == std::experimental::nullopt) {
-    disk_storage_.Put(kLogSizeKey, "0");
-    return 0;
-  }
-  return std::stoull(opt_value.value());
+void RaftServer::SetLogSize(uint64_t new_log_size) {
+  log_size_ = new_log_size;
+  disk_storage_.Put(kLogSizeKey, std::to_string(new_log_size));
 }
 
 std::experimental::optional<SnapshotMetadata>
@@ -407,12 +398,11 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
     return;
   }
 
-  uint64_t log_size = LogSize();
   rlog_->set_active(tx_id);
-  LogEntry new_entry(CurrentTerm(), deltas);
+  LogEntry new_entry(current_term_, deltas);
 
-  disk_storage_.Put(LogEntryKey(log_size), SerializeLogEntry(new_entry));
-  disk_storage_.Put(kLogSizeKey, std::to_string(log_size + 1));
+  disk_storage_.Put(LogEntryKey(log_size_), SerializeLogEntry(new_entry));
+  SetLogSize(log_size_ + 1);
 
   // Force issuing heartbeats
   TimePoint now = Clock::now();
@@ -498,11 +488,28 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
   }
 }
 
+void RaftServer::RecoverPersistentData() {
+  auto opt_term = disk_storage_.Get(kCurrentTermKey);
+  if (opt_term)
+    current_term_ = std::stoull(opt_term.value());
+
+  auto opt_voted_for = disk_storage_.Get(kVotedForKey);
+  if (!opt_voted_for) {
+    voted_for_ = std::experimental::nullopt;
+  } else {
+    voted_for_ = {std::stoul(opt_voted_for.value())};
+  }
+
+  auto opt_log_size = disk_storage_.Get(kLogSizeKey);
+  if (opt_log_size)
+    log_size_ = std::stoull(opt_log_size.value());
+}
+
 void RaftServer::Transition(const Mode &new_mode) {
   switch (new_mode) {
     case Mode::FOLLOWER: {
       VLOG(40) << "Server " << server_id_
-               << ": Transition to FOLLOWER (Term: " << CurrentTerm() << ")";
+               << ": Transition to FOLLOWER (Term: " << current_term_ << ")";
 
       bool reset = mode_ == Mode::LEADER;
       mode_ = Mode::FOLLOWER;
@@ -538,7 +545,7 @@ void RaftServer::Transition(const Mode &new_mode) {
 
     case Mode::CANDIDATE: {
       VLOG(40) << "Server " << server_id_
-               << ": Transition to CANDIDATE (Term: " << CurrentTerm() << ")";
+               << ": Transition to CANDIDATE (Term: " << current_term_ << ")";
 
       // [Raft thesis, section 3.4]
       // "Each candidate restarts its randomized election timeout at the start
@@ -553,7 +560,7 @@ void RaftServer::Transition(const Mode &new_mode) {
       // transitions to candidate state.  It then votes for itself and issues
       // RequestVote RPCs in parallel to each of the other servers in the
       // cluster."
-      disk_storage_.Put(kCurrentTermKey, std::to_string(CurrentTerm() + 1));
+      SetCurrentTerm(current_term_ + 1);
       disk_storage_.Put(kVotedForKey, std::to_string(server_id_));
 
       granted_votes_ = 1;
@@ -572,11 +579,10 @@ void RaftServer::Transition(const Mode &new_mode) {
 
     case Mode::LEADER: {
       VLOG(40) << "Server " << server_id_
-               << ": Transition to LEADER (Term: " << CurrentTerm() << ")";
+               << ": Transition to LEADER (Term: " << current_term_ << ")";
       // Freeze election timer
       next_election_ = TimePoint::max();
       election_change_.notify_all();
-      uint64_t log_size = LogSize();
 
       // Set next heartbeat to correct values
       TimePoint now = Clock::now();
@@ -587,15 +593,15 @@ void RaftServer::Transition(const Mode &new_mode) {
       // "For each server, index of the next log entry to send to that server
       // is initialized to leader's last log index + 1"
       for (int i = 1; i < coordination_->WorkerCount() + 1; ++i) {
-        next_index_[i] = log_size;
+        next_index_[i] = log_size_;
         match_index_[i] = 0;
       }
 
       // Raft guarantees the Leader Append-Only property [Raft paper 5.2]
       // so its safe to apply everything from our log into our state machine
-      for (int i = last_applied_ + 1; i < log_size; ++i)
+      for (int i = last_applied_ + 1; i < log_size_; ++i)
         delta_applier_->Apply(GetLogEntry(i).deltas);
-      last_applied_ = log_size - 1;
+      last_applied_ = log_size_ - 1;
 
       mode_ = Mode::LEADER;
       log_entry_buffer_.Enable();
@@ -606,22 +612,16 @@ void RaftServer::Transition(const Mode &new_mode) {
   }
 }
 
-void RaftServer::UpdateTerm(uint64_t new_term) {
-  disk_storage_.Put(kCurrentTermKey, std::to_string(new_term));
-  disk_storage_.Delete(kVotedForKey);
-}
-
 void RaftServer::AdvanceCommitIndex() {
   DCHECK(mode_ == Mode::LEADER)
       << "Commit index can only be advanced by the leader";
 
   std::vector<uint64_t> known_replication_indices;
-  uint64_t log_size = LogSize();
   for (int i = 1; i < coordination_->WorkerCount() + 1; ++i) {
     if (i != server_id_)
       known_replication_indices.push_back(match_index_[i]);
     else
-      known_replication_indices.push_back(log_size - 1);
+      known_replication_indices.push_back(log_size_ - 1);
   }
 
   std::sort(known_replication_indices.begin(), known_replication_indices.end());
@@ -638,7 +638,7 @@ void RaftServer::AdvanceCommitIndex() {
   // counting replicas; once an entry from the current term has been committed
   // in this way, then all prior entries are committed indirectly because of the
   // Log Matching Property."
-  if (GetLogEntry(new_commit_index).term != CurrentTerm()) {
+  if (GetLogEntry(new_commit_index).term != current_term_) {
     VLOG(40) << "Server " << server_id_
              << ": cannot commit log entry from "
                 "previous term based on "
@@ -675,7 +675,7 @@ void RaftServer::SendLogEntries(
     uint16_t peer_id,
     const std::experimental::optional<SnapshotMetadata> &snapshot_metadata,
     std::unique_lock<std::mutex> *lock) {
-  uint64_t request_term = CurrentTerm();
+  uint64_t request_term = current_term_;
   uint64_t request_prev_log_index = next_index_[peer_id] - 1;
   uint64_t request_prev_log_term;
 
@@ -687,7 +687,7 @@ void RaftServer::SendLogEntries(
   }
 
   std::vector<LogEntry> request_entries;
-  if (next_index_[peer_id] <= LogSize() - 1)
+  if (next_index_[peer_id] <= log_size_ - 1)
     GetLogSuffix(next_index_[peer_id], request_entries);
 
   bool unreachable_peer = false;
@@ -717,7 +717,7 @@ void RaftServer::SendLogEntries(
     return;
   }
 
-  if (CurrentTerm() != request_term || exiting_) {
+  if (current_term_ != request_term || exiting_) {
     return;
   }
 
@@ -729,7 +729,7 @@ void RaftServer::SendLogEntries(
   DCHECK(mode_ == Mode::LEADER)
       << "Elected leader for term should never change.";
 
-  if (reply.term != CurrentTerm()) {
+  if (reply.term != current_term_) {
     VLOG(40) << "Server " << server_id_
              << ": Ignoring stale AppendEntriesRPC reply from " << peer_id;
     return;
@@ -755,7 +755,7 @@ void RaftServer::SendLogEntries(
 void RaftServer::SendSnapshot(uint16_t peer_id,
                               const SnapshotMetadata &snapshot_metadata,
                               std::unique_lock<std::mutex> *lock) {
-  uint64_t request_term = CurrentTerm();
+  uint64_t request_term = current_term_;
   uint32_t snapshot_size = 0;
   std::unique_ptr<char[]> snapshot;
 
@@ -800,7 +800,7 @@ void RaftServer::SendSnapshot(uint16_t peer_id,
     return;
   }
 
-  if (CurrentTerm() != request_term || exiting_) {
+  if (current_term_ != request_term || exiting_) {
     return;
   }
 
@@ -809,7 +809,7 @@ void RaftServer::SendSnapshot(uint16_t peer_id,
     return;
   }
 
-  if (reply.term != CurrentTerm()) {
+  if (reply.term != current_term_) {
     VLOG(40) << "Server " << server_id_
              << ": Ignoring stale InstallSnapshotRpc reply from " << peer_id;
     return;
@@ -827,7 +827,7 @@ void RaftServer::ElectionThreadMain() {
   while (!exiting_) {
     if (Clock::now() >= next_election_) {
       VLOG(40) << "Server " << server_id_
-               << ": Election timeout exceeded (Term: " << CurrentTerm() << ")";
+               << ": Election timeout exceeded (Term: " << current_term_ << ")";
       Transition(Mode::CANDIDATE);
       state_changed_.notify_all();
     }
@@ -865,7 +865,7 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
           // TODO(ipaljak): Consider backoff.
           wait_until = TimePoint::max();
 
-          auto request_term = CurrentTerm();
+          auto request_term = current_term_;
           auto peer_future = coordination_->ExecuteOnWorker<RequestVoteRes>(
               peer_id, [&](int worker_id, auto &client) {
                 auto last_entry_data = LastEntryData();
@@ -888,7 +888,7 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
           auto reply = peer_future.get();
           lock.lock();
 
-          if (CurrentTerm() != request_term || mode_ != Mode::CANDIDATE ||
+          if (current_term_ != request_term || mode_ != Mode::CANDIDATE ||
               exiting_) {
             VLOG(40) << "Server " << server_id_
                      << ": Ignoring RequestVoteRPC reply from " << peer_id;
@@ -920,7 +920,7 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
           if (now >= next_heartbeat_[peer_id]) {
             VLOG(40) << "Server " << server_id_
                      << ": Sending Entries RPC to server " << peer_id
-                     << " (Term: " << CurrentTerm() << ")";
+                     << " (Term: " << current_term_ << ")";
             SendEntries(peer_id, &lock);
             continue;
           }
@@ -1030,21 +1030,19 @@ void RaftServer::SetNextElectionTimePoint() {
 bool RaftServer::HasMajortyVote() {
   if (2 * granted_votes_ > coordination_->WorkerCount()) {
     VLOG(40) << "Server " << server_id_
-             << ": Obtained majority vote (Term: " << CurrentTerm() << ")";
+             << ": Obtained majority vote (Term: " << current_term_ << ")";
     return true;
   }
   return false;
 }
 
 std::pair<uint64_t, uint64_t> RaftServer::LastEntryData() {
-  uint64_t log_size = LogSize();
-  if (log_size == 0) return {0, 0};
   auto snapshot_metadata = GetSnapshotMetadata();
   if (snapshot_metadata &&
-      snapshot_metadata->last_included_index == log_size - 1) {
-    return {log_size, snapshot_metadata->last_included_term};
+      snapshot_metadata->last_included_index == log_size_ - 1) {
+    return {log_size_, snapshot_metadata->last_included_term};
   }
-  return {log_size, GetLogEntry(log_size - 1).term};
+  return {log_size_, GetLogEntry(log_size_ - 1).term};
 }
 
 bool RaftServer::AtLeastUpToDate(uint64_t last_log_index_a,
@@ -1065,7 +1063,7 @@ bool RaftServer::OutOfSync(uint64_t reply_term) {
   // its current term to the larger value. If a candidate or leader
   // discovers that its term is out of date, it immediately reverts to
   // follower state."
-  if (CurrentTerm() < reply_term) {
+  if (current_term_ < reply_term) {
     disk_storage_.Put(kCurrentTermKey, std::to_string(reply_term));
     disk_storage_.Delete(kVotedForKey);
     granted_votes_ = 0;
@@ -1083,48 +1081,43 @@ LogEntry RaftServer::GetLogEntry(int index) {
 }
 
 void RaftServer::DeleteLogSuffix(int starting_index) {
-  uint64_t log_size = LogSize();
-  DCHECK(0 <= starting_index && starting_index < log_size)
+  DCHECK(0 <= starting_index && starting_index < log_size_)
       << "Log index out of bounds.";
-  for (int i = starting_index; i < log_size; ++i)
+  for (int i = starting_index; i < log_size_; ++i)
     disk_storage_.Delete(LogEntryKey(i));
-  disk_storage_.Put(kLogSizeKey, std::to_string(starting_index));
+  SetLogSize(starting_index);
 }
 
 void RaftServer::GetLogSuffix(int starting_index,
                               std::vector<raft::LogEntry> &entries) {
-  uint64_t log_size = LogSize();
-  DCHECK(0 <= starting_index && starting_index < log_size)
+  DCHECK(0 <= starting_index && starting_index < log_size_)
       << "Log index out of bounds.";
-  for (int i = starting_index; i < log_size; ++i)
+  for (int i = starting_index; i < log_size_; ++i)
     entries.push_back(GetLogEntry(i));
 }
 
 void RaftServer::AppendLogEntries(uint64_t leader_commit_index,
                                   uint64_t starting_index,
                                   const std::vector<LogEntry> &new_entries) {
-  uint64_t log_size = LogSize();
   for (int i = 0; i < new_entries.size(); ++i) {
     // If existing entry conflicts with new one, we need to delete the
     // existing entry and all that follow it.
     int current_index = i + starting_index;
-    if (log_size > current_index &&
+    if (log_size_ > current_index &&
         GetLogEntry(current_index).term != new_entries[i].term) {
       DeleteLogSuffix(current_index);
-      log_size = LogSize();
     }
-    DCHECK(log_size >= current_index) << "Current Log index out of bounds.";
-    if (log_size == current_index) {
-      disk_storage_.Put(LogEntryKey(log_size),
+    DCHECK(log_size_ >= current_index) << "Current Log index out of bounds.";
+    if (log_size_ == current_index) {
+      disk_storage_.Put(LogEntryKey(log_size_),
                         SerializeLogEntry(new_entries[i]));
-      disk_storage_.Put(kLogSizeKey, std::to_string(log_size + 1));
-      log_size += 1;
+      SetLogSize(log_size_ + 1);
     }
   }
 
   // See Raft paper 5.3
   if (leader_commit_index > commit_index_) {
-    commit_index_ = std::min(leader_commit_index, log_size - 1);
+    commit_index_ = std::min(leader_commit_index, log_size_ - 1);
   }
 }
 
diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp
index 376be8c33..04c0cab10 100644
--- a/src/raft/raft_server.hpp
+++ b/src/raft/raft_server.hpp
@@ -74,18 +74,17 @@ class RaftServer final : public RaftInterface {
   /// Stops all threads responsible for the Raft protocol.
   void Shutdown();
 
-  /// Retrieves the current term from persistent storage.
-  ///
-  /// @throws MissingPersistentDataException
-  uint64_t CurrentTerm();
+  /// Setter for the current term. It updates the persistent storage as well
+  /// as its in-memory copy.
+  void SetCurrentTerm(uint64_t new_current_term);
 
-  /// Retrieves the ID of the server this server has voted for in
-  /// the current term from persistent storage. Returns std::nullopt
-  /// if such server doesn't exist.
-  std::experimental::optional<uint16_t> VotedFor();
+  /// Setter for `voted for` member. It updates the persistent storage as well
+  /// as its in-memory copy.
+  void SetVotedFor(std::experimental::optional<uint16_t> new_voted_for);
 
-  /// Retrieves log size from persistent storage.
-  uint64_t LogSize();
+  /// Setter for `log size` member. It updates the persistent storage as well
+  /// as its in-memory copy.
+  void SetLogSize(uint64_t new_log_size);
 
   /// Retrieves persisted snapshot metadata or nullopt if not present.
   /// Snapshot metadata is a triplet consisting of the last included term, last
@@ -246,15 +245,24 @@ class RaftServer final : public RaftInterface {
 
   storage::KVStore disk_storage_;
 
+  std::experimental::optional<uint16_t> voted_for_;
+
+  uint64_t current_term_;
+  uint64_t log_size_;
+
+  /// Recovers persistent data from disk and stores its in-memory copies
+  /// that insure faster read-only operations. This method should be called
+  /// on start-up. If parts of persistent data are missing, the method won't
+  /// make a copy of that data, i.e. no exception is thrown and the caller
+  /// should check whether persistent data actually exists.
+  void RecoverPersistentData();
+
   /// Makes a transition to a new `raft::Mode`.
   ///
   /// throws InvalidTransitionException when transitioning between incompatible
   ///                                   `raft::Mode`s.
   void Transition(const raft::Mode &new_mode);
 
-  /// Updates the current term.
-  void UpdateTerm(uint64_t new_term);
-
   /// Tries to advance the commit index on a leader.
   void AdvanceCommitIndex();