diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ab074a10a..421e3ae69 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -7,7 +7,6 @@ set(memgraph_src_files communication/messaging/distributed.cpp communication/messaging/local.cpp communication/messaging/protocol.cpp - communication/raft/raft.cpp communication/reactor/protocol.cpp communication/reactor/reactor_distributed.cpp communication/reactor/reactor_local.cpp diff --git a/src/communication/raft/raft-inl.hpp b/src/communication/raft/raft-inl.hpp new file mode 100644 index 000000000..7b8850222 --- /dev/null +++ b/src/communication/raft/raft-inl.hpp @@ -0,0 +1,642 @@ +#pragma once + +#include "fmt/format.h" +#include "glog/logging.h" + +namespace communication::raft { + +namespace impl { + +template +RaftMemberImpl::RaftMemberImpl(RaftNetworkInterface &network, + RaftStorageInterface &storage, + const MemberId &id, + const RaftConfig &config) + : network_(network), storage_(storage), id_(id), config_(config) { + std::lock_guard lock(mutex_); + + tie(term_, voted_for_) = storage_.GetTermAndVotedFor(); + + for (const auto &peer_id : config_.members) { + peer_states_[peer_id] = std::make_unique(); + } + + SetElectionTimer(); +} + +template +RaftMemberImpl::~RaftMemberImpl() { + Stop(); +} + +template +void RaftMemberImpl::Stop() { + { + std::lock_guard lock(mutex_); + if (!exiting_) { + LogInfo("Stopping..."); + exiting_ = true; + } + } + state_changed_.notify_all(); +} + +template +template +void RaftMemberImpl::LogInfo(const std::string &format, + Args &&... 
args) { + LOG(INFO) << fmt::format("[id = {}, term = {}] {}", id_, term_, + fmt::format(format, std::forward(args)...)) + << std::endl; +} + +template +void RaftMemberImpl::TimerThreadMain() { + std::unique_lock lock(mutex_); + while (!exiting_) { + if (Clock::now() >= next_election_time_) { + StartNewElection(); + } + state_changed_.wait_until(lock, next_election_time_); + } +} + +template +void RaftMemberImpl::PeerThreadMain(std::string peer_id) { + RaftPeerState &peer_state = *peer_states_[peer_id]; + + LogInfo("Peer thread started for {}", peer_id); + + std::unique_lock lock(mutex_); + + /* This loop will either call a function that issues an RPC or wait on the + * condition variable. It must not do both! Lock on `mutex_` is released while + * waiting for RPC response, which might cause us to miss a notification on + * `state_changed_` conditional variable and wait indefinitely. The safest + * thing to do is to assume some important part of state was modified while we + * were waiting for the response and loop around to check. 
*/ + while (!exiting_) { + TimePoint wait_until = TimePoint::max(); + + switch (mode_) { + case RaftMode::FOLLOWER: + break; + case RaftMode::CANDIDATE: + if (!peer_state.request_vote_done) { + RequestVote(peer_id, peer_state, lock); + continue; + } + break; + case RaftMode::LEADER: + if (peer_state.next_index <= storage_.GetLastLogIndex() || + Clock::now() >= peer_state.next_heartbeat_time) { + AppendEntries(peer_id, peer_state, lock); + continue; + } else { + wait_until = peer_state.next_heartbeat_time; + } + break; + } + + state_changed_.wait_until(lock, wait_until); + } + + LogInfo("Peer thread exiting for {}", peer_id); +} + +template +void RaftMemberImpl::CandidateOrLeaderTransitionToFollower() { + DCHECK(mode_ != RaftMode::FOLLOWER) + << "`CandidateOrLeaderTransitionToFollower` called from follower mode"; + mode_ = RaftMode::FOLLOWER; + leader_ = {}; + SetElectionTimer(); +} + +template +void RaftMemberImpl::CandidateTransitionToLeader() { + DCHECK(mode_ == RaftMode::CANDIDATE) + << "`CandidateTransitionToLeader` called while not in candidate mode"; + mode_ = RaftMode::LEADER; + leader_ = id_; + + /* We don't want to trigger elections while in leader mode. */ + next_election_time_ = TimePoint::max(); + + /* [Raft thesis, Section 6.4] + * "The Leader Completeness Property guarantees that a leader has all + * committed entries, but at the start of its term, it may not know which + * those are. To find out, it needs to commit an entry from its term. Raft + * handles this by having each leader commit a blank no-op entry into the log + * at the start of its term. As soon as this no-op entry is committed, the + * leader’s commit index will be at least as large as any other servers’ + * during its term." 
*/ + + LogEntry entry; + entry.term = term_; + entry.command = std::experimental::nullopt; + storage_.AppendLogEntry(entry); +} + +template +void RaftMemberImpl::UpdateTermAndVotedFor( + const TermId new_term, + const std::experimental::optional &new_voted_for) { + term_ = new_term; + voted_for_ = new_voted_for; + leader_ = {}; + + storage_.WriteTermAndVotedFor(term_, voted_for_); +} + +template +void RaftMemberImpl::SetElectionTimer() { + /* [Raft thesis, section 3.4] + * "Raft uses randomized election timeouts to ensure that split votes are rare + * and that they are resolved quickly. To prevent split votes in the first + * place, election timeouts are chosen randomly from a fixed interval (e.g., + * 150–300 ms)." */ + std::uniform_int_distribution distribution( + config_.leader_timeout_min.count(), config_.leader_timeout_max.count()); + Clock::duration wait_interval = std::chrono::milliseconds(distribution(rng_)); + next_election_time_ = Clock::now() + wait_interval; +} + +template +bool RaftMemberImpl::SendRPC(const std::string &recipient, + const PeerRPCRequest &request, + PeerRPCReply &reply, + std::unique_lock &lock) { + DCHECK(mode_ != RaftMode::FOLLOWER) << "Follower should not send RPCs"; + + bool was_candidate = mode_ == RaftMode::CANDIDATE; + + /* Release lock before issuing RPC and waiting for response. */ + /* TODO(mtomic): Revise how this will work with RPC cancellation. */ + lock.unlock(); + bool ok = network_.SendRPC(recipient, request, reply); + lock.lock(); + + /* TODO(mtomic): RPC retrying */ + if (!ok) { + return false; + } + + /* We released the lock while waiting for RPC response. 
It is possible that + * the internal state has changed while we were waiting and we don't care for + * RPC reply anymore for any of these reasons: + * (a) we are not the leader anymore + * (b) election timeout + * (c) we are elected as leader + * (d) our election was interrupted by another leader + * (e) destructor was called + */ + if (term_ != request.Term() || + (was_candidate && mode_ != RaftMode::CANDIDATE) || exiting_) { + LogInfo("Ignoring RPC reply from {}", recipient); + return false; + } + + /* [Raft thesis, Section 3.3] + * "Current terms are exchanged whenever servers communicate; if one server's + * current term is smaller than the other's, then it updates its current term + * to the larger value. If a candidate or leader discovers that its term is + * out of date, it immediately reverts to follower state." */ + if (reply.Term() > term_) { + UpdateTermAndVotedFor(reply.Term(), {}); + CandidateOrLeaderTransitionToFollower(); + state_changed_.notify_all(); + return false; + } + + return true; +} + +template +void RaftMemberImpl::StartNewElection() { + LogInfo("Starting new election"); + /* [Raft thesis, section 3.4] + * "To begin an election, a follower increments its current term and + * transitions to candidate state. It then votes for itself and issues + * RequestVote RPCs in parallel to each of the other servers in the cluster." + */ + UpdateTermAndVotedFor(term_ + 1, id_); + mode_ = RaftMode::CANDIDATE; + + /* [Raft thesis, section 3.4] + * "Each candidate restarts its randomized election timeout at the start of an + * election, and it waits for that timeout to elapse before starting the next + * election; this reduces the likelihood of another split vote in the new + * election." 
*/ + SetElectionTimer(); + + for (const auto &peer_id : config_.members) { + if (peer_id == id_) { + continue; + } + auto &peer_state = peer_states_[peer_id]; + peer_state->request_vote_done = false; + peer_state->voted_for_me = false; + peer_state->match_index = 0; + peer_state->next_index = storage_.GetLastLogIndex() + 1; + + /* [Raft thesis, section 3.5] + * "Until the leader has discovered where it and the follower's logs match, + * the leader can send AppendEntries with no entries (like heartbeats) to + * save bandwidth. Then, once the matchIndex immediately precedes the + * nextIndex, the leader should begin to send the actual entries." */ + peer_state->suppress_log_entries = true; + + /* [Raft thesis, section 3.4] + * "Once a candidate wins an election, it becomes leader. It then sends + * heartbeat messages to all of the other servers to establish its authority + * and prevent new elections." + * + * This will make newly elected leader send heartbeats immediately. + */ + peer_state->next_heartbeat_time = TimePoint::min(); + } + + /* Notify peer threads to start issuing RequestVote RPCs. */ + state_changed_.notify_all(); +} + +template +bool RaftMemberImpl::CountVotes() { + DCHECK(mode_ == RaftMode::CANDIDATE) + << "`CountVotes` should only be called from candidate mode"; + int num_votes = 0; + for (const auto &peer_id : config_.members) { + if (peer_id == id_ || peer_states_[peer_id]->voted_for_me) { + num_votes++; + } + } + + return 2 * num_votes > config_.members.size(); +} + +template +void RaftMemberImpl::RequestVote(const std::string &peer_id, + RaftPeerState &peer_state, + std::unique_lock &lock) { + LogInfo("Requesting vote from {}", peer_id); + + PeerRPCRequest request; + request.type = PeerRPCRequest::Type::REQUEST_VOTE; + request.request_vote.candidate_term = term_; + request.request_vote.candidate_id = id_; + + PeerRPCReply reply; + + /* Release lock before issuing RPC and waiting for response. 
*/ + if (!SendRPC(peer_id, request, reply, lock)) { + return; + } + + DCHECK(reply.request_vote.term >= term_) << "Stale RequestVote RPC reply"; + + peer_state.request_vote_done = true; + + if (reply.request_vote.vote_granted) { + peer_state.voted_for_me = true; + LogInfo("Got vote from {}", peer_id); + + if (CountVotes()) { + LogInfo("Elected as leader."); + CandidateTransitionToLeader(); + } + } else { + LogInfo("Vote denied from {}", peer_id); + } + + state_changed_.notify_all(); +} + +template +void RaftMemberImpl::AdvanceCommitIndex() { + DCHECK(mode_ == RaftMode::LEADER) + << "`AdvanceCommitIndex` can only be called from leader mode"; + + std::vector match_indices; + for (const auto &peer : peer_states_) { + match_indices.push_back(peer.second->match_index); + } + match_indices.push_back(storage_.GetLastLogIndex()); + sort(match_indices.begin(), match_indices.end(), std::greater()); + LogIndex new_commit_index_ = match_indices[(config_.members.size() - 1) / 2]; + + LogInfo("Trying to advance commit index {} to {}", commit_index_, + new_commit_index_); + + /* This can happen because we reset `match_index` to 0 for every peer when + * elected. */ + if (commit_index_ >= new_commit_index_) { + return; + } + + /* [Raft thesis, section 3.6.2] + * (...) Raft never commits log entries from previous terms by counting + * replicas. Only log entries from the leader's current term are committed by + * counting replicas; once an entry from the current term has been committed + * in this way, then all prior entries are committed indirectly because of the + * Log Matching Property." 
*/ + if (storage_.GetLogTerm(new_commit_index_) != term_) { + LogInfo("Cannot commit log entry from previous term"); + return; + } + + commit_index_ = std::max(commit_index_, new_commit_index_); +} + +template +void RaftMemberImpl::AppendEntries(const std::string &peer_id, + RaftPeerState &peer_state, + std::unique_lock &lock) { + LogInfo("Appending entries to {}", peer_id); + + PeerRPCRequest request; + request.type = PeerRPCRequest::Type::APPEND_ENTRIES; + request.append_entries.leader_term = term_; + request.append_entries.leader_id = id_; + + request.append_entries.prev_log_index = peer_state.next_index - 1; + request.append_entries.prev_log_term = + storage_.GetLogTerm(peer_state.next_index - 1); + + if (!peer_state.suppress_log_entries && + peer_state.next_index <= storage_.GetLastLogIndex()) { + request.append_entries.entries = + storage_.GetLogSuffix(peer_state.next_index); + } else { + request.append_entries.entries = {}; + } + + request.append_entries.leader_commit = commit_index_; + + PeerRPCReply reply; + + if (!SendRPC(peer_id, request, reply, lock)) { + /* There is probably something wrong with this peer, let's avoid sending log + * entries. */ + peer_state.suppress_log_entries = true; + return; + } + + DCHECK(mode_ == RaftMode::LEADER) + << "Elected leader for term should never change"; + DCHECK(reply.append_entries.term == term_) << "Got stale AppendEntries reply"; + + if (reply.append_entries.success) { + /* We've found a match, we can start sending log entries. 
*/ + peer_state.suppress_log_entries = false; + + LogIndex new_match_index = request.append_entries.prev_log_index + + request.append_entries.entries.size(); + DCHECK(peer_state.match_index <= new_match_index) + << "`match_index` should increase monotonically within a term"; + peer_state.match_index = new_match_index; + AdvanceCommitIndex(); + peer_state.next_index = peer_state.match_index + 1; + peer_state.next_heartbeat_time = Clock::now() + config_.heartbeat_interval; + } else { + DCHECK(peer_state.next_index > 1) + << "Log replication should not fail for first log entry."; + --peer_state.next_index; + } + + state_changed_.notify_all(); +} + +template +PeerRPCReply::RequestVote RaftMemberImpl::OnRequestVote( + const typename PeerRPCRequest::RequestVote &request) { + std::lock_guard lock(mutex_); + LogInfo("RequestVote RPC request from {}", request.candidate_id); + + PeerRPCReply::RequestVote reply; + + /* [Raft thesis, Section 3.3] + * "If a server receives a request with a stale term number, it rejects the + * request." */ + if (request.candidate_term < term_) { + reply.term = term_; + reply.vote_granted = false; + return reply; + } + + /* [Raft thesis, Section 3.6.1] + * "Raft uses the voting process to prevent a candidate from winning an + * election unless its log contains all committed entries. (...) The + * RequestVote RPC implements this restriction: the RPC includes information + * about the candidate's log, and the voter denies its vote if its own log is + * more up-to-date than that of the candidate. Raft determines which of two + * logs is more up-to-date by comparing the index and term of the last entries + * in the logs. If the logs have last entries with different terms, then the + * log with the later term is more up-to-date. If the logs end with the same + * term, then whichever log is longer is more up-to-date." 
*/ + LogIndex my_last_log_index = storage_.GetLastLogIndex(); + TermId my_last_log_term = storage_.GetLogTerm(my_last_log_index); + if (my_last_log_term > request.last_log_term || + (my_last_log_term == request.last_log_term && + my_last_log_index > request.last_log_index)) { + reply.term = term_; + reply.vote_granted = false; + return reply; + } + + /* [Raft thesis, Section 3.4] + * "Each server will vote for at most one candidate in a given term, on a + * first-come-first-served basis." + */ + + /* We voted for someone else in this term. */ + if (request.candidate_term == term_ && voted_for_ && + *voted_for_ != request.candidate_id) { + reply.term = term_; + reply.vote_granted = false; + return reply; + } + + /* [Raft thesis, Section 3.3] + * "Current terms are exchanged whenever servers communicate; if one server's + * current term is smaller than the other's, then it updates its current term + * to the larger value. If a candidate or leader discovers that its term is + * out of date, it immediately reverts to follower state." */ + if (request.candidate_term > term_) { + if (mode_ != RaftMode::FOLLOWER) { + CandidateOrLeaderTransitionToFollower(); + } + } + + /* Now we know we will vote for this candidate, because its term is at least + * as big as ours and we haven't voted for anyone else. */ + UpdateTermAndVotedFor(request.candidate_term, request.candidate_id); + + /* [Raft thesis, Section 3.4] + * A server remains in follower state as long as it receives valid RPCs from a + * leader or candidate. 
*/ + SetElectionTimer(); + state_changed_.notify_all(); + + reply.term = request.candidate_term; + reply.vote_granted = true; + return reply; +} + +template +PeerRPCReply::AppendEntries RaftMemberImpl::OnAppendEntries( + const typename PeerRPCRequest::AppendEntries &request) { + std::lock_guard lock(mutex_); + LogInfo("AppendEntries RPC request from {}", request.leader_id); + + PeerRPCReply::AppendEntries reply; + + /* [Raft thesis, Section 3.3] + * "If a server receives a request with a stale term number, it rejects the + * request." */ + if (request.leader_term < term_) { + reply.term = term_; + reply.success = false; + return reply; + } + + /* [Raft thesis, Section 3.3] + * "Current terms are exchanged whenever servers communicate; if one server's + * current term is smaller than the other's, then it updates its current term + * to the larger value. If a candidate or leader discovers that its term is + * out of date, it immediately reverts to follower state." */ + if (request.leader_term > term_) { + if (mode_ != RaftMode::FOLLOWER) { + CandidateOrLeaderTransitionToFollower(); + } + UpdateTermAndVotedFor(request.leader_term, {}); + } + + /* [Raft thesis, Section 3.4] + * "While waiting for votes, a candidate may receive an AppendEntries RPC from + * another server claiming to be leader. If the leader's term (included in its + * RPC) is at least as large as the candidate's current term, then the + * candidate recognizes the leader as legitimate and returns to follower + * state." 
*/ + if (mode_ == RaftMode::CANDIDATE && request.leader_term == term_) { + CandidateOrLeaderTransitionToFollower(); + } + + DCHECK(mode_ != RaftMode::LEADER) + << "Leader cannot accept `AppendEntries` RPC"; + DCHECK(term_ == request.leader_term) << "Term should be equal to request " + "term when accepting `AppendEntries` " + "RPC"; + + leader_ = request.leader_id; + + /* [Raft thesis, Section 3.4] + * A server remains in follower state as long as it receives valid RPCs from a + * leader or candidate. */ + SetElectionTimer(); + state_changed_.notify_all(); + + /* [Raft thesis, Section 3.5] + * "When sending an AppendEntries RPC, the leader includes the index and term + * of the entry in its log that immediately precedes the new entries. If the + * follower does not find an entry in its log with the same index and term, + * then it refuses the new entries." */ + if (request.prev_log_index > storage_.GetLastLogIndex() || + storage_.GetLogTerm(request.prev_log_index) != request.prev_log_term) { + reply.term = term_; + reply.success = false; + return reply; + } + + /* [Raft thesis, Section 3.5] + * "To bring a follower's log into consistency with its own, the leader must + * find the latest log entry where the two logs agree, delete any entries in + * the follower's log after that point, and send the follower all of the + * leader's entries after that point." */ + + /* Entry at `request.prev_log_index` is the last entry where ours and leader's + * logs agree. It's time to replace the tail of the log with new entries from + * the leader. We have to be careful here as duplicated AppendEntries RPCs + * could cause data loss. + * + * There is a possibility that an old AppendEntries RPC is duplicated and + * received after processing newer one. For example, leader appends entry 3 + * and then entry 4, but follower receives entry 3, then entry 4, and then + * entry 3 again. We have to be careful not to delete entry 4 from log when + * processing the last RPC. 
*/ + LogIndex index = request.prev_log_index; + auto it = request.entries.begin(); + for (; it != request.entries.end(); ++it) { + ++index; + if (index > storage_.GetLastLogIndex()) { + break; + } + if (storage_.GetLogTerm(index) != it->term) { + LogInfo("Truncating log suffix from index {}", index); + DCHECK(commit_index_ < index) + << "Committed entries should never be truncated form the log"; + storage_.TruncateLogSuffix(index); + break; + } + } + + LogInfo("Appending {} out of {} logs from {}.", request.entries.end() - it, + request.entries.size(), request.leader_id); + + for (; it != request.entries.end(); ++it) { + storage_.AppendLogEntry(*it); + } + + commit_index_ = std::max(commit_index_, request.leader_commit); + + /* Let's bump election timer once again, we don't want to take down the leader + * because of our long disk writes. */ + SetElectionTimer(); + state_changed_.notify_all(); + + reply.term = term_; + reply.success = true; + return reply; +} + +} // namespace impl + +template +RaftMember::RaftMember(RaftNetworkInterface &network, + RaftStorageInterface &storage, + const MemberId &id, const RaftConfig &config) + : impl_(network, storage, id, config) { + timer_thread_ = + std::thread(&impl::RaftMemberImpl::TimerThreadMain, &impl_); + + for (const auto &peer_id : config.members) { + if (peer_id != id) { + peer_threads_.emplace_back(&impl::RaftMemberImpl::PeerThreadMain, + &impl_, peer_id); + } + } +} + +template +RaftMember::~RaftMember() { + impl_.Stop(); + timer_thread_.join(); + + for (auto &peer_thread : peer_threads_) { + peer_thread.join(); + } +} + +template +PeerRPCReply::RequestVote RaftMember::OnRequestVote( + const typename PeerRPCRequest::RequestVote &request) { + return impl_.OnRequestVote(request); +} + +template +PeerRPCReply::AppendEntries RaftMember::OnAppendEntries( + const typename PeerRPCRequest::AppendEntries &request) { + return impl_.OnAppendEntries(request); +} + +} // namespace communication::raft diff --git 
a/src/communication/raft/raft.cpp b/src/communication/raft/raft.cpp deleted file mode 100644 index 19872ae50..000000000 --- a/src/communication/raft/raft.cpp +++ /dev/null @@ -1,263 +0,0 @@ -#include "raft.hpp" - -#include - -#include "fmt/format.h" -#include "glog/logging.h" - -#include "communication/raft/raft_network.hpp" - -using std::experimental::optional; -using std::placeholders::_1; -using std::placeholders::_2; - -using namespace communication::reactor; -using namespace std::chrono_literals; - -namespace communication::raft { - -RaftMember::RaftMember(System &system, const std::string &id, - const RaftConfig &config, RaftNetworkInterface &network) - : id_(id), - system_(system), - config_(config), - network_(network), - mode_(Mode::FOLLOWER), - leader_watchdog_(config_.leader_timeout_min, config_.leader_timeout_max, - [this]() { - LocalChannelWriter channel(id_, "main", system_); - channel.Send(); - }), - heartbeat_watchdog_( - config_.heartbeat_interval, config_.heartbeat_interval, - [this]() { - for (const auto &member : config_.members) { - if (id_ != member) { - network_.AppendEntries(member, MAppendEntries(term_, id_)); - } - } - }, - true), - reactor_(system.Spawn(id, [this](Reactor &r) { - EventStream *stream = r.main_.first; - - stream->OnEvent([this]( - const MLeaderTimeout &, const Subscription &) { RunElection(); }); - - stream->OnEvent( - [this](const MRequestVote &req, const Subscription &) { - network_.RequestVoteReply(req.sender_id, OnRequestVote(req)); - }); - stream->OnEvent( - [this](const MRequestVoteReply &req, const Subscription &) { - OnRequestVoteReply(req); - }); - - stream->OnEvent( - [this](const MAppendEntries &req, const Subscription &) { - network_.AppendEntriesReply(req.sender_id, OnAppendEntries(req)); - }); - stream->OnEvent( - [this](const MAppendEntriesReply &rep, const Subscription &) { - OnAppendEntriesReply(rep); - }); - })) {} - -RaftMember::~RaftMember() { LogInfo("Shutting down..."); } - -template -void 
RaftMember::LogInfo(const std::string &format, Args &&... args) { - LOG(INFO) << fmt::format("(node = {}, term = {}) ", id_, term_) - << fmt::format(format, std::forward(args)...) << std::endl; -} - -void RaftMember::TransitionToFollower() { - /* Stop sending heartbeats to followers, start listening for them. */ - heartbeat_watchdog_.Block(); - leader_watchdog_.Unblock(); - - mode_ = Mode::FOLLOWER; -} - -void RaftMember::TransitionToCandidate() { - /* This transition only happens if we are in follower or candidate mode. - * Leader timeout watchdog is not blocked because it also triggers new - * elections if the current one times out. */ - DCHECK(mode_ != Mode::LEADER) - << "Transition to candidate mode from leader mode."; - mode_ = Mode::CANDIDATE; - votes_ = {}; -} - -void RaftMember::TransitionToLeader() { - /* This transition only happens if we are in candidate mode. */ - DCHECK(mode_ == Mode::CANDIDATE) - << "Transition to leader mode from leader or follower mode."; - - /* Stop listening for leader heartbeats, start sending them. */ - leader_watchdog_.Block(); - heartbeat_watchdog_.Unblock(); - - mode_ = Mode::LEADER; - leader_ = id_; -} - -void RaftMember::UpdateTerm(int new_term) { - term_ = new_term; - voted_for_ = {}; - leader_ = {}; -} - -void RaftMember::RunElection() { - /* Elections will be skipped if we believe we are the leader. This can happen - * if leader timeout message was delayed for some reason. */ - if (mode_ == Mode::LEADER) { - return; - } - - LogInfo("Running for leader."); - - /* [Raft paper, Section 5.2.] - * "To begin an election, a follower increments its current term and - * transitions to candidate state. It then votes for itself and issues - * RequestVote RPCs in parallel to each of the other servers in - * the cluster." 
*/ - TransitionToCandidate(); - UpdateTerm(term_ + 1); - - voted_for_ = id_; - votes_.insert(id_); - - for (const auto &member_id : config_.members) { - if (member_id != id_) { - network_.RequestVote(member_id, MRequestVote(term_, id_)); - } - } -} - -MRequestVoteReply RaftMember::OnRequestVote(const MRequestVote &req) { - LogInfo("Vote requested from {}, candidate_term = {}", req.sender_id, - req.term); - /* [Raft paper, Section 5.1] - * "Current terms are exchanged whenever servers communicate; if one server's - * current term is smaller than the other's, then it updates its current term - * to the larger value. If a candidate or leader discovers that its term is - * out of date, it immediately reverts to follower state." */ - if (req.term > term_) { - TransitionToFollower(); - UpdateTerm(req.term); - } - - /* [Raft paper, Figure 2] - * "Reply false if term < currentTerm." */ - if (req.term < term_) { - return MRequestVoteReply(term_, id_, false); - } - - /* [Raft paper, Figure 2] - * "If votedFor is null or candidateId, and candidate's log is at least as - * up-to-date as receiver's log, grant vote." */ - if (voted_for_ && *voted_for_ != req.sender_id) { - return MRequestVoteReply(term_, id_, false); - } - - LogInfo("Granting vote to {}.", req.sender_id); - - voted_for_ = req.sender_id; - /* [Raft paper, Section 5.2] - * "A server remains in follower state as long as it receives valid RPCs from - * a leader or candidate." */ - leader_watchdog_.Notify(); - - DCHECK(mode_ != Mode::LEADER) << "Granted vote as a leader."; - - return MRequestVoteReply(term_, id_, true); -} - -void RaftMember::OnRequestVoteReply(const MRequestVoteReply &rep) { - /* Ignore leftover messages from old elections. 
*/ - if (mode_ != Mode::CANDIDATE || rep.term < term_) { - return; - } - - /* [Raft paper, Section 5.1] - * "Current terms are exchanged whenever servers communicate; if one server's - * current term is smaller than the other's, then it updates its current term - * to the larger value. If a candidate or leader discovers that its term is - * out of date, it immediately reverts to follower state." */ - if (rep.term > term_) { - LogInfo( - "Vote denied from {} with greater term {}, transitioning to follower " - "mode.", - rep.sender_id, rep.term); - TransitionToFollower(); - UpdateTerm(rep.term); - return; - } - - if (!rep.success) { - LogInfo("Vote rejected from {}.", rep.sender_id); - return; - } - - LogInfo("Vote granted from {}.", rep.sender_id); - votes_.insert(rep.sender_id); - if (2 * votes_.size() > config_.members.size()) { - LogInfo("Elected as leader."); - TransitionToLeader(); - } -} - -MAppendEntriesReply RaftMember::OnAppendEntries(const MAppendEntries &req) { - LogInfo("Append entries from {}.", req.sender_id); - - /* [Raft paper, Section 5.1] - * "If a server receives a request with a stale term number, it rejects - * the request." */ - if (req.term < term_) { - return MAppendEntriesReply(term_, id_, false); - } - - /* [Raft paper, Section 5.1] - * "Current terms are exchanged whenever servers communicate; if one server's - * current term is smaller than the other's, then it updates its current term - * to the larger value. If a candidate or leader discovers that its term is - * out of date, it immediately reverts to follower state." */ - if (req.term > term_) { - TransitionToFollower(); - UpdateTerm(req.term); - } - - /* [Raft paper, Section 5.2] - * "While waiting for votes, a candidate may receive an AppendEntries RPC from - * another server claiming to be leader. 
If the leader’s term (included in its - * RPC) is at least as large as the candidate’s current term, then the - * candidate recognizes the leader as legitimate and returns to follower - * state." */ - if (req.term == term_ && mode_ == Mode::CANDIDATE) { - TransitionToFollower(); - UpdateTerm(req.term); - } - - /* [Raft paper, Section 5.2] - * "A server remains in follower state as long as it receives - * valid RPCs from a leader or candidate." */ - leader_ = req.sender_id; - leader_watchdog_.Notify(); - - return MAppendEntriesReply(term_, id_, true); -} - -void RaftMember::OnAppendEntriesReply(const MAppendEntriesReply &rep) { - /* [Raft paper, Section 5.1] - * "Current terms are exchanged whenever servers communicate; if one server's - * current term is smaller than the other's, then it updates its current term - * to the larger value. If a candidate or leader discovers that its term is - * out of date, it immediately reverts to follower state." */ - if (rep.term > term_) { - TransitionToFollower(); - UpdateTerm(rep.term); - } -} - -} // namespace communication::raft diff --git a/src/communication/raft/raft.hpp b/src/communication/raft/raft.hpp index 1aaa73ad9..063965341 100644 --- a/src/communication/raft/raft.hpp +++ b/src/communication/raft/raft.hpp @@ -1,70 +1,258 @@ #pragma once #include +#include #include +#include +#include +#include #include +#include #include -#include "communication/reactor/reactor_local.hpp" -#include "utils/watchdog.hpp" - namespace communication::raft { -struct MAppendEntries; -struct MAppendEntriesReply; -struct MRequestVote; -struct MRequestVoteReply; +using Clock = std::chrono::system_clock; +using TimePoint = std::chrono::system_clock::time_point; -class RaftNetworkInterface; +using MemberId = std::string; +using TermId = uint64_t; + +using ClientId = uint64_t; +using CommandId = uint64_t; + +using LogIndex = uint64_t; + +template +struct LogEntry { + int term; + + std::experimental::optional command; + + bool operator==(const 
LogEntry &rhs) const { + return term == rhs.term && command == rhs.command; + } + bool operator!=(const LogEntry &rhs) const { return !(*this == rhs); } +}; + +/* Raft RPC requests and replies as described in [Raft thesis, Figure 3.1]. */ +template +struct PeerRPCRequest { + enum class Type { REQUEST_VOTE, APPEND_ENTRIES }; + + Type type; + + struct RequestVote { + TermId candidate_term; + MemberId candidate_id; + LogIndex last_log_index; + TermId last_log_term; + } request_vote; + + struct AppendEntries { + TermId leader_term; + MemberId leader_id; + LogIndex prev_log_index; + TermId prev_log_term; + std::vector> entries; + LogIndex leader_commit; + } append_entries; + + TermId Term() const { + switch (type) { + case Type::REQUEST_VOTE: + return request_vote.candidate_term; + case Type::APPEND_ENTRIES: + return append_entries.leader_term; + } + } +}; + +struct PeerRPCReply { + enum class Type { REQUEST_VOTE, APPEND_ENTRIES }; + + Type type; + + struct RequestVote { + TermId term; + bool vote_granted; + } request_vote; + + struct AppendEntries { + TermId term; + bool success; + } append_entries; + + TermId Term() const { + switch (type) { + case Type::REQUEST_VOTE: + return request_vote.term; + case Type::APPEND_ENTRIES: + return append_entries.term; + } + } +}; + +template +class RaftNetworkInterface { + public: + virtual ~RaftNetworkInterface() = default; + + /* Returns false if RPC failed for some reason (e.g. cannot establish + * connection, request timeout or request cancelled). Otherwise `reply` + * contains response from peer. 
*/ + virtual bool SendRPC(const MemberId &recipient, + const PeerRPCRequest &request, + PeerRPCReply &reply) = 0; +}; + +template +class RaftStorageInterface { + public: + virtual ~RaftStorageInterface() = default; + + virtual void WriteTermAndVotedFor( + const TermId term, + const std::experimental::optional &voted_for) = 0; + virtual std::pair> + GetTermAndVotedFor() = 0; + virtual void AppendLogEntry(const LogEntry &entry) = 0; + virtual TermId GetLogTerm(const LogIndex index) = 0; + virtual LogEntry GetLogEntry(const LogIndex index) = 0; + virtual std::vector> GetLogSuffix(const LogIndex index) = 0; + virtual LogIndex GetLastLogIndex() = 0; + virtual void TruncateLogSuffix(const LogIndex index) = 0; +}; struct RaftConfig { - std::vector members; + std::vector members; std::chrono::milliseconds leader_timeout_min; std::chrono::milliseconds leader_timeout_max; std::chrono::milliseconds heartbeat_interval; }; -class RaftMember { +namespace impl { + +enum class RaftMode { FOLLOWER, CANDIDATE, LEADER }; + +struct RaftPeerState { + bool request_vote_done; + bool voted_for_me; + LogIndex match_index; + LogIndex next_index; + bool suppress_log_entries; + Clock::time_point next_heartbeat_time; +}; + +template +class RaftMemberImpl { public: - RaftMember(communication::reactor::System &system, const std::string &id, - const RaftConfig &config, RaftNetworkInterface &network); - virtual ~RaftMember(); + explicit RaftMemberImpl(RaftNetworkInterface &network, + RaftStorageInterface &storage, + const MemberId &id, const RaftConfig &config); - protected: - std::string id_; - std::experimental::optional leader_; + ~RaftMemberImpl(); - private: - enum class Mode { FOLLOWER, CANDIDATE, LEADER }; + void Stop(); - communication::reactor::System &system_; - RaftConfig config_; - RaftNetworkInterface &network_; - Mode mode_; - uint64_t term_ = 0; - std::experimental::optional voted_for_; + void TimerThreadMain(); + void PeerThreadMain(std::string peer_id); - std::set votes_; + void 
UpdateTermAndVotedFor( + const TermId new_term, + const std::experimental::optional &new_voted_for); + void CandidateOrLeaderTransitionToFollower(); + void CandidateTransitionToLeader(); - Watchdog leader_watchdog_; - Watchdog heartbeat_watchdog_; + bool SendRPC(const std::string &recipient, + const PeerRPCRequest &request, PeerRPCReply &reply, + std::unique_lock &lock); - std::unique_ptr reactor_; + void StartNewElection(); + void SetElectionTimer(); + bool CountVotes(); + void RequestVote(const MemberId &peer_id, RaftPeerState &peer_state, + std::unique_lock &lock); - void RunElection(); - void TransitionToFollower(); - void TransitionToCandidate(); - void TransitionToLeader(); - void UpdateTerm(int new_term); + void AdvanceCommitIndex(); + void AppendEntries(const MemberId &peer_id, RaftPeerState &peer_state, + std::unique_lock &lock); - MRequestVoteReply OnRequestVote(const MRequestVote &); - void OnRequestVoteReply(const MRequestVoteReply &); - - MAppendEntriesReply OnAppendEntries(const MAppendEntries &); - void OnAppendEntriesReply(const MAppendEntriesReply &); + PeerRPCReply::RequestVote OnRequestVote( + const typename PeerRPCRequest::RequestVote &request); + PeerRPCReply::AppendEntries OnAppendEntries( + const typename PeerRPCRequest::AppendEntries &request); template void LogInfo(const std::string &, Args &&...); + + RaftNetworkInterface &network_; + RaftStorageInterface &storage_; + + MemberId id_; + RaftConfig config_; + + TermId term_; + RaftMode mode_ = RaftMode::FOLLOWER; + std::experimental::optional voted_for_ = std::experimental::nullopt; + std::experimental::optional leader_ = std::experimental::nullopt; + + TimePoint next_election_time_; + + LogIndex commit_index_ = 0; + + bool exiting_ = false; + + std::map> peer_states_; + + /* This mutex protects all of the internal state. */ + std::mutex mutex_; + + /* Used to notify waiting threads that some of the internal state has changed. 
+ * It is notified when the following events occur: + * - mode change + * - election start + * - `next_election_time_` update on RPC from leader or candidate + * - destructor is called + * - `commit_index_` is advanced + */ + std::condition_variable state_changed_; + + std::mt19937_64 rng_ = std::mt19937_64(std::random_device{}()); +}; + +} // namespace impl + +template +class RaftMember final { + public: + explicit RaftMember(RaftNetworkInterface &network, + RaftStorageInterface &storage, const MemberId &id, + const RaftConfig &config); + ~RaftMember(); + + /* Just to make the tests work for now until we clean up the reactor stuff. */ + std::experimental::optional Leader() { + std::lock_guard lock(impl_.mutex_); + return impl_.leader_; + } + MemberId Id() const { return impl_.id_; } + + PeerRPCReply::RequestVote OnRequestVote( + const typename PeerRPCRequest::RequestVote &request); + PeerRPCReply::AppendEntries OnAppendEntries( + const typename PeerRPCRequest::AppendEntries &request); + + private: + impl::RaftMemberImpl impl_; + + /* Timer thread for triggering elections. */ + std::thread timer_thread_; + + /* One thread per peer for outgoing RPCs. 
*/ + std::vector peer_threads_; }; } // namespace communication::raft + +#include "raft-inl.hpp" diff --git a/src/communication/raft/raft_network.hpp b/src/communication/raft/raft_network.hpp deleted file mode 100644 index a9b2315a5..000000000 --- a/src/communication/raft/raft_network.hpp +++ /dev/null @@ -1,158 +0,0 @@ -#pragma once - -#include -#include - -#include "communication/reactor/reactor_local.hpp" - -namespace communication::raft { - -struct MLeaderTimeout : public communication::reactor::Message {}; - -struct RaftMessage : public communication::reactor::Message { - RaftMessage(int term, const std::string &sender_id) - : term(term), sender_id(sender_id) {} - int term; - std::string sender_id; -}; - -struct RaftMessageReply : public RaftMessage { - RaftMessageReply(int term, const std::string &sender_id, bool success) - : RaftMessage(term, sender_id), success(success) {} - std::experimental::optional leader; - bool success; -}; - -struct MRequestVote : public RaftMessage { - MRequestVote(int candidate_term, const std::string &candidate_id) - : RaftMessage(candidate_term, candidate_id) {} -}; - -struct MRequestVoteReply : public RaftMessageReply { - MRequestVoteReply(int term, const std::string &sender_id, bool vote_granted) - : RaftMessageReply(term, sender_id, vote_granted) {} -}; - -struct MAppendEntries : public RaftMessage { - MAppendEntries(int term, const std::string &sender_id) - : RaftMessage(term, sender_id) {} -}; - -struct MAppendEntriesReply : public RaftMessageReply { - MAppendEntriesReply(int term, const std::string &sender_id, bool success) - : RaftMessageReply(term, sender_id, success) {} -}; - -class RaftNetworkInterface { - public: - virtual ~RaftNetworkInterface() {} - - virtual bool RequestVote(const std::string &recipient, - const MRequestVote &msg) = 0; - virtual bool RequestVoteReply(const std::string &recipient, - const MRequestVoteReply &msg) = 0; - virtual bool AppendEntries(const std::string &recipient, - const MAppendEntries 
&msg) = 0; - virtual bool AppendEntriesReply(const std::string &recipient, - const MAppendEntriesReply &msg) = 0; -}; - -class LocalReactorNetworkInterface : public RaftNetworkInterface { - public: - explicit LocalReactorNetworkInterface(communication::reactor::System &system) - : system_(system) {} - - bool RequestVote(const std::string &recipient, - const MRequestVote &msg) override { - return SendMessage(recipient, msg); - } - - bool RequestVoteReply(const std::string &recipient, - const MRequestVoteReply &msg) override { - return SendMessage(recipient, msg); - } - - bool AppendEntries(const std::string &recipient, - const MAppendEntries &msg) override { - return SendMessage(recipient, msg); - } - - bool AppendEntriesReply(const std::string &recipient, - const MAppendEntriesReply &msg) override { - return SendMessage(recipient, msg); - } - - private: - template - bool SendMessage(const std::string &recipient, const TMessage &msg) { - reactor::LocalChannelWriter channel(recipient, "main", system_); - channel.Send(msg); - // TODO: We always return true here even though we do not know if message - // was delievered or not. Maybe return value of functions in - // RaftNetworkInterface should be changed to be void, not bool. 
- return true; - } - - communication::reactor::System &system_; -}; - -class FakeNetworkInterface : public RaftNetworkInterface { - public: - explicit FakeNetworkInterface(communication::reactor::System &system) - : system_(system) {} - - bool RequestVote(const std::string &recipient, - const MRequestVote &msg) override { - return SendMessage(recipient, msg); - } - - bool RequestVoteReply(const std::string &recipient, - const MRequestVoteReply &msg) override { - return SendMessage(recipient, msg); - } - - bool AppendEntries(const std::string &recipient, - const MAppendEntries &msg) override { - return SendMessage(recipient, msg); - } - - bool AppendEntriesReply(const std::string &recipient, - const MAppendEntriesReply &msg) override { - return SendMessage(recipient, msg); - } - - void Disconnect(const std::string &id) { - std::lock_guard guard(mutex_); - status_[id] = false; - } - - void Connect(const std::string &id) { - std::lock_guard guard(mutex_); - status_[id] = true; - } - - private: - template - bool SendMessage(const std::string &recipient, const TMessage &msg) { - bool ok; - - { - std::lock_guard guard(mutex_); - ok = status_[msg.sender_id] && status_[recipient]; - } - - if (ok) { - reactor::LocalChannelWriter channel(recipient, "main", system_); - channel.Send(msg); - } - - return ok; - } - - communication::reactor::System &system_; - - std::mutex mutex_; - std::unordered_map status_; -}; - -} // namespace communication::raft diff --git a/src/communication/raft/raft_reactor.hpp b/src/communication/raft/raft_reactor.hpp new file mode 100644 index 000000000..6dbf9630d --- /dev/null +++ b/src/communication/raft/raft_reactor.hpp @@ -0,0 +1,136 @@ +#pragma once + +#include + +#include "communication/raft/raft.hpp" +#include "communication/reactor/reactor_local.cpp" + +/* This is junk, hopefully we get rid of reactor leftovers soon. 
*/ + +namespace communication::raft { + +using namespace communication::reactor; + +template +class LocalReactorNetworkInterface : public RaftNetworkInterface { + public: + explicit LocalReactorNetworkInterface( + communication::reactor::System &system, const std::string &id, + const std::function &)> + &rpc_callback) + : id_(id), + reactor_(system.Spawn( + id, + [this, rpc_callback](reactor::Reactor &r) { + reactor::EventStream *stream = r.main_.first; + stream->OnEvent([this, rpc_callback, &r]( + const MPeerRPCRequest &request, + const reactor::Subscription &) { + if (!connected_) { + return; + } + PeerRPCReply reply = rpc_callback(request.request); + reactor::LocalChannelWriter channel_writer( + request.sender, request.reply_channel, r.system_); + auto channel = + r.system_.Resolve(request.sender, request.reply_channel); + channel_writer.Send(reply); + }); + })), + connected_(false) {} + + ~LocalReactorNetworkInterface() {} + + void Connect() { connected_ = true; } + void Disconnect() { connected_ = false; } + + bool SendRPC(const std::string &recipient, + const PeerRPCRequest &request, PeerRPCReply &reply) { + if (!connected_) { + return false; + } + reactor::LocalChannelWriter request_channel_writer(recipient, "main", + reactor_->system_); + + std::string reply_channel_id = fmt::format("{}", request_id_++); + auto reply_channel = reactor_->Open(reply_channel_id).first; + + std::atomic got_reply{false}; + + reply_channel->template OnEvent([&reply, &got_reply]( + const MPeerRPCReply &reply_message, const reactor::Subscription &) { + reply = reply_message.reply; + got_reply = true; + }); + + request_channel_writer.Send( + MPeerRPCRequest(request, id_, reply_channel_id)); + + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + + return got_reply; + } + + private: + struct MPeerRPCRequest : public communication::reactor::Message { + MPeerRPCRequest(const PeerRPCRequest &request, + const std::string &sender, const std::string &reply_channel) + : 
request(request), sender(sender), reply_channel(reply_channel) {} + PeerRPCRequest request; + std::string sender; + std::string reply_channel; + }; + + struct MPeerRPCReply : public communication::reactor::Message { + MPeerRPCReply(const PeerRPCReply &reply) : reply(reply) {} + PeerRPCReply reply; + }; + + std::string id_; + std::unique_ptr reactor_; + std::atomic connected_; + std::atomic request_id_{0}; +}; + +struct MShutdown : public Message {}; + +template +class RaftMemberLocalReactor { + public: + explicit RaftMemberLocalReactor(communication::reactor::System &system, + RaftStorageInterface &storage, + const std::string &id, + const RaftConfig &config) + : network_(system, id, + [this](const PeerRPCRequest &request) -> PeerRPCReply { + return this->OnRPC(request); + }), + member_(network_, storage, id, config) {} + + void Connect() { network_.Connect(); } + void Disconnect() { network_.Disconnect(); } + + virtual ~RaftMemberLocalReactor() {} + + private: + LocalReactorNetworkInterface network_; + + PeerRPCReply OnRPC(const PeerRPCRequest &request) { + PeerRPCReply reply; + if (request.type == PeerRPCRequest::Type::REQUEST_VOTE) { + reply.type = PeerRPCReply::Type::REQUEST_VOTE; + reply.request_vote = member_.OnRequestVote(request.request_vote); + } else if (request.type == PeerRPCRequest::Type::APPEND_ENTRIES) { + reply.type = PeerRPCReply::Type::APPEND_ENTRIES; + reply.append_entries = member_.OnAppendEntries(request.append_entries); + } else { + LOG(FATAL) << "Unknown Raft RPC request type"; + } + return reply; + } + + protected: + RaftMember member_; +}; + +} // namespace communication::raft diff --git a/src/communication/raft/test_utils.hpp b/src/communication/raft/test_utils.hpp new file mode 100644 index 000000000..75f810957 --- /dev/null +++ b/src/communication/raft/test_utils.hpp @@ -0,0 +1,106 @@ +#include "communication/raft/raft.hpp" + +namespace communication::raft::test_utils { + +struct DummyState { + struct Change { + bool operator==(const 
Change &) const { return true; } + bool operator!=(const Change &) const { return false; } + }; + struct Query {}; + struct Result {}; +}; + +template +class NoOpNetworkInterface : public RaftNetworkInterface { + public: + ~NoOpNetworkInterface() {} + + bool SendRPC(const MemberId &recipient, const PeerRPCRequest &request, + PeerRPCReply &reply) { + return false; + } +}; + +template +class NextReplyNetworkInterface : public RaftNetworkInterface { + public: + ~NextReplyNetworkInterface() {} + + bool SendRPC(const MemberId &recipient, const PeerRPCRequest &request, + PeerRPCReply &reply) { + on_request_(request); + if (!next_reply_) { + return false; + } + reply = *next_reply_; + return true; + } + + std::function &)> on_request_; + std::experimental::optional next_reply_; +}; + +template +class NoOpStorageInterface : public RaftStorageInterface { + public: + NoOpStorageInterface() {} + + void WriteTermAndVotedFor(const TermId, + const std::experimental::optional &) {} + + std::pair> + GetTermAndVotedFor() { + return {0, {}}; + } + void AppendLogEntry(const LogEntry &) {} + TermId GetLogTerm(const LogIndex) { return 0; } + LogEntry GetLogEntry(const LogIndex) { assert(false); } + std::vector> GetLogSuffix(const LogIndex) { return {}; } + LogIndex GetLastLogIndex() { return 0; } + void TruncateLogSuffix(const LogIndex) {} + + TermId term_; + std::experimental::optional voted_for_; + std::vector> log_; +}; + +template +class InMemoryStorageInterface : public RaftStorageInterface { + public: + InMemoryStorageInterface( + const TermId term, + const std::experimental::optional &voted_for, + const std::vector> log) + : term_(term), voted_for_(voted_for), log_(log) {} + + void WriteTermAndVotedFor( + const TermId term, + const std::experimental::optional &voted_for) { + term_ = term; + voted_for_ = voted_for; + } + + std::pair> + GetTermAndVotedFor() { + return {term_, voted_for_}; + } + void AppendLogEntry(const LogEntry &entry) { log_.push_back(entry); } + TermId 
GetLogTerm(const LogIndex index) { + return index > 0 ? log_[index - 1].term : 0; + } + LogEntry GetLogEntry(const LogIndex index) { return log_[index - 1]; } + std::vector> GetLogSuffix(const LogIndex index) { + return std::vector>(log_.begin() + index - 1, log_.end()); + } + LogIndex GetLastLogIndex(void) { return log_.size(); } + void TruncateLogSuffix(const LogIndex index) { + log_.erase(log_.begin() + index - 1, log_.end()); + } + + TermId term_; + std::experimental::optional voted_for_; + std::vector> log_; +}; + +} diff --git a/tests/manual/raft_experiments.cpp b/tests/manual/raft_experiments.cpp index 3292708b7..c0d3489ff 100644 --- a/tests/manual/raft_experiments.cpp +++ b/tests/manual/raft_experiments.cpp @@ -3,40 +3,46 @@ #include "fmt/format.h" #include "glog/logging.h" -#include "communication/raft/raft.hpp" -#include "communication/raft/raft_network.hpp" +#include "communication/raft/raft_reactor.hpp" +#include "communication/raft/test_utils.hpp" using std::chrono::milliseconds; using std::experimental::optional; using namespace communication::raft; +using namespace communication::raft::test_utils; using namespace communication::reactor; using namespace std::chrono_literals; -class RaftMemberTest : RaftMember { +template +class RaftMemberTest : public RaftMemberLocalReactor { public: - std::string Id() const { return id_; } - optional Leader() const { return leader_; } + MemberId Id() { return member_.Id(); } + std::experimental::optional Leader() { return member_.Leader(); } - using RaftMember::RaftMember; + private: + using RaftMemberLocalReactor::RaftMemberLocalReactor; + using RaftMemberLocalReactor::member_; }; +using RaftMemberDummy = RaftMemberTest; + milliseconds InitialElection(const RaftConfig &config) { System sys; - FakeNetworkInterface network(sys); + NoOpStorageInterface storage; std::chrono::system_clock::time_point start, end; LOG(INFO) << "Starting..." 
<< std::endl; { - std::vector> members; + std::vector> members; start = std::chrono::system_clock::now(); for (const auto &member_id : config.members) { members.push_back( - std::make_unique(sys, member_id, config, network)); - network.Connect(member_id); + std::make_unique(sys, storage, member_id, config)); + members.back()->Connect(); } bool leader_elected = false; @@ -56,19 +62,19 @@ milliseconds InitialElection(const RaftConfig &config) { milliseconds Reelection(const RaftConfig &config) { System sys; - FakeNetworkInterface network(sys); + NoOpStorageInterface storage; std::chrono::system_clock::time_point start, end; LOG(INFO) << "Starting..." << std::endl; { - std::vector> members; + std::vector> members; for (const auto &member_id : config.members) { members.push_back( - std::make_unique(sys, member_id, config, network)); - network.Connect(member_id); + std::make_unique(sys, storage, member_id, config)); + members.back()->Connect(); } bool leader_elected = false; @@ -87,7 +93,12 @@ milliseconds Reelection(const RaftConfig &config) { std::this_thread::sleep_for(config.heartbeat_interval); start = std::chrono::system_clock::now(); - network.Disconnect(first_leader); + for (const auto &member : members) { + if (member->Id() == first_leader) { + member->Disconnect(); + break; + } + } leader_elected = false; do { diff --git a/tests/unit/raft.cpp b/tests/unit/raft.cpp index 4e5a0439c..c18edefa7 100644 --- a/tests/unit/raft.cpp +++ b/tests/unit/raft.cpp @@ -1,78 +1,617 @@ #include "gtest/gtest.h" #include +#include #include #include "communication/raft/raft.hpp" -#include "communication/raft/raft_network.hpp" -#include "communication/reactor/reactor_local.hpp" +#include "communication/raft/test_utils.hpp" using namespace std::chrono_literals; using namespace communication::raft; +using namespace communication::raft::test_utils; -class RaftMemberTest : RaftMember { +using testing::Values; + +const RaftConfig test_config2{{"a", "b"}, 150ms, 300ms, 70ms}; +const 
RaftConfig test_config3{{"a", "b", "c"}, 150ms, 300ms, 70ms}; +const RaftConfig test_config5{{"a", "b", "c", "d", "e"}, 150ms, 300ms, 70ms}; + +using communication::raft::impl::RaftMemberImpl; +using communication::raft::impl::RaftMode; + +class RaftMemberTest : public ::testing::Test { public: - std::string Id() { return id_; } - std::string Leader() { return *leader_; } + RaftMemberTest() + : storage_(1, "a", {}), member(network_, storage_, "a", test_config5) {} - using RaftMember::RaftMember; + void SetLog(std::vector> log) { + storage_.log_ = std::move(log); + } + + NoOpNetworkInterface network_; + InMemoryStorageInterface storage_; + RaftMemberImpl member; }; -const RaftConfig test_config{{"a", "b", "c", "d", "e"}, 150ms, 300ms, 70ms}; - -TEST(Raft, InitialElection) { - communication::reactor::System sys; - FakeNetworkInterface network(sys); - - { - std::vector> members; - for (const auto &member_id : test_config.members) { - members.push_back(std::make_unique(sys, member_id, - test_config, network)); - network.Connect(member_id); - } - - std::this_thread::sleep_for(500ms); - - std::string leader = members[0]->Leader(); - for (const auto &member : members) { - EXPECT_EQ(member->Leader(), leader); - } - } +TEST_F(RaftMemberTest, Constructor) { + EXPECT_EQ(member.mode_, RaftMode::FOLLOWER); + EXPECT_EQ(member.term_, 1); + EXPECT_EQ(*member.voted_for_, "a"); + EXPECT_EQ(member.commit_index_, 0); } -TEST(Raft, Reelection) { - communication::reactor::System sys; - FakeNetworkInterface network(sys); +TEST_F(RaftMemberTest, CandidateOrLeaderTransitionToFollower) { + member.mode_ = RaftMode::CANDIDATE; + member.CandidateTransitionToLeader(); - { - std::vector> members; - for (const auto &member_id : test_config.members) { - members.push_back(std::make_unique(sys, member_id, - test_config, network)); - network.Connect(member_id); + member.CandidateOrLeaderTransitionToFollower(); + EXPECT_EQ(member.mode_, RaftMode::FOLLOWER); + EXPECT_EQ(member.leader_, 
std::experimental::nullopt); + EXPECT_LT(member.next_election_time_, TimePoint::max()); +} + +TEST_F(RaftMemberTest, CandidateTransitionToLeader) { + member.mode_ = RaftMode::CANDIDATE; + member.CandidateTransitionToLeader(); + + EXPECT_EQ(member.mode_, RaftMode::LEADER); + EXPECT_EQ(*member.leader_, "a"); + EXPECT_EQ(member.next_election_time_, TimePoint::max()); +} + +TEST_F(RaftMemberTest, StartNewElection) { + member.StartNewElection(); + + EXPECT_EQ(member.mode_, RaftMode::CANDIDATE); + EXPECT_EQ(member.term_, 2); + EXPECT_EQ(member.voted_for_, member.id_); +} + +TEST_F(RaftMemberTest, CountVotes) { + member.StartNewElection(); + EXPECT_FALSE(member.CountVotes()); + + member.peer_states_["b"]->voted_for_me = true; + EXPECT_FALSE(member.CountVotes()); + + member.peer_states_["c"]->voted_for_me = true; + EXPECT_TRUE(member.CountVotes()); +} + +TEST_F(RaftMemberTest, AdvanceCommitIndex) { + SetLog({{1}, {1}, {1}, {1}, {2}, {2}, {2}, {2}}); + + member.mode_ = RaftMode::LEADER; + member.term_ = 2; + + member.peer_states_["b"]->match_index = 4; + member.peer_states_["c"]->match_index = 4; + + EXPECT_EQ(member.commit_index_, 0); + member.AdvanceCommitIndex(); + EXPECT_EQ(member.commit_index_, 0); + + member.peer_states_["b"]->match_index = 4; + member.peer_states_["c"]->match_index = 4; + member.AdvanceCommitIndex(); + EXPECT_EQ(member.commit_index_, 0); + + member.peer_states_["b"]->match_index = 5; + member.AdvanceCommitIndex(); + EXPECT_EQ(member.commit_index_, 0); + + member.peer_states_["c"]->match_index = 5; + member.AdvanceCommitIndex(); + EXPECT_EQ(member.commit_index_, 5); + + member.peer_states_["d"]->match_index = 6; + member.peer_states_["e"]->match_index = 7; + member.AdvanceCommitIndex(); + EXPECT_EQ(member.commit_index_, 6); + + member.peer_states_["c"]->match_index = 8; + member.AdvanceCommitIndex(); + EXPECT_EQ(member.commit_index_, 7); + + member.peer_states_["a"]->match_index = 8; + member.AdvanceCommitIndex(); + EXPECT_EQ(member.commit_index_, 8); 
+} + +TEST(RequestVote, SimpleElection) { + NextReplyNetworkInterface network; + InMemoryStorageInterface storage(1, {}, {{1}, {1}}); + RaftMemberImpl member(network, storage, "a", test_config5); + + member.StartNewElection(); + + std::unique_lock lock(member.mutex_); + + PeerRPCReply next_reply; + next_reply.type = PeerRPCReply::Type::REQUEST_VOTE; + + network.on_request_ = [](const PeerRPCRequest &request) { + ASSERT_EQ(request.type, PeerRPCRequest::Type::REQUEST_VOTE); + ASSERT_EQ(request.request_vote.candidate_term, 2); + ASSERT_EQ(request.request_vote.candidate_id, "a"); + }; + + /* member 'b' first voted for us */ + next_reply.request_vote.term = 2; + next_reply.request_vote.vote_granted = true; + network.next_reply_ = next_reply; + member.RequestVote("b", *member.peer_states_["b"], lock); + EXPECT_EQ(member.mode_, RaftMode::CANDIDATE); + EXPECT_TRUE(member.peer_states_["b"]->request_vote_done); + EXPECT_TRUE(member.peer_states_["b"]->voted_for_me); + + /* member 'c' didn't */ + next_reply.request_vote.vote_granted = false; + network.next_reply_ = next_reply; + member.RequestVote("c", *member.peer_states_["c"], lock); + EXPECT_TRUE(member.peer_states_["c"]->request_vote_done); + EXPECT_FALSE(member.peer_states_["c"]->voted_for_me); + EXPECT_EQ(member.mode_, RaftMode::CANDIDATE); + + /* but member 'd' did */ + next_reply.request_vote.vote_granted = true; + network.next_reply_ = next_reply; + member.RequestVote("d", *member.peer_states_["d"], lock); + EXPECT_TRUE(member.peer_states_["d"]->request_vote_done); + EXPECT_TRUE(member.peer_states_["d"]->voted_for_me); + EXPECT_EQ(member.mode_, RaftMode::LEADER); + + /* no-op entry should be at the end of leader's log */ + EXPECT_EQ(storage.log_.back().term, 2); + EXPECT_EQ(storage.log_.back().command, std::experimental::nullopt); +} + +TEST(AppendEntries, SimpleLogSync) { + NextReplyNetworkInterface network; + InMemoryStorageInterface storage(3, {}, {{1}, {1}, {2}, {3}}); + RaftMemberImpl member(network, storage, 
"a", test_config2); + + member.mode_ = RaftMode::LEADER; + + std::unique_lock lock(member.mutex_); + + PeerRPCReply reply; + reply.type = PeerRPCReply::Type::APPEND_ENTRIES; + + reply.append_entries.term = 3; + reply.append_entries.success = false; + network.next_reply_ = reply; + + LogIndex expected_prev_log_index; + TermId expected_prev_log_term; + std::vector> expected_entries; + + network.on_request_ = [&](const PeerRPCRequest &request) { + EXPECT_EQ(request.type, PeerRPCRequest::Type::APPEND_ENTRIES); + EXPECT_EQ(request.append_entries.leader_term, 3); + EXPECT_EQ(request.append_entries.leader_id, "a"); + EXPECT_EQ(request.append_entries.prev_log_index, expected_prev_log_index); + EXPECT_EQ(request.append_entries.prev_log_term, expected_prev_log_term); + EXPECT_EQ(request.append_entries.entries, expected_entries); + }; + + /* initial state after election */ + auto &peer_state = *member.peer_states_["b"]; + peer_state.match_index = 0; + peer_state.next_index = 5; + peer_state.suppress_log_entries = true; + + /* send a heartbeat and find out logs don't match */ + expected_prev_log_index = 4; + expected_prev_log_term = 3; + expected_entries = {}; + member.AppendEntries("b", peer_state, lock); + EXPECT_EQ(peer_state.match_index, 0); + EXPECT_EQ(peer_state.next_index, 4); + EXPECT_EQ(member.commit_index_, 0); + + /* move `next_index` until we find a match, `expected_entries` will be empty + * because `suppress_log_entries` will be true */ + expected_entries = {}; + + expected_prev_log_index = 3; + expected_prev_log_term = 2; + member.AppendEntries("b", peer_state, lock); + EXPECT_EQ(peer_state.match_index, 0); + EXPECT_EQ(peer_state.next_index, 3); + EXPECT_EQ(peer_state.suppress_log_entries, true); + EXPECT_EQ(member.commit_index_, 0); + + expected_prev_log_index = 2; + expected_prev_log_term = 1; + member.AppendEntries("b", peer_state, lock); + EXPECT_EQ(peer_state.match_index, 0); + EXPECT_EQ(peer_state.next_index, 2); + 
EXPECT_EQ(peer_state.suppress_log_entries, true); + EXPECT_EQ(member.commit_index_, 0); + + /* we found a match */ + reply.append_entries.success = true; + network.next_reply_ = reply; + + expected_prev_log_index = 1; + expected_prev_log_term = 1; + member.AppendEntries("b", peer_state, lock); + EXPECT_EQ(peer_state.match_index, 1); + EXPECT_EQ(peer_state.next_index, 2); + EXPECT_EQ(peer_state.suppress_log_entries, false); + EXPECT_EQ(member.commit_index_, 4); + + /* now sync them */ + expected_prev_log_index = 1; + expected_prev_log_term = 1; + expected_entries = {{1}, {2}, {3}}; + member.AppendEntries("b", peer_state, lock); + EXPECT_EQ(peer_state.match_index, 4); + EXPECT_EQ(peer_state.next_index, 5); + EXPECT_EQ(peer_state.suppress_log_entries, false); + EXPECT_EQ(member.commit_index_, 4); + + /* heartbeat after successful log sync */ + expected_prev_log_index = 4; + expected_prev_log_term = 3; + expected_entries = {}; + member.AppendEntries("b", peer_state, lock); + EXPECT_EQ(peer_state.match_index, 4); + EXPECT_EQ(peer_state.next_index, 5); + EXPECT_EQ(member.commit_index_, 4); + + /* replicate a newly appended entry */ + storage.AppendLogEntry({3}); + + expected_prev_log_index = 4; + expected_prev_log_term = 3; + expected_entries = {{3}}; + member.AppendEntries("b", peer_state, lock); + EXPECT_EQ(peer_state.match_index, 5); + EXPECT_EQ(peer_state.next_index, 6); + EXPECT_EQ(member.commit_index_, 5); +} + +template +class RaftMemberParamTest : public ::testing::TestWithParam { + public: + virtual void SetUp() { + /* Some checks to verify that test case is valid. */ + + /* Member's term should be greater than or equal to last log term. */ + ASSERT_GE(storage_.term_, storage_.GetLogTerm(storage_.GetLastLogIndex())); + + ASSERT_GE(peer_storage_.term_, + peer_storage_.GetLogTerm(peer_storage_.GetLastLogIndex())); + + /* If two logs match at some index, the entire prefix should match. 
*/ + LogIndex pos = + std::min(storage_.GetLastLogIndex(), peer_storage_.GetLastLogIndex()); + + for (; pos > 0; --pos) { + if (storage_.GetLogEntry(pos) == peer_storage_.GetLogEntry(pos)) { + break; + } } - std::this_thread::sleep_for(500ms); - - std::string first_leader = members[0]->Leader(); - for (const auto &member : members) { - EXPECT_EQ(member->Leader(), first_leader); - } - - network.Disconnect(first_leader); - - std::this_thread::sleep_for(500ms); - - std::string second_leader = members[0]->Id() == first_leader - ? members[1]->Leader() - : members[0]->Leader(); - network.Connect(first_leader); - - std::this_thread::sleep_for(100ms); - - for (const auto &member : members) { - EXPECT_EQ(member->Leader(), second_leader); + for (; pos > 0; --pos) { + ASSERT_EQ(storage_.GetLogEntry(pos), peer_storage_.GetLogEntry(pos)); } } + + RaftMemberParamTest(InMemoryStorageInterface storage, + InMemoryStorageInterface peer_storage) + : network_(NoOpNetworkInterface()), + storage_(storage), + member_(network_, storage_, "a", test_config3), + peer_storage_(peer_storage) {} + + NoOpNetworkInterface network_; + InMemoryStorageInterface storage_; + RaftMemberImpl member_; + + InMemoryStorageInterface peer_storage_; +}; + +struct OnRequestVoteTestParam { + TermId term; + std::experimental::optional voted_for; + std::vector> log; + + TermId peer_term; + std::vector> peer_log; + + bool expected_reply; +}; + +class OnRequestVoteTest : public RaftMemberParamTest { + public: + OnRequestVoteTest() + : RaftMemberParamTest( + InMemoryStorageInterface( + GetParam().term, GetParam().voted_for, GetParam().log), + InMemoryStorageInterface(GetParam().peer_term, {}, + GetParam().peer_log)) {} + virtual ~OnRequestVoteTest() {} +}; + +TEST_P(OnRequestVoteTest, RequestVoteTest) { + auto reply = member_.OnRequestVote( + {GetParam().peer_term, "b", peer_storage_.GetLastLogIndex(), + peer_storage_.GetLogTerm(peer_storage_.GetLastLogIndex())}); + + EXPECT_EQ(reply.vote_granted, 
GetParam().expected_reply); + + /* If we accepted the request, our term should be equal to candidate's term + * and voted_for should be set. */ + EXPECT_EQ(reply.term, + reply.vote_granted ? GetParam().peer_term : GetParam().term); + EXPECT_EQ(storage_.term_, + reply.vote_granted ? GetParam().peer_term : GetParam().term); + EXPECT_EQ(storage_.voted_for_, + reply.vote_granted ? "b" : GetParam().voted_for); } + +/* Member 'b' is starting an election for term 5 and sending RequestVote RPC + * to 'a'. Logs are empty so log-up-to-date check will always pass. */ +INSTANTIATE_TEST_CASE_P( + TermAndVotedForCheck, OnRequestVoteTest, + Values( + /* we didn't vote for anyone in a smaller term -> accept */ + OnRequestVoteTestParam{3, {}, {}, 5, {}, true}, + /* we voted for someone in smaller term -> accept */ + OnRequestVoteTestParam{4, "c", {}, 5, {}, true}, + /* equal term but we didn't vote for anyone in it -> accept */ + OnRequestVoteTestParam{5, {}, {}, 5, {}, true}, + /* equal term but we voted for this candidate-> accept */ + OnRequestVoteTestParam{5, "b", {}, 5, {}, true}, + /* equal term but we voted for someone else -> decline */ + OnRequestVoteTestParam{5, "c", {}, 5, {}, false}, + /* larger term and haven't voted for anyone -> decline */ + OnRequestVoteTestParam{6, {}, {}, 5, {}, false}, + /* larger term and we voted for someone else -> decline */ + OnRequestVoteTestParam{6, "a", {}, 5, {}, false})); + +/* Member 'a' log: + * 1 2 3 4 5 6 7 + * | 1 | 1 | 1 | 2 | 3 | 3 | + * + * It is in term 5. + */ + +/* Member 'b' is sending RequestVote RPC to 'a' for term 8. 
*/ +INSTANTIATE_TEST_CASE_P( + LogUpToDateCheck, OnRequestVoteTest, + Values( + /* candidate's log is shorter and its last log term is smaller -> decline */ + OnRequestVoteTestParam{5, + {}, + {{1}, {1}, {1}, {2}, {3}, {3}}, + 8, + {{1}, {1}, {1}, {2}}, + false}, + /* candidate's log is longer, but its last log term is smaller -> decline */ + OnRequestVoteTestParam{5, + {}, + {{1}, {1}, {1}, {2}, {3}, {3}}, + 8, + {{1}, {1}, {1}, {2}, {2}, {2}, {2}}, + false}, + /* last log terms are equal, but our log is longer -> decline */ + OnRequestVoteTestParam{5, + {}, + {{1}, {1}, {1}, {2}, {3}, {3}}, + 8, + {{1}, {1}, {1}, {2}, {3}}, + false}, + /* equal logs -> accept */ + OnRequestVoteTestParam{5, + {}, + {{1}, {1}, {1}, {2}, {3}, {3}}, + 8, + {{1}, {1}, {1}, {2}, {3}, {3}}, + true}, + /* candidate's last log term is larger, although its log is shorter -> accept */ + OnRequestVoteTestParam{5, + {}, + {{1}, {1}, {1}, {2}, {3}, {3}}, + 8, + {{1}, {1}, {1}, {2}, {4}}, + true}, + /* last log terms are equal, but candidate's log is longer -> accept */ + OnRequestVoteTestParam{5, + {}, + {{1}, {1}, {1}, {2}, {3}, {3}}, + 8, + {{1}, {1}, {1}, {2}, {3}, {3}, {3}}, + true}, + /* candidate's last log term is larger -> accept */ + OnRequestVoteTestParam{5, + {}, + {{1}, {1}, {1}, {2}, {3}, {3}}, + 8, + {{1}, {2}, {3}, {4}, {5}}, + true})); +/* One AppendEntries scenario: initial state of member 'a', state of sender 'b', expected outcome. */ +struct OnAppendEntriesTestParam { + TermId term; /* term stored by member 'a' before the request */ + std::vector> log; /* log stored by member 'a' before the request */ + + TermId peer_term; /* term carried by the request from 'b' */ + std::vector> peer_log; /* log of the sender 'b' */ + LogIndex peer_next_index; /* index of the first peer_log entry sent; the entry before it is the consistency-check entry */ + + bool expected_reply; /* expected reply.success */ + TermId expected_term; /* expected reply.term */ + std::vector> expected_log; /* expected log of 'a' after the request */ +}; +/* Fixture that seeds both storages from the current OnAppendEntriesTestParam. */ +class OnAppendEntriesTest + : public RaftMemberParamTest { + public: + OnAppendEntriesTest() + : RaftMemberParamTest( + InMemoryStorageInterface(GetParam().term, {}, + GetParam().log), + InMemoryStorageInterface(GetParam().peer_term, {}, + GetParam().peer_log)) {} + virtual ~OnAppendEntriesTest() {} +}; + +TEST_P(OnAppendEntriesTest, All) { + auto last_log_index = GetParam().peer_next_index - 1; + auto last_log_term = peer_storage_.GetLogTerm(last_log_index); + auto entries = 
peer_storage_.GetLogSuffix(GetParam().peer_next_index); + auto reply = member_.OnAppendEntries( + {GetParam().peer_term, "b", last_log_index, last_log_term, entries, 0}); + + EXPECT_EQ(reply.success, GetParam().expected_reply); + EXPECT_EQ(reply.term, GetParam().expected_term); + EXPECT_EQ(storage_.log_, GetParam().expected_log); +} +/* Member 'a' received an AppendEntries RPC from member 'b'. The request + * carries b's log entries from `peer_next_index` onwards; the entry right + * before them is the one used for the log consistency check. */ +INSTANTIATE_TEST_CASE_P( + TermAndLogConsistencyCheck, OnAppendEntriesTest, + Values( + /* sender has stale term -> decline */ + OnAppendEntriesTestParam{/* my term */ 8, + {{1}, {1}, {2}}, + 7, + {{1}, {1}, {2}, {3}, {4}, {5}, {5}, {6}}, + 7, + false, + 8, + {{1}, {1}, {2}}}, + /* we're missing entries 4, 5 and 6 -> decline, but update term */ + OnAppendEntriesTestParam{4, + {{1}, {1}, {2}}, + 8, + {{1}, {1}, {2}, {3}, {4}, {5}, {5}, {6}}, + 7, + false, + 8, + {{1}, {1}, {2}}}, + /* we're missing entry 4 -> decline, but update term */ + OnAppendEntriesTestParam{5, + {{1}, {1}, {2}}, + 8, + {{1}, {1}, {2}, {3}, {4}, {5}, {5}, {6}}, + 5, + false, + 8, + {{1}, {1}, {2}}}, + /* log terms don't match at entry 3 -> decline, but update term */ + OnAppendEntriesTestParam{5, + {{1}, {1}, {2}}, + 8, + {{1}, {1}, {3}, {3}, {4}, {5}, {5}, {6}}, + 4, + false, + 8, + {{1}, {1}, {2}}}, + /* logs match -> accept and update term */ + OnAppendEntriesTestParam{5, + {{1}, {1}, {2}}, + 8, + {{1}, {1}, {2}, {3}, {4}, {5}, {5}, {6}}, + 4, + true, + 8, + {{1}, {1}, {2}, {3}, {4}, {5}, {5}, {6}}}, + /* now follow some log truncation tests */ + /* no truncation, append a single entry */ + OnAppendEntriesTestParam{ + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}}, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}, + 9, + true, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}}, + /* no truncation, append multiple entries */ + OnAppendEntriesTestParam{ + 8, + 
{{1}, {1}, {1}, {4}}, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}, + 4, + true, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}}, + /* no truncation, leader's log is prefix of ours */ + OnAppendEntriesTestParam{ + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}, {6}}, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}, + 4, + true, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}, {6}}}, + /* another one, now with entries from newer term */ + OnAppendEntriesTestParam{ + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}, {7}, {7}}, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}, + 4, + true, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}, {7}, {7}}}, + /* no truncation, partial match between our log and appended entries + */ + OnAppendEntriesTestParam{ + 8, + {{1}, {1}, {1}, {4}, {4}, {5}}, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}, + 4, + true, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}}, + /* truncate suffix */ + OnAppendEntriesTestParam{ + 8, + {{1}, {1}, {1}, {4}, {4}, {4}, {4}}, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}, + 5, + true, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}}, + /* truncate suffix, with partial match between our log and appended + entries */ + OnAppendEntriesTestParam{ + 8, + {{1}, {1}, {1}, {4}, {4}, {4}, {4}}, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}, + 4, + true, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}}, + /* delete whole log */ + OnAppendEntriesTestParam{ + 8, + {{5}}, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}, + 1, + true, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}}, + /* append on empty log; the log must be `{}` here, since `{{}}` would create one default-constructed entry */ + OnAppendEntriesTestParam{ + 8, + {}, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}, + 1, + true, + 8, + {{1}, {1}, {1}, {4}, {4}, {5}, {5}, {6}, {6}, {6}}}));