diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index 6f99573dc..a3b2ffe02 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -147,6 +147,7 @@ void RaftServer::Start() { // be extended. During this process we will prevent the timeout from // occuring. next_election_ = TimePoint::max(); + election_change_.notify_all(); utils::OnScopeExit extend_election_timeout([this] { // [Raft thesis 3.4] // A server remains in follower state as long as it receives valid RPCs @@ -622,16 +623,11 @@ void RaftServer::Transition(const Mode &new_mode) { SetVotedFor(server_id_); granted_votes_ = 1; - vote_requested_.assign(coordination_->GetAllNodeCount(), false); + vote_requested_.assign(coordination_->GetAllNodeCount() + 1, false); issue_hb_ = false; mode_ = Mode::CANDIDATE; - - if (HasMajorityVote()) { - Transition(Mode::LEADER); - state_changed_.notify_all(); - return; - } + state_changed_.notify_all(); break; } @@ -928,7 +924,10 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) { } case Mode::CANDIDATE: { - if (vote_requested_[peer_id]) break; + if (vote_requested_[peer_id]) { + wait_until = TimePoint::max(); + break; + } // TODO(ipaljak): Consider backoff. wait_until = TimePoint::max(); @@ -1160,7 +1159,7 @@ std::pair<uint64_t, uint64_t> RaftServer::LastEntryData() { snapshot_metadata_->last_included_index == log_size_ - 1) { return {log_size_, snapshot_metadata_->last_included_term}; } - return {log_size_, GetLogEntry(log_size_ - 1).term}; + return {log_size_, GetLogEntryTerm(log_size_ - 1)}; } bool RaftServer::AtLeastUpToDate(uint64_t last_log_index_a, @@ -1201,6 +1200,16 @@ LogEntry RaftServer::GetLogEntry(int index) { return DeserializeLogEntry(opt_value.value()); } +uint64_t RaftServer::GetLogEntryTerm(int index) { + auto it = log_.find(index); + if (it != log_.end()) + return it->second.term; // retrieve in-mem if possible + auto opt_value = disk_storage_.Get(LogEntryKey(index)); + DCHECK(opt_value != std::nullopt) + << "Log index (" << index << ") out of bounds."; + return DeserializeLogEntry(opt_value.value()).term; +} + void RaftServer::DeleteLogSuffix(int starting_index) { DCHECK(0 <= starting_index && starting_index < log_size_) << "Log index out of bounds."; diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index 4066c7d84..c6d0d235a 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -399,6 +399,11 @@ class RaftServer final : public RaftInterface { /// @param index Index of the log entry to be retrieved. LogEntry GetLogEntry(int index); + /// Retrieves the term of a log entry from the log at a given index. + /// + /// @param index Index of the log entry whose term is to be retrieved. + uint64_t GetLogEntryTerm(int index); + /// Deletes log entries with indexes that are greater or equal to the given /// starting index. ///