Fix low read throughput due to active waits

Summary:
Read throughput dropped by about 50% from the last diff.
This should fix it (at least according to local measurements).

Reviewers: msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2027
This commit is contained in:
Ivan Paljak 2019-05-09 15:07:17 +02:00
parent 394039a05e
commit 358391bd22
2 changed files with 28 additions and 15 deletions

View File

@ -344,6 +344,7 @@ void RaftServer::Shutdown() {
state_changed_.notify_all();
election_change_.notify_all();
leader_changed_.notify_all();
hb_condition_.notify_all();
}
for (auto &peer_thread : peer_threads_) {
@ -642,6 +643,7 @@ void RaftServer::Transition(const Mode &new_mode) {
peer_heartbeat = now + config_.heartbeat_interval;
issue_hb_ = true;
hb_condition_.notify_all();
// [Raft paper figure 2]
// "For each server, index of the next log entry to send to that server
@ -978,26 +980,33 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
void RaftServer::HBThreadMain(uint16_t peer_id) {
utils::ThreadSetName(fmt::format("HBThread{}", peer_id));
std::unique_lock<std::mutex> lock(heartbeat_lock_);
while (mode_ != Mode::SHUTDOWN) {
if (!issue_hb_) continue;
TimePoint wait_until;
TimePoint now = Clock::now();
if (now < next_heartbeat_[peer_id]) {
std::this_thread::sleep_until(next_heartbeat_[peer_id]);
continue;
if (!issue_hb_) {
wait_until = TimePoint::max();
} else {
TimePoint now = Clock::now();
if (now < next_heartbeat_[peer_id]) {
wait_until = next_heartbeat_[peer_id];
} else {
VLOG(40) << "Server " << server_id_ << ": Sending HB to server "
<< peer_id << " (Term: " << current_term_ << ")";
lock.unlock();
coordination_->ExecuteOnOtherNode<HeartbeatRpc>(peer_id, server_id_,
current_term_);
lock.lock();
// This is ok even if we don't receive a reply.
next_heartbeat_[peer_id] = now + config_.heartbeat_interval;
wait_until = next_heartbeat_[peer_id];
}
}
VLOG(40) << "Server " << server_id_ << ": Sending HB to server "
<< peer_id << " (Term: " << current_term_ << ")";
if (issue_hb_) {
coordination_->ExecuteOnOtherNode<HeartbeatRpc>(peer_id, server_id_,
current_term_);
}
// This is ok even if we don't receive a reply.
next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval;
hb_condition_.wait_until(lock, wait_until);
}
}

View File

@ -209,6 +209,10 @@ class RaftServer final : public RaftInterface {
///< no_op_issuer_thread that a new
///< leader has been elected.
std::condition_variable hb_condition_; ///< Notifies the HBIssuer thread
///< that it should start sending
///< heartbeats.
//////////////////////////////////////////////////////////////////////////////
// volatile state on followers and candidates
//////////////////////////////////////////////////////////////////////////////