From e502b1a30679c9bc00884bf3b21f1207f6feb88a Mon Sep 17 00:00:00 2001 From: Ivan Paljak Date: Thu, 9 May 2019 12:53:29 +0200 Subject: [PATCH] Remove SM simulation from HB issuer thread Summary: This allows us to decouple issuing heartbeats from the server mode which is useful when we know we will transition to LEADER but cannot yet change the mode due to some Raft internals. It can now happen that a couple of HBs are sent when in FOLLOWER or CANDIDATE mode, but this doesn't affect the correctness of the protocol. Reviewers: msantl Reviewed By: msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2025 --- src/raft/raft_server.cpp | 64 ++++++++++++---------------------------- src/raft/raft_server.hpp | 7 ++--- 2 files changed, 22 insertions(+), 49 deletions(-) diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index 19db3b493..6a7e96561 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -47,6 +47,7 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, db_recover_on_startup_(db_recover_on_startup), commit_index_(0), last_applied_(0), + issue_hb_(false), replication_timeout_(config.replication_timeout), disk_storage_(fs::path(durability_dir) / kRaftDir) {} @@ -343,7 +344,6 @@ void RaftServer::Shutdown() { state_changed_.notify_all(); election_change_.notify_all(); leader_changed_.notify_all(); - hb_condition_.notify_all(); } for (auto &peer_thread : peer_threads_) { @@ -558,6 +558,7 @@ void RaftServer::Transition(const Mode &new_mode) { << ": Transition to FOLLOWER (Term: " << current_term_ << ")"; bool reset = mode_ == Mode::LEADER; + issue_hb_ = false; mode_ = Mode::FOLLOWER; log_entry_buffer_.Disable(); @@ -614,6 +615,7 @@ void RaftServer::Transition(const Mode &new_mode) { granted_votes_ = 1; vote_requested_.assign(coordination_->GetAllNodeCount(), false); + issue_hb_ = false; mode_ = Mode::CANDIDATE; if (HasMajorityVote()) { @@ -639,7 +641,7 @@ void RaftServer::Transition(const Mode &new_mode) { for (auto &peer_heartbeat : next_heartbeat_) peer_heartbeat = now + config_.heartbeat_interval; - hb_condition_.notify_all(); + issue_hb_ = true; // [Raft paper figure 2] // "For each server, index of the next log entry to send to that server @@ -976,54 +978,26 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) { void RaftServer::HBThreadMain(uint16_t peer_id) { utils::ThreadSetName(fmt::format("HBThread{}", peer_id)); - std::unique_lock lock(heartbeat_lock_); while (mode_ != Mode::SHUTDOWN) { + if (!issue_hb_) continue; + TimePoint now = Clock::now(); - TimePoint wait_until; - - switch (mode_) { - case Mode::FOLLOWER: { - wait_until = TimePoint::max(); - break; - } - - case Mode::CANDIDATE: { - wait_until = TimePoint::max(); - break; - } - - case Mode::LEADER: { - if (now < next_heartbeat_[peer_id]) { - wait_until = next_heartbeat_[peer_id]; - break; - } - VLOG(40) << "Server " << server_id_ << ": Sending HB to server " - << peer_id << " (Term: " << current_term_ << ")"; - - // For heartbeats, only the current term is relevant, we copy that - // before releasing the lock. - auto server_id = server_id_; - uint64_t request_term = current_term_; - - // Execute the RPC. - lock.unlock(); - coordination_->ExecuteOnOtherNode(peer_id, server_id, - request_term); - lock.lock(); - - // This is ok even if we don't receive the reply. - next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval; - wait_until = next_heartbeat_[peer_id]; - break; - } - - case Mode::SHUTDOWN: - break; + if (now < next_heartbeat_[peer_id]) { + std::this_thread::sleep_until(next_heartbeat_[peer_id]); + continue; } - if (mode_ == Mode::SHUTDOWN) break; - hb_condition_.wait_until(lock, wait_until); + VLOG(40) << "Server " << server_id_ << ": Sending HB to server " + << peer_id << " (Term: " << current_term_ << ")"; + + if (issue_hb_) { + coordination_->ExecuteOnOtherNode(peer_id, server_id_, + current_term_); + } + + // This is ok even if we don't receive a reply. + next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval; } } diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index 29e847869..347eda9ae 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -178,6 +178,9 @@ class RaftServer final : public RaftInterface { uint64_t commit_index_; ///< Index of the highest known committed entry. uint64_t last_applied_; ///< Index of the highest applied entry to SM. + std::atomic issue_hb_; ///< Flag which signalizes if the current server + ///< should send HBs to the rest of the cluster. + /// Raft log entry buffer. /// /// LogEntryBuffer buffers Raft logs until a log is complete and ready for @@ -206,10 +209,6 @@ class RaftServer final : public RaftInterface { ///< no_op_issuer_thread that a new ///< leader has been elected. - std::condition_variable hb_condition_; ///< Notifies the HBIssuer thread - ///< that a new leader has been - ///< elected. - ////////////////////////////////////////////////////////////////////////////// // volatile state on followers and candidates //////////////////////////////////////////////////////////////////////////////