diff --git a/src/raft/raft_rpc_messages.lcp b/src/raft/raft_rpc_messages.lcp
index 42062cd11..7851d4012 100644
--- a/src/raft/raft_rpc_messages.lcp
+++ b/src/raft/raft_rpc_messages.lcp
@@ -39,6 +39,14 @@ cpp<#
     ((success :bool)
      (term :uint64_t))))
 
+(lcp:define-rpc heartbeat
+  (:request
+    ((leader-id :uint16_t)
+     (term :uint64_t)))
+  (:response
+    ((success :bool)
+     (term :uint64_t))))
+
 (lcp:define-rpc install-snapshot
   (:request
     ((leader-id :uint16_t)
diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp
index 295edf6b9..c2b40a674 100644
--- a/src/raft/raft_server.cpp
+++ b/src/raft/raft_server.cpp
@@ -78,8 +78,8 @@ void RaftServer::Start() {
   auto cluster_size = coordination_->GetAllNodeCount() + 1;
   next_index_.resize(cluster_size);
   match_index_.resize(cluster_size);
+  next_replication_.resize(cluster_size);
   next_heartbeat_.resize(cluster_size);
-  backoff_until_.resize(cluster_size);
 
   // RPC registration
   coordination_->Register<RequestVoteRpc>(
@@ -138,7 +138,9 @@ void RaftServer::Start() {
 
         // Everything below is considered to be a valid RPC. This will ensure that
         // after we finish processing the current request, the election timeout will
-        // be extended.
+        // be extended. During this process we will prevent the timeout from
+        // occurring.
+        next_election_ = TimePoint::max();
         utils::OnScopeExit extend_election_timeout([this] {
           // [Raft thesis 3.4]
           // A server remains in follower state as long as it receives valid RPCs
@@ -211,6 +213,30 @@ void RaftServer::Start() {
         Save(res, res_builder);
       });
 
+  coordination_->Register<HeartbeatRpc>(
+      [this](const auto &req_reader, auto *res_builder) {
+        std::lock_guard<std::mutex> guard(lock_);
+        HeartbeatReq req;
+        Load(&req, req_reader);
+
+        if (exiting_ || req.term < current_term_) {
+          HeartbeatRes res(false, current_term_);
+          Save(res, res_builder);
+          return;
+        }
+
+        if (req.term > current_term_) {
+          SetCurrentTerm(req.term);
+          if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER);
+        }
+
+        SetNextElectionTimePoint();
+        election_change_.notify_all();
+
+        HeartbeatRes res(true, current_term_);
+        Save(res, res_builder);
+      });
+
   coordination_->Register<InstallSnapshotRpc>(
       [this](const auto &req_reader, auto *res_builder) {
         // Acquire snapshot lock.
@@ -301,6 +327,7 @@ void RaftServer::Start() {
 
   for (auto peer_id : coordination_->GetOtherNodeIds()) {
     peer_threads_.emplace_back(&RaftServer::PeerThreadMain, this, peer_id);
+    hb_threads_.emplace_back(&RaftServer::HBThreadMain, this, peer_id);
   }
 
   no_op_issuer_thread_ = std::thread(&RaftServer::NoOpIssuerThreadMain, this);
@@ -316,12 +343,17 @@ void RaftServer::Shutdown() {
     state_changed_.notify_all();
     election_change_.notify_all();
     leader_changed_.notify_all();
+    hb_condition_.notify_all();
   }
 
   for (auto &peer_thread : peer_threads_) {
     if (peer_thread.joinable()) peer_thread.join();
   }
 
+  for (auto &hb_thread : hb_threads_) {
+    if (hb_thread.joinable()) hb_thread.join();
+  }
+
   if (election_thread_.joinable()) election_thread_.join();
   if (no_op_issuer_thread_.joinable()) no_op_issuer_thread_.join();
   if (snapshot_thread_.joinable()) snapshot_thread_.join();
@@ -403,9 +435,9 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
   disk_storage_.Put(LogEntryKey(log_size_), SerializeLogEntry(new_entry));
   SetLogSize(log_size_ + 1);
 
-  // Force issuing heartbeats
+  // Force replication
   TimePoint now = Clock::now();
-  for (auto &peer_heartbeat : next_heartbeat_) peer_heartbeat = now;
+  for (auto &peer_replication : next_replication_) peer_replication = now;
 
   // From this point on, we can say that the replication of a LogEntry started.
   replication_timeout_.Insert(tx_id);
@@ -549,6 +581,7 @@ void RaftServer::Transition(const Mode &new_mode) {
 
       SetNextElectionTimePoint();
       election_change_.notify_all();
+      state_changed_.notify_all();
      break;
     }
@@ -593,11 +626,15 @@ void RaftServer::Transition(const Mode &new_mode) {
      next_election_ = TimePoint::max();
      election_change_.notify_all();
 
-      // Set next heartbeat to correct values
+      // Set next heartbeat and replication to correct values
      TimePoint now = Clock::now();
+      for (auto &peer_replication : next_replication_)
+        peer_replication = now + config_.heartbeat_interval;
      for (auto &peer_heartbeat : next_heartbeat_)
        peer_heartbeat = now + config_.heartbeat_interval;
 
+      hb_condition_.notify_all();
+
      // [Raft paper figure 2]
      // "For each server, index of the next log entry to send to that server
      // is initialized to leader's last log index + 1"
@@ -704,6 +741,9 @@ void RaftServer::SendLogEntries(
   auto server_id = server_id_;
   auto commit_index = commit_index_;
 
+  VLOG(40) << "Server " << server_id_
+           << ": Sending Entries RPC to server " << peer_id
+           << " (Term: " << current_term_ << ")";
   VLOG(40) << "Entries size: " << request_entries.size();
 
   // Execute the RPC.
@@ -714,7 +754,7 @@ void RaftServer::SendLogEntries(
   lock->lock();
 
   if (!reply) {
-    next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval;
+    next_replication_[peer_id] = Clock::now() + config_.heartbeat_interval;
     return;
   }
@@ -747,7 +787,7 @@ void RaftServer::SendLogEntries(
     match_index_[peer_id] = new_match_index;
     if (request_entries.size() > 0) AdvanceCommitIndex();
     next_index_[peer_id] = match_index_[peer_id] + 1;
-    next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval;
+    next_replication_[peer_id] = Clock::now() + config_.heartbeat_interval;
   }
 
   state_changed_.notify_all();
@@ -776,6 +816,9 @@ void RaftServer::SendSnapshot(uint16_t peer_id,
     input_stream.close();
   }
 
+  VLOG(40) << "Server " << server_id_
+           << ": Sending Snapshot RPC to server " << peer_id
+           << " (Term: " << current_term_ << ")";
   VLOG(40) << "Snapshot size: " << snapshot_size << " bytes.";
 
   // Copy all internal variables before releasing the lock.
@@ -789,7 +832,7 @@ void RaftServer::SendSnapshot(uint16_t peer_id,
   lock->lock();
 
   if (!reply) {
-    next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval;
+    next_replication_[peer_id] = Clock::now() + config_.heartbeat_interval;
     return;
   }
@@ -810,7 +853,7 @@ void RaftServer::SendSnapshot(uint16_t peer_id,
 
   match_index_[peer_id] = snapshot_metadata.last_included_index;
   next_index_[peer_id] = snapshot_metadata.last_included_index + 1;
-  next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval;
+  next_replication_[peer_id] = Clock::now() + config_.heartbeat_interval;
 
   state_changed_.notify_all();
 }
@@ -843,78 +886,71 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
     TimePoint now = Clock::now();
     TimePoint wait_until;
 
-    if (mode_ != Mode::FOLLOWER && backoff_until_[peer_id] > now) {
-      wait_until = backoff_until_[peer_id];
-    } else {
-      switch (mode_) {
-        case Mode::FOLLOWER: {
-          wait_until = TimePoint::max();
+    switch (mode_) {
+      case Mode::FOLLOWER: {
+        wait_until = TimePoint::max();
+        break;
+      }
+
+      case Mode::CANDIDATE: {
+        if (vote_requested_[peer_id]) break;
+
+        // TODO(ipaljak): Consider backoff.
+        wait_until = TimePoint::max();
+
+        // Copy all internal variables before releasing the lock.
+        auto server_id = server_id_;
+        auto request_term = current_term_.load();
+        auto last_entry_data = LastEntryData();
+
+        vote_requested_[peer_id] = true;
+
+        // Execute the RPC.
+        lock.unlock();  // Release lock while waiting for response
+        auto reply = coordination_->ExecuteOnOtherNode<RequestVoteRpc>(
+            peer_id, server_id, request_term, last_entry_data.first,
+            last_entry_data.second);
+        lock.lock();
+
+        // If the peer isn't reachable, it is the same as if it didn't grant
+        // us its vote.
+        if (!reply) {
+          reply = RequestVoteRes(false, request_term);
+        }
+
+        if (current_term_ != request_term || mode_ != Mode::CANDIDATE ||
+            exiting_) {
+          VLOG(40) << "Server " << server_id_
+                   << ": Ignoring RequestVoteRPC reply from " << peer_id;
          break;
        }
 
-        case Mode::CANDIDATE: {
-          if (vote_requested_[peer_id]) break;
-
-          // TODO(ipaljak): Consider backoff.
-          wait_until = TimePoint::max();
-
-          // Copy all internal variables before releasing the lock.
-          auto server_id = server_id_;
-          auto request_term = current_term_.load();
-          auto last_entry_data = LastEntryData();
-
-          vote_requested_[peer_id] = true;
-
-          // Execute the RPC.
-          lock.unlock();  // Release lock while waiting for response
-          auto reply = coordination_->ExecuteOnOtherNode<RequestVoteRpc>(
-              peer_id, server_id, request_term, last_entry_data.first,
-              last_entry_data.second);
-          lock.lock();
-
-          // If the peer isn't reachable, it is the same as if he didn't grant
-          // us his vote.
-          if (!reply) {
-            reply = RequestVoteRes(false, request_term);
-          }
-
-          if (current_term_ != request_term || mode_ != Mode::CANDIDATE ||
-              exiting_) {
-            VLOG(40) << "Server " << server_id_
-                     << ": Ignoring RequestVoteRPC reply from " << peer_id;
-            break;
-          }
-
-          if (OutOfSync(reply->term)) {
-            state_changed_.notify_all();
-            continue;
-          }
-
-          if (reply->vote_granted) {
-            VLOG(40) << "Server " << server_id_ << ": Got vote from "
-                     << peer_id;
-            ++granted_votes_;
-            if (HasMajorityVote()) Transition(Mode::LEADER);
-          } else {
-            VLOG(40) << "Server " << server_id_ << ": Denied vote from "
-                     << peer_id;
-          }
-
+        if (OutOfSync(reply->term)) {
          state_changed_.notify_all();
          continue;
        }
 
-        case Mode::LEADER: {
-          if (now >= next_heartbeat_[peer_id]) {
-            VLOG(40) << "Server " << server_id_
-                     << ": Sending Entries RPC to server " << peer_id
-                     << " (Term: " << current_term_ << ")";
-            SendEntries(peer_id, &lock);
-            continue;
-          }
-          wait_until = next_heartbeat_[peer_id];
-          break;
+        if (reply->vote_granted) {
+          VLOG(40) << "Server " << server_id_ << ": Got vote from "
+                   << peer_id;
+          ++granted_votes_;
+          if (HasMajorityVote()) Transition(Mode::LEADER);
+        } else {
+          VLOG(40) << "Server " << server_id_ << ": Denied vote from "
+                   << peer_id;
        }
+
+        state_changed_.notify_all();
+        continue;
+      }
+
+      case Mode::LEADER: {
+        if (now >= next_replication_[peer_id]) {
+          SendEntries(peer_id, &lock);
+          continue;
+        }
+        wait_until = next_replication_[peer_id];
+        break;
      }
    }
@@ -923,6 +959,56 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
   }
 }
 
+void RaftServer::HBThreadMain(uint16_t peer_id) {
+  utils::ThreadSetName(fmt::format("HBThread{}", peer_id));
+  std::unique_lock<std::mutex> lock(heartbeat_lock_);
+
+  while (!exiting_) {
+    TimePoint now = Clock::now();
+    TimePoint wait_until;
+
+    switch (mode_) {
+      case Mode::FOLLOWER: {
+        wait_until = TimePoint::max();
+        break;
+      }
+
+      case Mode::CANDIDATE: {
+        wait_until = TimePoint::max();
+        break;
+      }
+
+      case Mode::LEADER: {
+        if (now < next_heartbeat_[peer_id]) {
+          wait_until = next_heartbeat_[peer_id];
+          break;
+        }
+
+        VLOG(40) << "Server " << server_id_ << ": Sending HB to server "
+                 << peer_id << " (Term: " << current_term_ << ")";
+
+        // For heartbeats, only the current term is relevant; we copy it
+        // before releasing the lock.
+        auto server_id = server_id_;
+        uint64_t request_term = current_term_;
+
+        // Execute the RPC.
+        lock.unlock();
+        coordination_->ExecuteOnOtherNode<HeartbeatRpc>(peer_id, server_id,
+                                                        request_term);
+        lock.lock();
+
+        // This is OK even if we don't receive the reply.
+        next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval;
+        wait_until = next_heartbeat_[peer_id];
+        break;
+      }
+    }
+
+    if (exiting_) break;
+    hb_condition_.wait_until(lock, wait_until);
+  }
+}
+
 void RaftServer::NoOpIssuerThreadMain() {
   utils::ThreadSetName(fmt::format("NoOpIssuer"));
   std::mutex m;
diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp
index 8c5ef049b..70f5b2a84 100644
--- a/src/raft/raft_server.hpp
+++ b/src/raft/raft_server.hpp
@@ -157,6 +157,7 @@ class RaftServer final : public RaftInterface {
 
   mutable std::mutex lock_;           ///< Guards all internal state.
   mutable std::mutex snapshot_lock_;  ///< Guards snapshot creation and removal.
+  mutable std::mutex heartbeat_lock_; ///< Guards heartbeat issuing.
 
   //////////////////////////////////////////////////////////////////////////////
   // volatile state on all servers
@@ -185,6 +186,10 @@ class RaftServer final : public RaftInterface {
   std::vector<std::thread> peer_threads_;  ///< One thread per peer which
                                            ///< handles outgoing RPCs.
 
+  std::vector<std::thread> hb_threads_;  ///< One thread per peer which is
+                                         ///< responsible for sending periodic
+                                         ///< heartbeats.
+
   std::condition_variable state_changed_;  ///< Notifies all peer threads on
                                            ///< relevant state change.
@@ -199,6 +204,10 @@ class RaftServer final : public RaftInterface {
                                             ///< no_op_issuer_thread that a new
                                             ///< leader has been elected.
 
+  std::condition_variable hb_condition_;  ///< Notifies the heartbeat threads
+                                          ///< that a new leader has been
+                                          ///< elected or shutdown started.
+
   std::atomic<bool> exiting_{false};  ///< True on server shutdown.
 
   //////////////////////////////////////////////////////////////////////////////
@@ -231,9 +240,11 @@ class RaftServer final : public RaftInterface {
                                    ///< highest log entry known to be
                                    ///< replicated on server.
 
-  std::vector<TimePoint> next_heartbeat_;  ///< for each server, time point for
-                                           ///< the next heartbeat.
-  std::vector<TimePoint> backoff_until_;   ///< backoff for each server.
+  std::vector<TimePoint> next_replication_;  ///< for each server, time point
+                                             ///< for the next replication.
+
+  std::vector<TimePoint> next_heartbeat_;  ///< for each server, time point for
+                                           ///< the next heartbeat.
 
   // Tracks timepoints until a transaction is allowed to be in the replication
   // process.
@@ -314,6 +325,14 @@ class RaftServer final : public RaftInterface {
   /// @param peer_id - ID of a receiving node in the cluster.
   void PeerThreadMain(uint16_t peer_id);
 
+  /// Main function of the thread that handles issuing heartbeats towards
+  /// other peers. At the moment, this function is ignorant of the status
+  /// of LogEntry replication. Therefore, it might issue unnecessary
+  /// heartbeats, but we can live with that at this point.
+  ///
+  /// @param peer_id - ID of a receiving node in the cluster.
+  void HBThreadMain(uint16_t peer_id);
+
   /// Issues no-op command when a new leader is elected. This is done to
   /// force the Raft protocol to commit logs from previous terms that
   /// have been replicated on a majority of peers.
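
The pattern this diff introduces — one dedicated thread per peer that sends heartbeats on a fixed interval, decoupled from log replication, and that can be woken early through a condition variable on leadership changes and shutdown — can be illustrated in isolation. The sketch below is a minimal stand-alone model, not the RaftServer implementation: `HeartbeatState`, `HeartbeatLoop`, `SendHeartbeat`, and `kInterval` are hypothetical stand-ins (e.g. `SendHeartbeat` replaces `ExecuteOnOtherNode<HeartbeatRpc>`), and the mode and locking machinery is reduced to what the timing logic needs.

```cpp
// Minimal sketch of the per-peer heartbeat loop pattern used in HBThreadMain.
// All names here are illustrative stand-ins, not the actual RaftServer API.
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

using Clock = std::chrono::steady_clock;
using TimePoint = Clock::time_point;

enum class Mode { FOLLOWER, CANDIDATE, LEADER };

struct HeartbeatState {
  std::mutex lock;
  std::condition_variable cv;  // analogous to hb_condition_
  std::atomic<bool> exiting{false};
  Mode mode = Mode::FOLLOWER;
  std::vector<TimePoint> next_heartbeat;
};

constexpr auto kInterval = std::chrono::milliseconds(100);

// Placeholder for the heartbeat RPC; fire-and-forget, since the loop's timing
// does not depend on the reply ("ok even if we don't receive the reply").
void SendHeartbeat(uint16_t peer_id) {
  std::cout << "heartbeat -> peer " << peer_id << '\n';
}

void HeartbeatLoop(HeartbeatState *state, uint16_t peer_id) {
  std::unique_lock<std::mutex> lock(state->lock);
  while (!state->exiting) {
    TimePoint wait_until = TimePoint::max();  // sleep forever unless leader
    if (state->mode == Mode::LEADER) {
      if (Clock::now() >= state->next_heartbeat[peer_id]) {
        // Release the lock around the blocking network call, as the diff does.
        lock.unlock();
        SendHeartbeat(peer_id);
        lock.lock();
        state->next_heartbeat[peer_id] = Clock::now() + kInterval;
      }
      wait_until = state->next_heartbeat[peer_id];
    }
    if (state->exiting) break;
    // Woken early by cv.notify_all() on leadership change or shutdown.
    state->cv.wait_until(lock, wait_until);
  }
}

int main() {
  HeartbeatState state;
  state.next_heartbeat.resize(2, Clock::now());

  std::vector<std::thread> hb_threads;
  for (uint16_t peer = 0; peer < 2; ++peer)
    hb_threads.emplace_back(HeartbeatLoop, &state, peer);

  {  // Become leader: heartbeats start flowing immediately.
    std::lock_guard<std::mutex> guard(state.lock);
    state.mode = Mode::LEADER;
  }
  state.cv.notify_all();
  std::this_thread::sleep_for(std::chrono::milliseconds(350));

  {  // Shutdown mirrors the RaftServer::Shutdown() notify-then-join sequence.
    std::lock_guard<std::mutex> guard(state.lock);
    state.exiting = true;
  }
  state.cv.notify_all();
  for (auto &t : hb_threads) t.join();
}
```

The design point this isolates: once heartbeats run on their own threads with their own mutex, a slow SendLogEntries or SendSnapshot call can no longer delay the liveness signal and provoke spurious elections. The trade-off, as the HBThreadMain comment in the diff notes, is the occasional redundant heartbeat right after a successful replication round.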