Fix infinite wait in leader election.

Reviewers: msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1763
This commit is contained in:
Ivan Paljak 2018-12-10 16:04:14 +01:00
parent f501980973
commit 8e796e9fd1
2 changed files with 30 additions and 3 deletions

View File

@ -30,7 +30,7 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
reset_callback_(reset_callback) {
// Persistent storage initialization/recovery.
if (Log().empty()) {
disk_storage_.Put(kCurrentTermKey, "0");
UpdateTerm(0);
} else {
Recover();
}
@ -59,6 +59,15 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
return;
}
// [Raft paper figure 2]
// If RPC request or response contains term T > currentTerm,
// set currentTerm = T and convert to follower.
if (req.term > current_term) {
UpdateTerm(req.term);
if (mode_ != Mode::FOLLOWER)
Transition(Mode::FOLLOWER);
}
// [Raft paper 5.2, 5.4]
// "Each server will vote for at most one candidate in a given
// term, on a first-come-first-serve basis with an additional
@ -91,6 +100,15 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
return;
}
// [Raft paper figure 2]
// If RPC request or response contains term T > currentTerm,
// set currentTerm = T and convert to follower.
if (req.term > current_term) {
UpdateTerm(req.term);
if (mode_ != Mode::FOLLOWER)
Transition(Mode::FOLLOWER);
}
// respond positively to a heartbeat.
// TODO(ipaljak) review this when implementing log replication.
if (req.entries.empty()) {
@ -98,9 +116,9 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
Save(res, res_builder);
if (mode_ != Mode::FOLLOWER) {
Transition(Mode::FOLLOWER);
state_changed_.notify_all();
} else {
SetNextElectionTimePoint();
election_change_.notify_all();
}
return;
}
@ -252,9 +270,10 @@ void RaftServer::Transition(const Mode &new_mode) {
reset_callback_();
}
LOG(INFO) << "Server " << server_id_ << ": Transition to FOLLOWER";
SetNextElectionTimePoint();
mode_ = Mode::FOLLOWER;
// log_entry_buffer_.Disable();
SetNextElectionTimePoint();
election_change_.notify_all();
break;
}
@ -314,6 +333,11 @@ void RaftServer::Transition(const Mode &new_mode) {
}
}
void RaftServer::UpdateTerm(uint64_t new_term) {
disk_storage_.Put(kCurrentTermKey, std::to_string(new_term));
disk_storage_.Delete(kVotedForKey);
}
void RaftServer::Recover() {
throw utils::NotYetImplemented("RaftServer recover");
}

View File

@ -201,6 +201,9 @@ class RaftServer final : public RaftInterface {
/// `raft::Mode`s.
void Transition(const raft::Mode &new_mode);
/// Updates the current term.
void UpdateTerm(uint64_t new_term);
/// Recovers from persistent storage. This function should be called from
/// the constructor before the server starts with normal operation.
void Recover();