From 42bf81021e2bd5fc8a028851b4792e8b9be0b3a6 Mon Sep 17 00:00:00 2001 From: Matej Ferencevic Date: Fri, 24 May 2019 17:19:01 +0200 Subject: [PATCH] Fix multiple raft issues Summary: Fix condition variable notifications Fix vote requested invalid memory access (size off by one) Fix blocking wait in RaftPeer while candidate Don't copy large log entries when only the term is needed Reviewers: ipaljak, msantl Reviewed By: ipaljak Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2088 --- src/raft/raft_server.cpp | 27 ++++++++++++++++++--------- src/raft/raft_server.hpp | 5 +++++ 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index 6f99573dc..a3b2ffe02 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -147,6 +147,7 @@ void RaftServer::Start() { // be extended. During this process we will prevent the timeout from // occuring. next_election_ = TimePoint::max(); + election_change_.notify_all(); utils::OnScopeExit extend_election_timeout([this] { // [Raft thesis 3.4] // A server remains in follower state as long as it receives valid RPCs @@ -622,16 +623,11 @@ void RaftServer::Transition(const Mode &new_mode) { SetVotedFor(server_id_); granted_votes_ = 1; - vote_requested_.assign(coordination_->GetAllNodeCount(), false); + vote_requested_.assign(coordination_->GetAllNodeCount() + 1, false); issue_hb_ = false; mode_ = Mode::CANDIDATE; - - if (HasMajorityVote()) { - Transition(Mode::LEADER); - state_changed_.notify_all(); - return; - } + state_changed_.notify_all(); break; } @@ -928,7 +924,10 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) { } case Mode::CANDIDATE: { - if (vote_requested_[peer_id]) break; + if (vote_requested_[peer_id]) { + wait_until = TimePoint::max(); + break; + } // TODO(ipaljak): Consider backoff. wait_until = TimePoint::max(); @@ -1160,7 +1159,7 @@ std::pair RaftServer::LastEntryData() { snapshot_metadata_->last_included_index == log_size_ - 1) { return {log_size_, snapshot_metadata_->last_included_term}; } - return {log_size_, GetLogEntry(log_size_ - 1).term}; + return {log_size_, GetLogEntryTerm(log_size_ - 1)}; } bool RaftServer::AtLeastUpToDate(uint64_t last_log_index_a, @@ -1201,6 +1200,16 @@ LogEntry RaftServer::GetLogEntry(int index) { return DeserializeLogEntry(opt_value.value()); } +uint64_t RaftServer::GetLogEntryTerm(int index) { + auto it = log_.find(index); + if (it != log_.end()) + return it->second.term; // retrieve in-mem if possible + auto opt_value = disk_storage_.Get(LogEntryKey(index)); + DCHECK(opt_value != std::nullopt) + << "Log index (" << index << ") out of bounds."; + return DeserializeLogEntry(opt_value.value()).term; +} + void RaftServer::DeleteLogSuffix(int starting_index) { DCHECK(0 <= starting_index && starting_index < log_size_) << "Log index out of bounds."; diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index 4066c7d84..c6d0d235a 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -399,6 +399,11 @@ class RaftServer final : public RaftInterface { /// @param index Index of the log entry to be retrieved. LogEntry GetLogEntry(int index); + /// Retrieves the term of a log entry from the log at a given index. + /// + /// @param index Index of the log entry whose term is to be retrieved. + uint64_t GetLogEntryTerm(int index); + /// Deletes log entries with indexes that are greater or equal to the given /// starting index. ///