Remove SM simulation from HB issuer thread

Summary:
This allows us to decouple issuing heartbeats from the server mode
which is useful when we know we will transition to LEADER but cannot yet
change the mode due to some Raft internals.

It can now happen that a couple of HBs are sent when in FOLLOWER or CANDIDATE
mode, but this doesn't affect the correctness of the protocol.

Reviewers: msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2025
This commit is contained in:
Ivan Paljak 2019-05-09 12:53:29 +02:00
parent fb6350135d
commit e502b1a306
2 changed files with 22 additions and 49 deletions

View File

@ -47,6 +47,7 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
db_recover_on_startup_(db_recover_on_startup),
commit_index_(0),
last_applied_(0),
issue_hb_(false),
replication_timeout_(config.replication_timeout),
disk_storage_(fs::path(durability_dir) / kRaftDir) {}
@ -343,7 +344,6 @@ void RaftServer::Shutdown() {
state_changed_.notify_all();
election_change_.notify_all();
leader_changed_.notify_all();
hb_condition_.notify_all();
}
for (auto &peer_thread : peer_threads_) {
@ -558,6 +558,7 @@ void RaftServer::Transition(const Mode &new_mode) {
<< ": Transition to FOLLOWER (Term: " << current_term_ << ")";
bool reset = mode_ == Mode::LEADER;
issue_hb_ = false;
mode_ = Mode::FOLLOWER;
log_entry_buffer_.Disable();
@ -614,6 +615,7 @@ void RaftServer::Transition(const Mode &new_mode) {
granted_votes_ = 1;
vote_requested_.assign(coordination_->GetAllNodeCount(), false);
issue_hb_ = false;
mode_ = Mode::CANDIDATE;
if (HasMajorityVote()) {
@ -639,7 +641,7 @@ void RaftServer::Transition(const Mode &new_mode) {
for (auto &peer_heartbeat : next_heartbeat_)
peer_heartbeat = now + config_.heartbeat_interval;
hb_condition_.notify_all();
issue_hb_ = true;
// [Raft paper figure 2]
// "For each server, index of the next log entry to send to that server
@ -976,54 +978,26 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
void RaftServer::HBThreadMain(uint16_t peer_id) {
utils::ThreadSetName(fmt::format("HBThread{}", peer_id));
std::unique_lock<std::mutex> lock(heartbeat_lock_);
while (mode_ != Mode::SHUTDOWN) {
if (!issue_hb_) continue;
TimePoint now = Clock::now();
TimePoint wait_until;
switch (mode_) {
case Mode::FOLLOWER: {
wait_until = TimePoint::max();
break;
}
case Mode::CANDIDATE: {
wait_until = TimePoint::max();
break;
}
case Mode::LEADER: {
if (now < next_heartbeat_[peer_id]) {
wait_until = next_heartbeat_[peer_id];
break;
}
VLOG(40) << "Server " << server_id_ << ": Sending HB to server "
<< peer_id << " (Term: " << current_term_ << ")";
// For heartbeats, only the current term is relevant, we copy that
// before releasing the lock.
auto server_id = server_id_;
uint64_t request_term = current_term_;
// Execute the RPC.
lock.unlock();
coordination_->ExecuteOnOtherNode<HeartbeatRpc>(peer_id, server_id,
request_term);
lock.lock();
// This is ok even if we don't receive the reply.
next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval;
wait_until = next_heartbeat_[peer_id];
break;
}
case Mode::SHUTDOWN:
break;
if (now < next_heartbeat_[peer_id]) {
std::this_thread::sleep_until(next_heartbeat_[peer_id]);
continue;
}
if (mode_ == Mode::SHUTDOWN) break;
hb_condition_.wait_until(lock, wait_until);
VLOG(40) << "Server " << server_id_ << ": Sending HB to server "
<< peer_id << " (Term: " << current_term_ << ")";
if (issue_hb_) {
coordination_->ExecuteOnOtherNode<HeartbeatRpc>(peer_id, server_id_,
current_term_);
}
// This is ok even if we don't receive a reply.
next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval;
}
}

View File

@ -178,6 +178,9 @@ class RaftServer final : public RaftInterface {
uint64_t commit_index_; ///< Index of the highest known committed entry.
uint64_t last_applied_; ///< Index of the highest applied entry to SM.
std::atomic<bool> issue_hb_; ///< Flag which signalizes if the current server
///< should send HBs to the rest of the cluster.
/// Raft log entry buffer.
///
/// LogEntryBuffer buffers Raft logs until a log is complete and ready for
@ -206,10 +209,6 @@ class RaftServer final : public RaftInterface {
///< no_op_issuer_thread that a new
///< leader has been elected.
std::condition_variable hb_condition_; ///< Notifies the HBIssuer thread
///< that a new leader has been
///< elected.
//////////////////////////////////////////////////////////////////////////////
// volatile state on followers and candidates
//////////////////////////////////////////////////////////////////////////////