From 8e796e9fd1be654b5d8d50801c58a351b5d2f3fb Mon Sep 17 00:00:00 2001 From: Ivan Paljak Date: Mon, 10 Dec 2018 16:04:14 +0100 Subject: [PATCH] Fix infinite wait in leader election. Reviewers: msantl Reviewed By: msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1763 --- src/raft/raft_server.cpp | 30 +++++++++++++++++++++++++++--- src/raft/raft_server.hpp | 3 +++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index 04dbeb5ea..70e640e4e 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -30,7 +30,7 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, reset_callback_(reset_callback) { // Persistent storage initialization/recovery. if (Log().empty()) { - disk_storage_.Put(kCurrentTermKey, "0"); + UpdateTerm(0); } else { Recover(); } @@ -59,6 +59,15 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, return; } + // [Raft paper figure 2] + // If RPC request or response contains term T > currentTerm, + // set currentTerm = T and convert to follower. + if (req.term > current_term) { + UpdateTerm(req.term); + if (mode_ != Mode::FOLLOWER) + Transition(Mode::FOLLOWER); + } + // [Raft paper 5.2, 5.4] // "Each server will vote for at most one candidate in a given // term, on a first-come-first-serve basis with an additional @@ -91,6 +100,15 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, return; } + // [Raft paper figure 2] + // If RPC request or response contains term T > currentTerm, + // set currentTerm = T and convert to follower. + if (req.term > current_term) { + UpdateTerm(req.term); + if (mode_ != Mode::FOLLOWER) + Transition(Mode::FOLLOWER); + } + // respond positively to a heartbeat. // TODO(ipaljak) review this when implementing log replication. if (req.entries.empty()) { @@ -98,9 +116,9 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, Save(res, res_builder); if (mode_ != Mode::FOLLOWER) { Transition(Mode::FOLLOWER); - state_changed_.notify_all(); } else { SetNextElectionTimePoint(); + election_change_.notify_all(); } return; } @@ -252,9 +270,10 @@ void RaftServer::Transition(const Mode &new_mode) { reset_callback_(); } LOG(INFO) << "Server " << server_id_ << ": Transition to FOLLOWER"; - SetNextElectionTimePoint(); mode_ = Mode::FOLLOWER; // log_entry_buffer_.Disable(); + SetNextElectionTimePoint(); + election_change_.notify_all(); break; } @@ -314,6 +333,11 @@ void RaftServer::Transition(const Mode &new_mode) { } } +void RaftServer::UpdateTerm(uint64_t new_term) { + disk_storage_.Put(kCurrentTermKey, std::to_string(new_term)); + disk_storage_.Delete(kVotedForKey); +} + void RaftServer::Recover() { throw utils::NotYetImplemented("RaftServer recover"); } diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index d9c61bcee..92cd74289 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -201,6 +201,9 @@ class RaftServer final : public RaftInterface { /// `raft::Mode`s. void Transition(const raft::Mode &new_mode); + /// Updates the current term. + void UpdateTerm(uint64_t new_term); + /// Recovers from persistent storage. This function should be called from /// the constructor before the server starts with normal operation. void Recover();