Fix multiple raft issues

Summary:
Fix condition variable notifications
Fix vote requested invalid memory access (size off by one)
Fix blocking wait in RaftPeer while candidate
Don't copy large log entries when only the term is needed

Reviewers: ipaljak, msantl

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2088
This commit is contained in:
Matej Ferencevic 2019-05-24 17:19:01 +02:00
parent 81a7a7b600
commit 42bf81021e
2 changed files with 23 additions and 9 deletions

View File

@ -147,6 +147,7 @@ void RaftServer::Start() {
// be extended. During this process we will prevent the timeout from
// occuring.
next_election_ = TimePoint::max();
election_change_.notify_all();
utils::OnScopeExit extend_election_timeout([this] {
// [Raft thesis 3.4]
// A server remains in follower state as long as it receives valid RPCs
@ -622,16 +623,11 @@ void RaftServer::Transition(const Mode &new_mode) {
SetVotedFor(server_id_);
granted_votes_ = 1;
vote_requested_.assign(coordination_->GetAllNodeCount(), false);
vote_requested_.assign(coordination_->GetAllNodeCount() + 1, false);
issue_hb_ = false;
mode_ = Mode::CANDIDATE;
if (HasMajorityVote()) {
Transition(Mode::LEADER);
state_changed_.notify_all();
return;
}
state_changed_.notify_all();
break;
}
@ -928,7 +924,10 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
}
case Mode::CANDIDATE: {
if (vote_requested_[peer_id]) break;
if (vote_requested_[peer_id]) {
wait_until = TimePoint::max();
break;
}
// TODO(ipaljak): Consider backoff.
wait_until = TimePoint::max();
@ -1160,7 +1159,7 @@ std::pair<uint64_t, uint64_t> RaftServer::LastEntryData() {
snapshot_metadata_->last_included_index == log_size_ - 1) {
return {log_size_, snapshot_metadata_->last_included_term};
}
return {log_size_, GetLogEntry(log_size_ - 1).term};
return {log_size_, GetLogEntryTerm(log_size_ - 1)};
}
bool RaftServer::AtLeastUpToDate(uint64_t last_log_index_a,
@ -1201,6 +1200,16 @@ LogEntry RaftServer::GetLogEntry(int index) {
return DeserializeLogEntry(opt_value.value());
}
uint64_t RaftServer::GetLogEntryTerm(int index) {
auto it = log_.find(index);
if (it != log_.end())
return it->second.term; // retrieve in-mem if possible
auto opt_value = disk_storage_.Get(LogEntryKey(index));
DCHECK(opt_value != std::nullopt)
<< "Log index (" << index << ") out of bounds.";
return DeserializeLogEntry(opt_value.value()).term;
}
void RaftServer::DeleteLogSuffix(int starting_index) {
DCHECK(0 <= starting_index && starting_index < log_size_)
<< "Log index out of bounds.";

View File

@ -399,6 +399,11 @@ class RaftServer final : public RaftInterface {
/// @param index Index of the log entry to be retrieved.
LogEntry GetLogEntry(int index);
/// Retrieves the term of a log entry from the log at a given index.
///
/// @param index Index of the log entry whose term is to be retrieved.
uint64_t GetLogEntryTerm(int index);
/// Deletes log entries with indexes that are greater or equal to the given
/// starting index.
///