From 358391bd2214f6180fecc3513682f5610fbcdb90 Mon Sep 17 00:00:00 2001 From: Ivan Paljak Date: Thu, 9 May 2019 15:07:17 +0200 Subject: [PATCH] Fix low read throughput due to active waits Summary: Read throughput dropped by about 50% from the last diff. This should fix it (at least according to local measurements). Reviewers: msantl Reviewed By: msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2027 --- src/raft/raft_server.cpp | 39 ++++++++++++++++++++++++--------------- src/raft/raft_server.hpp | 4 ++++ 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index 6a7e96561..cd2087a9d 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -344,6 +344,7 @@ void RaftServer::Shutdown() { state_changed_.notify_all(); election_change_.notify_all(); leader_changed_.notify_all(); + hb_condition_.notify_all(); } for (auto &peer_thread : peer_threads_) { @@ -642,6 +643,7 @@ void RaftServer::Transition(const Mode &new_mode) { peer_heartbeat = now + config_.heartbeat_interval; issue_hb_ = true; + hb_condition_.notify_all(); // [Raft paper figure 2] // "For each server, index of the next log entry to send to that server @@ -978,26 +980,33 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) { void RaftServer::HBThreadMain(uint16_t peer_id) { utils::ThreadSetName(fmt::format("HBThread{}", peer_id)); + std::unique_lock lock(heartbeat_lock_); while (mode_ != Mode::SHUTDOWN) { - if (!issue_hb_) continue; + TimePoint wait_until; - TimePoint now = Clock::now(); - if (now < next_heartbeat_[peer_id]) { - std::this_thread::sleep_until(next_heartbeat_[peer_id]); - continue; + if (!issue_hb_) { + wait_until = TimePoint::max(); + } else { + TimePoint now = Clock::now(); + if (now < next_heartbeat_[peer_id]) { + wait_until = next_heartbeat_[peer_id]; + } else { + VLOG(40) << "Server " << server_id_ << ": Sending HB to server " + << peer_id << " (Term: " << current_term_ << ")"; + + lock.unlock(); + coordination_->ExecuteOnOtherNode(peer_id, server_id_, + current_term_); + lock.lock(); + + // This is ok even if we don't receive a reply. + next_heartbeat_[peer_id] = now + config_.heartbeat_interval; + wait_until = next_heartbeat_[peer_id]; + } } - VLOG(40) << "Server " << server_id_ << ": Sending HB to server " - << peer_id << " (Term: " << current_term_ << ")"; - - if (issue_hb_) { - coordination_->ExecuteOnOtherNode(peer_id, server_id_, - current_term_); - } - - // This is ok even if we don't receive a reply. - next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval; + hb_condition_.wait_until(lock, wait_until); } } diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index 347eda9ae..892c584c8 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -209,6 +209,10 @@ class RaftServer final : public RaftInterface { ///< no_op_issuer_thread that a new ///< leader has been elected. + std::condition_variable hb_condition_; ///< Notifies the HBIssuer thread + ///< that it should start sending + ///< heartbeats. + ////////////////////////////////////////////////////////////////////////////// // volatile state on followers and candidates //////////////////////////////////////////////////////////////////////////////