From 5833e6cc0f7d4a86cec96b823e54ffabbdf0f55f Mon Sep 17 00:00:00 2001 From: Ivan Paljak Date: Mon, 6 May 2019 12:34:56 +0200 Subject: [PATCH] Introduce new RPC for heartbeats in Raft Summary: Queries such as `UNWIND RANGE(1, 90000) AS x CREATE(:node{id:x});` work now. At some point (cca 100000 state deltas) the messages become too large for Cap'n Proto to handle and that exception kills the cluster. This is fine since we are about to replace it anyway. Also, the issue with leadership change during replication still remains if the user tempers with the constants (election timeout more or less the same to hb interval). This is unavoidable, so we need to address that issue in the future. Also, I've removed the unnecessary `backoff_until_` variable and the logic around it. It's a small change so I kept it within this diff (sorry). Reviewers: msantl, mferencevic Reviewed By: msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1998 --- src/raft/raft_rpc_messages.lcp | 8 ++ src/raft/raft_server.cpp | 232 ++++++++++++++++++++++----------- src/raft/raft_server.hpp | 25 +++- 3 files changed, 189 insertions(+), 76 deletions(-) diff --git a/src/raft/raft_rpc_messages.lcp b/src/raft/raft_rpc_messages.lcp index 42062cd11..7851d4012 100644 --- a/src/raft/raft_rpc_messages.lcp +++ b/src/raft/raft_rpc_messages.lcp @@ -39,6 +39,14 @@ cpp<# ((success :bool) (term :uint64_t)))) +(lcp:define-rpc heartbeat + (:request + ((leader-id :uint16_t) + (term :uint64_t))) + (:response + ((success :bool) + (term :uint64_t)))) + (lcp:define-rpc install-snapshot (:request ((leader-id :uint16_t) diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index 295edf6b9..c2b40a674 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -78,8 +78,8 @@ void RaftServer::Start() { auto cluster_size = coordination_->GetAllNodeCount() + 1; next_index_.resize(cluster_size); match_index_.resize(cluster_size); + next_replication_.resize(cluster_size); next_heartbeat_.resize(cluster_size); - backoff_until_.resize(cluster_size); // RPC registration coordination_->Register( @@ -138,7 +138,9 @@ void RaftServer::Start() { // Everything below is considered to be a valid RPC. This will ensure that // after we finish processing the current request, the election timeout will - // be extended. + // be extended. During this process we will prevent the timeout from + // occuring. + next_election_ = TimePoint::max(); utils::OnScopeExit extend_election_timeout([this] { // [Raft thesis 3.4] // A server remains in follower state as long as it receives valid RPCs @@ -211,6 +213,30 @@ void RaftServer::Start() { Save(res, res_builder); }); + coordination_->Register( + [this](const auto &req_reader, auto *res_builder) { + std::lock_guard guard(lock_); + HeartbeatReq req; + Load(&req, req_reader); + + if (exiting_ || req.term < current_term_) { + HeartbeatRes res(false, current_term_); + Save(res, res_builder); + return; + } + + if (req.term > current_term_) { + SetCurrentTerm(req.term); + if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER); + } + + SetNextElectionTimePoint(); + election_change_.notify_all(); + + HeartbeatRes res(true, current_term_); + Save(res, res_builder); + }); + coordination_->Register( [this](const auto &req_reader, auto *res_builder) { // Acquire snapshot lock. @@ -301,6 +327,7 @@ void RaftServer::Start() { for (auto peer_id : coordination_->GetOtherNodeIds()) { peer_threads_.emplace_back(&RaftServer::PeerThreadMain, this, peer_id); + hb_threads_.emplace_back(&RaftServer::HBThreadMain, this, peer_id); } no_op_issuer_thread_ = std::thread(&RaftServer::NoOpIssuerThreadMain, this); @@ -316,12 +343,17 @@ void RaftServer::Shutdown() { state_changed_.notify_all(); election_change_.notify_all(); leader_changed_.notify_all(); + hb_condition_.notify_all(); } for (auto &peer_thread : peer_threads_) { if (peer_thread.joinable()) peer_thread.join(); } + for (auto &hb_thread : hb_threads_) { + if (hb_thread.joinable()) hb_thread.join(); + } + if (election_thread_.joinable()) election_thread_.join(); if (no_op_issuer_thread_.joinable()) no_op_issuer_thread_.join(); if (snapshot_thread_.joinable()) snapshot_thread_.join(); @@ -403,9 +435,9 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id, disk_storage_.Put(LogEntryKey(log_size_), SerializeLogEntry(new_entry)); SetLogSize(log_size_ + 1); - // Force issuing heartbeats + // Force replication TimePoint now = Clock::now(); - for (auto &peer_heartbeat : next_heartbeat_) peer_heartbeat = now; + for (auto &peer_replication : next_replication_) peer_replication = now; // From this point on, we can say that the replication of a LogEntry started. replication_timeout_.Insert(tx_id); @@ -549,6 +581,7 @@ void RaftServer::Transition(const Mode &new_mode) { SetNextElectionTimePoint(); election_change_.notify_all(); + state_changed_.notify_all(); break; } @@ -593,11 +626,15 @@ void RaftServer::Transition(const Mode &new_mode) { next_election_ = TimePoint::max(); election_change_.notify_all(); - // Set next heartbeat to correct values + // Set next heartbeat and replication to correct values TimePoint now = Clock::now(); + for (auto &peer_replication : next_replication_) + peer_replication = now + config_.heartbeat_interval; for (auto &peer_heartbeat : next_heartbeat_) peer_heartbeat = now + config_.heartbeat_interval; + hb_condition_.notify_all(); + // [Raft paper figure 2] // "For each server, index of the next log entry to send to that server // is initialized to leader's last log index + 1" @@ -704,6 +741,9 @@ void RaftServer::SendLogEntries( auto server_id = server_id_; auto commit_index = commit_index_; + VLOG(40) << "Server " << server_id_ + << ": Sending Entries RPC to server " << peer_id + << " (Term: " << current_term_ << ")"; VLOG(40) << "Entries size: " << request_entries.size(); // Execute the RPC. @@ -714,7 +754,7 @@ void RaftServer::SendLogEntries( lock->lock(); if (!reply) { - next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval; + next_replication_[peer_id] = Clock::now() + config_.heartbeat_interval; return; } @@ -747,7 +787,7 @@ void RaftServer::SendLogEntries( match_index_[peer_id] = new_match_index; if (request_entries.size() > 0) AdvanceCommitIndex(); next_index_[peer_id] = match_index_[peer_id] + 1; - next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval; + next_replication_[peer_id] = Clock::now() + config_.heartbeat_interval; } state_changed_.notify_all(); @@ -776,6 +816,9 @@ void RaftServer::SendSnapshot(uint16_t peer_id, input_stream.close(); } + VLOG(40) << "Server " << server_id_ + << ": Sending Snapshot RPC to server " << peer_id + << " (Term: " << current_term_ << ")"; VLOG(40) << "Snapshot size: " << snapshot_size << " bytes."; // Copy all internal variables before releasing the lock. @@ -789,7 +832,7 @@ void RaftServer::SendSnapshot(uint16_t peer_id, lock->lock(); if (!reply) { - next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval; + next_replication_[peer_id] = Clock::now() + config_.heartbeat_interval; return; } @@ -810,7 +853,7 @@ void RaftServer::SendSnapshot(uint16_t peer_id, match_index_[peer_id] = snapshot_metadata.last_included_index; next_index_[peer_id] = snapshot_metadata.last_included_index + 1; - next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval; + next_replication_[peer_id] = Clock::now() + config_.heartbeat_interval; state_changed_.notify_all(); } @@ -843,78 +886,71 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) { TimePoint now = Clock::now(); TimePoint wait_until; - if (mode_ != Mode::FOLLOWER && backoff_until_[peer_id] > now) { - wait_until = backoff_until_[peer_id]; - } else { - switch (mode_) { - case Mode::FOLLOWER: { - wait_until = TimePoint::max(); + switch (mode_) { + case Mode::FOLLOWER: { + wait_until = TimePoint::max(); + break; + } + + case Mode::CANDIDATE: { + if (vote_requested_[peer_id]) break; + + // TODO(ipaljak): Consider backoff. + wait_until = TimePoint::max(); + + // Copy all internal variables before releasing the lock. + auto server_id = server_id_; + auto request_term = current_term_.load(); + auto last_entry_data = LastEntryData(); + + vote_requested_[peer_id] = true; + + // Execute the RPC. + lock.unlock(); // Release lock while waiting for response + auto reply = coordination_->ExecuteOnOtherNode( + peer_id, server_id, request_term, last_entry_data.first, + last_entry_data.second); + lock.lock(); + + // If the peer isn't reachable, it is the same as if he didn't grant + // us his vote. + if (!reply) { + reply = RequestVoteRes(false, request_term); + } + + if (current_term_ != request_term || mode_ != Mode::CANDIDATE || + exiting_) { + VLOG(40) << "Server " << server_id_ + << ": Ignoring RequestVoteRPC reply from " << peer_id; break; } - case Mode::CANDIDATE: { - if (vote_requested_[peer_id]) break; - - // TODO(ipaljak): Consider backoff. - wait_until = TimePoint::max(); - - // Copy all internal variables before releasing the lock. - auto server_id = server_id_; - auto request_term = current_term_.load(); - auto last_entry_data = LastEntryData(); - - vote_requested_[peer_id] = true; - - // Execute the RPC. - lock.unlock(); // Release lock while waiting for response - auto reply = coordination_->ExecuteOnOtherNode( - peer_id, server_id, request_term, last_entry_data.first, - last_entry_data.second); - lock.lock(); - - // If the peer isn't reachable, it is the same as if he didn't grant - // us his vote. - if (!reply) { - reply = RequestVoteRes(false, request_term); - } - - if (current_term_ != request_term || mode_ != Mode::CANDIDATE || - exiting_) { - VLOG(40) << "Server " << server_id_ - << ": Ignoring RequestVoteRPC reply from " << peer_id; - break; - } - - if (OutOfSync(reply->term)) { - state_changed_.notify_all(); - continue; - } - - if (reply->vote_granted) { - VLOG(40) << "Server " << server_id_ << ": Got vote from " - << peer_id; - ++granted_votes_; - if (HasMajorityVote()) Transition(Mode::LEADER); - } else { - VLOG(40) << "Server " << server_id_ << ": Denied vote from " - << peer_id; - } - + if (OutOfSync(reply->term)) { state_changed_.notify_all(); continue; } - case Mode::LEADER: { - if (now >= next_heartbeat_[peer_id]) { - VLOG(40) << "Server " << server_id_ - << ": Sending Entries RPC to server " << peer_id - << " (Term: " << current_term_ << ")"; - SendEntries(peer_id, &lock); - continue; - } - wait_until = next_heartbeat_[peer_id]; - break; + if (reply->vote_granted) { + VLOG(40) << "Server " << server_id_ << ": Got vote from " + << peer_id; + ++granted_votes_; + if (HasMajorityVote()) Transition(Mode::LEADER); + } else { + VLOG(40) << "Server " << server_id_ << ": Denied vote from " + << peer_id; } + + state_changed_.notify_all(); + continue; + } + + case Mode::LEADER: { + if (now >= next_replication_[peer_id]) { + SendEntries(peer_id, &lock); + continue; + } + wait_until = next_replication_[peer_id]; + break; } } @@ -923,6 +959,56 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) { } } +void RaftServer::HBThreadMain(uint16_t peer_id) { + utils::ThreadSetName(fmt::format("HBThread{}", peer_id)); + std::unique_lock lock(heartbeat_lock_); + + while (!exiting_) { + TimePoint now = Clock::now(); + TimePoint wait_until; + + switch (mode_) { + case Mode::FOLLOWER: { + wait_until = TimePoint::max(); + break; + } + + case Mode::CANDIDATE: { + wait_until = TimePoint::max(); + break; + } + + case Mode::LEADER: { + if (now < next_heartbeat_[peer_id]) { + wait_until = next_heartbeat_[peer_id]; + break; + } + VLOG(40) << "Server " << server_id_ << ": Sending HB to server " + << peer_id << " (Term: " << current_term_ << ")"; + + // For heartbeats, only the current term is relevant, we copy that + // before releasing the lock. + auto server_id = server_id_; + uint64_t request_term = current_term_; + + // Execute the RPC. + lock.unlock(); + coordination_->ExecuteOnOtherNode(peer_id, server_id, + request_term); + lock.lock(); + + // This is ok even if we don't receive the reply. + next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval; + wait_until = next_heartbeat_[peer_id]; + break; + } + } + + if (exiting_) break; + hb_condition_.wait_until(lock, wait_until); + } +} + void RaftServer::NoOpIssuerThreadMain() { utils::ThreadSetName(fmt::format("NoOpIssuer")); std::mutex m; diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index 8c5ef049b..70f5b2a84 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -157,6 +157,7 @@ class RaftServer final : public RaftInterface { mutable std::mutex lock_; ///< Guards all internal state. mutable std::mutex snapshot_lock_; ///< Guards snapshot creation and removal. + mutable std::mutex heartbeat_lock_; ///< Guards HB issuing ////////////////////////////////////////////////////////////////////////////// // volatile state on all servers @@ -185,6 +186,10 @@ class RaftServer final : public RaftInterface { std::vector peer_threads_; ///< One thread per peer which ///< handles outgoing RPCs. + std::vector hb_threads_; ///< One thread per peer which is + ///< responsible for sending periodic + ///< heartbeats. + std::condition_variable state_changed_; ///< Notifies all peer threads on ///< relevant state change. @@ -199,6 +204,10 @@ class RaftServer final : public RaftInterface { ///< no_op_issuer_thread that a new ///< leader has been elected. + std::condition_variable hb_condition_; ///< Notifies the HBIssuer thread + ///< that a new leader has been + ///< elected. + std::atomic exiting_{false}; ///< True on server shutdown. ////////////////////////////////////////////////////////////////////////////// @@ -231,9 +240,11 @@ class RaftServer final : public RaftInterface { ///< highest log entry known to be ///< replicated on server. - std::vector next_heartbeat_; ///< for each server, time point for - ///< the next heartbeat. - std::vector backoff_until_; ///< backoff for each server. + std::vector next_replication_; ///< for each server, time point + ///< for the next replication. + + std::vector next_heartbeat_; ///< for each server, time point for + ///< the next heartbeat. // Tracks timepoints until a transactions is allowed to be in the replication // process. @@ -314,6 +325,14 @@ class RaftServer final : public RaftInterface { /// @param peer_id - ID of a receiving node in the cluster. void PeerThreadMain(uint16_t peer_id); + /// Main function of the thread that handles issuing heartbeats towards + /// other peers. At the moment, this function is ignorant about the status + /// of LogEntry replication. Therefore, it might issue unnecessary + /// heartbeats, but we can live with that at this point. + /// + /// @param peer_id - ID of a receiving node in the cluster. + void HBThreadMain(uint16_t peer_id); + /// Issues no-op command when a new leader is elected. This is done to /// force the Raft protocol to commit logs from previous terms that /// have been replicated on a majority of peers.