diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index 19db3b493..6a7e96561 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -47,6 +47,7 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, db_recover_on_startup_(db_recover_on_startup), commit_index_(0), last_applied_(0), + issue_hb_(false), replication_timeout_(config.replication_timeout), disk_storage_(fs::path(durability_dir) / kRaftDir) {} @@ -343,7 +344,6 @@ void RaftServer::Shutdown() { state_changed_.notify_all(); election_change_.notify_all(); leader_changed_.notify_all(); - hb_condition_.notify_all(); } for (auto &peer_thread : peer_threads_) { @@ -558,6 +558,7 @@ void RaftServer::Transition(const Mode &new_mode) { << ": Transition to FOLLOWER (Term: " << current_term_ << ")"; bool reset = mode_ == Mode::LEADER; + issue_hb_ = false; mode_ = Mode::FOLLOWER; log_entry_buffer_.Disable(); @@ -614,6 +615,7 @@ void RaftServer::Transition(const Mode &new_mode) { granted_votes_ = 1; vote_requested_.assign(coordination_->GetAllNodeCount(), false); + issue_hb_ = false; mode_ = Mode::CANDIDATE; if (HasMajorityVote()) { @@ -639,7 +641,7 @@ void RaftServer::Transition(const Mode &new_mode) { for (auto &peer_heartbeat : next_heartbeat_) peer_heartbeat = now + config_.heartbeat_interval; - hb_condition_.notify_all(); + issue_hb_ = true; // [Raft paper figure 2] // "For each server, index of the next log entry to send to that server @@ -976,54 +978,26 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) { void RaftServer::HBThreadMain(uint16_t peer_id) { utils::ThreadSetName(fmt::format("HBThread{}", peer_id)); - std::unique_lock lock(heartbeat_lock_); while (mode_ != Mode::SHUTDOWN) { + if (!issue_hb_) continue; + TimePoint now = Clock::now(); - TimePoint wait_until; - - switch (mode_) { - case Mode::FOLLOWER: { - wait_until = TimePoint::max(); - break; - } - - case Mode::CANDIDATE: { - wait_until = TimePoint::max(); - break; - } - - case Mode::LEADER: { 
- if (now < next_heartbeat_[peer_id]) { - wait_until = next_heartbeat_[peer_id]; - break; - } - VLOG(40) << "Server " << server_id_ << ": Sending HB to server " - << peer_id << " (Term: " << current_term_ << ")"; - - // For heartbeats, only the current term is relevant, we copy that - // before releasing the lock. - auto server_id = server_id_; - uint64_t request_term = current_term_; - - // Execute the RPC. - lock.unlock(); - coordination_->ExecuteOnOtherNode(peer_id, server_id, - request_term); - lock.lock(); - - // This is ok even if we don't receive the reply. - next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval; - wait_until = next_heartbeat_[peer_id]; - break; - } - - case Mode::SHUTDOWN: - break; + if (now < next_heartbeat_[peer_id]) { + std::this_thread::sleep_until(next_heartbeat_[peer_id]); + continue; } - if (mode_ == Mode::SHUTDOWN) break; - hb_condition_.wait_until(lock, wait_until); + VLOG(40) << "Server " << server_id_ << ": Sending HB to server " + << peer_id << " (Term: " << current_term_ << ")"; + + if (issue_hb_) { + coordination_->ExecuteOnOtherNode(peer_id, server_id_, + current_term_); + } + + // This is ok even if we don't receive a reply. + next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval; } } diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index 29e847869..347eda9ae 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -178,6 +178,9 @@ class RaftServer final : public RaftInterface { uint64_t commit_index_; ///< Index of the highest known committed entry. uint64_t last_applied_; ///< Index of the highest applied entry to SM. + std::atomic<bool> issue_hb_; ///< Flag which signalizes if the current server + ///< should send HBs to the rest of the cluster. + /// Raft log entry buffer. 
/// /// LogEntryBuffer buffers Raft logs until a log is complete and ready for @@ -206,10 +209,6 @@ class RaftServer final : public RaftInterface { ///< no_op_issuer_thread that a new ///< leader has been elected. - std::condition_variable hb_condition_; ///< Notifies the HBIssuer thread - ///< that a new leader has been - ///< elected. - ////////////////////////////////////////////////////////////////////////////// // volatile state on followers and candidates //////////////////////////////////////////////////////////////////////////////