Keep last raft log entry metadata in memory
Reviewers: msantl, mferencevic Reviewed By: msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2096
This commit is contained in:
parent
f6264ab2ae
commit
c21d04ce8d
src/raft
@ -48,6 +48,7 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
|
||||
db_recover_on_startup_(db_recover_on_startup),
|
||||
commit_index_(0),
|
||||
last_applied_(0),
|
||||
last_entry_term_(0),
|
||||
issue_hb_(false),
|
||||
replication_timeout_(config.replication_timeout),
|
||||
disk_storage_(fs::path(durability_dir) / kRaftDir) {}
|
||||
@ -59,6 +60,7 @@ void RaftServer::Start() {
|
||||
RecoverSnapshot(snapshot_metadata_->snapshot_filename);
|
||||
last_applied_ = snapshot_metadata_->last_included_index;
|
||||
commit_index_ = snapshot_metadata_->last_included_index;
|
||||
last_entry_term_ = snapshot_metadata_->last_included_term;
|
||||
}
|
||||
} else {
|
||||
// We need to clear persisted data if we don't want any recovery.
|
||||
@ -439,6 +441,7 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
|
||||
|
||||
log_[log_size_] = new_entry;
|
||||
disk_storage_.Put(LogEntryKey(log_size_), SerializeLogEntry(new_entry));
|
||||
last_entry_term_ = new_entry.term;
|
||||
SetLogSize(log_size_ + 1);
|
||||
|
||||
// Force replication
|
||||
@ -560,6 +563,14 @@ void RaftServer::RecoverPersistentData() {
|
||||
|
||||
auto opt_log_size = disk_storage_.Get(kLogSizeKey);
|
||||
if (opt_log_size) log_size_ = std::stoull(opt_log_size.value());
|
||||
|
||||
if (log_size_ != 0) {
|
||||
auto opt_last_log_entry = disk_storage_.Get(LogEntryKey(log_size_ - 1));
|
||||
DCHECK(opt_last_log_entry != std::nullopt)
|
||||
<< "Log size is equal to " << log_size_
|
||||
<< ", but there is no log entry on index: " << log_size_ - 1;
|
||||
last_entry_term_ = DeserializeLogEntry(opt_last_log_entry.value()).term;
|
||||
}
|
||||
}
|
||||
|
||||
void RaftServer::Transition(const Mode &new_mode) {
|
||||
@ -1159,7 +1170,7 @@ std::pair<uint64_t, uint64_t> RaftServer::LastEntryData() {
|
||||
snapshot_metadata_->last_included_index == log_size_ - 1) {
|
||||
return {log_size_, snapshot_metadata_->last_included_term};
|
||||
}
|
||||
return {log_size_, GetLogEntryTerm(log_size_ - 1)};
|
||||
return {log_size_, last_entry_term_};
|
||||
}
|
||||
|
||||
bool RaftServer::AtLeastUpToDate(uint64_t last_log_index_a,
|
||||
@ -1200,16 +1211,6 @@ LogEntry RaftServer::GetLogEntry(int index) {
|
||||
return DeserializeLogEntry(opt_value.value());
|
||||
}
|
||||
|
||||
uint64_t RaftServer::GetLogEntryTerm(int index) {
|
||||
auto it = log_.find(index);
|
||||
if (it != log_.end())
|
||||
return it->second.term; // retrieve in-mem if possible
|
||||
auto opt_value = disk_storage_.Get(LogEntryKey(index));
|
||||
DCHECK(opt_value != std::nullopt)
|
||||
<< "Log index (" << index << ") out of bounds.";
|
||||
return DeserializeLogEntry(opt_value.value()).term;
|
||||
}
|
||||
|
||||
void RaftServer::DeleteLogSuffix(int starting_index) {
|
||||
DCHECK(0 <= starting_index && starting_index < log_size_)
|
||||
<< "Log index out of bounds.";
|
||||
@ -1252,6 +1253,8 @@ void RaftServer::AppendLogEntries(uint64_t leader_commit_index,
|
||||
if (leader_commit_index > commit_index_) {
|
||||
commit_index_ = std::min(leader_commit_index, log_size_ - 1);
|
||||
}
|
||||
|
||||
last_entry_term_ = GetLogEntry(log_size_ - 1).term;
|
||||
}
|
||||
|
||||
std::string RaftServer::LogEntryKey(uint64_t index) {
|
||||
|
@ -183,6 +183,7 @@ class RaftServer final : public RaftInterface {
|
||||
///< on startup.
|
||||
uint64_t commit_index_; ///< Index of the highest known committed entry.
|
||||
uint64_t last_applied_; ///< Index of the highest applied entry to SM.
|
||||
uint64_t last_entry_term_; ///< Term of the last entry in Raft log
|
||||
|
||||
std::atomic<bool> issue_hb_; ///< Flag which signalizes if the current server
|
||||
///< should send HBs to the rest of the cluster.
|
||||
@ -399,11 +400,6 @@ class RaftServer final : public RaftInterface {
|
||||
/// @param index Index of the log entry to be retrieved.
|
||||
LogEntry GetLogEntry(int index);
|
||||
|
||||
/// Retrieves the term of a log entry from the log at a given index.
|
||||
///
|
||||
/// @param index Index of the log entry whose term is to be retrieved.
|
||||
uint64_t GetLogEntryTerm(int index);
|
||||
|
||||
/// Deletes log entries with indexes that are greater or equal to the given
|
||||
/// starting index.
|
||||
///
|
||||
|
Loading…
Reference in New Issue
Block a user